From c48e88488b9f92fc175b0c8d874a2ac26a3e1896 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 17 May 2026 23:17:47 -0400 Subject: [PATCH 1/2] feat(broker): per-agent session mode + pending-queue routes (#864 sub-2) Add server-side `SessionMode` (relay | human) per worker and four new HTTP routes that back the upcoming `agent-relay drive` client: GET/PUT `/api/spawned/{name}/mode`, GET `/api/spawned/{name}/pending`, POST `/api/spawned/{name}/flush`. When a worker is in human mode the broker parks inbound relay messages (both `/api/send` and inbound relaycast) in a per-worker FIFO pending queue (cap 256, FIFO eviction) instead of injecting them; mode flips back to relay auto-drain the queue. Part of #864. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/listen_api.rs | 586 +++++++++++++++++++++- src/main.rs | 294 ++++++++++- src/types.rs | 247 +++++++++ web/content/docs/reference-broker-api.mdx | 113 +++++ 4 files changed, 1236 insertions(+), 4 deletions(-) diff --git a/src/listen_api.rs b/src/listen_api.rs index aa6161906..43cd158f3 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -7,8 +7,10 @@ use std::time::{Duration, Instant}; use relay_broker::{ - multi_workspace::WorkspaceMembershipSummary, protocol::MessageInjectionMode, + multi_workspace::WorkspaceMembershipSummary, + protocol::MessageInjectionMode, replay_buffer::ReplayBuffer, + types::{PendingRelayMessage, SessionMode}, }; use serde::Deserialize; use serde_json::{json, Value}; @@ -134,6 +136,66 @@ pub enum ListenApiRequest { RenewLease { reply: tokio::sync::oneshot::Sender>, }, + /// `GET /api/spawned/{name}/mode` — read the current session mode. + GetSessionMode { + name: String, + reply: tokio::sync::oneshot::Sender>, + }, + /// `PUT /api/spawned/{name}/mode` — set the session mode. On a + /// `human → relay` transition the broker drains the pending queue + /// into the worker (via the existing inject path) before replying; + /// `flushed` reports how many messages were injected. 
+ SetSessionMode { + name: String, + mode: SessionMode, + reply: tokio::sync::oneshot::Sender>, + }, + /// `GET /api/spawned/{name}/pending` — snapshot the per-worker + /// pending-message queue (FIFO, head first). + GetPending { + name: String, + reply: tokio::sync::oneshot::Sender, SessionRouteError>>, + }, + /// `POST /api/spawned/{name}/flush` — drain the pending queue and + /// inject every message into the worker via the existing + /// fire-and-forget inject path. Does *not* change the mode. + FlushPending { + name: String, + reply: tokio::sync::oneshot::Sender>, + }, +} + +/// Typed errors for the four session-mode HTTP routes added in #864 +/// sub-PR 2. Keeps the broker arm's reply payload structured so the +/// HTTP handler can map cleanly to 404 without parsing strings. The +/// "broker channel closed" / "reply dropped" failure modes are handled +/// at the HTTP boundary via [`internal_error`], so they don't need a +/// variant here. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SessionRouteError { + /// No worker with that name is currently registered with the broker. + WorkerNotFound(String), +} + +impl std::fmt::Display for SessionRouteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SessionRouteError::WorkerNotFound(name) => { + write!(f, "agent_not_found: no worker named '{name}'") + } + } + } +} + +impl std::error::Error for SessionRouteError {} + +/// Reply payload for [`ListenApiRequest::SetSessionMode`]. `flushed` +/// is the number of pending messages drained during the transition +/// (always `0` unless we transitioned `human → relay`). 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SetSessionModeOk { + pub mode: SessionMode, + pub flushed: usize, } #[derive(Debug, Clone, Deserialize)] @@ -274,6 +336,18 @@ fn listen_api_router_with_auth( "/api/spawned/{name}/snapshot", routing::get(listen_api_snapshot), ) + .route( + "/api/spawned/{name}/mode", + routing::get(listen_api_get_session_mode).put(listen_api_set_session_mode), + ) + .route( + "/api/spawned/{name}/pending", + routing::get(listen_api_get_pending), + ) + .route( + "/api/spawned/{name}/flush", + routing::post(listen_api_flush_pending), + ) .route("/api/metrics", routing::get(listen_api_metrics)) .route("/api/status", routing::get(listen_api_status)) .route( @@ -1089,6 +1163,190 @@ async fn listen_api_snapshot( } } +// --------------------------------------------------------------------------- +// Session mode (per-agent inject vs. queue, plus pending-queue inspection) +// +// Added in #864 sub-PR 2 to back the upcoming `agent-relay drive` client +// (sub-PR 3). The broker keeps a `SessionMode` per worker; `Human` mode +// parks inbound relay messages in a FIFO `pending` queue instead of +// injecting them. These four routes are the server-side surface the +// `drive` client will call. +// --------------------------------------------------------------------------- + +/// `GET /api/spawned/{name}/mode` → `{ "mode": "relay" | "human" }`. 
+async fn listen_api_get_session_mode( + axum::extract::State(state): axum::extract::State, + axum::extract::Path(name): axum::extract::Path, +) -> (axum::http::StatusCode, axum::Json) { + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + if state + .tx + .send(ListenApiRequest::GetSessionMode { + name: name.clone(), + reply: reply_tx, + }) + .await + .is_err() + { + return internal_error(); + } + match reply_rx.await { + Ok(Ok(mode)) => ( + axum::http::StatusCode::OK, + axum::Json(json!({ "mode": mode.as_wire_str() })), + ), + Ok(Err(err)) => session_route_error_to_response(&err), + Err(_) => internal_error(), + } +} + +#[derive(Debug, Deserialize)] +struct SetSessionModePayload { + mode: String, +} + +/// `PUT /api/spawned/{name}/mode` — body `{ "mode": "relay" | "human" }`. +/// +/// On a `human → relay` transition the broker drains the pending queue +/// into the worker via the existing inject path *before* replying, so a +/// caller flipping back to relay never strands messages. The response +/// reports `flushed` (always `0` unless we drained). 
+async fn listen_api_set_session_mode( + axum::extract::State(state): axum::extract::State, + axum::extract::Path(name): axum::extract::Path, + axum::Json(body): axum::Json, +) -> (axum::http::StatusCode, axum::Json) { + let Some(mode) = SessionMode::parse(&body.mode) else { + return api_error( + axum::http::StatusCode::BAD_REQUEST, + "invalid_mode", + format!( + "unsupported session mode '{}' (expected 'relay' or 'human')", + body.mode + ), + ); + }; + + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + if state + .tx + .send(ListenApiRequest::SetSessionMode { + name: name.clone(), + mode, + reply: reply_tx, + }) + .await + .is_err() + { + return internal_error(); + } + match reply_rx.await { + Ok(Ok(ok)) => ( + axum::http::StatusCode::OK, + axum::Json(json!({ + "mode": ok.mode.as_wire_str(), + "flushed": ok.flushed, + })), + ), + Ok(Err(err)) => session_route_error_to_response(&err), + Err(_) => internal_error(), + } +} + +/// `GET /api/spawned/{name}/pending` → `{ "pending": [ ... ] }`, FIFO +/// (head of queue first). Empty array when the worker is in relay mode +/// or simply has no pending messages. 
+async fn listen_api_get_pending( + axum::extract::State(state): axum::extract::State, + axum::extract::Path(name): axum::extract::Path, +) -> (axum::http::StatusCode, axum::Json) { + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + if state + .tx + .send(ListenApiRequest::GetPending { + name: name.clone(), + reply: reply_tx, + }) + .await + .is_err() + { + return internal_error(); + } + match reply_rx.await { + Ok(Ok(messages)) => { + let pending: Vec = messages + .into_iter() + .map(|m| { + let mut payload = json!({ + "from": m.from, + "body": m.body, + "queued_at_ms": m.queued_at_ms, + }); + if let Some(event_id) = m.event_id { + if let Some(obj) = payload.as_object_mut() { + obj.insert("event_id".to_string(), Value::String(event_id)); + } + } + payload + }) + .collect(); + ( + axum::http::StatusCode::OK, + axum::Json(json!({ "pending": pending })), + ) + } + Ok(Err(err)) => session_route_error_to_response(&err), + Err(_) => internal_error(), + } +} + +/// `POST /api/spawned/{name}/flush` → `{ "flushed": N }`. +/// +/// Drains the queue and injects each message into the worker in order +/// using the existing fire-and-forget inject path. The session mode is +/// *not* changed — a caller still in human mode will continue to queue +/// newly-arriving messages. 
+async fn listen_api_flush_pending( + axum::extract::State(state): axum::extract::State, + axum::extract::Path(name): axum::extract::Path, +) -> (axum::http::StatusCode, axum::Json) { + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + if state + .tx + .send(ListenApiRequest::FlushPending { + name: name.clone(), + reply: reply_tx, + }) + .await + .is_err() + { + return internal_error(); + } + match reply_rx.await { + Ok(Ok(flushed)) => ( + axum::http::StatusCode::OK, + axum::Json(json!({ "flushed": flushed })), + ), + Ok(Err(err)) => session_route_error_to_response(&err), + Err(_) => internal_error(), + } +} + +/// Centralised mapping from [`SessionRouteError`] to HTTP responses for +/// the four session-mode routes. Mirrors +/// [`worker_request_error_to_response`] in shape. +fn session_route_error_to_response( + err: &SessionRouteError, +) -> (axum::http::StatusCode, axum::Json) { + match err { + SessionRouteError::WorkerNotFound(_) => api_error( + axum::http::StatusCode::NOT_FOUND, + "agent_not_found", + err.to_string(), + ), + } +} + /// Map a [`RequestWorkerError`] to an HTTP response. Centralised so every /// route built on `WorkerRequest` produces consistent status codes. 
fn worker_request_error_to_response( @@ -1631,8 +1889,12 @@ mod auth_tests { use tokio::sync::{broadcast, mpsc}; use tower::ServiceExt; - use super::{listen_api_router_with_auth, ListenApiConfig, ListenApiRequest}; + use super::{ + listen_api_router_with_auth, ListenApiConfig, ListenApiRequest, SessionRouteError, + SetSessionModeOk, + }; use crate::worker_request::RequestWorkerError; + use relay_broker::types::{PendingRelayMessage, SessionMode}; fn test_router( broker_api_key: Option<&str>, @@ -2593,4 +2855,324 @@ mod auth_tests { assert_eq!(body["code"], json!("invalid_request")); replier.await.expect("replier should complete"); } + + // ----------------------------------------------------------------- + // Session mode (#864 sub-PR 2): four routes that back the upcoming + // `agent-relay drive` client. The HTTP layer only forwards typed + // requests over the broker channel — these tests cover the + // request shaping and response mapping, not the broker arms (those + // live in main.rs and are exercised by the broker integration tests). 
+ // ----------------------------------------------------------------- + + #[tokio::test] + async fn get_session_mode_route_returns_mode_string() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::GetSessionMode { name, reply }) => { + assert_eq!(name, "worker-a"); + let _ = reply.send(Ok(SessionMode::Human)); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/mode") + .method("GET") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + let body = response_json(response).await; + assert_eq!(body, json!({ "mode": "human" })); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn get_session_mode_route_returns_404_when_worker_missing() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + if let Some(ListenApiRequest::GetSessionMode { reply, .. 
}) = rx.recv().await { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/ghost/mode") + .method("GET") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = response_json(response).await; + assert_eq!(body["code"], json!("agent_not_found")); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn set_session_mode_route_forwards_parsed_mode_and_returns_flushed() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::SetSessionMode { name, mode, reply }) => { + assert_eq!(name, "worker-a"); + assert_eq!(mode, SessionMode::Relay); + let _ = reply.send(Ok(SetSessionModeOk { + mode: SessionMode::Relay, + flushed: 3, + })); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/mode") + .method("PUT") + .header("x-api-key", "secret") + .header("content-type", "application/json") + .body(Body::from(json!({ "mode": "relay" }).to_string())) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + let body = response_json(response).await; + assert_eq!(body, json!({ "mode": "relay", "flushed": 3 })); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn set_session_mode_route_rejects_invalid_mode_without_calling_broker() { + let (router, mut rx) = test_router(Some("secret")); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/mode") + .method("PUT") + .header("x-api-key", "secret") + .header("content-type", 
"application/json") + .body(Body::from(json!({ "mode": "drive" }).to_string())) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + let body = response_json(response).await; + assert_eq!(body["code"], json!("invalid_mode")); + assert!( + rx.try_recv().is_err(), + "invalid mode should not enqueue request" + ); + } + + #[tokio::test] + async fn set_session_mode_route_returns_404_when_worker_missing() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + if let Some(ListenApiRequest::SetSessionMode { reply, .. }) = rx.recv().await { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/ghost/mode") + .method("PUT") + .header("x-api-key", "secret") + .header("content-type", "application/json") + .body(Body::from(json!({ "mode": "human" }).to_string())) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = response_json(response).await; + assert_eq!(body["code"], json!("agent_not_found")); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn get_pending_route_returns_fifo_list_with_event_id() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::GetPending { name, reply }) => { + assert_eq!(name, "worker-a"); + let _ = reply.send(Ok(vec![ + PendingRelayMessage { + from: "Alice".to_string(), + body: "one".to_string(), + queued_at_ms: 100, + event_id: Some("evt_1".to_string()), + }, + PendingRelayMessage { + from: "Bob".to_string(), + body: "two".to_string(), + queued_at_ms: 200, + event_id: None, + }, + ])); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + 
let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/pending") + .method("GET") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + let body = response_json(response).await; + assert_eq!(body["pending"].as_array().expect("array").len(), 2); + assert_eq!(body["pending"][0]["from"], json!("Alice")); + assert_eq!(body["pending"][0]["body"], json!("one")); + assert_eq!(body["pending"][0]["queued_at_ms"], json!(100)); + assert_eq!(body["pending"][0]["event_id"], json!("evt_1")); + assert_eq!(body["pending"][1]["from"], json!("Bob")); + assert_eq!(body["pending"][1].get("event_id"), None); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn get_pending_route_returns_404_when_worker_missing() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + if let Some(ListenApiRequest::GetPending { reply, .. 
}) = rx.recv().await { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/ghost/pending") + .method("GET") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = response_json(response).await; + assert_eq!(body["code"], json!("agent_not_found")); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn flush_route_returns_flushed_count() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::FlushPending { name, reply }) => { + assert_eq!(name, "worker-a"); + let _ = reply.send(Ok(5)); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/flush") + .method("POST") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + let body = response_json(response).await; + assert_eq!(body, json!({ "flushed": 5 })); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn flush_route_returns_404_when_worker_missing() { + let (router, mut rx) = test_router(Some("secret")); + let replier = tokio::spawn(async move { + if let Some(ListenApiRequest::FlushPending { reply, .. 
}) = rx.recv().await { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/ghost/flush") + .method("POST") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = response_json(response).await; + assert_eq!(body["code"], json!("agent_not_found")); + replier.await.expect("replier should complete"); + } + + #[tokio::test] + async fn session_mode_routes_require_auth() { + let (router, _rx) = test_router(Some("secret")); + for (method, path) in [ + ("GET", "/api/spawned/worker-a/mode"), + ("PUT", "/api/spawned/worker-a/mode"), + ("GET", "/api/spawned/worker-a/pending"), + ("POST", "/api/spawned/worker-a/flush"), + ] { + let response = router + .clone() + .oneshot( + Request::builder() + .uri(path) + .method(method) + .header("content-type", "application/json") + .body(Body::from(json!({ "mode": "relay" }).to_string())) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + assert_eq!( + response.status(), + StatusCode::UNAUTHORIZED, + "{method} {path} should require auth" + ); + } + } } diff --git a/src/main.rs b/src/main.rs index 54face2ce..af1750d3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,7 +28,10 @@ use helpers::{ is_auto_suggestion, is_bypass_selection_menu, is_in_editor_mode, is_self_name, normalize_cli_name, parse_cli_command, strip_ansi, TerminalQueryParser, }; -use listen_api::{broadcast_if_relevant, listen_api_router, ListenApiConfig, ListenApiRequest}; +use listen_api::{ + broadcast_if_relevant, listen_api_router, ListenApiConfig, ListenApiRequest, SessionRouteError, + SetSessionModeOk, +}; use routing::display_target_for_dashboard; use anyhow::{Context, Result}; @@ -61,7 +64,10 @@ use relay_broker::{ replay_buffer::{ReplayBuffer, 
DEFAULT_REPLAY_CAPACITY}, snippets::ensure_relaycast_mcp_config, telemetry::{ActionSource, TelemetryClient, TelemetryEvent}, - types::{BrokerCommandEvent, BrokerCommandPayload, InboundKind, SenderKind}, + types::{ + BrokerCommandEvent, BrokerCommandPayload, InboundKind, PendingRelayMessage, SenderKind, + SessionDispatch, SessionMode, SessionState, + }, }; use spawner::{spawn_env_vars, Spawner}; @@ -1405,6 +1411,14 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { // #864) can reuse the same correlation infrastructure — see // `crate::worker_request`. let mut pending_requests: HashMap = HashMap::new(); + // Per-worker session-mode + pending-relay-message queue. Lives + // parallel to `workers.workers` so we can swap modes / inspect / + // drain without touching `WorkerHandle` (which holds OS-level + // process state). See #864 sub-PR 2 and `relay_broker::types:: + // SessionState`. Entries are created lazily on first lookup and + // removed wherever workers exit (`Release` arm, `worker_exited` + // frame, `reap_exited` sweep). + let mut session_states: HashMap = HashMap::new(); let mut dm_participants_cache: HashMap)> = HashMap::new(); let mut recent_thread_messages: VecDeque = VecDeque::new(); if !pending_deliveries.is_empty() { @@ -1782,6 +1796,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, &name, "agent_released"); + session_states.remove(&name); state.agents.remove(&name); if paths.persist { let _ = state.save(&paths.state); } let _ = send_event( @@ -1930,6 +1945,36 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ); for worker_name in targets { + // Session-mode gate (#864 sub-PR 2): when the worker + // is in human mode the broker parks the message in + // the per-worker pending queue instead of injecting, + // and counts it as delivered so the HTTP caller's + // ack semantics are unchanged. 
+ match gate_inbound_for_session_mode( + &mut session_states, + &workers, + &worker_name, + &delivery_from, + &text, + Some(&event_id), + ) { + GateOutcome::Queued => { + delivered = delivered.saturating_add(1); + tracing::info!( + target = "relay_broker::http_api", + event_id = %event_id, + to = %normalized_to, + worker = %worker_name, + "queued local delivery (human session mode)" + ); + continue; + } + GateOutcome::WorkerMissing => { + // Fall through to existing path so the + // pre-#864 not-found accounting runs. + } + GateOutcome::Inject => {} + } match timeout( local_delivery_timeout, queue_and_try_delivery_raw( @@ -2344,6 +2389,102 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "channels": remaining, }))); } + ListenApiRequest::GetSessionMode { name, reply } => { + if !workers.has_worker(&name) { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + } else { + let mode = session_states + .get(&name) + .map(|s| s.mode) + .unwrap_or_default(); + let _ = reply.send(Ok(mode)); + } + } + ListenApiRequest::SetSessionMode { name, mode, reply } => { + if !workers.has_worker(&name) { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + } else { + let entry = session_states.entry(name.clone()).or_default(); + let previous = entry.mode; + entry.mode = mode; + let to_flush: Vec = + if previous == SessionMode::Human && mode == SessionMode::Relay + { + entry.drain_pending() + } else { + Vec::new() + }; + let flushed = to_flush.len(); + if !to_flush.is_empty() { + tracing::info!( + target = "agent_relay::broker", + worker = %name, + drained = flushed, + "draining pending queue on human → relay transition" + ); + } + for queued in to_flush { + inject_pending_relay_message( + &mut workers, + &mut pending_deliveries, + &name, + &queued, + delivery_retry_interval, + ) + .await; + } + tracing::info!( + target = "agent_relay::broker", + worker = %name, + previous_mode = previous.as_wire_str(), + mode = 
mode.as_wire_str(), + flushed, + "session mode updated" + ); + let _ = reply.send(Ok(SetSessionModeOk { mode, flushed })); + } + } + ListenApiRequest::GetPending { name, reply } => { + if !workers.has_worker(&name) { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + } else { + let snapshot = session_states + .get(&name) + .map(|s| s.pending_snapshot()) + .unwrap_or_default(); + let _ = reply.send(Ok(snapshot)); + } + } + ListenApiRequest::FlushPending { name, reply } => { + if !workers.has_worker(&name) { + let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + } else { + let to_flush: Vec = session_states + .get_mut(&name) + .map(|state| state.drain_pending()) + .unwrap_or_default(); + let flushed = to_flush.len(); + if flushed > 0 { + tracing::info!( + target = "agent_relay::broker", + worker = %name, + drained = flushed, + "flushing pending queue on explicit /flush" + ); + } + for queued in to_flush { + inject_pending_relay_message( + &mut workers, + &mut pending_deliveries, + &name, + &queued, + delivery_retry_interval, + ) + .await; + } + let _ = reply.send(Ok(flushed)); + } + } ListenApiRequest::Shutdown { reply } => { let _ = reply.send(Ok(json!({ "status": "shutting_down" }))); shutdown = true; @@ -2452,6 +2593,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, &name, "relaycast_release"); + session_states.remove(&name); telemetry.track(TelemetryEvent::AgentRelease { cli: String::new(), release_reason: "relaycast_release".to_string(), @@ -2999,6 +3141,29 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { } for worker_name in delivery_plan.targets { + // Session-mode gate (#864 sub-PR 2): mirrors the + // /api/send gate above. Human-mode workers see + // inbound relaycast messages parked in the + // pending queue rather than auto-injected. 
+ match gate_inbound_for_session_mode( + &mut session_states, + &workers, + &worker_name, + &mapped.from, + &mapped.text, + Some(&mapped.event_id), + ) { + GateOutcome::Queued => { + tracing::info!( + target = "agent_relay::broker", + event_id = %mapped.event_id, + worker = %worker_name, + "queued inbound relay message (human session mode)" + ); + continue; + } + GateOutcome::WorkerMissing | GateOutcome::Inject => {} + } if let Err(error) = queue_and_try_delivery( &mut workers, &mut pending_deliveries, @@ -3496,6 +3661,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, &name, "worker_exited"); + session_states.remove(&name); let _ = send_event( &sdk_out_tx, json!({ @@ -3696,6 +3862,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, name, "worker_permanently_dead"); + session_states.remove(name); let _ = send_event( &sdk_out_tx, json!({"kind":"agent_permanently_dead","name":name,"reason":reason}), @@ -3736,6 +3903,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, name, "worker_exited"); + session_states.remove(name); let _ = send_event( &sdk_out_tx, json!({"kind":"agent_exited","name":name,"code":code,"signal":signal}), @@ -4002,6 +4170,128 @@ fn build_agent_metrics(handle: &WorkerHandle) -> AgentMetrics { } } +/// Outcome of [`gate_inbound_for_session_mode`]. Distinguishes the +/// three cases broker call sites care about: continue with the existing +/// inject path, the message was queued (success — caller acks the +/// sender), or there's no worker (caller skips this target). All three +/// already match the pre-#864 behaviour for `relay` mode; only +/// `Queued` is new. 
+#[derive(Debug, Clone, PartialEq, Eq)] +enum GateOutcome { + Inject, + Queued, + WorkerMissing, +} + +/// Gate an inbound relay message through the per-worker [`SessionMode`]. +/// +/// When the target worker is in [`SessionMode::Human`] the message is +/// appended to the per-worker pending queue and the broker returns +/// [`GateOutcome::Queued`], signalling the caller to skip the existing +/// inject path. Otherwise the caller proceeds normally. +/// +/// Pulled out so the broker has one obvious choke point for the two +/// inbound paths (`/api/send` and the relaycast inbound feed) that the +/// `drive` client (sub-PR 3) needs to intercept. Internal broker-driven +/// injections (`worker_ready` initial task, continuity restore) bypass +/// the gate by not calling this helper. +fn gate_inbound_for_session_mode( + session_states: &mut HashMap, + workers: &WorkerRegistry, + worker_name: &str, + from: &str, + body: &str, + event_id: Option<&str>, +) -> GateOutcome { + if !workers.has_worker(worker_name) { + return GateOutcome::WorkerMissing; + } + let state = session_states.entry(worker_name.to_string()).or_default(); + if state.mode == SessionMode::Relay { + return GateOutcome::Inject; + } + let queued_at_ms = chrono::Utc::now().timestamp_millis().max(0) as u64; + let msg = PendingRelayMessage { + from: from.to_string(), + body: body.to_string(), + queued_at_ms, + event_id: event_id.map(str::to_string), + }; + match state.accept_inbound(msg) { + SessionDispatch::Inject => GateOutcome::Inject, + SessionDispatch::Queued { queue_len } => { + tracing::debug!( + target = "agent_relay::broker", + worker = %worker_name, + from = %from, + queue_len, + "queued inbound relay message (human mode)" + ); + GateOutcome::Queued + } + SessionDispatch::QueuedEvicted { + queue_len, + dropped_from, + } => { + tracing::warn!( + target = "agent_relay::broker", + worker = %worker_name, + from = %from, + dropped_from = %dropped_from, + queue_len, + max_pending = 
relay_broker::types::MAX_PENDING_PER_WORKER, + "pending queue full — evicting oldest message" + ); + GateOutcome::Queued + } + } +} + +/// Inject a previously-queued pending relay message into the worker via +/// the existing `queue_and_try_delivery_raw` path. Used by the +/// `/api/spawned/{name}/flush` handler and by the auto-drain on a +/// `human → relay` transition. Failures are logged but not propagated — +/// the broker treats `flush` as best-effort fire-and-forget the same +/// way `/api/send` does for individual targets. +async fn inject_pending_relay_message( + workers: &mut WorkerRegistry, + pending_deliveries: &mut HashMap, + worker_name: &str, + msg: &PendingRelayMessage, + retry_interval: Duration, +) { + let event_id = msg + .event_id + .clone() + .unwrap_or_else(|| format!("flush_{}", Uuid::new_v4().simple())); + if let Err(error) = queue_and_try_delivery_raw( + workers, + pending_deliveries, + worker_name, + &event_id, + &msg.from, + worker_name, + &msg.body, + None, + None, + None, + 2, + MessageInjectionMode::Wait, + retry_interval, + ) + .await + { + tracing::warn!( + target = "agent_relay::broker", + worker = %worker_name, + from = %msg.from, + event_id = %event_id, + error = %error, + "failed to inject pending relay message during flush" + ); + } +} + async fn queue_and_try_delivery( workers: &mut WorkerRegistry, pending_deliveries: &mut HashMap, diff --git a/src/types.rs b/src/types.rs index 34dda695d..9af6a4319 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,63 @@ use serde::{Deserialize, Serialize}; +/// Per-worker session mode controlling how inbound relay messages are +/// dispatched into the wrapped agent's PTY. +/// +/// - [`SessionMode::Relay`] (default) preserves the broker's pre-#864 +/// behaviour: inbound messages are injected directly into the worker. 
+/// - [`SessionMode::Human`] holds inbound messages in a per-worker pending +/// queue so a human-driven client (the `agent-relay drive` verb landing +/// in sub-PR 3 of #864) can decide when to flush them. +/// +/// Mode is broker-side state only; the worker process does not observe it. +/// It resets to [`SessionMode::Relay`] on broker restart — there is no +/// disk persistence in this PR. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SessionMode { + /// Inbound messages auto-inject into the worker's PTY. + #[default] + Relay, + /// Inbound messages append to the per-worker pending queue and wait + /// for an explicit flush. + Human, +} + +impl SessionMode { + pub fn as_wire_str(&self) -> &'static str { + match self { + SessionMode::Relay => "relay", + SessionMode::Human => "human", + } + } + + pub fn parse(value: &str) -> Option { + match value.trim().to_ascii_lowercase().as_str() { + "relay" => Some(SessionMode::Relay), + "human" => Some(SessionMode::Human), + _ => None, + } + } +} + +/// A relay message that arrived while a worker was in +/// [`SessionMode::Human`] and therefore got parked in the per-worker +/// pending queue instead of being injected. Drained in FIFO order by +/// `POST /api/spawned/{name}/flush` or the auto-drain on a +/// `human → relay` mode transition. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PendingRelayMessage { + pub from: String, + pub body: String, + /// Unix millis when the broker queued the message. Matches the + /// existing timestamp style elsewhere in this module. + pub queued_at_ms: u64, + /// Inbound event_id when the source carried one. Preserved for + /// telemetry / dedup parity with the auto-inject path. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_id: Option<String>, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum RelayPriority { @@ -107,3 +165,192 @@ pub struct InjectRequest { pub priority: RelayPriority, pub attempts: u32, } + +/// Per-worker session bookkeeping owned by the broker. Tracks the +/// current [`SessionMode`] plus the FIFO pending queue for messages +/// captured while in [`SessionMode::Human`]. The broker keeps one of +/// these per spawned worker in a parallel `HashMap<String, SessionState>` +/// so the existing `WorkerHandle` (which holds OS-level process state) +/// doesn't have to grow. +#[derive(Debug, Default)] +pub struct SessionState { + pub mode: SessionMode, + pub pending: std::collections::VecDeque<PendingRelayMessage>, +} + +/// Per-worker cap on the pending queue. Prevents unbounded growth when a +/// human-mode session is left open for hours; oldest message is evicted +/// with a `tracing::warn!` (see [`SessionState::push_pending`]). +pub const MAX_PENDING_PER_WORKER: usize = 256; + +/// Outcome of dispatching one inbound relay message through the session +/// gate. Returned by [`SessionState::accept_inbound`] so the broker can +/// log and emit telemetry consistently. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SessionDispatch { + /// Worker is in [`SessionMode::Relay`]; the broker should run the + /// existing inject path. + Inject, + /// Worker is in [`SessionMode::Human`]; the message was queued. + /// `queue_len` is the queue size *after* the push. + Queued { queue_len: usize }, + /// Worker is in [`SessionMode::Human`] but the queue was full, so + /// the oldest entry was evicted to make room. `queue_len` is the + /// queue size *after* the eviction + push (always equal to the cap).
+ QueuedEvicted { + queue_len: usize, + dropped_from: String, + }, +} + +impl SessionState { + pub fn new(mode: SessionMode) -> Self { + Self { + mode, + pending: std::collections::VecDeque::new(), + } + } + + /// Push a pending message, evicting the oldest entry when the + /// per-worker cap would be exceeded. Returns `Some(from)` (the evicted + /// message's `from` field, for logging) on eviction, else `None`. + fn push_pending(&mut self, msg: PendingRelayMessage) -> Option<String> { + let mut evicted_from = None; + if self.pending.len() >= MAX_PENDING_PER_WORKER { + if let Some(dropped) = self.pending.pop_front() { + evicted_from = Some(dropped.from); + } + } + self.pending.push_back(msg); + evicted_from + } + + /// Gate an inbound relay message through the current session mode. + /// + /// In [`SessionMode::Relay`] the message is *not* enqueued; the + /// caller runs the existing inject path. In [`SessionMode::Human`] + /// the message is appended (with FIFO eviction at the cap) and the + /// caller acks the sender without touching the worker's PTY. + pub fn accept_inbound(&mut self, msg: PendingRelayMessage) -> SessionDispatch { + match self.mode { + SessionMode::Relay => SessionDispatch::Inject, + SessionMode::Human => { + let evicted = self.push_pending(msg); + let queue_len = self.pending.len(); + match evicted { + Some(dropped_from) => SessionDispatch::QueuedEvicted { + queue_len, + dropped_from, + }, + None => SessionDispatch::Queued { queue_len }, + } + } + } + } + + /// Drain the pending queue in FIFO order. Used by the flush route + /// (`POST /api/spawned/{name}/flush`) and the `human → relay` auto-drain. + pub fn drain_pending(&mut self) -> Vec<PendingRelayMessage> { + self.pending.drain(..).collect() + } + + /// Snapshot the pending queue without modifying it.
+ pub fn pending_snapshot(&self) -> Vec { + self.pending.iter().cloned().collect() + } +} + +#[cfg(test)] +mod session_tests { + use super::*; + + fn msg(from: &str, body: &str) -> PendingRelayMessage { + PendingRelayMessage { + from: from.to_string(), + body: body.to_string(), + queued_at_ms: 0, + event_id: None, + } + } + + #[test] + fn default_mode_is_relay() { + let state = SessionState::default(); + assert_eq!(state.mode, SessionMode::Relay); + assert!(state.pending.is_empty()); + } + + #[test] + fn relay_mode_does_not_queue() { + let mut state = SessionState::new(SessionMode::Relay); + let outcome = state.accept_inbound(msg("Alice", "hi")); + assert_eq!(outcome, SessionDispatch::Inject); + assert!(state.pending.is_empty()); + } + + #[test] + fn human_mode_queues_in_fifo_order() { + let mut state = SessionState::new(SessionMode::Human); + assert_eq!( + state.accept_inbound(msg("Alice", "one")), + SessionDispatch::Queued { queue_len: 1 } + ); + assert_eq!( + state.accept_inbound(msg("Bob", "two")), + SessionDispatch::Queued { queue_len: 2 } + ); + let drained = state.drain_pending(); + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].from, "Alice"); + assert_eq!(drained[0].body, "one"); + assert_eq!(drained[1].from, "Bob"); + assert!(state.pending.is_empty()); + } + + #[test] + fn human_mode_caps_queue_with_fifo_eviction() { + let mut state = SessionState::new(SessionMode::Human); + for i in 0..MAX_PENDING_PER_WORKER { + assert!(matches!( + state.accept_inbound(msg(&format!("u{i}"), "x")), + SessionDispatch::Queued { .. } + )); + } + // Cap reached — next push evicts the oldest ("u0"). + let outcome = state.accept_inbound(msg("overflow", "y")); + match outcome { + SessionDispatch::QueuedEvicted { + queue_len, + dropped_from, + } => { + assert_eq!(queue_len, MAX_PENDING_PER_WORKER); + assert_eq!(dropped_from, "u0"); + } + other => panic!("expected QueuedEvicted, got {other:?}"), + } + // Newest entry is at the tail; oldest surviving is "u1". 
+ let drained = state.drain_pending(); + assert_eq!(drained.len(), MAX_PENDING_PER_WORKER); + assert_eq!(drained[0].from, "u1"); + assert_eq!(drained.last().expect("non-empty").from, "overflow"); + } + + #[test] + fn pending_snapshot_does_not_mutate() { + let mut state = SessionState::new(SessionMode::Human); + state.accept_inbound(msg("Alice", "hi")); + let snap = state.pending_snapshot(); + assert_eq!(snap.len(), 1); + assert_eq!(state.pending.len(), 1, "snapshot must not drain"); + } + + #[test] + fn parse_round_trips_wire_strings() { + assert_eq!(SessionMode::parse("relay"), Some(SessionMode::Relay)); + assert_eq!(SessionMode::parse("HUMAN"), Some(SessionMode::Human)); + assert_eq!(SessionMode::parse(" human "), Some(SessionMode::Human)); + assert_eq!(SessionMode::parse("drive"), None); + assert_eq!(SessionMode::Relay.as_wire_str(), "relay"); + assert_eq!(SessionMode::Human.as_wire_str(), "human"); + } +} diff --git a/web/content/docs/reference-broker-api.mdx b/web/content/docs/reference-broker-api.mdx index f036e8a81..eaf6773d9 100644 --- a/web/content/docs/reference-broker-api.mdx +++ b/web/content/docs/reference-broker-api.mdx @@ -233,6 +233,81 @@ The command prints the screen to stdout and exits 0 on success. On error (unknown worker, broker unreachable, invalid format) it prints a diagnostic to stderr and exits non-zero. +### Session mode + +Per-agent **session mode** controls how the broker dispatches inbound +relay messages to a spawned worker. Two modes are supported: + +- **`relay`** (default) — inbound messages auto-inject into the worker's + PTY, exactly as the broker has always behaved. Use this for headless + agents that should react to incoming traffic on their own. +- **`human`** — inbound messages are *queued* in a per-worker pending + buffer instead of being injected. A human operator (or the + `agent-relay drive` client) decides when to drain the queue. 
Useful + when you've taken over an agent's PTY interactively and don't want + background traffic racing your keystrokes. + +Mode is broker-side state only — the worker process never observes it. +Mode resets to `relay` when the broker restarts and the pending queue +is dropped (no on-disk persistence). + +| Method | Path | Purpose | +| ------ | --------------------------------- | -------------------------------------------------------------------------------- | +| `GET` | `/api/spawned/{name}/mode` | Read the current session mode. Returns `{ "mode": "relay" \| "human" }`. | +| `PUT` | `/api/spawned/{name}/mode` | Set the session mode. Body `{ "mode": "relay" \| "human" }`. | +| `GET` | `/api/spawned/{name}/pending` | Snapshot the per-worker pending queue (FIFO, head first). | +| `POST` | `/api/spawned/{name}/flush` | Drain the pending queue and inject every message into the worker. FIFO order. | + +#### `PUT /api/spawned/{name}/mode` + +```json +{ "mode": "human" } +``` + +On a `human → relay` transition the broker auto-drains the pending +queue into the worker (via the normal inject path) **before** returning, +so flipping back to `relay` never strands queued messages. The response +reports how many messages were flushed: + +```json +{ "mode": "relay", "flushed": 3 } +``` + +A `relay → human` flip or a same-mode noop returns `"flushed": 0`. + +Status codes: `200` on success, `400` on a body that isn't +`{ "mode": "relay" }` or `{ "mode": "human" }`, `404` if the agent is +not registered. + +#### `GET /api/spawned/{name}/pending` + +Returns the pending queue without modifying it. + +```json +{ + "pending": [ + { "from": "Bob", "body": "please review #837", "queued_at_ms": 1715812345678, "event_id": "evt_abc" }, + { "from": "Carol", "body": "ping", "queued_at_ms": 1715812360123 } + ] +} +``` + +`event_id` is included when the inbound message carried one (it is +omitted otherwise). 
The queue is bounded at 256 entries per worker — +when full, the oldest entry is evicted with a broker-side warning. + +#### `POST /api/spawned/{name}/flush` + +Drains the queue and injects each message into the worker in FIFO order. +The session mode is **not** changed; a caller still in `human` mode will +continue queuing newly-arriving messages. + +```json +{ "flushed": 7 } +``` + +Status: `200` on success, `404` if the agent is not registered. + ### Event stream | Method | Path | Purpose | @@ -330,6 +405,44 @@ curl -sX DELETE localhost:3888/api/spawned/Alice \ -H "X-API-Key: $KEY" ``` +## Worked example: take over an agent with session mode + +The four session-mode routes back the upcoming `agent-relay drive` +client. The typical drive-mode flow looks like this: + +```bash +KEY="$RELAY_BROKER_API_KEY" + +# 1. Flip Alice into human mode. Now inbound relay traffic +# will queue instead of injecting. +curl -sX PUT localhost:3888/api/spawned/Alice/mode \ + -H "X-API-Key: $KEY" \ + -d '{"mode":"human"}' + +# 2. Send some messages while Alice is in human mode — +# these land in her pending queue, not her PTY. +curl -sX POST localhost:3888/api/send \ + -H "X-API-Key: $KEY" \ + -d '{"to":"Alice","from":"Bob","message":"please review #837"}' +curl -sX POST localhost:3888/api/send \ + -H "X-API-Key: $KEY" \ + -d '{"to":"Alice","from":"Carol","message":"ping"}' + +# 3. Inspect what's queued. +curl -s localhost:3888/api/spawned/Alice/pending \ + -H "X-API-Key: $KEY" + +# 4. Drain the queue into Alice's PTY when ready. +curl -sX POST localhost:3888/api/spawned/Alice/flush \ + -H "X-API-Key: $KEY" + +# 5. Flip Alice back to relay mode. Any messages still in the +# queue are drained automatically before this call returns. 
+curl -sX PUT localhost:3888/api/spawned/Alice/mode \ + -H "X-API-Key: $KEY" \ + -d '{"mode":"relay"}' +``` + ## Error envelope Failed responses return a consistent envelope: From 38abde43521e7bbeaf81df6f604ce91336a5cafc Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 17 May 2026 23:34:26 -0400 Subject: [PATCH 2/2] broker(session): preserve routing metadata + add telemetry events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two parallel changes to PR #884, bundled because they touch the same queue + dispatch paths: (1) Preserve full routing metadata across the human-mode queue. P1 finding from review (cubic / Devin / CodeRabbit / Codex all agreed): `PendingRelayMessage` only stored `from` / `body` / `event_id`, so flushed messages were re-injected with `target` set to the worker's own name and no thread / workspace / priority / mode context. A `#general` message queued in human mode came back as a direct-to-worker DM on drain. Channel-aware agent logic, thread replies, and workspace attribution all broke silently. Extends `PendingRelayMessage` with `target`, `thread_id`, `workspace_id`, `workspace_alias`, `priority`, and `mode`. New `InboundContext<'_>` bundles the args into one call so both gate sites (`/api/send` and the inbound-relaycast feed) capture the same context the existing `queue_and_try_delivery_raw` would have seen at non-queued delivery time. `inject_pending_relay_message` now reads target / thread / workspace / priority / mode straight off the queued `PendingRelayMessage` — a drained message is byte-for-byte equivalent to the original delivery. `GET /api/spawned/{name}/pending` surfaces the new fields too; docs page (`web/content/docs/reference-broker-api.mdx`) updated with the expanded JSON shape. (2) Add telemetry events for mode change / pending drain / queue. 
Three new event kinds, all routed through the existing `send_event` path so they appear on `/ws` like every other broker event: - `delivery_queued` — per-message event when an inbound is parked in the human-mode queue instead of injected. Payload: `event_id`, `from`, `target`, `reason: "session_mode_human"`. - `agent_session_mode_changed` — fires when the mode flips via `PUT /api/spawned/{name}/mode`. Payload: `previous_mode`, `mode`. - `agent_pending_drained` — fires when the queue is drained (auto-drain on human → relay, or explicit `/flush`). Payload: `count`, `reason` (`mode_transition` | `explicit_flush`). (3) Bonus: serde round-trip test for `SessionMode`. Nit from CodeRabbit: keep `as_wire_str` / `parse` in sync with the `#[serde(rename_all = "snake_case")]` derive. New `session_mode_wire_format_matches_serde_round_trip` test guards against drift between the manual helpers and serde. Plus a direct regression test `pending_message_preserves_full_routing_context` that asserts the full struct round-trips through the queue unchanged. Test status: full broker suite **644 passed** (was 642 baseline before this commit, +2 net for the new tests). `cargo fmt --check` + `cargo clippy --all-targets --all-features -- -D warnings` clean. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/listen_api.rs | 50 +++++++- src/main.rs | 145 ++++++++++++++++++---- src/types.rs | 91 ++++++++++++++ web/content/docs/reference-broker-api.mdx | 37 +++++- 4 files changed, 293 insertions(+), 30 deletions(-) diff --git a/src/listen_api.rs b/src/listen_api.rs index 43cd158f3..a18bbec9b 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -1280,12 +1280,26 @@ async fn listen_api_get_pending( let mut payload = json!({ "from": m.from, "body": m.body, + "target": m.target, + "priority": m.priority, + "mode": m.mode, "queued_at_ms": m.queued_at_ms, }); + let obj = payload.as_object_mut().expect("payload object built above"); + if let Some(thread_id) = m.thread_id { + obj.insert("thread_id".to_string(), Value::String(thread_id)); + } + if let Some(workspace_id) = m.workspace_id { + obj.insert("workspace_id".to_string(), Value::String(workspace_id)); + } + if let Some(workspace_alias) = m.workspace_alias { + obj.insert( + "workspace_alias".to_string(), + Value::String(workspace_alias), + ); + } if let Some(event_id) = m.event_id { - if let Some(obj) = payload.as_object_mut() { - obj.insert("event_id".to_string(), Value::String(event_id)); - } + obj.insert("event_id".to_string(), Value::String(event_id)); } payload }) @@ -1894,6 +1908,7 @@ mod auth_tests { SetSessionModeOk, }; use crate::worker_request::RequestWorkerError; + use relay_broker::protocol::MessageInjectionMode; use relay_broker::types::{PendingRelayMessage, SessionMode}; fn test_router( @@ -3023,12 +3038,24 @@ mod auth_tests { PendingRelayMessage { from: "Alice".to_string(), body: "one".to_string(), + target: "#general".to_string(), + thread_id: Some("thr_42".to_string()), + workspace_id: Some("ws_demo".to_string()), + workspace_alias: Some("Demo".to_string()), + priority: 1, + mode: MessageInjectionMode::Steer, queued_at_ms: 100, event_id: Some("evt_1".to_string()), }, PendingRelayMessage { from: "Bob".to_string(), body: "two".to_string(), + target: 
"worker-a".to_string(), + thread_id: None, + workspace_id: None, + workspace_alias: None, + priority: 2, + mode: MessageInjectionMode::Wait, queued_at_ms: 200, event_id: None, }, @@ -3053,11 +3080,28 @@ mod auth_tests { assert_eq!(response.status(), StatusCode::OK); let body = response_json(response).await; assert_eq!(body["pending"].as_array().expect("array").len(), 2); + // First entry: channel-targeted, threaded, full workspace + // context, custom priority + mode — all surface in the JSON + // and round-trip via the snapshot serializer. assert_eq!(body["pending"][0]["from"], json!("Alice")); assert_eq!(body["pending"][0]["body"], json!("one")); + assert_eq!(body["pending"][0]["target"], json!("#general")); + assert_eq!(body["pending"][0]["thread_id"], json!("thr_42")); + assert_eq!(body["pending"][0]["workspace_id"], json!("ws_demo")); + assert_eq!(body["pending"][0]["workspace_alias"], json!("Demo")); + assert_eq!(body["pending"][0]["priority"], json!(1)); + assert_eq!(body["pending"][0]["mode"], json!("steer")); assert_eq!(body["pending"][0]["queued_at_ms"], json!(100)); assert_eq!(body["pending"][0]["event_id"], json!("evt_1")); + // Second entry: minimal context — optional fields stay absent + // from the JSON, defaults surface as concrete numbers/strings. 
assert_eq!(body["pending"][1]["from"], json!("Bob")); + assert_eq!(body["pending"][1]["target"], json!("worker-a")); + assert_eq!(body["pending"][1]["priority"], json!(2)); + assert_eq!(body["pending"][1]["mode"], json!("wait")); + assert_eq!(body["pending"][1].get("thread_id"), None); + assert_eq!(body["pending"][1].get("workspace_id"), None); + assert_eq!(body["pending"][1].get("workspace_alias"), None); assert_eq!(body["pending"][1].get("event_id"), None); replier.await.expect("replier should complete"); } diff --git a/src/main.rs b/src/main.rs index af1750d3e..ab1292c05 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1949,14 +1949,25 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { // is in human mode the broker parks the message in // the per-worker pending queue instead of injecting, // and counts it as delivered so the HTTP caller's - // ack semantics are unchanged. + // ack semantics are unchanged. We pass the FULL + // routing context so the eventual drain reproduces + // the original delivery (channel/thread/workspace + // /priority/mode), not a stripped-down DM. 
match gate_inbound_for_session_mode( &mut session_states, &workers, &worker_name, - &delivery_from, - &text, - Some(&event_id), + InboundContext { + from: &delivery_from, + body: &text, + target: &normalized_to, + thread_id: thread_id.as_deref(), + workspace_id: Some(selected_workspace_id.as_str()), + workspace_alias: selected_workspace_alias.as_deref(), + priority, + mode: mode.clone(), + event_id: Some(&event_id), + }, ) { GateOutcome::Queued => { delivered = delivered.saturating_add(1); @@ -1967,6 +1978,17 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { worker = %worker_name, "queued local delivery (human session mode)" ); + let _ = send_event( + &sdk_out_tx, + json!({ + "kind":"delivery_queued", + "name":&worker_name, + "event_id":&event_id, + "from":&delivery_from, + "target":&normalized_to, + "reason":"session_mode_human", + }), + ).await; continue; } GateOutcome::WorkerMissing => { @@ -2441,6 +2463,28 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { flushed, "session mode updated" ); + if previous != mode { + let _ = send_event( + &sdk_out_tx, + json!({ + "kind":"agent_session_mode_changed", + "name":&name, + "previous_mode":previous.as_wire_str(), + "mode":mode.as_wire_str(), + }), + ).await; + } + if flushed > 0 { + let _ = send_event( + &sdk_out_tx, + json!({ + "kind":"agent_pending_drained", + "name":&name, + "count":flushed, + "reason":"mode_transition", + }), + ).await; + } let _ = reply.send(Ok(SetSessionModeOk { mode, flushed })); } } @@ -2482,6 +2526,17 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ) .await; } + if flushed > 0 { + let _ = send_event( + &sdk_out_tx, + json!({ + "kind":"agent_pending_drained", + "name":&name, + "count":flushed, + "reason":"explicit_flush", + }), + ).await; + } let _ = reply.send(Ok(flushed)); } } @@ -3144,14 +3199,24 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { // Session-mode 
gate (#864 sub-PR 2): mirrors the // /api/send gate above. Human-mode workers see // inbound relaycast messages parked in the - // pending queue rather than auto-injected. + // pending queue rather than auto-injected; same + // full-context capture so drains reproduce the + // original delivery (channel/thread/workspace). match gate_inbound_for_session_mode( &mut session_states, &workers, &worker_name, - &mapped.from, - &mapped.text, - Some(&mapped.event_id), + InboundContext { + from: &mapped.from, + body: &mapped.text, + target: &mapped.target, + thread_id: mapped.thread_id.as_deref(), + workspace_id: Some(mapped.workspace_id.as_str()), + workspace_alias: mapped.workspace_alias.as_deref(), + priority: mapped.priority.as_u8(), + mode: MessageInjectionMode::Wait, + event_id: Some(&mapped.event_id), + }, ) { GateOutcome::Queued => { tracing::info!( @@ -3160,6 +3225,17 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { worker = %worker_name, "queued inbound relay message (human session mode)" ); + let _ = send_event( + &sdk_out_tx, + json!({ + "kind":"delivery_queued", + "name":&worker_name, + "event_id":&mapped.event_id, + "from":&mapped.from, + "target":&mapped.target, + "reason":"session_mode_human", + }), + ).await; continue; } GateOutcome::WorkerMissing | GateOutcome::Inject => {} @@ -4195,13 +4271,28 @@ enum GateOutcome { /// `drive` client (sub-PR 3) needs to intercept. Internal broker-driven /// injections (`worker_ready` initial task, continuity restore) bypass /// the gate by not calling this helper. +/// Bundle of routing context the gate captures into the pending queue +/// when a message is held. Mirrors the args `queue_and_try_delivery_raw` +/// expects so a drain reproduces the original delivery exactly — same +/// target (channel / DM / thread sentinel), thread, workspace, +/// priority, and injection mode. 
+struct InboundContext<'a> { + from: &'a str, + body: &'a str, + target: &'a str, + thread_id: Option<&'a str>, + workspace_id: Option<&'a str>, + workspace_alias: Option<&'a str>, + priority: u8, + mode: MessageInjectionMode, + event_id: Option<&'a str>, +} + fn gate_inbound_for_session_mode( session_states: &mut HashMap, workers: &WorkerRegistry, worker_name: &str, - from: &str, - body: &str, - event_id: Option<&str>, + ctx: InboundContext<'_>, ) -> GateOutcome { if !workers.has_worker(worker_name) { return GateOutcome::WorkerMissing; @@ -4212,10 +4303,16 @@ fn gate_inbound_for_session_mode( } let queued_at_ms = chrono::Utc::now().timestamp_millis().max(0) as u64; let msg = PendingRelayMessage { - from: from.to_string(), - body: body.to_string(), + from: ctx.from.to_string(), + body: ctx.body.to_string(), + target: ctx.target.to_string(), + thread_id: ctx.thread_id.map(str::to_string), + workspace_id: ctx.workspace_id.map(str::to_string), + workspace_alias: ctx.workspace_alias.map(str::to_string), + priority: ctx.priority, + mode: ctx.mode, queued_at_ms, - event_id: event_id.map(str::to_string), + event_id: ctx.event_id.map(str::to_string), }; match state.accept_inbound(msg) { SessionDispatch::Inject => GateOutcome::Inject, @@ -4223,7 +4320,7 @@ fn gate_inbound_for_session_mode( tracing::debug!( target = "agent_relay::broker", worker = %worker_name, - from = %from, + from = %ctx.from, queue_len, "queued inbound relay message (human mode)" ); @@ -4236,7 +4333,7 @@ fn gate_inbound_for_session_mode( tracing::warn!( target = "agent_relay::broker", worker = %worker_name, - from = %from, + from = %ctx.from, dropped_from = %dropped_from, queue_len, max_pending = relay_broker::types::MAX_PENDING_PER_WORKER, @@ -4270,13 +4367,17 @@ async fn inject_pending_relay_message( worker_name, &event_id, &msg.from, - worker_name, + // Use the ORIGINAL routing target captured at queue time — + // `#general`, the DM recipient name, `"thread"`, etc. 
Falling + // back to `worker_name` here would silently reframe channel + // messages as direct-to-worker messages on drain. + &msg.target, &msg.body, - None, - None, - None, - 2, - MessageInjectionMode::Wait, + msg.thread_id.clone(), + msg.workspace_id.clone(), + msg.workspace_alias.clone(), + msg.priority, + msg.mode.clone(), retry_interval, ) .await diff --git a/src/types.rs b/src/types.rs index 9af6a4319..9f9ce6d2f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +use crate::protocol::MessageInjectionMode; + /// Per-worker session mode controlling how inbound relay messages are /// dispatched into the wrapped agent's PTY. /// @@ -45,10 +47,42 @@ impl SessionMode { /// pending queue instead of being injected. Drained in FIFO order by /// `POST /api/spawned/{name}/flush` or the auto-drain on a /// `human → relay` mode transition. +/// +/// The full delivery context is captured at queue time so a drain +/// later produces a byte-for-byte equivalent of the original delivery +/// — channel-targeted messages stay channel-targeted, threaded replies +/// stay threaded, workspace attribution survives, and the original +/// priority + injection mode are preserved. Without all of these a +/// flushed `#general` message would be re-injected as a direct +/// message to the worker (since `target` would fall back to the +/// worker's name), which would change agent behaviour silently. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PendingRelayMessage { pub from: String, pub body: String, + /// Original delivery target — channel (`#general`), DM recipient + /// name, or sentinel like `"thread"`. Used as the `target` arg to + /// `queue_and_try_delivery_raw` on drain so the re-injected + /// message matches the original routing. + pub target: String, + /// Original thread id, when the inbound was a thread reply. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub thread_id: Option<String>, + /// Original workspace id, when known. Channel + DM routing both + /// depend on this; dropping it would attribute the flushed + /// message to the wrong workspace. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub workspace_id: Option<String>, + /// Original workspace alias (display name), when known. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub workspace_alias: Option<String>, + /// Original delivery priority. 0 = P0, …, 4 = P4. Defaults to 2 + /// (P2) when the source didn't carry a priority. + #[serde(default = "default_priority")] + pub priority: u8, + /// Original `wait` vs `steer` injection mode. + #[serde(default)] + pub mode: MessageInjectionMode, /// Unix millis when the broker queued the message. Matches the /// existing timestamp style elsewhere in this module. pub queued_at_ms: u64, @@ -58,6 +92,10 @@ pub struct PendingRelayMessage { pub event_id: Option<String>, } +fn default_priority() -> u8 { + 2 +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum RelayPriority { @@ -268,11 +306,64 @@ mod session_tests { PendingRelayMessage { from: from.to_string(), body: body.to_string(), + // Target is a fixed placeholder ("worker") in tests that don't + // care about routing — the gating logic only inspects mode + // / queue length, not the routing fields. + target: "worker".to_string(), + thread_id: None, + workspace_id: None, + workspace_alias: None, + priority: 2, + mode: MessageInjectionMode::Wait, queued_at_ms: 0, event_id: None, } } + #[test] + fn session_mode_wire_format_matches_serde_round_trip() { + // Guard against `as_wire_str` / `parse` drifting from the + // `#[serde(rename_all = "snake_case")]` representation.
+ for variant in [SessionMode::Relay, SessionMode::Human] { + let serialized = serde_json::to_string(&variant) + .expect("SessionMode serializes") + .trim_matches('"') + .to_string(); + assert_eq!(serialized, variant.as_wire_str()); + + let parsed = SessionMode::parse(&serialized).expect("wire form parses"); + assert_eq!(parsed, variant); + + let from_serde: SessionMode = + serde_json::from_str(&format!("\"{serialized}\"")).expect("serde round-trips"); + assert_eq!(from_serde, variant); + } + } + + #[test] + fn pending_message_preserves_full_routing_context() { + // Direct regression for the P1 review comment: a channel + // message queued and surfaced via the pending snapshot must + // round-trip its target / thread / workspace / priority / mode + // unchanged, so a drain re-injects with the original routing. + let queued = PendingRelayMessage { + from: "Bob".to_string(), + body: "ship it".to_string(), + target: "#general".to_string(), + thread_id: Some("thr_abc".to_string()), + workspace_id: Some("ws_demo".to_string()), + workspace_alias: Some("Demo".to_string()), + priority: 1, + mode: MessageInjectionMode::Steer, + queued_at_ms: 123_456, + event_id: Some("evt_xyz".to_string()), + }; + let mut state = SessionState::new(SessionMode::Human); + state.accept_inbound(queued.clone()); + let drained = state.drain_pending(); + assert_eq!(drained, vec![queued]); + } + #[test] fn default_mode_is_relay() { let state = SessionState::default(); diff --git a/web/content/docs/reference-broker-api.mdx b/web/content/docs/reference-broker-api.mdx index eaf6773d9..71f412c12 100644 --- a/web/content/docs/reference-broker-api.mdx +++ b/web/content/docs/reference-broker-api.mdx @@ -286,15 +286,39 @@ Returns the pending queue without modifying it. 
```json { "pending": [ - { "from": "Bob", "body": "please review #837", "queued_at_ms": 1715812345678, "event_id": "evt_abc" }, - { "from": "Carol", "body": "ping", "queued_at_ms": 1715812360123 } + { + "from": "Bob", + "body": "please review #837", + "target": "#general", + "priority": 2, + "mode": "wait", + "thread_id": "thr_abc", + "workspace_id": "ws_demo", + "workspace_alias": "Demo", + "queued_at_ms": 1715812345678, + "event_id": "evt_abc" + }, + { + "from": "Carol", + "body": "ping", + "target": "Alice", + "priority": 2, + "mode": "wait", + "queued_at_ms": 1715812360123 + } ] } ``` -`event_id` is included when the inbound message carried one (it is -omitted otherwise). The queue is bounded at 256 entries per worker — -when full, the oldest entry is evicted with a broker-side warning. +Each entry preserves the full routing metadata captured at queue time +— `target` (`#channel`, DM recipient, or `"thread"`), `priority`, +`mode` (`wait` / `steer`), and optional `thread_id` / `workspace_id` +/ `workspace_alias` / `event_id` when the inbound carried them. A +later drain reproduces the original delivery byte-for-byte; a +channel-targeted message stays channel-targeted. + +The queue is bounded at 256 entries per worker — when full, the +oldest entry is evicted with a broker-side warning. #### `POST /api/spawned/{name}/flush` @@ -360,6 +384,9 @@ Durable (replayable via `?sinceSeq=...`): | `delivery_failed` | Message delivery failed. | | `delivery_dropped` | Delivery was dropped (e.g. agent gone). | | `delivery_retry` | Delivery is being retried. | +| `delivery_queued` | Inbound delivery parked in the per-worker pending queue because the worker is in `human` session mode. Payload carries `event_id`, `from`, `target`, and `reason: "session_mode_human"`. | +| `agent_session_mode_changed` | A worker's session mode flipped via `PUT /api/spawned/{name}/mode`. Payload carries `previous_mode` and `mode`. 
| +| `agent_pending_drained` | The per-worker pending queue was drained. Payload carries `count` and `reason` (`mode_transition` for the auto-drain on `human → relay`, `explicit_flush` for `POST .../flush`). | Ephemeral (broadcast only, no replay):