Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ impl DedupCache {
}
}

pub fn remove(&mut self, id: &str) {
self.seen.remove(id);
self.order.retain(|(key, _)| key != id);
}

pub fn len(&self) -> usize {
self.seen.len()
}
Expand Down
98 changes: 76 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2552,16 +2552,22 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {
let spec_for_state = spec.clone();
let effective_task = normalize_initial_task(task.clone());

// Pre-register the agent so its MCP server starts
// with a valid token (same as the SDK spawn_agent path).
// The WS event may include a token, but often doesn't —
// fall back to broker-side registration.
// Pre-register agent token. Claude doesn't need this — it
// bakes the API key into --mcp-config JSON and self-registers.
// Non-Claude CLIs need the token injected into their CLI args
// at spawn time, so we do a quick (3s) registration attempt.
let cli_command = parse_cli_command(&cli).map(|(cmd, _)| cmd).unwrap_or_else(|_| cli.clone());
let cli_name_lower = normalize_cli_name(&cli_command).to_lowercase();
let is_claude = cli_name_lower == "claude" || cli_name_lower.starts_with("claude:");
let worker_relay_key = {
let ws_token = relaycast_ws_spawn_token(&ws_value);
if ws_token.is_some() {
ws_token
} else if is_claude {
// Claude self-registers via its MCP server — skip blocking call
None
} else {
const REG_TIMEOUT: Duration = Duration::from_secs(15);
const REG_TIMEOUT: Duration = Duration::from_secs(3);
match tokio::time::timeout(
REG_TIMEOUT,
workspace_http.register_agent_token(&name, Some(cli.as_str())),
Expand All @@ -2584,7 +2590,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {
Err(_) => {
tracing::warn!(
worker = %name,
"WS spawn pre-registration timed out; agent will self-register"
"WS spawn pre-registration timed out (3s); agent will self-register"
);
None
}
Expand Down Expand Up @@ -2739,13 +2745,18 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {
let task_opt = Some(task).filter(|v| !v.trim().is_empty());
let effective_task = normalize_initial_task(task_opt.clone());

// Pre-register (same as primary WS spawn path above).
// Pre-register (same logic as primary WS spawn path).
let cli_command = parse_cli_command(&cli).map(|(cmd, _)| cmd).unwrap_or_else(|_| cli.clone());
let cli_name_lower = normalize_cli_name(&cli_command).to_lowercase();
let is_claude = cli_name_lower == "claude" || cli_name_lower.starts_with("claude:");
let worker_relay_key = {
let ws_token = relaycast_ws_spawn_token(&ws_value);
if ws_token.is_some() {
ws_token
} else if is_claude {
None
} else {
const REG_TIMEOUT: Duration = Duration::from_secs(15);
const REG_TIMEOUT: Duration = Duration::from_secs(3);
match tokio::time::timeout(
REG_TIMEOUT,
workspace_http.register_agent_token(&name, Some(cli.as_str())),
Expand All @@ -2760,7 +2771,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {
None
}
Err(_) => {
tracing::warn!(worker = %name, "WS spawn fallback pre-registration timed out");
tracing::warn!(worker = %name, "WS spawn fallback pre-registration timed out (3s)");
None
}
}
Expand Down Expand Up @@ -4142,7 +4153,21 @@ async fn handle_sdk_frame(
None
};

workers
// Seed the dedup cache BEFORE spawning so that a Relaycast WS echo
// arriving while the spawn is in progress is correctly deduplicated.
// If spawn fails we remove the entry so retries are not blocked.
note_local_spawn_control_dedup(
dedup,
default_workspace_id.or_else(|| {
workspaces
.first()
.map(|workspace| workspace.workspace_id.as_str())
}),
&name,
worker_relay_key.as_deref(),
);

if let Err(err) = workers
.spawn(
payload.agent.clone(),
None,
Expand All @@ -4151,7 +4176,27 @@ async fn handle_sdk_frame(
payload.skip_relay_prompt,
None,
)
.await?;
.await
{
let err_msg = format!("{err:#}");
// Only clean up dedup if this was a genuinely new spawn attempt
// that failed, not a duplicate request for an already-running
// agent. When the error is "already exists" the dedup entry
// belongs to the prior successful spawn and must be preserved.
if !err_msg.contains("already exists") {
remove_local_spawn_control_dedup(
dedup,
default_workspace_id.or_else(|| {
workspaces
.first()
.map(|workspace| workspace.workspace_id.as_str())
}),
&name,
worker_relay_key.as_deref(),
);
}
return Err(err);
}

// Subscribe the broker's WebSocket to any custom channels the
// spawned agent needs so cloud-routed messages reach the broker.
Expand All @@ -4175,17 +4220,6 @@ async fn handle_sdk_frame(
.await;
}
}

note_local_spawn_control_dedup(
dedup,
default_workspace_id.or_else(|| {
workspaces
.first()
.map(|workspace| workspace.workspace_id.as_str())
}),
&name,
worker_relay_key.as_deref(),
);
if let Some(task) = effective_task.clone() {
workers.initial_tasks.insert(name.clone(), task);
}
Expand Down Expand Up @@ -5657,6 +5691,26 @@ fn note_local_spawn_control_dedup(
}
}

fn remove_local_spawn_control_dedup(
dedup: &mut DedupCache,
workspace_id: Option<&str>,
agent_name: &str,
relay_key: Option<&str>,
) {
let Some(workspace_id) = workspace_id else {
return;
};
let agent_name = agent_name.trim();
if !agent_name.is_empty() {
let key = relaycast_spawn_control_dedup_key(workspace_id, agent_name);
dedup.remove(&key);
}
if let Some(relay_key) = relay_key.map(str::trim).filter(|value| !value.is_empty()) {
let key = relaycast_spawn_control_dedup_key(workspace_id, relay_key);
dedup.remove(&key);
}
}

fn is_unknown_worker_error_message(message: &str) -> bool {
message.contains("unknown worker '")
}
Expand Down
5 changes: 4 additions & 1 deletion src/pty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ fn resolve_command_path(command: &str) -> String {
.unwrap_or_else(|| {
#[cfg(unix)]
{
OsString::from("/usr/local/bin:/usr/bin:/bin:/opt/homebrew/bin")
let home = env::var("HOME").unwrap_or_else(|_| String::from("/root"));
OsString::from(format!(
"{home}/.local/bin:{home}/.opencode/bin:{home}/.claude/local:/usr/local/bin:/usr/bin:/bin:/opt/homebrew/bin"
))
}
#[cfg(windows)]
{
Expand Down
Loading