Harden watchdog lag reconciliation and retrigger outbox delivery#265
Harden watchdog lag reconciliation and retrigger outbox delivery#265vsumner wants to merge 10 commits into
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds per-worker watchdog timeouts (defaults and per-agent overrides) with suspension while WaitingForInput; introduces a durable SQLite retrigger outbox with leasing/retry/replay for completion relays; and wires config, API, migration, runtime, tool timeout caps, status, tests, and docs to support these features. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/agent/channel.rs (1)
2364-2380: Prefer a single state-reset path for pending retrigger cleanup.These branches manually clear subsets of pending retrigger fields instead of reusing
clear_pending_retrigger_state(), which increases drift risk as state evolves.♻️ Suggested cleanup
- self.pending_retrigger = false; - self.pending_retrigger_metadata.clear(); + self.clear_pending_retrigger_state(); + self.retrigger_deadline = None; @@ - self.pending_retrigger = false; - self.pending_retrigger_metadata.clear(); - self.pending_retrigger_outbox_id = None; + self.clear_pending_retrigger_state(); return;Also applies to: 2646-2649
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2364 - 2380, The branches that handle pending retrigger cleanup duplicate logic by individually clearing pending_retrigger_outbox_id, pending_retrigger, and pending_retrigger_metadata; replace those manual clears with a single call to clear_pending_retrigger_state() so all state resets go through the unified path, and ensure the mark_retrigger_outbox_delivered(&outbox_id).await error-handling remains intact (call clear_pending_retrigger_state() after the delivery attempt and warning log). Update the other occurrence mentioned (around the pending_retrigger cleanup at the later block) to use clear_pending_retrigger_state() as well to avoid drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/channel.rs`:
- Around line 848-861: The watchdog currently sets started_at to Instant::now()
inside register_worker_watchdog, which causes late WorkerStarted events to
mis-account hard timeouts; change register_worker_watchdog to use the worker's
recorded spawn instant (e.g., use the spawn_instant/timestamp available on the
Worker/WorkerMeta passed during WorkerStarted handling) when constructing
WorkerWatchdogEntry.started_at instead of Instant::now(); update any other
similar registration sites (the other occurrences you noted: the handlers around
lines 2246–2262, 3259–3265, 3376–3382) to use the recorded spawn instant too,
and add a regression test that simulates a delayed WorkerStarted event with no
Lagged error to assert that watchdog deadline accounting uses the original spawn
time.
In `@src/config.rs`:
- Around line 4363-4374: The worker timeout merge must reject zero values: in
the closure that builds WorkerConfig (the .map(|w| WorkerConfig { ... }) block)
validate both hard_timeout_secs and idle_timeout_secs after unwrapping (or if
provided) and return a parse/merge error (or propagate a ConfigError) if either
is 0; do the same adjustment for the duplicate block that handles defaults (the
similar WorkerConfig construction around lines 4568-4575). Ensure you reference
the WorkerConfig fields (hard_timeout_secs and idle_timeout_secs) and perform
the > 0 check at parse/merge time rather than accepting 0.
In `@src/conversation/history.rs`:
- Around line 510-545: The current read-then-write on attempt_count is racy;
make the increment and next_attempt_at update atomic by using a DB transaction:
begin a transaction on self.pool, run an UPDATE that increments attempt_count
(e.g. "SET attempt_count = attempt_count + 1, last_error = ? WHERE id = ? AND
delivered_at IS NULL") with RETURNING attempt_count (or UPDATE then SELECT the
new attempt_count) to get the new value, compute retry_delay_secs with
retrigger_outbox_retry_delay_secs(next_attempt_count) in Rust, then UPDATE the
same row's next_attempt_at inside the same transaction before committing;
reference outbox_id, retrigger_outbox_retry_delay_secs, next_attempt_at,
last_error and ensure all DB calls use the transaction handle so concurrent
callers cannot lose increments.
---
Nitpick comments:
In `@src/agent/channel.rs`:
- Around line 2364-2380: The branches that handle pending retrigger cleanup
duplicate logic by individually clearing pending_retrigger_outbox_id,
pending_retrigger, and pending_retrigger_metadata; replace those manual clears
with a single call to clear_pending_retrigger_state() so all state resets go
through the unified path, and ensure the
mark_retrigger_outbox_delivered(&outbox_id).await error-handling remains intact
(call clear_pending_retrigger_state() after the delivery attempt and warning
log). Update the other occurrence mentioned (around the pending_retrigger
cleanup at the later block) to use clear_pending_retrigger_state() as well to
avoid drift.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
docs/content/docs/(configuration)/config.mdxdocs/content/docs/(features)/opencode.mdxdocs/content/docs/(features)/workers.mdxmigrations/20260228000001_channel_retrigger_outbox.sqlsrc/agent/channel.rssrc/agent/status.rssrc/api/agents.rssrc/api/config.rssrc/config.rssrc/conversation/history.rstests/context_dump.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
docs/content/docs/(features)/workers.mdx (1)
73-82: Consider adding a cross-reference or brief definition for the retrigger outbox.The terms "channel retrigger outbox" and "channel retrigger queue" (line 82) are introduced without context. Readers unfamiliar with the implementation may not understand what these are or why they matter for durability.
A brief inline clarification or a link to a more detailed explanation (e.g., architecture docs or the config page) would improve accessibility.
📝 Suggested clarification
-Worker completion relays are durable: branch/worker completion notifications are written to a channel retrigger outbox in SQLite before enqueue. If the channel retrigger queue is unavailable, relays are replayed from the outbox when the channel loop is healthy again. +Worker completion relays are durable: branch/worker completion notifications are written to a persistent outbox table in SQLite before being enqueued to the channel. If the channel's event queue is temporarily unavailable, pending relays are replayed from the outbox when the channel loop recovers.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(features)/workers.mdx around lines 73 - 82, Add a short inline definition or cross-reference where "channel retrigger outbox" and "channel retrigger queue" are first used in the "Watchdog Timeouts" section: briefly state that the outbox is a durable SQLite table used to persist worker/branch completion events for later replay, and link to the architecture or channel loop documentation (or the config page `[defaults.worker]` / `[agents.worker]`) for full details; update the sentence mentioning "channel retrigger outbox" and "channel retrigger queue" to include this parenthetical definition or a short "see" link so readers unfamiliar with the implementation understand their purpose and durability role.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@docs/content/docs/`(features)/workers.mdx:
- Around line 73-82: Add a short inline definition or cross-reference where
"channel retrigger outbox" and "channel retrigger queue" are first used in the
"Watchdog Timeouts" section: briefly state that the outbox is a durable SQLite
table used to persist worker/branch completion events for later replay, and link
to the architecture or channel loop documentation (or the config page
`[defaults.worker]` / `[agents.worker]`) for full details; update the sentence
mentioning "channel retrigger outbox" and "channel retrigger queue" to include
this parenthetical definition or a short "see" link so readers unfamiliar with
the implementation understand their purpose and durability role.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
docs/content/docs/(core)/architecture.mdxdocs/content/docs/(features)/workers.mdx
✅ Files skipped from review due to trivial changes (1)
- docs/content/docs/(core)/architecture.mdx
|
Addressed the latest CodeRabbit + Tembo review comments in commit Changes made:
Validation:
|
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/agent/channel.rs (1)
2181-2185:⚠️ Potential issue | 🟠 MajorGuard stale
WorkerStartedevents after completion to avoid ghost workers.Line 2181 applies status updates before per-event guards, and Line 2268 clears the completion dedupe marker. If
WorkerCompleteis processed before a delayedWorkerStarted, the later start can re-register watchdog/status for a dead worker.💡 Suggested fix shape
ProcessEvent::WorkerStarted { worker_id, channel_id, task, worker_type, .. } => { + if self.completed_workers.contains(worker_id) { + tracing::debug!( + worker_id = %worker_id, + "ignoring stale WorkerStarted after completion" + ); + self.worker_watchdog.remove(worker_id); + self.state.status_block.write().await.remove_worker(*worker_id); + return Ok(()); + } - self.completed_workers.remove(worker_id); // existing start logging + watchdog registration... }Please also add a regression test for out-of-order delivery (
WorkerCompleteobserved beforeWorkerStarted) to lock this behavior.Based on learnings: For changes in async/stateful paths (worker lifecycle, cancellation, retrigger, recall cache behavior), include explicit race/terminal-state reasoning in the PR summary and run targeted tests.
Also applies to: 2254-2274
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2181 - 2185, The status update currently runs unconditionally before per-event guards, which allows a delayed WorkerStarted to re-register a completed worker; change the order in the event handling (in the block that writes to self.state.status_block) so you first run the per-event guards that check terminal/deduped state (e.g., whether the worker id is present in the completion dedupe set cleared around the current completion logic) and only call status.update(&event) if those guards permit processing; add a regression test that delivers WorkerComplete before WorkerStarted (out-of-order) and asserts that the later WorkerStarted is ignored (no re-registration/watchdog) to lock the behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/agent/channel.rs`:
- Around line 2181-2185: The status update currently runs unconditionally before
per-event guards, which allows a delayed WorkerStarted to re-register a
completed worker; change the order in the event handling (in the block that
writes to self.state.status_block) so you first run the per-event guards that
check terminal/deduped state (e.g., whether the worker id is present in the
completion dedupe set cleared around the current completion logic) and only call
status.update(&event) if those guards permit processing; add a regression test
that delivers WorkerComplete before WorkerStarted (out-of-order) and asserts
that the later WorkerStarted is ignored (no re-registration/watchdog) to lock
the behavior.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
migrations/20260228000001_channel_retrigger_outbox.sqlsrc/agent/channel.rssrc/config.rssrc/conversation/history.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/agent/channel.rs (1)
2624-2635: Consider usingwarnlevel for persistent prune failures.The
debuglevel may hide repeated prune failures that indicate underlying database issues. While a single failure is benign, persistent failures could cause unbounded growth of delivered rows.This is a minor observability suggestion - the current behavior is functionally correct.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2624 - 2635, Change the log level from debug to warn when prune_delivered_retrigger_outbox fails so persistent prune failures are visible; update the block that awaits self.state.process_run_logger.prune_delivered_retrigger_outbox(RETRIGGER_OUTBOX_PRUNE_LIMIT).await to call tracing::warn! instead of tracing::debug!, retaining channel_id = %self.id, the %error field, and the same message "failed to prune delivered retrigger outbox rows" so error details and context remain in the log.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/agent/channel.rs`:
- Around line 2624-2635: Change the log level from debug to warn when
prune_delivered_retrigger_outbox fails so persistent prune failures are visible;
update the block that awaits
self.state.process_run_logger.prune_delivered_retrigger_outbox(RETRIGGER_OUTBOX_PRUNE_LIMIT).await
to call tracing::warn! instead of tracing::debug!, retaining channel_id =
%self.id, the %error field, and the same message "failed to prune delivered
retrigger outbox rows" so error details and context remain in the log.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/agent/channel.rs (1)
920-925: Bounded dedup set with full-clear eviction is acceptable but has a brief duplicate window.When the set reaches capacity, clearing entirely creates a brief window where a recently-completed worker ID could be processed again if its completion event arrives twice in close succession. The impact is minimal (debug log and early return), and 1024 entries is generous for typical workloads.
If this becomes an issue in production, consider LRU eviction or a two-generation approach (swap sets instead of clearing). For now, the simplicity tradeoff is reasonable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 920 - 925, The current remember_worker_completion in Channel (function remember_worker_completion) clears the entire completed_workers set when it hits MAX_COMPLETED_WORKER_IDS, creating a short duplicate window; change it to a two-generation approach: add a second HashSet field (e.g., prev_completed_workers), on insertion if completed_workers.len() >= MAX_COMPLETED_WORKER_IDS swap completed_workers with prev_completed_workers and clear the new completed_workers, and update membership checks to consult both sets (or insert into the active set only) so recently-seen IDs survive one rotation instead of being immediately dropped; keep MAX_COMPLETED_WORKER_IDS as the per-generation capacity and update any call sites that rely on remember_worker_completion to tolerate the two-set membership semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/agent/channel.rs`:
- Around line 920-925: The current remember_worker_completion in Channel
(function remember_worker_completion) clears the entire completed_workers set
when it hits MAX_COMPLETED_WORKER_IDS, creating a short duplicate window; change
it to a two-generation approach: add a second HashSet field (e.g.,
prev_completed_workers), on insertion if completed_workers.len() >=
MAX_COMPLETED_WORKER_IDS swap completed_workers with prev_completed_workers and
clear the new completed_workers, and update membership checks to consult both
sets (or insert into the active set only) so recently-seen IDs survive one
rotation instead of being immediately dropped; keep MAX_COMPLETED_WORKER_IDS as
the per-generation capacity and update any call sites that rely on
remember_worker_completion to tolerate the two-set membership semantics.
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/config.rs (2)
4371-4380: Use non-abbreviated variable names in worker merge closures.Please rename
wtoworkerin both closures to match repo naming rules and keep these blocks self-describing.As per coding guidelines
**/*.rs: "Don't abbreviate variable names. Usequeuenotq,messagenotmsg,channelnotch."Also applies to: 4506-4515
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` around lines 4371 - 4380, Rename the abbreviated closure parameter `w` to `worker` in the worker merge closures so names are self-describing; update all usages inside the closures (e.g., `w.hard_timeout_secs`, `w.idle_timeout_secs`, `w.max_tool_timeout_secs`) to `worker.hard_timeout_secs`, `worker.idle_timeout_secs`, `worker.max_tool_timeout_secs`, and do the same in the other occurrence around the `WorkerConfig` merge (the block at 4506-4515) to comply with the repository naming guideline and keep the closures readable.
4368-4400: Extract worker timeout merge+validation into one helper to prevent drift.The same merge/validate logic is duplicated for defaults and agent overrides. A shared helper will reduce future divergence risk.
♻️ Proposed refactor
+fn resolve_worker_config( + worker: TomlWorkerConfig, + fallback: WorkerConfig, + scope: &str, +) -> Result<WorkerConfig> { + let hard_timeout_secs = worker.hard_timeout_secs.unwrap_or(fallback.hard_timeout_secs); + let idle_timeout_secs = worker.idle_timeout_secs.unwrap_or(fallback.idle_timeout_secs); + let max_tool_timeout_secs = worker + .max_tool_timeout_secs + .unwrap_or(fallback.max_tool_timeout_secs); + + if idle_timeout_secs == 0 { + return Err(ConfigError::Invalid(format!( + "{scope} idle_timeout_secs must be greater than 0" + )) + .into()); + } + if max_tool_timeout_secs == 0 { + return Err(ConfigError::Invalid(format!( + "{scope} max_tool_timeout_secs must be greater than 0" + )) + .into()); + } + Ok(WorkerConfig { + hard_timeout_secs, + idle_timeout_secs, + max_tool_timeout_secs, + }) +}Also applies to: 4503-4537
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` around lines 4368 - 4400, The worker timeout merge and validation logic duplicated around defaults.worker and agent overrides should be extracted into a single helper (e.g., a function like merge_and_validate_worker_config) that accepts the optional override/config partial and a base WorkerConfig, performs the unwrap_or merging for hard_timeout_secs, idle_timeout_secs, max_tool_timeout_secs, runs the two zero-value validations (using ConfigError::Invalid), and returns Result<WorkerConfig>; replace the inline closures that call .map(...).transpose()? and .unwrap_or(base_defaults.worker) (and the duplicate at the agent override site) with calls to this helper so both sites reuse the same merge+validate behavior referencing WorkerConfig, base_defaults.worker, defaults.worker, and ConfigError::Invalid.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/config.rs`:
- Around line 589-592: The worker.hard_timeout_secs branch currently accepts
zero; update the handling of worker.hard_timeout_secs to reject 0 the same way
the other timeout fields do by returning StatusCode::BAD_REQUEST when v == 0,
and only convert and set the toml value for positive values (preserving the
i64::try_from(...) conversion and error mapping). Locate the block referencing
worker.hard_timeout_secs and add the zero-check and early error path to mirror
the logic used for the other timeout fields.
---
Nitpick comments:
In `@src/config.rs`:
- Around line 4371-4380: Rename the abbreviated closure parameter `w` to
`worker` in the worker merge closures so names are self-describing; update all
usages inside the closures (e.g., `w.hard_timeout_secs`, `w.idle_timeout_secs`,
`w.max_tool_timeout_secs`) to `worker.hard_timeout_secs`,
`worker.idle_timeout_secs`, `worker.max_tool_timeout_secs`, and do the same in
the other occurrence around the `WorkerConfig` merge (the block at 4506-4515) to
comply with the repository naming guideline and keep the closures readable.
- Around line 4368-4400: The worker timeout merge and validation logic
duplicated around defaults.worker and agent overrides should be extracted into a
single helper (e.g., a function like merge_and_validate_worker_config) that
accepts the optional override/config partial and a base WorkerConfig, performs
the unwrap_or merging for hard_timeout_secs, idle_timeout_secs,
max_tool_timeout_secs, runs the two zero-value validations (using
ConfigError::Invalid), and returns Result<WorkerConfig>; replace the inline
closures that call .map(...).transpose()? and .unwrap_or(base_defaults.worker)
(and the duplicate at the agent override site) with calls to this helper so both
sites reuse the same merge+validate behavior referencing WorkerConfig,
base_defaults.worker, defaults.worker, and ConfigError::Invalid.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
docs/content/docs/(configuration)/config.mdxdocs/content/docs/(features)/workers.mdxsrc/agent/channel.rssrc/agent/worker.rssrc/api/agents.rssrc/api/config.rssrc/config.rssrc/main.rssrc/tools.rssrc/tools/exec.rssrc/tools/shell.rstests/context_dump.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- docs/content/docs/(configuration)/config.mdx
- docs/content/docs/(features)/workers.mdx
Summary
Hardens worker lifecycle handling in channels and makes retrigger completion delivery durable.
This PR closes race windows where delayed lifecycle events could corrupt runtime status/watchdog state, and closes reliability gaps where retrigger completion events could be lost or duplicated around enqueue/restart timing.
Scope
src/agent/channel.rs,src/agent/status.rs)src/conversation/history.rs, migration)src/config.rs,src/api/config.rs,src/api/agents.rs)docs/content/docs/...)Reviewer Guide (Fast Path)
src/agent/channel.rsaround lifecycle event handling:WorkerStartedafter completion is ignored.started_at, not delayed event arrival time.src/conversation/history.rsoutbox methods:next_attempt_at <= now) before synthetic send.attempt_count.migrations/20260228000001_channel_retrigger_outbox.sql:channel_retrigger_outboxwith indexes for due-scan/pruning.Review Finding Closure
src/agent/channel.rs): fixed.WorkerStartedfrom re-registering completed workers.out_of_order_worker_started_after_complete_is_ignored.Concurrency / State Invariants
started_at) and cannot be reset by delayed event delivery.Migration
migrations/20260228000001_channel_retrigger_outbox.sqlValidation
just preflightjust gate-prcargo test -q watchdog_reconcile -- src/agent/channel.rscargo test -q out_of_order_worker_started_after_complete_is_ignored -- src/agent/channel.rscargo test -q retrigger_outbox -- src/conversation/history.rscargo test -q worker_watchdog -- src/config.rsDocs Updated
docs/content/docs/(configuration)/config.mdxdocs/content/docs/(core)/architecture.mdxdocs/content/docs/(features)/workers.mdxdocs/content/docs/(features)/opencode.mdx