Skip to content

fix(internal): cancel source stream when AsyncIterableStream stops early#1365

Open
VihaanAgarwal wants to merge 1 commit into
VoltAgent:mainfrom
VihaanAgarwal:fix/async-iterable-stream-early-cancel
Open

fix(internal): cancel source stream when AsyncIterableStream stops early#1365
VihaanAgarwal wants to merge 1 commit into
VoltAgent:mainfrom
VihaanAgarwal:fix/async-iterable-stream-early-cancel

Conversation

@VihaanAgarwal

@VihaanAgarwal VihaanAgarwal commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

While consuming an AsyncIterableStream from @voltagent/internal with a for await...of loop that breaks early, I noticed the upstream stream was never torn down.

What was wrong

createAsyncIterableStream overrides [Symbol.asyncIterator] with an iterator that only implements next:

(stream as AsyncIterableStream<T>)[Symbol.asyncIterator] = () => {
  const reader = stream.getReader();
  return {
    async next(): Promise<IteratorResult<T>> {
      const { done, value } = await reader.read();
      return done ? { done: true, value: undefined } : { done: false, value };
    },
  };
};

When a for await...of loop is exited early (break, throw, or return), the runtime calls the iterator’s return() for cleanup. The built-in ReadableStream async iterator implements this and cancels the stream. This custom iterator has no return(), so on early exit the reader lock is kept and the source is never cancelled. For agent/LLM streams that means the upstream request is not aborted when a consumer stops reading.

Fix

Implement return() to cancel the reader, matching the built-in behavior:

async return(): Promise<IteratorResult<T>> {
  await reader.cancel();
  return { done: true, value: undefined };
},

Test

Added a regression test: a source ReadableStream with a cancel spy, iterated with for await...of and broken after the first chunk. It fails on main (source never cancelled) and passes with this change. Full @voltagent/internal suite stays green (129 tests).


Summary by cubic

Ensure AsyncIterableStream in @voltagent/internal cancels the underlying ReadableStream when a for await...of loop stops early. This prevents reader-lock leaks and tears down upstream requests.

  • Bug Fixes
    • Implemented iterator return() to call reader.cancel(), matching built-in ReadableStream behavior.
    • Added a regression test that breaks after the first chunk and asserts the source cancel is called.

Written for commit e38bb4e. Summary will update on new commits.

Review in cubic

Summary by CodeRabbit

  • Bug Fixes
    • Stopped streamed results from staying open when iteration ends early.
    • Breaking out of a for await...of loop now cleanly closes the stream and releases resources.
    • Added test coverage for early stream cancellation behavior.

createAsyncIterableStream returned an async iterator with no `return`
method, so breaking out of a `for await...of` loop early kept the reader
lock and never cancelled the underlying stream. Add `return` to cancel the
reader, matching the built-in ReadableStream async iterator, so the upstream
is torn down on early termination instead of leaking.
@changeset-bot

changeset-bot Bot commented Jun 30, 2026

Copy link
Copy Markdown

🦋 Changeset detected

Latest commit: e38bb4e

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
@voltagent/internal Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

Adds an async return() method to the custom async iterator in createAsyncIterableStream that calls reader.cancel() when a consumer exits a for await...of loop early, preventing reader lock leaks. Includes a test and a patch changeset.

Changes

AsyncIterableStream early-exit cancellation

Layer / File(s) Summary
Iterator return() method, test, and changeset
packages/internal/src/utils/async-iterable-stream.ts, packages/internal/src/utils/async-iterable-stream.spec.ts, .changeset/async-iterable-stream-cancel.md
Adds async return() to the iterator that cancels the ReadableStreamDefaultReader on early exit; test enqueues multiple chunks, breaks after the first, and asserts the cancel flag is set; changeset records the patch.

Estimated code review effort

🎯 1 (Trivial) | ⏱️ ~3 minutes

Poem

A rabbit loops through streams with care,
But sometimes needs to stop mid-air.
Now return() calls cancel() clean,
No lingering locks left unforeseen.
🐇✨ Early exit, gracefully done!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main fix: cancelling the source stream when AsyncIterableStream ends early.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

Tools execution failed with the following error:

Failed to run tools: Ping-pong health check failed


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 3 files

Re-trigger cubic

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/internal/src/utils/async-iterable-stream.ts`:
- Around line 45-47: The async iterator’s return path in
async-iterable-stream.ts cancels the stream but leaves the
ReadableStreamDefaultReader lock held, which can block later getReader() calls.
Update the return() implementation on the iterator to ensure
reader.releaseLock() is always called after reader.cancel(), ideally by wrapping
the cancel logic in a try/finally. Use the existing return() method and reader
symbol in this stream wrapper to locate the fix.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 77e01f0e-d7e5-4d64-98f7-ad48ddec8c11

📥 Commits

Reviewing files that changed from the base of the PR and between af4f6f2 and e38bb4e.

📒 Files selected for processing (3)
  • .changeset/async-iterable-stream-cancel.md
  • packages/internal/src/utils/async-iterable-stream.spec.ts
  • packages/internal/src/utils/async-iterable-stream.ts

Comment on lines +45 to +47
async return(): Promise<IteratorResult<T>> {
await reader.cancel();
return { done: true, value: undefined };

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

Does ReadableStreamDefaultReader.cancel() release the reader lock automatically, or must releaseLock() still be called to allow subsequent getReader() calls?

💡 Result:

No, ReadableStreamDefaultReader.cancel does not automatically release the reader lock [1][2]. The cancel method signals that you are no longer interested in the stream and causes the stream to be cancelled, but it leaves the reader active and the stream locked [1][2]. To allow subsequent getReader calls on the same stream, you must explicitly call releaseLock after calling cancel [1][3]. If you no longer need the reader after cancelling, the standard approach is to call both methods [1]: reader.cancel; reader.releaseLock; Calling releaseLock is the necessary step to make the stream available for a new reader [3]. If you attempt to acquire a new reader while the original reader still holds the lock, the stream will remain locked and the new getReader call will throw a TypeError [4][3].

Citations:


Release the reader lock after canceling.

reader.cancel() does not release the lock, so early exits can still leave the ReadableStreamDefaultReader attached and block later getReader() calls. Wrap this in finally and call reader.releaseLock().

Suggested fix
      async return(): Promise<IteratorResult<T>> {
-        await reader.cancel();
-        return { done: true, value: undefined };
+        try {
+          await reader.cancel();
+        } finally {
+          reader.releaseLock();
+        }
+        return { done: true, value: undefined };
      },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async return(): Promise<IteratorResult<T>> {
await reader.cancel();
return { done: true, value: undefined };
async return(): Promise<IteratorResult<T>> {
try {
await reader.cancel();
} finally {
reader.releaseLock();
}
return { done: true, value: undefined };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/internal/src/utils/async-iterable-stream.ts` around lines 45 - 47,
The async iterator’s return path in async-iterable-stream.ts cancels the stream
but leaves the ReadableStreamDefaultReader lock held, which can block later
getReader() calls. Update the return() implementation on the iterator to ensure
reader.releaseLock() is always called after reader.cancel(), ideally by wrapping
the cancel logic in a try/finally. Use the existing return() method and reader
symbol in this stream wrapper to locate the fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant