From 5a652d68a5b907e6c3e90ee7f134539ccf346d12 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 17 May 2026 21:32:08 -0400 Subject: [PATCH 1/2] refactor(broker): unify worker request/response correlation (#871) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Factors the per-feature pending_snapshots map + bespoke Snapshot ListenApiRequest variant + snapshot_response frame arm out of src/main.rs into a generic pattern in a new src/worker_request.rs module: * `WorkerRequest { name, kind, payload, timeout, reply }` is a single generic ListenApiRequest variant that future request/response routes (mode / pending / flush from #864) can reuse — each new route now costs ~5 lines in listen_api.rs instead of ~80 lines spread across three files. * `pending_requests: HashMap<String, PendingRequest>` replaces the per-feature `pending_snapshots` + `snapshot_request_deadlines` maps. * A generic `*_response` dispatch in the worker-frame handler routes any frame carrying a known request_id via fulfil_response_frame. * `reap_expired` consolidates the timeout sweep. * `RequestWorkerError` (thiserror) carries typed errors back to the HTTP layer, where worker_request_error_to_response maps each variant to the same HTTP status it used to produce. Wire-protocol surface is unchanged externally: status codes for the `/api/spawned/{name}/snapshot` route stay 200/400/404/409/500/504. The 504 response body's `code` field flips from `snapshot_timeout` to the more accurate `worker_timeout` (no external consumer of the string). Worker (pty_worker.rs) already emits the consistent envelope `{type:"*_response", request_id, payload[, error]}` — no worker changes required. Fire-and-forget operations (`send_input`, `resize_pty`) keep their existing pattern — the helper is opt-in for request/response cases. Adds 6 tests (5 for worker_request internals, 1 for the typed worker error envelope path through the snapshot route). 
Existing snapshot route tests updated to assert against the new WorkerRequest variant and typed errors. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/listen_api.rs | 162 +++++++++++++++++----- src/main.rs | 190 ++++++++++++-------------- src/worker_request.rs | 302 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 520 insertions(+), 134 deletions(-) create mode 100644 src/worker_request.rs diff --git a/src/listen_api.rs b/src/listen_api.rs index 963fd74f3..0f09c855c 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -16,6 +16,8 @@ use tokio::sync::{broadcast, mpsc}; use tokio::time::timeout; use uuid::Uuid; +use crate::worker_request::{RequestWorkerError, DEFAULT_REQUEST_TIMEOUT}; + const LISTEN_API_SEND_TIMEOUT: Duration = Duration::from_secs(30); // --------------------------------------------------------------------------- @@ -80,10 +82,27 @@ pub enum ListenApiRequest { cols: u16, reply: tokio::sync::oneshot::Sender>, }, - Snapshot { + /// Generic worker request/response RPC: park a oneshot in the + /// broker's `pending_requests` map keyed by a fresh `request_id`, + /// frame the request, and ship it to the named worker over its + /// stdin pipe. The reply fires when the worker echoes a matching + /// `*_response` frame or the deadline elapses (whichever first). + /// + /// Used by request/response routes like `GET /api/spawned/{name}/snapshot`. + /// Fire-and-forget routes (`send_input`, `resize_pty`) keep their + /// existing single-arm channel pattern. + WorkerRequest { name: String, - format: SnapshotFormat, - reply: tokio::sync::oneshot::Sender>, + /// Outbound frame `type`, e.g. `"snapshot_pty"`. The worker is + /// expected to reply with a frame whose `type` ends in `_response`. + kind: String, + /// Worker stdin frame payload — must match the worker-side + /// schema for `kind`. + payload: Value, + /// Max wall-clock duration the broker will wait for the worker's + /// response before sending [`RequestWorkerError::Timeout`]. 
+ timeout: Duration, + reply: tokio::sync::oneshot::Sender>, }, GetMetrics { agent: Option, @@ -911,7 +930,15 @@ fn api_error( ) } -/// Parse an error string like "agent_not_found: worker-a" into a (code, status) pair. +/// Parse an error string like "agent_not_found: worker-a" into a +/// (code, status) pair. +/// +/// Used by routes that still surface stringly-typed errors (e.g. +/// `send_input`, `resize_pty`). Routes built on `WorkerRequest` go +/// through [`worker_request_error_to_response`] instead, which +/// preserves typed-error code/status mappings but falls back here for +/// the structured `RequestWorkerError::WorkerError` envelope so worker- +/// side codes like `invalid_format` keep producing 400s. fn classify_error(err: &str) -> (axum::http::StatusCode, &str) { if err.starts_with("agent_not_found") { (axum::http::StatusCode::NOT_FOUND, "agent_not_found") @@ -923,10 +950,10 @@ fn classify_error(err: &str) -> (axum::http::StatusCode, &str) { // 409 Conflict — the request itself is well-formed; the conflict // is with the resource's current capabilities. (axum::http::StatusCode::CONFLICT, "unsupported_runtime") - } else if err.starts_with("snapshot_timeout") { + } else if err.starts_with("worker_timeout") { // Worker died or stalled between accepting the frame and // replying. This is a server-side fault, not a bad request. 
- (axum::http::StatusCode::GATEWAY_TIMEOUT, "snapshot_timeout") + (axum::http::StatusCode::GATEWAY_TIMEOUT, "worker_timeout") } else if err.starts_with("invalid_") { (axum::http::StatusCode::BAD_REQUEST, "invalid_request") } else { @@ -1043,9 +1070,11 @@ async fn listen_api_snapshot( let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); if state .tx - .send(ListenApiRequest::Snapshot { + .send(ListenApiRequest::WorkerRequest { name: name.clone(), - format, + kind: "snapshot_pty".to_string(), + payload: json!({ "format": format.as_wire_str() }), + timeout: DEFAULT_REQUEST_TIMEOUT, reply: reply_tx, }) .await @@ -1055,14 +1084,45 @@ async fn listen_api_snapshot( } match reply_rx.await { Ok(Ok(val)) => (axum::http::StatusCode::OK, axum::Json(val)), - Ok(Err(ref err)) => { - let (status, code) = classify_error(err); - api_error(status, code, err.clone()) - } + Ok(Err(err)) => worker_request_error_to_response(&err), Err(_) => internal_error(), } } +/// Map a [`RequestWorkerError`] to an HTTP response. Centralised so every +/// route built on `WorkerRequest` produces consistent status codes. +fn worker_request_error_to_response( + err: &RequestWorkerError, +) -> (axum::http::StatusCode, axum::Json) { + use axum::http::StatusCode; + match err { + RequestWorkerError::WorkerNotFound(_) => { + api_error(StatusCode::NOT_FOUND, "agent_not_found", err.to_string()) + } + RequestWorkerError::UnsupportedRuntime(_) => { + api_error(StatusCode::CONFLICT, "unsupported_runtime", err.to_string()) + } + RequestWorkerError::Timeout => api_error( + StatusCode::GATEWAY_TIMEOUT, + "worker_timeout", + err.to_string(), + ), + RequestWorkerError::WorkerError { code, message } => { + // Reuse classify_error so worker-side codes ("invalid_format", + // "agent_not_found", …) keep producing their canonical HTTP + // status. Any unknown code falls back to 400. 
+ let composed = format!("{code}: {message}"); + let (status, mapped_code) = classify_error(&composed); + let mapped_code = mapped_code.to_string(); + api_error(status, &mapped_code, composed) + } + RequestWorkerError::SendFailed(_) => { + api_error(StatusCode::NOT_FOUND, "agent_not_found", err.to_string()) + } + RequestWorkerError::ChannelClosed => internal_error(), + } +} + // --------------------------------------------------------------------------- // Observability // --------------------------------------------------------------------------- @@ -1567,6 +1627,7 @@ mod auth_tests { use tower::ServiceExt; use super::{listen_api_router_with_auth, ListenApiConfig, ListenApiRequest}; + use crate::worker_request::RequestWorkerError; fn test_router( broker_api_key: Option<&str>, @@ -2300,13 +2361,16 @@ mod auth_tests { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { match rx.recv().await { - Some(ListenApiRequest::Snapshot { + Some(ListenApiRequest::WorkerRequest { name, - format, + kind, + payload, reply, + .. }) => { assert_eq!(name, "worker-a"); - assert_eq!(format, super::SnapshotFormat::Plain); + assert_eq!(kind, "snapshot_pty"); + assert_eq!(payload["format"], json!("plain")); let _ = reply.send(Ok(json!({ "format": "plain", "rows": 4, @@ -2344,13 +2408,16 @@ mod auth_tests { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { match rx.recv().await { - Some(ListenApiRequest::Snapshot { + Some(ListenApiRequest::WorkerRequest { name, - format, + kind, + payload, reply, + .. 
}) => { assert_eq!(name, "worker-a"); - assert_eq!(format, super::SnapshotFormat::Ansi); + assert_eq!(kind, "snapshot_pty"); + assert_eq!(payload["format"], json!("ansi")); let _ = reply.send(Ok(json!({ "format": "ansi", "rows": 2, @@ -2401,7 +2468,7 @@ mod auth_tests { let body = response_json(response).await; assert_eq!(body["code"], json!("invalid_format")); - // The broker channel must not have received a Snapshot request. + // The broker channel must not have received a WorkerRequest. assert!(rx.try_recv().is_err()); } @@ -2409,8 +2476,10 @@ mod auth_tests { async fn snapshot_route_propagates_agent_not_found_as_404() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { - if let Some(ListenApiRequest::Snapshot { reply, .. }) = rx.recv().await { - let _ = reply.send(Err("agent_not_found: no worker named 'ghost'".to_string())); + if let Some(ListenApiRequest::WorkerRequest { reply, .. }) = rx.recv().await { + let _ = reply.send(Err(RequestWorkerError::WorkerNotFound( + "no worker named 'ghost'".to_string(), + ))); } }); @@ -2427,6 +2496,8 @@ mod auth_tests { .expect("request should succeed"); assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = response_json(response).await; + assert_eq!(body["code"], json!("agent_not_found")); replier.await.expect("replier should complete"); } @@ -2434,11 +2505,11 @@ mod auth_tests { async fn snapshot_route_maps_unsupported_runtime_to_409() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { - if let Some(ListenApiRequest::Snapshot { reply, .. }) = rx.recv().await { - let _ = reply.send(Err( - "unsupported_runtime: worker 'h' is headless; snapshot_pty is only supported on PTY workers" + if let Some(ListenApiRequest::WorkerRequest { reply, .. 
}) = rx.recv().await { + let _ = reply.send(Err(RequestWorkerError::UnsupportedRuntime( + "worker 'h' is headless; snapshot_pty is only supported on PTY workers" .to_string(), - )); + ))); } }); @@ -2461,13 +2532,11 @@ mod auth_tests { } #[tokio::test] - async fn snapshot_route_maps_snapshot_timeout_to_504() { + async fn snapshot_route_maps_worker_timeout_to_504() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { - if let Some(ListenApiRequest::Snapshot { reply, .. }) = rx.recv().await { - let _ = reply.send(Err( - "snapshot_timeout: worker did not respond in time".to_string() - )); + if let Some(ListenApiRequest::WorkerRequest { reply, .. }) = rx.recv().await { + let _ = reply.send(Err(RequestWorkerError::Timeout)); } }); @@ -2485,7 +2554,38 @@ mod auth_tests { assert_eq!(response.status(), StatusCode::GATEWAY_TIMEOUT); let body = response_json(response).await; - assert_eq!(body["code"], json!("snapshot_timeout")); + assert_eq!(body["code"], json!("worker_timeout")); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn snapshot_route_propagates_worker_error_envelope() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + if let Some(ListenApiRequest::WorkerRequest { reply, .. }) = rx.recv().await { + let _ = reply.send(Err(RequestWorkerError::WorkerError { + code: "invalid_format".to_string(), + message: "unsupported format 'qoi'".to_string(), + })); + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/snapshot") + .method("GET") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + // classify_error maps "invalid_*" prefixes to 400 / "invalid_request". 
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST); + let body = response_json(response).await; + assert_eq!(body["code"], json!("invalid_request")); replier.await.expect("replier should complete"); } } diff --git a/src/main.rs b/src/main.rs index 81886b139..54a955521 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ mod swarm; mod swarm_tui; mod wait; mod worker; +mod worker_request; mod wrap; use helpers::{ @@ -1393,19 +1394,17 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { let delivery_retry_interval = delivery_retry_interval(); let mut pending_deliveries = load_pending_deliveries(&paths.pending); let mut terminal_failed_deliveries: HashSet = HashSet::new(); - // Outstanding `Snapshot` HTTP requests waiting on a `snapshot_response` - // frame from the PTY worker. Keyed by the `request_id` we put on the - // outbound `snapshot_pty` frame; the reply oneshot is consumed when the - // worker echoes the same `request_id` back. Stale entries are cleaned - // up by the deadline sweep in the `reap_tick` arm — see - // `SNAPSHOT_REQUEST_TIMEOUT` below. (#871 will generalise this - // request/response correlation pattern across other RPCs.) - let mut pending_snapshots: HashMap< - String, - tokio::sync::oneshot::Sender>, - > = HashMap::new(); - const SNAPSHOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); - let mut snapshot_request_deadlines: HashMap = HashMap::new(); + // Outstanding worker-bound RPC requests waiting on a `*_response` + // frame from the wrapped worker. Keyed by the `request_id` we put on + // the outbound request frame; the reply `oneshot` is consumed when + // the worker echoes the same `request_id` back, or the entry expires + // via the deadline sweep in the `reap_tick` arm below. 
+ // + // Introduced for `snapshot_pty` (#870) and generalised in #871 so + // future request/response routes (`mode`, `pending`, `flush` from + // #864) can reuse the same correlation infrastructure — see + // `crate::worker_request`. + let mut pending_requests: HashMap = HashMap::new(); let mut dm_participants_cache: HashMap)> = HashMap::new(); let mut recent_thread_messages: VecDeque = VecDeque::new(); if !pending_deliveries.is_empty() { @@ -2198,49 +2197,61 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { }))); } } - ListenApiRequest::Snapshot { name, format, reply } => { - // Stash the reply oneshot under a unique request_id, then - // forward `snapshot_pty` to the worker. The worker echoes - // the request_id back on its `snapshot_response`, which - // the broker uses to fulfill the oneshot (see the - // worker_event_rx arm below). + ListenApiRequest::WorkerRequest { name, kind, payload, timeout, reply } => { + // Generic worker request/response: validate the + // worker exists and supports a PTY (request/response + // routes today all target the PTY side), then ship + // the frame and park the `reply` oneshot in + // `pending_requests`. The response is fulfilled + // either by the `*_response` arm below or by the + // deadline sweep in `reap_tick`. // // Headless workers don't run a VT and don't handle - // `snapshot_pty` — short-circuit with a descriptive - // error rather than letting the request sit in - // `pending_snapshots` until the 5s timeout sweep - // returns a misleading `snapshot_timeout`. + // PTY-oriented RPCs — short-circuit with a typed + // error rather than letting the request sit until + // the timeout sweep returns a misleading + // `worker_timeout`. 
let runtime = workers .workers .get(&name) .map(|handle| handle.spec.runtime.clone()); match runtime { None => { - let _ = reply.send(Err(format!( - "agent_not_found: no worker named '{name}'" - ))); + let _ = reply.send(Err( + worker_request::RequestWorkerError::WorkerNotFound( + format!("no worker named '{name}'"), + ), + )); } Some(AgentRuntime::Headless) => { - let _ = reply.send(Err(format!( - "unsupported_runtime: worker '{name}' is headless; snapshot_pty is only supported on PTY workers" - ))); + let _ = reply.send(Err( + worker_request::RequestWorkerError::UnsupportedRuntime( + format!("worker '{name}' is headless; {kind} is only supported on PTY workers"), + ), + )); } Some(AgentRuntime::Pty) => { - let request_id = format!("snap_{}", Uuid::new_v4().simple()); + let request_id = format!("req_{}", Uuid::new_v4().simple()); if let Err(err) = workers.send_to_worker( &name, - "snapshot_pty", + &kind, Some(request_id.clone()), - json!({ "format": format.as_wire_str() }), + payload, ).await { - let _ = reply.send(Err(format!( - "agent_not_found: {err}" - ))); + let _ = reply.send(Err( + worker_request::RequestWorkerError::SendFailed( + err.to_string(), + ), + )); } else { - pending_snapshots.insert(request_id.clone(), reply); - snapshot_request_deadlines.insert( + pending_requests.insert( request_id, - Instant::now() + SNAPSHOT_REQUEST_TIMEOUT, + worker_request::PendingRequest { + kind, + worker_name: name, + reply, + deadline: Instant::now() + timeout, + }, ); } } @@ -3198,45 +3209,30 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "name": name, "error": value.get("payload").cloned().unwrap_or(Value::Null) })).await; - } else if msg_type == "snapshot_response" { - // Route the response back to the HTTP / - // CLI caller using the request_id we put - // on the outbound snapshot_pty frame. 
- let request_id = value - .get("request_id") - .and_then(Value::as_str) - .map(ToOwned::to_owned); - if let Some(request_id) = request_id { - snapshot_request_deadlines.remove(&request_id); - if let Some(reply) = pending_snapshots.remove(&request_id) { - let payload = value - .get("payload") - .cloned() - .unwrap_or(Value::Null); - // The worker emits an error payload - // shaped like `{ "error": { "code":..., "message":... } }` - // when it can't honour the format. - if let Some(error) = payload.get("error") { - let message = error - .get("message") - .and_then(Value::as_str) - .unwrap_or("snapshot failed"); - let code = error - .get("code") - .and_then(Value::as_str) - .unwrap_or("invalid_format"); - let _ = reply.send(Err(format!("{code}: {message}"))); - } else { - let _ = reply.send(Ok(payload)); - } - } else { - tracing::debug!( - target = "agent_relay::broker", - worker = %name, - request_id = %request_id, - "snapshot_response with no pending caller — dropping" - ); - } + } else if msg_type.ends_with("_response") { + // Generic worker request/response dispatch. + // Any frame whose `type` ends in + // `_response` is routed by `request_id` + // into the matching parked `oneshot` in + // `pending_requests`. The pending entry + // owns the format/error decoding logic + // via `worker_request::fulfil_response_frame`. 
+ let routed = worker_request::fulfil_response_frame( + &mut pending_requests, + &value, + ); + if !routed { + let req_id = value + .get("request_id") + .and_then(Value::as_str) + .unwrap_or(""); + tracing::debug!( + target = "agent_relay::broker", + worker = %name, + msg_type = %msg_type, + request_id = %req_id, + "worker response with no pending caller — dropping" + ); } } else if msg_type == "worker_stream" { let _ = send_event(&sdk_out_tx, json!({ @@ -3540,32 +3536,20 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { _ = reap_tick.tick() => { let now = Instant::now(); - // Time out snapshot requests whose worker never responded. - // Common cause: worker crashed between us sending snapshot_pty - // and it parsing the frame. Without this sweep the HTTP - // handler would hang forever on its oneshot. - let timed_out: Vec = snapshot_request_deadlines - .iter() - .filter_map(|(req_id, deadline)| { - if *deadline <= now { - Some(req_id.clone()) - } else { - None - } - }) - .collect(); - for req_id in timed_out { - snapshot_request_deadlines.remove(&req_id); - if let Some(reply) = pending_snapshots.remove(&req_id) { - tracing::warn!( - target = "agent_relay::broker", - request_id = %req_id, - "snapshot request timed out before worker responded" - ); - let _ = reply.send(Err( - "snapshot_timeout: worker did not respond in time".into(), - )); - } + // Time out worker request/response calls whose worker never + // responded. Common cause: worker crashed between us sending + // the request frame and it parsing the frame. Without this + // sweep the HTTP handler would hang forever on its oneshot. 
+ for (req_id, worker_name, kind) in + worker_request::reap_expired(&mut pending_requests, now) + { + tracing::warn!( + target = "agent_relay::broker", + request_id = %req_id, + worker = %worker_name, + kind = %kind, + "worker request timed out before worker responded" + ); } let due_ids: Vec = pending_deliveries diff --git a/src/worker_request.rs b/src/worker_request.rs new file mode 100644 index 000000000..8633eab55 --- /dev/null +++ b/src/worker_request.rs @@ -0,0 +1,302 @@ +//! Worker request/response correlation for the broker. +//! +//! Several broker → worker operations follow the same shape: the broker +//! sends a typed request frame to a wrapped worker over its +//! JSON-over-stdio pipe, then waits for a typed response frame back so it +//! can fulfil an HTTP / CLI caller's `oneshot`. Today only `snapshot_pty` +//! uses the pattern (introduced in #870), but several routes from #864 +//! (`mode` / `pending` / `flush`) will follow shortly. +//! +//! This module factors out the bookkeeping so each new route costs ~5 +//! lines instead of ~80: +//! +//! * [`PendingRequest`] — one entry in the broker's correlation map, +//! carrying the awaiter's `oneshot` and the timeout deadline. +//! * [`RequestWorkerError`] — the typed error returned to the awaiter +//! (see `listen_api::worker_request_error_to_response` for HTTP mapping). +//! * [`fulfil_response_frame`] — collapses the per-feature +//! `*_response` worker-frame arms into a single dispatch. +//! * [`reap_expired`] — the timeout sweep used by the broker's reap tick. +//! +//! Sending the outbound request frame and parking the `PendingRequest` +//! happens inline in the broker loop's `ListenApiRequest::WorkerRequest` +//! arm — it needs `&mut WorkerRegistry`, `&mut HashMap`, and access to +//! `AgentRuntime` checks that live in `main.rs`, so wrapping it in a +//! helper here would not pay for itself. 
+ +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use serde_json::Value; +use thiserror::Error; +use tokio::sync::oneshot; + +/// Error returned to the broker's HTTP / CLI caller when a request/response +/// round-trip with a worker fails. +/// +/// Variants are intentionally narrow so the API layer can map each to a +/// stable status code (see `listen_api::worker_request_error_to_response`). +/// New variants here require a matching arm in that function. +#[derive(Debug, Error)] +pub(crate) enum RequestWorkerError { + /// No worker is registered under the given name. + #[error("agent_not_found: {0}")] + WorkerNotFound(String), + + /// The worker exists but its runtime does not support this request + /// (e.g. `snapshot_pty` against a headless worker). + #[error("unsupported_runtime: {0}")] + UnsupportedRuntime(String), + + /// Failed to enqueue the request frame on the worker's stdin pipe. + /// The worker may have died after the lookup succeeded. + #[error("send_failed: {0}")] + SendFailed(String), + + /// The worker did not respond before the deadline. + #[error("worker_timeout: worker did not respond in time")] + Timeout, + + /// The worker returned a structured `error` envelope. + #[error("{code}: {message}")] + WorkerError { code: String, message: String }, + + /// The broker dropped the awaiter's `oneshot` before responding + /// (shutdown race). Reserved for future use; today the API layer + /// treats `oneshot::error::RecvError` from the broker channel as + /// an internal-server-error response directly, but new call sites + /// will need this variant once `request_worker` is wrapped in a + /// fully async helper. + #[allow(dead_code)] + #[error("channel_closed: broker shut down before responding")] + ChannelClosed, +} + +/// One outstanding worker request, keyed by `request_id` in the broker's +/// correlation map. The awaiter's `oneshot` fires when the matching +/// `*_response` frame arrives or the deadline elapses (whichever first). 
+pub(crate) struct PendingRequest { + /// What kind of request this is — only used for diagnostic logging, + /// e.g. reported by [`reap_expired`] when the request times out. + pub(crate) kind: String, + /// Worker that the request was sent to — used for diagnostics on + /// timeout. + pub(crate) worker_name: String, + /// Reply channel to the HTTP / CLI handler awaiting the response. + pub(crate) reply: oneshot::Sender>, + /// Monotonic instant after which the request is considered timed + /// out and the entry is dropped by the reap tick sweep. + pub(crate) deadline: Instant, +} + +/// Default deadline for worker request/response round-trips when callers +/// don't specify one explicitly. Matches the previous `snapshot` timeout. +pub(crate) const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + +/// Route a `*_response` worker frame to the matching parked +/// [`PendingRequest`]. +/// +/// Returns `true` if a pending entry was consumed (whether the awaiter +/// is still listening or not) and `false` if the response carried no +/// `request_id` or no entry was parked under it. Callers in the broker's +/// worker-frame handler use the return value purely for tracing. +/// +/// Response frame shape: +/// +/// ```json +/// { "type": "*_response", +/// "request_id": "...", +/// "payload": { "error": { "code": "...", "message": "..." } } | { ... } } +/// ``` +/// +/// `payload.error` is treated as a structured worker-side failure and +/// mapped to [`RequestWorkerError::WorkerError`]; any other payload is +/// forwarded verbatim to the awaiter. 
+pub(crate) fn fulfil_response_frame( + pending: &mut HashMap, + frame: &Value, +) -> bool { + let Some(request_id) = frame.get("request_id").and_then(Value::as_str) else { + return false; + }; + let Some(entry) = pending.remove(request_id) else { + return false; + }; + + let payload = frame.get("payload").cloned().unwrap_or(Value::Null); + let result = if let Some(error) = payload.get("error") { + let code = error + .get("code") + .and_then(Value::as_str) + .unwrap_or("worker_error") + .to_string(); + let message = error + .get("message") + .and_then(Value::as_str) + .unwrap_or("worker reported an error") + .to_string(); + Err(RequestWorkerError::WorkerError { code, message }) + } else { + Ok(payload) + }; + let _ = entry.reply.send(result); + // `kind` and `worker_name` are read by the timeout sweep below; we + // drop them here without inspection because the response succeeded + // before they were needed for diagnostics. + let _ = (entry.kind, entry.worker_name); + true +} + +/// Drop pending entries whose deadlines have elapsed and notify each +/// awaiter with [`RequestWorkerError::Timeout`]. Called from the broker's +/// reap tick. +/// +/// Returns the list of `(request_id, worker_name, kind)` that were +/// reaped, for the caller to emit structured logs. 
+pub(crate) fn reap_expired( + pending: &mut HashMap, + now: Instant, +) -> Vec<(String, String, String)> { + let timed_out: Vec = pending + .iter() + .filter_map(|(req_id, entry)| { + if entry.deadline <= now { + Some(req_id.clone()) + } else { + None + } + }) + .collect(); + + let mut reaped = Vec::with_capacity(timed_out.len()); + for req_id in timed_out { + if let Some(entry) = pending.remove(&req_id) { + let worker_name = entry.worker_name.clone(); + let kind = entry.kind.clone(); + let _ = entry.reply.send(Err(RequestWorkerError::Timeout)); + reaped.push((req_id, worker_name, kind)); + } + } + reaped +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn make_entry( + kind: &str, + worker: &str, + deadline: Instant, + ) -> ( + PendingRequest, + oneshot::Receiver>, + ) { + let (tx, rx) = oneshot::channel(); + ( + PendingRequest { + kind: kind.to_string(), + worker_name: worker.to_string(), + reply: tx, + deadline, + }, + rx, + ) + } + + #[tokio::test] + async fn fulfil_response_frame_routes_payload_to_awaiter() { + let mut pending: HashMap = HashMap::new(); + let (entry, rx) = make_entry( + "snapshot_pty", + "worker-a", + Instant::now() + Duration::from_secs(5), + ); + pending.insert("req-1".to_string(), entry); + + let frame = json!({ + "type": "snapshot_response", + "request_id": "req-1", + "payload": { "format": "plain", "screen": "hello" }, + }); + + assert!(fulfil_response_frame(&mut pending, &frame)); + assert!(pending.is_empty()); + + let received = rx.await.expect("reply channel should fire"); + let value = received.expect("payload should be Ok"); + assert_eq!(value["format"], json!("plain")); + assert_eq!(value["screen"], json!("hello")); + } + + #[tokio::test] + async fn fulfil_response_frame_maps_error_envelope() { + let mut pending: HashMap = HashMap::new(); + let (entry, rx) = make_entry( + "snapshot_pty", + "worker-a", + Instant::now() + Duration::from_secs(5), + ); + pending.insert("req-1".to_string(), entry); + + let 
frame = json!({ + "type": "snapshot_response", + "request_id": "req-1", + "payload": { + "error": { "code": "invalid_format", "message": "boom" } + }, + }); + + assert!(fulfil_response_frame(&mut pending, &frame)); + let received = rx.await.expect("reply channel should fire"); + match received { + Err(RequestWorkerError::WorkerError { code, message }) => { + assert_eq!(code, "invalid_format"); + assert_eq!(message, "boom"); + } + other => panic!("expected WorkerError, got {other:?}"), + } + } + + #[test] + fn fulfil_response_frame_returns_false_without_request_id() { + let mut pending: HashMap = HashMap::new(); + let frame = json!({ "type": "snapshot_response", "payload": {} }); + assert!(!fulfil_response_frame(&mut pending, &frame)); + } + + #[test] + fn fulfil_response_frame_returns_false_when_no_entry() { + let mut pending: HashMap = HashMap::new(); + let frame = json!({ + "type": "snapshot_response", + "request_id": "missing", + "payload": {}, + }); + assert!(!fulfil_response_frame(&mut pending, &frame)); + } + + #[tokio::test] + async fn reap_expired_times_out_overdue_entries() { + let mut pending: HashMap = HashMap::new(); + let now = Instant::now(); + + let (entry_stale, rx_stale) = + make_entry("snapshot_pty", "worker-a", now - Duration::from_millis(10)); + let (entry_fresh, _rx_fresh) = + make_entry("snapshot_pty", "worker-b", now + Duration::from_secs(5)); + pending.insert("stale".to_string(), entry_stale); + pending.insert("fresh".to_string(), entry_fresh); + + let reaped = reap_expired(&mut pending, now); + assert_eq!(reaped.len(), 1); + assert_eq!(reaped[0].0, "stale"); + assert_eq!(reaped[0].1, "worker-a"); + assert_eq!(reaped[0].2, "snapshot_pty"); + assert!(pending.contains_key("fresh")); + + let received = rx_stale.await.expect("reply channel should fire"); + assert!(matches!(received, Err(RequestWorkerError::Timeout))); + } +} From 8de35afbdf7c89dc328a3bce8ce7f28953843aca Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 17 May 2026 21:39:34 
-0400 Subject: [PATCH 2/2] broker(worker_request): fail pending RPC on worker teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finding from PR #873 review (cubic-ish): the new pending_requests map was only drained by the periodic timeout sweep (`reap_expired`). When a worker disappeared (explicit release, `worker_exited` frame from the worker, or `reap_exited` finding the process gone), in-flight RPC callers sat on their oneshots until the 5s deadline elapsed instead of failing fast — wrong status code (504 Gateway Timeout) for what was really a 503 Service Unavailable. Adds `worker_request::fail_for_worker(pending, name)` that drains every entry matching the worker and notifies each awaiter with a new `RequestWorkerError::WorkerDisappeared(name)` variant (mapped to HTTP 503 in the listen-api classifier). Returns the drained `(request_id, kind)` list for caller logging. Adds a thin `fail_pending_requests_for_worker` wrapper in `main.rs` that calls the helper and emits one structured `tracing::warn!` per drained request, mirroring the shape of the existing `drop_pending_for_worker` pattern. Wired into all four worker-teardown sites: - Explicit release via the HTTP API (`agent_released`) - Explicit release via relaycast (`relaycast_release`) - `worker_exited` frame received from the worker - `reap_exited` periodic sweep — both `PermanentlyDead` and the unsupervised exit branch Test: `fail_for_worker_drains_only_matching_entries` exercises a three-worker map, drains one worker's two entries, asserts the other workers' entries remain untouched and the drained awaiters each receive `WorkerDisappeared` carrying the right name. Full suite: 626 passed (was 625). cargo fmt + clippy clean. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/listen_api.rs | 5 +++ src/main.rs | 30 +++++++++++++++ src/worker_request.rs | 88 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+) diff --git a/src/listen_api.rs b/src/listen_api.rs index 0f09c855c..aa6161906 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -1119,6 +1119,11 @@ fn worker_request_error_to_response( RequestWorkerError::SendFailed(_) => { api_error(StatusCode::NOT_FOUND, "agent_not_found", err.to_string()) } + RequestWorkerError::WorkerDisappeared(_) => api_error( + StatusCode::SERVICE_UNAVAILABLE, + "worker_disappeared", + err.to_string(), + ), RequestWorkerError::ChannelClosed => internal_error(), } } diff --git a/src/main.rs b/src/main.rs index 54a955521..54face2ce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1781,6 +1781,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { json!({"kind":"delivery_dropped","name":&name,"count":dropped,"reason":"agent_released"}), ).await; } + fail_pending_requests_for_worker(&mut pending_requests, &name, "agent_released"); state.agents.remove(&name); if paths.persist { let _ = state.save(&paths.state); } let _ = send_event( @@ -2450,6 +2451,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { json!({"kind":"delivery_dropped","name":name,"count":dropped,"reason":"agent_released"}), ).await; } + fail_pending_requests_for_worker(&mut pending_requests, &name, "relaycast_release"); telemetry.track(TelemetryEvent::AgentRelease { cli: String::new(), release_reason: "relaycast_release".to_string(), @@ -3493,6 +3495,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { }), ).await; } + fail_pending_requests_for_worker(&mut pending_requests, &name, "worker_exited"); let _ = send_event( &sdk_out_tx, json!({ @@ -3692,6 +3695,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { }), ).await; } + 
fail_pending_requests_for_worker(&mut pending_requests, name, "worker_permanently_dead"); let _ = send_event( &sdk_out_tx, json!({"kind":"agent_permanently_dead","name":name,"reason":reason}), @@ -3731,6 +3735,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { }), ).await; } + fail_pending_requests_for_worker(&mut pending_requests, name, "worker_exited"); let _ = send_event( &sdk_out_tx, json!({"kind":"agent_exited","name":name,"code":code,"signal":signal}), @@ -4121,6 +4126,31 @@ fn drop_pending_for_worker( before.saturating_sub(pending_deliveries.len()) } +/// Drain every in-flight worker request targeting `worker_name` and +/// notify each awaiter with [`worker_request::RequestWorkerError::WorkerDisappeared`]. +/// Called from every worker-teardown path (explicit release, +/// `worker_exited` frame, `reap_exited` periodic sweep) so HTTP callers +/// don't have to wait out the request deadline when the worker has +/// clearly gone. Logs one structured warning per drained request. +fn fail_pending_requests_for_worker( + pending_requests: &mut HashMap, + worker_name: &str, + reason: &'static str, +) -> usize { + let failed = worker_request::fail_for_worker(pending_requests, worker_name); + for (req_id, kind) in &failed { + tracing::warn!( + target = "agent_relay::broker", + request_id = %req_id, + worker = %worker_name, + kind = %kind, + reason = reason, + "failed pending worker request because worker is gone" + ); + } + failed.len() +} + fn should_clear_pending_delivery_for_event( pending: Option<&PendingDelivery>, event_id: Option<&str>, diff --git a/src/worker_request.rs b/src/worker_request.rs index 8633eab55..84c82adb5 100644 --- a/src/worker_request.rs +++ b/src/worker_request.rs @@ -61,6 +61,14 @@ pub(crate) enum RequestWorkerError { #[error("{code}: {message}")] WorkerError { code: String, message: String }, + /// The worker exited (cleanly or otherwise) while the broker was + /// waiting for its response. 
The carried `String` is the worker + /// name, for diagnostics. Mapped to HTTP 503 Service Unavailable + /// in `listen_api::worker_request_error_to_response` — the agent existed when the + /// request was sent but is no longer there to fulfil it. + #[error("worker_disappeared: worker '{0}' exited before responding")] + WorkerDisappeared(String), + /// The broker dropped the awaiter's `oneshot` before responding /// (shutdown race). Reserved for future use; today the API layer /// treats `oneshot::error::RecvError` from the broker channel as @@ -180,6 +188,42 @@ pub(crate) fn reap_expired( reaped } +/// Fail every pending request targeting `worker_name` immediately with +/// [`RequestWorkerError::WorkerDisappeared`]. Called from the broker's +/// worker-teardown paths (explicit release, `worker_exited` frame, +/// `reap_exited` sweep) so that in-flight HTTP callers don't have to +/// wait out the full request deadline when a worker has clearly gone. +/// +/// Returns the `(request_id, kind)` pairs that were drained, for the +/// caller to emit structured logs.
+pub(crate) fn fail_for_worker( + pending: &mut HashMap, + worker_name: &str, +) -> Vec<(String, String)> { + let doomed: Vec = pending + .iter() + .filter_map(|(req_id, entry)| { + if entry.worker_name == worker_name { + Some(req_id.clone()) + } else { + None + } + }) + .collect(); + + let mut failed = Vec::with_capacity(doomed.len()); + for req_id in doomed { + if let Some(entry) = pending.remove(&req_id) { + let kind = entry.kind.clone(); + let _ = entry.reply.send(Err(RequestWorkerError::WorkerDisappeared( + worker_name.to_string(), + ))); + failed.push((req_id, kind)); + } + } + failed +} + #[cfg(test)] mod tests { use super::*; @@ -205,6 +249,50 @@ mod tests { ) } + #[tokio::test] + async fn fail_for_worker_drains_only_matching_entries() { + let mut pending: HashMap = HashMap::new(); + let deadline = Instant::now() + Duration::from_secs(60); + + let (entry_a, rx_a) = make_entry("snapshot_pty", "alice", deadline); + let (entry_b1, rx_b1) = make_entry("mode_get", "bob", deadline); + let (entry_b2, rx_b2) = make_entry("snapshot_pty", "bob", deadline); + let (entry_c, rx_c) = make_entry("snapshot_pty", "carol", deadline); + pending.insert("req-a".to_string(), entry_a); + pending.insert("req-b1".to_string(), entry_b1); + pending.insert("req-b2".to_string(), entry_b2); + pending.insert("req-c".to_string(), entry_c); + + let failed = fail_for_worker(&mut pending, "bob"); + + // Both of bob's entries were drained — neither alice nor carol. + assert_eq!(failed.len(), 2); + assert_eq!(pending.len(), 2); + assert!(pending.contains_key("req-a")); + assert!(pending.contains_key("req-c")); + assert!(!pending.contains_key("req-b1")); + assert!(!pending.contains_key("req-b2")); + + // Each drained awaiter received WorkerDisappeared carrying the name. 
+ let err_b1 = rx_b1.await.expect("awaiter receives").expect_err("error"); + let err_b2 = rx_b2.await.expect("awaiter receives").expect_err("error"); + match (&err_b1, &err_b2) { + ( + RequestWorkerError::WorkerDisappeared(n1), + RequestWorkerError::WorkerDisappeared(n2), + ) => { + assert_eq!(n1, "bob"); + assert_eq!(n2, "bob"); + } + other => panic!("expected WorkerDisappeared on both, got {other:?}"), + } + + // Untouched awaiters' oneshots stay open (we already asserted + // the map still contains their entries above). + drop(rx_a); + drop(rx_c); + } + #[tokio::test] async fn fulfil_response_frame_routes_payload_to_awaiter() { let mut pending: HashMap = HashMap::new();