From c649119d2f43f47cb8fe52ea830b8c5f33c4f0c3 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 15 Apr 2026 19:41:58 -0700 Subject: [PATCH] [world-local] Backport: enable cross-process streaming via filesystem polling Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Peter Wielander --- .changeset/backport-streaming-polling.md | 5 + packages/core/e2e/e2e.test.ts | 212 ++++++++++------------ packages/world-local/src/streamer.test.ts | 86 +++++++++ packages/world-local/src/streamer.ts | 101 ++++++++++- 4 files changed, 287 insertions(+), 117 deletions(-) create mode 100644 .changeset/backport-streaming-polling.md diff --git a/.changeset/backport-streaming-polling.md b/.changeset/backport-streaming-polling.md new file mode 100644 index 0000000000..74664fa337 --- /dev/null +++ b/.changeset/backport-streaming-polling.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-local": patch +--- + +Add filesystem polling for cross-process stream reads diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index f1091dc050..2f4306a986 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -272,43 +272,36 @@ describe('e2e', () => { } ); - // ReadableStream return values use the world's streaming infrastructure which - // requires in-process access. The local world's streamer uses an in-process EventEmitter - // that doesn't work cross-process (test runner ↔ workbench app). - test.skipIf(isLocalDeployment())( - 'readableStreamWorkflow', - { timeout: 120_000 }, - async () => { - const run = await start(await e2e('readableStreamWorkflow'), []); - const returnValue = await run.returnValue; - expect(returnValue).toBeInstanceOf(ReadableStream); - - const expected = '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'; - const decoder = new TextDecoder(); - let contents = ''; - // Read chunks until we have all expected content or hit a timeout. 
- // On Vercel, the stream close event can be delayed even after all - // chunks are delivered, so we stop once we have the expected data - // rather than waiting for the stream to end. - const reader = returnValue.getReader(); - const readDeadline = Date.now() + 60_000; - try { - while (Date.now() < readDeadline) { - const { done, value } = await Promise.race([ - reader.read(), - sleep(30_000).then(() => ({ done: true, value: undefined })), - ]); - if (value) { - contents += decoder.decode(value, { stream: true }); - } - if (done || contents.length >= expected.length) break; + test('readableStreamWorkflow', { timeout: 120_000 }, async () => { + const run = await start(await e2e('readableStreamWorkflow'), []); + const returnValue = await run.returnValue; + expect(returnValue).toBeInstanceOf(ReadableStream); + + const expected = '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'; + const decoder = new TextDecoder(); + let contents = ''; + // Read chunks until we have all expected content or hit a timeout. + // On Vercel, the stream close event can be delayed even after all + // chunks are delivered, so we stop once we have the expected data + // rather than waiting for the stream to end. + const reader = returnValue.getReader(); + const readDeadline = Date.now() + 60_000; + try { + while (Date.now() < readDeadline) { + const { done, value } = await Promise.race([ + reader.read(), + sleep(30_000).then(() => ({ done: true, value: undefined })), + ]); + if (value) { + contents += decoder.decode(value, { stream: true }); } - } finally { - reader.releaseLock(); + if (done || contents.length >= expected.length) break; } - expect(contents).toBe(expected); + } finally { + reader.releaseLock(); } - ); + expect(contents).toBe(expected); + }); test('hookWorkflow', { timeout: 60_000 }, async () => { const token = Math.random().toString(36).slice(2); @@ -623,7 +616,7 @@ describe('e2e', () => { // and 2 chunks to the "test" named stream: // chunk 0: binary "Hello, named stream!" 
// chunk 1: object { foo: 'bar' } - describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => { + describe('outputStreamWorkflow', () => { const startIndexCases = [ { name: 'no startIndex (reads all chunks)', @@ -707,88 +700,85 @@ describe('e2e', () => { } }); - describe.skipIf(isLocalDeployment())( - 'outputStreamWorkflow - getTailIndex and getStreamChunks', - () => { - test( - 'getTailIndex returns correct index after stream completes', - { - timeout: 60_000, - }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - await run.returnValue; + describe('outputStreamWorkflow - getTailIndex and getStreamChunks', () => { + test( + 'getTailIndex returns correct index after stream completes', + { + timeout: 60_000, + }, + async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + await run.returnValue; - const readable = run.getReadable(); - const tailIndex = await readable.getTailIndex(); + const readable = run.getReadable(); + const tailIndex = await readable.getTailIndex(); - // outputStreamWorkflow writes 2 chunks to the default stream - expect(tailIndex).toBe(1); - } - ); + // outputStreamWorkflow writes 2 chunks to the default stream + expect(tailIndex).toBe(1); + } + ); - test( - 'getTailIndex returns -1 before any chunks are written', - { - timeout: 60_000, - }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - - // Don't await returnValue — check immediately while stream is - // still being written (or hasn't started yet). The world should - // report tailIndex = -1 for streams with no data. 
- const readable = run.getReadable({ namespace: 'nonexistent' }); - const tailIndex = await readable.getTailIndex(); - expect(tailIndex).toBe(-1); - } - ); + test( + 'getTailIndex returns -1 before any chunks are written', + { + timeout: 60_000, + }, + async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); - test( - 'getStreamChunks returns same content as reading the stream', - { - timeout: 60_000, - }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - await run.returnValue; + // Don't await returnValue — check immediately while stream is + // still being written (or hasn't started yet). The world should + // report tailIndex = -1 for streams with no data. + const readable = run.getReadable({ namespace: 'nonexistent' }); + const tailIndex = await readable.getTailIndex(); + expect(tailIndex).toBe(-1); + } + ); - // Read all chunks via the stream - const reader = run.getReadable().getReader(); - const streamChunks: unknown[] = []; - while (true) { - const { done, value } = await reader.read(); - if (done) break; - streamChunks.push(value); - } + test( + 'getStreamChunks returns same content as reading the stream', + { + timeout: 60_000, + }, + async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + await run.returnValue; + + // Read all chunks via the stream + const reader = run.getReadable().getReader(); + const streamChunks: unknown[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + streamChunks.push(value); + } - // Read all chunks via getStreamChunks pagination - const world = getWorld(); - const streamName = `${run.runId.replace('wrun_', 'strm_')}_user`; - const paginatedChunks: Uint8Array[] = []; - let cursor: string | null = null; - do { - const page = await world.getStreamChunks(streamName, run.runId, { - limit: 1, // small page size to exercise pagination - ...(cursor ? 
{ cursor } : {}), - }); - for (const chunk of page.data) { - paginatedChunks.push(chunk.data); - } - cursor = page.cursor; - if (!page.hasMore) { - expect(page.done).toBe(true); - } - } while (cursor); + // Read all chunks via getStreamChunks pagination + const world = getWorld(); + const streamName = `${run.runId.replace('wrun_', 'strm_')}_user`; + const paginatedChunks: Uint8Array[] = []; + let cursor: string | null = null; + do { + const page = await world.getStreamChunks(streamName, run.runId, { + limit: 1, // small page size to exercise pagination + ...(cursor ? { cursor } : {}), + }); + for (const chunk of page.data) { + paginatedChunks.push(chunk.data); + } + cursor = page.cursor; + if (!page.hasMore) { + expect(page.done).toBe(true); + } + } while (cursor); - // Both methods should return the same number of chunks - expect(paginatedChunks).toHaveLength(streamChunks.length); - } - ); - } - ); + // Both methods should return the same number of chunks + expect(paginatedChunks).toHaveLength(streamChunks.length); + } + ); + }); - test.skipIf(isLocalDeployment())( + test( 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', { timeout: 60_000 }, async () => { @@ -2184,10 +2174,6 @@ describe('e2e', () => { // ============================================================ // Resilient start: run completes even when run_created fails // ============================================================ - // TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow) - // to also verify that serialization, flushing, and binary data work correctly - // over the queue boundary. Currently using addTenWorkflow to avoid the - // skipIf(isLocalDeployment()) barrier that stream tests require. 
test( 'resilient start: addTenWorkflow completes when run_created returns 500', { timeout: 60_000 }, diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index d8522ef2d8..67e8b1b819 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -530,6 +530,92 @@ describe('streamer', () => { }); }); + describe('cross-process polling', () => { + it('should deliver chunks via filesystem polling when EventEmitter is bypassed', async () => { + // Simulate cross-process streaming: write chunk files directly to + // disk (bypassing streamer.writeToStream and thus the EventEmitter) + // and verify the polling-based reader picks them up. + const testDir = await fs.mkdtemp( + path.join(os.tmpdir(), 'streamer-poll-test-') + ); + onTestFinished(async (ctx) => { + if (!ctx.task.result?.errors?.length) { + await fs.rm(testDir, { recursive: true, force: true }); + } + }); + + const streamer = createStreamer(testDir); + const streamName = 'poll-test'; + const chunksDir = path.join(testDir, 'streams', 'chunks'); + await fs.mkdir(chunksDir, { recursive: true }); + + // Start reading — sets up EventEmitter listeners + polling interval + const stream = await streamer.readFromStream(streamName); + const reader = stream.getReader(); + const chunks: string[] = []; + + const readPromise = (async () => { + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + })(); + + // Let polling start + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Write chunk files directly — no EventEmitter involved + const chunk1 = serializeChunk({ + eof: false, + chunk: Buffer.from('hello'), + }); + await fs.writeFile( + path.join( + chunksDir, + `${streamName}-chnk_01ARZ3NDEKTSV4RRFFQ69G5FAV.bin` + ), + chunk1 + ); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + 
const chunk2 = serializeChunk({ + eof: false, + chunk: Buffer.from(' world'), + }); + await fs.writeFile( + path.join( + chunksDir, + `${streamName}-chnk_01ARZ3NDEKTSV4RRFFQ69G5FAW.bin` + ), + chunk2 + ); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Write EOF chunk to close the stream + const eofChunk = serializeChunk({ + eof: true, + chunk: Buffer.from([]), + }); + await fs.writeFile( + path.join( + chunksDir, + `${streamName}-chnk_01ARZ3NDEKTSV4RRFFQ69G5FAX.bin` + ), + eofChunk + ); + + await readPromise; + + expect(chunks.join('')).toBe('hello world'); + }, 10000); + }); + describe('integration scenarios', () => { it('should handle complete write-close-read cycle', async () => { const { streamer } = await setupStreamer(); diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index 9a55fabb04..bdc3d7ad38 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -403,6 +403,7 @@ export function createStreamer(basedir: string, tag?: string): Streamer { async readFromStream(name: string, startIndex = 0) { const chunksDir = path.join(basedir, 'streams', 'chunks'); let removeListeners = () => {}; + let pollInterval: ReturnType<typeof setInterval> | null = null; return new ReadableStream({ async start(controller) { @@ -416,28 +417,33 @@ export function createStreamer(basedir: string, tag?: string): Streamer { let isReadingFromDisk = true; // Buffer close event if it arrives during disk reading let pendingClose = false; + // Set when the controller is closed; guards against enqueue-after-close + // in the polling callback when closeListener fires mid-iteration. 
+ let streamClosed = false; const chunkListener = (event: { streamName: string; chunkData: Uint8Array; chunkId: string; }) => { - deliveredChunkIds.add(event.chunkId); - // Skip empty chunks to maintain consistency with disk reading behavior - // Empty chunks are not enqueued when read from disk (see line 184-186) if (event.chunkData.byteLength === 0) { + deliveredChunkIds.add(event.chunkId); return; } if (isReadingFromDisk) { + deliveredChunkIds.add(event.chunkId); // Buffer chunks that arrive during disk reading to maintain order // Create a copy to prevent ArrayBuffer detachment when enqueued later bufferedEventChunks.push({ chunkId: event.chunkId, chunkData: Uint8Array.from(event.chunkData), }); - } else { + } else if (!deliveredChunkIds.has(event.chunkId)) { + // Guard against duplicates: polling may have already claimed this + // chunk between its has() check and readBuffer() yield. + deliveredChunkIds.add(event.chunkId); // After disk reading is complete, deliver chunks immediately // Create a copy to prevent ArrayBuffer detachment controller.enqueue(Uint8Array.from(event.chunkData)); @@ -451,8 +457,13 @@ export function createStreamer(basedir: string, tag?: string): Streamer { return; } // Remove listeners before closing + streamClosed = true; streamEmitter.off(`chunk:${name}` as const, chunkListener); streamEmitter.off(`close:${name}` as const, closeListener); + if (pollInterval) { + clearInterval(pollInterval); + pollInterval = null; + } try { controller.close(); } catch { @@ -517,6 +528,8 @@ export function createStreamer(basedir: string, tag?: string): Streamer { isComplete = true; break; } + // Track as handled so polling doesn't re-deliver + deliveredChunkIds.add(chunkId); if (chunk.chunk.byteLength) { // Create a copy to prevent ArrayBuffer detachment controller.enqueue(Uint8Array.from(chunk.chunk)); @@ -554,11 +567,91 @@ export function createStreamer(basedir: string, tag?: string): Streamer { } catch { // Ignore if controller is already closed } + 
return; } + + // Track pre-startIndex chunks so polling doesn't re-deliver them + for ( + let i = 0; + i < resolvedStartIndex && i < chunkFiles.length; + i++ + ) { + const file = chunkFiles[i]; + const rawChunkId = file.substring(name.length + 1); + const chunkId = tag + ? rawChunkId.replace(`.${tag}`, '') + : rawChunkId; + deliveredChunkIds.add(chunkId); + } + + // Start filesystem polling for cross-process streaming support. + // The EventEmitter only works in-process; when the writer is in a + // separate process (e.g. e2e test runner ↔ workbench app), polling + // the shared filesystem is the fallback delivery mechanism. + let isPolling = false; + pollInterval = setInterval(async () => { + if (isPolling) return; + isPolling = true; + try { + const { files: currentFiles, extMap: currentExtMap } = + await listChunkFilesForStream(chunksDir, name, tag); + + for (const file of currentFiles) { + const rawChunkId = file.substring(name.length + 1); + const chunkId = tag + ? rawChunkId.replace(`.${tag}`, '') + : rawChunkId; + + if (deliveredChunkIds.has(chunkId)) continue; + deliveredChunkIds.add(chunkId); + + const ext = currentExtMap.get(file) ?? '.bin'; + const chunk = deserializeChunk( + await readBuffer(path.join(chunksDir, `${file}${ext}`)) + ); + + if (chunk?.eof === true) { + streamClosed = true; + if (pollInterval) { + clearInterval(pollInterval); + pollInterval = null; + } + streamEmitter.off(`chunk:${name}` as const, chunkListener); + streamEmitter.off(`close:${name}` as const, closeListener); + try { + controller.close(); + } catch { + // Ignore if controller is already closed + } + return; + } + + // Guard against enqueue-after-close: closeListener may have + // fired between our readBuffer() yield and this point. + if (streamClosed) return; + + if (chunk.chunk.byteLength) { + controller.enqueue(Uint8Array.from(chunk.chunk)); + } + } + } catch (err: unknown) { + // Silently ignore transient filesystem errors (ENOENT, EACCES, etc.) 
+ // Surface unexpected errors so bugs aren't hidden + if (!(err instanceof Error && 'code' in err)) { + console.error('[world-local] Unexpected polling error:', err); + } + } finally { + isPolling = false; + } + }, 100); }, cancel() { removeListeners(); + if (pollInterval) { + clearInterval(pollInterval); + pollInterval = null; + } }, }); },