diff --git a/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.test.tsx b/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.test.tsx index a0b2d7757ae..e68b2522ee7 100644 --- a/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.test.tsx +++ b/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.test.tsx @@ -262,6 +262,41 @@ describe('ConcurrentOutput', () => { expect(logColumns[1]?.length).toBe(25 + 2) }) + test('renders large chunks split into batches without dropping lines', async () => { + // Given - simulate a large stack trace (>100 lines) arriving as a single write + const processSync = new Synchronizer() + const lineCount = 250 + const largeOutput = Array.from({length: lineCount}, (_, i) => `line ${i + 1}`).join('\n') + + const processes = [ + { + prefix: 'pos-ext', + action: async (stdout: Writable, _stderr: Writable, _signal: AbortSignal) => { + stdout.write(largeOutput) + processSync.resolve() + }, + }, + ] + + // When - keepRunningAfterProcessesResolve prevents the component from unmounting + // before all setImmediate-batched state updates have been applied + const renderInstance = render( + , + ) + await processSync.promise + await waitForContent(renderInstance, `line ${lineCount}`) + + // Then - all lines should be rendered + const frame = unstyled(renderInstance.lastFrame()!) + for (let i = 1; i <= lineCount; i++) { + expect(frame).toContain(`line ${i}`) + } + }) + test('rejects with the error thrown inside one of the processes', async () => { // Given const backendProcess = { diff --git a/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.tsx b/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.tsx index c868899c4ea..53f0ec40d2e 100644 --- a/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.tsx +++ b/packages/cli-kit/src/private/node/ui/components/ConcurrentOutput.tsx @@ -5,9 +5,11 @@ import {Box, Static, Text, TextProps, useApp} from 'ink' import figures from 'figures' import stripAnsi from 'strip-ansi' -import {Writable} from 'stream' +import {Transform, Writable} from 'stream' import {AsyncLocalStorage} from 'node:async_hooks' +const MAX_LINES_PER_BATCH = 20 + export interface ConcurrentOutputProps { processes: OutputProcess[] prefixColumnSize?: number @@ -132,27 +134,57 @@ const ConcurrentOutput: FunctionComponent = ({ const writableStream = useCallback( (process: OutputProcess, prefixes: string[]) => { - return new Writable({ - write(chunk, _encoding, next) { + // Transform: splits incoming chunks into MAX_LINES_PER_BATCH-line pieces. + // Runs synchronously inside the writer's async context, so outputContextStore + // (prefix, stripAnsi overrides set by useConcurrentOutputContext) is available here. + const splitter = new Transform({ + readableObjectMode: true, + transform(chunk, _encoding, callback) { const context = outputContextStore.getStore() const prefix = context?.outputPrefix ?? process.prefix const shouldStripAnsi = context?.stripAnsi ?? true const log = chunk.toString('utf8').replace(/(\n)$/, '') + const allLines = shouldStripAnsi ? stripAnsi(log).split(/\n/) : log.split(/\n/) + // Flag batches that came from a large chunk so the sink knows to yield + // between them. Single-batch writes keep synchronous next() to preserve + // existing behaviour for normal (small) output. + const isLargeChunk = allLines.length > MAX_LINES_PER_BATCH + for (let i = 0; i < allLines.length; i += MAX_LINES_PER_BATCH) { + this.push({prefix, lines: allLines.slice(i, i + MAX_LINES_PER_BATCH), isLargeChunk}) + } + callback() + }, + }) + // Writable: renders each batch into React state. + // For large-chunk batches, setImmediate(next) yields the event loop so keyboard + // shortcuts (q, p) can fire between renders, and creates real Node.js backpressure: + // when next() is pending the pipe pauses the splitter, preventing unbounded + // memory growth from fast producers. + // For normal output (single-batch writes) next() is called synchronously to + // preserve the existing rendering behaviour. + const sink = new Writable({ + objectMode: true, + write( + {prefix, lines, isLargeChunk}: {prefix: string; lines: string[]; isLargeChunk: boolean}, + _encoding, + next, + ) { const index = addPrefix(prefix, prefixes) - - const lines = shouldStripAnsi ? stripAnsi(log).split(/\n/) : log.split(/\n/) setProcessOutput((previousProcessOutput) => [ ...previousProcessOutput, - { - color: lineColor(index), - prefix, - lines, - }, + {color: lineColor(index), prefix, lines}, ]) - next() + if (isLargeChunk) { + setImmediate(next) + } else { + next() + } }, }) + + splitter.pipe(sink) + return splitter }, [lineColor], )