diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs
deleted file mode 100644
index db6ae3c8d00c..000000000000
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs
+++ /dev/null
@@ -1,156 +0,0 @@
-/*******************************************************************************
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * Licensed under the Apache License, Version 2.0 (the "License"). You may not use
- * this file except in compliance with the License. A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file.
- * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- * *****************************************************************************
- * __ _ _ ___
- * ( )( \/\/ )/ __)
- * /__\ \ / \__ \
- * (_)(_) \/\/ (___/
- *
- * AWS SDK for .NET
- * API Version: 2006-03-01
- *
- */
-using Amazon.Runtime.Internal.Util;
-using System;
-using System.Diagnostics.CodeAnalysis;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Amazon.S3.Transfer.Internal
-{
- ///
- /// ArrayPool-based buffered data source that reads from pre-buffered part data.
- /// Manages ArrayPool lifecycle and provides efficient buffer-to-buffer copying.
- ///
- internal class BufferedDataSource : IPartDataSource
- {
- private readonly StreamPartBuffer _partBuffer;
- private bool _disposed = false;
-
- #region Logger
-
- private Logger Logger
- {
- get
- {
- return Logger.GetLogger(typeof(TransferUtility));
- }
- }
-
- #endregion
-
- ///
- public int PartNumber => _partBuffer.PartNumber;
-
- ///
- public bool IsComplete => _partBuffer.RemainingBytes == 0;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The containing the buffered part data.
- /// Thrown when is null.
- public BufferedDataSource(StreamPartBuffer partBuffer)
- {
- _partBuffer = partBuffer ?? throw new ArgumentNullException(nameof(partBuffer));
-
- Logger.DebugFormat("BufferedDataSource: Created for part {0} (BufferLength={1}, RemainingBytes={2})",
- _partBuffer.PartNumber, _partBuffer.Length, _partBuffer.RemainingBytes);
- }
-
- ///
- public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- try
- {
- if (buffer == null)
- throw new ArgumentNullException(nameof(buffer));
- if (offset < 0)
- throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be non-negative");
- if (count < 0)
- throw new ArgumentOutOfRangeException(nameof(count), "Count must be non-negative");
- if (offset + count > buffer.Length)
- throw new ArgumentException("Offset and count exceed buffer bounds");
-
- if (_partBuffer.RemainingBytes == 0)
- {
- Logger.DebugFormat("BufferedDataSource: [Part {0}] Reached end of buffer (RemainingBytes=0)", _partBuffer.PartNumber);
- return Task.FromResult(0); // End of part
- }
-
- // Calculate bytes to copy from buffered part
- var availableBytes = _partBuffer.RemainingBytes;
- var bytesToRead = Math.Min(count, availableBytes);
-
- Logger.DebugFormat("BufferedDataSource: [Part {0}] Reading {1} bytes (Requested={2}, Available={3}, CurrentPosition={4})",
- _partBuffer.PartNumber, bytesToRead, count, availableBytes, _partBuffer.CurrentPosition);
-
- Buffer.BlockCopy(
- _partBuffer.ArrayPoolBuffer, // Source: ArrayPool buffer
- _partBuffer.CurrentPosition, // Source offset
- buffer, // Destination: user buffer
- offset, // Destination offset
- bytesToRead // Bytes to copy
- );
-
- // Update position in the part buffer
- _partBuffer.CurrentPosition += bytesToRead;
-
- Logger.DebugFormat("BufferedDataSource: [Part {0}] Read complete (BytesRead={1}, NewPosition={2}, RemainingBytes={3}, IsComplete={4})",
- _partBuffer.PartNumber, bytesToRead, _partBuffer.CurrentPosition, _partBuffer.RemainingBytes, IsComplete);
-
- return Task.FromResult(bytesToRead);
- }
- catch (Exception ex)
- {
- Logger.Error(ex, "BufferedDataSource: [Part {0}] Error during read: {1}", _partBuffer.PartNumber, ex.Message);
-
- // On any error during read (including validation), mark the buffer as consumed to prevent further reads
- _partBuffer.CurrentPosition = _partBuffer.Length;
- throw;
- }
- }
-
- private void ThrowIfDisposed()
- {
- if (_disposed)
- throw new ObjectDisposedException(nameof(BufferedDataSource));
- }
-
- ///
- [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose methods should not throw exceptions")]
- public void Dispose()
- {
- if (!_disposed)
- {
- try
- {
- Logger.DebugFormat("BufferedDataSource: [Part {0}] Disposing (Returning buffer to ArrayPool)", _partBuffer.PartNumber);
-
- // Dispose the underlying StreamPartBuffer, which returns ArrayPool buffer to pool
- _partBuffer?.Dispose();
- }
- catch (Exception ex)
- {
- Logger.Error(ex, "BufferedDataSource: [Part {0}] Error during disposal: {1}", _partBuffer.PartNumber, ex.Message);
-
- // Suppressing CA1031: Dispose methods should not throw exceptions
- // Continue disposal process silently on any errors
- }
-
- _disposed = true;
- }
- }
- }
-}
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs
index 256a0228d086..49a014f734f8 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs
@@ -147,7 +147,7 @@ private void ProcessStreamingPart(
// Add the streaming data source to the buffer manager
// After this succeeds, the buffer manager owns the data source
- _partBufferManager.AddBuffer(streamingDataSource);
+ _partBufferManager.AddDataSource(streamingDataSource);
// Mark ownership transfer by nulling our reference
// If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it
@@ -181,17 +181,17 @@ private void ProcessStreamingPart(
/// Cancellation token for the operation.
///
/// This method is called when the part arrives out of the expected sequential order.
- /// The part data is buffered into ArrayPool memory for later sequential consumption.
+ /// The part data is buffered into ArrayPool memory using ChunkedBufferStream for later sequential consumption.
///
/// OWNERSHIP:
- /// - Response is read and buffered into StreamPartBuffer
+ /// - Response is read and buffered into ChunkedBufferStream
/// - Response is disposed immediately after buffering (no longer needed)
- /// - StreamPartBuffer is added to buffer manager (buffer manager takes ownership)
- /// - Buffer manager will dispose StreamPartBuffer during cleanup
+ /// - ChunkedPartDataSource (wrapping ChunkedBufferStream) is added to buffer manager (buffer manager takes ownership)
+ /// - Buffer manager will dispose ChunkedPartDataSource during cleanup
///
/// ERROR HANDLING:
/// - Always dispose response in catch block since we own it throughout this method
- /// - BufferPartFromResponseAsync handles its own cleanup of StreamPartBuffer on error
+ /// - BufferPartFromResponseAsync handles its own cleanup of ChunkedBufferStream on error
///
private async Task ProcessBufferedPartAsync(
int partNumber,
@@ -204,7 +204,7 @@ private async Task ProcessBufferedPartAsync(
try
{
// Buffer the part from the response stream into memory
- var buffer = await BufferPartFromResponseAsync(
+ var dataSource = await BufferPartFromResponseAsync(
partNumber,
response,
cancellationToken).ConfigureAwait(false);
@@ -212,11 +212,11 @@ private async Task ProcessBufferedPartAsync(
// Response has been fully read and buffered - dispose it now
response?.Dispose();
- _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into memory",
- partNumber, buffer.Length);
+ _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Successfully buffered part data",
+ partNumber);
// Add the buffered part to the buffer manager
- _partBufferManager.AddBuffer(buffer);
+ _partBufferManager.AddDataSource(dataSource);
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager (capacity will be released after consumption)",
partNumber);
@@ -257,77 +257,61 @@ public void Dispose()
}
///
- /// Buffers a part from the GetObjectResponse stream into ArrayPool memory.
- /// Used when a part arrives out of order and cannot be streamed directly.
+ /// Buffers a part from the GetObjectResponse stream into memory using ChunkedBufferStream.
+ /// Uses multiple small ArrayPool chunks (80KB each) to avoid the 2GB byte[] array size limitation
+ /// and Large Object Heap allocations. Used when a part arrives out of order and cannot be streamed directly.
///
/// The part number being buffered.
/// The GetObjectResponse containing the part data stream.
/// Cancellation token for the operation.
- /// A containing the buffered part data.
- /// Thrown when buffering fails. The StreamPartBuffer will be disposed automatically.
- private async Task BufferPartFromResponseAsync(
+ /// An containing the buffered part data.
+ /// Thrown when buffering fails. The data source will be disposed automatically.
+ private async Task BufferPartFromResponseAsync(
int partNumber,
GetObjectResponse response,
CancellationToken cancellationToken)
{
- StreamPartBuffer downloadedPart = null;
+ long expectedBytes = response.ContentLength;
+
+ ChunkedBufferStream chunkedStream = null;
try
{
- // Use ContentLength to determine exact bytes to read and allocate
- long expectedBytes = response.ContentLength;
- int initialBufferSize = (int)expectedBytes;
-
- _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Allocating buffer of size {1} bytes from ArrayPool",
- partNumber, initialBufferSize);
+ _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffering {1} bytes using chunked buffer stream",
+ partNumber, expectedBytes);
- downloadedPart = StreamPartBuffer.Create(partNumber, initialBufferSize);
+ chunkedStream = new ChunkedBufferStream(expectedBytes);
- // Get reference to the buffer for writing
- var partBuffer = downloadedPart.ArrayPoolBuffer;
-
- // Create a MemoryStream wrapper around the pooled buffer
- // writable: true allows WriteResponseStreamAsync to write to it
- // The MemoryStream starts at position 0 and can grow up to initialBufferSize
- using (var memoryStream = new MemoryStream(partBuffer, 0, initialBufferSize, writable: true))
- {
- _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into buffer",
- partNumber);
-
- // Use GetObjectResponse's stream copy logic which includes:
- // - Progress tracking with events
- // - Size validation (ContentLength vs bytes read)
- // - Buffered reading with proper chunk sizes
- await response.WriteResponseStreamAsync(
- memoryStream,
- null, // destination identifier (not needed for memory stream)
- _config.BufferSize,
- cancellationToken,
- validateSize: true)
- .ConfigureAwait(false);
-
- int totalRead = (int)memoryStream.Position;
-
- _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes from response stream",
- partNumber, totalRead);
+ _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into chunked buffers",
+ partNumber);
- // Set the length to reflect actual bytes read
- downloadedPart.SetLength(totalRead);
+ // Write response stream to chunked buffer stream
+ // ChunkedBufferStream automatically allocates chunks as needed
+ await response.WriteResponseStreamAsync(
+ chunkedStream,
+ null,
+ _config.BufferSize,
+ cancellationToken,
+ validateSize: true)
+ .ConfigureAwait(false);
+
+ _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into chunked stream",
+ partNumber, chunkedStream.Length);
- if (totalRead != expectedBytes)
- {
- _logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
- partNumber, expectedBytes, totalRead);
- }
+ if (chunkedStream.Length != expectedBytes)
+ {
+ _logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
+ partNumber, expectedBytes, chunkedStream.Length);
}
- return downloadedPart;
+ // Switch to read mode and wrap in IPartDataSource
+ chunkedStream.SwitchToReadMode();
+ return new ChunkedPartDataSource(partNumber, chunkedStream);
}
catch (Exception ex)
{
- _logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part from response stream", partNumber);
- // If something goes wrong, StreamPartBuffer.Dispose() will handle cleanup
- downloadedPart?.Dispose();
+ _logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part", partNumber);
+ chunkedStream?.Dispose();
throw;
}
}
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedBufferStream.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedBufferStream.cs
new file mode 100644
index 000000000000..b9bd8fbbef13
--- /dev/null
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedBufferStream.cs
@@ -0,0 +1,409 @@
+/*******************************************************************************
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not use
+ * this file except in compliance with the License. A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file.
+ * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ * *****************************************************************************
+ * __ _ _ ___
+ * ( )( \/\/ )/ __)
+ * /__\ \ / \__ \
+ * (_)(_) \/\/ (___/
+ *
+ * AWS SDK for .NET
+ * API Version: 2006-03-01
+ *
+ */
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Amazon.S3.Transfer.Internal
+{
+ ///
+ /// A writable and readable stream that stores data across multiple small ArrayPool buffers,
+ /// avoiding the 2GB array size limitation and Large Object Heap (LOH) allocations.
+ ///
+ ///
+ /// Design Goals:
+ ///
+ /// - Keep each buffer chunk below 85KB to avoid LOH allocation
+ /// - Support parts larger than 2GB (int.MaxValue)
+ /// - Efficient memory management via ArrayPool
+ /// - Present standard Stream interface for easy integration
+ ///
+ ///
+ /// Size Limits:
+ ///
+ /// Maximum supported stream size is approximately 140TB (int.MaxValue * CHUNK_SIZE bytes).
+ /// This limit exists because chunk indexing uses int for List indexing.
+ ///
+ ///
+ /// Usage Pattern:
+ ///
+ /// var stream = new ChunkedBufferStream();
+ ///
+ /// // Write phase: Stream data in
+ /// await response.WriteResponseStreamAsync(stream, ...);
+ ///
+ /// // Read phase: Stream data out (automatically switches to read mode)
+ /// int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
+ ///
+ /// // Cleanup: Returns all chunks to ArrayPool
+ /// stream.Dispose();
+ ///
+ ///
+ /// Note: The stream automatically switches to read mode on the first read operation.
+ /// You can optionally call explicitly for clarity.
+ ///
+ ///
+ internal class ChunkedBufferStream : Stream
+ {
+ ///
+ /// Size of each buffer chunk. Set to 64KB to match ArrayPool bucket size and stay below the 85KB Large Object Heap threshold.
+ /// If we chose any higher than 64KB, ArrayPool would round up to 128KB (which would go to LOH).
+ ///
+ private const int CHUNK_SIZE = 65536; // 64KB - matches ArrayPool bucket, safely below 85KB LOH threshold
+
+ ///
+ /// Maximum supported stream size. This limit exists because chunk indexing uses int for List indexing.
+ /// With 64KB chunks, this allows approximately 140TB of data.
+ ///
+ private const long MAX_STREAM_SIZE = (long)int.MaxValue * CHUNK_SIZE;
+
+ private readonly List _chunks;
+ private long _length = 0;
+ private long _position = 0;
+ private bool _isReadMode = false;
+ private bool _disposed = false;
+
+ ///
+ /// Creates a new ChunkedBufferStream with default initial capacity.
+ ///
+ public ChunkedBufferStream()
+ {
+ _chunks = new List();
+ }
+
+ ///
+ /// Creates a new ChunkedBufferStream with pre-allocated capacity for the expected size.
+ /// This avoids List resizing during writes, improving performance for known sizes.
+ ///
+ /// The estimated total size in bytes. Used to pre-allocate the chunk list capacity.
+ public ChunkedBufferStream(long estimatedSize)
+ {
+ if (estimatedSize > 0)
+ {
+ // Ceiling division formula: (n + d - 1) / d calculates ceil(n / d) using integer arithmetic.
+ // This computes how many chunks are needed to hold estimatedSize bytes, rounding up to
+ // ensure the last partial chunk is accounted for. Avoids floating-point math for performance.
+ // Example: For 100 bytes with CHUNK_SIZE=32: (100 + 31) / 32 = 131 / 32 = 4 chunks
+ // (simple division 100/32=3 would only hold 96 bytes, losing 4 bytes)
+ long estimatedChunks = (estimatedSize + CHUNK_SIZE - 1) / CHUNK_SIZE;
+ int capacity = (int)Math.Min(estimatedChunks, int.MaxValue);
+ _chunks = new List(capacity);
+ }
+ else
+ {
+ _chunks = new List();
+ }
+ }
+
+ ///
+ /// Throws if the stream has been disposed.
+ ///
+ /// Thrown if the stream is disposed.
+ private void ThrowIfDisposed()
+ {
+ if (_disposed)
+ throw new ObjectDisposedException(nameof(ChunkedBufferStream));
+ }
+
+ ///
+ /// Gets a value indicating whether the stream supports reading.
+ /// Returns true after the first read operation or after has been called.
+ ///
+ public override bool CanRead => _isReadMode;
+
+ ///
+ /// Gets a value indicating whether the stream supports seeking.
+ /// Always returns false - seeking is not supported.
+ ///
+ public override bool CanSeek => false;
+
+ ///
+ /// Gets a value indicating whether the stream supports writing.
+ /// Returns true before is called, false after.
+ ///
+ public override bool CanWrite => !_isReadMode;
+
+ ///
+ /// Gets the length in bytes of the stream.
+ ///
+ /// Thrown if the stream has been disposed.
+ public override long Length
+ {
+ get
+ {
+ ThrowIfDisposed();
+ return _length;
+ }
+ }
+
+ ///
+ /// Gets or sets the position within the current stream.
+ /// Setting the position is not supported and will throw .
+ ///
+ /// Thrown if the stream has been disposed.
+ /// Thrown when attempting to set the position.
+ public override long Position
+ {
+ get
+ {
+ ThrowIfDisposed();
+ return _position;
+ }
+ set => throw new NotSupportedException("Seeking not supported");
+ }
+
+ ///
+ /// Writes a sequence of bytes to the stream and advances the current position by the number of bytes written.
+ /// Automatically allocates new chunks from ArrayPool as needed.
+ ///
+ /// The buffer containing data to write.
+ /// The zero-based byte offset in buffer at which to begin copying bytes.
+ /// The number of bytes to write.
+ /// Thrown if the stream has been disposed.
+ /// Thrown if stream is in read mode.
+ /// Thrown if buffer is null.
+ /// Thrown if offset or count is negative or exceeds buffer bounds.
+ /// Thrown if the write would exceed the maximum supported stream size (approximately 175TB).
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ ThrowIfDisposed();
+
+ if (_isReadMode)
+ throw new NotSupportedException("Cannot write after switching to read mode");
+
+ if (buffer == null)
+ throw new ArgumentNullException(nameof(buffer));
+ if (offset < 0)
+ throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be non-negative");
+ if (count < 0)
+ throw new ArgumentOutOfRangeException(nameof(count), "Count must be non-negative");
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("Offset and count exceed buffer bounds");
+
+ // Check for overflow before writing - prevents chunk index overflow for extremely large streams
+ if (_length > MAX_STREAM_SIZE - count)
+ throw new IOException($"Write would exceed maximum supported stream size of {MAX_STREAM_SIZE} bytes (approximately 175TB).");
+
+ int remaining = count;
+ int sourceOffset = offset;
+
+ while (remaining > 0)
+ {
+ // Calculate which chunk and offset within chunk for current write position
+ int chunkIndex = (int)(_length / CHUNK_SIZE);
+ int offsetInChunk = (int)(_length % CHUNK_SIZE);
+
+ // Allocate new chunk if we've filled all existing chunks
+ if (chunkIndex >= _chunks.Count)
+ {
+ _chunks.Add(ArrayPool.Shared.Rent(CHUNK_SIZE));
+ }
+
+ // Copy data to current chunk
+ int bytesToWrite = Math.Min(remaining, CHUNK_SIZE - offsetInChunk);
+ Buffer.BlockCopy(buffer, sourceOffset, _chunks[chunkIndex], offsetInChunk, bytesToWrite);
+
+ _length += bytesToWrite;
+ sourceOffset += bytesToWrite;
+ remaining -= bytesToWrite;
+ }
+
+ _position = _length;
+ }
+
+ ///
+ /// Asynchronously writes a sequence of bytes to the stream.
+ ///
+ /// The buffer containing data to write.
+ /// The zero-based byte offset in buffer at which to begin copying bytes.
+ /// The number of bytes to write.
+ /// The token to monitor for cancellation requests.
+ /// A task representing the asynchronous write operation.
+ ///
+ /// Delegates to synchronous as ArrayPool operations are fast and don't benefit from async.
+ ///
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ Write(buffer, offset, count);
+ return Task.CompletedTask;
+ }
+
+ ///
+ /// Reads a sequence of bytes from the stream and advances the position by the number of bytes read.
+ /// Automatically switches to read mode on the first read if not already in read mode.
+ ///
+ /// The buffer to read data into.
+ /// The zero-based byte offset in buffer at which to begin storing data.
+ /// The maximum number of bytes to read.
+ ///
+ /// The total number of bytes read into the buffer. This can be less than the requested count
+ /// if that many bytes are not currently available, or zero if the end of the stream is reached.
+ ///
+ /// Thrown if the stream has been disposed.
+ /// Thrown if buffer is null.
+ /// Thrown if offset or count is negative or exceeds buffer bounds.
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ ThrowIfDisposed();
+
+ // Automatically switch to read mode on first read
+ if (!_isReadMode)
+ {
+ _isReadMode = true;
+ _position = 0;
+ }
+
+ if (buffer == null)
+ throw new ArgumentNullException(nameof(buffer));
+ if (offset < 0)
+ throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be non-negative");
+ if (count < 0)
+ throw new ArgumentOutOfRangeException(nameof(count), "Count must be non-negative");
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("Offset and count exceed buffer bounds");
+
+ if (_position >= _length)
+ return 0; // End of stream
+
+ long bytesAvailable = _length - _position;
+ int bytesToRead = (int)Math.Min(count, bytesAvailable);
+ int bytesRead = 0;
+
+ while (bytesRead < bytesToRead)
+ {
+ int chunkIndex = (int)(_position / CHUNK_SIZE);
+ int offsetInChunk = (int)(_position % CHUNK_SIZE);
+ int bytesInThisChunk = Math.Min(bytesToRead - bytesRead, CHUNK_SIZE - offsetInChunk);
+
+ Buffer.BlockCopy(_chunks[chunkIndex], offsetInChunk, buffer, offset + bytesRead, bytesInThisChunk);
+
+ _position += bytesInThisChunk;
+ bytesRead += bytesInThisChunk;
+ }
+
+ return bytesRead;
+ }
+
+ ///
+ /// Asynchronously reads a sequence of bytes from the stream.
+ /// Automatically switches to read mode on the first read if not already in read mode.
+ ///
+ /// The buffer to read data into.
+ /// The zero-based byte offset in buffer at which to begin storing data.
+ /// The maximum number of bytes to read.
+ /// The token to monitor for cancellation requests.
+ ///
+ /// A task representing the asynchronous read operation. The task result contains the total number
+ /// of bytes read into the buffer.
+ ///
+ ///
+ /// Delegates to synchronous as buffer operations are fast and don't benefit from async.
+ ///
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ return Task.FromResult(Read(buffer, offset, count));
+ }
+
+ ///
+ /// Switches the stream from write mode to read mode.
+ /// This method is optional - the stream will automatically switch to read mode on the first read operation.
+ /// Call this explicitly if you want to switch modes before reading or for clarity in your code.
+ /// Resets the position to the beginning of the stream.
+ ///
+ /// Thrown if the stream has been disposed.
+ /// Thrown if already in read mode.
+ public void SwitchToReadMode()
+ {
+ ThrowIfDisposed();
+
+ if (_isReadMode)
+ throw new InvalidOperationException("Stream is already in read mode");
+
+ _isReadMode = true;
+ _position = 0;
+ }
+
+ ///
+ /// Flushes any buffered data. This is a no-op for this stream type.
+ ///
+ public override void Flush()
+ {
+ // No-op: No buffering layer that needs flushing
+ }
+
+ ///
+ /// Sets the position within the current stream.
+ /// Not supported - always throws .
+ ///
+ /// Always thrown.
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException("Seeking is not supported");
+ }
+
+ ///
+ /// Sets the length of the current stream.
+ /// Not supported - always throws .
+ ///
+ /// Always thrown.
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException("SetLength is not supported");
+ }
+
+ ///
+ /// Releases the unmanaged resources used by the stream and optionally releases the managed resources.
+ /// Returns all ArrayPool chunks back to the shared pool.
+ ///
+ /// true to release both managed and unmanaged resources; false to release only unmanaged resources.
+ [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose methods should not throw exceptions")]
+ protected override void Dispose(bool disposing)
+ {
+ if (!_disposed && disposing)
+ {
+ try
+ {
+ // Return all chunks to ArrayPool
+ foreach (var chunk in _chunks)
+ {
+ ArrayPool.Shared.Return(chunk);
+ }
+ _chunks.Clear();
+ }
+ catch (Exception)
+ {
+ // Suppress exceptions in Dispose - continue cleanup
+ }
+
+ _disposed = true;
+ }
+
+ base.Dispose(disposing);
+ }
+ }
+}
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedPartDataSource.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedPartDataSource.cs
new file mode 100644
index 000000000000..afc2cabf9ffa
--- /dev/null
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedPartDataSource.cs
@@ -0,0 +1,136 @@
+/*******************************************************************************
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not use
+ * this file except in compliance with the License. A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file.
+ * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ * *****************************************************************************
+ * __ _ _ ___
+ * ( )( \/\/ )/ __)
+ * /__\ \ / \__ \
+ * (_)(_) \/\/ (___/
+ *
+ * AWS SDK for .NET
+ * API Version: 2006-03-01
+ *
+ */
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Amazon.S3.Transfer.Internal
+{
+ ///
+ /// IPartDataSource implementation that wraps a ChunkedBufferStream for large buffered parts
+ ///
+ ///
+ /// Purpose:
+ ///
+ /// - Wraps ChunkedBufferStream to present IPartDataSource interface
+ /// - Enables seamless integration with PartBufferManager
+ /// - Handles disposal of underlying ChunkedBufferStream
+ ///
+ ///
+ /// Lifecycle:
+ ///
+ /// // Created in BufferedPartDataHandler for large parts
+ /// var chunkedStream = new ChunkedBufferStream();
+ /// await response.WriteResponseStreamAsync(chunkedStream, ...);
+ /// chunkedStream.SwitchToReadMode();
+ /// var dataSource = new ChunkedPartDataSource(partNumber, chunkedStream);
+ ///
+ /// // Added to PartBufferManager
+ /// _partBufferManager.AddDataSource(dataSource);
+ ///
+ /// // Consumer reads sequentially
+ /// await dataSource.ReadAsync(buffer, offset, count, ct);
+ ///
+ /// // Disposed by PartBufferManager when part is consumed
+ /// dataSource.Dispose(); // Returns all chunks to ArrayPool
+ ///
+ ///
+ internal class ChunkedPartDataSource : IPartDataSource
+ {
+ private readonly ChunkedBufferStream _stream;
+ private bool _disposed = false;
+
+ ///
+ /// Gets the part number for this data source.
+ /// Used by PartBufferManager for sequential ordering.
+ ///
+ public int PartNumber { get; }
+
+ ///
+ /// Gets whether this data source has been fully consumed.
+ /// Returns true when all data has been read from the chunked buffer stream.
+ ///
+ public bool IsComplete => _stream.Position >= _stream.Length;
+
+ ///
+ /// Creates a new ChunkedPartDataSource wrapping a ChunkedBufferStream.
+ /// The ChunkedBufferStream must already be in read mode (SwitchToReadMode called).
+ /// Takes ownership of the stream and will dispose it when this data source is disposed.
+ ///
+ /// The part number for ordering.
+ /// The ChunkedBufferStream containing the buffered part data. Must be in read mode.
+ /// Thrown if stream is null.
+ /// Thrown if the stream is not in read mode (CanRead is false).
+ public ChunkedPartDataSource(int partNumber, ChunkedBufferStream stream)
+ {
+ PartNumber = partNumber;
+ _stream = stream ?? throw new ArgumentNullException(nameof(stream));
+
+ if (!_stream.CanRead)
+ throw new InvalidOperationException("ChunkedBufferStream must be in read mode (call SwitchToReadMode first)");
+ }
+
+ ///
+ /// Asynchronously reads a sequence of bytes from the chunked buffer stream.
+ ///
+ /// The buffer to read data into.
+ /// The zero-based byte offset in buffer at which to begin storing data.
+ /// The maximum number of bytes to read.
+ /// The token to monitor for cancellation requests.
+ ///
+ /// A task representing the asynchronous read operation. The task result contains the total number
+ /// of bytes read into the buffer. This can be less than the requested count if that many bytes
+ /// are not currently available, or zero if the end of the stream is reached.
+ ///
+ /// Thrown if the object has been disposed.
+ public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (_disposed)
+ throw new ObjectDisposedException(nameof(ChunkedPartDataSource));
+
+ return _stream.ReadAsync(buffer, offset, count, cancellationToken);
+ }
+
+ ///
+ /// Returns a string representation of this ChunkedPartDataSource for debugging.
+ ///
+ /// A string describing this chunked part data source.
+ public override string ToString()
+ {
+ return $"ChunkedPartDataSource(Part={PartNumber}, Length={_stream.Length} bytes, Position={_stream.Position})";
+ }
+
+
+ ///
+ /// Releases all resources used by this ChunkedPartDataSource.
+ /// Disposes the underlying ChunkedBufferStream, which returns all ArrayPool chunks.
+ ///
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ _stream?.Dispose();
+ _disposed = true;
+ }
+ }
+ }
+}
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs
index 5f5c214421b5..686a89691ce4 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs
@@ -44,19 +44,7 @@ internal interface IPartBufferManager : IDisposable
/// The part data source to add.
void AddDataSource(IPartDataSource dataSource);
- ///
- /// Adds a downloaded part buffer and signals readers when next expected part arrives.
- ///
- /// The downloaded part buffer to add.
- void AddBuffer(StreamPartBuffer buffer);
-
- ///
- /// Adds a part data source (streaming or buffered) and signals readers when next expected part arrives.
- ///
- /// The part data source to add (can be StreamingDataSource or BufferedDataSource).
- /// A task that completes when the data source has been added and signaling is complete.
- void AddBuffer(IPartDataSource dataSource);
-
+
///
/// Reads data from the buffer manager. Automatically handles sequential part consumption
/// and reads across part boundaries to fill the buffer when possible, matching standard Stream.Read() behavior.
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs
index 33edf2fa0ad1..a27622ea4377 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs
@@ -82,8 +82,8 @@ namespace Amazon.S3.Transfer.Internal
/// - Blocks if are already buffered in memory
/// - Example: With MaxInMemoryParts=10, if parts 5-14 are buffered, the task downloading
/// part 15 blocks here until the reader consumes and releases part 5's buffer
- /// 2. Read part data from S3 into pooled buffer
- /// 3. Add buffered part: await
+ /// 2. Read part data from S3 into chunked ArrayPool buffers
+ /// 3. Add buffered part: await
/// - Adds buffer to _partDataSources dictionary
/// - Signals _partAvailable to wake consumer if waiting
/// 4. Consumer eventually releases the buffer slot after reading the part
@@ -176,17 +176,8 @@ internal class PartBufferManager : IPartBufferManager
#endregion
- #region Logger
+ private readonly Logger _logger = Logger.GetLogger(typeof(PartBufferManager));
- private Logger Logger
- {
- get
- {
- return Logger.GetLogger(typeof(TransferUtility));
- }
- }
-
- #endregion
///
/// Initializes a new instance of the class.
@@ -205,7 +196,7 @@ public PartBufferManager(BufferedDownloadConfiguration config)
);
_partAvailable = new AutoResetEvent(false);
- Logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts);
+ _logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts);
}
///
@@ -236,12 +227,12 @@ public async Task WaitForBufferSpaceAsync(CancellationToken cancellationToken)
ThrowIfDisposed();
var availableBefore = _bufferSpaceAvailable.CurrentCount;
- Logger.DebugFormat("PartBufferManager: Waiting for buffer space (Available slots before wait: {0})", availableBefore);
+ _logger.DebugFormat("PartBufferManager: Waiting for buffer space (Available slots before wait: {0})", availableBefore);
await _bufferSpaceAvailable.WaitAsync(cancellationToken).ConfigureAwait(false);
var availableAfter = _bufferSpaceAvailable.CurrentCount;
- Logger.DebugFormat("PartBufferManager: Buffer space acquired (Available slots after acquire: {0})", availableAfter);
+ _logger.DebugFormat("PartBufferManager: Buffer space acquired (Available slots after acquire: {0})", availableAfter);
}
///
@@ -267,50 +258,25 @@ public void AddDataSource(IPartDataSource dataSource)
if (dataSource == null)
throw new ArgumentNullException(nameof(dataSource));
- Logger.DebugFormat("PartBufferManager: Adding part {0} (BufferedParts count before add: {1})",
+ _logger.DebugFormat("PartBufferManager: Adding part {0} (BufferedParts count before add: {1})",
dataSource.PartNumber, _partDataSources.Count);
// Add the data source to the collection
if (!_partDataSources.TryAdd(dataSource.PartNumber, dataSource))
{
// Duplicate part number - this shouldn't happen in normal operation
- Logger.Error(null, "PartBufferManager: Duplicate part {0} attempted to be added", dataSource.PartNumber);
+ _logger.Error(null, "PartBufferManager: Duplicate part {0} attempted to be added", dataSource.PartNumber);
dataSource?.Dispose(); // Clean up the duplicate part
throw new InvalidOperationException($"Duplicate part {dataSource.PartNumber} attempted to be added");
}
- Logger.DebugFormat("PartBufferManager: Part {0} added successfully (BufferedParts count after add: {1}). Signaling _partAvailable.",
+ _logger.DebugFormat("PartBufferManager: Part {0} added successfully (BufferedParts count after add: {1}). Signaling _partAvailable.",
dataSource.PartNumber, _partDataSources.Count);
// Signal that a new part is available
_partAvailable.Set();
}
- ///
- public void AddBuffer(StreamPartBuffer buffer)
- {
- ThrowIfDisposed();
-
- if (buffer == null)
- throw new ArgumentNullException(nameof(buffer));
-
- // Create a BufferedDataSource and add it
- var bufferedSource = new BufferedDataSource(buffer);
- AddDataSource(bufferedSource);
- }
-
- ///
- public void AddBuffer(IPartDataSource dataSource)
- {
- ThrowIfDisposed();
-
- if (dataSource == null)
- throw new ArgumentNullException(nameof(dataSource));
-
- // Delegate directly to AddDataSourceAsync which already handles IPartDataSource
- AddDataSource(dataSource);
- }
-
///
public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
@@ -395,7 +361,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
{
var currentPartNumber = _nextExpectedPartNumber;
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Expecting part {0} (Requested bytes: {1}, BufferedParts count: {2})",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Expecting part {0} (Requested bytes: {1}, BufferedParts count: {2})",
currentPartNumber, count, _partDataSources.Count);
// Wait for the current part to become available.
@@ -405,7 +371,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
// Example: If parts 3, 5, 7 are available but we need part 2, we wait here.
while (!_partDataSources.ContainsKey(currentPartNumber))
{
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} not yet available. Waiting on _partAvailable event...",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} not yet available. Waiting on _partAvailable event...",
currentPartNumber);
// Check for completion first to avoid indefinite waiting.
@@ -414,12 +380,12 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
{
if (state.Item2 != null) // Check for exception
{
- Logger.Error(state.Item2, "PartBufferManager.ReadFromCurrentPart: Download failed while waiting for part {0}",
+ _logger.Error(state.Item2, "PartBufferManager.ReadFromCurrentPart: Download failed while waiting for part {0}",
currentPartNumber);
throw new InvalidOperationException("Multipart download failed", state.Item2);
}
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Download complete, part {0} not available. Returning EOF.",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Download complete, part {0} not available. Returning EOF.",
currentPartNumber);
// True EOF - all parts downloaded, no more data coming
return (0, false);
@@ -434,11 +400,11 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
// and calls AddDataSourceAsync, it signals this event, waking us to check again.
await Task.Run(() => _partAvailable.WaitOne(), cancellationToken).ConfigureAwait(false);
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Woke from _partAvailable wait. Rechecking for part {0}...",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Woke from _partAvailable wait. Rechecking for part {0}...",
currentPartNumber);
}
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is available. Reading from data source...",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is available. Reading from data source...",
currentPartNumber);
// At this point, the expected part is available in the dictionary.
@@ -446,7 +412,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
if (!_partDataSources.TryGetValue(currentPartNumber, out var dataSource))
{
// Log technical details for troubleshooting
- Logger.Error(null, "PartBufferManager: Part {0} disappeared after availability check. This indicates a race condition in the buffer manager.", currentPartNumber);
+ _logger.Error(null, "PartBufferManager: Part {0} disappeared after availability check. This indicates a race condition in the buffer manager.", currentPartNumber);
// Throw user-friendly exception
throw new InvalidOperationException("Multipart download failed due to an internal error.");
@@ -457,13 +423,13 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
// Read from this part's buffer into the caller's buffer.
var partBytesRead = await dataSource.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Read {0} bytes from part {1}. IsComplete={2}",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Read {0} bytes from part {1}. IsComplete={2}",
partBytesRead, currentPartNumber, dataSource.IsComplete);
// If this part is fully consumed, perform cleanup and advance to next part.
if (dataSource.IsComplete)
{
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is complete. Cleaning up and advancing to next part...",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is complete. Cleaning up and advancing to next part...",
currentPartNumber);
// Remove from collection
@@ -482,7 +448,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
// Advance to next part.
_nextExpectedPartNumber++;
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Cleaned up part {0}. Next expected part: {1} (BufferedParts count: {2})",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Cleaned up part {0}. Next expected part: {1} (BufferedParts count: {2})",
currentPartNumber, _nextExpectedPartNumber, _partDataSources.Count);
// Continue reading to fill buffer across part boundaries.
@@ -498,11 +464,11 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
// If part is not complete but we got 0 bytes, it's EOF
if (partBytesRead == 0)
{
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} returned 0 bytes (EOF)", currentPartNumber);
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} returned 0 bytes (EOF)", currentPartNumber);
return (0, false);
}
- Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} has more data. Returning {1} bytes (will resume on next call)",
+ _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} has more data. Returning {1} bytes (will resume on next call)",
currentPartNumber, partBytesRead);
// Part still has more data available. Return what we read.
@@ -511,7 +477,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat
}
catch (Exception ex)
{
- Logger.Error(ex, "PartBufferManager.ReadFromCurrentPart: Error reading from part {0}: {1}",
+ _logger.Error(ex, "PartBufferManager.ReadFromCurrentPart: Error reading from part {0}: {1}",
currentPartNumber, ex.Message);
// Clean up on failure to prevent resource leaks
@@ -543,7 +509,7 @@ public void ReleaseBufferSpace()
_bufferSpaceAvailable.Release();
var availableAfter = _bufferSpaceAvailable.CurrentCount;
- Logger.DebugFormat("PartBufferManager: Buffer space released (Available slots after release: {0})", availableAfter);
+ _logger.DebugFormat("PartBufferManager: Buffer space released (Available slots after release: {0})", availableAfter);
}
///
@@ -568,11 +534,11 @@ public void MarkDownloadComplete(Exception exception)
{
if (exception != null)
{
- Logger.Error(exception, "PartBufferManager: Download marked complete with error. Signaling completion.");
+ _logger.Error(exception, "PartBufferManager: Download marked complete with error. Signaling completion.");
}
else
{
- Logger.DebugFormat("PartBufferManager: Download marked complete successfully. Signaling completion.");
+ _logger.DebugFormat("PartBufferManager: Download marked complete successfully. Signaling completion.");
}
// Create and assign new completion state atomically
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs
deleted file mode 100644
index a850a9f9ad38..000000000000
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs
+++ /dev/null
@@ -1,175 +0,0 @@
-/*******************************************************************************
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * Licensed under the Apache License, Version 2.0 (the "License"). You may not use
- * this file except in compliance with the License. A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file.
- * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- * *****************************************************************************
- * __ _ _ ___
- * ( )( \/\/ )/ __)
- * /__\ \ / \__ \
- * (_)(_) \/\/ (___/
- *
- * AWS SDK for .NET
- * API Version: 2006-03-01
- *
- */
-using System;
-using System.Buffers;
-using System.Diagnostics.CodeAnalysis;
-using Amazon.S3.Model;
-
-namespace Amazon.S3.Transfer.Internal
-{
- ///
- /// Container for downloaded part data optimized for streaming scenarios.
- /// Uses ArrayPool buffers and tracks reading position for sequential access
- /// by BufferedMultipartStream.
- ///
- internal class StreamPartBuffer : IDisposable
- {
- private bool _disposed = false;
-
- ///
- /// Gets or sets the part number for priority queue ordering.
- /// For Part GET strategy: Uses the actual part number from the multipart upload.
- /// For Range GET strategy: Calculated based on byte range position.
- ///
- public int PartNumber { get; set; }
-
- ///
- /// Gets or sets the ArrayPool buffer containing the downloaded part data.
- /// Ownership belongs to this StreamPartBuffer and will be returned to pool on disposal.
- ///
- public byte[] ArrayPoolBuffer { get; set; }
-
- ///
- /// Gets or sets the current reading position within the buffer.
- /// Used by BufferedMultipartStream for sequential reading.
- ///
- public int CurrentPosition { get; set; } = 0;
-
- ///
- /// Gets the number of bytes remaining to be read from current position.
- ///
- public int RemainingBytes => Math.Max(0, Length - CurrentPosition);
-
- ///
- /// Gets or sets the length of valid data in the ArrayPool buffer.
- /// The buffer may be larger than this due to ArrayPool size rounding.
- ///
- public int Length { get; set; }
-
- ///
- /// Creates a new StreamPartBuffer for streaming scenarios.
- /// For internal use only - external callers should use Create() factory method.
- ///
- /// The part number for ordering
- /// The ArrayPool buffer containing the data (ownership transferred)
- /// The length of valid data in the buffer
- internal StreamPartBuffer(int partNumber, byte[] arrayPoolBuffer, int length)
- {
- PartNumber = partNumber;
- ArrayPoolBuffer = arrayPoolBuffer;
- Length = length;
- CurrentPosition = 0;
- }
-
- ///
- /// Creates a new StreamPartBuffer with a rented ArrayPool buffer.
- /// The StreamPartBuffer takes ownership and will return the buffer on disposal.
- ///
- /// The part number for ordering
- /// Initial capacity needed for the buffer
- /// A StreamPartBuffer with rented buffer ready for writing
- public static StreamPartBuffer Create(int partNumber, int capacity)
- {
- var buffer = ArrayPool.Shared.Rent(capacity);
- return new StreamPartBuffer(partNumber, buffer, 0); // Length will be set after writing
- }
-
- ///
- /// Sets the length of valid data in the buffer after writing.
- /// Can only be called once to prevent state corruption.
- ///
- /// The number of valid bytes written to the buffer
- /// Thrown if length has already been set
- /// Thrown if length is negative or exceeds buffer capacity
- internal void SetLength(int length)
- {
- if (Length > 0)
- throw new InvalidOperationException("Length has already been set and cannot be changed");
-
- if (length < 0)
- throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative");
-
- if (ArrayPoolBuffer != null && length > ArrayPoolBuffer.Length)
- throw new ArgumentOutOfRangeException(nameof(length), "Length exceeds buffer capacity");
-
- Length = length;
- }
-
- ///
- /// Returns a string representation of this StreamPartBuffer for debugging.
- ///
- /// A string describing this stream part buffer
- public override string ToString()
- {
- return $"StreamPartBuffer(Part={PartNumber}, ArrayPool={Length} bytes, pos={CurrentPosition}, remaining={RemainingBytes})";
- }
-
- #region IDisposable Implementation
-
- ///
- /// Releases all resources used by this StreamPartBuffer.
- ///
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- ///
- /// Releases the unmanaged resources used by the StreamPartBuffer and optionally releases the managed resources.
- /// Returns the ArrayPool buffer back to the shared pool.
- ///
- /// True to release both managed and unmanaged resources; false to release only unmanaged resources.
- [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose methods should not throw exceptions")]
- protected virtual void Dispose(bool disposing)
- {
- if (!_disposed && disposing)
- {
- try
- {
- // Return ArrayPool buffer to shared pool
- if (ArrayPoolBuffer != null)
- {
- ArrayPool.Shared.Return(ArrayPoolBuffer);
- ArrayPoolBuffer = null;
- }
- }
- catch (Exception)
- {
-
- }
-
- _disposed = true;
- }
- }
-
- ///
- /// Finalizer to ensure resources are cleaned up if Dispose is not called.
- ///
- ~StreamPartBuffer()
- {
- Dispose(false);
- }
-
- #endregion
- }
-}
diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedDataSourceTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedDataSourceTests.cs
deleted file mode 100644
index 24b25b97ea93..000000000000
--- a/sdk/test/Services/S3/UnitTests/Custom/BufferedDataSourceTests.cs
+++ /dev/null
@@ -1,487 +0,0 @@
-using Amazon.S3.Transfer.Internal;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace AWSSDK.UnitTests
-{
- ///
- /// Unit tests for BufferedDataSource class.
- /// Tests reading from pre-buffered StreamPartBuffer data.
- ///
- [TestClass]
- public class BufferedDataSourceTests
- {
- #region Constructor Tests
-
- [TestMethod]
- public void Constructor_WithValidPartBuffer_CreatesDataSource()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
-
- // Act
- var dataSource = new BufferedDataSource(partBuffer);
-
- // Assert
- Assert.IsNotNull(dataSource);
- Assert.AreEqual(1, dataSource.PartNumber);
- Assert.IsFalse(dataSource.IsComplete);
-
- // Cleanup
- dataSource.Dispose();
- }
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentNullException))]
- public void Constructor_WithNullPartBuffer_ThrowsArgumentNullException()
- {
- // Act
- var dataSource = new BufferedDataSource(null);
-
- // Assert - ExpectedException
- }
-
- #endregion
-
- #region Property Tests
-
- [TestMethod]
- public void PartNumber_ReturnsPartBufferPartNumber()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(5, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- try
- {
- // Act & Assert
- Assert.AreEqual(5, dataSource.PartNumber);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- public void IsComplete_WhenNoRemainingBytes_ReturnsTrue()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- partBuffer.CurrentPosition = 512; // Move to end
- var dataSource = new BufferedDataSource(partBuffer);
-
- try
- {
- // Act & Assert
- Assert.IsTrue(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- public void IsComplete_WhenRemainingBytes_ReturnsFalse()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- try
- {
- // Act & Assert
- Assert.IsFalse(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- #endregion
-
- #region ReadAsync Tests - Happy Path
-
- [TestMethod]
- public async Task ReadAsync_ReadsDataFromPartBuffer()
- {
- // Arrange
- byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);
-
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- byte[] readBuffer = new byte[512];
-
- try
- {
- // Act
- int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None);
-
- // Assert
- Assert.AreEqual(512, bytesRead);
- Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer, 0, 512));
- Assert.IsTrue(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- public async Task ReadAsync_WithPartialRead_ReturnsRequestedBytes()
- {
- // Arrange
- byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);
-
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- byte[] readBuffer = new byte[256];
-
- try
- {
- // Act
- int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 256, CancellationToken.None);
-
- // Assert
- Assert.AreEqual(256, bytesRead);
- Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer, 0, 256));
- Assert.IsFalse(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- public async Task ReadAsync_WithFullRead_ReadsAllRemainingBytes()
- {
- // Arrange
- byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);
-
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- byte[] readBuffer = new byte[1024]; // Larger than available
-
- try
- {
- // Act
- int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 1024, CancellationToken.None);
-
- // Assert
- Assert.AreEqual(512, bytesRead); // Only 512 available
- Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer, 0, 512));
- Assert.IsTrue(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- public async Task ReadAsync_WhenComplete_ReturnsZero()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- partBuffer.CurrentPosition = 512; // Move to end
- var dataSource = new BufferedDataSource(partBuffer);
-
- byte[] readBuffer = new byte[256];
-
- try
- {
- // Act
- int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 256, CancellationToken.None);
-
- // Assert
- Assert.AreEqual(0, bytesRead);
- Assert.IsTrue(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- #endregion
-
- #region ReadAsync Tests - Parameter Validation
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentNullException))]
- public async Task ReadAsync_WithNullBuffer_ThrowsArgumentNullException()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- try
- {
- // Act
- await dataSource.ReadAsync(null, 0, 100, CancellationToken.None);
-
- // Assert - ExpectedException
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentOutOfRangeException))]
- public async Task ReadAsync_WithNegativeOffset_ThrowsArgumentOutOfRangeException()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
- byte[] readBuffer = new byte[256];
-
- try
- {
- // Act
- await dataSource.ReadAsync(readBuffer, -1, 100, CancellationToken.None);
-
- // Assert - ExpectedException
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentOutOfRangeException))]
- public async Task ReadAsync_WithNegativeCount_ThrowsArgumentOutOfRangeException()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
- byte[] readBuffer = new byte[256];
-
- try
- {
- // Act
- await dataSource.ReadAsync(readBuffer, 0, -1, CancellationToken.None);
-
- // Assert - ExpectedException
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentException))]
- public async Task ReadAsync_WithOffsetCountExceedingBounds_ThrowsArgumentException()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
- byte[] readBuffer = new byte[256];
-
- try
- {
- // Act - offset + count > buffer.Length
- await dataSource.ReadAsync(readBuffer, 100, 200, CancellationToken.None);
-
- // Assert - ExpectedException
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- #endregion
-
- #region ReadAsync Tests - Multiple Reads
-
- [TestMethod]
- public async Task ReadAsync_MultipleReads_ConsumesAllData()
- {
- // Arrange
- byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);
-
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- byte[] readBuffer1 = new byte[256];
- byte[] readBuffer2 = new byte[256];
-
- try
- {
- // Act - Read in two chunks
- int bytesRead1 = await dataSource.ReadAsync(readBuffer1, 0, 256, CancellationToken.None);
- int bytesRead2 = await dataSource.ReadAsync(readBuffer2, 0, 256, CancellationToken.None);
-
- // Assert
- Assert.AreEqual(256, bytesRead1);
- Assert.AreEqual(256, bytesRead2);
- Assert.IsTrue(dataSource.IsComplete);
-
- // Verify data correctness
- Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer1, 0, 256));
-
- // Extract second segment manually for .NET Framework compatibility
- byte[] secondSegment = new byte[256];
- Buffer.BlockCopy(testData, 256, secondSegment, 0, 256);
- Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(
- secondSegment,
- readBuffer2,
- 0,
- 256));
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- [TestMethod]
- public async Task ReadAsync_ReadingToEnd_ReturnsZeroOnSubsequentReads()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- byte[] readBuffer = new byte[512];
-
- try
- {
- // Act - Read all data
- int bytesRead1 = await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None);
-
- // Try to read again
- int bytesRead2 = await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None);
-
- // Assert
- Assert.AreEqual(512, bytesRead1);
- Assert.AreEqual(0, bytesRead2);
- Assert.IsTrue(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- #endregion
-
- #region Error Handling Tests
-
- [TestMethod]
- public async Task ReadAsync_WhenExceptionDuringRead_MarksBufferConsumed()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- // Create a buffer that will cause BlockCopy to fail (null destination)
- byte[] readBuffer = null;
-
- try
- {
- // Act & Assert - Should throw ArgumentNullException
- await Assert.ThrowsExceptionAsync(async () =>
- {
- await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None);
- });
-
- // Verify buffer was marked as consumed (position set to Length)
- Assert.IsTrue(dataSource.IsComplete);
- }
- finally
- {
- dataSource.Dispose();
- }
- }
-
- #endregion
-
- #region Disposal Tests
-
- [TestMethod]
- public void Dispose_DisposesUnderlyingPartBuffer()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- // Act
- dataSource.Dispose();
-
- // Assert - The underlying part buffer should be disposed (ArrayPoolBuffer nulled)
- Assert.IsNull(partBuffer.ArrayPoolBuffer);
- }
-
- [TestMethod]
- public void Dispose_MultipleCalls_IsIdempotent()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
-
- // Act - Dispose multiple times
- dataSource.Dispose();
- dataSource.Dispose();
- dataSource.Dispose();
-
- // Assert - Should not throw
- Assert.IsNull(partBuffer.ArrayPoolBuffer);
- }
-
- [TestMethod]
- [ExpectedException(typeof(ObjectDisposedException))]
- public async Task ReadAsync_AfterDispose_ThrowsObjectDisposedException()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
- byte[] readBuffer = new byte[256];
-
- // Dispose the data source
- dataSource.Dispose();
-
- // Act - Try to read after disposal
- await dataSource.ReadAsync(readBuffer, 0, 256, CancellationToken.None);
-
- // Assert - ExpectedException
- }
-
- #endregion
- }
-}
diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs
index 645dae927051..7e8ca79cdfcc 100644
--- a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs
+++ b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs
@@ -76,7 +76,7 @@ public async Task ProcessPartAsync_InOrderPart_CreatesStreamingDataSource()
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
IPartDataSource capturedDataSource = null;
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
.Callback((ds) => capturedDataSource = ds);
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -109,7 +109,7 @@ public async Task ProcessPartAsync_InOrderPart_ReleasesCapacityImmediately()
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -122,8 +122,8 @@ public async Task ProcessPartAsync_InOrderPart_ReleasesCapacityImmediately()
// Assert - ReleaseBufferSpace should be called (through ReleaseCapacity)
// Handler calls ReleaseBufferSpace directly, which eventually calls the manager's method
- // We verify the AddBuffer was called with a StreamingDataSource
- mockBufferManager.Verify(m => m.AddBuffer(
+ // We verify the AddDataSource was called with a StreamingDataSource
+ mockBufferManager.Verify(m => m.AddDataSource(
It.Is(ds => ds is StreamingDataSource)), Times.Once);
}
finally
@@ -139,7 +139,7 @@ public async Task ProcessPartAsync_InOrderPart_DoesNotDisposeResponse()
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -170,7 +170,7 @@ public async Task ProcessPartAsync_MultipleInOrderParts_AllStreamDirectly()
mockBufferManager.Setup(m => m.NextExpectedPartNumber)
.Returns(() => streamingCount + 1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
.Callback((ds) =>
{
if (ds is StreamingDataSource)
@@ -207,9 +207,9 @@ public async Task ProcessPartAsync_OutOfOrderPart_BuffersToMemory()
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- StreamPartBuffer capturedBuffer = null;
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
- .Callback((buffer) => capturedBuffer = buffer);
+ ChunkedPartDataSource capturedDataSource = null;
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
+ .Callback((ds) => capturedDataSource = ds as ChunkedPartDataSource);
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -222,13 +222,13 @@ public async Task ProcessPartAsync_OutOfOrderPart_BuffersToMemory()
await handler.ProcessPartAsync(2, response, CancellationToken.None);
// Assert
- Assert.IsNotNull(capturedBuffer);
- Assert.AreEqual(2, capturedBuffer.PartNumber);
- Assert.AreEqual(512, capturedBuffer.Length);
+ Assert.IsNotNull(capturedDataSource);
+ Assert.AreEqual(2, capturedDataSource.PartNumber);
- // Verify data was buffered correctly
+ // Verify data was buffered correctly by reading from the ChunkedPartDataSource
byte[] bufferData = new byte[512];
- Buffer.BlockCopy(capturedBuffer.ArrayPoolBuffer, 0, bufferData, 0, 512);
+ int bytesRead = await capturedDataSource.ReadAsync(bufferData, 0, 512, CancellationToken.None);
+ Assert.AreEqual(512, bytesRead);
Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, bufferData, 0, 512));
}
finally
@@ -244,7 +244,7 @@ public async Task ProcessPartAsync_OutOfOrderPart_DisposesResponse()
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -272,7 +272,7 @@ public async Task ProcessPartAsync_OutOfOrderPart_DoesNotReleaseCapacityImmediat
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -283,9 +283,9 @@ public async Task ProcessPartAsync_OutOfOrderPart_DoesNotReleaseCapacityImmediat
// Act
await handler.ProcessPartAsync(2, response, CancellationToken.None);
- // Assert - AddBuffer should be called with StreamPartBuffer (not IPartDataSource)
- mockBufferManager.Verify(m => m.AddBuffer(
- It.IsAny()), Times.Once);
+ // Assert - AddDataSource should be called with ChunkedPartDataSource
+ mockBufferManager.Verify(m => m.AddDataSource(
+ It.IsAny()), Times.Once);
// Note: Capacity will be released later when the buffer is consumed by the reader
}
@@ -313,7 +313,7 @@ public async Task ProcessPartAsync_MixedInOrderAndOutOfOrder_HandlesCorrectly()
var streamingParts = 0;
var bufferedParts = 0;
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
.Callback((ds) =>
{
if (ds is StreamingDataSource)
@@ -323,8 +323,8 @@ public async Task ProcessPartAsync_MixedInOrderAndOutOfOrder_HandlesCorrectly()
}
});
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
- .Callback((buffer) => bufferedParts++);
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
+ .Callback((ds) => bufferedParts++);
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -356,8 +356,8 @@ public async Task ProcessPartAsync_InOrderFollowedByOutOfOrder_HandlesCorrectly(
.Returns(1)
.Returns(2);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -368,11 +368,11 @@ public async Task ProcessPartAsync_InOrderFollowedByOutOfOrder_HandlesCorrectly(
await handler.ProcessPartAsync(3, CreateMockGetObjectResponse(512), CancellationToken.None);
// Assert
- mockBufferManager.Verify(m => m.AddBuffer(
+ mockBufferManager.Verify(m => m.AddDataSource(
It.Is(ds => ds is StreamingDataSource && ds.PartNumber == 1)), Times.Once);
- mockBufferManager.Verify(m => m.AddBuffer(
- It.Is(b => b.PartNumber == 3)), Times.Once);
+ mockBufferManager.Verify(m => m.AddDataSource(
+ It.Is(ds => ds.PartNumber == 3)), Times.Once);
}
finally
{
@@ -392,8 +392,8 @@ public async Task ProcessPartAsync_OutOfOrderFollowedByInOrder_HandlesCorrectly(
// Part 1 (in order): calls it twice, should return 1 both times
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -404,10 +404,10 @@ public async Task ProcessPartAsync_OutOfOrderFollowedByInOrder_HandlesCorrectly(
await handler.ProcessPartAsync(1, CreateMockGetObjectResponse(512), CancellationToken.None);
// Assert
- mockBufferManager.Verify(m => m.AddBuffer(
- It.Is(b => b.PartNumber == 2)), Times.Once);
+ mockBufferManager.Verify(m => m.AddDataSource(
+ It.Is(ds => ds.PartNumber == 2)), Times.Once);
- mockBufferManager.Verify(m => m.AddBuffer(
+ mockBufferManager.Verify(m => m.AddDataSource(
It.Is(ds => ds is StreamingDataSource && ds.PartNumber == 1)), Times.Once);
}
finally
@@ -429,20 +429,20 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- // Capture StreamingDataSource additions (streaming path - NO ArrayPool allocation)
- mockBufferManager.Setup(m => m.AddBuffer(
+ // Capture StreamingDataSource additions (streaming path - NO buffering)
+ mockBufferManager.Setup(m => m.AddDataSource(
It.IsAny()))
.Callback((ds) =>
{
streamingPartNumbers.Add(ds.PartNumber);
});
- // Capture StreamPartBuffer additions (buffering path - USES ArrayPool)
- mockBufferManager.Setup(m => m.AddBuffer(
- It.IsAny()))
- .Callback((buffer) =>
+ // Capture ChunkedPartDataSource additions (buffering path - uses ChunkedBufferStream)
+ mockBufferManager.Setup(m => m.AddDataSource(
+ It.IsAny()))
+ .Callback((ds) =>
{
- bufferedPartNumbers.Add(buffer.PartNumber);
+ bufferedPartNumbers.Add(ds.PartNumber);
});
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -456,11 +456,11 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin
await handler.ProcessPartAsync(3, CreateMockGetObjectResponse(512), CancellationToken.None);
// Assert
- // Part 1 should use streaming path (no ArrayPool allocation)
+ // Part 1 should use streaming path (no buffering)
Assert.AreEqual(1, streamingPartNumbers.Count, "Expected exactly 1 part to stream");
Assert.AreEqual(1, streamingPartNumbers[0], "Part 1 should stream directly");
- // Part 3 should use buffering path (ArrayPool allocation)
+ // Part 3 should use buffering path (ChunkedBufferStream)
Assert.AreEqual(1, bufferedPartNumbers.Count, "Expected exactly 1 part to be buffered");
Assert.AreEqual(3, bufferedPartNumbers[0], "Part 3 should be buffered");
}
@@ -485,7 +485,7 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming()
.Returns(() => currentExpectedPart);
// Capture streaming additions
- mockBufferManager.Setup(m => m.AddBuffer(
+ mockBufferManager.Setup(m => m.AddDataSource(
It.IsAny()))
.Callback((ds) =>
{
@@ -494,11 +494,11 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming()
});
// Capture buffering additions
- mockBufferManager.Setup(m => m.AddBuffer(
- It.IsAny()))
- .Callback((buffer) =>
+ mockBufferManager.Setup(m => m.AddDataSource(
+ It.IsAny()))
+ .Callback((ds) =>
{
- bufferedPartNumbers.Add(buffer.PartNumber);
+ bufferedPartNumbers.Add(ds.PartNumber);
});
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -539,7 +539,7 @@ public async Task ProcessPartAsync_StreamingPathError_ReleasesCapacity()
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
.Throws(new InvalidOperationException("Test error"));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -570,7 +570,7 @@ public async Task ProcessPartAsync_BufferingPathError_ReleasesCapacity()
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
.Throws(new InvalidOperationException("Test error"));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -779,7 +779,7 @@ public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce()
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
.Callback(() => releaseCount++);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -797,8 +797,8 @@ public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce()
"ProcessPartAsync should not release capacity for streaming parts. " +
"Capacity is released by PartBufferManager when consumer completes reading.");
- // Verify AddBuffer was called with StreamingDataSource (streaming path taken)
- mockBufferManager.Verify(m => m.AddBuffer(
+ // Verify AddDataSource was called with StreamingDataSource (streaming path taken)
+ mockBufferManager.Verify(m => m.AddDataSource(
It.Is(ds => ds is StreamingDataSource)), Times.Once);
}
finally
@@ -822,7 +822,7 @@ public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediatel
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
.Callback(() => releaseCount++);
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()));
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
@@ -839,9 +839,9 @@ public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediatel
"ProcessPartAsync should not release capacity for buffered parts. " +
"Capacity is released by PartBufferManager when consumer completes reading.");
- // Verify AddBuffer was called with StreamPartBuffer (buffering path taken)
- mockBufferManager.Verify(m => m.AddBuffer(
- It.IsAny()), Times.Once);
+ // Verify AddDataSource was called with ChunkedPartDataSource (buffering path taken)
+ mockBufferManager.Verify(m => m.AddDataSource(
+ It.IsAny()), Times.Once);
}
finally
{
@@ -865,7 +865,7 @@ public async Task ProcessPartAsync_StreamingPartError_DoesNotDoubleRelease()
.Callback(() => releaseCount++);
// Simulate error when adding buffer
- mockBufferManager.Setup(m => m.AddBuffer(It.IsAny()))
+ mockBufferManager.Setup(m => m.AddDataSource(It.IsAny()))
.Throws(new InvalidOperationException("Test error"));
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
diff --git a/sdk/test/Services/S3/UnitTests/Custom/ChunkedBufferStreamTests.cs b/sdk/test/Services/S3/UnitTests/Custom/ChunkedBufferStreamTests.cs
new file mode 100644
index 000000000000..68689e4a417a
--- /dev/null
+++ b/sdk/test/Services/S3/UnitTests/Custom/ChunkedBufferStreamTests.cs
@@ -0,0 +1,864 @@
+using Amazon.S3.Transfer.Internal;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.IO;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace AWSSDK.UnitTests
+{
+ ///
+ /// Unit tests for ChunkedBufferStream class.
+ /// Tests ArrayPool-based chunked buffer management and Stream interface compliance.
+ ///
+ [TestClass]
+ public class ChunkedBufferStreamTests
+ {
+ #region Creation and Mode Tests
+
+ [TestMethod]
+ public void Constructor_InitializesInWriteMode()
+ {
+ // Arrange & Act
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Assert
+ Assert.IsTrue(stream.CanWrite);
+ Assert.IsFalse(stream.CanRead);
+ Assert.IsFalse(stream.CanSeek);
+ Assert.AreEqual(0, stream.Length);
+ Assert.AreEqual(0, stream.Position);
+ }
+ }
+
+ [TestMethod]
+ public void SwitchToReadMode_TransitionsCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = Encoding.UTF8.GetBytes("Test data for read mode");
+ stream.Write(testData, 0, testData.Length);
+
+ // Act
+ stream.SwitchToReadMode();
+
+ // Assert
+ Assert.IsTrue(stream.CanRead);
+ Assert.IsFalse(stream.CanWrite);
+ Assert.IsFalse(stream.CanSeek);
+ Assert.AreEqual(0, stream.Position); // Position reset to 0
+ Assert.AreEqual(testData.Length, stream.Length);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void SwitchToReadMode_CalledTwice_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ stream.SwitchToReadMode();
+
+ // Act - Should throw
+ stream.SwitchToReadMode();
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(NotSupportedException))]
+ public void Write_AfterSwitchToReadMode_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ stream.SwitchToReadMode();
+ byte[] testData = new byte[100];
+
+ // Act - Should throw
+ stream.Write(testData, 0, testData.Length);
+ }
+ }
+
+ #endregion
+
+ #region Write Operation Tests
+
+ [TestMethod]
+ public void Write_SingleChunk_WritesCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[1024];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ // Act
+ stream.Write(testData, 0, testData.Length);
+
+ // Assert
+ Assert.AreEqual(testData.Length, stream.Length);
+ Assert.AreEqual(testData.Length, stream.Position);
+
+ // Verify data by reading back
+ stream.SwitchToReadMode();
+ byte[] readBuffer = new byte[testData.Length];
+ int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
+
+ Assert.AreEqual(testData.Length, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public void Write_MultipleChunks_WritesCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Write 200KB of data (spans multiple 80KB chunks)
+ int totalSize = 200 * 1024;
+ byte[] testData = new byte[totalSize];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ // Act
+ stream.Write(testData, 0, testData.Length);
+
+ // Assert
+ Assert.AreEqual(totalSize, stream.Length);
+
+ // Verify data by reading back
+ stream.SwitchToReadMode();
+ byte[] readBuffer = new byte[totalSize];
+ int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
+
+ Assert.AreEqual(totalSize, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public void Write_AtChunkBoundary_WritesCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Write exactly 80KB (one chunk size)
+ int chunkSize = 81920; // 80KB
+ byte[] testData = new byte[chunkSize];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ // Act
+ stream.Write(testData, 0, testData.Length);
+
+ // Assert
+ Assert.AreEqual(chunkSize, stream.Length);
+
+ // Write one more byte to trigger next chunk
+ byte[] oneByte = new byte[] { 42 };
+ stream.Write(oneByte, 0, 1);
+ Assert.AreEqual(chunkSize + 1, stream.Length);
+
+ // Verify data
+ stream.SwitchToReadMode();
+ byte[] readBuffer = new byte[chunkSize + 1];
+ int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
+
+ Assert.AreEqual(chunkSize + 1, bytesRead);
+ for (int i = 0; i < chunkSize; i++)
+ Assert.AreEqual(testData[i], readBuffer[i]);
+ Assert.AreEqual(42, readBuffer[chunkSize]);
+ }
+ }
+
+ [TestMethod]
+ public async Task WriteAsync_DelegatesToWrite()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = Encoding.UTF8.GetBytes("Test async write");
+
+ // Act
+ await stream.WriteAsync(testData, 0, testData.Length, CancellationToken.None);
+
+ // Assert
+ Assert.AreEqual(testData.Length, stream.Length);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentNullException))]
+ public void Write_NullBuffer_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Act - Should throw
+ stream.Write(null, 0, 100);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void Write_NegativeOffset_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] buffer = new byte[100];
+
+ // Act - Should throw
+ stream.Write(buffer, -1, 50);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void Write_NegativeCount_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] buffer = new byte[100];
+
+ // Act - Should throw
+ stream.Write(buffer, 0, -1);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void Write_OffsetAndCountExceedBufferBounds_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] buffer = new byte[100];
+
+ // Act - Should throw
+ stream.Write(buffer, 50, 60); // 50 + 60 = 110 > 100
+ }
+ }
+
+ #endregion
+
+ #region Read Operation Tests
+
+ [TestMethod]
+ public void Read_SingleChunk_ReadsCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[1024];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act
+ byte[] readBuffer = new byte[testData.Length];
+ int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
+
+ // Assert
+ Assert.AreEqual(testData.Length, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ Assert.AreEqual(testData.Length, stream.Position);
+ }
+ }
+
+ [TestMethod]
+ public void Read_AcrossMultipleChunks_ReadsCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Write 200KB spanning multiple chunks
+ int totalSize = 200 * 1024;
+ byte[] testData = new byte[totalSize];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act - Read in one go
+ byte[] readBuffer = new byte[totalSize];
+ int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
+
+ // Assert
+ Assert.AreEqual(totalSize, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public void Read_InMultipleChunks_ReadsCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[10000];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act - Read in multiple smaller reads
+ byte[] readBuffer = new byte[testData.Length];
+ int totalRead = 0;
+ int chunkSize = 1000;
+
+ while (totalRead < testData.Length)
+ {
+ int bytesRead = stream.Read(readBuffer, totalRead, Math.Min(chunkSize, testData.Length - totalRead));
+ totalRead += bytesRead;
+ }
+
+ // Assert
+ Assert.AreEqual(testData.Length, totalRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public void Read_AtEndOfStream_ReturnsZero()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[100];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Read all data
+ byte[] readBuffer = new byte[100];
+ stream.Read(readBuffer, 0, 100);
+
+ // Act - Try to read more
+ int bytesRead = stream.Read(readBuffer, 0, 100);
+
+ // Assert
+ Assert.AreEqual(0, bytesRead);
+ Assert.AreEqual(100, stream.Position);
+ }
+ }
+
+ [TestMethod]
+ public void Read_PartialData_ReadsAvailableBytes()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[50];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)i;
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act - Try to read more than available
+ byte[] readBuffer = new byte[100];
+ int bytesRead = stream.Read(readBuffer, 0, 100);
+
+ // Assert - Should only read 50 bytes
+ Assert.AreEqual(50, bytesRead);
+ for (int i = 0; i < 50; i++)
+ Assert.AreEqual(testData[i], readBuffer[i]);
+ }
+ }
+
+ [TestMethod]
+ public async Task ReadAsync_DelegatesToRead()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = Encoding.UTF8.GetBytes("Test async read");
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act
+ byte[] readBuffer = new byte[testData.Length];
+ int bytesRead = await stream.ReadAsync(readBuffer, 0, readBuffer.Length, CancellationToken.None);
+
+ // Assert
+ Assert.AreEqual(testData.Length, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentNullException))]
+ public void Read_NullBuffer_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ stream.SwitchToReadMode();
+
+ // Act - Should throw
+ stream.Read(null, 0, 100);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void Read_NegativeOffset_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ stream.SwitchToReadMode();
+ byte[] buffer = new byte[100];
+
+ // Act - Should throw
+ stream.Read(buffer, -1, 50);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void Read_NegativeCount_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ stream.SwitchToReadMode();
+ byte[] buffer = new byte[100];
+
+ // Act - Should throw
+ stream.Read(buffer, 0, -1);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void Read_OffsetAndCountExceedBufferBounds_ThrowsException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ stream.SwitchToReadMode();
+ byte[] buffer = new byte[100];
+
+ // Act - Should throw
+ stream.Read(buffer, 50, 60); // 50 + 60 = 110 > 100
+ }
+ }
+
+ #endregion
+
+ #region Property Tests
+
+ [TestMethod]
+ public void CanRead_ReflectsCurrentMode()
+ {
+ // Arrange & Act
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Assert - In write mode
+ Assert.IsFalse(stream.CanRead);
+
+ // Switch to read mode
+ stream.SwitchToReadMode();
+ Assert.IsTrue(stream.CanRead);
+ }
+ }
+
+ [TestMethod]
+ public void CanWrite_ReflectsCurrentMode()
+ {
+ // Arrange & Act
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Assert - In write mode
+ Assert.IsTrue(stream.CanWrite);
+
+ // Switch to read mode
+ stream.SwitchToReadMode();
+ Assert.IsFalse(stream.CanWrite);
+ }
+ }
+
+ [TestMethod]
+ public void CanSeek_AlwaysReturnsFalse()
+ {
+ // Arrange & Act
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Assert - In write mode
+ Assert.IsFalse(stream.CanSeek);
+
+ // Switch to read mode
+ stream.SwitchToReadMode();
+ Assert.IsFalse(stream.CanSeek);
+ }
+ }
+
+ [TestMethod]
+ public void Length_ReturnsCorrectValue()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Assert - Initially zero
+ Assert.AreEqual(0, stream.Length);
+
+ // Write some data
+ byte[] testData = new byte[5000];
+ stream.Write(testData, 0, testData.Length);
+
+ // Assert - After writing
+ Assert.AreEqual(5000, stream.Length);
+
+ // Switch to read mode
+ stream.SwitchToReadMode();
+
+ // Assert - Length unchanged
+ Assert.AreEqual(5000, stream.Length);
+ }
+ }
+
+ [TestMethod]
+ public void Position_GetReturnsCorrectValue()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+
+ // Assert - Position at end after write
+ Assert.AreEqual(1000, stream.Position);
+
+ // Switch to read mode
+ stream.SwitchToReadMode();
+
+ // Assert - Position reset to 0
+ Assert.AreEqual(0, stream.Position);
+
+ // Read some data
+ byte[] readBuffer = new byte[300];
+ stream.Read(readBuffer, 0, 300);
+
+ // Assert - Position updated after read
+ Assert.AreEqual(300, stream.Position);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(NotSupportedException))]
+ public void Position_Set_ThrowsNotSupportedException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Act - Should throw
+ stream.Position = 100;
+ }
+ }
+
+ #endregion
+
+ #region Not Supported Operations
+
+ [TestMethod]
+ [ExpectedException(typeof(NotSupportedException))]
+ public void Seek_ThrowsNotSupportedException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Act - Should throw
+ stream.Seek(0, SeekOrigin.Begin);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(NotSupportedException))]
+ public void SetLength_ThrowsNotSupportedException()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Act - Should throw
+ stream.SetLength(1000);
+ }
+ }
+
+ [TestMethod]
+ public void Flush_DoesNotThrow()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ byte[] testData = new byte[100];
+ stream.Write(testData, 0, testData.Length);
+
+ // Act - Should not throw
+ stream.Flush();
+
+ // Assert - No-op, just verify it doesn't throw
+ Assert.AreEqual(100, stream.Length);
+ }
+ }
+
+ #endregion
+
+ #region Disposal Tests
+
+ [TestMethod]
+ public void Dispose_ReturnsChunksToArrayPool()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[100000]; // Enough to allocate chunks
+ stream.Write(testData, 0, testData.Length);
+
+ // Act
+ stream.Dispose();
+
+ // Assert - Accessing properties should throw ObjectDisposedException
+ try
+ {
+ var length = stream.Length;
+ Assert.Fail("Expected ObjectDisposedException");
+ }
+ catch (ObjectDisposedException)
+ {
+ // Expected
+ }
+ }
+
+ [TestMethod]
+ public void Dispose_MultipleCalls_IsIdempotent()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+
+ // Act - Dispose multiple times
+ stream.Dispose();
+ stream.Dispose();
+ stream.Dispose();
+
+ // Assert - Should not throw
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ObjectDisposedException))]
+ public void Write_AfterDispose_ThrowsObjectDisposedException()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.Dispose();
+
+ // Act - Should throw
+ byte[] testData = new byte[100];
+ stream.Write(testData, 0, testData.Length);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ObjectDisposedException))]
+ public void Read_AfterDispose_ThrowsObjectDisposedException()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.SwitchToReadMode();
+ stream.Dispose();
+
+ // Act - Should throw
+ byte[] buffer = new byte[100];
+ stream.Read(buffer, 0, 100);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ObjectDisposedException))]
+ public void Length_AfterDispose_ThrowsObjectDisposedException()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.Dispose();
+
+ // Act - Should throw
+ var length = stream.Length;
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ObjectDisposedException))]
+ public void Position_AfterDispose_ThrowsObjectDisposedException()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.Dispose();
+
+ // Act - Should throw
+ var position = stream.Position;
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ObjectDisposedException))]
+ public void SwitchToReadMode_AfterDispose_ThrowsObjectDisposedException()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.Dispose();
+
+ // Act - Should throw
+ stream.SwitchToReadMode();
+ }
+
+ #endregion
+
+ #region Edge Cases
+
+ [TestMethod]
+ public void EmptyStream_SwitchToReadMode_WorksCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Act - Don't write any data, just switch to read mode
+ stream.SwitchToReadMode();
+
+ // Assert
+ Assert.AreEqual(0, stream.Length);
+ Assert.AreEqual(0, stream.Position);
+ Assert.IsTrue(stream.CanRead);
+
+ // Read should return 0
+ byte[] buffer = new byte[100];
+ int bytesRead = stream.Read(buffer, 0, 100);
+ Assert.AreEqual(0, bytesRead);
+ }
+ }
+
+ [TestMethod]
+ public void Write_AtExactChunkBoundaries_HandlesCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ int chunkSize = 81920; // 80KB
+
+ // Write exactly 2 chunks
+ byte[] chunk1 = new byte[chunkSize];
+ byte[] chunk2 = new byte[chunkSize];
+
+ for (int i = 0; i < chunkSize; i++)
+ {
+ chunk1[i] = 1;
+ chunk2[i] = 2;
+ }
+
+ // Act
+ stream.Write(chunk1, 0, chunkSize);
+ stream.Write(chunk2, 0, chunkSize);
+
+ // Assert
+ Assert.AreEqual(chunkSize * 2, stream.Length);
+
+ // Verify data
+ stream.SwitchToReadMode();
+ byte[] readBuffer = new byte[chunkSize * 2];
+ int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
+
+ Assert.AreEqual(chunkSize * 2, bytesRead);
+
+ // Verify first chunk
+ for (int i = 0; i < chunkSize; i++)
+ Assert.AreEqual(1, readBuffer[i]);
+
+ // Verify second chunk
+ for (int i = chunkSize; i < chunkSize * 2; i++)
+ Assert.AreEqual(2, readBuffer[i]);
+ }
+ }
+
+ [TestMethod]
+ public void Read_AtChunkBoundary_HandlesCorrectly()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ int chunkSize = 81920; // 80KB
+
+ // Write 2.5 chunks worth of data
+ byte[] testData = new byte[chunkSize * 2 + chunkSize / 2];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act - Read exactly one chunk
+ byte[] readBuffer1 = new byte[chunkSize];
+ int bytesRead1 = stream.Read(readBuffer1, 0, chunkSize);
+
+ // Act - Read next chunk
+ byte[] readBuffer2 = new byte[chunkSize];
+ int bytesRead2 = stream.Read(readBuffer2, 0, chunkSize);
+
+ // Act - Read remaining
+ byte[] readBuffer3 = new byte[chunkSize];
+ int bytesRead3 = stream.Read(readBuffer3, 0, chunkSize);
+
+ // Assert
+ Assert.AreEqual(chunkSize, bytesRead1);
+ Assert.AreEqual(chunkSize, bytesRead2);
+ Assert.AreEqual(chunkSize / 2, bytesRead3);
+
+ // Verify data integrity
+ for (int i = 0; i < chunkSize; i++)
+ Assert.AreEqual((byte)(i % 256), readBuffer1[i]);
+
+ for (int i = 0; i < chunkSize; i++)
+ Assert.AreEqual((byte)((chunkSize + i) % 256), readBuffer2[i]);
+
+ for (int i = 0; i < chunkSize / 2; i++)
+ Assert.AreEqual((byte)((chunkSize * 2 + i) % 256), readBuffer3[i]);
+ }
+ }
+
+ [TestMethod]
+ public void LargeData_HandlesMultipleChunks()
+ {
+ // Arrange
+ using (var stream = new ChunkedBufferStream())
+ {
+ // Write 500KB of data (spans ~6 chunks)
+ int totalSize = 500 * 1024;
+ byte[] testData = new byte[totalSize];
+ var random = new Random(42); // Fixed seed for reproducibility
+ random.NextBytes(testData);
+
+ // Act - Write
+ stream.Write(testData, 0, testData.Length);
+
+ // Assert - Length
+ Assert.AreEqual(totalSize, stream.Length);
+
+ // Act - Read back
+ stream.SwitchToReadMode();
+ byte[] readBuffer = new byte[totalSize];
+ int bytesRead = stream.Read(readBuffer, 0, totalSize);
+
+ // Assert - Data integrity
+ Assert.AreEqual(totalSize, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ #endregion
+ }
+}
diff --git a/sdk/test/Services/S3/UnitTests/Custom/ChunkedPartDataSourceTests.cs b/sdk/test/Services/S3/UnitTests/Custom/ChunkedPartDataSourceTests.cs
new file mode 100644
index 000000000000..e920c1db9537
--- /dev/null
+++ b/sdk/test/Services/S3/UnitTests/Custom/ChunkedPartDataSourceTests.cs
@@ -0,0 +1,602 @@
+using Amazon.S3.Transfer.Internal;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace AWSSDK.UnitTests
+{
+ ///
+ /// Unit tests for ChunkedPartDataSource class.
+ /// Tests IPartDataSource implementation wrapping ChunkedBufferStream.
+ ///
+ [TestClass]
+ public class ChunkedPartDataSourceTests
+ {
+ #region Creation Tests
+
+ [TestMethod]
+ public void Constructor_WithValidStream_CreatesDataSource()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = Encoding.UTF8.GetBytes("Test data for part");
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ // Act
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Assert
+ Assert.AreEqual(1, dataSource.PartNumber);
+ Assert.IsFalse(dataSource.IsComplete);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentNullException))]
+ public void Constructor_WithNullStream_ThrowsException()
+ {
+ // Act - Should throw
+ var dataSource = new ChunkedPartDataSource(1, null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void Constructor_WithStreamNotInReadMode_ThrowsException()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = Encoding.UTF8.GetBytes("Test data");
+ stream.Write(testData, 0, testData.Length);
+ // Don't call SwitchToReadMode()
+
+ // Act - Should throw
+ var dataSource = new ChunkedPartDataSource(1, stream);
+ }
+
+ [TestMethod]
+ public void Constructor_SetsPartNumberCorrectly()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.SwitchToReadMode();
+
+ // Act
+ using (var dataSource = new ChunkedPartDataSource(5, stream))
+ {
+ // Assert
+ Assert.AreEqual(5, dataSource.PartNumber);
+ }
+ }
+
+ #endregion
+
+ #region ReadAsync Tests
+
+ [TestMethod]
+ public async Task ReadAsync_DelegatesToUnderlyingStream()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = Encoding.UTF8.GetBytes("Test data for reading");
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act
+ byte[] readBuffer = new byte[testData.Length];
+ int bytesRead = await dataSource.ReadAsync(readBuffer, 0, readBuffer.Length, CancellationToken.None);
+
+ // Assert
+ Assert.AreEqual(testData.Length, bytesRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public async Task ReadAsync_ReturnsCorrectByteCount()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act
+ byte[] readBuffer = new byte[500];
+ int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 500, CancellationToken.None);
+
+ // Assert
+ Assert.AreEqual(500, bytesRead);
+ for (int i = 0; i < 500; i++)
+ Assert.AreEqual(testData[i], readBuffer[i]);
+ }
+ }
+
+ [TestMethod]
+ public async Task ReadAsync_HandlesPartialReads()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[100];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)i;
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act - Try to read more than available
+ byte[] readBuffer = new byte[200];
+ int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 200, CancellationToken.None);
+
+ // Assert - Should only read 100 bytes
+ Assert.AreEqual(100, bytesRead);
+ for (int i = 0; i < 100; i++)
+ Assert.AreEqual(testData[i], readBuffer[i]);
+ }
+ }
+
+ [TestMethod]
+ public async Task ReadAsync_RespectsCancellationToken()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = Encoding.UTF8.GetBytes("Test cancellation");
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ var cts = new CancellationTokenSource();
+ cts.Cancel(); // Cancel immediately
+
+ // Act & Assert
+ byte[] readBuffer = new byte[100];
+ try
+ {
+ await dataSource.ReadAsync(readBuffer, 0, 100, cts.Token);
+ Assert.Fail("Expected OperationCanceledException");
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected
+ }
+ }
+ }
+
+ [TestMethod]
+ public async Task ReadAsync_WorksWithVariousBufferSizes()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ int totalSize = 10000;
+ byte[] testData = new byte[totalSize];
+ for (int i = 0; i < totalSize; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act - Read in varying chunk sizes
+ byte[] readBuffer = new byte[totalSize];
+ int totalRead = 0;
+ int[] chunkSizes = { 100, 500, 1000, 2000, 6400 }; // Various sizes
+
+ foreach (int chunkSize in chunkSizes)
+ {
+ int bytesRead = await dataSource.ReadAsync(readBuffer, totalRead, chunkSize, CancellationToken.None);
+ totalRead += bytesRead;
+ }
+
+ // Assert
+ Assert.AreEqual(totalSize, totalRead);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public async Task ReadAsync_ReturnsZeroAtEndOfStream()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[100];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Read all data
+ byte[] readBuffer = new byte[100];
+ await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
+
+ // Act - Try to read more
+ int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
+
+ // Assert
+ Assert.AreEqual(0, bytesRead);
+ }
+ }
+
+ #endregion
+
+ #region IsComplete Property Tests
+
+ [TestMethod]
+ public void IsComplete_ReturnsFalse_WhenDataRemains()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Assert
+ Assert.IsFalse(dataSource.IsComplete);
+ }
+ }
+
+ [TestMethod]
+ public async Task IsComplete_ReturnsTrue_WhenFullyConsumed()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[100];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act - Read all data
+ byte[] readBuffer = new byte[100];
+ await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
+
+ // Assert
+ Assert.IsTrue(dataSource.IsComplete);
+ }
+ }
+
+ [TestMethod]
+ public async Task IsComplete_UpdatesCorrectly_AsDataIsRead()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[300];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Assert - Initially not complete
+ Assert.IsFalse(dataSource.IsComplete);
+
+ // Act - Read partial data
+ byte[] readBuffer = new byte[100];
+ await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
+
+ // Assert - Still not complete
+ Assert.IsFalse(dataSource.IsComplete);
+
+ // Act - Read more data
+ await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
+
+ // Assert - Still not complete
+ Assert.IsFalse(dataSource.IsComplete);
+
+ // Act - Read remaining data
+ await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
+
+ // Assert - Now complete
+ Assert.IsTrue(dataSource.IsComplete);
+ }
+ }
+
+ [TestMethod]
+ public void IsComplete_ReturnsTrue_ForEmptyStream()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ stream.SwitchToReadMode(); // Empty stream
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Assert - Empty stream is considered "complete" since Position == Length (both 0)
+ Assert.IsTrue(dataSource.IsComplete);
+ }
+ }
+
+ #endregion
+
+ #region Disposal Tests
+
+ [TestMethod]
+ public void Dispose_ReleasesUnderlyingStream()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ var dataSource = new ChunkedPartDataSource(1, stream);
+
+ // Act
+ dataSource.Dispose();
+
+ // Assert - Stream should be disposed, accessing it should throw
+ try
+ {
+ var length = stream.Length;
+ Assert.Fail("Expected ObjectDisposedException");
+ }
+ catch (ObjectDisposedException)
+ {
+ // Expected
+ }
+ }
+
+ [TestMethod]
+ public void Dispose_MultipleCalls_AreSafe()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ var dataSource = new ChunkedPartDataSource(1, stream);
+
+ // Act - Dispose multiple times
+ dataSource.Dispose();
+ dataSource.Dispose();
+ dataSource.Dispose();
+
+ // Assert - Should not throw
+ }
+
+ [TestMethod]
+ public void Dispose_ReturnsChunksToArrayPool()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ // Write enough data to allocate multiple chunks
+ byte[] testData = new byte[200 * 1024]; // 200KB
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ var dataSource = new ChunkedPartDataSource(1, stream);
+
+ // Act
+ dataSource.Dispose();
+
+ // Assert - Stream disposed, chunks returned to pool
+ // We can't directly verify ArrayPool state, but we can verify disposal
+ try
+ {
+ var length = stream.Length;
+ Assert.Fail("Expected ObjectDisposedException");
+ }
+ catch (ObjectDisposedException)
+ {
+ // Expected - confirms disposal happened
+ }
+ }
+
+ #endregion
+
+ #region ToString Tests
+
+ [TestMethod]
+ public void ToString_ReturnsExpectedFormat()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1500];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(3, stream))
+ {
+ // Act
+ string result = dataSource.ToString();
+
+ // Assert - Verify format contains key information
+ Assert.IsTrue(result.Contains("Part=3"));
+ Assert.IsTrue(result.Contains("1500 bytes"));
+ Assert.IsTrue(result.Contains("Position=0"));
+ }
+ }
+
+ [TestMethod]
+ public async Task ToString_ReflectsCurrentPosition()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(2, stream))
+ {
+ // Read 300 bytes
+ byte[] readBuffer = new byte[300];
+ await dataSource.ReadAsync(readBuffer, 0, 300, CancellationToken.None);
+
+ // Act
+ string result = dataSource.ToString();
+
+ // Assert - Position should be reflected
+ Assert.IsTrue(result.Contains("Part=2"));
+ Assert.IsTrue(result.Contains("1000 bytes"));
+ Assert.IsTrue(result.Contains("Position=300"));
+ }
+ }
+
+ #endregion
+
+ #region Integration Tests
+
+ [TestMethod]
+ public async Task FullReadCycle_FromCreationToDisposal()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ int totalSize = 100 * 1024; // 100KB
+ byte[] testData = new byte[totalSize];
+ for (int i = 0; i < totalSize; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ var dataSource = new ChunkedPartDataSource(1, stream);
+
+ try
+ {
+ // Act - Read all data in chunks
+ byte[] readBuffer = new byte[totalSize];
+ int totalRead = 0;
+ int chunkSize = 8192; // 8KB chunks
+
+ while (!dataSource.IsComplete)
+ {
+ int bytesRead = await dataSource.ReadAsync(
+ readBuffer,
+ totalRead,
+ Math.Min(chunkSize, totalSize - totalRead),
+ CancellationToken.None);
+
+ if (bytesRead == 0)
+ break;
+
+ totalRead += bytesRead;
+ }
+
+ // Assert
+ Assert.AreEqual(totalSize, totalRead);
+ Assert.IsTrue(dataSource.IsComplete);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ finally
+ {
+ // Cleanup
+ dataSource.Dispose();
+ }
+ }
+
+ [TestMethod]
+ public async Task SequentialReading_HandlesCorrectly()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[5000];
+ for (int i = 0; i < testData.Length; i++)
+ testData[i] = (byte)(i % 256);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act - Sequential reads with different sizes
+ byte[] readBuffer = new byte[5000];
+ int offset = 0;
+
+ // Read 1000 bytes
+ int read1 = await dataSource.ReadAsync(readBuffer, offset, 1000, CancellationToken.None);
+ offset += read1;
+
+ // Read 2000 bytes
+ int read2 = await dataSource.ReadAsync(readBuffer, offset, 2000, CancellationToken.None);
+ offset += read2;
+
+ // Read remaining
+ int read3 = await dataSource.ReadAsync(readBuffer, offset, 2000, CancellationToken.None);
+ offset += read3;
+
+ // Assert
+ Assert.AreEqual(1000, read1);
+ Assert.AreEqual(2000, read2);
+ Assert.AreEqual(2000, read3);
+ Assert.AreEqual(5000, offset);
+ Assert.IsTrue(dataSource.IsComplete);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public async Task LargePartData_HandlesMultipleChunks()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ // Write 300KB spanning multiple 80KB chunks
+ int totalSize = 300 * 1024;
+ byte[] testData = new byte[totalSize];
+ var random = new Random(42); // Fixed seed for reproducibility
+ random.NextBytes(testData);
+
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(1, stream))
+ {
+ // Act - Read in 64KB chunks
+ byte[] readBuffer = new byte[totalSize];
+ int totalRead = 0;
+ int chunkSize = 64 * 1024;
+
+ while (totalRead < totalSize)
+ {
+ int bytesRead = await dataSource.ReadAsync(
+ readBuffer,
+ totalRead,
+ Math.Min(chunkSize, totalSize - totalRead),
+ CancellationToken.None);
+
+ if (bytesRead == 0)
+ break;
+
+ totalRead += bytesRead;
+ }
+
+ // Assert
+ Assert.AreEqual(totalSize, totalRead);
+ Assert.IsTrue(dataSource.IsComplete);
+ CollectionAssert.AreEqual(testData, readBuffer);
+ }
+ }
+
+ [TestMethod]
+ public void PartNumber_RemainsConstant()
+ {
+ // Arrange
+ var stream = new ChunkedBufferStream();
+ byte[] testData = new byte[1000];
+ stream.Write(testData, 0, testData.Length);
+ stream.SwitchToReadMode();
+
+ using (var dataSource = new ChunkedPartDataSource(7, stream))
+ {
+ // Assert - Check multiple times
+ Assert.AreEqual(7, dataSource.PartNumber);
+ Assert.AreEqual(7, dataSource.PartNumber);
+ Assert.AreEqual(7, dataSource.PartNumber);
+ }
+ }
+
+ #endregion
+ }
+}
diff --git a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs
index 37ab24be0fb3..fa8506878171 100644
--- a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs
+++ b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs
@@ -80,10 +80,13 @@ public async Task NextExpectedPartNumber_IncrementsAfterPartComplete()
try
{
// Add part 1
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
// Read part 1 completely
byte[] readBuffer = new byte[512];
@@ -137,9 +140,12 @@ public async Task WaitForBufferSpaceAsync_WhenMaxPartsReached_Blocks()
for (int i = 1; i <= 2; i++)
{
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(i, testBuffer, 512);
- manager.AddBuffer(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(i, chunkedStream);
+ manager.AddDataSource(dataSource);
}
// Act - Try to wait for space (should block)
@@ -172,9 +178,12 @@ public async Task WaitForBufferSpaceAsync_AfterRelease_AllowsAccess()
{
// Take the one available slot
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- manager.AddBuffer(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
+ manager.AddDataSource(dataSource);
// Release space
manager.ReleaseBufferSpace();
@@ -234,7 +243,7 @@ public async Task WaitForBufferSpaceAsync_WithCancellation_ThrowsOperationCancel
#region AddBuffer Tests
[TestMethod]
- public async Task AddBuffer_CreatesBufferedDataSource()
+ public async Task AddBuffer_CreatesChunkedPartDataSource()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
@@ -242,13 +251,16 @@ public async Task AddBuffer_CreatesBufferedDataSource()
try
{
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
// Act
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
// Assert - Should be able to read from part 1
byte[] readBuffer = new byte[512];
@@ -272,7 +284,7 @@ public void AddBuffer_WithNullBuffer_ThrowsArgumentNullException()
try
{
// Act
- manager.AddBuffer((IPartDataSource)null);
+ manager.AddDataSource((IPartDataSource)null);
// Assert - ExpectedException
}
@@ -302,10 +314,13 @@ public async Task AddBuffer_SignalsPartAvailable()
await Task.Delay(50);
// Add the part
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
// Assert - Read should complete
int bytesRead = await readTask;
@@ -330,9 +345,11 @@ public async Task AddDataSource_AddsToCollection()
try
{
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var dataSource = new BufferedDataSource(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
// Act
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
@@ -372,7 +389,7 @@ public void AddDataSource_WithNullDataSource_ThrowsArgumentNullException()
[TestMethod]
[ExpectedException(typeof(InvalidOperationException))]
- public void AddDataSource_WithDuplicatePartNumber_ThrowsInvalidOperationException()
+ public async Task AddDataSource_WithDuplicatePartNumber_ThrowsInvalidOperationException()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
@@ -381,15 +398,19 @@ public void AddDataSource_WithDuplicatePartNumber_ThrowsInvalidOperationExceptio
try
{
// Add part 1
- byte[] testBuffer1 = ArrayPool.Shared.Rent(512);
- var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 512);
- var dataSource1 = new BufferedDataSource(partBuffer1);
+ byte[] testData1 = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream1 = new ChunkedBufferStream();
+ await chunkedStream1.WriteAsync(testData1, 0, 512);
+ chunkedStream1.SwitchToReadMode();
+ var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1);
manager.AddDataSource(dataSource1);
// Try to add duplicate part 1
- byte[] testBuffer2 = ArrayPool.Shared.Rent(512);
- var partBuffer2 = new StreamPartBuffer(1, testBuffer2, 512);
- var dataSource2 = new BufferedDataSource(partBuffer2);
+ byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream2 = new ChunkedBufferStream();
+ await chunkedStream2.WriteAsync(testData2, 0, 512);
+ chunkedStream2.SwitchToReadMode();
+ var dataSource2 = new ChunkedPartDataSource(1, chunkedStream2);
// Act
manager.AddDataSource(dataSource2);
@@ -416,12 +437,12 @@ public async Task ReadAsync_ReadsDataSequentially()
try
{
byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);
-
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
// Act
byte[] readBuffer = new byte[512];
@@ -447,10 +468,13 @@ public async Task ReadAsync_AdvancesNextExpectedPartNumber()
try
{
// Add part 1
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
// Read part 1 completely
byte[] readBuffer = new byte[512];
@@ -581,10 +605,13 @@ public async Task ReadAsync_WaitsForPartAvailability()
Assert.IsFalse(readTask.IsCompleted);
// Add the part asynchronously
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
// Assert - Read should complete
int bytesRead = await readTask;
@@ -662,19 +689,21 @@ public async Task ReadAsync_ReadingAcrossPartBoundary_FillsBuffer()
{
// Add Part 1 (100 bytes)
byte[] testData1 = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
- byte[] testBuffer1 = ArrayPool.Shared.Rent(100);
- Buffer.BlockCopy(testData1, 0, testBuffer1, 0, 100);
- var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100);
+ var chunkedStream1 = new ChunkedBufferStream();
+ await chunkedStream1.WriteAsync(testData1, 0, 100);
+ chunkedStream1.SwitchToReadMode();
+ var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer1);
+ manager.AddDataSource(dataSource1);
// Add Part 2 (100 bytes)
byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 100);
- byte[] testBuffer2 = ArrayPool.Shared.Rent(100);
- Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100);
- var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100);
+ var chunkedStream2 = new ChunkedBufferStream();
+ await chunkedStream2.WriteAsync(testData2, 0, 100);
+ chunkedStream2.SwitchToReadMode();
+ var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer2);
+ manager.AddDataSource(dataSource2);
// Act - Request 150 bytes (spans both parts)
byte[] readBuffer = new byte[150];
@@ -711,11 +740,12 @@ public async Task ReadAsync_MultiplePartsInSingleRead_AdvancesCorrectly()
for (int i = 1; i <= 3; i++)
{
byte[] testData = MultipartDownloadTestHelpers.GeneratePartSpecificData(50, i);
- byte[] testBuffer = ArrayPool.Shared.Rent(50);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 50);
- var partBuffer = new StreamPartBuffer(i, testBuffer, 50);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 50);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(i, chunkedStream);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer);
+ manager.AddDataSource(dataSource);
}
// Act - Read 150 bytes (all 3 parts)
@@ -742,10 +772,13 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart()
try
{
// Add part 1
- byte[] testBuffer1 = ArrayPool.Shared.Rent(100);
- var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100);
+ byte[] testData1 = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
+ var chunkedStream1 = new ChunkedBufferStream();
+ await chunkedStream1.WriteAsync(testData1, 0, 100);
+ chunkedStream1.SwitchToReadMode();
+ var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer1);
+ manager.AddDataSource(dataSource1);
// Read part 1 completely
byte[] readBuffer = new byte[100];
@@ -755,10 +788,13 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart()
Assert.AreEqual(2, manager.NextExpectedPartNumber);
// Add part 2
- byte[] testBuffer2 = ArrayPool.Shared.Rent(100);
- var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100);
+ byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
+ var chunkedStream2 = new ChunkedBufferStream();
+ await chunkedStream2.WriteAsync(testData2, 0, 100);
+ chunkedStream2.SwitchToReadMode();
+ var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer2);
+ manager.AddDataSource(dataSource2);
// Read part 2
int bytesRead = await manager.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
@@ -783,18 +819,21 @@ public async Task ReadAsync_EmptyPart_ContinuesToNextPart()
try
{
// Add empty part 1
- byte[] testBuffer1 = ArrayPool.Shared.Rent(100);
- var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 0); // 0 bytes
+ var chunkedStream1 = new ChunkedBufferStream();
+ // Write 0 bytes - empty part
+ chunkedStream1.SwitchToReadMode();
+ var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer1);
+ manager.AddDataSource(dataSource1);
// Add part 2 with data
byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
- byte[] testBuffer2 = ArrayPool.Shared.Rent(100);
- Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100);
- var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100);
+ var chunkedStream2 = new ChunkedBufferStream();
+ await chunkedStream2.WriteAsync(testData2, 0, 100);
+ chunkedStream2.SwitchToReadMode();
+ var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer2);
+ manager.AddDataSource(dataSource2);
// Act - Try to read 100 bytes starting from part 1
byte[] readBuffer = new byte[100];
@@ -975,7 +1014,7 @@ public async Task AddBufferAsync_IPartDataSource_WithStreamingDataSource_AddsSuc
// Act
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(streamingSource);
+ manager.AddDataSource(streamingSource);
// Assert - Should be able to read from part 1
byte[] readBuffer = new byte[512];
@@ -990,7 +1029,7 @@ public async Task AddBufferAsync_IPartDataSource_WithStreamingDataSource_AddsSuc
}
[TestMethod]
- public async Task AddBufferAsync_IPartDataSource_WithBufferedDataSource_AddsSuccessfully()
+ public async Task AddBufferAsync_IPartDataSource_WithChunkedPartDataSource_AddsSuccessfully()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
@@ -998,16 +1037,16 @@ public async Task AddBufferAsync_IPartDataSource_WithBufferedDataSource_AddsSucc
try
{
- // Create a BufferedDataSource
+ // Create a ChunkedPartDataSource
byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- var bufferedSource = new BufferedDataSource(partBuffer);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var chunkedSource = new ChunkedPartDataSource(1, chunkedStream);
// Act
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(bufferedSource);
+ manager.AddDataSource(chunkedSource);
// Assert - Should be able to read from part 1
byte[] readBuffer = new byte[512];
@@ -1032,7 +1071,7 @@ public void AddBufferAsync_IPartDataSource_WithNull_ThrowsArgumentNullException(
try
{
// Act
- manager.AddBuffer((IPartDataSource)null);
+ manager.AddDataSource((IPartDataSource)null);
// Assert - ExpectedException
}
@@ -1072,7 +1111,7 @@ public async Task AddBufferAsync_IPartDataSource_SignalsPartAvailable()
// Act
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(streamingSource);
+ manager.AddDataSource(streamingSource);
// Assert - Read should complete
int bytesRead = await readTask;
@@ -1106,7 +1145,7 @@ public async Task ReadAsync_FromStreamingDataSource_ReadsCorrectly()
};
var streamingSource = new StreamingDataSource(1, response);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(streamingSource);
+ manager.AddDataSource(streamingSource);
// Act - Read in multiple chunks
byte[] readBuffer = new byte[400];
@@ -1146,15 +1185,16 @@ public async Task ReadAsync_FromMixedSources_ReadsSequentially()
};
var streamingSource = new StreamingDataSource(1, response1);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer((IPartDataSource)streamingSource);
+ manager.AddDataSource((IPartDataSource)streamingSource);
// Add buffered source for part 2
var testData2 = MultipartDownloadTestHelpers.GenerateTestData(500, 500);
- byte[] testBuffer2 = ArrayPool.Shared.Rent(500);
- Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 500);
- var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 500);
+ var chunkedStream2 = new ChunkedBufferStream();
+ await chunkedStream2.WriteAsync(testData2, 0, 500);
+ chunkedStream2.SwitchToReadMode();
+ var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(partBuffer2);
+ manager.AddDataSource(dataSource2);
// Act - Read across both parts
byte[] readBuffer = new byte[750];
@@ -1195,7 +1235,7 @@ public async Task ReadAsync_StreamingDataSource_DisposesAfterCompletion()
};
var streamingSource = new StreamingDataSource(1, response);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(streamingSource);
+ manager.AddDataSource(streamingSource);
// Act - Read all data
byte[] readBuffer = new byte[512];
@@ -1231,7 +1271,7 @@ public async Task ReadAsync_MultipleStreamingSources_ReadsSequentially()
};
var streamingSource = new StreamingDataSource(i, response);
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
- manager.AddBuffer(streamingSource);
+ manager.AddDataSource(streamingSource);
}
// Act - Read across all parts
@@ -1253,33 +1293,39 @@ public async Task ReadAsync_MultipleStreamingSources_ReadsSequentially()
#region Disposal Tests
[TestMethod]
- public void Dispose_DisposesAllDataSources()
+ public async Task Dispose_DisposesAllDataSources()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var manager = new PartBufferManager(config);
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- manager.AddBuffer(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
+ manager.AddDataSource(dataSource);
// Act
manager.Dispose();
- // Assert - The underlying part buffer should be disposed
- Assert.IsNull(partBuffer.ArrayPoolBuffer);
+ // Assert - The underlying chunked stream should be disposed
+ Assert.ThrowsException(() => chunkedStream.Position);
}
[TestMethod]
- public void Dispose_ClearsCollection()
+ public async Task Dispose_ClearsCollection()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var manager = new PartBufferManager(config);
- byte[] testBuffer = ArrayPool.Shared.Rent(512);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
- manager.AddBuffer(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 512);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(1, chunkedStream);
+ manager.AddDataSource(dataSource);
// Act
manager.Dispose();
@@ -1395,9 +1441,12 @@ public async Task NextExpectedPartNumber_ConcurrentReads_SeeConsistentValue()
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
// Add part
- byte[] testBuffer = ArrayPool.Shared.Rent(100);
- var partBuffer = new StreamPartBuffer(partNum, testBuffer, 100);
- manager.AddBuffer(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 100);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(partNum, chunkedStream);
+ manager.AddDataSource(dataSource);
// Read part completely to trigger increment
byte[] readBuffer = new byte[100];
@@ -1617,9 +1666,12 @@ public async Task BufferCapacity_ConcurrentOperations_RespectsMaxCountLimit()
}
// Simulate buffering the part
- byte[] testBuffer = ArrayPool.Shared.Rent(100);
- var partBuffer = new StreamPartBuffer(capturedPartNum, testBuffer, 100);
- manager.AddBuffer(partBuffer);
+ byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
+ var chunkedStream = new ChunkedBufferStream();
+ await chunkedStream.WriteAsync(testData, 0, 100);
+ chunkedStream.SwitchToReadMode();
+ var dataSource = new ChunkedPartDataSource(capturedPartNum, chunkedStream);
+ manager.AddDataSource(dataSource);
// Simulate some processing time
await Task.Delay(10);
diff --git a/sdk/test/Services/S3/UnitTests/Custom/StreamPartBufferTests.cs b/sdk/test/Services/S3/UnitTests/Custom/StreamPartBufferTests.cs
deleted file mode 100644
index 2ddde6d48238..000000000000
--- a/sdk/test/Services/S3/UnitTests/Custom/StreamPartBufferTests.cs
+++ /dev/null
@@ -1,396 +0,0 @@
-using Amazon.S3.Transfer.Internal;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System;
-using System.Buffers;
-
-namespace AWSSDK.UnitTests
-{
- ///
- /// Unit tests for StreamPartBuffer class.
- /// Tests ArrayPool buffer management and position tracking.
- ///
- [TestClass]
- public class StreamPartBufferTests
- {
- #region Creation Tests
-
- [TestMethod]
- public void Create_WithValidParameters_CreatesBuffer()
- {
- // Arrange
- int partNumber = 1;
- int capacity = 1024;
- int actualLength = 512;
-
- // Act
- var partBuffer = StreamPartBuffer.Create(partNumber, capacity);
-
- try
- {
- // Simulate writing data
- partBuffer.SetLength(actualLength);
-
- // Assert
- Assert.AreEqual(partNumber, partBuffer.PartNumber);
- Assert.IsNotNull(partBuffer.ArrayPoolBuffer);
- Assert.IsTrue(partBuffer.ArrayPoolBuffer.Length >= capacity); // ArrayPool may return larger
- Assert.AreEqual(actualLength, partBuffer.Length);
- Assert.AreEqual(0, partBuffer.CurrentPosition);
- Assert.AreEqual(actualLength, partBuffer.RemainingBytes);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- public void Create_InitializesWithZeroLength()
- {
- // Arrange
- int partNumber = 2;
- int capacity = 2048;
-
- // Act
- var partBuffer = StreamPartBuffer.Create(partNumber, capacity);
-
- try
- {
- // Assert - Length should be 0 until SetLength is called
- Assert.AreEqual(partNumber, partBuffer.PartNumber);
- Assert.IsNotNull(partBuffer.ArrayPoolBuffer);
- Assert.AreEqual(0, partBuffer.Length);
- Assert.AreEqual(0, partBuffer.CurrentPosition);
- Assert.AreEqual(0, partBuffer.RemainingBytes);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- #endregion
-
- #region Property Tests
-
- [TestMethod]
- public void RemainingBytes_ReturnsCorrectValue()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- try
- {
- // Act & Assert - At start
- Assert.AreEqual(500, partBuffer.RemainingBytes);
-
- // Act & Assert - After reading some bytes
- partBuffer.CurrentPosition = 100;
- Assert.AreEqual(400, partBuffer.RemainingBytes);
-
- // Act & Assert - At end
- partBuffer.CurrentPosition = 500;
- Assert.AreEqual(0, partBuffer.RemainingBytes);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- public void Length_ReturnsCorrectValue()
- {
- // Arrange
- int actualLength = 1000;
- var partBuffer = StreamPartBuffer.Create(1, 2048);
- partBuffer.SetLength(actualLength);
-
- try
- {
- // Act & Assert
- Assert.AreEqual(actualLength, partBuffer.Length);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- public void CurrentPosition_CanBeUpdated()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- try
- {
- // Act
- partBuffer.CurrentPosition = 250;
-
- // Assert
- Assert.AreEqual(250, partBuffer.CurrentPosition);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- #endregion
-
- #region Reading Position Tests
-
- [TestMethod]
- public void CurrentPosition_AfterReading_UpdatesCorrectly()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- try
- {
- // Simulate reading 100 bytes
- partBuffer.CurrentPosition += 100;
- Assert.AreEqual(100, partBuffer.CurrentPosition);
- Assert.AreEqual(400, partBuffer.RemainingBytes);
-
- // Simulate reading another 150 bytes
- partBuffer.CurrentPosition += 150;
- Assert.AreEqual(250, partBuffer.CurrentPosition);
- Assert.AreEqual(250, partBuffer.RemainingBytes);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- public void RemainingBytes_WhenFullyRead_ReturnsZero()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- try
- {
- // Act - Read all bytes
- partBuffer.CurrentPosition = 500;
-
- // Assert
- Assert.AreEqual(0, partBuffer.RemainingBytes);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- #endregion
-
- #region SetLength Tests
-
- [TestMethod]
- public void SetLength_WithValidLength_SetsCorrectly()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
-
- // Act
- partBuffer.SetLength(500);
-
- try
- {
- // Assert
- Assert.AreEqual(500, partBuffer.Length);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- [ExpectedException(typeof(InvalidOperationException))]
- public void SetLength_CalledTwice_ThrowsException()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- try
- {
- // Act - Try to set length again
- partBuffer.SetLength(600);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentOutOfRangeException))]
- public void SetLength_WithNegativeLength_ThrowsException()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
-
- try
- {
- // Act
- partBuffer.SetLength(-1);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- [ExpectedException(typeof(ArgumentOutOfRangeException))]
- public void SetLength_ExceedsBufferCapacity_ThrowsException()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
-
- try
- {
- // Act - Try to set length larger than buffer capacity
- partBuffer.SetLength(10000);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- #endregion
-
- #region Disposal Tests
-
- [TestMethod]
- public void Dispose_ReturnsBufferToArrayPool()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- // Act
- partBuffer.Dispose();
-
- // Assert - Buffer should be returned (verified by checking it's nulled)
- Assert.IsNull(partBuffer.ArrayPoolBuffer);
- }
-
- [TestMethod]
- public void Dispose_MultipleCalls_IsIdempotent()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- // Act - Dispose multiple times
- partBuffer.Dispose();
- partBuffer.Dispose();
- partBuffer.Dispose();
-
- // Assert - Should not throw
- Assert.IsNull(partBuffer.ArrayPoolBuffer);
- }
-
- [TestMethod]
- public void Dispose_SetsArrayPoolBufferToNull()
- {
- // Arrange
- var partBuffer = StreamPartBuffer.Create(1, 1024);
- partBuffer.SetLength(500);
-
- // Act
- partBuffer.Dispose();
-
- // Assert
- Assert.IsNull(partBuffer.ArrayPoolBuffer);
- }
-
- #endregion
-
- #region Edge Cases
-
- [TestMethod]
- public void Constructor_WithEmptyBuffer_HandlesCorrectly()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 0);
-
- try
- {
- // Assert
- Assert.AreEqual(0, partBuffer.Length);
- Assert.AreEqual(0, partBuffer.RemainingBytes);
- Assert.AreEqual(0, partBuffer.CurrentPosition);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- [TestMethod]
- public void RemainingBytes_WhenPositionBeyondLength_ReturnsZero()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(1, testBuffer, 500);
-
- try
- {
- // Act - Position beyond actual length
- partBuffer.CurrentPosition = 600;
-
- // Assert - RemainingBytes uses Math.Max(0, ...) to prevent negative
- Assert.AreEqual(0, partBuffer.RemainingBytes);
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- #endregion
-
- #region ToString Tests
-
- [TestMethod]
- public void ToString_ReturnsExpectedFormat()
- {
- // Arrange
- byte[] testBuffer = ArrayPool.Shared.Rent(1024);
- var partBuffer = new StreamPartBuffer(3, testBuffer, 500);
-
- try
- {
- partBuffer.CurrentPosition = 100;
-
- // Act
- string result = partBuffer.ToString();
-
- // Assert - Verify format contains key information
- Assert.IsTrue(result.Contains("Part=3"));
- Assert.IsTrue(result.Contains("500 bytes"));
- Assert.IsTrue(result.Contains("pos=100"));
- Assert.IsTrue(result.Contains("remaining=400"));
- }
- finally
- {
- partBuffer.Dispose();
- }
- }
-
- #endregion
- }
-}