Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/retry-vqs-handler-errors-immediately.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-vercel": patch
---

Release failed VQS workflow handler messages on the configured retry cadence.
56 changes: 55 additions & 1 deletion packages/world-vercel/src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,61 @@ describe('createQueue', () => {
queue.createQueueHandler('__wkf_workflow_', async () => undefined);

expect(mockHandleCallback).toHaveBeenCalledTimes(1);
expect(mockHandleCallback).toHaveBeenCalledWith(expect.any(Function));
expect(mockHandleCallback).toHaveBeenCalledWith(expect.any(Function), {
retry: expect.any(Function),
});
});

it('should ask VQS to retry handler errors with bounded backoff', () => {
mockHandleCallback.mockReturnValue(async () => new Response('ok'));
const randomSpy = vi.spyOn(Math, 'random').mockReturnValue(0);

try {
const queue = createQueue();
queue.createQueueHandler('__wkf_workflow_', async () => undefined);

const options = mockHandleCallback.mock.calls[0][1];
expect(
options.retry(new Error('workflow server unavailable'), {
messageId: 'msg-123',
deliveryCount: 1,
})
).toEqual({ afterSeconds: 1 });
expect(
options.retry(new Error('workflow server unavailable'), {
messageId: 'msg-123',
deliveryCount: 2,
})
).toEqual({ afterSeconds: 2 });
expect(
options.retry(new Error('workflow server unavailable'), {
messageId: 'msg-123',
deliveryCount: 4,
})
).toEqual({ afterSeconds: 8 });
expect(
options.retry(new Error('workflow server unavailable'), {
messageId: 'msg-123',
deliveryCount: 8,
})
).toEqual({ afterSeconds: 60 });

randomSpy.mockReturnValue(0.999);
expect(
options.retry(new Error('workflow server unavailable'), {
messageId: 'msg-123',
deliveryCount: 4,
})
).toEqual({ afterSeconds: 6 });
expect(
options.retry(new Error('workflow server unavailable'), {
messageId: 'msg-123',
deliveryCount: 8,
})
).toEqual({ afterSeconds: 45 });
} finally {
randomSpy.mockRestore();
}
});

it('should send new message with delaySeconds when handler returns timeoutSeconds', async () => {
Expand Down
30 changes: 30 additions & 0 deletions packages/world-vercel/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ const MAX_DELAY_SECONDS = Number(
process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit
);

const HANDLER_ERROR_RETRY_AFTER_SECONDS = 1;
const HANDLER_ERROR_MAX_RETRY_AFTER_SECONDS = 60;
const HANDLER_ERROR_RETRY_JITTER_RATIO = 0.25;

function getHandlerErrorRetryAfterSeconds(deliveryCount: number): number {
const backoffSeconds = Math.min(
Math.max(HANDLER_ERROR_RETRY_AFTER_SECONDS, 2 ** (deliveryCount - 1)),
HANDLER_ERROR_MAX_RETRY_AFTER_SECONDS
);
const jitterSeconds = Math.floor(
Math.random() *
(Math.ceil(backoffSeconds * HANDLER_ERROR_RETRY_JITTER_RATIO) + 1)
);
return Math.max(
HANDLER_ERROR_RETRY_AFTER_SECONDS,
backoffSeconds - jitterSeconds
);
}

/**
* Extract known identifiers from a queue payload and return them as VQS headers.
* This ensures observability headers are always set without relying on callers.
Expand Down Expand Up @@ -290,6 +309,17 @@ export function createQueue(config?: APIConfig): Queue {
// we may get a duplicate invocation but won't lose the scheduled wakeup.
await queue(queueName, payload, { deploymentId, delaySeconds });
}
},
{
// Without an explicit retry directive, @vercel/queue leaves failed
// handler messages invisible until the default 300s visibility timeout
// expires. Start retrying quickly, then back off by delivery count
// with jitter so an outage or poison message cannot hot-loop or
// redrive in lockstep. Workflow handlers are event-sourced and must
// remain idempotent because queue retries can happen close together.
retry: (_error, { deliveryCount }) => ({
afterSeconds: getHandlerErrorRetryAfterSeconds(deliveryCount),
}),
}
);

Expand Down