diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx
index 7fc33efc9..bb794e441 100644
--- a/app/src/pages/Conversations.tsx
+++ b/app/src/pages/Conversations.tsx
@@ -4,13 +4,14 @@ import Markdown from 'react-markdown';
 import { useNavigate } from 'react-router-dom';

 import { type ChatSendError, chatSendError } from '../chat/chatSendError';
-import { useLocalModelStatus } from '../hooks/useLocalModelStatus';
 import { creditsApi, type TeamUsage } from '../services/api/creditsApi';
 import {
   chatCancel,
+  type ChatSegmentEvent,
   chatSend,
   type ChatToolCallEvent,
   type ChatToolResultEvent,
+  segmentText,
   subscribeChatEvents,
   useRustChat,
 } from '../services/chatService';
@@ -28,14 +29,10 @@ import {
   setSelectedThread,
 } from '../store/threadSlice';
 import type { ThreadMessage } from '../types/thread';
-import { getSegmentDelay, segmentMessage } from '../utils/messageSegmentation';
 import {
   isTauri,
-  type LocalAiChatMessage,
   openhumanAutocompleteAccept,
   openhumanAutocompleteCurrent,
-  openhumanLocalAiChat,
-  openhumanLocalAiShouldReact,
   openhumanVoiceStatus,
   openhumanVoiceTranscribeBytes,
   openhumanVoiceTts,
@@ -141,14 +138,7 @@ const Conversations = () => {
     Record
   >({});
   const rustChat = useRustChat();
-  const isLocalModelActive = useLocalModelStatus();
-  const isLocalModelActiveRef = useRef(isLocalModelActive);
-  const [isDelivering, setIsDelivering] = useState(false);
-  const deliveryActiveRef = useRef(false);
   const [reactionPickerMsgId, setReactionPickerMsgId] = useState<string | null>(null);
-  const defaultChannelType = useAppSelector(
-    state => state.channelConnections?.defaultMessagingChannel ?? 'web'
-  );
   const pendingReactionRef = useRef<
     Map<string, { threadId: string; msgId: string; content: string }>
   >(new Map());
@@ -158,10 +148,6 @@ const Conversations = () => {
     selectedThreadIdRef.current = selectedThreadId;
   }, [selectedThreadId]);

-  useEffect(() => {
-    isLocalModelActiveRef.current = isLocalModelActive;
-  }, [isLocalModelActive]);
-
   const [teamUsage, setTeamUsage] = useState<TeamUsage | null>(null);
   const [isLoadingBudget, setIsLoadingBudget] = useState(false);
@@ -305,7 +291,6 @@ const Conversations = () => {
   useEffect(() => {
     return () => {
-      deliveryActiveRef.current = false;
       mediaRecorderRef.current?.stop();
       mediaStreamRef.current?.getTracks().forEach(track => track.stop());
       replyAudioRef.current?.pause();
@@ -399,16 +384,24 @@ const Conversations = () => {
         return { ...prev, [event.thread_id]: nextEntries };
       });
     },
-    onDone: event => {
-      const currentState = store.getState() as {
-        thread: { messagesByThreadId: Record<string, ThreadMessage[]> };
-      };
-      const threadMessages = currentState.thread.messagesByThreadId[event.thread_id] || [];
-      const lastMsg = threadMessages[threadMessages.length - 1];
-      if (lastMsg?.sender === 'agent' && lastMsg?.content === event.full_response) {
-        return;
+    onSegment: (event: ChatSegmentEvent) => {
+      // Rust delivers segments with delays already applied — just dispatch.
+      if (event.reaction_emoji) {
+        const pending = pendingReactionRef.current.get(event.thread_id);
+        if (pending) {
+          dispatch(
+            addReaction({
+              threadId: pending.threadId,
+              messageId: pending.msgId,
+              emoji: event.reaction_emoji,
+            })
+          );
+          pendingReactionRef.current.delete(event.thread_id);
+        }
       }
-
+      dispatch(addInferenceResponse({ content: segmentText(event), threadId: event.thread_id }));
+    },
+    onDone: event => {
       // Update tool timeline
       setToolTimelineByThread(prev => {
         const existing = prev[event.thread_id] ??
[]; @@ -425,58 +418,31 @@ const Conversations = () => { sendingTimeoutRef.current = null; } - // Fire-and-forget: auto-react to the user's message - const pending = pendingReactionRef.current.get(event.thread_id); - if (pending) { - maybeAutoReact(pending.msgId, pending.content, pending.threadId); - pendingReactionRef.current.delete(event.thread_id); - } - - // Multi-bubble delivery gate: only when local model is active - if (!isLocalModelActiveRef.current) { - dispatch( - addInferenceResponse({ content: event.full_response, threadId: event.thread_id }) - ); - setIsSending(false); - dispatch(setActiveThread(null)); - return; + // Apply reaction emoji from Rust (when not segmented — no onSegment fired). + if (event.reaction_emoji) { + const pending = pendingReactionRef.current.get(event.thread_id); + if (pending) { + dispatch( + addReaction({ + threadId: pending.threadId, + messageId: pending.msgId, + emoji: event.reaction_emoji, + }) + ); + } } + pendingReactionRef.current.delete(event.thread_id); - const segments = segmentMessage(event.full_response); - - if (segments.length <= 1) { + // Only add the response bubble if Rust didn't already deliver it + // via chat_segment events (segment_total > 0 means segments were sent). + if (!event.segment_total) { dispatch( addInferenceResponse({ content: event.full_response, threadId: event.thread_id }) ); - setIsSending(false); - dispatch(setActiveThread(null)); - return; } - // Async delivery: show each segment as a separate bubble with a typing pause - setIsDelivering(true); - deliveryActiveRef.current = true; - - void (async () => { - for (let i = 0; i < segments.length; i++) { - if (!deliveryActiveRef.current) break; - - if (i > 0) { - await new Promise(resolve => - setTimeout(resolve, getSegmentDelay(segments[i - 1])) - ); - } - - if (!deliveryActiveRef.current) break; - - dispatch(addInferenceResponse({ content: segments[i], threadId: event.thread_id })); - } - - deliveryActiveRef.current = false; - setIsDelivering(false); - setIsSending(false); - // activeThreadId was already cleared by the first addInferenceResponse dispatch - })(); + setIsSending(false); + dispatch(setActiveThread(null)); }, onError: event => { if (event.thread_id !== selectedThreadIdRef.current) return; @@ -529,65 +495,18 @@ const Conversations = () => { // eslint-disable-next-line react-hooks/exhaustive-deps }, [rustChat, socketStatus]); - /** - * Segment a complete response string and dispatch each segment as a - * separate message bubble with a typing pause between them. - * Local-model-only path — no cloud API calls. - */ - const deliverLocalResponse = async (fullResponse: string, threadId: string) => { - const segments = segmentMessage(fullResponse); - - if (segments.length <= 1) { - dispatch(addInferenceResponse({ content: fullResponse, threadId })); - return; - } - - setIsDelivering(true); - deliveryActiveRef.current = true; - - for (let i = 0; i < segments.length; i++) { - if (!deliveryActiveRef.current) break; - - if (i > 0) { - await new Promise(resolve => setTimeout(resolve, getSegmentDelay(segments[i - 1]))); - } - - if (!deliveryActiveRef.current) break; - - dispatch(addInferenceResponse({ content: segments[i], threadId })); - } - - deliveryActiveRef.current = false; - setIsDelivering(false); - }; - - /** - * Fire-and-forget: ask the local model if we should auto-react to the - * user's message with an emoji. Adds a personal touch based on channel type. 
- */
-  const maybeAutoReact = (userMessageId: string, messageContent: string, threadId: string) => {
-    if (!isTauri() || !isLocalModelActiveRef.current) return;
-
-    void openhumanLocalAiShouldReact(messageContent, defaultChannelType)
-      .then(response => {
-        const decision = response.result;
-        if (decision?.should_react && decision.emoji) {
-          console.debug('[conversations:auto-react] reacting with', decision.emoji);
-          dispatch(addReaction({ threadId, messageId: userMessageId, emoji: decision.emoji }));
-        }
-      })
-      .catch(err => {
-        console.debug('[conversations:auto-react] failed:', err);
-      });
-  };
-
   const handleSendMessage = async (text?: string) => {
     const normalized = text ?? inputValue;
     const trimmed = normalized.trim();
     if (!trimmed || !selectedThreadId || isSending) return;

-    if (!isLocalModelActiveRef.current && socketStatus !== 'connected') {
-      setSendError(chatSendError('socket_disconnected', 'Realtime socket is not connected.'));
+    if (socketStatus !== 'connected') {
+      setSendError(
+        chatSendError(
+          'socket_disconnected',
+          'Realtime socket is not connected — responses cannot be delivered without a client ID.'
+        )
+      );
       return;
     }
@@ -633,61 +552,10 @@ const Conversations = () => {
     setToolTimelineByThread(prev => ({ ...prev, [sendingThreadId]: [] }));
     dispatch(setActiveThread(sendingThreadId));

-    // ── Local Ollama path ────────────────────────────────────────────────────
-    // When a local model is ready, bypass the cloud socket entirely.
-    // Zero cloud tokens consumed on this path.
-    if (isLocalModelActiveRef.current) {
-      try {
-        // Build message history: convert stored messages + the new user turn
-        const storedMessages =
-          (
-            store.getState() as {
-              thread: {
-                messagesByThreadId: Record<string, ThreadMessage[]>;
-              };
-            }
-          ).thread.messagesByThreadId[sendingThreadId] ?? [];
-
-        const history: LocalAiChatMessage[] = storedMessages
-          .filter(m => m.sender === 'user' || m.sender === 'agent')
-          .map(m => ({
-            role: m.sender === 'user' ? ('user' as const) : ('assistant' as const),
-            content: m.content,
-          }));
-
-        console.debug('[conversations:local] sending to local model', {
-          historyLength: history.length,
-          threadId: sendingThreadId,
-        });
-
-        const response = await openhumanLocalAiChat(history);
-        const reply = response.result?.trim() ?? '';
-
-        if (!reply) {
-          throw new Error('Local model returned an empty response.');
-        }
-
-        await deliverLocalResponse(reply, sendingThreadId);
-        pendingReactionRef.current.delete(sendingThreadId);
-        maybeAutoReact(userMessage.id, trimmed, sendingThreadId);
-      } catch (err) {
-        pendingReactionRef.current.delete(sendingThreadId);
-        const msg = err instanceof Error ? err.message : String(err);
-        setSendError(chatSendError('local_model_failed', msg));
-        dispatch(
-          addInferenceResponse({
-            content: 'Local model error — please try again.',
-            threadId: sendingThreadId,
-          })
-        );
-      } finally {
-        setIsSending(false);
-        dispatch(setActiveThread(null));
-      }
-      return;
-    }
-
-    // ── Cloud socket path (unchanged) ────────────────────────────────────────
+    // ── Cloud socket path ─────────────────────────────────────────────────────
+    // Primary chat always goes through the cloud backend: the message is sent
+    // via core RPC and responses stream back over the socket. The local model
+    // (Ollama) is used only for supplementary features (auto-react,
+    // autocomplete, etc.), never as a primary chat path.
try { await chatSend({ threadId: sendingThreadId, message: trimmed, model: AGENTIC_MODEL_ID }); @@ -1082,7 +950,7 @@ const Conversations = () => { ))} - {((activeThreadId === selectedThreadId && isSending) || isDelivering) && ( + {activeThreadId === selectedThreadId && isSending && (
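Reviewer note on the client contract: below is a minimal sketch of how a consumer wires the new segment-aware flow. `renderBubble` is a hypothetical sink standing in for the Redux dispatches in `Conversations.tsx`, and it assumes `ChatDoneEvent.full_response` (already used by the component above); the dedup rule mirrors `onDone`.

```ts
import {
  type ChatSegmentEvent,
  segmentText,
  subscribeChatEvents,
} from '../services/chatService';

// Hypothetical sink for rendered chat bubbles.
function renderBubble(threadId: string, content: string): void {
  console.log(`[${threadId}]`, content);
}

const unsubscribe = subscribeChatEvents({
  onSegment: (event: ChatSegmentEvent) => {
    // Rust already applied the inter-bubble delays; render immediately.
    renderBubble(event.thread_id, segmentText(event));
  },
  onDone: event => {
    // segment_total is set only when chat_segment bubbles were already
    // delivered, so render full_response only when it is absent.
    if (!event.segment_total) {
      renderBubble(event.thread_id, event.full_response);
    }
  },
});

// Call the returned cleanup on unmount to detach all socket handlers.
unsubscribe();
```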
diff --git a/app/src/services/chatService.ts b/app/src/services/chatService.ts
index 791363d21..2e623cc4d 100644
--- a/app/src/services/chatService.ts
+++ b/app/src/services/chatService.ts
@@ -1,6 +1,12 @@
 /**
- * Chat Service — Socket.IO-first chat transport for desktop and web.
+ * Chat Service — RPC-based chat transport.
+ *
+ * Chat messages are SENT via core RPC (`openhuman.channel_web_chat`).
+ * Responses and events stream back over the existing Socket.IO connection
+ * (tool_call, tool_result, chat_segment, chat_done, chat_error) via the
+ * web-channel event bridge in the Rust core.
  */
+import { callCoreRpc } from './coreRpcClient';
 import { socketService } from './socketService';

 export interface ChatToolCallEvent {
@@ -26,6 +32,30 @@ export interface ChatDoneEvent {
   rounds_used: number;
   total_input_tokens: number;
   total_output_tokens: number;
+  /** Emoji reaction decided by the local model (if any). */
+  reaction_emoji?: string | null;
+  /** Total segments when the response was split into bubbles by Rust. */
+  segment_total?: number | null;
+}
+
+/** A single segment of a multi-bubble response, emitted before `chat_done`. */
+export interface ChatSegmentEvent {
+  thread_id: string;
+  /**
+   * Wire name is `full_response` for compatibility with {@link WebChannelEvent},
+   * but this field contains only the **segment text**, not the full response.
+   * Use {@link segmentText} for clarity in consuming code.
+   */
+  full_response: string;
+  request_id: string;
+  segment_index: number;
+  segment_total: number;
+  reaction_emoji?: string | null;
+}
+
+/** Return the segment text from a {@link ChatSegmentEvent} (avoids the misleading wire name). */
+export function segmentText(event: ChatSegmentEvent): string {
+  return event.full_response;
 }

 export interface ChatErrorEvent {
@@ -38,6 +68,7 @@
 export interface ChatEventListeners {
   onToolCall?: (event: ChatToolCallEvent) => void;
   onToolResult?: (event: ChatToolResultEvent) => void;
+  onSegment?: (event: ChatSegmentEvent) => void;
   onDone?: (event: ChatDoneEvent) => void;
   onError?: (event: ChatErrorEvent) => void;
 }
@@ -62,6 +93,13 @@ export function subscribeChatEvents(listeners: ChatEventListeners): () => void {
     handlers.push(['chat:tool_result', cb], ['tool_result', cb]);
   }

+  if (listeners.onSegment) {
+    const cb = (payload: unknown) => listeners.onSegment?.(payload as ChatSegmentEvent);
+    socket.on('chat:segment', cb);
+    socket.on('chat_segment', cb);
+    handlers.push(['chat:segment', cb], ['chat_segment', cb]);
+  }
+
   if (listeners.onDone) {
     const cb = (payload: unknown) => listeners.onDone?.(payload as ChatDoneEvent);
     socket.on('chat:done', cb);
@@ -89,20 +127,48 @@ export interface ChatSendParams {
   model: string;
 }

+/**
+ * Send a chat message via core RPC.
+ *
+ * The Rust core spawns the agent loop asynchronously and streams events
+ * (tool_call, tool_result, chat_segment, chat_done, chat_error) back over
+ * the socket connection using the `client_id` (socket ID) for routing.
+ */
 export async function chatSend(params: ChatSendParams): Promise<void> {
-  if (!socketService.isConnected()) {
-    throw new Error('Socket not connected');
+  const socket = socketService.getSocket();
+  const clientId = socket?.id;
+  if (!clientId) {
+    throw new Error('Socket not connected — no client ID for event routing');
   }

-  const payload = { thread_id: params.threadId, message: params.message, model: params.model };
-
-  socketService.emit('chat:start', payload);
+  await callCoreRpc({
+    method: 'openhuman.channel_web_chat',
+    params: {
+      client_id: clientId,
+      thread_id: params.threadId,
+      message: params.message,
+      model_override: params.model,
+    },
+  });
 }

+/**
+ * Cancel an in-flight chat request via core RPC.
+ */
 export async function chatCancel(threadId: string): Promise<boolean> {
-  if (!socketService.isConnected()) return false;
-  socketService.emit('chat:cancel', { thread_id: threadId });
-  return true;
+  const socket = socketService.getSocket();
+  const clientId = socket?.id;
+  if (!clientId) return false;
+
+  try {
+    await callCoreRpc({
+      method: 'openhuman.channel_web_cancel',
+      params: { client_id: clientId, thread_id: threadId },
+    });
+    return true;
+  } catch {
+    return false;
+  }
 }

 export function useRustChat(): boolean {
diff --git a/docs/TODO.md b/docs/TODO.md
index c6f3656d1..7af939e84 100644
--- a/docs/TODO.md
+++ b/docs/TODO.md
@@ -65,6 +65,7 @@ todo
 - memory skill
   [] should index properly all the things (sanil)
   [] should properly register user interactions
+  [] integrate memory into all the various skills (autocomplete, etc.)

 ---
 e2e tests to write up
diff --git a/src/core/socketio.rs b/src/core/socketio.rs
index 2b676a15e..457b73081 100644
--- a/src/core/socketio.rs
+++ b/src/core/socketio.rs
@@ -29,6 +29,15 @@ pub struct WebChannelEvent {
     pub success: Option<bool>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub round: Option<u32>,
+    /// Emoji reaction the assistant wants to add to the user's message.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub reaction_emoji: Option<String>,
+    /// 0-based index when a response is delivered as multiple segments.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub segment_index: Option<u32>,
+    /// Total number of segments in a segmented delivery.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub segment_total: Option<u32>,
 }

 #[derive(Debug, Deserialize)]
diff --git a/src/openhuman/channels/providers/mod.rs b/src/openhuman/channels/providers/mod.rs
index 479d5b414..057dd21ff 100644
--- a/src/openhuman/channels/providers/mod.rs
+++ b/src/openhuman/channels/providers/mod.rs
@@ -10,6 +10,7 @@ pub mod linq;
 #[cfg(feature = "channel-matrix")]
 pub mod matrix;
 pub mod mattermost;
+mod presentation;
 pub mod qq;
 pub mod signal;
 pub mod slack;
diff --git a/src/openhuman/channels/providers/presentation.rs b/src/openhuman/channels/providers/presentation.rs
new file mode 100644
index 000000000..631a16973
--- /dev/null
+++ b/src/openhuman/channels/providers/presentation.rs
@@ -0,0 +1,414 @@
+//! Presentation layer for web-channel chat responses.
+//!
+//! Handles two concerns, both at zero cloud cost:
+//!
+//! 1. **Message segmentation** — split an agent response into human-feeling
+//!    chat bubbles, but *only* when the content is natural-language prose.
+//!    Code blocks, structured data, and short messages are never split.
+//!    Pure CPU work; no model involved.
+//!
+//! 2. **Emoji reactions** — ask the **local model** whether the assistant
+//!    should react to the user's message with an emoji.
+
+use crate::core::socketio::WebChannelEvent;
+use crate::openhuman::config::rpc as config_rpc;
+
+use super::web::publish_web_channel_event;
+
+const MIN_SEGMENT_CHARS: usize = 40;
+const MAX_SEGMENTS: usize = 5;
+
+/// Deliver an agent response to the frontend, applying local-model
+/// presentation (segmentation + reaction) when the model is available.
+///
+/// Always emits at least one `chat_done` event. When the response is
+/// segmented, emits one `chat_segment` per bubble first, then a final
+/// `chat_done` with the full text for deduplication.
+pub async fn deliver_response(
+    client_id: &str,
+    thread_id: &str,
+    request_id: &str,
+    full_response: &str,
+    user_message: &str,
+) {
+    // Spawn the reaction decision so the local-model call overlaps the
+    // segmentation work below; it is awaited before the first bubble goes out.
+    let user_msg_owned = user_message.to_string();
+    let reaction_handle = tokio::spawn(async move { try_reaction(&user_msg_owned).await });
+
+    // Segmentation is pure CPU work, runs immediately.
+    let segments = segment_for_delivery(full_response);
+
+    // Await the reaction result (should already be done or nearly done).
+    let reaction_emoji = reaction_handle.await.unwrap_or(None);
+
+    if segments.len() <= 1 {
+        // Single bubble — emit chat_done directly.
+        publish_web_channel_event(WebChannelEvent {
+            event: "chat_done".to_string(),
+            client_id: client_id.to_string(),
+            thread_id: thread_id.to_string(),
+            request_id: request_id.to_string(),
+            full_response: Some(full_response.to_string()),
+            message: None,
+            error_type: None,
+            tool_name: None,
+            skill_id: None,
+            args: None,
+            output: None,
+            success: None,
+            round: None,
+            reaction_emoji,
+            segment_index: None,
+            segment_total: None,
+        });
+        return;
+    }
+
+    let total = segments.len() as u32;
+
+    // Emit each segment as a separate bubble with a human-feeling delay.
+    for (i, segment) in segments.iter().enumerate() {
+        if i > 0 {
+            let delay_ms = segment_delay(&segments[i - 1]);
+            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
+        }
+
+        publish_web_channel_event(WebChannelEvent {
+            event: "chat_segment".to_string(),
+            client_id: client_id.to_string(),
+            thread_id: thread_id.to_string(),
+            request_id: request_id.to_string(),
+            full_response: Some(segment.clone()),
+            message: None,
+            error_type: None,
+            tool_name: None,
+            skill_id: None,
+            args: None,
+            output: None,
+            success: None,
+            round: None,
+            // Attach reaction emoji only on the first segment.
+            reaction_emoji: if i == 0 { reaction_emoji.clone() } else { None },
+            segment_index: Some(i as u32),
+            segment_total: Some(total),
+        });
+    }
+
+    // Final chat_done with full text (for deduplication / state sync).
+    publish_web_channel_event(WebChannelEvent {
+        event: "chat_done".to_string(),
+        client_id: client_id.to_string(),
+        thread_id: thread_id.to_string(),
+        request_id: request_id.to_string(),
+        full_response: Some(full_response.to_string()),
+        message: None,
+        error_type: None,
+        tool_name: None,
+        skill_id: None,
+        args: None,
+        output: None,
+        success: None,
+        round: None,
+        reaction_emoji: None,
+        segment_index: None,
+        segment_total: Some(total),
+    });
+}
+
+// ── Segmentation ─────────────────────────────────────────────────────────────
+
+/// Decide whether and how to split a response into multiple chat bubbles.
+///
+/// Rules (applied in order):
+/// - Short messages (< 80 chars) are never split.
+/// - Messages containing code fences (```) are never split.
+/// - Messages that are predominantly structured (lists, tables, headers)
+///   are never split — they read better as a single block.
+/// - Otherwise, split on paragraph breaks (\n\n), merging segments that
+///   are too short to stand alone.
+/// - Fallback: split on sentence boundaries if paragraphs don't yield
+///   multiple segments.
+fn segment_for_delivery(text: &str) -> Vec<String> {
+    let trimmed = text.trim();
+
+    // Don't split short messages.
+    if trimmed.len() < 80 {
+        return vec![trimmed.to_string()];
+    }
+
+    // Never split messages containing code fences.
+    if trimmed.contains("```") {
+        tracing::debug!("[presentation:segment] skipping segmentation: contains code fences");
+        return vec![trimmed.to_string()];
+    }
+
+    // Never split messages that are predominantly structured content.
+    if is_structured_content(trimmed) {
+        tracing::debug!("[presentation:segment] skipping segmentation: structured content");
+        return vec![trimmed.to_string()];
+    }
+
+    // Strategy 1: paragraph splits.
+    let paragraphs: Vec<&str> = trimmed
+        .split("\n\n")
+        .map(|p| p.trim())
+        .filter(|p| !p.is_empty())
+        .collect();
+
+    if paragraphs.len() >= 2 {
+        let merged = merge_short(&paragraphs, "\n\n");
+        if merged.len() >= 2 {
+            tracing::debug!(
+                segments = merged.len(),
+                "[presentation:segment] split by paragraphs"
+            );
+            return merged.into_iter().take(MAX_SEGMENTS).collect();
+        }
+    }
+
+    // Strategy 2: sentence splits.
+    let sentences = split_sentences(trimmed);
+    if sentences.len() >= 2 {
+        let grouped = group_sentences(&sentences);
+        if grouped.len() >= 2 {
+            tracing::debug!(
+                segments = grouped.len(),
+                "[presentation:segment] split by sentences"
+            );
+            return grouped.into_iter().take(MAX_SEGMENTS).collect();
+        }
+    }
+
+    // Fallback: single bubble.
+    vec![trimmed.to_string()]
+}
+
+/// Returns true if the text is predominantly structured content that
+/// shouldn't be split across bubbles (markdown lists, tables, headers).
+fn is_structured_content(text: &str) -> bool {
+    let lines: Vec<&str> = text.lines().collect();
+    if lines.is_empty() {
+        return false;
+    }
+
+    let structured_count = lines
+        .iter()
+        .filter(|line| {
+            let trimmed = line.trim();
+            trimmed.starts_with("- ")
+                || trimmed.starts_with("* ")
+                || trimmed.starts_with("| ")
+                || trimmed.starts_with("# ")
+                || trimmed.starts_with("## ")
+                || trimmed.starts_with("### ")
+                || is_numbered_list_item(trimmed)
+        })
+        .count();
+
+    // If more than 40% of non-empty lines are structured, don't split.
+    let non_empty = lines.iter().filter(|l| !l.trim().is_empty()).count();
+    non_empty > 0 && (structured_count * 100 / non_empty) > 40
+}
+
+/// Check if a line starts with a numbered list prefix like "1. " or "12. ".
+/// Rejects dates ("2024. ") by capping the prefix at three digits, and
+/// decimals ("1.5") by requiring a space immediately after the dot.
+fn is_numbered_list_item(line: &str) -> bool {
+    let bytes = line.as_bytes();
+    let mut i = 0;
+    // Consume one or more leading ASCII digits.
+    while i < bytes.len() && bytes[i].is_ascii_digit() {
+        i += 1;
+    }
+    // Must have consumed 1-3 digits, followed by ". ".
+    i > 0 && i <= 3 && bytes.get(i) == Some(&b'.') && bytes.get(i + 1) == Some(&b' ')
+}
+
+/// Merge adjacent segments shorter than MIN_SEGMENT_CHARS.
+fn merge_short(parts: &[&str], joiner: &str) -> Vec<String> {
+    let mut result: Vec<String> = Vec::new();
+    for part in parts {
+        if !result.is_empty() && part.len() < MIN_SEGMENT_CHARS {
+            let last = result.last_mut().unwrap();
+            last.push_str(joiner);
+            last.push_str(part);
+        } else {
+            result.push(part.to_string());
+        }
+    }
+    result
+}
+
+/// Split text on sentence-ending punctuation (. ! ?) followed by a space
+/// and an uppercase letter.
+fn split_sentences(text: &str) -> Vec<String> {
+    let mut parts: Vec<String> = Vec::new();
+    let mut current = String::new();
+    let chars: Vec<char> = text.chars().collect();
+
+    let mut i = 0;
+    while i < chars.len() {
+        current.push(chars[i]);
+        let ch = chars[i];
+
+        if (ch == '.' || ch == '!' || ch == '?')
+            && i + 2 < chars.len()
+            && chars[i + 1] == ' '
+            && chars[i + 2].is_ascii_uppercase()
+        {
+            let trimmed = current.trim().to_string();
+            if !trimmed.is_empty() {
+                parts.push(trimmed);
+            }
+            current.clear();
+            i += 2; // skip the space
+            continue;
+        }
+
+        i += 1;
+    }
+
+    let remaining = current.trim().to_string();
+    if !remaining.is_empty() {
+        parts.push(remaining);
+    }
+    parts
+}
+
+/// Group sentences into 2-3 bubbles.
+fn group_sentences(sentences: &[String]) -> Vec<String> {
+    let target_count = std::cmp::min(3, (sentences.len() + 1) / 2);
+    let group_size = (sentences.len() + target_count - 1) / target_count;
+    let mut groups: Vec<String> = Vec::new();
+
+    for chunk in sentences.chunks(group_size) {
+        let joined = chunk.join(" ");
+        if joined.len() >= MIN_SEGMENT_CHARS {
+            groups.push(joined);
+        } else if let Some(last) = groups.last_mut() {
+            last.push(' ');
+            last.push_str(&joined);
+        } else {
+            groups.push(joined);
+        }
+    }
+    groups
+}
+
+/// Compute a human-feeling inter-bubble delay in milliseconds.
+/// Bounded: 500ms–1400ms, scaling with segment length.
+fn segment_delay(segment: &str) -> u64 {
+    let base: u64 = 500;
+    let per_char: u64 = 2; // ~2ms per char for a natural reading pace
+    std::cmp::min(base + (segment.len() as u64) * per_char, 1400)
+}
+
+// ── Reactions ────────────────────────────────────────────────────────────────
+
+/// Ask the local model for an emoji reaction to the user's message.
+/// Returns `None` if the local model is unavailable or decides no reaction.
+async fn try_reaction(user_message: &str) -> Option<String> {
+    if user_message.trim().is_empty() {
+        return None;
+    }
+
+    let config = match config_rpc::load_config_with_timeout().await {
+        Ok(c) => c,
+        Err(_) => return None,
+    };
+
+    if !config.local_ai.enabled {
+        return None;
+    }
+
+    match crate::openhuman::local_ai::ops::local_ai_should_react(&config, user_message, "web").await
+    {
+        Ok(outcome) => {
+            let decision = outcome.value;
+            if decision.should_react {
+                decision.emoji
+            } else {
+                None
+            }
+        }
+        Err(e) => {
+            tracing::debug!(error = %e, "[presentation:reaction] local model reaction failed");
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn short_messages_are_never_split() {
+        let result = segment_for_delivery("Hello there!");
+        assert_eq!(result, vec!["Hello there!"]);
+    }
+
+    #[test]
+    fn code_fences_prevent_splitting() {
+        let text = "Here is some code:\n\n```rust\nfn main() {}\n```\n\nAnd more text after.";
+        let result = segment_for_delivery(text);
+        assert_eq!(result.len(), 1);
+    }
+
+    #[test]
+    fn paragraph_splitting_works() {
+        let text = "This is the first paragraph with enough content to stand alone.\n\n\
+                    This is the second paragraph that also has sufficient length.";
+        let result = segment_for_delivery(text);
+        assert_eq!(result.len(), 2);
+    }
+
+    #[test]
+    fn structured_content_not_split() {
+        let text = "Here are the steps:\n\n- First do this thing\n- Then do that thing\n- Finally wrap up\n\nThat should cover it.";
+        let result = segment_for_delivery(text);
+        assert_eq!(result.len(), 1);
+    }
+
+    #[test]
+    fn sentence_splitting_works() {
+        let text = "This is the first sentence and it has some length. This is the second sentence that continues the thought. And here is a third sentence to round things out.";
+        let result = segment_for_delivery(text);
+        assert!(
+            result.len() >= 2,
+            "expected >= 2 segments, got {}",
+            result.len()
+        );
+    }
+
+    #[test]
+    fn segment_delay_bounds() {
+        assert_eq!(segment_delay(""), 500);
+        assert_eq!(segment_delay(&"x".repeat(1000)), 1400);
+        assert!(segment_delay("Hello world") > 500);
+    }
+
+    #[test]
+    fn numbered_list_detection() {
+        assert!(is_numbered_list_item("1. First item"));
+        assert!(is_numbered_list_item("12. Twelfth item"));
+        assert!(!is_numbered_list_item("2024. Was a good year")); // too many digits
+        assert!(!is_numbered_list_item("hello 1. 
world")); // digits not at start
+        assert!(!is_numbered_list_item("1.5 seconds")); // no space after dot
+    }
+
+    #[test]
+    fn max_segments_respected() {
+        let paras: Vec<String> = (0..10)
+            .map(|i| {
+                format!(
+                    "Paragraph number {} has enough content to stand on its own.",
+                    i
+                )
+            })
+            .collect();
+        let text = paras.join("\n\n");
+        let result = segment_for_delivery(&text);
+        assert!(result.len() <= MAX_SEGMENTS);
+    }
+}
diff --git a/src/openhuman/channels/providers/web.rs b/src/openhuman/channels/providers/web.rs
index f424ccc16..765dad4ff 100644
--- a/src/openhuman/channels/providers/web.rs
+++ b/src/openhuman/channels/providers/web.rs
@@ -14,6 +14,8 @@ use crate::openhuman::config::Config;
 use crate::openhuman::providers::ConversationMessage;
 use crate::rpc::RpcOutcome;

+use super::presentation;
+
 static EVENT_BUS: Lazy<broadcast::Sender<WebChannelEvent>> = Lazy::new(|| {
     let (tx, _rx) = broadcast::channel(512);
     tx
@@ -91,6 +93,9 @@ pub async fn start_chat(
             output: None,
             success: None,
             round: None,
+            reaction_emoji: None,
+            segment_index: None,
+            segment_total: None,
         });
     }
 }
@@ -100,12 +105,13 @@
     let request_id_task = request_id.clone();
     let map_key_task = map_key.clone();

+    let user_message = message.clone();
     let handle = tokio::spawn(async move {
         let result = run_chat_task(
             &client_id_task,
             &thread_id_task,
             &request_id_task,
-            &message,
+            &user_message,
             model_override,
             temperature,
         )
@@ -113,21 +119,18 @@
         .await;

         match result {
             Ok(full_response) => {
-                publish_web_channel_event(WebChannelEvent {
-                    event: "chat_done".to_string(),
-                    client_id: client_id_task.clone(),
-                    thread_id: thread_id_task.clone(),
-                    request_id: request_id_task.clone(),
-                    full_response: Some(full_response),
-                    message: None,
-                    error_type: None,
-                    tool_name: None,
-                    skill_id: None,
-                    args: None,
-                    output: None,
-                    success: None,
-                    round: None,
-                });
+                // ── Presentation layer ─────────────────────────────────────
+                // Segment the response into human-feeling bubbles and decide
+                // whether to react. Segmentation is pure CPU; the reaction
+                // check runs via local Ollama when available, zero cloud cost.
+                presentation::deliver_response(
+                    &client_id_task,
+                    &thread_id_task,
+                    &request_id_task,
+                    &full_response,
+                    &user_message,
+                )
+                .await;
             }
             Err(err) => {
                 publish_web_channel_event(WebChannelEvent {
@@ -144,6 +147,9 @@
                     output: None,
                     success: None,
                     round: None,
+                    reaction_emoji: None,
+                    segment_index: None,
+                    segment_total: None,
                 });
             }
         }
@@ -207,6 +213,9 @@ pub async fn cancel_chat(client_id: &str, thread_id: &str) -> Result

+    fn truncate_at_char_boundary(s: &str, max_bytes: usize) -> usize {
+        let mut end = s.len().min(max_bytes);
+        while end > 0 && !s.is_char_boundary(end) {
+            end -= 1;
+        }
+        end
+    }
+
     fn fixture(path: &str) -> String {
         let base = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
         std::fs::read_to_string(
@@ -113,7 +122,7 @@ mod tests {
             .await;

         println!("=== TICK 1 REPORT ===");
-        println!("{}", &report1[..report1.len().min(2000)]);
+        println!("{}", &report1[..truncate_at_char_boundary(&report1, 2000)]);
         println!("=====================\n");

         // Verify tick 1 report contains ingested data
@@ -184,7 +193,7 @@
             .await;

         println!("=== TICK 2 REPORT ===");
-        println!("{}", &report2[..report2.len().min(2000)]);
+        println!("{}", &report2[..truncate_at_char_boundary(&report2, 2000)]);
         println!("=====================\n");

         // Verify tick 2 report contains NEW data
@@ -240,7 +249,7 @@
             .await;

         println!("=== TICK 3 REPORT ===");
-        println!("{}", &report3[..report3.len().min(2000)]);
+        println!("{}", &report3[..truncate_at_char_boundary(&report3, 2000)]);
         println!("=====================\n");

         // Tick 3 should show no changes
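For reference, here is the wire sequence `deliver_response` produces for a three-bubble response, sketched as TypeScript payloads. The shapes follow `ChatSegmentEvent` from `chatService.ts`; the IDs, strings, and the scratch-file import path are made up for illustration. Between bubbles, Rust sleeps `segment_delay(prev)` = 500ms + 2ms per char of the previous segment, capped at 1400ms, so a 200-char bubble is followed by a 900ms pause.

```ts
import type { ChatSegmentEvent } from './app/src/services/chatService';

// Hypothetical segmented delivery for thread 't1', request 'r1'.
const segments: ChatSegmentEvent[] = [
  // Segment 0 carries the reaction emoji (if the local model chose one).
  {
    thread_id: 't1',
    request_id: 'r1',
    full_response: 'First bubble.', // wire name; really the segment text
    segment_index: 0,
    segment_total: 3,
    reaction_emoji: '👍',
  },
  { thread_id: 't1', request_id: 'r1', full_response: 'Second bubble.', segment_index: 1, segment_total: 3 },
  { thread_id: 't1', request_id: 'r1', full_response: 'Third bubble.', segment_index: 2, segment_total: 3 },
];

// The trailing chat_done repeats the full text purely for dedup/state sync;
// segment_total > 0 tells clients not to render it as another bubble.
const done = {
  event: 'chat_done',
  thread_id: 't1',
  request_id: 'r1',
  full_response: 'First bubble.\n\nSecond bubble.\n\nThird bubble.',
  segment_total: 3,
};
```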