diff --git a/.changeset/negative-start-index.md b/.changeset/negative-start-index.md new file mode 100644 index 0000000000..c5788620cb --- /dev/null +++ b/.changeset/negative-start-index.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +"@workflow/core": patch +--- + +Support negative `startIndex` for streaming (e.g. `-3` reads last 3 chunks) diff --git a/docs/content/docs/ai/resumable-streams.mdx b/docs/content/docs/ai/resumable-streams.mdx index 510c357c90..6fdd8a076a 100644 --- a/docs/content/docs/ai/resumable-streams.mdx +++ b/docs/content/docs/ai/resumable-streams.mdx @@ -82,7 +82,7 @@ export async function GET( } ``` -The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. +The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for custom stream consumers (such as a dashboard showing recent output) that want to show the most recent output without replaying the full stream. diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index 9015d65fbb..9faed935e8 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -87,6 +87,22 @@ export async function GET( This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning. +`startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history. + +On an active (not-yet-closed) stream, the negative index resolves relative to the chunk count at connection time; any chunks written afterward are still delivered normally. + +{/* @skip-typecheck: incomplete code sample */} +```typescript +// Read only the last 10 chunks +const stream = run.getReadable({ startIndex: -10 }); +``` + +If the absolute value exceeds the total number of chunks, reading starts from the beginning (the value is clamped to 0). + + +Because streams are live and continue receiving chunks, negative `startIndex` values resolve to different absolute positions on each call. Accurate pagination over a live stream requires cursor-based access, which is not yet supported. Keep this in mind when building clients that paginate over stream data. + + ## Streams as Data Types [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) and [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream) are standard Web Streams API types that Workflow DevKit makes serializable. These are not custom types - they follow the web standard - but Workflow DevKit adds the ability to pass them between functions while maintaining their streaming capabilities. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 1c9eeb8451..3944a2f728 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -614,49 +614,96 @@ describe('e2e', () => { // Output stream tests use run.getReadable() which requires in-process streaming // infrastructure. The local world's streamer uses an EventEmitter that doesn't work // cross-process (test runner ↔ workbench app). - test.skipIf(isLocalDeployment())( - 'outputStreamWorkflow', - { timeout: 60_000 }, - async () => { - const run = await start(await e2e('outputStreamWorkflow'), []); - const reader = run.getReadable().getReader(); - const namedReader = run.getReadable({ namespace: 'test' }).getReader(); - - // First chunk from default stream: binary data - const r1 = await reader.read(); - assert(r1.value); - assert(r1.value instanceof Uint8Array); - expect(Buffer.from(r1.value).toString()).toEqual('Hello, world!'); - - // First chunk from named stream: binary data - const r1Named = await namedReader.read(); - assert(r1Named.value); - assert(r1Named.value instanceof Uint8Array); - expect(Buffer.from(r1Named.value).toString()).toEqual( - 'Hello, named stream!' - ); - - // Second chunk from default stream: JSON object - const r2 = await reader.read(); - assert(r2.value); - expect(r2.value).toEqual({ foo: 'test' }); + // + // outputStreamWorkflow writes 2 chunks to the default stream: + // chunk 0: binary "Hello, world!" + // chunk 1: object { foo: 'test' } + // and 2 chunks to the "test" named stream: + // chunk 0: binary "Hello, named stream!" + // chunk 1: object { foo: 'bar' } + describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => { + const startIndexCases = [ + { + name: 'no startIndex (reads all chunks)', + startIndex: undefined, + expectedDefault: [ + { type: 'binary', value: 'Hello, world!' }, + { type: 'object', value: { foo: 'test' } }, + ], + expectedNamed: [ + { type: 'binary', value: 'Hello, named stream!' }, + { type: 'object', value: { foo: 'bar' } }, + ], + // Can stream in real-time without waiting for completion + waitForCompletion: false, + }, + { + name: 'positive startIndex (skips first chunk)', + startIndex: 1, + expectedDefault: [{ type: 'object', value: { foo: 'test' } }], + expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], + // Positive startIndex needs the stream written up to that point + waitForCompletion: true, + }, + { + name: 'negative startIndex (reads from end)', + startIndex: -1, + expectedDefault: [{ type: 'object', value: { foo: 'test' } }], + expectedNamed: [{ type: 'object', value: { foo: 'bar' } }], + // Negative startIndex resolves at connection time using knownChunkCount, + // so the stream must be fully written before connecting the reader. + waitForCompletion: true, + }, + ] as const; + + for (const tc of startIndexCases) { + test(tc.name, { timeout: 60_000 }, async () => { + const run = await start(await e2e('outputStreamWorkflow'), []); + + if (tc.waitForCompletion) { + await run.returnValue; + } - // Second chunk from named stream: JSON object - const r2Named = await namedReader.read(); - assert(r2Named.value); - expect(r2Named.value).toEqual({ foo: 'bar' }); + const reader = run + .getReadable({ startIndex: tc.startIndex }) + .getReader(); + const namedReader = run + .getReadable({ namespace: 'test', startIndex: tc.startIndex }) + .getReader(); + + for (const expected of tc.expectedDefault) { + const { value } = await reader.read(); + assert(value); + if (expected.type === 'binary') { + assert(value instanceof Uint8Array); + expect(Buffer.from(value).toString()).toEqual(expected.value); + } else { + expect(value).toEqual(expected.value); + } + } - // Streams should be closed - const r3 = await reader.read(); - expect(r3.done).toBe(true); + // Default stream should be closed after expected chunks + expect((await reader.read()).done).toBe(true); + + for (const expected of tc.expectedNamed) { + const { value } = await namedReader.read(); + assert(value); + if (expected.type === 'binary') { + assert(value instanceof Uint8Array); + expect(Buffer.from(value).toString()).toEqual(expected.value); + } else { + expect(value).toEqual(expected.value); + } + } - const r3Named = await namedReader.read(); - expect(r3Named.done).toBe(true); + // Named stream should be closed after expected chunks + expect((await namedReader.read()).done).toBe(true); - const returnValue = await run.returnValue; - expect(returnValue).toEqual('done'); + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); + }); } - ); + }); test.skipIf(isLocalDeployment())( 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 68fd5c377b..c2feb1fbac 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -33,6 +33,7 @@ export interface WorkflowReadableStreamOptions { namespace?: string; /** * The index number of the starting chunk to begin reading the stream from. + * Negative values start from the end (e.g. -3 reads the last 3 chunks). */ startIndex?: number; /** diff --git a/packages/core/src/runtime/runs.ts b/packages/core/src/runtime/runs.ts index 1c4874b2e7..0c8a9ef8fd 100644 --- a/packages/core/src/runtime/runs.ts +++ b/packages/core/src/runtime/runs.ts @@ -23,6 +23,7 @@ export interface StopSleepResult { export interface ReadStreamOptions { /** * The index to start reading from. Defaults to 0. + * Negative values start from the end (e.g. -3 reads the last 3 chunks). */ startIndex?: number; } diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index e381716249..58c78a5970 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -471,6 +471,63 @@ describe('streamer', () => { // Should successfully read remaining chunks expect(chunks.join('')).toBe('chunk2chunk3'); }); + + it('should support negative startIndex to read from the end', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'negative-index-stream'; + + // Write 4 chunks + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk0'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk2'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk3'); + await streamer.closeStream(streamName, TEST_RUN_ID); + + // Read with startIndex=-2 → last 2 chunks + const stream = await streamer.readFromStream(streamName, -2); + const reader = stream.getReader(); + + const chunks: string[] = []; + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + expect(chunks.join('')).toBe('chunk2chunk3'); + }); + + it('should clamp negative startIndex that exceeds chunk count to 0', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'negative-clamped-stream'; + + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk0'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1'); + await streamer.closeStream(streamName, TEST_RUN_ID); + + // -100 exceeds total count, should clamp to 0 and return all chunks + const stream = await streamer.readFromStream(streamName, -100); + const reader = stream.getReader(); + + const chunks: string[] = []; + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + expect(chunks.join('')).toBe('chunk0chunk1'); + }); }); describe('integration scenarios', () => { diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index bd669b0d73..b5e2ce8e26 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -341,9 +341,33 @@ export function createStreamer(basedir: string, tag?: string): Streamer { .filter((file) => file.startsWith(`${name}-`)) .sort(); // ULID lexicographic sort = chronological order + // Resolve negative startIndex relative to the number of data chunks + // (excluding the trailing EOF marker chunk, if present). + let dataChunkCount = chunkFiles.length; + if ( + typeof startIndex === 'number' && + startIndex < 0 && + chunkFiles.length > 0 + ) { + const lastFile = chunkFiles[chunkFiles.length - 1]; + const lastExt = fileExtMap.get(lastFile) ?? '.bin'; + // Note: this incurs an extra disk read to check the EOF marker. + // Acceptable since negative startIndex is not a hot path. + const lastChunk = deserializeChunk( + await readBuffer(path.join(chunksDir, `${lastFile}${lastExt}`)) + ); + if (lastChunk?.eof === true) { + dataChunkCount--; + } + } + const resolvedStartIndex = + typeof startIndex === 'number' && startIndex < 0 + ? Math.max(0, dataChunkCount + startIndex) + : startIndex; + // Process existing chunks, skipping any already delivered via events let isComplete = false; - for (let i = startIndex; i < chunkFiles.length; i++) { + for (let i = resolvedStartIndex; i < chunkFiles.length; i++) { const file = chunkFiles[i]; // Extract chunk ID from filename: "streamName-chunkId" or "streamName-chunkId.tag" const rawChunkId = file.substring(name.length + 1); diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 24d0b6aa04..b3e44face4 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -247,6 +247,16 @@ export function createStreamer( .where(and(eq(streams.streamId, name))) .orderBy(streams.chunkId); + // Resolve negative offset relative to the data chunk count + // (excluding the trailing EOF marker, if present) + if (typeof offset === 'number' && offset < 0) { + const dataCount = + chunks.length > 0 && chunks[chunks.length - 1].eof + ? chunks.length - 1 + : chunks.length; + offset = Math.max(0, dataCount + offset); + } + for (const chunk of [...chunks, ...(buffer ?? [])]) { enqueue(chunk); } diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index e333a203ce..5fc8274553 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -35,6 +35,7 @@ function httpLog( ); } } + import { ErrorType, getSpanKind, diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index d53fd96d14..d12cedb1ea 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -49,6 +49,12 @@ export interface Streamer { ): Promise; closeStream(name: string, runId: string): Promise; + /** + * Read from a stream starting at the given chunk index. + * Positive values skip that many chunks from the start (0-based). + * Negative values start that many chunks before the current end + * (e.g. -3 on a 10-chunk stream starts at chunk 7). Clamped to 0. + */ readFromStream( name: string, startIndex?: number