Open
Conversation
Implement DynamoDB Streams polling following the existing SQS event source pattern. The emulator polls DynamoDB Streams for records, converts them to Lambda DynamoDBEvent format, and invokes the connected Lambda function via the emulated runtime API. New CLI option: --dynamodbstreams-eventsource-config Config format: TableName=X,FunctionName=Y,LambdaRuntimeApi=Z,BatchSize=N,Profile=P,Region=R Supports env: prefix for environment variable indirection. New files: - DynamoDBStreamsEventSourceConfig.cs - DynamoDBStreamsEventSourceBackgroundServiceConfig.cs - DynamoDBStreamsEventSourceProcess.cs - DynamoDBStreamsEventSourceBackgroundService.cs Unit tests: - ParseDynamoDBStreamsEventSourceConfigTests.cs - ConvertDynamoDBStreamsRecordTests.cs
59cbb35 to
7ba1e7a
Compare
There was a problem hiding this comment.
Pull request overview
Adds DynamoDB Streams event source emulation to the Lambda Test Tool v2 so developers can locally invoke Lambda functions from DynamoDB Streams without deploying AWS infrastructure, following the existing “event source poller” pattern used for SQS.
Changes:
- Introduces a new
--dynamodbstreams-eventsource-configCLI option and wiring inRunCommand. - Adds DynamoDB Streams event source process/config types and a background poller that reads stream records and invokes the Lambda runtime API.
- Adds unit tests for config parsing and record conversion; updates project dependencies for DynamoDB Streams + Lambda event models.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs | Adds unit tests for DynamoDB Streams event source config parsing (JSON + key/value). |
| Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs | Adds unit tests for converting DynamoDB Streams SDK records into DynamoDBEvent records. |
| Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj | Adds test dependencies for DynamoDB Streams and Lambda DynamoDB event types. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs | Adds process/orchestrator to parse config, configure AWS client, and start hosted poller service(s). |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs | Adds user-facing config model for DynamoDB Streams event source inputs. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs | Adds DI config object for the poller background service. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs | Implements stream discovery/polling, Lambda invocation, and SDK→Lambda record conversion. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs | Adds CLI option + help text for --dynamodbstreams-eventsource-config. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs | Wires the DynamoDB Streams event source process into tool startup and env-var evaluation. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj | Adds runtime dependencies for DynamoDB Streams and Lambda DynamoDB event types. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Address PR review comments: - Poll shards concurrently using Task.WhenAll instead of sequential loop - Re-discover shards when any iterator becomes null (not just when all are null) - Paginate DescribeStream to handle streams with many shards - Preserve empty List/Map attribute values (non-null but empty collections) - Add unit tests for Binary (B), Binary Set (BS), and empty List/Map conversion
36df4ef to
340bc67
Compare
…lambda-dotnet into normj/lambda-ddbstreams
When the TableName config value is already a stream ARN (starts with 'arn:' and contains '/stream/'), skip the ListStreams lookup and use it directly.
…lambda-dotnet into normj/lambda-ddbstreams
…re-discovery - Add ShardIteratorType config (default: LATEST) propagated from Aspire options - Add PollingIntervalMs config (default: 1000) for tunable poll frequency - Fix shard re-discovery to only replace exhausted iterators, preserving active ones - Add key-value parsing for new config options
…lambda-dotnet into normj/lambda-ddbstreams
Reworked GetStreamArnForTable to use the DynamoDB DescribeTable API instead of the DynamoDB Streams ListStreams API. DescribeTable returns the LatestStreamArn property which guarantees we get the active stream for the table rather than potentially picking up a stale stream from ListStreams results.
Added logging for shard discovery, polling cycles, sleep intervals, and shard re-discovery events to aid debugging stream processing.
…lambda-dotnet into normj/lambda-ddbstreams
Instead of calling DescribeStream every time any shard iterator becomes null (which happens frequently on quiet tables as DynamoDB closes idle shards), simply remove exhausted shards from the active list. Full re-discovery only happens when ALL shards are exhausted.
…lambda-dotnet into normj/lambda-ddbstreams
…close, add diagnostic logging
…lambda-dotnet into normj/lambda-ddbstreams
…s shard re-discoveries Track shard iterators by shard ID in a dictionary instead of a positional list. On shard re-discovery, only create new iterators for newly discovered shards - existing shards keep their current NextShardIterator position. This prevents re-processing of already-invoked records when shards are re-discovered after a shard closes or after empty poll thresholds.
…lambda-dotnet into normj/lambda-ddbstreams
… startup - Remove ShardIteratorType configuration (now internally managed) - At startup: use LATEST iterator for open shards, track closed shards to skip - On re-discovery (every 30s or on shard exhaustion): preserve existing iterators, skip shards closed at startup, use TRIM_HORIZON for newly discovered shards - This ensures only records written after the test tool starts are delivered to Lambda
…lambda-dotnet into normj/lambda-ddbstreams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds DynamoDB Streams event source emulation to the Lambda Test Tool, enabling local testing of Lambda functions triggered by DynamoDB Streams without deploying to AWS.
This will be released along with this PR for the Aspire integration.
Design Decisions
Architecture: Follows the SQS Event Source Pattern
The implementation mirrors the existing SQS event source poller architecture with three layers:
DynamoDBStreamsEventSourceConfig) — User-facing input with optional fields (TableName, BatchSize, Profile, Region, FunctionName, LambdaRuntimeApi, PollingIntervalMs)DynamoDBStreamsEventSourceProcess) — Parses config, constructs DI container, starts hosted servicesDynamoDBStreamsEventSourceBackgroundService) — Long-runningIHostedServicethat polls the stream and invokes the Lambda functionConfig Input Formats
Supports three input formats (same as SQS):
{"TableName": "my-table", "BatchSize": 50}TableName=my-table,BatchSize=50Config can also be a file path. Values prefixed with
env:are resolved from environment variables at startup.Stream ARN Resolution
TableNamestarts witharn:and contains/stream/, it is used directly as the stream ARN (no lookup needed)DescribeTableto getLatestStreamArnfrom the tableStream Polling Strategy
The polling strategy is designed to only deliver records created after the test tool starts, avoiding replay of historical data:
Startup:
DescribeStream(paginated) to discover all shardsEndingSequenceNumber): polled withLATESTiterator type — only future records are deliveredclosedAtStartupset and never polled — they contain only pre-existing historical dataOngoing shard discovery:
closedAtStartupare always skippedTRIM_HORIZON— all records in these shards were created after tool startup, so all are relevantConcurrency: All active shards are polled concurrently via
Task.WhenAll.Backoff: Configurable polling interval (default 1000ms) between poll cycles when no records are found.
Record Conversion
Converts from
Amazon.DynamoDBv2.Model.Record(SDK type) toDynamoDBEvent.DynamodbStreamRecord(Lambda event type). This is necessary because the DynamoDB Streams SDK and the Lambda event model use different type hierarchies for the same data:AttributeValuetypes recursively (S, N, B, BOOL, NULL, SS, NS, BS, L, M)Keys,NewImage,OldImagefrom theStreamRecordEventSourceto"aws:dynamodb"and extracts region from the stream ARNUserIdentitywhen present (e.g. TTL deletions by DynamoDB service)CLI Integration
New
--dynamodbstreams-eventsource-configoption onRunCommand, parallel to the existing--sqs-eventsource-config. Validated at startup — if provided, the Lambda emulator port must also be configured.Configuration Options
TableNameFunctionNameBatchSizePollingIntervalMsLambdaRuntimeApiProfileRegionTesting
12 unit tests across 2 test files:
Record Conversion Tests (
ConvertDynamoDBStreamsRecordTests.cs)ConvertBasicRecord— Basic INSERT record with keys and new imageConvertRecordWithAllAttributeTypes— S, N, BOOL, NULL, L, M, SS, NS attributesConvertRecordWithUserIdentity— REMOVE record with TTL UserIdentityConvertRecordWithBinaryAttributes— B and BS attributes with byte content preservationConvertRecordWithEmptyListAndMap— Empty List/Map preserved as non-null empty collectionsConvertMultipleRecords— Batch conversion via ConvertToLambdaRecords with count/orderingConfig Parsing Tests (
ParseDynamoDBStreamsEventSourceConfigTests.cs)ParseValidJsonObject— Single JSON object with all fieldsParseInvalidJsonObject— Malformed JSON throws InvalidOperationExceptionParseValidJsonArray— JSON array of multiple configsParseInvalidJsonArray— Malformed array throws InvalidOperationExceptionParseKeyPairs— Key=value format (case-insensitive, whitespace-tolerant)InvalidKeyPairString— Invalid pairs throw InvalidOperationException