fix(core): chain unconsumed event check onto promiseQueue to prevent false positives #1254
…false positives

The EventsConsumer's unconsumed event check (setTimeout(0)) was racing against the promiseQueue's async deserialization. When parallel steps completed and their hydrateStepReturnValue did real async work (e.g., decryption), the setTimeout(0) fired before the promise chain resolved the step results and triggered the next subscribe() call. This caused step_created events for sequential steps to be falsely flagged as unconsumed/orphaned.

Fix: chain the unconsumed check onto the promiseQueue via getPromiseQueue() so it only fires after all pending async work completes. Use process.nextTick (not setTimeout) after the queue drains to give synchronous subscribe() calls from resolved user code a chance to cancel. Version-based cancellation replaces clearTimeout since the check is now promise-based.

Adds getPromiseQueue option to EventsConsumerOptions. The workflow.ts context uses a getter/setter to keep the promiseQueue holder in sync.

Reproduction test: parallel steps A+B with 10ms mock deserialization delay, followed by sequential step C. Previously failed with 'Unconsumed event: step_created(C)'. Now passes.
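The version-based cancellation described above can be sketched roughly as follows. This is a minimal illustration, not the code from `events-consumer.ts`: only `getPromiseQueue` is a name taken from the PR, while `UnconsumedCheck`, `schedule`, `cancel`, and the injectable `defer` hook are hypothetical. The default `defer` uses `setTimeout(fn, 0)` purely as a stand-in for the final deferral step (the commit message names `process.nextTick` for that role).

```typescript
// Hypothetical sketch; only `getPromiseQueue` is a name taken from the PR.
type Defer = (fn: () => void) => void;

class UnconsumedCheck {
  private version = 0;
  private readonly getPromiseQueue: () => Promise<unknown>;
  private readonly onUnconsumed: () => void;
  private readonly defer: Defer;

  constructor(
    getPromiseQueue: () => Promise<unknown>,
    onUnconsumed: () => void,
    // Stand-in for the final deferral after the queue drains (assumption).
    defer: Defer = (fn) => {
      setTimeout(fn, 0);
    },
  ) {
    this.getPromiseQueue = getPromiseQueue;
    this.onUnconsumed = onUnconsumed;
    this.defer = defer;
  }

  // Schedule a check that runs only after the current queue has drained.
  schedule(): void {
    const scheduled = ++this.version;
    void this.getPromiseQueue().then(() => {
      this.defer(() => {
        // A newer schedule() or a cancel() bumps the version and voids this check.
        if (scheduled === this.version) this.onUnconsumed();
      });
    });
  }

  // Called when a subscriber arrives; there is no timer handle to clear,
  // so cancellation is expressed by invalidating the version.
  cancel(): void {
    this.version += 1;
  }
}
```

Because the check is a promise continuation rather than a timer, there is nothing for `clearTimeout` to act on; comparing a captured version number against the current one is one simple way to get the same effect.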
🦋 Changeset detected

Latest commit: 1bb2a66

The changes in this PR will be included in the next version bump. This PR includes changesets to release 14 packages.
🧪 E2E Test Results

❌ Some tests failed

Failed tests: 🌍 Community Worlds (48 failed), all in turso (48 failed).

Details by category:
- ✅ ▲ Vercel Production
- ✅ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 🪟 Windows
- ❌ 🌍 Community Worlds
- ✅ 📋 Other
📊 Benchmark Results

Scenarios benchmarked (each on 💻 Local Development and ▲ Production (Vercel), across Next.js (Turbopack), Nitro, and Express):
- workflow with no steps
- workflow with 1 step
- workflow with 10, 25, and 50 sequential steps
- Promise.all with 10, 25, and 50 concurrent steps
- Promise.race with 10, 25, and 50 concurrent steps
- workflow with stream (stream benchmarks include TTFB metrics)

Summary: the fastest framework by world and the fastest world by framework are each determined by most benchmark wins.
Pull request overview
This PR fixes a production timing race in EventsConsumer where the “unconsumed event” orphan check could run before pending async hydration/deserialization work completed, causing valid step_created events (especially after parallel steps) to be falsely flagged as orphaned.
Changes:
- Add `getPromiseQueue()` to `EventsConsumerOptions` and chain the unconsumed-event check onto the current `promiseQueue` to ensure pending async work completes first.
- Update `runWorkflow` to expose a mutable `promiseQueue` reference via a getter/setter so `EventsConsumer` always sees the latest queue state.
- Add/adjust tests to reproduce the parallel-steps + async-deserialization race and validate the corrected ordering/behavior.
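To illustrate why a getter/setter matters here, the following is a minimal sketch under stated assumptions: `QueueHolder` and `createContext` are hypothetical names, and the real context object in `workflow.ts` likely carries more than this. The point is only that a promise queue is typically extended by reassignment, so a consumer that captured the queue value once would keep waiting on a stale promise, whereas a function that reads through the holder always sees the latest one.

```typescript
// Hypothetical holder shape; the actual context in workflow.ts may differ.
interface QueueHolder {
  promiseQueue: Promise<void>;
}

function createContext(): {
  ctx: QueueHolder;
  getPromiseQueue: () => Promise<void>;
} {
  let current: Promise<void> = Promise.resolve();
  const ctx: QueueHolder = {
    get promiseQueue(): Promise<void> {
      return current;
    },
    set promiseQueue(next: Promise<void>) {
      current = next;
    },
  };
  // The consumer receives a function, not a snapshot, so it reads the live queue.
  return { ctx, getPromiseQueue: () => ctx.promiseQueue };
}

const { ctx, getPromiseQueue } = createContext();
const snapshot = getPromiseQueue();
// Appending work replaces the queue with a new promise.
ctx.promiseQueue = ctx.promiseQueue.then(() => undefined);
const live = getPromiseQueue();
```

After the append, `snapshot` still points at the old queue while `live` is the extended one, which is the staleness the getter is meant to avoid.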
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| packages/core/src/events-consumer.ts | Defers unconsumed-event check by chaining onto the promise queue and uses version-based cancellation. |
| packages/core/src/workflow.ts | Introduces a mutable promiseQueue holder and wires EventsConsumer to the live queue via getPromiseQueue. |
| packages/core/src/events-consumer.test.ts | Updates unit tests to reflect promise-queue/nextTick-based deferral semantics. |
| packages/core/src/workflow.test.ts | Adds regression tests reproducing the production false-positive scenario (parallel steps + async hydration). |
| packages/core/src/workflow/sleep.test.ts | Updates test harness to pass getPromiseQueue option. |
| packages/core/src/workflow/hook.test.ts | Updates test harness to pass getPromiseQueue option. |
| packages/core/src/step.test.ts | Updates test harness to pass getPromiseQueue option. |
| packages/core/src/async-deserialization-ordering.test.ts | Updates test harness to pass getPromiseQueue option. |
…e unconsumed events

The unconsumed event check was firing during the async gap between run_started consumption and the workflow function subscribing its first step callbacks. This happened because hydrateWorkflowArguments is async, and during its await, the EventsConsumer advanced to step_created events that had no subscriber yet.

Fix: chain hydrateWorkflowArguments onto the promiseQueue so the unconsumed check (which waits for the queue to drain) doesn't fire until after the workflow arguments are hydrated and the workflow function has been invoked.
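A rough sketch of what chaining async work onto a serial promise queue looks like, and why a check that waits on the queue cannot run before that work finishes. `createQueue`, `enqueue`, and `demo` are hypothetical names, and the 10ms timer is only a stand-in for the async hydration of workflow arguments; the PR's actual wiring is not reproduced here.

```typescript
// Hypothetical serial queue; only the idea of a `promiseQueue` comes from the PR.
function createQueue() {
  let promiseQueue: Promise<void> = Promise.resolve();
  return {
    enqueue<T>(work: () => Promise<T>): Promise<T> {
      const result = promiseQueue.then(work);
      // The queue itself swallows failures so later work still runs;
      // the caller still observes the rejection through `result`.
      promiseQueue = result.then(
        () => undefined,
        () => undefined,
      );
      return result;
    },
    getPromiseQueue: (): Promise<void> => promiseQueue,
  };
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const queue = createQueue();

  // Stand-in for async argument hydration chained onto the queue.
  const hydrated = queue.enqueue(async () => {
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    log.push("hydrated");
    return ["arg"];
  });

  // A check chained onto the current queue runs only after hydration settles.
  const check = queue.getPromiseQueue().then(() => {
    log.push("check");
  });

  await Promise.all([hydrated, check]);
  return log; // ["hydrated", "check"]
}
```

If the hydration step ran outside the queue, the check would have nothing to wait on and could fire during the gap, which is the situation this commit addresses.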
…ise propagation

setTimeout(0) is insufficient because Node.js does not guarantee that macrotasks fire after all cross-context (VM boundary) microtasks settle. After promiseQueue resolves and resolve() fires in the host context, there are multiple microtask hops through the VM boundary before the workflow code actually calls subscribe(). A 100ms delay provides sufficient time for this propagation while still detecting truly orphaned events promptly.

Also update sleep.test.ts to wait 200ms for the unconsumed check.
Summary

Fixes a production bug where the `EventsConsumer`'s unconsumed event check falsely flagged valid `step_created` events as orphaned when parallel steps performed async deserialization (e.g., encryption/decryption).

Problem

When parallel steps A and B completed, their `hydrateStepReturnValue` calls did real async work via the `promiseQueue`. The `EventsConsumer`'s `setTimeout(0)` unconsumed check fired before the promise chain resolved the step results and triggered the next `subscribe()` call. This caused step C's `step_created` event to be falsely flagged as unconsumed.

Fix

- Chain the unconsumed check onto the `promiseQueue` via a new `getPromiseQueue` option on `EventsConsumerOptions`, so it only fires after all pending async work (deserialization/decryption) completes
- Use `process.nextTick` (not `setTimeout`) after the queue drains to avoid keeping the event loop alive
- Version-based cancellation replaces `clearTimeout` since the check is now promise-based
- `workflow.ts` uses a getter/setter on the context to keep `getPromiseQueue` in sync as the queue is mutated

Test plan

- Reproduction test (`workflow.test.ts`): parallel steps A+B with 10ms mock deserialization delay, followed by sequential step C. Previously failed with the production error. Now passes.
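The ordering problem behind the reproduction can be shown in isolation with plain timers and promises. This is a simplified sketch, not the test from `workflow.test.ts`: `raceDemo` and the log labels are hypothetical, and the 10ms timer stands in for the mock deserialization delay after which step C would subscribe.

```typescript
// Hypothetical illustration of the race; no workflow APIs are involved.
async function raceDemo(): Promise<string[]> {
  const log: string[] = [];

  // Stand-in for async deserialization of A+B results, after which
  // user code resumes and subscribes for step C.
  const deserialized = new Promise<void>((resolve) => setTimeout(resolve, 10)).then(() => {
    log.push("subscribe(C)");
  });

  // Timer-based check: runs long before the 10ms deserialization finishes.
  setTimeout(() => {
    log.push("timer check");
  }, 0);

  // Queue-chained check: cannot run until the pending async work settles.
  const chained = deserialized.then(() => {
    log.push("chained check");
  });

  await chained;
  return log; // ["timer check", "subscribe(C)", "chained check"]
}
```

The timer-based check observes a state in which step C has no subscriber yet, which corresponds to the false positive; the chained check only runs once the subscription has had a chance to happen.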