fix(internal): cancel source stream when AsyncIterableStream stops early#1365
fix(internal): cancel source stream when AsyncIterableStream stops early#1365VihaanAgarwal wants to merge 1 commit into
Conversation
createAsyncIterableStream returned an async iterator with no `return` method, so breaking out of a `for await...of` loop early kept the reader lock and never cancelled the underlying stream. Add `return` to cancel the reader, matching the built-in ReadableStream async iterator, so the upstream is torn down on early termination instead of leaking.
🦋 Changeset detectedLatest commit: e38bb4e The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📝 WalkthroughWalkthroughAdds an ChangesAsyncIterableStream early-exit cancellation
Estimated code review effort🎯 1 (Trivial) | ⏱️ ~3 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Warning Tools execution failed with the following error: Failed to run tools: Ping-pong health check failed Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/internal/src/utils/async-iterable-stream.ts`:
- Around line 45-47: The async iterator’s return path in
async-iterable-stream.ts cancels the stream but leaves the
ReadableStreamDefaultReader lock held, which can block later getReader() calls.
Update the return() implementation on the iterator to ensure
reader.releaseLock() is always called after reader.cancel(), ideally by wrapping
the cancel logic in a try/finally. Use the existing return() method and reader
symbol in this stream wrapper to locate the fix.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 77e01f0e-d7e5-4d64-98f7-ad48ddec8c11
📒 Files selected for processing (3)
.changeset/async-iterable-stream-cancel.mdpackages/internal/src/utils/async-iterable-stream.spec.tspackages/internal/src/utils/async-iterable-stream.ts
| async return(): Promise<IteratorResult<T>> { | ||
| await reader.cancel(); | ||
| return { done: true, value: undefined }; |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🌐 Web query:
Does ReadableStreamDefaultReader.cancel() release the reader lock automatically, or must releaseLock() still be called to allow subsequent getReader() calls?
💡 Result:
No, ReadableStreamDefaultReader.cancel does not automatically release the reader lock [1][2]. The cancel method signals that you are no longer interested in the stream and causes the stream to be cancelled, but it leaves the reader active and the stream locked [1][2]. To allow subsequent getReader calls on the same stream, you must explicitly call releaseLock after calling cancel [1][3]. If you no longer need the reader after cancelling, the standard approach is to call both methods [1]: reader.cancel; reader.releaseLock; Calling releaseLock is the necessary step to make the stream available for a new reader [3]. If you attempt to acquire a new reader while the original reader still holds the lock, the stream will remain locked and the new getReader call will throw a TypeError [4][3].
Citations:
- 1: The difference between ReadableStream.cancel() and ReadableStreamDefaultReader.cancel()? whatwg/streams#1033
- 2: reader.cancel([ reason ]) "behaves the same as stream.cancel(reason)" is confusing whatwg/streams#1287
- 3: https://streams.spec.whatwg.org/
- 4: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/releaseLock
Release the reader lock after canceling.
reader.cancel() does not release the lock, so early exits can still leave the ReadableStreamDefaultReader attached and block later getReader() calls. Wrap this in finally and call reader.releaseLock().
Suggested fix
async return(): Promise<IteratorResult<T>> {
- await reader.cancel();
- return { done: true, value: undefined };
+ try {
+ await reader.cancel();
+ } finally {
+ reader.releaseLock();
+ }
+ return { done: true, value: undefined };
},📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async return(): Promise<IteratorResult<T>> { | |
| await reader.cancel(); | |
| return { done: true, value: undefined }; | |
| async return(): Promise<IteratorResult<T>> { | |
| try { | |
| await reader.cancel(); | |
| } finally { | |
| reader.releaseLock(); | |
| } | |
| return { done: true, value: undefined }; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/internal/src/utils/async-iterable-stream.ts` around lines 45 - 47,
The async iterator’s return path in async-iterable-stream.ts cancels the stream
but leaves the ReadableStreamDefaultReader lock held, which can block later
getReader() calls. Update the return() implementation on the iterator to ensure
reader.releaseLock() is always called after reader.cancel(), ideally by wrapping
the cancel logic in a try/finally. Use the existing return() method and reader
symbol in this stream wrapper to locate the fix.
While consuming an
AsyncIterableStreamfrom@voltagent/internalwith afor await...ofloop that breaks early, I noticed the upstream stream was never torn down.What was wrong
createAsyncIterableStreamoverrides[Symbol.asyncIterator]with an iterator that only implementsnext:When a
for await...ofloop is exited early (break,throw, orreturn), the runtime calls the iterator’sreturn()for cleanup. The built-inReadableStreamasync iterator implements this and cancels the stream. This custom iterator has noreturn(), so on early exit the reader lock is kept and the source is never cancelled. For agent/LLM streams that means the upstream request is not aborted when a consumer stops reading.Fix
Implement
return()to cancel the reader, matching the built-in behavior:Test
Added a regression test: a source
ReadableStreamwith acancelspy, iterated withfor await...ofand broken after the first chunk. It fails onmain(source never cancelled) and passes with this change. Full@voltagent/internalsuite stays green (129 tests).Summary by cubic
Ensure
AsyncIterableStreamin@voltagent/internalcancels the underlyingReadableStreamwhen afor await...ofloop stops early. This prevents reader-lock leaks and tears down upstream requests.return()to callreader.cancel(), matching built-inReadableStreambehavior.cancelis called.Written for commit e38bb4e. Summary will update on new commits.
Summary by CodeRabbit
for await...ofloop now cleanly closes the stream and releases resources.