From 734b4dfd99d5cba952d1cacc1ff0b4b05c34652d Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 4 Feb 2026 16:43:24 -0800 Subject: [PATCH 01/19] [core] Make getWorld interface asynchronous Signed-off-by: Peter Wielander --- .changeset/ninety-dancers-brush.md | 5 + packages/core/src/runtime.ts | 403 ++++++------ packages/core/src/runtime/helpers.ts | 4 +- packages/core/src/runtime/resume-hook.ts | 4 +- packages/core/src/runtime/run.ts | 32 +- packages/core/src/runtime/start.ts | 4 +- packages/core/src/runtime/step-handler.ts | 751 +++++++++++----------- packages/core/src/runtime/world.ts | 44 +- packages/core/src/serialization.ts | 5 +- 9 files changed, 657 insertions(+), 595 deletions(-) create mode 100644 .changeset/ninety-dancers-brush.md diff --git a/.changeset/ninety-dancers-brush.md b/.changeset/ninety-dancers-brush.md new file mode 100644 index 0000000000..a67832c44e --- /dev/null +++ b/.changeset/ninety-dancers-brush.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Make `getWorld` asynchronous so it can use dynamic imports diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 6e229a4971..6b4da97cfa 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -5,6 +5,7 @@ import { SPEC_VERSION_CURRENT, WorkflowInvokePayloadSchema, type WorkflowRun, + type World, } from '@workflow/world'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; @@ -16,7 +17,11 @@ import { withHealthCheck, } from './runtime/helpers.js'; import { handleSuspension } from './runtime/suspension-handler.js'; -import { getWorld, getWorldHandlers } from './runtime/world.js'; +import { + getWorld, + getWorldHandlers, + type WorldHandlers, +} from './runtime/world.js'; import { remapErrorStack } from './source-map.js'; import * as Attribute from './telemetry/semantic-conventions.js'; import { linkToCurrentContext, trace, withTraceContext } from './telemetry.js'; @@ -62,236 +67,238 @@ export 
{ export function workflowEntrypoint( workflowCode: string ): (req: Request) => Promise { - const handler = getWorldHandlers().createQueueHandler( - '__wkf_workflow_', - async (message_, metadata) => { - // Check if this is a health check message - // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. - // They only write a simple status response to a stream and do not expose sensitive data. - // The stream name includes a unique correlationId that must be known by the caller. - const healthCheck = parseHealthCheckPayload(message_); - if (healthCheck) { - await handleHealthCheckMessage(healthCheck, 'workflow'); - return; - } - - const { - runId, - traceCarrier: traceContext, - requestedAt, - } = WorkflowInvokePayloadSchema.parse(message_); - // Extract the workflow name from the topic name - const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); - const spanLinks = await linkToCurrentContext(); - - // Invoke user workflow within the propagated trace context - return await withTraceContext(traceContext, async () => { - const world = getWorld(); - return trace( - `WORKFLOW ${workflowName}`, - { links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowOperation('execute'), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - // TODO: validate `workflowName` exists before consuming message? + const handler = (worldHandlers: WorldHandlers) => + worldHandlers.createQueueHandler( + '__wkf_workflow_', + async (message_, metadata) => { + // Check if this is a health check message + // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. 
+ // They only write a simple status response to a stream and do not expose sensitive data. + // The stream name includes a unique correlationId that must be known by the caller. + const healthCheck = parseHealthCheckPayload(message_); + if (healthCheck) { + await handleHealthCheckMessage(healthCheck, 'workflow'); + return; + } - span?.setAttributes({ - ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowTracePropagated(!!traceContext), - }); + const { + runId, + traceCarrier: traceContext, + requestedAt, + } = WorkflowInvokePayloadSchema.parse(message_); + // Extract the workflow name from the topic name + const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); + const spanLinks = await linkToCurrentContext(); - let workflowStartedAt = -1; - try { - let workflowRun = await world.runs.get(runId); - - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } + // Invoke user workflow within the propagated trace context + return await withTraceContext(traceContext, async () => { + const world = await getWorld(); + return trace( + `WORKFLOW ${workflowName}`, + { links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowOperation('execute'), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - // At this 
point, the workflow is "running" and `startedAt` should - // definitely be set. - if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; + // TODO: validate `workflowName` exists before consuming message? span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), + ...Attribute.WorkflowRunId(runId), + ...Attribute.WorkflowTracePropagated(!!traceContext), }); - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } - ); - - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. + let workflowStartedAt = -1; + try { + let workflowRun = await world.runs.get(runId); - return; - } + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` + ); + } + workflowRun = result.run; + } - // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. 
+ if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); + } + workflowStartedAt = +workflowRun.startedAt; - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // that replaying the workflow is itself failing.
- // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - const result = await world.events.create(runId, waitEvent); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } + return; + } - const result = await runWorkflow( - workflowCode, - workflowRun, - events - ); + // Load all events into memory before running + const events = await getAllWorkflowRunEvents(workflowRun.runId); - // Complete the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, - }); + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } - const result = await handleSuspension({ - suspension: err, - world, - runId, - workflowName, - workflowStartedAt, - span, - }); + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: 
result.timeoutSeconds }; + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + const result = await world.events.create(runId, waitEvent); + // Add the event to the events array so the workflow can see it + events.push(result.event!); } - } else { - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. - - const errorName = getErrorName(err); - const errorMessage = - err instanceof Error ? err.message : String(err); - let errorStack = getErrorStack(err); - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } + const result = await runWorkflow( + workflowCode, + workflowRun, + events + ); - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); - // Fail the workflow run via event (event-sourced architecture) + // Complete the workflow run via event (event-sourced architecture) await world.events.create(runId, { - eventType: 'run_failed', + eventType: 'run_completed', specVersion: SPEC_VERSION_CURRENT, eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them + output: result, }, }); span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(String(err)), + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), }); + } catch (err) { + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + 
err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } + + const result = await handleSuspension({ + suspension: err, + world, + runId, + workflowName, + workflowStartedAt, + span, + }); + + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + } else { + // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError + // (for instance when the event log is corrupted, this is thrown by the event consumer). We could + // specially handle these if needed. + + const errorName = getErrorName(err); + const errorMessage = + err instanceof Error ? err.message : String(err); + let errorStack = getErrorStack(err); + + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + // Fail the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them + }, + }); + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(String(err)), + }); + } } } - } - ); // End withTraceContext - }); - } - ); + ); // End withTraceContext + }); + } + ); - return withHealthCheck(handler); + return withHealthCheck(async (req) => handler(await getWorldHandlers())(req)); } // this is a no-op placeholder as the client is diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 
0c1e361330..d5d2bfe30a 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -65,7 +65,7 @@ export async function handleHealthCheckMessage( healthCheck: HealthCheckPayload, endpoint: 'workflow' | 'step' ): Promise { - const world = getWorld(); + const world = await getWorld(); const streamName = getHealthCheckStreamName(healthCheck.correlationId); const response = JSON.stringify({ healthy: true, @@ -259,7 +259,7 @@ export async function getAllWorkflowRunEvents(runId: string): Promise { let hasMore = true; let pagesLoaded = 0; - const world = getWorld(); + const world = await getWorld(); while (hasMore) { // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this // to lazyload the data from the world instead so that we can optimize and make the event log loading diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index b94d511038..00cb82c466 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -24,7 +24,7 @@ import { getWorld } from './world.js'; * @param token - The unique token identifying the hook */ export async function getHookByToken(token: string): Promise { - const world = getWorld(); + const world = await getWorld(); const hook = await world.hooks.getByToken(token); if (typeof hook.metadata !== 'undefined') { hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId); @@ -67,7 +67,7 @@ export async function resumeHook( ): Promise { return await waitedUntil(() => { return trace('hook.resume', async (span) => { - const world = getWorld(); + const world = await getWorld(); try { const hook = diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 298e5850e0..70dffeec25 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -5,8 +5,8 @@ import { } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, - type 
World, type WorkflowRunStatus, + type World, } from '@workflow/world'; import { getExternalRevivers, @@ -55,18 +55,19 @@ export class Run { * The world object. * @internal */ - private world: World; + private worldPromise: Promise; constructor(runId: string) { this.runId = runId; - this.world = getWorld(); + this.worldPromise = getWorld(); } /** * Cancels the workflow run. */ async cancel(): Promise { - await this.world.events.create(this.runId, { + const world = await this.worldPromise; + await world.events.create(this.runId, { eventType: 'run_cancelled', specVersion: SPEC_VERSION_CURRENT, }); @@ -76,7 +77,9 @@ export class Run { * The status of the workflow run. */ get status(): Promise { - return this.world.runs.get(this.runId).then((run) => run.status); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.status) + ); } /** @@ -91,14 +94,18 @@ export class Run { * The name of the workflow. */ get workflowName(): Promise { - return this.world.runs.get(this.runId).then((run) => run.workflowName); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.workflowName) + ); } /** * The timestamp when the workflow run was created. */ get createdAt(): Promise { - return this.world.runs.get(this.runId).then((run) => run.createdAt); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.createdAt) + ); } /** @@ -106,7 +113,9 @@ export class Run { * Returns undefined if the workflow has not started yet. */ get startedAt(): Promise { - return this.world.runs.get(this.runId).then((run) => run.startedAt); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.startedAt) + ); } /** @@ -114,7 +123,9 @@ export class Run { * Returns undefined if the workflow has not completed yet. 
*/ get completedAt(): Promise { - return this.world.runs.get(this.runId).then((run) => run.completedAt); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.completedAt) + ); } /** @@ -148,9 +159,10 @@ export class Run { * @returns The workflow return value. */ private async pollReturnValue(): Promise { + const world = await this.worldPromise; while (true) { try { - const run = await this.world.runs.get(this.runId); + const run = await world.runs.get(this.runId); if (run.status === 'completed') { return hydrateWorkflowReturnValue(run.output, [], this.runId); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 13e001b452..5e52754ebd 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -3,13 +3,13 @@ import { WorkflowRuntimeError } from '@workflow/errors'; import { withResolvers } from '@workflow/utils'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; -import { Run } from './run.js'; import type { Serializable } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; +import { Run } from './run.js'; import { getWorld } from './world.js'; export interface StartOptions { @@ -99,7 +99,7 @@ export async function start( ...Attribute.WorkflowArgumentsCount(args.length), }); - const world = opts?.world ?? getWorld(); + const world = opts?.world ?? (await getWorld()); const deploymentId = opts.deploymentId ?? 
(await world.getDeploymentId()); const ops: Promise[] = []; const { promise: runIdPromise, resolve: resolveRunId } = diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index f170c1bc57..ef5e8cd8f3 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -7,7 +7,11 @@ import { } from '@workflow/errors'; import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; -import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world'; +import { + SPEC_VERSION_CURRENT, + StepInvokePayloadSchema, + type World, +} from '@workflow/world'; import { runtimeLogger, stepLogger } from '../logger.js'; import { getStepFunction } from '../private.js'; import { @@ -31,457 +35,464 @@ import { queueMessage, withHealthCheck, } from './helpers.js'; -import { getWorld, getWorldHandlers } from './world.js'; +import { getWorld, getWorldHandlers, type WorldHandlers } from './world.js'; const DEFAULT_STEP_MAX_RETRIES = 3; -const stepHandler = getWorldHandlers().createQueueHandler( - '__wkf_step_', - async (message_, metadata) => { - // Check if this is a health check message - // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. - // They only write a simple status response to a stream and do not expose sensitive data. - // The stream name includes a unique correlationId that must be known by the caller. - const healthCheck = parseHealthCheckPayload(message_); - if (healthCheck) { - await handleHealthCheckMessage(healthCheck, 'step'); - return; - } +const stepHandler = (worldHandlers: WorldHandlers) => + worldHandlers.createQueueHandler( + '__wkf_step_', + async (message_, metadata) => { + // Check if this is a health check message + // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. + // They only write a simple status response to a stream and do not expose sensitive data. 
+ // The stream name includes a unique correlationId that must be known by the caller. + const healthCheck = parseHealthCheckPayload(message_); + if (healthCheck) { + await handleHealthCheckMessage(healthCheck, 'step'); + return; + } + + const { + workflowName, + workflowRunId, + workflowStartedAt, + stepId, + traceCarrier: traceContext, + requestedAt, + } = StepInvokePayloadSchema.parse(message_); + const spanLinks = await linkToCurrentContext(); + // Execute step within the propagated trace context + return await withTraceContext(traceContext, async () => { + // Extract the step name from the topic name + const stepName = metadata.queueName.slice('__wkf_step_'.length); + + // Get the port early to avoid async operations during step execution + const port = await getPort(); + const world = await getWorld(); + return trace( + `step ${stepName}`, + { kind: await getSpanKind('CONSUMER'), links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.StepName(stepName), + ...Attribute.StepAttempt(metadata.attempt), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - const { - workflowName, - workflowRunId, - workflowStartedAt, - stepId, - traceCarrier: traceContext, - requestedAt, - } = StepInvokePayloadSchema.parse(message_); - const spanLinks = await linkToCurrentContext(); - // Execute step within the propagated trace context - return await withTraceContext(traceContext, async () => { - // Extract the step name from the topic name - const stepName = metadata.queueName.slice('__wkf_step_'.length); - const world = getWorld(); - - // Get the port early to avoid async operations during step execution - const port = await getPort(); - - return trace( - `step ${stepName}`, - { kind: await 
getSpanKind('CONSUMER'), links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.StepName(stepName), - ...Attribute.StepAttempt(metadata.attempt), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - const stepFn = getStepFunction(stepName); - if (!stepFn) { - throw new Error(`Step "${stepName}" not found`); - } - if (typeof stepFn !== 'function') { - throw new Error( - `Step "${stepName}" is not a function (got ${typeof stepFn})` - ); - } + const stepFn = getStepFunction(stepName); + if (!stepFn) { + throw new Error(`Step "${stepName}" not found`); + } + if (typeof stepFn !== 'function') { + throw new Error( + `Step "${stepName}" is not a function (got ${typeof stepFn})` + ); + } + + const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - const maxRetries = stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; - - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowRunId(workflowRunId), - ...Attribute.StepId(stepId), - ...Attribute.StepMaxRetries(maxRetries), - ...Attribute.StepTracePropagated(!!traceContext), - }); - - let step = await world.steps.get(workflowRunId, stepId); - - runtimeLogger.debug('Step execution details', { - stepName, - stepId: step.stepId, - status: step.status, - attempt: step.attempt, - }); - - span?.setAttributes({ - ...Attribute.StepStatus(step.status), - }); - - // Check if the step has a `retryAfter` timestamp that hasn't been reached yet - const now = Date.now(); - if (step.retryAfter && step.retryAfter.getTime() > now) { - const timeoutSeconds = Math.ceil( - (step.retryAfter.getTime() - now) / 1000 - ); span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowRunId(workflowRunId), + ...Attribute.StepId(stepId), + ...Attribute.StepMaxRetries(maxRetries), + ...Attribute.StepTracePropagated(!!traceContext), }); - runtimeLogger.debug('Step retryAfter timestamp not yet reached', { - stepName, - stepId: step.stepId, - retryAfter: step.retryAfter, - timeoutSeconds, - }); - return { timeoutSeconds }; - } - let result: unknown; - - // Check max retries FIRST before any state changes. - // step.attempt tracks how many times step_started has been called. - // If step.attempt >= maxRetries, we've already tried maxRetries times. - // This handles edge cases where the step handler is invoked after max retries have been exceeded - // (e.g., when the step repeatedly times out or fails before reaching the catch handler). - // Without this check, the step would retry forever. - // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 - // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. 
- // The post-failure check uses >= to decide whether to retry after a failure. - if (step.attempt > maxRetries + 1) { - const retryCount = step.attempt - 1; - const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; - stepLogger.error('Step exceeded max retries', { - workflowRunId, + let step = await world.steps.get(workflowRunId, stepId); + + runtimeLogger.debug('Step execution details', { stepName, - retryCount, - }); - // Fail the step via event (event-sourced architecture) - await world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: errorMessage, - stack: step.error?.stack, - }, + stepId: step.stepId, + status: step.status, + attempt: step.attempt, }); span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), + ...Attribute.StepStatus(step.status), }); - // Re-invoke the workflow to handle the failed step - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - return; - } + // Check if the step has a `retryAfter` timestamp that hasn't been reached yet + const now = Date.now(); + if (step.retryAfter && step.retryAfter.getTime() > now) { + const timeoutSeconds = Math.ceil( + (step.retryAfter.getTime() - now) / 1000 + ); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + }); + runtimeLogger.debug('Step retryAfter timestamp not yet reached', { + stepName, + stepId: step.stepId, + retryAfter: step.retryAfter, + timeoutSeconds, + }); + return { timeoutSeconds }; + } - try { - if (!['pending', 'running'].includes(step.status)) { - // We should only be running the step if it's either - // a) pending - initial state, or state set on re-try - // b) running - if a step fails mid-execution, like a function timeout - // 
otherwise, the step has been invoked erroneously - stepLogger.warn('Step invoked erroneously, skipping execution', { + let result: unknown; + + // Check max retries FIRST before any state changes. + // step.attempt tracks how many times step_started has been called. + // If step.attempt >= maxRetries, we've already tried maxRetries times. + // This handles edge cases where the step handler is invoked after max retries have been exceeded + // (e.g., when the step repeatedly times out or fails before reaching the catch handler). + // Without this check, the step would retry forever. + // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 + // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. + // The post-failure check uses >= to decide whether to retry after a failure. + if (step.attempt > maxRetries + 1) { + const retryCount = step.attempt - 1; + const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; + stepLogger.error('Step exceeded max retries', { workflowRunId, stepName, - expectedStatus: ['pending', 'running'], - actualStatus: step.status, + retryCount, + }); + // Fail the step via event (event-sourced architecture) + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: step.error?.stack, + }, }); + span?.setAttributes({ - ...Attribute.StepSkipped(true), - ...Attribute.StepSkipReason(step.status), + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), + }); + + // Re-invoke the workflow to handle the failed step + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), }); - // There's a chance that a step terminates correctly, but the underlying 
process - // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run. - // The queue lease expires and stepEntrypoint again, which leads us here, so - // we optimistically re-enqueue the workflow if the step is in a terminal state, - // under the assumption that this edge case happened. - // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case - // where the we execute this code based on the `step` entity status, but the runtime - // failed to create the `step_completed` event (due to failing between step and event update), - // in which case, this might lead to an infinite loop. - // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 - const isTerminalStep = [ - 'completed', - 'failed', - 'cancelled', - ].includes(step.status); - if (isTerminalStep) { - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - } return; } - // Start the step via event (event-sourced architecture) - // step_started increments the attempt counter in the World implementation - const startResult = await world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }); - - // Use the step entity from the event response (no extra get call needed) - if (!startResult.step) { - throw new WorkflowRuntimeError( - `step_started event for "${stepId}" did not return step entity` - ); - } - step = startResult.step; + try { + if (!['pending', 'running'].includes(step.status)) { + // We should only be running the step if it's either + // a) pending - initial state, or state set on re-try + // b) running - if a step fails mid-execution, like a function timeout + // otherwise, the step has been invoked erroneously + stepLogger.warn( + 'Step invoked erroneously, skipping execution', + { + workflowRunId, + stepName, + expectedStatus: ['pending', 
'running'], + actualStatus: step.status, + } + ); + span?.setAttributes({ + ...Attribute.StepSkipped(true), + ...Attribute.StepSkipReason(step.status), + }); + // There's a chance that a step terminates correctly, but the underlying process + // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run. + // The queue lease expires and stepEntrypoint again, which leads us here, so + // we optimistically re-enqueue the workflow if the step is in a terminal state, + // under the assumption that this edge case happened. + // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case + // where the we execute this code based on the `step` entity status, but the runtime + // failed to create the `step_completed` event (due to failing between step and event update), + // in which case, this might lead to an infinite loop. + // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 + const isTerminalStep = [ + 'completed', + 'failed', + 'cancelled', + ].includes(step.status); + if (isTerminalStep) { + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + } + return; + } - // step.attempt is now the current attempt number (after increment) - const attempt = step.attempt; + // Start the step via event (event-sourced architecture) + // step_started increments the attempt counter in the World implementation + const startResult = await world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }); - if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` - ); - } - // Hydrate the step input arguments, closure variables, and thisVal - // Track deserialization time for observability - // NOTE: This captures only the synchronous portion of hydration. 
Any async - // operations (e.g., stream loading) are added to `ops` and executed later - // via Promise.all(ops) - their timing is not included in this measurement. - const deserializeStartTime = Date.now(); - const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId - ); - const deserializeTimeMs = Date.now() - deserializeStartTime; - - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? null; + // Use the step entity from the event response (no extra get call needed) + if (!startResult.step) { + throw new WorkflowRuntimeError( + `step_started event for "${stepId}" did not return step entity` + ); + } + step = startResult.step; - span?.setAttributes({ - ...Attribute.StepArgumentsCount(args.length), - ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), - }); + // step.attempt is now the current attempt number (after increment) + const attempt = step.attempt; - // Track execution time for observability - const executionStartTime = Date.now(); - result = await contextStorage.run( - { - stepMetadata: { - stepId, - stepStartedAt: new Date(+step.startedAt), - attempt, - }, - workflowMetadata: { - workflowRunId, - workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: process.env.VERCEL_URL - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, - }, + if (!step.startedAt) { + throw new WorkflowRuntimeError( + `Step "${stepId}" has no "startedAt" timestamp` + ); + } + // Hydrate the step input arguments, closure variables, and thisVal + // Track deserialization time for observability + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. 
+ const deserializeStartTime = Date.now(); + const ops: Promise[] = []; + const hydratedInput = hydrateStepArguments( + step.input, ops, - closureVars: hydratedInput.closureVars, - }, - () => stepFn.apply(thisVal, args) - ); - const executionTimeMs = Date.now() - executionStartTime; + workflowRunId + ); + const deserializeTimeMs = Date.now() - deserializeStartTime; - span?.setAttributes({ - ...Attribute.QueueExecutionTimeMs(executionTimeMs), - }); + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? null; - // NOTE: None of the code from this point is guaranteed to run - // Since the step might fail or cause a function timeout and the process might be SIGKILL'd - // The workflow runtime must be resilient to the below code not executing on a failed step - // Track serialization time for observability - const serializeStartTime = Date.now(); - result = dehydrateStepReturnValue(result, ops, workflowRunId); - const serializeTimeMs = Date.now() - serializeStartTime; + span?.setAttributes({ + ...Attribute.StepArgumentsCount(args.length), + ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), + }); - span?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(serializeTimeMs), - }); + // Track execution time for observability + const executionStartTime = Date.now(); + result = await contextStorage.run( + { + stepMetadata: { + stepId, + stepStartedAt: new Date(+step.startedAt), + attempt, + }, + workflowMetadata: { + workflowRunId, + workflowStartedAt: new Date(+workflowStartedAt), + // TODO: there should be a getUrl method on the world interface itself. This + // solution only works for vercel + local worlds. + url: process.env.VERCEL_URL + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${port ?? 
3000}`, + }, + ops, + closureVars: hydratedInput.closureVars, + }, + () => stepFn.apply(thisVal, args) + ); + const executionTimeMs = Date.now() - executionStartTime; - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - // Complete the step via event (event-sourced architecture) - // The event creation atomically updates the step entity - // result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array - await world.events.create(workflowRunId, { - eventType: 'step_completed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - result: result as Uint8Array, - }, - }); + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); - span?.setAttributes({ - ...Attribute.StepStatus('completed'), - ...Attribute.StepResultType(typeof result), - }); - } catch (err: unknown) { - span?.setAttributes({ - ...Attribute.StepErrorName(getErrorName(err)), - ...Attribute.StepErrorMessage(String(err)), - }); + // NOTE: None of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step + // Track serialization time for observability + const serializeStartTime = Date.now(); + result = dehydrateStepReturnValue(result, ops, workflowRunId); + const serializeTimeMs = Date.now() - serializeStartTime; - if (WorkflowAPIError.is(err)) { - if (err.status === 410) { - // Workflow has already completed, so no-op - stepLogger.info( - 'Workflow run already completed, skipping step', - { - workflowRunId, - stepId, - message: err.message, - } - ); - return; - } - } + span?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(serializeTimeMs), + }); 
- if (FatalError.is(err)) { - const errorStack = getErrorStack(err); - stepLogger.error( - 'Encountered FatalError while executing step, bubbling up to parent workflow', - { - workflowRunId, - stepName, - errorStack, - } + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || + err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) ); - // Fail the step via event (event-sourced architecture) + + // Complete the step via event (event-sourced architecture) + // The event creation atomically updates the step entity + // result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array await world.events.create(workflowRunId, { - eventType: 'step_failed', + eventType: 'step_completed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - error: String(err), - stack: errorStack, + result: result as Uint8Array, }, }); span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepFatalError(true), + ...Attribute.StepStatus('completed'), + ...Attribute.StepResultType(typeof result), }); - } else { - const maxRetries = stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; - // step.attempt was incremented by step_started, use it here - const currentAttempt = step.attempt; - + } catch (err: unknown) { span?.setAttributes({ - ...Attribute.StepAttempt(currentAttempt), - ...Attribute.StepMaxRetries(maxRetries), + ...Attribute.StepErrorName(getErrorName(err)), + ...Attribute.StepErrorMessage(String(err)), }); - // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 - if (currentAttempt >= maxRetries + 1) { - // Max retries reached + if (WorkflowAPIError.is(err)) { + if (err.status === 410) { + // Workflow has already completed, so no-op + stepLogger.info( + 'Workflow run already completed, skipping step', + { + workflowRunId, + stepId, + message: err.message, + } + ); + return; + } + } + + if (FatalError.is(err)) { const errorStack = getErrorStack(err); - const retryCount = step.attempt - 1; stepLogger.error( - 'Max retries reached, bubbling error to parent workflow', + 'Encountered FatalError while executing step, bubbling up to parent workflow', { workflowRunId, stepName, - attempt: step.attempt, - retryCount, errorStack, } ); - const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${String(err)}`; // Fail the step via event (event-sourced architecture) await world.events.create(workflowRunId, { eventType: 'step_failed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - error: errorMessage, + error: String(err), stack: errorStack, }, }); span?.setAttributes({ ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), + ...Attribute.StepFatalError(true), }); } else { - // Not at max retries yet - log as a retryable error - if (RetryableError.is(err)) { - stepLogger.warn( - 'Encountered RetryableError, step will be retried', + const maxRetries = + stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; + // step.attempt was incremented by step_started, use it here + const currentAttempt = step.attempt; + + span?.setAttributes({ + ...Attribute.StepAttempt(currentAttempt), + ...Attribute.StepMaxRetries(maxRetries), + }); + + // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 + if (currentAttempt >= maxRetries + 1) { + // Max retries reached + const errorStack = getErrorStack(err); + const retryCount = step.attempt - 1; + stepLogger.error( + 'Max retries reached, bubbling error to parent workflow', { workflowRunId, stepName, - attempt: currentAttempt, - message: err.message, + attempt: step.attempt, + retryCount, + errorStack, } ); + const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${String(err)}`; + // Fail the step via event (event-sourced architecture) + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: errorStack, + }, + }); + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), + }); } else { + // Not at max retries yet - log as a retryable error + if (RetryableError.is(err)) { + stepLogger.warn( + 'Encountered RetryableError, step will be retried', + { + workflowRunId, + stepName, + attempt: currentAttempt, + message: err.message, + } + ); + } else { + const errorStack = getErrorStack(err); + stepLogger.warn('Encountered Error, step will be retried', { + workflowRunId, + stepName, + attempt: currentAttempt, + errorStack, + }); + } + // Set step to pending for retry via event (event-sourced architecture) + // step_retrying records the error and sets status to pending const errorStack = getErrorStack(err); - stepLogger.warn('Encountered Error, step will be retried', { - workflowRunId, - stepName, - attempt: currentAttempt, - errorStack, + await 
world.events.create(workflowRunId, { + eventType: 'step_retrying', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: String(err), + stack: errorStack, + ...(RetryableError.is(err) && { + retryAfter: err.retryAfter, + }), + }, }); - } - // Set step to pending for retry via event (event-sourced architecture) - // step_retrying records the error and sets status to pending - const errorStack = getErrorStack(err); - await world.events.create(workflowRunId, { - eventType: 'step_retrying', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: String(err), - stack: errorStack, - ...(RetryableError.is(err) && { - retryAfter: err.retryAfter, - }), - }, - }); - const timeoutSeconds = Math.max( - 1, - RetryableError.is(err) - ? Math.ceil((+err.retryAfter.getTime() - Date.now()) / 1000) - : 1 - ); + const timeoutSeconds = Math.max( + 1, + RetryableError.is(err) + ? Math.ceil( + (+err.retryAfter.getTime() - Date.now()) / 1000 + ) + : 1 + ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - ...Attribute.StepRetryWillRetry(true), - }); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + ...Attribute.StepRetryWillRetry(true), + }); - // It's a retryable error - so have the queue keep the message visible - // so that it gets retried. - return { timeoutSeconds }; + // It's a retryable error - so have the queue keep the message visible + // so that it gets retried. 
+ return { timeoutSeconds }; + } } } - } - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - } - ); - }); - } -); + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + } + ); + }); + } + ); /** * A single route that handles any step execution request and routes to the @@ -489,4 +500,6 @@ const stepHandler = getWorldHandlers().createQueueHandler( * for each step, this is temporary. */ export const stepEntrypoint: (req: Request) => Promise = - /* @__PURE__ */ withHealthCheck(stepHandler); + /* @__PURE__ */ withHealthCheck(async (req) => + stepHandler(await getWorldHandlers())(req) + ); diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index 1da089e729..cf5fef7d78 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -1,11 +1,9 @@ import { createRequire } from 'node:module'; -import { join } from 'node:path'; +import { pathToFileURL } from 'node:url'; import type { World } from '@workflow/world'; import { createLocalWorld } from '@workflow/world-local'; import { createVercelWorld } from '@workflow/world-vercel'; -const require = createRequire(join(process.cwd(), 'index.js')); - const WorldCache = Symbol.for('@workflow/world//cache'); const StubbedWorldCache = Symbol.for('@workflow/world//stubbedCache'); @@ -22,12 +20,35 @@ function defaultWorld(): 'vercel' | 'local' { return 'local'; } +const dynamicImport = new Function('specifier', 'return import(specifier)') as ( + specifier: string +) => Promise; + +function resolveModulePath(specifier: string): string { + if ( + specifier.startsWith('file://') || + specifier.startsWith('/') || + specifier.startsWith('./') || + specifier.startsWith('../') + ) { + return specifier; + } + try { + const require = createRequire( + 
pathToFileURL(process.cwd() + '/package.json').href + ); + return pathToFileURL(require.resolve(specifier)).href; + } catch { + return specifier; + } +} + /** * Create a new world instance based on environment variables. * WORKFLOW_TARGET_WORLD is used to determine the target world. * All other environment variables are specific to the target world */ -export const createWorld = (): World => { +export const createWorld = async (): Promise => { const targetWorld = process.env.WORKFLOW_TARGET_WORLD || defaultWorld(); if (targetWorld === 'vercel') { @@ -47,7 +68,8 @@ export const createWorld = (): World => { }); } - const mod = require(targetWorld); + const resolvedPath = resolveModulePath(targetWorld); + const mod = await dynamicImport(resolvedPath); if (typeof mod === 'function') { return mod() as World; } else if (typeof mod.default === 'function') { @@ -61,6 +83,8 @@ export const createWorld = (): World => { ); }; +export type WorldHandlers = Pick; + /** * Some functions from the world are needed at build time, but we do NOT want * to cache the world in those instances for general use, since we don't have @@ -70,23 +94,23 @@ export const createWorld = (): World => { * Once we migrate to a file-based configuration (workflow.config.ts), we should * be able to re-combine getWorld and getWorldHandlers into one singleton. 
*/ -export const getWorldHandlers = (): Pick => { +export const getWorldHandlers = async (): Promise => { if (globalSymbols[StubbedWorldCache]) { return globalSymbols[StubbedWorldCache]; } - const _world = createWorld(); + const _world = await createWorld(); globalSymbols[StubbedWorldCache] = _world; return { createQueueHandler: _world.createQueueHandler, }; }; -export const getWorld = (): World => { +export const getWorld = async (): Promise => { if (globalSymbols[WorldCache]) { return globalSymbols[WorldCache]; } - globalSymbols[WorldCache] = createWorld(); - return globalSymbols[WorldCache]; + globalSymbols[WorldCache] = await createWorld(); + return Promise.resolve(globalSymbols[WorldCache]); }; /** diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 172275e902..5f2f4d1ed7 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -252,7 +252,7 @@ export class WorkflowServerReadableStream extends ReadableStream { pull: async (controller) => { let reader = this.#reader; if (!reader) { - const world = getWorld(); + const world = await getWorld(); const stream = await world.readFromStream(name, startIndex); reader = this.#reader = stream.getReader(); } @@ -293,7 +293,6 @@ export class WorkflowServerWritableStream extends WritableStream { if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } - const world = getWorld(); // Buffering state for batched writes let buffer: Uint8Array[] = []; @@ -314,6 +313,7 @@ export class WorkflowServerWritableStream extends WritableStream { const _runId = await runId; + const world = await getWorld(); // Use writeToStreamMulti if available for batch writes if ( typeof world.writeToStreamMulti === 'function' && @@ -362,6 +362,7 @@ export class WorkflowServerWritableStream extends WritableStream { await flush(); const _runId = await runId; + const world = await getWorld(); await world.closeStream(name, _runId); 
}, abort() { From f82f218a8d5f5989cbde0d83c44030be5abbbdb2 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 4 Feb 2026 16:44:13 -0800 Subject: [PATCH 02/19] Fixes Signed-off-by: Peter Wielander --- packages/core/src/runtime.ts | 1 - packages/core/src/runtime/step-handler.ts | 6 +----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 6b4da97cfa..10ac925aa2 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -5,7 +5,6 @@ import { SPEC_VERSION_CURRENT, WorkflowInvokePayloadSchema, type WorkflowRun, - type World, } from '@workflow/world'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index ef5e8cd8f3..26b4bc61e8 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -7,11 +7,7 @@ import { } from '@workflow/errors'; import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; -import { - SPEC_VERSION_CURRENT, - StepInvokePayloadSchema, - type World, -} from '@workflow/world'; +import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world'; import { runtimeLogger, stepLogger } from '../logger.js'; import { getStepFunction } from '../private.js'; import { From a0d82025271b7b973b333e79db8fa7a3fde6488e Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 4 Feb 2026 17:01:44 -0800 Subject: [PATCH 03/19] Fixes Signed-off-by: Peter Wielander --- .../docs/api-reference/workflow-api/get-world.mdx | 10 +++++----- .../content/docs/deploying/world/postgres-world.mdx | 13 +++++++++---- packages/cli/src/lib/inspect/env.ts | 1 + .../web-shared/src/api/workflow-server-actions.ts | 2 +- packages/world-postgres/HOW_IT_WORKS.md | 9 ++++++--- packages/world-testing/src/server.mts | 5 +++-- 
workbench/astro/scripts/start-with-pg.mjs | 6 +++++- workbench/astro/src/pages/api/test-health-check.ts | 2 +- workbench/example/api/test-health-check.ts | 2 +- workbench/express/src/index.ts | 2 +- workbench/fastify/src/index.ts | 2 +- workbench/hono/src/index.ts | 2 +- workbench/nest/src/app.controller.ts | 2 +- workbench/nest/src/main.ts | 7 +++++-- .../app/api/test-health-check/route.ts | 2 +- workbench/nextjs-turbopack/instrumentation.ts | 6 +++++- .../app/api/test-health-check/route.ts | 2 +- .../nitro-v2/server/api/test-health-check.post.ts | 2 +- workbench/nitro-v3/plugins/start-pg-world.ts | 7 +++++-- .../nitro-v3/routes/api/test-health-check.post.ts | 2 +- workbench/nuxt/server/api/test-health-check.post.ts | 2 +- workbench/nuxt/server/plugins/start-pg-world.ts | 7 +++++-- workbench/sveltekit/src/hooks.server.ts | 7 +++++-- .../src/routes/api/test-health-check/+server.ts | 2 +- workbench/vite/routes/api/test-health-check.post.ts | 2 +- 25 files changed, 68 insertions(+), 38 deletions(-) diff --git a/docs/content/docs/api-reference/workflow-api/get-world.mdx b/docs/content/docs/api-reference/workflow-api/get-world.mdx index 60d7c80b7e..669d4c99c2 100644 --- a/docs/content/docs/api-reference/workflow-api/get-world.mdx +++ b/docs/content/docs/api-reference/workflow-api/get-world.mdx @@ -3,14 +3,14 @@ title: getWorld description: Access the World instance for low-level storage, queuing, and streaming operations. --- -Retrieves the World instance for direct access to workflow storage, queuing, and streaming backends. This function returns a `World` which provides low-level access to manage workflow runs, steps, events, and hooks. +Retrieves the World instance for direct access to workflow storage, queuing, and streaming backends. This async function returns a `Promise` which provides low-level access to manage workflow runs, steps, events, and hooks. 
Use this function when you need direct access to the underlying workflow infrastructure, such as listing all runs, querying events, or implementing custom workflow management logic. ```typescript lineNumbers import { getWorld } from "workflow/runtime"; -const world = getWorld(); +const world = await getWorld(); ``` ## API Signature @@ -21,7 +21,7 @@ This function does not accept any parameters. ### Returns -Returns a `World` object: +Returns a `Promise` object: { const { getWorld } = await import("workflow/runtime"); - await getWorld().start?.(); + const world = await getWorld(); + await world.start?.(); }; ``` @@ -85,7 +87,8 @@ import { defineNitroPlugin } from "nitro/~internal/runtime/plugin"; export default defineNitroPlugin(async () => { const { getWorld } = await import("workflow/runtime"); - await getWorld().start?.(); + const world = await getWorld(); + await world.start?.(); }); ``` @@ -155,7 +158,8 @@ Number of concurrent workers polling for jobs. Default: `10` ### Programmatic configuration -{/* @skip-typecheck: incomplete code sample */} +{/*@skip-typecheck: incomplete code sample*/} + ```typescript title="workflow.config.ts" lineNumbers import { createWorld } from "@workflow/world-postgres"; @@ -186,6 +190,7 @@ Deploy your application to any cloud that supports long-running servers: - Platform-as-a-Service providers (Railway, Render, Fly.io, etc.) Ensure your deployment has: + 1. Network access to your PostgreSQL database 2. Environment variables configured correctly 3. 
The `start()` function called on server initialization diff --git a/packages/cli/src/lib/inspect/env.ts b/packages/cli/src/lib/inspect/env.ts index c385f11fae..a34de547f3 100644 --- a/packages/cli/src/lib/inspect/env.ts +++ b/packages/cli/src/lib/inspect/env.ts @@ -42,6 +42,7 @@ export const getEnvVars = (): Record => { WORKFLOW_VERCEL_PROJECT: env.WORKFLOW_VERCEL_PROJECT || '', WORKFLOW_VERCEL_TEAM: env.WORKFLOW_VERCEL_TEAM || '', WORKFLOW_LOCAL_UI: env.WORKFLOW_LOCAL_UI || '', + WORKFLOW_LOCAL_BASE_URL: env.WORKFLOW_LOCAL_BASE_URL || '', PORT: env.PORT || '', WORKFLOW_LOCAL_DATA_DIR: env.WORKFLOW_LOCAL_DATA_DIR || '', WORKFLOW_MANIFEST_PATH: env.WORKFLOW_MANIFEST_PATH || '', diff --git a/packages/web-shared/src/api/workflow-server-actions.ts b/packages/web-shared/src/api/workflow-server-actions.ts index 3122cb4217..d62e1d05e5 100644 --- a/packages/web-shared/src/api/workflow-server-actions.ts +++ b/packages/web-shared/src/api/workflow-server-actions.ts @@ -443,7 +443,7 @@ async function getWorldFromEnv(userEnvMap: EnvMap): Promise { return cachedWorld; } - const world = createWorld(); + const world = await createWorld(); worldCache.set(cacheKey, world); return world; } diff --git a/packages/world-postgres/HOW_IT_WORKS.md b/packages/world-postgres/HOW_IT_WORKS.md index 0ca48cd32f..1334b4ce4f 100644 --- a/packages/world-postgres/HOW_IT_WORKS.md +++ b/packages/world-postgres/HOW_IT_WORKS.md @@ -41,9 +41,12 @@ In **Next.js**, the `world.setup()` function needs to be added to `instrumentati // instrumentation.ts if (process.env.NEXT_RUNTIME !== "edge") { - import("workflow/api").then(async ({ getWorld }) => { - // start listening to the jobs. 
- await getWorld().start?.(); + import("workflow/runtime").then(async ({ getWorld }) => { + const world = await getWorld(); + if (world.start) { + console.log('Starting workers for pg-boss queues...'); + await world.start(); + } }); } ``` diff --git a/packages/world-testing/src/server.mts b/packages/world-testing/src/server.mts index 9f52d29ec8..c5d61fa6ce 100644 --- a/packages/world-testing/src/server.mts +++ b/packages/world-testing/src/server.mts @@ -65,7 +65,8 @@ const app = new Hono() return ctx.json({ runId, hookId: hook.hookId }); }) .get('/runs/:runId', async (ctx) => { - const run = await getWorld().runs.get(ctx.req.param('runId')); + const world = await getWorld(); + const run = await world.runs.get(ctx.req.param('runId')); // Custom JSON serialization to handle Uint8Array as base64 const json = JSON.stringify(run, (_key, value) => { if (value instanceof Uint8Array) { @@ -112,7 +113,7 @@ serve( } } - const world = getWorld(); + const world = await getWorld(); if (world.start) { console.log(`starting background tasks...`); await world.start().then( diff --git a/workbench/astro/scripts/start-with-pg.mjs b/workbench/astro/scripts/start-with-pg.mjs index c6c5060906..def1d9f795 100644 --- a/workbench/astro/scripts/start-with-pg.mjs +++ b/workbench/astro/scripts/start-with-pg.mjs @@ -8,7 +8,11 @@ async function main() { if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { console.log('Starting Postgres World...'); const { getWorld } = await import('workflow/runtime'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } } // Now start the Astro server diff --git a/workbench/astro/src/pages/api/test-health-check.ts b/workbench/astro/src/pages/api/test-health-check.ts index a340a71d14..c4d73daf28 100644 --- a/workbench/astro/src/pages/api/test-health-check.ts +++ b/workbench/astro/src/pages/api/test-health-check.ts @@ -11,7 +11,7 @@ export 
async function POST({ request }: { request: Request }) { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/example/api/test-health-check.ts b/workbench/example/api/test-health-check.ts index d838392cd2..3659ff734c 100644 --- a/workbench/example/api/test-health-check.ts +++ b/workbench/example/api/test-health-check.ts @@ -11,7 +11,7 @@ export async function POST(req: Request) { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/express/src/index.ts b/workbench/express/src/index.ts index d29fcd53b9..717b783c0d 100644 --- a/workbench/express/src/index.ts +++ b/workbench/express/src/index.ts @@ -223,7 +223,7 @@ app.post('/api/test-health-check', async (req, res) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/fastify/src/index.ts b/workbench/fastify/src/index.ts index e2c15da35b..3db872c79d 100644 --- a/workbench/fastify/src/index.ts +++ b/workbench/fastify/src/index.ts @@ -276,7 +276,7 @@ server.post('/api/test-health-check', async (req: any, reply) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/hono/src/index.ts b/workbench/hono/src/index.ts index 
02232a07e4..2f7d6ff7d1 100644 --- a/workbench/hono/src/index.ts +++ b/workbench/hono/src/index.ts @@ -211,7 +211,7 @@ app.post('/api/test-health-check', async ({ req }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nest/src/app.controller.ts b/workbench/nest/src/app.controller.ts index b48ad3012b..20139f353f 100644 --- a/workbench/nest/src/app.controller.ts +++ b/workbench/nest/src/app.controller.ts @@ -254,7 +254,7 @@ export class AppController { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint as 'workflow' | 'step', { timeout, }); diff --git a/workbench/nest/src/main.ts b/workbench/nest/src/main.ts index 100c8774eb..b0c22548c6 100644 --- a/workbench/nest/src/main.ts +++ b/workbench/nest/src/main.ts @@ -7,8 +7,11 @@ async function bootstrap() { // Start the Postgres World if configured if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { const { getWorld } = await import('workflow/runtime'); - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } } const app = await NestFactory.create(AppModule, { diff --git a/workbench/nextjs-turbopack/app/api/test-health-check/route.ts b/workbench/nextjs-turbopack/app/api/test-health-check/route.ts index d838392cd2..3659ff734c 100644 --- a/workbench/nextjs-turbopack/app/api/test-health-check/route.ts +++ b/workbench/nextjs-turbopack/app/api/test-health-check/route.ts @@ -11,7 +11,7 @@ export async function POST(req: Request) { `Testing queue-based health check for endpoint: 
${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nextjs-turbopack/instrumentation.ts b/workbench/nextjs-turbopack/instrumentation.ts index 174137a971..508eddd8d3 100644 --- a/workbench/nextjs-turbopack/instrumentation.ts +++ b/workbench/nextjs-turbopack/instrumentation.ts @@ -5,6 +5,10 @@ registerOTel({ serviceName: 'example-nextjs-workflow' }); if (process.env.NEXT_RUNTIME !== 'edge') { // kickstart the world import('workflow/runtime').then(async ({ getWorld }) => { - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } }); } diff --git a/workbench/nextjs-webpack/app/api/test-health-check/route.ts b/workbench/nextjs-webpack/app/api/test-health-check/route.ts index d838392cd2..3659ff734c 100644 --- a/workbench/nextjs-webpack/app/api/test-health-check/route.ts +++ b/workbench/nextjs-webpack/app/api/test-health-check/route.ts @@ -11,7 +11,7 @@ export async function POST(req: Request) { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nitro-v2/server/api/test-health-check.post.ts b/workbench/nitro-v2/server/api/test-health-check.post.ts index 4e48e30b7f..32cc532b9e 100644 --- a/workbench/nitro-v2/server/api/test-health-check.post.ts +++ b/workbench/nitro-v2/server/api/test-health-check.post.ts @@ -13,7 +13,7 @@ export default defineEventHandler(async (event) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { 
timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nitro-v3/plugins/start-pg-world.ts b/workbench/nitro-v3/plugins/start-pg-world.ts index 7e9cff2247..43f690fdad 100644 --- a/workbench/nitro-v3/plugins/start-pg-world.ts +++ b/workbench/nitro-v3/plugins/start-pg-world.ts @@ -5,8 +5,11 @@ import { defineNitroPlugin } from 'nitro/~internal/runtime/plugin'; export default defineNitroPlugin(async () => { if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { import('workflow/runtime').then(async ({ getWorld }) => { - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } }); } }); diff --git a/workbench/nitro-v3/routes/api/test-health-check.post.ts b/workbench/nitro-v3/routes/api/test-health-check.post.ts index 86222b7a32..84a62e597c 100644 --- a/workbench/nitro-v3/routes/api/test-health-check.post.ts +++ b/workbench/nitro-v3/routes/api/test-health-check.post.ts @@ -11,7 +11,7 @@ export default async ({ req }: { req: Request }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nuxt/server/api/test-health-check.post.ts b/workbench/nuxt/server/api/test-health-check.post.ts index 4e48e30b7f..32cc532b9e 100644 --- a/workbench/nuxt/server/api/test-health-check.post.ts +++ b/workbench/nuxt/server/api/test-health-check.post.ts @@ -13,7 +13,7 @@ export default defineEventHandler(async (event) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff 
--git a/workbench/nuxt/server/plugins/start-pg-world.ts b/workbench/nuxt/server/plugins/start-pg-world.ts index 2824d2b3ec..613d8e3110 100644 --- a/workbench/nuxt/server/plugins/start-pg-world.ts +++ b/workbench/nuxt/server/plugins/start-pg-world.ts @@ -5,8 +5,11 @@ import { defineNitroPlugin } from '#imports'; export default defineNitroPlugin(async () => { if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { import('workflow/runtime').then(async ({ getWorld }) => { - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } }); } }); diff --git a/workbench/sveltekit/src/hooks.server.ts b/workbench/sveltekit/src/hooks.server.ts index 16d598cf0b..619b303f8b 100644 --- a/workbench/sveltekit/src/hooks.server.ts +++ b/workbench/sveltekit/src/hooks.server.ts @@ -5,7 +5,10 @@ export const init: ServerInit = async () => { // Needed since we test this in CI if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { const { getWorld } = await import('workflow/runtime'); - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } } }; diff --git a/workbench/sveltekit/src/routes/api/test-health-check/+server.ts b/workbench/sveltekit/src/routes/api/test-health-check/+server.ts index c68147a822..07d8ba3ac7 100644 --- a/workbench/sveltekit/src/routes/api/test-health-check/+server.ts +++ b/workbench/sveltekit/src/routes/api/test-health-check/+server.ts @@ -12,7 +12,7 @@ export const POST: RequestHandler = async ({ request }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check 
result:`, result); diff --git a/workbench/vite/routes/api/test-health-check.post.ts b/workbench/vite/routes/api/test-health-check.post.ts index 86222b7a32..84a62e597c 100644 --- a/workbench/vite/routes/api/test-health-check.post.ts +++ b/workbench/vite/routes/api/test-health-check.post.ts @@ -11,7 +11,7 @@ export default async ({ req }: { req: Request }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); From b086650a82c4e1adc56f03146e822f572b78fabe Mon Sep 17 00:00:00 2001 From: Vercel Date: Thu, 5 Feb 2026 01:04:24 +0000 Subject: [PATCH 04/19] Fix: Race condition in getWorld() and getWorldHandlers() allows multiple world instances to be created when called concurrently Co-authored-by: VaguelySerious --- packages/core/src/runtime/world.ts | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index cf5fef7d78..f52e57a25e 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -6,10 +6,16 @@ import { createVercelWorld } from '@workflow/world-vercel'; const WorldCache = Symbol.for('@workflow/world//cache'); const StubbedWorldCache = Symbol.for('@workflow/world//stubbedCache'); +const WorldCachePromise = Symbol.for('@workflow/world//cachePromise'); +const StubbedWorldCachePromise = Symbol.for( + '@workflow/world//stubbedCachePromise' +); const globalSymbols: typeof globalThis & { [WorldCache]?: World; [StubbedWorldCache]?: World; + [WorldCachePromise]?: Promise; + [StubbedWorldCachePromise]?: Promise; } = globalThis; function defaultWorld(): 'vercel' | 'local' { @@ -98,7 +104,11 @@ export const getWorldHandlers = async (): Promise => { if (globalSymbols[StubbedWorldCache]) { return globalSymbols[StubbedWorldCache]; } - 
const _world = await createWorld(); + // Store the promise immediately to prevent race conditions with concurrent calls + if (!globalSymbols[StubbedWorldCachePromise]) { + globalSymbols[StubbedWorldCachePromise] = createWorld(); + } + const _world = await globalSymbols[StubbedWorldCachePromise]; globalSymbols[StubbedWorldCache] = _world; return { createQueueHandler: _world.createQueueHandler, @@ -109,8 +119,12 @@ export const getWorld = async (): Promise => { if (globalSymbols[WorldCache]) { return globalSymbols[WorldCache]; } - globalSymbols[WorldCache] = await createWorld(); - return Promise.resolve(globalSymbols[WorldCache]); + // Store the promise immediately to prevent race conditions with concurrent calls + if (!globalSymbols[WorldCachePromise]) { + globalSymbols[WorldCachePromise] = createWorld(); + } + globalSymbols[WorldCache] = await globalSymbols[WorldCachePromise]; + return globalSymbols[WorldCache]; }; /** @@ -120,4 +134,6 @@ export const getWorld = async (): Promise => { export const setWorld = (world: World | undefined): void => { globalSymbols[WorldCache] = world; globalSymbols[StubbedWorldCache] = world; + globalSymbols[WorldCachePromise] = undefined; + globalSymbols[StubbedWorldCachePromise] = undefined; }; From efff295cc4642a0c0b1b4d8c9b834a637f63cd79 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 4 Feb 2026 17:38:50 -0800 Subject: [PATCH 05/19] Maybe fix tests Signed-off-by: Peter Wielander --- packages/core/src/runtime/world.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index f52e57a25e..ce6fce748a 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -1,4 +1,5 @@ import { createRequire } from 'node:module'; +import { resolve } from 'node:path'; import { pathToFileURL } from 'node:url'; import type { World } from '@workflow/world'; import { createLocalWorld } from 
'@workflow/world-local'; @@ -31,14 +32,19 @@ const dynamicImport = new Function('specifier', 'return import(specifier)') as ( ) => Promise; function resolveModulePath(specifier: string): string { - if ( - specifier.startsWith('file://') || - specifier.startsWith('/') || - specifier.startsWith('./') || - specifier.startsWith('../') - ) { + // Already a file:// URL + if (specifier.startsWith('file://')) { return specifier; } + // Absolute path - convert to file:// URL + if (specifier.startsWith('/')) { + return pathToFileURL(specifier).href; + } + // Relative path - resolve relative to cwd and convert to file:// URL + if (specifier.startsWith('./') || specifier.startsWith('../')) { + return pathToFileURL(resolve(process.cwd(), specifier)).href; + } + // Package specifier - use require.resolve to find the package try { const require = createRequire( pathToFileURL(process.cwd() + '/package.json').href From 15193e8ea5edd9f9acc4fa2b0379217e89d7c613 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 5 Feb 2026 16:03:35 -0800 Subject: [PATCH 06/19] Fix build Signed-off-by: Peter Wielander --- packages/web-shared/src/index.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/web-shared/src/index.ts b/packages/web-shared/src/index.ts index adc6a96385..d8416ab4a6 100644 --- a/packages/web-shared/src/index.ts +++ b/packages/web-shared/src/index.ts @@ -1,20 +1,18 @@ +export type { + HealthCheckEndpoint, + HealthCheckResult, +} from '@workflow/core/runtime'; export { parseStepName, parseWorkflowName, } from '@workflow/utils/parse-name'; export type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; - export * from './api/workflow-api-client'; export type { EnvMap, - HealthCheckResultWithLatency, PublicServerConfig, } from './api/workflow-server-actions'; export { runHealthCheck } from './api/workflow-server-actions'; -export type { - HealthCheckEndpoint, - HealthCheckResult, -} from '@workflow/core/runtime'; export { 
ErrorBoundary } from './error-boundary'; export { EventListView } from './event-list-view'; export type { From 187150746d085146ee01ff1cd144f4164132ca98 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 5 Feb 2026 17:31:21 -0800 Subject: [PATCH 07/19] Revert "Fix build" This reverts commit 15193e8ea5edd9f9acc4fa2b0379217e89d7c613. --- packages/web-shared/src/index.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/web-shared/src/index.ts b/packages/web-shared/src/index.ts index d8416ab4a6..adc6a96385 100644 --- a/packages/web-shared/src/index.ts +++ b/packages/web-shared/src/index.ts @@ -1,18 +1,20 @@ -export type { - HealthCheckEndpoint, - HealthCheckResult, -} from '@workflow/core/runtime'; export { parseStepName, parseWorkflowName, } from '@workflow/utils/parse-name'; export type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; + export * from './api/workflow-api-client'; export type { EnvMap, + HealthCheckResultWithLatency, PublicServerConfig, } from './api/workflow-server-actions'; export { runHealthCheck } from './api/workflow-server-actions'; +export type { + HealthCheckEndpoint, + HealthCheckResult, +} from '@workflow/core/runtime'; export { ErrorBoundary } from './error-boundary'; export { EventListView } from './event-list-view'; export type { From 4dcfbc20774fee83d7e59a2c93e625ac03d51979 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 3 Apr 2026 12:21:22 -0700 Subject: [PATCH 08/19] Fix build and test errors after merge - Remove duplicate WORKFLOW_LOCAL_BASE_URL in env.ts - Add await to createWorld() call in CLI setup (now async) - Update step-handler tests for async getWorldHandlers pattern Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/cli/src/lib/inspect/env.ts | 1 - packages/cli/src/lib/inspect/setup.ts | 2 +- .../core/src/runtime/step-handler.test.ts | 33 ++++++++++++++----- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/packages/cli/src/lib/inspect/env.ts 
b/packages/cli/src/lib/inspect/env.ts index b5db963d70..a659e81785 100644 --- a/packages/cli/src/lib/inspect/env.ts +++ b/packages/cli/src/lib/inspect/env.ts @@ -45,7 +45,6 @@ export const getEnvVars = (): Record => { WORKFLOW_VERCEL_TEAM: env.WORKFLOW_VERCEL_TEAM || '', WORKFLOW_LOCAL_BASE_URL: env.WORKFLOW_LOCAL_BASE_URL || '', WORKFLOW_LOCAL_UI: env.WORKFLOW_LOCAL_UI || '', - WORKFLOW_LOCAL_BASE_URL: env.WORKFLOW_LOCAL_BASE_URL || '', PORT: env.PORT || '', WORKFLOW_LOCAL_DATA_DIR: env.WORKFLOW_LOCAL_DATA_DIR || '', WORKFLOW_MANIFEST_PATH: env.WORKFLOW_MANIFEST_PATH || '', diff --git a/packages/cli/src/lib/inspect/setup.ts b/packages/cli/src/lib/inspect/setup.ts index 2cd7d60f6f..dde0b13599 100644 --- a/packages/cli/src/lib/inspect/setup.ts +++ b/packages/cli/src/lib/inspect/setup.ts @@ -128,7 +128,7 @@ export const setupCliWorld = async ( }, }); } else { - world = createWorld(); + world = await createWorld(); } // Store in the global cache so BaseCommand.finally() can find and close it. 
diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index aefac8aee3..b2fd7673c1 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -1,5 +1,13 @@ import { EntityConflictError, WorkflowWorldError } from '@workflow/errors'; -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + afterEach, + beforeAll, + beforeEach, + describe, + expect, + it, + vi, +} from 'vitest'; // Use vi.hoisted so these are available in mock factories const { @@ -47,12 +55,12 @@ vi.mock('@vercel/functions', () => ({ // Mock the world module - createQueueHandler captures the handler vi.mock('./world.js', () => ({ - getWorld: vi.fn(() => ({ + getWorld: vi.fn(async () => ({ events: { create: mockEventsCreate }, queue: mockQueue, getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), })), - getWorldHandlers: vi.fn(() => ({ + getWorldHandlers: vi.fn(async () => ({ createQueueHandler: vi.fn( ( _prefix: string, @@ -139,9 +147,10 @@ vi.mock('@workflow/utils/get-port', () => ({ getPort: vi.fn().mockResolvedValue(3000), })); -// Import the module AFTER all mocks are set up - this triggers createQueueHandler -// which populates capturedHandlerRef -import './step-handler.js'; +// Import the module AFTER all mocks are set up +// Since getWorldHandlers is now async, we need to call stepEntrypoint +// to trigger createQueueHandler and populate capturedHandlerRef +import { stepEntrypoint } from './step-handler.js'; import { MAX_QUEUE_DELIVERIES } from './constants.js'; import { getStepFunction } from '../private.js'; import { @@ -188,6 +197,12 @@ function createMessage(overrides: Record = {}) { } describe('step-handler 409 handling', () => { + // Trigger the lazy handler initialization by calling stepEntrypoint once. + // This invokes getWorldHandlers() which calls createQueueHandler and captures the handler. 
+ beforeAll(async () => { + await stepEntrypoint(new Request('http://localhost')); + }); + beforeEach(() => { vi.clearAllMocks(); // Re-set mocks after clearAllMocks @@ -205,7 +220,7 @@ describe('step-handler 409 handling', () => { mockStepFn.maxRetries = 3; mockQueueMessage.mockResolvedValue(undefined); // Re-set getWorld mock since clearAllMocks resets it - vi.mocked(getWorld).mockReturnValue({ + vi.mocked(getWorld).mockResolvedValue({ events: { create: mockEventsCreate }, queue: mockQueue, getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), @@ -506,7 +521,7 @@ describe('step-handler max deliveries', () => { mockStepFn.mockReset().mockResolvedValue('step-result'); mockStepFn.maxRetries = 3; mockQueueMessage.mockResolvedValue(undefined); - vi.mocked(getWorld).mockReturnValue({ + vi.mocked(getWorld).mockResolvedValue({ events: { create: mockEventsCreate }, queue: mockQueue, getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), @@ -580,7 +595,7 @@ describe('step-handler step not found', () => { mockStepFn.mockReset().mockResolvedValue('step-result'); mockStepFn.maxRetries = 3; mockQueueMessage.mockResolvedValue(undefined); - vi.mocked(getWorld).mockReturnValue({ + vi.mocked(getWorld).mockResolvedValue({ events: { create: mockEventsCreate }, queue: mockQueue, getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), From b16728cd5d90badb295394d4148c5eec23e04adc Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 3 Apr 2026 13:00:13 -0700 Subject: [PATCH 09/19] Address PR review feedback - Cache resolved queue handlers to avoid per-request createQueueHandler overhead - Clear rejected promise cache in getWorld/getWorldHandlers so failures can be retried - Add BREAKING CHANGE annotation to changeset Co-Authored-By: Claude Opus 4.6 (1M context) --- .changeset/ninety-dancers-brush.md | 3 ++- packages/core/src/runtime.ts | 8 +++++++- packages/core/src/runtime/step-handler.ts | 10 +++++++--- packages/core/src/runtime/world.ts | 16 
++++++++++++---- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/.changeset/ninety-dancers-brush.md b/.changeset/ninety-dancers-brush.md index a67832c44e..7f2a7c6528 100644 --- a/.changeset/ninety-dancers-brush.md +++ b/.changeset/ninety-dancers-brush.md @@ -1,5 +1,6 @@ --- "@workflow/core": patch +"@workflow/cli": patch --- -Make `getWorld` asynchronous so it can use dynamic imports +**BREAKING CHANGE**: Make `getWorld` and `createWorld` asynchronous to support ESM dynamic imports for custom world modules. All callers must now `await getWorld()`. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 77af212cbc..94910e0ea0 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -607,7 +607,13 @@ export function workflowEntrypoint( } ); - return withHealthCheck(async (req) => handler(await getWorldHandlers())(req)); + let cachedHandler: ((req: Request) => Promise) | undefined; + return withHealthCheck(async (req) => { + if (!cachedHandler) { + cachedHandler = handler(await getWorldHandlers()); + } + return cachedHandler(req); + }); } // this is a no-op placeholder as the client is diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index b79b503a56..ee4984f537 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -877,7 +877,11 @@ const stepHandler = (worldHandlers: WorldHandlers) => * appropriate step function. We may eventually want to create different bundles * for each step, this is temporary. 
*/ +let cachedStepHandler: ((req: Request) => Promise) | undefined; export const stepEntrypoint: (req: Request) => Promise = - /* @__PURE__ */ withHealthCheck(async (req) => - stepHandler(await getWorldHandlers())(req) - ); + /* @__PURE__ */ withHealthCheck(async (req) => { + if (!cachedStepHandler) { + cachedStepHandler = stepHandler(await getWorldHandlers()); + } + return cachedStepHandler(req); + }); diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index 6ee53f512c..b64d7b3f44 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -130,9 +130,13 @@ export const getWorldHandlers = async (): Promise => { if (globalSymbols[StubbedWorldCache]) { return globalSymbols[StubbedWorldCache]; } - // Store the promise immediately to prevent race conditions with concurrent calls + // Store the promise immediately to prevent race conditions with concurrent calls. + // Clear on rejection so subsequent calls can retry instead of caching the failure. if (!globalSymbols[StubbedWorldCachePromise]) { - globalSymbols[StubbedWorldCachePromise] = createWorld(); + globalSymbols[StubbedWorldCachePromise] = createWorld().catch((err) => { + globalSymbols[StubbedWorldCachePromise] = undefined; + throw err; + }); } const _world = await globalSymbols[StubbedWorldCachePromise]; globalSymbols[StubbedWorldCache] = _world; @@ -145,9 +149,13 @@ export const getWorld = async (): Promise => { if (globalSymbols[WorldCache]) { return globalSymbols[WorldCache]; } - // Store the promise immediately to prevent race conditions with concurrent calls + // Store the promise immediately to prevent race conditions with concurrent calls. + // Clear on rejection so subsequent calls can retry instead of caching the failure. 
if (!globalSymbols[WorldCachePromise]) { - globalSymbols[WorldCachePromise] = createWorld(); + globalSymbols[WorldCachePromise] = createWorld().catch((err) => { + globalSymbols[WorldCachePromise] = undefined; + throw err; + }); } globalSymbols[WorldCache] = await globalSymbols[WorldCachePromise]; return globalSymbols[WorldCache]; From 81a548b7afa094ff7ec735785ff09d958d3b2f5c Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 3 Apr 2026 13:35:15 -0700 Subject: [PATCH 10/19] Fix: try require() before dynamicImport for custom world modules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `new Function('specifier', 'return import(specifier)')` throws "A dynamic import callback was not specified" in CJS contexts like the vitest e2e test runner. Try require() first — it works for CJS-compatible packages (e.g. @workflow/world-postgres) and in test runners. Fall back to dynamic import() for ESM-only modules. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/runtime/world.ts | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index b64d7b3f44..1fa136ffa8 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -9,6 +9,10 @@ import type { World } from '@workflow/world'; import { createLocalWorld } from '@workflow/world-local'; import { createVercelWorld } from '@workflow/world-vercel'; +const require = createRequire( + pathToFileURL(process.cwd() + '/package.json').href +); + const WorldCache = Symbol.for('@workflow/world//cache'); const StubbedWorldCache = Symbol.for('@workflow/world//stubbedCache'); const WorldCachePromise = Symbol.for('@workflow/world//cachePromise'); @@ -24,9 +28,10 @@ const globalSymbols: typeof globalThis & { } = globalThis; /** - * This hides the dynamic import behind a function to prevent the bundler from - * trying to resolve it at build 
time, instead of at runtime, since the world - * being imported might not exist at build time. + * Hides the dynamic import behind `new Function` to prevent bundlers from + * trying to resolve it at build time, since the world module may not exist + * at build time. Falls back to `require()` in environments where + * `new Function`-based `import()` is unavailable (e.g. CJS test runners). */ const dynamicImport = new Function('specifier', 'return import(specifier)') as ( specifier: string @@ -47,9 +52,6 @@ function resolveModulePath(specifier: string): string { } // Package specifier - use require.resolve to find the package try { - const require = createRequire( - pathToFileURL(process.cwd() + '/package.json').href - ); return pathToFileURL(require.resolve(specifier)).href; } catch { return specifier; @@ -100,8 +102,16 @@ export const createWorld = async (): Promise => { }); } - const resolvedPath = resolveModulePath(targetWorld); - const mod = await dynamicImport(resolvedPath); + // Try require() first — works for CJS-compatible packages and in test + // runners where `new Function`-based import() is unavailable. + // Fall back to dynamic import() for ESM-only modules. + let mod: any; + try { + mod = require(targetWorld); + } catch { + const resolvedPath = resolveModulePath(targetWorld); + mod = await dynamicImport(resolvedPath); + } if (typeof mod === 'function') { return mod() as World; } else if (typeof mod.default === 'function') { From 4300ff047f8f791f38afb0c9820cdae29e2a1287 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 7 Apr 2026 15:46:22 -0700 Subject: [PATCH 11/19] fix: await getWorld() in resilient start e2e test getWorld() is now async, but the resilient start test was missing the await, causing it to spread a Promise instead of a World instance. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/e2e/e2e.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 548e8fe44a..b67330f9bd 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -2196,7 +2196,7 @@ describe('e2e', () => { // (run_created) throws a 500 server error. The queue should still // be dispatched with runInput, and the runtime should bootstrap // the run via the run_started fallback path. - const realWorld = getWorld(); + const realWorld = await getWorld(); let createCallCount = 0; const stubbedWorld: World = { ...realWorld, From a5b0e82dde5cefa755d4c7199d21bad0921f9b2e Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 7 Apr 2026 18:23:40 -0700 Subject: [PATCH 12/19] fix: don't encrypt hook metadata so webhook handler can read it The webhook handler (resumeWebhook) needs to read hook metadata to determine the respondWith behavior. However, the webhook Lambda may not have the deployment encryption key available, causing metadata hydration to fail with "Encrypted stream data encountered but no encryption key is available". Fix by passing undefined instead of the encryption key when serializing hook metadata in the suspension handler. Hook metadata is small (just respondWith config) and doesn't need encryption. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/runtime/suspension-handler.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 20b6a815fe..b31c2443c9 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -105,7 +105,10 @@ export async function handleSuspension({ : ((await dehydrateStepArguments( queueItem.metadata, runId, - encryptionKey, + // Don't encrypt hook metadata — the webhook handler reads it + // via getHookByTokenWithKey and may not have the deployment + // encryption key available (e.g. standalone webhook Lambda). + undefined, suspension.globalThis )) as SerializedData); return { From 8345fcfebab83d8a003dae6e97a67ee2fc41e937 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 8 Apr 2026 09:53:39 -0700 Subject: [PATCH 13/19] fix: update encryption capability minVersion after v5 version reset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The package version was reset from 4.2.x-beta to 4.0.0 for the v5 beta release. The encryption format capability check used 4.2.0-beta.64 as the minimum version, causing getRunCapabilities("4.0.0") to report encryption as unsupported. This made resumeHook strip the encryption key, while the step handler still encrypted data — causing "Encrypted stream data encountered but no encryption key is available" errors in the webhook handler. Fix by lowering the minVersion to 4.0.0 to cover the reset range. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/capabilities.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/core/src/capabilities.ts b/packages/core/src/capabilities.ts index 22baebc1d4..7ff632514f 100644 --- a/packages/core/src/capabilities.ts +++ b/packages/core/src/capabilities.ts @@ -49,7 +49,9 @@ const FORMAT_VERSION_TABLE: ReadonlyArray<{ format: SerializationFormatType; minVersion: string; }> = [ - { format: SerializationFormat.ENCRYPTED, minVersion: '4.2.0-beta.64' }, + // Encryption was introduced in 4.2.0-beta.64 and carried forward through + // the version reset to 4.0.0 for the v5 beta release. Both ranges support it. + { format: SerializationFormat.ENCRYPTED, minVersion: '4.0.0' }, // Future entries: // { format: SerializationFormat.CBOR, minVersion: '5.x.y' }, // { format: SerializationFormat.ENCRYPTED_V2, minVersion: '5.x.y' }, From 8a5acb5c7aaa51f74172017f5b1cb9ab5714d6ee Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 8 Apr 2026 10:21:55 -0700 Subject: [PATCH 14/19] fix: update capabilities test for new encryption minVersion Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/capabilities.test.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/core/src/capabilities.test.ts b/packages/core/src/capabilities.test.ts index c402d7b821..d000405c9c 100644 --- a/packages/core/src/capabilities.test.ts +++ b/packages/core/src/capabilities.test.ts @@ -28,16 +28,16 @@ describe('getRunCapabilities', () => { describe('v-prefixed versions', () => { it('handles v-prefixed version strings', () => { // semver.valid() coerces "v" prefix — this is valid input - const { supportedFormats } = getRunCapabilities('v4.2.0-beta.64'); + const { supportedFormats } = getRunCapabilities('v4.0.0'); expect(supportedFormats.has(SerializationFormat.ENCRYPTED)).toBe(true); }); }); describe('pre-encryption versions', () => { it.each([ - '4.1.0-beta.63', - 
'4.0.1-beta.27', + '3.9.9', '3.0.0', + '2.0.0', ])('%s does not support encryption', (version) => { const { supportedFormats } = getRunCapabilities(version); expect(supportedFormats.has(SerializationFormat.DEVALUE_V1)).toBe(true); @@ -46,14 +46,16 @@ describe('getRunCapabilities', () => { }); describe('encryption-capable versions', () => { - it('supports encryption at the exact cutoff version', () => { - const { supportedFormats } = getRunCapabilities('4.2.0-beta.64'); + it('supports encryption at the exact cutoff version (4.0.0)', () => { + const { supportedFormats } = getRunCapabilities('4.0.0'); expect(supportedFormats.has(SerializationFormat.DEVALUE_V1)).toBe(true); expect(supportedFormats.has(SerializationFormat.ENCRYPTED)).toBe(true); }); it.each([ - '4.2.0-beta.74', + '4.0.1-beta.27', + '4.1.0-beta.63', + '4.2.0-beta.64', '4.2.0', '5.0.0', ])('%s supports encryption', (version) => { From e129d92970cf00310f9a9d695521bbed932e3e94 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 8 Apr 2026 11:02:50 -0700 Subject: [PATCH 15/19] revert: keep encrypting hook metadata in suspension handler Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/runtime/suspension-handler.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index b31c2443c9..20b6a815fe 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -105,10 +105,7 @@ export async function handleSuspension({ : ((await dehydrateStepArguments( queueItem.metadata, runId, - // Don't encrypt hook metadata — the webhook handler reads it - // via getHookByTokenWithKey and may not have the deployment - // encryption key available (e.g. standalone webhook Lambda). 
- undefined, + encryptionKey, suspension.globalThis )) as SerializedData); return { From 9220639e8a2685a33077499867396b6165572efc Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 8 Apr 2026 11:34:00 -0700 Subject: [PATCH 16/19] fix: sync package versions to 5.0.0-beta.0 and revert capabilities changes The PR branches had version 4.0.0 (intermediate changeset reset) instead of 5.0.0-beta.0 (the actual published version). This caused getRunCapabilities("4.0.0") to report encryption as unsupported, breaking the webhook respondWith flow on Vercel Prod deployments. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/package.json | 2 +- packages/core/src/capabilities.test.ts | 14 ++++++-------- packages/core/src/capabilities.ts | 4 +--- packages/workflow/package.json | 2 +- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/packages/core/package.json b/packages/core/package.json index 64e13062e8..9536643d78 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@workflow/core", - "version": "4.0.0", + "version": "5.0.0-beta.0", "description": "Core runtime and engine for Workflow SDK", "type": "module", "main": "dist/index.js", diff --git a/packages/core/src/capabilities.test.ts b/packages/core/src/capabilities.test.ts index d000405c9c..c402d7b821 100644 --- a/packages/core/src/capabilities.test.ts +++ b/packages/core/src/capabilities.test.ts @@ -28,16 +28,16 @@ describe('getRunCapabilities', () => { describe('v-prefixed versions', () => { it('handles v-prefixed version strings', () => { // semver.valid() coerces "v" prefix — this is valid input - const { supportedFormats } = getRunCapabilities('v4.0.0'); + const { supportedFormats } = getRunCapabilities('v4.2.0-beta.64'); expect(supportedFormats.has(SerializationFormat.ENCRYPTED)).toBe(true); }); }); describe('pre-encryption versions', () => { it.each([ - '3.9.9', + '4.1.0-beta.63', + '4.0.1-beta.27', '3.0.0', - '2.0.0', ])('%s does not support 
encryption', (version) => { const { supportedFormats } = getRunCapabilities(version); expect(supportedFormats.has(SerializationFormat.DEVALUE_V1)).toBe(true); @@ -46,16 +46,14 @@ describe('getRunCapabilities', () => { }); describe('encryption-capable versions', () => { - it('supports encryption at the exact cutoff version (4.0.0)', () => { - const { supportedFormats } = getRunCapabilities('4.0.0'); + it('supports encryption at the exact cutoff version', () => { + const { supportedFormats } = getRunCapabilities('4.2.0-beta.64'); expect(supportedFormats.has(SerializationFormat.DEVALUE_V1)).toBe(true); expect(supportedFormats.has(SerializationFormat.ENCRYPTED)).toBe(true); }); it.each([ - '4.0.1-beta.27', - '4.1.0-beta.63', - '4.2.0-beta.64', + '4.2.0-beta.74', '4.2.0', '5.0.0', ])('%s supports encryption', (version) => { diff --git a/packages/core/src/capabilities.ts b/packages/core/src/capabilities.ts index 7ff632514f..22baebc1d4 100644 --- a/packages/core/src/capabilities.ts +++ b/packages/core/src/capabilities.ts @@ -49,9 +49,7 @@ const FORMAT_VERSION_TABLE: ReadonlyArray<{ format: SerializationFormatType; minVersion: string; }> = [ - // Encryption was introduced in 4.2.0-beta.64 and carried forward through - // the version reset to 4.0.0 for the v5 beta release. Both ranges support it. 
- { format: SerializationFormat.ENCRYPTED, minVersion: '4.0.0' }, + { format: SerializationFormat.ENCRYPTED, minVersion: '4.2.0-beta.64' }, // Future entries: // { format: SerializationFormat.CBOR, minVersion: '5.x.y' }, // { format: SerializationFormat.ENCRYPTED_V2, minVersion: '5.x.y' }, diff --git a/packages/workflow/package.json b/packages/workflow/package.json index 92ddaae6d9..9ecf8ea472 100644 --- a/packages/workflow/package.json +++ b/packages/workflow/package.json @@ -1,6 +1,6 @@ { "name": "workflow", - "version": "4.0.0", + "version": "5.0.0-beta.0", "description": "Workflow SDK - Build durable, resilient, and observable workflows", "main": "dist/typescript-plugin.cjs", "type": "module", From 3fa3b6aac211674e159b500ed0fb4343fff2f85b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 8 Apr 2026 13:15:57 -0700 Subject: [PATCH 17/19] Flip module resolution order: dynamicImport() before require() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback — try ESM-first since this PR's purpose is ESM support, fall back to require() for CJS test runners where new Function-based import() is unavailable. This avoids the dual-package hazard where require() silently loads the CJS entry for packages that ship both CJS and ESM. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/runtime/world.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index 1fa136ffa8..bea5ae961a 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -102,15 +102,15 @@ export const createWorld = async (): Promise => { }); } - // Try require() first — works for CJS-compatible packages and in test - // runners where `new Function`-based import() is unavailable. - // Fall back to dynamic import() for ESM-only modules. 
+ // Try dynamic import() first — ESM-first since this PR's purpose is ESM support. + // Fall back to require() for environments where `new Function`-based import() + // is unavailable (e.g. CJS test runners). let mod: any; try { - mod = require(targetWorld); - } catch { const resolvedPath = resolveModulePath(targetWorld); mod = await dynamicImport(resolvedPath); + } catch { + mod = require(targetWorld); } if (typeof mod === 'function') { return mod() as World; From 1c663f6f84bf5f5c88e4d5a4b407e2f6f0a066b2 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 9 Apr 2026 10:34:17 -0700 Subject: [PATCH 18/19] Fix merge artifacts: duplicate import and missing features field Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/src/runtime/step-handler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 35c59b5c90..f38184f293 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -43,7 +43,6 @@ import { queueMessage, withHealthCheck, } from './helpers.js'; -import { MAX_QUEUE_DELIVERIES } from './constants.js'; import { getWorld, getWorldHandlers, type WorldHandlers } from './world.js'; const DEFAULT_STEP_MAX_RETRIES = 3; @@ -519,6 +518,7 @@ const stepHandler = (worldHandlers: WorldHandlers) => url: isVercel ? `https://${process.env.VERCEL_URL}` : `http://localhost:${port ?? 
3000}`, + features: { encryption: !!encryptionKey }, }, ops, closureVars: hydratedInput.closureVars, From 2653faff9058ebf1f61dcec7494c400d68f294cd Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 9 Apr 2026 13:44:44 -0700 Subject: [PATCH 19/19] docs: document async getWorld() and update World SDK skill - Clarify getWorld API card, get-world page, and world overview/streams - Add getWorld() to docs-typecheck globals - SKILL: await getWorld(), world.streams API, bump to 1.7 Made-with: Cursor --- .../api-reference/workflow-api/get-world.mdx | 6 +++--- .../docs/api-reference/workflow-api/index.mdx | 2 +- .../workflow-api/world/index.mdx | 2 +- .../workflow-api/world/streams.mdx | 2 +- packages/docs-typecheck/src/docs-globals.d.ts | 3 +++ skills/workflow/SKILL.md | 20 ++++++++++--------- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/docs/content/docs/api-reference/workflow-api/get-world.mdx b/docs/content/docs/api-reference/workflow-api/get-world.mdx index 2bab35f3fd..2feb7a6ab1 100644 --- a/docs/content/docs/api-reference/workflow-api/get-world.mdx +++ b/docs/content/docs/api-reference/workflow-api/get-world.mdx @@ -1,8 +1,8 @@ --- title: getWorld -description: Access the World instance for low-level storage, queuing, and streaming operations. +description: Async function that resolves the World instance for low-level storage, queuing, and streaming operations. type: reference -summary: Use getWorld to access low-level workflow storage, queuing, and streaming backends directly. +summary: Async function that resolves the World instance for low-level workflow storage, queuing, and streaming backends. prerequisites: - /docs/deploying --- @@ -79,7 +79,7 @@ export async function GET(req: Request) { const cursor = url.searchParams.get("cursor") ?? 
undefined; try { - const world = getWorld(); // [!code highlight] + const world = await getWorld(); // [!code highlight] const runs = await world.runs.list({ pagination: { cursor }, resolveData: "none", diff --git a/docs/content/docs/api-reference/workflow-api/index.mdx b/docs/content/docs/api-reference/workflow-api/index.mdx index ccd1977573..5941e9068d 100644 --- a/docs/content/docs/api-reference/workflow-api/index.mdx +++ b/docs/content/docs/api-reference/workflow-api/index.mdx @@ -28,7 +28,7 @@ The API package is for access and introspection of workflow data to inspect runs Get workflow run status and metadata without waiting for completion. - Get direct access to workflow storage, queuing, and streaming backends. + Async: resolve the World instance for storage, queuing, and streaming backends. Low-level API for inspecting runs, steps, events, hooks, streams, and queues. diff --git a/docs/content/docs/api-reference/workflow-api/world/index.mdx b/docs/content/docs/api-reference/workflow-api/world/index.mdx index e102f37445..1a1c02423a 100644 --- a/docs/content/docs/api-reference/workflow-api/world/index.mdx +++ b/docs/content/docs/api-reference/workflow-api/world/index.mdx @@ -2,7 +2,7 @@ title: World SDK description: Low-level API for inspecting and managing workflow runs, steps, events, hooks, streams, and queues. type: overview -summary: Access workflow infrastructure directly via getWorld() for building observability dashboards, admin tools, and custom integrations. +summary: Access workflow infrastructure via await getWorld() for building observability dashboards, admin tools, and custom integrations. 
prerequisites: - /docs/api-reference/workflow-api/get-world keywords: diff --git a/docs/content/docs/api-reference/workflow-api/world/streams.mdx b/docs/content/docs/api-reference/workflow-api/world/streams.mdx index cbe9d5f03b..7d8b34e280 100644 --- a/docs/content/docs/api-reference/workflow-api/world/streams.mdx +++ b/docs/content/docs/api-reference/workflow-api/world/streams.mdx @@ -21,7 +21,7 @@ keywords: - stream lifecycle --- -Stream methods live on `world.streams` (the `streams` sub-object of the `world` object returned by `getWorld()`). Use them to write chunks, read streams, and manage stream lifecycle outside of the standard `getWritable()` pattern. +Stream methods live on `world.streams` (the `streams` sub-object of the `World` instance returned by `await getWorld()`). Use them to write chunks, read streams, and manage stream lifecycle outside of the standard `getWritable()` pattern. For most streaming use cases, use [`getWritable()`](/docs/api-reference/workflow/get-writable) inside steps. Direct stream methods are for advanced scenarios like building custom stream consumers or managing streams from outside a workflow. diff --git a/packages/docs-typecheck/src/docs-globals.d.ts b/packages/docs-typecheck/src/docs-globals.d.ts index cd3bc1f596..05741b4aa8 100644 --- a/packages/docs-typecheck/src/docs-globals.d.ts +++ b/packages/docs-typecheck/src/docs-globals.d.ts @@ -232,6 +232,9 @@ declare global { queue: (...args: any[]) => Promise; createQueueHandler: (...args: any[]) => any; }; + /** Resolves the configured World (async — may perform dynamic import / env-based setup). */ + function getWorld(): Promise<World>; + const streamId: string; const streamName: string; const hookId: string; diff --git a/skills/workflow/SKILL.md b/skills/workflow/SKILL.md index 15ec0611ad..54d62f314c 100644 --- a/skills/workflow/SKILL.md +++ b/skills/workflow/SKILL.md @@ -3,7 +3,7 @@ name: workflow description: Creates durable, resumable workflows using Vercel's Workflow SDK. 
Use when building workflows that need to survive restarts, pause for external events, retry on failure, or coordinate multi-step operations over time. Triggers on mentions of "workflow", "durable functions", "resumable", "workflow sdk", "queue", "event", "push", "subscribe", or step-based orchestration. metadata: author: Vercel Inc. - version: '1.6' + version: '1.7' --- ## *CRITICAL*: Always Use Correct `workflow` Documentation @@ -617,7 +617,7 @@ await resumeWebhook(hook.token, new Request("https://example.com/webhook", { ## Observability & World SDK -Use `getWorld()` to build observability dashboards, admin panels, and inspect workflow state. +Use `await getWorld()` to build observability dashboards, admin panels, and inspect workflow state. `getWorld()` is asynchronous and returns `Promise<World>` (dynamic import / env-based setup). **Key imports:** ```typescript @@ -634,7 +634,7 @@ import { hydrateResourceIO, observabilityRevivers, parseStepName, parseWorkflowN ⚠️ Pagination is nested: `{ pagination: { cursor } }` — NOT `{ cursor }` directly. 
```typescript -const world = getWorld(); +const world = await getWorld(); // Runs const { data, cursor } = await world.runs.list({ pagination: { cursor }, resolveData: 'all' | 'none' }); @@ -654,12 +654,14 @@ await world.events.create(runId, { eventType: 'run_cancelled' }); const hook = await world.hooks.get(hookId); const hook = await world.hooks.getByToken(token); -// Streams (methods live directly on world, not nested) -await world.writeToStream(name, runId, chunk); -const readable = await world.readFromStream(name); -const chunks = await world.getStreamChunks(name, runId, { limit, cursor }); -const info = await world.getStreamInfo(name, runId); -const streams = await world.listStreamsByRunId(runId); +// Streams (methods on world.streams) +await world.streams.write(runId, name, chunk); +await world.streams.writeMulti?.(runId, name, chunks); +const readable = await world.streams.get(runId, name, startIndex); +await world.streams.close(runId, name); +const streamNames = await world.streams.list(runId); +const chunks = await world.streams.getChunks(runId, name, { limit, cursor }); +const info = await world.streams.getInfo(runId, name); // Queue (methods live directly on world — internal SDK infrastructure) await world.queue(queueName, payload, opts);