diff --git a/src/helpers.rs b/src/helpers.rs index d2f7570f3..2e26ba9bb 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,6 +1,8 @@ use std::{ + collections::HashMap, ffi::OsStr, path::Path, + sync::atomic::{AtomicU64, Ordering}, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -990,6 +992,83 @@ pub(crate) fn is_auto_suggestion(output: &str) -> bool { has_cursor_ghost || has_send_hint } +/// Case-insensitive comparison for agent names. +/// +/// Agent names may have inconsistent casing across registration, WebSocket +/// events, and API responses. Centralising the comparison here prevents +/// recurrences of the case-sensitivity routing bugs seen in commits 64bcb2f7 +/// and PR #641. +pub(crate) fn agent_name_eq(a: &str, b: &str) -> bool { + a.eq_ignore_ascii_case(b) +} + +/// Check whether *any* of the `self_names` match `name` (case-insensitive). +pub(crate) fn is_self_name<'a, I>(self_names: I, name: &str) -> bool +where + I: IntoIterator, +{ + self_names.into_iter().any(|n| agent_name_eq(n, name)) +} + +static DM_DROPS_TOTAL: AtomicU64 = AtomicU64::new(0); + +/// Return the total number of DMs silently dropped due to participant +/// resolution failures. Useful for metrics / incident detection. +pub(crate) fn dm_drops_total() -> u64 { + DM_DROPS_TOTAL.load(Ordering::Relaxed) +} + +pub(crate) const DM_PARTICIPANT_CACHE_TTL: Duration = Duration::from_secs(30); +const MAX_DM_CACHE_ENTRIES: usize = 8192; + +pub(crate) async fn resolve_dm_participants_cached( + http: &relay_broker::relaycast_ws::RelaycastHttpClient, + cache: &mut HashMap)>, + workspace_id: &str, + conversation_id: &str, +) -> Vec { + let workspace_id = workspace_id.trim(); + let conversation_id = conversation_id.trim(); + if conversation_id.is_empty() { + return vec![]; + } + let cache_key = format!("{workspace_id}:{conversation_id}"); + + if let Some((fetched_at, participants)) = cache.get(&cache_key) { + if fetched_at.elapsed() < DM_PARTICIPANT_CACHE_TTL { + return participants.clone(); + } + } + + match http.get_dm_participants(conversation_id).await { + Ok(fetched) => { + let fetched: Vec = fetched; + if cache.len() >= MAX_DM_CACHE_ENTRIES { + if let Some(oldest_key) = cache + .iter() + .min_by_key(|(_, (ts, _))| *ts) + .map(|(k, _)| k.clone()) + { + cache.remove(&oldest_key); + } + } + cache.insert(cache_key, (Instant::now(), fetched.clone())); + fetched + } + Err(error) => { + DM_DROPS_TOTAL.fetch_add(1, Ordering::Relaxed); + tracing::warn!( + workspace_id = %workspace_id, + conversation_id = %conversation_id, + error = %error, + dm_drops_total = DM_DROPS_TOTAL.load(Ordering::Relaxed), + "failed resolving DM participants — DM silently dropped" + ); + vec![] + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -1867,4 +1946,48 @@ mod tests { assert_eq!(normalize_cli_name("/usr/local/bin/claude"), "claude"); assert_eq!(normalize_cli_name("codex"), "codex"); } + + // ==================== agent_name_eq / is_self_name tests ==================== + + #[test] + fn agent_name_eq_case_insensitive() { + assert!(agent_name_eq("Alice", "alice")); + assert!(agent_name_eq("alice", "ALICE")); + assert!(agent_name_eq("Worker-1", "worker-1")); + assert!(!agent_name_eq("Alice", "Bob")); + } + + #[test] + fn agent_name_eq_empty_strings() { + assert!(agent_name_eq("", "")); + assert!(!agent_name_eq("", "Alice")); + } + + #[test] + fn is_self_name_matches_any() { + let names = vec!["Alice".to_string(), "alice-dev".to_string()]; + assert!(is_self_name(&names, "alice")); + assert!(is_self_name(&names, "ALICE")); + assert!(is_self_name(&names, "Alice-Dev")); + assert!(!is_self_name(&names, "Bob")); + } + + #[test] + fn is_self_name_empty_list() { + let names: Vec = vec![]; + assert!(!is_self_name(&names, "Alice")); + } + + // ==================== DM participant cache tests ==================== + + #[test] + fn dm_cache_ttl_constant_is_reasonable() { + assert!(DM_PARTICIPANT_CACHE_TTL.as_secs() > 0); + assert!(DM_PARTICIPANT_CACHE_TTL.as_secs() <= 300); + } + + #[test] + fn dm_cache_eviction_cap_is_set() { + assert_eq!(MAX_DM_CACHE_ENTRIES, 8192); + } } diff --git a/src/main.rs b/src/main.rs index b7cf66ec2..a66ffb334 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,11 +15,11 @@ mod swarm_tui; mod wrap; use helpers::{ - detect_bypass_permissions_prompt, detect_claude_trust_prompt, detect_codex_model_prompt, - detect_gemini_action_required, detect_gemini_trust_prompt, detect_gemini_untrusted_banner, - detect_opencode_permission_prompt, floor_char_boundary, is_auto_suggestion, - is_bypass_selection_menu, is_in_editor_mode, normalize_cli_name, parse_cli_command, strip_ansi, - TerminalQueryParser, + agent_name_eq, detect_bypass_permissions_prompt, detect_claude_trust_prompt, + detect_codex_model_prompt, detect_gemini_action_required, detect_gemini_trust_prompt, + detect_gemini_untrusted_banner, detect_opencode_permission_prompt, floor_char_boundary, + is_auto_suggestion, is_bypass_selection_menu, is_in_editor_mode, is_self_name, + normalize_cli_name, parse_cli_command, strip_ansi, TerminalQueryParser, }; use listen_api::{broadcast_if_relevant, listen_api_router, ListenApiRequest}; use routing::display_target_for_dashboard; @@ -64,7 +64,7 @@ use spawner::{spawn_env_vars, terminate_child, Spawner}; const DEFAULT_DELIVERY_RETRY_MS: u64 = 1_000; const MAX_DELIVERY_RETRIES: u32 = 10; const DEFAULT_RELAYCAST_BASE_URL: &str = "https://api.relaycast.dev"; -const DM_PARTICIPANT_CACHE_TTL: Duration = Duration::from_secs(30); +use helpers::resolve_dm_participants_cached; const THREAD_HISTORY_LIMIT: usize = 1_000; const DEFAULT_HTTP_API_LOCAL_DELIVERY_TIMEOUT_MS: u64 = 3_000; const DEFAULT_HTTP_API_RELAYCAST_SEND_TIMEOUT_MS: u64 = 20_000; @@ -2963,7 +2963,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { if delivery_plan.needs_dm_resolution { let conversation_id = mapped.target.clone(); tracing::info!(conversation_id = %conversation_id, "resolving DM participants"); - let participants = resolve_dm_participants( + let participants = resolve_dm_participants_cached( &workspace_http, &mut dm_participants_cache, &workspace_id, @@ -2974,7 +2974,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { if let Some(participant) = participants .iter() - .find(|participant| !participant.eq_ignore_ascii_case(&mapped.from)) + .find(|participant| !agent_name_eq(participant, &mapped.from)) { delivery_plan.display_target = participant.clone(); } @@ -3003,9 +3003,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { let display_target = display_target_for_dashboard(&delivery_plan.display_target, &workspace_self_names, &workspace_self_name); - let display_from = if workspace_self_names - .iter() - .any(|name| mapped.from.eq_ignore_ascii_case(name)) + let display_from = if is_self_name(&workspace_self_names, &mapped.from) { workspace_self_name.clone() } else { @@ -5234,42 +5232,6 @@ async fn retry_pending_delivery( } } -async fn resolve_dm_participants( - relaycast_http: &RelaycastHttpClient, - dm_participants_cache: &mut HashMap)>, - workspace_id: &str, - conversation_id: &str, -) -> Vec { - let workspace_id = workspace_id.trim(); - let conversation_id = conversation_id.trim(); - if conversation_id.is_empty() { - return vec![]; - } - let cache_key = format!("{workspace_id}:{conversation_id}"); - - if let Some((fetched_at, participants)) = dm_participants_cache.get(&cache_key) { - if fetched_at.elapsed() < DM_PARTICIPANT_CACHE_TTL { - return participants.clone(); - } - } - - let fetched = relaycast_http - .get_dm_participants(conversation_id) - .await - .unwrap_or_else(|error| { - tracing::debug!( - workspace_id = %workspace_id, - conversation_id = %conversation_id, - error = %error, - "failed resolving DM participants" - ); - vec![] - }); - - dm_participants_cache.insert(cache_key, (Instant::now(), fetched.clone())); - fetched -} - fn drop_pending_for_worker( pending_deliveries: &mut HashMap, worker_name: &str, diff --git a/src/wrap.rs b/src/wrap.rs index 50d4d681c..5b3f9ea5b 100644 --- a/src/wrap.rs +++ b/src/wrap.rs @@ -3,7 +3,8 @@ use std::time::{Duration, Instant}; use super::*; use crate::helpers::{ - check_echo_in_output, floor_char_boundary, format_injection_for_worker_with_workspace, + agent_name_eq, check_echo_in_output, floor_char_boundary, + format_injection_for_worker_with_workspace, is_self_name, resolve_dm_participants_cached, ActivityDetector, DeliveryOutcome, PendingActivity, PendingVerification, ThrottleState, ACTIVITY_BUFFER_KEEP_BYTES, ACTIVITY_BUFFER_MAX_BYTES, ACTIVITY_WINDOW, MAX_VERIFICATION_ATTEMPTS, VERIFICATION_WINDOW, @@ -668,6 +669,7 @@ pub(crate) async fn run_wrap( // Dedup for WS events let mut dedup = DedupCache::new(Duration::from_secs(300), 8192); + let mut dm_participants_cache: HashMap)> = HashMap::new(); // Buffer for extracting message IDs from MCP tool responses in PTY output. // When the agent sends messages via MCP, the response contains the message ID. @@ -1016,16 +1018,27 @@ pub(crate) async fn run_wrap( &workspace_id, workspace_alias.as_deref(), ) { + // Skip presence and reaction events — they carry no content + // to inject and cause agents to respond to empty messages. + if matches!(mapped.kind, InboundKind::Presence | InboundKind::ReactionReceived) { + tracing::debug!( + kind = ?mapped.kind, + from = %mapped.from, + "skipping non-message event in wrap mode" + ); + continue; + } + let dedup_key = format!("{}:{}", mapped.workspace_id, mapped.event_id); if !dedup.insert_if_new(&dedup_key, Instant::now()) { tracing::debug!(event_id = %mapped.event_id, workspace_id = %mapped.workspace_id, "dedup: skipping relay event"); continue; } - if workspace_self_names.contains(&mapped.from) + if is_self_name(&workspace_self_names, &mapped.from) || mapped .sender_agent_id .as_ref() - .is_some_and(|id| workspace_self_agent_ids.contains(id)) + .is_some_and(|id| workspace_self_agent_ids.iter().any(|self_id| agent_name_eq(self_id, id))) { tracing::debug!( from = %mapped.from, @@ -1037,21 +1050,41 @@ pub(crate) async fn run_wrap( // DM routing: only deliver DMs addressed to this agent. // Channel messages (target starts with '#') are broadcast - // to all subscribers. Allow through: empty targets (presence), - // thread replies, conversation_id fallbacks. + // to all subscribers. Allow through: empty targets (presence) + // and thread replies. if !mapped.target.is_empty() && !mapped.target.starts_with('#') && mapped.target != "thread" - && !mapped.target.starts_with("dm_") - && !mapped.target.starts_with("conv_") - && !workspace_self_names.contains(&mapped.target) { - tracing::debug!( - target = %mapped.target, - self_names = ?workspace_self_names, - "skipping DM not addressed to this agent" - ); - continue; + if mapped.target.starts_with("dm_") || mapped.target.starts_with("conv_") { + // Conversation-ID target: resolve participants to check + // if this wrapped agent is part of the DM. + let participants = resolve_dm_participants_cached( + &workspace_child_http, + &mut dm_participants_cache, + &workspace_id, + &mapped.target, + ).await; + let is_participant = workspace_self_names.iter().any(|name| { + participants.iter().any(|p| agent_name_eq(p, name)) + }); + if !is_participant { + tracing::debug!( + target = %mapped.target, + participants = ?participants, + self_names = ?workspace_self_names, + "skipping DM — agent not in participants" + ); + continue; + } + } else if !is_self_name(&workspace_self_names, &mapped.target) { + tracing::debug!( + target = %mapped.target, + self_names = ?workspace_self_names, + "skipping DM not addressed to this agent" + ); + continue; + } } let delivery_id = format!("wrap_{}", mapped.event_id);