Add DynamoDB Streams event source emulator #2356

Open
normj wants to merge 23 commits into dev from normj/lambda-ddbstreams
normj commented May 6, 2026

Summary

Adds DynamoDB Streams event source emulation to the Lambda Test Tool, enabling local testing of Lambda functions triggered by DynamoDB Streams without deploying to AWS.

This will be released along with this PR for the Aspire integration.

Design Decisions

Architecture: Follows the SQS Event Source Pattern

The implementation mirrors the existing SQS event source poller architecture with three layers:

  1. Config model (DynamoDBStreamsEventSourceConfig) — User-facing input with optional fields (TableName, BatchSize, Profile, Region, FunctionName, LambdaRuntimeApi, PollingIntervalMs)
  2. Process orchestrator (DynamoDBStreamsEventSourceProcess) — Parses config, constructs DI container, starts hosted services
  3. Background service (DynamoDBStreamsEventSourceBackgroundService) — Long-running IHostedService that polls the stream and invokes the Lambda function

Config Input Formats

Supports three input formats (same as SQS):

  • JSON object: {"TableName": "my-table", "BatchSize": 50}
  • JSON array: multiple event sources in one config
  • Key-value pairs: TableName=my-table,BatchSize=50

Config can also be a file path. Values prefixed with env: are resolved from environment variables at startup.
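
The three input formats can be told apart by their first character. The sketch below illustrates that dispatch in Python purely for brevity; it is not the tool's C# parser. The function names are hypothetical, file-path input is omitted, and it assumes the env: prefix applies to the whole config string. It raises ValueError where the tests below describe InvalidOperationException.

```python
import json
import os

# Property names taken from the configuration table in this PR description.
KNOWN_KEYS = ["TableName", "FunctionName", "LambdaRuntimeApi", "BatchSize",
              "PollingIntervalMs", "Profile", "Region"]
_CANONICAL = {key.lower(): key for key in KNOWN_KEYS}
_INT_KEYS = {"BatchSize", "PollingIntervalMs"}


def parse_key_value_pairs(text):
    """Parse 'Key=value,Key=value' (case-insensitive keys, whitespace-tolerant)."""
    config = {}
    for pair in text.split(","):
        key, sep, value = pair.partition("=")
        name = _CANONICAL.get(key.strip().lower())
        if not sep or name is None:
            raise ValueError(f"invalid key=value pair: {pair!r}")
        value = value.strip()
        config[name] = int(value) if name in _INT_KEYS else value
    return config


def parse_event_source_config(text, environ=None):
    """Return a list of config dicts, one per event source."""
    environ = os.environ if environ is None else environ
    text = text.strip()
    # Assumption: 'env:NAME' replaces the whole config string (KeyError if unset).
    if text.startswith("env:"):
        text = environ[text[len("env:"):]].strip()
    if text.startswith("{"):
        return [json.loads(text)]      # single JSON object
    if text.startswith("["):
        return json.loads(text)        # JSON array of objects
    return [parse_key_value_pairs(text)]
```

For example, `parse_event_source_config("tablename = my-table, BatchSize=50")` yields one config with the canonical key names and an integer batch size. JSON keys are passed through as written in this sketch.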

Stream ARN Resolution

  • If TableName starts with arn: and contains /stream/, it is used directly as the stream ARN (no lookup needed)
  • Otherwise, calls DescribeTable to get LatestStreamArn from the table
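
A minimal sketch of the two resolution rules, written in Python for illustration only. `describe_table` is a hypothetical callable standing in for the DynamoDB DescribeTable call and is assumed to return a response shaped like `{"Table": {"LatestStreamArn": ...}}`. Raising when the table has no stream is an assumption of this sketch, not documented behavior of the tool.

```python
def resolve_stream_arn(table_name, describe_table):
    """Return the stream ARN for a table name or pass a stream ARN through."""
    # Rule 1: a value that already looks like a stream ARN is used as-is.
    if table_name.startswith("arn:") and "/stream/" in table_name:
        return table_name
    # Rule 2: otherwise look up the table and take its latest stream ARN.
    response = describe_table(table_name)
    stream_arn = response.get("Table", {}).get("LatestStreamArn")
    if not stream_arn:
        # Assumption: a table without streams enabled is treated as an error.
        raise LookupError(f"table {table_name!r} has no stream ARN")
    return stream_arn
```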

Stream Polling Strategy

The polling strategy is designed to only deliver records created after the test tool starts, avoiding replay of historical data:

Startup:

  • Calls DescribeStream (paginated) to discover all shards
  • Open shards (no EndingSequenceNumber): polled with LATEST iterator type — only future records are delivered
  • Closed shards at startup: recorded in a closedAtStartup set and never polled — they contain only pre-existing historical data

Ongoing shard discovery:

  • Re-discovery triggers every 30 seconds OR immediately when any active shard is exhausted
  • Existing shard iterators are preserved (maintains read position)
  • Shards in closedAtStartup are always skipped
  • Newly discovered shards (not previously tracked, not in closedAtStartup): polled with TRIM_HORIZON — all records in these shards were created after tool startup, so all are relevant

Concurrency: All active shards are polled concurrently via Task.WhenAll.

Backoff: Configurable polling interval (default 1000ms) between poll cycles when no records are found.
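
The iterator-type decisions described above can be sketched as a small bookkeeping class. This is an illustration in Python, not the tool's C# code: `ShardPlanner` and its methods are hypothetical names, shards are assumed to be dicts shaped like DescribeStream output, and the actual GetShardIterator/GetRecords calls, the 30-second timer, and the concurrent polling are left out.

```python
def is_closed(shard):
    """A shard is closed once its sequence number range has an ending number."""
    return "EndingSequenceNumber" in shard.get("SequenceNumberRange", {})


class ShardPlanner:
    """Decides which shards to poll and with which iterator type."""

    def __init__(self):
        self.closed_at_startup = set()   # never polled: historical data only
        self.tracked = {}                # shard id -> iterator type first used

    def startup(self, shards):
        """Open shards start at LATEST; closed shards are recorded and skipped."""
        for shard in shards:
            shard_id = shard["ShardId"]
            if is_closed(shard):
                self.closed_at_startup.add(shard_id)
            else:
                self.tracked[shard_id] = "LATEST"
        return dict(self.tracked)

    def rediscover(self, shards):
        """Return only newly discovered shards, each read from TRIM_HORIZON."""
        added = {}
        for shard in shards:
            shard_id = shard["ShardId"]
            # Already-tracked shards (even exhausted ones) keep their state.
            if shard_id in self.closed_at_startup or shard_id in self.tracked:
                continue
            self.tracked[shard_id] = "TRIM_HORIZON"
            added[shard_id] = "TRIM_HORIZON"
        return added
```

Keeping exhausted shards in `tracked` is what prevents a later re-discovery from reading them again from the beginning.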

Record Conversion

Converts from Amazon.DynamoDBv2.Model.Record (SDK type) to DynamoDBEvent.DynamodbStreamRecord (Lambda event type). This is necessary because the DynamoDB Streams SDK and the Lambda event model use different type hierarchies for the same data:

  • Maps all AttributeValue types recursively (S, N, B, BOOL, NULL, SS, NS, BS, L, M)
  • Preserves empty Lists and Maps as non-null empty collections
  • Preserves Keys, NewImage, OldImage from the StreamRecord
  • Sets EventSource to "aws:dynamodb" and extracts region from the stream ARN
  • Includes UserIdentity when present (e.g. TTL deletions by DynamoDB service)
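
The recursive mapping and the region extraction can be illustrated as follows. This is a Python sketch under stated assumptions, not the C# converter in this PR: `LambdaAttributeValue`, `convert_attribute`, and `region_from_stream_arn` are hypothetical names, and the input is assumed to be the single-key wire form of an attribute value (for example `{"S": "x"}`).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LambdaAttributeValue:
    """Stand-in for the Lambda event model's attribute value type."""
    S: Optional[str] = None
    N: Optional[str] = None
    B: Optional[bytes] = None
    BOOL: Optional[bool] = None
    NULL: Optional[bool] = None
    SS: Optional[list] = None
    NS: Optional[list] = None
    BS: Optional[list] = None
    L: Optional[list] = None
    M: Optional[dict] = None


def convert_attribute(value):
    """Convert a single-key attribute dict, recursing into lists and maps."""
    if len(value) != 1:
        raise ValueError("expected exactly one type key")
    (kind, payload), = value.items()
    if kind == "L":
        # An empty list stays an empty list rather than becoming None.
        return LambdaAttributeValue(L=[convert_attribute(v) for v in payload])
    if kind == "M":
        return LambdaAttributeValue(
            M={k: convert_attribute(v) for k, v in payload.items()})
    if kind in ("SS", "NS", "BS"):
        return LambdaAttributeValue(**{kind: list(payload)})
    if kind in ("S", "N", "B", "BOOL", "NULL"):
        return LambdaAttributeValue(**{kind: payload})
    raise ValueError(f"unknown attribute type {kind!r}")


def region_from_stream_arn(arn):
    """arn:aws:dynamodb:<region>:<account>:table/<name>/stream/<label>"""
    parts = arn.split(":", 5)   # the stream label itself may contain colons
    if len(parts) < 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    return parts[3]
```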

CLI Integration

New --dynamodbstreams-eventsource-config option on RunCommand, parallel to the existing --sqs-eventsource-config. Validated at startup — if provided, the Lambda emulator port must also be configured.

Configuration Options

| Property | Default | Description |
|---|---|---|
| TableName | required | DynamoDB table name or full stream ARN |
| FunctionName | default function | Lambda function to invoke |
| BatchSize | 100 | Max records per GetRecords call |
| PollingIntervalMs | 1000 | Delay (ms) between polls when no records found |
| LambdaRuntimeApi | auto | Lambda runtime API endpoint |
| Profile | null | AWS credentials profile |
| Region | null | AWS region |

Testing

12 unit tests across 2 test files:

Record Conversion Tests (ConvertDynamoDBStreamsRecordTests.cs)

  • ConvertBasicRecord — Basic INSERT record with keys and new image
  • ConvertRecordWithAllAttributeTypes — S, N, BOOL, NULL, L, M, SS, NS attributes
  • ConvertRecordWithUserIdentity — REMOVE record with TTL UserIdentity
  • ConvertRecordWithBinaryAttributes — B and BS attributes with byte content preservation
  • ConvertRecordWithEmptyListAndMap — Empty List/Map preserved as non-null empty collections
  • ConvertMultipleRecords — Batch conversion via ConvertToLambdaRecords with count/ordering

Config Parsing Tests (ParseDynamoDBStreamsEventSourceConfigTests.cs)

  • ParseValidJsonObject — Single JSON object with all fields
  • ParseInvalidJsonObject — Malformed JSON throws InvalidOperationException
  • ParseValidJsonArray — JSON array of multiple configs
  • ParseInvalidJsonArray — Malformed array throws InvalidOperationException
  • ParseKeyPairs — Key=value format (case-insensitive, whitespace-tolerant)
  • InvalidKeyPairString — Invalid pairs throw InvalidOperationException

normj changed the base branch from master to dev May 6, 2026 06:20
normj changed the base branch from dev to master May 6, 2026 06:22
Implement DynamoDB Streams polling following the existing SQS event source pattern.
The emulator polls DynamoDB Streams for records, converts them to Lambda
DynamoDBEvent format, and invokes the connected Lambda function via the
emulated runtime API.

New CLI option: --dynamodbstreams-eventsource-config
Config format: TableName=X,FunctionName=Y,LambdaRuntimeApi=Z,BatchSize=N,Profile=P,Region=R
Supports env: prefix for environment variable indirection.

New files:
- DynamoDBStreamsEventSourceConfig.cs
- DynamoDBStreamsEventSourceBackgroundServiceConfig.cs
- DynamoDBStreamsEventSourceProcess.cs
- DynamoDBStreamsEventSourceBackgroundService.cs

Unit tests:
- ParseDynamoDBStreamsEventSourceConfigTests.cs
- ConvertDynamoDBStreamsRecordTests.cs
normj force-pushed the normj/lambda-ddbstreams branch from 59cbb35 to 7ba1e7a May 6, 2026 06:33
normj changed the base branch from master to dev May 6, 2026 06:33
normj requested a review from Copilot May 6, 2026 06:40
Copilot AI left a comment

Pull request overview

Adds DynamoDB Streams event source emulation to the Lambda Test Tool v2 so developers can locally invoke Lambda functions from DynamoDB Streams without deploying AWS infrastructure, following the existing “event source poller” pattern used for SQS.

Changes:

  • Introduces a new --dynamodbstreams-eventsource-config CLI option and wiring in RunCommand.
  • Adds DynamoDB Streams event source process/config types and a background poller that reads stream records and invokes the Lambda runtime API.
  • Adds unit tests for config parsing and record conversion; updates project dependencies for DynamoDB Streams + Lambda event models.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.

| File | Description |
|---|---|
| Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs | Adds unit tests for DynamoDB Streams event source config parsing (JSON + key/value). |
| Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs | Adds unit tests for converting DynamoDB Streams SDK records into DynamoDBEvent records. |
| Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj | Adds test dependencies for DynamoDB Streams and Lambda DynamoDB event types. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs | Adds process/orchestrator to parse config, configure AWS client, and start hosted poller service(s). |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs | Adds user-facing config model for DynamoDB Streams event source inputs. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs | Adds DI config object for the poller background service. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs | Implements stream discovery/polling, Lambda invocation, and SDK→Lambda record conversion. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs | Adds CLI option + help text for --dynamodbstreams-eventsource-config. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs | Wires the DynamoDB Streams event source process into tool startup and env-var evaluation. |
| Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj | Adds runtime dependencies for DynamoDB Streams and Lambda DynamoDB event types. |

normj added 2 commits May 5, 2026 23:47
Address PR review comments:
- Poll shards concurrently using Task.WhenAll instead of sequential loop
- Re-discover shards when any iterator becomes null (not just when all are null)
- Paginate DescribeStream to handle streams with many shards
- Preserve empty List/Map attribute values (non-null but empty collections)
- Add unit tests for Binary (B), Binary Set (BS), and empty List/Map conversion
normj force-pushed the normj/lambda-ddbstreams branch from 36df4ef to 340bc67 May 6, 2026 06:55
normj added 17 commits May 6, 2026 17:36
When the TableName config value is already a stream ARN (starts with
'arn:' and contains '/stream/'), skip the ListStreams lookup and use
it directly.
…re-discovery

- Add ShardIteratorType config (default: LATEST) propagated from Aspire options
- Add PollingIntervalMs config (default: 1000) for tunable poll frequency
- Fix shard re-discovery to only replace exhausted iterators, preserving active ones
- Add key-value parsing for new config options
Reworked GetStreamArnForTable to use the DynamoDB DescribeTable API
instead of the DynamoDB Streams ListStreams API. DescribeTable returns
the LatestStreamArn property which guarantees we get the active stream
for the table rather than potentially picking up a stale stream from
ListStreams results.
Added logging for shard discovery, polling cycles, sleep intervals,
and shard re-discovery events to aid debugging stream processing.
Instead of calling DescribeStream every time any shard iterator becomes
null (which happens frequently on quiet tables as DynamoDB closes idle
shards), simply remove exhausted shards from the active list. Full
re-discovery only happens when ALL shards are exhausted.
…s shard re-discoveries

Track shard iterators by shard ID in a dictionary instead of a positional list.
On shard re-discovery, only create new iterators for newly discovered shards -
existing shards keep their current NextShardIterator position. This prevents
re-processing of already-invoked records when shards are re-discovered after
a shard closes or after empty poll thresholds.
… startup

- Remove ShardIteratorType configuration (now internally managed)
- At startup: use LATEST iterator for open shards, track closed shards to skip
- On re-discovery (every 30s or on shard exhaustion): preserve existing iterators,
  skip shards closed at startup, use TRIM_HORIZON for newly discovered shards
- This ensures only records written after the test tool starts are delivered to Lambda
normj marked this pull request as ready for review May 7, 2026 20:19
normj requested review from a team as code owners May 7, 2026 20:19
normj requested review from GarrettBeatty and philasmar May 7, 2026 20:19