Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/negative-start-index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/world": patch
"@workflow/world-local": patch
"@workflow/world-postgres": patch
"@workflow/core": patch
---

Support negative `startIndex` for streaming (e.g. `-3` reads last 3 chunks)
2 changes: 1 addition & 1 deletion docs/content/docs/ai/resumable-streams.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export async function GET(
}
```

The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received.
The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for custom stream consumers (such as a dashboard showing recent output) that want to show the most recent output without replaying the full stream.

</Step>

Expand Down
16 changes: 16 additions & 0 deletions docs/content/docs/foundations/streaming.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ export async function GET(

This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning.

`startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history.

On an active (not-yet-closed) stream, the negative index resolves relative to the chunk count at connection time; any chunks written afterward are still delivered normally.

{/* @skip-typecheck: incomplete code sample */}
```typescript
// Read only the last 10 chunks
const stream = run.getReadable({ startIndex: -10 });
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add some note/callout to be careful about paginating on this. Because streams are live and will continue enqueueing chunks, relative numbers will map to different absolute chunk "IDs" on subsequent calls. I think accurate pagination is only possible when cursor based, and streams don't support cursor based pagination yet so clients should account for that limitation

Copy link
Copy Markdown
Member Author

@VaguelySerious VaguelySerious Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a problem when using run.getReadable directly, but it is a big problem for resumption, i.e. WorkflowChatTransport. I'm working on this in a follow-up PR together with a metadata/tail endpoint, and cursor-based pagination. I'll add a comment in this PR to mention the limitation

```

If the absolute value exceeds the total number of chunks, reading starts from the beginning (the value is clamped to 0).

<Callout type="warn">
Because streams are live and continue receiving chunks, negative `startIndex` values resolve to different absolute positions on each call. Accurate pagination over a live stream requires cursor-based access, which is not yet supported. Keep this in mind when building clients that paginate over stream data.
</Callout>

## Streams as Data Types

[`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) and [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream) are standard Web Streams API types that Workflow DevKit makes serializable. These are not custom types - they follow the web standard - but Workflow DevKit adds the ability to pass them between functions while maintaining their streaming capabilities.
Expand Down
123 changes: 85 additions & 38 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -614,49 +614,96 @@ describe('e2e', () => {
// Output stream tests use run.getReadable() which requires in-process streaming
// infrastructure. The local world's streamer uses an EventEmitter that doesn't work
// cross-process (test runner ↔ workbench app).
test.skipIf(isLocalDeployment())(
'outputStreamWorkflow',
{ timeout: 60_000 },
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);
const reader = run.getReadable().getReader();
const namedReader = run.getReadable({ namespace: 'test' }).getReader();

// First chunk from default stream: binary data
const r1 = await reader.read();
assert(r1.value);
assert(r1.value instanceof Uint8Array);
expect(Buffer.from(r1.value).toString()).toEqual('Hello, world!');

// First chunk from named stream: binary data
const r1Named = await namedReader.read();
assert(r1Named.value);
assert(r1Named.value instanceof Uint8Array);
expect(Buffer.from(r1Named.value).toString()).toEqual(
'Hello, named stream!'
);

// Second chunk from default stream: JSON object
const r2 = await reader.read();
assert(r2.value);
expect(r2.value).toEqual({ foo: 'test' });
//
// outputStreamWorkflow writes 2 chunks to the default stream:
// chunk 0: binary "Hello, world!"
// chunk 1: object { foo: 'test' }
// and 2 chunks to the "test" named stream:
// chunk 0: binary "Hello, named stream!"
// chunk 1: object { foo: 'bar' }
describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we skipping this test in local deployment? 🤔 this might be outdated and we should probably re-enable streaming tests in local deployments

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already being done so this PR isn't changing it. I might try re-enabling in the next PR

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah just fly by noticed this

const startIndexCases = [
{
name: 'no startIndex (reads all chunks)',
startIndex: undefined,
expectedDefault: [
{ type: 'binary', value: 'Hello, world!' },
{ type: 'object', value: { foo: 'test' } },
],
expectedNamed: [
{ type: 'binary', value: 'Hello, named stream!' },
{ type: 'object', value: { foo: 'bar' } },
],
// Can stream in real-time without waiting for completion
waitForCompletion: false,
},
{
name: 'positive startIndex (skips first chunk)',
startIndex: 1,
expectedDefault: [{ type: 'object', value: { foo: 'test' } }],
expectedNamed: [{ type: 'object', value: { foo: 'bar' } }],
// Positive startIndex needs the stream written up to that point
waitForCompletion: true,
},
{
name: 'negative startIndex (reads from end)',
startIndex: -1,
expectedDefault: [{ type: 'object', value: { foo: 'test' } }],
expectedNamed: [{ type: 'object', value: { foo: 'bar' } }],
// Negative startIndex resolves at connection time using knownChunkCount,
// so the stream must be fully written before connecting the reader.
waitForCompletion: true,
},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The e2e coverage for negative startIndex only tests -1 (last chunk). The unit tests in world-local/src/streamer.test.ts are more thorough (covering -2 and clamping with -100), but consider adding a clamping case to this data-driven loop too — e.g., startIndex: -100 expecting all chunks — so the full resolution path is exercised end-to-end across all worlds.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add a broader test in the next PR

] as const;

for (const tc of startIndexCases) {
test(tc.name, { timeout: 60_000 }, async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);

if (tc.waitForCompletion) {
await run.returnValue;
}

// Second chunk from named stream: JSON object
const r2Named = await namedReader.read();
assert(r2Named.value);
expect(r2Named.value).toEqual({ foo: 'bar' });
const reader = run
.getReadable({ startIndex: tc.startIndex })
.getReader();
const namedReader = run
.getReadable({ namespace: 'test', startIndex: tc.startIndex })
.getReader();

for (const expected of tc.expectedDefault) {
const { value } = await reader.read();
assert(value);
if (expected.type === 'binary') {
assert(value instanceof Uint8Array);
expect(Buffer.from(value).toString()).toEqual(expected.value);
} else {
expect(value).toEqual(expected.value);
}
}

// Streams should be closed
const r3 = await reader.read();
expect(r3.done).toBe(true);
// Default stream should be closed after expected chunks
expect((await reader.read()).done).toBe(true);

for (const expected of tc.expectedNamed) {
const { value } = await namedReader.read();
assert(value);
if (expected.type === 'binary') {
assert(value instanceof Uint8Array);
expect(Buffer.from(value).toString()).toEqual(expected.value);
} else {
expect(value).toEqual(expected.value);
}
}

const r3Named = await namedReader.read();
expect(r3Named.done).toBe(true);
// Named stream should be closed after expected chunks
expect((await namedReader.read()).done).toBe(true);

const returnValue = await run.returnValue;
expect(returnValue).toEqual('done');
const returnValue = await run.returnValue;
expect(returnValue).toEqual('done');
});
}
);
});

test.skipIf(isLocalDeployment())(
'outputStreamInsideStepWorkflow - getWritable() called inside step functions',
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/runtime/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface WorkflowReadableStreamOptions {
namespace?: string;
/**
* The index number of the starting chunk to begin reading the stream from.
* Negative values start from the end (e.g. -3 reads the last 3 chunks).
*/
startIndex?: number;
/**
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/runtime/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface StopSleepResult {
export interface ReadStreamOptions {
/**
* The index to start reading from. Defaults to 0.
* Negative values start from the end (e.g. -3 reads the last 3 chunks).
*/
startIndex?: number;
}
Expand Down
57 changes: 57 additions & 0 deletions packages/world-local/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,63 @@ describe('streamer', () => {
// Should successfully read remaining chunks
expect(chunks.join('')).toBe('chunk2chunk3');
});

it('should support negative startIndex to read from the end', async () => {
const { streamer } = await setupStreamer();
const streamName = 'negative-index-stream';

// Write 4 chunks
await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk0');
await new Promise((resolve) => setTimeout(resolve, 2));
await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1');
await new Promise((resolve) => setTimeout(resolve, 2));
await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk2');
await new Promise((resolve) => setTimeout(resolve, 2));
await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk3');
await streamer.closeStream(streamName, TEST_RUN_ID);

// Read with startIndex=-2 → last 2 chunks
const stream = await streamer.readFromStream(streamName, -2);
const reader = stream.getReader();

const chunks: string[] = [];
let done = false;
while (!done) {
const result = await reader.read();
done = result.done;
if (result.value) {
chunks.push(Buffer.from(result.value).toString());
}
}

expect(chunks.join('')).toBe('chunk2chunk3');
});

it('should clamp negative startIndex that exceeds chunk count to 0', async () => {
const { streamer } = await setupStreamer();
const streamName = 'negative-clamped-stream';

await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk0');
await new Promise((resolve) => setTimeout(resolve, 2));
await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1');
await streamer.closeStream(streamName, TEST_RUN_ID);

// -100 exceeds total count, should clamp to 0 and return all chunks
const stream = await streamer.readFromStream(streamName, -100);
const reader = stream.getReader();

const chunks: string[] = [];
let done = false;
while (!done) {
const result = await reader.read();
done = result.done;
if (result.value) {
chunks.push(Buffer.from(result.value).toString());
}
}

expect(chunks.join('')).toBe('chunk0chunk1');
});
});

describe('integration scenarios', () => {
Expand Down
26 changes: 25 additions & 1 deletion packages/world-local/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,33 @@ export function createStreamer(basedir: string, tag?: string): Streamer {
.filter((file) => file.startsWith(`${name}-`))
.sort(); // ULID lexicographic sort = chronological order

// Resolve negative startIndex relative to the number of data chunks
// (excluding the trailing EOF marker chunk, if present).
let dataChunkCount = chunkFiles.length;
if (
typeof startIndex === 'number' &&
startIndex < 0 &&
chunkFiles.length > 0
) {
const lastFile = chunkFiles[chunkFiles.length - 1];
const lastExt = fileExtMap.get(lastFile) ?? '.bin';
// Note: this incurs an extra disk read to check the EOF marker.
// Acceptable since negative startIndex is not a hot path.
const lastChunk = deserializeChunk(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads and deserializes the last chunk file just to check eof. On the hot path for a stream with many chunks, this is an extra disk read every time a reader connects with a negative startIndex. Not blocking, but worth a comment noting the cost, or consider whether the EOF state could be tracked more cheaply (e.g., checking for a sentinel file).

await readBuffer(path.join(chunksDir, `${lastFile}${lastExt}`))
);
if (lastChunk?.eof === true) {
dataChunkCount--;
}
}
const resolvedStartIndex =
typeof startIndex === 'number' && startIndex < 0
? Math.max(0, dataChunkCount + startIndex)
: startIndex;

// Process existing chunks, skipping any already delivered via events
let isComplete = false;
for (let i = startIndex; i < chunkFiles.length; i++) {
for (let i = resolvedStartIndex; i < chunkFiles.length; i++) {
const file = chunkFiles[i];
// Extract chunk ID from filename: "streamName-chunkId" or "streamName-chunkId.tag"
const rawChunkId = file.substring(name.length + 1);
Expand Down
10 changes: 10 additions & 0 deletions packages/world-postgres/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ export function createStreamer(
.where(and(eq(streams.streamId, name)))
.orderBy(streams.chunkId);

// Resolve negative offset relative to the data chunk count
// (excluding the trailing EOF marker, if present)
if (typeof offset === 'number' && offset < 0) {
const dataCount =
chunks.length > 0 && chunks[chunks.length - 1].eof
? chunks.length - 1
: chunks.length;
offset = Math.max(0, dataCount + offset);
}

for (const chunk of [...chunks, ...(buffer ?? [])]) {
enqueue(chunk);
}
Expand Down
1 change: 1 addition & 0 deletions packages/world-vercel/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ function httpLog(
);
}
}

import {
ErrorType,
getSpanKind,
Expand Down
6 changes: 6 additions & 0 deletions packages/world/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ export interface Streamer {
): Promise<void>;

closeStream(name: string, runId: string): Promise<void>;
/**
* Read from a stream starting at the given chunk index.
* Positive values skip that many chunks from the start (0-based).
* Negative values start that many chunks before the current end
* (e.g. -3 on a 10-chunk stream starts at chunk 7). Clamped to 0.
*/
readFromStream(
name: string,
startIndex?: number
Expand Down
Loading