From 02dec50a9f23874c55a610bca95e7e2f9575a993 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Tue, 9 Jun 2026 15:06:40 -0400 Subject: [PATCH] fix(broker): forward harness to relaycast backend (SDK 2.3.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server-side telemetry on the relaycast backend recorded `harness: "unknown"` for 100% of events, even though the broker already detects the orchestrator harness (claude-code / codex / cursor / …) for its own PostHog events. The detected value was siloed in TelemetryClient and never forwarded to the relaycast SDK, so the `X-Relaycast-Harness` header (HTTP) and `?harness=` query (WS) were never sent and the backend defaulted to "unknown". Forwarding requires SDK >= 2.3.0 (`with_harness` was added in relaycast #161, published as 2.3.0). Bumping `=2.0.0` -> `=2.3.0` also pulls in the v8 event contract the gateway already emits, which is a net fix: the broker at 2.0.0 only recognized the pre-v8 `reaction.added` / `agent.online` event types, while the deployed gateway sends `message.reacted` / `agent.status.*` — so reactions and presence were being silently dropped. Changes: - Bump `relaycast` SDK `=2.0.0` -> `=2.3.0`. - Cache the detected harness in `telemetry::orchestrator_harness{,_opt}()` (process-tree detection walks parent PIDs; resolve once) and reuse it for both our PostHog events and the forwarded value. - Forward the harness at all three relaycast client sites: the WS handshake (`WsClientOptions::with_harness`) and both HTTP `build_relay_client` helpers (`RelayCastOptions::with_harness`). - Inject `AGENT_RELAY_HARNESS` into spawned-agent env via `spawn_env_vars` so JS-SDK agents (`@relaycast/sdk`, env-only harness resolution, no process-tree fallback) report it too. Threaded as a parameter for deterministic tests. 
- Update 3 stale bridge fixtures to the v8 event names the gateway emits (`message.reacted` with `action`, `agent.status.active`); fix `drops_reaction_without_channel` to exercise the no-channel drop rather than an unknown-type drop. Tests: 701 broker lib tests pass (+2 new harness env tests); clippy clean; fmt clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 4 ++-- crates/broker/Cargo.toml | 2 +- crates/broker/src/relaycast/auth.rs | 5 +++- crates/broker/src/relaycast/bridge.rs | 23 ++++++++++++++----- crates/broker/src/relaycast/ws.rs | 18 +++++++++++---- crates/broker/src/spawner.rs | 33 ++++++++++++++++++++++++++- crates/broker/src/telemetry.rs | 21 ++++++++++++++++- crates/broker/src/wrap.rs | 1 + 8 files changed, 90 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38d3c221f..22be85300 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1999,9 +1999,9 @@ checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" [[package]] name = "relaycast" -version = "2.0.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0524afca2339fa727779b1f914ff47272f62873ac3119d76d198062ac509c6e" +checksum = "364a29a475378ca37cf922ae11733d87f2fdead11d54d41880a6e6f73d94866a" dependencies = [ "futures-util", "reqwest", diff --git a/crates/broker/Cargo.toml b/crates/broker/Cargo.toml index 27c9be125..ab3c770d8 100644 --- a/crates/broker/Cargo.toml +++ b/crates/broker/Cargo.toml @@ -29,7 +29,7 @@ serde_json = "1.0" sha2 = "0.10" shlex = "1.3" thiserror = "2.0" -relaycast = "=2.0.0" +relaycast = "=2.3.0" tokio = { version = "1.44", features = ["full"] } tracing = "0.1" tracing-appender = "0.2" diff --git a/crates/broker/src/relaycast/auth.rs b/crates/broker/src/relaycast/auth.rs index ec51b9153..cba3b0ffe 100644 --- a/crates/broker/src/relaycast/auth.rs +++ b/crates/broker/src/relaycast/auth.rs @@ -867,7 +867,10 @@ fn auth_http_status(err: &anyhow::Error) -> Option { /// Build a 
`RelayCast` workspace client from an API key and base URL. fn build_relay_client(api_key: &str, base_url: &str) -> Result { - let opts = RelayCastOptions::new(api_key).with_base_url(base_url); + let mut opts = RelayCastOptions::new(api_key).with_base_url(base_url); + if let Some(harness) = crate::telemetry::orchestrator_harness_opt() { + opts = opts.with_harness(harness); + } RelayCast::new(opts).map_err(|e| anyhow::anyhow!("{e}")) } diff --git a/crates/broker/src/relaycast/bridge.rs b/crates/broker/src/relaycast/bridge.rs index 984800d6c..56af4467f 100644 --- a/crates/broker/src/relaycast/bridge.rs +++ b/crates/broker/src/relaycast/bridge.rs @@ -361,8 +361,11 @@ mod tests { #[test] fn maps_presence_top_level() { + // v8 contract: presence transitions are `agent.status.active` / + // `agent.status.offline` (the gateway's engine), not the pre-v8 + // `agent.online`. See engine presence adapter. let event = map_event(&json!({ - "type": "agent.online", + "type": "agent.status.active", "agent": { "name": "alice" } })) .unwrap(); @@ -418,14 +421,18 @@ mod tests { #[test] fn maps_reaction_added() { + // v8 contract: reactions arrive as a single `message.reacted` event + // (with an `action` of "added"/"removed"), not the pre-v8 + // `reaction.added`/`reaction.removed`. See engine reaction route. let event = map_event(&json!({ - "type": "reaction.added", + "type": "message.reacted", + "action": "added", "message_id": "msg_42", "emoji": "thumbsup", "agent_name": "alice", "channel_name": "general" })) - .expect("should map reaction.added"); + .expect("should map message.reacted"); assert_eq!(event.kind, InboundKind::ReactionReceived); assert_eq!(event.from, "alice"); @@ -439,14 +446,17 @@ mod tests { #[test] fn maps_reaction_removed() { + // A reaction removal is the same `message.reacted` event with + // `action: "removed"`; the broker surfaces both as ReactionReceived. 
let event = map_event(&json!({ - "type": "reaction.removed", + "type": "message.reacted", + "action": "removed", "message_id": "msg_42", "emoji": "thumbsup", "agent_name": "bob", "channel_name": "dev" })) - .expect("should map reaction.removed"); + .expect("should map message.reacted"); assert_eq!(event.kind, InboundKind::ReactionReceived); assert_eq!(event.from, "bob"); @@ -458,7 +468,8 @@ mod tests { // Reactions without a channel (e.g. DM reactions) are dropped from PTY // injection — they're surfaced via the inbox API / piggyback instead. assert!(map_event(&json!({ - "type": "reaction.added", + "type": "message.reacted", + "action": "added", "message_id": "msg_99", "emoji": "rocket", "agent_name": "carol" diff --git a/crates/broker/src/relaycast/ws.rs b/crates/broker/src/relaycast/ws.rs index e486ff0f1..c2959d971 100644 --- a/crates/broker/src/relaycast/ws.rs +++ b/crates/broker/src/relaycast/ws.rs @@ -72,10 +72,15 @@ impl RelaycastWsClient { ); } - let mut ws = WsClient::new( - WsClientOptions::new(self.workspace_http.api_key.clone()) - .with_base_url(self.ws_base_url.clone()), - ); + let mut ws_opts = WsClientOptions::new(self.workspace_http.api_key.clone()) + .with_base_url(self.ws_base_url.clone()); + // Forward the detected harness as the `harness` query param (WS + // upgrades can't set custom headers) so the backend can attribute + // server events to claude-code / codex / etc. instead of "unknown". + if let Some(harness) = crate::telemetry::orchestrator_harness_opt() { + ws_opts = ws_opts.with_harness(harness); + } + let mut ws = WsClient::new(ws_opts); match ws.connect().await { Ok(()) => { @@ -769,7 +774,10 @@ impl RelaycastHttpClient { /// Build a `RelayCast` workspace client from an API key and base URL. 
fn build_relay_client(api_key: &str, base_url: &str) -> Option<RelayCast> { - let opts = RelayCastOptions::new(api_key).with_base_url(base_url); + let mut opts = RelayCastOptions::new(api_key).with_base_url(base_url); + if let Some(harness) = crate::telemetry::orchestrator_harness_opt() { + opts = opts.with_harness(harness); + } RelayCast::new(opts).ok() } diff --git a/crates/broker/src/spawner.rs b/crates/broker/src/spawner.rs index 33ef34c78..6a8c5fc30 100644 --- a/crates/broker/src/spawner.rs +++ b/crates/broker/src/spawner.rs @@ -222,6 +222,7 @@ pub fn spawn_env_vars( channels: &str, workspaces_json: Option<&str>, default_workspace: Option<&str>, + harness: Option<&str>, ) -> Vec<(String, String)> { let mut env = vec![ ("RELAY_AGENT_NAME".to_string(), name.to_string()), @@ -232,6 +233,12 @@ pub fn spawn_env_vars( ("RELAY_CHANNELS".to_string(), channels.to_string()), ("RELAY_STRICT_AGENT_NAME".to_string(), "1".to_string()), ]; + // Forward the detected harness so spawned agents using the JS SDK + // (`@relaycast/sdk`, which resolves harness from env and has no + // process-tree fallback) report it to the backend instead of "unknown". 
+ if let Some(harness) = harness { + env.push(("AGENT_RELAY_HARNESS".to_string(), harness.to_string())); + } if let Some(workspaces_json) = workspaces_json .map(str::trim) .filter(|value| !value.is_empty()) @@ -270,7 +277,31 @@ mod tests { use nix::unistd::{getsid, Pid}; use tokio::process::Command; - use super::{terminate_child, Spawner}; + use super::{spawn_env_vars, terminate_child, Spawner}; + + #[test] + fn spawn_env_vars_forwards_harness_when_present() { + let env = spawn_env_vars( + "a", + "rk_live_x", + "https://gw", + "#c", + None, + None, + Some("codex"), + ); + let harness = env + .iter() + .find(|(k, _)| k == "AGENT_RELAY_HARNESS") + .map(|(_, v)| v.as_str()); + assert_eq!(harness, Some("codex")); + } + + #[test] + fn spawn_env_vars_omits_harness_when_absent() { + let env = spawn_env_vars("a", "rk_live_x", "https://gw", "#c", None, None, None); + assert!(env.iter().all(|(k, _)| k != "AGENT_RELAY_HARNESS")); + } #[tokio::test] async fn release_terminates_child_process() { diff --git a/crates/broker/src/telemetry.rs b/crates/broker/src/telemetry.rs index 3f43b187b..f5933b31d 100644 --- a/crates/broker/src/telemetry.rs +++ b/crates/broker/src/telemetry.rs @@ -437,6 +437,25 @@ fn detect_orchestrator_harness() -> String { .unwrap_or_else(|| UNKNOWN_ORCHESTRATOR_HARNESS.to_string()) } +/// Process-wide cached orchestrator harness: explicit env override, else +/// process-tree detection (claude-code / codex / cursor / …). Detection walks +/// the parent-process chain, so we resolve it once and reuse the result for +/// both our own PostHog events and the harness we forward to the relaycast +/// backend. Returns the [`UNKNOWN_ORCHESTRATOR_HARNESS`] sentinel when +/// undetectable. 
+pub(crate) fn orchestrator_harness() -> &'static str { + static CACHE: std::sync::OnceLock<String> = std::sync::OnceLock::new(); + CACHE.get_or_init(detect_orchestrator_harness) +} + +/// Like [`orchestrator_harness`] but `None` instead of the `"unknown"` +/// sentinel, so callers can skip forwarding a non-informative value (the +/// relaycast backend already defaults a missing harness to `"unknown"`). +pub(crate) fn orchestrator_harness_opt() -> Option<&'static str> { + let harness = orchestrator_harness(); + (harness != UNKNOWN_ORCHESTRATOR_HARNESS).then_some(harness) +} + /// Best-effort OS release string for telemetry tagging. Shells out to /// `uname -r` on unix (broker is unix-only anyway); returns `None` on /// failure so we just omit the property rather than risking a crash. @@ -562,7 +581,7 @@ impl TelemetryClient { cli_version: env_nonempty("AGENT_RELAY_CLI_VERSION"), sdk_version: env_nonempty("AGENT_RELAY_SDK_VERSION"), os_version: detect_os_version(), - orchestrator_harness: detect_orchestrator_harness(), + orchestrator_harness: orchestrator_harness().to_string(), } } diff --git a/crates/broker/src/wrap.rs b/crates/broker/src/wrap.rs index e85c32236..00f9183bb 100644 --- a/crates/broker/src/wrap.rs +++ b/crates/broker/src/wrap.rs @@ -1061,6 +1061,7 @@ pub(crate) async fn run_wrap( &channels, Some(&child_workspaces_json), default_workspace_id.as_deref(), + crate::telemetry::orchestrator_harness_opt(), ); // Pre-register the child agent so its MCP server // starts with a valid token (avoiding "Not registered"