Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7ba1e7a
Add DynamoDB Streams event source emulator to Lambda Test Tool
normj May 6, 2026
36df4ef
Add DevConfig file
normj May 6, 2026
340bc67
Add DynamoDB Streams event source emulator to Lambda Test Tool
normj May 6, 2026
787e1dc
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
ecac169
Support stream ARN as direct input to DynamoDB Streams event source
normj May 6, 2026
b876946
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
db4fd0b
Make ShardIteratorType and PollingIntervalMs configurable, fix shard …
normj May 6, 2026
640a47b
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
2a50358
Use DescribeTable instead of ListStreams to get stream ARN
normj May 6, 2026
f3f69cf
Add logging to DynamoDB Streams background service
normj May 6, 2026
bc2eab6
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
b38b02a
Reduce shard re-discovery frequency for quiet tables
normj May 6, 2026
e644494
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
7eb6740
Fix stream polling: filter to open shards only, re-discover on shard …
normj May 6, 2026
592106d
Change default ShardIteratorType from LATEST to TRIM_HORIZON
normj May 6, 2026
5ff8065
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
5be6cd6
Fix DynamoDB Streams checkpointing - preserve iterator position acros…
normj May 6, 2026
bc88dd7
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
fd43290
Rework DynamoDB Streams polling to only process records created after…
normj May 7, 2026
0799ed0
Add high-level strategy comment to PollStream explaining shard manage…
normj May 7, 2026
0a7f339
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 7, 2026
b32a71a
Logging fixes
normj May 7, 2026
381450b
Merge branch 'dev' into normj/lambda-ddbstreams
normj May 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.TestTool",
"Type": "Minor",
"ChangelogMessages": [
"Add support emulating Lambda DynamoDB Stream event source"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" Version="4.0.3.22" />
<PackageReference Include="AWSSDK.Lambda" Version="4.0.13.1" />
<PackageReference Include="AWSSDK.SQS" Version="4.0.2.14" />
<PackageReference Include="AWSSDK.DynamoDBv2" Version="4.0.15.2" />
<PackageReference Include="AWSSDK.DynamoDBStreams" Version="4.0.4.17" />
<PackageReference Include="Amazon.Lambda.DynamoDBEvents" Version="3.1.2" />
<PackageReference Include="AWSSDK.SSO" Version="4.0.2.13" />
<PackageReference Include="AWSSDK.SSOOIDC" Version="4.0.3.14" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Amazon.Lambda.TestTool.Models;
using Amazon.Lambda.TestTool.Processes;
using Amazon.Lambda.TestTool.Processes.SQSEventSource;
using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
using Amazon.Lambda.TestTool.Services;
using Amazon.Lambda.TestTool.Services.IO;
using Spectre.Console.Cli;
Expand Down Expand Up @@ -39,10 +40,10 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
{
EvaluateEnvironmentVariables(settings);

if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig))
if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig) && string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
{
throw new ArgumentException("At least one of the following parameters must be set: " +
"--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port or --sqs-eventsource-config");
"--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port, --sqs-eventsource-config or --dynamodbstreams-eventsource-config");
}

var tasks = new List<Task>();
Expand Down Expand Up @@ -87,6 +88,12 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
{
var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(sqsEventSourceProcess.RunningTask);
}

if (!string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
{
var dynamoDBStreamsProcess = DynamoDBStreamsEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(dynamoDBStreamsProcess.RunningTask);
}

await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token));
Expand Down Expand Up @@ -184,6 +191,16 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings)
throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty");
}
settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString();
}

if (settings.DynamoDBStreamsEventSourceConfig != null && settings.DynamoDBStreamsEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase))
{
var envVariable = settings.DynamoDBStreamsEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length);
if (!environmentVariables.Contains(envVariable))
{
throw new InvalidOperationException($"Environment variable {envVariable} for the DynamoDB Streams event source config was empty");
}
settings.DynamoDBStreamsEventSourceConfig = environmentVariables[envVariable]?.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public sealed class RunCommandSettings : CommandSettings
[Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=<queue-url>,FunctionName=<function-name>,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")]
public string? SQSEventSourceConfig { get; set; }


/// <summary>
/// The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example "TableName=my-table,FunctionName=function-name,BatchSize=100".
/// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName
/// </summary>
[CommandOption("--dynamodbstreams-eventsource-config <CONFIG>")]
[Description("The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example \"TableName=<table-name>,FunctionName=<function-name>,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName")]
public string? DynamoDBStreamsEventSourceConfig { get; set; }
/// <summary>
/// The absolute path used to save global settings and saved requests. You will need to specify a path in order to enable saving global settings and requests.
/// </summary>
Expand Down
Loading