diff --git a/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json
new file mode 100644
index 000000000..3dd6f20c7
--- /dev/null
+++ b/.autover/changes/9b9bb131-676c-4fc3-85a0-78b86d128d58.json
@@ -0,0 +1,11 @@
+{
+ "Projects": [
+ {
+ "Name": "Amazon.Lambda.TestTool",
+ "Type": "Minor",
+ "ChangelogMessages": [
+ "Add support emulating Lambda DynamoDB Stream event source"
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj
index b8e97f6fa..116a13598 100644
--- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj
@@ -26,6 +26,9 @@
+
+
+
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs
index f65d7bad1..d4397b301 100644
--- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs
@@ -8,6 +8,7 @@
using Amazon.Lambda.TestTool.Models;
using Amazon.Lambda.TestTool.Processes;
using Amazon.Lambda.TestTool.Processes.SQSEventSource;
+using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
using Amazon.Lambda.TestTool.Services;
using Amazon.Lambda.TestTool.Services.IO;
using Spectre.Console.Cli;
@@ -39,10 +40,10 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS
{
EvaluateEnvironmentVariables(settings);
- if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig))
+ if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig) && string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
{
throw new ArgumentException("At least one of the following parameters must be set: " +
- "--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port or --sqs-eventsource-config");
+ "--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port, --sqs-eventsource-config or --dynamodbstreams-eventsource-config");
}
var tasks = new List();
@@ -87,6 +88,12 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS
{
var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(sqsEventSourceProcess.RunningTask);
+ }
+
+ if (!string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
+ {
+ var dynamoDBStreamsProcess = DynamoDBStreamsEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
+ tasks.Add(dynamoDBStreamsProcess.RunningTask);
}
await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token));
@@ -184,6 +191,16 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings)
throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty");
}
settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString();
+ }
+
+ if (settings.DynamoDBStreamsEventSourceConfig != null && settings.DynamoDBStreamsEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase))
+ {
+ var envVariable = settings.DynamoDBStreamsEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length);
+ if (!environmentVariables.Contains(envVariable))
+ {
+ throw new InvalidOperationException($"Environment variable {envVariable} for the DynamoDB Streams event source config was empty");
+ }
+ settings.DynamoDBStreamsEventSourceConfig = environmentVariables[envVariable]?.ToString();
}
}
}
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs
index 0584eaa30..102490b76 100644
--- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs
@@ -82,6 +82,14 @@ public sealed class RunCommandSettings : CommandSettings
[Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=,FunctionName=,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")]
public string? SQSEventSourceConfig { get; set; }
+
+ ///
+ /// The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example "TableName=my-table,FunctionName=function-name,BatchSize=100".
+ /// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName
+ ///
+ [CommandOption("--dynamodbstreams-eventsource-config ")]
+ [Description("The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example \"TableName=,FunctionName=,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName")]
+ public string? DynamoDBStreamsEventSourceConfig { get; set; }
///
/// The absolute path used to save global settings and saved requests. You will need to specify a path in order to enable saving global settings and requests.
///
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs
new file mode 100644
index 000000000..4819bd57a
--- /dev/null
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs
@@ -0,0 +1,423 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+using Amazon.DynamoDBv2;
+using Amazon.DynamoDBStreams;
+using Amazon.DynamoDBStreams.Model;
+using Amazon.Lambda.DynamoDBEvents;
+using Amazon.Lambda.Model;
+using Amazon.Lambda.TestTool.Services;
+using Amazon.Runtime;
+using System.Text.Json;
+
+namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
+
+///
+/// IHostedService that will run continually polling a DynamoDB Stream for records and invoking the connected
+/// Lambda function with the polled records.
+///
+public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService
+{
+ private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase
+ };
+
+ private readonly ILogger _logger;
+ private readonly IAmazonDynamoDB _ddbClient;
+ private readonly IAmazonDynamoDBStreams _streamsClient;
+ private readonly ILambdaClient _lambdaClient;
+ private readonly DynamoDBStreamsEventSourceBackgroundServiceConfig _config;
+
+ ///
+ /// Constructs instance of .
+ ///
+ public DynamoDBStreamsEventSourceBackgroundService(
+ ILogger logger,
+ IAmazonDynamoDB ddbClient,
+ IAmazonDynamoDBStreams streamsClient,
+ DynamoDBStreamsEventSourceBackgroundServiceConfig config,
+ ILambdaClient lambdaClient)
+ {
+ _logger = logger;
+ _ddbClient = ddbClient;
+ _streamsClient = streamsClient;
+ _config = config;
+ _lambdaClient = lambdaClient;
+ }
+
+ ///
+ /// Execute the DynamoDBStreamsEventSourceBackgroundService.
+ ///
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ _logger.LogInformation("Starting DynamoDB Streams poller for table: {tableName}", _config.TableName);
+
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ var streamArn = await GetStreamArnForTable(stoppingToken);
+ if (streamArn == null)
+ {
+ _logger.LogWarning("No stream found for table {tableName}. Retrying in 5 seconds.", _config.TableName);
+ await Task.Delay(5000, stoppingToken);
+ continue;
+ }
+
+ await PollStream(streamArn, stoppingToken);
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ return;
+ }
+ catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ return;
+ }
+ catch (Exception e)
+ {
+ _logger.LogWarning(e, "Exception occurred in DynamoDB Streams poller for {tableName}: {message}", _config.TableName, e.Message);
+ await Task.Delay(3000, stoppingToken);
+ }
+ }
+ }
+
+ private async Task GetStreamArnForTable(CancellationToken stoppingToken)
+ {
+ // If the configured value is already a stream ARN, use it directly
+ if (_config.TableName.StartsWith("arn:") && _config.TableName.Contains("/stream/"))
+ {
+ _logger.LogInformation("Using provided stream ARN directly: {streamArn}", _config.TableName);
+ return _config.TableName;
+ }
+
+ _logger.LogInformation("Looking up latest stream ARN for table {tableName}", _config.TableName);
+ var response = await _ddbClient.DescribeTableAsync(_config.TableName, stoppingToken);
+ _logger.LogInformation("Resolved stream ARN: {streamArn}", response.Table.LatestStreamArn);
+ return response.Table.LatestStreamArn;
+ }
+
+ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
+ {
+ // Shard polling strategy:
+ //
+ // Goal: Only deliver records to Lambda that were written AFTER the test tool started.
+ //
+ // 1. At startup, discover all shards. Open shards get a LATEST iterator (future records only).
+ // Closed shards are recorded in a "closed at startup" set and never polled — they contain
+ // only historical data from before the tool started.
+ //
+ // 2. Every 30 seconds (or immediately when a shard is exhausted), re-discover shards:
+ // - Shards already being polled: leave their iterator alone (preserves position).
+ // - Shards in the "closed at startup" set: skip (pre-existing historical data).
+ // - Any other shard (new since startup): poll with TRIM_HORIZON to read all its records,
+ // since the shard was created after the tool started and all its data is relevant.
+
+ var closedAtStartup = new HashSet();
+ var shardIterators = await DiscoverInitialShards(streamArn, closedAtStartup, stoppingToken);
+
+ _logger.LogInformation("Initial discovery: {openCount} open shard(s), {closedCount} closed shard(s) at startup",
+ shardIterators.Count, closedAtStartup.Count);
+
+ var lastDiscoveryTime = DateTime.UtcNow;
+ const int ShardRediscoveryIntervalSeconds = 30;
+
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ // Poll all active shards concurrently
+ var tasks = new List>();
+ foreach (var (shardId, iterator) in shardIterators)
+ {
+ if (iterator == null)
+ continue;
+ tasks.Add(PollShard(shardId, iterator, stoppingToken));
+ }
+
+ var activeCount = tasks.Count;
+ _logger.LogDebug("Polling {activeShardCount} active shard(s)", activeCount);
+
+ if (activeCount == 0)
+ {
+ // No active shards — re-discover
+ shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken);
+ lastDiscoveryTime = DateTime.UtcNow;
+ if (shardIterators.Count == 0)
+ {
+ await Task.Delay(1000, stoppingToken);
+ }
+ continue;
+ }
+
+ var results = await Task.WhenAll(tasks);
+
+ var hasRecords = false;
+ var shardExhausted = false;
+ foreach (var (shardId, response) in results)
+ {
+ if (response == null)
+ continue;
+
+ if (response.NextShardIterator == null)
+ {
+ _logger.LogInformation("Shard {shardId} exhausted (closed), records in final batch: {count}",
+ shardId, response.Records?.Count);
+ shardIterators.Remove(shardId);
+ shardExhausted = true;
+ }
+ else
+ {
+ shardIterators[shardId] = response.NextShardIterator;
+ }
+
+ if (response.Records == null || response.Records.Count == 0)
+ continue;
+
+ hasRecords = true;
+ _logger.LogInformation("Retrieved {recordCount} record(s) from shard {shardId}", response.Records.Count, shardId);
+ var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn);
+
+ var lambdaPayload = new DynamoDBEvent { Records = lambdaRecords };
+ var invokeRequest = new InvokeRequest
+ {
+ InvocationType = InvocationType.RequestResponse,
+ FunctionName = _config.FunctionName,
+ Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions)
+ };
+
+ _logger.LogInformation("Invoking Lambda function {functionName} with {recordCount} DynamoDB stream records",
+ _config.FunctionName, lambdaRecords.Count);
+
+ var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi);
+
+ if (lambdaResponse.FunctionError != null)
+ {
+ _logger.LogError("Invoking Lambda {function} with {recordCount} DynamoDB stream records failed with error {errorMessage}",
+ _config.FunctionName, lambdaRecords.Count, lambdaResponse.FunctionError);
+ }
+ }
+
+ // Re-discover if a shard was exhausted or 30 seconds have elapsed
+ var timeSinceDiscovery = (DateTime.UtcNow - lastDiscoveryTime).TotalSeconds;
+ if (shardExhausted || timeSinceDiscovery >= ShardRediscoveryIntervalSeconds)
+ {
+ _logger.LogInformation("Re-discovering shards (exhausted={shardExhausted}, elapsed={elapsed}s)",
+ shardExhausted, (int)timeSinceDiscovery);
+ shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken);
+ lastDiscoveryTime = DateTime.UtcNow;
+ continue;
+ }
+
+ if (!hasRecords)
+ {
+ await Task.Delay(_config.PollingIntervalMs, stoppingToken);
+ }
+ }
+ }
+
+ private async Task<(string ShardId, GetRecordsResponse? Response)> PollShard(string shardId, string iterator, CancellationToken stoppingToken)
+ {
+ var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest
+ {
+ ShardIterator = iterator,
+ Limit = _config.BatchSize
+ }, stoppingToken);
+
+ return (shardId, response);
+ }
+
+ ///
+ /// Initial shard discovery at startup. Uses LATEST for open shards and records closed shard IDs.
+ ///
+ private async Task> DiscoverInitialShards(string streamArn, HashSet closedAtStartup, CancellationToken stoppingToken)
+ {
+ var shards = await GetAllShards(streamArn, stoppingToken);
+ var iterators = new Dictionary();
+
+ foreach (var shard in shards)
+ {
+ var isClosed = shard.SequenceNumberRange?.EndingSequenceNumber != null;
+ if (isClosed)
+ {
+ closedAtStartup.Add(shard.ShardId);
+ continue;
+ }
+
+ // Open shard — use LATEST to only get records created after startup
+ var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
+ {
+ StreamArn = streamArn,
+ ShardId = shard.ShardId,
+ ShardIteratorType = ShardIteratorType.LATEST
+ }, stoppingToken);
+
+ _logger.LogInformation("Got LATEST iterator for startup shard {shardId}", shard.ShardId);
+ iterators[shard.ShardId] = iteratorResponse.ShardIterator;
+ }
+ _logger.LogInformation("Initial shard discovery complete: {openCount} open shard(s), {closedCount} closed shard(s) at startup",
+ iterators.Count, closedAtStartup.Count);
+
+ return iterators;
+ }
+
+ ///
+ /// Ongoing shard discovery. Preserves existing iterators, skips shards closed at startup,
+ /// and starts TRIM_HORIZON pollers for any new shards (even if closed).
+ ///
+ private async Task> DiscoverNewShards(string streamArn, Dictionary existingIterators, HashSet closedAtStartup, CancellationToken stoppingToken)
+ {
+ var shards = await GetAllShards(streamArn, stoppingToken);
+ var iterators = new Dictionary(existingIterators);
+
+ foreach (var shard in shards)
+ {
+ // Already being polled — leave iterator alone
+ if (iterators.ContainsKey(shard.ShardId))
+ continue;
+
+ // Was closed at startup — skip
+ if (closedAtStartup.Contains(shard.ShardId))
+ continue;
+
+ // New shard discovered after startup — use TRIM_HORIZON to read all its records
+ var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
+ {
+ StreamArn = streamArn,
+ ShardId = shard.ShardId,
+ ShardIteratorType = ShardIteratorType.TRIM_HORIZON
+ }, stoppingToken);
+
+ _logger.LogInformation("Got TRIM_HORIZON iterator for new shard {shardId}", shard.ShardId);
+ iterators[shard.ShardId] = iteratorResponse.ShardIterator;
+ }
+
+ return iterators;
+ }
+
+ private async Task> GetAllShards(string streamArn, CancellationToken stoppingToken)
+ {
+ _logger.LogDebug("Discovering shards for stream {streamArn}", streamArn);
+ var shards = new List();
+ string? lastEvaluatedShardId = null;
+
+ do
+ {
+ var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
+ {
+ StreamArn = streamArn,
+ ExclusiveStartShardId = lastEvaluatedShardId
+ }, stoppingToken);
+
+ shards.AddRange(describeResponse.StreamDescription.Shards);
+ lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId;
+ } while (lastEvaluatedShardId != null);
+
+ _logger.LogDebug("There were {shardCount} shard(s) returned from DescribeStream", shards.Count);
+ return shards;
+ }
+
+ ///
+ /// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type.
+ ///
+ internal static IList ConvertToLambdaRecords(List records, string streamArn)
+ {
+ return records.Select(r => ConvertToLambdaRecord(r, streamArn)).ToList();
+ }
+
+ ///
+ /// Convert a single SDK stream record to the Lambda event record type.
+ ///
+ internal static DynamoDBEvent.DynamodbStreamRecord ConvertToLambdaRecord(Record record, string streamArn)
+ {
+ var lambdaRecord = new DynamoDBEvent.DynamodbStreamRecord
+ {
+ EventID = record.EventID,
+ EventName = record.EventName?.Value,
+ EventSource = "aws:dynamodb",
+ EventSourceArn = streamArn,
+ EventVersion = record.EventVersion,
+ AwsRegion = Arn.Parse(streamArn).Region
+ };
+
+ if (record.Dynamodb != null)
+ {
+ lambdaRecord.Dynamodb = new DynamoDBEvent.StreamRecord
+ {
+ ApproximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime ?? DateTime.MinValue,
+ SequenceNumber = record.Dynamodb.SequenceNumber,
+ SizeBytes = record.Dynamodb.SizeBytes ?? 0,
+ StreamViewType = record.Dynamodb.StreamViewType?.Value
+ };
+
+ if (record.Dynamodb.Keys != null)
+ {
+ lambdaRecord.Dynamodb.Keys = ConvertAttributeMap(record.Dynamodb.Keys);
+ }
+
+ if (record.Dynamodb.NewImage != null)
+ {
+ lambdaRecord.Dynamodb.NewImage = ConvertAttributeMap(record.Dynamodb.NewImage);
+ }
+
+ if (record.Dynamodb.OldImage != null)
+ {
+ lambdaRecord.Dynamodb.OldImage = ConvertAttributeMap(record.Dynamodb.OldImage);
+ }
+ }
+
+ if (record.UserIdentity != null)
+ {
+ lambdaRecord.UserIdentity = new DynamoDBEvent.Identity
+ {
+ PrincipalId = record.UserIdentity.PrincipalId,
+ Type = record.UserIdentity.Type
+ };
+ }
+
+ return lambdaRecord;
+ }
+
+ ///
+ /// Convert SDK AttributeValue dictionary to Lambda event AttributeValue dictionary.
+ ///
+ internal static Dictionary ConvertAttributeMap(Dictionary sdkMap)
+ {
+ var result = new Dictionary();
+ foreach (var kvp in sdkMap)
+ {
+ result[kvp.Key] = ConvertAttributeValue(kvp.Value);
+ }
+ return result;
+ }
+
+ ///
+ /// Convert a single SDK AttributeValue to the Lambda event AttributeValue.
+ ///
+ internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValue sdkValue)
+ {
+ var lambdaValue = new DynamoDBEvent.AttributeValue();
+
+ if (sdkValue.S != null)
+ lambdaValue.S = sdkValue.S;
+ if (sdkValue.N != null)
+ lambdaValue.N = sdkValue.N;
+ if (sdkValue.B != null)
+ lambdaValue.B = sdkValue.B;
+ if (sdkValue.BOOL != null)
+ lambdaValue.BOOL = sdkValue.BOOL;
+ if (sdkValue.NULL != null)
+ lambdaValue.NULL = sdkValue.NULL;
+ if (sdkValue.SS != null)
+ lambdaValue.SS = sdkValue.SS;
+ if (sdkValue.NS != null)
+ lambdaValue.NS = sdkValue.NS;
+ if (sdkValue.BS != null)
+ lambdaValue.BS = sdkValue.BS;
+ if (sdkValue.L != null)
+ lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList();
+ if (sdkValue.M != null)
+ lambdaValue.M = ConvertAttributeMap(sdkValue.M);
+
+ return lambdaValue;
+ }
+}
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs
new file mode 100644
index 000000000..07e4a4b58
--- /dev/null
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs
@@ -0,0 +1,35 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
+
+///
+/// Configuration for the service.
+///
+public class DynamoDBStreamsEventSourceBackgroundServiceConfig
+{
+ ///
+ /// The batch size to read from the stream and send to the Lambda function.
+ ///
+ public required int BatchSize { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultBatchSize;
+
+ ///
+ /// The Lambda function to send the DynamoDB stream records to.
+ ///
+ public required string FunctionName { get; init; }
+
+ ///
+ /// The endpoint where the emulated Lambda runtime API is running.
+ ///
+ public required string LambdaRuntimeApi { get; init; }
+
+ ///
+ /// The DynamoDB table name to read streams from.
+ ///
+ public required string TableName { get; init; }
+
+ ///
+ /// The polling interval in milliseconds between stream reads when no records are found.
+ ///
+ public required int PollingIntervalMs { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultPollingIntervalMs;
+}
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs
new file mode 100644
index 000000000..4a1524e52
--- /dev/null
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs
@@ -0,0 +1,48 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
+
+///
+/// This class represents the input values from the user for DynamoDB Streams event source configuration.
+///
+internal class DynamoDBStreamsEventSourceConfig
+{
+ ///
+ /// The batch size to read from the stream and send to the Lambda function.
+ ///
+ public int? BatchSize { get; set; }
+
+ ///
+ /// The Lambda function to send the DynamoDB stream records to.
+ /// If not set the default function will be used.
+ ///
+ public string? FunctionName { get; set; }
+
+ ///
+ /// The endpoint where the emulated Lambda runtime API is running.
+ /// If not set the current Test Tool instance will be used.
+ ///
+ public string? LambdaRuntimeApi { get; set; }
+
+ ///
+ /// The AWS profile to use for credentials.
+ ///
+ public string? Profile { get; set; }
+
+ ///
+ /// The AWS region the DynamoDB table is in.
+ ///
+ public string? Region { get; set; }
+
+ ///
+ /// The DynamoDB table name to read streams from.
+ ///
+ public string? TableName { get; set; }
+
+ ///
+ /// The polling interval in milliseconds between stream reads when no records are found.
+ /// Default is 1000.
+ ///
+ public int? PollingIntervalMs { get; set; }
+}
diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs
new file mode 100644
index 000000000..cbf7bfd4e
--- /dev/null
+++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs
@@ -0,0 +1,218 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+using Amazon.DynamoDBv2;
+using Amazon.DynamoDBStreams;
+using Amazon.Lambda.TestTool.Commands.Settings;
+using Amazon.Lambda.TestTool.Services;
+using System.Text.Json;
+
+namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
+
+///
+/// Process for handling DynamoDB Streams event source for Lambda functions.
+///
+public class DynamoDBStreamsEventSourceProcess
+{
+ internal const int DefaultBatchSize = 100;
+ internal const int DefaultPollingIntervalMs = 1000;
+
+ ///
+ /// The Parent task for all the tasks started for each DynamoDB Streams event source.
+ ///
+ public required Task RunningTask { get; init; }
+
+ ///
+ /// Startup DynamoDB Streams event sources
+ ///
+ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings settings, CancellationToken cancellationToken = default)
+ {
+ if (string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
+ {
+ throw new InvalidOperationException($"The {nameof(RunCommandSettings.DynamoDBStreamsEventSourceConfig)} can not be null when starting the DynamoDB Streams event source process");
+ }
+
+ var configs = LoadDynamoDBStreamsEventSourceConfig(settings.DynamoDBStreamsEventSourceConfig);
+
+ var tasks = new List();
+
+ foreach (var config in configs)
+ {
+ var builder = Host.CreateApplicationBuilder();
+
+ var ddbConfig = new AmazonDynamoDBStreamsConfig();
+ if (!string.IsNullOrEmpty(config.Profile))
+ {
+ ddbConfig.Profile = new Profile(config.Profile);
+ }
+
+ if (!string.IsNullOrEmpty(config.Region))
+ {
+ ddbConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.Region);
+ }
+
+ var streamsClient = new AmazonDynamoDBStreamsClient(ddbConfig);
+ builder.Services.AddSingleton(streamsClient);
+
+ var ddbClientConfig = new AmazonDynamoDBConfig();
+ if (!string.IsNullOrEmpty(config.Profile))
+ {
+ ddbClientConfig.Profile = new Profile(config.Profile);
+ }
+ if (!string.IsNullOrEmpty(config.Region))
+ {
+ ddbClientConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.Region);
+ }
+ var ddbClient = new AmazonDynamoDBClient(ddbClientConfig);
+ builder.Services.AddSingleton(ddbClient);
+
+ builder.Services.AddSingleton();
+
+ var tableName = config.TableName;
+ if (string.IsNullOrEmpty(tableName))
+ {
+ throw new InvalidOperationException("TableName is a required property for DynamoDB Streams event source config");
+ }
+
+ var lambdaRuntimeApi = config.LambdaRuntimeApi;
+ if (string.IsNullOrEmpty(lambdaRuntimeApi))
+ {
+ if (!settings.LambdaEmulatorPort.HasValue)
+ {
+ throw new InvalidOperationException("No Lambda runtime api endpoint was given as part of the DynamoDB Streams event source config and the current " +
+ "instance of the test tool is not running the Lambda runtime api. Either provide a Lambda runtime api endpoint or set a port for " +
+ "the lambda runtime api when starting the test tool.");
+ }
+ lambdaRuntimeApi = $"http://{settings.LambdaEmulatorHost}:{settings.LambdaEmulatorPort}/";
+ }
+
+ var backgroundServiceConfig = new DynamoDBStreamsEventSourceBackgroundServiceConfig
+ {
+ BatchSize = config.BatchSize ?? DefaultBatchSize,
+ FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName,
+ LambdaRuntimeApi = lambdaRuntimeApi,
+ TableName = tableName,
+ PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs
+ };
+
+ builder.Services.AddSingleton(backgroundServiceConfig);
+ builder.Services.AddHostedService();
+
+ var app = builder.Build();
+ var task = app.RunAsync(cancellationToken);
+ tasks.Add(task);
+ }
+
+ return new DynamoDBStreamsEventSourceProcess
+ {
+ RunningTask = Task.WhenAll(tasks)
+ };
+ }
+
+ ///
+ /// Load the DynamoDB Streams event source configs. Supports JSON or comma-delimited key-value pair format.
+ /// If the value points to a file that exists, the file contents will be read.
+ ///
+ internal static List LoadDynamoDBStreamsEventSourceConfig(string configString)
+ {
+ if (File.Exists(configString))
+ {
+ configString = File.ReadAllText(configString);
+ }
+
+ configString = configString.Trim();
+
+ List? configs = null;
+
+ var jsonOptions = new JsonSerializerOptions
+ {
+ PropertyNameCaseInsensitive = true
+ };
+
+ if (configString.StartsWith('['))
+ {
+ try
+ {
+ configs = JsonSerializer.Deserialize>(configString, jsonOptions);
+ if (configs == null)
+ {
+ throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString);
+ }
+ }
+ catch (JsonException e)
+ {
+ throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString, e);
+ }
+ }
+ else if (configString.StartsWith('{'))
+ {
+ try
+ {
+ var config = JsonSerializer.Deserialize(configString, jsonOptions);
+ if (config == null)
+ {
+ throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString);
+ }
+
+ configs = new List { config };
+ }
+ catch (JsonException e)
+ {
+ throw new InvalidOperationException("Failed to parse DynamoDB Streams event source JSON config: " + configString, e);
+ }
+ }
+ else
+ {
+ var config = new DynamoDBStreamsEventSourceConfig();
+ var tokens = configString.Split(',');
+ foreach (var token in tokens)
+ {
+ if (string.IsNullOrWhiteSpace(token))
+ continue;
+
+ var keyValuePair = token.Split('=');
+ if (keyValuePair.Length != 2)
+ {
+ throw new InvalidOperationException("Failed to parse DynamoDB Streams event source config. Format should be \"TableName=,FunctionName=,...\"");
+ }
+
+ switch (keyValuePair[0].ToLower().Trim())
+ {
+ case "batchsize":
+ if (!int.TryParse(keyValuePair[1].Trim(), out var batchSize))
+ {
+ throw new InvalidOperationException("Value for batch size is not a formatted integer");
+ }
+ config.BatchSize = batchSize;
+ break;
+ case "functionname":
+ config.FunctionName = keyValuePair[1].Trim();
+ break;
+ case "lambdaruntimeapi":
+ config.LambdaRuntimeApi = keyValuePair[1].Trim();
+ break;
+ case "profile":
+ config.Profile = keyValuePair[1].Trim();
+ break;
+ case "region":
+ config.Region = keyValuePair[1].Trim();
+ break;
+ case "tablename":
+ config.TableName = keyValuePair[1].Trim();
+ break;
+ case "pollingintervalms":
+ if (!int.TryParse(keyValuePair[1].Trim(), out var pollingInterval))
+ {
+ throw new InvalidOperationException("Value for polling interval is not a formatted integer");
+ }
+ config.PollingIntervalMs = pollingInterval;
+ break;
+ }
+ }
+
+ configs = new List { config };
+ }
+
+ return configs;
+ }
+}
diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj
index 524f55fcd..7a799ca1e 100644
--- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj
+++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Amazon.Lambda.TestTool.UnitTests.csproj
@@ -15,6 +15,9 @@
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs
new file mode 100644
index 000000000..d47483ac2
--- /dev/null
+++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs
@@ -0,0 +1,256 @@
+using Amazon.DynamoDBStreams;
+using Amazon.DynamoDBStreams.Model;
+using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
+using Xunit;
+using Record = Amazon.DynamoDBStreams.Model.Record;
+
+namespace Amazon.Lambda.TestTool.UnitTests.DynamoDBStreamsEventSource;
+
+public class ConvertDynamoDBStreamsRecordTests
+{
+ private const string TestStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2024-01-01T00:00:00.000";
+
+ [Fact]
+ public void ConvertBasicRecord()
+ {
+ var record = new Record
+ {
+ EventID = "event-123",
+ EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"),
+ EventVersion = "1.1",
+ EventSource = "aws:dynamodb",
+ Dynamodb = new StreamRecord
+ {
+ ApproximateCreationDateTime = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc),
+ SequenceNumber = "111111111111111111111",
+ SizeBytes = 256,
+ StreamViewType = new StreamViewType("NEW_AND_OLD_IMAGES"),
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-1" }
+ },
+ NewImage = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-1" },
+ ["Name"] = new AttributeValue { S = "Test Item" }
+ }
+ }
+ };
+
+ var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
+
+ Assert.Equal("event-123", result.EventID);
+ Assert.Equal("INSERT", result.EventName);
+ Assert.Equal("aws:dynamodb", result.EventSource);
+ Assert.Equal(TestStreamArn, result.EventSourceArn);
+ Assert.Equal("1.1", result.EventVersion);
+ Assert.Equal("us-west-2", result.AwsRegion);
+
+ Assert.NotNull(result.Dynamodb);
+ Assert.Equal("111111111111111111111", result.Dynamodb.SequenceNumber);
+ Assert.Equal(256, result.Dynamodb.SizeBytes);
+ Assert.Equal("NEW_AND_OLD_IMAGES", result.Dynamodb.StreamViewType);
+
+ Assert.Single(result.Dynamodb.Keys);
+ Assert.Equal("key-1", result.Dynamodb.Keys["Id"].S);
+
+ Assert.Equal(2, result.Dynamodb.NewImage.Count);
+ Assert.Equal("key-1", result.Dynamodb.NewImage["Id"].S);
+ Assert.Equal("Test Item", result.Dynamodb.NewImage["Name"].S);
+ }
+
+ [Fact]
+ public void ConvertRecordWithAllAttributeTypes()
+ {
+ var record = new Record
+ {
+ EventID = "event-456",
+ EventName = new Amazon.DynamoDBStreams.OperationType("MODIFY"),
+ EventVersion = "1.1",
+ Dynamodb = new StreamRecord
+ {
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-1" }
+ },
+ NewImage = new Dictionary
+ {
+ ["StringAttr"] = new AttributeValue { S = "hello" },
+ ["NumberAttr"] = new AttributeValue { N = "42" },
+ ["BoolAttr"] = new AttributeValue { BOOL = true },
+ ["NullAttr"] = new AttributeValue { NULL = true },
+ ["ListAttr"] = new AttributeValue
+ {
+ L = new List
+ {
+ new AttributeValue { S = "item1" },
+ new AttributeValue { N = "2" }
+ }
+ },
+ ["MapAttr"] = new AttributeValue
+ {
+ M = new Dictionary
+ {
+ ["nested"] = new AttributeValue { S = "value" }
+ }
+ },
+ ["StringSetAttr"] = new AttributeValue { SS = new List { "a", "b" } },
+ ["NumberSetAttr"] = new AttributeValue { NS = new List { "1", "2" } }
+ }
+ }
+ };
+
+ var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
+
+ var newImage = result.Dynamodb.NewImage;
+ Assert.Equal("hello", newImage["StringAttr"].S);
+ Assert.Equal("42", newImage["NumberAttr"].N);
+ Assert.True(newImage["BoolAttr"].BOOL);
+ Assert.True(newImage["NullAttr"].NULL);
+ Assert.Equal(2, newImage["ListAttr"].L.Count);
+ Assert.Equal("item1", newImage["ListAttr"].L[0].S);
+ Assert.Equal("value", newImage["MapAttr"].M["nested"].S);
+ Assert.Equal(new List { "a", "b" }, newImage["StringSetAttr"].SS);
+ Assert.Equal(new List { "1", "2" }, newImage["NumberSetAttr"].NS);
+ }
+
+ [Fact]
+ public void ConvertRecordWithUserIdentity()
+ {
+ var record = new Record
+ {
+ EventID = "event-789",
+ EventName = new Amazon.DynamoDBStreams.OperationType("REMOVE"),
+ EventVersion = "1.1",
+ UserIdentity = new Identity
+ {
+ PrincipalId = "dynamodb.amazonaws.com",
+ Type = "Service"
+ },
+ Dynamodb = new StreamRecord
+ {
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "expired-item" }
+ }
+ }
+ };
+
+ var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
+
+ Assert.NotNull(result.UserIdentity);
+ Assert.Equal("dynamodb.amazonaws.com", result.UserIdentity.PrincipalId);
+ Assert.Equal("Service", result.UserIdentity.Type);
+ }
+
+ [Fact]
+ public void ConvertRecordWithBinaryAttributes()
+ {
+ var binaryData = new byte[] { 0x01, 0x02, 0x03, 0xFF };
+ var binarySet = new List
+ {
+ new MemoryStream(new byte[] { 0xAA, 0xBB }),
+ new MemoryStream(new byte[] { 0xCC, 0xDD })
+ };
+
+ var record = new Record
+ {
+ EventID = "event-bin",
+ EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"),
+ EventVersion = "1.1",
+ Dynamodb = new StreamRecord
+ {
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-1" }
+ },
+ NewImage = new Dictionary
+ {
+ ["BinaryAttr"] = new AttributeValue { B = new MemoryStream(binaryData) },
+ ["BinarySetAttr"] = new AttributeValue { BS = binarySet }
+ }
+ }
+ };
+
+ var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
+
+ var newImage = result.Dynamodb.NewImage;
+ Assert.NotNull(newImage["BinaryAttr"].B);
+ Assert.Equal(binaryData, newImage["BinaryAttr"].B.ToArray());
+ Assert.NotNull(newImage["BinarySetAttr"].BS);
+ Assert.Equal(2, newImage["BinarySetAttr"].BS.Count);
+ Assert.Equal(new byte[] { 0xAA, 0xBB }, newImage["BinarySetAttr"].BS[0].ToArray());
+ Assert.Equal(new byte[] { 0xCC, 0xDD }, newImage["BinarySetAttr"].BS[1].ToArray());
+ }
+
+ [Fact]
+ public void ConvertRecordWithEmptyListAndMap()
+ {
+ var record = new Record
+ {
+ EventID = "event-empty",
+ EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"),
+ EventVersion = "1.1",
+ Dynamodb = new StreamRecord
+ {
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-1" }
+ },
+ NewImage = new Dictionary
+ {
+ ["EmptyList"] = new AttributeValue { L = new List() },
+ ["EmptyMap"] = new AttributeValue { M = new Dictionary() }
+ }
+ }
+ };
+
+ var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
+
+ var newImage = result.Dynamodb.NewImage;
+ Assert.NotNull(newImage["EmptyList"].L);
+ Assert.Empty(newImage["EmptyList"].L);
+ Assert.NotNull(newImage["EmptyMap"].M);
+ Assert.Empty(newImage["EmptyMap"].M);
+ }
+
+ [Fact]
+ public void ConvertMultipleRecords()
+ {
+ var records = new List
+ {
+ new Record
+ {
+ EventID = "event-1",
+ EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"),
+ EventVersion = "1.1",
+ Dynamodb = new StreamRecord
+ {
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-1" }
+ }
+ }
+ },
+ new Record
+ {
+ EventID = "event-2",
+ EventName = new Amazon.DynamoDBStreams.OperationType("MODIFY"),
+ EventVersion = "1.1",
+ Dynamodb = new StreamRecord
+ {
+ Keys = new Dictionary
+ {
+ ["Id"] = new AttributeValue { S = "key-2" }
+ }
+ }
+ }
+ };
+
+ var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecords(records, TestStreamArn);
+
+ Assert.Equal(2, result.Count);
+ Assert.Equal("event-1", result[0].EventID);
+ Assert.Equal("event-2", result[1].EventID);
+ }
+}
diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs
new file mode 100644
index 000000000..836d36c22
--- /dev/null
+++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ParseDynamoDBStreamsEventSourceConfigTests.cs
@@ -0,0 +1,107 @@
+using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
+using Xunit;
+
+namespace Amazon.Lambda.TestTool.UnitTests.DynamoDBStreamsEventSource;
+
+public class ParseDynamoDBStreamsEventSourceConfigTests
+{
+ [Fact]
+ public void ParseValidJsonObject()
+ {
+ string json = """
+{
+ "TableName" : "my-table",
+ "FunctionName" : "LambdaFunction",
+ "BatchSize" : 50,
+ "LambdaRuntimeApi" : "http://localhost:7777/",
+ "Profile" : "beta",
+ "Region" : "us-east-23"
+}
+""";
+
+ var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json);
+ Assert.Single(configs);
+ Assert.Equal("my-table", configs[0].TableName);
+ Assert.Equal("LambdaFunction", configs[0].FunctionName);
+ Assert.Equal(50, configs[0].BatchSize);
+ Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi);
+ Assert.Equal("beta", configs[0].Profile);
+ Assert.Equal("us-east-23", configs[0].Region);
+ }
+
+ [Fact]
+ public void ParseInvalidJsonObject()
+ {
+ string json = """
+{
+ "aaa"
+}
+""";
+
+ Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json));
+ }
+
+ [Fact]
+ public void ParseValidJsonArray()
+ {
+ string json = """
+[
+ {
+ "TableName" : "table-1",
+ "FunctionName" : "Function1",
+ "BatchSize" : 25
+ },
+ {
+ "TableName" : "table-2",
+ "FunctionName" : "Function2",
+ "BatchSize" : 75
+ }
+]
+""";
+
+ var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json);
+ Assert.Equal(2, configs.Count);
+ Assert.Equal("table-1", configs[0].TableName);
+ Assert.Equal("Function1", configs[0].FunctionName);
+ Assert.Equal(25, configs[0].BatchSize);
+ Assert.Equal("table-2", configs[1].TableName);
+ Assert.Equal("Function2", configs[1].FunctionName);
+ Assert.Equal(75, configs[1].BatchSize);
+ }
+
+ [Fact]
+ public void ParseInvalidJsonArray()
+ {
+ string json = """
+[
+ {"aaa"}
+]
+""";
+
+ Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(json));
+ }
+
+ [Fact]
+ public void ParseKeyPairs()
+ {
+ var configs = DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(
+ "TableName=my-table ,functionName =LambdaFunction, batchSize=50," +
+ "LambdaRuntimeApi=http://localhost:7777/ ,Profile=beta,Region=us-east-23");
+
+ Assert.Single(configs);
+ Assert.Equal("my-table", configs[0].TableName);
+ Assert.Equal("LambdaFunction", configs[0].FunctionName);
+ Assert.Equal(50, configs[0].BatchSize);
+ Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi);
+ Assert.Equal("beta", configs[0].Profile);
+ Assert.Equal("us-east-23", configs[0].Region);
+ }
+
+ [Theory]
+ [InlineData("novalue")]
+ [InlineData("BatchSize=noint")]
+ public void InvalidKeyPairString(string keyPairConfig)
+ {
+ Assert.Throws(() => DynamoDBStreamsEventSourceProcess.LoadDynamoDBStreamsEventSourceConfig(keyPairConfig));
+ }
+}