From 9bb5a91bb2b578795463be2f17b26650e197d92e Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Fri, 3 Apr 2026 03:06:51 +0530 Subject: [PATCH 1/6] feat(skills): core RPC data stats, tool timeout env, ping state merge (#191) - Add openhuman.skills_data_stats and SkillDataDirectoryStats (disk usage). - Centralize tool execution timeout via OPENHUMAN_TOOL_TIMEOUT_SECS (tool_timeout). - Apply timeout to skill event loop, agent tool loop, harness default, delegate. - Ping scheduler: merge connection_error into published_state via registry. - JSON-RPC e2e: assert skills_data_stats. Closes #214 Closes #218 Part of #213 (backend) Made-with: Cursor --- .env.example | 2 + src/openhuman/agent/harness/executor.rs | 2 +- src/openhuman/agent/harness/types.rs | 3 +- src/openhuman/agent/loop_/tool_loop.rs | 17 +++--- src/openhuman/mod.rs | 1 + src/openhuman/skills/ping_scheduler.rs | 47 ++++++----------- src/openhuman/skills/qjs_engine.rs | 52 +++++++++++++++++++ .../skills/qjs_skill_instance/event_loop.rs | 9 +++- src/openhuman/skills/schemas.rs | 27 ++++++++++ src/openhuman/skills/skill_registry.rs | 23 ++++++++ src/openhuman/tool_timeout.rs | 29 +++++++++++ src/openhuman/tools/delegate.rs | 9 ++-- tests/json_rpc_e2e.rs | 13 +++++ 13 files changed, 185 insertions(+), 49 deletions(-) create mode 100644 src/openhuman/tool_timeout.rs diff --git a/.env.example b/.env.example index 7dd775bf1..db828a2c1 100644 --- a/.env.example +++ b/.env.example @@ -46,6 +46,8 @@ OPENHUMAN_MODEL= OPENHUMAN_WORKSPACE= # [optional] Default: 0.7 OPENHUMAN_TEMPERATURE=0.7 +# [optional] Skill + agent tool execution timeout in seconds (default 120, max 3600) +# OPENHUMAN_TOOL_TIMEOUT_SECS= # --------------------------------------------------------------------------- # Runtime flags diff --git a/src/openhuman/agent/harness/executor.rs b/src/openhuman/agent/harness/executor.rs index b7f69cae1..c9d33111c 100644 --- a/src/openhuman/agent/harness/executor.rs +++ b/src/openhuman/agent/harness/executor.rs @@ -490,7 +490,7 @@ fn resolve_timeout(archetype: AgentArchetype, config: &OrchestratorConfig) -> Du .archetypes .get(&archetype.to_string()) .and_then(|ac| ac.timeout_secs) - .unwrap_or(120); + .unwrap_or_else(crate::openhuman::tool_timeout::tool_execution_timeout_secs); Duration::from_secs(secs) } diff --git a/src/openhuman/agent/harness/types.rs b/src/openhuman/agent/harness/types.rs index a4369cb3f..7eb01b36f 100644 --- a/src/openhuman/agent/harness/types.rs +++ b/src/openhuman/agent/harness/types.rs @@ -1,6 +1,7 @@ //! Shared types for the multi-agent harness: requests, results, task status. use super::archetypes::AgentArchetype; +use crate::openhuman::tool_timeout::tool_execution_timeout_duration; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -48,7 +49,7 @@ pub struct SubAgentRequest { } fn default_subagent_timeout() -> Duration { - Duration::from_secs(120) + tool_execution_timeout_duration() } fn is_default_timeout(d: &Duration) -> bool { diff --git a/src/openhuman/agent/loop_/tool_loop.rs b/src/openhuman/agent/loop_/tool_loop.rs index e71e6b7f8..0f91636c3 100644 --- a/src/openhuman/agent/loop_/tool_loop.rs +++ b/src/openhuman/agent/loop_/tool_loop.rs @@ -285,12 +285,9 @@ pub(crate) async fn run_tool_call_loop( ); let result = if let Some(tool) = find_tool(tools_registry, &call.name) { - // Execute with a 120-second timeout to prevent hangs. - match tokio::time::timeout( - std::time::Duration::from_secs(120), - tool.execute(call.arguments.clone()), - ) - .await + let tool_deadline = crate::openhuman::tool_timeout::tool_execution_timeout_duration(); + let timeout_secs = crate::openhuman::tool_timeout::tool_execution_timeout_secs(); + match tokio::time::timeout(tool_deadline, tool.execute(call.arguments.clone())).await { Ok(Ok(r)) => { if r.success { @@ -323,9 +320,13 @@ pub(crate) async fn run_tool_call_loop( tracing::error!( iteration, tool = call.name.as_str(), - "[agent_loop] tool execution timed out after 120s" + secs = timeout_secs, + "[agent_loop] tool execution timed out" ); - format!("Error: tool '{}' timed out after 120 seconds", call.name) + format!( + "Error: tool '{}' timed out after {} seconds", + call.name, timeout_secs + ) } } } else { diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index 0118fd37d..1077ecefc 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -38,6 +38,7 @@ pub mod service; pub mod skills; pub mod subconscious; pub mod team; +pub mod tool_timeout; pub mod tools; pub mod util; pub mod voice; diff --git a/src/openhuman/skills/ping_scheduler.rs b/src/openhuman/skills/ping_scheduler.rs index 559e0c858..9fa6dc9a2 100644 --- a/src/openhuman/skills/ping_scheduler.rs +++ b/src/openhuman/skills/ping_scheduler.rs @@ -11,6 +11,7 @@ //! Architecture follows the same pattern as `CronScheduler`: a background Tokio //! task with `tokio::select!` for a tick interval + a stop signal via a watch channel. +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -222,40 +223,22 @@ impl PingScheduler { } } _ => { - // Network or other error: update published state, keep running - if let Some(snap) = registry.get_skill(skill_id) { - // We need to update the skill's published_state through the - // registry. The SkillState is behind an Arc>, which - // we can reach via the snapshot's backing state. However, the - // registry only exposes snapshots (copies). We use an RPC - // message to let the skill instance update its own state. - // - // A simpler approach: directly update published_state via the - // SkillState Arc that the registry entry holds. Since - // SkillRegistry doesn't expose the Arc directly, we send a - // state/set RPC to the skill, which is the same mechanism - // the frontend uses. - let _ = snap; // used for logging context - - // Send a state update via RPC (skills handle "state/set" - // in their reverse-RPC handler, but here we update the - // published_state directly through the skill message loop). - let (tx, rx) = tokio::sync::oneshot::channel(); - let _ = registry.send_message( + // Network or other error: merge into published_state, keep running + let mut patch = HashMap::new(); + patch.insert( + "connection_status".to_string(), + serde_json::json!("error"), + ); + patch.insert( + "connection_error".to_string(), + serde_json::json!(error_message), + ); + if let Err(e) = registry.merge_published_state(skill_id, patch) { + log::warn!( + "[ping] Could not merge ping failure into published state for '{}': {}", skill_id, - SkillMessage::Rpc { - method: "state/set".to_string(), - params: serde_json::json!({ - "partial": { - "connection_status": "error", - "connection_error": error_message, - } - }), - reply: tx, - }, + e ); - // Don't block on the reply — fire-and-forget - let _ = tokio::time::timeout(Duration::from_secs(5), rx).await; } } } diff --git a/src/openhuman/skills/qjs_engine.rs b/src/openhuman/skills/qjs_engine.rs index f34e127db..993dcba97 100644 --- a/src/openhuman/skills/qjs_engine.rs +++ b/src/openhuman/skills/qjs_engine.rs @@ -879,4 +879,56 @@ impl RuntimeEngine { pub fn skill_data_dir(&self, skill_id: &str) -> PathBuf { self.skills_data_dir.join(skill_id) } + + /// Total file count and byte size under the skill's data directory (recursive). + pub fn skill_data_directory_stats(&self, skill_id: &str) -> SkillDataDirectoryStats { + let path = self.skill_data_dir(skill_id); + let exists = path.exists(); + let (total_bytes, file_count) = directory_byte_and_file_count(&path).unwrap_or((0, 0)); + SkillDataDirectoryStats { + exists, + path: path.display().to_string(), + total_bytes, + file_count, + } + } +} + +/// Disk usage for a skill's persisted data folder (exposed to the UI for sync summary). +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SkillDataDirectoryStats { + pub exists: bool, + pub path: String, + pub total_bytes: u64, + pub file_count: u64, +} + +fn directory_byte_and_file_count(path: &std::path::Path) -> std::io::Result<(u64, u64)> { + use std::fs; + if !path.exists() { + return Ok((0, 0)); + } + let mut total_bytes = 0u64; + let mut file_count = 0u64; + fn walk( + path: &std::path::Path, + total_bytes: &mut u64, + file_count: &mut u64, + ) -> std::io::Result<()> { + let read = fs::read_dir(path)?; + for entry in read { + let entry = entry?; + let meta = entry.metadata()?; + let p = entry.path(); + if meta.is_dir() { + walk(&p, total_bytes, file_count)?; + } else if meta.is_file() { + *total_bytes += meta.len(); + *file_count += 1; + } + } + Ok(()) + } + walk(path, &mut total_bytes, &mut file_count)?; + Ok((total_bytes, file_count)) } diff --git a/src/openhuman/skills/qjs_skill_instance/event_loop.rs b/src/openhuman/skills/qjs_skill_instance/event_loop.rs index bf0e95eb1..149df2c10 100644 --- a/src/openhuman/skills/qjs_skill_instance/event_loop.rs +++ b/src/openhuman/skills/qjs_skill_instance/event_loop.rs @@ -6,6 +6,7 @@ use parking_lot::RwLock; use tokio::sync::mpsc; use crate::openhuman::memory::MemoryClientRef; +use crate::openhuman::tool_timeout::{tool_execution_timeout_duration, tool_execution_timeout_secs}; use crate::openhuman::skills::quickjs_libs::qjs_ops; use crate::openhuman::skills::types::{SkillMessage, SkillStatus, ToolResult}; @@ -233,7 +234,11 @@ pub(crate) async fn run_event_loop( ); } if tokio::time::Instant::now() >= ptc.deadline { - log::error!("[skill:{}] Async tool call timed out after 120s", skill_id); + log::error!( + "[skill:{}] Async tool call timed out after {}s", + skill_id, + tool_execution_timeout_secs() + ); // Dump JS error state for debugging let error_info = ctx .with(|js_ctx| { @@ -357,7 +362,7 @@ async fn handle_message( ); *pending_tool = Some(PendingToolCall { reply, - deadline: tokio::time::Instant::now() + Duration::from_secs(120), + deadline: tokio::time::Instant::now() + tool_execution_timeout_duration(), }); } Err(e) => { diff --git a/src/openhuman/skills/schemas.rs b/src/openhuman/skills/schemas.rs index 94e7f1021..3ccdd2a50 100644 --- a/src/openhuman/skills/schemas.rs +++ b/src/openhuman/skills/schemas.rs @@ -37,6 +37,7 @@ pub fn all_controller_schemas() -> Vec { skills_schema("data_read"), skills_schema("data_write"), skills_schema("data_dir"), + skills_schema("data_stats"), skills_schema("enable"), skills_schema("disable"), skills_schema("is_enabled"), @@ -142,6 +143,10 @@ pub fn all_registered_controllers() -> Vec { schema: skills_schema("data_dir"), handler: handle_skills_data_dir, }, + RegisteredController { + schema: skills_schema("data_stats"), + handler: handle_skills_data_stats, + }, RegisteredController { schema: skills_schema("enable"), handler: handle_skills_enable, @@ -525,6 +530,18 @@ fn skills_schema(function: &str) -> ControllerSchema { required: true, }], }, + "data_stats" => ControllerSchema { + namespace: "skills", + function: "data_stats", + description: "Recursive file count and byte size for a skill's data directory.", + inputs: vec![skill_id_input("The skill ID.")], + outputs: vec![FieldSchema { + name: "result", + ty: TypeSchema::Json, + comment: "exists, path, total_bytes, file_count.", + required: true, + }], + }, "enable" => ControllerSchema { namespace: "skills", function: "enable", @@ -891,6 +908,16 @@ fn handle_skills_data_dir(params: Map) -> ControllerFuture { }) } +fn handle_skills_data_stats(params: Map) -> ControllerFuture { + Box::pin(async move { + let p: SkillIdParams = + serde_json::from_value(Value::Object(params)).map_err(|e| e.to_string())?; + let engine = require_engine()?; + let stats = engine.skill_data_directory_stats(&p.skill_id); + serde_json::to_value(&stats).map_err(|e| e.to_string()) + }) +} + fn handle_skills_enable(params: Map) -> ControllerFuture { Box::pin(async move { let p: SkillIdParams = diff --git a/src/openhuman/skills/skill_registry.rs b/src/openhuman/skills/skill_registry.rs index 7fc8bef1e..bfdc9935a 100644 --- a/src/openhuman/skills/skill_registry.rs +++ b/src/openhuman/skills/skill_registry.rs @@ -307,6 +307,29 @@ impl SkillRegistry { self.skills.read().contains_key(skill_id) } + /// Merge `patch` into a running skill's `published_state` (e.g. ping scheduler health). + pub fn merge_published_state( + &self, + skill_id: &str, + patch: HashMap, + ) -> Result<(), String> { + let skills = self.skills.read(); + let entry = skills + .get(skill_id) + .ok_or_else(|| format!("Skill '{}' not found", skill_id))?; + let mut state = entry.state.write(); + if state.status != SkillStatus::Running { + return Err(format!( + "Skill '{}' is not running (status: {:?})", + skill_id, state.status + )); + } + for (k, v) in patch { + state.published_state.insert(k, v); + } + Ok(()) + } + /// Send a message to a specific skill's message loop. /// Returns an error if the skill is not registered or the channel is full. pub fn send_message(&self, skill_id: &str, msg: SkillMessage) -> Result<(), String> { diff --git a/src/openhuman/tool_timeout.rs b/src/openhuman/tool_timeout.rs new file mode 100644 index 000000000..157e7121c --- /dev/null +++ b/src/openhuman/tool_timeout.rs @@ -0,0 +1,29 @@ +//! Wall-clock timeouts for tool execution (skills runtime + agent loop). +//! +//! Override with the `OPENHUMAN_TOOL_TIMEOUT_SECS` environment variable (1–3600; default 120). + +use std::sync::OnceLock; +use std::time::Duration; + +const DEFAULT_SECS: u64 = 120; +const MAX_SECS: u64 = 3600; + +fn resolved_secs() -> u64 { + static SECS: OnceLock = OnceLock::new(); + *SECS.get_or_init(|| { + std::env::var("OPENHUMAN_TOOL_TIMEOUT_SECS") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|&n| (1..=MAX_SECS).contains(&n)) + .unwrap_or(DEFAULT_SECS) + }) +} + +/// Seconds — used for logging and matching frontend timeouts. +pub fn tool_execution_timeout_secs() -> u64 { + resolved_secs() +} + +pub fn tool_execution_timeout_duration() -> Duration { + Duration::from_secs(resolved_secs()) +} diff --git a/src/openhuman/tools/delegate.rs b/src/openhuman/tools/delegate.rs index 31f0699dd..675fa330b 100644 --- a/src/openhuman/tools/delegate.rs +++ b/src/openhuman/tools/delegate.rs @@ -1,5 +1,6 @@ use super::traits::{Tool, ToolResult}; use crate::openhuman::config::DelegateAgentConfig; +use crate::openhuman::tool_timeout::tool_execution_timeout_secs; use crate::openhuman::providers::{self, Provider}; use crate::openhuman::security::policy::ToolOperation; use crate::openhuman::security::SecurityPolicy; @@ -9,9 +10,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -/// Default timeout for sub-agent provider calls. -const DELEGATE_TIMEOUT_SECS: u64 = 120; - /// Tool that delegates a subtask to a named agent with a different /// provider/model configuration. Enables multi-agent workflows where /// a primary agent can hand off specialized work (research, coding, @@ -250,9 +248,10 @@ impl Tool for DelegateTool { let temperature = agent_config.temperature.unwrap_or(0.7); + let delegate_timeout_secs = tool_execution_timeout_secs(); // Wrap the provider call in a timeout to prevent indefinite blocking let result = tokio::time::timeout( - Duration::from_secs(DELEGATE_TIMEOUT_SECS), + Duration::from_secs(delegate_timeout_secs), provider.chat_with_system( agent_config.system_prompt.as_deref(), &full_prompt, @@ -269,7 +268,7 @@ impl Tool for DelegateTool { success: false, output: String::new(), error: Some(format!( - "Agent '{agent_name}' timed out after {DELEGATE_TIMEOUT_SECS}s" + "Agent '{agent_name}' timed out after {delegate_timeout_secs}s" )), }); } diff --git a/tests/json_rpc_e2e.rs b/tests/json_rpc_e2e.rs index 2ca3a444f..537dbba5c 100644 --- a/tests/json_rpc_e2e.rs +++ b/tests/json_rpc_e2e.rs @@ -1155,6 +1155,19 @@ async fn json_rpc_skills_runtime_start_tools_call_stop() { Some("e2e-runtime") ); + let data_stats = post_json_rpc( + &rpc_base, + 211, + "openhuman.skills_data_stats", + json!({"skill_id": "e2e-runtime"}), + ) + .await; + let ds = assert_no_jsonrpc_error(&data_stats, "skills_data_stats"); + assert_eq!(ds.get("exists"), Some(&json!(true))); + assert!(ds.get("path").and_then(Value::as_str).is_some()); + assert!(ds.get("total_bytes").and_then(Value::as_u64).is_some()); + assert!(ds.get("file_count").and_then(Value::as_u64).is_some()); + // 3. List tools let tools = post_json_rpc( &rpc_base, From 9a29b560e5f52371c46132437856ec520f49450d Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Fri, 3 Apr 2026 03:07:00 +0530 Subject: [PATCH 2/6] feat(app): skills sync stats UI, reconnect resync, FE timeouts, chat errors (#191) - useSkillDataDirectoryStats + Skills.tsx merge disk stats with skill state. - resyncRunningSkillsAfterReconnect on socket connect; disconnectSkill OAuth cleanup in finally. - withTimeout for callTool/triggerSync; VITE_TOOL_TIMEOUT_SECS in app .env.example. - Structured ChatSendError in Conversations (data-chat-send-error-code). Closes #213 Closes #215 Closes #216 Closes #217 Closes #219 Made-with: Cursor --- app/.env.example | 4 ++ app/src/chat/chatSendError.ts | 24 ++++++++++ app/src/lib/skills/hooks.ts | 48 +++++++++++++++++++ app/src/lib/skills/manager.ts | 77 ++++++++++++++++++++----------- app/src/pages/Conversations.tsx | 42 +++++++++++++---- app/src/pages/Skills.tsx | 36 +++++++++++++-- app/src/services/socketService.ts | 5 +- app/src/utils/tauriCommands.ts | 2 +- app/src/utils/withTimeout.ts | 33 +++++++++++++ 9 files changed, 228 insertions(+), 43 deletions(-) create mode 100644 app/src/chat/chatSendError.ts create mode 100644 app/src/utils/withTimeout.ts diff --git a/app/.env.example b/app/.env.example index e681b986d..306d5ee28 100644 --- a/app/.env.example +++ b/app/.env.example @@ -21,3 +21,7 @@ VITE_DEV_JWT_TOKEN= # [optional] Dev-only: force onboarding flow to always show VITE_DEV_FORCE_ONBOARDING=false + +# [optional] Client-side timeout for skill callTool/triggerSync (seconds; default 120, max 3600). +# Should match OPENHUMAN_TOOL_TIMEOUT_SECS on the core when set. +# VITE_TOOL_TIMEOUT_SECS= diff --git a/app/src/chat/chatSendError.ts b/app/src/chat/chatSendError.ts new file mode 100644 index 000000000..cd6da051e --- /dev/null +++ b/app/src/chat/chatSendError.ts @@ -0,0 +1,24 @@ +/** Structured chat send / delivery errors (issue #219) — stable `code` for analytics and tests. */ + +export type ChatSendErrorCode = + | 'socket_disconnected' + | 'local_model_failed' + | 'cloud_send_failed' + | 'voice_transcription' + | 'microphone_unavailable' + | 'microphone_recording' + | 'microphone_access' + | 'voice_playback' + | 'safety_timeout'; + +export interface ChatSendError { + code: ChatSendErrorCode; + message: string; +} + +export function chatSendError( + code: ChatSendErrorCode, + message: string, +): ChatSendError { + return { code, message }; +} diff --git a/app/src/lib/skills/hooks.ts b/app/src/lib/skills/hooks.ts index 40aa0c988..c531cb582 100644 --- a/app/src/lib/skills/hooks.ts +++ b/app/src/lib/skills/hooks.ts @@ -7,6 +7,8 @@ import { useCallback, useEffect, useRef, useState } from 'react'; +import type { SkillSyncStatsLike } from '../../pages/skillsSyncUi'; +import { runtimeSkillDataStats } from '../../utils/tauriCommands'; import type { SkillConnectionStatus, SkillHostConnectionState } from './types'; import { onSkillStateChange } from './skillEvents'; import { @@ -209,3 +211,49 @@ export function useSkillConnectionInfo(skillId: string): { isInitialized: !!hostState?.is_initialized, }; } + +/** + * Disk usage under the skill data directory (from core RPC). Refreshes on skill events + * and periodically while the skill is connected. + */ +export function useSkillDataDirectoryStats( + skillId: string, + fetchEnabled: boolean, +): Pick | undefined { + const [stats, setStats] = useState< + Pick | undefined + >(undefined); + + useEffect(() => { + if (!fetchEnabled) { + setStats(undefined); + return; + } + let cancelled = false; + const load = async () => { + try { + const d = await runtimeSkillDataStats(skillId); + if (!cancelled) { + setStats({ + localDataBytes: d.total_bytes, + localFileCount: d.file_count, + }); + } + } catch { + if (!cancelled) setStats(undefined); + } + }; + void load(); + const interval = setInterval(load, 30_000); + const unsub = onSkillStateChange((id) => { + if (!id || id === skillId) void load(); + }); + return () => { + cancelled = true; + clearInterval(interval); + unsub(); + }; + }, [skillId, fetchEnabled]); + + return stats; +} diff --git a/app/src/lib/skills/manager.ts b/app/src/lib/skills/manager.ts index c7fd9768d..b8043e8a9 100644 --- a/app/src/lib/skills/manager.ts +++ b/app/src/lib/skills/manager.ts @@ -30,6 +30,7 @@ import { runtimeSkillDataRead, runtimeSkillDataWrite, } from "../../utils/tauriCommands"; +import { toolExecutionTimeoutMsFromEnv, withTimeout } from "../../utils/withTimeout"; // Env vars kept for reverse RPC compatibility (may be used by skills via state) @@ -110,6 +111,15 @@ class SkillManager { } } + /** + * After realtime socket reconnect: refresh tool lists for every running skill so + * `tool:sync` matches the Rust engine (issue #215). + */ + async resyncRunningSkillsAfterReconnect(): Promise { + const ids = [...this.runtimes.keys()]; + await Promise.all(ids.map((id) => this.activateSkill(id))); + } + /** * Activate a skill that has completed setup — list its tools and mark as ready. */ @@ -197,7 +207,12 @@ class SkillManager { console.error(`[SkillManager] callTool failed — skill "${skillId}" has no running runtime`); throw new Error(`Skill ${skillId} is not running`); } - const result = await runtime.callTool(name, args); + const timeoutMs = toolExecutionTimeoutMsFromEnv(); + const result = await withTimeout( + runtime.callTool(name, args), + timeoutMs, + `[SkillManager] callTool skill="${skillId}" tool="${name}"`, + ); console.log(`[SkillManager] callTool result skill="${skillId}" tool="${name}" isError=${result.isError}`); return result; } @@ -229,16 +244,25 @@ class SkillManager { * Progress updates are published to Redux via the skill's state fields. */ async triggerSync(skillId: string): Promise { + const timeoutMs = toolExecutionTimeoutMsFromEnv(); const runtime = this.runtimes.get(skillId); if (runtime) { - await runtime.triggerSync(); + await withTimeout( + runtime.triggerSync(), + timeoutMs, + `[SkillManager] triggerSync skill="${skillId}"`, + ); } else { // Try via core RPC pass-through try { - await callCoreRpc({ - method: "openhuman.skills_sync", - params: { skill_id: skillId }, - }); + await withTimeout( + callCoreRpc({ + method: "openhuman.skills_sync", + params: { skill_id: skillId }, + }), + timeoutMs, + `[SkillManager] skills_sync skill="${skillId}"`, + ); } catch { // Skill not running — skip sync silently } @@ -366,30 +390,27 @@ class SkillManager { // Revoke OAuth credential before stopping so the running skill can clean up // its in-memory state and the event loop deletes oauth_credential.json. let revokeSucceeded = false; - if (credentialId) { - try { - await rpcRevokeOAuth(skillId, credentialId); - revokeSucceeded = true; - } catch (err) { - console.debug( - "[SkillManager] oauth/revoked failed (runtime may be stopped):", - err, - ); - } + try { + await rpcRevokeOAuth(skillId, credentialId ?? "default"); + revokeSucceeded = true; + } catch (err) { + console.debug( + "[SkillManager] oauth/revoked failed (runtime may be stopped):", + err, + ); } - await this.stopSkill(skillId); - - // Host-side fallback: if the RPC couldn't reach the runtime (already stopped, - // or non-OAuth skill), delete the persisted credential file so it isn't - // restored on next start. - if (!revokeSucceeded) { - await removePersistedOAuthCredential(skillId).catch((err) => { - console.debug( - "[SkillManager] host-side credential cleanup failed:", - err, - ); - }); + try { + await this.stopSkill(skillId); + } finally { + if (!revokeSucceeded) { + await removePersistedOAuthCredential(skillId).catch((err) => { + console.debug( + "[SkillManager] host-side credential cleanup failed:", + err, + ); + }); + } } await rpcSetSetupComplete(skillId, false).catch(() => {}); diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index a8743875e..0b3ad6410 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -3,6 +3,7 @@ import { useEffect, useRef, useState } from 'react'; import Markdown from 'react-markdown'; import { useNavigate } from 'react-router-dom'; +import { type ChatSendError, chatSendError } from '../chat/chatSendError'; import { useLocalModelStatus } from '../hooks/useLocalModelStatus'; import { creditsApi, type TeamUsage } from '../services/api/creditsApi'; import { inferenceApi, type ModelInfo } from '../services/api/inferenceApi'; @@ -107,7 +108,7 @@ const Conversations = () => { const [selectedModel, setSelectedModel] = useState('agentic-v1'); const [isLoadingModels, setIsLoadingModels] = useState(false); const [isSending, setIsSending] = useState(false); - const [sendError, setSendError] = useState(null); + const [sendError, setSendError] = useState(null); const socketStatus = useAppSelector(selectSocketStatus); const [toolTimelineByThread, setToolTimelineByThread] = useState< Record @@ -568,7 +569,9 @@ const Conversations = () => { if (!trimmed || !selectedThreadId || isSending) return; if (!isLocalModelActiveRef.current && socketStatus !== 'connected') { - setSendError('Realtime socket is not connected.'); + setSendError( + chatSendError('socket_disconnected', 'Realtime socket is not connected.'), + ); return; } @@ -602,6 +605,12 @@ const Conversations = () => { sendingTimeoutRef.current = setTimeout(() => { console.warn('[chat] safety timeout: clearing isSending after 120s with no response'); setIsSending(false); + setSendError( + chatSendError( + 'safety_timeout', + 'No response from the assistant after 2 minutes. Try again or check your connection.', + ), + ); dispatch(setActiveThread(null)); sendingTimeoutRef.current = null; }, 120_000); @@ -648,7 +657,7 @@ const Conversations = () => { } catch (err) { pendingReactionRef.current.delete(sendingThreadId); const msg = err instanceof Error ? err.message : String(err); - setSendError(msg); + setSendError(chatSendError('local_model_failed', msg)); dispatch( addInferenceResponse({ content: 'Local model error — please try again.', @@ -674,7 +683,7 @@ const Conversations = () => { sendingTimeoutRef.current = null; } const msg = err instanceof Error ? err.message : String(err); - setSendError(msg); + setSendError(chatSendError('cloud_send_failed', msg)); setIsSending(false); dispatch(setActiveThread(null)); } @@ -719,7 +728,9 @@ const Conversations = () => { await handleSendMessage(transcript); } catch (err) { const message = err instanceof Error ? err.message : String(err); - setSendError(`Voice transcription failed: ${message}`); + setSendError( + chatSendError('voice_transcription', `Voice transcription failed: ${message}`), + ); setVoiceStatus(null); } finally { setIsTranscribing(false); @@ -730,7 +741,10 @@ const Conversations = () => { if (!rustChat || isSending || isTranscribing) return; if (!canUseMicrophoneApi) { setSendError( - 'Microphone capture is unavailable in this runtime. Use Text mode, or run the desktop app bundle with microphone permissions enabled.' + chatSendError( + 'microphone_unavailable', + 'Microphone capture is unavailable in this runtime. Use Text mode, or run the desktop app bundle with microphone permissions enabled.', + ), ); return; } @@ -766,7 +780,9 @@ const Conversations = () => { setIsRecording(false); mediaStreamRef.current?.getTracks().forEach(track => track.stop()); mediaStreamRef.current = null; - setSendError('Microphone recording failed.'); + setSendError( + chatSendError('microphone_recording', 'Microphone recording failed.'), + ); }; recorder.onstop = () => { void transcribeAndSendAudio(recorder.mimeType); @@ -779,7 +795,9 @@ const Conversations = () => { recorder.start(); } catch (err) { const message = err instanceof Error ? err.message : String(err); - setSendError(`Microphone access failed: ${message}`); + setSendError( + chatSendError('microphone_access', `Microphone access failed: ${message}`), + ); setVoiceStatus(null); } }; @@ -815,7 +833,7 @@ const Conversations = () => { await audio.play(); } catch { if (!cancelled) { - setSendError('Failed to play voice reply.'); + setSendError(chatSendError('voice_playback', 'Failed to play voice reply.')); } } finally { if (!cancelled) { @@ -1318,7 +1336,11 @@ const Conversations = () => { {sendError && (
-

{sendError}

+

+ {sendError.message} +