diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs deleted file mode 100644 index db6ae3c8d00c..000000000000 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs +++ /dev/null @@ -1,156 +0,0 @@ -/******************************************************************************* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * Licensed under the Apache License, Version 2.0 (the "License"). You may not use - * this file except in compliance with the License. A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * ***************************************************************************** - * __ _ _ ___ - * ( )( \/\/ )/ __) - * /__\ \ / \__ \ - * (_)(_) \/\/ (___/ - * - * AWS SDK for .NET - * API Version: 2006-03-01 - * - */ -using Amazon.Runtime.Internal.Util; -using System; -using System.Diagnostics.CodeAnalysis; -using System.Threading; -using System.Threading.Tasks; - -namespace Amazon.S3.Transfer.Internal -{ - /// - /// ArrayPool-based buffered data source that reads from pre-buffered part data. - /// Manages ArrayPool lifecycle and provides efficient buffer-to-buffer copying. - /// - internal class BufferedDataSource : IPartDataSource - { - private readonly StreamPartBuffer _partBuffer; - private bool _disposed = false; - - #region Logger - - private Logger Logger - { - get - { - return Logger.GetLogger(typeof(TransferUtility)); - } - } - - #endregion - - /// - public int PartNumber => _partBuffer.PartNumber; - - /// - public bool IsComplete => _partBuffer.RemainingBytes == 0; - - /// - /// Initializes a new instance of the class. - /// - /// The containing the buffered part data. - /// Thrown when is null. - public BufferedDataSource(StreamPartBuffer partBuffer) - { - _partBuffer = partBuffer ?? throw new ArgumentNullException(nameof(partBuffer)); - - Logger.DebugFormat("BufferedDataSource: Created for part {0} (BufferLength={1}, RemainingBytes={2})", - _partBuffer.PartNumber, _partBuffer.Length, _partBuffer.RemainingBytes); - } - - /// - public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - ThrowIfDisposed(); - - try - { - if (buffer == null) - throw new ArgumentNullException(nameof(buffer)); - if (offset < 0) - throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be non-negative"); - if (count < 0) - throw new ArgumentOutOfRangeException(nameof(count), "Count must be non-negative"); - if (offset + count > buffer.Length) - throw new ArgumentException("Offset and count exceed buffer bounds"); - - if (_partBuffer.RemainingBytes == 0) - { - Logger.DebugFormat("BufferedDataSource: [Part {0}] Reached end of buffer (RemainingBytes=0)", _partBuffer.PartNumber); - return Task.FromResult(0); // End of part - } - - // Calculate bytes to copy from buffered part - var availableBytes = _partBuffer.RemainingBytes; - var bytesToRead = Math.Min(count, availableBytes); - - Logger.DebugFormat("BufferedDataSource: [Part {0}] Reading {1} bytes (Requested={2}, Available={3}, CurrentPosition={4})", - _partBuffer.PartNumber, bytesToRead, count, availableBytes, _partBuffer.CurrentPosition); - - Buffer.BlockCopy( - _partBuffer.ArrayPoolBuffer, // Source: ArrayPool buffer - _partBuffer.CurrentPosition, // Source offset - buffer, // Destination: user buffer - offset, // Destination offset - bytesToRead // Bytes to copy - ); - - // Update position in the part buffer - _partBuffer.CurrentPosition += bytesToRead; - - Logger.DebugFormat("BufferedDataSource: [Part {0}] Read complete (BytesRead={1}, NewPosition={2}, RemainingBytes={3}, IsComplete={4})", - _partBuffer.PartNumber, bytesToRead, _partBuffer.CurrentPosition, _partBuffer.RemainingBytes, IsComplete); - - return Task.FromResult(bytesToRead); - } - catch (Exception ex) - { - Logger.Error(ex, "BufferedDataSource: [Part {0}] Error during read: {1}", _partBuffer.PartNumber, ex.Message); - - // On any error during read (including validation), mark the buffer as consumed to prevent further reads - _partBuffer.CurrentPosition = _partBuffer.Length; - throw; - } - } - - private void ThrowIfDisposed() - { - if (_disposed) - throw new ObjectDisposedException(nameof(BufferedDataSource)); - } - - /// - [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose methods should not throw exceptions")] - public void Dispose() - { - if (!_disposed) - { - try - { - Logger.DebugFormat("BufferedDataSource: [Part {0}] Disposing (Returning buffer to ArrayPool)", _partBuffer.PartNumber); - - // Dispose the underlying StreamPartBuffer, which returns ArrayPool buffer to pool - _partBuffer?.Dispose(); - } - catch (Exception ex) - { - Logger.Error(ex, "BufferedDataSource: [Part {0}] Error during disposal: {1}", _partBuffer.PartNumber, ex.Message); - - // Suppressing CA1031: Dispose methods should not throw exceptions - // Continue disposal process silently on any errors - } - - _disposed = true; - } - } - } -} diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs index 256a0228d086..49a014f734f8 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs @@ -147,7 +147,7 @@ private void ProcessStreamingPart( // Add the streaming data source to the buffer manager // After this succeeds, the buffer manager owns the data source - _partBufferManager.AddBuffer(streamingDataSource); + _partBufferManager.AddDataSource(streamingDataSource); // Mark ownership transfer by nulling our reference // If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it @@ -181,17 +181,17 @@ private void ProcessStreamingPart( /// Cancellation token for the operation. /// /// This method is called when the part arrives out of the expected sequential order. - /// The part data is buffered into ArrayPool memory for later sequential consumption. + /// The part data is buffered into ArrayPool memory using ChunkedBufferStream for later sequential consumption. /// /// OWNERSHIP: - /// - Response is read and buffered into StreamPartBuffer + /// - Response is read and buffered into ChunkedBufferStream /// - Response is disposed immediately after buffering (no longer needed) - /// - StreamPartBuffer is added to buffer manager (buffer manager takes ownership) - /// - Buffer manager will dispose StreamPartBuffer during cleanup + /// - ChunkedPartDataSource (wrapping ChunkedBufferStream) is added to buffer manager (buffer manager takes ownership) + /// - Buffer manager will dispose ChunkedPartDataSource during cleanup /// /// ERROR HANDLING: /// - Always dispose response in catch block since we own it throughout this method - /// - BufferPartFromResponseAsync handles its own cleanup of StreamPartBuffer on error + /// - BufferPartFromResponseAsync handles its own cleanup of ChunkedBufferStream on error /// private async Task ProcessBufferedPartAsync( int partNumber, @@ -204,7 +204,7 @@ private async Task ProcessBufferedPartAsync( try { // Buffer the part from the response stream into memory - var buffer = await BufferPartFromResponseAsync( + var dataSource = await BufferPartFromResponseAsync( partNumber, response, cancellationToken).ConfigureAwait(false); @@ -212,11 +212,11 @@ private async Task ProcessBufferedPartAsync( // Response has been fully read and buffered - dispose it now response?.Dispose(); - _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into memory", - partNumber, buffer.Length); + _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Successfully buffered part data", + partNumber); // Add the buffered part to the buffer manager - _partBufferManager.AddBuffer(buffer); + _partBufferManager.AddDataSource(dataSource); _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager (capacity will be released after consumption)", partNumber); @@ -257,77 +257,61 @@ public void Dispose() } /// - /// Buffers a part from the GetObjectResponse stream into ArrayPool memory. - /// Used when a part arrives out of order and cannot be streamed directly. + /// Buffers a part from the GetObjectResponse stream into memory using ChunkedBufferStream. + /// Uses multiple small ArrayPool chunks (80KB each) to avoid the 2GB byte[] array size limitation + /// and Large Object Heap allocations. Used when a part arrives out of order and cannot be streamed directly. /// /// The part number being buffered. /// The GetObjectResponse containing the part data stream. /// Cancellation token for the operation. - /// A containing the buffered part data. - /// Thrown when buffering fails. The StreamPartBuffer will be disposed automatically. - private async Task BufferPartFromResponseAsync( + /// An containing the buffered part data. + /// Thrown when buffering fails. The data source will be disposed automatically. + private async Task BufferPartFromResponseAsync( int partNumber, GetObjectResponse response, CancellationToken cancellationToken) { - StreamPartBuffer downloadedPart = null; + long expectedBytes = response.ContentLength; + + ChunkedBufferStream chunkedStream = null; try { - // Use ContentLength to determine exact bytes to read and allocate - long expectedBytes = response.ContentLength; - int initialBufferSize = (int)expectedBytes; - - _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Allocating buffer of size {1} bytes from ArrayPool", - partNumber, initialBufferSize); + _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffering {1} bytes using chunked buffer stream", + partNumber, expectedBytes); - downloadedPart = StreamPartBuffer.Create(partNumber, initialBufferSize); + chunkedStream = new ChunkedBufferStream(expectedBytes); - // Get reference to the buffer for writing - var partBuffer = downloadedPart.ArrayPoolBuffer; - - // Create a MemoryStream wrapper around the pooled buffer - // writable: true allows WriteResponseStreamAsync to write to it - // The MemoryStream starts at position 0 and can grow up to initialBufferSize - using (var memoryStream = new MemoryStream(partBuffer, 0, initialBufferSize, writable: true)) - { - _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into buffer", - partNumber); - - // Use GetObjectResponse's stream copy logic which includes: - // - Progress tracking with events - // - Size validation (ContentLength vs bytes read) - // - Buffered reading with proper chunk sizes - await response.WriteResponseStreamAsync( - memoryStream, - null, // destination identifier (not needed for memory stream) - _config.BufferSize, - cancellationToken, - validateSize: true) - .ConfigureAwait(false); - - int totalRead = (int)memoryStream.Position; - - _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes from response stream", - partNumber, totalRead); + _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into chunked buffers", + partNumber); - // Set the length to reflect actual bytes read - downloadedPart.SetLength(totalRead); + // Write response stream to chunked buffer stream + // ChunkedBufferStream automatically allocates chunks as needed + await response.WriteResponseStreamAsync( + chunkedStream, + null, + _config.BufferSize, + cancellationToken, + validateSize: true) + .ConfigureAwait(false); + + _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into chunked stream", + partNumber, chunkedStream.Length); - if (totalRead != expectedBytes) - { - _logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes", - partNumber, expectedBytes, totalRead); - } + if (chunkedStream.Length != expectedBytes) + { + _logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes", + partNumber, expectedBytes, chunkedStream.Length); } - return downloadedPart; + // Switch to read mode and wrap in IPartDataSource + chunkedStream.SwitchToReadMode(); + return new ChunkedPartDataSource(partNumber, chunkedStream); } catch (Exception ex) { - _logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part from response stream", partNumber); - // If something goes wrong, StreamPartBuffer.Dispose() will handle cleanup - downloadedPart?.Dispose(); + _logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part", partNumber); + chunkedStream?.Dispose(); throw; } } diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedBufferStream.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedBufferStream.cs new file mode 100644 index 000000000000..b9bd8fbbef13 --- /dev/null +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedBufferStream.cs @@ -0,0 +1,409 @@ +/******************************************************************************* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * ***************************************************************************** + * __ _ _ ___ + * ( )( \/\/ )/ __) + * /__\ \ / \__ \ + * (_)(_) \/\/ (___/ + * + * AWS SDK for .NET + * API Version: 2006-03-01 + * + */ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.S3.Transfer.Internal +{ + /// + /// A writable and readable stream that stores data across multiple small ArrayPool buffers, + /// avoiding the 2GB array size limitation and Large Object Heap (LOH) allocations. + /// + /// + /// Design Goals: + /// + /// Keep each buffer chunk below 85KB to avoid LOH allocation + /// Support parts larger than 2GB (int.MaxValue) + /// Efficient memory management via ArrayPool + /// Present standard Stream interface for easy integration + /// + /// + /// Size Limits: + /// + /// Maximum supported stream size is approximately 140TB (int.MaxValue * CHUNK_SIZE bytes). + /// This limit exists because chunk indexing uses int for List indexing. + /// + /// + /// Usage Pattern: + /// + /// var stream = new ChunkedBufferStream(); + /// + /// // Write phase: Stream data in + /// await response.WriteResponseStreamAsync(stream, ...); + /// + /// // Read phase: Stream data out (automatically switches to read mode) + /// int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length); + /// + /// // Cleanup: Returns all chunks to ArrayPool + /// stream.Dispose(); + /// + /// + /// Note: The stream automatically switches to read mode on the first read operation. + /// You can optionally call explicitly for clarity. + /// + /// + internal class ChunkedBufferStream : Stream + { + /// + /// Size of each buffer chunk. Set to 64KB to match ArrayPool bucket size and stay below the 85KB Large Object Heap threshold. + /// If we chose any higher than 64KB, ArrayPool would round up to 128KB (which would go to LOH). + /// + private const int CHUNK_SIZE = 65536; // 64KB - matches ArrayPool bucket, safely below 85KB LOH threshold + + /// + /// Maximum supported stream size. This limit exists because chunk indexing uses int for List indexing. + /// With 64KB chunks, this allows approximately 140TB of data. + /// + private const long MAX_STREAM_SIZE = (long)int.MaxValue * CHUNK_SIZE; + + private readonly List _chunks; + private long _length = 0; + private long _position = 0; + private bool _isReadMode = false; + private bool _disposed = false; + + /// + /// Creates a new ChunkedBufferStream with default initial capacity. + /// + public ChunkedBufferStream() + { + _chunks = new List(); + } + + /// + /// Creates a new ChunkedBufferStream with pre-allocated capacity for the expected size. + /// This avoids List resizing during writes, improving performance for known sizes. + /// + /// The estimated total size in bytes. Used to pre-allocate the chunk list capacity. + public ChunkedBufferStream(long estimatedSize) + { + if (estimatedSize > 0) + { + // Ceiling division formula: (n + d - 1) / d calculates ceil(n / d) using integer arithmetic. + // This computes how many chunks are needed to hold estimatedSize bytes, rounding up to + // ensure the last partial chunk is accounted for. Avoids floating-point math for performance. + // Example: For 100 bytes with CHUNK_SIZE=32: (100 + 31) / 32 = 131 / 32 = 4 chunks + // (simple division 100/32=3 would only hold 96 bytes, losing 4 bytes) + long estimatedChunks = (estimatedSize + CHUNK_SIZE - 1) / CHUNK_SIZE; + int capacity = (int)Math.Min(estimatedChunks, int.MaxValue); + _chunks = new List(capacity); + } + else + { + _chunks = new List(); + } + } + + /// + /// Throws if the stream has been disposed. + /// + /// Thrown if the stream is disposed. + private void ThrowIfDisposed() + { + if (_disposed) + throw new ObjectDisposedException(nameof(ChunkedBufferStream)); + } + + /// + /// Gets a value indicating whether the stream supports reading. + /// Returns true after the first read operation or after has been called. + /// + public override bool CanRead => _isReadMode; + + /// + /// Gets a value indicating whether the stream supports seeking. + /// Always returns false - seeking is not supported. + /// + public override bool CanSeek => false; + + /// + /// Gets a value indicating whether the stream supports writing. + /// Returns true before is called, false after. + /// + public override bool CanWrite => !_isReadMode; + + /// + /// Gets the length in bytes of the stream. + /// + /// Thrown if the stream has been disposed. + public override long Length + { + get + { + ThrowIfDisposed(); + return _length; + } + } + + /// + /// Gets or sets the position within the current stream. + /// Setting the position is not supported and will throw . + /// + /// Thrown if the stream has been disposed. + /// Thrown when attempting to set the position. + public override long Position + { + get + { + ThrowIfDisposed(); + return _position; + } + set => throw new NotSupportedException("Seeking not supported"); + } + + /// + /// Writes a sequence of bytes to the stream and advances the current position by the number of bytes written. + /// Automatically allocates new chunks from ArrayPool as needed. + /// + /// The buffer containing data to write. + /// The zero-based byte offset in buffer at which to begin copying bytes. + /// The number of bytes to write. + /// Thrown if the stream has been disposed. + /// Thrown if stream is in read mode. + /// Thrown if buffer is null. + /// Thrown if offset or count is negative or exceeds buffer bounds. + /// Thrown if the write would exceed the maximum supported stream size (approximately 175TB). + public override void Write(byte[] buffer, int offset, int count) + { + ThrowIfDisposed(); + + if (_isReadMode) + throw new NotSupportedException("Cannot write after switching to read mode"); + + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + if (offset < 0) + throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be non-negative"); + if (count < 0) + throw new ArgumentOutOfRangeException(nameof(count), "Count must be non-negative"); + if (offset + count > buffer.Length) + throw new ArgumentException("Offset and count exceed buffer bounds"); + + // Check for overflow before writing - prevents chunk index overflow for extremely large streams + if (_length > MAX_STREAM_SIZE - count) + throw new IOException($"Write would exceed maximum supported stream size of {MAX_STREAM_SIZE} bytes (approximately 175TB)."); + + int remaining = count; + int sourceOffset = offset; + + while (remaining > 0) + { + // Calculate which chunk and offset within chunk for current write position + int chunkIndex = (int)(_length / CHUNK_SIZE); + int offsetInChunk = (int)(_length % CHUNK_SIZE); + + // Allocate new chunk if we've filled all existing chunks + if (chunkIndex >= _chunks.Count) + { + _chunks.Add(ArrayPool.Shared.Rent(CHUNK_SIZE)); + } + + // Copy data to current chunk + int bytesToWrite = Math.Min(remaining, CHUNK_SIZE - offsetInChunk); + Buffer.BlockCopy(buffer, sourceOffset, _chunks[chunkIndex], offsetInChunk, bytesToWrite); + + _length += bytesToWrite; + sourceOffset += bytesToWrite; + remaining -= bytesToWrite; + } + + _position = _length; + } + + /// + /// Asynchronously writes a sequence of bytes to the stream. + /// + /// The buffer containing data to write. + /// The zero-based byte offset in buffer at which to begin copying bytes. + /// The number of bytes to write. + /// The token to monitor for cancellation requests. + /// A task representing the asynchronous write operation. + /// + /// Delegates to synchronous as ArrayPool operations are fast and don't benefit from async. + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + Write(buffer, offset, count); + return Task.CompletedTask; + } + + /// + /// Reads a sequence of bytes from the stream and advances the position by the number of bytes read. + /// Automatically switches to read mode on the first read if not already in read mode. + /// + /// The buffer to read data into. + /// The zero-based byte offset in buffer at which to begin storing data. + /// The maximum number of bytes to read. + /// + /// The total number of bytes read into the buffer. This can be less than the requested count + /// if that many bytes are not currently available, or zero if the end of the stream is reached. + /// + /// Thrown if the stream has been disposed. + /// Thrown if buffer is null. + /// Thrown if offset or count is negative or exceeds buffer bounds. + public override int Read(byte[] buffer, int offset, int count) + { + ThrowIfDisposed(); + + // Automatically switch to read mode on first read + if (!_isReadMode) + { + _isReadMode = true; + _position = 0; + } + + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + if (offset < 0) + throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be non-negative"); + if (count < 0) + throw new ArgumentOutOfRangeException(nameof(count), "Count must be non-negative"); + if (offset + count > buffer.Length) + throw new ArgumentException("Offset and count exceed buffer bounds"); + + if (_position >= _length) + return 0; // End of stream + + long bytesAvailable = _length - _position; + int bytesToRead = (int)Math.Min(count, bytesAvailable); + int bytesRead = 0; + + while (bytesRead < bytesToRead) + { + int chunkIndex = (int)(_position / CHUNK_SIZE); + int offsetInChunk = (int)(_position % CHUNK_SIZE); + int bytesInThisChunk = Math.Min(bytesToRead - bytesRead, CHUNK_SIZE - offsetInChunk); + + Buffer.BlockCopy(_chunks[chunkIndex], offsetInChunk, buffer, offset + bytesRead, bytesInThisChunk); + + _position += bytesInThisChunk; + bytesRead += bytesInThisChunk; + } + + return bytesRead; + } + + /// + /// Asynchronously reads a sequence of bytes from the stream. + /// Automatically switches to read mode on the first read if not already in read mode. + /// + /// The buffer to read data into. + /// The zero-based byte offset in buffer at which to begin storing data. + /// The maximum number of bytes to read. + /// The token to monitor for cancellation requests. + /// + /// A task representing the asynchronous read operation. The task result contains the total number + /// of bytes read into the buffer. + /// + /// + /// Delegates to synchronous as buffer operations are fast and don't benefit from async. + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + return Task.FromResult(Read(buffer, offset, count)); + } + + /// + /// Switches the stream from write mode to read mode. + /// This method is optional - the stream will automatically switch to read mode on the first read operation. + /// Call this explicitly if you want to switch modes before reading or for clarity in your code. + /// Resets the position to the beginning of the stream. + /// + /// Thrown if the stream has been disposed. + /// Thrown if already in read mode. + public void SwitchToReadMode() + { + ThrowIfDisposed(); + + if (_isReadMode) + throw new InvalidOperationException("Stream is already in read mode"); + + _isReadMode = true; + _position = 0; + } + + /// + /// Flushes any buffered data. This is a no-op for this stream type. + /// + public override void Flush() + { + // No-op: No buffering layer that needs flushing + } + + /// + /// Sets the position within the current stream. + /// Not supported - always throws . + /// + /// Always thrown. + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException("Seeking is not supported"); + } + + /// + /// Sets the length of the current stream. + /// Not supported - always throws . + /// + /// Always thrown. + public override void SetLength(long value) + { + throw new NotSupportedException("SetLength is not supported"); + } + + /// + /// Releases the unmanaged resources used by the stream and optionally releases the managed resources. + /// Returns all ArrayPool chunks back to the shared pool. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose methods should not throw exceptions")] + protected override void Dispose(bool disposing) + { + if (!_disposed && disposing) + { + try + { + // Return all chunks to ArrayPool + foreach (var chunk in _chunks) + { + ArrayPool.Shared.Return(chunk); + } + _chunks.Clear(); + } + catch (Exception) + { + // Suppress exceptions in Dispose - continue cleanup + } + + _disposed = true; + } + + base.Dispose(disposing); + } + } +} diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedPartDataSource.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedPartDataSource.cs new file mode 100644 index 000000000000..afc2cabf9ffa --- /dev/null +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/ChunkedPartDataSource.cs @@ -0,0 +1,136 @@ +/******************************************************************************* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * ***************************************************************************** + * __ _ _ ___ + * ( )( \/\/ )/ __) + * /__\ \ / \__ \ + * (_)(_) \/\/ (___/ + * + * AWS SDK for .NET + * API Version: 2006-03-01 + * + */ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.S3.Transfer.Internal +{ + /// + /// IPartDataSource implementation that wraps a ChunkedBufferStream for large buffered parts + /// + /// + /// Purpose: + /// + /// Wraps ChunkedBufferStream to present IPartDataSource interface + /// Enables seamless integration with PartBufferManager + /// Handles disposal of underlying ChunkedBufferStream + /// + /// + /// Lifecycle: + /// + /// // Created in BufferedPartDataHandler for large parts + /// var chunkedStream = new ChunkedBufferStream(); + /// await response.WriteResponseStreamAsync(chunkedStream, ...); + /// chunkedStream.SwitchToReadMode(); + /// var dataSource = new ChunkedPartDataSource(partNumber, chunkedStream); + /// + /// // Added to PartBufferManager + /// _partBufferManager.AddDataSource(dataSource); + /// + /// // Consumer reads sequentially + /// await dataSource.ReadAsync(buffer, offset, count, ct); + /// + /// // Disposed by PartBufferManager when part is consumed + /// dataSource.Dispose(); // Returns all chunks to ArrayPool + /// + /// + internal class ChunkedPartDataSource : IPartDataSource + { + private readonly ChunkedBufferStream _stream; + private bool _disposed = false; + + /// + /// Gets the part number for this data source. + /// Used by PartBufferManager for sequential ordering. + /// + public int PartNumber { get; } + + /// + /// Gets whether this data source has been fully consumed. + /// Returns true when all data has been read from the chunked buffer stream. + /// + public bool IsComplete => _stream.Position >= _stream.Length; + + /// + /// Creates a new ChunkedPartDataSource wrapping a ChunkedBufferStream. + /// The ChunkedBufferStream must already be in read mode (SwitchToReadMode called). + /// Takes ownership of the stream and will dispose it when this data source is disposed. + /// + /// The part number for ordering. + /// The ChunkedBufferStream containing the buffered part data. Must be in read mode. + /// Thrown if stream is null. + /// Thrown if the stream is not in read mode (CanRead is false). + public ChunkedPartDataSource(int partNumber, ChunkedBufferStream stream) + { + PartNumber = partNumber; + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + + if (!_stream.CanRead) + throw new InvalidOperationException("ChunkedBufferStream must be in read mode (call SwitchToReadMode first)"); + } + + /// + /// Asynchronously reads a sequence of bytes from the chunked buffer stream. + /// + /// The buffer to read data into. + /// The zero-based byte offset in buffer at which to begin storing data. + /// The maximum number of bytes to read. + /// The token to monitor for cancellation requests. + /// + /// A task representing the asynchronous read operation. The task result contains the total number + /// of bytes read into the buffer. This can be less than the requested count if that many bytes + /// are not currently available, or zero if the end of the stream is reached. + /// + /// Thrown if the object has been disposed. + public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (_disposed) + throw new ObjectDisposedException(nameof(ChunkedPartDataSource)); + + return _stream.ReadAsync(buffer, offset, count, cancellationToken); + } + + /// + /// Returns a string representation of this ChunkedPartDataSource for debugging. + /// + /// A string describing this chunked part data source. + public override string ToString() + { + return $"ChunkedPartDataSource(Part={PartNumber}, Length={_stream.Length} bytes, Position={_stream.Position})"; + } + + + /// + /// Releases all resources used by this ChunkedPartDataSource. + /// Disposes the underlying ChunkedBufferStream, which returns all ArrayPool chunks. + /// + public void Dispose() + { + if (!_disposed) + { + _stream?.Dispose(); + _disposed = true; + } + } + } +} diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs index 5f5c214421b5..686a89691ce4 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs @@ -44,19 +44,7 @@ internal interface IPartBufferManager : IDisposable /// The part data source to add. void AddDataSource(IPartDataSource dataSource); - /// - /// Adds a downloaded part buffer and signals readers when next expected part arrives. - /// - /// The downloaded part buffer to add. - void AddBuffer(StreamPartBuffer buffer); - - /// - /// Adds a part data source (streaming or buffered) and signals readers when next expected part arrives. - /// - /// The part data source to add (can be StreamingDataSource or BufferedDataSource). - /// A task that completes when the data source has been added and signaling is complete. - void AddBuffer(IPartDataSource dataSource); - + /// /// Reads data from the buffer manager. Automatically handles sequential part consumption /// and reads across part boundaries to fill the buffer when possible, matching standard Stream.Read() behavior. diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs index 33edf2fa0ad1..a27622ea4377 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs @@ -82,8 +82,8 @@ namespace Amazon.S3.Transfer.Internal /// - Blocks if are already buffered in memory /// - Example: With MaxInMemoryParts=10, if parts 5-14 are buffered, the task downloading /// part 15 blocks here until the reader consumes and releases part 5's buffer - /// 2. Read part data from S3 into pooled buffer - /// 3. Add buffered part: await + /// 2. Read part data from S3 into chunked ArrayPool buffers + /// 3. Add buffered part: await /// - Adds buffer to _partDataSources dictionary /// - Signals _partAvailable to wake consumer if waiting /// 4. Consumer eventually releases the buffer slot after reading the part @@ -176,17 +176,8 @@ internal class PartBufferManager : IPartBufferManager #endregion - #region Logger + private readonly Logger _logger = Logger.GetLogger(typeof(PartBufferManager)); - private Logger Logger - { - get - { - return Logger.GetLogger(typeof(TransferUtility)); - } - } - - #endregion /// /// Initializes a new instance of the class. @@ -205,7 +196,7 @@ public PartBufferManager(BufferedDownloadConfiguration config) ); _partAvailable = new AutoResetEvent(false); - Logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts); + _logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts); } /// @@ -236,12 +227,12 @@ public async Task WaitForBufferSpaceAsync(CancellationToken cancellationToken) ThrowIfDisposed(); var availableBefore = _bufferSpaceAvailable.CurrentCount; - Logger.DebugFormat("PartBufferManager: Waiting for buffer space (Available slots before wait: {0})", availableBefore); + _logger.DebugFormat("PartBufferManager: Waiting for buffer space (Available slots before wait: {0})", availableBefore); await _bufferSpaceAvailable.WaitAsync(cancellationToken).ConfigureAwait(false); var availableAfter = _bufferSpaceAvailable.CurrentCount; - Logger.DebugFormat("PartBufferManager: Buffer space acquired (Available slots after acquire: {0})", availableAfter); + _logger.DebugFormat("PartBufferManager: Buffer space acquired (Available slots after acquire: {0})", availableAfter); } /// @@ -267,50 +258,25 @@ public void AddDataSource(IPartDataSource dataSource) if (dataSource == null) throw new ArgumentNullException(nameof(dataSource)); - Logger.DebugFormat("PartBufferManager: Adding part {0} (BufferedParts count before add: {1})", + _logger.DebugFormat("PartBufferManager: Adding part {0} (BufferedParts count before add: {1})", dataSource.PartNumber, _partDataSources.Count); // Add the data source to the collection if (!_partDataSources.TryAdd(dataSource.PartNumber, dataSource)) { // Duplicate part number - this shouldn't happen in normal operation - Logger.Error(null, "PartBufferManager: Duplicate part {0} attempted to be added", dataSource.PartNumber); + _logger.Error(null, "PartBufferManager: Duplicate part {0} attempted to be added", dataSource.PartNumber); dataSource?.Dispose(); // Clean up the duplicate part throw new InvalidOperationException($"Duplicate part {dataSource.PartNumber} attempted to be added"); } - Logger.DebugFormat("PartBufferManager: Part {0} added successfully (BufferedParts count after add: {1}). Signaling _partAvailable.", + _logger.DebugFormat("PartBufferManager: Part {0} added successfully (BufferedParts count after add: {1}). Signaling _partAvailable.", dataSource.PartNumber, _partDataSources.Count); // Signal that a new part is available _partAvailable.Set(); } - /// - public void AddBuffer(StreamPartBuffer buffer) - { - ThrowIfDisposed(); - - if (buffer == null) - throw new ArgumentNullException(nameof(buffer)); - - // Create a BufferedDataSource and add it - var bufferedSource = new BufferedDataSource(buffer); - AddDataSource(bufferedSource); - } - - /// - public void AddBuffer(IPartDataSource dataSource) - { - ThrowIfDisposed(); - - if (dataSource == null) - throw new ArgumentNullException(nameof(dataSource)); - - // Delegate directly to AddDataSourceAsync which already handles IPartDataSource - AddDataSource(dataSource); - } - /// public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { @@ -395,7 +361,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat { var currentPartNumber = _nextExpectedPartNumber; - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Expecting part {0} (Requested bytes: {1}, BufferedParts count: {2})", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Expecting part {0} (Requested bytes: {1}, BufferedParts count: {2})", currentPartNumber, count, _partDataSources.Count); // Wait for the current part to become available. @@ -405,7 +371,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat // Example: If parts 3, 5, 7 are available but we need part 2, we wait here. while (!_partDataSources.ContainsKey(currentPartNumber)) { - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} not yet available. Waiting on _partAvailable event...", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} not yet available. Waiting on _partAvailable event...", currentPartNumber); // Check for completion first to avoid indefinite waiting. @@ -414,12 +380,12 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat { if (state.Item2 != null) // Check for exception { - Logger.Error(state.Item2, "PartBufferManager.ReadFromCurrentPart: Download failed while waiting for part {0}", + _logger.Error(state.Item2, "PartBufferManager.ReadFromCurrentPart: Download failed while waiting for part {0}", currentPartNumber); throw new InvalidOperationException("Multipart download failed", state.Item2); } - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Download complete, part {0} not available. Returning EOF.", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Download complete, part {0} not available. Returning EOF.", currentPartNumber); // True EOF - all parts downloaded, no more data coming return (0, false); @@ -434,11 +400,11 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat // and calls AddDataSourceAsync, it signals this event, waking us to check again. await Task.Run(() => _partAvailable.WaitOne(), cancellationToken).ConfigureAwait(false); - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Woke from _partAvailable wait. Rechecking for part {0}...", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Woke from _partAvailable wait. Rechecking for part {0}...", currentPartNumber); } - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is available. Reading from data source...", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is available. Reading from data source...", currentPartNumber); // At this point, the expected part is available in the dictionary. @@ -446,7 +412,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat if (!_partDataSources.TryGetValue(currentPartNumber, out var dataSource)) { // Log technical details for troubleshooting - Logger.Error(null, "PartBufferManager: Part {0} disappeared after availability check. This indicates a race condition in the buffer manager.", currentPartNumber); + _logger.Error(null, "PartBufferManager: Part {0} disappeared after availability check. This indicates a race condition in the buffer manager.", currentPartNumber); // Throw user-friendly exception throw new InvalidOperationException("Multipart download failed due to an internal error."); @@ -457,13 +423,13 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat // Read from this part's buffer into the caller's buffer. var partBytesRead = await dataSource.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Read {0} bytes from part {1}. IsComplete={2}", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Read {0} bytes from part {1}. IsComplete={2}", partBytesRead, currentPartNumber, dataSource.IsComplete); // If this part is fully consumed, perform cleanup and advance to next part. if (dataSource.IsComplete) { - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is complete. Cleaning up and advancing to next part...", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} is complete. Cleaning up and advancing to next part...", currentPartNumber); // Remove from collection @@ -482,7 +448,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat // Advance to next part. _nextExpectedPartNumber++; - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Cleaned up part {0}. Next expected part: {1} (BufferedParts count: {2})", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Cleaned up part {0}. Next expected part: {1} (BufferedParts count: {2})", currentPartNumber, _nextExpectedPartNumber, _partDataSources.Count); // Continue reading to fill buffer across part boundaries. @@ -498,11 +464,11 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat // If part is not complete but we got 0 bytes, it's EOF if (partBytesRead == 0) { - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} returned 0 bytes (EOF)", currentPartNumber); + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} returned 0 bytes (EOF)", currentPartNumber); return (0, false); } - Logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} has more data. Returning {1} bytes (will resume on next call)", + _logger.DebugFormat("PartBufferManager.ReadFromCurrentPart: Part {0} has more data. Returning {1} bytes (will resume on next call)", currentPartNumber, partBytesRead); // Part still has more data available. Return what we read. @@ -511,7 +477,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat } catch (Exception ex) { - Logger.Error(ex, "PartBufferManager.ReadFromCurrentPart: Error reading from part {0}: {1}", + _logger.Error(ex, "PartBufferManager.ReadFromCurrentPart: Error reading from part {0}: {1}", currentPartNumber, ex.Message); // Clean up on failure to prevent resource leaks @@ -543,7 +509,7 @@ public void ReleaseBufferSpace() _bufferSpaceAvailable.Release(); var availableAfter = _bufferSpaceAvailable.CurrentCount; - Logger.DebugFormat("PartBufferManager: Buffer space released (Available slots after release: {0})", availableAfter); + _logger.DebugFormat("PartBufferManager: Buffer space released (Available slots after release: {0})", availableAfter); } /// @@ -568,11 +534,11 @@ public void MarkDownloadComplete(Exception exception) { if (exception != null) { - Logger.Error(exception, "PartBufferManager: Download marked complete with error. Signaling completion."); + _logger.Error(exception, "PartBufferManager: Download marked complete with error. Signaling completion."); } else { - Logger.DebugFormat("PartBufferManager: Download marked complete successfully. Signaling completion."); + _logger.DebugFormat("PartBufferManager: Download marked complete successfully. Signaling completion."); } // Create and assign new completion state atomically diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs deleted file mode 100644 index a850a9f9ad38..000000000000 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs +++ /dev/null @@ -1,175 +0,0 @@ -/******************************************************************************* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * Licensed under the Apache License, Version 2.0 (the "License"). You may not use - * this file except in compliance with the License. A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * ***************************************************************************** - * __ _ _ ___ - * ( )( \/\/ )/ __) - * /__\ \ / \__ \ - * (_)(_) \/\/ (___/ - * - * AWS SDK for .NET - * API Version: 2006-03-01 - * - */ -using System; -using System.Buffers; -using System.Diagnostics.CodeAnalysis; -using Amazon.S3.Model; - -namespace Amazon.S3.Transfer.Internal -{ - /// - /// Container for downloaded part data optimized for streaming scenarios. - /// Uses ArrayPool buffers and tracks reading position for sequential access - /// by BufferedMultipartStream. - /// - internal class StreamPartBuffer : IDisposable - { - private bool _disposed = false; - - /// - /// Gets or sets the part number for priority queue ordering. - /// For Part GET strategy: Uses the actual part number from the multipart upload. - /// For Range GET strategy: Calculated based on byte range position. - /// - public int PartNumber { get; set; } - - /// - /// Gets or sets the ArrayPool buffer containing the downloaded part data. - /// Ownership belongs to this StreamPartBuffer and will be returned to pool on disposal. - /// - public byte[] ArrayPoolBuffer { get; set; } - - /// - /// Gets or sets the current reading position within the buffer. - /// Used by BufferedMultipartStream for sequential reading. - /// - public int CurrentPosition { get; set; } = 0; - - /// - /// Gets the number of bytes remaining to be read from current position. - /// - public int RemainingBytes => Math.Max(0, Length - CurrentPosition); - - /// - /// Gets or sets the length of valid data in the ArrayPool buffer. - /// The buffer may be larger than this due to ArrayPool size rounding. - /// - public int Length { get; set; } - - /// - /// Creates a new StreamPartBuffer for streaming scenarios. - /// For internal use only - external callers should use Create() factory method. - /// - /// The part number for ordering - /// The ArrayPool buffer containing the data (ownership transferred) - /// The length of valid data in the buffer - internal StreamPartBuffer(int partNumber, byte[] arrayPoolBuffer, int length) - { - PartNumber = partNumber; - ArrayPoolBuffer = arrayPoolBuffer; - Length = length; - CurrentPosition = 0; - } - - /// - /// Creates a new StreamPartBuffer with a rented ArrayPool buffer. - /// The StreamPartBuffer takes ownership and will return the buffer on disposal. - /// - /// The part number for ordering - /// Initial capacity needed for the buffer - /// A StreamPartBuffer with rented buffer ready for writing - public static StreamPartBuffer Create(int partNumber, int capacity) - { - var buffer = ArrayPool.Shared.Rent(capacity); - return new StreamPartBuffer(partNumber, buffer, 0); // Length will be set after writing - } - - /// - /// Sets the length of valid data in the buffer after writing. - /// Can only be called once to prevent state corruption. - /// - /// The number of valid bytes written to the buffer - /// Thrown if length has already been set - /// Thrown if length is negative or exceeds buffer capacity - internal void SetLength(int length) - { - if (Length > 0) - throw new InvalidOperationException("Length has already been set and cannot be changed"); - - if (length < 0) - throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative"); - - if (ArrayPoolBuffer != null && length > ArrayPoolBuffer.Length) - throw new ArgumentOutOfRangeException(nameof(length), "Length exceeds buffer capacity"); - - Length = length; - } - - /// - /// Returns a string representation of this StreamPartBuffer for debugging. - /// - /// A string describing this stream part buffer - public override string ToString() - { - return $"StreamPartBuffer(Part={PartNumber}, ArrayPool={Length} bytes, pos={CurrentPosition}, remaining={RemainingBytes})"; - } - - #region IDisposable Implementation - - /// - /// Releases all resources used by this StreamPartBuffer. - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// - /// Releases the unmanaged resources used by the StreamPartBuffer and optionally releases the managed resources. - /// Returns the ArrayPool buffer back to the shared pool. - /// - /// True to release both managed and unmanaged resources; false to release only unmanaged resources. - [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose methods should not throw exceptions")] - protected virtual void Dispose(bool disposing) - { - if (!_disposed && disposing) - { - try - { - // Return ArrayPool buffer to shared pool - if (ArrayPoolBuffer != null) - { - ArrayPool.Shared.Return(ArrayPoolBuffer); - ArrayPoolBuffer = null; - } - } - catch (Exception) - { - - } - - _disposed = true; - } - } - - /// - /// Finalizer to ensure resources are cleaned up if Dispose is not called. - /// - ~StreamPartBuffer() - { - Dispose(false); - } - - #endregion - } -} diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedDataSourceTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedDataSourceTests.cs deleted file mode 100644 index 24b25b97ea93..000000000000 --- a/sdk/test/Services/S3/UnitTests/Custom/BufferedDataSourceTests.cs +++ /dev/null @@ -1,487 +0,0 @@ -using Amazon.S3.Transfer.Internal; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using System; -using System.Buffers; -using System.Threading; -using System.Threading.Tasks; - -namespace AWSSDK.UnitTests -{ - /// - /// Unit tests for BufferedDataSource class. - /// Tests reading from pre-buffered StreamPartBuffer data. - /// - [TestClass] - public class BufferedDataSourceTests - { - #region Constructor Tests - - [TestMethod] - public void Constructor_WithValidPartBuffer_CreatesDataSource() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - - // Act - var dataSource = new BufferedDataSource(partBuffer); - - // Assert - Assert.IsNotNull(dataSource); - Assert.AreEqual(1, dataSource.PartNumber); - Assert.IsFalse(dataSource.IsComplete); - - // Cleanup - dataSource.Dispose(); - } - - [TestMethod] - [ExpectedException(typeof(ArgumentNullException))] - public void Constructor_WithNullPartBuffer_ThrowsArgumentNullException() - { - // Act - var dataSource = new BufferedDataSource(null); - - // Assert - ExpectedException - } - - #endregion - - #region Property Tests - - [TestMethod] - public void PartNumber_ReturnsPartBufferPartNumber() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(5, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - try - { - // Act & Assert - Assert.AreEqual(5, dataSource.PartNumber); - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - public void IsComplete_WhenNoRemainingBytes_ReturnsTrue() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - partBuffer.CurrentPosition = 512; // Move to end - var dataSource = new BufferedDataSource(partBuffer); - - try - { - // Act & Assert - Assert.IsTrue(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - public void IsComplete_WhenRemainingBytes_ReturnsFalse() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - try - { - // Act & Assert - Assert.IsFalse(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - #endregion - - #region ReadAsync Tests - Happy Path - - [TestMethod] - public async Task ReadAsync_ReadsDataFromPartBuffer() - { - // Arrange - byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); - - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - byte[] readBuffer = new byte[512]; - - try - { - // Act - int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None); - - // Assert - Assert.AreEqual(512, bytesRead); - Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer, 0, 512)); - Assert.IsTrue(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - public async Task ReadAsync_WithPartialRead_ReturnsRequestedBytes() - { - // Arrange - byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); - - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - byte[] readBuffer = new byte[256]; - - try - { - // Act - int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 256, CancellationToken.None); - - // Assert - Assert.AreEqual(256, bytesRead); - Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer, 0, 256)); - Assert.IsFalse(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - public async Task ReadAsync_WithFullRead_ReadsAllRemainingBytes() - { - // Arrange - byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); - - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - byte[] readBuffer = new byte[1024]; // Larger than available - - try - { - // Act - int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 1024, CancellationToken.None); - - // Assert - Assert.AreEqual(512, bytesRead); // Only 512 available - Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer, 0, 512)); - Assert.IsTrue(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - public async Task ReadAsync_WhenComplete_ReturnsZero() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - partBuffer.CurrentPosition = 512; // Move to end - var dataSource = new BufferedDataSource(partBuffer); - - byte[] readBuffer = new byte[256]; - - try - { - // Act - int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 256, CancellationToken.None); - - // Assert - Assert.AreEqual(0, bytesRead); - Assert.IsTrue(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - #endregion - - #region ReadAsync Tests - Parameter Validation - - [TestMethod] - [ExpectedException(typeof(ArgumentNullException))] - public async Task ReadAsync_WithNullBuffer_ThrowsArgumentNullException() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - try - { - // Act - await dataSource.ReadAsync(null, 0, 100, CancellationToken.None); - - // Assert - ExpectedException - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - [ExpectedException(typeof(ArgumentOutOfRangeException))] - public async Task ReadAsync_WithNegativeOffset_ThrowsArgumentOutOfRangeException() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - byte[] readBuffer = new byte[256]; - - try - { - // Act - await dataSource.ReadAsync(readBuffer, -1, 100, CancellationToken.None); - - // Assert - ExpectedException - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - [ExpectedException(typeof(ArgumentOutOfRangeException))] - public async Task ReadAsync_WithNegativeCount_ThrowsArgumentOutOfRangeException() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - byte[] readBuffer = new byte[256]; - - try - { - // Act - await dataSource.ReadAsync(readBuffer, 0, -1, CancellationToken.None); - - // Assert - ExpectedException - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - [ExpectedException(typeof(ArgumentException))] - public async Task ReadAsync_WithOffsetCountExceedingBounds_ThrowsArgumentException() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - byte[] readBuffer = new byte[256]; - - try - { - // Act - offset + count > buffer.Length - await dataSource.ReadAsync(readBuffer, 100, 200, CancellationToken.None); - - // Assert - ExpectedException - } - finally - { - dataSource.Dispose(); - } - } - - #endregion - - #region ReadAsync Tests - Multiple Reads - - [TestMethod] - public async Task ReadAsync_MultipleReads_ConsumesAllData() - { - // Arrange - byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); - - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - byte[] readBuffer1 = new byte[256]; - byte[] readBuffer2 = new byte[256]; - - try - { - // Act - Read in two chunks - int bytesRead1 = await dataSource.ReadAsync(readBuffer1, 0, 256, CancellationToken.None); - int bytesRead2 = await dataSource.ReadAsync(readBuffer2, 0, 256, CancellationToken.None); - - // Assert - Assert.AreEqual(256, bytesRead1); - Assert.AreEqual(256, bytesRead2); - Assert.IsTrue(dataSource.IsComplete); - - // Verify data correctness - Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, readBuffer1, 0, 256)); - - // Extract second segment manually for .NET Framework compatibility - byte[] secondSegment = new byte[256]; - Buffer.BlockCopy(testData, 256, secondSegment, 0, 256); - Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch( - secondSegment, - readBuffer2, - 0, - 256)); - } - finally - { - dataSource.Dispose(); - } - } - - [TestMethod] - public async Task ReadAsync_ReadingToEnd_ReturnsZeroOnSubsequentReads() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - byte[] readBuffer = new byte[512]; - - try - { - // Act - Read all data - int bytesRead1 = await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None); - - // Try to read again - int bytesRead2 = await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None); - - // Assert - Assert.AreEqual(512, bytesRead1); - Assert.AreEqual(0, bytesRead2); - Assert.IsTrue(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - #endregion - - #region Error Handling Tests - - [TestMethod] - public async Task ReadAsync_WhenExceptionDuringRead_MarksBufferConsumed() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - // Create a buffer that will cause BlockCopy to fail (null destination) - byte[] readBuffer = null; - - try - { - // Act & Assert - Should throw ArgumentNullException - await Assert.ThrowsExceptionAsync(async () => - { - await dataSource.ReadAsync(readBuffer, 0, 512, CancellationToken.None); - }); - - // Verify buffer was marked as consumed (position set to Length) - Assert.IsTrue(dataSource.IsComplete); - } - finally - { - dataSource.Dispose(); - } - } - - #endregion - - #region Disposal Tests - - [TestMethod] - public void Dispose_DisposesUnderlyingPartBuffer() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - // Act - dataSource.Dispose(); - - // Assert - The underlying part buffer should be disposed (ArrayPoolBuffer nulled) - Assert.IsNull(partBuffer.ArrayPoolBuffer); - } - - [TestMethod] - public void Dispose_MultipleCalls_IsIdempotent() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - - // Act - Dispose multiple times - dataSource.Dispose(); - dataSource.Dispose(); - dataSource.Dispose(); - - // Assert - Should not throw - Assert.IsNull(partBuffer.ArrayPoolBuffer); - } - - [TestMethod] - [ExpectedException(typeof(ObjectDisposedException))] - public async Task ReadAsync_AfterDispose_ThrowsObjectDisposedException() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); - byte[] readBuffer = new byte[256]; - - // Dispose the data source - dataSource.Dispose(); - - // Act - Try to read after disposal - await dataSource.ReadAsync(readBuffer, 0, 256, CancellationToken.None); - - // Assert - ExpectedException - } - - #endregion - } -} diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs index 645dae927051..7e8ca79cdfcc 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs @@ -76,7 +76,7 @@ public async Task ProcessPartAsync_InOrderPart_CreatesStreamingDataSource() mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); IPartDataSource capturedDataSource = null; - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) .Callback((ds) => capturedDataSource = ds); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -109,7 +109,7 @@ public async Task ProcessPartAsync_InOrderPart_ReleasesCapacityImmediately() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -122,8 +122,8 @@ public async Task ProcessPartAsync_InOrderPart_ReleasesCapacityImmediately() // Assert - ReleaseBufferSpace should be called (through ReleaseCapacity) // Handler calls ReleaseBufferSpace directly, which eventually calls the manager's method - // We verify the AddBuffer was called with a StreamingDataSource - mockBufferManager.Verify(m => m.AddBuffer( + // We verify the AddDataSource was called with a StreamingDataSource + mockBufferManager.Verify(m => m.AddDataSource( It.Is(ds => ds is StreamingDataSource)), Times.Once); } finally @@ -139,7 +139,7 @@ public async Task ProcessPartAsync_InOrderPart_DoesNotDisposeResponse() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -170,7 +170,7 @@ public async Task ProcessPartAsync_MultipleInOrderParts_AllStreamDirectly() mockBufferManager.Setup(m => m.NextExpectedPartNumber) .Returns(() => streamingCount + 1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) .Callback((ds) => { if (ds is StreamingDataSource) @@ -207,9 +207,9 @@ public async Task ProcessPartAsync_OutOfOrderPart_BuffersToMemory() var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - StreamPartBuffer capturedBuffer = null; - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) - .Callback((buffer) => capturedBuffer = buffer); + ChunkedPartDataSource capturedDataSource = null; + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) + .Callback((ds) => capturedDataSource = ds as ChunkedPartDataSource); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -222,13 +222,13 @@ public async Task ProcessPartAsync_OutOfOrderPart_BuffersToMemory() await handler.ProcessPartAsync(2, response, CancellationToken.None); // Assert - Assert.IsNotNull(capturedBuffer); - Assert.AreEqual(2, capturedBuffer.PartNumber); - Assert.AreEqual(512, capturedBuffer.Length); + Assert.IsNotNull(capturedDataSource); + Assert.AreEqual(2, capturedDataSource.PartNumber); - // Verify data was buffered correctly + // Verify data was buffered correctly by reading from the ChunkedPartDataSource byte[] bufferData = new byte[512]; - Buffer.BlockCopy(capturedBuffer.ArrayPoolBuffer, 0, bufferData, 0, 512); + int bytesRead = await capturedDataSource.ReadAsync(bufferData, 0, 512, CancellationToken.None); + Assert.AreEqual(512, bytesRead); Assert.IsTrue(MultipartDownloadTestHelpers.VerifyDataMatch(testData, bufferData, 0, 512)); } finally @@ -244,7 +244,7 @@ public async Task ProcessPartAsync_OutOfOrderPart_DisposesResponse() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -272,7 +272,7 @@ public async Task ProcessPartAsync_OutOfOrderPart_DoesNotReleaseCapacityImmediat var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -283,9 +283,9 @@ public async Task ProcessPartAsync_OutOfOrderPart_DoesNotReleaseCapacityImmediat // Act await handler.ProcessPartAsync(2, response, CancellationToken.None); - // Assert - AddBuffer should be called with StreamPartBuffer (not IPartDataSource) - mockBufferManager.Verify(m => m.AddBuffer( - It.IsAny()), Times.Once); + // Assert - AddDataSource should be called with ChunkedPartDataSource + mockBufferManager.Verify(m => m.AddDataSource( + It.IsAny()), Times.Once); // Note: Capacity will be released later when the buffer is consumed by the reader } @@ -313,7 +313,7 @@ public async Task ProcessPartAsync_MixedInOrderAndOutOfOrder_HandlesCorrectly() var streamingParts = 0; var bufferedParts = 0; - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) .Callback((ds) => { if (ds is StreamingDataSource) @@ -323,8 +323,8 @@ public async Task ProcessPartAsync_MixedInOrderAndOutOfOrder_HandlesCorrectly() } }); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) - .Callback((buffer) => bufferedParts++); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) + .Callback((ds) => bufferedParts++); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -356,8 +356,8 @@ public async Task ProcessPartAsync_InOrderFollowedByOutOfOrder_HandlesCorrectly( .Returns(1) .Returns(2); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -368,11 +368,11 @@ public async Task ProcessPartAsync_InOrderFollowedByOutOfOrder_HandlesCorrectly( await handler.ProcessPartAsync(3, CreateMockGetObjectResponse(512), CancellationToken.None); // Assert - mockBufferManager.Verify(m => m.AddBuffer( + mockBufferManager.Verify(m => m.AddDataSource( It.Is(ds => ds is StreamingDataSource && ds.PartNumber == 1)), Times.Once); - mockBufferManager.Verify(m => m.AddBuffer( - It.Is(b => b.PartNumber == 3)), Times.Once); + mockBufferManager.Verify(m => m.AddDataSource( + It.Is(ds => ds.PartNumber == 3)), Times.Once); } finally { @@ -392,8 +392,8 @@ public async Task ProcessPartAsync_OutOfOrderFollowedByInOrder_HandlesCorrectly( // Part 1 (in order): calls it twice, should return 1 both times mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -404,10 +404,10 @@ public async Task ProcessPartAsync_OutOfOrderFollowedByInOrder_HandlesCorrectly( await handler.ProcessPartAsync(1, CreateMockGetObjectResponse(512), CancellationToken.None); // Assert - mockBufferManager.Verify(m => m.AddBuffer( - It.Is(b => b.PartNumber == 2)), Times.Once); + mockBufferManager.Verify(m => m.AddDataSource( + It.Is(ds => ds.PartNumber == 2)), Times.Once); - mockBufferManager.Verify(m => m.AddBuffer( + mockBufferManager.Verify(m => m.AddDataSource( It.Is(ds => ds is StreamingDataSource && ds.PartNumber == 1)), Times.Once); } finally @@ -429,20 +429,20 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - // Capture StreamingDataSource additions (streaming path - NO ArrayPool allocation) - mockBufferManager.Setup(m => m.AddBuffer( + // Capture StreamingDataSource additions (streaming path - NO buffering) + mockBufferManager.Setup(m => m.AddDataSource( It.IsAny())) .Callback((ds) => { streamingPartNumbers.Add(ds.PartNumber); }); - // Capture StreamPartBuffer additions (buffering path - USES ArrayPool) - mockBufferManager.Setup(m => m.AddBuffer( - It.IsAny())) - .Callback((buffer) => + // Capture ChunkedPartDataSource additions (buffering path - uses ChunkedBufferStream) + mockBufferManager.Setup(m => m.AddDataSource( + It.IsAny())) + .Callback((ds) => { - bufferedPartNumbers.Add(buffer.PartNumber); + bufferedPartNumbers.Add(ds.PartNumber); }); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -456,11 +456,11 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin await handler.ProcessPartAsync(3, CreateMockGetObjectResponse(512), CancellationToken.None); // Assert - // Part 1 should use streaming path (no ArrayPool allocation) + // Part 1 should use streaming path (no buffering) Assert.AreEqual(1, streamingPartNumbers.Count, "Expected exactly 1 part to stream"); Assert.AreEqual(1, streamingPartNumbers[0], "Part 1 should stream directly"); - // Part 3 should use buffering path (ArrayPool allocation) + // Part 3 should use buffering path (ChunkedBufferStream) Assert.AreEqual(1, bufferedPartNumbers.Count, "Expected exactly 1 part to be buffered"); Assert.AreEqual(3, bufferedPartNumbers[0], "Part 3 should be buffered"); } @@ -485,7 +485,7 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming() .Returns(() => currentExpectedPart); // Capture streaming additions - mockBufferManager.Setup(m => m.AddBuffer( + mockBufferManager.Setup(m => m.AddDataSource( It.IsAny())) .Callback((ds) => { @@ -494,11 +494,11 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming() }); // Capture buffering additions - mockBufferManager.Setup(m => m.AddBuffer( - It.IsAny())) - .Callback((buffer) => + mockBufferManager.Setup(m => m.AddDataSource( + It.IsAny())) + .Callback((ds) => { - bufferedPartNumbers.Add(buffer.PartNumber); + bufferedPartNumbers.Add(ds.PartNumber); }); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -539,7 +539,7 @@ public async Task ProcessPartAsync_StreamingPathError_ReleasesCapacity() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) .Throws(new InvalidOperationException("Test error")); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -570,7 +570,7 @@ public async Task ProcessPartAsync_BufferingPathError_ReleasesCapacity() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockBufferManager = new Mock(); mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) .Throws(new InvalidOperationException("Test error")); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -779,7 +779,7 @@ public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce() mockBufferManager.Setup(m => m.ReleaseBufferSpace()) .Callback(() => releaseCount++); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -797,8 +797,8 @@ public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce() "ProcessPartAsync should not release capacity for streaming parts. " + "Capacity is released by PartBufferManager when consumer completes reading."); - // Verify AddBuffer was called with StreamingDataSource (streaming path taken) - mockBufferManager.Verify(m => m.AddBuffer( + // Verify AddDataSource was called with StreamingDataSource (streaming path taken) + mockBufferManager.Verify(m => m.AddDataSource( It.Is(ds => ds is StreamingDataSource)), Times.Once); } finally @@ -822,7 +822,7 @@ public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediatel mockBufferManager.Setup(m => m.ReleaseBufferSpace()) .Callback(() => releaseCount++); - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -839,9 +839,9 @@ public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediatel "ProcessPartAsync should not release capacity for buffered parts. " + "Capacity is released by PartBufferManager when consumer completes reading."); - // Verify AddBuffer was called with StreamPartBuffer (buffering path taken) - mockBufferManager.Verify(m => m.AddBuffer( - It.IsAny()), Times.Once); + // Verify AddDataSource was called with ChunkedPartDataSource (buffering path taken) + mockBufferManager.Verify(m => m.AddDataSource( + It.IsAny()), Times.Once); } finally { @@ -865,7 +865,7 @@ public async Task ProcessPartAsync_StreamingPartError_DoesNotDoubleRelease() .Callback(() => releaseCount++); // Simulate error when adding buffer - mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + mockBufferManager.Setup(m => m.AddDataSource(It.IsAny())) .Throws(new InvalidOperationException("Test error")); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); diff --git a/sdk/test/Services/S3/UnitTests/Custom/ChunkedBufferStreamTests.cs b/sdk/test/Services/S3/UnitTests/Custom/ChunkedBufferStreamTests.cs new file mode 100644 index 000000000000..68689e4a417a --- /dev/null +++ b/sdk/test/Services/S3/UnitTests/Custom/ChunkedBufferStreamTests.cs @@ -0,0 +1,864 @@ +using Amazon.S3.Transfer.Internal; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace AWSSDK.UnitTests +{ + /// + /// Unit tests for ChunkedBufferStream class. + /// Tests ArrayPool-based chunked buffer management and Stream interface compliance. + /// + [TestClass] + public class ChunkedBufferStreamTests + { + #region Creation and Mode Tests + + [TestMethod] + public void Constructor_InitializesInWriteMode() + { + // Arrange & Act + using (var stream = new ChunkedBufferStream()) + { + // Assert + Assert.IsTrue(stream.CanWrite); + Assert.IsFalse(stream.CanRead); + Assert.IsFalse(stream.CanSeek); + Assert.AreEqual(0, stream.Length); + Assert.AreEqual(0, stream.Position); + } + } + + [TestMethod] + public void SwitchToReadMode_TransitionsCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = Encoding.UTF8.GetBytes("Test data for read mode"); + stream.Write(testData, 0, testData.Length); + + // Act + stream.SwitchToReadMode(); + + // Assert + Assert.IsTrue(stream.CanRead); + Assert.IsFalse(stream.CanWrite); + Assert.IsFalse(stream.CanSeek); + Assert.AreEqual(0, stream.Position); // Position reset to 0 + Assert.AreEqual(testData.Length, stream.Length); + } + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void SwitchToReadMode_CalledTwice_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + stream.SwitchToReadMode(); + + // Act - Should throw + stream.SwitchToReadMode(); + } + } + + [TestMethod] + [ExpectedException(typeof(NotSupportedException))] + public void Write_AfterSwitchToReadMode_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + stream.SwitchToReadMode(); + byte[] testData = new byte[100]; + + // Act - Should throw + stream.Write(testData, 0, testData.Length); + } + } + + #endregion + + #region Write Operation Tests + + [TestMethod] + public void Write_SingleChunk_WritesCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[1024]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + // Act + stream.Write(testData, 0, testData.Length); + + // Assert + Assert.AreEqual(testData.Length, stream.Length); + Assert.AreEqual(testData.Length, stream.Position); + + // Verify data by reading back + stream.SwitchToReadMode(); + byte[] readBuffer = new byte[testData.Length]; + int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length); + + Assert.AreEqual(testData.Length, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public void Write_MultipleChunks_WritesCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Write 200KB of data (spans multiple 80KB chunks) + int totalSize = 200 * 1024; + byte[] testData = new byte[totalSize]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + // Act + stream.Write(testData, 0, testData.Length); + + // Assert + Assert.AreEqual(totalSize, stream.Length); + + // Verify data by reading back + stream.SwitchToReadMode(); + byte[] readBuffer = new byte[totalSize]; + int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length); + + Assert.AreEqual(totalSize, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public void Write_AtChunkBoundary_WritesCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Write exactly 80KB (one chunk size) + int chunkSize = 81920; // 80KB + byte[] testData = new byte[chunkSize]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + // Act + stream.Write(testData, 0, testData.Length); + + // Assert + Assert.AreEqual(chunkSize, stream.Length); + + // Write one more byte to trigger next chunk + byte[] oneByte = new byte[] { 42 }; + stream.Write(oneByte, 0, 1); + Assert.AreEqual(chunkSize + 1, stream.Length); + + // Verify data + stream.SwitchToReadMode(); + byte[] readBuffer = new byte[chunkSize + 1]; + int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length); + + Assert.AreEqual(chunkSize + 1, bytesRead); + for (int i = 0; i < chunkSize; i++) + Assert.AreEqual(testData[i], readBuffer[i]); + Assert.AreEqual(42, readBuffer[chunkSize]); + } + } + + [TestMethod] + public async Task WriteAsync_DelegatesToWrite() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = Encoding.UTF8.GetBytes("Test async write"); + + // Act + await stream.WriteAsync(testData, 0, testData.Length, CancellationToken.None); + + // Assert + Assert.AreEqual(testData.Length, stream.Length); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void Write_NullBuffer_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Act - Should throw + stream.Write(null, 0, 100); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public void Write_NegativeOffset_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] buffer = new byte[100]; + + // Act - Should throw + stream.Write(buffer, -1, 50); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public void Write_NegativeCount_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] buffer = new byte[100]; + + // Act - Should throw + stream.Write(buffer, 0, -1); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void Write_OffsetAndCountExceedBufferBounds_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] buffer = new byte[100]; + + // Act - Should throw + stream.Write(buffer, 50, 60); // 50 + 60 = 110 > 100 + } + } + + #endregion + + #region Read Operation Tests + + [TestMethod] + public void Read_SingleChunk_ReadsCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[1024]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act + byte[] readBuffer = new byte[testData.Length]; + int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length); + + // Assert + Assert.AreEqual(testData.Length, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + Assert.AreEqual(testData.Length, stream.Position); + } + } + + [TestMethod] + public void Read_AcrossMultipleChunks_ReadsCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Write 200KB spanning multiple chunks + int totalSize = 200 * 1024; + byte[] testData = new byte[totalSize]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act - Read in one go + byte[] readBuffer = new byte[totalSize]; + int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length); + + // Assert + Assert.AreEqual(totalSize, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public void Read_InMultipleChunks_ReadsCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[10000]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act - Read in multiple smaller reads + byte[] readBuffer = new byte[testData.Length]; + int totalRead = 0; + int chunkSize = 1000; + + while (totalRead < testData.Length) + { + int bytesRead = stream.Read(readBuffer, totalRead, Math.Min(chunkSize, testData.Length - totalRead)); + totalRead += bytesRead; + } + + // Assert + Assert.AreEqual(testData.Length, totalRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public void Read_AtEndOfStream_ReturnsZero() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[100]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Read all data + byte[] readBuffer = new byte[100]; + stream.Read(readBuffer, 0, 100); + + // Act - Try to read more + int bytesRead = stream.Read(readBuffer, 0, 100); + + // Assert + Assert.AreEqual(0, bytesRead); + Assert.AreEqual(100, stream.Position); + } + } + + [TestMethod] + public void Read_PartialData_ReadsAvailableBytes() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[50]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)i; + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act - Try to read more than available + byte[] readBuffer = new byte[100]; + int bytesRead = stream.Read(readBuffer, 0, 100); + + // Assert - Should only read 50 bytes + Assert.AreEqual(50, bytesRead); + for (int i = 0; i < 50; i++) + Assert.AreEqual(testData[i], readBuffer[i]); + } + } + + [TestMethod] + public async Task ReadAsync_DelegatesToRead() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = Encoding.UTF8.GetBytes("Test async read"); + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act + byte[] readBuffer = new byte[testData.Length]; + int bytesRead = await stream.ReadAsync(readBuffer, 0, readBuffer.Length, CancellationToken.None); + + // Assert + Assert.AreEqual(testData.Length, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void Read_NullBuffer_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + stream.SwitchToReadMode(); + + // Act - Should throw + stream.Read(null, 0, 100); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public void Read_NegativeOffset_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + stream.SwitchToReadMode(); + byte[] buffer = new byte[100]; + + // Act - Should throw + stream.Read(buffer, -1, 50); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public void Read_NegativeCount_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + stream.SwitchToReadMode(); + byte[] buffer = new byte[100]; + + // Act - Should throw + stream.Read(buffer, 0, -1); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void Read_OffsetAndCountExceedBufferBounds_ThrowsException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + stream.SwitchToReadMode(); + byte[] buffer = new byte[100]; + + // Act - Should throw + stream.Read(buffer, 50, 60); // 50 + 60 = 110 > 100 + } + } + + #endregion + + #region Property Tests + + [TestMethod] + public void CanRead_ReflectsCurrentMode() + { + // Arrange & Act + using (var stream = new ChunkedBufferStream()) + { + // Assert - In write mode + Assert.IsFalse(stream.CanRead); + + // Switch to read mode + stream.SwitchToReadMode(); + Assert.IsTrue(stream.CanRead); + } + } + + [TestMethod] + public void CanWrite_ReflectsCurrentMode() + { + // Arrange & Act + using (var stream = new ChunkedBufferStream()) + { + // Assert - In write mode + Assert.IsTrue(stream.CanWrite); + + // Switch to read mode + stream.SwitchToReadMode(); + Assert.IsFalse(stream.CanWrite); + } + } + + [TestMethod] + public void CanSeek_AlwaysReturnsFalse() + { + // Arrange & Act + using (var stream = new ChunkedBufferStream()) + { + // Assert - In write mode + Assert.IsFalse(stream.CanSeek); + + // Switch to read mode + stream.SwitchToReadMode(); + Assert.IsFalse(stream.CanSeek); + } + } + + [TestMethod] + public void Length_ReturnsCorrectValue() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Assert - Initially zero + Assert.AreEqual(0, stream.Length); + + // Write some data + byte[] testData = new byte[5000]; + stream.Write(testData, 0, testData.Length); + + // Assert - After writing + Assert.AreEqual(5000, stream.Length); + + // Switch to read mode + stream.SwitchToReadMode(); + + // Assert - Length unchanged + Assert.AreEqual(5000, stream.Length); + } + } + + [TestMethod] + public void Position_GetReturnsCorrectValue() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + + // Assert - Position at end after write + Assert.AreEqual(1000, stream.Position); + + // Switch to read mode + stream.SwitchToReadMode(); + + // Assert - Position reset to 0 + Assert.AreEqual(0, stream.Position); + + // Read some data + byte[] readBuffer = new byte[300]; + stream.Read(readBuffer, 0, 300); + + // Assert - Position updated after read + Assert.AreEqual(300, stream.Position); + } + } + + [TestMethod] + [ExpectedException(typeof(NotSupportedException))] + public void Position_Set_ThrowsNotSupportedException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Act - Should throw + stream.Position = 100; + } + } + + #endregion + + #region Not Supported Operations + + [TestMethod] + [ExpectedException(typeof(NotSupportedException))] + public void Seek_ThrowsNotSupportedException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Act - Should throw + stream.Seek(0, SeekOrigin.Begin); + } + } + + [TestMethod] + [ExpectedException(typeof(NotSupportedException))] + public void SetLength_ThrowsNotSupportedException() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Act - Should throw + stream.SetLength(1000); + } + } + + [TestMethod] + public void Flush_DoesNotThrow() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + byte[] testData = new byte[100]; + stream.Write(testData, 0, testData.Length); + + // Act - Should not throw + stream.Flush(); + + // Assert - No-op, just verify it doesn't throw + Assert.AreEqual(100, stream.Length); + } + } + + #endregion + + #region Disposal Tests + + [TestMethod] + public void Dispose_ReturnsChunksToArrayPool() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[100000]; // Enough to allocate chunks + stream.Write(testData, 0, testData.Length); + + // Act + stream.Dispose(); + + // Assert - Accessing properties should throw ObjectDisposedException + try + { + var length = stream.Length; + Assert.Fail("Expected ObjectDisposedException"); + } + catch (ObjectDisposedException) + { + // Expected + } + } + + [TestMethod] + public void Dispose_MultipleCalls_IsIdempotent() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + + // Act - Dispose multiple times + stream.Dispose(); + stream.Dispose(); + stream.Dispose(); + + // Assert - Should not throw + } + + [TestMethod] + [ExpectedException(typeof(ObjectDisposedException))] + public void Write_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.Dispose(); + + // Act - Should throw + byte[] testData = new byte[100]; + stream.Write(testData, 0, testData.Length); + } + + [TestMethod] + [ExpectedException(typeof(ObjectDisposedException))] + public void Read_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.SwitchToReadMode(); + stream.Dispose(); + + // Act - Should throw + byte[] buffer = new byte[100]; + stream.Read(buffer, 0, 100); + } + + [TestMethod] + [ExpectedException(typeof(ObjectDisposedException))] + public void Length_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.Dispose(); + + // Act - Should throw + var length = stream.Length; + } + + [TestMethod] + [ExpectedException(typeof(ObjectDisposedException))] + public void Position_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.Dispose(); + + // Act - Should throw + var position = stream.Position; + } + + [TestMethod] + [ExpectedException(typeof(ObjectDisposedException))] + public void SwitchToReadMode_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.Dispose(); + + // Act - Should throw + stream.SwitchToReadMode(); + } + + #endregion + + #region Edge Cases + + [TestMethod] + public void EmptyStream_SwitchToReadMode_WorksCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Act - Don't write any data, just switch to read mode + stream.SwitchToReadMode(); + + // Assert + Assert.AreEqual(0, stream.Length); + Assert.AreEqual(0, stream.Position); + Assert.IsTrue(stream.CanRead); + + // Read should return 0 + byte[] buffer = new byte[100]; + int bytesRead = stream.Read(buffer, 0, 100); + Assert.AreEqual(0, bytesRead); + } + } + + [TestMethod] + public void Write_AtExactChunkBoundaries_HandlesCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + int chunkSize = 81920; // 80KB + + // Write exactly 2 chunks + byte[] chunk1 = new byte[chunkSize]; + byte[] chunk2 = new byte[chunkSize]; + + for (int i = 0; i < chunkSize; i++) + { + chunk1[i] = 1; + chunk2[i] = 2; + } + + // Act + stream.Write(chunk1, 0, chunkSize); + stream.Write(chunk2, 0, chunkSize); + + // Assert + Assert.AreEqual(chunkSize * 2, stream.Length); + + // Verify data + stream.SwitchToReadMode(); + byte[] readBuffer = new byte[chunkSize * 2]; + int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length); + + Assert.AreEqual(chunkSize * 2, bytesRead); + + // Verify first chunk + for (int i = 0; i < chunkSize; i++) + Assert.AreEqual(1, readBuffer[i]); + + // Verify second chunk + for (int i = chunkSize; i < chunkSize * 2; i++) + Assert.AreEqual(2, readBuffer[i]); + } + } + + [TestMethod] + public void Read_AtChunkBoundary_HandlesCorrectly() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + int chunkSize = 81920; // 80KB + + // Write 2.5 chunks worth of data + byte[] testData = new byte[chunkSize * 2 + chunkSize / 2]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act - Read exactly one chunk + byte[] readBuffer1 = new byte[chunkSize]; + int bytesRead1 = stream.Read(readBuffer1, 0, chunkSize); + + // Act - Read next chunk + byte[] readBuffer2 = new byte[chunkSize]; + int bytesRead2 = stream.Read(readBuffer2, 0, chunkSize); + + // Act - Read remaining + byte[] readBuffer3 = new byte[chunkSize]; + int bytesRead3 = stream.Read(readBuffer3, 0, chunkSize); + + // Assert + Assert.AreEqual(chunkSize, bytesRead1); + Assert.AreEqual(chunkSize, bytesRead2); + Assert.AreEqual(chunkSize / 2, bytesRead3); + + // Verify data integrity + for (int i = 0; i < chunkSize; i++) + Assert.AreEqual((byte)(i % 256), readBuffer1[i]); + + for (int i = 0; i < chunkSize; i++) + Assert.AreEqual((byte)((chunkSize + i) % 256), readBuffer2[i]); + + for (int i = 0; i < chunkSize / 2; i++) + Assert.AreEqual((byte)((chunkSize * 2 + i) % 256), readBuffer3[i]); + } + } + + [TestMethod] + public void LargeData_HandlesMultipleChunks() + { + // Arrange + using (var stream = new ChunkedBufferStream()) + { + // Write 500KB of data (spans ~6 chunks) + int totalSize = 500 * 1024; + byte[] testData = new byte[totalSize]; + var random = new Random(42); // Fixed seed for reproducibility + random.NextBytes(testData); + + // Act - Write + stream.Write(testData, 0, testData.Length); + + // Assert - Length + Assert.AreEqual(totalSize, stream.Length); + + // Act - Read back + stream.SwitchToReadMode(); + byte[] readBuffer = new byte[totalSize]; + int bytesRead = stream.Read(readBuffer, 0, totalSize); + + // Assert - Data integrity + Assert.AreEqual(totalSize, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + #endregion + } +} diff --git a/sdk/test/Services/S3/UnitTests/Custom/ChunkedPartDataSourceTests.cs b/sdk/test/Services/S3/UnitTests/Custom/ChunkedPartDataSourceTests.cs new file mode 100644 index 000000000000..e920c1db9537 --- /dev/null +++ b/sdk/test/Services/S3/UnitTests/Custom/ChunkedPartDataSourceTests.cs @@ -0,0 +1,602 @@ +using Amazon.S3.Transfer.Internal; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace AWSSDK.UnitTests +{ + /// + /// Unit tests for ChunkedPartDataSource class. + /// Tests IPartDataSource implementation wrapping ChunkedBufferStream. + /// + [TestClass] + public class ChunkedPartDataSourceTests + { + #region Creation Tests + + [TestMethod] + public void Constructor_WithValidStream_CreatesDataSource() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = Encoding.UTF8.GetBytes("Test data for part"); + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + // Act + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Assert + Assert.AreEqual(1, dataSource.PartNumber); + Assert.IsFalse(dataSource.IsComplete); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void Constructor_WithNullStream_ThrowsException() + { + // Act - Should throw + var dataSource = new ChunkedPartDataSource(1, null); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void Constructor_WithStreamNotInReadMode_ThrowsException() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = Encoding.UTF8.GetBytes("Test data"); + stream.Write(testData, 0, testData.Length); + // Don't call SwitchToReadMode() + + // Act - Should throw + var dataSource = new ChunkedPartDataSource(1, stream); + } + + [TestMethod] + public void Constructor_SetsPartNumberCorrectly() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.SwitchToReadMode(); + + // Act + using (var dataSource = new ChunkedPartDataSource(5, stream)) + { + // Assert + Assert.AreEqual(5, dataSource.PartNumber); + } + } + + #endregion + + #region ReadAsync Tests + + [TestMethod] + public async Task ReadAsync_DelegatesToUnderlyingStream() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = Encoding.UTF8.GetBytes("Test data for reading"); + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act + byte[] readBuffer = new byte[testData.Length]; + int bytesRead = await dataSource.ReadAsync(readBuffer, 0, readBuffer.Length, CancellationToken.None); + + // Assert + Assert.AreEqual(testData.Length, bytesRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public async Task ReadAsync_ReturnsCorrectByteCount() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act + byte[] readBuffer = new byte[500]; + int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 500, CancellationToken.None); + + // Assert + Assert.AreEqual(500, bytesRead); + for (int i = 0; i < 500; i++) + Assert.AreEqual(testData[i], readBuffer[i]); + } + } + + [TestMethod] + public async Task ReadAsync_HandlesPartialReads() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[100]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)i; + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act - Try to read more than available + byte[] readBuffer = new byte[200]; + int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 200, CancellationToken.None); + + // Assert - Should only read 100 bytes + Assert.AreEqual(100, bytesRead); + for (int i = 0; i < 100; i++) + Assert.AreEqual(testData[i], readBuffer[i]); + } + } + + [TestMethod] + public async Task ReadAsync_RespectsCancellationToken() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = Encoding.UTF8.GetBytes("Test cancellation"); + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + var cts = new CancellationTokenSource(); + cts.Cancel(); // Cancel immediately + + // Act & Assert + byte[] readBuffer = new byte[100]; + try + { + await dataSource.ReadAsync(readBuffer, 0, 100, cts.Token); + Assert.Fail("Expected OperationCanceledException"); + } + catch (OperationCanceledException) + { + // Expected + } + } + } + + [TestMethod] + public async Task ReadAsync_WorksWithVariousBufferSizes() + { + // Arrange + var stream = new ChunkedBufferStream(); + int totalSize = 10000; + byte[] testData = new byte[totalSize]; + for (int i = 0; i < totalSize; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act - Read in varying chunk sizes + byte[] readBuffer = new byte[totalSize]; + int totalRead = 0; + int[] chunkSizes = { 100, 500, 1000, 2000, 6400 }; // Various sizes + + foreach (int chunkSize in chunkSizes) + { + int bytesRead = await dataSource.ReadAsync(readBuffer, totalRead, chunkSize, CancellationToken.None); + totalRead += bytesRead; + } + + // Assert + Assert.AreEqual(totalSize, totalRead); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public async Task ReadAsync_ReturnsZeroAtEndOfStream() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[100]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Read all data + byte[] readBuffer = new byte[100]; + await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None); + + // Act - Try to read more + int bytesRead = await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None); + + // Assert + Assert.AreEqual(0, bytesRead); + } + } + + #endregion + + #region IsComplete Property Tests + + [TestMethod] + public void IsComplete_ReturnsFalse_WhenDataRemains() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Assert + Assert.IsFalse(dataSource.IsComplete); + } + } + + [TestMethod] + public async Task IsComplete_ReturnsTrue_WhenFullyConsumed() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[100]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act - Read all data + byte[] readBuffer = new byte[100]; + await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None); + + // Assert + Assert.IsTrue(dataSource.IsComplete); + } + } + + [TestMethod] + public async Task IsComplete_UpdatesCorrectly_AsDataIsRead() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[300]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Assert - Initially not complete + Assert.IsFalse(dataSource.IsComplete); + + // Act - Read partial data + byte[] readBuffer = new byte[100]; + await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None); + + // Assert - Still not complete + Assert.IsFalse(dataSource.IsComplete); + + // Act - Read more data + await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None); + + // Assert - Still not complete + Assert.IsFalse(dataSource.IsComplete); + + // Act - Read remaining data + await dataSource.ReadAsync(readBuffer, 0, 100, CancellationToken.None); + + // Assert - Now complete + Assert.IsTrue(dataSource.IsComplete); + } + } + + [TestMethod] + public void IsComplete_ReturnsTrue_ForEmptyStream() + { + // Arrange + var stream = new ChunkedBufferStream(); + stream.SwitchToReadMode(); // Empty stream + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Assert - Empty stream is considered "complete" since Position == Length (both 0) + Assert.IsTrue(dataSource.IsComplete); + } + } + + #endregion + + #region Disposal Tests + + [TestMethod] + public void Dispose_ReleasesUnderlyingStream() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + var dataSource = new ChunkedPartDataSource(1, stream); + + // Act + dataSource.Dispose(); + + // Assert - Stream should be disposed, accessing it should throw + try + { + var length = stream.Length; + Assert.Fail("Expected ObjectDisposedException"); + } + catch (ObjectDisposedException) + { + // Expected + } + } + + [TestMethod] + public void Dispose_MultipleCalls_AreSafe() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + var dataSource = new ChunkedPartDataSource(1, stream); + + // Act - Dispose multiple times + dataSource.Dispose(); + dataSource.Dispose(); + dataSource.Dispose(); + + // Assert - Should not throw + } + + [TestMethod] + public void Dispose_ReturnsChunksToArrayPool() + { + // Arrange + var stream = new ChunkedBufferStream(); + // Write enough data to allocate multiple chunks + byte[] testData = new byte[200 * 1024]; // 200KB + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + var dataSource = new ChunkedPartDataSource(1, stream); + + // Act + dataSource.Dispose(); + + // Assert - Stream disposed, chunks returned to pool + // We can't directly verify ArrayPool state, but we can verify disposal + try + { + var length = stream.Length; + Assert.Fail("Expected ObjectDisposedException"); + } + catch (ObjectDisposedException) + { + // Expected - confirms disposal happened + } + } + + #endregion + + #region ToString Tests + + [TestMethod] + public void ToString_ReturnsExpectedFormat() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1500]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(3, stream)) + { + // Act + string result = dataSource.ToString(); + + // Assert - Verify format contains key information + Assert.IsTrue(result.Contains("Part=3")); + Assert.IsTrue(result.Contains("1500 bytes")); + Assert.IsTrue(result.Contains("Position=0")); + } + } + + [TestMethod] + public async Task ToString_ReflectsCurrentPosition() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(2, stream)) + { + // Read 300 bytes + byte[] readBuffer = new byte[300]; + await dataSource.ReadAsync(readBuffer, 0, 300, CancellationToken.None); + + // Act + string result = dataSource.ToString(); + + // Assert - Position should be reflected + Assert.IsTrue(result.Contains("Part=2")); + Assert.IsTrue(result.Contains("1000 bytes")); + Assert.IsTrue(result.Contains("Position=300")); + } + } + + #endregion + + #region Integration Tests + + [TestMethod] + public async Task FullReadCycle_FromCreationToDisposal() + { + // Arrange + var stream = new ChunkedBufferStream(); + int totalSize = 100 * 1024; // 100KB + byte[] testData = new byte[totalSize]; + for (int i = 0; i < totalSize; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + var dataSource = new ChunkedPartDataSource(1, stream); + + try + { + // Act - Read all data in chunks + byte[] readBuffer = new byte[totalSize]; + int totalRead = 0; + int chunkSize = 8192; // 8KB chunks + + while (!dataSource.IsComplete) + { + int bytesRead = await dataSource.ReadAsync( + readBuffer, + totalRead, + Math.Min(chunkSize, totalSize - totalRead), + CancellationToken.None); + + if (bytesRead == 0) + break; + + totalRead += bytesRead; + } + + // Assert + Assert.AreEqual(totalSize, totalRead); + Assert.IsTrue(dataSource.IsComplete); + CollectionAssert.AreEqual(testData, readBuffer); + } + finally + { + // Cleanup + dataSource.Dispose(); + } + } + + [TestMethod] + public async Task SequentialReading_HandlesCorrectly() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[5000]; + for (int i = 0; i < testData.Length; i++) + testData[i] = (byte)(i % 256); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act - Sequential reads with different sizes + byte[] readBuffer = new byte[5000]; + int offset = 0; + + // Read 1000 bytes + int read1 = await dataSource.ReadAsync(readBuffer, offset, 1000, CancellationToken.None); + offset += read1; + + // Read 2000 bytes + int read2 = await dataSource.ReadAsync(readBuffer, offset, 2000, CancellationToken.None); + offset += read2; + + // Read remaining + int read3 = await dataSource.ReadAsync(readBuffer, offset, 2000, CancellationToken.None); + offset += read3; + + // Assert + Assert.AreEqual(1000, read1); + Assert.AreEqual(2000, read2); + Assert.AreEqual(2000, read3); + Assert.AreEqual(5000, offset); + Assert.IsTrue(dataSource.IsComplete); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public async Task LargePartData_HandlesMultipleChunks() + { + // Arrange + var stream = new ChunkedBufferStream(); + // Write 300KB spanning multiple 80KB chunks + int totalSize = 300 * 1024; + byte[] testData = new byte[totalSize]; + var random = new Random(42); // Fixed seed for reproducibility + random.NextBytes(testData); + + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(1, stream)) + { + // Act - Read in 64KB chunks + byte[] readBuffer = new byte[totalSize]; + int totalRead = 0; + int chunkSize = 64 * 1024; + + while (totalRead < totalSize) + { + int bytesRead = await dataSource.ReadAsync( + readBuffer, + totalRead, + Math.Min(chunkSize, totalSize - totalRead), + CancellationToken.None); + + if (bytesRead == 0) + break; + + totalRead += bytesRead; + } + + // Assert + Assert.AreEqual(totalSize, totalRead); + Assert.IsTrue(dataSource.IsComplete); + CollectionAssert.AreEqual(testData, readBuffer); + } + } + + [TestMethod] + public void PartNumber_RemainsConstant() + { + // Arrange + var stream = new ChunkedBufferStream(); + byte[] testData = new byte[1000]; + stream.Write(testData, 0, testData.Length); + stream.SwitchToReadMode(); + + using (var dataSource = new ChunkedPartDataSource(7, stream)) + { + // Assert - Check multiple times + Assert.AreEqual(7, dataSource.PartNumber); + Assert.AreEqual(7, dataSource.PartNumber); + Assert.AreEqual(7, dataSource.PartNumber); + } + } + + #endregion + } +} diff --git a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs index 37ab24be0fb3..fa8506878171 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs @@ -80,10 +80,13 @@ public async Task NextExpectedPartNumber_IncrementsAfterPartComplete() try { // Add part 1 - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); // Read part 1 completely byte[] readBuffer = new byte[512]; @@ -137,9 +140,12 @@ public async Task WaitForBufferSpaceAsync_WhenMaxPartsReached_Blocks() for (int i = 1; i <= 2; i++) { await manager.WaitForBufferSpaceAsync(CancellationToken.None); - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(i, testBuffer, 512); - manager.AddBuffer(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(i, chunkedStream); + manager.AddDataSource(dataSource); } // Act - Try to wait for space (should block) @@ -172,9 +178,12 @@ public async Task WaitForBufferSpaceAsync_AfterRelease_AllowsAccess() { // Take the one available slot await manager.WaitForBufferSpaceAsync(CancellationToken.None); - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - manager.AddBuffer(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); + manager.AddDataSource(dataSource); // Release space manager.ReleaseBufferSpace(); @@ -234,7 +243,7 @@ public async Task WaitForBufferSpaceAsync_WithCancellation_ThrowsOperationCancel #region AddBuffer Tests [TestMethod] - public async Task AddBuffer_CreatesBufferedDataSource() + public async Task AddBuffer_CreatesChunkedPartDataSource() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -242,13 +251,16 @@ public async Task AddBuffer_CreatesBufferedDataSource() try { - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); // Act - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); // Assert - Should be able to read from part 1 byte[] readBuffer = new byte[512]; @@ -272,7 +284,7 @@ public void AddBuffer_WithNullBuffer_ThrowsArgumentNullException() try { // Act - manager.AddBuffer((IPartDataSource)null); + manager.AddDataSource((IPartDataSource)null); // Assert - ExpectedException } @@ -302,10 +314,13 @@ public async Task AddBuffer_SignalsPartAvailable() await Task.Delay(50); // Add the part - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); // Assert - Read should complete int bytesRead = await readTask; @@ -330,9 +345,11 @@ public async Task AddDataSource_AddsToCollection() try { - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var dataSource = new BufferedDataSource(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); // Act await manager.WaitForBufferSpaceAsync(CancellationToken.None); @@ -372,7 +389,7 @@ public void AddDataSource_WithNullDataSource_ThrowsArgumentNullException() [TestMethod] [ExpectedException(typeof(InvalidOperationException))] - public void AddDataSource_WithDuplicatePartNumber_ThrowsInvalidOperationException() + public async Task AddDataSource_WithDuplicatePartNumber_ThrowsInvalidOperationException() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -381,15 +398,19 @@ public void AddDataSource_WithDuplicatePartNumber_ThrowsInvalidOperationExceptio try { // Add part 1 - byte[] testBuffer1 = ArrayPool.Shared.Rent(512); - var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 512); - var dataSource1 = new BufferedDataSource(partBuffer1); + byte[] testData1 = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream1 = new ChunkedBufferStream(); + await chunkedStream1.WriteAsync(testData1, 0, 512); + chunkedStream1.SwitchToReadMode(); + var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1); manager.AddDataSource(dataSource1); // Try to add duplicate part 1 - byte[] testBuffer2 = ArrayPool.Shared.Rent(512); - var partBuffer2 = new StreamPartBuffer(1, testBuffer2, 512); - var dataSource2 = new BufferedDataSource(partBuffer2); + byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream2 = new ChunkedBufferStream(); + await chunkedStream2.WriteAsync(testData2, 0, 512); + chunkedStream2.SwitchToReadMode(); + var dataSource2 = new ChunkedPartDataSource(1, chunkedStream2); // Act manager.AddDataSource(dataSource2); @@ -416,12 +437,12 @@ public async Task ReadAsync_ReadsDataSequentially() try { byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); - byte[] testBuffer = ArrayPool.Shared.Rent(512); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); - - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); // Act byte[] readBuffer = new byte[512]; @@ -447,10 +468,13 @@ public async Task ReadAsync_AdvancesNextExpectedPartNumber() try { // Add part 1 - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); // Read part 1 completely byte[] readBuffer = new byte[512]; @@ -581,10 +605,13 @@ public async Task ReadAsync_WaitsForPartAvailability() Assert.IsFalse(readTask.IsCompleted); // Add the part asynchronously - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); // Assert - Read should complete int bytesRead = await readTask; @@ -662,19 +689,21 @@ public async Task ReadAsync_ReadingAcrossPartBoundary_FillsBuffer() { // Add Part 1 (100 bytes) byte[] testData1 = MultipartDownloadTestHelpers.GenerateTestData(100, 0); - byte[] testBuffer1 = ArrayPool.Shared.Rent(100); - Buffer.BlockCopy(testData1, 0, testBuffer1, 0, 100); - var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100); + var chunkedStream1 = new ChunkedBufferStream(); + await chunkedStream1.WriteAsync(testData1, 0, 100); + chunkedStream1.SwitchToReadMode(); + var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer1); + manager.AddDataSource(dataSource1); // Add Part 2 (100 bytes) byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 100); - byte[] testBuffer2 = ArrayPool.Shared.Rent(100); - Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100); - var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); + var chunkedStream2 = new ChunkedBufferStream(); + await chunkedStream2.WriteAsync(testData2, 0, 100); + chunkedStream2.SwitchToReadMode(); + var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer2); + manager.AddDataSource(dataSource2); // Act - Request 150 bytes (spans both parts) byte[] readBuffer = new byte[150]; @@ -711,11 +740,12 @@ public async Task ReadAsync_MultiplePartsInSingleRead_AdvancesCorrectly() for (int i = 1; i <= 3; i++) { byte[] testData = MultipartDownloadTestHelpers.GeneratePartSpecificData(50, i); - byte[] testBuffer = ArrayPool.Shared.Rent(50); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 50); - var partBuffer = new StreamPartBuffer(i, testBuffer, 50); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 50); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(i, chunkedStream); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer); + manager.AddDataSource(dataSource); } // Act - Read 150 bytes (all 3 parts) @@ -742,10 +772,13 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart() try { // Add part 1 - byte[] testBuffer1 = ArrayPool.Shared.Rent(100); - var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100); + byte[] testData1 = MultipartDownloadTestHelpers.GenerateTestData(100, 0); + var chunkedStream1 = new ChunkedBufferStream(); + await chunkedStream1.WriteAsync(testData1, 0, 100); + chunkedStream1.SwitchToReadMode(); + var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer1); + manager.AddDataSource(dataSource1); // Read part 1 completely byte[] readBuffer = new byte[100]; @@ -755,10 +788,13 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart() Assert.AreEqual(2, manager.NextExpectedPartNumber); // Add part 2 - byte[] testBuffer2 = ArrayPool.Shared.Rent(100); - var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); + byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 0); + var chunkedStream2 = new ChunkedBufferStream(); + await chunkedStream2.WriteAsync(testData2, 0, 100); + chunkedStream2.SwitchToReadMode(); + var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer2); + manager.AddDataSource(dataSource2); // Read part 2 int bytesRead = await manager.ReadAsync(readBuffer, 0, 100, CancellationToken.None); @@ -783,18 +819,21 @@ public async Task ReadAsync_EmptyPart_ContinuesToNextPart() try { // Add empty part 1 - byte[] testBuffer1 = ArrayPool.Shared.Rent(100); - var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 0); // 0 bytes + var chunkedStream1 = new ChunkedBufferStream(); + // Write 0 bytes - empty part + chunkedStream1.SwitchToReadMode(); + var dataSource1 = new ChunkedPartDataSource(1, chunkedStream1); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer1); + manager.AddDataSource(dataSource1); // Add part 2 with data byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 0); - byte[] testBuffer2 = ArrayPool.Shared.Rent(100); - Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100); - var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); + var chunkedStream2 = new ChunkedBufferStream(); + await chunkedStream2.WriteAsync(testData2, 0, 100); + chunkedStream2.SwitchToReadMode(); + var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer2); + manager.AddDataSource(dataSource2); // Act - Try to read 100 bytes starting from part 1 byte[] readBuffer = new byte[100]; @@ -975,7 +1014,7 @@ public async Task AddBufferAsync_IPartDataSource_WithStreamingDataSource_AddsSuc // Act await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(streamingSource); + manager.AddDataSource(streamingSource); // Assert - Should be able to read from part 1 byte[] readBuffer = new byte[512]; @@ -990,7 +1029,7 @@ public async Task AddBufferAsync_IPartDataSource_WithStreamingDataSource_AddsSuc } [TestMethod] - public async Task AddBufferAsync_IPartDataSource_WithBufferedDataSource_AddsSuccessfully() + public async Task AddBufferAsync_IPartDataSource_WithChunkedPartDataSource_AddsSuccessfully() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -998,16 +1037,16 @@ public async Task AddBufferAsync_IPartDataSource_WithBufferedDataSource_AddsSucc try { - // Create a BufferedDataSource + // Create a ChunkedPartDataSource byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); - byte[] testBuffer = ArrayPool.Shared.Rent(512); - Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - var bufferedSource = new BufferedDataSource(partBuffer); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var chunkedSource = new ChunkedPartDataSource(1, chunkedStream); // Act await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(bufferedSource); + manager.AddDataSource(chunkedSource); // Assert - Should be able to read from part 1 byte[] readBuffer = new byte[512]; @@ -1032,7 +1071,7 @@ public void AddBufferAsync_IPartDataSource_WithNull_ThrowsArgumentNullException( try { // Act - manager.AddBuffer((IPartDataSource)null); + manager.AddDataSource((IPartDataSource)null); // Assert - ExpectedException } @@ -1072,7 +1111,7 @@ public async Task AddBufferAsync_IPartDataSource_SignalsPartAvailable() // Act await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(streamingSource); + manager.AddDataSource(streamingSource); // Assert - Read should complete int bytesRead = await readTask; @@ -1106,7 +1145,7 @@ public async Task ReadAsync_FromStreamingDataSource_ReadsCorrectly() }; var streamingSource = new StreamingDataSource(1, response); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(streamingSource); + manager.AddDataSource(streamingSource); // Act - Read in multiple chunks byte[] readBuffer = new byte[400]; @@ -1146,15 +1185,16 @@ public async Task ReadAsync_FromMixedSources_ReadsSequentially() }; var streamingSource = new StreamingDataSource(1, response1); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer((IPartDataSource)streamingSource); + manager.AddDataSource((IPartDataSource)streamingSource); // Add buffered source for part 2 var testData2 = MultipartDownloadTestHelpers.GenerateTestData(500, 500); - byte[] testBuffer2 = ArrayPool.Shared.Rent(500); - Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 500); - var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 500); + var chunkedStream2 = new ChunkedBufferStream(); + await chunkedStream2.WriteAsync(testData2, 0, 500); + chunkedStream2.SwitchToReadMode(); + var dataSource2 = new ChunkedPartDataSource(2, chunkedStream2); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(partBuffer2); + manager.AddDataSource(dataSource2); // Act - Read across both parts byte[] readBuffer = new byte[750]; @@ -1195,7 +1235,7 @@ public async Task ReadAsync_StreamingDataSource_DisposesAfterCompletion() }; var streamingSource = new StreamingDataSource(1, response); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(streamingSource); + manager.AddDataSource(streamingSource); // Act - Read all data byte[] readBuffer = new byte[512]; @@ -1231,7 +1271,7 @@ public async Task ReadAsync_MultipleStreamingSources_ReadsSequentially() }; var streamingSource = new StreamingDataSource(i, response); await manager.WaitForBufferSpaceAsync(CancellationToken.None); - manager.AddBuffer(streamingSource); + manager.AddDataSource(streamingSource); } // Act - Read across all parts @@ -1253,33 +1293,39 @@ public async Task ReadAsync_MultipleStreamingSources_ReadsSequentially() #region Disposal Tests [TestMethod] - public void Dispose_DisposesAllDataSources() + public async Task Dispose_DisposesAllDataSources() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var manager = new PartBufferManager(config); - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - manager.AddBuffer(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); + manager.AddDataSource(dataSource); // Act manager.Dispose(); - // Assert - The underlying part buffer should be disposed - Assert.IsNull(partBuffer.ArrayPoolBuffer); + // Assert - The underlying chunked stream should be disposed + Assert.ThrowsException(() => chunkedStream.Position); } [TestMethod] - public void Dispose_ClearsCollection() + public async Task Dispose_ClearsCollection() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var manager = new PartBufferManager(config); - byte[] testBuffer = ArrayPool.Shared.Rent(512); - var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - manager.AddBuffer(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(512, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 512); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(1, chunkedStream); + manager.AddDataSource(dataSource); // Act manager.Dispose(); @@ -1395,9 +1441,12 @@ public async Task NextExpectedPartNumber_ConcurrentReads_SeeConsistentValue() await manager.WaitForBufferSpaceAsync(CancellationToken.None); // Add part - byte[] testBuffer = ArrayPool.Shared.Rent(100); - var partBuffer = new StreamPartBuffer(partNum, testBuffer, 100); - manager.AddBuffer(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(100, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 100); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(partNum, chunkedStream); + manager.AddDataSource(dataSource); // Read part completely to trigger increment byte[] readBuffer = new byte[100]; @@ -1617,9 +1666,12 @@ public async Task BufferCapacity_ConcurrentOperations_RespectsMaxCountLimit() } // Simulate buffering the part - byte[] testBuffer = ArrayPool.Shared.Rent(100); - var partBuffer = new StreamPartBuffer(capturedPartNum, testBuffer, 100); - manager.AddBuffer(partBuffer); + byte[] testData = MultipartDownloadTestHelpers.GenerateTestData(100, 0); + var chunkedStream = new ChunkedBufferStream(); + await chunkedStream.WriteAsync(testData, 0, 100); + chunkedStream.SwitchToReadMode(); + var dataSource = new ChunkedPartDataSource(capturedPartNum, chunkedStream); + manager.AddDataSource(dataSource); // Simulate some processing time await Task.Delay(10); diff --git a/sdk/test/Services/S3/UnitTests/Custom/StreamPartBufferTests.cs b/sdk/test/Services/S3/UnitTests/Custom/StreamPartBufferTests.cs deleted file mode 100644 index 2ddde6d48238..000000000000 --- a/sdk/test/Services/S3/UnitTests/Custom/StreamPartBufferTests.cs +++ /dev/null @@ -1,396 +0,0 @@ -using Amazon.S3.Transfer.Internal; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using System; -using System.Buffers; - -namespace AWSSDK.UnitTests -{ - /// - /// Unit tests for StreamPartBuffer class. - /// Tests ArrayPool buffer management and position tracking. - /// - [TestClass] - public class StreamPartBufferTests - { - #region Creation Tests - - [TestMethod] - public void Create_WithValidParameters_CreatesBuffer() - { - // Arrange - int partNumber = 1; - int capacity = 1024; - int actualLength = 512; - - // Act - var partBuffer = StreamPartBuffer.Create(partNumber, capacity); - - try - { - // Simulate writing data - partBuffer.SetLength(actualLength); - - // Assert - Assert.AreEqual(partNumber, partBuffer.PartNumber); - Assert.IsNotNull(partBuffer.ArrayPoolBuffer); - Assert.IsTrue(partBuffer.ArrayPoolBuffer.Length >= capacity); // ArrayPool may return larger - Assert.AreEqual(actualLength, partBuffer.Length); - Assert.AreEqual(0, partBuffer.CurrentPosition); - Assert.AreEqual(actualLength, partBuffer.RemainingBytes); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - public void Create_InitializesWithZeroLength() - { - // Arrange - int partNumber = 2; - int capacity = 2048; - - // Act - var partBuffer = StreamPartBuffer.Create(partNumber, capacity); - - try - { - // Assert - Length should be 0 until SetLength is called - Assert.AreEqual(partNumber, partBuffer.PartNumber); - Assert.IsNotNull(partBuffer.ArrayPoolBuffer); - Assert.AreEqual(0, partBuffer.Length); - Assert.AreEqual(0, partBuffer.CurrentPosition); - Assert.AreEqual(0, partBuffer.RemainingBytes); - } - finally - { - partBuffer.Dispose(); - } - } - - #endregion - - #region Property Tests - - [TestMethod] - public void RemainingBytes_ReturnsCorrectValue() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - try - { - // Act & Assert - At start - Assert.AreEqual(500, partBuffer.RemainingBytes); - - // Act & Assert - After reading some bytes - partBuffer.CurrentPosition = 100; - Assert.AreEqual(400, partBuffer.RemainingBytes); - - // Act & Assert - At end - partBuffer.CurrentPosition = 500; - Assert.AreEqual(0, partBuffer.RemainingBytes); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - public void Length_ReturnsCorrectValue() - { - // Arrange - int actualLength = 1000; - var partBuffer = StreamPartBuffer.Create(1, 2048); - partBuffer.SetLength(actualLength); - - try - { - // Act & Assert - Assert.AreEqual(actualLength, partBuffer.Length); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - public void CurrentPosition_CanBeUpdated() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - try - { - // Act - partBuffer.CurrentPosition = 250; - - // Assert - Assert.AreEqual(250, partBuffer.CurrentPosition); - } - finally - { - partBuffer.Dispose(); - } - } - - #endregion - - #region Reading Position Tests - - [TestMethod] - public void CurrentPosition_AfterReading_UpdatesCorrectly() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - try - { - // Simulate reading 100 bytes - partBuffer.CurrentPosition += 100; - Assert.AreEqual(100, partBuffer.CurrentPosition); - Assert.AreEqual(400, partBuffer.RemainingBytes); - - // Simulate reading another 150 bytes - partBuffer.CurrentPosition += 150; - Assert.AreEqual(250, partBuffer.CurrentPosition); - Assert.AreEqual(250, partBuffer.RemainingBytes); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - public void RemainingBytes_WhenFullyRead_ReturnsZero() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - try - { - // Act - Read all bytes - partBuffer.CurrentPosition = 500; - - // Assert - Assert.AreEqual(0, partBuffer.RemainingBytes); - } - finally - { - partBuffer.Dispose(); - } - } - - #endregion - - #region SetLength Tests - - [TestMethod] - public void SetLength_WithValidLength_SetsCorrectly() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - - // Act - partBuffer.SetLength(500); - - try - { - // Assert - Assert.AreEqual(500, partBuffer.Length); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - [ExpectedException(typeof(InvalidOperationException))] - public void SetLength_CalledTwice_ThrowsException() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - try - { - // Act - Try to set length again - partBuffer.SetLength(600); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - [ExpectedException(typeof(ArgumentOutOfRangeException))] - public void SetLength_WithNegativeLength_ThrowsException() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - - try - { - // Act - partBuffer.SetLength(-1); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - [ExpectedException(typeof(ArgumentOutOfRangeException))] - public void SetLength_ExceedsBufferCapacity_ThrowsException() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - - try - { - // Act - Try to set length larger than buffer capacity - partBuffer.SetLength(10000); - } - finally - { - partBuffer.Dispose(); - } - } - - #endregion - - #region Disposal Tests - - [TestMethod] - public void Dispose_ReturnsBufferToArrayPool() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - // Act - partBuffer.Dispose(); - - // Assert - Buffer should be returned (verified by checking it's nulled) - Assert.IsNull(partBuffer.ArrayPoolBuffer); - } - - [TestMethod] - public void Dispose_MultipleCalls_IsIdempotent() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - // Act - Dispose multiple times - partBuffer.Dispose(); - partBuffer.Dispose(); - partBuffer.Dispose(); - - // Assert - Should not throw - Assert.IsNull(partBuffer.ArrayPoolBuffer); - } - - [TestMethod] - public void Dispose_SetsArrayPoolBufferToNull() - { - // Arrange - var partBuffer = StreamPartBuffer.Create(1, 1024); - partBuffer.SetLength(500); - - // Act - partBuffer.Dispose(); - - // Assert - Assert.IsNull(partBuffer.ArrayPoolBuffer); - } - - #endregion - - #region Edge Cases - - [TestMethod] - public void Constructor_WithEmptyBuffer_HandlesCorrectly() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 0); - - try - { - // Assert - Assert.AreEqual(0, partBuffer.Length); - Assert.AreEqual(0, partBuffer.RemainingBytes); - Assert.AreEqual(0, partBuffer.CurrentPosition); - } - finally - { - partBuffer.Dispose(); - } - } - - [TestMethod] - public void RemainingBytes_WhenPositionBeyondLength_ReturnsZero() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(1, testBuffer, 500); - - try - { - // Act - Position beyond actual length - partBuffer.CurrentPosition = 600; - - // Assert - RemainingBytes uses Math.Max(0, ...) to prevent negative - Assert.AreEqual(0, partBuffer.RemainingBytes); - } - finally - { - partBuffer.Dispose(); - } - } - - #endregion - - #region ToString Tests - - [TestMethod] - public void ToString_ReturnsExpectedFormat() - { - // Arrange - byte[] testBuffer = ArrayPool.Shared.Rent(1024); - var partBuffer = new StreamPartBuffer(3, testBuffer, 500); - - try - { - partBuffer.CurrentPosition = 100; - - // Act - string result = partBuffer.ToString(); - - // Assert - Verify format contains key information - Assert.IsTrue(result.Contains("Part=3")); - Assert.IsTrue(result.Contains("500 bytes")); - Assert.IsTrue(result.Contains("pos=100")); - Assert.IsTrue(result.Contains("remaining=400")); - } - finally - { - partBuffer.Dispose(); - } - } - - #endregion - } -}