From 0d076e0cc415405689a0c71def56367ed4eda733 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 22 May 2026 15:58:12 +0200 Subject: [PATCH 1/2] [core] Share one pipe per step in getWritable() to fix chunk reordering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Repeat calls to `getWritable()` from the same step previously spawned independent TransformStream + flushablePipe pairs that all flushed to the same (runId, name) on the server. On Vercel the 50-100ms HTTP write latency turned the inter-pipe race into deterministic reordering — most visibly when callers acquired a fresh writer per chunk (e.g. an AI SDK text-delta loop). Locally the world-local filesystem path made the race invisible. Cache the writable + pipe state per (runId, namespace) in the step context so repeat calls share one serial sink. Each call still registers a per-call guard in ctx.ops so the step waits for every caller's writes to flush before completing, matching the prior multi-pipe semantics. Closes #2058 Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/getwritable-share-pipe.md | 6 + packages/core/src/flushable-stream.ts | 44 +++++ packages/core/src/step/context-storage.ts | 15 ++ .../core/src/step/writable-stream.test.ts | 184 ++++++++++++++---- packages/core/src/step/writable-stream.ts | 93 +++++---- 5 files changed, 269 insertions(+), 73 deletions(-) create mode 100644 .changeset/getwritable-share-pipe.md diff --git a/.changeset/getwritable-share-pipe.md b/.changeset/getwritable-share-pipe.md new file mode 100644 index 0000000000..a5ca52e9ae --- /dev/null +++ b/.changeset/getwritable-share-pipe.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"workflow": patch +--- + +Fix `getWritable()` returning a new TransformStream per call, which caused racing pipes to reorder chunks when callers acquired a writer per write. Repeat calls within the same step now share a single pipe per `(runId, namespace)`. diff --git a/packages/core/src/flushable-stream.ts b/packages/core/src/flushable-stream.ts index 9ab2045e86..a9a0f0165c 100644 --- a/packages/core/src/flushable-stream.ts +++ b/packages/core/src/flushable-stream.ts @@ -153,6 +153,50 @@ export function pollWritableLock( state.writablePollingInterval = intervalId; } +/** + * Like {@link pollWritableLock}, but watches an external "shared" pipe's + * state while resolving an independent `guard` state. + * + * Used when multiple callers share a single background pipe (e.g. repeat + * `getWritable()` calls within the same step) and each caller needs its own + * "I'm done" signal that fires after the lock is released and the shared + * pipe has drained, without ending the shared pipe. + */ +export function pollSharedWritableLock( + writable: WritableStream, + sharedState: FlushableStreamState, + guard: FlushableStreamState +): void { + if (guard.writablePollingInterval !== undefined) { + return; + } + + const intervalId = setInterval(() => { + if (guard.doneResolved) { + clearInterval(intervalId); + guard.writablePollingInterval = undefined; + return; + } + + if (sharedState.streamEnded) { + clearInterval(intervalId); + guard.writablePollingInterval = undefined; + guard.doneResolved = true; + sharedState.promise.then(guard.resolve, guard.reject); + return; + } + + if (isWritableUnlockedNotClosed(writable) && sharedState.pendingOps === 0) { + guard.doneResolved = true; + guard.resolve(); + clearInterval(intervalId); + guard.writablePollingInterval = undefined; + } + }, LOCK_POLL_INTERVAL_MS); + + guard.writablePollingInterval = intervalId; +} + /** * Polls a ReadableStream to check if the user has released their lock. * Resolves the done promise when lock is released and no pending ops remain. diff --git a/packages/core/src/step/context-storage.ts b/packages/core/src/step/context-storage.ts index 200925ea4b..88941d6ee3 100644 --- a/packages/core/src/step/context-storage.ts +++ b/packages/core/src/step/context-storage.ts @@ -1,14 +1,29 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import type { CryptoKey } from '../encryption.js'; +import type { FlushableStreamState } from '../flushable-stream.js'; import type { WorkflowMetadata } from '../workflow/get-workflow-metadata.js'; import type { StepMetadata } from './get-step-metadata.js'; +/** + * Per-step cache entry for a `(runId, namespace)` writable stream. + * + * Holds the user-facing `WritableStream` and the shared `FlushableStreamState` + * driving the background pipe to the workflow server. Re-used so repeat calls + * to `getWritable()` within the same step return the same handle instead of + * spawning racing pipes — see https://github.com/vercel/workflow/issues/2058. + */ +export interface CachedWritable { + writable: WritableStream; + state: FlushableStreamState; +} + export type StepContext = { stepMetadata: StepMetadata; workflowMetadata: WorkflowMetadata; ops: Promise[]; closureVars?: Record; encryptionKey?: CryptoKey; + writables?: Map; }; /** diff --git a/packages/core/src/step/writable-stream.test.ts b/packages/core/src/step/writable-stream.test.ts index 01e83a1b53..352d82a32c 100644 --- a/packages/core/src/step/writable-stream.test.ts +++ b/packages/core/src/step/writable-stream.test.ts @@ -2,12 +2,46 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { LOCK_POLL_INTERVAL_MS } from '../flushable-stream.js'; import { setWorld } from '../runtime/world.js'; +// Captures every chunk written to `world.streams.write` / `writeMulti` +// in arrival order, so tests can assert the on-wire sequence after +// going through the (de)serialize transforms. +let writeCalls: Uint8Array[]; + +function makeStepCtx(): any { + return { + stepMetadata: { + stepName: 'test-step', + stepId: 'step_001', + stepStartedAt: new Date(), + attempt: 1, + }, + workflowMetadata: { + workflowName: 'test-workflow', + workflowRunId: 'wrun_test123', + workflowStartedAt: new Date(), + url: 'http://localhost:3000', + features: { encryption: false }, + }, + ops: [] as Promise[], + encryptionKey: undefined, + }; +} + describe('step-level getWritable', () => { beforeEach(() => { + writeCalls = []; const mockWorld = { streams: { - write: vi.fn().mockResolvedValue(undefined), - writeMulti: vi.fn().mockResolvedValue(undefined), + write: vi.fn( + async (_runId: string, _name: string, chunk: Uint8Array) => { + writeCalls.push(chunk); + } + ), + writeMulti: vi.fn( + async (_runId: string, _name: string, chunks: Uint8Array[]) => { + writeCalls.push(...chunks); + } + ), close: vi.fn().mockResolvedValue(undefined), }, }; @@ -23,24 +57,8 @@ describe('step-level getWritable', () => { it('ops promise should resolve when writer lock is released (without closing stream)', async () => { const { contextStorage } = await import('./context-storage.js'); - const ops: Promise[] = []; - const ctx = { - stepMetadata: { - stepName: 'test-step', - stepId: 'step_001', - stepStartedAt: new Date(), - attempt: 1, - }, - workflowMetadata: { - workflowName: 'test-workflow', - workflowRunId: 'wrun_test123', - workflowStartedAt: new Date(), - url: 'http://localhost:3000', - features: { encryption: false }, - }, - ops, - encryptionKey: undefined, - }; + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; const writable = await contextStorage.run(ctx, async () => { const { getWritable } = await import('./writable-stream.js'); @@ -71,24 +89,8 @@ describe('step-level getWritable', () => { it('ops promise should resolve when stream is explicitly closed', async () => { const { contextStorage } = await import('./context-storage.js'); - const ops: Promise[] = []; - const ctx = { - stepMetadata: { - stepName: 'test-step', - stepId: 'step_001', - stepStartedAt: new Date(), - attempt: 1, - }, - workflowMetadata: { - workflowName: 'test-workflow', - workflowRunId: 'wrun_test123', - workflowStartedAt: new Date(), - url: 'http://localhost:3000', - features: { encryption: false }, - }, - ops, - encryptionKey: undefined, - }; + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; const writable = await contextStorage.run(ctx, async () => { const { getWritable } = await import('./writable-stream.js'); @@ -111,4 +113,110 @@ describe('step-level getWritable', () => { ]) ).resolves.not.toThrow(); }); + + // Regression for https://github.com/vercel/workflow/issues/2058. + // Repeat calls to `getWritable()` from the same step previously spawned + // independent TransformStream + pipe pairs that all flushed to the same + // (runId, name). On world-vercel the 50-100ms HTTP write latency turned + // that race window into deterministic reordering; locally it was + // invisible. We now memoize per (runId, namespace) so a single serial + // sink is shared across calls. + it('returns the same writable for repeat calls with the same namespace', async () => { + const { contextStorage } = await import('./context-storage.js'); + const ctx = makeStepCtx(); + + const [a, b] = await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + return [getWritable(), getWritable()] as const; + }); + + expect(a).toBe(b); + + // Different namespaces still get distinct writables. + const [c, d] = await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + return [ + getWritable({ namespace: 'left' }), + getWritable({ namespace: 'right' }), + ] as const; + }); + + expect(c).not.toBe(d); + expect(c).not.toBe(a); + }); + + it('preserves chunk order across per-write getWritable() calls in a loop', async () => { + const { contextStorage } = await import('./context-storage.js'); + const { getDeserializeStream } = await import('../serialization.js'); + + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; + + // Repro of the user-reported pattern: acquire a fresh writer per chunk + // and release between writes. With the pre-fix per-call pipe, these + // chunks could land out of order on the server. + const chunks = ['nov', 'o', ' e', '2', 'e', ' ok']; + await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + for (const chunk of chunks) { + const writer = getWritable().getWriter(); + try { + await writer.write(chunk); + } finally { + writer.releaseLock(); + } + } + }); + + // Wait for all pending writes to flush through the shared pipe. + await Promise.race([ + Promise.all(ops), + new Promise((_, r) => + setTimeout( + () => r(new Error('ops did not resolve')), + LOCK_POLL_INTERVAL_MS * 20 + 500 + ) + ), + ]); + + // Decode the recorded server writes via the matching deserialize + // stream and confirm chunks arrived in the order we wrote them. + const deserialize = getDeserializeStream({}, undefined); + const decoded: string[] = []; + const reader = deserialize.readable.getReader(); + const drain = (async () => { + while (true) { + const r = await reader.read(); + if (r.done) return; + decoded.push(r.value); + } + })(); + + const writer = deserialize.writable.getWriter(); + for (const buf of writeCalls) { + await writer.write(buf); + } + await writer.close(); + await drain; + + expect(decoded).toEqual(chunks); + }); + + it('does not push a single pipe state.promise for every call (per-call guards)', async () => { + const { contextStorage } = await import('./context-storage.js'); + + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; + + await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + getWritable(); + getWritable(); + getWritable(); + }); + + // Each call registers its own guard so the step waits for every + // caller's writes to flush, even though the underlying pipe is shared. + expect(ops).toHaveLength(3); + }); }); diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index f81349242e..51a2c7abfc 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -2,7 +2,7 @@ import { throwNotInWorkflowOrStepContext } from '../context-errors.js'; import { createFlushableState, flushablePipe, - pollWritableLock, + pollSharedWritableLock, } from '../flushable-stream.js'; import { getExternalReducers, @@ -11,7 +11,7 @@ import { } from '../serialization.js'; import { STREAM_NAME_SYMBOL, STREAM_SERVER_RUN_ID_SYMBOL } from '../symbols.js'; import { getWorkflowRunStreamId } from '../util.js'; -import { contextStorage } from './context-storage.js'; +import { type CachedWritable, contextStorage } from './context-storage.js'; /** * The options for {@link getWritable}. @@ -50,42 +50,65 @@ export function getWritable( const runId = ctx.workflowMetadata.workflowRunId; const name = getWorkflowRunStreamId(runId, namespace); - // Create a transform stream that serializes chunks and pipes to the workflow server - const serialize = getSerializeStream( - getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey), - ctx.encryptionKey - ); + // Cache the writable per (runId, namespace) within the step context. + // + // The previous behavior — constructing a fresh TransformStream and + // background pipe on every call — produced non-deterministic chunk + // ordering when callers acquired a new writer per write (e.g. a + // per-chunk loop). Each pipe flushed to the same (runId, name) server + // stream independently, and on Vercel the 50-100ms HTTP latency + // turned the race window from microseconds into something prod-visible. + // + // Sharing a single TransformStream + pipe across calls makes the + // unsafe pattern correct: writes go through one serial sink in the + // order the user wrote them. See + // https://github.com/vercel/workflow/issues/2058. + const cache = (ctx.writables ??= new Map()); + let cached = cache.get(name); + if (!cached) { + const serialize = getSerializeStream( + getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey), + ctx.encryptionKey + ); + + const serverWritable = new WorkflowServerWritableStream(runId, name); + const state = createFlushableState(); - // Use flushable pipe so the ops promise resolves when the user releases - // their writer lock, not only when the stream is explicitly closed. - // Without this, Vercel functions hang until the runtime timeout because - // .pipeTo() only resolves on stream close. - const serverWritable = new WorkflowServerWritableStream(runId, name); - const state = createFlushableState(); - ctx.ops.push(state.promise); + flushablePipe(serialize.readable, serverWritable, state).catch(() => { + // Errors are handled via state.reject + }); - flushablePipe(serialize.readable, serverWritable, state).catch(() => { - // Errors are handled via state.reject - }); + // Tag the writable with its underlying `(runId, name)` so downstream + // reducers can recognize that it's already backed by a workflow + // server stream. Calling `start(child, [args, theWritable])` from + // the same step uses these tags to emit `{ name, runId }` in the + // dehydrated descriptor, so the child's reviver can open the + // writable against the original `(runId, name)` directly — no + // in-process bridge tied to this step's lifetime. + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: runId, + writable: false, + }); - pollWritableLock(serialize.writable, state); + cached = { writable: serialize.writable, state }; + cache.set(name, cached); + } - // Tag the writable with its underlying `(runId, name)` so downstream - // reducers can recognize that it's already backed by a workflow - // server stream. Calling `start(child, [args, theWritable])` from - // the same step uses these tags to emit `{ name, runId }` in the - // dehydrated descriptor, so the child's reviver can open the - // writable against the original `(runId, name)` directly — no - // in-process bridge tied to this step's lifetime. - Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { - value: name, - writable: false, - }); - Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { - value: runId, - writable: false, - }); + // Each call registers its own guard in ctx.ops that resolves once the + // writer lock is released AND the shared pipe has drained. Using + // per-call guards preserves the original "step waits for every + // getWritable() caller's writes" semantics even though all calls now + // share one underlying pipe — previously each call pushed the pipe's + // own state.promise onto ctx.ops, so the step naturally awaited all + // outstanding writes; with the shared pipe its single state.promise + // would resolve on the first lock release and miss later writes. + const guard = createFlushableState(); + ctx.ops.push(guard.promise); + pollSharedWritableLock(cached.writable, cached.state, guard); - // Return the writable side of the transform stream - return serialize.writable; + return cached.writable as WritableStream; } From 40106be78b02eb9cf3589561ae1b6b9c04722ebd Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 22 May 2026 19:18:51 +0200 Subject: [PATCH 2/2] Simplify to plain memoization Drop the per-call guard mechanism and `pollSharedWritableLock` helper. With memoization, the cached pipe's single `state.promise` is sufficient: in the loop pattern `await writer.write(chunk)` already blocks until each chunk is flushed (the sink awaits the scheduled flush before resolving), so by the time the loop ends `pendingOps === 0` and the final `releaseLock()` lets `pollWritableLock` resolve the state. One writable, one pipe, one ops entry per `(runId, namespace)`. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/core/src/flushable-stream.ts | 44 ---------- .../core/src/step/writable-stream.test.ts | 8 +- packages/core/src/step/writable-stream.ts | 80 +++++++++---------- 3 files changed, 42 insertions(+), 90 deletions(-) diff --git a/packages/core/src/flushable-stream.ts b/packages/core/src/flushable-stream.ts index a9a0f0165c..9ab2045e86 100644 --- a/packages/core/src/flushable-stream.ts +++ b/packages/core/src/flushable-stream.ts @@ -153,50 +153,6 @@ export function pollWritableLock( state.writablePollingInterval = intervalId; } -/** - * Like {@link pollWritableLock}, but watches an external "shared" pipe's - * state while resolving an independent `guard` state. - * - * Used when multiple callers share a single background pipe (e.g. repeat - * `getWritable()` calls within the same step) and each caller needs its own - * "I'm done" signal that fires after the lock is released and the shared - * pipe has drained, without ending the shared pipe. - */ -export function pollSharedWritableLock( - writable: WritableStream, - sharedState: FlushableStreamState, - guard: FlushableStreamState -): void { - if (guard.writablePollingInterval !== undefined) { - return; - } - - const intervalId = setInterval(() => { - if (guard.doneResolved) { - clearInterval(intervalId); - guard.writablePollingInterval = undefined; - return; - } - - if (sharedState.streamEnded) { - clearInterval(intervalId); - guard.writablePollingInterval = undefined; - guard.doneResolved = true; - sharedState.promise.then(guard.resolve, guard.reject); - return; - } - - if (isWritableUnlockedNotClosed(writable) && sharedState.pendingOps === 0) { - guard.doneResolved = true; - guard.resolve(); - clearInterval(intervalId); - guard.writablePollingInterval = undefined; - } - }, LOCK_POLL_INTERVAL_MS); - - guard.writablePollingInterval = intervalId; -} - /** * Polls a ReadableStream to check if the user has released their lock. * Resolves the done promise when lock is released and no pending ops remain. diff --git a/packages/core/src/step/writable-stream.test.ts b/packages/core/src/step/writable-stream.test.ts index 352d82a32c..ac80d675a4 100644 --- a/packages/core/src/step/writable-stream.test.ts +++ b/packages/core/src/step/writable-stream.test.ts @@ -202,7 +202,7 @@ describe('step-level getWritable', () => { expect(decoded).toEqual(chunks); }); - it('does not push a single pipe state.promise for every call (per-call guards)', async () => { + it('registers exactly one pipe per (runId, namespace), regardless of call count', async () => { const { contextStorage } = await import('./context-storage.js'); const ctx = makeStepCtx(); @@ -213,10 +213,10 @@ describe('step-level getWritable', () => { getWritable(); getWritable(); getWritable(); + // A distinct namespace gets its own pipe. + getWritable({ namespace: 'other' }); }); - // Each call registers its own guard so the step waits for every - // caller's writes to flush, even though the underlying pipe is shared. - expect(ops).toHaveLength(3); + expect(ops).toHaveLength(2); }); }); diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index 51a2c7abfc..99865b93fa 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -2,7 +2,7 @@ import { throwNotInWorkflowOrStepContext } from '../context-errors.js'; import { createFlushableState, flushablePipe, - pollSharedWritableLock, + pollWritableLock, } from '../flushable-stream.js'; import { getExternalReducers, @@ -64,51 +64,47 @@ export function getWritable( // order the user wrote them. See // https://github.com/vercel/workflow/issues/2058. const cache = (ctx.writables ??= new Map()); - let cached = cache.get(name); - if (!cached) { - const serialize = getSerializeStream( - getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey), - ctx.encryptionKey - ); + const cached = cache.get(name); + if (cached) { + return cached.writable as WritableStream; + } - const serverWritable = new WorkflowServerWritableStream(runId, name); - const state = createFlushableState(); + const serialize = getSerializeStream( + getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey), + ctx.encryptionKey + ); - flushablePipe(serialize.readable, serverWritable, state).catch(() => { - // Errors are handled via state.reject - }); + // Use flushable pipe so the ops promise resolves when the user releases + // their writer lock, not only when the stream is explicitly closed. + // Without this, Vercel functions hang until the runtime timeout because + // .pipeTo() only resolves on stream close. + const serverWritable = new WorkflowServerWritableStream(runId, name); + const state = createFlushableState(); + ctx.ops.push(state.promise); - // Tag the writable with its underlying `(runId, name)` so downstream - // reducers can recognize that it's already backed by a workflow - // server stream. Calling `start(child, [args, theWritable])` from - // the same step uses these tags to emit `{ name, runId }` in the - // dehydrated descriptor, so the child's reviver can open the - // writable against the original `(runId, name)` directly — no - // in-process bridge tied to this step's lifetime. - Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { - value: name, - writable: false, - }); - Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { - value: runId, - writable: false, - }); + flushablePipe(serialize.readable, serverWritable, state).catch(() => { + // Errors are handled via state.reject + }); - cached = { writable: serialize.writable, state }; - cache.set(name, cached); - } + pollWritableLock(serialize.writable, state); + + // Tag the writable with its underlying `(runId, name)` so downstream + // reducers can recognize that it's already backed by a workflow + // server stream. Calling `start(child, [args, theWritable])` from + // the same step uses these tags to emit `{ name, runId }` in the + // dehydrated descriptor, so the child's reviver can open the + // writable against the original `(runId, name)` directly — no + // in-process bridge tied to this step's lifetime. + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: runId, + writable: false, + }); - // Each call registers its own guard in ctx.ops that resolves once the - // writer lock is released AND the shared pipe has drained. Using - // per-call guards preserves the original "step waits for every - // getWritable() caller's writes" semantics even though all calls now - // share one underlying pipe — previously each call pushed the pipe's - // own state.promise onto ctx.ops, so the step naturally awaited all - // outstanding writes; with the shared pipe its single state.promise - // would resolve on the first lock release and miss later writes. - const guard = createFlushableState(); - ctx.ops.push(guard.promise); - pollSharedWritableLock(cached.writable, cached.state, guard); + cache.set(name, { writable: serialize.writable, state }); - return cached.writable as WritableStream; + return serialize.writable as WritableStream; }