diff --git a/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json new file mode 100644 index 000000000..3dd6f20c7 --- /dev/null +++ b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.TestTool", + "Type": "Minor", + "ChangelogMessages": [ + "Add support emulating Lambda DynamoDB Stream event source" + ] + } + ] +} \ No newline at end of file diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj index b8e97f6fa..116a13598 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj @@ -26,6 +26,9 @@ + + + diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs index f65d7bad1..d4397b301 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs @@ -8,6 +8,7 @@ using Amazon.Lambda.TestTool.Models; using Amazon.Lambda.TestTool.Processes; using Amazon.Lambda.TestTool.Processes.SQSEventSource; +using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; using Amazon.Lambda.TestTool.Services; using Amazon.Lambda.TestTool.Services.IO; using Spectre.Console.Cli; @@ -39,10 +40,10 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS { EvaluateEnvironmentVariables(settings); - if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig)) + if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig) && string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig)) { throw new ArgumentException("At least one of the following parameters must be set: " + - "--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port or --sqs-eventsource-config"); + "--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port, --sqs-eventsource-config or --dynamodbstreams-eventsource-config"); } var tasks = new List(); @@ -87,6 +88,12 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS { var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token); tasks.Add(sqsEventSourceProcess.RunningTask); + } + + if (!string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig)) + { + var dynamoDBStreamsProcess = DynamoDBStreamsEventSourceProcess.Startup(settings, cancellationTokenSource.Token); + tasks.Add(dynamoDBStreamsProcess.RunningTask); } await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token)); @@ -184,6 +191,16 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings) throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty"); } settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString(); + } + + if (settings.DynamoDBStreamsEventSourceConfig != null && settings.DynamoDBStreamsEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase)) + { + var envVariable = settings.DynamoDBStreamsEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length); + if (!environmentVariables.Contains(envVariable)) + { + throw new InvalidOperationException($"Environment variable {envVariable} for the DynamoDB Streams event source config was empty"); + } + settings.DynamoDBStreamsEventSourceConfig = environmentVariables[envVariable]?.ToString(); } } } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs index 0584eaa30..102490b76 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs @@ -82,6 +82,14 @@ public sealed class RunCommandSettings : CommandSettings [Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=,FunctionName=,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")] public string? SQSEventSourceConfig { get; set; } + + /// + /// The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example "TableName=my-table,FunctionName=function-name,BatchSize=100". + /// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName + /// + [CommandOption("--dynamodbstreams-eventsource-config ")] + [Description("The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example \"TableName=,FunctionName=,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName")] + public string? DynamoDBStreamsEventSourceConfig { get; set; } /// /// The absolute path used to save global settings and saved requests. You will need to specify a path in order to enable saving global settings and requests. /// diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs new file mode 100644 index 000000000..4819bd57a --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs @@ -0,0 +1,423 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.DynamoDBv2; +using Amazon.DynamoDBStreams; +using Amazon.DynamoDBStreams.Model; +using Amazon.Lambda.DynamoDBEvents; +using Amazon.Lambda.Model; +using Amazon.Lambda.TestTool.Services; +using Amazon.Runtime; +using System.Text.Json; + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// IHostedService that will run continually polling a DynamoDB Stream for records and invoking the connected +/// Lambda function with the polled records. +/// +public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService +{ + private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + private readonly ILogger _logger; + private readonly IAmazonDynamoDB _ddbClient; + private readonly IAmazonDynamoDBStreams _streamsClient; + private readonly ILambdaClient _lambdaClient; + private readonly DynamoDBStreamsEventSourceBackgroundServiceConfig _config; + + /// + /// Constructs instance of . + /// + public DynamoDBStreamsEventSourceBackgroundService( + ILogger logger, + IAmazonDynamoDB ddbClient, + IAmazonDynamoDBStreams streamsClient, + DynamoDBStreamsEventSourceBackgroundServiceConfig config, + ILambdaClient lambdaClient) + { + _logger = logger; + _ddbClient = ddbClient; + _streamsClient = streamsClient; + _config = config; + _lambdaClient = lambdaClient; + } + + /// + /// Execute the DynamoDBStreamsEventSourceBackgroundService. + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Starting DynamoDB Streams poller for table: {tableName}", _config.TableName); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + var streamArn = await GetStreamArnForTable(stoppingToken); + if (streamArn == null) + { + _logger.LogWarning("No stream found for table {tableName}. Retrying in 5 seconds.", _config.TableName); + await Task.Delay(5000, stoppingToken); + continue; + } + + await PollStream(streamArn, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (Exception e) + { + _logger.LogWarning(e, "Exception occurred in DynamoDB Streams poller for {tableName}: {message}", _config.TableName, e.Message); + await Task.Delay(3000, stoppingToken); + } + } + } + + private async Task GetStreamArnForTable(CancellationToken stoppingToken) + { + // If the configured value is already a stream ARN, use it directly + if (_config.TableName.StartsWith("arn:") && _config.TableName.Contains("/stream/")) + { + _logger.LogInformation("Using provided stream ARN directly: {streamArn}", _config.TableName); + return _config.TableName; + } + + _logger.LogInformation("Looking up latest stream ARN for table {tableName}", _config.TableName); + var response = await _ddbClient.DescribeTableAsync(_config.TableName, stoppingToken); + _logger.LogInformation("Resolved stream ARN: {streamArn}", response.Table.LatestStreamArn); + return response.Table.LatestStreamArn; + } + + private async Task PollStream(string streamArn, CancellationToken stoppingToken) + { + // Shard polling strategy: + // + // Goal: Only deliver records to Lambda that were written AFTER the test tool started. + // + // 1. At startup, discover all shards. Open shards get a LATEST iterator (future records only). + // Closed shards are recorded in a "closed at startup" set and never polled — they contain + // only historical data from before the tool started. + // + // 2. Every 30 seconds (or immediately when a shard is exhausted), re-discover shards: + // - Shards already being polled: leave their iterator alone (preserves position). + // - Shards in the "closed at startup" set: skip (pre-existing historical data). + // - Any other shard (new since startup): poll with TRIM_HORIZON to read all its records, + // since the shard was created after the tool started and all its data is relevant. + + var closedAtStartup = new HashSet(); + var shardIterators = await DiscoverInitialShards(streamArn, closedAtStartup, stoppingToken); + + _logger.LogInformation("Initial discovery: {openCount} open shard(s), {closedCount} closed shard(s) at startup", + shardIterators.Count, closedAtStartup.Count); + + var lastDiscoveryTime = DateTime.UtcNow; + const int ShardRediscoveryIntervalSeconds = 30; + + while (!stoppingToken.IsCancellationRequested) + { + // Poll all active shards concurrently + var tasks = new List>(); + foreach (var (shardId, iterator) in shardIterators) + { + if (iterator == null) + continue; + tasks.Add(PollShard(shardId, iterator, stoppingToken)); + } + + var activeCount = tasks.Count; + _logger.LogDebug("Polling {activeShardCount} active shard(s)", activeCount); + + if (activeCount == 0) + { + // No active shards — re-discover + shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken); + lastDiscoveryTime = DateTime.UtcNow; + if (shardIterators.Count == 0) + { + await Task.Delay(1000, stoppingToken); + } + continue; + } + + var results = await Task.WhenAll(tasks); + + var hasRecords = false; + var shardExhausted = false; + foreach (var (shardId, response) in results) + { + if (response == null) + continue; + + if (response.NextShardIterator == null) + { + _logger.LogInformation("Shard {shardId} exhausted (closed), records in final batch: {count}", + shardId, response.Records?.Count); + shardIterators.Remove(shardId); + shardExhausted = true; + } + else + { + shardIterators[shardId] = response.NextShardIterator; + } + + if (response.Records == null || response.Records.Count == 0) + continue; + + hasRecords = true; + _logger.LogInformation("Retrieved {recordCount} record(s) from shard {shardId}", response.Records.Count, shardId); + var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn); + + var lambdaPayload = new DynamoDBEvent { Records = lambdaRecords }; + var invokeRequest = new InvokeRequest + { + InvocationType = InvocationType.RequestResponse, + FunctionName = _config.FunctionName, + Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) + }; + + _logger.LogInformation("Invoking Lambda function {functionName} with {recordCount} DynamoDB stream records", + _config.FunctionName, lambdaRecords.Count); + + var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); + + if (lambdaResponse.FunctionError != null) + { + _logger.LogError("Invoking Lambda {function} with {recordCount} DynamoDB stream records failed with error {errorMessage}", + _config.FunctionName, lambdaRecords.Count, lambdaResponse.FunctionError); + } + } + + // Re-discover if a shard was exhausted or 30 seconds have elapsed + var timeSinceDiscovery = (DateTime.UtcNow - lastDiscoveryTime).TotalSeconds; + if (shardExhausted || timeSinceDiscovery >= ShardRediscoveryIntervalSeconds) + { + _logger.LogInformation("Re-discovering shards (exhausted={shardExhausted}, elapsed={elapsed}s)", + shardExhausted, (int)timeSinceDiscovery); + shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken); + lastDiscoveryTime = DateTime.UtcNow; + continue; + } + + if (!hasRecords) + { + await Task.Delay(_config.PollingIntervalMs, stoppingToken); + } + } + } + + private async Task<(string ShardId, GetRecordsResponse? Response)> PollShard(string shardId, string iterator, CancellationToken stoppingToken) + { + var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest + { + ShardIterator = iterator, + Limit = _config.BatchSize + }, stoppingToken); + + return (shardId, response); + } + + /// + /// Initial shard discovery at startup. Uses LATEST for open shards and records closed shard IDs. + /// + private async Task> DiscoverInitialShards(string streamArn, HashSet closedAtStartup, CancellationToken stoppingToken) + { + var shards = await GetAllShards(streamArn, stoppingToken); + var iterators = new Dictionary(); + + foreach (var shard in shards) + { + var isClosed = shard.SequenceNumberRange?.EndingSequenceNumber != null; + if (isClosed) + { + closedAtStartup.Add(shard.ShardId); + continue; + } + + // Open shard — use LATEST to only get records created after startup + var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest + { + StreamArn = streamArn, + ShardId = shard.ShardId, + ShardIteratorType = ShardIteratorType.LATEST + }, stoppingToken); + + _logger.LogInformation("Got LATEST iterator for startup shard {shardId}", shard.ShardId); + iterators[shard.ShardId] = iteratorResponse.ShardIterator; + } + _logger.LogInformation("Initial shard discovery complete: {openCount} open shard(s), {closedCount} closed shard(s) at startup", + iterators.Count, closedAtStartup.Count); + + return iterators; + } + + /// + /// Ongoing shard discovery. Preserves existing iterators, skips shards closed at startup, + /// and starts TRIM_HORIZON pollers for any new shards (even if closed). + /// + private async Task> DiscoverNewShards(string streamArn, Dictionary existingIterators, HashSet closedAtStartup, CancellationToken stoppingToken) + { + var shards = await GetAllShards(streamArn, stoppingToken); + var iterators = new Dictionary(existingIterators); + + foreach (var shard in shards) + { + // Already being polled — leave iterator alone + if (iterators.ContainsKey(shard.ShardId)) + continue; + + // Was closed at startup — skip + if (closedAtStartup.Contains(shard.ShardId)) + continue; + + // New shard discovered after startup — use TRIM_HORIZON to read all its records + var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest + { + StreamArn = streamArn, + ShardId = shard.ShardId, + ShardIteratorType = ShardIteratorType.TRIM_HORIZON + }, stoppingToken); + + _logger.LogInformation("Got TRIM_HORIZON iterator for new shard {shardId}", shard.ShardId); + iterators[shard.ShardId] = iteratorResponse.ShardIterator; + } + + return iterators; + } + + private async Task> GetAllShards(string streamArn, CancellationToken stoppingToken) + { + _logger.LogDebug("Discovering shards for stream {streamArn}", streamArn); + var shards = new List(); + string? lastEvaluatedShardId = null; + + do + { + var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest + { + StreamArn = streamArn, + ExclusiveStartShardId = lastEvaluatedShardId + }, stoppingToken); + + shards.AddRange(describeResponse.StreamDescription.Shards); + lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId; + } while (lastEvaluatedShardId != null); + + _logger.LogDebug("There were {shardCount} shard(s) returned from DescribeStream", shards.Count); + return shards; + } + + /// + /// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type. + /// + internal static IList ConvertToLambdaRecords(List records, string streamArn) + { + return records.Select(r => ConvertToLambdaRecord(r, streamArn)).ToList(); + } + + /// + /// Convert a single SDK stream record to the Lambda event record type. + /// + internal static DynamoDBEvent.DynamodbStreamRecord ConvertToLambdaRecord(Record record, string streamArn) + { + var lambdaRecord = new DynamoDBEvent.DynamodbStreamRecord + { + EventID = record.EventID, + EventName = record.EventName?.Value, + EventSource = "aws:dynamodb", + EventSourceArn = streamArn, + EventVersion = record.EventVersion, + AwsRegion = Arn.Parse(streamArn).Region + }; + + if (record.Dynamodb != null) + { + lambdaRecord.Dynamodb = new DynamoDBEvent.StreamRecord + { + ApproximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime ?? DateTime.MinValue, + SequenceNumber = record.Dynamodb.SequenceNumber, + SizeBytes = record.Dynamodb.SizeBytes ?? 0, + StreamViewType = record.Dynamodb.StreamViewType?.Value + }; + + if (record.Dynamodb.Keys != null) + { + lambdaRecord.Dynamodb.Keys = ConvertAttributeMap(record.Dynamodb.Keys); + } + + if (record.Dynamodb.NewImage != null) + { + lambdaRecord.Dynamodb.NewImage = ConvertAttributeMap(record.Dynamodb.NewImage); + } + + if (record.Dynamodb.OldImage != null) + { + lambdaRecord.Dynamodb.OldImage = ConvertAttributeMap(record.Dynamodb.OldImage); + } + } + + if (record.UserIdentity != null) + { + lambdaRecord.UserIdentity = new DynamoDBEvent.Identity + { + PrincipalId = record.UserIdentity.PrincipalId, + Type = record.UserIdentity.Type + }; + } + + return lambdaRecord; + } + + /// + /// Convert SDK AttributeValue dictionary to Lambda event AttributeValue dictionary. + /// + internal static Dictionary ConvertAttributeMap(Dictionary sdkMap) + { + var result = new Dictionary(); + foreach (var kvp in sdkMap) + { + result[kvp.Key] = ConvertAttributeValue(kvp.Value); + } + return result; + } + + /// + /// Convert a single SDK AttributeValue to the Lambda event AttributeValue. + /// + internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValue sdkValue) + { + var lambdaValue = new DynamoDBEvent.AttributeValue(); + + if (sdkValue.S != null) + lambdaValue.S = sdkValue.S; + if (sdkValue.N != null) + lambdaValue.N = sdkValue.N; + if (sdkValue.B != null) + lambdaValue.B = sdkValue.B; + if (sdkValue.BOOL != null) + lambdaValue.BOOL = sdkValue.BOOL; + if (sdkValue.NULL != null) + lambdaValue.NULL = sdkValue.NULL; + if (sdkValue.SS != null) + lambdaValue.SS = sdkValue.SS; + if (sdkValue.NS != null) + lambdaValue.NS = sdkValue.NS; + if (sdkValue.BS != null) + lambdaValue.BS = sdkValue.BS; + if (sdkValue.L != null) + lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList(); + if (sdkValue.M != null) + lambdaValue.M = ConvertAttributeMap(sdkValue.M); + + return lambdaValue; + } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs new file mode 100644 index 000000000..07e4a4b58 --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs @@ -0,0 +1,35 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// Configuration for the service. +/// +public class DynamoDBStreamsEventSourceBackgroundServiceConfig +{ + /// + /// The batch size to read from the stream and send to the Lambda function. + /// + public required int BatchSize { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultBatchSize; + + /// + /// The Lambda function to send the DynamoDB stream records to. + /// + public required string FunctionName { get; init; } + + /// + /// The endpoint where the emulated Lambda runtime API is running. + /// + public required string LambdaRuntimeApi { get; init; } + + /// + /// The DynamoDB table name to read streams from. + /// + public required string TableName { get; init; } + + /// + /// The polling interval in milliseconds between stream reads when no records are found. + /// + public required int PollingIntervalMs { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultPollingIntervalMs; +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs new file mode 100644 index 000000000..4a1524e52 --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs @@ -0,0 +1,48 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// This class represents the input values from the user for DynamoDB Streams event source configuration. +/// +internal class DynamoDBStreamsEventSourceConfig +{ + /// + /// The batch size to read from the stream and send to the Lambda function. + /// + public int? BatchSize { get; set; } + + /// + /// The Lambda function to send the DynamoDB stream records to. + /// If not set the default function will be used. + /// + public string? FunctionName { get; set; } + + /// + /// The endpoint where the emulated Lambda runtime API is running. + /// If not set the current Test Tool instance will be used. + /// + public string? LambdaRuntimeApi { get; set; } + + /// + /// The AWS profile to use for credentials. + /// + public string? Profile { get; set; } + + /// + /// The AWS region the DynamoDB table is in. + /// + public string? Region { get; set; } + + /// + /// The DynamoDB table name to read streams from. + /// + public string? TableName { get; set; } + + /// + /// The polling interval in milliseconds between stream reads when no records are found. + /// Default is 1000. + /// + public int? PollingIntervalMs { get; set; } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs new file mode 100644 index 000000000..cbf7bfd4e --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs @@ -0,0 +1,218 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.DynamoDBv2; +using Amazon.DynamoDBStreams; +using Amazon.Lambda.TestTool.Commands.Settings; +using Amazon.Lambda.TestTool.Services; +using System.Text.Json; + +namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; + +/// +/// Process for handling DynamoDB Streams event source for Lambda functions. +/// +public class DynamoDBStreamsEventSourceProcess +{ + internal const int DefaultBatchSize = 100; + internal const int DefaultPollingIntervalMs = 1000; + + /// + /// The Parent task for all the tasks started for each DynamoDB Streams event source. + /// + public required Task RunningTask { get; init; } + + /// + /// Startup DynamoDB Streams event sources + /// + public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings settings, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig)) + { + throw new InvalidOperationException($"The {nameof(RunCommandSettings.DynamoDBStreamsEventSourceConfig)} can not be null when starting the DynamoDB Streams event source process"); + } + + var configs = LoadDynamoDBStreamsEventSourceConfig(settings.DynamoDBStreamsEventSourceConfig); + + var tasks = new List(); + + foreach (var config in configs) + { + var builder = Host.CreateApplicationBuilder(); + + var ddbConfig = new AmazonDynamoDBStreamsConfig(); + if (!string.IsNullOrEmpty(config.Profile)) + { + ddbConfig.Profile = new Profile(config.Profile); + } + + if (!string.IsNullOrEmpty(config.Region)) + { + ddbConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.Region); + } + + var streamsClient = new AmazonDynamoDBStreamsClient(ddbConfig); + builder.Services.AddSingleton(streamsClient); + + var ddbClientConfig = new AmazonDynamoDBConfig(); + if (!string.IsNullOrEmpty(config.Profile)) + { + ddbClientConfig.Profile = new Profile(config.Profile); + } + if (!string.IsNullOrEmpty(config.Region)) + { + ddbClientConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.Region); + } + var ddbClient = new AmazonDynamoDBClient(ddbClientConfig); + builder.Services.AddSingleton(ddbClient); + + builder.Services.AddSingleton(); + + var tableName = config.TableName; + if (string.IsNullOrEmpty(tableName)) + { + throw new InvalidOperationException("TableName is a required property for DynamoDB Streams event source config"); + } + + var lambdaRuntimeApi = config.LambdaRuntimeApi; + if (string.IsNullOrEmpty(lambdaRuntimeApi)) + { + if (!settings.LambdaEmulatorPort.HasValue) + { + throw new InvalidOperationException("No Lambda runtime api endpoint was given as part of the DynamoDB Streams event source config and the current " + + "instance of the test tool is not running the Lambda runtime api. Either provide a Lambda runtime api endpoint or set a port for " + + "the lambda runtime api when starting the test tool."); + } + lambdaRuntimeApi = $"http://{settings.LambdaEmulatorHost}:{settings.LambdaEmulatorPort}/"; + } + + var backgroundServiceConfig = new DynamoDBStreamsEventSourceBackgroundServiceConfig + { + BatchSize = config.BatchSize ?? DefaultBatchSize, + FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName, + LambdaRuntimeApi = lambdaRuntimeApi, + TableName = tableName, + PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs + }; + + builder.Services.AddSingleton(backgroundServiceConfig); + builder.Services.AddHostedService(); + + var app = builder.Build(); + var task = app.RunAsync(cancellationToken); + tasks.Add(task); + } + + return new DynamoDBStreamsEventSourceProcess + { + RunningTask = Task.WhenAll(tasks) + }; + } + + /// + /// Load the DynamoDB Streams event source configs. Supports JSON or comma-delimited key-value pair format. + /// If the value points to a file that exists, the file contents will be read. + /// + internal static List LoadDynamoDBStreamsEventSourceConfig(string configString) + { + if (File.Exists(configString)) + { + configString = File.ReadAllText(configString); + } + + configString = configString.Trim(); + + List? configs = null; + + var jsonOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }; + + if (configString.StartsWith('[')) + { + try + { + configs = JsonSerializer.Deserialize>(configString, jsonOptions); + if (configs == null) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString); + } + } + catch (JsonException e) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString, e); + } + } + else if (configString.StartsWith('{')) + { + try + { + var config = JsonSerializer.Deserialize(configString, jsonOptions); + if (config == null) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString); + } + + configs = new List { config }; + } + catch (JsonException e) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString, e); + } + } + else + { + var config = new DynamoDBStreamsEventSourceConfig(); + var tokens = configString.Split(','); + foreach (var token in tokens) + { + if (string.IsNullOrWhiteSpace(token)) + continue; + + var keyValuePair = token.Split('='); + if (keyValuePair.Length != 2) + { + throw new InvalidOperationException("Failed to parse DynamoDB Streams event source config. Format should be \"TableName=,FunctionName=,...\""); + } + + switch (keyValuePair[0].ToLower().Trim()) + { + case "batchsize": + if (!int.TryParse(keyValuePair[1].Trim(), out var batchSize)) + { + throw new InvalidOperationException("Value for batch size is not a formatted integer"); + } + config.BatchSize = batchSize; + break; + case "functionname": + config.FunctionName = keyValuePair[1].Trim(); + break; + case "lambdaruntimeapi": + config.LambdaRuntimeApi = keyValuePair[1].Trim(); + break; + case "profile": + config.Profile = keyValuePair[1].Trim(); + break; + case "region": + config.Region = keyValuePair[1].Trim(); + break; + case "tablename": + config.TableName = keyValuePair[1].Trim(); + break; + case "pollingintervalms": + if (!int.TryParse(keyValuePair[1].Trim(), out var pollingInterval)) + { + throw new InvalidOperationException("Value for polling interval is not a formatted integer"); + } + config.PollingIntervalMs = pollingInterval; + break; + } + } + + configs = new List { config }; + } + + return configs; + } +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj index 524f55fcd..7a799ca1e 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj @@ -15,6 +15,9 @@ + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs new file mode 100644 index 000000000..d47483ac2 --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs @@ -0,0 +1,256 @@ +using Amazon.DynamoDBStreams; +using Amazon.DynamoDBStreams.Model; +using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; +using Xunit; +using Record = Amazon.DynamoDBStreams.Model.Record; + +namespace Amazon.Lambda.TestTool.UnitTests.DynamoDBStreamsEventSource; + +public class ConvertDynamoDBStreamsRecordTests +{ + private const string TestStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2024-01-01T00:00:00.000"; + + [Fact] + public void ConvertBasicRecord() + { + var record = new Record + { + EventID = "event-123", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + EventSource = "aws:dynamodb", + Dynamodb = new StreamRecord + { + ApproximateCreationDateTime = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc), + SequenceNumber = "111111111111111111111", + SizeBytes = 256, + StreamViewType = new StreamViewType("NEW_AND_OLD_IMAGES"), + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" }, + ["Name"] = new AttributeValue { S = "Test Item" } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + Assert.Equal("event-123", result.EventID); + Assert.Equal("INSERT", result.EventName); + Assert.Equal("aws:dynamodb", result.EventSource); + Assert.Equal(TestStreamArn, result.EventSourceArn); + Assert.Equal("1.1", result.EventVersion); + Assert.Equal("us-west-2", result.AwsRegion); + + Assert.NotNull(result.Dynamodb); + Assert.Equal("111111111111111111111", result.Dynamodb.SequenceNumber); + Assert.Equal(256, result.Dynamodb.SizeBytes); + Assert.Equal("NEW_AND_OLD_IMAGES", result.Dynamodb.StreamViewType); + + Assert.Single(result.Dynamodb.Keys); + Assert.Equal("key-1", result.Dynamodb.Keys["Id"].S); + + Assert.Equal(2, result.Dynamodb.NewImage.Count); + Assert.Equal("key-1", result.Dynamodb.NewImage["Id"].S); + Assert.Equal("Test Item", result.Dynamodb.NewImage["Name"].S); + } + + [Fact] + public void ConvertRecordWithAllAttributeTypes() + { + var record = new Record + { + EventID = "event-456", + EventName = new Amazon.DynamoDBStreams.OperationType("MODIFY"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["StringAttr"] = new AttributeValue { S = "hello" }, + ["NumberAttr"] = new AttributeValue { N = "42" }, + ["BoolAttr"] = new AttributeValue { BOOL = true }, + ["NullAttr"] = new AttributeValue { NULL = true }, + ["ListAttr"] = new AttributeValue + { + L = new List + { + new AttributeValue { S = "item1" }, + new AttributeValue { N = "2" } + } + }, + ["MapAttr"] = new AttributeValue + { + M = new Dictionary + { + ["nested"] = new AttributeValue { S = "value" } + } + }, + ["StringSetAttr"] = new AttributeValue { SS = new List { "a", "b" } }, + ["NumberSetAttr"] = new AttributeValue { NS = new List { "1", "2" } } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + var newImage = result.Dynamodb.NewImage; + Assert.Equal("hello", newImage["StringAttr"].S); + Assert.Equal("42", newImage["NumberAttr"].N); + Assert.True(newImage["BoolAttr"].BOOL); + Assert.True(newImage["NullAttr"].NULL); + Assert.Equal(2, newImage["ListAttr"].L.Count); + Assert.Equal("item1", newImage["ListAttr"].L[0].S); + Assert.Equal("value", newImage["MapAttr"].M["nested"].S); + Assert.Equal(new List { "a", "b" }, newImage["StringSetAttr"].SS); + Assert.Equal(new List { "1", "2" }, newImage["NumberSetAttr"].NS); + } + + [Fact] + public void ConvertRecordWithUserIdentity() + { + var record = new Record + { + EventID = "event-789", + EventName = new Amazon.DynamoDBStreams.OperationType("REMOVE"), + EventVersion = "1.1", + UserIdentity = new Identity + { + PrincipalId = "dynamodb.amazonaws.com", + Type = "Service" + }, + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "expired-item" } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + Assert.NotNull(result.UserIdentity); + Assert.Equal("dynamodb.amazonaws.com", result.UserIdentity.PrincipalId); + Assert.Equal("Service", result.UserIdentity.Type); + } + + [Fact] + public void ConvertRecordWithBinaryAttributes() + { + var binaryData = new byte[] { 0x01, 0x02, 0x03, 0xFF }; + var binarySet = new List + { + new MemoryStream(new byte[] { 0xAA, 0xBB }), + new MemoryStream(new byte[] { 0xCC, 0xDD }) + }; + + var record = new Record + { + EventID = "event-bin", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["BinaryAttr"] = new AttributeValue { B = new MemoryStream(binaryData) }, + ["BinarySetAttr"] = new AttributeValue { BS = binarySet } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + var newImage = result.Dynamodb.NewImage; + Assert.NotNull(newImage["BinaryAttr"].B); + Assert.Equal(binaryData, newImage["BinaryAttr"].B.ToArray()); + Assert.NotNull(newImage["BinarySetAttr"].BS); + Assert.Equal(2, newImage["BinarySetAttr"].BS.Count); + Assert.Equal(new byte[] { 0xAA, 0xBB }, newImage["BinarySetAttr"].BS[0].ToArray()); + Assert.Equal(new byte[] { 0xCC, 0xDD }, newImage["BinarySetAttr"].BS[1].ToArray()); + } + + [Fact] + public void ConvertRecordWithEmptyListAndMap() + { + var record = new Record + { + EventID = "event-empty", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + }, + NewImage = new Dictionary + { + ["EmptyList"] = new AttributeValue { L = new List() }, + ["EmptyMap"] = new AttributeValue { M = new Dictionary() } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn); + + var newImage = result.Dynamodb.NewImage; + Assert.NotNull(newImage["EmptyList"].L); + Assert.Empty(newImage["EmptyList"].L); + Assert.NotNull(newImage["EmptyMap"].M); + Assert.Empty(newImage["EmptyMap"].M); + } + + [Fact] + public void ConvertMultipleRecords() + { + var records = new List + { + new Record + { + EventID = "event-1", + EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-1" } + } + } + }, + new Record + { + EventID = "event-2", + EventName = new Amazon.DynamoDBStreams.OperationType("MODIFY"), + EventVersion = "1.1", + Dynamodb = new StreamRecord + { + Keys = new Dictionary + { + ["Id"] = new AttributeValue { S = "key-2" } + } + } + } + }; + + var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecords(records, TestStreamArn); + + Assert.Equal(2, result.Count); + Assert.Equal("event-1", result[0].EventID); + Assert.Equal("event-2", result[1].EventID); + } +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs new file mode 100644 index 000000000..836d36c22 --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs @@ -0,0 +1,107 @@ +using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; +using Xunit; + +namespace Amazon.Lambda.TestTool.UnitTests.DynamoDBStreamsEventSource; + +public class ParseDynamoDBStreamsEventSourceConfigTests +{ + [Fact] + public void ParseValidJsonObject() + { + string json = """ +{ + "TableName" : "my-table", + "FunctionName" : "LambdaFunction", + "BatchSize" : 50, + "LambdaRuntimeApi" : "http://localhost:7777/", + "Profile" : "beta", + "Region" : "us-east-23" +} +"""; + + var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json); + Assert.Single(configs); + Assert.Equal("my-table", configs[0].TableName); + Assert.Equal("LambdaFunction", configs[0].FunctionName); + Assert.Equal(50, configs[0].BatchSize); + Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi); + Assert.Equal("beta", configs[0].Profile); + Assert.Equal("us-east-23", configs[0].Region); + } + + [Fact] + public void ParseInvalidJsonObject() + { + string json = """ +{ + "aaa" +} +"""; + + Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json)); + } + + [Fact] + public void ParseValidJsonArray() + { + string json = """ +[ + { + "TableName" : "table-1", + "FunctionName" : "Function1", + "BatchSize" : 25 + }, + { + "TableName" : "table-2", + "FunctionName" : "Function2", + "BatchSize" : 75 + } +] +"""; + + var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json); + Assert.Equal(2, configs.Count); + Assert.Equal("table-1", configs[0].TableName); + Assert.Equal("Function1", configs[0].FunctionName); + Assert.Equal(25, configs[0].BatchSize); + Assert.Equal("table-2", configs[1].TableName); + Assert.Equal("Function2", configs[1].FunctionName); + Assert.Equal(75, configs[1].BatchSize); + } + + [Fact] + public void ParseInvalidJsonArray() + { + string json = """ +[ + {"aaa"} +] +"""; + + Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json)); + } + + [Fact] + public void ParseKeyPairs() + { + var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig( + "TableName=my-table ,functionName =LambdaFunction, batchSize=50," + + "LambdaRuntimeApi=http://localhost:7777/ ,Profile=beta,Region=us-east-23"); + + Assert.Single(configs); + Assert.Equal("my-table", configs[0].TableName); + Assert.Equal("LambdaFunction", configs[0].FunctionName); + Assert.Equal(50, configs[0].BatchSize); + Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi); + Assert.Equal("beta", configs[0].Profile); + Assert.Equal("us-east-23", configs[0].Region); + } + + [Theory] + [InlineData("novalue")] + [InlineData("BatchSize=noint")] + public void InvalidKeyPairString(string keyPairConfig) + { + Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(keyPairConfig)); + } +}