[core] Support negative startIndex for streaming#1460
Conversation
🦋 Changeset detectedLatest commit: 7a2ee25 The changes in this PR will be included in the next version bump. This PR includes changesets to release 20 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📊 Benchmark Results
workflow with no steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 1 step💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 10 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 25 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 50 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Promise.all with 10 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Promise.all with 25 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Promise.all with 50 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Promise.race with 10 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Promise.race with 25 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Promise.race with 50 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 10 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 25 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 50 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 10 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 25 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) workflow with 50 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) Stream Benchmarks (includes TTFB metrics)workflow with stream💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) stream pipeline with 5 transform steps (1MB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) 10 parallel streams (1MB each)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) fan-out fan-in 10 streams (1MB each)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) SummaryFastest Framework by WorldWinner determined by most benchmark wins
Fastest World by FrameworkWinner determined by most benchmark wins
Column Definitions
Worlds:
❌ Some benchmark jobs failed:
Check the workflow run for details. |
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests▲ Vercel Production (1 failed)example (1 failed):
🌍 Community Worlds (58 failed)mongodb (3 failed):
redis (2 failed):
turso (53 failed):
Details by Category❌ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
❌ Some E2E test jobs failed:
Check the workflow run for details. |
Negative startIndex values (e.g. -3) resolve to n chunks before the known end of the stream. All world implementations (local, postgres, vercel) support this. Includes unit tests, e2e test, and doc updates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
dbbf02a to
40ba940
Compare
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
pranaygp
left a comment
There was a problem hiding this comment.
Overall this is a clean, well-tested feature. The implementation is consistent across worlds and the clamping behavior is solid. A few comments inline — the main concern is the docs could better call out the live-stream caveat, and the skill file could mention negative startIndex.
| ``` | ||
|
|
||
| The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. | ||
| The `startIndex` parameter ensures the client can choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use `startIndex` to resume the stream exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for showing the most recent output without replaying the full stream. |
There was a problem hiding this comment.
This sentence mentions negative values as useful for "showing the most recent output without replaying the full stream" — but in the context of WorkflowChatTransport, the transport itself never uses negative startIndex (it tracks chunkIndex incrementally). The mention here might confuse readers into thinking the transport automatically leverages this.
Consider clarifying that this is useful for custom stream consumption (e.g., a dashboard showing recent output), not something WorkflowChatTransport does internally.
|
|
||
| This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning. | ||
|
|
||
| `startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history: |
There was a problem hiding this comment.
Worth noting that on a live (not-yet-closed) stream, the negative startIndex resolves at connection time — so -5 means "5 before the current end at the time of connecting", and any chunks written afterward will still be delivered. This is an important subtlety the e2e test handles (via waitForCompletion: true) but the docs do not mention.
A short note like: "On an active stream, the index resolves relative to the chunk count at connection time; subsequent chunks are still delivered." would prevent misunderstanding.
| // Resolve negative startIndex relative to the number of data chunks | ||
| // (excluding the trailing EOF marker chunk, if present). | ||
| let dataChunkCount = chunkFiles.length; | ||
| if (startIndex < 0 && chunkFiles.length > 0) { |
There was a problem hiding this comment.
Nit: startIndex is typed as number | undefined (from the startIndex?: number parameter). The startIndex < 0 check works because undefined < 0 is false in JS, but it reads a bit fragile. Consider an explicit typeof startIndex === 'number' && startIndex < 0 guard to match the style used in world-vercel/src/streamer.ts:127.
| if (startIndex < 0 && chunkFiles.length > 0) { | ||
| const lastFile = chunkFiles[chunkFiles.length - 1]; | ||
| const lastExt = fileExtMap.get(lastFile) ?? '.bin'; | ||
| const lastChunk = deserializeChunk( |
There was a problem hiding this comment.
This reads and deserializes the last chunk file just to check eof. On the hot path for a stream with many chunks, this is an extra disk read every time a reader connects with a negative startIndex. Not blocking, but worth a comment noting the cost, or consider whether the EOF state could be tracked more cheaply (e.g., checking for a sentinel file).
|
|
||
| // Resolve negative offset relative to the data chunk count | ||
| // (excluding the trailing EOF marker, if present) | ||
| if (offset < 0) { |
There was a problem hiding this comment.
Same nit as the local streamer: offset starts as startIndex ?? 0, so it can never be undefined here — but an explicit typeof guard would make the intent clearer at a glance.
| {/* @skip-typecheck: incomplete code sample */} | ||
| ```typescript | ||
| // Read only the last 10 chunks | ||
| const stream = run.getReadable({ startIndex: -10 }); |
There was a problem hiding this comment.
should we add some note/callout to be careful about paginating on this. Because streams are live and will continue enqueueing chunks, relative numbers will map to different absolute chunk "IDs" on subsequent calls. I think accurate pagination is only possible when cursor based, and streams don't support cursor based pagination yet so clients should account for that limitation
There was a problem hiding this comment.
It's not a problem when using run.getReadable directly, but it is a big problem for resumption, i.e. WorkflowChatTransport. I'm working on this in a follow-up PR together with a metadata/tail endpoint, and cursor-based pagination. I'll add a comment in this PR to mention the limitation
| // and 2 chunks to the "test" named stream: | ||
| // chunk 0: binary "Hello, named stream!" | ||
| // chunk 1: object { foo: 'bar' } | ||
| describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => { |
There was a problem hiding this comment.
why are we skipping this test in local deployment? 🤔 this might be outdated and we should probably re-enable streaming tests in local deployments
There was a problem hiding this comment.
It's already being done so this PR isn't changing it. I might try re-enabling in the next PR
There was a problem hiding this comment.
yeah just fly by noticed this
| // Negative startIndex resolves at connection time using knownChunkCount, | ||
| // so the stream must be fully written before connecting the reader. | ||
| waitForCompletion: true, | ||
| }, |
There was a problem hiding this comment.
The e2e coverage for negative startIndex only tests -1 (last chunk). The unit tests in world-local/src/streamer.test.ts are more thorough (covering -2 and clamping with -100), but consider adding a clamping case to this data-driven loop too — e.g., startIndex: -100 expecting all chunks — so the full resolution path is exercised end-to-end across all worlds.
There was a problem hiding this comment.
Will add a broader test in the next PR
- Clarify that negative startIndex is for custom consumers, not WorkflowChatTransport - Add live-stream caveat for negative startIndex resolution timing - Add pagination limitation callout for live streams - Use explicit typeof guards for negative startIndex checks (world-local, world-postgres) - Add cost comment for EOF marker disk read in world-local Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Pranay Prakash <pranay.gp@gmail.com> Signed-off-by: Peter Wielander <mittgfu@gmail.com>
Negative startIndex values (e.g. -3) resolve to n chunks before the known end of the stream. All world implementations (local, postgres, vercel) support this. Includes unit tests, e2e test, and doc updates.