feat(engine): atomic multi-statement write paths via optional transaction capability#174
Conversation
…tion capability The database port gains an optional TransactionCapability that adapters attach when their driver supports interactive transactions, plus a runAtomic(db, fn) helper that uses it when present and falls back to plain sequential statements otherwise (unchanged D1 behavior). The Node better-sqlite3 adapter implements the capability with manual BEGIN IMMEDIATE / COMMIT / ROLLBACK, serialized through a promise queue so concurrent requests on the shared connection cannot interleave with an open transaction. Wrapped write paths (DB writes only — realtime/webhook fanout stays in routes, outside the transaction): - channel message send (message + attachments + deliveries + message_log) - DM send (message + attachments + delivery + message_log) - group DM send (message + attachments + deliveries) - thread reply (reply + deliveries) - markRead (read receipt + delivery transition + lastReadId) On self-host, a failure mid-send no longer leaves a message row with no delivery rows (silent durable-delivery loss). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
|
Warning Review limit reached
More reviews will be available in 51 minutes. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR adds a TransactionCapability and runAtomic helper, implements withTransaction in the Node SQLite adapter, wraps core durable write paths (messages, DMs, group DMs, replies, receipts) in adapter-aware atomic blocks, adds tests for rollback/concurrency/fallback, and converts a persisted workspace state JSON to snake_case. ChangesTransaction atomicity integration
Workspace state format
Sequence DiagramsequenceDiagram
participant postMessage as postMessage()
participant runAtomic as runAtomic(db)
participant adapter as NodeEngineDb.withTransaction()
participant sqlite as SQLite (BEGIN IMMEDIATE)
participant callback as TX callback (tx param)
postMessage->>runAtomic: async write function
runAtomic->>adapter: check for TransactionCapability
adapter->>sqlite: BEGIN IMMEDIATE
adapter->>callback: execute callback with tx handle
callback->>sqlite: INSERT message via tx
callback->>sqlite: INSERT attachments via tx
callback->>sqlite: INSERT deliveries via tx
callback->>sqlite: INSERT messageLogs via tx
callback-->>adapter: return { message, agent, attachments, deliveryRecords }
adapter->>sqlite: COMMIT
sqlite-->>adapter: success
adapter-->>runAtomic: return result
runAtomic-->>postMessage: result
Note over adapter,sqlite: On error: ROLLBACK all mutations<br/>then rethrow (sequential fallback if no capability)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a45fd66fff
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| sqlite.exec('BEGIN IMMEDIATE'); | ||
| try { | ||
| const result = await fn(db as unknown as EngineDb); |
There was a problem hiding this comment.
Prevent unrelated statements from joining transactions
Because the transaction is opened on the shared SQLite connection and fn receives the global db handle, any request that does not go through runAtomic can still issue statements while this callback is between awaits; I checked createGroupDm, which still does multiple plain await db.insert(...) calls outside runAtomic. In that concurrent scenario those unrelated writes become part of the open transaction and can be committed or rolled back with this request, so a failed channel/DM send can silently discard another successful operation.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/engine/src/__tests__/atomicity.test.ts (1)
1-176: ⚡ Quick winConsider adding atomicity tests for the two remaining write paths.
The test suite comprehensively validates atomicity for channel sends, DM sends, and read receipts, but the PR objectives state that five write paths were transactionalized. Missing coverage:
- Group DM send (
postGroupMessage): message + attachments + deliveries- Thread reply (
postReply): reply + deliveriesAdding mid-operation failure tests for these paths (similar to lines 82–95 and 97–109) would ensure all transactionalized operations roll back correctly and provide complete validation of the transaction integration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/engine/src/__tests__/atomicity.test.ts` around lines 1 - 176, Tests are missing atomicity coverage for postGroupMessage and postReply; add two tests modeled after the existing cases for channel DM and channel sends: for postGroupMessage seed a workspace/group with members, injectInsertFailure on deliveries (and attachments if applicable) before calling postGroupMessage and expect the injected error, restore, then assert no orphan rows exist in messages, messageLogs, deliveries (and attachments); for postReply seed a channel/thread, injectInsertFailure on deliveries before calling postReply and expect the injected error, restore, then assert replies, messageLogs and deliveries were rolled back; use the existing helpers injectInsertFailure, injectUpdateFailure, seed, and the db handle to locate where to add these tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@memory/workspace/.relay/state.json`:
- Line 1: Convert every camelCase JSON key in this state object to snake_case to
match the project's wire-format guidelines: rename top-level keys
workspaceId→workspace_id, remoteRoot→remote_root, localRoot→local_root,
syncMode→sync_mode, intervalMs→interval_ms, lastReconcileAt→last_reconcile_at,
lastSuccessfulReconcileAt→last_successful_reconcile_at, staleAfter→stale_after,
pendingWriteback→pending_writeback, pendingConflicts→pending_conflicts,
deniedPaths→denied_paths; inside "states" rename hasConflicts→has_conflicts and
hasPendingWriteback→has_pending_writeback; inside "counters" rename
snapshotDeleteBlocked→snapshot_delete_blocked; inside "circuit" rename
openedAt→opened_at, windowMs→window_ms, cooldownMs→cooldown_ms,
nextRetry→next_retry; inside "outbox" rename needsAttention→needs_attention —
update the JSON keys accordingly so all wire fields use snake_case while
preserving values and structure.
---
Nitpick comments:
In `@packages/engine/src/__tests__/atomicity.test.ts`:
- Around line 1-176: Tests are missing atomicity coverage for postGroupMessage
and postReply; add two tests modeled after the existing cases for channel DM and
channel sends: for postGroupMessage seed a workspace/group with members,
injectInsertFailure on deliveries (and attachments if applicable) before calling
postGroupMessage and expect the injected error, restore, then assert no orphan
rows exist in messages, messageLogs, deliveries (and attachments); for postReply
seed a channel/thread, injectInsertFailure on deliveries before calling
postReply and expect the injected error, restore, then assert replies,
messageLogs and deliveries were rolled back; use the existing helpers
injectInsertFailure, injectUpdateFailure, seed, and the db handle to locate
where to add these tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: ab24a02c-32f9-4d67-bd9f-0e1c3793a92e
📒 Files selected for processing (10)
memory/workspace/.relay/state.jsonpackages/engine/src/__tests__/atomicity.test.tspackages/engine/src/adapters/node/database.tspackages/engine/src/engine/dm.tspackages/engine/src/engine/groupDm.tspackages/engine/src/engine/message.tspackages/engine/src/engine/receipt.tspackages/engine/src/engine/thread.tspackages/engine/src/ports/database.tspackages/engine/src/ports/index.ts
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/engine/src/engine/groupDm.ts (1)
191-238:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRecheck membership inside the atomic block.
The authorization and conversation reads on Lines 114-145 happen before
runAtomic, but the Node adapter queueswithTransactioncalls. A sender can be removed from the group while waiting for the transaction slot and still get a committed message, because Lines 191-238 never revalidatedmParticipants.leftAtor the conversation row ontx. Please move those reads into the atomic block, or re-check them withtxbefore inserting the message.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/engine/src/engine/groupDm.ts` around lines 191 - 238, The code inserts a message and delivery rows inside runAtomic but still uses earlier non-transactional reads for authorization/membership; re-check membership and conversation state inside the same transaction to avoid race conditions. Move the SELECT of dmParticipants (and any conversation/authorization reads formerly done before runAtomic) into the runAtomic callback using the tx instance, verify the sender is still present (dmParticipants.leftAt IS NULL) and the conversation row is still valid on tx before inserting into messages; if revalidation fails, abort the transaction (throw or return an error) so no message or deliveries (rows created with generateId and inserted into messages/deliveries) are committed. Ensure you reference and update the same table names used here (dmParticipants, conversation(s), messages, deliveries) and keep the existing insertion logic for attachments and deliveries inside that atomic block.
🧹 Nitpick comments (1)
packages/engine/src/engine/groupDm.ts (1)
87-93: ⚡ Quick winBatch the participant inserts into one statement.
Lines 88-93 keep the transaction open for one insert per member. On the Node adapter, transactional callers are serialized, so larger groups block unrelated writers longer than necessary. A single multi-row insert keeps the atomic section shorter.
Suggested refactor
- // Add creator + all participants - for (const agentId of uniqueIds) { - await tx.insert(dmParticipants).values({ - conversationId, - agentId, - }); - } + // Add creator + all participants + await tx.insert(dmParticipants).values( + uniqueIds.map((agentId) => ({ + conversationId, + agentId, + })), + );🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/engine/src/engine/groupDm.ts` around lines 87 - 93, The loop that does one insert per participant (iterating uniqueIds and calling tx.insert(dmParticipants).values({ conversationId, agentId })) keeps the transaction open longer; batch the inserts into a single multi-row insert instead: build an array of participant objects using the existing conversationId and each agentId from uniqueIds, then call tx.insert(dmParticipants).values(participantArray) once (await that) to shorten the atomic section and reduce contention.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@packages/engine/src/engine/groupDm.ts`:
- Around line 191-238: The code inserts a message and delivery rows inside
runAtomic but still uses earlier non-transactional reads for
authorization/membership; re-check membership and conversation state inside the
same transaction to avoid race conditions. Move the SELECT of dmParticipants
(and any conversation/authorization reads formerly done before runAtomic) into
the runAtomic callback using the tx instance, verify the sender is still present
(dmParticipants.leftAt IS NULL) and the conversation row is still valid on tx
before inserting into messages; if revalidation fails, abort the transaction
(throw or return an error) so no message or deliveries (rows created with
generateId and inserted into messages/deliveries) are committed. Ensure you
reference and update the same table names used here (dmParticipants,
conversation(s), messages, deliveries) and keep the existing insertion logic for
attachments and deliveries inside that atomic block.
---
Nitpick comments:
In `@packages/engine/src/engine/groupDm.ts`:
- Around line 87-93: The loop that does one insert per participant (iterating
uniqueIds and calling tx.insert(dmParticipants).values({ conversationId, agentId
})) keeps the transaction open longer; batch the inserts into a single multi-row
insert instead: build an array of participant objects using the existing
conversationId and each agentId from uniqueIds, then call
tx.insert(dmParticipants).values(participantArray) once (await that) to shorten
the atomic section and reduce contention.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: d670638c-1071-4513-99ca-b45aee5117ea
📒 Files selected for processing (4)
memory/workspace/.relay/state.jsonpackages/engine/src/__tests__/atomicity.test.tspackages/engine/src/adapters/node/database.tspackages/engine/src/engine/groupDm.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/engine/src/adapters/node/database.ts
|
Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI. |
* feat(engine): atomic multi-statement write paths via optional transaction capability The database port gains an optional TransactionCapability that adapters attach when their driver supports interactive transactions, plus a runAtomic(db, fn) helper that uses it when present and falls back to plain sequential statements otherwise (unchanged D1 behavior). The Node better-sqlite3 adapter implements the capability with manual BEGIN IMMEDIATE / COMMIT / ROLLBACK, serialized through a promise queue so concurrent requests on the shared connection cannot interleave with an open transaction. Wrapped write paths (DB writes only — realtime/webhook fanout stays in routes, outside the transaction): - channel message send (message + attachments + deliveries + message_log) - DM send (message + attachments + delivery + message_log) - group DM send (message + attachments + deliveries) - thread reply (reply + deliveries) - markRead (read receipt + delivery transition + lastReadId) On self-host, a failure mid-send no longer leaves a message row with no delivery rows (silent durable-delivery loss). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> * chore: apply pr-reviewer fixes for #174 * chore: apply pr-reviewer fixes for #174 * chore: apply pr-reviewer fixes for #174 * chore: apply pr-reviewer fixes for #174 * feat(engine): atomic write batches for D1/hosted handles Write paths gained transactional atomicity on Node in the transactional-write-paths change, but the hosted Cloudflare deployment runs on D1, which has no interactive transactions — a crash between the message insert and the deliveries insert still left a message with no delivery rows. D1 does execute db.batch([...]) atomically, and drizzle's DrizzleD1Database exposes batch() natively, so the hosted handle can get all-or-nothing writes with zero cloud-side changes. - ports/database.ts: add AtomicWrite (a built-but-unexecuted drizzle statement) and BatchCapability (D1-style atomic batch), and replace runAtomic(fn) with runAtomicWrites(db, statements). Resolution order: withTransaction (Node) -> batch (D1, detected structurally since only atomic-batch drivers expose the method; better-sqlite3's drizzle instance has no batch member) -> sequential (bare handles, historical behavior). - The five multi-statement write paths (channel send, DM send, group DM send, thread reply, markRead) now do all reads up front and hand runAtomicWrites a pure statement list, so the same list runs under a transaction, one atomic batch, or sequentially. No write depends on a prior write's DB-returned value (IDs are app-generated snowflakes); .returning() rows are recovered from the per-statement results. - message.ts reads attachment details directly from files by id before the writes (the junction rows don't exist yet mid-batch); dm.ts builds the message+attachment inserts via buildDmMessageWrites; console.ts gains buildMessageLogWrite so the log insert can join the batch. - Tests: fake D1-style batch handle (records SQL, executes all-or-nothing) asserting each path issues exactly one batch with the expected statement kinds, batch failure leaves no orphan rows, and bare handles still run sequentially. Failure injection now fires at statement execution (mid-atomic-unit) rather than at build time. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> * chore: apply pr-reviewer fixes for #179 * chore: apply pr-reviewer fixes for #179 --------- Co-authored-by: Claude Fable 5 <noreply@anthropic.com> Co-authored-by: agent-relay-code[bot] <agent-relay-code[bot]@users.noreply.github.com>
Problem
Engine write paths are non-transactional because D1 forbids interactive transactions (
src/ports/database.tsdoctrine). Message send runs 4+ sequential statements (message → attachments → per-recipient deliveries → message_log); a crash mid-way persists a message with no delivery rows — silent durable-delivery loss for every recipient. Same shape in DM send, group DM send, thread reply, andmarkRead's triple write. The Node self-host adapter runs better-sqlite3, which supports transactions fine — the D1 limitation was being imposed on self-hosters unnecessarily.Approach
src/ports/database.ts): optionalTransactionCapability { withTransaction<T>(fn: (tx: EngineDb) => Promise<T>): Promise<T> }an adapter may attach to its handle, plusrunAtomic(db, fn)which uses the capability when present and otherwise runsfndirectly (today's sequential behavior). The D1 doctrine note is kept, now scoped to adapters that lack the capability: cross-row invariants that must hold on every adapter still need single-statement atomicity.src/adapters/node/database.ts): implements the capability with manualBEGIN IMMEDIATE/COMMIT/ROLLBACKon the shared connection. Drizzle's better-sqlite3db.transaction()requires a sync callback, so it can't host the engine's async write paths. Transactions are serialized through a promise queue so a concurrent request's statement can't interleave into (and roll back with) an open transaction. No D1 / cloud changes.runAtomic, business logic unchanged:engine/message.tsengine/dm.tsengine/groupDm.tsengine/thread.tsmarkReadtriple write (read receipt + delivery transition + lastReadId) —engine/receipt.tsFire-and-forget fanout (WS broadcast, webhook queue) lives in routes and runs after the engine functions return, so it stays outside the transaction. External I/O (A2A forwarding in
sendDm) also stays outside.Tests
New
src/__tests__/atomicity.test.ts(6 tests) on the Node adapter:markRead→ receipt + delivery transition rolled back; retry succeedsResults: full engine suite 61/61 passing (8 files),
tscbuild clean,eslintclean (npx turbo build/test --filter=@relaycast/engine).Notes
withTransactionduring one of the transaction's await points share the single better-sqlite3 connection and would join the open transaction. The promise queue closes this for the multi-statement writers (which all go throughwithTransaction); single-statement writes are atomic on their own.🤖 Generated with Claude Code