Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/auto-vqs-run-id-header.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/world-vercel': patch
'@workflow/core': patch
---

Auto-inject `x-workflow-run-id` and `x-workflow-step-id` VQS headers from queue payload in `world-vercel`
34 changes: 10 additions & 24 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,18 +265,11 @@ const stepHandler = getWorldHandlers().createQueueHandler(
});

// Re-invoke the workflow to handle the failed step
await queueMessage(
world,
getWorkflowQueueName(workflowName),
{
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
headers: { 'x-workflow-run-id': workflowRunId },
}
);
await queueMessage(world, getWorkflowQueueName(workflowName), {
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
});
return;
}

Expand Down Expand Up @@ -608,18 +601,11 @@ const stepHandler = getWorldHandlers().createQueueHandler(
}
}

await queueMessage(
world,
getWorkflowQueueName(workflowName),
{
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
headers: { 'x-workflow-run-id': workflowRunId },
}
);
await queueMessage(world, getWorkflowQueueName(workflowName), {
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
});
}
);
});
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ export async function handleSuspension({
{
idempotencyKey: queueItem.correlationId,
headers: {
'x-workflow-run-id': runId,
...extractTraceHeaders(traceCarrier),
},
}
Expand Down
27 changes: 26 additions & 1 deletion packages/world-vercel/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ const MAX_DELAY_SECONDS = Number(
process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit
);

/**
* Extract known identifiers from a queue payload and return them as VQS headers.
* This ensures observability headers are always set without relying on callers.
*/
function getHeadersFromPayload(
payload: QueuePayload
): Record<string, string> | undefined {
const headers: Record<string, string> = {};

if ('runId' in payload && typeof payload.runId === 'string') {
headers['x-workflow-run-id'] = payload.runId;
}
if ('workflowRunId' in payload && typeof payload.workflowRunId === 'string') {
Comment on lines +62 to +63
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getHeadersFromPayload can set x-workflow-run-id twice (from runId and workflowRunId), with the latter overwriting the former. Because QueuePayloadSchema is a union of non-strict objects (unknown keys are stripped), a payload object containing both keys can be encoded/parsed as a workflow payload with only runId, while the header ends up reflecting workflowRunId, creating an observability mismatch. Consider deriving headers from the same sanitized/encoded payload you actually send (or explicitly choosing precedence with if/else if, and potentially warning when both are present).

Suggested change
}
if ('workflowRunId' in payload && typeof payload.workflowRunId === 'string') {
} else if (
'workflowRunId' in payload &&
typeof payload.workflowRunId === 'string'
) {

Copilot uses AI. Check for mistakes.
headers['x-workflow-run-id'] = payload.workflowRunId;
}
if ('stepId' in payload && typeof payload.stepId === 'string') {
headers['x-workflow-step-id'] = payload.stepId;
}

return Object.keys(headers).length > 0 ? headers : undefined;
}

type QueueFunction = (
queueName: ValidQueueName,
payload: QueuePayload,
Expand Down Expand Up @@ -114,7 +136,10 @@ export function createQueue(config?: APIConfig): Queue {
{
idempotencyKey: opts?.idempotencyKey,
delaySeconds: opts?.delaySeconds,
headers: opts?.headers,
headers: {
...getHeadersFromPayload(payload),
...opts?.headers,
},
Comment on lines 136 to +142
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New behavior auto-injects VQS headers from the payload, but there’s no test assertion covering that Client.send receives the expected x-workflow-run-id / x-workflow-step-id headers (including the delayed re-enqueue path and merge behavior with opts.headers). Adding a focused unit test in packages/world-vercel/src/queue.test.ts would prevent regressions.

Copilot uses AI. Check for mistakes.
}
);
return { messageId: MessageId.parse(messageId) };
Expand Down
Loading