[core] Memoize getWritable() to fix chunk reordering when called repeatedly #2086
Repeat calls to `getWritable()` from the same step previously spawned independent TransformStream + flushablePipe pairs that all flushed to the same (runId, name) on the server. On Vercel the 50-100ms HTTP write latency turned the inter-pipe race into deterministic reordering, most visibly when callers acquired a fresh writer per chunk (e.g. an AI SDK text-delta loop). Locally the world-local filesystem path made the race invisible.

Cache the writable + pipe state per (runId, namespace) in the step context so repeat calls share one serial sink. Each call still registers a per-call guard in ctx.ops so the step waits for every caller's writes to flush before completing, matching the prior multi-pipe semantics.

Closes #2058

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
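The race described in this commit message can be illustrated with a small simulation. This is only a sketch: `delayedWrite`, `writeWithIndependentPipes`, and `writeWithSharedPipe` are hypothetical names, and the timers stand in for HTTP write latency rather than reproducing the real `flushablePipe` behavior.

```typescript
// Hypothetical simulation; none of these names come from the Workflow codebase.
type Sink = { received: string[] };

// Simulate a server write whose latency differs per request.
function delayedWrite(sink: Sink, chunk: string, latencyMs: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => {
      sink.received.push(chunk);
      resolve();
    }, latencyMs);
  });
}

// Before the fix (simulated): every chunk travels through its own pipe,
// so all writes are in flight at once and arrival order follows latency.
async function writeWithIndependentPipes(
  chunks: string[],
  latencies: number[],
): Promise<string[]> {
  const sink: Sink = { received: [] };
  await Promise.all(
    chunks.map((chunk, i) => delayedWrite(sink, chunk, latencies[i] ?? 0)),
  );
  return sink.received;
}

// After the fix (simulated): one shared pipe chains the writes, so each
// write starts only after the previous one has landed.
async function writeWithSharedPipe(
  chunks: string[],
  latencies: number[],
): Promise<string[]> {
  const sink: Sink = { received: [] };
  let tail: Promise<void> = Promise.resolve();
  chunks.forEach((chunk, i) => {
    tail = tail.then(() => delayedWrite(sink, chunk, latencies[i] ?? 0));
  });
  await tail;
  return sink.received;
}
```

With chunks `a, b, c` and simulated latencies of 60, 10, and 30 ms, the independent-pipe version delivers `b, c, a`, while the shared-pipe version keeps `a, b, c`.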
🦋 Changeset detected
Latest commit: 40106be
The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages.
🧪 E2E Test Results
✅ All tests passed

Details by Category
- ✅ ▲ Vercel Production
- ✅ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 🪟 Windows
- ✅ 📋 Other
📊 Benchmark Results
Each benchmark ran on 💻 Local Development and ▲ Production (Vercel):
- workflow with no steps
- workflow with 1 step
- workflow with 10, 25, and 50 sequential steps
- Promise.all with 10, 25, and 50 concurrent steps
- Promise.race with 10, 25, and 50 concurrent steps
- workflow with 10, 25, and 50 sequential data payload steps (10KB)
- workflow with 10, 25, and 50 concurrent data payload steps (10KB)

Stream Benchmarks (includes TTFB metrics)
- workflow with stream
- stream pipeline with 5 transform steps (1MB)
- 10 parallel streams (1MB each)
- fan-out fan-in 10 streams (1MB each)

❌ Some benchmark jobs failed. Check the workflow run for details.
TooTallNate left a comment
Should we memoize the getWritable() return value instead? Seems simpler to me.
@TooTallNate doing now
Drop the per-call guard mechanism and `pollSharedWritableLock` helper.

With memoization, the cached pipe's single `state.promise` is sufficient: in the loop pattern `await writer.write(chunk)` already blocks until each chunk is flushed (the sink awaits the scheduled flush before resolving), so by the time the loop ends `pendingOps === 0` and the final `releaseLock()` lets `pollWritableLock` resolve the state. One writable, one pipe, one ops entry per `(runId, namespace)`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
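The loop pattern this commit message refers to can be sketched with the standard Web Streams API alone. The recording sink below is a stand-in for the memoized writable and its simulated flush delay is an assumption. `makeRecordingWritable` and `writeChunksWithFreshWriter` are hypothetical names.

```typescript
// Stand-in for the memoized writable: a plain WritableStream that records
// chunks after a simulated flush delay. Not the real Workflow sink.
function makeRecordingWritable(): {
  writable: WritableStream<string>;
  received: string[];
} {
  const received: string[] = [];
  const writable = new WritableStream<string>({
    async write(chunk) {
      // writer.write() resolves only after this sink callback settles.
      await new Promise<void>((resolve) => setTimeout(resolve, 5));
      received.push(chunk);
    },
  });
  return { writable, received };
}

// Acquire a fresh writer per chunk, await the write, then release the lock.
async function writeChunksWithFreshWriter(
  writable: WritableStream<string>,
  chunks: string[],
): Promise<void> {
  for (const chunk of chunks) {
    const writer = writable.getWriter();
    try {
      await writer.write(chunk);
    } finally {
      // Releasing (not closing) leaves the stream usable for the next writer.
      writer.releaseLock();
    }
  }
}
```

Because every writer targets the same stream and each write is awaited, chunks reach the sink in loop order, and `writable.locked` is `false` once the loop finishes.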
TooTallNate left a comment
Approve — clean fix, my earlier "memoize the return value" suggestion landed in 40106be78
The bug is real and the fix matches the issue exactly: caching serialize.writable per (runId, namespace) on the step context so repeat calls within the same step go through one serial sink instead of spawning racing pipes that fight for the same (runId, name) server stream.
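A minimal sketch of the caching idea follows. It assumes a simplified step context; `StepContext`, `createStepContext`, and `getWritableMemoized` are hypothetical names, and the real context shape and `serialize.writable` internals are not shown in this conversation.

```typescript
// Hypothetical step context shape, for illustration only.
interface StepContext {
  runId: string;
  writableCache: Map<string, WritableStream<string>>;
  ops: Promise<void>[];
}

function createStepContext(runId: string): StepContext {
  return { runId, writableCache: new Map(), ops: [] };
}

// Return the cached writable for (runId, namespace), creating it on first use.
function getWritableMemoized(
  ctx: StepContext,
  namespace = "default",
): WritableStream<string> {
  // JSON-encode the pair so distinct (runId, namespace) pairs cannot collide.
  const key = JSON.stringify([ctx.runId, namespace]);
  const cached = ctx.writableCache.get(key);
  if (cached) return cached;

  // First call for this key: create one writable and register one ops entry.
  const writable = new WritableStream<string>({ write() {} });
  ctx.writableCache.set(key, writable);
  // Placeholder for the pipe's completion promise (assumption).
  ctx.ops.push(Promise.resolve());
  return writable;
}
```

Calling `getWritableMemoized(ctx, "a")` twice returns the same object, a different namespace yields a different one, and `ctx.ops` grows only on the first call per key.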
What I verified
- `pnpm install --frozen-lockfile` ✓
- `pnpm turbo run build --filter @workflow/core` ✓
- `pnpm --filter @workflow/core test` ✓ (1013/1013)
- `cd packages/core && pnpm exec vitest run src/step/writable-stream.test.ts` ✓ (5/5: 2 existing + 3 new)
- CI: 65 E2E jobs pass, Unit Tests pass, Windows Tests pass, single `Benchmark Vercel (express)` flake unrelated to this change
Test coverage looks great
The three new tests pin the contract from three angles:
- `returns the same writable for repeat calls with the same namespace`: direct identity assertion (and distinct namespaces still get distinct writables)
- `preserves chunk order across per-write getWritable() calls in a loop`: exact repro of the issue's `'nov', 'o', ' e', '2', 'e', ' ok'` scenario, decoded through the matching deserialize stream
- `registers exactly one pipe per (runId, namespace), regardless of call count`: asserts `ops.length === 2` (one shared + one for the distinct namespace)
The chunk-order test is particularly good because it goes through the full serialize → record → deserialize round trip rather than asserting against mock internals.
Per-call ctx.ops guards — no longer needed
The simplification from 0d076e0c → 40106be78 correctly drops per-caller ops registration. Since all callers now share one state.promise, that one promise already covers every writer that holds the lock — when any writer has the lock the pipe is active, when none do the state resolves. The "wait for every caller's writes to drain" semantics fall out for free.
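The "one promise covers every writer" reasoning can be sketched as a poll on the shared stream's lock. This is a hypothetical illustration and not the library's `pollWritableLock`: the `PipeState` shape, the `pollUntilIdle` name, and the polling interval are all assumptions.

```typescript
// Hypothetical shared pipe state; the real state object is not shown here.
interface PipeState {
  pendingOps: number;
}

// Resolve once no writer holds the lock and no writes are still pending.
// Because every caller shares the same writable, this single promise
// covers all of them.
function pollUntilIdle(
  writable: WritableStream<unknown>,
  state: PipeState,
  intervalMs = 10,
): Promise<void> {
  return new Promise<void>((resolve) => {
    const check = (): void => {
      if (!writable.locked && state.pendingOps === 0) {
        resolve();
      } else {
        setTimeout(check, intervalMs);
      }
    };
    check();
  });
}
```

While any caller holds a writer, `writable.locked` stays `true` and the promise remains pending. It resolves on the first check after the last `releaseLock()`, provided `pendingOps` is zero.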
One subtle behavior change worth calling out
A user who does getWritable().close() and then calls getWritable() again will now get back the same closed writable instead of a fresh one. Pre-PR, the second call would have created a new writable they could continue writing to. Post-PR, that second call returns a closed handle.
I don't think anyone is actually depending on this — the issue describes the bug as users not realizing re-acquisition was unsafe, so "close then re-acquire to keep writing" is unlikely to be a real pattern. The AI SDK / per-chunk-loop case (which IS the motivating example) only does releaseLock(), not close(). So this should be fine in practice. Worth mentioning in case anyone hits it post-release.
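The behavior change can be reproduced with a plain `WritableStream` behind a memoizing getter. This is a sketch under assumptions: `getMemoizedWritable` and `closeThenReacquire` are hypothetical names, and the module-level `Map` stands in for the step context cache.

```typescript
// Hypothetical module-level cache standing in for the step context.
const writableCache = new Map<string, WritableStream<string>>();

function getMemoizedWritable(namespace: string): WritableStream<string> {
  let writable = writableCache.get(namespace);
  if (!writable) {
    writable = new WritableStream<string>({ write() {} });
    writableCache.set(namespace, writable);
  }
  return writable;
}

// Close the writable, re-acquire it, and report what the second caller sees.
async function closeThenReacquire(): Promise<{
  sameInstance: boolean;
  secondWriteRejected: boolean;
}> {
  const first = getMemoizedWritable("default");
  const writer = first.getWriter();
  await writer.write("hello");
  await writer.close();
  writer.releaseLock();

  // With memoization the second call hands back the already-closed stream.
  const second = getMemoizedWritable("default");
  const secondWriter = second.getWriter();
  let secondWriteRejected = false;
  try {
    await secondWriter.write("more");
  } catch {
    // Writing to a closed WritableStream rejects with a TypeError.
    secondWriteRejected = true;
  } finally {
    secondWriter.releaseLock();
  }
  return { sameInstance: first === second, secondWriteRejected };
}
```

Callers that only use `releaseLock()` between writes, as in the per-chunk loop, never reach the rejected write.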
Branch hygiene note (not a blocker)
The branch is forked from a point before #2091 landed, so the GitHub diff against current main shows misleading "deletions" of `.changeset/changelog.mjs`, the changelog generator reference in `.changeset/config.json`, and 5 other unrelated changesets. Those are all artifacts of the branch being stale: the PR's actual tree doesn't have those files at all (because they were added on main after the branch point), so a fresh sync with main would make the diff display correctly. Squash-merge will produce a clean single commit regardless, so this doesn't affect what lands. Just flagging because the GitHub UI looks scary at first glance.
Approving.
Fixes #2058, where calling `getWritable` multiple times in the same function causes chunk re-ordering without any user-facing warnings. I'm not sure if this is the right approach or we should just throw an error or warn.

The issue
Each call to `getWritable()` constructed a fresh `TransformStream` + `WorkflowServerWritableStream` + background `flushablePipe(...)`, all flushing to the same `(runId, name)`. On Vercel the 50-100ms HTTP write latency turned the inter-pipe race into deterministic reordering, most visibly when callers acquired a fresh writer per chunk (e.g. an AI SDK text-delta loop). Locally `world-local` made it look fine because filesystem writes are effectively instant.

The fix
- Cache the writable + pipe state per `(runId, namespace)` within a step context so repeat calls to `getWritable()` no longer spawn racing pipes.
- Each call still registers a per-call guard in `ctx.ops` so the step still waits for every caller's writes to drain.