From 9284e66b72838283542c6483c1929102ea64f73f Mon Sep 17 00:00:00 2001
From: Will Washburn
Date: Tue, 9 Jun 2026 16:00:18 -0400
Subject: [PATCH 1/4] feat(engine): durable webhook outbox for the Node adapter
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The Node self-host adapter delivered webhooks fire-and-forget: an
in-process send with 3 inline retries, and any failure or restart lost
the event. The hosted Cloudflare path gets real durability from CF
Queues + DLQ; self-hosters got message loss. The pending_events table
existed in the schema but nothing consumed it.

The Node adapter's event queue now uses pending_events as a consumed
outbox:

- send persists the row first (durable once send resolves), then kicks
  an immediate poll so delivery stays prompt.
- A background poller (configurable interval) claims due rows with a
  single UPDATE ... WHERE id IN (subquery) RETURNING statement — atomic
  claim with attempts++ and a lease on process_after, per the
  no-interactive-transactions doctrine in ports/database.ts. A worker
  that crashes mid-delivery leaves the row reclaimable after the lease.
- Delivery reuses deliverEvent unchanged (HMAC signing, terminal-4xx vs
  retryable classification): success deletes the row, terminal failures
  settle it as failed, retryable failures reschedule with capped
  exponential backoff until max_attempts is exhausted.
- Startup resumes leftover due rows, so deliveries survive restarts.
- cleanupOldEvents (24h) is wired into the poll cadence so settled rows
  are pruned.

The EventQueue port contract and the Cloudflare path are unchanged;
InProcessEventQueue is renamed to DurableEventQueue (engine-internal,
no external importers).

Co-Authored-By: Claude Fable 5 --- .../node/__tests__/event-queue.test.ts | 256 ++++++++++++++++++ .../engine/src/adapters/node/event-queue.ts | 161 ++++++++++- packages/engine/src/adapters/node/index.ts | 19 +- packages/engine/src/engine/eventQueue.ts | 91 ++++++- packages/engine/src/ports/event-queue.ts | 5 +- 5 files changed, 513 insertions(+), 19 deletions(-) create mode 100644 packages/engine/src/adapters/node/__tests__/event-queue.test.ts diff --git a/packages/engine/src/adapters/node/__tests__/event-queue.test.ts b/packages/engine/src/adapters/node/__tests__/event-queue.test.ts new file mode 100644 index 00000000..ee4ca93a --- /dev/null +++ b/packages/engine/src/adapters/node/__tests__/event-queue.test.ts @@ -0,0 +1,256 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { eq } from 'drizzle-orm'; +import { getSqliteDb, runMigrations, type SqliteDbHandle } from '../database.js'; +import { DurableEventQueue } from '../event-queue.js'; +import { createNodeRuntime } from '../index.js'; +import { enqueueEvent } from '../../../engine/eventQueue.js'; +import { pendingEvents, workspaces, eventSubscriptions } from '../../../db/schema.js'; + +const HOOK_URL = 'https://hooks.example.test/relay'; + +let seq = 0; + +function openDb(path = ':memory:'): SqliteDbHandle { + const handle = getSqliteDb(path); + runMigrations(handle); + return handle; +} + +async function seedWorkspace(db: SqliteDbHandle['db']): Promise { + const id = `ws_${++seq}`; + await db.insert(workspaces).values({ id, name: 'test', apiKeyHash: `hash_${id}` }); + return id; +} + +async function seedSubscription( + db: SqliteDbHandle['db'], + workspaceId: string, + opts: { secret?: string } = {}, +): Promise { + await db.insert(eventSubscriptions).values({ + id: `sub_${++seq}`, + workspaceId, + events: ['*'], + url: HOOK_URL, + secret: opts.secret ?? 
null, + }); +} + +async function pendingRows(db: SqliteDbHandle['db']) { + return db.select().from(pendingEvents); +} + +async function waitFor(cond: () => Promise | boolean, timeoutMs = 5_000): Promise { + const start = Date.now(); + for (;;) { + if (await cond()) return; + if (Date.now() - start > timeoutMs) throw new Error('condition not met in time'); + await new Promise((resolve) => setTimeout(resolve, 25)); + } +} + +function makeQueue( + db: SqliteDbHandle['db'], + opts: ConstructorParameters[2] = {}, + onError: (err: unknown, ctx: Record) => void = () => {}, +): DurableEventQueue { + return new DurableEventQueue(db, onError, { pollIntervalMs: 0, ...opts }); +} + +const handles: SqliteDbHandle[] = []; +function track(handle: SqliteDbHandle): SqliteDbHandle { + handles.push(handle); + return handle; +} + +afterEach(() => { + vi.unstubAllGlobals(); + for (const handle of handles.splice(0)) { + try { handle.sqlite.close(); } catch { /* already closed */ } + } +}); + +describe('DurableEventQueue', () => { + it('send persists the outbox row before delivery completes', async () => { + const { db } = track(openDb()); + const ws = await seedWorkspace(db); + await seedSubscription(db, ws); + + let release!: () => void; + const gate = new Promise((resolve) => { release = resolve; }); + const fetchMock = vi.fn(async () => { + await gate; + return new Response('ok', { status: 200 }); + }); + vi.stubGlobal('fetch', fetchMock); + + const queue = makeQueue(db); + await queue.send({ type: 'message.created', workspaceId: ws, data: { text: 'hi' } }); + + // The row is durable as soon as send resolves, while delivery is still in flight. 
+ const rows = await pendingRows(db); + expect(rows).toHaveLength(1); + expect(rows[0].eventType).toBe('message.created'); + expect(rows[0].status).toBe('pending'); + + release(); + await waitFor(async () => (await pendingRows(db)).length === 0); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('successful delivery deletes the row and signs the payload', async () => { + const { db } = track(openDb()); + const ws = await seedWorkspace(db); + await seedSubscription(db, ws, { secret: 'shh' }); + + const fetchMock = vi.fn(async () => new Response('ok', { status: 200 })); + vi.stubGlobal('fetch', fetchMock); + + const queue = makeQueue(db); + await enqueueEvent(db, ws, 'message.created', { text: 'hello' }); + await queue.poll(); + + expect(await pendingRows(db)).toHaveLength(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toBe(HOOK_URL); + const headers = init.headers as Record; + expect(headers['X-Relay-Event']).toBe('message.created'); + expect(headers['X-Relay-Signature']).toMatch(/^sha256=[0-9a-f]{64}$/); + }); + + it('retryable failure keeps the row pending with attempts++ and backoff', async () => { + const { db } = track(openDb()); + const ws = await seedWorkspace(db); + await seedSubscription(db, ws); + + const fetchMock = vi.fn(async () => new Response('boom', { status: 503 })); + vi.stubGlobal('fetch', fetchMock); + + const queue = makeQueue(db, { baseBackoffMs: 60_000 }); + await enqueueEvent(db, ws, 'message.created', { text: 'retry me' }); + const before = Date.now(); + await queue.poll(); + + const [row] = await pendingRows(db); + expect(row.status).toBe('pending'); + expect(row.attempts).toBe(1); + expect(row.lastError).toMatch(/Retryable webhook delivery failures/); + // processAfter pushed out by the 60s backoff (well beyond the claim lease). 
+ expect(row.processAfter.getTime()).toBeGreaterThan(before + 50_000); + + // Not due yet — a second poll claims nothing and sends nothing new. + const callsAfterFirstPoll = fetchMock.mock.calls.length; + await queue.poll(); + expect(fetchMock.mock.calls.length).toBe(callsAfterFirstPoll); + }); + + it('terminal 4xx settles the row as failed without retrying', async () => { + const { db } = track(openDb()); + const ws = await seedWorkspace(db); + await seedSubscription(db, ws); + + const fetchMock = vi.fn(async () => new Response('bad request', { status: 400 })); + vi.stubGlobal('fetch', fetchMock); + + const queue = makeQueue(db); + await enqueueEvent(db, ws, 'message.created', { text: 'never valid' }); + await queue.poll(); + + const [row] = await pendingRows(db); + expect(row.status).toBe('failed'); + expect(row.completedAt).not.toBeNull(); + expect(row.lastError).toMatch(/terminal delivery failure/); + expect(fetchMock).toHaveBeenCalledTimes(1); // 400 is not retried inline either + + await queue.poll(); + expect(fetchMock).toHaveBeenCalledTimes(1); // settled rows are never reclaimed + }); + + it('exhausting maxAttempts settles the row as failed', async () => { + const { db } = track(openDb()); + const ws = await seedWorkspace(db); + await seedSubscription(db, ws); + + vi.stubGlobal('fetch', vi.fn(async () => new Response('down', { status: 503 }))); + + const errors: Record[] = []; + const queue = makeQueue(db, {}, (_err, ctx) => errors.push(ctx)); + const id = await enqueueEvent(db, ws, 'message.created', { text: 'doomed' }); + await db.update(pendingEvents).set({ maxAttempts: 1 }).where(eq(pendingEvents.id, id)); + await queue.poll(); + + const [row] = await pendingRows(db); + expect(row.status).toBe('failed'); + expect(row.attempts).toBe(1); + expect(row.lastError).toMatch(/attempts exhausted/); + expect(errors).toContainEqual(expect.objectContaining({ settled: 'failed' })); + }); + + it('resumes pending deliveries after a restart over the same database', 
async () => { + const dir = mkdtempSync(join(tmpdir(), 'relaycast-outbox-')); + const dbPath = join(dir, 'engine.db'); + try { + // First process: persist an event, then "crash" before delivering it. + const first = openDb(dbPath); + const ws = await seedWorkspace(first.db); + await seedSubscription(first.db, ws); + await enqueueEvent(first.db, ws, 'message.created', { text: 'survive me' }); + first.sqlite.close(); + + const fetchMock = vi.fn(async () => new Response('ok', { status: 200 })); + vi.stubGlobal('fetch', fetchMock); + + // Second process: a fresh runtime over the same file resumes the outbox. + const runtime = createNodeRuntime({ + dbPath, + baseUrl: 'http://localhost:0', + presence: { sweepIntervalMs: 0 }, + eventQueue: { pollIntervalMs: 0 }, + }); + try { + await waitFor(async () => (await pendingRows(runtime.deps.db)).length === 0); + expect(fetchMock).toHaveBeenCalledTimes(1); + } finally { + runtime.close(); + } + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it('prunes settled rows older than 24h on the poll cadence', async () => { + const { db } = track(openDb()); + const ws = await seedWorkspace(db); + + const old = new Date(Date.now() - 25 * 60 * 60 * 1000); + await db.insert(pendingEvents).values([ + { + id: 'evt_old', + workspaceId: ws, + eventType: 'message.created', + payload: {}, + status: 'failed', + createdAt: old, + completedAt: old, + }, + { + id: 'evt_fresh', + workspaceId: ws, + eventType: 'message.created', + payload: {}, + status: 'failed', + completedAt: new Date(), + }, + ]); + + const queue = makeQueue(db, { cleanupIntervalMs: 0 }); + await queue.poll(); + + const rows = await pendingRows(db); + expect(rows.map((r) => r.id)).toEqual(['evt_fresh']); + }); +}); diff --git a/packages/engine/src/adapters/node/event-queue.ts b/packages/engine/src/adapters/node/event-queue.ts index 313055a2..33eb4f66 100644 --- a/packages/engine/src/adapters/node/event-queue.ts +++ 
b/packages/engine/src/adapters/node/event-queue.ts @@ -1,27 +1,162 @@ import type { EventQueue, QueuedEvent } from '../../ports/event-queue.js'; import type { EngineDb } from '../../ports/database.js'; import { deliverEvent } from '../../engine/eventDelivery.js'; +import { + enqueueEvent, + claimDueEvents, + completeEvent, + failEvent, + rescheduleEvent, + cleanupOldEvents, + type ClaimedEvent, +} from '../../engine/eventQueue.js'; + +const DEFAULT_POLL_INTERVAL_MS = 5_000; +const DEFAULT_BASE_BACKOFF_MS = 30_000; +const DEFAULT_MAX_BACKOFF_MS = 15 * 60_000; +const DEFAULT_LEASE_MS = 60_000; +const DEFAULT_BATCH_SIZE = 25; +const DEFAULT_CLEANUP_INTERVAL_MS = 60 * 60_000; + +export interface DurableEventQueueOptions { + /** Poll cadence for due rows. 0 disables the timer (tests drive `poll()` directly). Default 5s. */ + pollIntervalMs?: number; + /** Delay before the first retry; doubles per attempt. Default 30s. */ + baseBackoffMs?: number; + /** Retry backoff ceiling. Default 15 min. */ + maxBackoffMs?: number; + /** How long a claimed row stays invisible before a crashed worker's claim expires. Default 60s. */ + leaseMs?: number; + /** Max rows claimed per poll pass. Default 25. */ + batchSize?: number; + /** How often settled rows older than 24h are pruned. Default 1h. */ + cleanupIntervalMs?: number; +} /** - * In-process webhook delivery, replacing the Cloudflare Queue + consumer Worker. - * `send` returns immediately and delivery runs in the background via - * `deliverEvent` (which fans out to the workspace's registered subscribers). - * Single-process best-effort: failures are logged, not retried across restarts. + * Durable webhook delivery for self-host, replacing the Cloudflare Queue + DLQ. + * `send` persists a `pending_events` outbox row first, then a background poller + * claims due rows and fans them out via `deliverEvent` (HMAC signing and + * retryable-vs-terminal classification unchanged). 
Success deletes the row; + * terminal failures settle it as `failed`; retryable failures back off + * exponentially and survive process restarts — `start()` resumes whatever is due. */ -export class InProcessEventQueue implements EventQueue { +export class DurableEventQueue implements EventQueue { + private readonly pollIntervalMs: number; + private readonly baseBackoffMs: number; + private readonly maxBackoffMs: number; + private readonly leaseMs: number; + private readonly batchSize: number; + private readonly cleanupIntervalMs: number; + private timer: ReturnType | undefined; + private polling = false; + private lastCleanupAt = 0; + constructor( private readonly db: EngineDb, private readonly onError: (err: unknown, ctx: Record) => void = () => {}, - ) {} + options: DurableEventQueueOptions = {}, + ) { + this.pollIntervalMs = options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS; + this.baseBackoffMs = options.baseBackoffMs ?? DEFAULT_BASE_BACKOFF_MS; + this.maxBackoffMs = options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF_MS; + this.leaseMs = options.leaseMs ?? DEFAULT_LEASE_MS; + this.batchSize = options.batchSize ?? DEFAULT_BATCH_SIZE; + this.cleanupIntervalMs = options.cleanupIntervalMs ?? DEFAULT_CLEANUP_INTERVAL_MS; + } + /** + * Persist the outbox row first (awaited — the event is durable once `send` + * resolves), then kick an immediate poll so delivery is prompt. The poll path + * claims rows atomically, so the kick and the interval timer never double-deliver. + */ async send(message: QueuedEvent): Promise { - // Fire-and-forget; do not block the request path on delivery. 
- void (async () => { - try { - await deliverEvent(this.db, message.workspaceId, message.type, message.data); - } catch (err) { - this.onError(err, { source: 'node.event_queue', event_type: message.type }); + await enqueueEvent(this.db, message.workspaceId, message.type, message.data); + void this.poll(); + } + + /** Start the interval poller and immediately resume any rows left over from a previous run. */ + start(): void { + if (this.pollIntervalMs > 0 && !this.timer) { + this.timer = setInterval(() => { + void this.poll(); + }, this.pollIntervalMs); + // Don't keep the Node process alive solely for the outbox poller. + (this.timer as { unref?: () => void }).unref?.(); + } + void this.poll(); + } + + /** Stop the interval poller (server shutdown / test teardown). */ + stop(): void { + if (this.timer) clearInterval(this.timer); + this.timer = undefined; + } + + /** + * Claim and deliver every currently-due row, then prune old settled rows on + * the cleanup cadence. Serialized per instance: overlapping calls coalesce + * into the in-flight pass. + */ + async poll(): Promise { + if (this.polling) return; + this.polling = true; + try { + for (;;) { + const claimed = await claimDueEvents(this.db, { + limit: this.batchSize, + leaseMs: this.leaseMs, + }); + if (claimed.length === 0) break; + await Promise.all(claimed.map((event) => this.process(event))); + if (claimed.length < this.batchSize) break; + } + await this.maybeCleanup(); + } catch (err) { + this.onError(err, { source: 'node.event_queue', op: 'poll' }); + } finally { + this.polling = false; + } + } + + private async process(event: ClaimedEvent): Promise { + try { + const summary = await deliverEvent(this.db, event.workspaceId, event.eventType, event.payload); + if (summary.failed > 0) { + // deliverEvent resolved with failures and no retryables — terminal + // (non-408/429 4xx). Settle the row; retrying won't change the outcome. 
+ await failEvent( + this.db, + event.id, + `terminal delivery failure: ${summary.failed} of ${summary.attempted} subscriber(s)`, + ); + } else { + await completeEvent(this.db, event.id); + } + } catch (err) { + // deliverEvent throws only for retryable conditions (coded 503 on + // retryable webhook failures, or a transient subscription-lookup error). + const message = err instanceof Error ? err.message : String(err); + if (event.attempts >= event.maxAttempts) { + await failEvent(this.db, event.id, `${message} (attempts exhausted)`); + this.onError(err, { + source: 'node.event_queue', + op: 'deliver', + event_type: event.eventType, + attempts: event.attempts, + settled: 'failed', + }); + } else { + const backoff = Math.min(this.baseBackoffMs * 2 ** (event.attempts - 1), this.maxBackoffMs); + await rescheduleEvent(this.db, event.id, message, backoff); } - })(); + } + } + + private async maybeCleanup(): Promise { + const now = Date.now(); + if (now - this.lastCleanupAt < this.cleanupIntervalMs) return; + this.lastCleanupAt = now; + await cleanupOldEvents(this.db); } } diff --git a/packages/engine/src/adapters/node/index.ts b/packages/engine/src/adapters/node/index.ts index 5c477507..13bc27fb 100644 --- a/packages/engine/src/adapters/node/index.ts +++ b/packages/engine/src/adapters/node/index.ts @@ -11,7 +11,7 @@ import { InProcessRealtime } from './realtime.js'; import { InProcessPresence, type InProcessPresenceOptions } from './presence.js'; import { InProcessRateLimiter } from './rate-limit.js'; import { InProcessKeyValueStore } from './kv.js'; -import { InProcessEventQueue } from './event-queue.js'; +import { DurableEventQueue, type DurableEventQueueOptions } from './event-queue.js'; import { LocalFileStorage, createFileRouteHandler, FILE_ROUTE_PREFIX } from './files.js'; export { @@ -19,7 +19,7 @@ export { InProcessPresence, InProcessRateLimiter, InProcessKeyValueStore, - InProcessEventQueue, + DurableEventQueue, LocalFileStorage, createFileRouteHandler, 
FILE_ROUTE_PREFIX, @@ -29,6 +29,7 @@ export { export type { EngineSocket, SocketHandle } from './realtime.js'; export type { SqliteDbHandle } from './database.js'; export type { InProcessPresenceOptions } from './presence.js'; +export type { DurableEventQueueOptions } from './event-queue.js'; export interface NodeRuntimeOptions { /** SQLite file path, or ':memory:' for tests. */ @@ -51,6 +52,8 @@ export interface NodeRuntimeOptions { config?: EngineConfig; /** Presence TTL / sweep tuning (tests use short windows). */ presence?: InProcessPresenceOptions; + /** Durable webhook outbox tuning (poll interval, backoff, cleanup cadence). */ + eventQueue?: DurableEventQueueOptions; } /** @@ -62,6 +65,8 @@ export interface NodeRuntime { deps: EngineDeps; realtime: InProcessRealtime; presence: InProcessPresence; + /** Durable webhook outbox; already started — exposed for graceful shutdown and tests. */ + webhookQueue: DurableEventQueue; /** `fetch`-style handler for the `${FILE_ROUTE_PREFIX}` upload/download routes. */ fileHandler: (request: Request) => Promise; handle: SqliteDbHandle; @@ -99,7 +104,13 @@ export function createNodeRuntime(options: NodeRuntimeOptions): NodeRuntime { options.baseUrl, options.fileSecret ?? randomBytes(32).toString('hex'), ); - const webhookQueue = new InProcessEventQueue(db, (err, ctx) => telemetry.captureException(err, ctx)); + const webhookQueue = new DurableEventQueue( + db, + (err, ctx) => telemetry.captureException(err, ctx), + options.eventQueue, + ); + // Resume any deliveries left over from a previous process (the outbox's point). + webhookQueue.start(); const auth = options.auth ?? new SqliteApiKeyAuthProvider(); const entitlements = options.entitlements ?? 
new StaticEntitlementsProvider(kv); @@ -123,10 +134,12 @@ export function createNodeRuntime(options: NodeRuntimeOptions): NodeRuntime { deps, realtime, presence, + webhookQueue, fileHandler: createFileRouteHandler(fileStorage), handle, close() { presence.stop(); + webhookQueue.stop(); kv.dispose(); try { handle.sqlite.close(); diff --git a/packages/engine/src/engine/eventQueue.ts b/packages/engine/src/engine/eventQueue.ts index 0b0e25ad..8cd1a635 100644 --- a/packages/engine/src/engine/eventQueue.ts +++ b/packages/engine/src/engine/eventQueue.ts @@ -1,10 +1,20 @@ -import { and, lte, inArray } from 'drizzle-orm'; +import { and, asc, eq, inArray, lt, lte, sql } from 'drizzle-orm'; import type { getDb } from '../db/index.js'; import { pendingEvents } from '../db/schema.js'; import { generateId } from './snowflake.js'; type Db = ReturnType; +/** A `pending_events` row claimed for delivery (attempts already incremented). */ +export interface ClaimedEvent { + id: string; + workspaceId: string; + eventType: string; + payload: Record; + attempts: number; + maxAttempts: number; +} + export async function enqueueEvent( db: Db, workspaceId: string, @@ -23,6 +33,85 @@ export async function enqueueEvent( return id; } +/** + * Atomically claim up to `limit` due events: `pending`, `process_after <= now`, + * `attempts < max_attempts`. Claiming increments `attempts` and pushes + * `process_after` out by `leaseMs`, so a worker that crashes mid-delivery + * leaves the row reclaimable once the lease expires (one attempt consumed). + * + * Single `UPDATE ... WHERE id IN (subquery) RETURNING` statement — atomic on + * both drivers per the no-interactive-transactions doctrine (ports/database.ts). + */ +export async function claimDueEvents( + db: Db, + opts: { limit?: number; leaseMs?: number; now?: Date } = {}, +): Promise { + const now = opts.now ?? new Date(); + const limit = opts.limit ?? 25; + const leaseMs = opts.leaseMs ?? 
60_000; + + const due = db + .select({ id: pendingEvents.id }) + .from(pendingEvents) + .where( + and( + eq(pendingEvents.status, 'pending'), + lte(pendingEvents.processAfter, now), + lt(pendingEvents.attempts, pendingEvents.maxAttempts), + ), + ) + .orderBy(asc(pendingEvents.processAfter)) + .limit(limit); + + const claimed = await db + .update(pendingEvents) + .set({ + attempts: sql`${pendingEvents.attempts} + 1`, + processAfter: new Date(now.getTime() + leaseMs), + }) + .where(inArray(pendingEvents.id, due)) + .returning(); + + return claimed.map((row) => ({ + id: row.id, + workspaceId: row.workspaceId, + eventType: row.eventType, + payload: row.payload as Record, + attempts: row.attempts, + maxAttempts: row.maxAttempts, + })); +} + +/** Delivery succeeded — drop the row. */ +export async function completeEvent(db: Db, id: string): Promise { + await db.delete(pendingEvents).where(eq(pendingEvents.id, id)); +} + +/** + * Terminal failure (non-retryable response or attempts exhausted) — settle the + * row as `failed` so it stops being claimed; `cleanupOldEvents` prunes it later. + * The status guard keeps this idempotent and preserves the first failure. + */ +export async function failEvent(db: Db, id: string, error: string): Promise { + await db + .update(pendingEvents) + .set({ status: 'failed', lastError: error, completedAt: new Date() }) + .where(and(eq(pendingEvents.id, id), eq(pendingEvents.status, 'pending'))); +} + +/** Retryable failure — keep the row `pending` and make it due again after `backoffMs`. 
*/ +export async function rescheduleEvent( + db: Db, + id: string, + error: string, + backoffMs: number, +): Promise { + await db + .update(pendingEvents) + .set({ processAfter: new Date(Date.now() + backoffMs), lastError: error }) + .where(and(eq(pendingEvents.id, id), eq(pendingEvents.status, 'pending'))); +} + // Cleanup completed and failed events older than given age (default 24h) export async function cleanupOldEvents(db: Db, maxAgeMs = 24 * 60 * 60 * 1000): Promise { const cutoff = new Date(Date.now() - maxAgeMs); diff --git a/packages/engine/src/ports/event-queue.ts b/packages/engine/src/ports/event-queue.ts index 60919263..b0a3586e 100644 --- a/packages/engine/src/ports/event-queue.ts +++ b/packages/engine/src/ports/event-queue.ts @@ -4,8 +4,9 @@ * After a mutation, routes enqueue a delivery message; the consumer fans it out * to the workspace's registered webhook subscribers (`engine/eventDelivery.ts`). * On Cloudflare this is a Queue producer + a separate consumer Worker. The Node - * adapter implements it as an in-process async worker that calls `deliverEvent` - * directly (best-effort, single process). + * adapter implements it as a durable `pending_events` outbox: `send` persists + * the row, and a background poller delivers via `deliverEvent` with retries + * that survive restarts. 
*/ export interface QueuedEvent { type: string; From 81c856557e1f27b38f1b3c5d31a2325744950656 Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Tue, 9 Jun 2026 20:02:10 +0000 Subject: [PATCH 2/4] chore: apply pr-reviewer fixes for #175 --- memory/workspace/.relay/state.json | 1 + 1 file changed, 1 insertion(+) create mode 100644 memory/workspace/.relay/state.json diff --git a/memory/workspace/.relay/state.json b/memory/workspace/.relay/state.json new file mode 100644 index 00000000..a364038d --- /dev/null +++ b/memory/workspace/.relay/state.json @@ -0,0 +1 @@ +{"workspaceId":"rw_7ccfea89","remoteRoot":"/memory/workspace","localRoot":"/home/daytona/workspace/memory/workspace","mode":"poll","syncMode":"mirror","intervalMs":5000,"lastReconcileAt":"2026-06-09T20:02:06.149373259Z","lastSuccessfulReconcileAt":"2026-06-09T20:02:06.149373259Z","staleAfter":"2026-06-09T20:02:16.149373259Z","status":"ready","states":{"stale":false,"offline":false,"hasConflicts":false,"hasPendingWriteback":false},"pendingWriteback":0,"pendingConflicts":0,"deniedPaths":0,"counters":{"snapshotDeleteBlocked":8},"circuit":{"open":false,"openedAt":"0001-01-01T00:00:00Z","windowMs":60000,"cooldownMs":30000,"threshold":5,"nextRetry":"0001-01-01T00:00:00Z"},"outbox":{"pending":0,"needsAttention":0,"failed":0,"acked":0}} \ No newline at end of file From c02656a03b5e39326d7379dbe313831cb9f57979 Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Tue, 9 Jun 2026 20:05:32 +0000 Subject: [PATCH 3/4] chore: apply pr-reviewer fixes for #175 --- memory/workspace/.relay/state.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memory/workspace/.relay/state.json b/memory/workspace/.relay/state.json index a364038d..57123402 100644 --- a/memory/workspace/.relay/state.json +++ b/memory/workspace/.relay/state.json @@ -1 +1 @@ 
-{"workspaceId":"rw_7ccfea89","remoteRoot":"/memory/workspace","localRoot":"/home/daytona/workspace/memory/workspace","mode":"poll","syncMode":"mirror","intervalMs":5000,"lastReconcileAt":"2026-06-09T20:02:06.149373259Z","lastSuccessfulReconcileAt":"2026-06-09T20:02:06.149373259Z","staleAfter":"2026-06-09T20:02:16.149373259Z","status":"ready","states":{"stale":false,"offline":false,"hasConflicts":false,"hasPendingWriteback":false},"pendingWriteback":0,"pendingConflicts":0,"deniedPaths":0,"counters":{"snapshotDeleteBlocked":8},"circuit":{"open":false,"openedAt":"0001-01-01T00:00:00Z","windowMs":60000,"cooldownMs":30000,"threshold":5,"nextRetry":"0001-01-01T00:00:00Z"},"outbox":{"pending":0,"needsAttention":0,"failed":0,"acked":0}} \ No newline at end of file +{"workspaceId":"rw_7ccfea89","remoteRoot":"/memory/workspace","localRoot":"/home/daytona/workspace/memory/workspace","mode":"poll","syncMode":"mirror","intervalMs":5000,"lastReconcileAt":"2026-06-09T20:05:28.507311396Z","lastSuccessfulReconcileAt":"2026-06-09T20:05:28.507311396Z","staleAfter":"2026-06-09T20:05:38.507311396Z","status":"ready","states":{"stale":false,"offline":false,"hasConflicts":false,"hasPendingWriteback":false},"pendingWriteback":0,"pendingConflicts":0,"deniedPaths":0,"counters":{"snapshotDeleteBlocked":27},"circuit":{"open":false,"openedAt":"0001-01-01T00:00:00Z","windowMs":60000,"cooldownMs":30000,"threshold":5,"nextRetry":"0001-01-01T00:00:00Z"},"outbox":{"pending":0,"needsAttention":0,"failed":0,"acked":0}} \ No newline at end of file From ed6e00a541854ab59d08d621489bd4f6f5d30ef6 Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Tue, 9 Jun 2026 20:09:47 +0000 Subject: [PATCH 4/4] chore: apply pr-reviewer fixes for #175 --- memory/workspace/.relay/state.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memory/workspace/.relay/state.json b/memory/workspace/.relay/state.json index 57123402..093da81b 100644 --- a/memory/workspace/.relay/state.json +++ 
b/memory/workspace/.relay/state.json @@ -1 +1 @@ -{"workspaceId":"rw_7ccfea89","remoteRoot":"/memory/workspace","localRoot":"/home/daytona/workspace/memory/workspace","mode":"poll","syncMode":"mirror","intervalMs":5000,"lastReconcileAt":"2026-06-09T20:05:28.507311396Z","lastSuccessfulReconcileAt":"2026-06-09T20:05:28.507311396Z","staleAfter":"2026-06-09T20:05:38.507311396Z","status":"ready","states":{"stale":false,"offline":false,"hasConflicts":false,"hasPendingWriteback":false},"pendingWriteback":0,"pendingConflicts":0,"deniedPaths":0,"counters":{"snapshotDeleteBlocked":27},"circuit":{"open":false,"openedAt":"0001-01-01T00:00:00Z","windowMs":60000,"cooldownMs":30000,"threshold":5,"nextRetry":"0001-01-01T00:00:00Z"},"outbox":{"pending":0,"needsAttention":0,"failed":0,"acked":0}} \ No newline at end of file +{"workspaceId":"rw_7ccfea89","remoteRoot":"/memory/workspace","localRoot":"/home/daytona/workspace/memory/workspace","mode":"poll","syncMode":"mirror","intervalMs":5000,"lastReconcileAt":"2026-06-09T20:09:43.728326751Z","lastSuccessfulReconcileAt":"2026-06-09T20:09:43.728326751Z","staleAfter":"2026-06-09T20:09:53.728326751Z","status":"ready","states":{"stale":false,"offline":false,"hasConflicts":false,"hasPendingWriteback":false},"pendingWriteback":0,"pendingConflicts":0,"deniedPaths":0,"counters":{"snapshotDeleteBlocked":28},"circuit":{"open":false,"openedAt":"0001-01-01T00:00:00Z","windowMs":60000,"cooldownMs":30000,"threshold":5,"nextRetry":"0001-01-01T00:00:00Z"},"outbox":{"pending":0,"needsAttention":0,"failed":0,"acked":0}} \ No newline at end of file