-
Notifications
You must be signed in to change notification settings - Fork 262
[core] Exclude inline step execution from replay timeout #2013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8f34ee7
ece3bc5
daae939
a0929d5
ae34ff9
1cf51e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| --- | ||
| "@workflow/core": patch | ||
| "@workflow/world": patch | ||
| "@workflow/world-vercel": patch | ||
| --- | ||
|
|
||
| Exclude inline step execution from the workflow replay timeout. Long-running steps no longer hit `REPLAY_TIMEOUT` (fixes #2009). Adds a `WORKFLOW_REPLAY_TIMEOUT_MS` env var override and a new optional `World.processExitTriggersQueueRedelivery` capability used to gate the runtime's `process.exit(1)` failure path. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,11 +19,7 @@ import { classifyRunError, isWorldContractError } from './classify-error.js'; | |
| import { describeError } from './describe-error.js'; | ||
| import { WorkflowSuspension } from './global.js'; | ||
| import { runtimeLogger } from './logger.js'; | ||
| import { | ||
| MAX_QUEUE_DELIVERIES, | ||
| REPLAY_TIMEOUT_MAX_RETRIES, | ||
| REPLAY_TIMEOUT_MS, | ||
| } from './runtime/constants.js'; | ||
| import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; | ||
| import { | ||
| getQueueOverhead, | ||
| getWorkflowQueueName, | ||
|
|
@@ -34,6 +30,10 @@ import { | |
| queueMessage, | ||
| withHealthCheck, | ||
| } from './runtime/helpers.js'; | ||
| import { | ||
| handleReplayBudgetExhausted, | ||
| ReplayBudget, | ||
| } from './runtime/replay-budget.js'; | ||
| import { executeStep } from './runtime/step-executor.js'; | ||
| import { handleSuspension } from './runtime/suspension-handler.js'; | ||
| import { | ||
|
|
@@ -284,84 +284,31 @@ export function workflowEntrypoint( | |
|
|
||
| const spanLinks = await linkToCurrentContext(); | ||
|
|
||
| // --- Replay timeout guard --- | ||
| // If the replay takes longer than the timeout, fail the run and exit. | ||
| // This must be lower than the function's maxDuration to ensure | ||
| // the failure is recorded before the platform kills the function. | ||
| let replayTimeout: NodeJS.Timeout | undefined; | ||
| if (process.env.VERCEL_URL !== undefined) { | ||
| replayTimeout = setTimeout(async () => { | ||
| // Allow a few retries before permanently failing the run. | ||
| // On early attempts, just exit so the queue retries the message. | ||
| if (metadata.attempt <= REPLAY_TIMEOUT_MAX_RETRIES) { | ||
| runLogger.warn( | ||
| 'Workflow replay exceeded timeout but will be re-attempted (attempt < maxRetries)', | ||
| { | ||
| timeoutMs: REPLAY_TIMEOUT_MS, | ||
| attempt: metadata.attempt, | ||
| maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, | ||
| } | ||
| ); | ||
| process.exit(1); | ||
| } | ||
|
|
||
| const replayTimeoutDescription = describeError( | ||
| undefined, | ||
| RUN_ERROR_CODES.REPLAY_TIMEOUT | ||
| ); | ||
| runLogger.error( | ||
| 'Workflow replay exceeded timeout and max retries exceeded. Failing the run', | ||
| { | ||
| timeoutMs: REPLAY_TIMEOUT_MS, | ||
| attempt: metadata.attempt, | ||
| maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, | ||
| errorCode: replayTimeoutDescription.errorCode, | ||
| errorAttribution: replayTimeoutDescription.attribution, | ||
| } | ||
| ); | ||
|
|
||
| try { | ||
| const world = await getWorld(); | ||
| const getEncryptionKey = memoizeEncryptionKey(world, runId); | ||
| const timeoutErr = new FatalError( | ||
| `Workflow replay exceeded maximum duration (${REPLAY_TIMEOUT_MS / 1000}s) after ${metadata.attempt} attempts` | ||
| ); | ||
| await world.events.create( | ||
| runId, | ||
| { | ||
| eventType: 'run_failed', | ||
| specVersion: SPEC_VERSION_CURRENT, | ||
| eventData: { | ||
| error: await dehydrateRunError( | ||
| timeoutErr, | ||
| runId, | ||
| await getEncryptionKey() | ||
| ), | ||
| errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, | ||
| }, | ||
| }, | ||
| { requestId } | ||
| ); | ||
| } catch (err) { | ||
| // Best effort — process exits regardless. Surface why so | ||
| // operators can diagnose repeat timeouts against the backend. | ||
| runLogger.warn( | ||
| 'Unable to mark run as failed. The queue will continue to retry', | ||
| { | ||
| attempt: metadata.attempt, | ||
| errorName: err instanceof Error ? err.name : 'UnknownError', | ||
| errorMessage: | ||
| err instanceof Error ? err.message : String(err), | ||
| errorStack: err instanceof Error ? err.stack : undefined, | ||
| } | ||
| ); | ||
| } | ||
| // Note that this also prevents the runtime from acking the queue message, | ||
| // so the queue will call back once, after which a 410 will get it to exit early. | ||
| process.exit(1); | ||
| }, REPLAY_TIMEOUT_MS); | ||
| replayTimeout.unref(); | ||
| } | ||
| // --- Replay budget bookkeeping --- | ||
| // The replay budget bounds the *non-step* portion of a single | ||
| // handler invocation: deterministic event-log replay, workflow-VM | ||
| // execution between step boundaries, suspension handling, queue | ||
| // round-trips, etc. Inline step bodies (`"use step"` functions | ||
| // invoked via `executeStep`) are intentionally excluded — they are | ||
| // bounded by the platform's function `maxDuration` and the | ||
| // `NO_INLINE_REPLAY_AFTER_MS` early-return guard below. | ||
| // | ||
| // The budget is checked at loop boundaries (top of each `while` | ||
| // iteration). Note this is *less responsive* than the old | ||
| // `setTimeout`-based approach: a single pathological `runWorkflow` | ||
| // call processing a huge event log can overshoot the budget by up | ||
| // to one iteration before bailing. In practice the headroom built | ||
| // into `MAX_REPLAY_TIMEOUT_MS` (and the platform `maxDuration` | ||
| // SIGTERM as ultimate backstop) gives us slack — the previous | ||
| // `setTimeout` approach also relied on the platform kill as the | ||
| // hard backstop. Do *not* "fix" this by adding a `setInterval`; | ||
| // it would risk the same bug we just removed (bounding step | ||
| // bodies). | ||
| // | ||
| // Earlier versions (pre-#2009 fix) used a single `setTimeout` | ||
| // that also bounded step bodies, which broke any workflow with a | ||
| // single step longer than the budget. | ||
| const replayBudget = new ReplayBudget(); | ||
|
|
||
| return await withTraceContext(traceContext, async () => { | ||
| return await withWorkflowBaggage( | ||
|
|
@@ -418,14 +365,24 @@ export function workflowEntrypoint( | |
| const bgStartedAt = bgRun.startedAt | ||
| ? +bgRun.startedAt | ||
| : Date.now(); | ||
| const stepResult = await executeStep({ | ||
| world, | ||
| workflowRunId: runId, | ||
| workflowName, | ||
| workflowStartedAt: bgStartedAt, | ||
| stepId: incomingStepId, | ||
| stepName: incomingStepName, | ||
| }); | ||
| // Pause the replay budget while the step body runs — | ||
| // step duration is bounded by the platform's function | ||
| // maxDuration, not by the replay timeout. See the | ||
| // ReplayBudget docs for the contract. | ||
| replayBudget.pause(); | ||
| let stepResult: Awaited<ReturnType<typeof executeStep>>; | ||
| try { | ||
| stepResult = await executeStep({ | ||
| world, | ||
| workflowRunId: runId, | ||
| workflowName, | ||
| workflowStartedAt: bgStartedAt, | ||
| stepId: incomingStepId, | ||
| stepName: incomingStepName, | ||
| }); | ||
| } finally { | ||
| replayBudget.resume(); | ||
| } | ||
| if (stepResult.type === 'retry') { | ||
| return { timeoutSeconds: stepResult.timeoutSeconds }; | ||
| } | ||
|
|
@@ -665,6 +622,26 @@ export function workflowEntrypoint( | |
| while (true) { | ||
| loopIteration++; | ||
|
|
||
| // Replay-budget check: bail out (retry or fail) if | ||
| // non-step time within this invocation has exceeded | ||
| // the configured budget. Step bodies are excluded | ||
| // because replayBudget.pause()/resume() bracket every | ||
| // `executeStep` call. | ||
| if (replayBudget.isExhausted()) { | ||
| await handleReplayBudgetExhausted({ | ||
| runId, | ||
| workflowName, | ||
| requestId, | ||
| attempt: metadata.attempt, | ||
| limitMs: replayBudget.configuredLimitMs, | ||
| }); | ||
| // On Vercel, handleReplayBudgetExhausted always | ||
| // exits the process. On local dev it returns; we | ||
| // fall through and the request ends normally | ||
| // (run_failed has been written best-effort). | ||
| return; | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Budget vs step-heavy flow — worth being explicit about the NO_INLINE_REPLAY_AFTER_MS interaction. A reasonable concern is that with V5's inline step execution, the same handler now runs many steps in sequence within one invocation, so non-step time accumulates across loop iterations within one invocation. With many steps that could theoretically exhaust the budget. In practice it can't at default config because The two checks are tightly coupled — if anyone changes
|
||
|
|
||
|
TooTallNate marked this conversation as resolved.
|
||
| // Check timeout before replay | ||
| if ( | ||
| Date.now() - invocationStartTime >= | ||
|
|
@@ -1074,15 +1051,27 @@ export function workflowEntrypoint( | |
| return; | ||
| } | ||
|
|
||
| // Execute inline step | ||
| const stepResult = await executeStep({ | ||
| world, | ||
| workflowRunId: runId, | ||
| workflowName, | ||
| workflowStartedAt, | ||
| stepId: inlineStep.correlationId, | ||
| stepName: inlineStep.stepName, | ||
| }); | ||
| // Execute inline step. Pause the replay budget | ||
| // for the duration of the step body — step | ||
| // duration is bounded by the platform's function | ||
| // maxDuration, not by the replay timeout. Without | ||
| // this the replay-budget check at the top of the | ||
| // next loop iteration would (incorrectly) charge | ||
| // the step body against the budget. | ||
| replayBudget.pause(); | ||
| let stepResult: Awaited<ReturnType<typeof executeStep>>; | ||
| try { | ||
| stepResult = await executeStep({ | ||
| world, | ||
| workflowRunId: runId, | ||
| workflowName, | ||
| workflowStartedAt, | ||
| stepId: inlineStep.correlationId, | ||
| stepName: inlineStep.stepName, | ||
| }); | ||
| } finally { | ||
| replayBudget.resume(); | ||
| } | ||
|
|
||
| if (stepResult.type === 'retry') { | ||
| // Step needs retry — queue self with stepId for retry | ||
|
|
@@ -1274,10 +1263,6 @@ export function workflowEntrypoint( | |
| ); // End trace | ||
| } | ||
| ); // End withWorkflowBaggage | ||
| }).finally(() => { | ||
| if (replayTimeout) { | ||
| clearTimeout(replayTimeout); | ||
| } | ||
| }); // End withTraceContext | ||
| } | ||
| ); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.