From 5d243a3fa8131e5f41f10946d926a8d8e89fd77c Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 19 Mar 2026 17:44:12 -0700 Subject: [PATCH 1/9] c Signed-off-by: Peter Wielander --- packages/world-vercel/src/utils.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index e333a203ce..6b32a789ff 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -35,6 +35,7 @@ function httpLog( ); } } + import { ErrorType, getSpanKind, @@ -58,7 +59,8 @@ import { version } from './version.js'; * * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ -const WORKFLOW_SERVER_URL_OVERRIDE = ''; +const WORKFLOW_SERVER_URL_OVERRIDE = + 'https://workflow-server-e03hrytg4.vercel.sh'; export interface APIConfig { token?: string; From 22417485146b41d41494eb1f6175fc3e19a55333 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 19 Mar 2026 17:38:12 -0700 Subject: [PATCH 2/9] feat: support negative startIndex for streaming Negative startIndex values (e.g. -3) resolve to n chunks before the known end of the stream. All world implementations (local, postgres, vercel) support this. Includes unit tests, e2e test, and doc updates. Co-Authored-By: Claude Opus 4.6 (1M context) --- .changeset/negative-start-index.md | 8 +++ docs/content/docs/ai/resumable-streams.mdx | 2 +- docs/content/docs/foundations/streaming.mdx | 9 ++++ packages/core/e2e/e2e.test.ts | 25 +++++++++ packages/core/src/runtime/run.ts | 1 + packages/core/src/runtime/runs.ts | 1 + packages/world-local/src/streamer.test.ts | 57 +++++++++++++++++++++ packages/world-local/src/streamer.ts | 20 +++++++- packages/world-postgres/src/streamer.ts | 10 ++++ packages/world/src/interfaces.ts | 6 +++ 10 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 .changeset/negative-start-index.md diff --git a/.changeset/negative-start-index.md b/.changeset/negative-start-index.md new file mode 100644 index 0000000000..c5788620cb --- /dev/null +++ b/.changeset/negative-start-index.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +"@workflow/core": patch +--- + +Support negative `startIndex` for streaming (e.g. `-3` reads last 3 chunks) diff --git a/docs/content/docs/ai/resumable-streams.mdx b/docs/content/docs/ai/resumable-streams.mdx index 510c357c90..e4d1f20145 100644 --- a/docs/content/docs/ai/resumable-streams.mdx +++ b/docs/content/docs/ai/resumable-streams.mdx @@ -82,7 +82,7 @@ export async function GET( } ``` -The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. +The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for showing the most recent output without replaying the full stream. diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index 9015d65fbb..1e9d231d02 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -87,6 +87,15 @@ export async function GET( This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning. +`startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history: + +```typescript +// Read only the last 10 chunks +const stream = run.getReadable({ startIndex: -10 }); +``` + +If the absolute value exceeds the total number of chunks, reading starts from the beginning (the value is clamped to 0). + ## Streams as Data Types [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) and [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream) are standard Web Streams API types that Workflow DevKit makes serializable. These are not custom types - they follow the web standard - but Workflow DevKit adds the ability to pass them between functions while maintaining their streaming capabilities. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 1c9eeb8451..bf22d57e1e 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -702,6 +702,31 @@ describe('e2e', () => { } ); + test.skipIf(isLocalDeployment())( + 'outputStreamWorkflow - negative startIndex reads from end', + { timeout: 60_000 }, + async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + + // Use negative startIndex to read only the last chunk from the default stream. + // outputStreamWorkflow writes 2 chunks to the default stream: + // chunk 0: binary "Hello, world!" + // chunk 1: object { foo: 'test' } + // startIndex: -1 should skip chunk 0 and only return chunk 1. + const reader = run.getReadable({ startIndex: -1 }).getReader(); + + const r1 = await reader.read(); + assert(r1.value); + expect(r1.value).toEqual({ foo: 'test' }); + + const r2 = await reader.read(); + expect(r2.done).toBe(true); + + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); + } + ); + test('fetchWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('fetchWorkflow'), []); const returnValue = await run.returnValue; diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 68fd5c377b..c2feb1fbac 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -33,6 +33,7 @@ export interface WorkflowReadableStreamOptions { namespace?: string; /** * The index number of the starting chunk to begin reading the stream from. + * Negative values start from the end (e.g. -3 reads the last 3 chunks). */ startIndex?: number; /** diff --git a/packages/core/src/runtime/runs.ts b/packages/core/src/runtime/runs.ts index 1c4874b2e7..0c8a9ef8fd 100644 --- a/packages/core/src/runtime/runs.ts +++ b/packages/core/src/runtime/runs.ts @@ -23,6 +23,7 @@ export interface StopSleepResult { export interface ReadStreamOptions { /** * The index to start reading from. Defaults to 0. + * Negative values start from the end (e.g. -3 reads the last 3 chunks). */ startIndex?: number; } diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index e381716249..58c78a5970 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -471,6 +471,63 @@ describe('streamer', () => { // Should successfully read remaining chunks expect(chunks.join('')).toBe('chunk2chunk3'); }); + + it('should support negative startIndex to read from the end', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'negative-index-stream'; + + // Write 4 chunks + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk0'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk2'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk3'); + await streamer.closeStream(streamName, TEST_RUN_ID); + + // Read with startIndex=-2 → last 2 chunks + const stream = await streamer.readFromStream(streamName, -2); + const reader = stream.getReader(); + + const chunks: string[] = []; + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + expect(chunks.join('')).toBe('chunk2chunk3'); + }); + + it('should clamp negative startIndex that exceeds chunk count to 0', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'negative-clamped-stream'; + + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk0'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1'); + await streamer.closeStream(streamName, TEST_RUN_ID); + + // -100 exceeds total count, should clamp to 0 and return all chunks + const stream = await streamer.readFromStream(streamName, -100); + const reader = stream.getReader(); + + const chunks: string[] = []; + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + expect(chunks.join('')).toBe('chunk0chunk1'); + }); }); describe('integration scenarios', () => { diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index bd669b0d73..d93eb1961b 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -341,9 +341,27 @@ export function createStreamer(basedir: string, tag?: string): Streamer { .filter((file) => file.startsWith(`${name}-`)) .sort(); // ULID lexicographic sort = chronological order + // Resolve negative startIndex relative to the number of data chunks + // (excluding the trailing EOF marker chunk, if present). + let dataChunkCount = chunkFiles.length; + if (startIndex < 0 && chunkFiles.length > 0) { + const lastFile = chunkFiles[chunkFiles.length - 1]; + const lastExt = fileExtMap.get(lastFile) ?? '.bin'; + const lastChunk = deserializeChunk( + await readBuffer(path.join(chunksDir, `${lastFile}${lastExt}`)) + ); + if (lastChunk?.eof === true) { + dataChunkCount--; + } + } + const resolvedStartIndex = + startIndex < 0 + ? Math.max(0, dataChunkCount + startIndex) + : startIndex; + // Process existing chunks, skipping any already delivered via events let isComplete = false; - for (let i = startIndex; i < chunkFiles.length; i++) { + for (let i = resolvedStartIndex; i < chunkFiles.length; i++) { const file = chunkFiles[i]; // Extract chunk ID from filename: "streamName-chunkId" or "streamName-chunkId.tag" const rawChunkId = file.substring(name.length + 1); diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 24d0b6aa04..fecdc98026 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -247,6 +247,16 @@ export function createStreamer( .where(and(eq(streams.streamId, name))) .orderBy(streams.chunkId); + // Resolve negative offset relative to the data chunk count + // (excluding the trailing EOF marker, if present) + if (offset < 0) { + const dataCount = + chunks.length > 0 && chunks[chunks.length - 1].eof + ? chunks.length - 1 + : chunks.length; + offset = Math.max(0, dataCount + offset); + } + for (const chunk of [...chunks, ...(buffer ?? [])]) { enqueue(chunk); } diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index d53fd96d14..d12cedb1ea 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -49,6 +49,12 @@ export interface Streamer { ): Promise; closeStream(name: string, runId: string): Promise; + /** + * Read from a stream starting at the given chunk index. + * Positive values skip that many chunks from the start (0-based). + * Negative values start that many chunks before the current end + * (e.g. -3 on a 10-chunk stream starts at chunk 7). Clamped to 0. + */ readFromStream( name: string, startIndex?: number From 45d474d108c943c18ff79f2ba49aae2ded48d7b0 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 09:14:20 -0700 Subject: [PATCH 3/9] undo override Signed-off-by: Peter Wielander --- packages/world-vercel/src/utils.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 6b32a789ff..5fc8274553 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -59,8 +59,7 @@ import { version } from './version.js'; * * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ -const WORKFLOW_SERVER_URL_OVERRIDE = - 'https://workflow-server-e03hrytg4.vercel.sh'; +const WORKFLOW_SERVER_URL_OVERRIDE = ''; export interface APIConfig { token?: string; From 4a221141e8d860c0f84ca3d97f2fcd01626038d3 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 09:43:45 -0700 Subject: [PATCH 4/9] Consolidate outputStream e2e tests into data-driven loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces three separate tests (no startIndex, positive, negative) with a single describe block that iterates over startIndex cases. Fixes the negative startIndex test by waiting for workflow completion before connecting the reader — the backend resolves negative indices at connection time using knownChunkCount, which is 0 if the stream hasn't been fully written yet. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/e2e/e2e.test.ts | 148 +++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 63 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index bf22d57e1e..3944a2f728 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -614,49 +614,96 @@ describe('e2e', () => { // Output stream tests use run.getReadable() which requires in-process streaming // infrastructure. The local world's streamer uses an EventEmitter that doesn't work // cross-process (test runner ↔ workbench app). - test.skipIf(isLocalDeployment())( - 'outputStreamWorkflow', - { timeout: 60_000 }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - const reader = run.getReadable().getReader(); - const namedReader = run.getReadable({ namespace: 'test' }).getReader(); - - // First chunk from default stream: binary data - const r1 = await reader.read(); - assert(r1.value); - assert(r1.value instanceof Uint8Array); - expect(Buffer.from(r1.value).toString()).toEqual('Hello, world!'); - - // First chunk from named stream: binary data - const r1Named = await namedReader.read(); - assert(r1Named.value); - assert(r1Named.value instanceof Uint8Array); - expect(Buffer.from(r1Named.value).toString()).toEqual( - 'Hello, named stream!' - ); - - // Second chunk from default stream: JSON object - const r2 = await reader.read(); - assert(r2.value); - expect(r2.value).toEqual({ foo: 'test' }); + // + // outputStreamWorkflow writes 2 chunks to the default stream: + // chunk 0: binary "Hello, world!" + // chunk 1: object { foo: 'test' } + // and 2 chunks to the "test" named stream: + // chunk 0: binary "Hello, named stream!" + // chunk 1: object { foo: 'bar' } + describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => { + const startIndexCases = [ + { + name: 'no startIndex (reads all chunks)', + startIndex: undefined, + expectedDefault: [ + { type: 'binary', value: 'Hello, world!' }, + { type: 'object', value: { foo: 'test' } }, + ], + expectedNamed: [ + { type: 'binary', value: 'Hello, named stream!' }, + { type: 'object', value: { foo: 'bar' } }, + ], + // Can stream in real-time without waiting for completion + waitForCompletion: false, + }, + { + name: 'positive startIndex (skips first chunk)', + startIndex: 1, + expectedDefault: [{ type: 'object', value: { foo: 'test' } }], + expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], + // Positive startIndex needs the stream written up to that point + waitForCompletion: true, + }, + { + name: 'negative startIndex (reads from end)', + startIndex: -1, + expectedDefault: [{ type: 'object', value: { foo: 'test' } }], + expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], + // Negative startIndex resolves at connection time using knownChunkCount, + // so the stream must be fully written before connecting the reader. + waitForCompletion: true, + }, + ] as const; + + for (const tc of startIndexCases) { + test(tc.name, { timeout: 60_000 }, async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + + if (tc.waitForCompletion) { + await run.returnValue; + } - // Second chunk from named stream: JSON object - const r2Named = await namedReader.read(); - assert(r2Named.value); - expect(r2Named.value).toEqual({ foo: 'bar' }); + const reader = run + .getReadable({ startIndex: tc.startIndex }) + .getReader(); + const namedReader = run + .getReadable({ namespace: 'test', startIndex: tc.startIndex }) + .getReader(); + + for (const expected of tc.expectedDefault) { + const { value } = await reader.read(); + assert(value); + if (expected.type === 'binary') { + assert(value instanceof Uint8Array); + expect(Buffer.from(value).toString()).toEqual(expected.value); + } else { + expect(value).toEqual(expected.value); + } + } - // Streams should be closed - const r3 = await reader.read(); - expect(r3.done).toBe(true); + // Default stream should be closed after expected chunks + expect((await reader.read()).done).toBe(true); + + for (const expected of tc.expectedNamed) { + const { value } = await namedReader.read(); + assert(value); + if (expected.type === 'binary') { + assert(value instanceof Uint8Array); + expect(Buffer.from(value).toString()).toEqual(expected.value); + } else { + expect(value).toEqual(expected.value); + } + } - const r3Named = await namedReader.read(); - expect(r3Named.done).toBe(true); + // Named stream should be closed after expected chunks + expect((await namedReader.read()).done).toBe(true); - const returnValue = await run.returnValue; - expect(returnValue).toEqual('done'); + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); + }); } - ); + }); test.skipIf(isLocalDeployment())( 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', @@ -702,31 +749,6 @@ describe('e2e', () => { } ); - test.skipIf(isLocalDeployment())( - 'outputStreamWorkflow - negative startIndex reads from end', - { timeout: 60_000 }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - - // Use negative startIndex to read only the last chunk from the default stream. - // outputStreamWorkflow writes 2 chunks to the default stream: - // chunk 0: binary "Hello, world!" - // chunk 1: object { foo: 'test' } - // startIndex: -1 should skip chunk 0 and only return chunk 1. - const reader = run.getReadable({ startIndex: -1 }).getReader(); - - const r1 = await reader.read(); - assert(r1.value); - expect(r1.value).toEqual({ foo: 'test' }); - - const r2 = await reader.read(); - expect(r2.done).toBe(true); - - const returnValue = await run.returnValue; - expect(returnValue).toEqual('done'); - } - ); - test('fetchWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('fetchWorkflow'), []); const returnValue = await run.returnValue; From 34ef29e5a888b0eeb2737f6fe26cca06cbb4aea9 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 09:44:21 -0700 Subject: [PATCH 5/9] fix Signed-off-by: Peter Wielander --- docs/content/docs/foundations/streaming.mdx | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index 1e9d231d02..d8b6d83a19 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -89,6 +89,7 @@ This allows clients to reconnect and continue receiving data from where they lef `startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history: +{/* @skip-typecheck: incomplete code sample */} ```typescript // Read only the last 10 chunks const stream = run.getReadable({ startIndex: -10 }); From 8b994fc74b9ad4a9b371be561df7cfb662e9348b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 09:44:34 -0700 Subject: [PATCH 6/9] test Signed-off-by: Peter Wielander --- packages/core/e2e/e2e.test.ts | 148 +++++++++++++++------------------- 1 file changed, 63 insertions(+), 85 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 3944a2f728..bf22d57e1e 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -614,96 +614,49 @@ describe('e2e', () => { // Output stream tests use run.getReadable() which requires in-process streaming // infrastructure. The local world's streamer uses an EventEmitter that doesn't work // cross-process (test runner ↔ workbench app). - // - // outputStreamWorkflow writes 2 chunks to the default stream: - // chunk 0: binary "Hello, world!" - // chunk 1: object { foo: 'test' } - // and 2 chunks to the "test" named stream: - // chunk 0: binary "Hello, named stream!" - // chunk 1: object { foo: 'bar' } - describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => { - const startIndexCases = [ - { - name: 'no startIndex (reads all chunks)', - startIndex: undefined, - expectedDefault: [ - { type: 'binary', value: 'Hello, world!' }, - { type: 'object', value: { foo: 'test' } }, - ], - expectedNamed: [ - { type: 'binary', value: 'Hello, named stream!' }, - { type: 'object', value: { foo: 'bar' } }, - ], - // Can stream in real-time without waiting for completion - waitForCompletion: false, - }, - { - name: 'positive startIndex (skips first chunk)', - startIndex: 1, - expectedDefault: [{ type: 'object', value: { foo: 'test' } }], - expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], - // Positive startIndex needs the stream written up to that point - waitForCompletion: true, - }, - { - name: 'negative startIndex (reads from end)', - startIndex: -1, - expectedDefault: [{ type: 'object', value: { foo: 'test' } }], - expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], - // Negative startIndex resolves at connection time using knownChunkCount, - // so the stream must be fully written before connecting the reader. - waitForCompletion: true, - }, - ] as const; - - for (const tc of startIndexCases) { - test(tc.name, { timeout: 60_000 }, async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - - if (tc.waitForCompletion) { - await run.returnValue; - } + test.skipIf(isLocalDeployment())( + 'outputStreamWorkflow', + { timeout: 60_000 }, + async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + const reader = run.getReadable().getReader(); + const namedReader = run.getReadable({ namespace: 'test' }).getReader(); - const reader = run - .getReadable({ startIndex: tc.startIndex }) - .getReader(); - const namedReader = run - .getReadable({ namespace: 'test', startIndex: tc.startIndex }) - .getReader(); - - for (const expected of tc.expectedDefault) { - const { value } = await reader.read(); - assert(value); - if (expected.type === 'binary') { - assert(value instanceof Uint8Array); - expect(Buffer.from(value).toString()).toEqual(expected.value); - } else { - expect(value).toEqual(expected.value); - } - } + // First chunk from default stream: binary data + const r1 = await reader.read(); + assert(r1.value); + assert(r1.value instanceof Uint8Array); + expect(Buffer.from(r1.value).toString()).toEqual('Hello, world!'); - // Default stream should be closed after expected chunks - expect((await reader.read()).done).toBe(true); - - for (const expected of tc.expectedNamed) { - const { value } = await namedReader.read(); - assert(value); - if (expected.type === 'binary') { - assert(value instanceof Uint8Array); - expect(Buffer.from(value).toString()).toEqual(expected.value); - } else { - expect(value).toEqual(expected.value); - } - } + // First chunk from named stream: binary data + const r1Named = await namedReader.read(); + assert(r1Named.value); + assert(r1Named.value instanceof Uint8Array); + expect(Buffer.from(r1Named.value).toString()).toEqual( + 'Hello, named stream!' + ); + + // Second chunk from default stream: JSON object + const r2 = await reader.read(); + assert(r2.value); + expect(r2.value).toEqual({ foo: 'test' }); - // Named stream should be closed after expected chunks - expect((await namedReader.read()).done).toBe(true); + // Second chunk from named stream: JSON object + const r2Named = await namedReader.read(); + assert(r2Named.value); + expect(r2Named.value).toEqual({ foo: 'bar' }); - const returnValue = await run.returnValue; - expect(returnValue).toEqual('done'); - }); + // Streams should be closed + const r3 = await reader.read(); + expect(r3.done).toBe(true); + + const r3Named = await namedReader.read(); + expect(r3Named.done).toBe(true); + + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); } - }); + ); test.skipIf(isLocalDeployment())( 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', @@ -749,6 +702,31 @@ describe('e2e', () => { } ); + test.skipIf(isLocalDeployment())( + 'outputStreamWorkflow - negative startIndex reads from end', + { timeout: 60_000 }, + async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + + // Use negative startIndex to read only the last chunk from the default stream. + // outputStreamWorkflow writes 2 chunks to the default stream: + // chunk 0: binary "Hello, world!" + // chunk 1: object { foo: 'test' } + // startIndex: -1 should skip chunk 0 and only return chunk 1. + const reader = run.getReadable({ startIndex: -1 }).getReader(); + + const r1 = await reader.read(); + assert(r1.value); + expect(r1.value).toEqual({ foo: 'test' }); + + const r2 = await reader.read(); + expect(r2.done).toBe(true); + + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); + } + ); + test('fetchWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('fetchWorkflow'), []); const returnValue = await run.returnValue; From 23ce02d9bad955a6d7bd2b0f7a5a034d093f1fcb Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 09:54:14 -0700 Subject: [PATCH 7/9] Consolidate outputStream e2e tests into data-driven loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces three separate tests (no startIndex, positive, negative) with a single describe block that iterates over startIndex cases. Fixes the negative startIndex test by waiting for workflow completion before connecting the reader — the backend resolves negative indices at connection time using knownChunkCount, which is 0 if the stream hasn't been fully written yet. Signed-off-by: Peter Wielander Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/e2e/e2e.test.ts | 148 +++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 63 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index bf22d57e1e..3944a2f728 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -614,49 +614,96 @@ describe('e2e', () => { // Output stream tests use run.getReadable() which requires in-process streaming // infrastructure. The local world's streamer uses an EventEmitter that doesn't work // cross-process (test runner ↔ workbench app). - test.skipIf(isLocalDeployment())( - 'outputStreamWorkflow', - { timeout: 60_000 }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - const reader = run.getReadable().getReader(); - const namedReader = run.getReadable({ namespace: 'test' }).getReader(); - - // First chunk from default stream: binary data - const r1 = await reader.read(); - assert(r1.value); - assert(r1.value instanceof Uint8Array); - expect(Buffer.from(r1.value).toString()).toEqual('Hello, world!'); - - // First chunk from named stream: binary data - const r1Named = await namedReader.read(); - assert(r1Named.value); - assert(r1Named.value instanceof Uint8Array); - expect(Buffer.from(r1Named.value).toString()).toEqual( - 'Hello, named stream!' - ); - - // Second chunk from default stream: JSON object - const r2 = await reader.read(); - assert(r2.value); - expect(r2.value).toEqual({ foo: 'test' }); + // + // outputStreamWorkflow writes 2 chunks to the default stream: + // chunk 0: binary "Hello, world!" + // chunk 1: object { foo: 'test' } + // and 2 chunks to the "test" named stream: + // chunk 0: binary "Hello, named stream!" + // chunk 1: object { foo: 'bar' } + describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => { + const startIndexCases = [ + { + name: 'no startIndex (reads all chunks)', + startIndex: undefined, + expectedDefault: [ + { type: 'binary', value: 'Hello, world!' }, + { type: 'object', value: { foo: 'test' } }, + ], + expectedNamed: [ + { type: 'binary', value: 'Hello, named stream!' }, + { type: 'object', value: { foo: 'bar' } }, + ], + // Can stream in real-time without waiting for completion + waitForCompletion: false, + }, + { + name: 'positive startIndex (skips first chunk)', + startIndex: 1, + expectedDefault: [{ type: 'object', value: { foo: 'test' } }], + expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], + // Positive startIndex needs the stream written up to that point + waitForCompletion: true, + }, + { + name: 'negative startIndex (reads from end)', + startIndex: -1, + expectedDefault: [{ type: 'object', value: { foo: 'test' } }], + expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], + // Negative startIndex resolves at connection time using knownChunkCount, + // so the stream must be fully written before connecting the reader. + waitForCompletion: true, + }, + ] as const; + + for (const tc of startIndexCases) { + test(tc.name, { timeout: 60_000 }, async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + + if (tc.waitForCompletion) { + await run.returnValue; + } - // Second chunk from named stream: JSON object - const r2Named = await namedReader.read(); - assert(r2Named.value); - expect(r2Named.value).toEqual({ foo: 'bar' }); + const reader = run + .getReadable({ startIndex: tc.startIndex }) + .getReader(); + const namedReader = run + .getReadable({ namespace: 'test', startIndex: tc.startIndex }) + .getReader(); + + for (const expected of tc.expectedDefault) { + const { value } = await reader.read(); + assert(value); + if (expected.type === 'binary') { + assert(value instanceof Uint8Array); + expect(Buffer.from(value).toString()).toEqual(expected.value); + } else { + expect(value).toEqual(expected.value); + } + } - // Streams should be closed - const r3 = await reader.read(); - expect(r3.done).toBe(true); + // Default stream should be closed after expected chunks + expect((await reader.read()).done).toBe(true); + + for (const expected of tc.expectedNamed) { + const { value } = await namedReader.read(); + assert(value); + if (expected.type === 'binary') { + assert(value instanceof Uint8Array); + expect(Buffer.from(value).toString()).toEqual(expected.value); + } else { + expect(value).toEqual(expected.value); + } + } - const r3Named = await namedReader.read(); - expect(r3Named.done).toBe(true); + // Named stream should be closed after expected chunks + expect((await namedReader.read()).done).toBe(true); - const returnValue = await run.returnValue; - expect(returnValue).toEqual('done'); + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); + }); } - ); + }); test.skipIf(isLocalDeployment())( 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', @@ -702,31 +749,6 @@ describe('e2e', () => { } ); - test.skipIf(isLocalDeployment())( - 'outputStreamWorkflow - negative startIndex reads from end', - { timeout: 60_000 }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - - // Use negative startIndex to read only the last chunk from the default stream. - // outputStreamWorkflow writes 2 chunks to the default stream: - // chunk 0: binary "Hello, world!" - // chunk 1: object { foo: 'test' } - // startIndex: -1 should skip chunk 0 and only return chunk 1. - const reader = run.getReadable({ startIndex: -1 }).getReader(); - - const r1 = await reader.read(); - assert(r1.value); - expect(r1.value).toEqual({ foo: 'test' }); - - const r2 = await reader.read(); - expect(r2.done).toBe(true); - - const returnValue = await run.returnValue; - expect(returnValue).toEqual('done'); - } - ); - test('fetchWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('fetchWorkflow'), []); const returnValue = await run.returnValue; From 89939b34f2efcd8e42b7f5c08a5232c4c931b27a Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 12:32:23 -0700 Subject: [PATCH 8/9] Address PR feedback: docs clarifications and explicit type guards - Clarify that negative startIndex is for custom consumers, not WorkflowChatTransport - Add live-stream caveat for negative startIndex resolution timing - Add pagination limitation callout for live streams - Use explicit typeof guards for negative startIndex checks (world-local, world-postgres) - Add cost comment for EOF marker disk read in world-local Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/content/docs/ai/resumable-streams.mdx | 2 +- docs/content/docs/foundations/streaming.mdx | 8 +++++++- packages/world-local/src/streamer.ts | 10 ++++++++-- packages/world-postgres/src/streamer.ts | 2 +- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/ai/resumable-streams.mdx b/docs/content/docs/ai/resumable-streams.mdx index e4d1f20145..1ba3636aa8 100644 --- a/docs/content/docs/ai/resumable-streams.mdx +++ b/docs/content/docs/ai/resumable-streams.mdx @@ -82,7 +82,7 @@ export async function GET( } ``` -The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for showing the most recent output without replaying the full stream. +The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for custom stream consumers (such as a dashboard showing recent output) that want to show the most recent output without replaying the full stream. Note that `WorkflowChatTransport` itself does not use negative values — it always tracks the exact chunk index incrementally. diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index d8b6d83a19..9faed935e8 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -87,7 +87,9 @@ export async function GET( This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning. -`startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history: +`startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history. + +On an active (not-yet-closed) stream, the negative index resolves relative to the chunk count at connection time; any chunks written afterward are still delivered normally. {/* @skip-typecheck: incomplete code sample */} ```typescript @@ -97,6 +99,10 @@ const stream = run.getReadable({ startIndex: -10 }); If the absolute value exceeds the total number of chunks, reading starts from the beginning (the value is clamped to 0). + +Because streams are live and continue receiving chunks, negative `startIndex` values resolve to different absolute positions on each call. Accurate pagination over a live stream requires cursor-based access, which is not yet supported. Keep this in mind when building clients that paginate over stream data. + + ## Streams as Data Types [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) and [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream) are standard Web Streams API types that Workflow DevKit makes serializable. These are not custom types - they follow the web standard - but Workflow DevKit adds the ability to pass them between functions while maintaining their streaming capabilities. diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index d93eb1961b..b5e2ce8e26 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -344,9 +344,15 @@ export function createStreamer(basedir: string, tag?: string): Streamer { // Resolve negative startIndex relative to the number of data chunks // (excluding the trailing EOF marker chunk, if present). let dataChunkCount = chunkFiles.length; - if (startIndex < 0 && chunkFiles.length > 0) { + if ( + typeof startIndex === 'number' && + startIndex < 0 && + chunkFiles.length > 0 + ) { const lastFile = chunkFiles[chunkFiles.length - 1]; const lastExt = fileExtMap.get(lastFile) ?? '.bin'; + // Note: this incurs an extra disk read to check the EOF marker. + // Acceptable since negative startIndex is not a hot path. const lastChunk = deserializeChunk( await readBuffer(path.join(chunksDir, `${lastFile}${lastExt}`)) ); @@ -355,7 +361,7 @@ export function createStreamer(basedir: string, tag?: string): Streamer { } } const resolvedStartIndex = - startIndex < 0 + typeof startIndex === 'number' && startIndex < 0 ? Math.max(0, dataChunkCount + startIndex) : startIndex; diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index fecdc98026..b3e44face4 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -249,7 +249,7 @@ export function createStreamer( // Resolve negative offset relative to the data chunk count // (excluding the trailing EOF marker, if present) - if (offset < 0) { + if (typeof offset === 'number' && offset < 0) { const dataCount = chunks.length > 0 && chunks[chunks.length - 1].eof ? chunks.length - 1 From 7a2ee25f94cbcf10d962685e9be77229d121c04e Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 13:22:14 -0700 Subject: [PATCH 9/9] Update docs/content/docs/ai/resumable-streams.mdx Co-authored-by: Pranay Prakash Signed-off-by: Peter Wielander --- docs/content/docs/ai/resumable-streams.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/ai/resumable-streams.mdx b/docs/content/docs/ai/resumable-streams.mdx index 1ba3636aa8..6fdd8a076a 100644 --- a/docs/content/docs/ai/resumable-streams.mdx +++ b/docs/content/docs/ai/resumable-streams.mdx @@ -82,7 +82,7 @@ export async function GET( } ``` -The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for custom stream consumers (such as a dashboard showing recent output) that want to show the most recent output without replaying the full stream. Note that `WorkflowChatTransport` itself does not use negative values — it always tracks the exact chunk index incrementally. +The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for custom stream consumers (such as a dashboard showing recent output) that want to show the most recent output without replaying the full stream.