Skip to content

Add DynamoDB Streams event source emulator#2356

Draft
normj wants to merge 2 commits intodevfrom
normj/lambda-ddbstreams
Draft

Add DynamoDB Streams event source emulator#2356
normj wants to merge 2 commits intodevfrom
normj/lambda-ddbstreams

Conversation

@normj
Copy link
Copy Markdown
Member

@normj normj commented May 6, 2026

Summary

Adds DynamoDB Streams event source emulation to the Lambda Test Tool, enabling local testing of Lambda functions triggered by DynamoDB Streams without deploying to AWS.

Design Decisions

Architecture: Follows the SQS Event Source Pattern

The implementation mirrors the existing SQS event source poller architecture with three layers:

  1. Config model (DynamoDBStreamsEventSourceConfig) — User-facing input with optional fields (TableName, BatchSize, Profile, Region, FunctionName, LambdaRuntimeApi)
  2. Process orchestrator (DynamoDBStreamsEventSourceProcess) — Parses config, constructs DI container, starts hosted services
  3. Background service (DynamoDBStreamsEventSourceBackgroundService) — Long-running IHostedService that polls the stream and invokes the Lambda function

Config Input Formats

Supports three input formats (same as SQS):

  • JSON object{"TableName": "my-table", "BatchSize": 50}
  • JSON array — Multiple event sources in one config
  • Key-value pairsTableName=my-table,BatchSize=50

Config can also be a file path. Values prefixed with env: are resolved from environment variables at startup.

Stream Polling Strategy

  • Uses ShardIteratorType.LATEST — only processes new records, not historical data. This matches the typical local development use case where you want to see changes as they happen.
  • Polls all shards in the stream concurrently within a single loop iteration.
  • Automatically discovers the stream ARN from the table name via ListStreams.
  • Re-discovers shards when all current shard iterators are exhausted (handles shard splits/merges).
  • 1-second backoff when no records are found; 5-second retry when stream is not yet available.

Record Conversion

Converts from Amazon.DynamoDBv2.Model.Record (SDK type) to DynamoDBEvent.DynamodbStreamRecord (Lambda event type). This is necessary because the DynamoDB Streams SDK and the Lambda event model use different type hierarchies for the same data:

  • Maps all AttributeValue types recursively (S, N, B, BOOL, NULL, SS, NS, BS, L, M)
  • Preserves Keys, NewImage, OldImage from the StreamRecord
  • Sets EventSource to "aws:dynamodb" and extracts region from the stream ARN
  • Includes UserIdentity when present

CLI Integration

New --dynamodbstreams-eventsource-config option on RunCommand, parallel to the existing --sqs-eventsource-config. Validated at startup — if provided, the Lambda emulator port must also be configured.

Defaults

  • BatchSize: 100 (matches AWS Lambda default for DynamoDB Streams triggers)
  • FunctionName: Falls back to LambdaRuntimeApi.DefaultFunctionName if not specified
  • LambdaRuntimeApi: Auto-constructed from the test tool's own emulator host/port if not specified

Testing

Unit tests cover:

  • Config parsing for all three formats (JSON object, JSON array, key-value pairs)
  • Record conversion including nested attribute types, user identity, and stream metadata

@normj normj changed the base branch from master to dev May 6, 2026 06:20
@normj normj changed the base branch from dev to master May 6, 2026 06:22
Implement DynamoDB Streams polling following the existing SQS event source pattern.
The emulator polls DynamoDB Streams for records, converts them to Lambda
DynamoDBEvent format, and invokes the connected Lambda function via the
emulated runtime API.

New CLI option: --dynamodbstreams-eventsource-config
Config format: TableName=X,FunctionName=Y,LambdaRuntimeApi=Z,BatchSize=N,Profile=P,Region=R
Supports env: prefix for environment variable indirection.

New files:
- DynamoDBStreamsEventSourceConfig.cs
- DynamoDBStreamsEventSourceBackgroundServiceConfig.cs
- DynamoDBStreamsEventSourceProcess.cs
- DynamoDBStreamsEventSourceBackgroundService.cs

Unit tests:
- ParseDynamoDBStreamsEventSourceConfigTests.cs
- ConvertDynamoDBStreamsRecordTests.cs
@normj normj force-pushed the normj/lambda-ddbstreams branch from 59cbb35 to 7ba1e7a Compare May 6, 2026 06:33
@normj normj changed the base branch from master to dev May 6, 2026 06:33
@normj normj requested a review from Copilot May 6, 2026 06:40
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds DynamoDB Streams event source emulation to the Lambda Test Tool v2 so developers can locally invoke Lambda functions from DynamoDB Streams without deploying AWS infrastructure, following the existing “event source poller” pattern used for SQS.

Changes:

  • Introduces a new --dynamodbstreams-eventsource-config CLI option and wiring in RunCommand.
  • Adds DynamoDB Streams event source process/config types and a background poller that reads stream records and invokes the Lambda runtime API.
  • Adds unit tests for config parsing and record conversion; updates project dependencies for DynamoDB Streams + Lambda event models.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs Adds unit tests for DynamoDB Streams event source config parsing (JSON + key/value).
Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs Adds unit tests for converting DynamoDB Streams SDK records into DynamoDBEvent records.
Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj Adds test dependencies for DynamoDB Streams and Lambda DynamoDB event types.
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs Adds process/orchestrator to parse config, configure AWS client, and start hosted poller service(s).
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs Adds user-facing config model for DynamoDB Streams event source inputs.
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs Adds DI config object for the poller background service.
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs Implements stream discovery/polling, Lambda invocation, and SDK→Lambda record conversion.
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs Adds CLI option + help text for --dynamodbstreams-eventsource-config.
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs Wires the DynamoDB Streams event source process into tool startup and env-var evaluation.
Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj Adds runtime dependencies for DynamoDB Streams and Lambda DynamoDB event types.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +101 to +106
for (int i = 0; i < shardIterators.Count; i++)
{
if (stoppingToken.IsCancellationRequested)
return;

var iterator = shardIterators[i];
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Replaced the sequential loop with Task.WhenAll — each shard now gets polled concurrently via a PollShard helper method, and results are processed after all complete.

Comment on lines +148 to +156
// Check for new shards periodically
if (shardIterators.All(s => s == null))
{
shardIterators = await GetShardIterators(streamArn, stoppingToken);
if (shardIterators.Count == 0)
{
await Task.Delay(1000, stoppingToken);
}
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Now re-discovers shards when any iterator becomes null (not just when all are exhausted). New shards are merged with still-active iterators so no records are missed during splits.

Comment on lines +167 to +174
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
{
StreamArn = streamArn
}, stoppingToken);

var iterators = new List<string?>();

foreach (var shard in describeResponse.StreamDescription.Shards)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. GetShardIterators now loops on DescribeStream until LastEvaluatedShardId is null, collecting all shards across pages before requesting iterators.

Comment on lines +286 to +288
if (sdkValue.L?.Count > 0)
lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList();
if (sdkValue.M?.Count > 0)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Changed the null-checks from ?.Count > 0 to just != null for L, M, SS, NS, and BS — empty collections are now preserved as non-null empty lists/dictionaries. Added a unit test (ConvertRecordWithEmptyListAndMap) to verify.

Comment on lines +286 to +288
if (sdkValue.L?.Count > 0)
lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList();
if (sdkValue.M?.Count > 0)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Added ConvertRecordWithBinaryAttributes test covering both B (single binary) and BS (binary set) conversion, verifying byte content is preserved through the mapping.

Address PR review comments:
- Poll shards concurrently using Task.WhenAll instead of sequential loop
- Re-discover shards when any iterator becomes null (not just when all are null)
- Paginate DescribeStream to handle streams with many shards
- Preserve empty List/Map attribute values (non-null but empty collections)
- Add unit tests for Binary (B), Binary Set (BS), and empty List/Map conversion
@normj normj force-pushed the normj/lambda-ddbstreams branch from 36df4ef to 340bc67 Compare May 6, 2026 06:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants