From 79da8ab70b1d9477ccb95e6d331177bfa8f0d5c1 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Tue, 17 Feb 2026 13:33:35 -0800 Subject: [PATCH] Auto-inject workflow run/step ID VQS headers from queue payload Move `x-workflow-run-id` and `x-workflow-step-id` header injection into `world-vercel` queue implementation so callers don't need to set them manually. The headers are extracted from the queue payload itself. Co-Authored-By: Claude Opus 4.6 --- .changeset/auto-vqs-run-id-header.md | 6 ++++ packages/core/src/runtime/step-handler.ts | 34 ++++++------------- .../core/src/runtime/suspension-handler.ts | 1 - packages/world-vercel/src/queue.ts | 27 ++++++++++++++- 4 files changed, 42 insertions(+), 26 deletions(-) create mode 100644 .changeset/auto-vqs-run-id-header.md diff --git a/.changeset/auto-vqs-run-id-header.md b/.changeset/auto-vqs-run-id-header.md new file mode 100644 index 0000000000..049b0cb800 --- /dev/null +++ b/.changeset/auto-vqs-run-id-header.md @@ -0,0 +1,6 @@ +--- +'@workflow/world-vercel': patch +'@workflow/core': patch +--- + +Auto-inject `x-workflow-run-id` and `x-workflow-step-id` VQS headers from queue payload in `world-vercel` diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index bf69f28ed0..78a8127423 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -265,18 +265,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( }); // Re-invoke the workflow to handle the failed step - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { - headers: { 'x-workflow-run-id': workflowRunId }, - } - ); + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); return; } @@ -608,18 +601,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( } } - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { - headers: { 'x-workflow-run-id': workflowRunId }, - } - ); + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); } ); }); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 5f9f4b7852..e6737cf199 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -204,7 +204,6 @@ export async function handleSuspension({ { idempotencyKey: queueItem.correlationId, headers: { - 'x-workflow-run-id': runId, ...extractTraceHeaders(traceCarrier), }, } diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index bfed59e674..ad0afd61de 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -48,6 +48,28 @@ const MAX_DELAY_SECONDS = Number( process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit ); +/** + * Extract known identifiers from a queue payload and return them as VQS headers. + * This ensures observability headers are always set without relying on callers. + */ +function getHeadersFromPayload( + payload: QueuePayload +): Record | undefined { + const headers: Record = {}; + + if ('runId' in payload && typeof payload.runId === 'string') { + headers['x-workflow-run-id'] = payload.runId; + } + if ('workflowRunId' in payload && typeof payload.workflowRunId === 'string') { + headers['x-workflow-run-id'] = payload.workflowRunId; + } + if ('stepId' in payload && typeof payload.stepId === 'string') { + headers['x-workflow-step-id'] = payload.stepId; + } + + return Object.keys(headers).length > 0 ? headers : undefined; +} + type QueueFunction = ( queueName: ValidQueueName, payload: QueuePayload, @@ -114,7 +136,10 @@ export function createQueue(config?: APIConfig): Queue { { idempotencyKey: opts?.idempotencyKey, delaySeconds: opts?.delaySeconds, - headers: opts?.headers, + headers: { + ...getHeadersFromPayload(payload), + ...opts?.headers, + }, } ); return { messageId: MessageId.parse(messageId) };