From 7ba1e7ac1fb29aef68f8dd2dc883d5ce331f58c7 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 05:55:00 +0000 Subject: [PATCH 01/17] Add DynamoDB Streams event source emulator to Lambda Test Tool Implement DynamoDB Streams polling following the existing SQS event source pattern. The emulator polls DynamoDB Streams for records, converts them to Lambda DynamoDBEvent format, and invokes the connected Lambda function via the emulated runtime API. New CLI option: --dynamodbstreams-eventsource-config Config format: TableName=X,FunctionName=Y,LambdaRuntimeApi=Z,BatchSize=N,Profile=P,Region=R Supports env: prefix for environment variable indirection. New files: - DynamoDBStreamsEventSourceConfig.cs - DynamoDBStreamsEventSourceBackgroundServiceConfig.cs - DynamoDBStreamsEventSourceProcess.cs - DynamoDBStreamsEventSourceBackgroundService.cs Unit tests: - ParseDynamoDBStreamsEventSourceConfigTests.cs - ConvertDynamoDBStreamsRecordTests.cs --- .../Amazon.Lambda.TestTool.csproj | 3 + .../Commands/RunCommand.cs | 21 +- .../Commands/Settings/RunCommandSettings.cs | 8 + ...moDBStreamsEventSourceBackgroundService.cs | 293 ++++++++++++++++++ ...reamsEventSourceBackgroundServiceConfig.cs | 30 ++ .../DynamoDBStreamsEventSourceConfig.cs | 42 +++ .../DynamoDBStreamsEventSourceProcess.cs | 195 ++++++++++++ .../Amazon.Lambda.TestTool.UnitTests.csproj | 3 + .../ConvertDynamoDBStreamsRecordTests.cs | 185 +++++++++++ ...seDynamoDBStreamsEventSourceConfigTests.cs | 107 +++++++ 10 files changed, 885 insertions(+), 2 deletions(-) create mode 100644 Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs create mode 100644 Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs create mode 100644 Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs create mode 100644 Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs create mode 100644 Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs create mode 100644 Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj index 351cf6f4d..01f0fadbe 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj @@ -26,6 +26,9 @@ + + + diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs index 036611cd5..a4cf41e4c 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs @@ -8,6 +8,7 @@ using Amazon.Lambda.TestTool.Models; using Amazon.Lambda.TestTool.Processes; using Amazon.Lambda.TestTool.Processes.SQSEventSource; +using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; using Amazon.Lambda.TestTool.Services; using Amazon.Lambda.TestTool.Services.IO; using Spectre.Console.Cli; @@ -39,10 +40,10 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS { EvaluateEnvironmentVariables(settings); - if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig)) + if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig) && string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig)) { throw new ArgumentException("At least one of the following parameters must be set: " + - "--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port or --sqs-eventsource-config"); + "--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port, --sqs-eventsource-config or --dynamodbstreams-eventsource-config"); } var tasks = new List(); @@ -87,6 +88,12 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS { var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token); tasks.Add(sqsEventSourceProcess.RunningTask); + } + + if (!string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig)) + { + var dynamoDBStreamsProcess = DynamoDBStreamsEventSourceProcess.Startup(settings, cancellationTokenSource.Token); + tasks.Add(dynamoDBStreamsProcess.RunningTask); } await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token)); @@ -184,6 +191,16 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings) throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty"); } settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString(); + } + + if (settings.DynamoDBStreamsEventSourceConfig != null && settings.DynamoDBStreamsEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase)) + { + var envVariable = settings.DynamoDBStreamsEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length); + if (!environmentVariables.Contains(envVariable)) + { + throw new InvalidOperationException($"Environment variable {envVariable} for the DynamoDB Streams event source config was empty"); + } + settings.DynamoDBStreamsEventSourceConfig = environmentVariables[envVariable]?.ToString(); } } } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs index 0584eaa30..102490b76 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs @@ -82,6 +82,14 @@ public sealed class RunCommandSettings : CommandSettings [Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=,FunctionName=,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")] public string? SQSEventSourceConfig { get; set; } + + /// + /// The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example "TableName=my-table,FunctionName=function-name,BatchSize=100". + /// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName + /// + [CommandOption("--dynamodbstreams-eventsource-config ")] + [Description("The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example \"TableName=,FunctionName=,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName")] + public string? DynamoDBStreamsEventSourceConfig { get; set; } /// /// The absolute path used to save global settings and saved requests. You will need to specify a path in order to enable saving global settings and requests. /// diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs new file mode 100644 index 000000000..46fa4fa12 --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -0,0 +1,293 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.DynamoDBStreams; +using Amazon.DynamoDBStreams.Model; +using Amazon.Lambda.DynamoDBEvents; +using Amazon.Lambda.Model; +using Amazon.Lambda.TestTool.Services; +using Amazon.Runtime; +using System.Text.Json; + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// IHostedService that will run continually polling a DynamoDB Stream for records and invoking the connected +/// Lambda function with the polled records. +/// +public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService +{ + private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + private readonly ILogger _logger; + private readonly IAmazonDynamoDBStreams _streamsClient; + private readonly ILambdaClient _lambdaClient; + private readonly DynamoDBStreamsEventSourceBackgroundServiceConfig _config; + + /// + /// Constructs instance of . + /// + public DynamoDBStreamsEventSourceBackgroundService( + ILogger logger, + IAmazonDynamoDBStreams streamsClient, + DynamoDBStreamsEventSourceBackgroundServiceConfig config, + ILambdaClient lambdaClient) + { + _logger = logger; + _streamsClient = streamsClient; + _config = config; + _lambdaClient = lambdaClient; + } + + /// + /// Execute the DynamoDBStreamsEventSourceBackgroundService. + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Starting DynamoDB Streams poller for table: {tableName}", _config.TableName); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + var streamArn = await GetStreamArnForTable(stoppingToken); + if (streamArn == null) + { + _logger.LogWarning("No stream found for table {tableName}. Retrying in 5 seconds.", _config.TableName); + await Task.Delay(5000, stoppingToken); + continue; + } + + await PollStream(streamArn, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (Exception e) + { + _logger.LogWarning(e, "Exception occurred in DynamoDB Streams poller for {tableName}: {message}", _config.TableName, e.Message); + await Task.Delay(3000, stoppingToken); + } + } + } + + private async Task GetStreamArnForTable(CancellationToken stoppingToken) + { + var response = await _streamsClient.ListStreamsAsync(new ListStreamsRequest + { + TableName = _config.TableName + }, stoppingToken); + + // Use the first active stream for the table + return response.Streams.FirstOrDefault()?.StreamArn; + } + + private async Task PollStream(string streamArn, CancellationToken stoppingToken) + { + var shardIterators = await GetShardIterators(streamArn, stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + var hasRecords = false; + + for (int i = 0; i < shardIterators.Count; i++) + { + if (stoppingToken.IsCancellationRequested) + return; + + var iterator = shardIterators[i]; + if (iterator == null) + continue; + + var getRecordsResponse = await _streamsClient.GetRecordsAsync(new GetRecordsRequest + { + ShardIterator = iterator, + Limit = _config.BatchSize + }, stoppingToken); + + shardIterators[i] = getRecordsResponse.NextShardIterator; + + if (getRecordsResponse.Records.Count == 0) + continue; + + hasRecords = true; + var lambdaRecords = ConvertToLambdaRecords(getRecordsResponse.Records, streamArn); + + var lambdaPayload = new DynamoDBEvent + { + Records = lambdaRecords + }; + + var invokeRequest = new InvokeRequest + { + InvocationType = InvocationType.RequestResponse, + FunctionName = _config.FunctionName, + Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) + }; + + _logger.LogInformation("Invoking Lambda function {functionName} with {recordCount} DynamoDB stream records", + _config.FunctionName, lambdaRecords.Count); + + var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); + + if (lambdaResponse.FunctionError != null) + { + _logger.LogError("Invoking Lambda {function} with {recordCount} DynamoDB stream records failed with error {errorMessage}", + _config.FunctionName, lambdaRecords.Count, lambdaResponse.FunctionError); + } + } + + // Check for new shards periodically + if (shardIterators.All(s => s == null)) + { + shardIterators = await GetShardIterators(streamArn, stoppingToken); + if (shardIterators.Count == 0) + { + await Task.Delay(1000, stoppingToken); + } + } + else if (!hasRecords) + { + // No records found, wait before polling again + await Task.Delay(1000, stoppingToken); + } + } + } + + private async Task> GetShardIterators(string streamArn, CancellationToken stoppingToken) + { + var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest + { + StreamArn = streamArn + }, stoppingToken); + + var iterators = new List(); + + foreach (var shard in describeResponse.StreamDescription.Shards) + { + var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest + { + StreamArn = streamArn, + ShardId = shard.ShardId, + ShardIteratorType = ShardIteratorType.LATEST + }, stoppingToken); + + iterators.Add(iteratorResponse.ShardIterator); + } + + return iterators; + } + + /// + /// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type. + /// + internal static IList ConvertToLambdaRecords(List records, string streamArn) + { + return records.Select(r => ConvertToLambdaRecord(r, streamArn)).ToList(); + } + + /// + /// Convert a single SDK stream record to the Lambda event record type. + /// + internal static DynamoDBEvent.DynamodbStreamRecord ConvertToLambdaRecord(Record record, string streamArn) + { + var lambdaRecord = new DynamoDBEvent.DynamodbStreamRecord + { + EventID = record.EventID, + EventName = record.EventName?.Value, + EventSource = "aws:dynamodb", + EventSourceArn = streamArn, + EventVersion = record.EventVersion, + AwsRegion = Arn.Parse(streamArn).Region + }; + + if (record.Dynamodb != null) + { + lambdaRecord.Dynamodb = new DynamoDBEvent.StreamRecord + { + ApproximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime ?? DateTime.MinValue, + SequenceNumber = record.Dynamodb.SequenceNumber, + SizeBytes = record.Dynamodb.SizeBytes ?? 0, + StreamViewType = record.Dynamodb.StreamViewType?.Value + }; + + if (record.Dynamodb.Keys != null) + { + lambdaRecord.Dynamodb.Keys = ConvertAttributeMap(record.Dynamodb.Keys); + } + + if (record.Dynamodb.NewImage != null) + { + lambdaRecord.Dynamodb.NewImage = ConvertAttributeMap(record.Dynamodb.NewImage); + } + + if (record.Dynamodb.OldImage != null) + { + lambdaRecord.Dynamodb.OldImage = ConvertAttributeMap(record.Dynamodb.OldImage); + } + } + + if (record.UserIdentity != null) + { + lambdaRecord.UserIdentity = new DynamoDBEvent.Identity + { + PrincipalId = record.UserIdentity.PrincipalId, + Type = record.UserIdentity.Type + }; + } + + return lambdaRecord; + } + + /// + /// Convert SDK AttributeValue dictionary to Lambda event AttributeValue dictionary. + /// + internal static Dictionary ConvertAttributeMap(Dictionary sdkMap) + { + var result = new Dictionary(); + foreach (var kvp in sdkMap) + { + result[kvp.Key] = ConvertAttributeValue(kvp.Value); + } + return result; + } + + /// + /// Convert a single SDK AttributeValue to the Lambda event AttributeValue. + /// + internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValue sdkValue) + { + var lambdaValue = new DynamoDBEvent.AttributeValue(); + + if (sdkValue.S != null) + lambdaValue.S = sdkValue.S; + if (sdkValue.N != null) + lambdaValue.N = sdkValue.N; + if (sdkValue.B != null) + lambdaValue.B = sdkValue.B; + if (sdkValue.BOOL != null) + lambdaValue.BOOL = sdkValue.BOOL; + if (sdkValue.NULL != null) + lambdaValue.NULL = sdkValue.NULL; + if (sdkValue.SS?.Count > 0) + lambdaValue.SS = sdkValue.SS; + if (sdkValue.NS?.Count > 0) + lambdaValue.NS = sdkValue.NS; + if (sdkValue.BS?.Count > 0) + lambdaValue.BS = sdkValue.BS; + if (sdkValue.L?.Count > 0) + lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList(); + if (sdkValue.M?.Count > 0) + lambdaValue.M = ConvertAttributeMap(sdkValue.M); + + return lambdaValue; + } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs new file mode 100644 index 000000000..3a6de651c --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs @@ -0,0 +1,30 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// Configuration for the service. +/// +public class DynamoDBStreamsEventSourceBackgroundServiceConfig +{ + /// + /// The batch size to read from the stream and send to the Lambda function. + /// + public required int BatchSize { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultBatchSize; + + /// + /// The Lambda function to send the DynamoDB stream records to. + /// + public required string FunctionName { get; init; } + + /// + /// The endpoint where the emulated Lambda runtime API is running. + /// + public required string LambdaRuntimeApi { get; init; } + + /// + /// The DynamoDB table name to read streams from. + /// + public required string TableName { get; init; } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs new file mode 100644 index 000000000..af4b9ea49 --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs @@ -0,0 +1,42 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// This class represents the input values from the user for DynamoDB Streams event source configuration. +/// +internal class DynamoDBStreamsEventSourceConfig +{ + /// + /// The batch size to read from the stream and send to the Lambda function. + /// + public int? BatchSize { get; set; } + + /// + /// The Lambda function to send the DynamoDB stream records to. + /// If not set the default function will be used. + /// + public string? FunctionName { get; set; } + + /// + /// The endpoint where the emulated Lambda runtime API is running. + /// If not set the current Test Tool instance will be used. + /// + public string? LambdaRuntimeApi { get; set; } + + /// + /// The AWS profile to use for credentials. + /// + public string? Profile { get; set; } + + /// + /// The AWS region the DynamoDB table is in. + /// + public string? Region { get; set; } + + /// + /// The DynamoDB table name to read streams from. + /// + public string? TableName { get; set; } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs new file mode 100644 index 000000000..2d7cb08f0 --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs @@ -0,0 +1,195 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.DynamoDBStreams; +using Amazon.Lambda.TestTool.Commands.Settings; +using Amazon.Lambda.TestTool.Services; +using System.Text.Json; + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// Process for handling DynamoDB Streams event source for Lambda functions. +/// +public class DynamoDBStreamsEventSourceProcess +{ + internal const int DefaultBatchSize = 100; + + /// + /// The Parent task for all the tasks started for each DynamoDB Streams event source. + /// + public required Task RunningTask { get; init; } + + /// + /// Startup DynamoDB Streams event sources + /// + public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings settings, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig)) + { + throw new InvalidOperationException($"The {nameof(RunCommandSettings.DynamoDBStreamsEventSourceConfig)} can not be null when starting the DynamoDB Streams event source process"); + } + + var configs = LoadDynamoDBStreamsEventSourceConfig(settings.DynamoDBStreamsEventSourceConfig); + + var tasks = new List(); + + foreach (var config in configs) + { + var builder = Host.CreateApplicationBuilder(); + + var ddbConfig = new AmazonDynamoDBStreamsConfig(); + if (!string.IsNullOrEmpty(config.Profile)) + { + ddbConfig.Profile = new Profile(config.Profile); + } + + if (!string.IsNullOrEmpty(config.Region)) + { + ddbConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.Region); + } + + var streamsClient = new AmazonDynamoDBStreamsClient(ddbConfig); + builder.Services.AddSingleton(streamsClient); + builder.Services.AddSingleton(); + + var tableName = config.TableName; + if (string.IsNullOrEmpty(tableName)) + { + throw new InvalidOperationException("TableName is a required property for DynamoDB Streams event source config"); + } + + var lambdaRuntimeApi = config.LambdaRuntimeApi; + if (string.IsNullOrEmpty(lambdaRuntimeApi)) + { + if (!settings.LambdaEmulatorPort.HasValue) + { + throw new InvalidOperationException("No Lambda runtime api endpoint was given as part of the DynamoDB Streams event source config and the current " + + "instance of the test tool is not running the Lambda runtime api. Either provide a Lambda runtime api endpoint or set a port for " + + "the lambda runtime api when starting the test tool."); + } + lambdaRuntimeApi = $"http://{settings.LambdaEmulatorHost}:{settings.LambdaEmulatorPort}/"; + } + + var backgroundServiceConfig = new DynamoDBStreamsEventSourceBackgroundServiceConfig + { + BatchSize = config.BatchSize ?? DefaultBatchSize, + FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName, + LambdaRuntimeApi = lambdaRuntimeApi, + TableName = tableName + }; + + builder.Services.AddSingleton(backgroundServiceConfig); + builder.Services.AddHostedService(); + + var app = builder.Build(); + var task = app.RunAsync(cancellationToken); + tasks.Add(task); + } + + return new DynamoDBStreamsEventSourceProcess + { + RunningTask = Task.WhenAll(tasks) + }; + } + + /// + /// Load the DynamoDB Streams event source configs. Supports JSON or comma-delimited key-value pair format. + /// If the value points to a file that exists, the file contents will be read. + /// + internal static List LoadDynamoDBStreamsEventSourceConfig(string configString) + { + if (File.Exists(configString)) + { + configString = File.ReadAllText(configString); + } + + configString = configString.Trim(); + + List? configs = null; + + var jsonOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }; + + if (configString.StartsWith('[')) + { + try + { + configs = JsonSerializer.Deserialize>(configString, jsonOptions); + if (configs == null) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString); + } + } + catch (JsonException e) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString, e); + } + } + else if (configString.StartsWith('{')) + { + try + { + var config = JsonSerializer.Deserialize(configString, jsonOptions); + if (config == null) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString); + } + + configs = new List { config }; + } + catch (JsonException e) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString, e); + } + } + else + { + var config = new DynamoDBStreamsEventSourceConfig(); + var tokens = configString.Split(','); + foreach (var token in tokens) + { + if (string.IsNullOrWhiteSpace(token)) + continue; + + var keyValuePair = token.Split('='); + if (keyValuePair.Length != 2) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source config. Format should be \"TableName=,FunctionName=,...\""); + } + + switch (keyValuePair[0].ToLower().Trim()) + { + case "batchsize": + if (!int.TryParse(keyValuePair[1].Trim(), out var batchSize)) + { + throw new InvalidOperationException("Value for batch size is not a formatted integer"); + } + config.BatchSize = batchSize; + break; + case "functionname": + config.FunctionName = keyValuePair[1].Trim(); + break; + case "lambdaruntimeapi": + config.LambdaRuntimeApi = keyValuePair[1].Trim(); + break; + case "profile": + config.Profile = keyValuePair[1].Trim(); + break; + case "region": + config.Region = keyValuePair[1].Trim(); + break; + case "tablename": + config.TableName = keyValuePair[1].Trim(); + break; + } + } + + configs = new List { config }; + } + + return configs; + } +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj index 524f55fcd..7a799ca1e 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj @@ -15,6 +15,9 @@ + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs new file mode 100644 index 000000000..56fafd59a --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs @@ -0,0 +1,185 @@ +using Amazon.DynamoDBStreams; +using Amazon.DynamoDBStreams.Model; +using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; +using Xunit; +using Record = Amazon.DynamoDBStreams.Model.Record; + +namespace Amazon.Lambda.TestTool.UnitTests.DynamoDBStreamsEventSource; + +public class ConvertDynamoDBStreamsRecordTests +{ + private const string TestStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2024-01-01T00:00:00.000"; + + [Fact] + public void ConvertBasicRecord() + { + var record = new Record + { + EventID = "event-123", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + EventSource = "aws:dynamodb", + Dynamodb = new StreamRecord + { + ApproximateCreationDateTime = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc), + SequenceNumber = "111111111111111111111", + SizeBytes = 256, + StreamViewType = new StreamViewType("NEW_AND_OLD_IMAGES"), + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" }, + ["Name"] = new AttributeValue { S = "Test Item" } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + Assert.Equal("event-123", result.EventID); + Assert.Equal("INSERT", result.EventName); + Assert.Equal("aws:dynamodb", result.EventSource); + Assert.Equal(TestStreamArn, result.EventSourceArn); + Assert.Equal("1.1", result.EventVersion); + Assert.Equal("us-west-2", result.AwsRegion); + + Assert.NotNull(result.Dynamodb); + Assert.Equal("111111111111111111111", result.Dynamodb.SequenceNumber); + Assert.Equal(256, result.Dynamodb.SizeBytes); + Assert.Equal("NEW_AND_OLD_IMAGES", result.Dynamodb.StreamViewType); + + Assert.Single(result.Dynamodb.Keys); + Assert.Equal("key-1", result.Dynamodb.Keys["Id"].S); + + Assert.Equal(2, result.Dynamodb.NewImage.Count); + Assert.Equal("key-1", result.Dynamodb.NewImage["Id"].S); + Assert.Equal("Test Item", result.Dynamodb.NewImage["Name"].S); + } + + [Fact] + public void ConvertRecordWithAllAttributeTypes() + { + var record = new Record + { + EventID = "event-456", + EventName = new Amazon.DynamoDBStreams.OperationType("MODIFY"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["StringAttr"] = new AttributeValue { S = "hello" }, + ["NumberAttr"] = new AttributeValue { N = "42" }, + ["BoolAttr"] = new AttributeValue { BOOL = true }, + ["NullAttr"] = new AttributeValue { NULL = true }, + ["ListAttr"] = new AttributeValue + { + L = new List + { + new AttributeValue { S = "item1" }, + new AttributeValue { N = "2" } + } + }, + ["MapAttr"] = new AttributeValue + { + M = new Dictionary + { + ["nested"] = new AttributeValue { S = "value" } + } + }, + ["StringSetAttr"] = new AttributeValue { SS = new List { "a", "b" } }, + ["NumberSetAttr"] = new AttributeValue { NS = new List { "1", "2" } } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + var newImage = result.Dynamodb.NewImage; + Assert.Equal("hello", newImage["StringAttr"].S); + Assert.Equal("42", newImage["NumberAttr"].N); + Assert.True(newImage["BoolAttr"].BOOL); + Assert.True(newImage["NullAttr"].NULL); + Assert.Equal(2, newImage["ListAttr"].L.Count); + Assert.Equal("item1", newImage["ListAttr"].L[0].S); + Assert.Equal("value", newImage["MapAttr"].M["nested"].S); + Assert.Equal(new List { "a", "b" }, newImage["StringSetAttr"].SS); + Assert.Equal(new List { "1", "2" }, newImage["NumberSetAttr"].NS); + } + + [Fact] + public void ConvertRecordWithUserIdentity() + { + var record = new Record + { + EventID = "event-789", + EventName = new Amazon.DynamoDBStreams.OperationType("REMOVE"), + EventVersion = "1.1", + UserIdentity = new Identity + { + PrincipalId = "dynamodb.amazonaws.com", + Type = "Service" + }, + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "expired-item" } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + Assert.NotNull(result.UserIdentity); + Assert.Equal("dynamodb.amazonaws.com", result.UserIdentity.PrincipalId); + Assert.Equal("Service", result.UserIdentity.Type); + } + + [Fact] + public void ConvertMultipleRecords() + { + var records = new List + { + new Record + { + EventID = "event-1", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + } + } + }, + new Record + { + EventID = "event-2", + EventName = new Amazon.DynamoDBStreams.OperationType("MODIFY"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-2" } + } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecords(records, TestStreamArn); + + Assert.Equal(2, result.Count); + Assert.Equal("event-1", result[0].EventID); + Assert.Equal("event-2", result[1].EventID); + } +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs new file mode 100644 index 000000000..836d36c22 --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs @@ -0,0 +1,107 @@ +using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; +using Xunit; + +namespace Amazon.Lambda.TestTool.UnitTests.DynamoDBStreamsEventSource; + +public class ParseDynamoDBStreamsEventSourceConfigTests +{ + [Fact] + public void ParseValidJsonObject() + { + string json = """ +{ + "TableName" : "my-table", + "FunctionName" : "LambdaFunction", + "BatchSize" : 50, + "LambdaRuntimeApi" : "http://localhost:7777/", + "Profile" : "beta", + "Region" : "us-east-23" +} +"""; + + var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json); + Assert.Single(configs); + Assert.Equal("my-table", configs[0].TableName); + Assert.Equal("LambdaFunction", configs[0].FunctionName); + Assert.Equal(50, configs[0].BatchSize); + Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi); + Assert.Equal("beta", configs[0].Profile); + Assert.Equal("us-east-23", configs[0].Region); + } + + [Fact] + public void ParseInvalidJsonObject() + { + string json = """ +{ + "aaa" +} +"""; + + Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json)); + } + + [Fact] + public void ParseValidJsonArray() + { + string json = """ +[ + { + "TableName" : "table-1", + "FunctionName" : "Function1", + "BatchSize" : 25 + }, + { + "TableName" : "table-2", + "FunctionName" : "Function2", + "BatchSize" : 75 + } +] +"""; + + var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json); + Assert.Equal(2, configs.Count); + Assert.Equal("table-1", configs[0].TableName); + Assert.Equal("Function1", configs[0].FunctionName); + Assert.Equal(25, configs[0].BatchSize); + Assert.Equal("table-2", configs[1].TableName); + Assert.Equal("Function2", configs[1].FunctionName); + Assert.Equal(75, configs[1].BatchSize); + } + + [Fact] + public void ParseInvalidJsonArray() + { + string json = """ +[ + {"aaa"} +] +"""; + + Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json)); + } + + [Fact] + public void ParseKeyPairs() + { + var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig( + "TableName=my-table ,functionName =LambdaFunction, batchSize=50," + + "LambdaRuntimeApi=http://localhost:7777/ ,Profile=beta,Region=us-east-23"); + + Assert.Single(configs); + Assert.Equal("my-table", configs[0].TableName); + Assert.Equal("LambdaFunction", configs[0].FunctionName); + Assert.Equal(50, configs[0].BatchSize); + Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi); + Assert.Equal("beta", configs[0].Profile); + Assert.Equal("us-east-23", configs[0].Region); + } + + [Theory] + [InlineData("novalue")] + [InlineData("BatchSize=noint")] + public void InvalidKeyPairString(string keyPairConfig) + { + Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(keyPairConfig)); + } +} From 36df4ef37dbc574156c209b8181b1435754801dd Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Tue, 5 May 2026 23:47:30 -0700 Subject: [PATCH 02/17] Add DevConfig file --- .../changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json diff --git a/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json new file mode 100644 index 000000000..3dd6f20c7 --- /dev/null +++ b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.TestTool", + "Type": "Minor", + "ChangelogMessages": [ + "Add support emulating Lambda DynamoDB Stream event source" + ] + } + ] +} \ No newline at end of file From 340bc678f3c6eed552407c961db53da5dd8c02a4 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Tue, 5 May 2026 23:47:30 -0700 Subject: [PATCH 03/17] Add DynamoDB Streams event source emulator to Lambda Test Tool Address PR review comments: - Poll shards concurrently using Task.WhenAll instead of sequential loop - Re-discover shards when any iterator becomes null (not just when all are null) - Paginate DescribeStream to handle streams with many shards - Preserve empty List/Map attribute values (non-null but empty collections) - Add unit tests for Binary (B), Binary Set (BS), and empty List/Map conversion --- .../9b9bb131-676c-4fc3-85a0-78b86d128d58.json | 11 ++ ...moDBStreamsEventSourceBackgroundService.cs | 100 ++++++++++++------ .../ConvertDynamoDBStreamsRecordTests.cs | 71 +++++++++++++ 3 files changed, 152 insertions(+), 30 deletions(-) create mode 100644 .autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json diff --git a/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json new file mode 100644 index 000000000..3dd6f20c7 --- /dev/null +++ b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.TestTool", + "Type": "Minor", + "ChangelogMessages": [ + "Add support emulating Lambda DynamoDB Stream event source" + ] + } + ] +} \ No newline at end of file diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 46fa4fa12..72e400a18 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -96,30 +96,44 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) while (!stoppingToken.IsCancellationRequested) { - var hasRecords = false; - + // Poll all shards concurrently + var tasks = new List>(); for (int i = 0; i < shardIterators.Count; i++) { - if (stoppingToken.IsCancellationRequested) - return; - - var iterator = shardIterators[i]; - if (iterator == null) + if (shardIterators[i] == null) continue; - var getRecordsResponse = await _streamsClient.GetRecordsAsync(new GetRecordsRequest + var index = i; + var iterator = shardIterators[i]!; + tasks.Add(PollShard(index, iterator, stoppingToken)); + } + + if (tasks.Count == 0) + { + // All iterators exhausted — re-discover shards + shardIterators = await GetShardIterators(streamArn, stoppingToken); + if (shardIterators.Count == 0) { - ShardIterator = iterator, - Limit = _config.BatchSize - }, stoppingToken); + await Task.Delay(1000, stoppingToken); + } + continue; + } + + var results = await Task.WhenAll(tasks); + + var hasRecords = false; + foreach (var (index, response) in results) + { + if (response == null) + continue; - shardIterators[i] = getRecordsResponse.NextShardIterator; + shardIterators[index] = response.NextShardIterator; - if (getRecordsResponse.Records.Count == 0) + if (response.Records.Count == 0) continue; hasRecords = true; - var lambdaRecords = ConvertToLambdaRecords(getRecordsResponse.Records, streamArn); + var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn); var lambdaPayload = new DynamoDBEvent { @@ -145,33 +159,59 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } } - // Check for new shards periodically - if (shardIterators.All(s => s == null)) + // Re-discover shards when any iterator becomes null (shard closed/split) + if (shardIterators.Any(s => s == null)) { - shardIterators = await GetShardIterators(streamArn, stoppingToken); - if (shardIterators.Count == 0) + var newShards = await GetShardIterators(streamArn, stoppingToken); + // Merge: keep active iterators, add any new ones + var activeIterators = shardIterators.Where(s => s != null).ToList(); + foreach (var newIter in newShards) { - await Task.Delay(1000, stoppingToken); + if (newIter != null) + activeIterators.Add(newIter); } + shardIterators = activeIterators; } - else if (!hasRecords) + + if (!hasRecords) { - // No records found, wait before polling again await Task.Delay(1000, stoppingToken); } } } - private async Task> GetShardIterators(string streamArn, CancellationToken stoppingToken) + private async Task<(int Index, GetRecordsResponse? Response)> PollShard(int index, string iterator, CancellationToken stoppingToken) { - var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest + var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest { - StreamArn = streamArn + ShardIterator = iterator, + Limit = _config.BatchSize }, stoppingToken); + return (index, response); + } + + private async Task> GetShardIterators(string streamArn, CancellationToken stoppingToken) + { + var shards = new List(); + string? lastEvaluatedShardId = null; + + // Paginate through all shards + do + { + var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest + { + StreamArn = streamArn, + ExclusiveStartShardId = lastEvaluatedShardId + }, stoppingToken); + + shards.AddRange(describeResponse.StreamDescription.Shards); + lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; + } while (lastEvaluatedShardId != null); + var iterators = new List(); - foreach (var shard in describeResponse.StreamDescription.Shards) + foreach (var shard in shards) { var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest { @@ -277,15 +317,15 @@ internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValu lambdaValue.BOOL = sdkValue.BOOL; if (sdkValue.NULL != null) lambdaValue.NULL = sdkValue.NULL; - if (sdkValue.SS?.Count > 0) + if (sdkValue.SS != null) lambdaValue.SS = sdkValue.SS; - if (sdkValue.NS?.Count > 0) + if (sdkValue.NS != null) lambdaValue.NS = sdkValue.NS; - if (sdkValue.BS?.Count > 0) + if (sdkValue.BS != null) lambdaValue.BS = sdkValue.BS; - if (sdkValue.L?.Count > 0) + if (sdkValue.L != null) lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList(); - if (sdkValue.M?.Count > 0) + if (sdkValue.M != null) lambdaValue.M = ConvertAttributeMap(sdkValue.M); return lambdaValue; diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs index 56fafd59a..d47483ac2 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs @@ -143,6 +143,77 @@ public void ConvertRecordWithUserIdentity() Assert.Equal("Service", result.UserIdentity.Type); } + [Fact] + public void ConvertRecordWithBinaryAttributes() + { + var binaryData = new byte[] { 0x01, 0x02, 0x03, 0xFF }; + var binarySet = new List + { + new MemoryStream(new byte[] { 0xAA, 0xBB }), + new MemoryStream(new byte[] { 0xCC, 0xDD }) + }; + + var record = new Record + { + EventID = "event-bin", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["BinaryAttr"] = new AttributeValue { B = new MemoryStream(binaryData) }, + ["BinarySetAttr"] = new AttributeValue { BS = binarySet } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + var newImage = result.Dynamodb.NewImage; + Assert.NotNull(newImage["BinaryAttr"].B); + Assert.Equal(binaryData, newImage["BinaryAttr"].B.ToArray()); + Assert.NotNull(newImage["BinarySetAttr"].BS); + Assert.Equal(2, newImage["BinarySetAttr"].BS.Count); + Assert.Equal(new byte[] { 0xAA, 0xBB }, newImage["BinarySetAttr"].BS[0].ToArray()); + Assert.Equal(new byte[] { 0xCC, 0xDD }, newImage["BinarySetAttr"].BS[1].ToArray()); + } + + [Fact] + public void ConvertRecordWithEmptyListAndMap() + { + var record = new Record + { + EventID = "event-empty", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["EmptyList"] = new AttributeValue { L = new List() }, + ["EmptyMap"] = new AttributeValue { M = new Dictionary() } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + var newImage = result.Dynamodb.NewImage; + Assert.NotNull(newImage["EmptyList"].L); + Assert.Empty(newImage["EmptyList"].L); + Assert.NotNull(newImage["EmptyMap"].M); + Assert.Empty(newImage["EmptyMap"].M); + } + [Fact] public void ConvertMultipleRecords() { From ecac16951d644ed10d7945cb39c8a7cec9ccb5b5 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 17:36:59 +0000 Subject: [PATCH 04/17] Support stream ARN as direct input to DynamoDB Streams event source When the TableName config value is already a stream ARN (starts with 'arn:' and contains '/stream/'), skip the ListStreams lookup and use it directly. --- .../DynamoDBStreamsEventSourceBackgroundService.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 72e400a18..5b21bb0a9 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -81,6 +81,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task GetStreamArnForTable(CancellationToken stoppingToken) { + // If the configured value is already a stream ARN, use it directly + if (_config.TableName.StartsWith("arn:") && _config.TableName.Contains("/stream/")) + { + return _config.TableName; + } + var response = await _streamsClient.ListStreamsAsync(new ListStreamsRequest { TableName = _config.TableName From db4fd0bfaf5fcb486a8d00ebf679f90471ef9d30 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 20:57:28 +0000 Subject: [PATCH 05/17] Make ShardIteratorType and PollingIntervalMs configurable, fix shard re-discovery - Add ShardIteratorType config (default: LATEST) propagated from Aspire options - Add PollingIntervalMs config (default: 1000) for tunable poll frequency - Fix shard re-discovery to only replace exhausted iterators, preserving active ones - Add key-value parsing for new config options --- ...moDBStreamsEventSourceBackgroundService.cs | 23 ++++++++++++------- ...reamsEventSourceBackgroundServiceConfig.cs | 10 ++++++++ .../DynamoDBStreamsEventSourceConfig.cs | 12 ++++++++++ .../DynamoDBStreamsEventSourceProcess.cs | 15 +++++++++++- 4 files changed, 51 insertions(+), 9 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 5b21bb0a9..04a1bb28a 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -169,19 +169,26 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) if (shardIterators.Any(s => s == null)) { var newShards = await GetShardIterators(streamArn, stoppingToken); - // Merge: keep active iterators, add any new ones - var activeIterators = shardIterators.Where(s => s != null).ToList(); - foreach (var newIter in newShards) + // Replace only null entries with new iterators, keep active ones + for (int i = 0; i < shardIterators.Count; i++) { - if (newIter != null) - activeIterators.Add(newIter); + if (shardIterators[i] == null && i < newShards.Count) + { + shardIterators[i] = newShards[i]; + } } - shardIterators = activeIterators; + // Append any additional new shards discovered + for (int i = shardIterators.Count; i < newShards.Count; i++) + { + shardIterators.Add(newShards[i]); + } + // Remove remaining null entries (fully exhausted shards with no replacement) + shardIterators = shardIterators.Where(s => s != null).ToList(); } if (!hasRecords) { - await Task.Delay(1000, stoppingToken); + await Task.Delay(_config.PollingIntervalMs, stoppingToken); } } } @@ -223,7 +230,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) { StreamArn = streamArn, ShardId = shard.ShardId, - ShardIteratorType = ShardIteratorType.LATEST + ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType) }, stoppingToken); iterators.Add(iteratorResponse.ShardIterator); diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs index 3a6de651c..81a21371e 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs @@ -27,4 +27,14 @@ public class DynamoDBStreamsEventSourceBackgroundServiceConfig /// The DynamoDB table name to read streams from. /// public required string TableName { get; init; } + + /// + /// The shard iterator type to use when reading from the stream. + /// + public required string ShardIteratorType { get; init; } = "LATEST"; + + /// + /// The polling interval in milliseconds between stream reads when no records are found. + /// + public required int PollingIntervalMs { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultPollingIntervalMs; } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs index af4b9ea49..9ed883ea9 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs @@ -39,4 +39,16 @@ internal class DynamoDBStreamsEventSourceConfig /// The DynamoDB table name to read streams from. /// public string? TableName { get; set; } + + /// + /// The shard iterator type to use when reading from the stream. + /// Valid values: LATEST, TRIM_HORIZON. Default is LATEST. + /// + public string? ShardIteratorType { get; set; } + + /// + /// The polling interval in milliseconds between stream reads when no records are found. + /// Default is 1000. + /// + public int? PollingIntervalMs { get; set; } } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs index 2d7cb08f0..20ca5257a 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs @@ -14,6 +14,7 @@ namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; public class DynamoDBStreamsEventSourceProcess { internal const int DefaultBatchSize = 100; + internal const int DefaultPollingIntervalMs = 1000; /// /// The Parent task for all the tasks started for each DynamoDB Streams event source. @@ -76,7 +77,9 @@ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings setti BatchSize = config.BatchSize ?? DefaultBatchSize, FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName, LambdaRuntimeApi = lambdaRuntimeApi, - TableName = tableName + TableName = tableName, + ShardIteratorType = config.ShardIteratorType ?? "LATEST", + PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs }; builder.Services.AddSingleton(backgroundServiceConfig); @@ -184,6 +187,16 @@ internal static List LoadDynamoDBStreamsEventS case "tablename": config.TableName = keyValuePair[1].Trim(); break; + case "sharditeratortype": + config.ShardIteratorType = keyValuePair[1].Trim(); + break; + case "pollingintervalms": + if (!int.TryParse(keyValuePair[1].Trim(), out var pollingInterval)) + { + throw new InvalidOperationException("Value for polling interval is not a formatted integer"); + } + config.PollingIntervalMs = pollingInterval; + break; } } From 2a50358a68e5bd82bcac8551b198de73b1b38612 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 21:23:30 +0000 Subject: [PATCH 06/17] Use DescribeTable instead of ListStreams to get stream ARN Reworked GetStreamArnForTable to use the DynamoDB DescribeTable API instead of the DynamoDB Streams ListStreams API. DescribeTable returns the LatestStreamArn property which guarantees we get the active stream for the table rather than potentially picking up a stale stream from ListStreams results. --- .../DynamoDBStreamsEventSourceBackgroundService.cs | 13 ++++++------- .../DynamoDBStreamsEventSourceProcess.cs | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 04a1bb28a..1ed47117f 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using Amazon.DynamoDBv2; using Amazon.DynamoDBStreams; using Amazon.DynamoDBStreams.Model; using Amazon.Lambda.DynamoDBEvents; @@ -23,6 +24,7 @@ public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService }; private readonly ILogger _logger; + private readonly IAmazonDynamoDB _ddbClient; private readonly IAmazonDynamoDBStreams _streamsClient; private readonly ILambdaClient _lambdaClient; private readonly DynamoDBStreamsEventSourceBackgroundServiceConfig _config; @@ -32,11 +34,13 @@ public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService /// public DynamoDBStreamsEventSourceBackgroundService( ILogger logger, + IAmazonDynamoDB ddbClient, IAmazonDynamoDBStreams streamsClient, DynamoDBStreamsEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) { _logger = logger; + _ddbClient = ddbClient; _streamsClient = streamsClient; _config = config; _lambdaClient = lambdaClient; @@ -87,13 +91,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) return _config.TableName; } - var response = await _streamsClient.ListStreamsAsync(new ListStreamsRequest - { - TableName = _config.TableName - }, stoppingToken); - - // Use the first active stream for the table - return response.Streams.FirstOrDefault()?.StreamArn; + var response = await _ddbClient.DescribeTableAsync(_config.TableName, stoppingToken); + return response.Table.LatestStreamArn; } private async Task PollStream(string streamArn, CancellationToken stoppingToken) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs index 20ca5257a..bacfc2de5 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using Amazon.DynamoDBv2; using Amazon.DynamoDBStreams; using Amazon.Lambda.TestTool.Commands.Settings; using Amazon.Lambda.TestTool.Services; @@ -52,6 +53,19 @@ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings setti var streamsClient = new AmazonDynamoDBStreamsClient(ddbConfig); builder.Services.AddSingleton(streamsClient); + + var ddbClientConfig = new AmazonDynamoDBConfig(); + if (!string.IsNullOrEmpty(config.Profile)) + { + ddbClientConfig.Profile = new Profile(config.Profile); + } + if (!string.IsNullOrEmpty(config.Region)) + { + ddbClientConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.Region); + } + var ddbClient = new AmazonDynamoDBClient(ddbClientConfig); + builder.Services.AddSingleton(ddbClient); + builder.Services.AddSingleton(); var tableName = config.TableName; From f3f69cf1431c12269ab10a42a2a2657eecb0d253 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 21:26:30 +0000 Subject: [PATCH 07/17] Add logging to DynamoDB Streams background service Added logging for shard discovery, polling cycles, sleep intervals, and shard re-discovery events to aid debugging stream processing. --- .../DynamoDBStreamsEventSourceBackgroundService.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 1ed47117f..9a2f32703 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -88,10 +88,13 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) // If the configured value is already a stream ARN, use it directly if (_config.TableName.StartsWith("arn:") && _config.TableName.Contains("/stream/")) { + _logger.LogInformation("Using provided stream ARN directly: {streamArn}", _config.TableName); return _config.TableName; } + _logger.LogInformation("Looking up latest stream ARN for table {tableName}", _config.TableName); var response = await _ddbClient.DescribeTableAsync(_config.TableName, stoppingToken); + _logger.LogInformation("Resolved stream ARN: {streamArn}", response.Table.LatestStreamArn); return response.Table.LatestStreamArn; } @@ -113,12 +116,16 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) tasks.Add(PollShard(index, iterator, stoppingToken)); } + _logger.LogDebug("Polling {activeShardCount} active shard(s)", tasks.Count); + if (tasks.Count == 0) { // All iterators exhausted — re-discover shards + _logger.LogInformation("All shard iterators exhausted, re-discovering shards"); shardIterators = await GetShardIterators(streamArn, stoppingToken); if (shardIterators.Count == 0) { + _logger.LogDebug("No shards found, sleeping 1000ms before retry"); await Task.Delay(1000, stoppingToken); } continue; @@ -167,6 +174,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) // Re-discover shards when any iterator becomes null (shard closed/split) if (shardIterators.Any(s => s == null)) { + _logger.LogInformation("Detected closed/split shards, re-discovering shard iterators"); var newShards = await GetShardIterators(streamArn, stoppingToken); // Replace only null entries with new iterators, keep active ones for (int i = 0; i < shardIterators.Count; i++) @@ -187,6 +195,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) if (!hasRecords) { + _logger.LogDebug("No records found, sleeping {pollingInterval}ms", _config.PollingIntervalMs); await Task.Delay(_config.PollingIntervalMs, stoppingToken); } } @@ -205,6 +214,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) private async Task> GetShardIterators(string streamArn, CancellationToken stoppingToken) { + _logger.LogInformation("Discovering shards for stream {streamArn}", streamArn); var shards = new List(); string? lastEvaluatedShardId = null; @@ -221,6 +231,8 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; } while (lastEvaluatedShardId != null); + _logger.LogInformation("Discovered {shardCount} shard(s) for stream", shards.Count); + var iterators = new List(); foreach (var shard in shards) From b38b02adf7f187ee001955696ea1a76e0570f2d2 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 22:54:20 +0000 Subject: [PATCH 08/17] Reduce shard re-discovery frequency for quiet tables Instead of calling DescribeStream every time any shard iterator becomes null (which happens frequently on quiet tables as DynamoDB closes idle shards), simply remove exhausted shards from the active list. Full re-discovery only happens when ALL shards are exhausted. --- ...moDBStreamsEventSourceBackgroundService.cs | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 9a2f32703..8d11d9500 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -171,25 +171,12 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } } - // Re-discover shards when any iterator becomes null (shard closed/split) - if (shardIterators.Any(s => s == null)) + // Remove exhausted shards (null iterators). New shards from splits will be + // picked up when all iterators are exhausted and full re-discovery runs. + var exhaustedCount = shardIterators.Count(s => s == null); + if (exhaustedCount > 0) { - _logger.LogInformation("Detected closed/split shards, re-discovering shard iterators"); - var newShards = await GetShardIterators(streamArn, stoppingToken); - // Replace only null entries with new iterators, keep active ones - for (int i = 0; i < shardIterators.Count; i++) - { - if (shardIterators[i] == null && i < newShards.Count) - { - shardIterators[i] = newShards[i]; - } - } - // Append any additional new shards discovered - for (int i = shardIterators.Count; i < newShards.Count; i++) - { - shardIterators.Add(newShards[i]); - } - // Remove remaining null entries (fully exhausted shards with no replacement) + _logger.LogDebug("Removing {exhaustedCount} exhausted shard(s)", exhaustedCount); shardIterators = shardIterators.Where(s => s != null).ToList(); } From 7eb67401f4c17c2be25be12831742d66f08d31e0 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 23:34:44 +0000 Subject: [PATCH 09/17] Fix stream polling: filter to open shards only, re-discover on shard close, add diagnostic logging --- ...moDBStreamsEventSourceBackgroundService.cs | 58 +++++++++++++++---- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 8d11d9500..c506cbb8b 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -101,6 +101,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task PollStream(string streamArn, CancellationToken stoppingToken) { var shardIterators = await GetShardIterators(streamArn, stoppingToken); + var emptyPollCount = 0; while (!stoppingToken.IsCancellationRequested) { @@ -116,16 +117,18 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) tasks.Add(PollShard(index, iterator, stoppingToken)); } - _logger.LogDebug("Polling {activeShardCount} active shard(s)", tasks.Count); + var activeCount = tasks.Count; + _logger.LogInformation("Polling {activeShardCount} active shard(s) out of {totalCount} total", activeCount, shardIterators.Count); - if (tasks.Count == 0) + if (activeCount == 0) { // All iterators exhausted — re-discover shards _logger.LogInformation("All shard iterators exhausted, re-discovering shards"); shardIterators = await GetShardIterators(streamArn, stoppingToken); + emptyPollCount = 0; if (shardIterators.Count == 0) { - _logger.LogDebug("No shards found, sleeping 1000ms before retry"); + _logger.LogInformation("No shards found, sleeping 1000ms before retry"); await Task.Delay(1000, stoppingToken); } continue; @@ -134,17 +137,27 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) var results = await Task.WhenAll(tasks); var hasRecords = false; + var exhaustedInThisPoll = 0; foreach (var (index, response) in results) { if (response == null) continue; + // Log when a shard iterator becomes null (shard closed) + if (response.NextShardIterator == null) + { + exhaustedInThisPoll++; + _logger.LogInformation("Shard at index {index} has been closed (NextShardIterator is null), records in final batch: {count}", + index, response.Records.Count); + } + shardIterators[index] = response.NextShardIterator; if (response.Records.Count == 0) continue; hasRecords = true; + _logger.LogInformation("Retrieved {recordCount} record(s) from shard index {index}", response.Records.Count, index); var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn); var lambdaPayload = new DynamoDBEvent @@ -171,18 +184,35 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } } - // Remove exhausted shards (null iterators). New shards from splits will be - // picked up when all iterators are exhausted and full re-discovery runs. + // Remove exhausted shards (null iterators) and re-discover to pick up child shards var exhaustedCount = shardIterators.Count(s => s == null); if (exhaustedCount > 0) { - _logger.LogDebug("Removing {exhaustedCount} exhausted shard(s)", exhaustedCount); - shardIterators = shardIterators.Where(s => s != null).ToList(); + _logger.LogInformation("Removing {exhaustedCount} exhausted shard(s), re-discovering to find child shards", exhaustedCount); + // Re-discover shards immediately when any shard closes, since new records + // will be on child shards that we don't have iterators for yet. + shardIterators = await GetShardIterators(streamArn, stoppingToken); + emptyPollCount = 0; + continue; } - if (!hasRecords) + if (hasRecords) + { + emptyPollCount = 0; + } + else { - _logger.LogDebug("No records found, sleeping {pollingInterval}ms", _config.PollingIntervalMs); + emptyPollCount++; + // After many empty polls, re-discover shards in case the stream topology changed + if (emptyPollCount >= 30) + { + _logger.LogInformation("No records after {count} consecutive polls, re-discovering shards", emptyPollCount); + shardIterators = await GetShardIterators(streamArn, stoppingToken); + emptyPollCount = 0; + continue; + } + + _logger.LogInformation("No records found (empty poll #{count}), sleeping {pollingInterval}ms", emptyPollCount, _config.PollingIntervalMs); await Task.Delay(_config.PollingIntervalMs, stoppingToken); } } @@ -218,11 +248,16 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; } while (lastEvaluatedShardId != null); - _logger.LogInformation("Discovered {shardCount} shard(s) for stream", shards.Count); + _logger.LogInformation("Discovered {shardCount} total shard(s) for stream", shards.Count); + + // Only get iterators for open (leaf) shards — shards without an EndingSequenceNumber. + // Closed shards with LATEST iterator type will never return new records. + var openShards = shards.Where(s => s.SequenceNumberRange?.EndingSequenceNumber == null).ToList(); + _logger.LogInformation("Filtered to {openCount} open (leaf) shard(s) out of {totalCount} total", openShards.Count, shards.Count); var iterators = new List(); - foreach (var shard in shards) + foreach (var shard in openShards) { var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest { @@ -231,6 +266,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType) }, stoppingToken); + _logger.LogInformation("Got iterator for shard {shardId}", shard.ShardId); iterators.Add(iteratorResponse.ShardIterator); } From 592106da496a21d6a0662d084f02fddcd9ef6a1c Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 23:37:29 +0000 Subject: [PATCH 10/17] Change default ShardIteratorType from LATEST to TRIM_HORIZON --- .../DynamoDBStreamsEventSourceConfig.cs | 2 +- .../DynamoDBStreamsEventSourceProcess.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs index 9ed883ea9..5c553f8bf 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs @@ -42,7 +42,7 @@ internal class DynamoDBStreamsEventSourceConfig /// /// The shard iterator type to use when reading from the stream. - /// Valid values: LATEST, TRIM_HORIZON. Default is LATEST. + /// Valid values: LATEST, TRIM_HORIZON. Default is TRIM_HORIZON. /// public string? ShardIteratorType { get; set; } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs index bacfc2de5..33bd5bb98 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs @@ -92,7 +92,7 @@ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings setti FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName, LambdaRuntimeApi = lambdaRuntimeApi, TableName = tableName, - ShardIteratorType = config.ShardIteratorType ?? "LATEST", + ShardIteratorType = config.ShardIteratorType ?? "TRIM_HORIZON", PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs }; From 5be6cd6328eb0d31babbf6d9077d07f23036224f Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Wed, 6 May 2026 23:56:01 +0000 Subject: [PATCH 11/17] Fix DynamoDB Streams checkpointing - preserve iterator position across shard re-discoveries Track shard iterators by shard ID in a dictionary instead of a positional list. On shard re-discovery, only create new iterators for newly discovered shards - existing shards keep their current NextShardIterator position. This prevents re-processing of already-invoked records when shards are re-discovered after a shard closes or after empty poll thresholds. --- ...moDBStreamsEventSourceBackgroundService.cs | 77 ++++++++++--------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index c506cbb8b..7012e8003 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -100,21 +100,22 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task PollStream(string streamArn, CancellationToken stoppingToken) { - var shardIterators = await GetShardIterators(streamArn, stoppingToken); + // Track iterators by shard ID to preserve position across re-discoveries + var shardIterators = await GetShardIterators(streamArn, null, stoppingToken); var emptyPollCount = 0; while (!stoppingToken.IsCancellationRequested) { // Poll all shards concurrently - var tasks = new List>(); - for (int i = 0; i < shardIterators.Count; i++) + var shardIds = shardIterators.Keys.ToList(); + var tasks = new List>(); + foreach (var shardId in shardIds) { - if (shardIterators[i] == null) + var iterator = shardIterators[shardId]; + if (iterator == null) continue; - var index = i; - var iterator = shardIterators[i]!; - tasks.Add(PollShard(index, iterator, stoppingToken)); + tasks.Add(PollShard(shardId, iterator, stoppingToken)); } var activeCount = tasks.Count; @@ -122,9 +123,9 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) if (activeCount == 0) { - // All iterators exhausted — re-discover shards + // All iterators exhausted — re-discover shards but preserve existing positions _logger.LogInformation("All shard iterators exhausted, re-discovering shards"); - shardIterators = await GetShardIterators(streamArn, stoppingToken); + shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken); emptyPollCount = 0; if (shardIterators.Count == 0) { @@ -137,27 +138,27 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) var results = await Task.WhenAll(tasks); var hasRecords = false; - var exhaustedInThisPoll = 0; - foreach (var (index, response) in results) + foreach (var (shardId, response) in results) { if (response == null) continue; - // Log when a shard iterator becomes null (shard closed) if (response.NextShardIterator == null) { - exhaustedInThisPoll++; - _logger.LogInformation("Shard at index {index} has been closed (NextShardIterator is null), records in final batch: {count}", - index, response.Records.Count); + _logger.LogInformation("Shard {shardId} has been closed (NextShardIterator is null), records in final batch: {count}", + shardId, response.Records.Count); + shardIterators.Remove(shardId); + } + else + { + shardIterators[shardId] = response.NextShardIterator; } - - shardIterators[index] = response.NextShardIterator; if (response.Records.Count == 0) continue; hasRecords = true; - _logger.LogInformation("Retrieved {recordCount} record(s) from shard index {index}", response.Records.Count, index); + _logger.LogInformation("Retrieved {recordCount} record(s) from shard {shardId}", response.Records.Count, shardId); var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn); var lambdaPayload = new DynamoDBEvent @@ -184,14 +185,11 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } } - // Remove exhausted shards (null iterators) and re-discover to pick up child shards - var exhaustedCount = shardIterators.Count(s => s == null); - if (exhaustedCount > 0) + // If any shards were removed (closed), re-discover to pick up child shards + if (results.Any(r => r.Response?.NextShardIterator == null)) { - _logger.LogInformation("Removing {exhaustedCount} exhausted shard(s), re-discovering to find child shards", exhaustedCount); - // Re-discover shards immediately when any shard closes, since new records - // will be on child shards that we don't have iterators for yet. - shardIterators = await GetShardIterators(streamArn, stoppingToken); + _logger.LogInformation("Closed shard(s) detected, re-discovering to find child shards"); + shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken); emptyPollCount = 0; continue; } @@ -203,11 +201,10 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) else { emptyPollCount++; - // After many empty polls, re-discover shards in case the stream topology changed if (emptyPollCount >= 30) { _logger.LogInformation("No records after {count} consecutive polls, re-discovering shards", emptyPollCount); - shardIterators = await GetShardIterators(streamArn, stoppingToken); + shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken); emptyPollCount = 0; continue; } @@ -218,7 +215,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } } - private async Task<(int Index, GetRecordsResponse? Response)> PollShard(int index, string iterator, CancellationToken stoppingToken) + private async Task<(string ShardId, GetRecordsResponse? Response)> PollShard(string shardId, string iterator, CancellationToken stoppingToken) { var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest { @@ -226,16 +223,19 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) Limit = _config.BatchSize }, stoppingToken); - return (index, response); + return (shardId, response); } - private async Task> GetShardIterators(string streamArn, CancellationToken stoppingToken) + /// + /// Discover open shards and get iterators. If existingIterators is provided, only creates new iterators + /// for shards not already tracked — preserving the stream position for known shards. + /// + private async Task> GetShardIterators(string streamArn, Dictionary? existingIterators, CancellationToken stoppingToken) { _logger.LogInformation("Discovering shards for stream {streamArn}", streamArn); var shards = new List(); string? lastEvaluatedShardId = null; - // Paginate through all shards do { var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest @@ -250,15 +250,20 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) _logger.LogInformation("Discovered {shardCount} total shard(s) for stream", shards.Count); - // Only get iterators for open (leaf) shards — shards without an EndingSequenceNumber. - // Closed shards with LATEST iterator type will never return new records. var openShards = shards.Where(s => s.SequenceNumberRange?.EndingSequenceNumber == null).ToList(); _logger.LogInformation("Filtered to {openCount} open (leaf) shard(s) out of {totalCount} total", openShards.Count, shards.Count); - var iterators = new List(); + var iterators = new Dictionary(); foreach (var shard in openShards) { + // Preserve existing iterator position for shards we're already tracking + if (existingIterators != null && existingIterators.TryGetValue(shard.ShardId, out var existingIterator) && existingIterator != null) + { + iterators[shard.ShardId] = existingIterator; + continue; + } + var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest { StreamArn = streamArn, @@ -266,8 +271,8 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType) }, stoppingToken); - _logger.LogInformation("Got iterator for shard {shardId}", shard.ShardId); - iterators.Add(iteratorResponse.ShardIterator); + _logger.LogInformation("Got new iterator for shard {shardId}", shard.ShardId); + iterators[shard.ShardId] = iteratorResponse.ShardIterator; } return iterators; From fd43290d469157a4bf742691294dc68fb4b5c55e Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Thu, 7 May 2026 17:08:13 +0000 Subject: [PATCH 12/17] Rework DynamoDB Streams polling to only process records created after startup - Remove ShardIteratorType configuration (now internally managed) - At startup: use LATEST iterator for open shards, track closed shards to skip - On re-discovery (every 30s or on shard exhaustion): preserve existing iterators, skip shards closed at startup, use TRIM_HORIZON for newly discovered shards - This ensures only records written after the test tool starts are delivered to Lambda --- ...moDBStreamsEventSourceBackgroundService.cs | 153 ++++++++++-------- ...reamsEventSourceBackgroundServiceConfig.cs | 5 - .../DynamoDBStreamsEventSourceConfig.cs | 6 - .../DynamoDBStreamsEventSourceProcess.cs | 4 - 4 files changed, 88 insertions(+), 80 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 7012e8003..dbef1c6c7 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -100,36 +100,37 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task PollStream(string streamArn, CancellationToken stoppingToken) { - // Track iterators by shard ID to preserve position across re-discoveries - var shardIterators = await GetShardIterators(streamArn, null, stoppingToken); - var emptyPollCount = 0; + // Discover initial shards — use LATEST for open shards, track closed shards to skip later + var closedAtStartup = new HashSet(); + var shardIterators = await DiscoverInitialShards(streamArn, closedAtStartup, stoppingToken); + + _logger.LogInformation("Initial discovery: {openCount} open shard(s), {closedCount} closed shard(s) at startup", + shardIterators.Count, closedAtStartup.Count); + + var lastDiscoveryTime = DateTime.UtcNow; + const int ShardRediscoveryIntervalSeconds = 30; while (!stoppingToken.IsCancellationRequested) { - // Poll all shards concurrently - var shardIds = shardIterators.Keys.ToList(); + // Poll all active shards concurrently var tasks = new List>(); - foreach (var shardId in shardIds) + foreach (var (shardId, iterator) in shardIterators) { - var iterator = shardIterators[shardId]; if (iterator == null) continue; - tasks.Add(PollShard(shardId, iterator, stoppingToken)); } var activeCount = tasks.Count; - _logger.LogInformation("Polling {activeShardCount} active shard(s) out of {totalCount} total", activeCount, shardIterators.Count); + _logger.LogInformation("Polling {activeShardCount} active shard(s)", activeCount); if (activeCount == 0) { - // All iterators exhausted — re-discover shards but preserve existing positions - _logger.LogInformation("All shard iterators exhausted, re-discovering shards"); - shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken); - emptyPollCount = 0; + // No active shards — re-discover + shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken); + lastDiscoveryTime = DateTime.UtcNow; if (shardIterators.Count == 0) { - _logger.LogInformation("No shards found, sleeping 1000ms before retry"); await Task.Delay(1000, stoppingToken); } continue; @@ -138,6 +139,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) var results = await Task.WhenAll(tasks); var hasRecords = false; + var shardExhausted = false; foreach (var (shardId, response) in results) { if (response == null) @@ -145,9 +147,10 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) if (response.NextShardIterator == null) { - _logger.LogInformation("Shard {shardId} has been closed (NextShardIterator is null), records in final batch: {count}", + _logger.LogInformation("Shard {shardId} exhausted (closed), records in final batch: {count}", shardId, response.Records.Count); shardIterators.Remove(shardId); + shardExhausted = true; } else { @@ -161,11 +164,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) _logger.LogInformation("Retrieved {recordCount} record(s) from shard {shardId}", response.Records.Count, shardId); var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn); - var lambdaPayload = new DynamoDBEvent - { - Records = lambdaRecords - }; - + var lambdaPayload = new DynamoDBEvent { Records = lambdaRecords }; var invokeRequest = new InvokeRequest { InvocationType = InvocationType.RequestResponse, @@ -185,31 +184,19 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } } - // If any shards were removed (closed), re-discover to pick up child shards - if (results.Any(r => r.Response?.NextShardIterator == null)) + // Re-discover if a shard was exhausted or 30 seconds have elapsed + var timeSinceDiscovery = (DateTime.UtcNow - lastDiscoveryTime).TotalSeconds; + if (shardExhausted || timeSinceDiscovery >= ShardRediscoveryIntervalSeconds) { - _logger.LogInformation("Closed shard(s) detected, re-discovering to find child shards"); - shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken); - emptyPollCount = 0; + _logger.LogInformation("Re-discovering shards (exhausted={shardExhausted}, elapsed={elapsed}s)", + shardExhausted, (int)timeSinceDiscovery); + shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken); + lastDiscoveryTime = DateTime.UtcNow; continue; } - if (hasRecords) - { - emptyPollCount = 0; - } - else + if (!hasRecords) { - emptyPollCount++; - if (emptyPollCount >= 30) - { - _logger.LogInformation("No records after {count} consecutive polls, re-discovering shards", emptyPollCount); - shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken); - emptyPollCount = 0; - continue; - } - - _logger.LogInformation("No records found (empty poll #{count}), sleeping {pollingInterval}ms", emptyPollCount, _config.PollingIntervalMs); await Task.Delay(_config.PollingIntervalMs, stoppingToken); } } @@ -227,57 +214,93 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } /// - /// Discover open shards and get iterators. If existingIterators is provided, only creates new iterators - /// for shards not already tracked — preserving the stream position for known shards. + /// Initial shard discovery at startup. Uses LATEST for open shards and records closed shard IDs. /// - private async Task> GetShardIterators(string streamArn, Dictionary? existingIterators, CancellationToken stoppingToken) + private async Task> DiscoverInitialShards(string streamArn, HashSet closedAtStartup, CancellationToken stoppingToken) { - _logger.LogInformation("Discovering shards for stream {streamArn}", streamArn); - var shards = new List(); - string? lastEvaluatedShardId = null; + var shards = await GetAllShards(streamArn, stoppingToken); + var iterators = new Dictionary(); - do + foreach (var shard in shards) { - var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest + var isClosed = shard.SequenceNumberRange?.EndingSequenceNumber != null; + if (isClosed) + { + closedAtStartup.Add(shard.ShardId); + continue; + } + + // Open shard — use LATEST to only get records created after startup + var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest { StreamArn = streamArn, - ExclusiveStartShardId = lastEvaluatedShardId + ShardId = shard.ShardId, + ShardIteratorType = ShardIteratorType.LATEST }, stoppingToken); - shards.AddRange(describeResponse.StreamDescription.Shards); - lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; - } while (lastEvaluatedShardId != null); - - _logger.LogInformation("Discovered {shardCount} total shard(s) for stream", shards.Count); + _logger.LogInformation("Got LATEST iterator for startup shard {shardId}", shard.ShardId); + iterators[shard.ShardId] = iteratorResponse.ShardIterator; + } - var openShards = shards.Where(s => s.SequenceNumberRange?.EndingSequenceNumber == null).ToList(); - _logger.LogInformation("Filtered to {openCount} open (leaf) shard(s) out of {totalCount} total", openShards.Count, shards.Count); + return iterators; + } - var iterators = new Dictionary(); + /// + /// Ongoing shard discovery. Preserves existing iterators, skips shards closed at startup, + /// and starts TRIM_HORIZON pollers for any new shards (even if closed). + /// + private async Task> DiscoverNewShards(string streamArn, Dictionary existingIterators, HashSet closedAtStartup, CancellationToken stoppingToken) + { + var shards = await GetAllShards(streamArn, stoppingToken); + var iterators = new Dictionary(existingIterators); - foreach (var shard in openShards) + foreach (var shard in shards) { - // Preserve existing iterator position for shards we're already tracking - if (existingIterators != null && existingIterators.TryGetValue(shard.ShardId, out var existingIterator) && existingIterator != null) - { - iterators[shard.ShardId] = existingIterator; + // Already being polled — leave iterator alone + if (iterators.ContainsKey(shard.ShardId)) + continue; + + // Was closed at startup — skip + if (closedAtStartup.Contains(shard.ShardId)) continue; - } + // New shard discovered after startup — use TRIM_HORIZON to read all its records var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest { StreamArn = streamArn, ShardId = shard.ShardId, - ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType) + ShardIteratorType = ShardIteratorType.TRIM_HORIZON }, stoppingToken); - _logger.LogInformation("Got new iterator for shard {shardId}", shard.ShardId); + _logger.LogInformation("Got TRIM_HORIZON iterator for new shard {shardId}", shard.ShardId); iterators[shard.ShardId] = iteratorResponse.ShardIterator; } return iterators; } + private async Task> GetAllShards(string streamArn, CancellationToken stoppingToken) + { + _logger.LogInformation("Discovering shards for stream {streamArn}", streamArn); + var shards = new List(); + string? lastEvaluatedShardId = null; + + do + { + var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest + { + StreamArn = streamArn, + ExclusiveStartShardId = lastEvaluatedShardId + }, stoppingToken); + + shards.AddRange(describeResponse.StreamDescription.Shards); + lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; + } while (lastEvaluatedShardId != null); + + _logger.LogInformation("Discovered {shardCount} shard(s)", shards.Count); + return shards; + } + /// /// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type. /// diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs index 81a21371e..07e4a4b58 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs @@ -28,11 +28,6 @@ public class DynamoDBStreamsEventSourceBackgroundServiceConfig /// public required string TableName { get; init; } - /// - /// The shard iterator type to use when reading from the stream. - /// - public required string ShardIteratorType { get; init; } = "LATEST"; - /// /// The polling interval in milliseconds between stream reads when no records are found. /// diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs index 5c553f8bf..4a1524e52 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs @@ -40,12 +40,6 @@ internal class DynamoDBStreamsEventSourceConfig /// public string? TableName { get; set; } - /// - /// The shard iterator type to use when reading from the stream. - /// Valid values: LATEST, TRIM_HORIZON. Default is TRIM_HORIZON. - /// - public string? ShardIteratorType { get; set; } - /// /// The polling interval in milliseconds between stream reads when no records are found. /// Default is 1000. diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs index 33bd5bb98..cbf7bfd4e 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs @@ -92,7 +92,6 @@ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings setti FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName, LambdaRuntimeApi = lambdaRuntimeApi, TableName = tableName, - ShardIteratorType = config.ShardIteratorType ?? "TRIM_HORIZON", PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs }; @@ -201,9 +200,6 @@ internal static List LoadDynamoDBStreamsEventS case "tablename": config.TableName = keyValuePair[1].Trim(); break; - case "sharditeratortype": - config.ShardIteratorType = keyValuePair[1].Trim(); - break; case "pollingintervalms": if (!int.TryParse(keyValuePair[1].Trim(), out var pollingInterval)) { From 0799ed085800be95146d9129a308f66da0de2d81 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Thu, 7 May 2026 17:11:28 +0000 Subject: [PATCH 13/17] Add high-level strategy comment to PollStream explaining shard management --- ...DynamoDBStreamsEventSourceBackgroundService.cs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index dbef1c6c7..70b5c5d9d 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -100,7 +100,20 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task PollStream(string streamArn, CancellationToken stoppingToken) { - // Discover initial shards — use LATEST for open shards, track closed shards to skip later + // Shard polling strategy: + // + // Goal: Only deliver records to Lambda that were written AFTER the test tool started. + // + // 1. At startup, discover all shards. Open shards get a LATEST iterator (future records only). + // Closed shards are recorded in a "closed at startup" set and never polled — they contain + // only historical data from before the tool started. + // + // 2. Every 30 seconds (or immediately when a shard is exhausted), re-discover shards: + // - Shards already being polled: leave their iterator alone (preserves position). + // - Shards in the "closed at startup" set: skip (pre-existing historical data). + // - Any other shard (new since startup): poll with TRIM_HORIZON to read all its records, + // since the shard was created after the tool started and all its data is relevant. + var closedAtStartup = new HashSet(); var shardIterators = await DiscoverInitialShards(streamArn, closedAtStartup, stoppingToken); From b32a71aca6c1225cad66ffe7e13c8270ec5ac9f3 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Thu, 7 May 2026 11:58:41 -0700 Subject: [PATCH 14/17] Logging fixes --- .../DynamoDBStreamsEventSourceBackgroundService.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 70b5c5d9d..4819bd57a 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -135,7 +135,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) } var activeCount = tasks.Count; - _logger.LogInformation("Polling {activeShardCount} active shard(s)", activeCount); + _logger.LogDebug("Polling {activeShardCount} active shard(s)", activeCount); if (activeCount == 0) { @@ -161,7 +161,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) if (response.NextShardIterator == null) { _logger.LogInformation("Shard {shardId} exhausted (closed), records in final batch: {count}", - shardId, response.Records.Count); + shardId, response.Records?.Count); shardIterators.Remove(shardId); shardExhausted = true; } @@ -170,7 +170,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) shardIterators[shardId] = response.NextShardIterator; } - if (response.Records.Count == 0) + if (response.Records == null || response.Records.Count == 0) continue; hasRecords = true; @@ -254,6 +254,8 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) _logger.LogInformation("Got LATEST iterator for startup shard {shardId}", shard.ShardId); iterators[shard.ShardId] = iteratorResponse.ShardIterator; } + _logger.LogInformation("Initial shard discovery complete: {openCount} open shard(s), {closedCount} closed shard(s) at startup", + iterators.Count, closedAtStartup.Count); return iterators; } @@ -294,7 +296,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) private async Task> GetAllShards(string streamArn, CancellationToken stoppingToken) { - _logger.LogInformation("Discovering shards for stream {streamArn}", streamArn); + _logger.LogDebug("Discovering shards for stream {streamArn}", streamArn); var shards = new List(); string? lastEvaluatedShardId = null; @@ -310,7 +312,7 @@ private async Task> GetAllShards(string streamArn, CancellationToken lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; } while (lastEvaluatedShardId != null); - _logger.LogInformation("Discovered {shardCount} shard(s)", shards.Count); + _logger.LogDebug("There were {shardCount} shard(s) returned from DescribeStream", shards.Count); return shards; } From 83c1986cb1f0d4276087c8e75634fa79fd40a4a3 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Mon, 11 May 2026 17:28:27 -0700 Subject: [PATCH 15/17] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../Commands/Settings/RunCommandSettings.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs index 102490b76..f4ab3d57f 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs @@ -82,14 +82,14 @@ public sealed class RunCommandSettings : CommandSettings [Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=,FunctionName=,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")] public string? SQSEventSourceConfig { get; set; } - - /// - /// The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example "TableName=my-table,FunctionName=function-name,BatchSize=100". - /// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName - /// - [CommandOption("--dynamodbstreams-eventsource-config ")] - [Description("The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example \"TableName=,FunctionName=,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName")] - public string? DynamoDBStreamsEventSourceConfig { get; set; } + + /// + /// The configuration for the DynamoDB Streams event source. The config can be provided as comma delimited key pairs, a JSON object, a JSON array, or a file path to a JSON configuration file. For example "TableName=my-table,FunctionName=function-name,BatchSize=100". + /// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, PollingIntervalMs, Profile, Region, TableName + /// + [CommandOption("--dynamodbstreams-eventsource-config ")] + [Description("The configuration for the DynamoDB Streams event source. The config can be provided as comma delimited key pairs, a JSON object, a JSON array, or a file path to a JSON configuration file. For example \"TableName=,FunctionName=,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, PollingIntervalMs, Profile, Region, TableName")] + public string? DynamoDBStreamsEventSourceConfig { get; set; } /// /// The absolute path used to save global settings and saved requests. You will need to specify a path in order to enable saving global settings and requests. /// From ef12e8b88693d47ad0a609650900d1fb54867984 Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Mon, 11 May 2026 17:43:48 -0700 Subject: [PATCH 16/17] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- ...moDBStreamsEventSourceBackgroundService.cs | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 4819bd57a..7c6b831af 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -217,13 +217,25 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken) private async Task<(string ShardId, GetRecordsResponse? Response)> PollShard(string shardId, string iterator, CancellationToken stoppingToken) { - var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest + try { - ShardIterator = iterator, - Limit = _config.BatchSize - }, stoppingToken); + var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest + { + ShardIterator = iterator, + Limit = _config.BatchSize + }, stoppingToken); - return (shardId, response); + return (shardId, response); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to poll records for shard {shardId}", shardId); + return (shardId, null); + } } /// From e92ca68d405d102d5656ab557e609bf3370a872d Mon Sep 17 00:00:00 2001 From: Norm Johanson Date: Mon, 11 May 2026 17:44:50 -0700 Subject: [PATCH 17/17] Address PR comments --- .../DynamoDBStreamsEventSourceBackgroundService.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs index 4819bd57a..540340402 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -23,7 +23,7 @@ public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly IAmazonDynamoDB _ddbClient; private readonly IAmazonDynamoDBStreams _streamsClient; private readonly ILambdaClient _lambdaClient; @@ -33,7 +33,7 @@ public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService /// Constructs instance of . /// public DynamoDBStreamsEventSourceBackgroundService( - ILogger logger, + ILogger logger, IAmazonDynamoDB ddbClient, IAmazonDynamoDBStreams streamsClient, DynamoDBStreamsEventSourceBackgroundServiceConfig config,