diff --git a/CHANGELOG.md b/CHANGELOG.md index 082a4728c..17a179e22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Broker inbound delivery now uses one per-agent queue so `auto_inject` and `manual_flush` handle ordering consistently. - Inbound delivery APIs use `/delivery-mode` with `auto_inject` and `manual_flush` names before their first release. - CLI attach commands share SDK-backed broker snapshots, delivery mode changes, streams, and flushes. - PTY terminal query replies use the live VT grid, so cursor-position responses reflect the actual screen. diff --git a/src/listen_api.rs b/src/listen_api.rs index d10f379b7..e3d1da4a3 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -152,7 +152,8 @@ pub enum ListenApiRequest { reply: tokio::sync::oneshot::Sender>, }, /// `GET /api/spawned/{name}/pending` — snapshot the per-worker - /// pending-message queue (FIFO, head first). + /// pending-message queue (FIFO, head first). Auto-inject workers usually + /// report an empty queue because they drain in the same broker turn. GetPending { name: String, reply: tokio::sync::oneshot::Sender, DeliveryRouteError>>, @@ -1165,11 +1166,11 @@ async fn listen_api_snapshot( } // --------------------------------------------------------------------------- -// Inbound delivery mode (per-agent inject vs. queue, plus pending-queue inspection) +// Inbound delivery mode (per-agent drain policy plus pending-queue inspection) // -// The broker keeps an `InboundDeliveryMode` per worker; `manual_flush` -// mode parks inbound relay messages in a FIFO `pending` queue instead -// of injecting them. +// The broker keeps an `InboundDeliveryMode` per worker. All inbound relay +// messages pass through a FIFO `pending` queue; `auto_inject` drains it +// immediately, while `manual_flush` parks messages until the caller flushes. // These four routes are the server-side surface the `agent-relay drive` // client calls to flip modes, inspect the queue, and drain it. // --------------------------------------------------------------------------- @@ -1256,8 +1257,8 @@ async fn listen_api_set_inbound_delivery_mode( } /// `GET /api/spawned/{name}/pending` → `{ "pending": [ ... ] }`, FIFO -/// (head of queue first). Empty array when the worker is not in -/// `manual_flush` delivery mode or simply has no pending messages. +/// (head of queue first). In `auto_inject` mode this is normally empty because +/// inbound messages drain in the same broker turn. async fn listen_api_get_pending( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, diff --git a/src/main.rs b/src/main.rs index 8fa84b50c..6bc6f03f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1943,15 +1943,15 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ); for worker_name in targets { - // Inbound-delivery gate: in `manual_flush` mode - // the broker parks the message in the per-worker - // pending queue instead of injecting, and counts - // it as delivered so the HTTP caller's ack + // Inbound-delivery queue: every inbound message + // enters the per-worker FIFO first. `auto_inject` + // drains immediately; `manual_flush` holds and + // counts as delivered so the HTTP caller's ack // semantics are unchanged. We pass the FULL - // routing context so the eventual drain reproduces - // the original delivery (channel/thread/workspace + // routing context so any drain reproduces the + // original delivery (channel/thread/workspace // /priority/mode), not a stripped-down DM. - match gate_inbound_for_delivery_mode( + match queue_inbound_for_delivery_mode( &mut delivery_states, &workers, &worker_name, @@ -1967,7 +1967,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { event_id: Some(&event_id), }, ) { - GateOutcome::Queued => { + InboundQueueOutcome::Queued => { delivered = delivered.saturating_add(1); tracing::info!( target = "relay_broker::http_api", @@ -1989,11 +1989,67 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; continue; } - GateOutcome::WorkerMissing => { + InboundQueueOutcome::DrainNow(to_drain) => { + for queued in to_drain { + let queued_event_id = + queued.event_id.as_deref().unwrap_or(""); + let is_current = + queued.event_id.as_deref() == Some(event_id.as_str()); + match timeout( + local_delivery_timeout, + try_inject_pending_relay_message( + &mut workers, + &mut pending_deliveries, + &worker_name, + &queued, + delivery_retry_interval, + ), + ) + .await + { + Ok(Ok(_)) => { + if is_current { + delivered = delivered.saturating_add(1); + } + } + Ok(Err(error)) => { + if is_current { + delivery_errors = + delivery_errors.saturating_add(1); + } + tracing::warn!( + target = "relay_broker::http_api", + + event_id = %queued_event_id, + to = %queued.target, + worker = %worker_name, + error = %error, + "local delivery attempt failed" + ); + } + Err(_) => { + if is_current { + delivery_errors = + delivery_errors.saturating_add(1); + } + tracing::warn!( + target = "relay_broker::http_api", + + event_id = %queued_event_id, + to = %queued.target, + worker = %worker_name, + timeout_ms = %local_delivery_timeout.as_millis(), + "local delivery attempt timed out" + ); + } + } + } + continue; + } + InboundQueueOutcome::WorkerMissing => { // Fall through so the standard // not-found accounting path runs. } - GateOutcome::Inject => {} } match timeout( local_delivery_timeout, @@ -3195,13 +3251,13 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { } for worker_name in delivery_plan.targets { - // Inbound-delivery gate: mirrors the /api/send - // gate above. Manual-flush workers see inbound - // relaycast messages parked in the pending queue - // rather than auto-injected; same full-context - // capture so drains reproduce the original delivery - // (channel/thread/workspace). - match gate_inbound_for_delivery_mode( + // Inbound-delivery queue: mirrors the /api/send + // queue above. Auto-inject workers drain the queue + // immediately; manual-flush workers leave relaycast + // messages parked until flush. The same full-context + // capture makes drains reproduce the original + // delivery (channel/thread/workspace). + match queue_inbound_for_delivery_mode( &mut delivery_states, &workers, &worker_name, @@ -3217,7 +3273,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { event_id: Some(&mapped.event_id), }, ) { - GateOutcome::Queued => { + InboundQueueOutcome::Queued => { tracing::info!( target = "agent_relay::broker", event_id = %mapped.event_id, @@ -3237,7 +3293,31 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; continue; } - GateOutcome::WorkerMissing | GateOutcome::Inject => {} + InboundQueueOutcome::DrainNow(to_drain) => { + for queued in to_drain { + if let Err(error) = try_inject_pending_relay_message( + &mut workers, + &mut pending_deliveries, + &worker_name, + &queued, + delivery_retry_interval, + ) + .await + { + let _ = send_error( + &sdk_out_tx, + None, + "delivery_failed", + error.to_string(), + true, + Some(json!({"worker": worker_name})), + ) + .await; + } + } + continue; + } + InboundQueueOutcome::WorkerMissing => {} } if let Err(error) = queue_and_try_delivery( &mut workers, @@ -4245,31 +4325,19 @@ fn build_agent_metrics(handle: &WorkerHandle) -> AgentMetrics { } } -/// Outcome of [`gate_inbound_for_delivery_mode`]. Distinguishes the -/// three cases broker call sites care about: continue with the existing -/// inject path, the message was queued (success — caller acks the -/// sender), or there's no worker (caller skips this target). +/// Outcome of [`queue_inbound_for_delivery_mode`]. Distinguishes the +/// three cases broker call sites care about: the message is queued and +/// should wait for an explicit flush, the queue should be drained now, +/// or there's no worker (caller falls through to existing target handling). #[derive(Debug, Clone, PartialEq, Eq)] -enum GateOutcome { - Inject, +enum InboundQueueOutcome { Queued, + DrainNow(Vec), WorkerMissing, } -/// Gate an inbound relay message through the per-worker [`InboundDeliveryMode`]. -/// -/// When the target worker is in [`InboundDeliveryMode::ManualFlush`] the message is -/// appended to the per-worker pending queue and the broker returns -/// [`GateOutcome::Queued`], signalling the caller to skip the existing -/// inject path. Otherwise the caller proceeds normally. -/// -/// Pulled out so the broker has one obvious choke point for the two -/// inbound paths (`/api/send` and the relaycast inbound feed) that the -/// `drive` client needs to intercept. Internal broker-driven injections -/// (`worker_ready` initial task, continuity restore) bypass the gate by -/// not calling this helper. -/// Bundle of routing context the gate captures into the pending queue -/// when a message is held. Mirrors the args `queue_and_try_delivery_raw` +/// Bundle of routing context captured into the pending queue. Mirrors the +/// args `queue_and_try_delivery_raw` /// expects so a drain reproduces the original delivery exactly — same /// target (channel / DM / thread sentinel), thread, workspace, /// priority, and injection mode. @@ -4285,19 +4353,29 @@ struct InboundContext<'a> { event_id: Option<&'a str>, } -fn gate_inbound_for_delivery_mode( +/// Queue an inbound relay message through the per-worker [`InboundDeliveryMode`]. +/// +/// Every inbound message is appended to the per-worker pending queue. In +/// [`InboundDeliveryMode::AutoInject`] the caller immediately drains the queue +/// in the same broker turn; in [`InboundDeliveryMode::ManualFlush`] the message +/// stays parked until an explicit flush or mode transition. +/// +/// Pulled out so the broker has one obvious choke point for the two +/// inbound paths (`/api/send` and the relaycast inbound feed) that the +/// `drive` client needs to intercept. Internal broker-driven injections +/// (`worker_ready` initial task, continuity restore) bypass this queue by +/// not calling this helper. +fn queue_inbound_for_delivery_mode( delivery_states: &mut HashMap, workers: &WorkerRegistry, worker_name: &str, ctx: InboundContext<'_>, -) -> GateOutcome { +) -> InboundQueueOutcome { if !workers.has_worker(worker_name) { - return GateOutcome::WorkerMissing; + return InboundQueueOutcome::WorkerMissing; } let state = delivery_states.entry(worker_name.to_string()).or_default(); - if state.mode == InboundDeliveryMode::AutoInject { - return GateOutcome::Inject; - } + let should_drain = state.should_drain_immediately(); let queued_at_ms = chrono::Utc::now().timestamp_millis().max(0) as u64; let msg = PendingRelayMessage { from: ctx.from.to_string(), @@ -4312,16 +4390,15 @@ fn gate_inbound_for_delivery_mode( event_id: ctx.event_id.map(str::to_string), }; match state.accept_inbound(msg) { - InboundDeliveryDispatch::Inject => GateOutcome::Inject, InboundDeliveryDispatch::Queued { queue_len } => { tracing::debug!( target = "agent_relay::broker", worker = %worker_name, from = %ctx.from, + mode = state.mode.as_wire_str(), queue_len, - "queued inbound relay message (manual_flush delivery mode)" + "queued inbound relay message" ); - GateOutcome::Queued } InboundDeliveryDispatch::QueuedEvicted { queue_len, @@ -4332,13 +4409,68 @@ fn gate_inbound_for_delivery_mode( worker = %worker_name, from = %ctx.from, dropped_from = %dropped_from, + mode = state.mode.as_wire_str(), queue_len, max_pending = relay_broker::types::MAX_PENDING_PER_WORKER, "pending queue full — evicting oldest message" ); - GateOutcome::Queued } } + if should_drain { + let to_drain = state.drain_pending(); + tracing::debug!( + target = "agent_relay::broker", + worker = %worker_name, + drained = to_drain.len(), + "draining inbound queue immediately (auto_inject delivery mode)" + ); + InboundQueueOutcome::DrainNow(to_drain) + } else { + InboundQueueOutcome::Queued + } +} + +async fn try_inject_pending_relay_message( + workers: &mut WorkerRegistry, + pending_deliveries: &mut HashMap, + worker_name: &str, + msg: &PendingRelayMessage, + retry_interval: Duration, +) -> Result<()> { + let event_id = msg + .event_id + .clone() + .unwrap_or_else(|| format!("flush_{}", Uuid::new_v4().simple())); + match timeout( + retry_interval, + queue_and_try_delivery_raw( + workers, + pending_deliveries, + worker_name, + &event_id, + &msg.from, + // Use the ORIGINAL routing target captured at queue time — + // `#general`, the DM recipient name, `"thread"`, etc. Falling + // back to `worker_name` here would silently reframe channel + // messages as direct-to-worker messages on drain. + &msg.target, + &msg.body, + msg.thread_id.clone(), + msg.workspace_id.clone(), + msg.workspace_alias.clone(), + msg.priority, + msg.mode.clone(), + retry_interval, + ), + ) + .await + { + Ok(result) => result, + Err(_) => Err(anyhow::anyhow!( + "pending relay delivery timed out after {}ms", + retry_interval.as_millis() + )), + } } /// Inject a previously-queued pending relay message into the worker via @@ -4354,27 +4486,12 @@ async fn inject_pending_relay_message( msg: &PendingRelayMessage, retry_interval: Duration, ) { - let event_id = msg - .event_id - .clone() - .unwrap_or_else(|| format!("flush_{}", Uuid::new_v4().simple())); - if let Err(error) = queue_and_try_delivery_raw( + let event_id = msg.event_id.as_deref().unwrap_or(""); + if let Err(error) = try_inject_pending_relay_message( workers, pending_deliveries, worker_name, - &event_id, - &msg.from, - // Use the ORIGINAL routing target captured at queue time — - // `#general`, the DM recipient name, `"thread"`, etc. Falling - // back to `worker_name` here would silently reframe channel - // messages as direct-to-worker messages on drain. - &msg.target, - &msg.body, - msg.thread_id.clone(), - msg.workspace_id.clone(), - msg.workspace_alias.clone(), - msg.priority, - msg.mode.clone(), + msg, retry_interval, ) .await @@ -5885,12 +6002,16 @@ mod worker_tests; mod tests { use std::{ collections::{BTreeSet, HashMap, HashSet}, + path::PathBuf, + process::Stdio, time::{Duration, Instant}, }; use crate::helpers::format_injection; - use relay_broker::protocol::{MessageInjectionMode, RelayDelivery}; + use crate::worker::{WorkerEvent, WorkerHandle, WorkerRegistry}; + use relay_broker::protocol::{AgentSpec, MessageInjectionMode, RelayDelivery}; use serde_json::{json, Value}; + use tokio::sync::mpsc; use super::{ build_agent_state_transition_event, build_http_api_spawn_spec, build_thread_infos, @@ -5900,16 +6021,172 @@ mod tests { http_api_local_delivery_timeout, http_api_relaycast_send_timeout, is_auto_suggestion, is_bypass_selection_menu, is_in_editor_mode, is_relaycast_self_control_target, is_unknown_worker_error_message, normalize_channel, normalize_initial_task, - normalize_sender, relaycast_spawn_control_dedup_key, relaycast_ws_control_dedup_key, - relaycast_ws_should_apply_local_spawn_echo_dedup, relaycast_ws_spawn_token, - sender_is_dashboard_label, should_clear_pending_delivery_for_event, strip_ansi, - AgentRuntime, PendingDelivery, ProtocolHeadlessProvider, + normalize_sender, queue_inbound_for_delivery_mode, relaycast_spawn_control_dedup_key, + relaycast_ws_control_dedup_key, relaycast_ws_should_apply_local_spawn_echo_dedup, + relaycast_ws_spawn_token, sender_is_dashboard_label, + should_clear_pending_delivery_for_event, strip_ansi, AgentRuntime, InboundContext, + InboundQueueOutcome, PendingDelivery, ProtocolHeadlessProvider, }; use crate::helpers::floor_char_boundary; use relay_broker::dedup::DedupCache; use relay_broker::relaycast_ws::{ format_worker_preregistration_error, RelaycastRegistrationError, }; + use relay_broker::types::{InboundDeliveryMode, InboundDeliveryState}; + + async fn make_worker_registry_with_worker(name: &str) -> WorkerRegistry { + let (tx, _rx) = mpsc::channel::(16); + let mut registry = WorkerRegistry::new( + tx, + Vec::new(), + PathBuf::from("/tmp/agent-relay-broker-tests"), + Instant::now(), + ); + let mut child = tokio::process::Command::new("cat") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .expect("test worker process should spawn"); + let stdin = child.stdin.take().expect("test worker stdin should exist"); + registry.workers.insert( + name.to_string(), + WorkerHandle { + spec: AgentSpec { + name: name.to_string(), + runtime: AgentRuntime::Pty, + provider: None, + cli: Some("cat".to_string()), + model: None, + cwd: None, + team: None, + shadow_of: None, + shadow_mode: None, + args: Vec::new(), + channels: Vec::new(), + restart_policy: None, + }, + parent: None, + workspace_id: Some("ws_demo".to_string()), + child, + stdin, + spawned_at: Instant::now(), + }, + ); + registry + } + + async fn cleanup_worker_registry(mut registry: WorkerRegistry) { + for handle in registry.workers.values_mut() { + let _ = handle.child.start_kill(); + let _ = handle.child.wait().await; + } + } + + fn inbound_ctx<'a>(event_id: &'a str) -> InboundContext<'a> { + InboundContext { + from: "Alice", + body: "hello from relay", + target: "#general", + thread_id: Some("thr_123"), + workspace_id: Some("ws_demo"), + workspace_alias: Some("Demo"), + priority: 1, + mode: MessageInjectionMode::Steer, + event_id: Some(event_id), + } + } + + #[tokio::test] + async fn inbound_queue_auto_inject_drains_immediately_with_full_context() { + let worker_name = "worker-a"; + let workers = make_worker_registry_with_worker(worker_name).await; + let mut delivery_states = HashMap::new(); + + let outcome = queue_inbound_for_delivery_mode( + &mut delivery_states, + &workers, + worker_name, + inbound_ctx("evt_auto"), + ); + + match outcome { + InboundQueueOutcome::DrainNow(messages) => { + assert_eq!(messages.len(), 1); + let msg = &messages[0]; + assert_eq!(msg.from, "Alice"); + assert_eq!(msg.body, "hello from relay"); + assert_eq!(msg.target, "#general"); + assert_eq!(msg.thread_id.as_deref(), Some("thr_123")); + assert_eq!(msg.workspace_id.as_deref(), Some("ws_demo")); + assert_eq!(msg.workspace_alias.as_deref(), Some("Demo")); + assert_eq!(msg.priority, 1); + assert_eq!(msg.mode, MessageInjectionMode::Steer); + assert_eq!(msg.event_id.as_deref(), Some("evt_auto")); + } + other => panic!("expected immediate drain, got {other:?}"), + } + assert_eq!( + delivery_states + .get(worker_name) + .expect("state should be created") + .pending_snapshot(), + Vec::new(), + "auto_inject drains the per-worker pending queue in the same broker turn" + ); + + cleanup_worker_registry(workers).await; + } + + #[tokio::test] + async fn inbound_queue_manual_flush_holds_until_explicit_drain() { + let worker_name = "worker-a"; + let workers = make_worker_registry_with_worker(worker_name).await; + let mut delivery_states = HashMap::from([( + worker_name.to_string(), + InboundDeliveryState::new(InboundDeliveryMode::ManualFlush), + )]); + + let outcome = queue_inbound_for_delivery_mode( + &mut delivery_states, + &workers, + worker_name, + inbound_ctx("evt_manual"), + ); + + assert_eq!(outcome, InboundQueueOutcome::Queued); + let snapshot = delivery_states + .get(worker_name) + .expect("manual state should remain present") + .pending_snapshot(); + assert_eq!(snapshot.len(), 1); + assert_eq!(snapshot[0].event_id.as_deref(), Some("evt_manual")); + assert_eq!(snapshot[0].target, "#general"); + + cleanup_worker_registry(workers).await; + } + + #[tokio::test] + async fn inbound_queue_worker_missing_does_not_create_state() { + let (tx, _rx) = mpsc::channel::(16); + let workers = WorkerRegistry::new( + tx, + Vec::new(), + PathBuf::from("/tmp/agent-relay-broker-tests"), + Instant::now(), + ); + let mut delivery_states = HashMap::new(); + + let outcome = queue_inbound_for_delivery_mode( + &mut delivery_states, + &workers, + "ghost", + inbound_ctx("evt_missing"), + ); + + assert_eq!(outcome, InboundQueueOutcome::WorkerMissing); + assert!(delivery_states.is_empty()); + } fn extract_kind_literals(source: &str) -> BTreeSet { let marker = "\"kind\""; diff --git a/src/types.rs b/src/types.rs index 782f0f18d..4ee20451e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -3,13 +3,12 @@ use serde::{Deserialize, Serialize}; use crate::protocol::MessageInjectionMode; /// Per-worker inbound delivery mode controlling how inbound relay messages are -/// dispatched into the wrapped agent's PTY. +/// drained from the broker-owned pending queue into the wrapped agent's PTY. /// -/// - [`InboundDeliveryMode::AutoInject`] (default) injects inbound messages -/// directly into the worker. The user's own keystrokes may also arrive -/// through an attached relay session, so both writers can race. -/// - [`InboundDeliveryMode::ManualFlush`] holds inbound messages in a -/// per-worker pending queue so a client can decide when to flush them. +/// - [`InboundDeliveryMode::AutoInject`] (default) queues inbound messages and +/// drains the queue immediately in the same broker turn. +/// - [`InboundDeliveryMode::ManualFlush`] holds queued inbound messages until a +/// client explicitly flushes them or switches back to auto-inject. /// /// Mode is broker-side state only; the worker process does not observe it. /// It resets to [`InboundDeliveryMode::AutoInject`] on broker restart — there @@ -17,7 +16,7 @@ use crate::protocol::MessageInjectionMode; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum InboundDeliveryMode { - /// Inbound messages auto-inject into the worker's PTY. + /// Inbound messages queue and immediately drain into the worker's PTY. #[default] AutoInject, /// Inbound messages append to the per-worker pending queue and wait @@ -42,10 +41,10 @@ impl InboundDeliveryMode { } } -/// A relay message that arrived while a worker was in -/// [`InboundDeliveryMode::ManualFlush`] and therefore got parked in the -/// per-worker pending queue instead of being injected. Drained in FIFO order -/// by `POST /api/spawned/{name}/flush` or the auto-drain on a +/// A relay message captured in the per-worker pending queue before delivery. +/// [`InboundDeliveryMode::AutoInject`] drains these messages immediately; +/// [`InboundDeliveryMode::ManualFlush`] leaves them parked until +/// `POST /api/spawned/{name}/flush` or the auto-drain on a /// `manual_flush → auto_inject` mode transition. /// /// The full delivery context is captured at queue time so a drain @@ -205,11 +204,10 @@ pub struct InjectRequest { } /// Per-worker inbound delivery bookkeeping owned by the broker. Tracks the -/// current [`InboundDeliveryMode`] plus the FIFO pending queue for messages -/// captured while in [`InboundDeliveryMode::ManualFlush`]. The broker keeps one of -/// these per spawned worker in a parallel `HashMap` -/// so the existing `WorkerHandle` (which holds OS-level process state) -/// doesn't have to grow. +/// current [`InboundDeliveryMode`] plus the FIFO pending queue every inbound +/// relay message passes through. The broker keeps one of these per spawned +/// worker in a parallel `HashMap` so the existing +/// `WorkerHandle` (which holds OS-level process state) doesn't have to grow. #[derive(Debug, Default)] pub struct InboundDeliveryState { pub mode: InboundDeliveryMode, @@ -221,20 +219,16 @@ pub struct InboundDeliveryState { /// with a `tracing::warn!` (see [`InboundDeliveryState::push_pending`]). pub const MAX_PENDING_PER_WORKER: usize = 256; -/// Outcome of dispatching one inbound relay message through the delivery -/// gate. Returned by [`InboundDeliveryState::accept_inbound`] so the broker can +/// Outcome of appending one inbound relay message to the pending queue. +/// Returned by [`InboundDeliveryState::accept_inbound`] so the broker can /// log + telemetry consistently. #[derive(Debug, Clone, PartialEq, Eq)] pub enum InboundDeliveryDispatch { - /// Worker is in [`InboundDeliveryMode::AutoInject`]; the broker should run - /// the existing inject path. - Inject, - /// Worker is in [`InboundDeliveryMode::ManualFlush`]; the message was queued. - /// `queue_len` is the queue size *after* the push. + /// The message was queued. `queue_len` is the queue size *after* the push. Queued { queue_len: usize }, - /// Worker is in [`InboundDeliveryMode::ManualFlush`] but the queue was full, so - /// the oldest entry was evicted to make room. `queue_len` is the - /// queue size *after* the eviction + push (always equal to the cap). + /// The queue was full, so the oldest entry was evicted to make room. + /// `queue_len` is the queue size *after* the eviction + push (always equal + /// to the cap). QueuedEvicted { queue_len: usize, dropped_from: String, @@ -263,30 +257,28 @@ impl InboundDeliveryState { evicted_from } - /// Gate an inbound relay message through the current inbound delivery mode. + /// Append an inbound relay message to the pending queue. /// - /// In [`InboundDeliveryMode::AutoInject`] the message is *not* enqueued; - /// the caller runs the existing inject path. In - /// [`InboundDeliveryMode::ManualFlush`] the message is appended (with FIFO - /// eviction at the cap) and the caller acks the sender without touching the - /// worker's PTY. + /// The current [`InboundDeliveryMode`] is a drain policy, not an admission + /// gate: callers should drain immediately in [`InboundDeliveryMode::AutoInject`] + /// and leave the queue intact in [`InboundDeliveryMode::ManualFlush`]. pub fn accept_inbound(&mut self, msg: PendingRelayMessage) -> InboundDeliveryDispatch { - match self.mode { - InboundDeliveryMode::AutoInject => InboundDeliveryDispatch::Inject, - InboundDeliveryMode::ManualFlush => { - let evicted = self.push_pending(msg); - let queue_len = self.pending.len(); - match evicted { - Some(dropped_from) => InboundDeliveryDispatch::QueuedEvicted { - queue_len, - dropped_from, - }, - None => InboundDeliveryDispatch::Queued { queue_len }, - } - } + let evicted = self.push_pending(msg); + let queue_len = self.pending.len(); + match evicted { + Some(dropped_from) => InboundDeliveryDispatch::QueuedEvicted { + queue_len, + dropped_from, + }, + None => InboundDeliveryDispatch::Queued { queue_len }, } } + /// Whether queued inbound messages should be drained immediately. + pub fn should_drain_immediately(&self) -> bool { + matches!(self.mode, InboundDeliveryMode::AutoInject) + } + /// Drain the pending queue in FIFO order. Used by `POST /api/flush` /// and by the auto-drain that runs on a `manual_flush → auto_inject` /// transition. @@ -377,16 +369,21 @@ mod inbound_delivery_tests { } #[test] - fn auto_inject_mode_does_not_queue() { + fn auto_inject_mode_queues_for_immediate_drain() { let mut state = InboundDeliveryState::new(InboundDeliveryMode::AutoInject); let outcome = state.accept_inbound(msg("Alice", "hi")); - assert_eq!(outcome, InboundDeliveryDispatch::Inject); + assert_eq!(outcome, InboundDeliveryDispatch::Queued { queue_len: 1 }); + assert!(state.should_drain_immediately()); + let drained = state.drain_pending(); + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].from, "Alice"); assert!(state.pending.is_empty()); } #[test] fn manual_flush_mode_queues_in_fifo_order() { let mut state = InboundDeliveryState::new(InboundDeliveryMode::ManualFlush); + assert!(!state.should_drain_immediately()); assert_eq!( state.accept_inbound(msg("Alice", "one")), InboundDeliveryDispatch::Queued { queue_len: 1 }