feat(engine): persist-first webhook outbox for queue-backed adapters #178
The Node self-host adapter delivered webhooks fire-and-forget: an in-process send with 3 inline retries, and any failure or restart lost the event. The hosted Cloudflare path gets real durability from CF Queues + DLQ; self-hosters got message loss. The pending_events table existed in the schema but nothing consumed it.

The Node adapter's event queue now uses pending_events as a consumed outbox:

- send persists the row first (durable once send resolves), then kicks an immediate poll so delivery stays prompt.
- A background poller (configurable interval) claims due rows with a single UPDATE ... WHERE id IN (subquery) RETURNING statement: an atomic claim with attempts++ and a lease on process_after, per the no-interactive-transactions doctrine in ports/database.ts. A worker that crashes mid-delivery leaves the row reclaimable after the lease.
- Delivery reuses deliverEvent unchanged (HMAC signing, terminal-4xx vs retryable classification): success deletes the row, terminal failures settle it as failed, retryable failures reschedule with capped exponential backoff until max_attempts is exhausted.
- Startup resumes leftover due rows, so deliveries survive restarts.
- cleanupOldEvents (24h) is wired into the poll cadence so settled rows are pruned.

The EventQueue port contract and the Cloudflare path are unchanged; InProcessEventQueue is renamed to DurableEventQueue (engine-internal, no external importers).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
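The claim and settle rules in this commit message can be sketched roughly as below. This is an illustration under assumptions, not the code in the PR: `nextDelayMs`, `settle`, `BASE_DELAY_MS`, `MAX_DELAY_MS`, the `status` and `id` columns, and the parameter order in `CLAIM_SQL` are all hypothetical; only `pending_events`, `attempts`, `process_after` and `max_attempts` come from the description.

```typescript
// Hypothetical constants; the real values are whatever the adapter configures.
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60000;

// Capped exponential backoff: base * 2^(attempts - 1), clamped to the cap.
function nextDelayMs(attempts: number, base = BASE_DELAY_MS, cap = MAX_DELAY_MS): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(cap, base * Math.pow(2, exponent));
}

type Outcome = "success" | "terminal" | "retryable";
type Settlement =
  | { action: "delete" }
  | { action: "fail" }
  | { action: "reschedule"; processAfter: number };

// Decide how to settle a claimed row after one delivery attempt.
// `attempts` is the value after the claim has already incremented it.
function settle(outcome: Outcome, attempts: number, maxAttempts: number, now: number): Settlement {
  if (outcome === "success") return { action: "delete" };
  if (outcome === "terminal" || attempts >= maxAttempts) return { action: "fail" };
  return { action: "reschedule", processAfter: now + nextDelayMs(attempts) };
}

// Assumed shape of a single-statement atomic claim (SQLite/Postgres style).
// Bind order: lease deadline, current time, batch size.
const CLAIM_SQL = `
UPDATE pending_events
SET attempts = attempts + 1,
    process_after = ?
WHERE id IN (
  SELECT id FROM pending_events
  WHERE status = 'pending' AND process_after <= ?
  ORDER BY process_after
  LIMIT ?
)
RETURNING *`;
```

Because the claim is one statement, two pollers cannot both select and update the same row; the loser's subquery simply no longer sees it as due until the lease on `process_after` expires.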
Move the pending_events outbox insert from the Node adapter into the engine send path so every adapter gets the same durability guarantee: routes insert the row synchronously in the request path (single cheap INSERT via routes/webhookOutbox.ts), then hand the row id to eventQueue.send in the background. If the queue send is lost (Workers isolate dies after the response, queue outage), the row stays pending and is re-enqueued by the sweep instead of vanishing.

- QueuedEvent gains an optional outboxId; adapters that receive it must not insert a second row, and the consumer settles the row after delivery. Absent outboxId keeps the legacy contract.
- DurableEventQueue.send skips the insert when outboxId is present (no double-insert / double-delivery on the Node path).
- New sweepPendingEvents(db, opts) claims due rows via the same atomic claimDueEvents the Node poller uses and returns them with complete/fail/reschedule settle callbacks so a scheduled handler can re-enqueue to an external queue without delivering directly.
- Export the outbox primitives (enqueueEvent, claimDueEvents, completeEvent, failEvent, rescheduleEvent, sweepPendingEvents, cleanupOldEvents) from the engine package root for queue consumers.

Tests: route-level persist-first ordering incl. sync/async queue-send failures leaving the row sweepable; Node adapter no-double-insert; sweep exactly-once claims under concurrent sweepers, lease expiry, and settle callbacks.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
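A sweep that hands back settle callbacks might look something like the in-memory model below. It is only a sketch of the idea: `MemoryOutbox`, `SweptRow`, `runSweep`, the field names, and the lease and delay values are all hypothetical, and the real `sweepPendingEvents(db, opts)` works against the database rather than a `Map`.

```typescript
// In-memory stand-in for the pending_events table; field names are illustrative.
interface Row {
  id: string;
  payload: string;
  attempts: number;
  processAfter: number;
  status: "pending" | "failed";
}

interface SweptRow {
  id: string;
  payload: string;
  complete(): void;
  fail(): void;
  reschedule(delayMs: number): void;
}

class MemoryOutbox {
  private rows = new Map<string, Row>();

  insert(id: string, payload: string, now: number): void {
    this.rows.set(id, { id, payload, attempts: 0, processAfter: now, status: "pending" });
  }

  // Claim due rows: bump attempts and push processAfter out by the lease,
  // so another sweeper running before the lease expires sees nothing due.
  sweep(now: number, leaseMs: number, limit: number): SweptRow[] {
    const due = Array.from(this.rows.values())
      .filter((r) => r.status === "pending" && r.processAfter <= now)
      .slice(0, limit);
    return due.map((r) => {
      r.attempts += 1;
      r.processAfter = now + leaseMs;
      return {
        id: r.id,
        payload: r.payload,
        complete: () => { this.rows.delete(r.id); },
        fail: () => { r.status = "failed"; },
        reschedule: (delayMs: number) => { r.processAfter = now + delayMs; },
      };
    });
  }

  size(): number {
    return this.rows.size;
  }
}

// A scheduled handler re-enqueues swept rows instead of delivering them itself.
function runSweep(
  outbox: MemoryOutbox,
  enqueue: (msg: { outboxId: string; payload: string }) => void,
  now: number,
): number {
  const swept = outbox.sweep(now, 30000, 100);
  for (const row of swept) {
    try {
      // The row stays leased; the queue consumer settles it after delivery.
      enqueue({ outboxId: row.id, payload: row.payload });
    } catch (_err) {
      row.reschedule(5000); // queue unavailable: try again on a later sweep
    }
  }
  return swept.length;
}
```

Keeping delivery out of the sweep means the scheduled handler stays cheap, and the same consumer code path settles rows whether they arrived from the request path or from a re-enqueue.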
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2fdff64ae3
Resolve webhook outbox conflicts and move idempotent webhook outbox insertion before success record storage.
Problem

#175 made the Node self-host adapter durable, but the hosted (Cloudflare) deployment still has a loss window: the engine enqueues webhook events to the CF Queue inside `waitUntil` after the HTTP response. If the isolate dies between the D1 message insert and the queue send, the webhook event is lost forever; CF Queue retries + DLQ only cover post-enqueue failures.

Change

Move the `pending_events` outbox insert from the Node adapter into the engine send path, so the persist-first guarantee holds for every adapter:

- `routes/webhookOutbox.ts` (new): `sendWebhookEvent(c, event)` inserts the outbox row synchronously in the request path (single cheap INSERT, awaited before the response), then hands `{ ...event, outboxId }` to `eventQueue.send` in the background. If the insert itself fails, the route still responds and the event degrades to the legacy fire-and-forget send (no `outboxId`). All 26 route call sites now go through it.
- `ports/event-queue.ts`: `QueuedEvent.outboxId?: string`, backward compatible. Present: the row is already durable, adapters must not double-insert, and the consumer settles it. Absent: legacy semantics.
- `DurableEventQueue.send` skips the insert when `outboxId` is present; poller/lease semantics are unchanged.
- `sweepPendingEvents(db, opts)` (new): claims due rows via the same atomic `claimDueEvents` the Node poller uses and returns them with `complete`/`fail`/`reschedule` settle callbacks, so a scheduled handler (CF cron) can re-enqueue them to an external queue. It does not deliver directly.
- `enqueueEvent`, `claimDueEvents`, `completeEvent`, `failEvent`, `rescheduleEvent`, `sweepPendingEvents`, `cleanupOldEvents`, plus the `ClaimedEvent`/`SweptEvent` types, are exported from the package root so the cloud queue consumer / scheduled handler can import them.

The companion cloud PR (AgentWorkforce/cloud `feature/relaycast-outbox-settlement`) wires the CF queue consumer to settle rows and the scheduled handler to sweep + re-enqueue.

Tests

Extends the #175 suite (70 passing):

- Route-level persist-first ordering: the outbox row is inserted before `eventQueue.send` is invoked; the row remains sweepable when the queue send throws synchronously or rejects asynchronously, and the mutation still returns 201.
- Node adapter: `send` with `outboxId` does not double-insert (exactly one delivery).
- `npm test`, `npm run typecheck`, `npm run lint` all green in `packages/engine`.

🤖 Generated with Claude Code
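For readers who want a concrete picture of the persist-first send path described in this PR, here is a minimal sketch. It is not the contents of `routes/webhookOutbox.ts`: the `Deps` interface, `sendWebhookEventSketch`, and the reduced `QueuedEvent` shape are assumptions made so the example is self-contained, and `background` stands in for whatever the adapter uses to run work after the response (for example `waitUntil` on Workers).

```typescript
// Reduced, hypothetical event shape; only outboxId is taken from the PR text.
interface QueuedEvent {
  type: string;
  payload: unknown;
  outboxId?: string;
}

// Hypothetical dependencies injected so the sketch runs without a database.
interface Deps {
  insertOutboxRow(event: QueuedEvent): Promise<string>; // resolves to the row id
  queueSend(event: QueuedEvent): Promise<void>;
  background(task: Promise<unknown>): void;
}

async function sendWebhookEventSketch(deps: Deps, event: QueuedEvent): Promise<void> {
  let outboxId: string | undefined;
  try {
    // Persist first: awaited in the request path, before the response.
    outboxId = await deps.insertOutboxRow(event);
  } catch (_err) {
    // Insert failed: degrade to the legacy fire-and-forget send.
    outboxId = undefined;
  }

  const message: QueuedEvent = outboxId === undefined ? event : { ...event, outboxId };

  // Hand off in the background. Wrapping the call in a promise chain turns a
  // synchronous throw into a rejection, and the catch keeps queue failures
  // away from the caller; a persisted row stays pending for the sweep.
  deps.background(
    Promise.resolve()
      .then(() => deps.queueSend(message))
      .catch(() => undefined),
  );
}
```

The point of the ordering is that a failed or lost queue send never affects the HTTP response, while the row written beforehand is what a later sweep picks up.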