fix(broker): make delivery handling durable and observable #1073
Four delivery-durability fixes in the Rust broker:
- Timeout-fallback acks are no longer conflated with verified successes: pty_worker records a new DeliveryOutcome::Unverified in the injection throttle (breaks the success streak without backing off), and the broker forwards verification/reason on delivery_verified events so unverified deliveries are observable. The fallback ack itself is kept; re-injection stays disabled to avoid duplicate deliveries.
- Graceful shutdown no longer clears pending deliveries: a non-empty pending map is persisted (atomic temp-file rename) for redelivery on the next start, with a warn-level count; the pending file is only removed when the map is actually empty.
- Pending deliveries are persisted on every map mutation (enqueue, ack/remove, retry bookkeeping) via a dirty-tracking PendingDeliveryStore flushed by the event loop, instead of only on the 500ms maintenance tick.
- Per-worker queue-cap evictions (MAX_PENDING_PER_WORKER) now emit a delivery_dropped broker event instead of only a tracing warning; queue_inbound_for_delivery_mode surfaces the evicted sender to both the HTTP API and relaycast inbound call sites.

BrokerEvent::DeliveryVerified gains optional verification/reason fields, mirrored in @agent-relay/harness-driver protocol types.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
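The first fix in the description above (an unverified delivery breaks the success streak without backing off) can be sketched as a small state machine. This is a minimal sketch, not the broker's actual throttle: only `DeliveryOutcome::Unverified` comes from the PR text, while the `InjectionThrottle` name, its fields, the delay constants, and the halve/double policy are assumptions made for illustration.

```rust
use std::time::Duration;

/// Outcome of one delivery attempt. `Unverified` is the variant named in the
/// PR description; `Success` and `Failure` are assumed counterparts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryOutcome {
    Success,
    Failure,
    Unverified,
}

// Hypothetical tuning constants, not taken from the broker.
const MIN_DELAY: Duration = Duration::from_millis(100);
const MAX_DELAY: Duration = Duration::from_millis(1600);
const STREAK_TO_SPEED_UP: u32 = 3;

/// Hypothetical throttle: a delay between injections plus a success streak.
#[derive(Debug)]
pub struct InjectionThrottle {
    delay: Duration,
    success_streak: u32,
}

impl InjectionThrottle {
    pub fn new() -> Self {
        Self { delay: Duration::from_millis(400), success_streak: 0 }
    }

    pub fn delay(&self) -> Duration {
        self.delay
    }

    pub fn record(&mut self, outcome: DeliveryOutcome) {
        match outcome {
            // Verified success: after enough in a row, shorten the delay.
            DeliveryOutcome::Success => {
                self.success_streak += 1;
                if self.success_streak >= STREAK_TO_SPEED_UP {
                    self.delay = (self.delay / 2).max(MIN_DELAY);
                    self.success_streak = 0;
                }
            }
            // Failure: reset the streak and back off.
            DeliveryOutcome::Failure => {
                self.success_streak = 0;
                self.delay = (self.delay * 2).min(MAX_DELAY);
            }
            // Unverified: reset the streak but leave the delay untouched,
            // so a timeout fallback can never drive the delay down.
            DeliveryOutcome::Unverified => {
                self.success_streak = 0;
            }
        }
    }
}
```

With these assumed constants, two successes followed by an `Unverified` leave the delay at its starting value, and three further successes are needed before it is halved.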
📝 Walkthrough
The broker's delivery-state durability improves through a dirty-tracked persistent storage abstraction, queue-cap evictions surface as broker events, and timeout-fallback acknowledgments are distinguished from verified deliveries.
Changes: Delivery Durability & Observability
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 3
🧹 Nitpick comments (1)
packages/harness-driver/src/protocol.ts (1)
348-350: ⚡ Quick win
Narrow `verification` to known literals instead of plain `string`.
Using `'echo' | 'timeout_fallback'` here makes downstream handling safer and self-documenting.
Suggested fix
-  /** 'echo' when confirmed in PTY output, 'timeout_fallback' when acked unverified. */
-  verification?: string;
+  /** 'echo' when confirmed in PTY output, 'timeout_fallback' when acked unverified. */
+  verification?: 'echo' | 'timeout_fallback';
   reason?: string;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@packages/harness-driver/src/protocol.ts` around lines 348 - 350, The verification property on the relevant type/interface should be narrowed from a plain string to a union of the known literal values; change the type of verification to 'echo' | 'timeout_fallback' (instead of string) in the interface definition in protocol.ts and update any consumers that assume arbitrary strings (e.g., code reading verification, switch/case or comparisons) to handle these two literals explicitly or add a fallback branch to keep behavior unchanged; ensure TypeScript compiles and adjust tests/types that relied on broader typing.
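The same narrowing idea can be sketched on the Rust side. This is a hedged illustration only: the PR keeps `verification` as `Option<String>` in the broker protocol, and the `Verification` enum, `UnknownVerification` error, and `as_str` helper below are hypothetical names, not part of the change. The two string values are the ones named in the review comment.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical closed set of verification kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verification {
    /// Delivery confirmed in PTY output.
    Echo,
    /// Delivery acked without echo verification.
    TimeoutFallback,
}

impl Verification {
    /// Wire representation, matching the literals in the review comment.
    pub fn as_str(self) -> &'static str {
        match self {
            Verification::Echo => "echo",
            Verification::TimeoutFallback => "timeout_fallback",
        }
    }
}

/// Error returned for any value outside the known set (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub struct UnknownVerification(pub String);

impl fmt::Display for UnknownVerification {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unknown verification kind: {}", self.0)
    }
}

impl FromStr for Verification {
    type Err = UnknownVerification;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "echo" => Ok(Verification::Echo),
            "timeout_fallback" => Ok(Verification::TimeoutFallback),
            other => Err(UnknownVerification(other.to_string())),
        }
    }
}
```

A consumer that parses the wire string this way gets an explicit error branch for unexpected values, which plays the role of the "fallback branch" the prompt asks for.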
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: af257686-ffad-4c59-bf1b-41203c2964b8
📒 Files selected for processing (17)
- .agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
- .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb.trace.json
- .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/summary.md
- .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/trajectory.json
- CHANGELOG.md
- crates/broker/src/broker/delivery_verification.rs
- crates/broker/src/protocol.rs
- crates/broker/src/pty_worker.rs
- crates/broker/src/runtime/api.rs
- crates/broker/src/runtime/delivery.rs
- crates/broker/src/runtime/event_loop.rs
- crates/broker/src/runtime/init.rs
- crates/broker/src/runtime/maintenance.rs
- crates/broker/src/runtime/relaycast_events.rs
- crates/broker/src/runtime/tests.rs
- crates/broker/src/runtime/worker_events.rs
- packages/harness-driver/src/protocol.ts
💤 Files with no reviewable changes (1)
- .agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
- `agent-relay-broker` no longer discards pending deliveries on graceful shutdown: a non-empty pending map is persisted for redelivery on the next start, and the pending file is only removed when nothing is pending.
- `agent-relay-broker` persists pending deliveries on every change (enqueue, ack, retry) instead of only on the 500ms maintenance tick, so a crash between ticks cannot lose queued messages.
- `agent-relay-broker` timeout-fallback delivery acks are now observable: `delivery_verified` events carry `verification: "timeout_fallback"` plus a reason, and unverified deliveries no longer count as successes in the injection throttle.
- `agent-relay-broker` emits a `delivery_dropped` event when the per-worker pending queue cap (256) evicts the oldest message, instead of only logging a warning.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Remove implementation details per coding guidelines.
The changelog entries include implementation details that should be omitted according to the guideline "Omit issue/PR links, internal notes, and implementation details." As per coding guidelines, changelog entries should be concise and impact-first, focusing on user-visible changes.
📝 Suggested simplifications
-`agent-relay-broker` no longer discards pending deliveries on graceful shutdown: a non-empty pending map is persisted for redelivery on the next start, and the pending file is only removed when nothing is pending.
+`agent-relay-broker` no longer discards pending deliveries on graceful shutdown: pending deliveries are persisted for redelivery on the next start.
-`agent-relay-broker` persists pending deliveries on every change (enqueue, ack, retry) instead of only on the 500ms maintenance tick, so a crash between ticks cannot lose queued messages.
+`agent-relay-broker` persists pending deliveries immediately on every change (enqueue, ack, retry) instead of only periodically, preventing message loss on crash.
-`agent-relay-broker` timeout-fallback delivery acks are now observable: `delivery_verified` events carry `verification: "timeout_fallback"` plus a reason, and unverified deliveries no longer count as successes in the injection throttle.
+`agent-relay-broker` timeout-fallback delivery acks are now observable: `delivery_verified` events carry `verification: "timeout_fallback"` plus a reason, distinguishing them from echo-verified successes.
-`agent-relay-broker` emits a `delivery_dropped` event when the per-worker pending queue cap (256) evicts the oldest message, instead of only logging a warning.
+`agent-relay-broker` emits a `delivery_dropped` event when the per-worker pending queue capacity is exceeded and the oldest message is evicted, instead of only logging a warning.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@CHANGELOG.md` around lines 125 - 128, Rewrite the four CHANGELOG.md bullets
for agent-relay-broker to remove implementation/internal details and make them
impact-first: consolidate each into a concise user-visible change for
"agent-relay-broker" (e.g., persist pending deliveries across restarts; persist
pending state more durably to prevent loss on crashes; make timeout-fallback
acks observable via delivery_verified with verification: \"timeout_fallback\"
and include reason; emit delivery_dropped when the per-worker queue evicts
messages), omitting mentions of maintenance ticks, file removal logic, eviction
cap values (256), and wording about logging vs events — keep only the observable
behavioral changes and their user-facing effects.
Source: Coding guidelines
/// "echo" when confirmed in PTY output, "timeout_fallback" when the
/// delivery was acked without echo verification.
#[serde(default, skip_serializing_if = "Option::is_none")]
verification: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
reason: Option<String>,
Align WorkerToBroker::DeliveryVerified with the new verification metadata.
Line 401 introduces verification semantics at the protocol layer, but WorkerToBroker::DeliveryVerified (Line 542) still omits verification/reason. This creates a Rust↔TS contract drift and can drop fields on typed round-trips.
Suggested fix
@@
 DeliveryVerified {
     delivery_id: DeliveryId,
     event_id: EventId,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    verification: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    reason: Option<String>,
 },
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/protocol.rs` around lines 401 - 406, The DeliveryVerified
enum variant in WorkerToBroker must be extended to include the new verification
and reason metadata so the Rust protocol matches the struct fields; update the
WorkerToBroker::DeliveryVerified variant signature to add verification:
Option<String> and reason: Option<String> (with the same serde(default,
skip_serializing_if = "Option::is_none") semantics as the surrounding struct),
then update every construction, pattern-match, and serialization/deserialization
site that creates or deconstructs WorkerToBroker::DeliveryVerified (e.g.,
builders, matches, and tests) to accept and forward these two Option<String>
fields so they are preserved across Rust↔TS round-trips.
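The "accept and forward" step requested above can be sketched without any serialization layer. This is a simplified illustration under stated assumptions: `String` stands in for the broker's `DeliveryId` and `EventId` types, the `name` field on the broker event and the `forward_delivery_verified` function are hypothetical, and the serde attributes from the real protocol are omitted so the block has no external dependencies.

```rust
/// Simplified stand-in for the worker-to-broker frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerToBroker {
    DeliveryVerified {
        delivery_id: String, // stand-in for DeliveryId
        event_id: String,    // stand-in for EventId
        verification: Option<String>,
        reason: Option<String>,
    },
}

/// Simplified stand-in for the broker event emitted to observers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BrokerEvent {
    DeliveryVerified {
        name: String, // assumed worker-name field
        delivery_id: String,
        event_id: String,
        verification: Option<String>,
        reason: Option<String>,
    },
}

/// Hypothetical forwarding step: every field of the worker frame, including
/// the two optional ones, is moved into the broker event unchanged.
pub fn forward_delivery_verified(worker: &str, frame: WorkerToBroker) -> BrokerEvent {
    match frame {
        WorkerToBroker::DeliveryVerified {
            delivery_id,
            event_id,
            verification,
            reason,
        } => BrokerEvent::DeliveryVerified {
            name: worker.to_string(),
            delivery_id,
            event_id,
            verification,
            reason,
        },
    }
}
```

Destructuring every field by name, rather than using `..`, means the compiler flags this match if the variant later gains another field that is not forwarded.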
fn flush_pending_deliveries(&mut self) {
    if !self.pending_deliveries.take_dirty() || !self.paths.persist {
        return;
    }
    if let Err(error) = save_pending_deliveries(&self.paths.pending, &self.pending_deliveries) {
        tracing::warn!(
            path = %self.paths.pending.display(),
            error = %error,
            "failed to persist pending deliveries"
        );
    }
Preserve dirty state when snapshot persistence fails.
Line 128 clears the dirty flag before the write attempt, and Line 131 logs failures without restoring it. If that write fails, pending-delivery mutations can remain unsaved until another mutation occurs.
💡 Suggested fix
diff --git a/crates/broker/src/runtime/delivery.rs b/crates/broker/src/runtime/delivery.rs
@@
 impl PendingDeliveryStore {
@@
     pub(crate) fn take_dirty(&mut self) -> bool {
         std::mem::take(&mut self.dirty)
     }
+
+    pub(crate) fn mark_dirty(&mut self) {
+        self.dirty = true;
+    }
 }
diff --git a/crates/broker/src/runtime/event_loop.rs b/crates/broker/src/runtime/event_loop.rs
@@
     fn flush_pending_deliveries(&mut self) {
-        if !self.pending_deliveries.take_dirty() || !self.paths.persist {
+        if !self.paths.persist || !self.pending_deliveries.take_dirty() {
             return;
         }
         if let Err(error) = save_pending_deliveries(&self.paths.pending, &self.pending_deliveries) {
             tracing::warn!(
                 path = %self.paths.pending.display(),
                 error = %error,
                 "failed to persist pending deliveries"
             );
+            self.pending_deliveries.mark_dirty();
         }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/runtime/event_loop.rs` around lines 127 - 137, The current
flush_pending_deliveries clears the dirty flag by calling
self.pending_deliveries.take_dirty() before attempting save_pending_deliveries,
so if the write fails the dirty state is lost; change the flow in
flush_pending_deliveries to first check the dirty state without clearing (use an
is_dirty-like check instead of take_dirty), proceed with
save_pending_deliveries(&self.paths.pending, &self.pending_deliveries), and only
clear/take the dirty flag (call take_dirty() or an explicit clear) after a
successful save; if save_pending_deliveries returns Err, leave the dirty flag
set so pending mutations will be retried later (refer to
flush_pending_deliveries, pending_deliveries.take_dirty(),
save_pending_deliveries, and self.paths.persist).
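The check-then-clear flow described in this prompt can be sketched as follows. It is a minimal sketch under stated assumptions, not the broker's code: the map's key and value types, the `is_dirty`/`clear_dirty` method names, and the free function `flush` with an injected `save` closure are all hypothetical. Only the `PendingDeliveryStore` name and the idea of marking the store dirty on mutable access come from the PR.

```rust
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};

/// Pending map wrapped with a dirty flag (key/value types are placeholders).
#[derive(Debug, Default)]
pub struct PendingDeliveryStore {
    map: HashMap<String, String>,
    dirty: bool,
}

impl Deref for PendingDeliveryStore {
    type Target = HashMap<String, String>;

    // Read-only access never touches the dirty flag.
    fn deref(&self) -> &Self::Target {
        &self.map
    }
}

impl DerefMut for PendingDeliveryStore {
    // Any mutable access conservatively marks the store dirty.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.dirty = true;
        &mut self.map
    }
}

impl PendingDeliveryStore {
    pub fn is_dirty(&self) -> bool {
        self.dirty
    }

    pub fn clear_dirty(&mut self) {
        self.dirty = false;
    }
}

/// Hypothetical flush: returns true only when a snapshot was written.
/// The dirty flag is cleared after a successful save and left set on error,
/// so a failed write is retried on the next flush.
pub fn flush<F>(store: &mut PendingDeliveryStore, persist: bool, save: F) -> bool
where
    F: FnOnce(&HashMap<String, String>) -> Result<(), String>,
{
    if !persist || !store.is_dirty() {
        return false;
    }
    match save(&store.map) {
        Ok(()) => {
            store.clear_dirty();
            true
        }
        Err(_) => false,
    }
}
```

The suggested diff in the review reaches the same end state by a different route: it keeps `take_dirty()` and re-marks the store with `mark_dirty()` when the save fails.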
…urability
# Conflicts:
#	.agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
#	CHANGELOG.md
#	crates/broker/src/runtime/tests.rs
🧹 Nitpick comments (1)
crates/broker/src/runtime/relaycast_events.rs (1)
275-277: ⚡ Quick win
Consider extracting token seeding logic to reduce duplication.
The token seeding block appears identically in both the primary spawn path (lines 275-277) and the fallback spawn path (lines 505-507). While the duplication is localized and the logic is simple, extracting it into a helper would improve maintainability if the seeding behavior evolves.
♻️ Suggested refactor
+fn maybe_seed_or_register_agent(
+    workspace_http: &WorkspaceHttpClient,
+    name: &str,
+    cli: &str,
+    ws_value: &Value,
+) -> impl Future<Output = Option<String>> + '_ {
+    async move {
+        if let Some(token) = relaycast_ws_spawn_token(ws_value) {
+            seed_supplied_agent_token(workspace_http, name, &token);
+            Some(token)
+        } else {
+            const REG_TIMEOUT: Duration = Duration::from_secs(3);
+            match tokio::time::timeout(
+                REG_TIMEOUT,
+                workspace_http.register_agent_token(name, Some(cli)),
+            )
+            .await
+            {
+                Ok(Ok(token)) => {
+                    tracing::info!(worker = %name, "pre-registered agent via broker for WS spawn");
+                    Some(token)
+                }
+                Ok(Err(error)) => {
+                    tracing::warn!(worker = %name, error = %error, "WS spawn pre-registration failed");
+                    None
+                }
+                Err(_) => {
+                    tracing::warn!(worker = %name, "WS spawn pre-registration timed out (3s)");
+                    None
+                }
+            }
+        }
+    }
+}
Then replace both blocks with:
let worker_relay_key = maybe_seed_or_register_agent(&workspace_http, &name, &cli, &ws_value).await;
Also applies to: 505-507
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/runtime/relaycast_events.rs` around lines 275 - 277, Extract the duplicated token seeding logic into a single async helper (e.g., maybe_seed_or_register_agent) that accepts the same context used in both places (references to workspace_http, name, cli, ws_value) and returns the token or worker key; move the existing relaycast_ws_spawn_token call and seed_supplied_agent_token invocation into that helper and await it where the original blocks were, then replace both occurrences (the primary spawn path and the fallback spawn path) with a call like let worker_relay_key = maybe_seed_or_register_agent(&workspace_http, &name, &cli, &ws_value).await to remove duplication and centralize seeding behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: d7ce28c0-2969-463a-a7a3-0908ddd5605f
⛔ Files ignored due to path filters (1)
- package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (10)
- .agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
- CHANGELOG.md
- crates/broker/src/protocol.rs
- crates/broker/src/pty_worker.rs
- crates/broker/src/runtime/api.rs
- crates/broker/src/runtime/delivery.rs
- crates/broker/src/runtime/relaycast_events.rs
- crates/broker/src/runtime/tests.rs
- crates/broker/src/runtime/worker_events.rs
- packages/harness-driver/src/protocol.ts
🚧 Files skipped from review as they are similar to previous changes (8)
- crates/broker/src/runtime/worker_events.rs
- packages/harness-driver/src/protocol.ts
- CHANGELOG.md
- crates/broker/src/runtime/api.rs
- crates/broker/src/protocol.rs
- crates/broker/src/pty_worker.rs
- crates/broker/src/runtime/delivery.rs
- crates/broker/src/runtime/tests.rs
Summary
Fixes four delivery-durability defects in the Rust broker (`crates/broker`), keeping the diff scoped to delivery semantics: no changes to PTY injection, parsing, or routing.

1. Timeout-fallback acks no longer counted as verified successes
- `pty_worker` records a new `DeliveryOutcome::Unverified` in the injection throttle on echo-verification timeout: it breaks the consecutive-success streak (so unverified deliveries can never drive the delay down) without backing off like a failure.
- `delivery_verified` handler now reads the worker frame's `verification`/`reason` fields, logs timeout fallbacks at info, and forwards `verification: "timeout_fallback"` (plus reason) on the emitted event. The echo-verified path explicitly sends `verification: "echo"`.
- `BrokerEvent::DeliveryVerified` gains optional `verification`/`reason` fields, mirrored in `@agent-relay/harness-driver` protocol types.

2. Graceful shutdown no longer drops pending deliveries
- `shutdown_runtime` previously did `pending_deliveries.clear()` and deleted the pending file. It now calls `persist_pending_on_shutdown`: a non-empty map is written back via the existing atomic temp-file-rename writer (warn-level log with count) so the next start redelivers; the file is only removed when the map is actually empty. Without `--persist` it warns that the deliveries will be lost.

3. Pending deliveries persisted on every mutation, not per tick
- `PendingDeliveryStore` wraps the pending map with `Deref`/`DerefMut` dirty tracking: any mutable access (insert, ack/remove, retry bookkeeping) marks it dirty, and the event loop flushes the snapshot right after the mutating event. The 500ms tick-time snapshot in `maintenance.rs` is removed as redundant.
- `--persist` default kept off deliberately: the flag also gates state/lock/PID files, MCP config injection mode, and the ephemeral owner-lease shutdown path (`lease_duration` is only armed when `--persist` is absent). Flipping the default would change ephemeral one-shot SDK sessions well beyond delivery durability, so persistence remains opt-in for the serve path.

4. Queue-cap evictions emit a real event
- `queue_inbound_for_delivery_mode` now returns `InboundQueueResult` carrying the evicted sender when the per-worker cap (`MAX_PENDING_PER_WORKER = 256`) forces out the oldest message. Both call sites (HTTP API send and relaycast inbound) emit `BrokerEvent::DeliveryDropped { name, count: 1, reason }`, matching the existing `delivery_dropped` TS event shape that `assertNoDroppedDeliveries` checks.

Deferred

Verification
- `cargo fmt --check -p agent-relay-broker`: clean
- `cargo clippy -p agent-relay-broker --all-targets`: no new warnings (one pre-existing `args.get(0)` warning in `snippets.rs`, untouched)
- `cargo test -p agent-relay-broker`: 709 passed, 0 failed (scoped to the broker crate; workspace-wide build not run)
- … `Unverified`, eviction surfacing at the cap, shutdown persistence round-trip / empty-map file removal / no-persist no-write, `PendingDeliveryStore` dirty tracking, `delivery_verified` protocol round-trips with and without `verification`
- `packages/harness-driver/src/protocol.ts` change is additive optional fields; `tsc` reports no errors in `protocol.ts` (other pre-existing errors come from unbuilt `@agent-relay/sdk` workspace declarations)

🤖 Generated with Claude Code
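The queue-cap behaviour from item 4 of the summary (evict the oldest message at the cap and report who sent it) can be sketched as below. This is a hedged sketch, not the broker's `queue_inbound_for_delivery_mode`: the `PendingMessage` struct, the `evicted_sender` field name, the `queue_inbound` function, and the explicit `cap` parameter are assumptions for illustration. `MAX_PENDING_PER_WORKER = 256` and the `InboundQueueResult` name come from the summary.

```rust
use std::collections::VecDeque;

/// Per-worker cap quoted in the PR summary.
pub const MAX_PENDING_PER_WORKER: usize = 256;

/// Placeholder message type for this sketch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingMessage {
    pub sender: String,
    pub body: String,
}

/// Result of queueing one inbound message. The field name is an assumption;
/// the PR only says the result carries the evicted sender.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct InboundQueueResult {
    pub evicted_sender: Option<String>,
}

/// Queue `msg`, evicting the oldest entry first when the queue is at `cap`.
/// The caller can turn a `Some(sender)` into a dropped-delivery event.
pub fn queue_inbound(
    queue: &mut VecDeque<PendingMessage>,
    msg: PendingMessage,
    cap: usize,
) -> InboundQueueResult {
    let mut result = InboundQueueResult::default();
    if queue.len() >= cap {
        // Oldest message sits at the front of the queue.
        if let Some(oldest) = queue.pop_front() {
            result.evicted_sender = Some(oldest.sender);
        }
    }
    queue.push_back(msg);
    result
}
```

Returning the evicted sender from the queueing call, rather than logging inside it, is what lets each call site decide how to surface the drop; in the PR both call sites emit `BrokerEvent::DeliveryDropped`.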