diff --git a/CHANGELOG.md b/CHANGELOG.md index f6c6a1b1e..b8dd8d088 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Release workflow changelog generation now writes concise Keep a Changelog sections and skips web-only, release-only, trajectory, PR-review, placeholder, and withdrawn-tag entries. +### Breaking Changes + +- `agent-relay-broker`'s public Rust protocol types now require typed ID newtypes (`WorkerName`, `DeliveryId`, `EventId`, `WorkspaceId`, `WorkspaceAlias`, `ThreadId`, `AgentId`, `RequestId`, `ChannelName`, `MessageTarget`) on every protocol struct and enum variant in `protocol.rs`, `types.rs`, and `listen_api.rs::ListenApiRequest`. The new wrappers live in `crates/broker/src/lib.rs` under `pub mod ids`. JSON wire format is unchanged because every wrapper is `#[serde(transparent)]`, so the broker ↔ SDK channel and on-disk persisted state remain byte-compatible. + +### Migration Guidance + +- Downstream Rust callers must construct identifiers via `relay_broker::ids::{WorkerName, DeliveryId, EventId, MessageTarget, …}` instead of `String`. Each newtype impls `From` / `From<&str>` and `Deref`, so most string-handling code keeps compiling; only construction sites (`HashMap` keys, struct literals, channel sends) need updates. +- Replace ad-hoc target discrimination (`target.starts_with('#')`, `target == "thread"`) with `MessageTarget::kind()` and match on `MessageTargetKind::{Channel, Thread, DirectMessage, Conversation, Worker}`. + ### Fixed - `web`: PR preview SST deploys use and comment the generated CloudFront URL and AWS's managed disabled cache policy instead of creating per-preview Cloudflare DNS records, ACM certificates, and custom CloudFront cache policies. diff --git a/crates/broker/src/broker.rs b/crates/broker/src/broker.rs index 20e502612..c81fcd92e 100644 --- a/crates/broker/src/broker.rs +++ b/crates/broker/src/broker.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, io::Write, path::Path}; use crate::{ + ids::{ChannelName, WorkerName}, protocol::{AgentRuntime, AgentSpec}, supervisor::RestartPolicy, }; @@ -26,14 +27,14 @@ pub(crate) fn is_pid_alive(pid: u32) -> bool { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub(crate) struct BrokerState { - pub(crate) agents: HashMap, + pub(crate) agents: HashMap, } #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct PersistedAgent { pub(crate) runtime: AgentRuntime, pub(crate) parent: Option, - pub(crate) channels: Vec, + pub(crate) channels: Vec, #[serde(default)] pub(crate) pid: Option, #[serde(default)] @@ -72,8 +73,8 @@ impl BrokerState { /// Remove persisted agents whose PIDs are no longer alive. /// Returns the names of agents that were cleaned up. #[cfg(unix)] - pub(crate) fn reap_dead_agents(&mut self) -> Vec { - let dead: Vec = self + pub(crate) fn reap_dead_agents(&mut self) -> Vec { + let dead: Vec = self .agents .iter() .filter(|(_, agent)| { @@ -94,9 +95,9 @@ impl BrokerState { } #[cfg(not(unix))] - pub(crate) fn reap_dead_agents(&mut self) -> Vec { + pub(crate) fn reap_dead_agents(&mut self) -> Vec { // On non-Unix platforms, clear all agents without PID info - let dead: Vec = self + let dead: Vec = self .agents .iter() .filter(|(_, agent)| agent.pid.is_none()) diff --git a/crates/broker/src/broker/delivery_verification.rs b/crates/broker/src/broker/delivery_verification.rs index 6543f2274..a199aa384 100644 --- a/crates/broker/src/broker/delivery_verification.rs +++ b/crates/broker/src/broker/delivery_verification.rs @@ -2,7 +2,11 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use serde_json::{json, Value}; -use crate::{util::ansi::strip_ansi, worker::detection::ActivityDetector}; +use crate::{ + ids::{DeliveryId, EventId, MessageTarget, RequestId, WorkspaceAlias, WorkspaceId}, + util::ansi::strip_ansi, + worker::detection::ActivityDetector, +}; pub(crate) const ACTIVITY_WINDOW: Duration = Duration::from_secs(5); pub(crate) const ACTIVITY_BUFFER_MAX_BYTES: usize = 16_000; @@ -65,8 +69,8 @@ impl ThrottleState { #[derive(Debug, Clone)] pub(crate) struct PendingActivity { - pub delivery_id: String, - pub event_id: String, + pub delivery_id: DeliveryId, + pub event_id: EventId, pub expected_echo: String, pub verified_at: Instant, pub output_buffer: String, @@ -85,18 +89,18 @@ pub(crate) const VERIFICATION_WINDOW: std::time::Duration = std::time::Duration: /// A pending delivery waiting for echo verification in PTY output. #[derive(Debug)] pub(crate) struct PendingVerification { - pub delivery_id: String, - pub event_id: String, + pub delivery_id: DeliveryId, + pub event_id: EventId, pub expected_echo: String, pub injected_at: std::time::Instant, pub attempts: usize, pub max_attempts: usize, - pub request_id: Option, - pub workspace_id: Option, - pub workspace_alias: Option, + pub request_id: Option, + pub workspace_id: Option, + pub workspace_alias: Option, pub from: String, pub body: String, - pub target: String, + pub target: MessageTarget, } /// Check if the expected echo string appears in PTY output (after stripping ANSI). diff --git a/crates/broker/src/conversation_log.rs b/crates/broker/src/conversation_log.rs index 3acc79196..56779cde4 100644 --- a/crates/broker/src/conversation_log.rs +++ b/crates/broker/src/conversation_log.rs @@ -87,7 +87,7 @@ impl ConversationLog { let msg_type = match event.kind { crate::types::InboundKind::DmReceived => "DM".to_string(), crate::types::InboundKind::GroupDmReceived => "Group DM".to_string(), - crate::types::InboundKind::MessageCreated => event.target.clone(), + crate::types::InboundKind::MessageCreated => event.target.as_str().to_string(), crate::types::InboundKind::ThreadReply => format!( "Thread {}", short_id(event.thread_id.as_deref().unwrap_or("?")) diff --git a/crates/broker/src/ids.rs b/crates/broker/src/ids.rs new file mode 100644 index 000000000..5f534fcb7 --- /dev/null +++ b/crates/broker/src/ids.rs @@ -0,0 +1,315 @@ +//! Typed identifier newtypes for protocol fields. +//! +//! Each wrapper is `#[serde(transparent)]` so the JSON wire format is +//! identical to the previous bare-`String` form, which means the broker +//! ↔ SDK protocol on disk and over the wire is unchanged. +//! +//! The wrappers impl `Deref`, `Display`, `AsRef`, +//! `Borrow`, `From` / `From<&str>`, and `PartialEq` against +//! `str`/`&str`/`String` so existing call sites that treated these fields +//! as strings keep compiling unchanged. The point is not to force +//! ceremony at use sites — it's to prevent passing a `DeliveryId` where +//! an `EventId` was expected, and to make the meaning of overloaded +//! fields (`target` in particular) legible in the type system. + +use std::borrow::Borrow; +use std::ffi::OsStr; +use std::fmt; +use std::ops::Deref; + +use serde::{Deserialize, Serialize}; + +macro_rules! string_id { + ($(#[$meta:meta])* $name:ident) => { + $(#[$meta])* + #[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] + #[serde(transparent)] + pub struct $name(pub String); + + impl $name { + #[inline] + pub fn new(s: impl Into) -> Self { + Self(s.into()) + } + + #[inline] + pub fn as_str(&self) -> &str { + &self.0 + } + + #[inline] + pub fn into_string(self) -> String { + self.0 + } + } + + impl From for $name { + fn from(s: String) -> Self { + Self(s) + } + } + + impl From<&str> for $name { + fn from(s: &str) -> Self { + Self(s.to_string()) + } + } + + impl From<&String> for $name { + fn from(s: &String) -> Self { + Self(s.clone()) + } + } + + impl From<$name> for String { + fn from(v: $name) -> Self { + v.0 + } + } + + impl AsRef for $name { + fn as_ref(&self) -> &str { + &self.0 + } + } + + impl AsRef for $name { + fn as_ref(&self) -> &OsStr { + OsStr::new(&self.0) + } + } + + impl Borrow for $name { + fn borrow(&self) -> &str { + &self.0 + } + } + + impl Deref for $name { + type Target = str; + fn deref(&self) -> &str { + &self.0 + } + } + + impl fmt::Display for $name { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } + } + + impl PartialEq for $name { + fn eq(&self, other: &str) -> bool { + self.0 == other + } + } + + impl PartialEq<&str> for $name { + fn eq(&self, other: &&str) -> bool { + self.0.as_str() == *other + } + } + + impl PartialEq for $name { + fn eq(&self, other: &String) -> bool { + &self.0 == other + } + } + + impl PartialEq<$name> for str { + fn eq(&self, other: &$name) -> bool { + self == other.0.as_str() + } + } + + impl PartialEq<$name> for &str { + fn eq(&self, other: &$name) -> bool { + *self == other.0.as_str() + } + } + + impl PartialEq<$name> for String { + fn eq(&self, other: &$name) -> bool { + self.as_str() == other.0.as_str() + } + } + }; +} + +string_id!( + /// Display name of a worker / spawned agent managed by this broker + /// (e.g. `"lead"`, `"reviewer-1"`). + WorkerName +); +string_id!( + /// Relaycast workspace identifier (e.g. `"ws_abc123"`). + WorkspaceId +); +string_id!( + /// Human-readable workspace alias for display. + WorkspaceAlias +); +string_id!( + /// Per-delivery identifier assigned by the broker when queueing an + /// inbound relay message for a worker. + DeliveryId +); +string_id!( + /// Inbound relay event identifier carried end-to-end for dedup, + /// telemetry, and ack matching. + EventId +); +string_id!( + /// Thread / conversation identifier used to scope replies. + ThreadId +); +string_id!( + /// Relaycast agent identifier (the API-server-side agent record id, + /// distinct from the local [`WorkerName`]). + AgentId +); +string_id!( + /// Per-request correlation identifier on the SDK ↔ broker protocol. + RequestId +); +string_id!( + /// Channel name as it appears in subscribe / unsubscribe payloads + /// and `AgentSpec::channels` — the raw identifier without the + /// leading `#` (e.g. `"general"`, `"ops"`). The `#`-prefixed form + /// is the [`MessageTarget`] convention for routing a message *to* + /// a channel, not the channel's own name. + ChannelName +); + +string_id!( + /// Destination of a relay message — overloaded at the string level + /// across channels (`"#general"`), the thread sentinel + /// (`"thread"`), DM / conversation identifiers (`"dm_..."`, + /// `"conv_..."`), or a bare worker name. + /// + /// Use [`MessageTarget::kind`] to dispatch exhaustively instead of + /// hand-rolling prefix checks. + MessageTarget +); + +/// Discriminated view of [`MessageTarget`]'s overloaded shape. +/// +/// The variants reflect the on-wire conventions used by relaycast and +/// the broker's routing layer. Match this rather than calling +/// `starts_with('#')` / `== "thread"` at every site. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MessageTargetKind<'a> { + /// `#name` channel target. The slice is the name without `#`. + Channel(&'a str), + /// The literal `"thread"` sentinel — a thread reply whose specific + /// recipient is resolved by the broker's thread-routing pass. + Thread, + /// A direct-message conversation identifier (`dm_*`). + DirectMessage(&'a str), + /// A group-DM / conversation identifier (`conv_*`). + Conversation(&'a str), + /// A bare worker display name. + Worker(&'a str), +} + +impl MessageTarget { + /// Classify the target string into its semantic shape. + pub fn kind(&self) -> MessageTargetKind<'_> { + let s = self.0.as_str(); + if let Some(channel) = s.strip_prefix('#') { + MessageTargetKind::Channel(channel) + } else if s == "thread" { + MessageTargetKind::Thread + } else if s.starts_with("dm_") { + MessageTargetKind::DirectMessage(s) + } else if s.starts_with("conv_") { + MessageTargetKind::Conversation(s) + } else { + MessageTargetKind::Worker(s) + } + } + + /// `true` when the target is a `#channel` broadcast. + pub fn is_channel(&self) -> bool { + self.0.starts_with('#') + } + + /// `true` when the target is the `"thread"` sentinel used for + /// thread-reply routing. + pub fn is_thread_sentinel(&self) -> bool { + self.0 == "thread" + } + + /// The thread sentinel value (`"thread"`). + pub fn thread_sentinel() -> Self { + Self("thread".to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serde_transparent_string_roundtrip() { + let w = WorkerName::new("lead"); + let json = serde_json::to_string(&w).unwrap(); + assert_eq!(json, r#""lead""#); + let back: WorkerName = serde_json::from_str(&json).unwrap(); + assert_eq!(back, w); + } + + #[test] + fn deref_and_eq_against_str_work() { + let id = DeliveryId::new("del_1"); + // Deref means &str methods are callable. + assert_eq!(id.len(), 5); + assert!(id.starts_with("del_")); + // PartialEq impls cover the natural comparison directions. + assert!(id == "del_1"); + assert!("del_1" == id); + } + + #[test] + fn message_target_classifies_channel_thread_dm_conv_and_worker() { + assert_eq!( + MessageTarget::new("#general").kind(), + MessageTargetKind::Channel("general") + ); + assert_eq!( + MessageTarget::new("thread").kind(), + MessageTargetKind::Thread + ); + assert_eq!( + MessageTarget::new("dm_abc").kind(), + MessageTargetKind::DirectMessage("dm_abc") + ); + assert_eq!( + MessageTarget::new("conv_xy").kind(), + MessageTargetKind::Conversation("conv_xy") + ); + assert_eq!( + MessageTarget::new("Lead").kind(), + MessageTargetKind::Worker("Lead") + ); + } + + #[test] + fn message_target_helpers_match_kind() { + let t = MessageTarget::new("#ops"); + assert!(t.is_channel()); + assert!(!t.is_thread_sentinel()); + + let t = MessageTarget::thread_sentinel(); + assert!(t.is_thread_sentinel()); + assert_eq!(t.kind(), MessageTargetKind::Thread); + } + + #[test] + fn hashmap_lookup_by_str_via_borrow() { + let mut m = std::collections::HashMap::::new(); + m.insert(WorkerName::new("lead"), 1); + // Borrow means we can look up by &str without allocating. + assert_eq!(m.get("lead").copied(), Some(1)); + } +} diff --git a/crates/broker/src/lib.rs b/crates/broker/src/lib.rs index d28733d15..6fd5c4176 100644 --- a/crates/broker/src/lib.rs +++ b/crates/broker/src/lib.rs @@ -1,3 +1,12 @@ +// `agent-relay-broker` is a binary-with-thin-lib crate: `main.rs` only calls +// `run_cli`, so every code path the binary exercises is reachable through this +// library. The `#[allow(dead_code)]` annotations below mark modules that +// currently carry unused items (constants, helpers, even whole types) — a +// follow-up cleanup will trim them. They are not a side effect of the +// binary/library split; each annotated module has at least one genuinely +// unused public-facing item that the compiler would otherwise warn about. + +pub mod ids; pub mod protocol; pub mod snippets; diff --git a/crates/broker/src/listen_api.rs b/crates/broker/src/listen_api.rs index 69a65963b..fe2d442a5 100644 --- a/crates/broker/src/listen_api.rs +++ b/crates/broker/src/listen_api.rs @@ -11,6 +11,7 @@ use std::{ }; use crate::{ + ids::{ChannelName, MessageTarget, ThreadId, WorkerName, WorkspaceAlias, WorkspaceId}, protocol::MessageInjectionMode, relaycast::WorkspaceMembershipSummary, replay_buffer::ReplayBuffer, @@ -35,16 +36,16 @@ type PtyInputSerializers = Arc, model: Option, args: Vec, task: Option, - channels: Vec, + channels: Vec, cwd: Option, team: Option, - shadow_of: Option, + shadow_of: Option, shadow_mode: Option, continue_from: Option, idle_threshold_secs: Option, @@ -55,13 +56,13 @@ pub enum ListenApiRequest { reply: tokio::sync::oneshot::Sender>, }, SetModel { - name: String, + name: WorkerName, model: String, timeout_ms: Option, reply: tokio::sync::oneshot::Sender>, }, Release { - name: String, + name: WorkerName, reason: Option, reply: tokio::sync::oneshot::Sender>, }, @@ -72,26 +73,26 @@ pub enum ListenApiRequest { reply: tokio::sync::oneshot::Sender>, }, Send { - to: String, + to: MessageTarget, text: String, from: Option, - thread_id: Option, - workspace_id: Option, - workspace_alias: Option, + thread_id: Option, + workspace_id: Option, + workspace_alias: Option, mode: MessageInjectionMode, reply: tokio::sync::oneshot::Sender>, }, SendInput { - name: String, + name: WorkerName, data: String, reply: tokio::sync::oneshot::Sender>, }, CheckPtyInputTarget { - name: String, + name: WorkerName, reply: tokio::sync::oneshot::Sender>, }, ResizePty { - name: String, + name: WorkerName, rows: u16, cols: u16, reply: tokio::sync::oneshot::Sender>, @@ -106,7 +107,7 @@ pub enum ListenApiRequest { /// Fire-and-forget routes (`send_input`, `resize_pty`) keep their /// existing single-arm channel pattern. WorkerRequest { - name: String, + name: WorkerName, /// Outbound frame `type`, e.g. `"snapshot_pty"`. The worker is /// expected to reply with `"{kind}_response"`. kind: String, @@ -119,7 +120,7 @@ pub enum ListenApiRequest { reply: tokio::sync::oneshot::Sender>, }, GetMetrics { - agent: Option, + agent: Option, reply: tokio::sync::oneshot::Sender>, }, GetStatus { @@ -133,13 +134,13 @@ pub enum ListenApiRequest { reply: tokio::sync::oneshot::Sender>, }, SubscribeChannels { - name: String, - channels: Vec, + name: WorkerName, + channels: Vec, reply: tokio::sync::oneshot::Sender>, }, UnsubscribeChannels { - name: String, - channels: Vec, + name: WorkerName, + channels: Vec, reply: tokio::sync::oneshot::Sender>, }, Shutdown { @@ -151,7 +152,7 @@ pub enum ListenApiRequest { /// `GET /api/spawned/{name}/delivery-mode` — read the current inbound /// delivery mode. GetInboundDeliveryMode { - name: String, + name: WorkerName, reply: tokio::sync::oneshot::Sender>, }, /// `PUT /api/spawned/{name}/delivery-mode` — set the inbound delivery mode. @@ -159,7 +160,7 @@ pub enum ListenApiRequest { /// queue into the worker (via the existing inject path) before /// replying; `flushed` reports how many messages were injected. SetInboundDeliveryMode { - name: String, + name: WorkerName, mode: InboundDeliveryMode, reply: tokio::sync::oneshot::Sender>, }, @@ -167,21 +168,21 @@ pub enum ListenApiRequest { /// pending-message queue (FIFO, head first). Auto-inject workers usually /// report an empty queue because they drain in the same broker turn. GetPending { - name: String, + name: WorkerName, reply: tokio::sync::oneshot::Sender, DeliveryRouteError>>, }, /// `POST /api/spawned/{name}/flush` — drain the pending queue and /// inject every message into the worker via the existing /// fire-and-forget inject path. Does *not* change the mode. FlushPending { - name: String, + name: WorkerName, reply: tokio::sync::oneshot::Sender>, }, /// `POST /api/agent-result` — accepts structured result payloads from the /// per-agent MCP tool using a callback token minted at spawn time. SubmitAgentResult { token: String, - name: Option, + name: Option, data: Value, final_result: bool, metadata: Option, @@ -197,7 +198,7 @@ pub enum ListenApiRequest { #[derive(Debug, Clone, PartialEq, Eq)] pub enum DeliveryRouteError { /// No worker with that name is currently registered with the broker. - WorkerNotFound(String), + WorkerNotFound(WorkerName), } impl std::fmt::Display for DeliveryRouteError { @@ -279,7 +280,7 @@ struct ListenApiState { /// relaycast.json or env var. workspace_key: Option, memberships: Vec, - default_workspace_id: Option, + default_workspace_id: Option, /// Broker version string (from Cargo.toml) broker_version: String, /// Whether the broker is in persist mode @@ -313,7 +314,7 @@ pub struct ListenApiConfig { pub replay_buffer: ReplayBuffer, pub workspace_key: Option, pub memberships: Vec, - pub default_workspace_id: Option, + pub default_workspace_id: Option, pub persist: bool, } @@ -430,7 +431,7 @@ fn listen_api_router_with_auth( // --------------------------------------------------------------------------- pub(crate) fn listen_api_health_payload( - default_workspace_id: Option, + default_workspace_id: Option, memberships: Vec, ) -> Value { let startup_error_code = std::env::var("AGENT_RELAY_STARTUP_ERROR_CODE").ok(); @@ -442,7 +443,7 @@ pub(crate) fn listen_api_health_payload( .first() .map(|membership| membership.workspace_id.clone()) }) - .unwrap_or_else(|| "ws_unknown".to_string()); + .unwrap_or_else(|| WorkspaceId::new("ws_unknown")); json!({ "status": status, @@ -670,16 +671,16 @@ async fn listen_api_spawn( if state .tx .send(ListenApiRequest::Spawn { - name: name.clone(), + name: WorkerName::new(name.clone()), cli, transport, model, args, task, - channels, + channels: channels.into_iter().map(ChannelName::from).collect(), cwd, team, - shadow_of, + shadow_of: shadow_of.map(WorkerName::from), shadow_mode, continue_from, idle_threshold_secs, @@ -753,7 +754,7 @@ async fn listen_api_set_model( if state .tx .send(ListenApiRequest::SetModel { - name: name.clone(), + name: WorkerName::new(name.clone()), model: model.clone(), timeout_ms: body.timeout_ms, reply: reply_tx, @@ -846,7 +847,7 @@ async fn listen_api_agent_result( .tx .send(ListenApiRequest::SubmitAgentResult { token, - name, + name: name.map(WorkerName::from), data, final_result, metadata, @@ -884,7 +885,7 @@ async fn listen_api_release( if state .tx .send(ListenApiRequest::Release { - name: name.clone(), + name: WorkerName::new(name.clone()), reason, reply: reply_tx, }) @@ -1016,12 +1017,12 @@ async fn listen_api_send( if state .tx .send(ListenApiRequest::Send { - to: to.clone(), + to: MessageTarget::new(to.clone()), text, from, - thread_id, - workspace_id, - workspace_alias, + thread_id: thread_id.map(ThreadId::from), + workspace_id: workspace_id.map(WorkspaceId::from), + workspace_alias: workspace_alias.map(WorkspaceAlias::from), mode, reply: reply_tx, }) @@ -1331,7 +1332,7 @@ async fn check_pty_input_target( ) -> Result { let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); tx.send(ListenApiRequest::CheckPtyInputTarget { - name: name.to_string(), + name: WorkerName::from(name), reply: reply_tx, }) .await @@ -1348,7 +1349,7 @@ async fn send_pty_input_frame( ) -> Result { let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); tx.send(ListenApiRequest::SendInput { - name: name.to_string(), + name: WorkerName::from(name), data, reply: reply_tx, }) @@ -1449,7 +1450,7 @@ async fn listen_api_resize_pty( if state .tx .send(ListenApiRequest::ResizePty { - name: name.clone(), + name: WorkerName::new(name.clone()), rows: body.rows, cols: body.cols, reply: reply_tx, @@ -1502,7 +1503,7 @@ async fn listen_api_snapshot( if state .tx .send(ListenApiRequest::WorkerRequest { - name: name.clone(), + name: WorkerName::new(name.clone()), kind: "snapshot_pty".to_string(), payload: json!({ "format": format.as_wire_str() }), timeout: DEFAULT_REQUEST_TIMEOUT, @@ -1539,7 +1540,7 @@ async fn listen_api_get_inbound_delivery_mode( if state .tx .send(ListenApiRequest::GetInboundDeliveryMode { - name: name.clone(), + name: WorkerName::new(name.clone()), reply: reply_tx, }) .await @@ -1589,7 +1590,7 @@ async fn listen_api_set_inbound_delivery_mode( if state .tx .send(ListenApiRequest::SetInboundDeliveryMode { - name: name.clone(), + name: WorkerName::new(name.clone()), mode, reply: reply_tx, }) @@ -1622,7 +1623,7 @@ async fn listen_api_get_pending( if state .tx .send(ListenApiRequest::GetPending { - name: name.clone(), + name: WorkerName::new(name.clone()), reply: reply_tx, }) .await @@ -1645,19 +1646,28 @@ async fn listen_api_get_pending( }); let obj = payload.as_object_mut().expect("payload object built above"); if let Some(thread_id) = m.thread_id { - obj.insert("thread_id".to_string(), Value::String(thread_id)); + obj.insert( + "thread_id".to_string(), + Value::String(thread_id.into_string()), + ); } if let Some(workspace_id) = m.workspace_id { - obj.insert("workspace_id".to_string(), Value::String(workspace_id)); + obj.insert( + "workspace_id".to_string(), + Value::String(workspace_id.into_string()), + ); } if let Some(workspace_alias) = m.workspace_alias { obj.insert( "workspace_alias".to_string(), - Value::String(workspace_alias), + Value::String(workspace_alias.into_string()), ); } if let Some(event_id) = m.event_id { - obj.insert("event_id".to_string(), Value::String(event_id)); + obj.insert( + "event_id".to_string(), + Value::String(event_id.into_string()), + ); } payload }) @@ -1686,7 +1696,7 @@ async fn listen_api_flush_pending( if state .tx .send(ListenApiRequest::FlushPending { - name: name.clone(), + name: WorkerName::new(name.clone()), reply: reply_tx, }) .await @@ -1775,7 +1785,7 @@ async fn listen_api_metrics( if state .tx .send(ListenApiRequest::GetMetrics { - agent: query.agent, + agent: query.agent.map(WorkerName::from), reply: reply_tx, }) .await @@ -1932,8 +1942,8 @@ async fn listen_api_subscribe_channels( if state .tx .send(ListenApiRequest::SubscribeChannels { - name: name.clone(), - channels: body.channels, + name: WorkerName::new(name.clone()), + channels: body.channels.into_iter().map(ChannelName::from).collect(), reply: reply_tx, }) .await @@ -1957,8 +1967,8 @@ async fn listen_api_unsubscribe_channels( if state .tx .send(ListenApiRequest::UnsubscribeChannels { - name: name.clone(), - channels: body.channels, + name: WorkerName::new(name.clone()), + channels: body.channels.into_iter().map(ChannelName::from).collect(), reply: reply_tx, }) .await @@ -2266,6 +2276,7 @@ mod auth_tests { listen_api_router_with_auth, DeliveryRouteError, ListenApiConfig, ListenApiRequest, PtyInputFrame, SetInboundDeliveryModeOk, }; + use crate::ids::{EventId, MessageTarget, ThreadId, WorkspaceAlias, WorkspaceId}; use crate::protocol::MessageInjectionMode; use crate::types::{InboundDeliveryMode, PendingRelayMessage}; use crate::worker_request::RequestWorkerError; @@ -3618,19 +3629,19 @@ mod auth_tests { PendingRelayMessage { from: "Alice".to_string(), body: "one".to_string(), - target: "#general".to_string(), - thread_id: Some("thr_42".to_string()), - workspace_id: Some("ws_demo".to_string()), - workspace_alias: Some("Demo".to_string()), + target: MessageTarget::new("#general"), + thread_id: Some(ThreadId::new("thr_42")), + workspace_id: Some(WorkspaceId::new("ws_demo")), + workspace_alias: Some(WorkspaceAlias::new("Demo")), priority: 1, mode: MessageInjectionMode::Steer, queued_at_ms: 100, - event_id: Some("evt_1".to_string()), + event_id: Some(EventId::new("evt_1")), }, PendingRelayMessage { from: "Bob".to_string(), body: "two".to_string(), - target: "worker-a".to_string(), + target: MessageTarget::new("worker-a"), thread_id: None, workspace_id: None, workspace_alias: None, diff --git a/crates/broker/src/protocol.rs b/crates/broker/src/protocol.rs index 5e18f649d..3c031b0f2 100644 --- a/crates/broker/src/protocol.rs +++ b/crates/broker/src/protocol.rs @@ -1,6 +1,10 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::ids::{ + ChannelName, DeliveryId, EventId, MessageTarget, RequestId, ThreadId, WorkerName, + WorkspaceAlias, WorkspaceId, +}; use crate::supervisor::RestartPolicy; pub const PROTOCOL_VERSION: u32 = 2; @@ -21,7 +25,7 @@ pub enum HeadlessProvider { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AgentSpec { - pub name: String, + pub name: WorkerName, pub runtime: AgentRuntime, #[serde(default, skip_serializing_if = "Option::is_none")] pub provider: Option, @@ -34,13 +38,13 @@ pub struct AgentSpec { #[serde(skip_serializing_if = "Option::is_none")] pub team: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub shadow_of: Option, + pub shadow_of: Option, #[serde(skip_serializing_if = "Option::is_none")] pub shadow_mode: Option, #[serde(default)] pub args: Vec, #[serde(default)] - pub channels: Vec, + pub channels: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] pub restart_policy: Option, } @@ -55,17 +59,17 @@ pub enum MessageInjectionMode { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RelayDelivery { - pub delivery_id: String, - pub event_id: String, + pub delivery_id: DeliveryId, + pub event_id: EventId, #[serde(default)] - pub workspace_id: Option, + pub workspace_id: Option, #[serde(default)] - pub workspace_alias: Option, + pub workspace_alias: Option, pub from: String, - pub target: String, + pub target: MessageTarget, pub body: String, #[serde(default)] - pub thread_id: Option, + pub thread_id: Option, #[serde(default)] pub priority: Option, #[serde(default)] @@ -78,7 +82,7 @@ pub struct ProtocolEnvelope { #[serde(rename = "type")] pub msg_type: String, #[serde(default)] - pub request_id: Option, + pub request_id: Option, pub payload: T, } @@ -93,31 +97,31 @@ pub enum SdkToBroker { agent: AgentSpec, }, SendMessage { - to: String, + to: MessageTarget, text: String, #[serde(default)] from: Option, #[serde(default)] - thread_id: Option, + thread_id: Option, #[serde(default)] - workspace_id: Option, + workspace_id: Option, #[serde(default)] - workspace_alias: Option, + workspace_alias: Option, #[serde(default)] priority: Option, #[serde(default)] mode: MessageInjectionMode, }, ReleaseAgent { - name: String, + name: WorkerName, }, SubscribeChannels { - name: String, - channels: Vec, + name: WorkerName, + channels: Vec, }, UnsubscribeChannels { - name: String, - channels: Vec, + name: WorkerName, + channels: Vec, }, ListAgents {}, Shutdown {}, @@ -150,126 +154,126 @@ pub struct ProtocolError { #[serde(tag = "kind", rename_all = "snake_case")] pub enum BrokerEvent { AgentSpawned { - name: String, + name: WorkerName, runtime: AgentRuntime, #[serde(default)] provider: Option, - parent: Option, + parent: Option, cli: Option, model: Option, pid: Option, source: Option, }, AgentReleased { - name: String, + name: WorkerName, }, AgentExit { - name: String, + name: WorkerName, reason: String, }, AgentExited { - name: String, + name: WorkerName, code: Option, signal: Option, #[serde(default)] reason: Option, }, AgentContextLow { - name: String, + name: WorkerName, pct: u8, }, RelayInbound { - event_id: String, + event_id: EventId, from: String, - target: String, + target: MessageTarget, body: String, - thread_id: Option, + thread_id: Option, }, WorkerStream { - name: String, + name: WorkerName, stream: String, chunk: String, }, DeliveryRetry { - name: String, - delivery_id: String, - event_id: String, + name: WorkerName, + delivery_id: DeliveryId, + event_id: EventId, attempts: u32, }, DeliveryDropped { - name: String, + name: WorkerName, count: usize, reason: String, }, DeliveryVerified { - name: String, - delivery_id: String, - event_id: String, + name: WorkerName, + delivery_id: DeliveryId, + event_id: EventId, }, DeliveryFailed { - name: String, - delivery_id: String, - event_id: String, + name: WorkerName, + delivery_id: DeliveryId, + event_id: EventId, reason: String, }, MessageDeliveryConfirmed { - name: String, - delivery_id: String, - event_id: String, + name: WorkerName, + delivery_id: DeliveryId, + event_id: EventId, from: String, - to: String, + to: MessageTarget, }, MessageDeliveryFailed { - name: String, + name: WorkerName, #[serde(default)] - delivery_id: Option, + delivery_id: Option, #[serde(default)] - event_id: Option, + event_id: Option, from: String, - to: String, + to: MessageTarget, attempts: u32, #[serde(rename = "lastError")] last_error: String, }, DeliveryQueued { - delivery_id: String, - agent: String, + delivery_id: DeliveryId, + agent: WorkerName, }, DeliveryInjected { - delivery_id: String, - agent: String, + delivery_id: DeliveryId, + agent: WorkerName, }, DeliveryActive { - delivery_id: String, - agent: String, + delivery_id: DeliveryId, + agent: WorkerName, }, DeliveryAck { - delivery_id: String, - agent: String, + delivery_id: DeliveryId, + agent: WorkerName, }, AclDenied { - name: String, + name: WorkerName, sender: String, - owner_chain: Vec, + owner_chain: Vec, }, RelaycastPublished { - event_id: String, - to: String, + event_id: EventId, + to: MessageTarget, target_type: String, }, RelaycastPublishFailed { - event_id: String, - to: String, + event_id: EventId, + to: MessageTarget, reason: String, }, AgentIdle { - name: String, + name: WorkerName, idle_secs: u64, #[serde(default)] since: Option, }, AgentResult { - name: String, + name: WorkerName, result_id: String, data: Value, #[serde(rename = "final")] @@ -278,12 +282,12 @@ pub enum BrokerEvent { metadata: Option, }, AgentBlockedOnSend { - name: String, + name: WorkerName, blocked_secs: u64, pending_delivery_count: usize, }, AgentRestarting { - name: String, + name: WorkerName, #[serde(rename = "code")] exit_code: Option, signal: Option, @@ -291,20 +295,20 @@ pub enum BrokerEvent { delay_ms: u64, }, AgentRestarted { - name: String, + name: WorkerName, restart_count: u32, }, AgentPermanentlyDead { - name: String, + name: WorkerName, reason: String, }, ChannelSubscribed { - name: String, - channels: Vec, + name: WorkerName, + channels: Vec, }, ChannelUnsubscribed { - name: String, - channels: Vec, + name: WorkerName, + channels: Vec, }, } @@ -333,20 +337,20 @@ pub enum BrokerToWorker { #[serde(tag = "type", content = "payload", rename_all = "snake_case")] pub enum WorkerToBroker { WorkerReady { - name: String, + name: WorkerName, runtime: AgentRuntime, }, DeliveryAck { - delivery_id: String, - event_id: String, + delivery_id: DeliveryId, + event_id: EventId, }, DeliveryVerified { - delivery_id: String, - event_id: String, + delivery_id: DeliveryId, + event_id: EventId, }, DeliveryFailed { - delivery_id: String, - event_id: String, + delivery_id: DeliveryId, + event_id: EventId, reason: String, }, WorkerStream { @@ -371,13 +375,14 @@ mod tests { AgentRuntime, AgentSpec, BrokerEvent, BrokerToSdk, BrokerToWorker, HeadlessProvider, MessageInjectionMode, ProtocolEnvelope, RelayDelivery, WorkerToBroker, PROTOCOL_VERSION, }; + use crate::ids::RequestId; #[test] fn sdk_envelope_round_trip() { let frame = ProtocolEnvelope { v: PROTOCOL_VERSION, msg_type: "spawn_agent".to_string(), - request_id: Some("req_1".to_string()), + request_id: Some(RequestId::new("req_1")), payload: json!({ "agent": { "name": "Worker1", diff --git a/crates/broker/src/pty_worker.rs b/crates/broker/src/pty_worker.rs index 9f7e8061b..effecd6b8 100644 --- a/crates/broker/src/pty_worker.rs +++ b/crates/broker/src/pty_worker.rs @@ -5,6 +5,7 @@ use std::{ }; use crate::{ + ids::{DeliveryId, RequestId}, protocol::{MessageInjectionMode, ProtocolEnvelope, RelayDelivery}, pty::PtySession, }; @@ -40,7 +41,7 @@ use base64::Engine; #[derive(Debug, Clone)] struct PendingWorkerInjection { delivery: RelayDelivery, - request_id: Option, + request_id: Option, queued_at: Instant, } @@ -193,7 +194,7 @@ fn should_block_pending_injection( async fn try_emit_worker_ready( out_tx: &mpsc::Sender>, worker_name: &str, - init_request_id: &mut Option, + init_request_id: &mut Option, init_received_at: Option, worker_ready_sent: &mut bool, startup_ready: bool, @@ -305,13 +306,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { const MCP_REMINDER_COOLDOWN: Duration = Duration::from_secs(300); let mut last_mcp_reminder_at: Option = None; let mut pending_worker_injections: VecDeque = VecDeque::new(); - let mut pending_worker_delivery_ids: HashSet = HashSet::new(); + let mut pending_worker_delivery_ids: HashSet = HashSet::new(); let wait_for_relaycast_boot = codex_relaycast_boot_expected(&resolved_cli, &effective_args); let mut startup_output = String::new(); let mut startup_total_bytes = 0usize; let mut saw_relaycast_boot = false; let mut post_boot_output = String::new(); - let mut init_request_id: Option = None; + let mut init_request_id: Option = None; let mut init_received_at: Option = None; let mut worker_ready_sent = false; let suppress_multiline_mcp_reminder = cli_basename(&resolved_cli).eq_ignore_ascii_case("agent") diff --git a/crates/broker/src/relaycast/bridge.rs b/crates/broker/src/relaycast/bridge.rs index 09e1cf58c..4f5dc161d 100644 --- a/crates/broker/src/relaycast/bridge.rs +++ b/crates/broker/src/relaycast/bridge.rs @@ -1,5 +1,8 @@ use serde_json::Value; +use crate::ids::{ + AgentId, ChannelName, EventId, MessageTarget, ThreadId, WorkspaceAlias, WorkspaceId, +}; use crate::types::{ BrokerCommandEvent, BrokerCommandPayload, InboundKind, InboundRelayEvent, InjectRequest, RelayPriority, ReleaseParams, SenderKind, SpawnParams, @@ -23,16 +26,16 @@ pub fn map_ws_event( ); Some(InboundRelayEvent { - event_id: event.event_id, - workspace_id: workspace_id.to_string(), - workspace_alias: workspace_alias.map(str::to_string), + event_id: EventId::new(event.event_id), + workspace_id: WorkspaceId::new(workspace_id), + workspace_alias: workspace_alias.map(WorkspaceAlias::from), kind, from: event.from, - sender_agent_id: event.sender_agent_id, + sender_agent_id: event.sender_agent_id.map(AgentId::from), sender_kind: map_sdk_sender_kind(event.sender_kind), - target: event.target, + target: MessageTarget::new(event.target), text: event.text, - thread_id: event.thread_id, + thread_id: event.thread_id.map(ThreadId::from), priority: map_sdk_priority(event.priority), }) } @@ -61,11 +64,11 @@ pub fn map_ws_broker_command( Some(BrokerCommandEvent { command: command.command, - workspace_id: workspace_id.to_string(), - workspace_alias: workspace_alias.map(str::to_string), - channel: command.channel, + workspace_id: WorkspaceId::new(workspace_id), + workspace_alias: workspace_alias.map(WorkspaceAlias::from), + channel: ChannelName::new(command.channel), invoked_by: command.invoked_by, - handler_agent_id: command.handler_agent_id, + handler_agent_id: command.handler_agent_id.map(AgentId::from), payload, }) } @@ -103,7 +106,7 @@ pub fn to_inject_request(event: InboundRelayEvent) -> Option { } Some(InjectRequest { - id: event.event_id, + id: event.event_id.into_string(), workspace_id: event.workspace_id, workspace_alias: event.workspace_alias, from: event.from, diff --git a/crates/broker/src/relaycast/workspace.rs b/crates/broker/src/relaycast/workspace.rs index 60087fb79..554b1d66d 100644 --- a/crates/broker/src/relaycast/workspace.rs +++ b/crates/broker/src/relaycast/workspace.rs @@ -4,6 +4,7 @@ use serde_json::Value; use tokio::sync::mpsc; use crate::events::EventEmitter; +use crate::ids::{AgentId, WorkspaceAlias, WorkspaceId}; use super::{ auth::{AuthClient, AuthSessionSet}, @@ -12,26 +13,26 @@ use super::{ #[derive(Debug, Clone)] pub struct WorkspaceInboundMessage { - pub workspace_id: String, - pub workspace_alias: Option, + pub workspace_id: WorkspaceId, + pub workspace_alias: Option, pub value: Value, } #[derive(Clone)] pub struct WorkspaceSessionHandle { - pub workspace_id: String, - pub workspace_alias: Option, + pub workspace_id: WorkspaceId, + pub workspace_alias: Option, pub relay_workspace_key: String, pub self_name: String, - pub self_agent_id: String, + pub self_agent_id: AgentId, pub self_names: HashSet, - pub self_agent_ids: HashSet, + pub self_agent_ids: HashSet, pub http_client: RelaycastHttpClient, pub ws_control_tx: mpsc::Sender, } pub struct MultiWorkspaceSession { - pub default_workspace_id: Option, + pub default_workspace_id: Option, pub handles: Vec, pub inbound_rx: mpsc::Receiver, } @@ -54,10 +55,14 @@ impl MultiWorkspaceSession { let mut handles = Vec::with_capacity(sessions.memberships.len()); for session in sessions.memberships { - let workspace_id = session.credentials.workspace_id.clone(); - let workspace_alias = session.credentials.workspace_alias.clone(); + let workspace_id = WorkspaceId::new(session.credentials.workspace_id.clone()); + let workspace_alias = session + .credentials + .workspace_alias + .clone() + .map(WorkspaceAlias::from); let relay_workspace_key = session.credentials.api_key.clone(); - let self_agent_id = session.credentials.agent_id.clone(); + let self_agent_id = AgentId::new(session.credentials.agent_id.clone()); let self_token = session.token.clone(); let self_name = session .credentials @@ -137,7 +142,7 @@ impl MultiWorkspaceSession { } Self { - default_workspace_id: sessions.default_workspace_id, + default_workspace_id: sessions.default_workspace_id.map(WorkspaceId::from), handles, inbound_rx, } @@ -182,7 +187,7 @@ impl MultiWorkspaceSession { } } - pub fn http_clients_by_workspace_id(&self) -> HashMap { + pub fn http_clients_by_workspace_id(&self) -> HashMap { self.handles .iter() .map(|handle| (handle.workspace_id.clone(), handle.http_client.clone())) @@ -192,8 +197,8 @@ impl MultiWorkspaceSession { #[derive(Debug, Clone, serde::Serialize)] pub struct WorkspaceMembershipSummary { - pub workspace_id: String, + pub workspace_id: WorkspaceId, #[serde(skip_serializing_if = "Option::is_none")] - pub workspace_alias: Option, + pub workspace_alias: Option, pub is_default: bool, } diff --git a/crates/broker/src/relaycast/ws.rs b/crates/broker/src/relaycast/ws.rs index b77d5a90b..f9d68e589 100644 --- a/crates/broker/src/relaycast/ws.rs +++ b/crates/broker/src/relaycast/ws.rs @@ -19,9 +19,9 @@ pub enum WsControl { Publish(Value), /// Re-subscribe to a list of channels (e.g. after creating/joining a new /// channel that didn't exist when the WS connection was first established). - Subscribe(Vec), + Subscribe(Vec), /// Unsubscribe from channels that an agent has left. - Unsubscribe(Vec), + Unsubscribe(Vec), } #[derive(Clone)] @@ -30,7 +30,7 @@ pub struct RelaycastWsClient { workspace_http: RelaycastHttpClient, /// Reference-counted channel subscriptions: channel_name -> number of agents subscribed. /// The WS only unsubscribes when the count drops to zero. - subscriptions: Arc>>, + subscriptions: Arc>>, } impl RelaycastWsClient { @@ -41,7 +41,7 @@ impl RelaycastWsClient { ) -> Self { let mut subs = HashMap::new(); for ch in channels { - *subs.entry(ch).or_insert(0) += 1; + *subs.entry(crate::ids::ChannelName::from(ch)).or_insert(0) += 1; } Self { ws_base_url: ws_base_url.into(), @@ -50,7 +50,7 @@ impl RelaycastWsClient { } } - pub fn active_subscriptions(&self) -> Vec { + pub fn active_subscriptions(&self) -> Vec { self.subscriptions.lock().keys().cloned().collect() } @@ -104,7 +104,11 @@ impl RelaycastWsClient { // the same join notifications. let active_subscriptions = self.active_subscriptions(); if !active_subscriptions.is_empty() { - if let Err(error) = ws.subscribe(active_subscriptions.clone()).await { + let active_strs: Vec = active_subscriptions + .iter() + .map(|c| c.as_str().to_string()) + .collect(); + if let Err(error) = ws.subscribe(active_strs.clone()).await { tracing::warn!( target = "relay_broker::ws", channels = ?active_subscriptions, @@ -160,7 +164,11 @@ impl RelaycastWsClient { } } if !joined_now.is_empty() { - if let Err(error) = ws.subscribe(joined_now.clone()).await { + let joined_strs: Vec = joined_now + .iter() + .map(|c| c.as_str().to_string()) + .collect(); + if let Err(error) = ws.subscribe(joined_strs).await { tracing::warn!( target = "relay_broker::ws", channels = ?joined_now, @@ -200,7 +208,11 @@ impl RelaycastWsClient { } } if !left_now.is_empty() { - if let Err(error) = ws.unsubscribe(left_now.clone()).await { + let left_strs: Vec = left_now + .iter() + .map(|c| c.as_str().to_string()) + .collect(); + if let Err(error) = ws.unsubscribe(left_strs).await { tracing::warn!( target = "relay_broker::ws", channels = ?left_now, @@ -538,9 +550,9 @@ impl RelaycastHttpClient { /// hardcoded defaults). Channels that already exist are silently skipped /// (409 → no-op). The broker must be a channel member to receive /// `message.created` WebSocket events for that channel. - pub async fn ensure_extra_channels(&self, channels: &[String]) -> Result<()> { + pub async fn ensure_extra_channels(&self, channels: &[crate::ids::ChannelName]) -> Result<()> { let defaults = ["general", "engineering"]; - let extras: Vec<&String> = channels + let extras: Vec<&crate::ids::ChannelName> = channels .iter() .filter(|c| !defaults.contains(&c.as_str())) .collect(); @@ -556,7 +568,7 @@ impl RelaycastHttpClient { }; for name in extras { let request = relaycast::CreateChannelRequest { - name: name.clone(), + name: name.as_str().to_string(), topic: None, metadata: None, }; diff --git a/crates/broker/src/routing.rs b/crates/broker/src/routing.rs index 47c1c90e8..61f0d04ef 100644 --- a/crates/broker/src/routing.rs +++ b/crates/broker/src/routing.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use crate::ids::{AgentId, MessageTargetKind}; use crate::types::{InboundKind, InboundRelayEvent}; use crate::runtime::normalize_channel; @@ -7,22 +8,22 @@ use crate::runtime::normalize_channel; #[derive(Clone)] pub(crate) struct RoutingWorker<'a> { pub(crate) name: &'a str, - pub(crate) channels: &'a [String], + pub(crate) channels: &'a [crate::ids::ChannelName], pub(crate) workspace_id: Option<&'a str>, } #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct DeliveryPlan { - pub(crate) workspace_id: String, + pub(crate) workspace_id: crate::ids::WorkspaceId, pub(crate) targets: Vec, - pub(crate) display_target: String, + pub(crate) display_target: crate::ids::MessageTarget, pub(crate) needs_dm_resolution: bool, } pub(crate) fn is_self_echo( event: &InboundRelayEvent, self_names: &HashSet, - self_agent_ids: &HashSet, + self_agent_ids: &HashSet, has_local_target: bool, ) -> bool { let from_self = self_names.contains(&event.from) @@ -71,7 +72,11 @@ pub(crate) fn resolve_delivery_targets( ) -> DeliveryPlan { let ws_id = Some(event.workspace_id.as_str()); - if event.target.starts_with('#') { + // Dispatch on the structural shape of the target rather than hand-rolled + // prefix checks. The Channel / Thread / DM / Worker discrimination lives + // in `MessageTarget::kind()` so future routing changes only touch one + // place. + if matches!(event.target.kind(), MessageTargetKind::Channel(_)) { let targets = worker_names_for_channel_delivery(workers, &event.target, &event.from, ws_id); tracing::debug!( target = "broker::routing", @@ -93,7 +98,9 @@ pub(crate) fn resolve_delivery_targets( // (except the sender) that belong to the same workspace. The WS only // delivers thread.reply to channel subscribers so every matching local // worker is a valid recipient. - if matches!(event.kind, InboundKind::ThreadReply) && event.target == "thread" { + if matches!(event.kind, InboundKind::ThreadReply) + && matches!(event.target.kind(), MessageTargetKind::Thread) + { let targets: Vec = workers .iter() .filter(|w| { @@ -111,7 +118,7 @@ pub(crate) fn resolve_delivery_targets( return DeliveryPlan { workspace_id: event.workspace_id.clone(), targets, - display_target: "thread".to_string(), + display_target: crate::ids::MessageTarget::thread_sentinel(), needs_dm_resolution: false, }; } @@ -255,14 +262,17 @@ mod tests { #[derive(Debug)] struct WorkerFixture { name: String, - channels: Vec, + channels: Vec, } impl WorkerFixture { fn new(name: &str, channels: &[&str]) -> Self { Self { name: name.to_string(), - channels: channels.iter().map(|channel| channel.to_string()).collect(), + channels: channels + .iter() + .map(|channel| crate::ids::ChannelName::from(*channel)) + .collect(), } } } @@ -286,14 +296,14 @@ mod tests { }; InboundRelayEvent { - event_id: "evt_1".to_string(), - workspace_id: "ws_test".to_string(), - workspace_alias: Some("test".to_string()), + event_id: crate::ids::EventId::new("evt_1"), + workspace_id: crate::ids::WorkspaceId::new("ws_test"), + workspace_alias: Some(crate::ids::WorkspaceAlias::new("test")), kind, from: from.to_string(), sender_agent_id: None, sender_kind: SenderKind::Agent, - target: target.to_string(), + target: crate::ids::MessageTarget::new(target), text: "hello".to_string(), thread_id: None, priority, @@ -313,10 +323,10 @@ mod tests { #[test] fn self_echo_detected_by_agent_id() { let self_names = HashSet::new(); - let mut self_agent_ids = HashSet::new(); - self_agent_ids.insert("agt_self".to_string()); + let mut self_agent_ids: HashSet = HashSet::new(); + self_agent_ids.insert(crate::ids::AgentId::new("agt_self")); let mut event = inbound_event(InboundKind::MessageCreated, "Other", "#general"); - event.sender_agent_id = Some("agt_self".to_string()); + event.sender_agent_id = Some(crate::ids::AgentId::new("agt_self")); assert!(is_self_echo(&event, &self_names, &self_agent_ids, false)); } @@ -459,12 +469,12 @@ mod tests { // Event from ws_a should only reach Alpha (but Alpha is the sender, so no targets) let mut event = inbound_event(InboundKind::MessageCreated, "External", "#general"); - event.workspace_id = "ws_a".to_string(); + event.workspace_id = crate::ids::WorkspaceId::new("ws_a"); let plan = resolve_delivery_targets(&event, &routing_workers); assert_eq!(plan.targets, vec!["Alpha".to_string()]); // Event from ws_b should only reach Bravo - event.workspace_id = "ws_b".to_string(); + event.workspace_id = crate::ids::WorkspaceId::new("ws_b"); let plan = resolve_delivery_targets(&event, &routing_workers); assert_eq!(plan.targets, vec!["Bravo".to_string()]); } @@ -491,12 +501,12 @@ mod tests { // Event from ws_a: Alpha matches (no ws filter), Bravo doesn't (ws_b != ws_a) let mut event = inbound_event(InboundKind::MessageCreated, "External", "#general"); - event.workspace_id = "ws_a".to_string(); + event.workspace_id = crate::ids::WorkspaceId::new("ws_a"); let plan = resolve_delivery_targets(&event, &routing_workers); assert_eq!(plan.targets, vec!["Alpha".to_string()]); // Event from ws_b: both match - event.workspace_id = "ws_b".to_string(); + event.workspace_id = crate::ids::WorkspaceId::new("ws_b"); let plan = resolve_delivery_targets(&event, &routing_workers); assert_eq!(plan.targets, vec!["Alpha".to_string(), "Bravo".to_string()]); } diff --git a/crates/broker/src/runtime/api.rs b/crates/broker/src/runtime/api.rs index 1f812d341..9391e4391 100644 --- a/crates/broker/src/runtime/api.rs +++ b/crates/broker/src/runtime/api.rs @@ -944,7 +944,7 @@ impl BrokerRuntime { .send_to_worker( &name, "write_pty", - Some(format!("api_{}", Uuid::new_v4().simple())), + Some(RequestId::new(format!("api_{}", Uuid::new_v4().simple()))), json!({ "data": data }), ) .await @@ -993,7 +993,7 @@ impl BrokerRuntime { .send_to_worker( &name, "resize_pty", - Some(format!("api_{}", Uuid::new_v4().simple())), + Some(RequestId::new(format!("api_{}", Uuid::new_v4().simple()))), json!({ "rows": rows, "cols": cols }), ) .await @@ -1046,7 +1046,7 @@ impl BrokerRuntime { )); } Some(AgentRuntime::Pty) => { - let request_id = format!("req_{}", Uuid::new_v4().simple()); + let request_id = RequestId::new(format!("req_{}", Uuid::new_v4().simple())); if let Err(err) = workers .send_to_worker(&name, &kind, Some(request_id.clone()), payload) .await @@ -1056,10 +1056,10 @@ impl BrokerRuntime { )); } else { pending_requests.insert( - request_id, + request_id.into_string(), worker_request::PendingRequest { kind, - worker_name: name, + worker_name: name.into_string(), reply, deadline: Instant::now() + timeout, }, @@ -1471,7 +1471,7 @@ impl BrokerRuntime { fn workspace_for_channel_update<'a>( workspace_id: Option<&str>, - workspace_lookup: &'a HashMap, + workspace_lookup: &'a HashMap, default_workspace_id: Option<&str>, default_workspace: &'a RelayWorkspace, ) -> &'a RelayWorkspace { @@ -1488,10 +1488,10 @@ fn effective_channel_workspace_id<'a>( workspace_id.or(default_workspace_id) } -fn channel_in_list(channels: &[String], channel: &str) -> bool { +fn channel_in_list(channels: &[ChannelName], channel: &str) -> bool { channels .iter() - .any(|existing| existing.eq_ignore_ascii_case(channel)) + .any(|existing| existing.as_str().eq_ignore_ascii_case(channel)) } fn persist_agent_channels( @@ -1500,13 +1500,13 @@ fn persist_agent_channels( parent: Option, mut spec: AgentSpec, pid: Option, - channels: Vec, + channels: Vec, ) { spec.channels = channels.clone(); let runtime = spec.runtime.clone(); let agent = state .agents - .entry(name.to_string()) + .entry(WorkerName::from(name)) .or_insert_with(|| broker::PersistedAgent { runtime: runtime.clone(), parent: parent.clone(), diff --git a/crates/broker/src/runtime/delivery.rs b/crates/broker/src/runtime/delivery.rs index 061e712d7..6ab763765 100644 --- a/crates/broker/src/runtime/delivery.rs +++ b/crates/broker/src/runtime/delivery.rs @@ -2,7 +2,7 @@ use super::*; #[derive(Debug, Clone)] pub(crate) struct PendingDelivery { - pub(super) worker_name: String, + pub(super) worker_name: WorkerName, pub(super) delivery: RelayDelivery, pub(super) attempts: u32, pub(super) next_retry_at: Instant, @@ -13,7 +13,7 @@ pub(crate) struct PendingDelivery { /// Serializable snapshot of pending deliveries for crash recovery. #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct PersistedPendingDelivery { - pub(super) worker_name: String, + pub(super) worker_name: WorkerName, pub(super) delivery: RelayDelivery, pub(super) attempts: u32, #[serde(default)] @@ -25,16 +25,16 @@ pub(crate) struct PersistedPendingDelivery { #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum DeliveryAttemptOutcome { Attempted { - worker_name: String, + worker_name: WorkerName, attempts: u32, - event_id: String, + event_id: EventId, }, Failed { - worker_name: String, - delivery_id: String, - event_id: String, + worker_name: WorkerName, + delivery_id: DeliveryId, + event_id: EventId, from: String, - to: String, + to: MessageTarget, attempts: u32, last_error: String, }, @@ -47,7 +47,7 @@ pub(crate) fn unix_timestamp_millis() -> u64 { pub(crate) fn save_pending_deliveries( path: &Path, - deliveries: &HashMap, + deliveries: &HashMap, ) -> Result<()> { let persisted: Vec = deliveries .values() @@ -69,7 +69,7 @@ pub(crate) fn save_pending_deliveries( Ok(()) } -pub(crate) fn load_pending_deliveries(path: &Path) -> HashMap { +pub(crate) fn load_pending_deliveries(path: &Path) -> HashMap { let data = match std::fs::read_to_string(path) { Ok(d) => d, Err(_) => return HashMap::new(), @@ -104,7 +104,7 @@ pub(crate) fn load_pending_deliveries(path: &Path) -> HashMap { /// (`worker_ready` initial task, continuity restore) bypass this queue by /// not calling this helper. pub(crate) fn queue_inbound_for_delivery_mode( - delivery_states: &mut HashMap, + delivery_states: &mut HashMap, workers: &WorkerRegistry, worker_name: &str, ctx: InboundContext<'_>, @@ -165,20 +165,22 @@ pub(crate) fn queue_inbound_for_delivery_mode( if !workers.has_worker(worker_name) { return InboundQueueOutcome::WorkerMissing; } - let state = delivery_states.entry(worker_name.to_string()).or_default(); + let state = delivery_states + .entry(WorkerName::from(worker_name)) + .or_default(); let should_drain = state.should_drain_immediately(); let queued_at_ms = chrono::Utc::now().timestamp_millis().max(0) as u64; let msg = PendingRelayMessage { from: ctx.from.to_string(), body: ctx.body.to_string(), - target: ctx.target.to_string(), - thread_id: ctx.thread_id.map(str::to_string), - workspace_id: ctx.workspace_id.map(str::to_string), - workspace_alias: ctx.workspace_alias.map(str::to_string), + target: MessageTarget::new(ctx.target), + thread_id: ctx.thread_id.map(ThreadId::from), + workspace_id: ctx.workspace_id.map(WorkspaceId::from), + workspace_alias: ctx.workspace_alias.map(WorkspaceAlias::from), priority: ctx.priority, mode: ctx.mode, queued_at_ms, - event_id: ctx.event_id.map(str::to_string), + event_id: ctx.event_id.map(EventId::from), }; match state.accept_inbound(msg) { InboundDeliveryDispatch::Queued { queue_len } => { @@ -223,7 +225,7 @@ pub(crate) fn queue_inbound_for_delivery_mode( pub(crate) async fn try_inject_pending_relay_message( workers: &mut WorkerRegistry, - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, worker_name: &str, msg: &PendingRelayMessage, retry_interval: Duration, @@ -231,7 +233,7 @@ pub(crate) async fn try_inject_pending_relay_message( let event_id = msg .event_id .clone() - .unwrap_or_else(|| format!("flush_{}", Uuid::new_v4().simple())); + .unwrap_or_else(|| EventId::new(format!("flush_{}", Uuid::new_v4().simple()))); match timeout( retry_interval, queue_and_try_delivery_raw( @@ -272,7 +274,7 @@ pub(crate) async fn try_inject_pending_relay_message( /// the same way `/api/send` does for individual targets. pub(crate) async fn inject_pending_relay_message( workers: &mut WorkerRegistry, - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, worker_name: &str, msg: &PendingRelayMessage, retry_interval: Duration, @@ -300,7 +302,7 @@ pub(crate) async fn inject_pending_relay_message( pub(crate) async fn queue_and_try_delivery( workers: &mut WorkerRegistry, - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, worker_name: &str, mapped: &crate::types::InboundRelayEvent, retry_interval: Duration, @@ -326,26 +328,26 @@ pub(crate) async fn queue_and_try_delivery( #[allow(clippy::too_many_arguments)] pub(crate) async fn queue_and_try_delivery_raw( workers: &mut WorkerRegistry, - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, worker_name: &str, event_id: &str, from: &str, target: &str, body: &str, - thread_id: Option, - workspace_id: Option, - workspace_alias: Option, + thread_id: Option, + workspace_id: Option, + workspace_alias: Option, priority: u8, injection_mode: MessageInjectionMode, retry_interval: Duration, ) -> Result<()> { let delivery = RelayDelivery { - delivery_id: format!("del_{}", Uuid::new_v4().simple()), - event_id: event_id.to_string(), + delivery_id: DeliveryId::new(format!("del_{}", Uuid::new_v4().simple())), + event_id: EventId::new(event_id), workspace_id, workspace_alias, from: from.to_string(), - target: target.to_string(), + target: MessageTarget::new(target), body: body.to_string(), thread_id, priority: Some(priority), @@ -355,7 +357,7 @@ pub(crate) async fn queue_and_try_delivery_raw( pending_deliveries.insert( delivery_id.clone(), PendingDelivery { - worker_name: worker_name.to_string(), + worker_name: WorkerName::new(worker_name), delivery, attempts: 0, next_retry_at: Instant::now(), @@ -373,9 +375,9 @@ pub(crate) async fn queue_and_try_delivery_raw( } pub(crate) async fn retry_pending_delivery( - delivery_id: &str, + delivery_id: &DeliveryId, workers: &mut WorkerRegistry, - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, retry_interval: Duration, ) -> Result { let pending = match pending_deliveries.get(delivery_id) { @@ -461,7 +463,7 @@ pub(crate) async fn retry_pending_delivery( pub(crate) async fn emit_delivery_attempt_outcome( sdk_out_tx: &mpsc::Sender>, - delivery_id: &str, + delivery_id: &DeliveryId, was_retry: bool, outcome: DeliveryAttemptOutcome, ) -> Result<()> { @@ -476,7 +478,7 @@ pub(crate) async fn emit_delivery_attempt_outcome( sdk_out_tx, BrokerEvent::DeliveryRetry { name: worker_name, - delivery_id: delivery_id.to_string(), + delivery_id: delivery_id.clone(), event_id, attempts, }, @@ -514,19 +516,19 @@ pub(crate) async fn emit_delivery_attempt_outcome( #[cfg(test)] pub(crate) fn drop_pending_for_worker( - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, worker_name: &str, ) -> usize { take_pending_for_worker(pending_deliveries, worker_name).len() } pub(crate) fn take_pending_for_worker( - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, worker_name: &str, ) -> Vec { - let delivery_ids: Vec = pending_deliveries + let delivery_ids: Vec = pending_deliveries .iter() - .filter(|(_, pending)| pending.worker_name == worker_name) + .filter(|(_, pending)| pending.worker_name.as_str() == worker_name) .map(|(delivery_id, _)| delivery_id.clone()) .collect(); @@ -603,7 +605,7 @@ pub(crate) fn should_clear_pending_delivery_for_event( } pub(crate) fn clear_pending_delivery_if_event_matches( - pending_deliveries: &mut HashMap, + pending_deliveries: &mut HashMap, delivery_id: &str, event_id: Option<&str>, worker_name: &str, diff --git a/crates/broker/src/runtime/event_loop.rs b/crates/broker/src/runtime/event_loop.rs index 568941bc1..95013a59b 100644 --- a/crates/broker/src/runtime/event_loop.rs +++ b/crates/broker/src/runtime/event_loop.rs @@ -7,9 +7,9 @@ pub(crate) struct BrokerRuntime { pub(super) paths: RuntimePaths, pub(super) state: broker::BrokerState, pub(super) workspaces: Vec, - pub(super) workspace_lookup: HashMap, + pub(super) workspace_lookup: HashMap, pub(super) default_workspace: RelayWorkspace, - pub(super) default_workspace_id: Option, + pub(super) default_workspace_id: Option, pub(super) self_names: HashSet, pub(super) ws_control_tx: mpsc::Sender, pub(super) relaycast_http: RelaycastHttpClient, @@ -28,11 +28,11 @@ pub(crate) struct BrokerRuntime { pub(super) reap_tick: tokio::time::Interval, pub(super) dedup: DedupCache, pub(super) delivery_retry_interval: Duration, - pub(super) pending_deliveries: HashMap, - pub(super) terminal_failed_deliveries: HashSet, + pub(super) pending_deliveries: HashMap, + pub(super) terminal_failed_deliveries: HashSet, pub(super) pending_requests: HashMap, - pub(super) delivery_states: HashMap, - pub(super) agent_result_tokens: HashMap, + pub(super) delivery_states: HashMap, + pub(super) agent_result_tokens: HashMap, pub(super) dm_participants_cache: DmParticipantsCache, pub(super) recent_thread_messages: VecDeque, pub(super) shutdown: bool, @@ -145,7 +145,7 @@ impl BrokerRuntime { }); self.telemetry.shutdown(); - let active_workers: Vec = self.workers.workers.keys().cloned().collect(); + let active_workers: Vec = self.workers.workers.keys().cloned().collect(); for worker_name in active_workers { if let Err(error) = self.relaycast_http.mark_agent_offline(&worker_name).await { tracing::warn!( diff --git a/crates/broker/src/runtime/init.rs b/crates/broker/src/runtime/init.rs index d96363e24..eb69d11cc 100644 --- a/crates/broker/src/runtime/init.rs +++ b/crates/broker/src/runtime/init.rs @@ -199,7 +199,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re workspaces, ws_inbound_rx, } = relay; - let workspace_lookup: HashMap = workspaces + let workspace_lookup: HashMap = workspaces .iter() .cloned() .map(|workspace| (workspace.workspace_id.clone(), workspace)) @@ -301,7 +301,10 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re } log_startup_phase(startup_debug, broker_start, "default channels ensured"); - let extra_channels = channels_from_csv(&cmd.channels); + let extra_channels: Vec = channels_from_csv(&cmd.channels) + .into_iter() + .map(ChannelName::from) + .collect(); log_startup_phase( startup_debug, broker_start, @@ -360,9 +363,12 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re // themselves through per-spawn env_vars. worker_env.push(( "RELAY_DEFAULT_WORKSPACE".to_string(), - default_workspace_id.clone(), + default_workspace_id.as_str().to_string(), + )); + worker_env.push(( + "RELAY_WORKSPACE_ID".to_string(), + default_workspace_id.into_string(), )); - worker_env.push(("RELAY_WORKSPACE_ID".to_string(), default_workspace_id)); } let (sdk_out_tx, mut sdk_out_rx) = mpsc::channel::>(1024); @@ -405,7 +411,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re let dedup = DedupCache::new(Duration::from_secs(300), 8192); let delivery_retry_interval = delivery_retry_interval(); let pending_deliveries = load_pending_deliveries(&paths.pending); - let terminal_failed_deliveries: HashSet = HashSet::new(); + let terminal_failed_deliveries: HashSet = HashSet::new(); // Outstanding worker-bound RPC requests waiting on a `*_response` // frame from the wrapped worker. Keyed by the `request_id` we put on // the outbound request frame; the reply `oneshot` is consumed when @@ -422,8 +428,8 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re // process state). See `relay_broker::types::InboundDeliveryState`. Entries // are created lazily on first lookup and removed wherever workers // exit (`Release` arm or `reap_exited` sweep). - let delivery_states: HashMap = HashMap::new(); - let agent_result_tokens: HashMap = HashMap::new(); + let delivery_states: HashMap = HashMap::new(); + let agent_result_tokens: HashMap = HashMap::new(); let dm_participants_cache = DmParticipantsCache::new(); let recent_thread_messages: VecDeque = VecDeque::new(); if !pending_deliveries.is_empty() { diff --git a/crates/broker/src/runtime/io.rs b/crates/broker/src/runtime/io.rs index b694cdc7c..bddb0d321 100644 --- a/crates/broker/src/runtime/io.rs +++ b/crates/broker/src/runtime/io.rs @@ -2,7 +2,7 @@ use super::*; pub(crate) async fn send_error( tx: &mpsc::Sender>, - request_id: Option, + request_id: Option, code: &str, message: String, retryable: bool, @@ -63,7 +63,7 @@ pub(crate) async fn emit_http_api_event_with_timeout( pub(crate) async fn send_frame( tx: &mpsc::Sender>, msg_type: &str, - request_id: Option, + request_id: Option, payload: Value, ) -> Result<()> { tx.send(ProtocolEnvelope { diff --git a/crates/broker/src/runtime/maintenance.rs b/crates/broker/src/runtime/maintenance.rs index 7ea853c05..98487cc04 100644 --- a/crates/broker/src/runtime/maintenance.rs +++ b/crates/broker/src/runtime/maintenance.rs @@ -33,7 +33,7 @@ impl BrokerRuntime { ); } - let due_ids: Vec = pending_deliveries + let due_ids: Vec = pending_deliveries .iter() .filter_map(|(delivery_id, pending)| { if pending.next_retry_at <= now { @@ -90,7 +90,7 @@ impl BrokerRuntime { let (category, description) = crate::crash_insights::CrashInsights::analyze(*code, signal.as_deref()); crash_insights.record(crate::crash_insights::CrashRecord { - agent_name: name.clone(), + agent_name: name.as_str().to_string(), exit_code: *code, signal: signal.clone(), timestamp: std::time::SystemTime::now() @@ -257,6 +257,7 @@ impl BrokerRuntime { if !*shutdown { let pending_restarts = workers.supervisor.pending_restarts(); for (name, rst) in pending_restarts { + let name = WorkerName::from(name); if let Some(remaining) = relaycast_http.registration_block_remaining(&name) { tracing::debug!( worker = %name, diff --git a/crates/broker/src/runtime/mod.rs b/crates/broker/src/runtime/mod.rs index 343e40c3c..bb3a0efdd 100644 --- a/crates/broker/src/runtime/mod.rs +++ b/crates/broker/src/runtime/mod.rs @@ -26,6 +26,10 @@ use uuid::Uuid; use crate::{ dedup::DedupCache, + ids::{ + AgentId, ChannelName, DeliveryId, EventId, MessageTarget, RequestId, ThreadId, WorkerName, + WorkspaceAlias, WorkspaceId, + }, protocol::{ AgentRuntime, AgentSpec, BrokerEvent, HeadlessProvider as ProtocolHeadlessProvider, MessageInjectionMode, ProtocolEnvelope, RelayDelivery, PROTOCOL_VERSION, diff --git a/crates/broker/src/runtime/relaycast_events.rs b/crates/broker/src/runtime/relaycast_events.rs index 9775fe0f1..ab808da53 100644 --- a/crates/broker/src/runtime/relaycast_events.rs +++ b/crates/broker/src/runtime/relaycast_events.rs @@ -71,7 +71,7 @@ impl BrokerRuntime { if let Ok(ws_event) = serde_json::from_value::(ws_value.clone()) { match ws_event { WsEvent::AgentReleaseRequested(event) => { - let name = event.agent.name; + let name = WorkerName::from(event.agent.name); if is_relaycast_self_control_target( &name, &workspace_self_name, @@ -163,7 +163,7 @@ impl BrokerRuntime { return; } WsEvent::AgentSpawnRequested(event) => { - let name = event.agent.name; + let name = WorkerName::from(event.agent.name); eprintln!( "[agent-relay] received spawn request for '{}' (cli: {})", name, event.agent.cli @@ -210,8 +210,9 @@ impl BrokerRuntime { .as_deref() .map(|ch| { let mut chs = default_spawn_channels(); - if !chs.contains(&ch.to_string()) { - chs.push(ch.to_string()); + let candidate = ChannelName::from(ch); + if !chs.contains(&candidate) { + chs.push(candidate); } chs }) @@ -420,14 +421,15 @@ impl BrokerRuntime { .as_deref() .map(|ch| { let mut chs = default_spawn_channels(); - if !chs.contains(&ch.to_string()) { - chs.push(ch.to_string()); + let candidate = ChannelName::from(ch); + if !chs.contains(&candidate) { + chs.push(candidate); } chs }) .unwrap_or_else(default_spawn_channels); let spec = AgentSpec { - name: name.clone(), + name: WorkerName::from(name.clone()), runtime: AgentRuntime::Pty, provider: None, cli: Some(cli.clone()), @@ -489,7 +491,7 @@ impl BrokerRuntime { if let Some(ref task_text) = effective_task { workers .initial_tasks - .insert(name.clone(), task_text.clone()); + .insert(WorkerName::from(name.clone()), task_text.clone()); } *agent_spawn_count += 1; telemetry.track(TelemetryEvent::AgentSpawn { @@ -501,7 +503,7 @@ impl BrokerRuntime { }); let pid = workers.worker_pid(&name).unwrap_or(0); state.agents.insert( - name.clone(), + WorkerName::from(name.clone()), broker::PersistedAgent { runtime: AgentRuntime::Pty, parent: Some("Relaycast".to_string()), @@ -640,7 +642,7 @@ impl BrokerRuntime { resolved_target = %chan_target, "overriding thread reply display_target with raw WS channel" ); - delivery_plan.display_target = chan_target; + delivery_plan.display_target = MessageTarget::new(chan_target); } } @@ -678,7 +680,7 @@ impl BrokerRuntime { .iter() .find(|participant| !agent_name_eq(participant, &mapped.from)) { - delivery_plan.display_target = participant.clone(); + delivery_plan.display_target = MessageTarget::new(participant.clone()); } let worker_view = workers.routing_workers(); diff --git a/crates/broker/src/runtime/session.rs b/crates/broker/src/runtime/session.rs index 9ae5d82e9..00ae6e845 100644 --- a/crates/broker/src/runtime/session.rs +++ b/crates/broker/src/runtime/session.rs @@ -3,20 +3,20 @@ use super::*; /// Shared Relaycast connection state used by run_init and run_wrap. #[derive(Clone)] pub(crate) struct RelayWorkspace { - pub(crate) workspace_id: String, - pub(crate) workspace_alias: Option, + pub(crate) workspace_id: WorkspaceId, + pub(crate) workspace_alias: Option, pub(crate) relay_workspace_key: String, pub(crate) self_name: String, - pub(crate) self_agent_id: String, + pub(crate) self_agent_id: AgentId, pub(crate) self_names: HashSet, - pub(crate) self_agent_ids: HashSet, + pub(crate) self_agent_ids: HashSet, pub(crate) http_client: RelaycastHttpClient, pub(crate) ws_control_tx: mpsc::Sender, } pub(crate) struct RelaySession { pub(crate) http_base: String, - pub(crate) default_workspace_id: Option, + pub(crate) default_workspace_id: Option, pub(crate) workspaces: Vec, pub(crate) ws_inbound_rx: mpsc::Receiver, } @@ -25,7 +25,7 @@ pub(crate) struct RelaySession { pub(crate) struct RelayReadyState { pub(super) workspace_key: String, pub(super) memberships: Vec, - pub(super) default_workspace_id: Option, + pub(super) default_workspace_id: Option, } pub(crate) async fn serve_startup_api_until_ready( diff --git a/crates/broker/src/runtime/spawn_spec.rs b/crates/broker/src/runtime/spawn_spec.rs index 9781d84bb..7ac1469d0 100644 --- a/crates/broker/src/runtime/spawn_spec.rs +++ b/crates/broker/src/runtime/spawn_spec.rs @@ -9,15 +9,15 @@ pub(crate) fn runtime_label(runtime: &AgentRuntime) -> &'static str { #[allow(clippy::too_many_arguments)] pub(crate) fn build_http_api_spawn_spec( - name: String, + name: WorkerName, cli: String, transport: Option, model: Option, args: Vec, - channels: Vec, + channels: Vec, cwd: Option, team: Option, - shadow_of: Option, + shadow_of: Option, shadow_mode: Option, restart_policy: Option, ) -> Result { diff --git a/crates/broker/src/runtime/tests.rs b/crates/broker/src/runtime/tests.rs index 001771807..900deab4d 100644 --- a/crates/broker/src/runtime/tests.rs +++ b/crates/broker/src/runtime/tests.rs @@ -6,6 +6,9 @@ use std::{ time::{Duration, Instant}, }; +use crate::ids::{ + ChannelName, DeliveryId, EventId, MessageTarget, WorkerName, WorkspaceAlias, WorkspaceId, +}; use crate::protocol::{AgentSpec, MessageInjectionMode, RelayDelivery}; use crate::worker::{AgentWorkState, WorkerEvent, WorkerHandle, WorkerRegistry}; use crate::{ @@ -61,10 +64,10 @@ async fn make_worker_registry_with_worker(name: &str) -> WorkerRegistry { .expect("test worker process should spawn"); let stdin = child.stdin.take().expect("test worker stdin should exist"); registry.workers.insert( - name.to_string(), + WorkerName::from(name), WorkerHandle { spec: AgentSpec { - name: name.to_string(), + name: WorkerName::from(name), runtime: AgentRuntime::Pty, provider: None, cli: Some("cat".to_string()), @@ -78,7 +81,7 @@ async fn make_worker_registry_with_worker(name: &str) -> WorkerRegistry { restart_policy: None, }, parent: None, - workspace_id: Some("ws_demo".to_string()), + workspace_id: Some(WorkspaceId::new("ws_demo")), child, stdin, spawned_at: Instant::now(), @@ -158,7 +161,7 @@ async fn inbound_queue_manual_flush_holds_until_explicit_drain() { let worker_name = "worker-a"; let workers = make_worker_registry_with_worker(worker_name).await; let mut delivery_states = HashMap::from([( - worker_name.to_string(), + WorkerName::from(worker_name), InboundDeliveryState::new(InboundDeliveryMode::ManualFlush), )]); @@ -213,16 +216,16 @@ async fn delivery_retry_fails_promptly_when_recipient_is_gone() { Instant::now(), ); let mut pending_deliveries = HashMap::from([( - "del_gone".to_string(), + DeliveryId::new("del_gone"), PendingDelivery { - worker_name: "ghost".to_string(), + worker_name: WorkerName::from("ghost"), delivery: RelayDelivery { - delivery_id: "del_gone".to_string(), - event_id: "evt_gone".to_string(), - workspace_id: Some("ws_demo".to_string()), - workspace_alias: Some("Demo".to_string()), + delivery_id: DeliveryId::new("del_gone"), + event_id: EventId::new("evt_gone"), + workspace_id: Some(WorkspaceId::new("ws_demo")), + workspace_alias: Some(WorkspaceAlias::new("Demo")), from: "Lead".to_string(), - target: "Worker".to_string(), + target: MessageTarget::new("Worker"), body: "hello".to_string(), thread_id: None, priority: Some(2), @@ -236,7 +239,7 @@ async fn delivery_retry_fails_promptly_when_recipient_is_gone() { )]); let outcome = retry_pending_delivery( - "del_gone", + &DeliveryId::new("del_gone"), &mut workers, &mut pending_deliveries, Duration::from_millis(1), @@ -247,11 +250,11 @@ async fn delivery_retry_fails_promptly_when_recipient_is_gone() { assert_eq!( outcome, DeliveryAttemptOutcome::Failed { - worker_name: "ghost".to_string(), - delivery_id: "del_gone".to_string(), - event_id: "evt_gone".to_string(), + worker_name: WorkerName::from("ghost"), + delivery_id: DeliveryId::new("del_gone"), + event_id: EventId::new("evt_gone"), from: "Lead".to_string(), - to: "Worker".to_string(), + to: MessageTarget::new("Worker"), attempts: 3, last_error: "recipient gone".to_string(), } @@ -280,16 +283,16 @@ async fn delivery_retry_transient_blip_emits_failed_event_for_present_worker() { ); let mut pending_deliveries = HashMap::from([( - "del_blip".to_string(), + DeliveryId::new("del_blip"), PendingDelivery { - worker_name: worker_name.to_string(), + worker_name: WorkerName::from(worker_name), delivery: RelayDelivery { - delivery_id: "del_blip".to_string(), - event_id: "evt_blip".to_string(), - workspace_id: Some("ws_demo".to_string()), - workspace_alias: Some("Demo".to_string()), + delivery_id: DeliveryId::new("del_blip"), + event_id: EventId::new("evt_blip"), + workspace_id: Some(WorkspaceId::new("ws_demo")), + workspace_alias: Some(WorkspaceAlias::new("Demo")), from: "orchestrator".to_string(), - target: worker_name.to_string(), + target: MessageTarget::new(worker_name), body: "transient auth blip".to_string(), thread_id: None, priority: Some(2), @@ -305,7 +308,7 @@ async fn delivery_retry_transient_blip_emits_failed_event_for_present_worker() { let mut final_outcome = None; for retry_index in 1..=MAX_DELIVERY_RETRIES + 1 { match retry_pending_delivery( - "del_blip", + &DeliveryId::new("del_blip"), &mut workers, &mut pending_deliveries, Duration::from_millis(1), @@ -357,7 +360,7 @@ async fn delivery_retry_transient_blip_emits_failed_event_for_present_worker() { ); let (sdk_out_tx, mut sdk_out_rx) = mpsc::channel(4); - emit_delivery_attempt_outcome(&sdk_out_tx, "del_blip", true, outcome) + emit_delivery_attempt_outcome(&sdk_out_tx, &DeliveryId::new("del_blip"), true, outcome) .await .expect("failed outcome should emit to sdk_out_tx"); @@ -392,16 +395,16 @@ async fn delivery_retry_success_clears_stale_last_error() { let worker_name = "worker-clear-error"; let mut workers = make_worker_registry_with_worker(worker_name).await; let mut pending_deliveries = HashMap::from([( - "del_clear".to_string(), + DeliveryId::new("del_clear"), PendingDelivery { - worker_name: worker_name.to_string(), + worker_name: WorkerName::from(worker_name), delivery: RelayDelivery { - delivery_id: "del_clear".to_string(), - event_id: "evt_clear".to_string(), - workspace_id: Some("ws_demo".to_string()), - workspace_alias: Some("Demo".to_string()), + delivery_id: DeliveryId::new("del_clear"), + event_id: EventId::new("evt_clear"), + workspace_id: Some(WorkspaceId::new("ws_demo")), + workspace_alias: Some(WorkspaceAlias::new("Demo")), from: "orchestrator".to_string(), - target: worker_name.to_string(), + target: MessageTarget::new(worker_name), body: "clear stale error".to_string(), thread_id: None, priority: Some(2), @@ -415,7 +418,7 @@ async fn delivery_retry_success_clears_stale_last_error() { )]); let outcome = retry_pending_delivery( - "del_clear", + &DeliveryId::new("del_clear"), &mut workers, &mut pending_deliveries, Duration::from_millis(1), @@ -1174,18 +1177,18 @@ fn http_api_timeout_windows_use_default_and_env_override() { #[test] fn drop_pending_for_worker_removes_only_matching_entries() { - let mut pending = HashMap::new(); + let mut pending: HashMap = HashMap::new(); pending.insert( - "del_1".to_string(), + DeliveryId::new("del_1"), PendingDelivery { - worker_name: "A".to_string(), + worker_name: WorkerName::from("A"), delivery: RelayDelivery { - delivery_id: "del_1".to_string(), - event_id: "evt_1".to_string(), - workspace_id: Some("ws_test".to_string()), - workspace_alias: Some("test".to_string()), + delivery_id: DeliveryId::new("del_1"), + event_id: EventId::new("evt_1"), + workspace_id: Some(WorkspaceId::new("ws_test")), + workspace_alias: Some(WorkspaceAlias::new("test")), from: "x".to_string(), - target: "#general".to_string(), + target: MessageTarget::new("#general"), body: "hello".to_string(), thread_id: None, priority: None, @@ -1198,16 +1201,16 @@ fn drop_pending_for_worker_removes_only_matching_entries() { }, ); pending.insert( - "del_2".to_string(), + DeliveryId::new("del_2"), PendingDelivery { - worker_name: "B".to_string(), + worker_name: WorkerName::from("B"), delivery: RelayDelivery { - delivery_id: "del_2".to_string(), - event_id: "evt_2".to_string(), - workspace_id: Some("ws_test".to_string()), - workspace_alias: Some("test".to_string()), + delivery_id: DeliveryId::new("del_2"), + event_id: EventId::new("evt_2"), + workspace_id: Some(WorkspaceId::new("ws_test")), + workspace_alias: Some(WorkspaceAlias::new("test")), from: "y".to_string(), - target: "#general".to_string(), + target: MessageTarget::new("#general"), body: "world".to_string(), thread_id: None, priority: None, @@ -1229,14 +1232,14 @@ fn drop_pending_for_worker_removes_only_matching_entries() { #[tokio::test] async fn dropped_pending_deliveries_emit_terminal_message_failures() { let pending = PendingDelivery { - worker_name: "A".to_string(), + worker_name: WorkerName::from("A"), delivery: RelayDelivery { - delivery_id: "del_1".to_string(), - event_id: "evt_1".to_string(), - workspace_id: Some("ws_test".to_string()), - workspace_alias: Some("test".to_string()), + delivery_id: DeliveryId::new("del_1"), + event_id: EventId::new("evt_1"), + workspace_id: Some(WorkspaceId::new("ws_test")), + workspace_alias: Some(WorkspaceAlias::new("test")), from: "Lead".to_string(), - target: "A".to_string(), + target: MessageTarget::new("A"), body: "hello".to_string(), thread_id: None, priority: None, @@ -1271,14 +1274,14 @@ async fn dropped_pending_deliveries_emit_terminal_message_failures() { #[test] fn should_clear_pending_delivery_when_event_id_matches() { let pending = PendingDelivery { - worker_name: "A".to_string(), + worker_name: WorkerName::from("A"), delivery: RelayDelivery { - delivery_id: "del_1".to_string(), - event_id: "evt_1".to_string(), - workspace_id: Some("ws_test".to_string()), - workspace_alias: Some("test".to_string()), + delivery_id: DeliveryId::new("del_1"), + event_id: EventId::new("evt_1"), + workspace_id: Some(WorkspaceId::new("ws_test")), + workspace_alias: Some(WorkspaceAlias::new("test")), from: "x".to_string(), - target: "#general".to_string(), + target: MessageTarget::new("#general"), body: "hello".to_string(), thread_id: None, priority: None, @@ -1303,16 +1306,16 @@ fn should_clear_pending_delivery_when_event_id_matches() { #[test] fn clear_pending_delivery_returns_none_for_stale_event_id() { let mut pending = HashMap::from([( - "del_1".to_string(), + DeliveryId::new("del_1"), PendingDelivery { - worker_name: "A".to_string(), + worker_name: WorkerName::from("A"), delivery: RelayDelivery { - delivery_id: "del_1".to_string(), - event_id: "evt_current".to_string(), - workspace_id: Some("ws_test".to_string()), - workspace_alias: Some("test".to_string()), + delivery_id: DeliveryId::new("del_1"), + event_id: EventId::new("evt_current"), + workspace_id: Some(WorkspaceId::new("ws_test")), + workspace_alias: Some(WorkspaceAlias::new("test")), from: "x".to_string(), - target: "#general".to_string(), + target: MessageTarget::new("#general"), body: "hello".to_string(), thread_id: None, priority: None, @@ -1340,14 +1343,14 @@ fn clear_pending_delivery_returns_none_for_stale_event_id() { #[test] fn should_clear_pending_delivery_without_event_id_for_compatibility() { let pending = PendingDelivery { - worker_name: "A".to_string(), + worker_name: WorkerName::from("A"), delivery: RelayDelivery { - delivery_id: "del_1".to_string(), - event_id: "evt_1".to_string(), - workspace_id: Some("ws_test".to_string()), - workspace_alias: Some("test".to_string()), + delivery_id: DeliveryId::new("del_1"), + event_id: EventId::new("evt_1"), + workspace_id: Some(WorkspaceId::new("ws_test")), + workspace_alias: Some(WorkspaceAlias::new("test")), from: "x".to_string(), - target: "#general".to_string(), + target: MessageTarget::new("#general"), body: "hello".to_string(), thread_id: None, priority: None, @@ -1922,15 +1925,15 @@ fn ephemeral_paths_are_unique_per_broker_instance() { #[test] fn http_api_spawn_spec_defaults_to_pty_runtime() { let spec = build_http_api_spawn_spec( - "worker-a".to_string(), + WorkerName::from("worker-a"), "codex".to_string(), None, Some("o3".to_string()), vec!["--fast".to_string()], - vec!["general".to_string()], + vec![ChannelName::from("general")], Some("/tmp/project".to_string()), Some("core".to_string()), - Some("Lead".to_string()), + Some(WorkerName::from("Lead")), Some("subagent".to_string()), None, ) @@ -1945,12 +1948,12 @@ fn http_api_spawn_spec_defaults_to_pty_runtime() { #[test] fn http_api_spawn_spec_uses_headless_runtime_for_supported_providers() { let spec = build_http_api_spawn_spec( - "worker-a".to_string(), + WorkerName::from("worker-a"), "opencode".to_string(), Some("headless".to_string()), Some("ignored".to_string()), vec![], - vec!["general".to_string()], + vec![ChannelName::from("general")], None, None, None, @@ -2005,12 +2008,12 @@ fn headless_provider_command_opencode_places_flags_before_task() { #[test] fn http_api_spawn_spec_rejects_unknown_headless_providers() { let error = build_http_api_spawn_spec( - "worker-a".to_string(), + WorkerName::from("worker-a"), "codex".to_string(), Some("headless".to_string()), None, vec![], - vec!["general".to_string()], + vec![ChannelName::from("general")], None, None, None, diff --git a/crates/broker/src/runtime/util.rs b/crates/broker/src/runtime/util.rs index b54cdb658..0a7095c9c 100644 --- a/crates/broker/src/runtime/util.rs +++ b/crates/broker/src/runtime/util.rs @@ -188,15 +188,15 @@ pub(crate) fn channels_from_csv(raw: &str) -> Vec { /// Reads RELAY_DEFAULT_CHANNELS (comma-separated) or falls back to the /// broker's default channels: vec!["general", "engineering"] — both created /// at startup by ensure_default_channels(). -pub(crate) fn default_spawn_channels() -> Vec { +pub(crate) fn default_spawn_channels() -> Vec { if let Ok(raw) = std::env::var("RELAY_DEFAULT_CHANNELS") { let parsed = channels_from_csv(&raw); if !parsed.is_empty() { - return parsed; + return parsed.into_iter().map(ChannelName::from).collect(); } } // channels: ["general", "engineering"] (must match ensure_default_channels) - vec!["general".to_string(), "engineering".to_string()] + vec![ChannelName::new("general"), ChannelName::new("engineering")] } pub(crate) fn command_targets_self(cmd_event: &BrokerCommandEvent, self_agent_id: &str) -> bool { diff --git a/crates/broker/src/runtime/worker_events.rs b/crates/broker/src/runtime/worker_events.rs index abb5a16a4..c97ae8814 100644 --- a/crates/broker/src/runtime/worker_events.rs +++ b/crates/broker/src/runtime/worker_events.rs @@ -211,7 +211,7 @@ impl BrokerRuntime { "delivery_failed", ); if pending_for_failure.is_some() && !delivery_id.is_empty() { - terminal_failed_deliveries.insert(delivery_id.to_string()); + terminal_failed_deliveries.insert(DeliveryId::from(delivery_id)); } let _ = send_event( sdk_out_tx, diff --git a/crates/broker/src/scheduler.rs b/crates/broker/src/scheduler.rs index 1cf462751..ef20c2824 100644 --- a/crates/broker/src/scheduler.rs +++ b/crates/broker/src/scheduler.rs @@ -3,6 +3,7 @@ use std::{ time::{Duration, Instant}, }; +use crate::ids::MessageTarget; use crate::types::{InjectRequest, RelayPriority}; #[derive(Debug, Clone)] @@ -23,7 +24,7 @@ pub struct Scheduler { coalesce_window: Duration, max_hold: Duration, last_human_keypress: Option, - pending: HashMap<(String, String), CoalesceState>, + pending: HashMap<(String, MessageTarget), CoalesceState>, } impl Scheduler { @@ -96,7 +97,7 @@ impl Scheduler { } pub fn drain_ready(&mut self, now: Instant) -> Vec { - let ready_keys: Vec<(String, String)> = self + let ready_keys: Vec<(String, MessageTarget)> = self .pending .iter() .filter_map(|(key, state)| { diff --git a/crates/broker/src/supervisor.rs b/crates/broker/src/supervisor.rs index d1c00f28e..4f4fcb383 100644 --- a/crates/broker/src/supervisor.rs +++ b/crates/broker/src/supervisor.rs @@ -226,7 +226,7 @@ mod tests { fn test_spec(name: &str) -> AgentSpec { AgentSpec { - name: name.to_string(), + name: crate::ids::WorkerName::from(name), runtime: AgentRuntime::Pty, provider: None, cli: Some("claude".to_string()), @@ -236,7 +236,7 @@ mod tests { shadow_of: None, shadow_mode: None, args: vec![], - channels: vec!["general".to_string()], + channels: vec![crate::ids::ChannelName::from("general")], restart_policy: None, } } diff --git a/crates/broker/src/types.rs b/crates/broker/src/types.rs index bfe1099fa..6c3790b58 100644 --- a/crates/broker/src/types.rs +++ b/crates/broker/src/types.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::ids::{ + AgentId, ChannelName, EventId, MessageTarget, ThreadId, WorkerName, WorkspaceAlias, WorkspaceId, +}; use crate::protocol::MessageInjectionMode; /// Per-worker inbound delivery mode controlling how inbound relay messages are @@ -64,18 +67,18 @@ pub struct PendingRelayMessage { /// name, or sentinel like `"thread"`. Used as the `target` arg to /// `queue_and_try_delivery_raw` on drain so the re-injected /// message matches the original routing. - pub target: String, + pub target: MessageTarget, /// Original thread id, when the inbound was a thread reply. #[serde(default, skip_serializing_if = "Option::is_none")] - pub thread_id: Option, + pub thread_id: Option, /// Original workspace id, when known. Channel + DM routing both /// depend on this; dropping it would attribute the flushed /// message to the wrong workspace. #[serde(default, skip_serializing_if = "Option::is_none")] - pub workspace_id: Option, + pub workspace_id: Option, /// Original workspace alias (display name), when known. #[serde(default, skip_serializing_if = "Option::is_none")] - pub workspace_alias: Option, + pub workspace_alias: Option, /// Original delivery priority. 0 = P0, …, 4 = P4. Defaults to 2 /// (P2) when the source didn't carry a priority. #[serde(default = "default_priority")] @@ -89,7 +92,7 @@ pub struct PendingRelayMessage { /// Inbound event_id when the source carried one. Preserved for /// telemetry / dedup parity with the auto-inject path. #[serde(default, skip_serializing_if = "Option::is_none")] - pub event_id: Option, + pub event_id: Option, } fn default_priority() -> u8 { @@ -139,17 +142,17 @@ pub enum SenderKind { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct InboundRelayEvent { - pub event_id: String, - pub workspace_id: String, + pub event_id: EventId, + pub workspace_id: WorkspaceId, #[serde(default, skip_serializing_if = "Option::is_none")] - pub workspace_alias: Option, + pub workspace_alias: Option, pub kind: InboundKind, pub from: String, - pub sender_agent_id: Option, + pub sender_agent_id: Option, pub sender_kind: SenderKind, - pub target: String, + pub target: MessageTarget, pub text: String, - pub thread_id: Option, + pub thread_id: Option, pub priority: RelayPriority, } @@ -160,21 +163,21 @@ pub struct InboundRelayEvent { pub struct BrokerCommandEvent { /// The slash command name (e.g. "/spawn", "/release"). pub command: String, - pub workspace_id: String, - pub workspace_alias: Option, + pub workspace_id: WorkspaceId, + pub workspace_alias: Option, /// Channel the command was invoked in. - pub channel: String, + pub channel: ChannelName, /// Agent ID or name of the invoker. pub invoked_by: String, /// Target command handler agent ID, when provided by Relaycast. - pub handler_agent_id: Option, + pub handler_agent_id: Option, /// Structured parameters for the command. pub payload: BrokerCommandPayload, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SpawnParams { - pub name: String, + pub name: WorkerName, pub cli: String, #[serde(default)] pub args: Vec, @@ -182,7 +185,7 @@ pub struct SpawnParams { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ReleaseParams { - pub name: String, + pub name: WorkerName, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -194,11 +197,11 @@ pub enum BrokerCommandPayload { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct InjectRequest { pub id: String, - pub workspace_id: String, + pub workspace_id: WorkspaceId, #[serde(default, skip_serializing_if = "Option::is_none")] - pub workspace_alias: Option, + pub workspace_alias: Option, pub from: String, - pub target: String, + pub target: MessageTarget, pub body: String, pub priority: RelayPriority, pub attempts: u32, @@ -328,7 +331,7 @@ mod inbound_delivery_tests { // Target defaults to the sender's name in tests that don't // care about routing — the gating logic only inspects mode // / queue length, not the routing fields. - target: "worker".to_string(), + target: MessageTarget::new("worker"), thread_id: None, workspace_id: None, workspace_alias: None, @@ -371,14 +374,14 @@ mod inbound_delivery_tests { let queued = PendingRelayMessage { from: "Bob".to_string(), body: "ship it".to_string(), - target: "#general".to_string(), - thread_id: Some("thr_abc".to_string()), - workspace_id: Some("ws_demo".to_string()), - workspace_alias: Some("Demo".to_string()), + target: MessageTarget::new("#general"), + thread_id: Some(ThreadId::new("thr_abc")), + workspace_id: Some(WorkspaceId::new("ws_demo")), + workspace_alias: Some(WorkspaceAlias::new("Demo")), priority: 1, mode: MessageInjectionMode::Steer, queued_at_ms: 123_456, - event_id: Some("evt_xyz".to_string()), + event_id: Some(EventId::new("evt_xyz")), }; let mut state = InboundDeliveryState::new(InboundDeliveryMode::ManualFlush); state.accept_inbound(queued.clone()); diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index 2b952c17e..d4ecc6e41 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -6,6 +6,7 @@ use std::{ }; use crate::{ + ids::{RequestId, WorkerName}, metrics::MetricsCollector, protocol::{AgentRuntime, AgentSpec, ProtocolEnvelope, RelayDelivery, PROTOCOL_VERSION}, relaycast::configure_relaycast_mcp_with_result, @@ -35,7 +36,7 @@ pub(crate) mod detection; pub(crate) struct WorkerHandle { pub(crate) spec: AgentSpec, pub(crate) parent: Option, - pub(crate) workspace_id: Option, + pub(crate) workspace_id: Option, pub(crate) child: Child, pub(crate) stdin: ChildStdin, pub(crate) spawned_at: Instant, @@ -65,15 +66,15 @@ impl AgentWorkState { #[derive(Debug, Clone)] pub(crate) enum WorkerEvent { - Message { name: String, value: Value }, + Message { name: WorkerName, value: Value }, } pub(crate) struct WorkerRegistry { - pub(crate) workers: HashMap, + pub(crate) workers: HashMap, event_tx: mpsc::Sender, worker_env: Vec<(String, String)>, worker_logs_dir: PathBuf, - pub(crate) initial_tasks: HashMap, + pub(crate) initial_tasks: HashMap, pub(crate) supervisor: Supervisor, pub(crate) metrics: MetricsCollector, } @@ -204,7 +205,7 @@ impl WorkerRegistry { idle_threshold_secs: Option, worker_relay_api_key: Option, skip_relay_prompt: bool, - workspace_id: Option, + workspace_id: Option, agent_result: Option, ) -> Result { let mut spec = spec; @@ -476,7 +477,7 @@ impl WorkerRegistry { &mut self, name: &str, msg_type: &str, - request_id: Option, + request_id: Option, payload: Value, ) -> Result<()> { let handle = self @@ -554,7 +555,7 @@ impl WorkerRegistry { } pub(crate) async fn shutdown_all(&mut self) -> Result<()> { - let names: Vec = self.workers.keys().cloned().collect(); + let names: Vec = self.workers.keys().cloned().collect(); for name in names { if let Err(error) = self.release(&name).await { tracing::warn!(target = "agent_relay::broker", name = %name, error = %error, "worker shutdown failed"); @@ -565,8 +566,8 @@ impl WorkerRegistry { pub(crate) async fn reap_exited( &mut self, - ) -> Result, Option, Option)>> { - let names: Vec = self.workers.keys().cloned().collect(); + ) -> Result, Option, Option)>> { + let names: Vec = self.workers.keys().cloned().collect(); let mut exited = Vec::new(); for name in names { let (status, gone_via_kill0) = if let Some(handle) = self.workers.get_mut(&name) { @@ -851,7 +852,7 @@ fn codex_models_json_contains_model(bytes: &[u8], model: &str) -> Option { fn spawn_worker_reader( tx: mpsc::Sender, - name: String, + name: WorkerName, stream_name: &'static str, reader: R, parse_json: bool, diff --git a/crates/broker/src/wrap.rs b/crates/broker/src/wrap.rs index 0aeef9163..fe9045073 100644 --- a/crates/broker/src/wrap.rs +++ b/crates/broker/src/wrap.rs @@ -4,6 +4,7 @@ use std::time::{Duration, Instant}; use crate::{ control::{can_release_child, is_human_sender}, dedup::DedupCache, + ids::{DeliveryId, EventId, MessageTarget, WorkspaceAlias, WorkspaceId}, pty::PtySession, relaycast::{ agent_name_eq, is_self_name, map_ws_broker_command, map_ws_event, @@ -55,11 +56,11 @@ const GEMINI_ACTION_COOLDOWN: Duration = Duration::from_secs(2); #[derive(Debug, Clone)] pub(crate) struct PendingWrapInjection { pub(crate) from: String, - pub(crate) event_id: String, - pub(crate) workspace_id: Option, - pub(crate) workspace_alias: Option, + pub(crate) event_id: EventId, + pub(crate) workspace_id: Option, + pub(crate) workspace_alias: Option, pub(crate) body: String, - pub(crate) target: String, + pub(crate) target: MessageTarget, pub(crate) queued_at: Instant, } @@ -622,7 +623,7 @@ pub(crate) async fn run_wrap( ws }) .collect(); - let workspace_lookup: std::collections::HashMap = workspaces + let workspace_lookup: std::collections::HashMap = workspaces .iter() .cloned() .map(|workspace| (workspace.workspace_id.clone(), workspace)) @@ -1246,7 +1247,7 @@ pub(crate) async fn run_wrap( // Push to pending verifications for echo verification pending_verifications.push_back(PendingVerification { - delivery_id: format!("wrap_{}", pending.event_id), + delivery_id: DeliveryId::new(format!("wrap_{}", pending.event_id)), event_id: pending.event_id, expected_echo: injection, injected_at: Instant::now(),