[world] Restructure stream interface, require run ID for all step and stream operations#1293
Conversation
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
… operations in namespace Signed-off-by: Peter Wielander <mittgfu@gmail.com>
🦋 Changeset detectedLatest commit: 2c89b17 The changes in this PR will be included in the next version bump. This PR includes changesets to release 21 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📊 Benchmark Results
workflow with no steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) workflow with 1 step💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) workflow with 10 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) workflow with 25 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro workflow with 50 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) Promise.all with 10 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro Promise.all with 25 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) Promise.all with 50 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro Promise.race with 10 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro Promise.race with 25 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro Promise.race with 50 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro workflow with 10 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro workflow with 25 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) workflow with 50 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) workflow with 10 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) workflow with 25 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro workflow with 50 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro Stream Benchmarks (includes TTFB metrics)workflow with stream💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) stream pipeline with 5 transform steps (1MB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) 10 parallel streams (1MB each)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) fan-out fan-in 10 streams (1MB each)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) SummaryFastest Framework by WorldWinner determined by most benchmark wins
Fastest World by FrameworkWinner determined by most benchmark wins
Column Definitions
Worlds:
❌ Some benchmark jobs failed:
Check the workflow run for details. |
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests🌍 Community Worlds (73 failed)mongodb (7 failed):
redis (7 failed):
turso (59 failed):
Details by Category✅ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
|
# Conflicts: # packages/world-local/src/storage/steps-storage.ts # packages/world-local/src/streamer.ts # packages/world-vercel/src/steps.ts # packages/world-vercel/src/streamer.ts
…hods Merge main into peter/step-optional, resolving conflicts in the Streamer interface, world-local streamer, world-postgres streamer, and writable stream tests. Incorporates negative startIndex support from main (#1460) and pendingOps flush behavior (#1446) into the namespaced streams API. Additionally swaps argument order so runId is always the first parameter for all stream methods: write, writeMulti, close, get, and list. Updates all implementations (local, postgres, vercel), callers (serialization, runtime, CLI, web), tests, and documentation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Here's my review of PR #1293:
Summary
This PR restructures the Streamer interface on the World type, moving flat methods (writeToStream, closeStream, readFromStream, listStreamsByRunId) into a namespaced world.streams.* object (streams.write, streams.close, streams.get, streams.list). It also swaps argument order to put runId first everywhere and makes runId required (non-optional) for steps.get.
Potential Regressions
1. readStreamServerAction signature change not fully propagated
The function signature changed to accept an optional runId as a 4th parameter:
export async function readStreamServerAction(
env: EnvMap,
streamId: string,
startIndex?: number,
runId?: stringBut the route caller at packages/web/app/routes/api.stream.$streamId.tsx (line 36) still calls readStreamServerAction({}, streamId, startIndex) without passing runId. This works today because all world implementations ignore _runId in their get method and it defaults to '', but it's fragile and defeats the purpose of requiring runId.
2. CLI showStream passes empty string as runId fallback
const rawStream = await world.streams.get(opts.runId ?? '', streamId);If --run is not provided, opts.runId is undefined and an empty string is used. This works only because no implementation validates runId in get, but it's a landmine for future implementations that might.
3. readStream in runs.ts public API signature changed
The exported readStream function gained a new required runId parameter between world and streamId. Any external consumers importing this from @workflow/core will break at compile time. Since this is marked as a breaking change in the changeset, this may be intentional, but it's worth verifying no external consumers exist.
4. Community/third-party world implementations will break
The Streamer interface changed from flat methods to a nested streams namespace. Any community world implementing the old interface (e.g., Turso, MongoDB, Redis worlds shown in CI) will need updating. The E2E results confirm this: the Turso community world has 54 failures.
Non-Blocking Issues
- Changeset uses "patch" for what's explicitly labeled a
**BREAKING CHANGE**. Per repo conventions all changes use patch, but consumers should be aware. - The
Worldinterface type ordering changed fromextends Queue, Storage, Streamertoextends Queue, Streamer, Storage. This shouldn't have runtime impact but is worth noting. - Documentation at
docs/content/docs/deploying/building-a-world.mdxwas correctly updated, but the API reference docs (get-writable.mdx,defining-tools.mdx) reference workflow user-land functions (not world methods) so those are fine.
Combine branch's streams sub-object restructure with main's new features (getStreamChunks/getStreamInfo → getChunks/getInfo, streamFlushIntervalMs, writeMulti pagination, pg driver change). Update all call sites, tests, and documentation to match. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…r required runId WorkflowServerWritableStream constructor was changed from (name, runId) to (runId, name), but step/writable-stream.ts was not updated, causing streams written inside steps to target the wrong stream ID and timing out the outputStreamInsideStepWorkflow e2e test. Also update the "should retrieve a step with only stepId" unit test to pass the now-required runId. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The webhook handler (resumeWebhook) needs to read hook metadata to determine the respondWith behavior. However, the webhook Lambda may not have the deployment encryption key available, causing metadata hydration to fail with "Encrypted stream data encountered but no encryption key is available". Fix by passing undefined instead of the encryption key when serializing hook metadata in the suspension handler. Hook metadata is small (just respondWith config) and doesn't need encryption. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The package version was reset from 4.2.x-beta to 4.0.0 for the v5
beta release. The encryption format capability check used 4.2.0-beta.64
as the minimum version, causing getRunCapabilities("4.0.0") to report
encryption as unsupported. This made resumeHook strip the encryption
key, while the step handler still encrypted data — causing "Encrypted
stream data encountered but no encryption key is available" errors in
the webhook handler.
Fix by lowering the minVersion to 4.0.0 to cover the reset range.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…anges
The PR branches had version 4.0.0 (intermediate changeset reset) instead
of 5.0.0-beta.0 (the actual published version). This caused
getRunCapabilities("4.0.0") to report encryption as unsupported, breaking
the webhook respondWith flow on Vercel Prod deployments.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TooTallNate
left a comment
There was a problem hiding this comment.
The restructuring itself is well-executed — the world.streams.* namespace is cleaner, runId-first parameter order is consistent across all 7 methods, and all callers in core, CLI, web, e2e tests, and all three world implementations have been updated. However, there are several issues to address before this can merge.
Meta
The PR has no description. For a breaking change of this scope (28 files, restructuring a public interface used by community worlds), please add a summary explaining:
- What changed and why (namespace restructure, runId-first ordering, steps.get requiring runId)
- Migration path for community world implementors
- What callers need to update
Blocking (3)
-
Health check
streams.getuses a freshly generatedrunIdthat doesn't match the write-siderunId—handleHealthCheckMessagewrites withgenerateHealthCheckRunId(), but the reader inhealthCheck()callsgenerateHealthCheckRunId()again, producing a different ID. This works today because all three world implementations ignorerunIdfor stream reads, but it violates the interface contract and will break any backend that scopes reads byrunId. The reader should either use the samerunIdas the writer (requires coordination, e.g. deriving it deterministically fromcorrelationId) or the interface should explicitly document thatrunIdis advisory for reads. -
readStreamServerActionand CLIshowStreampassrunId ?? ''(empty string) — Bothpackages/web/app/server/workflow-server-actions.server.tsandpackages/cli/src/lib/inspect/output.tsuserunId ?? ''as a fallback whenrunIdis not provided. An empty string for a requiredrunIdparameter will produce malformed API URLs (/v2/runs//stream/...) or incorrect DB queries. IfrunIdis now required by the interface, these callers should also require it (or throw a clear error for missingrunId). -
Changeset uses
patchfor a breaking change — should bemajor— AGENTS.md (line 183): "Use the correct semver bump type:patchfor bug fixes,minorfor new features,majorfor breaking changes." The changeset description already says**BREAKING CHANGE**(correct), but the bump type ispatch. While onmain(pre-release mode) the bump type doesn't affect beta numbering, it does matter when backported tostable(AGENTS.md line 184). Both changesets should usemajorfor all affected packages.
Non-blocking (4)
-
world-vercel
streams.getignoresrunId— The implementation has_runId(unused) and passesundefinedtogetStreamUrl. This preserves existing behavior but creates an asymmetry: writes go to/v2/runs/{runId}/stream/{name}but reads go to/v2/stream/{name}. Worth a comment explaining why. -
WorkflowServerReadableStreamdoesn't validaterunId— The constructor validatesname(non-empty string) but notrunId, unlikeWorkflowServerWritableStreamwhich validates both. Add arunIdcheck for parity. -
steps.getchangeset (step-run-required.md) doesn't include@workflow/core,@workflow/cli, or@workflow/web— If any callers in those packages changed behavior (e.g. now always passingrunIdwhere they previously passedundefined), those packages should be in the changeset too. -
Community worlds will break at compile time — Expected and documented as breaking, but worth calling out explicitly: any JS (non-TS) consumer won't get a compile error, they'll get a runtime
TypeError: world.writeToStream is not a function. The migration path should be noted in the PR description.
- Derive health check runId deterministically from correlationId so writer and reader produce the same value - Make runId required in readStreamServerAction (drop optional fallback) - Throw explicit error in CLI showStream when --run is missing - Bump changesets to major for breaking changes - Add missing packages to step-run-required changeset Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TooTallNate
left a comment
There was a problem hiding this comment.
All three blocking issues from my previous review are resolved in 2c89b170:
-
Changesets
patch→major— Bothbright-pears-drum.mdandstep-run-required.mdnow usemajorfor all affected packages. The second changeset also added the missing@workflow/core,@workflow/cli, and@workflow/webpackages. -
Empty-string
runIdfallbacks — CLI now throws'--run is required when showing a stream'instead of passing''. WebreadStreamServerActionmakesrunId: stringrequired (no longer optional). -
Health check
runIdmismatch —generateHealthCheckRunIdis now deterministic:wrun_hc_${correlationId}. Both the writer (handleHealthCheckMessage) and reader (healthCheck) derive the same runId from the sharedcorrelationId. Clean fix.
The PR description is also present now (was missing before).
LGTM.
No description provided.