app/src/pages/Conversations.tsx (230 changes: 49 additions & 181 deletions)
@@ -4,13 +4,14 @@ import Markdown from 'react-markdown';
import { useNavigate } from 'react-router-dom';

import { type ChatSendError, chatSendError } from '../chat/chatSendError';
import { useLocalModelStatus } from '../hooks/useLocalModelStatus';
import { creditsApi, type TeamUsage } from '../services/api/creditsApi';
import {
chatCancel,
type ChatSegmentEvent,
chatSend,
type ChatToolCallEvent,
type ChatToolResultEvent,
segmentText,
subscribeChatEvents,
useRustChat,
} from '../services/chatService';
@@ -28,14 +29,10 @@ import {
setSelectedThread,
} from '../store/threadSlice';
import type { ThreadMessage } from '../types/thread';
import { getSegmentDelay, segmentMessage } from '../utils/messageSegmentation';
import {
isTauri,
type LocalAiChatMessage,
openhumanAutocompleteAccept,
openhumanAutocompleteCurrent,
openhumanLocalAiChat,
openhumanLocalAiShouldReact,
openhumanVoiceStatus,
openhumanVoiceTranscribeBytes,
openhumanVoiceTts,
@@ -141,14 +138,7 @@ const Conversations = () => {
Record<string, ToolTimelineEntry[]>
>({});
const rustChat = useRustChat();
const isLocalModelActive = useLocalModelStatus();
const isLocalModelActiveRef = useRef(isLocalModelActive);
const [isDelivering, setIsDelivering] = useState(false);
const deliveryActiveRef = useRef(false);
const [reactionPickerMsgId, setReactionPickerMsgId] = useState<string | null>(null);
const defaultChannelType = useAppSelector(
state => state.channelConnections?.defaultMessagingChannel ?? 'web'
);
const pendingReactionRef = useRef<
Map<string, { msgId: string; content: string; threadId: string }>
>(new Map());
@@ -158,10 +148,6 @@ const Conversations = () => {
selectedThreadIdRef.current = selectedThreadId;
}, [selectedThreadId]);

useEffect(() => {
isLocalModelActiveRef.current = isLocalModelActive;
}, [isLocalModelActive]);

const [teamUsage, setTeamUsage] = useState<TeamUsage | null>(null);
const [isLoadingBudget, setIsLoadingBudget] = useState(false);

@@ -305,7 +291,6 @@

useEffect(() => {
return () => {
deliveryActiveRef.current = false;
mediaRecorderRef.current?.stop();
mediaStreamRef.current?.getTracks().forEach(track => track.stop());
replyAudioRef.current?.pause();
@@ -399,16 +384,24 @@
return { ...prev, [event.thread_id]: nextEntries };
});
},
onDone: event => {
const currentState = store.getState() as {
thread: { messagesByThreadId: Record<string, ThreadMessage[]> };
};
const threadMessages = currentState.thread.messagesByThreadId[event.thread_id] || [];
const lastMsg = threadMessages[threadMessages.length - 1];
if (lastMsg?.sender === 'agent' && lastMsg?.content === event.full_response) {
return;
onSegment: (event: ChatSegmentEvent) => {
// Rust delivers segments with delays already applied — just dispatch.
if (event.reaction_emoji) {
const pending = pendingReactionRef.current.get(event.thread_id);
if (pending) {
dispatch(
addReaction({
threadId: pending.threadId,
messageId: pending.msgId,
emoji: event.reaction_emoji,
})
);
pendingReactionRef.current.delete(event.thread_id);
}
}

dispatch(addInferenceResponse({ content: segmentText(event), threadId: event.thread_id }));
},
onDone: event => {
// Update tool timeline
setToolTimelineByThread(prev => {
const existing = prev[event.thread_id] ?? [];
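For reference, a sketch of the event contract these handlers imply. Only the fields the diff actually reads (`thread_id`, `reaction_emoji`, `segment_total`, `full_response`) are certain; the segment body is accessed through `segmentText(event)`, so its field name below is an assumption.

```ts
// Sketch of the chat event shapes implied by onSegment/onDone in this hunk.
// Fields marked "assumed" are not visible in the diff.
interface ChatSegmentEventSketch {
  thread_id: string;
  reaction_emoji?: string; // present when Rust chose to auto-react
  text?: string;           // assumed: the segment body that segmentText() reads
}

interface ChatDoneEventSketch {
  thread_id: string;
  full_response: string;
  segment_total?: number;  // > 0 means the reply was already streamed as segments
  reaction_emoji?: string; // fallback when no chat_segment event fired
}
```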
@@ -425,58 +418,31 @@
sendingTimeoutRef.current = null;
}

// Fire-and-forget: auto-react to the user's message
const pending = pendingReactionRef.current.get(event.thread_id);
if (pending) {
maybeAutoReact(pending.msgId, pending.content, pending.threadId);
pendingReactionRef.current.delete(event.thread_id);
}

// Multi-bubble delivery gate: only when local model is active
if (!isLocalModelActiveRef.current) {
dispatch(
addInferenceResponse({ content: event.full_response, threadId: event.thread_id })
);
setIsSending(false);
dispatch(setActiveThread(null));
return;
// Apply reaction emoji from Rust (when not segmented — no onSegment fired).
if (event.reaction_emoji) {
const pending = pendingReactionRef.current.get(event.thread_id);
if (pending) {
dispatch(
addReaction({
threadId: pending.threadId,
messageId: pending.msgId,
emoji: event.reaction_emoji,
})
);
}
}
pendingReactionRef.current.delete(event.thread_id);

const segments = segmentMessage(event.full_response);

if (segments.length <= 1) {
// Only add the response bubble if Rust didn't already deliver it
// via chat_segment events (segment_total > 0 means segments were sent).
if (!event.segment_total) {
dispatch(
addInferenceResponse({ content: event.full_response, threadId: event.thread_id })
);
setIsSending(false);
dispatch(setActiveThread(null));
return;
}

// Async delivery: show each segment as a separate bubble with a typing pause
setIsDelivering(true);
deliveryActiveRef.current = true;

void (async () => {
for (let i = 0; i < segments.length; i++) {
if (!deliveryActiveRef.current) break;

if (i > 0) {
await new Promise<void>(resolve =>
setTimeout(resolve, getSegmentDelay(segments[i - 1]))
);
}

if (!deliveryActiveRef.current) break;

dispatch(addInferenceResponse({ content: segments[i], threadId: event.thread_id }));
}

deliveryActiveRef.current = false;
setIsDelivering(false);
setIsSending(false);
// activeThreadId was already cleared by the first addInferenceResponse dispatch
})();
setIsSending(false);
dispatch(setActiveThread(null));
},
onError: event => {
if (event.thread_id !== selectedThreadIdRef.current) return;
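Both handlers now repeat the same apply-and-clear branch for `pendingReactionRef`. A minimal factored sketch, assuming the map shape used above (hypothetical helper, not part of this diff):

```ts
// Hypothetical shared helper for the duplicated reaction branch. The map key
// is the thread ID; the value remembers which user message to decorate.
type PendingReaction = { msgId: string; threadId: string };

function makeReactionApplier(
  pending: Map<string, PendingReaction>,
  addReaction: (args: { threadId: string; messageId: string; emoji: string }) => void
) {
  return (threadId: string, emoji?: string): void => {
    if (!emoji) return;                  // event carried no reaction
    const target = pending.get(threadId);
    if (!target) return;                 // nothing was awaiting a reaction
    addReaction({ threadId: target.threadId, messageId: target.msgId, emoji });
    pending.delete(threadId);            // apply at most once per user turn
  };
}
```

With this, onSegment and onDone would each reduce to a single `applyReaction(event.thread_id, event.reaction_emoji)` call.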
@@ -529,65 +495,18 @@
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [rustChat, socketStatus]);

/**
* Segment a complete response string and dispatch each segment as a
* separate message bubble with a typing pause between them.
* Local-model-only path — no cloud API calls.
*/
const deliverLocalResponse = async (fullResponse: string, threadId: string) => {
const segments = segmentMessage(fullResponse);

if (segments.length <= 1) {
dispatch(addInferenceResponse({ content: fullResponse, threadId }));
return;
}

setIsDelivering(true);
deliveryActiveRef.current = true;

for (let i = 0; i < segments.length; i++) {
if (!deliveryActiveRef.current) break;

if (i > 0) {
await new Promise<void>(resolve => setTimeout(resolve, getSegmentDelay(segments[i - 1])));
}

if (!deliveryActiveRef.current) break;

dispatch(addInferenceResponse({ content: segments[i], threadId }));
}

deliveryActiveRef.current = false;
setIsDelivering(false);
};

/**
* Fire-and-forget: ask the local model if we should auto-react to the
* user's message with an emoji. Adds a personal touch based on channel type.
*/
const maybeAutoReact = (userMessageId: string, messageContent: string, threadId: string) => {
if (!isTauri() || !isLocalModelActiveRef.current) return;

void openhumanLocalAiShouldReact(messageContent, defaultChannelType)
.then(response => {
const decision = response.result;
if (decision?.should_react && decision.emoji) {
console.debug('[conversations:auto-react] reacting with', decision.emoji);
dispatch(addReaction({ threadId, messageId: userMessageId, emoji: decision.emoji }));
}
})
.catch(err => {
console.debug('[conversations:auto-react] failed:', err);
});
};

const handleSendMessage = async (text?: string) => {
const normalized = text ?? inputValue;
const trimmed = normalized.trim();

if (!trimmed || !selectedThreadId || isSending) return;
if (!isLocalModelActiveRef.current && socketStatus !== 'connected') {
setSendError(chatSendError('socket_disconnected', 'Realtime socket is not connected.'));
if (socketStatus !== 'connected') {
setSendError(
chatSendError(
'socket_disconnected',
'Realtime socket is not connected — responses cannot be delivered without a client ID.'
)
);
return;
}

@@ -633,61 +552,10 @@
setToolTimelineByThread(prev => ({ ...prev, [sendingThreadId]: [] }));
dispatch(setActiveThread(sendingThreadId));

// ── Local Ollama path ────────────────────────────────────────────────────
// When a local model is ready, bypass the cloud socket entirely.
// Zero cloud tokens consumed on this path.
if (isLocalModelActiveRef.current) {
try {
// Build message history: convert stored messages + the new user turn
const storedMessages =
(
store.getState() as {
thread: {
messagesByThreadId: Record<string, import('../types/thread').ThreadMessage[]>;
};
}
).thread.messagesByThreadId[sendingThreadId] ?? [];

const history: LocalAiChatMessage[] = storedMessages
.filter(m => m.sender === 'user' || m.sender === 'agent')
.map(m => ({
role: m.sender === 'user' ? ('user' as const) : ('assistant' as const),
content: m.content,
}));

console.debug('[conversations:local] sending to local model', {
historyLength: history.length,
threadId: sendingThreadId,
});

const response = await openhumanLocalAiChat(history);
const reply = response.result?.trim() ?? '';

if (!reply) {
throw new Error('Local model returned an empty response.');
}

await deliverLocalResponse(reply, sendingThreadId);
pendingReactionRef.current.delete(sendingThreadId);
maybeAutoReact(userMessage.id, trimmed, sendingThreadId);
} catch (err) {
pendingReactionRef.current.delete(sendingThreadId);
const msg = err instanceof Error ? err.message : String(err);
setSendError(chatSendError('local_model_failed', msg));
dispatch(
addInferenceResponse({
content: 'Local model error — please try again.',
threadId: sendingThreadId,
})
);
} finally {
setIsSending(false);
dispatch(setActiveThread(null));
}
return;
}

// ── Cloud socket path (unchanged) ────────────────────────────────────────
// ── Cloud socket path ─────────────────────────────────────────────────────
// Always route primary chat through the cloud backend via socket.
// Local model (Ollama) is used only for supplementary features
// (auto-react, autocomplete, etc.) — never as a primary chat path.
try {
await chatSend({ threadId: sendingThreadId, message: trimmed, model: AGENTIC_MODEL_ID });
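The routing rule this hunk enforces, condensed into one sketch. Imports mirror the ones at the top of the file; `AGENTIC_MODEL_ID` is not defined in this PR, so it is declared rather than imported, and errors are thrown instead of pushed into component state purely to keep the sketch self-contained.

```ts
// Cloud-only routing introduced by this PR: primary chat always goes through
// the socket-backed backend; the local model is reserved for supplementary
// features handled in Rust.
import { chatSendError } from '../chat/chatSendError';
import { chatSend } from '../services/chatService';

declare const AGENTIC_MODEL_ID: string; // assumed app-level constant

async function routePrimaryChat(
  threadId: string,
  text: string,
  socketStatus: 'connected' | 'connecting' | 'disconnected'
): Promise<void> {
  // Without a connected socket there is no client ID to deliver the reply to.
  if (socketStatus !== 'connected') {
    throw chatSendError('socket_disconnected', 'Realtime socket is not connected.');
  }
  await chatSend({ threadId, message: text.trim(), model: AGENTIC_MODEL_ID });
}
```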

@@ -1082,7 +950,7 @@
</div>
</div>
))}
{((activeThreadId === selectedThreadId && isSending) || isDelivering) && (
{activeThreadId === selectedThreadId && isSending && (
<div className="flex justify-start">
<div className="bg-stone-200/80 rounded-2xl rounded-bl-md px-4 py-3">
<div className="flex items-center gap-1">