feat: worker context injection and duplicate worker spawn prevention #358
Add the ability for channels to deliver addendum context to running workers mid-task via a terminate-and-resume pattern. When a user sends a follow-up message while a worker is already running, the channel can route it as injected context rather than spawning a duplicate worker. The route tool now tries interactive input first (WaitingForInput workers), then falls back to context injection (Running workers). The SpacebotHook drains pending injections in on_completion_call and terminates the agent loop so prompt_with_tool_nudge_retry can append the messages to history and re-prompt — no Rig framework changes needed. Also adds a duplicate worker task guard (from closed PR #281) that checks the status block for exact task description matches before spawning.
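The drain-and-terminate step described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it uses `std::sync::mpsc` in place of the Tokio channel, and `InjectionHook` and this `HookAction` enum are hypothetical stand-ins for `SpacebotHook` and the framework's hook action type.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

// Reason string taken from the PR description.
pub const CONTEXT_INJECTION_REASON: &str = "spacebot_context_injection";

// Hypothetical stand-in for the framework's hook action type.
#[derive(Debug, PartialEq, Eq)]
pub enum HookAction {
    Continue,
    Terminate { reason: String },
}

// Hypothetical stand-in for SpacebotHook, reduced to the injection state.
pub struct InjectionHook {
    inject_rx: Option<Receiver<String>>,
    pending: Vec<String>,
}

impl InjectionHook {
    pub fn new(inject_rx: Option<Receiver<String>>) -> Self {
        Self { inject_rx, pending: Vec::new() }
    }

    // Called at an LLM turn boundary. Only messages already buffered in the
    // channel are collected; try_recv never blocks.
    pub fn on_completion_call(&mut self) -> HookAction {
        if let Some(rx) = &self.inject_rx {
            loop {
                match rx.try_recv() {
                    Ok(message) => self.pending.push(message),
                    Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
                }
            }
        }
        if self.pending.is_empty() {
            HookAction::Continue
        } else {
            HookAction::Terminate { reason: CONTEXT_INJECTION_REASON.to_string() }
        }
    }

    // The retry loop calls this to move the buffered messages into history.
    pub fn take_injected_messages(&mut self) -> Vec<String> {
        std::mem::take(&mut self.pending)
    }
}
```

A message sent after a drain is not lost in this sketch; it stays in the channel buffer until the next call to `on_completion_call`.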
Walkthrough
Adds per-worker context-injection channels and plumbing to inject non-interactive messages into running workers at LLM turn boundaries; introduces duplicate-task reservation and detection to prevent double-spawn; updates routing to prefer interactive follow-up or injection; and buffers/drains injected messages in SpacebotHook.
Estimated code review effort: 4 (Complex), ~45 minutes
Pre-merge checks: 3 passed
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/channel_dispatch.rs`:
- Around line 332-344: The duplicate-task check in check_duplicate_task using
state.status_block.read() is racy; instead add an atomic reservation mechanism
(e.g. a reserved_tasks HashSet/Map guarded by the same async write lock or a new
RwLock/Mutex on ChannelState) and acquire the write lock to reserve the task
description before any await/spawn happens (use a method like reserve_task(&mut
self, task: &str) that returns an error if already reserved), then clear that
reservation on terminal cleanup paths (e.g. when handling WorkerComplete and
where worker_handles are registered/removed) so stale entries don’t cause false
positives; update all usages (including the analogous checks around the other
occurrences noted) to use the new reserve/release API rather than a read-only
find_duplicate_worker_task call.
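A minimal sketch of the reserve/release idea in this comment, under stated assumptions: `TaskReservations` is a hypothetical type, a `std::sync::Mutex` stands in for the async lock on `ChannelState`, and the whitespace trimming is illustrative only (the PR's own normalization is not shown here).

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Hypothetical reservation set; in the PR this state would live on ChannelState.
#[derive(Default)]
pub struct TaskReservations {
    reserved: Mutex<HashSet<String>>,
}

impl TaskReservations {
    // The check and the insert happen under one lock, so two concurrent
    // spawns with the same task cannot both succeed.
    pub fn reserve_task(&self, task: &str) -> Result<(), String> {
        let mut reserved = self.reserved.lock().unwrap();
        if reserved.insert(task.trim().to_string()) {
            Ok(())
        } else {
            Err(format!("task already reserved: {}", task.trim()))
        }
    }

    // Called on terminal cleanup paths so stale entries do not block respawns.
    pub fn release_task(&self, task: &str) {
        self.reserved.lock().unwrap().remove(task.trim());
    }
}
```

The point of the design is that a read-only lookup followed by a later insert leaves a window between the two; folding both into `reserve_task` closes it.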
In `@src/agent/channel.rs`:
- Around line 93-96: The field worker_injections (Arc<RwLock<HashMap<WorkerId,
tokio::sync::mpsc::Sender<String>>>) reintroduces channel-level context into
workers and must be removed: delete the worker_injections field and any code
that writes to or reads from it (search for worker_injections, WorkerId-based
injection sends, and the route tool paths that call into it) and instead
implement branching/respawn logic that creates a new task with the additional
context; ensure Worker structs and constructors (and any functions referenced by
the route tool) are updated to no longer expect or use worker_injections and
that new context is passed via task creation APIs rather than via in-flight
injection channels.
In `@src/agent/worker.rs`:
- Around line 266-272: The follow-up loop currently calls prompt_once and treats
any PromptCancelled as a generic error, so context-injection Terminate with
CONTEXT_INJECTION_REASON causes the worker to fail; update the follow-up error
handling (before the generic Err(error) arm) to detect PromptCancelled where
SpacebotHook::is_context_injection_reason(&reason) is true, call
follow_up_hook.take_injected_messages() to drain injected messages, append each
to history as Message::User, modify follow_up_prompt (e.g. append a short
"Context updated. Continue processing." hint) and let the loop continue to
re-prompt (mirroring how prompt_with_tool_nudge_retry handles
CONTEXT_INJECTION_REASON) so injected context is applied and retried instead of
propagating the error.
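The retry handling requested here might look roughly like the sketch below. It is synchronous and self-contained for illustration: `PromptError`, `run_follow_up`, the closure parameters, and the retry cap are all hypothetical, and history entries are plain strings rather than `Message::User` values.

```rust
pub const CONTEXT_INJECTION_REASON: &str = "spacebot_context_injection";

// Hypothetical error type standing in for the framework's prompt errors.
#[derive(Debug)]
pub enum PromptError {
    Cancelled { reason: String },
    Other(String),
}

// Re-prompts when a cancellation carries the context-injection reason;
// every other error is propagated unchanged.
pub fn run_follow_up<F, D>(
    history: &mut Vec<String>,
    mut prompt: String,
    mut prompt_once: F,
    mut take_injected: D,
    max_retries: usize,
) -> Result<String, PromptError>
where
    F: FnMut(&str, &[String]) -> Result<String, PromptError>,
    D: FnMut() -> Vec<String>,
{
    for _ in 0..=max_retries {
        match prompt_once(prompt.as_str(), history.as_slice()) {
            Ok(reply) => return Ok(reply),
            Err(PromptError::Cancelled { reason }) if reason == CONTEXT_INJECTION_REASON => {
                // Drain the injected messages into history, then retry.
                for message in take_injected() {
                    history.push(format!("[Context update from the user] {message}"));
                }
                prompt = "Context updated. Continue processing.".to_string();
            }
            Err(other) => return Err(other),
        }
    }
    Err(PromptError::Other("too many context injections".to_string()))
}
```

The guard arm must come before the generic error arm, otherwise the injection cancellation is reported as a worker failure.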
In `@src/tools/route.rs`:
- Around line 107-131: The current code path uses
self.state.worker_injections.read() and inject_tx.send(...) to append arbitrary
channel messages into a running worker's history (mentioned in inject_tx and
RouteOutput), which crosses the worker/branch boundary; instead of sending into
the running worker's injection channel, change the behavior to create a new
branch or spawn a new worker with the provided channel message as a fresh
prompt/task rewrite—i.e., remove the inject_tx.send(...) branch and call a
function like spawn_worker_with_context or create_branch_for_worker (implement
if missing on state) that takes worker_id and args.message, produces a new
worker/branch id and initial prompt, and return a RouteOutput indicating a new
branch/worker was created rather than mutating existing worker history.
- Around line 82-104: The routing code currently treats presence of an input
sender (self.state.worker_inputs) as indication the worker is idle, which
misroutes injections for interactive workers; modify the route logic in
src/tools/route.rs so you first check the worker's explicit state (e.g., compare
to WaitingForInput vs Running from the worker state machine) before using
inputs.get(&worker_id).cloned(); only send on input_tx when the worker state ==
WaitingForInput, otherwise leave inputs alone and let the injection path
(worker_injections) handle the update; if you don't have a worker-state map, add
or use the existing worker state lookup used by the worker loop and gate the
input branch with that explicit check to avoid treating registered senders as
the authoritative state.
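As a rough illustration of gating on explicit worker state rather than on which sender happens to be registered, consider the sketch below. The `u32` worker id, `WorkerStatus`, `RouteTarget`, and `choose_route` are assumptions made for the example, not the project's types.

```rust
use std::collections::HashMap;

// Hypothetical mirror of the two worker states named in the review.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WorkerStatus {
    WaitingForInput,
    Running,
}

#[derive(Debug, PartialEq, Eq)]
pub enum RouteTarget {
    InteractiveInput,
    ContextInjection,
    NotFound,
}

// The worker's recorded status decides the path; registered senders are
// only consulted after this decision has been made.
pub fn choose_route(statuses: &HashMap<u32, WorkerStatus>, worker_id: u32) -> RouteTarget {
    match statuses.get(&worker_id) {
        Some(WorkerStatus::WaitingForInput) => RouteTarget::InteractiveInput,
        Some(WorkerStatus::Running) => RouteTarget::ContextInjection,
        None => RouteTarget::NotFound,
    }
}
```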
In `@src/tools/spawn_worker.rs`:
- Around line 181-190: The duplicate-worker branch currently turns a detected
duplicate (found via status_block.read().await and
status.find_duplicate_worker_task(&args.task)) into a free-form
SpawnWorkerError(String); change it to return a structured tool result instead
of a prose error so callers/LLMs can recover deterministically. Replace the
Err(SpawnWorkerError(...)) path with a typed result variant or structured error
value (e.g., DuplicateWorker { existing_worker_id: existing_id,
recommended_action: "route" }) from the spawn worker API/enum you use, so the
duplicate condition is returned as data (referencing status_block,
find_duplicate_worker_task, existing_id, args.task and the intended "route"
action) rather than a string message.
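One possible shape for the structured duplicate result is sketched below. The variant and field names follow the reviewer's suggestion, but the `Spawned` variant, the slice of `(worker_id, task)` pairs, and this `spawn_worker` signature are hypothetical simplifications.

```rust
// Sketch of a structured tool result; variants and fields are assumptions.
#[derive(Debug, PartialEq, Eq)]
pub enum SpawnWorkerOutput {
    Spawned { worker_id: String },
    Duplicate { existing_worker_id: String, recommended_action: String },
}

// `active_tasks` holds (worker_id, task) pairs for workers that are still running.
pub fn spawn_worker(
    active_tasks: &[(String, String)],
    task: &str,
    new_worker_id: &str,
) -> Result<SpawnWorkerOutput, String> {
    if let Some((existing_id, _)) = active_tasks
        .iter()
        .find(|(_, existing_task)| existing_task.as_str() == task)
    {
        // A duplicate is a normal, recoverable outcome, so it is returned as data.
        return Ok(SpawnWorkerOutput::Duplicate {
            existing_worker_id: existing_id.clone(),
            recommended_action: "route".to_string(),
        });
    }
    Ok(SpawnWorkerOutput::Spawned { worker_id: new_worker_id.to_string() })
}
```

With this shape the caller can read `existing_worker_id` directly and route to that worker, instead of parsing an error string.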
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0e4f1450-c7b0-45bd-918e-914012f5eb02
📒 Files selected for processing (11)
- prompts/en/tools/route_description.md.j2
- src/agent/channel.rs
- src/agent/channel_dispatch.rs
- src/agent/cortex.rs
- src/agent/status.rs
- src/agent/worker.rs
- src/error.rs
- src/hooks/spacebot.rs
- src/tools/route.rs
- src/tools/spawn_worker.rs
- tests/context_dump.rs
♻️ Duplicate comments (2)
src/tools/spawn_worker.rs (1)
174-183: ⚠️ Potential issue | 🟠 Major
Return duplicate-worker conflicts as structured output.
This turns a deterministic, recoverable state into a prose `SpawnWorkerError`, so the model has to parse text instead of reading fields like the existing worker id and routing accordingly. Please surface this as structured `SpawnWorkerOutput` data (for example, duplicate status + `existing_worker_id`) and return `Ok(...)` here.
As per coding guidelines "Tool errors are returned as structured results, not panics. The LLM sees the error and can recover (error-as-result for tools pattern)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/spawn_worker.rs` around lines 174 - 183, The code currently returns an Err(SpawnWorkerError(...)) when find_duplicate_worker_task(&args.task) finds an existing worker; change this to return a structured SpawnWorkerOutput indicating a duplicate rather than an error. Inside the duplicate branch (where you call status.find_duplicate_worker_task(&args.task)), construct and return Ok(SpawnWorkerOutput::Duplicate { existing_worker_id: existing_id.clone(), /* optional: reason or hint field if present */ }) or the equivalent variant/fields your SpawnWorkerOutput defines, instead of returning Err(...); keep the read lock usage (self.state.status_block.read().await) and preserve the existing_id value so the caller/LLM can read the structured duplicate status. Ensure any codepaths that expected Err(SpawnWorkerError) are updated to handle the Ok(Duplicate) result accordingly.
src/agent/worker.rs (1)
286-293: ⚠️ Potential issue | 🟠 Major
Shared hook injection now breaks interactive follow-up retries.
Because `self.hook` is later cloned for the follow-up path, wiring `inject_rx` into the shared hook here also enables `spacebot_context_injection` cancellation during `prompt_once()`. The follow-up loop still treats that cancellation as a hard failure, so injecting context while an interactive worker is processing a follow-up now fails the worker instead of appending the messages and retrying. Either keep injection out of the follow-up hook, or mirror the main loop’s context-injection retry handling there.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/worker.rs` around lines 286 - 293, The shared wiring of inject_rx into self.hook via with_inject_rx causes injected-context cancellations to propagate into the follow-up path and break prompt_once retries; fix by not mutating the shared self.hook used for follow-ups. Concretely, stop replacing self.hook with the with_inject_rx version (the std::mem::replace that sets inject_rx into self.hook) and instead create a separate hook instance (e.g., clone self.hook and call with_inject_rx on that local/loop-specific clone) for the main loop’s on_completion_call handling; alternatively, if you prefer to keep injection on the shared hook, replicate the main loop’s context-injection retry handling in the follow-up loop (the code that calls prompt_once) so spacebot_context_injection cancellation is treated as a retry rather than a hard failure.
📒 Files selected for processing (2)
- src/agent/worker.rs
- src/tools/spawn_worker.rs
…ructured duplicate output, follow-up injection handling
- Route tool checks StatusBlock worker status instead of worker_inputs map presence
- inject_rx uses Option<Receiver> with .take() instead of dummy channel allocation
- Atomic task reservation via reserved_tasks to prevent TOCTOU race in duplicate detection
- Follow-up loop now handles CONTEXT_INJECTION_REASON, matching main task loop behavior
- Duplicate worker spawn returns structured SpawnWorkerOutput instead of error string
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/agent/channel_dispatch.rs (1)
451-490: ⚠️ Potential issue | 🟠 Major
Worker-limit enforcement still misses builtin workers.
`check_worker_limit()` counts `state.active_workers`, but this path only stores handles/input/injection senders and then moves the worker into the task. Since nothing here registers the builtin worker in `active_workers`, the concurrent-worker cap can still be exceeded. Count `worker_handles`/`status_block` instead, or store lightweight active-worker metadata separately from `Worker`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel_dispatch.rs` around lines 451 - 490, The builtin worker paths (Worker::new_interactive and Worker::new) create workers and store input/injection senders (state.worker_inputs, state.worker_injections) but never register lightweight active-worker metadata, so check_worker_limit() that inspects state.active_workers can be bypassed; fix by registering a minimal active-worker entry (e.g., increment or insert into the same structure check_worker_limit() reads) when creating any worker, or change check_worker_limit() to count the existing worker_handles/status_block or entries in state.worker_injections/worker_inputs instead; update the code paths that call Worker::new_interactive and Worker::new to also add/remove the same active-worker marker used by check_worker_limit() so builtin workers are included in the concurrent-worker cap.
♻️ Duplicate comments (1)
src/agent/worker.rs (1)
559-579: ⚠️ Potential issue | 🟠 Major
This turns workers into branches.
These lines append later channel messages into the worker’s history mid-task, so the worker no longer runs from just its fresh prompt + task. If extra conversation context is needed, this repo’s model is to fork a branch or respawn with a rewritten task instead of mutating worker history in place.
Based on learnings: "Don't give workers channel context. Workers get a fresh prompt and a task. If a worker needs conversation context, that's a branch, not a worker"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/worker.rs` around lines 559 - 579, The code is mutating the worker's running history by appending injected channel messages (using follow_up_hook.take_injected_messages() and pushing into history), which turns a worker into a branch; instead, do NOT modify history in-place — drain injected messages but hand them back to the supervisor/manager to spawn a new branch/respawn a worker with a rewritten task. Concretely: remove the loop that pushes into history and the in-place context injection; after detecting SpacebotHook::is_context_injection_reason(reason) and calling follow_up_hook.take_injected_messages(), call or signal the supervisor to create a branch/respawn (e.g., return a variant or set a flag that the caller understands) and set follow_up_prompt only to indicate that a branch should be created; do not append to history or continue the current worker loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/tools/route.rs`:
- Around line 128-163: The Some(false) branch currently only looks up
self.state.worker_injections and treats missing senders as "not found", which
causes running OpenCode workers (which register in self.state.worker_inputs, not
worker_injections) to be misreported; update this branch to check
self.state.worker_inputs when no injection sender exists and either (a)
create/register an injection sender for that worker_id so inject_tx.send(...)
can be used for OpenCode workers, or (b) return a structured unsupported-state
RouteOutput (instead of RouteError) indicating injections are not supported for
OpenCode workers; reference the symbols worker_injections, worker_inputs,
inject_tx, RouteOutput and RouteError when making the change so the behavior for
running OpenCode workers is handled explicitly rather than falling through to
the final "not found" error.
📒 Files selected for processing (6)
- src/agent/channel.rs
- src/agent/channel_dispatch.rs
- src/agent/worker.rs
- src/tools/route.rs
- src/tools/spawn_worker.rs
- tests/context_dump.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/tools/spawn_worker.rs
…orkers without injection channel
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/tools/route.rs (1)
128-154: ⚠️ Potential issue | 🟠 Major
Don't inject channel messages into running workers.
This sends arbitrary channel text into a worker mid-task, which breaks the repo's worker/branch boundary. If extra conversation context is needed here, it should become a branch or a newly spawned worker with a rewritten task rather than mutating the running worker's history.
Based on learnings: Don't give workers channel context. Workers get a fresh prompt and a task. If a worker needs conversation context, that's a branch, not a worker.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/route.rs` around lines 128 - 154, The code currently sends args.message into a running worker via self.state.worker_injections -> inject_tx.send(args.message) inside the Some(false) branch, which must be removed; instead, prevent injecting channel text into a running worker and route the request to create a branch or spawn a new worker with a rewritten task (do not mutate the running worker's history). Concretely: remove the inject_tx.send(...) call and the success RouteOutput that claims "Context injected into running worker", and replace that path to either return a RouteOutput indicating routed:false with a clear message instructing the caller to create a branch/new worker, or call your existing branch/worker-creation helper (use the project’s branch/spawn function) to create a new branch/worker from worker_id with args.message as the new task; ensure worker_injections and inject_tx are not used to deliver channel text to an active worker.
📒 Files selected for processing (2)
- src/agent/channel_dispatch.rs
- src/tools/route.rs
  async fn check_worker_limit(state: &ChannelState) -> std::result::Result<(), AgentError> {
      let max_workers = **state.deps.runtime_config.max_concurrent_workers.load();
-     let active_worker_count = state.active_workers.read().await.len();
+     let active_worker_count = state.worker_handles.read().await.len();
      reserve_worker_slot_local(active_worker_count, &state.channel_id, max_workers)
Make worker-capacity admission atomic.
This is still a read-only preflight. Two concurrent spawns can both observe worker_handles.len() < max_concurrent_workers and both proceed before either inserts its handle, so the channel can still oversubscribe the worker limit. Please reserve capacity under one write-side admission primitive here as well (for example a semaphore or reserved-slot counter) and release it on spawn failure / terminal cleanup.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/agent/channel_dispatch.rs` around lines 325 - 328, check_worker_limit
currently only reads state.worker_handles.len() which allows a race where
multiple spawns observe capacity and oversubscribe; change it to perform an
atomic reservation via a write-side admission primitive (e.g., a Semaphore or a
reserved-slot counter) instead of relying on len() alone: acquire/reserve one
slot inside check_worker_limit (using the same primitive shared with spawn
logic), return an error if no slot available, and ensure the reserved slot is
released on spawn failure and during terminal cleanup; update all
calls/behaviors around check_worker_limit, reserve_worker_slot_local,
state.worker_handles and any spawn/cleanup paths to use the new acquire/release
protocol so reservations are atomic with insertion of the worker handle.
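A reserved-slot counter along the lines suggested here could be sketched as follows. `WorkerSlots` and `SlotGuard` are hypothetical names, and the example uses a std atomic with an RAII guard; an async codebase might reach for a semaphore instead.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical admission counter shared by all spawn paths of one channel.
pub struct WorkerSlots {
    active: AtomicUsize,
    max: usize,
}

// Holding a guard means one slot is reserved; dropping it releases the slot.
pub struct SlotGuard {
    slots: Arc<WorkerSlots>,
}

impl WorkerSlots {
    pub fn new(max: usize) -> Arc<Self> {
        Arc::new(Self { active: AtomicUsize::new(0), max })
    }

    // Check and increment happen as one atomic update, so concurrent callers
    // cannot both take the last slot.
    pub fn try_acquire(self: &Arc<Self>) -> Option<SlotGuard> {
        self.active
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < self.max { Some(n + 1) } else { None }
            })
            .ok()
            .map(|_| SlotGuard { slots: Arc::clone(self) })
    }

    pub fn active(&self) -> usize {
        self.active.load(Ordering::Acquire)
    }
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        self.slots.active.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Tying release to `Drop` covers both spawn failure and terminal cleanup, provided the guard is stored alongside the worker handle for the worker's lifetime.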
match worker_is_idle {
    // Worker is idle (WaitingForInput) — deliver as interactive follow-up.
    Some(true) => {
        let inputs = self.state.worker_inputs.read().await;
        if let Some(input_tx) = inputs.get(&worker_id).cloned() {
            drop(inputs);

            input_tx.send(args.message).await.map_err(|_| {
                RouteError(format!(
                    "Worker {worker_id} has stopped accepting input (channel closed)"
                ))
            })?;

            tracing::info!(
                worker_id = %worker_id,
                channel_id = %self.state.channel_id,
                "message routed to interactive worker (input)"
            );

            return Ok(RouteOutput {
                routed: true,
                worker_id,
                message: format!(
                    "Message delivered to worker {worker_id} (follow-up input)."
                ),
            });
        }
        drop(inputs);

        // Worker is idle but has no input channel — shouldn't happen
        // for interactive workers, but fall through to injection.
    }
The idle-path “fall through to injection” never happens.
If worker_is_idle == Some(true) and worker_inputs is missing, this match arm just exits and the function reaches the final error. Rust match arms do not fall through, so the fallback described in the comment is currently unreachable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/tools/route.rs` around lines 96 - 127, The match arm for Some(true)
currently returns or ends the arm if the worker_inputs channel is missing, so
the intended "fall through to injection" never happens; restructure the logic so
that when worker_is_idle is Some(true) you attempt to look up input_tx in
self.state.worker_inputs and only return early if input_tx exists and the send
succeeds, otherwise allow execution to continue to the injection path (i.e., do
not consume the case that should fall through). Concretely, replace the match
arm handling Some(true) with an if let or explicit branch that tries
inputs.get(&worker_id).cloned(), returns on successful input_tx.send (as
currently), but when no input_tx is found simply drop inputs and proceed (so
downstream injection code runs); keep symbols worker_is_idle,
self.state.worker_inputs, input_tx and the existing RouteOutput/return behavior
unchanged when the send succeeds.
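The restructuring described in this comment can be illustrated with sequential early returns in place of `match` arms. The sketch reduces the channel lookups to booleans; `Delivery` and `deliver` are hypothetical names used only for the example.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum Delivery {
    Input,
    Injection,
    NotRouted,
}

// `worker_is_idle` is None when the worker is unknown, Some(true) when it is
// waiting for input, and Some(false) when it is running.
pub fn deliver(
    worker_is_idle: Option<bool>,
    has_input_channel: bool,
    has_injection_channel: bool,
) -> Delivery {
    // Interactive path: only taken when the worker is idle and a sender exists.
    if worker_is_idle == Some(true) && has_input_channel {
        return Delivery::Input;
    }
    // Running workers, and idle workers without an input sender, both reach
    // this check because nothing above consumed their case.
    if worker_is_idle.is_some() && has_injection_channel {
        return Delivery::Injection;
    }
    Delivery::NotRouted
}
```

Because each check is an independent `if`, the idle-without-input case continues to the injection check, which a `match` arm that simply ends cannot do.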
input_tx.send(args.message).await.map_err(|_| {
    RouteError(format!(
        "Worker {worker_id} has stopped accepting input (channel closed)"
    ))
})?;
Return worker lifecycle misses as RouteOutput, not RouteError.
These branches represent normal, recoverable states for this tool: the worker finished, the sender was closed, or the worker no longer exists. Returning Err(RouteError(...)) hides the recovery path from the model, while this same tool already uses RouteOutput { routed: false, ... } for unsupported states. Please make these structured RouteOutput responses too.
As per coding guidelines: src/tools/**/*.rs: Tool errors are returned as structured results, not panics. The LLM sees the error and can recover (error-as-result for tools pattern).
Also applies to: 134-138, 183-185
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/tools/route.rs` around lines 103 - 107, The branch that currently maps
input_tx.send(...).await errors to Err(RouteError(...)) should instead return a
structured RouteOutput indicating a non-routed lifecycle event (e.g., routed:
false and a descriptive message like "worker finished / channel closed / worker
not found") so the model can recover; change the map_err/Err(RouteError(...))
paths (around input_tx.send in route.rs and the similar branches noted at the
other ranges) to construct and return RouteOutput instances with routed: false
and appropriate info instead of returning RouteError.
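A minimal sketch of the error-as-result shape for a closed input channel is shown below. It assumes a `u32` worker id and uses `std::sync::mpsc` rather than the async channel; `route_input` is a hypothetical helper, while the `RouteOutput` field names follow the excerpt above.

```rust
use std::sync::mpsc::Sender;

// Field names follow the RouteOutput shown in the diff excerpt; the types are assumed.
#[derive(Debug)]
pub struct RouteOutput {
    pub routed: bool,
    pub worker_id: u32,
    pub message: String,
}

// A closed channel is reported as routed: false instead of an Err, so the
// caller still receives a structured result it can act on.
pub fn route_input(input_tx: &Sender<String>, worker_id: u32, message: String) -> RouteOutput {
    match input_tx.send(message) {
        Ok(()) => RouteOutput {
            routed: true,
            worker_id,
            message: format!("Message delivered to worker {worker_id} (follow-up input)."),
        },
        Err(_) => RouteOutput {
            routed: false,
            worker_id,
            message: format!("Worker {worker_id} has stopped accepting input (channel closed)."),
        },
    }
}
```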
…servation, structured duplicate output, follow-up injection handling
- Route tool checks StatusBlock worker status instead of worker_inputs map presence
- inject_rx uses Option<Receiver> with .take() instead of dummy channel allocation
- Atomic task reservation via reserved_tasks to prevent TOCTOU race in duplicate detection
- Follow-up loop now handles CONTEXT_INJECTION_REASON, matching main task loop behavior
- Duplicate worker spawn returns structured SpawnWorkerOutput instead of error string
…ontext-injection feat: worker context injection and duplicate worker spawn prevention
Summary
- … `SpacebotHook.on_completion_call` (no Rig framework changes needed)
- … `route` tool to support both interactive follow-up (`worker_inputs`) and context injection for running workers (`worker_injections`), so a single tool handles both WaitingForInput and Running workers
How it works
- … `inject_tx` channel from its constructor, stored in `ChannelState.worker_injections`
- `inject_rx` is wired into `SpacebotHook` via `with_inject_rx()` at the start of `Worker::run()`
- `on_completion_call` drains pending injected messages and returns `HookAction::Terminate` with reason `spacebot_context_injection`
- `prompt_with_tool_nudge_retry` catches this termination (before the nudge arm), appends injected messages as `[Context update from the user]` entries in history, and re-prompts with a continuation hint
Changes
- src/agent/channel.rs: … `worker_injections` field to `ChannelState`, cleanup in `WorkerComplete` and `cancel_worker_with_reason`
- src/agent/channel_dispatch.rs: … `inject_tx` on worker spawn, add dedup guard via `find_duplicate_worker_task`
- src/agent/cortex.rs: … `inject_tx` for detached workers (no parent channel)
- src/agent/status.rs: … `find_duplicate_worker_task()` with `[opencode]` prefix normalization
- src/agent/worker.rs: … `inject_rx` field, return `inject_tx` from all constructors, wire into hook in `run()`
- src/error.rs: … `DuplicateWorkerTask` error variant
- src/hooks/spacebot.rs: … `CONTEXT_INJECTION_REASON`, drain logic in `on_completion_call`, injection handler in retry loop
- src/tools/route.rs: … `worker_inputs` first, fall back to `worker_injections`
- src/tools/spawn_worker.rs: …
- prompts/en/tools/route_description.md.j2: …
- tests/context_dump.rs: … `worker_injections` field to test constructors
Testing
- `just gate-pr` passes (468 tests, clippy clean, fmt clean, integration tests compile)
Race/terminal-state reasoning
- `on_completion_call` uses `try_recv` (non-blocking) so it only picks up messages already in the channel buffer. If a message arrives mid-LLM-call, it gets picked up on the next turn boundary, so no messages are lost.
- `inject_tx` is cleaned up in the `WorkerComplete` handler and `cancel_worker_with_reason`. The channel's `send()` will fail with a closed channel error, which the route tool surfaces as "worker has stopped running."
Note
This PR implements worker context injection, allowing channels to send context updates to running workers mid-task without spawning duplicates. Uses a terminate-and-resume pattern through the hook system. 11 files modified, 11 tests added, all delivery gates pass.
Written by Tembo for commit c4e56d5. This will update automatically on new commits.