diff --git a/.changeset/retry-vqs-handler-errors-immediately.md b/.changeset/retry-vqs-handler-errors-immediately.md new file mode 100644 index 0000000000..793d96d4c0 --- /dev/null +++ b/.changeset/retry-vqs-handler-errors-immediately.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Release failed VQS workflow handler messages on the configured retry cadence. diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index 78b3630652..b8b2490af8 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -353,7 +353,61 @@ describe('createQueue', () => { queue.createQueueHandler('__wkf_workflow_', async () => undefined); expect(mockHandleCallback).toHaveBeenCalledTimes(1); - expect(mockHandleCallback).toHaveBeenCalledWith(expect.any(Function)); + expect(mockHandleCallback).toHaveBeenCalledWith(expect.any(Function), { + retry: expect.any(Function), + }); + }); + + it('should ask VQS to retry handler errors with bounded backoff', () => { + mockHandleCallback.mockReturnValue(async () => new Response('ok')); + const randomSpy = vi.spyOn(Math, 'random').mockReturnValue(0); + + try { + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => undefined); + + const options = mockHandleCallback.mock.calls[0][1]; + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 1, + }) + ).toEqual({ afterSeconds: 1 }); + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 2, + }) + ).toEqual({ afterSeconds: 2 }); + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 4, + }) + ).toEqual({ afterSeconds: 8 }); + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 8, + }) + ).toEqual({ afterSeconds: 60 }); + + randomSpy.mockReturnValue(0.999); + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 4, + }) + ).toEqual({ afterSeconds: 6 }); + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 8, + }) + ).toEqual({ afterSeconds: 45 }); + } finally { + randomSpy.mockRestore(); + } }); it('should send new message with delaySeconds when handler returns timeoutSeconds', async () => { diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 530062f677..8fdab46579 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -134,6 +134,25 @@ const MAX_DELAY_SECONDS = Number( process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit ); +const HANDLER_ERROR_RETRY_AFTER_SECONDS = 1; +const HANDLER_ERROR_MAX_RETRY_AFTER_SECONDS = 60; +const HANDLER_ERROR_RETRY_JITTER_RATIO = 0.25; + +function getHandlerErrorRetryAfterSeconds(deliveryCount: number): number { + const backoffSeconds = Math.min( + Math.max(HANDLER_ERROR_RETRY_AFTER_SECONDS, 2 ** (deliveryCount - 1)), + HANDLER_ERROR_MAX_RETRY_AFTER_SECONDS + ); + const jitterSeconds = Math.floor( + Math.random() * + (Math.ceil(backoffSeconds * HANDLER_ERROR_RETRY_JITTER_RATIO) + 1) + ); + return Math.max( + HANDLER_ERROR_RETRY_AFTER_SECONDS, + backoffSeconds - jitterSeconds + ); +} + /** * Extract known identifiers from a queue payload and return them as VQS headers. * This ensures observability headers are always set without relying on callers. @@ -290,6 +309,17 @@ export function createQueue(config?: APIConfig): Queue { // we may get a duplicate invocation but won't lose the scheduled wakeup. await queue(queueName, payload, { deploymentId, delaySeconds }); } + }, + { + // Without an explicit retry directive, @vercel/queue leaves failed + // handler messages invisible until the default 300s visibility timeout + // expires. Start retrying quickly, then back off by delivery count + // with jitter so an outage or poison message cannot hot-loop or + // redrive in lockstep. Workflow handlers are event-sourced and must + // remain idempotent because queue retries can happen close together. + retry: (_error, { deliveryCount }) => ({ + afterSeconds: getHandlerErrorRetryAfterSeconds(deliveryCount), + }), } );