From 531687d2a5454f9b18f753041466152f0b1f7b56 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 3 Mar 2026 12:51:07 -0800 Subject: [PATCH 1/4] fix(core): ensure async deserialization resolves promises in event log order Add a sequential deserializationChain to WorkflowOrchestratorContext that ensures step/hook hydration resolves in event log order even when individual deserialization takes variable time (e.g. due to encryption/decryption). Without this, concurrent async hydrations could resolve out of order, breaking workflow replay determinism. --- .changeset/fix-deserialization-ordering.md | 5 + .../async-deserialization-ordering.test.ts | 203 ++++++++++++++++++ packages/core/src/private.ts | 7 + packages/core/src/step.test.ts | 1 + packages/core/src/step.ts | 14 +- packages/core/src/workflow.ts | 1 + packages/core/src/workflow/hook.test.ts | 1 + packages/core/src/workflow/hook.ts | 54 +++-- packages/core/src/workflow/sleep.test.ts | 1 + 9 files changed, 257 insertions(+), 30 deletions(-) create mode 100644 .changeset/fix-deserialization-ordering.md create mode 100644 packages/core/src/async-deserialization-ordering.test.ts diff --git a/.changeset/fix-deserialization-ordering.md b/.changeset/fix-deserialization-ordering.md new file mode 100644 index 0000000000..c2bf309ccd --- /dev/null +++ b/.changeset/fix-deserialization-ordering.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Ensure async deserialization resolves step/hook promises in event log order diff --git a/packages/core/src/async-deserialization-ordering.test.ts b/packages/core/src/async-deserialization-ordering.test.ts new file mode 100644 index 0000000000..dbd218fa6c --- /dev/null +++ b/packages/core/src/async-deserialization-ordering.test.ts @@ -0,0 +1,203 @@ +import type { Event } from '@workflow/world'; +import * as nanoid from 'nanoid'; +import { monotonicFactory } from 'ulid'; +import { describe, expect, it, vi } from 'vitest'; +import { EventsConsumer } from './events-consumer.js'; +import type { WorkflowOrchestratorContext } from './private.js'; +import { dehydrateStepReturnValue } from './serialization.js'; +import { createUseStep } from './step.js'; +import { createContext } from './vm/index.js'; + +/** + * These tests verify that when `hydrateStepReturnValue` performs real async + * work (e.g., decryption), the promise resolution order of step results + * remains deterministic — matching the order of events in the event log. + * + * Without a fix, if step A's deserialization takes longer than step B's, + * step B's promise would resolve first, breaking workflow determinism. + */ + +// Helper to setup context to simulate a workflow run +function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { + const context = createContext({ + seed: 'test', + fixedTimestamp: 1753481739458, + }); + const ulid = monotonicFactory(() => context.globalThis.Math.random()); + const workflowStartedAt = context.globalThis.Date.now(); + return { + runId: 'wrun_test', + encryptionKey: undefined, + globalThis: context.globalThis, + eventsConsumer: new EventsConsumer(events, { + onUnconsumedEvent: () => {}, + }), + invocationsQueue: new Map(), + generateUlid: () => ulid(workflowStartedAt), + generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) => + new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) + ), + onWorkflowError: vi.fn(), + deserializationChain: Promise.resolve(), + }; +} + +describe('async deserialization ordering', () => { + it('should resolve step promises in event log order even when deserialization takes variable time', async () => { + // Create two step_completed events with real serialized data. + // We will mock hydrateStepReturnValue to simulate variable async delays. + const resultA = await dehydrateStepReturnValue( + 'result_A', + 'wrun_test', + undefined + ); + const resultB = await dehydrateStepReturnValue( + 'result_B', + 'wrun_test', + undefined + ); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { result: resultA }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { result: resultB }, + createdAt: new Date(), + }, + ]); + + // Mock hydrateStepReturnValue to simulate variable async delay. + // Step A (first event) takes 50ms, Step B (second event) takes 5ms. + // Without ordering guarantees, Step B would resolve before Step A. + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + const hydrateStub = vi + .spyOn(serialization, 'hydrateStepReturnValue') + .mockImplementation(async (...args) => { + callCount++; + const thisCall = callCount; + // First call (step A): slow. Second call (step B): fast. + const delay = thisCall === 1 ? 50 : 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + }); + + const useStep = createUseStep(ctx); + const stepA = useStep('stepA'); + const stepB = useStep('stepB'); + + // Call both steps — their events will be consumed in order from the event log. + const promiseA = stepA(); + const promiseB = stepB(); + + // Track the order that promises resolve + const resolveOrder: string[] = []; + promiseA.then((val) => resolveOrder.push(`A:${val}`)); + promiseB.then((val) => resolveOrder.push(`B:${val}`)); + + // Wait for both to resolve + const [valA, valB] = await Promise.all([promiseA, promiseB]); + + // Values should be correct regardless + expect(valA).toBe('result_A'); + expect(valB).toBe('result_B'); + + // The critical assertion: promises must resolve in event log order (A before B), + // even though A's deserialization is slower than B's. + expect(resolveOrder).toEqual(['A:result_A', 'B:result_B']); + + hydrateStub.mockRestore(); + }); + + it('should resolve sequential step promises in order with variable async delays', async () => { + // This simulates a workflow that does: const a = await stepA(); const b = await stepB(a); + // Here three steps complete in sequence, each with decreasing deserialization time. + const results = await Promise.all([ + dehydrateStepReturnValue(10, 'wrun_test', undefined), + dehydrateStepReturnValue(20, 'wrun_test', undefined), + dehydrateStepReturnValue(30, 'wrun_test', undefined), + ]); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { result: results[0] }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { result: results[1] }, + createdAt: new Date(), + }, + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCX', + eventData: { result: results[2] }, + createdAt: new Date(), + }, + ]); + + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + const hydrateStub = vi + .spyOn(serialization, 'hydrateStepReturnValue') + .mockImplementation(async (...args) => { + callCount++; + const thisCall = callCount; + // Decreasing delays: 60ms, 30ms, 5ms — maximizes chance of out-of-order resolution + const delays = [60, 30, 5]; + const delay = delays[thisCall - 1] ?? 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + }); + + const useStep = createUseStep(ctx); + const step1 = useStep('step1'); + const step2 = useStep('step2'); + const step3 = useStep('step3'); + + const promise1 = step1(); + const promise2 = step2(); + const promise3 = step3(); + + const resolveOrder: number[] = []; + promise1.then((val) => resolveOrder.push(val as number)); + promise2.then((val) => resolveOrder.push(val as number)); + promise3.then((val) => resolveOrder.push(val as number)); + + const [val1, val2, val3] = await Promise.all([ + promise1, + promise2, + promise3, + ]); + + expect(val1).toBe(10); + expect(val2).toBe(20); + expect(val3).toBe(30); + + // Must resolve in event log order + expect(resolveOrder).toEqual([10, 20, 30]); + + hydrateStub.mockRestore(); + }); +}); diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index b7067a7b88..002e4d10bb 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -101,4 +101,11 @@ export interface WorkflowOrchestratorContext { onWorkflowError: (error: Error) => void; generateUlid: () => string; generateNanoid: () => string; + /** + * Sequential promise chain that ensures async deserialization (e.g., decryption) + * of event payloads resolves in event log order. Each hydration + resolve + * operation is chained through this promise so that even if individual + * deserialization takes variable time, promises resolve deterministically. + */ + deserializationChain: Promise; } diff --git a/packages/core/src/step.test.ts b/packages/core/src/step.test.ts index 23bea043f7..0517861a02 100644 --- a/packages/core/src/step.test.ts +++ b/packages/core/src/step.test.ts @@ -31,6 +31,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), + deserializationChain: Promise.resolve(), }; } diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index b31d211ce0..2c17e9af98 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -142,12 +142,12 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { ctx.invocationsQueue.delete(event.correlationId); // Step has completed, so resolve the Promise with the cached result. - // The hydration is async, so we schedule the resolve via setTimeout - // after hydration completes to preserve macrotask timing semantics. - // We use a single setTimeout that awaits hydration inside it, keeping - // the same scheduling order as the original synchronous code path - // (where setTimeout was called synchronously from this callback). - setTimeout(async () => { + // The hydration is async (e.g., decryption), so we chain it through + // ctx.deserializationChain to ensure that even if deserialization + // takes variable time, promises resolve in event log order. + // Each step's hydration + resolve waits for all prior hydrations + // to complete before executing, preserving deterministic ordering. + ctx.deserializationChain = ctx.deserializationChain.then(async () => { try { const hydratedResult = await hydrateStepReturnValue( event.eventData.result, @@ -159,7 +159,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { } catch (error) { reject(error); } - }, 0); + }); return EventConsumerResult.Finished; } diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index f565f64eb8..36c50fefc4 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -133,6 +133,7 @@ export async function runWorkflow( generateUlid: () => ulid(+startedAt), generateNanoid, invocationsQueue: new Map(), + deserializationChain: Promise.resolve(), }; // Subscribe to the events log to update the timestamp in the vm context diff --git a/packages/core/src/workflow/hook.test.ts b/packages/core/src/workflow/hook.test.ts index eb64cdc5e0..d61325461d 100644 --- a/packages/core/src/workflow/hook.test.ts +++ b/packages/core/src/workflow/hook.test.ts @@ -31,6 +31,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), + deserializationChain: Promise.resolve(), }; } diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 0ea3306774..92c64f478b 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -98,19 +98,24 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { if (promises.length > 0) { const next = promises.shift(); if (next) { - // Reconstruct the payload from the event data - hydrateStepReturnValue( - event.eventData.payload, - ctx.runId, - ctx.encryptionKey, - ctx.globalThis - ) - .then((payload) => { - next.resolve(payload); - }) - .catch((error) => { - next.reject(error); - }); + // Reconstruct the payload from the event data. + // Chain through ctx.deserializationChain to ensure that async + // deserialization (e.g., decryption) resolves in event log order. + ctx.deserializationChain = ctx.deserializationChain.then( + async () => { + try { + const payload = await hydrateStepReturnValue( + event.eventData.payload, + ctx.runId, + ctx.encryptionKey, + ctx.globalThis + ); + next.resolve(payload); + } catch (error) { + next.reject(error); + } + } + ); } } else { payloadsQueue.push(event); @@ -156,18 +161,21 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { if (payloadsQueue.length > 0) { const nextPayload = payloadsQueue.shift(); if (nextPayload) { - hydrateStepReturnValue( - nextPayload.eventData.payload, - ctx.runId, - ctx.encryptionKey, - ctx.globalThis - ) - .then((payload) => { + // Chain through ctx.deserializationChain to ensure that async + // deserialization (e.g., decryption) resolves in event log order. + ctx.deserializationChain = ctx.deserializationChain.then(async () => { + try { + const payload = await hydrateStepReturnValue( + nextPayload.eventData.payload, + ctx.runId, + ctx.encryptionKey, + ctx.globalThis + ); resolvers.resolve(payload); - }) - .catch((error) => { + } catch (error) { resolvers.reject(error); - }); + } + }); return resolvers.promise; } } diff --git a/packages/core/src/workflow/sleep.test.ts b/packages/core/src/workflow/sleep.test.ts index 280ec11df7..0b5b6cf0d8 100644 --- a/packages/core/src/workflow/sleep.test.ts +++ b/packages/core/src/workflow/sleep.test.ts @@ -35,6 +35,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), + deserializationChain: Promise.resolve(), }; return ctx; } From 79c3bfd429df38f884c408cdff0334a42add8501 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 3 Mar 2026 13:14:35 -0800 Subject: [PATCH 2/4] test: add hook ordering test and afterEach mock cleanup Address review feedback: - Add hook-specific deserialization ordering test - Add afterEach(vi.restoreAllMocks) to prevent spy leaks - Remove manual mockRestore calls --- .../async-deserialization-ordering.test.ts | 105 ++++++++++++++++-- 1 file changed, 93 insertions(+), 12 deletions(-) diff --git a/packages/core/src/async-deserialization-ordering.test.ts b/packages/core/src/async-deserialization-ordering.test.ts index dbd218fa6c..ec89233fd9 100644 --- a/packages/core/src/async-deserialization-ordering.test.ts +++ b/packages/core/src/async-deserialization-ordering.test.ts @@ -1,12 +1,13 @@ import type { Event } from '@workflow/world'; import * as nanoid from 'nanoid'; import { monotonicFactory } from 'ulid'; -import { describe, expect, it, vi } from 'vitest'; +import { afterEach, describe, expect, it, vi } from 'vitest'; import { EventsConsumer } from './events-consumer.js'; import type { WorkflowOrchestratorContext } from './private.js'; import { dehydrateStepReturnValue } from './serialization.js'; import { createUseStep } from './step.js'; import { createContext } from './vm/index.js'; +import { createCreateHook } from './workflow/hook.js'; /** * These tests verify that when `hydrateStepReturnValue` performs real async @@ -43,6 +44,10 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { } describe('async deserialization ordering', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + it('should resolve step promises in event log order even when deserialization takes variable time', async () => { // Create two step_completed events with real serialized data. // We will mock hydrateStepReturnValue to simulate variable async delays. @@ -82,16 +87,16 @@ describe('async deserialization ordering', () => { const serialization = await import('./serialization.js'); const originalHydrate = serialization.hydrateStepReturnValue; let callCount = 0; - const hydrateStub = vi - .spyOn(serialization, 'hydrateStepReturnValue') - .mockImplementation(async (...args) => { + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { callCount++; const thisCall = callCount; // First call (step A): slow. Second call (step B): fast. const delay = thisCall === 1 ? 50 : 5; await new Promise((resolve) => setTimeout(resolve, delay)); return originalHydrate(...args); - }); + } + ); const useStep = createUseStep(ctx); const stepA = useStep('stepA'); @@ -116,8 +121,6 @@ describe('async deserialization ordering', () => { // The critical assertion: promises must resolve in event log order (A before B), // even though A's deserialization is slower than B's. expect(resolveOrder).toEqual(['A:result_A', 'B:result_B']); - - hydrateStub.mockRestore(); }); it('should resolve sequential step promises in order with variable async delays', async () => { @@ -159,9 +162,8 @@ describe('async deserialization ordering', () => { const serialization = await import('./serialization.js'); const originalHydrate = serialization.hydrateStepReturnValue; let callCount = 0; - const hydrateStub = vi - .spyOn(serialization, 'hydrateStepReturnValue') - .mockImplementation(async (...args) => { + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { callCount++; const thisCall = callCount; // Decreasing delays: 60ms, 30ms, 5ms — maximizes chance of out-of-order resolution @@ -169,7 +171,8 @@ describe('async deserialization ordering', () => { const delay = delays[thisCall - 1] ?? 5; await new Promise((resolve) => setTimeout(resolve, delay)); return originalHydrate(...args); - }); + } + ); const useStep = createUseStep(ctx); const step1 = useStep('step1'); @@ -197,7 +200,85 @@ describe('async deserialization ordering', () => { // Must resolve in event log order expect(resolveOrder).toEqual([10, 20, 30]); + }); + + it('should resolve hook payloads in event log order even when deserialization takes variable time', async () => { + const ops: Promise[] = []; + // Create hook events: hook_received with payloads that have variable deserialization time + const payloadA = await dehydrateStepReturnValue( + { message: 'first' }, + 'wrun_test', + undefined, + ops + ); + const payloadB = await dehydrateStepReturnValue( + { message: 'second' }, + 'wrun_test', + undefined, + ops + ); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { payload: payloadA }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { payload: payloadB }, + createdAt: new Date(), + }, + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'hook_disposed', + correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', + createdAt: new Date(), + }, + ]); + + // Mock hydrateStepReturnValue with variable delays. + // First hook payload: slow (50ms). Second hook payload: fast (5ms). + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { + callCount++; + const thisCall = callCount; + const delay = thisCall === 1 ? 50 : 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + } + ); + + const createHook = createCreateHook(ctx); + const hook = createHook(); + + // Await two payloads from the hook + const resolveOrder: string[] = []; + const promiseA = hook.then((val: any) => { + resolveOrder.push(`A:${val.message}`); + return val; + }); + const promiseB = hook.then((val: any) => { + resolveOrder.push(`B:${val.message}`); + return val; + }); + + const [valA, valB] = await Promise.all([promiseA, promiseB]); + + expect(valA).toEqual({ message: 'first' }); + expect(valB).toEqual({ message: 'second' }); - hydrateStub.mockRestore(); + // Hook payloads must resolve in event log order + expect(resolveOrder).toEqual(['A:first', 'B:second']); }); }); From ef6cd61a4994aa0b76ec16d6860f3ed0e06b2703 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 3 Mar 2026 13:27:37 -0800 Subject: [PATCH 3/4] refactor(core): route all promise resolutions through sequential promiseQueue - Rename deserializationChain -> promiseQueue (it now handles all event-driven promise resolutions, not just deserialization) - Route step_failed, step_created corruption, unexpected event type, and null event suspensions through promiseQueue in step.ts - Route hook_conflict, unexpected event type, null event suspensions, dispose suspensions, and eventLogEmpty suspensions through promiseQueue in hook.ts - Route wait_completed and unexpected event type through promiseQueue in sleep.ts - Remove all setTimeout(0) calls from step.ts, hook.ts, and sleep.ts - Add stress tests: mixed step_completed/step_failed, 10 concurrent steps, sleep+step interleaving, interleaved concurrent steps --- .changeset/fix-deserialization-ordering.md | 2 +- .../async-deserialization-ordering.test.ts | 326 +++++++++++++++++- packages/core/src/private.ts | 11 +- packages/core/src/step.test.ts | 2 +- packages/core/src/step.ts | 23 +- packages/core/src/workflow.ts | 2 +- packages/core/src/workflow/hook.test.ts | 2 +- packages/core/src/workflow/hook.ts | 69 ++-- packages/core/src/workflow/sleep.test.ts | 2 +- packages/core/src/workflow/sleep.ts | 17 +- 10 files changed, 394 insertions(+), 62 deletions(-) diff --git a/.changeset/fix-deserialization-ordering.md b/.changeset/fix-deserialization-ordering.md index c2bf309ccd..66a7968361 100644 --- a/.changeset/fix-deserialization-ordering.md +++ b/.changeset/fix-deserialization-ordering.md @@ -2,4 +2,4 @@ "@workflow/core": patch --- -Ensure async deserialization resolves step/hook promises in event log order +Route all event-driven promise resolutions through a sequential queue to ensure deterministic ordering diff --git a/packages/core/src/async-deserialization-ordering.test.ts b/packages/core/src/async-deserialization-ordering.test.ts index ec89233fd9..d91fd19162 100644 --- a/packages/core/src/async-deserialization-ordering.test.ts +++ b/packages/core/src/async-deserialization-ordering.test.ts @@ -1,3 +1,4 @@ +import { FatalError } from '@workflow/errors'; import type { Event } from '@workflow/world'; import * as nanoid from 'nanoid'; import { monotonicFactory } from 'ulid'; @@ -8,6 +9,7 @@ import { dehydrateStepReturnValue } from './serialization.js'; import { createUseStep } from './step.js'; import { createContext } from './vm/index.js'; import { createCreateHook } from './workflow/hook.js'; +import { createSleep } from './workflow/sleep.js'; /** * These tests verify that when `hydrateStepReturnValue` performs real async @@ -39,7 +41,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), - deserializationChain: Promise.resolve(), + promiseQueue: Promise.resolve(), }; } @@ -281,4 +283,326 @@ describe('async deserialization ordering', () => { // Hook payloads must resolve in event log order expect(resolveOrder).toEqual(['A:first', 'B:second']); }); + + it('should resolve mixed step_completed and step_failed in event log order', async () => { + // Simulate: step A completes (slow hydration), step B fails, step C completes (fast hydration) + // All three should resolve/reject in A, B, C order. + const resultA = await dehydrateStepReturnValue( + 'success_A', + 'wrun_test', + undefined + ); + const resultC = await dehydrateStepReturnValue( + 'success_C', + 'wrun_test', + undefined + ); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { result: resultA }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'step_failed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { error: 'step B failed' }, + createdAt: new Date(), + }, + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCX', + eventData: { result: resultC }, + createdAt: new Date(), + }, + ]); + + // Slow hydration for step A to test that step_failed (B) still waits for it + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { + callCount++; + const thisCall = callCount; + // step A: 50ms, step C: 5ms (step B has no hydration) + const delay = thisCall === 1 ? 50 : 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + } + ); + + const useStep = createUseStep(ctx); + const stepA = useStep('stepA'); + const stepB = useStep('stepB'); + const stepC = useStep('stepC'); + + const promiseA = stepA(); + const promiseB = stepB(); + const promiseC = stepC(); + + const resolveOrder: string[] = []; + promiseA.then((val) => resolveOrder.push(`A:${val}`)); + promiseB.catch((err) => resolveOrder.push(`B:${err.message}`)); + promiseC.then((val) => resolveOrder.push(`C:${val}`)); + + const results = await Promise.allSettled([promiseA, promiseB, promiseC]); + + expect(results[0]).toEqual({ status: 'fulfilled', value: 'success_A' }); + expect(results[1].status).toBe('rejected'); + expect((results[1] as PromiseRejectedResult).reason).toBeInstanceOf( + FatalError + ); + expect(results[2]).toEqual({ status: 'fulfilled', value: 'success_C' }); + + // Critical: order must be A, B, C regardless of hydration timing + expect(resolveOrder).toEqual([ + 'A:success_A', + 'B:step B failed', + 'C:success_C', + ]); + }); + + it('should handle many concurrent steps (10) with variable delays in correct order', async () => { + const count = 10; + const results = await Promise.all( + Array.from({ length: count }, (_, i) => + dehydrateStepReturnValue(i, 'wrun_test', undefined) + ) + ); + + // Correlation IDs from the deterministic ULID generator + const correlationIds = [ + 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + 'step_01K11TFZ62YS0YYFDQ3E8B9YCX', + 'step_01K11TFZ62YS0YYFDQ3E8B9YCY', + 'step_01K11TFZ62YS0YYFDQ3E8B9YCZ', + 'step_01K11TFZ62YS0YYFDQ3E8B9YD0', + 'step_01K11TFZ62YS0YYFDQ3E8B9YD1', + 'step_01K11TFZ62YS0YYFDQ3E8B9YD2', + 'step_01K11TFZ62YS0YYFDQ3E8B9YD3', + 'step_01K11TFZ62YS0YYFDQ3E8B9YD4', + ]; + + const events: Event[] = results.map((result, i) => ({ + eventId: `evnt_${i}`, + runId: 'wrun_test', + eventType: 'step_completed' as const, + correlationId: correlationIds[i], + eventData: { result }, + createdAt: new Date(), + })); + + const ctx = setupWorkflowContext(events); + + // Variable delays: reverse order so step 0 is slowest, step 9 is fastest + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { + callCount++; + const delay = (count - callCount + 1) * 10; // 100ms, 90ms, ..., 10ms + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + } + ); + + const useStep = createUseStep(ctx); + const steps = Array.from({ length: count }, (_, i) => useStep(`step${i}`)); + const promises = steps.map((step) => step()); + + const resolveOrder: number[] = []; + for (const [i, p] of promises.entries()) { + p.then(() => resolveOrder.push(i)); + } + + const values = await Promise.all(promises); + + // All values correct + for (let i = 0; i < count; i++) { + expect(values[i]).toBe(i); + } + + // Must resolve in sequential order 0, 1, 2, ..., 9 + expect(resolveOrder).toEqual(Array.from({ length: count }, (_, i) => i)); + }); + + it('should resolve sleep and step promises in event log order', async () => { + // Simulate: step A completes (slow hydration), sleep B completes, step C completes (fast hydration) + const resultA = await dehydrateStepReturnValue( + 'step_result', + 'wrun_test', + undefined + ); + const resultC = await dehydrateStepReturnValue( + 'after_sleep', + 'wrun_test', + undefined + ); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { result: resultA }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'wait_created', + correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { resumeAt: new Date('2024-01-01T00:00:05.000Z') }, + createdAt: new Date(), + }, + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'wait_completed', + correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCW', + createdAt: new Date(), + }, + { + eventId: 'evnt_3', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCX', + eventData: { result: resultC }, + createdAt: new Date(), + }, + ]); + + // Slow hydration for step A + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { + callCount++; + const thisCall = callCount; + const delay = thisCall === 1 ? 50 : 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + } + ); + + const useStep = createUseStep(ctx); + const sleep = createSleep(ctx); + const stepA = useStep('stepA'); + const stepC = useStep('stepC'); + + const promiseA = stepA(); + const promiseB = sleep('5s'); + const promiseC = stepC(); + + const resolveOrder: string[] = []; + promiseA.then((val) => resolveOrder.push(`step:${val}`)); + promiseB.then(() => resolveOrder.push('sleep')); + promiseC.then((val) => resolveOrder.push(`step:${val}`)); + + await Promise.all([promiseA, promiseB, promiseC]); + + // Must resolve in event log order: step A, sleep, step C + expect(resolveOrder).toEqual([ + 'step:step_result', + 'sleep', + 'step:after_sleep', + ]); + }); + + it('should resolve step_completed interleaved with step_completed from different functions in event log order', async () => { + // Simulate two different step functions whose events are interleaved: + // stepA_created, stepB_created, stepA_completed (slow), stepB_completed (fast) + const resultA = await dehydrateStepReturnValue( + 'value_A', + 'wrun_test', + undefined + ); + const resultB = await dehydrateStepReturnValue( + 'value_B', + 'wrun_test', + undefined + ); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_started', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: {}, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'step_started', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: {}, + createdAt: new Date(), + }, + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { result: resultA }, + createdAt: new Date(), + }, + { + eventId: 'evnt_3', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { result: resultB }, + createdAt: new Date(), + }, + ]); + + // Step A hydration is slow, step B is fast + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + vi.spyOn(serialization, 'hydrateStepReturnValue').mockImplementation( + async (...args) => { + callCount++; + const thisCall = callCount; + const delay = thisCall === 1 ? 50 : 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + } + ); + + const useStep = createUseStep(ctx); + const stepA = useStep('stepA'); + const stepB = useStep('stepB'); + + // Launch both concurrently (like Promise.all in a workflow) + const promiseA = stepA(); + const promiseB = stepB(); + + const resolveOrder: string[] = []; + promiseA.then((val) => resolveOrder.push(`A:${val}`)); + promiseB.then((val) => resolveOrder.push(`B:${val}`)); + + const [valA, valB] = await Promise.all([promiseA, promiseB]); + + expect(valA).toBe('value_A'); + expect(valB).toBe('value_B'); + + // Step A must resolve before step B (event log order) + expect(resolveOrder).toEqual(['A:value_A', 'B:value_B']); + }); }); diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index 002e4d10bb..a269660013 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -102,10 +102,11 @@ export interface WorkflowOrchestratorContext { generateUlid: () => string; generateNanoid: () => string; /** - * Sequential promise chain that ensures async deserialization (e.g., decryption) - * of event payloads resolves in event log order. Each hydration + resolve - * operation is chained through this promise so that even if individual - * deserialization takes variable time, promises resolve deterministically. + * Sequential promise queue that ensures all event-driven promise resolutions + * (step results, hook payloads, failures, suspensions) happen in event log + * order. Every resolve, reject, or workflow error is chained through this + * queue so that even if individual operations take variable time (e.g., + * async decryption), promises resolve deterministically. */ - deserializationChain: Promise; + promiseQueue: Promise; } diff --git a/packages/core/src/step.test.ts b/packages/core/src/step.test.ts index 0517861a02..f23834bfad 100644 --- a/packages/core/src/step.test.ts +++ b/packages/core/src/step.test.ts @@ -31,7 +31,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), - deserializationChain: Promise.resolve(), + promiseQueue: Promise.resolve(), }; } diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index 2c17e9af98..2c2a42f136 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -53,11 +53,11 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { // Crucially, if we got here, then this step Promise does // not resolve so that the user workflow code does not proceed any further. // Notify the workflow handler that this step has not been run / has not completed yet. - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowSuspension(ctx.invocationsQueue, ctx.globalThis) ); - }, 0); + }); return EventConsumerResult.NotConsumed; } @@ -83,13 +83,13 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { if (!queueItem || queueItem.type !== 'step') { // This indicates event log corruption - step_created received // but the step was never invoked in the workflow during replay. - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { reject( new WorkflowRuntimeError( `Corrupted event log: step ${correlationId} (${stepName}) created but not found in invocation queue` ) ); - }, 0); + }); return EventConsumerResult.Finished; } queueItem.hasCreatedEvent = true; @@ -112,8 +112,9 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { if (event.eventType === 'step_failed') { // Terminal state - we can remove the invocationQueue item ctx.invocationsQueue.delete(event.correlationId); - // Step failed - bubble up to workflow - setTimeout(() => { + // Step failed - chain through promiseQueue to ensure + // deterministic ordering of all promise resolutions/rejections. + ctx.promiseQueue = ctx.promiseQueue.then(() => { const errorData = event.eventData.error; const isErrorObject = typeof errorData === 'object' && errorData !== null; @@ -133,7 +134,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { error.stack = errorStack; } reject(error); - }, 0); + }); return EventConsumerResult.Finished; } @@ -143,11 +144,11 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { // Step has completed, so resolve the Promise with the cached result. // The hydration is async (e.g., decryption), so we chain it through - // ctx.deserializationChain to ensure that even if deserialization + // ctx.promiseQueue to ensure that even if deserialization // takes variable time, promises resolve in event log order. // Each step's hydration + resolve waits for all prior hydrations // to complete before executing, preserving deterministic ordering. - ctx.deserializationChain = ctx.deserializationChain.then(async () => { + ctx.promiseQueue = ctx.promiseQueue.then(async () => { try { const hydratedResult = await hydrateStepReturnValue( event.eventData.result, @@ -164,13 +165,13 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { } // An unexpected event type has been received, this event log looks corrupted. Let's fail immediately. - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowRuntimeError( `Unexpected event type for step ${correlationId} (name: ${stepName}) "${event.eventType}"` ) ); - }, 0); + }); return EventConsumerResult.Finished; }); diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 36c50fefc4..1fa17589b9 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -133,7 +133,7 @@ export async function runWorkflow( generateUlid: () => ulid(+startedAt), generateNanoid, invocationsQueue: new Map(), - deserializationChain: Promise.resolve(), + promiseQueue: Promise.resolve(), }; // Subscribe to the events log to update the timestamp in the vm context diff --git a/packages/core/src/workflow/hook.test.ts b/packages/core/src/workflow/hook.test.ts index d61325461d..f1d354d632 100644 --- a/packages/core/src/workflow/hook.test.ts +++ b/packages/core/src/workflow/hook.test.ts @@ -31,7 +31,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), - deserializationChain: Promise.resolve(), + promiseQueue: Promise.resolve(), }; } diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 92c64f478b..43a0c8651c 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -46,11 +46,11 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { eventLogEmpty = true; if (promises.length > 0) { - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowSuspension(ctx.invocationsQueue, ctx.globalThis) ); - }, 0); + }); } return EventConsumerResult.NotConsumed; } @@ -74,23 +74,30 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { // Remove this hook from the invocations queue ctx.invocationsQueue.delete(correlationId); - // Store the conflict event so we can reject any awaited promises + // Store the conflict event so we can reject any awaited promises. + // Chain through promiseQueue to ensure deterministic ordering. const conflictEvent = event as HookConflictEvent; const conflictError = new WorkflowRuntimeError( `Hook token "${conflictEvent.eventData.token}" is already in use by another workflow`, { slug: ERROR_SLUGS.HOOK_CONFLICT } ); - // Reject any pending promises - for (const resolver of promises) { - resolver.reject(conflictError); - } - promises.length = 0; - // Mark that we have a conflict so future awaits also reject hasConflict = true; conflictErrorRef = conflictError; + // Capture and drain pending promises synchronously so the null event + // handler won't see them and trigger a spurious WorkflowSuspension. + // The actual rejections are deferred through promiseQueue for ordering. + const pendingPromises = promises.slice(); + promises.length = 0; + + ctx.promiseQueue = ctx.promiseQueue.then(() => { + for (const resolver of pendingPromises) { + resolver.reject(conflictError); + } + }); + return EventConsumerResult.Consumed; } @@ -99,23 +106,21 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { const next = promises.shift(); if (next) { // Reconstruct the payload from the event data. - // Chain through ctx.deserializationChain to ensure that async + // Chain through ctx.promiseQueue to ensure that async // deserialization (e.g., decryption) resolves in event log order. - ctx.deserializationChain = ctx.deserializationChain.then( - async () => { - try { - const payload = await hydrateStepReturnValue( - event.eventData.payload, - ctx.runId, - ctx.encryptionKey, - ctx.globalThis - ); - next.resolve(payload); - } catch (error) { - next.reject(error); - } + ctx.promiseQueue = ctx.promiseQueue.then(async () => { + try { + const payload = await hydrateStepReturnValue( + event.eventData.payload, + ctx.runId, + ctx.encryptionKey, + ctx.globalThis + ); + next.resolve(payload); + } catch (error) { + next.reject(error); } - ); + }); } } else { payloadsQueue.push(event); @@ -134,13 +139,13 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { } // An unexpected event type has been received, this event log looks corrupted. Let's fail immediately. - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowRuntimeError( `Unexpected event type for hook ${correlationId} (token: ${token}) "${event.eventType}"` ) ); - }, 0); + }); return EventConsumerResult.Finished; }); @@ -161,9 +166,9 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { if (payloadsQueue.length > 0) { const nextPayload = payloadsQueue.shift(); if (nextPayload) { - // Chain through ctx.deserializationChain to ensure that async + // Chain through ctx.promiseQueue to ensure that async // deserialization (e.g., decryption) resolves in event log order. - ctx.deserializationChain = ctx.deserializationChain.then(async () => { + ctx.promiseQueue = ctx.promiseQueue.then(async () => { try { const payload = await hydrateStepReturnValue( nextPayload.eventData.payload, @@ -183,11 +188,11 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { if (eventLogEmpty) { // If the event log is already empty then we know the hook will not be resolved. // Treat this case as a "step not run" scenario and suspend the workflow. - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowSuspension(ctx.invocationsQueue, ctx.globalThis) ); - }, 0); + }); } promises.push(resolvers); @@ -219,11 +224,11 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { // never deliver another hook_received after disposal. if (promises.length > 0) { promises.length = 0; - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowSuspension(ctx.invocationsQueue, ctx.globalThis) ); - }, 0); + }); } webhookLogger.debug('Hook disposed', { correlationId, token }); diff --git a/packages/core/src/workflow/sleep.test.ts b/packages/core/src/workflow/sleep.test.ts index 0b5b6cf0d8..9a2249cc79 100644 --- a/packages/core/src/workflow/sleep.test.ts +++ b/packages/core/src/workflow/sleep.test.ts @@ -35,7 +35,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), - deserializationChain: Promise.resolve(), + promiseQueue: Promise.resolve(), }; return ctx; } diff --git a/packages/core/src/workflow/sleep.ts b/packages/core/src/workflow/sleep.ts index 9a51e69385..f176d4b07a 100644 --- a/packages/core/src/workflow/sleep.ts +++ b/packages/core/src/workflow/sleep.ts @@ -1,9 +1,9 @@ +import { WorkflowRuntimeError } from '@workflow/errors'; import { parseDurationToDate, withResolvers } from '@workflow/utils'; import type { StringValue } from 'ms'; import { EventConsumerResult } from '../events-consumer.js'; import { type WaitInvocationQueueItem, WorkflowSuspension } from '../global.js'; import type { WorkflowOrchestratorContext } from '../private.js'; -import { WorkflowRuntimeError } from '@workflow/errors'; export function createSleep(ctx: WorkflowOrchestratorContext) { return async function sleepImpl( @@ -27,11 +27,11 @@ export function createSleep(ctx: WorkflowOrchestratorContext) { // If there are no events and we're waiting for wait_completed, // suspend the workflow until the wait fires if (!event) { - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowSuspension(ctx.invocationsQueue, ctx.globalThis) ); - }, 0); + }); return EventConsumerResult.NotConsumed; } @@ -57,21 +57,22 @@ export function createSleep(ctx: WorkflowOrchestratorContext) { // Remove this wait from the invocations queue (O(1) delete using Map) ctx.invocationsQueue.delete(correlationId); - // Wait has elapsed, resolve the sleep - setTimeout(() => { + // Wait has elapsed - chain through promiseQueue to ensure + // deterministic ordering of all promise resolutions. + ctx.promiseQueue = ctx.promiseQueue.then(() => { resolve(); - }, 0); + }); return EventConsumerResult.Finished; } // An unexpected event type has been received, this event log looks corrupted. Let's fail immediately. - setTimeout(() => { + ctx.promiseQueue = ctx.promiseQueue.then(() => { ctx.onWorkflowError( new WorkflowRuntimeError( `Unexpected event type for wait ${correlationId} "${event.eventType}"` ) ); - }, 0); + }); return EventConsumerResult.Finished; }); From 7def3dbd4dd5fd782acfac52477f62f3e2c8ab9e Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 3 Mar 2026 14:01:29 -0800 Subject: [PATCH 4/4] fix: route createHookPromise conflict rejection through promiseQueue - The hasConflict path in createHookPromise() was rejecting immediately, bypassing the promiseQueue and potentially settling out of event-log order relative to prior queued resolutions. - Add missing runId and encryptionKey to sleep.test.ts context stub. --- packages/core/src/workflow/hook.ts | 8 +++++--- packages/core/src/workflow/sleep.test.ts | 2 ++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 43a0c8651c..1e42e27153 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -156,10 +156,12 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { function createHookPromise(): Promise { const resolvers = withResolvers(); - // If we have a conflict, reject immediately - // This handles the iterator case where each await should reject + // If we have a conflict, reject through the promiseQueue to maintain + // deterministic ordering with any prior queued resolutions. if (hasConflict && conflictErrorRef) { - resolvers.reject(conflictErrorRef); + ctx.promiseQueue = ctx.promiseQueue.then(() => { + resolvers.reject(conflictErrorRef); + }); return resolvers.promise; } diff --git a/packages/core/src/workflow/sleep.test.ts b/packages/core/src/workflow/sleep.test.ts index 9a2249cc79..8321605300 100644 --- a/packages/core/src/workflow/sleep.test.ts +++ b/packages/core/src/workflow/sleep.test.ts @@ -18,6 +18,8 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { const ulid = monotonicFactory(() => context.globalThis.Math.random()); const workflowStartedAt = context.globalThis.Date.now(); const ctx: WorkflowOrchestratorContext = { + runId: 'wrun_test', + encryptionKey: undefined, globalThis: context.globalThis, // ctx.onWorkflowError is accessed via closure — it's defined below on the same object eventsConsumer: new EventsConsumer(events, {