diff --git a/agents/package.json b/agents/package.json index 0bbdc39a2..f8bd32f66 100644 --- a/agents/package.json +++ b/agents/package.json @@ -46,9 +46,10 @@ "zod": "^3.25.76" }, "dependencies": { + "@bufbuild/protobuf": "^1.10.0", "@ffmpeg-installer/ffmpeg": "^1.1.0", "@livekit/mutex": "^1.1.1", - "@livekit/protocol": "^1.43.0", + "@livekit/protocol": "^1.45.1", "@livekit/typed-emitter": "^3.0.0", "@opentelemetry/api": "^1.9.0", "@opentelemetry/api-logs": "^0.54.0", diff --git a/agents/src/constants.ts b/agents/src/constants.ts index ba9c37dee..3da61af46 100644 --- a/agents/src/constants.ts +++ b/agents/src/constants.ts @@ -20,3 +20,4 @@ export const RPC_GET_AGENT_INFO = 'lk.agent.get_agent_info'; export const RPC_SEND_MESSAGE = 'lk.agent.send_message'; export const TOPIC_AGENT_REQUEST = 'lk.agent.request'; export const TOPIC_AGENT_RESPONSE = 'lk.agent.response'; +export const TOPIC_SESSION_MESSAGES = 'lk.agent.session'; diff --git a/agents/src/inference/interruption/http_transport.ts b/agents/src/inference/interruption/http_transport.ts index b698ebc50..a1a4d7e4f 100644 --- a/agents/src/inference/interruption/http_transport.ts +++ b/agents/src/inference/interruption/http_transport.ts @@ -154,8 +154,8 @@ export function createHttpTransport( updateUserSpeakingSpan(entry); } const event: OverlappingSpeechEvent = { - type: 'user_overlapping_speech', - timestamp: Date.now(), + type: 'overlapping_speech', + detectedAt: Date.now(), overlapStartedAt: overlapSpeechStartedAt, isInterruption: entry.isInterruption, speechInput: entry.speechInput, diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index 93d420bda..4702e7ed0 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -12,7 +12,7 @@ import { InterruptionStreamBase } from './interruption_stream.js'; import type { InterruptionOptions, 
OverlappingSpeechEvent } from './types.js'; type InterruptionCallbacks = { - user_overlapping_speech: (event: OverlappingSpeechEvent) => void; + overlapping_speech: (event: OverlappingSpeechEvent) => void; metrics_collected: (metrics: InterruptionMetrics) => void; error: (error: InterruptionDetectionError) => void; }; diff --git a/agents/src/inference/interruption/interruption_stream.ts b/agents/src/inference/interruption/interruption_stream.ts index ce45ae804..df6162aae 100644 --- a/agents/src/inference/interruption/interruption_stream.ts +++ b/agents/src/inference/interruption/interruption_stream.ts @@ -281,8 +281,8 @@ export class InterruptionStreamBase { } const e = latestEntry ?? InterruptionCacheEntry.default(); const event: OverlappingSpeechEvent = { - type: 'user_overlapping_speech', - timestamp: chunk.endedAt, + type: 'overlapping_speech', + detectedAt: chunk.endedAt, isInterruption: false, overlapStartedAt: this.overlapSpeechStartedAt, speechInput: e.speechInput, @@ -334,11 +334,11 @@ export class InterruptionStreamBase { const eventEmitter = new TransformStream({ transform: (chunk, controller) => { - this.model.emit('user_overlapping_speech', chunk); + this.model.emit('overlapping_speech', chunk); const metrics: InterruptionMetrics = { type: 'interruption_metrics', - timestamp: chunk.timestamp, + timestamp: chunk.detectedAt, totalDuration: chunk.totalDurationInS * 1000, predictionDuration: chunk.predictionDurationInS * 1000, detectionDelay: chunk.detectionDelayInS * 1000, diff --git a/agents/src/inference/interruption/types.ts b/agents/src/inference/interruption/types.ts index a62596030..d3aae7b95 100644 --- a/agents/src/inference/interruption/types.ts +++ b/agents/src/inference/interruption/types.ts @@ -4,8 +4,8 @@ import type { Span } from '@opentelemetry/api'; export interface OverlappingSpeechEvent { - type: 'user_overlapping_speech'; - timestamp: number; + type: 'overlapping_speech'; + detectedAt: number; isInterruption: boolean; totalDurationInS: 
number; predictionDurationInS: number; diff --git a/agents/src/inference/interruption/ws_transport.ts b/agents/src/inference/interruption/ws_transport.ts index e497bb1d1..eb99d5bcb 100644 --- a/agents/src/inference/interruption/ws_transport.ts +++ b/agents/src/inference/interruption/ws_transport.ts @@ -229,8 +229,8 @@ export function createWsTransport( ); const event: OverlappingSpeechEvent = { - type: 'user_overlapping_speech', - timestamp: Date.now(), + type: 'overlapping_speech', + detectedAt: Date.now(), isInterruption: true, totalDurationInS: entry.totalDurationInS, predictionDurationInS: entry.predictionDurationInS, diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index da1239dde..d0114753f 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -152,10 +152,11 @@ export class AgentActivity implements RecognitionHooks { this.onError(ev); private readonly onInterruptionOverlappingSpeech = (ev: OverlappingSpeechEvent): void => { - this.agentSession.emit(AgentSessionEventTypes.UserOverlappingSpeech, ev); + this.agentSession.emit(AgentSessionEventTypes.OverlappingSpeech, ev); }; private readonly onInterruptionMetricsCollected = (ev: InterruptionMetrics): void => { + this.agentSession._usageCollector.collect(ev); this.agentSession.emit( AgentSessionEventTypes.MetricsCollected, createMetricsCollectedEvent({ metrics: ev }), @@ -698,6 +699,8 @@ export class AgentActivity implements RecognitionHooks { } } + this.agentSession._usageCollector.collect(ev); + this.agentSession.emit( AgentSessionEventTypes.MetricsCollected, createMetricsCollectedEvent({ metrics: ev }), @@ -935,7 +938,7 @@ export class AgentActivity implements RecognitionHooks { this.restoreInterruptionByAudioActivity(); this.interruptByAudioActivity(); if (this.audioRecognition) { - this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || ev.timestamp); + this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || 
ev.detectedAt); } } @@ -2842,10 +2845,7 @@ export class AgentActivity implements RecognitionHooks { await this._mainTask.cancelAndWait(); } if (this.interruptionDetector) { - this.interruptionDetector.off( - 'user_overlapping_speech', - this.onInterruptionOverlappingSpeech, - ); + this.interruptionDetector.off('overlapping_speech', this.onInterruptionOverlappingSpeech); this.interruptionDetector.off('metrics_collected', this.onInterruptionMetricsCollected); this.interruptionDetector.off('error', this.onInterruptionError); } @@ -2888,7 +2888,7 @@ export class AgentActivity implements RecognitionHooks { try { const detector = new AdaptiveInterruptionDetector(); - detector.on('user_overlapping_speech', this.onInterruptionOverlappingSpeech); + detector.on('overlapping_speech', this.onInterruptionOverlappingSpeech); detector.on('metrics_collected', this.onInterruptionMetricsCollected); detector.on('error', this.onInterruptionError); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 355f15bc5..57b8c81c4 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -41,7 +41,6 @@ import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; import { AgentActivity } from './agent_activity.js'; import type { _TurnDetector } from './audio_recognition.js'; -import { ClientEventsHandler } from './client_events.js'; import { type AgentEvent, AgentSessionEventTypes, @@ -65,6 +64,7 @@ import { } from './events.js'; import { AgentInput, AgentOutput } from './io.js'; import { RecorderIO } from './recorder_io/index.js'; +import { RoomSessionTransport, SessionHost } from './remote_session.js'; import { DEFAULT_TEXT_INPUT_CALLBACK, RoomIO, @@ -162,7 +162,7 @@ export type AgentSessionCallbacks = { [AgentSessionEventTypes.SpeechCreated]: (ev: SpeechCreatedEvent) => void; [AgentSessionEventTypes.Error]: (ev: ErrorEvent) => void; [AgentSessionEventTypes.Close]: (ev: CloseEvent) => void; - 
[AgentSessionEventTypes.UserOverlappingSpeech]: (ev: OverlappingSpeechEvent) => void; + [AgentSessionEventTypes.OverlappingSpeech]: (ev: OverlappingSpeechEvent) => void; }; export type AgentSessionOptions = { @@ -204,7 +204,7 @@ export class AgentSession< private nextActivity?: AgentActivity; private updateActivityTask?: Task; private started = false; - private clientEventsHandler?: ClientEventsHandler; + private sessionHost?: SessionHost; private _chatCtx: ChatContext; private _userData: UserData | undefined; @@ -232,7 +232,8 @@ export class AgentSession< private _interruptionDetection?: InterruptionOptions['mode']; - private _usageCollector: ModelUsageCollector = new ModelUsageCollector(); + /** @internal */ + _usageCollector: ModelUsageCollector = new ModelUsageCollector(); /** @internal */ _roomIO?: RoomIO; @@ -322,9 +323,6 @@ export class AgentSession< ): boolean { const eventData = args[0] as AgentEvent; this._recordedEvents.push(eventData); - if (event === AgentSessionEventTypes.MetricsCollected) { - this._usageCollector.collect((eventData as MetricsCollectedEvent).metrics); - } return super.emit(event, ...args); } @@ -422,9 +420,11 @@ export class AgentSession< this._roomIO.start(); - this.clientEventsHandler = new ClientEventsHandler(this, this._roomIO); + const transport = new RoomSessionTransport(room, this._roomIO); + this.sessionHost = new SessionHost(transport); + this.sessionHost.registerSession(this); if (inputOptions?.textEnabled !== false) { - this.clientEventsHandler.registerTextInput( + this.sessionHost.registerTextInput( inputOptions?.textInputCallback ?? 
DEFAULT_TEXT_INPUT_CALLBACK, ); } @@ -470,8 +470,8 @@ export class AgentSession< await Promise.allSettled(tasks); - if (this.clientEventsHandler) { - await this.clientEventsHandler.start(); + if (this.sessionHost) { + await this.sessionHost.start(); } // Log used IO configuration @@ -1154,8 +1154,8 @@ export class AgentSession< this.output.audio = null; this.output.transcription = null; - await this.clientEventsHandler?.close(); - this.clientEventsHandler = undefined; + await this.sessionHost?.close(); + this.sessionHost = undefined; await this._roomIO?.close(); this._roomIO = undefined; diff --git a/agents/src/voice/client_events.ts b/agents/src/voice/client_events.ts deleted file mode 100644 index 510331072..000000000 --- a/agents/src/voice/client_events.ts +++ /dev/null @@ -1,838 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -import type { Room, RpcInvocationData, TextStreamInfo, TextStreamReader } from '@livekit/rtc-node'; -import type { TypedEventEmitter } from '@livekit/typed-emitter'; -import EventEmitter from 'events'; -import type { z } from 'zod'; -import { - RPC_GET_AGENT_INFO, - RPC_GET_CHAT_HISTORY, - RPC_GET_SESSION_STATE, - RPC_SEND_MESSAGE, - TOPIC_AGENT_REQUEST, - TOPIC_AGENT_RESPONSE, - TOPIC_CHAT, - TOPIC_CLIENT_EVENTS, -} from '../constants.js'; -import type { OverlappingSpeechEvent } from '../inference/interruption/types.js'; -import type { ToolContext } from '../llm/tool_context.js'; -import { log } from '../log.js'; -import { Future, Task, cancelAndWait, shortuuid } from '../utils.js'; -import type { AgentSession } from './agent_session.js'; -import { - AgentSessionEventTypes, - type AgentStateChangedEvent, - type ConversationItemAddedEvent, - type ErrorEvent, - type FunctionToolsExecutedEvent, - type MetricsCollectedEvent, - type UserInputTranscribedEvent, - type UserStateChangedEvent, -} from './events.js'; -import type { RoomIO } from './room_io/room_io.js'; -import { - 
agentMetricsToWire, - agentSessionUsageToWire, - chatItemToWire, - chatMessageToWire, - type clientAgentStateChangedSchema, - type clientConversationItemAddedSchema, - type clientErrorSchema, - clientEventSchema, - type clientFunctionToolsExecutedSchema, - type clientMetricsCollectedSchema, - type clientSessionUsageSchema, - type clientUserInputTranscribedSchema, - type clientUserOverlappingSpeechSchema, - type clientUserStateChangedSchema, - functionCallOutputToWire, - functionCallToWire, - getAgentInfoResponseSchema, - getChatHistoryResponseSchema, - getRTCStatsResponseSchema, - getSessionStateResponseSchema, - getSessionUsageResponseSchema, - msToS, - sendMessageRequestSchema, - sendMessageResponseSchema, - streamRequestSchema, - streamResponseSchema, -} from './wire_format.js'; - -/** @experimental */ -export type ClientAgentStateChangedEvent = z.infer; - -/** @experimental */ -export type ClientUserStateChangedEvent = z.infer; - -/** @experimental */ -export type ClientConversationItemAddedEvent = z.infer; - -/** @experimental */ -export type ClientUserInputTranscribedEvent = z.infer; - -/** @experimental */ -export type ClientFunctionToolsExecutedEvent = z.infer; - -/** @experimental */ -export type ClientMetricsCollectedEvent = z.infer; - -/** @experimental */ -export type ClientErrorEvent = z.infer; - -/** @experimental */ -export type ClientUserOverlappingSpeechEvent = z.infer; - -/** @experimental */ -export type ClientSessionUsageEvent = z.infer; - -/** @experimental */ -export type ClientEvent = z.infer; - -/** @experimental */ -export type ClientEventType = ClientEvent['type']; - -/** @experimental */ -export type StreamRequest = z.infer; - -/** @experimental */ -export type StreamResponse = z.infer; - -/** @experimental */ -export type GetSessionStateRequest = Record; - -/** @experimental */ -export type GetSessionStateResponse = z.infer; - -/** @experimental */ -export type GetChatHistoryRequest = Record; - -/** @experimental */ -export type 
GetChatHistoryResponse = z.infer; - -/** @experimental */ -export type GetAgentInfoRequest = Record; - -/** @experimental */ -export type GetAgentInfoResponse = z.infer; - -/** @experimental */ -export type SendMessageRequest = z.infer; - -/** @experimental */ -export type SendMessageResponse = z.infer; - -/** @experimental */ -export type GetRTCStatsRequest = Record; - -/** @experimental */ -export type GetRTCStatsResponse = z.infer; - -/** @experimental */ -export type GetSessionUsageRequest = Record; - -/** @experimental */ -export type GetSessionUsageResponse = z.infer; - -function serializeOptions(opts: { - turnHandling?: { - endpointing?: unknown; - interruption?: unknown; - }; - maxToolSteps?: number; - userAwayTimeout?: number | null; - preemptiveGeneration?: boolean; - useTtsAlignedTranscript?: boolean; -}): Record { - return { - endpointing: opts.turnHandling?.endpointing ?? {}, - interruption: opts.turnHandling?.interruption ?? {}, - max_tool_steps: opts.maxToolSteps, - user_away_timeout: opts.userAwayTimeout, - preemptive_generation: opts.preemptiveGeneration, - use_tts_aligned_transcript: opts.useTtsAlignedTranscript, - }; -} - -function toolNames(toolCtx: ToolContext | undefined): string[] { - if (!toolCtx) return []; - return Object.keys(toolCtx); -} - -/** @experimental */ -export type RemoteSessionEventTypes = - | 'agent_state_changed' - | 'user_state_changed' - | 'conversation_item_added' - | 'user_input_transcribed' - | 'function_tools_executed' - | 'metrics_collected' - | 'user_overlapping_speech' - | 'session_usage' - | 'error'; - -/** @experimental */ -export type RemoteSessionCallbacks = { - agent_state_changed: (ev: ClientAgentStateChangedEvent) => void; - user_state_changed: (ev: ClientUserStateChangedEvent) => void; - conversation_item_added: (ev: ClientConversationItemAddedEvent) => void; - user_input_transcribed: (ev: ClientUserInputTranscribedEvent) => void; - function_tools_executed: (ev: ClientFunctionToolsExecutedEvent) => void; - 
metrics_collected: (ev: ClientMetricsCollectedEvent) => void; - user_overlapping_speech: (ev: ClientUserOverlappingSpeechEvent) => void; - session_usage: (ev: ClientSessionUsageEvent) => void; - error: (ev: ClientErrorEvent) => void; -}; - -export interface TextInputEvent { - text: string; - info: TextStreamInfo; - participantIdentity: string; -} - -export type TextInputCallback = (session: AgentSession, ev: TextInputEvent) => void | Promise; - -/** - * Handles exposing AgentSession state to room participants and allows interaction. - * - * This class provides: - * - Event streaming: Automatically streams AgentSession events to clients via a text stream - * - RPC handlers: Allows clients to request state, chat history, and agent info on demand - * - Text input handling: Receives text messages from clients and generates agent replies - */ - -/** @experimental */ -export class ClientEventsHandler { - private readonly session: AgentSession; - private readonly roomIO: RoomIO; - - private textInputCb?: TextInputCallback; - private textStreamHandlerRegistered = false; - private rpcHandlersRegistered = false; - private requestHandlerRegistered = false; - private eventHandlersRegistered = false; - private started = false; - - private readonly tasks = new Set>(); - private readonly logger = log(); - - constructor(session: AgentSession, roomIO: RoomIO) { - this.session = session; - this.roomIO = roomIO; - } - - private get room(): Room { - return this.roomIO.rtcRoom; - } - - async start(): Promise { - if (this.started) return; - - this.started = true; - this.registerRpcHandlers(); - this.registerRequestHandler(); - this.registerEventHandlers(); - } - - async close(): Promise { - if (!this.started) return; - this.started = false; - - if (this.textStreamHandlerRegistered) { - this.room.unregisterTextStreamHandler(TOPIC_CHAT); - this.textStreamHandlerRegistered = false; - } - - if (this.rpcHandlersRegistered) { - const localParticipant = this.room.localParticipant; - if 
(localParticipant) { - localParticipant.unregisterRpcMethod(RPC_GET_SESSION_STATE); - localParticipant.unregisterRpcMethod(RPC_GET_CHAT_HISTORY); - localParticipant.unregisterRpcMethod(RPC_GET_AGENT_INFO); - localParticipant.unregisterRpcMethod(RPC_SEND_MESSAGE); - } - this.rpcHandlersRegistered = false; - } - - if (this.requestHandlerRegistered) { - this.room.unregisterTextStreamHandler(TOPIC_AGENT_REQUEST); - this.requestHandlerRegistered = false; - } - - if (this.eventHandlersRegistered) { - this.session.off(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged); - this.session.off(AgentSessionEventTypes.UserStateChanged, this.onUserStateChanged); - this.session.off(AgentSessionEventTypes.ConversationItemAdded, this.onConversationItemAdded); - this.session.off(AgentSessionEventTypes.FunctionToolsExecuted, this.onFunctionToolsExecuted); - this.session.off(AgentSessionEventTypes.MetricsCollected, this.onMetricsCollected); - this.session.off(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed); - this.session.off(AgentSessionEventTypes.UserOverlappingSpeech, this.onUserOverlapSpeech); - this.session.off(AgentSessionEventTypes.Error, this.onError); - this.eventHandlersRegistered = false; - } - - await cancelAndWait([...this.tasks]); - this.tasks.clear(); - } - - /** - * Registers a callback to handle text input from clients. - * - * This callback will be called when a client sends a text message to the agent. - * The callback should return a promise that resolves when the text input has been processed. - * - * @param textInputCb - The callback to handle text input. 
- */ - registerTextInput(textInputCb: TextInputCallback): void { - this.textInputCb = textInputCb; - if (this.textStreamHandlerRegistered) return; - this.room.registerTextStreamHandler(TOPIC_CHAT, this.onUserTextInput); - this.textStreamHandlerRegistered = true; - } - - private registerRpcHandlers(): void { - if (this.rpcHandlersRegistered) return; - - const localParticipant = this.room.localParticipant; - if (!localParticipant) return; - - localParticipant.registerRpcMethod(RPC_GET_SESSION_STATE, this.rpcGetSessionState); - localParticipant.registerRpcMethod(RPC_GET_CHAT_HISTORY, this.rpcGetChatHistory); - localParticipant.registerRpcMethod(RPC_GET_AGENT_INFO, this.rpcGetAgentInfo); - localParticipant.registerRpcMethod(RPC_SEND_MESSAGE, this.rpcSendMessage); - this.rpcHandlersRegistered = true; - } - - private registerRequestHandler(): void { - if (this.requestHandlerRegistered) return; - - this.room.registerTextStreamHandler(TOPIC_AGENT_REQUEST, this.onStreamRequest); - this.requestHandlerRegistered = true; - } - - private registerEventHandlers(): void { - if (this.eventHandlersRegistered) return; - - this.session.on(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged); - this.session.on(AgentSessionEventTypes.UserStateChanged, this.onUserStateChanged); - this.session.on(AgentSessionEventTypes.ConversationItemAdded, this.onConversationItemAdded); - this.session.on(AgentSessionEventTypes.FunctionToolsExecuted, this.onFunctionToolsExecuted); - this.session.on(AgentSessionEventTypes.MetricsCollected, this.onMetricsCollected); - this.session.on(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed); - this.session.on(AgentSessionEventTypes.UserOverlappingSpeech, this.onUserOverlapSpeech); - this.session.on(AgentSessionEventTypes.Error, this.onError); - this.eventHandlersRegistered = true; - } - - private onStreamRequest = ( - reader: TextStreamReader, - participantInfo: { identity: string }, - ): void => { - const task = 
Task.from(async () => this.handleStreamRequest(reader, participantInfo.identity)); - this.trackTask(task); - }; - - private async handleStreamRequest( - reader: TextStreamReader, - participantIdentity: string, - ): Promise { - try { - const data = await reader.readAll(); - const request = streamRequestSchema.parse(JSON.parse(data)); - - let responsePayload = ''; - let error: string | null = null; - - try { - switch (request.method) { - case 'get_session_state': - responsePayload = await this.streamGetSessionState(); - break; - case 'get_chat_history': - responsePayload = await this.streamGetChatHistory(); - break; - case 'get_agent_info': - responsePayload = await this.streamGetAgentInfo(); - break; - case 'send_message': - responsePayload = await this.streamSendMessage(request.payload); - break; - case 'get_rtc_stats': - responsePayload = await this.streamGetRtcStats(); - break; - case 'get_session_usage': - responsePayload = await this.streamGetSessionUsage(); - break; - default: - error = `Unknown method: ${request.method}`; - } - } catch (e) { - error = e instanceof Error ? 
e.message : String(e); - } - - const response: StreamResponse = { - request_id: request.request_id, - payload: responsePayload, - error, - }; - - const localParticipant = this.room.localParticipant; - await localParticipant!.sendText(JSON.stringify(response), { - topic: TOPIC_AGENT_RESPONSE, - destinationIdentities: [participantIdentity], - }); - } catch (e) { - this.logger.warn({ error: e }, 'failed to handle stream request'); - } - } - - private async streamGetSessionState(): Promise { - const agent = this.session.currentAgent; - - const response: GetSessionStateResponse = { - agent_state: this.session.agentState, - user_state: this.session.userState, - agent_id: agent.id, - options: serializeOptions({ - turnHandling: this.session.options.turnHandling, - maxToolSteps: this.session.options.maxToolSteps, - userAwayTimeout: this.session.options.userAwayTimeout, - preemptiveGeneration: this.session.options.preemptiveGeneration, - useTtsAlignedTranscript: this.session.options.useTtsAlignedTranscript, - }), - created_at: msToS(this.session._startedAt ?? 
Date.now()), - }; - return JSON.stringify(response); - } - - private async streamGetChatHistory(): Promise { - return JSON.stringify({ - items: this.session.history.items.map(chatItemToWire), - }); - } - - private async streamGetAgentInfo(): Promise { - const agent = this.session.currentAgent; - return JSON.stringify({ - id: agent.id, - instructions: agent.instructions, - tools: toolNames(agent.toolCtx), - chat_ctx: agent.chatCtx.items.map(chatItemToWire), - }); - } - - private async streamSendMessage(payload: string): Promise { - const request = sendMessageRequestSchema.parse(JSON.parse(payload)); - const runResult = this.session.run({ userInput: request.text }); - await runResult.wait(); - return JSON.stringify({ - items: runResult.events.map((ev) => chatItemToWire(ev.item)), - }); - } - - private async streamGetRtcStats(): Promise { - // TODO(parity): map rtc stats fields once getRtcStats API shape is finalized in rtc-node. - return JSON.stringify({ - publisher_stats: [], - subscriber_stats: [], - }); - } - - private async streamGetSessionUsage(): Promise { - return JSON.stringify({ - usage: agentSessionUsageToWire(this.session.usage), - created_at: msToS(Date.now()), - }); - } - - private onUserOverlapSpeech = (event: OverlappingSpeechEvent): void => { - const clientEvent: ClientUserOverlappingSpeechEvent = { - type: 'user_overlapping_speech', - is_interruption: event.isInterruption, - created_at: msToS(event.timestamp), - overlap_started_at: event.overlapStartedAt != null ? 
msToS(event.overlapStartedAt) : null, - detection_delay: event.detectionDelayInS, - sent_at: msToS(Date.now()), - }; - this.streamClientEvent(clientEvent); - }; - - private onAgentStateChanged = (event: AgentStateChangedEvent): void => { - const clientEvent: ClientAgentStateChangedEvent = { - type: 'agent_state_changed', - old_state: event.oldState, - new_state: event.newState, - created_at: msToS(event.createdAt), - }; - this.streamClientEvent(clientEvent); - }; - - private onUserStateChanged = (event: UserStateChangedEvent): void => { - const clientEvent: ClientUserStateChangedEvent = { - type: 'user_state_changed', - old_state: event.oldState, - new_state: event.newState, - created_at: msToS(event.createdAt), - }; - this.streamClientEvent(clientEvent); - }; - - private onConversationItemAdded = (event: ConversationItemAddedEvent): void => { - if (event.item.type !== 'message') { - return; - } - this.streamClientEvent({ - type: 'conversation_item_added', - item: chatMessageToWire(event.item) as ClientConversationItemAddedEvent['item'], - created_at: msToS(event.createdAt), - }); - }; - - private onUserInputTranscribed = (event: UserInputTranscribedEvent): void => { - this.streamClientEvent({ - type: 'user_input_transcribed', - transcript: event.transcript, - is_final: event.isFinal, - language: event.language, - created_at: msToS(event.createdAt), - }); - }; - - private onFunctionToolsExecuted = (event: FunctionToolsExecutedEvent): void => { - this.streamClientEvent({ - type: 'function_tools_executed', - function_calls: event.functionCalls.map( - functionCallToWire, - ) as ClientFunctionToolsExecutedEvent['function_calls'], - function_call_outputs: event.functionCallOutputs.map((o) => - o - ? 
(functionCallOutputToWire(o) as NonNullable< - ClientFunctionToolsExecutedEvent['function_call_outputs'][number] - >) - : null, - ), - created_at: msToS(event.createdAt), - }); - }; - - private onMetricsCollected = (event: MetricsCollectedEvent): void => { - this.streamClientEvent({ - type: 'metrics_collected', - metrics: agentMetricsToWire(event.metrics) as ClientMetricsCollectedEvent['metrics'], - created_at: msToS(event.createdAt), - }); - - this.streamClientEvent({ - type: 'session_usage', - usage: agentSessionUsageToWire(this.session.usage) as ClientSessionUsageEvent['usage'], - created_at: msToS(Date.now()), - }); - }; - - private onError = (event: ErrorEvent): void => { - const clientEvent: ClientErrorEvent = { - type: 'error', - message: event.error ? String(event.error) : 'Unknown error', - created_at: msToS(event.createdAt), - }; - this.streamClientEvent(clientEvent); - }; - - private getTargetIdentities(): string[] | null { - const linked = this.roomIO.linkedParticipant; - - // TODO(permissions): check linked.permissions.can_subscribe_metrics - return linked ? 
[linked.identity] : null; - } - - private streamClientEvent(event: ClientEvent): void { - const task = Task.from(async () => this.sendClientEvent(event)); - this.trackTask(task); - } - - private async sendClientEvent(event: ClientEvent): Promise { - if (!this.room.isConnected) return; - - const destinationIdentities = this.getTargetIdentities(); - if (!destinationIdentities) return; - - try { - const localParticipant = this.room.localParticipant; - if (!localParticipant) return; - - const writer = await localParticipant.streamText({ - topic: TOPIC_CLIENT_EVENTS, - destinationIdentities, - }); - await writer.write(JSON.stringify(event)); - await writer.close(); - } catch (e) { - this.logger.warn({ error: e }, 'failed to stream event to clients'); - } - } - - private rpcGetSessionState = async (): Promise => { - return this.streamGetSessionState(); - }; - - private rpcGetChatHistory = async (): Promise => { - return this.streamGetChatHistory(); - }; - - private rpcGetAgentInfo = async (): Promise => { - return this.streamGetAgentInfo(); - }; - - private rpcSendMessage = async (data: RpcInvocationData): Promise => { - return this.streamSendMessage(data.payload); - }; - - private onUserTextInput = ( - reader: TextStreamReader, - participantInfo: { identity: string }, - ): void => { - const linkedParticipant = this.roomIO.linkedParticipant; - if (linkedParticipant && participantInfo.identity !== linkedParticipant.identity) { - return; - } - - const participant = this.room.remoteParticipants.get(participantInfo.identity); - if (!participant) { - this.logger.warn('participant not found, ignoring text input'); - return; - } - - if (!this.textInputCb) { - this.logger.error('text input callback is not set, ignoring text input'); - return; - } - - const task = Task.from(async () => { - const text = await reader.readAll(); - const result = this.textInputCb!(this.session, { - text, - info: reader.info, - participantIdentity: participantInfo.identity, - }); - - if (result 
instanceof Promise) { - await result; - } - }); - - this.trackTask(task); - }; - - private trackTask(task: Task): void { - this.tasks.add(task); - task.addDoneCallback(() => { - this.tasks.delete(task); - }); - } -} - -/** - * Client-side interface to interact with a remote AgentSession. - * - * This class allows frontends/clients to: - * - Subscribe to real-time events from the agent session - * - Query session state, chat history, and agent info via RPC - * - Send messages to the agent - * - * Example: - * ```typescript - * const session = new RemoteSession(room, agentIdentity); - * session.on('agent_state_changed', (event) => { - * console.log('Agent state changed:', event.new_state); - * }); - * session.on('user_state_changed', (event) => { - * console.log('User state changed:', event.new_state); - * }); - * session.on('conversation_item_added', (event) => { - * console.log('Conversation item added:', event.item); - * }); - * await session.start(); - * - * const state = await session.fetchSessionState(); - * console.log('Session state:', state); - * - * const response = await session.sendMessage('Hello!'); - * console.log('Response:', response); - * ``` - */ -// TODO: expose this class -/** @experimental */ -export class RemoteSession extends (EventEmitter as new () => TypedEventEmitter) { - private readonly room: Room; - private readonly agentIdentity: string; - private started = false; - - private readonly tasks = new Set>(); - private readonly pendingRequests = new Map>(); - private readonly logger = log(); - - constructor(room: Room, agentIdentity: string) { - super(); - this.room = room; - this.agentIdentity = agentIdentity; - } - - async start(): Promise { - if (this.started) return; - this.started = true; - this.room.registerTextStreamHandler(TOPIC_CLIENT_EVENTS, this.onEventStream); - this.room.registerTextStreamHandler(TOPIC_AGENT_RESPONSE, this.onResponseStream); - } - - async close(): Promise { - if (!this.started) return; - - this.started = false; - 
this.room.unregisterTextStreamHandler(TOPIC_CLIENT_EVENTS); - this.room.unregisterTextStreamHandler(TOPIC_AGENT_RESPONSE); - - for (const pending of this.pendingRequests.values()) { - pending.reject(new Error('RemoteSession closed')); - } - - this.pendingRequests.clear(); - - await cancelAndWait([...this.tasks]); - this.tasks.clear(); - } - - private onEventStream = ( - reader: TextStreamReader, - participantInfo: { identity: string }, - ): void => { - if (participantInfo.identity !== this.agentIdentity) return; - this.trackTask(Task.from(async () => this.readEvent(reader))); - }; - - private onResponseStream = ( - reader: TextStreamReader, - participantInfo: { identity: string }, - ): void => { - if (participantInfo.identity !== this.agentIdentity) return; - this.trackTask(Task.from(async () => this.readResponse(reader))); - }; - - private async readResponse(reader: TextStreamReader): Promise { - try { - const data = await reader.readAll(); - const response = streamResponseSchema.parse(JSON.parse(data)); - const future = this.pendingRequests.get(response.request_id); - this.pendingRequests.delete(response.request_id); - - if (!future || future.done) return; - future.resolve(response); - } catch (e) { - this.logger.warn({ error: e }, 'failed to read stream response'); - } - } - - private async readEvent(reader: TextStreamReader): Promise { - try { - const data = await reader.readAll(); - const event = this.parseEvent(data); - if (event) { - this.emit(event.type, event as never); - } - } catch (e) { - this.logger.warn({ error: e }, 'failed to parse client event'); - } - } - - private parseEvent(data: string): ClientEvent | null { - try { - const result = clientEventSchema.safeParse(JSON.parse(data)); - if (!result.success) { - this.logger.warn({ error: result.error }, 'failed to validate event'); - return null; - } - return result.data; - } catch (e) { - this.logger.warn({ error: e }, 'failed to parse event'); - return null; - } - } - - private async 
sendRequest(method: string, payload: string, timeout = 60000): Promise { - const requestId = shortuuid('req_'); - const request: StreamRequest = { - request_id: requestId, - method, - payload, - }; - - const future = new Future(); - this.pendingRequests.set(requestId, future); - - const localParticipant = this.room.localParticipant; - if (!localParticipant) { - this.pendingRequests.delete(requestId); - throw new Error('RemoteSession room has no local participant'); - } - - await localParticipant.sendText(JSON.stringify(request), { - topic: TOPIC_AGENT_REQUEST, - destinationIdentities: [this.agentIdentity], - }); - - const timer = setTimeout(() => { - if (!future.done) { - this.pendingRequests.delete(requestId); - future.reject(new Error(`RemoteSession request timed out: ${method}`)); - } - }, timeout); - - try { - const response = await future.await; - if (response.error) { - throw new Error(response.error); - } - return response.payload; - } finally { - clearTimeout(timer); - } - } - - async fetchSessionState(): Promise { - const raw = JSON.parse(await this.sendRequest('get_session_state', '{}')); - return getSessionStateResponseSchema.parse(raw); - } - - async fetchChatHistory(): Promise { - const raw = JSON.parse(await this.sendRequest('get_chat_history', '{}')); - return getChatHistoryResponseSchema.parse(raw); - } - - async fetchAgentInfo(): Promise { - const raw = JSON.parse(await this.sendRequest('get_agent_info', '{}')); - return getAgentInfoResponseSchema.parse(raw); - } - - async sendMessage(text: string, responseTimeout = 60000): Promise { - const payload = JSON.stringify({ text } satisfies SendMessageRequest); - const raw = JSON.parse(await this.sendRequest('send_message', payload, responseTimeout)); - return sendMessageResponseSchema.parse(raw); - } - - async fetchRtcStats(): Promise { - const raw = JSON.parse(await this.sendRequest('get_rtc_stats', '{}')); - return getRTCStatsResponseSchema.parse(raw); - } - - async fetchSessionUsage(): Promise { - 
const raw = JSON.parse(await this.sendRequest('get_session_usage', '{}')); - return getSessionUsageResponseSchema.parse(raw); - } - - private trackTask(task: Task): void { - this.tasks.add(task); - task.addDoneCallback(() => { - this.tasks.delete(task); - }); - } -} diff --git a/agents/src/voice/events.ts b/agents/src/voice/events.ts index f063af1ce..c9b7dc8bc 100644 --- a/agents/src/voice/events.ts +++ b/agents/src/voice/events.ts @@ -27,7 +27,7 @@ export enum AgentSessionEventTypes { FunctionToolsExecuted = 'function_tools_executed', MetricsCollected = 'metrics_collected', SpeechCreated = 'speech_created', - UserOverlappingSpeech = 'user_overlapping_speech', + OverlappingSpeech = 'overlapping_speech', Error = 'error', Close = 'close', } diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts index ea6573ffe..df3200cea 100644 --- a/agents/src/voice/index.ts +++ b/agents/src/voice/index.ts @@ -5,7 +5,16 @@ export { Agent, AgentTask, StopResponse, type AgentOptions, type ModelSettings } export { AgentSession, type AgentSessionOptions, type VoiceOptions } from './agent_session.js'; export * from './avatar/index.js'; export * from './background_audio.js'; -export { type TextInputCallback, type TextInputEvent } from './client_events.js'; +export { + type TextInputCallback, + type TextInputEvent, + RemoteSession, + type RemoteSessionCallbacks, + type RemoteSessionEventTypes, + SessionHost, + SessionTransport, + RoomSessionTransport, +} from './remote_session.js'; export * from './events.js'; export { type TimedString } from './io.js'; export * from './report.js'; diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts new file mode 100644 index 000000000..f58278af0 --- /dev/null +++ b/agents/src/voice/remote_session.ts @@ -0,0 +1,1083 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. 
//
// SPDX-License-Identifier: Apache-2.0
import { Timestamp } from '@bufbuild/protobuf';
import { AgentSession as pb } from '@livekit/protocol';
import type { ByteStreamReader, Room, TextStreamInfo } from '@livekit/rtc-node';
import type { TypedEventEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import { TOPIC_SESSION_MESSAGES } from '../constants.js';
import type { OverlappingSpeechEvent } from '../inference/interruption/types.js';
import type {
  ChatItem,
  FunctionCall as FCItem,
  FunctionCallOutput as FCOItem,
} from '../llm/chat_context.js';
import type { ToolContext } from '../llm/tool_context.js';
import { log } from '../log.js';
import type {
  InterruptionModelUsage,
  LLMModelUsage,
  STTModelUsage,
  TTSModelUsage,
} from '../metrics/model_usage.js';
import { Future, Task, shortuuid } from '../utils.js';
import type { AgentSession, AgentSessionUsage } from './agent_session.js';
import {
  AgentSessionEventTypes,
  type AgentState,
  type AgentStateChangedEvent,
  type ConversationItemAddedEvent,
  type ErrorEvent,
  type FunctionToolsExecutedEvent,
  type MetricsCollectedEvent,
  type UserInputTranscribedEvent,
  type UserState,
  type UserStateChangedEvent,
} from './events.js';
import type { RoomIO } from './room_io/room_io.js';

// ===========================================================================
// Shared types (TextInput, Client event types, wire format aliases)
// ===========================================================================

/** Text submitted by a participant, optionally with the stream it arrived on. */
export interface TextInputEvent {
  text: string;
  info?: TextStreamInfo;
  participantIdentity?: string;
}

/** Handler invoked with user text; may be synchronous or asynchronous. */
export type TextInputCallback = (session: AgentSession, ev: TextInputEvent) => void | Promise<void>;

/** @experimental */
export type RemoteSessionEventTypes =
  | 'agent_state_changed'
  | 'user_state_changed'
  | 'conversation_item_added'
  | 'user_input_transcribed'
  | 'function_tools_executed'
  | 'overlapping_speech'
  | 'session_usage'
  | 'error';

/** @experimental */
export type RemoteSessionCallbacks = {
  agent_state_changed: (ev: pb.AgentSessionEvent_AgentStateChanged) => void;
  user_state_changed: (ev: pb.AgentSessionEvent_UserStateChanged) => void;
  conversation_item_added: (ev: pb.AgentSessionEvent_ConversationItemAdded) => void;
  user_input_transcribed: (ev: pb.AgentSessionEvent_UserInputTranscribed) => void;
  function_tools_executed: (ev: pb.AgentSessionEvent_FunctionToolsExecuted) => void;
  overlapping_speech: (ev: pb.AgentSessionEvent_OverlappingSpeech) => void;
  session_usage: (ev: pb.AgentSessionEvent_SessionUsageUpdated) => void;
  error: (ev: pb.AgentSessionEvent_Error) => void;
};

// ===========================================================================
// SessionTransport
// ===========================================================================

/**
 * Bidirectional channel carrying `AgentSessionMessage`s. Incoming messages are
 * consumed by async-iterating the transport.
 */
export abstract class SessionTransport {
  /** Optional setup hook; the base implementation does nothing. */
  async start(): Promise<void> {}
  abstract sendMessage(msg: pb.AgentSessionMessage): Promise<void>;
  abstract close(): Promise<void>;
  abstract [Symbol.asyncIterator](): AsyncIterator<pb.AgentSessionMessage>;
}

/**
 * Transport that exchanges binary-encoded messages over room byte streams on
 * `TOPIC_SESSION_MESSAGES`.
 *
 * Received messages are buffered in `pendingMessages` until the iterator asks
 * for them; at most one `next()` call can be parked in `waitingResolve`.
 */
export class RoomSessionTransport extends SessionTransport {
  private readonly room: Room;
  private readonly roomIO: RoomIO;
  private handlerRegistered = false;
  private closed = false;
  private pendingMessages: pb.AgentSessionMessage[] = [];
  private waitingResolve: ((value: IteratorResult<pb.AgentSessionMessage>) => void) | null = null;

  constructor(room: Room, roomIO: RoomIO) {
    super();
    this.room = room;
    this.roomIO = roomIO;
  }

  /** Identity of the participant linked to the RoomIO, if one is linked. */
  private getRemoteIdentity(): string | undefined {
    return this.roomIO.linkedParticipant?.identity;
  }

  /** Registers the byte-stream handler once; later calls are no-ops. */
  override async start(): Promise<void> {
    if (this.handlerRegistered) return;
    this.room.registerByteStreamHandler(TOPIC_SESSION_MESSAGES, this.onByteStream);
    this.handlerRegistered = true;
  }

  private onByteStream = (reader: ByteStreamReader, participantInfo: { identity: string }): void => {
    // When a participant is linked, ignore streams from anyone else.
    const remoteIdentity = this.getRemoteIdentity();
    if (remoteIdentity && participantInfo.identity !== remoteIdentity) {
      return;
    }
    this.readStream(reader).catch((e: unknown) => {
      log().warn({ error: e }, 'failed to read binary stream message');
    });
  };

  /** Reads a whole stream, decodes it as one message and queues it. */
  private async readStream(reader: ByteStreamReader): Promise<void> {
    try {
      const chunks = await reader.readAll();
      const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
      const data = new Uint8Array(totalLength);
      let offset = 0;
      for (const chunk of chunks) {
        data.set(chunk, offset);
        offset += chunk.length;
      }
      this.enqueue(pb.AgentSessionMessage.fromBinary(data));
    } catch (e: unknown) {
      // Failures after close() are expected and not worth logging.
      if (!this.closed) {
        log().warn({ error: e }, 'failed to parse binary stream message');
      }
    }
  }

  /**
   * Sends one message as a byte stream. Silently does nothing when the
   * transport is closed, the room is disconnected, or there is no local
   * participant; send failures are logged, not thrown.
   */
  override async sendMessage(msg: pb.AgentSessionMessage): Promise<void> {
    const localParticipant = this.room.localParticipant;
    if (this.closed || !this.room.isConnected || !localParticipant) return;

    try {
      const opts: { topic: string; name: string; destinationIdentities?: string[] } = {
        topic: TOPIC_SESSION_MESSAGES,
        name: shortuuid('AS_'),
      };
      const remoteIdentity = this.getRemoteIdentity();
      if (remoteIdentity) {
        opts.destinationIdentities = [remoteIdentity];
      }
      const writer = await localParticipant.streamBytes(opts);
      await writer.write(new Uint8Array(msg.toBinary()));
      await writer.close();
    } catch (e: unknown) {
      log().warn({ error: e }, 'failed to send binary stream message');
    }
  }

  /**
   * Unregisters the handler and releases a parked `next()` call. Messages
   * already buffered stay available to the iterator. Idempotent.
   */
  override async close(): Promise<void> {
    if (this.closed) return;
    this.closed = true;

    if (this.handlerRegistered) {
      try {
        this.room.unregisterByteStreamHandler(TOPIC_SESSION_MESSAGES);
      } catch (e: unknown) {
        log().debug({ error: e }, 'byte stream handler already unregistered');
      }
      this.handlerRegistered = false;
    }

    if (this.waitingResolve) {
      const resolve = this.waitingResolve;
      this.waitingResolve = null;
      resolve({ value: undefined, done: true });
    }
  }

  /** Hands a message to the parked consumer, or buffers it. Dropped once closed. */
  private enqueue(msg: pb.AgentSessionMessage): void {
    if (this.closed) return;

    if (this.waitingResolve) {
      const resolve = this.waitingResolve;
      this.waitingResolve = null;
      resolve({ value: msg, done: false });
    } else {
      this.pendingMessages.push(msg);
    }
  }

  override [Symbol.asyncIterator](): AsyncIterator<pb.AgentSessionMessage> {
    return {
      next: (): Promise<IteratorResult<pb.AgentSessionMessage>> => {
        const pending = this.pendingMessages.shift();
        if (pending) {
          return Promise.resolve({ value: pending, done: false });
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }
        return new Promise<IteratorResult<pb.AgentSessionMessage>>((resolve) => {
          this.waitingResolve = resolve;
        });
      },
      // Leaving a `for await` early closes the transport; wait for it so a
      // failure is not lost as a floating promise.
      return: async (): Promise<IteratorResult<pb.AgentSessionMessage>> => {
        await this.close();
        return { value: undefined, done: true };
      },
    };
  }
}

// ===========================================================================
// Enum maps
// ===========================================================================
const AGENT_STATE_MAP: Record<AgentState, pb.AgentState> = {
  initializing: pb.AgentState.AS_INITIALIZING,
  idle: pb.AgentState.AS_IDLE,
  listening: pb.AgentState.AS_LISTENING,
  thinking: pb.AgentState.AS_THINKING,
  speaking: pb.AgentState.AS_SPEAKING,
};

const USER_STATE_MAP: Record<UserState, pb.UserState> = {
  speaking: pb.UserState.US_SPEAKING,
  listening: pb.UserState.US_LISTENING,
  away: pb.UserState.US_AWAY,
};

// ===========================================================================
// Chat item / timestamp conversion helpers
// ===========================================================================

/** Converts epoch milliseconds to a protobuf `Timestamp`. */
function msToTimestamp(ms: number): Timestamp {
  return Timestamp.fromDate(new Date(ms));
}

/** Protobuf `Timestamp` for the current wall-clock time. */
function nowTimestamp(): Timestamp {
  return Timestamp.fromDate(new Date());
}

const CHAT_ROLE_MAP: Record<string, pb.ChatRole> = {
  developer: pb.ChatRole.DEVELOPER,
  system: pb.ChatRole.SYSTEM,
  user: pb.ChatRole.USER,
  assistant: pb.ChatRole.ASSISTANT,
};

/** Metric fields copied one-to-one onto the protobuf report when present. */
const METRIC_KEYS = [
  'transcriptionDelay',
  'endOfTurnDelay',
  'onUserTurnCompletedDelay',
  'llmNodeTtft',
  'ttsNodeTtfb',
  'e2eLatency',
] as const;

/**
 * Converts a chat item to its protobuf form.
 *
 * For messages only string content parts are carried over; other content
 * parts are dropped. Unknown roles fall back to `ASSISTANT`.
 */
function chatItemToProto(item: ChatItem): pb.ChatContext_ChatItem {
  switch (item.type) {
    case 'message': {
      const content: pb.ChatMessage_ChatContent[] = [];
      for (const part of item.content) {
        if (typeof part === 'string') {
          content.push(new pb.ChatMessage_ChatContent({ payload: { case: 'text', value: part } }));
        }
      }

      const metricsReport = new pb.MetricsReport();
      for (const key of METRIC_KEYS) {
        const value = item.metrics[key];
        if (value !== undefined) metricsReport[key] = value;
      }

      const pbMsg = new pb.ChatMessage({
        id: item.id,
        role: CHAT_ROLE_MAP[item.role] ?? pb.ChatRole.ASSISTANT,
        content,
        interrupted: item.interrupted,
        metrics: metricsReport,
        createdAt: msToTimestamp(item.createdAt),
      });
      if (item.transcriptConfidence !== undefined) {
        pbMsg.transcriptConfidence = item.transcriptConfidence;
      }
      return new pb.ChatContext_ChatItem({ item: { case: 'message', value: pbMsg } });
    }
    case 'function_call':
      return new pb.ChatContext_ChatItem({
        item: {
          case: 'functionCall',
          value: new pb.FunctionCall({
            id: item.id,
            callId: item.callId,
            name: item.name,
            arguments: item.args,
            createdAt: msToTimestamp(item.createdAt),
          }),
        },
      });
    case 'function_call_output':
      return new pb.ChatContext_ChatItem({
        item: {
          case: 'functionCallOutput',
          value: new pb.FunctionCallOutput({
            id: item.id,
            callId: item.callId,
            name: item.name,
            output: item.output,
            isError: item.isError,
            createdAt: msToTimestamp(item.createdAt),
          }),
        },
      });
    case 'agent_handoff':
      return new pb.ChatContext_ChatItem({
        item: {
          case: 'agentHandoff',
          value: new pb.AgentHandoff({
            id: item.id,
            oldAgentId: item.oldAgentId,
            newAgentId: item.newAgentId,
            createdAt: msToTimestamp(item.createdAt),
          }),
        },
      });
  }
}

// ===========================================================================
// Usage conversion helpers
// ===========================================================================

/**
 * Converts session usage to protobuf. Missing fields default to `''` / `0`,
 * millisecond durations are sent as seconds, and entries with an
 * unrecognised `type` are skipped.
 */
function sessionUsageToProto(usage: AgentSessionUsage): pb.AgentSessionUsage {
  const modelUsages: pb.ModelUsage[] = [];
  for (const mu of usage.modelUsage) {
    switch (mu.type) {
      case 'llm_usage': {
        const lu = mu as Partial<LLMModelUsage>;
        modelUsages.push(
          new pb.ModelUsage({
            usage: {
              case: 'llm',
              value: new pb.LLMModelUsage({
                provider: lu.provider ?? '',
                model: lu.model ?? '',
                inputTokens: lu.inputTokens ?? 0,
                inputCachedTokens: lu.inputCachedTokens ?? 0,
                inputAudioTokens: lu.inputAudioTokens ?? 0,
                inputCachedAudioTokens: lu.inputCachedAudioTokens ?? 0,
                inputTextTokens: lu.inputTextTokens ?? 0,
                inputCachedTextTokens: lu.inputCachedTextTokens ?? 0,
                inputImageTokens: lu.inputImageTokens ?? 0,
                inputCachedImageTokens: lu.inputCachedImageTokens ?? 0,
                outputTokens: lu.outputTokens ?? 0,
                outputAudioTokens: lu.outputAudioTokens ?? 0,
                outputTextTokens: lu.outputTextTokens ?? 0,
                sessionDuration: (lu.sessionDurationMs ?? 0) / 1000,
              }),
            },
          }),
        );
        break;
      }
      case 'tts_usage': {
        const tu = mu as Partial<TTSModelUsage>;
        modelUsages.push(
          new pb.ModelUsage({
            usage: {
              case: 'tts',
              value: new pb.TTSModelUsage({
                provider: tu.provider ?? '',
                model: tu.model ?? '',
                inputTokens: tu.inputTokens ?? 0,
                outputTokens: tu.outputTokens ?? 0,
                charactersCount: tu.charactersCount ?? 0,
                audioDuration: (tu.audioDurationMs ?? 0) / 1000,
              }),
            },
          }),
        );
        break;
      }
      case 'stt_usage': {
        const su = mu as Partial<STTModelUsage>;
        modelUsages.push(
          new pb.ModelUsage({
            usage: {
              case: 'stt',
              value: new pb.STTModelUsage({
                provider: su.provider ?? '',
                model: su.model ?? '',
                inputTokens: su.inputTokens ?? 0,
                outputTokens: su.outputTokens ?? 0,
                audioDuration: (su.audioDurationMs ?? 0) / 1000,
              }),
            },
          }),
        );
        break;
      }
      case 'interruption_usage': {
        const iu = mu as Partial<InterruptionModelUsage>;
        modelUsages.push(
          new pb.ModelUsage({
            usage: {
              case: 'interruption',
              value: new pb.InterruptionModelUsage({
                provider: iu.provider ?? '',
                model: iu.model ?? '',
                totalRequests: iu.totalRequests ?? 0,
              }),
            },
          }),
        );
        break;
      }
    }
  }
  return new pb.AgentSessionUsage({ modelUsage: modelUsages });
}

/** Names of the tools in a tool context (its own enumerable keys). */
function toolNames(toolCtx: ToolContext | undefined): string[] {
  return toolCtx ? Object.keys(toolCtx) : [];
}

/**
 * Flattens session options into the string map sent in the session-state
 * response. Structured values are JSON-encoded; a missing or `null`
 * `userAwayTimeout` becomes the empty string.
 */
function protoSerializeOptions(opts: {
  turnHandling?: { endpointing?: unknown; interruption?: unknown };
  maxToolSteps?: number;
  userAwayTimeout?: number | null;
  preemptiveGeneration?: boolean;
  useTtsAlignedTranscript?: boolean;
}): Record<string, string> {
  return {
    endpointing: JSON.stringify(opts.turnHandling?.endpointing ?? {}),
    interruption: JSON.stringify(opts.turnHandling?.interruption ?? {}),
    max_tool_steps: String(opts.maxToolSteps ?? 0),
    user_away_timeout: String(opts.userAwayTimeout ?? ''),
    preemptive_generation: String(opts.preemptiveGeneration ?? false),
    use_tts_aligned_transcript: String(opts.useTtsAlignedTranscript ?? false),
  };
}
// ===========================================================================
// SessionHost (protobuf-based server-side handler)
// ===========================================================================

/**
 * Server side of the session protocol: forwards `AgentSession` events over a
 * `SessionTransport` and answers requests received from it.
 */
export class SessionHost {
  private readonly transport: SessionTransport;
  private session: AgentSession | undefined;
  private started = false;
  private eventsRegistered = false;
  private recvTask: Task<void> | undefined;
  private readonly tasks = new Set<Task<void>>();
  private textInputCb: TextInputCallback | undefined;

  constructor(transport: SessionTransport) {
    this.transport = transport;
  }

  /**
   * Binds the host to `session` and subscribes to its events. Registering a
   * different session first removes the listeners from the previous one, so
   * events are only ever forwarded from the current session.
   */
  registerSession(session: AgentSession): void {
    if (this.session !== session) {
      this.detachSessionListeners();
    }
    this.session = session;
    this.attachSessionListeners(session);
  }

  /** Sets the callback that handles `runInput` text instead of `session.run`. */
  registerTextInput(textInputCb: TextInputCallback): void {
    this.textInputCb = textInputCb;
  }

  /** Starts the transport and the receive loop. Idempotent. */
  async start(): Promise<void> {
    if (this.started) return;
    this.started = true;
    await this.transport.start();
    this.recvTask = Task.from(async () => this.recvLoop());
  }

  /**
   * Removes session listeners, cancels in-flight work and closes the
   * transport. Listeners are removed even if `start()` was never called so
   * they do not outlive the host.
   */
  async close(): Promise<void> {
    this.detachSessionListeners();
    if (!this.started) return;
    this.started = false;

    this.recvTask?.cancel();

    await Promise.allSettled([...this.tasks].map((task) => task.cancelAndWait()));
    this.tasks.clear();

    await this.transport.close();
  }

  private attachSessionListeners(session: AgentSession): void {
    if (this.eventsRegistered) return;
    this.eventsRegistered = true;
    session.on(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged);
    session.on(AgentSessionEventTypes.UserStateChanged, this.onUserStateChanged);
    session.on(AgentSessionEventTypes.ConversationItemAdded, this.onConversationItemAdded);
    session.on(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed);
    session.on(AgentSessionEventTypes.FunctionToolsExecuted, this.onFunctionToolsExecuted);
    session.on(AgentSessionEventTypes.MetricsCollected, this.onMetricsCollected);
    session.on(AgentSessionEventTypes.OverlappingSpeech, this.onOverlappingSpeech);
    session.on(AgentSessionEventTypes.Error, this.onHostError);
  }

  private detachSessionListeners(): void {
    const session = this.session;
    if (!session || !this.eventsRegistered) return;
    this.eventsRegistered = false;
    session.off(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged);
    session.off(AgentSessionEventTypes.UserStateChanged, this.onUserStateChanged);
    session.off(AgentSessionEventTypes.ConversationItemAdded, this.onConversationItemAdded);
    session.off(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed);
    session.off(AgentSessionEventTypes.FunctionToolsExecuted, this.onFunctionToolsExecuted);
    session.off(AgentSessionEventTypes.MetricsCollected, this.onMetricsCollected);
    session.off(AgentSessionEventTypes.OverlappingSpeech, this.onOverlappingSpeech);
    session.off(AgentSessionEventTypes.Error, this.onHostError);
  }

  /** Handles each incoming request in its own tracked task; other messages are ignored. */
  private async recvLoop(): Promise<void> {
    try {
      for await (const msg of this.transport) {
        if (msg.message.case !== 'request') continue;
        const req = msg.message.value;
        this.trackTask(Task.from(async () => this.handleRequestSafe(req)));
      }
    } catch (e: unknown) {
      // Errors while shutting down are expected.
      if (this.started) {
        log().warn({ error: e }, 'error processing session message');
      }
    }
  }

  /** Sends an event in a tracked background task (fire-and-forget for callers). */
  private sendEvent(event: pb.AgentSessionEvent): void {
    const msg = new pb.AgentSessionMessage({
      message: { case: 'event', value: event },
    });
    this.trackTask(Task.from(async () => this.transport.sendMessage(msg)));
  }

  /** Wraps an event payload with a timestamp; defaults to now when none is given. */
  private emitEvent(event: pb.AgentSessionEvent['event'], createdAt?: number): void {
    this.sendEvent(
      new pb.AgentSessionEvent({
        createdAt: createdAt !== undefined ? msToTimestamp(createdAt) : nowTimestamp(),
        event,
      }),
    );
  }

  private onAgentStateChanged = (event: AgentStateChangedEvent): void => {
    this.emitEvent(
      {
        case: 'agentStateChanged',
        value: new pb.AgentSessionEvent_AgentStateChanged({
          oldState: AGENT_STATE_MAP[event.oldState],
          newState: AGENT_STATE_MAP[event.newState],
        }),
      },
      event.createdAt,
    );
  };

  private onUserStateChanged = (event: UserStateChangedEvent): void => {
    this.emitEvent(
      {
        case: 'userStateChanged',
        value: new pb.AgentSessionEvent_UserStateChanged({
          oldState: USER_STATE_MAP[event.oldState],
          newState: USER_STATE_MAP[event.newState],
        }),
      },
      event.createdAt,
    );
  };

  private onUserInputTranscribed = (event: UserInputTranscribedEvent): void => {
    this.emitEvent(
      {
        case: 'userInputTranscribed',
        value: new pb.AgentSessionEvent_UserInputTranscribed({
          transcript: event.transcript,
          isFinal: event.isFinal,
        }),
      },
      event.createdAt,
    );
  };

  private onConversationItemAdded = (event: ConversationItemAddedEvent): void => {
    this.emitEvent(
      {
        case: 'conversationItemAdded',
        value: new pb.AgentSessionEvent_ConversationItemAdded({
          item: chatItemToProto(event.item),
        }),
      },
      event.createdAt,
    );
  };

  private onFunctionToolsExecuted = (event: FunctionToolsExecutedEvent): void => {
    const functionCalls = event.functionCalls.map(
      (fc: FCItem) => new pb.FunctionCall({ name: fc.name, arguments: fc.args, callId: fc.callId }),
    );
    // Outputs may contain holes for calls that produced nothing; skip those.
    const functionCallOutputs = event.functionCallOutputs
      .filter((fco): fco is FCOItem => fco != null)
      .map(
        (fco: FCOItem) =>
          new pb.FunctionCallOutput({
            callId: fco.callId,
            output: fco.output,
            isError: fco.isError,
          }),
      );
    this.emitEvent(
      {
        case: 'functionToolsExecuted',
        value: new pb.AgentSessionEvent_FunctionToolsExecuted({ functionCalls, functionCallOutputs }),
      },
      event.createdAt,
    );
  };

  private onOverlappingSpeech = (event: OverlappingSpeechEvent): void => {
    const value = new pb.AgentSessionEvent_OverlappingSpeech({
      isInterruption: event.isInterruption,
      detectionDelay: event.detectionDelayInS,
      detectedAt: msToTimestamp(event.detectedAt),
    });
    if (event.overlapStartedAt != null) {
      value.overlapStartedAt = msToTimestamp(event.overlapStartedAt);
    }
    // No createdAt is passed, so the envelope is stamped with the send time.
    this.emitEvent({ case: 'overlappingSpeech', value });
  };

  /** Every metrics event triggers a snapshot of the session's total usage. */
  private onMetricsCollected = (event: MetricsCollectedEvent): void => {
    if (!this.session) return;
    this.emitEvent(
      {
        case: 'sessionUsageUpdated',
        value: new pb.AgentSessionEvent_SessionUsageUpdated({
          usage: sessionUsageToProto(this.session.usage),
        }),
      },
      event.createdAt,
    );
  };

  private onHostError = (event: ErrorEvent): void => {
    this.emitEvent(
      {
        case: 'error',
        value: new pb.AgentSessionEvent_Error({
          message: this.describeError(event.error),
        }),
      },
      event.createdAt,
    );
  };

  /**
   * Produces a readable message for an arbitrary error value. `Error`
   * instances keep their `String()` form; plain objects would stringify to
   * "[object Object]", so a nested `error` property is followed if present
   * and otherwise the object is JSON-encoded.
   */
  private describeError(err: unknown, depth = 0): string {
    if (!err) return 'Unknown error';
    if (err instanceof Error || typeof err !== 'object') return String(err);

    const nested = (err as { error?: unknown }).error;
    if (nested && depth < 3) {
      return this.describeError(nested, depth + 1);
    }
    try {
      return JSON.stringify(err);
    } catch {
      // Circular or otherwise unserialisable object.
      return String(err);
    }
  }

  /** Runs a request handler; on failure logs and replies with a generic error. */
  private async handleRequestSafe(req: pb.SessionRequest): Promise<void> {
    try {
      await this.handleRequest(req);
    } catch (e: unknown) {
      log().warn({ error: e, requestId: req.requestId }, 'error handling session request');
      try {
        await this.sendResponse(req.requestId, { case: undefined }, 'internal error');
      } catch (sendError: unknown) {
        log().debug({ error: sendError }, 'failed to send error response');
      }
    }
  }

  /**
   * Dispatches a request by its oneof case. Requests that cannot be served
   * (no session registered, unknown case) get an error response so the
   * caller does not wait for its timeout.
   */
  private async handleRequest(req: pb.SessionRequest): Promise<void> {
    const session = this.session;
    if (!session) {
      return this.sendResponse(req.requestId, { case: undefined }, 'session not registered');
    }

    switch (req.request.case) {
      case 'ping':
        return this.sendResponse(req.requestId, {
          case: 'pong',
          value: new pb.SessionResponse_Pong(),
        });
      case 'getChatHistory':
        return this.handleGetChatHistory(session, req.requestId);
      case 'getAgentInfo':
        return this.handleGetAgentInfo(session, req.requestId);
      case 'runInput':
        return this.handleRunInput(session, req.requestId, req.request.value);
      case 'getSessionState':
        return this.handleGetSessionState(session, req.requestId);
      case 'getRtcStats':
        // RTC stats are not collected here; reply with empty lists.
        return this.sendResponse(req.requestId, {
          case: 'getRtcStats',
          value: new pb.SessionResponse_GetRTCStatsResponse({
            publisherStats: [],
            subscriberStats: [],
          }),
        });
      case 'getSessionUsage':
        return this.handleGetSessionUsage(session, req.requestId);
      default:
        return this.sendResponse(req.requestId, { case: undefined }, 'unsupported request');
    }
  }

  private async handleGetChatHistory(session: AgentSession, requestId: string): Promise<void> {
    const items = session.history.items.map(chatItemToProto);
    return this.sendResponse(requestId, {
      case: 'getChatHistory',
      value: new pb.SessionResponse_GetChatHistoryResponse({ items }),
    });
  }

  private async handleGetAgentInfo(session: AgentSession, requestId: string): Promise<void> {
    const agent = session.currentAgent;
    return this.sendResponse(requestId, {
      case: 'getAgentInfo',
      value: new pb.SessionResponse_GetAgentInfoResponse({
        id: agent.id,
        instructions: agent.instructions,
        tools: toolNames(agent.toolCtx),
        chatCtx: agent.chatCtx.items.map(chatItemToProto),
      }),
    });
  }

  /**
   * Feeds user text into the session. With a registered text-input callback
   * the callback owns the handling and the response carries no items.
   * Otherwise the current turn is interrupted, the text is run, and the
   * resulting items (plus any run error) are returned. Empty text is a no-op.
   */
  private async handleRunInput(
    session: AgentSession,
    requestId: string,
    input: pb.SessionRequest_RunInput,
  ): Promise<void> {
    const text = input.text;
    let items: pb.ChatContext_ChatItem[] = [];
    let error: string | undefined;

    if (text) {
      if (this.textInputCb) {
        await this.textInputCb(session, { text });
      } else {
        try {
          await session.interrupt({ force: true }).await;
        } catch {
          // Best effort: a failed interrupt must not block the new input.
        }

        const result = session.run({ userInput: text });
        try {
          await result.wait();
        } catch (e: unknown) {
          error = e instanceof Error ? e.message : String(e);
        }
        items = result.events.map((ev) => chatItemToProto(ev.item));
      }
    }

    return this.sendResponse(
      requestId,
      {
        case: 'runInput',
        value: new pb.SessionResponse_RunInputResponse({ items }),
      },
      error,
    );
  }

  private async handleGetSessionState(session: AgentSession, requestId: string): Promise<void> {
    const { options } = session;
    // Falls back to "now" when the session has no recorded start time.
    const startedAt = session._startedAt ?? Date.now();
    return this.sendResponse(requestId, {
      case: 'getSessionState',
      value: new pb.SessionResponse_GetSessionStateResponse({
        agentState: AGENT_STATE_MAP[session.agentState],
        userState: USER_STATE_MAP[session.userState],
        agentId: session.currentAgent.id,
        options: protoSerializeOptions({
          turnHandling: options.turnHandling,
          maxToolSteps: options.maxToolSteps,
          userAwayTimeout: options.userAwayTimeout,
          preemptiveGeneration: options.preemptiveGeneration,
          useTtsAlignedTranscript: options.useTtsAlignedTranscript,
        }),
        createdAt: msToTimestamp(startedAt),
      }),
    });
  }

  private async handleGetSessionUsage(session: AgentSession, requestId: string): Promise<void> {
    return this.sendResponse(requestId, {
      case: 'getSessionUsage',
      value: new pb.SessionResponse_GetSessionUsageResponse({
        usage: sessionUsageToProto(session.usage),
        createdAt: nowTimestamp(),
      }),
    });
  }

  /** Sends a response for `requestId`, optionally flagged with an error string. */
  private async sendResponse(
    requestId: string,
    response: pb.SessionResponse['response'],
    error?: string,
  ): Promise<void> {
    await this.transport.sendMessage(
      new pb.AgentSessionMessage({
        message: {
          case: 'response',
          value: new pb.SessionResponse({ requestId, response, error }),
        },
      }),
    );
  }

  /** Keeps a task in `tasks` until it settles so `close()` can cancel it. */
  private trackTask(task: Task<void>): void {
    this.tasks.add(task);
    task.addDoneCallback(() => {
      this.tasks.delete(task);
    });
  }
}
// ===========================================================================
// RemoteSession (protobuf-based client-side interface)
// ===========================================================================

/**
 * Client side of the session protocol: re-emits events received from a
 * `SessionHost` and issues request/response calls over the same transport.
 *
 * @experimental
 */
export class RemoteSession extends (EventEmitter as new () => TypedEventEmitter<RemoteSessionCallbacks>) {
  private readonly transport: SessionTransport;
  private started = false;

  private readonly tasks = new Set<Task<void>>();
  /** Requests awaiting a response, keyed by request id. */
  private readonly pendingRequests = new Map<string, Future<pb.SessionResponse>>();
  private recvTask: Task<void> | undefined;
  private readonly _logger = log();

  constructor(transport: SessionTransport) {
    super();
    this.transport = transport;
  }

  /** Creates a session that talks over the room's byte streams. */
  static fromRoom(room: Room, roomIO: RoomIO): RemoteSession {
    return new RemoteSession(new RoomSessionTransport(room, roomIO));
  }

  /** Starts the transport and the receive loop. Idempotent. */
  async start(): Promise<void> {
    if (this.started) return;
    this.started = true;
    await this.transport.start();
    this.recvTask = Task.from(async () => this.recvLoop());
  }

  /**
   * Stops receiving, rejects every outstanding request with
   * "RemoteSession closed", and closes the transport. Idempotent.
   */
  async close(): Promise<void> {
    if (!this.started) return;
    this.started = false;

    this.recvTask?.cancel();

    for (const pending of this.pendingRequests.values()) {
      pending.reject(new Error('RemoteSession closed'));
    }
    this.pendingRequests.clear();

    for (const task of this.tasks) {
      task.cancel();
    }
    this.tasks.clear();

    await this.transport.close();
  }

  private async recvLoop(): Promise<void> {
    try {
      for await (const msg of this.transport) {
        // Isolate each message: a throwing listener must not end the loop
        // and silently stop all later events and responses.
        try {
          switch (msg.message.case) {
            case 'event':
              this.dispatchEvent(msg.message.value);
              break;
            case 'response':
              this.dispatchResponse(msg.message.value);
              break;
          }
        } catch (e: unknown) {
          this._logger.warn({ error: e }, 'error handling RemoteSession message');
        }
      }
    } catch (e: unknown) {
      // Errors while shutting down are expected.
      if (this.started) {
        this._logger.warn({ error: e }, 'error in RemoteSession recv loop');
      }
    }
  }

  /** Re-emits a wire event under its snake_case event name. */
  private dispatchEvent(event: pb.AgentSessionEvent): void {
    const ev = event.event;
    switch (ev.case) {
      case 'agentStateChanged':
        this.emit('agent_state_changed', ev.value);
        break;
      case 'userStateChanged':
        this.emit('user_state_changed', ev.value);
        break;
      case 'userInputTranscribed':
        this.emit('user_input_transcribed', ev.value);
        break;
      case 'conversationItemAdded':
        this.emit('conversation_item_added', ev.value);
        break;
      case 'functionToolsExecuted':
        this.emit('function_tools_executed', ev.value);
        break;
      case 'overlappingSpeech':
        this.emit('overlapping_speech', ev.value);
        break;
      case 'sessionUsageUpdated':
        this.emit('session_usage', ev.value);
        break;
      case 'error':
        // EventEmitter throws when 'error' is emitted without a listener;
        // log instead so a remote error cannot crash the consumer.
        if (this.listenerCount('error') > 0) {
          this.emit('error', ev.value);
        } else {
          this._logger.warn({ error: ev.value.message }, 'unhandled remote session error');
        }
        break;
    }
  }

  /** Resolves the pending request matching the response id; unknown ids are ignored. */
  private dispatchResponse(response: pb.SessionResponse): void {
    const future = this.pendingRequests.get(response.requestId);
    this.pendingRequests.delete(response.requestId);
    if (future && !future.done) {
      future.resolve(response);
    }
  }

  /**
   * Sends a request and waits for the matching response.
   *
   * @param buildReq - Builds the request for the generated id.
   * @param timeout - Milliseconds to wait for the response after sending.
   * @returns The response, guaranteed to have an empty `error` field.
   * @throws Error if the session is not started, the wait times out, the
   *   session is closed meanwhile, or the response carries an error.
   */
  private async sendRequest(
    buildReq: (requestId: string) => pb.SessionRequest,
    timeout = 60000,
  ): Promise<pb.SessionResponse> {
    // Without a running receive loop no response can arrive; fail fast
    // rather than waiting for the timeout.
    if (!this.started) {
      throw new Error('RemoteSession is not started');
    }

    const requestId = shortuuid('req_');
    const req = buildReq(requestId);
    req.requestId = requestId;

    const future = new Future<pb.SessionResponse>();
    this.pendingRequests.set(requestId, future);

    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      await this.transport.sendMessage(
        new pb.AgentSessionMessage({ message: { case: 'request', value: req } }),
      );

      timer = setTimeout(() => {
        if (!future.done) {
          future.reject(new Error('RemoteSession request timed out'));
        }
      }, timeout);

      const response = await future.await;
      if (response.error) {
        throw new Error(response.error);
      }
      return response;
    } finally {
      // Runs on every exit path, including a failed send, so the map never
      // retains entries for requests nobody is waiting on.
      if (timer !== undefined) clearTimeout(timer);
      this.pendingRequests.delete(requestId);
    }
  }

  /** Fetches agent/user state, agent id and serialized options. */
  async fetchSessionState(): Promise<pb.SessionResponse_GetSessionStateResponse> {
    const resp = await this.sendRequest(
      (id) =>
        new pb.SessionRequest({
          requestId: id,
          request: { case: 'getSessionState', value: new pb.SessionRequest_GetSessionState() },
        }),
    );
    if (resp.response.case !== 'getSessionState') {
      throw new Error('unexpected response type');
    }
    return resp.response.value;
  }

  /** Fetches the session's chat history. */
  async fetchChatHistory(): Promise<pb.SessionResponse_GetChatHistoryResponse> {
    const resp = await this.sendRequest(
      (id) =>
        new pb.SessionRequest({
          requestId: id,
          request: { case: 'getChatHistory', value: new pb.SessionRequest_GetChatHistory() },
        }),
    );
    if (resp.response.case !== 'getChatHistory') {
      throw new Error('unexpected response type');
    }
    return resp.response.value;
  }

  /** Fetches the current agent's id, instructions, tools and chat context. */
  async fetchAgentInfo(): Promise<pb.SessionResponse_GetAgentInfoResponse> {
    const resp = await this.sendRequest(
      (id) =>
        new pb.SessionRequest({
          requestId: id,
          request: { case: 'getAgentInfo', value: new pb.SessionRequest_GetAgentInfo() },
        }),
    );
    if (resp.response.case !== 'getAgentInfo') {
      throw new Error('unexpected response type');
    }
    return resp.response.value;
  }

  /**
   * Sends user text to the agent and waits for the run result.
   *
   * @param text - The user input.
   * @param responseTimeout - Milliseconds to wait for the response.
   */
  async sendMessage(
    text: string,
    responseTimeout = 60000,
  ): Promise<pb.SessionResponse_RunInputResponse> {
    const resp = await this.sendRequest(
      (id) =>
        new pb.SessionRequest({
          requestId: id,
          request: { case: 'runInput', value: new pb.SessionRequest_RunInput({ text }) },
        }),
      responseTimeout,
    );
    if (resp.response.case !== 'runInput') {
      throw new Error('unexpected response type');
    }
    return resp.response.value;
  }

  /** Fetches RTC publisher/subscriber stats. */
  async fetchRtcStats(): Promise<pb.SessionResponse_GetRTCStatsResponse> {
    const resp = await this.sendRequest(
      (id) =>
        new pb.SessionRequest({
          requestId: id,
          request: { case: 'getRtcStats', value: new pb.SessionRequest_GetRTCStats() },
        }),
    );
    if (resp.response.case !== 'getRtcStats') {
      throw new Error('unexpected response type');
    }
    return resp.response.value;
  }

  /** Fetches the session's accumulated model usage. */
  async fetchSessionUsage(): Promise<pb.SessionResponse_GetSessionUsageResponse> {
    const resp = await this.sendRequest(
      (id) =>
        new pb.SessionRequest({
          requestId: id,
          request: { case: 'getSessionUsage', value: new pb.SessionRequest_GetSessionUsage() },
        }),
    );
    if (resp.response.case !== 'getSessionUsage') {
      throw new Error('unexpected response type');
    }
    return resp.response.value;
  }

  /** Keeps a task in `tasks` until it settles so `close()` can cancel it. */
  private trackTask(task: Task<void>): void {
    this.tasks.add(task);
    task.addDoneCallback(() => {
      this.tasks.delete(task);
    });
  }
}
+++ b/agents/src/voice/room_io/room_io.ts @@ -12,16 +12,16 @@ import { type RemoteParticipant, type Room, RoomEvent, + type TextStreamReader, TrackPublishOptions, TrackSource, } from '@livekit/rtc-node'; import type { WritableStreamDefaultWriter } from 'node:stream/web'; -import { ATTRIBUTE_PUBLISH_ON_BEHALF } from '../../constants.js'; +import { ATTRIBUTE_PUBLISH_ON_BEHALF, TOPIC_CHAT } from '../../constants.js'; import { log } from '../../log.js'; import { IdentityTransform } from '../../stream/identity_transform.js'; import { Future, Task, waitForAbort } from '../../utils.js'; import { type AgentSession } from '../agent_session.js'; -import type { TextInputCallback } from '../client_events.js'; import { AgentSessionEventTypes, type AgentStateChangedEvent, @@ -29,6 +29,7 @@ import { type UserInputTranscribedEvent, } from '../events.js'; import type { AudioOutput, TextOutput } from '../io.js'; +import type { TextInputCallback } from '../remote_session.js'; import { TranscriptionSynchronizer } from '../transcription/synchronizer.js'; import { ParticipantAudioInputStream } from './_input.js'; import { @@ -127,6 +128,7 @@ export class RoomIO { private agentTranscriptOutput?: ParalellTextOutput; private transcriptionSynchronizer?: TranscriptionSynchronizer; private participantIdentity: string | null = null; + private textStreamHandlerRegistered = false; private participantAvailableFuture: Future = new Future(); private roomConnectedFuture: Future = new Future(); @@ -271,6 +273,34 @@ export class RoomIO { } }; + /** Text stream handler registered for TOPIC_CHAT in start(): ignores senders other than the linked participant (when one is set) and senders missing from room.remoteParticipants, then reads the whole stream and passes the text to inputOptions.textInputCallback. Failures are logged, never thrown. */ + private onUserTextInput = (reader: TextStreamReader, participantInfo: { identity: string }) => { + if (this.participantIdentity && participantInfo.identity !== this.participantIdentity) { + return; + } + + const participant = this.room.remoteParticipants.get(participantInfo.identity); + if (!participant) { + this.logger.warn('participant not found, ignoring text input'); + return; + } + + const readText = async () => { + const text = await reader.readAll(); + +
/* textInputCallback may return a plain value or a promise; await accepts both, including non-native thenables. */ + await this.inputOptions.textInputCallback!(this.agentSession, { + text, + info: reader.info, + participantIdentity: participantInfo.identity, + }); + }; + + readText().catch((error) => { + this.logger.error({ error }, 'Error reading text input'); + }); + }; + private async forwardUserTranscript(signal: AbortSignal): Promise { const reader = this.userTranscriptStream.readable.getReader(); try { @@ -402,6 +432,20 @@ export class RoomIO { start() { // -- create inputs -- + + if (this.inputOptions.textEnabled) { + try { + this.room.registerTextStreamHandler(TOPIC_CHAT, this.onUserTextInput); + this.textStreamHandlerRegistered = true; + } catch (error) { + /* textEnabled is already known to be truthy here; keep the caught error in the log record so a failure other than a duplicate registration is not hidden. */ + this.logger.warn( + { error }, + `text stream handler for topic "${TOPIC_CHAT}" already set, ignoring`, + ); + } + } + if (this.inputOptions.audioEnabled) { this.audioInput = new ParticipantAudioInputStream({ room: this.room, @@ -476,6 +520,11 @@ export class RoomIO { this.agentSession.off(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed); this.agentSession.off(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged); + if (this.textStreamHandlerRegistered) { + this.room.unregisterTextStreamHandler(TOPIC_CHAT); + this.textStreamHandlerRegistered = false; + } + await this.initTask?.cancelAndWait(); // Close stream FIRST so reader.read() in forwardUserTranscript can exit. diff --git a/agents/src/voice/wire_format.ts b/agents/src/voice/wire_format.ts deleted file mode 100644 index 3ea7782e5..000000000 --- a/agents/src/voice/wire_format.ts +++ /dev/null @@ -1,827 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Explicit wire-format converters that produce the exact JSON shape emitted by -// Python Pydantic models (snake_case keys, durations in seconds).
-// The agents-playground frontend (types.ts / useClientEvents.ts) consumes this -// format directly via JSON.parse — any mismatch breaks the UI. -import { z } from 'zod'; -import type { - AgentHandoffItem, - AudioContent, - ChatContent, - ChatItem, - ChatMessage, - FunctionCall, - FunctionCallOutput, - ImageContent, - MetricsReport, -} from '../llm/chat_context.js'; -import type { - AgentMetrics, - EOUMetrics, - InterruptionMetrics, - LLMMetrics, - MetricsMetadata, - RealtimeModelMetrics, - RealtimeModelMetricsCachedTokenDetails, - RealtimeModelMetricsInputTokenDetails, - RealtimeModelMetricsOutputTokenDetails, - STTMetrics, - TTSMetrics, - VADMetrics, -} from '../metrics/base.js'; -import type { - InterruptionModelUsage, - LLMModelUsage, - ModelUsage, - STTModelUsage, - TTSModelUsage, -} from '../metrics/model_usage.js'; -import type { AgentSessionUsage } from './agent_session.js'; - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -type WireObject = Record; - -export function msToS(ms: number): number { - return ms / 1000; -} - -function omitUndefined(obj: WireObject): WireObject { - const result: WireObject = {}; - for (const [k, v] of Object.entries(obj)) { - if (v !== undefined) { - result[k] = v; - } - } - return result; -} - -function imageContentToWire(img: ImageContent): WireObject { - return omitUndefined({ - id: img.id, - type: img.type, - image: typeof img.image === 'string' ? 
img.image : undefined, - inference_detail: img.inferenceDetail, - inference_width: img.inferenceWidth, - inference_height: img.inferenceHeight, - mime_type: img.mimeType, - }); -} - -function audioContentToWire(audio: AudioContent): WireObject { - return omitUndefined({ - type: audio.type, - transcript: audio.transcript, - }); -} - -function chatContentToWire(content: ChatContent): unknown { - if (typeof content === 'string') return content; - if (content.type === 'image_content') return imageContentToWire(content); - return audioContentToWire(content); -} - -function metricsReportToWire(m: MetricsReport): WireObject { - return omitUndefined({ - started_speaking_at: m.startedSpeakingAt, - stopped_speaking_at: m.stoppedSpeakingAt, - transcription_delay: m.transcriptionDelay, - end_of_turn_delay: m.endOfTurnDelay, - on_user_turn_completed_delay: m.onUserTurnCompletedDelay, - llm_node_ttft: m.llmNodeTtft, - tts_node_ttfb: m.ttsNodeTtfb, - e2e_latency: m.e2eLatency, - }); -} - -export function chatMessageToWire(msg: ChatMessage): WireObject { - const result: WireObject = { - id: msg.id, - type: msg.type, - role: msg.role, - content: msg.content.map(chatContentToWire), - interrupted: msg.interrupted, - created_at: msToS(msg.createdAt), - }; - - if (msg.transcriptConfidence !== undefined) { - result.transcript_confidence = msg.transcriptConfidence; - } - if (Object.keys(msg.metrics).length > 0) { - result.metrics = metricsReportToWire(msg.metrics); - } - if (Object.keys(msg.extra).length > 0) { - result.extra = msg.extra; - } - return result; -} - -export function functionCallToWire(fc: FunctionCall): WireObject { - const result: WireObject = { - id: fc.id, - type: fc.type, - call_id: fc.callId, - arguments: fc.args, - name: fc.name, - created_at: msToS(fc.createdAt), - }; - - if (Object.keys(fc.extra).length > 0) { - result.extra = fc.extra; - } - if (fc.groupId !== undefined) { - result.group_id = fc.groupId; - } - return result; -} - -export function 
functionCallOutputToWire(fco: FunctionCallOutput): WireObject { - return { - id: fco.id, - type: fco.type, - name: fco.name, - call_id: fco.callId, - output: fco.output, - is_error: fco.isError, - created_at: msToS(fco.createdAt), - }; -} - -export function agentHandoffToWire(ah: AgentHandoffItem): WireObject { - const result: WireObject = { - id: ah.id, - type: ah.type, - new_agent_id: ah.newAgentId, - created_at: msToS(ah.createdAt), - }; - if (ah.oldAgentId !== undefined) { - result.old_agent_id = ah.oldAgentId; - } - return result; -} - -export function chatItemToWire(item: ChatItem): WireObject { - switch (item.type) { - case 'message': - return chatMessageToWire(item); - case 'function_call': - return functionCallToWire(item); - case 'function_call_output': - return functionCallOutputToWire(item); - case 'agent_handoff': - return agentHandoffToWire(item); - } -} - -function metadataToWire(m: MetricsMetadata | undefined): WireObject | null { - if (!m) return null; - return omitUndefined({ - model_name: m.modelName, - model_provider: m.modelProvider, - }); -} - -function llmMetricsToWire(m: LLMMetrics): WireObject { - return omitUndefined({ - type: m.type, - label: m.label, - request_id: m.requestId, - timestamp: msToS(m.timestamp), - duration: msToS(m.durationMs), - ttft: msToS(m.ttftMs), - cancelled: m.cancelled, - completion_tokens: m.completionTokens, - prompt_tokens: m.promptTokens, - prompt_cached_tokens: m.promptCachedTokens, - total_tokens: m.totalTokens, - tokens_per_second: m.tokensPerSecond, - speech_id: m.speechId, - metadata: metadataToWire(m.metadata), - }); -} - -function sttMetricsToWire(m: STTMetrics): WireObject { - return omitUndefined({ - type: m.type, - label: m.label, - request_id: m.requestId, - timestamp: msToS(m.timestamp), - duration: msToS(m.durationMs), - audio_duration: msToS(m.audioDurationMs), - input_tokens: m.inputTokens, - output_tokens: m.outputTokens, - streamed: m.streamed, - metadata: metadataToWire(m.metadata), - }); -} - 
-function ttsMetricsToWire(m: TTSMetrics): WireObject { - return omitUndefined({ - type: m.type, - label: m.label, - request_id: m.requestId, - timestamp: msToS(m.timestamp), - ttfb: msToS(m.ttfbMs), - duration: msToS(m.durationMs), - audio_duration: msToS(m.audioDurationMs), - cancelled: m.cancelled, - characters_count: m.charactersCount, - input_tokens: m.inputTokens, - output_tokens: m.outputTokens, - streamed: m.streamed, - segment_id: m.segmentId, - speech_id: m.speechId, - metadata: metadataToWire(m.metadata), - }); -} - -function vadMetricsToWire(m: VADMetrics): WireObject { - return { - type: m.type, - label: m.label, - timestamp: msToS(m.timestamp), - idle_time: msToS(m.idleTimeMs), - inference_duration_total: msToS(m.inferenceDurationTotalMs), - inference_count: m.inferenceCount, - }; -} - -function eouMetricsToWire(m: EOUMetrics): WireObject { - return omitUndefined({ - type: m.type, - timestamp: msToS(m.timestamp), - end_of_utterance_delay: msToS(m.endOfUtteranceDelayMs), - transcription_delay: msToS(m.transcriptionDelayMs), - on_user_turn_completed_delay: msToS(m.onUserTurnCompletedDelayMs), - speech_id: m.speechId, - }); -} - -function cachedTokenDetailsToWire(d: RealtimeModelMetricsCachedTokenDetails): WireObject { - return { - audio_tokens: d.audioTokens, - text_tokens: d.textTokens, - image_tokens: d.imageTokens, - }; -} - -function inputTokenDetailsToWire(d: RealtimeModelMetricsInputTokenDetails): WireObject { - return omitUndefined({ - audio_tokens: d.audioTokens, - text_tokens: d.textTokens, - image_tokens: d.imageTokens, - cached_tokens: d.cachedTokens, - cached_tokens_details: d.cachedTokensDetails - ? 
cachedTokenDetailsToWire(d.cachedTokensDetails) - : undefined, - }); -} - -function outputTokenDetailsToWire(d: RealtimeModelMetricsOutputTokenDetails): WireObject { - return { - text_tokens: d.textTokens, - audio_tokens: d.audioTokens, - image_tokens: d.imageTokens, - }; -} - -function realtimeModelMetricsToWire(m: RealtimeModelMetrics): WireObject { - return omitUndefined({ - type: m.type, - label: m.label, - request_id: m.requestId, - timestamp: msToS(m.timestamp), - duration: msToS(m.durationMs), - session_duration: m.sessionDurationMs !== undefined ? msToS(m.sessionDurationMs) : undefined, - ttft: msToS(m.ttftMs), - cancelled: m.cancelled, - input_tokens: m.inputTokens, - output_tokens: m.outputTokens, - total_tokens: m.totalTokens, - tokens_per_second: m.tokensPerSecond, - input_token_details: inputTokenDetailsToWire(m.inputTokenDetails), - output_token_details: outputTokenDetailsToWire(m.outputTokenDetails), - metadata: metadataToWire(m.metadata), - }); -} - -function interruptionMetricsToWire(m: InterruptionMetrics): WireObject { - return omitUndefined({ - type: m.type, - timestamp: msToS(m.timestamp), - total_duration: msToS(m.totalDuration), - prediction_duration: msToS(m.predictionDuration), - detection_delay: msToS(m.detectionDelay), - num_interruptions: m.numInterruptions, - num_backchannels: m.numBackchannels, - num_requests: m.numRequests, - metadata: metadataToWire(m.metadata), - }); -} - -export function agentMetricsToWire(m: AgentMetrics): WireObject { - switch (m.type) { - case 'llm_metrics': - return llmMetricsToWire(m); - case 'stt_metrics': - return sttMetricsToWire(m); - case 'tts_metrics': - return ttsMetricsToWire(m); - case 'vad_metrics': - return vadMetricsToWire(m); - case 'eou_metrics': - return eouMetricsToWire(m); - case 'realtime_model_metrics': - return realtimeModelMetricsToWire(m); - case 'interruption_metrics': - return interruptionMetricsToWire(m); - } -} - -function llmModelUsageToWire(u: Partial): WireObject { - return { - 
type: u.type, - provider: u.provider ?? '', - model: u.model ?? '', - input_tokens: u.inputTokens ?? 0, - input_cached_tokens: u.inputCachedTokens ?? 0, - input_audio_tokens: u.inputAudioTokens ?? 0, - input_cached_audio_tokens: u.inputCachedAudioTokens ?? 0, - input_text_tokens: u.inputTextTokens ?? 0, - input_cached_text_tokens: u.inputCachedTextTokens ?? 0, - input_image_tokens: u.inputImageTokens ?? 0, - input_cached_image_tokens: u.inputCachedImageTokens ?? 0, - output_tokens: u.outputTokens ?? 0, - output_audio_tokens: u.outputAudioTokens ?? 0, - output_text_tokens: u.outputTextTokens ?? 0, - session_duration: msToS(u.sessionDurationMs ?? 0), - }; -} - -function ttsModelUsageToWire(u: Partial): WireObject { - return { - type: u.type, - provider: u.provider ?? '', - model: u.model ?? '', - input_tokens: u.inputTokens ?? 0, - output_tokens: u.outputTokens ?? 0, - characters_count: u.charactersCount ?? 0, - audio_duration: msToS(u.audioDurationMs ?? 0), - }; -} - -function sttModelUsageToWire(u: Partial): WireObject { - return { - type: u.type, - provider: u.provider ?? '', - model: u.model ?? '', - input_tokens: u.inputTokens ?? 0, - output_tokens: u.outputTokens ?? 0, - audio_duration: msToS(u.audioDurationMs ?? 0), - }; -} - -function interruptionModelUsageToWire(u: Partial): WireObject { - return { - type: u.type, - provider: u.provider ?? '', - model: u.model ?? '', - total_requests: u.totalRequests ?? 
0, - }; -} - -export function modelUsageToWire(u: Partial): WireObject { - switch (u.type) { - case 'llm_usage': - return llmModelUsageToWire(u as Partial); - case 'tts_usage': - return ttsModelUsageToWire(u as Partial); - case 'stt_usage': - return sttModelUsageToWire(u as Partial); - case 'interruption_usage': - return interruptionModelUsageToWire(u as Partial); - default: - return u as WireObject; - } -} - -export function agentSessionUsageToWire(u: AgentSessionUsage): WireObject { - return { - model_usage: u.modelUsage.map(modelUsageToWire), - }; -} - -// =========================================================================== -// Zod wire-format schemas -// These validate the exact JSON shape that Python Pydantic emits on the wire. -// Inferred types via z.infer give fully typed parse results. -// =========================================================================== -const imageContentWireSchema = z.object({ - id: z.string(), - type: z.literal('image_content'), - image: z.string(), - inference_detail: z.enum(['auto', 'high', 'low']).optional(), - inference_width: z.number().optional(), - inference_height: z.number().optional(), - mime_type: z.string().optional(), -}); - -const audioContentWireSchema = z.object({ - type: z.literal('audio_content'), - transcript: z.string().nullable().optional(), -}); - -const chatContentWireSchema = z.union([z.string(), imageContentWireSchema, audioContentWireSchema]); - -const metricsReportWireSchema = z - .object({ - started_speaking_at: z.number().optional(), - stopped_speaking_at: z.number().optional(), - transcription_delay: z.number().optional(), - end_of_turn_delay: z.number().optional(), - on_user_turn_completed_delay: z.number().optional(), - llm_node_ttft: z.number().optional(), - tts_node_ttfb: z.number().optional(), - e2e_latency: z.number().optional(), - }) - .optional(); - -export const chatMessageWireSchema = z.object({ - id: z.string(), - type: z.literal('message'), - role: z.enum(['developer', 
'system', 'user', 'assistant']), - content: z.array(chatContentWireSchema), - interrupted: z.boolean(), - created_at: z.number(), - transcript_confidence: z.number().optional(), - metrics: metricsReportWireSchema, - extra: z.record(z.string(), z.unknown()).optional(), -}); - -export const functionCallWireSchema = z.object({ - id: z.string(), - type: z.literal('function_call'), - call_id: z.string(), - arguments: z.string(), - name: z.string(), - created_at: z.number(), - extra: z.record(z.string(), z.unknown()).optional(), - group_id: z.string().optional(), -}); - -export const functionCallOutputWireSchema = z.object({ - id: z.string(), - type: z.literal('function_call_output'), - name: z.string(), - call_id: z.string(), - output: z.string(), - is_error: z.boolean(), - created_at: z.number(), -}); - -export const agentHandoffWireSchema = z.object({ - id: z.string(), - type: z.literal('agent_handoff'), - new_agent_id: z.string(), - created_at: z.number(), - old_agent_id: z.string().optional(), -}); - -export const chatItemWireSchema = z.discriminatedUnion('type', [ - chatMessageWireSchema, - functionCallWireSchema, - functionCallOutputWireSchema, - agentHandoffWireSchema, -]); - -const metadataWireSchema = z - .object({ - model_name: z.string().optional(), - model_provider: z.string().optional(), - }) - .nullable() - .optional(); - -export const llmMetricsWireSchema = z.object({ - type: z.literal('llm_metrics'), - label: z.string(), - request_id: z.string(), - timestamp: z.number(), - duration: z.number(), - ttft: z.number(), - cancelled: z.boolean(), - completion_tokens: z.number(), - prompt_tokens: z.number(), - prompt_cached_tokens: z.number(), - total_tokens: z.number(), - tokens_per_second: z.number(), - speech_id: z.string().nullable().optional(), - metadata: metadataWireSchema, -}); - -export const sttMetricsWireSchema = z.object({ - type: z.literal('stt_metrics'), - label: z.string(), - request_id: z.string(), - timestamp: z.number(), - duration: z.number(), 
- audio_duration: z.number(), - input_tokens: z.number().optional(), - output_tokens: z.number().optional(), - streamed: z.boolean(), - metadata: metadataWireSchema, -}); - -export const ttsMetricsWireSchema = z.object({ - type: z.literal('tts_metrics'), - label: z.string(), - request_id: z.string(), - timestamp: z.number(), - ttfb: z.number(), - duration: z.number(), - audio_duration: z.number(), - cancelled: z.boolean(), - characters_count: z.number(), - input_tokens: z.number().optional(), - output_tokens: z.number().optional(), - streamed: z.boolean(), - segment_id: z.string().nullable().optional(), - speech_id: z.string().nullable().optional(), - metadata: metadataWireSchema, -}); - -export const vadMetricsWireSchema = z.object({ - type: z.literal('vad_metrics'), - label: z.string(), - timestamp: z.number(), - idle_time: z.number(), - inference_duration_total: z.number(), - inference_count: z.number(), -}); - -export const eouMetricsWireSchema = z.object({ - type: z.literal('eou_metrics'), - timestamp: z.number(), - end_of_utterance_delay: z.number(), - transcription_delay: z.number(), - on_user_turn_completed_delay: z.number(), - speech_id: z.string().nullable().optional(), -}); - -const cachedTokenDetailsWireSchema = z.object({ - audio_tokens: z.number(), - text_tokens: z.number(), - image_tokens: z.number(), -}); - -const inputTokenDetailsWireSchema = z.object({ - audio_tokens: z.number(), - text_tokens: z.number(), - image_tokens: z.number(), - cached_tokens: z.number(), - cached_tokens_details: cachedTokenDetailsWireSchema.nullable().optional(), -}); - -const outputTokenDetailsWireSchema = z.object({ - text_tokens: z.number(), - audio_tokens: z.number(), - image_tokens: z.number(), -}); - -export const realtimeModelMetricsWireSchema = z.object({ - type: z.literal('realtime_model_metrics'), - label: z.string(), - request_id: z.string(), - timestamp: z.number(), - duration: z.number(), - session_duration: z.number().optional(), - ttft: z.number(), - 
cancelled: z.boolean(), - input_tokens: z.number(), - output_tokens: z.number(), - total_tokens: z.number(), - tokens_per_second: z.number(), - input_token_details: inputTokenDetailsWireSchema, - output_token_details: outputTokenDetailsWireSchema, - metadata: metadataWireSchema, -}); - -export const interruptionMetricsWireSchema = z.object({ - type: z.literal('interruption_metrics'), - timestamp: z.number(), - total_duration: z.number(), - prediction_duration: z.number(), - detection_delay: z.number(), - num_interruptions: z.number(), - num_backchannels: z.number(), - num_requests: z.number(), - metadata: metadataWireSchema, -}); - -export const agentMetricsWireSchema = z.discriminatedUnion('type', [ - llmMetricsWireSchema, - sttMetricsWireSchema, - ttsMetricsWireSchema, - vadMetricsWireSchema, - eouMetricsWireSchema, - realtimeModelMetricsWireSchema, - interruptionMetricsWireSchema, -]); - -// --------------------------------------------------------------------------- -// Model usage schemas -// --------------------------------------------------------------------------- - -export const llmModelUsageWireSchema = z.object({ - type: z.literal('llm_usage'), - provider: z.string().optional(), - model: z.string().optional(), - input_tokens: z.number().optional(), - input_cached_tokens: z.number().optional(), - input_audio_tokens: z.number().optional(), - input_cached_audio_tokens: z.number().optional(), - input_text_tokens: z.number().optional(), - input_cached_text_tokens: z.number().optional(), - input_image_tokens: z.number().optional(), - input_cached_image_tokens: z.number().optional(), - output_tokens: z.number().optional(), - output_audio_tokens: z.number().optional(), - output_text_tokens: z.number().optional(), - session_duration: z.number().optional(), -}); - -export const ttsModelUsageWireSchema = z.object({ - type: z.literal('tts_usage'), - provider: z.string().optional(), - model: z.string().optional(), - input_tokens: z.number().optional(), - 
output_tokens: z.number().optional(), - characters_count: z.number().optional(), - audio_duration: z.number().optional(), -}); - -export const sttModelUsageWireSchema = z.object({ - type: z.literal('stt_usage'), - provider: z.string().optional(), - model: z.string().optional(), - input_tokens: z.number().optional(), - output_tokens: z.number().optional(), - audio_duration: z.number().optional(), -}); - -export const interruptionModelUsageWireSchema = z.object({ - type: z.literal('interruption_usage'), - provider: z.string().optional(), - model: z.string().optional(), - total_requests: z.number().optional(), -}); - -export const modelUsageWireSchema = z.discriminatedUnion('type', [ - llmModelUsageWireSchema, - ttsModelUsageWireSchema, - sttModelUsageWireSchema, - interruptionModelUsageWireSchema, -]); - -export const agentSessionUsageWireSchema = z.object({ - model_usage: z.array(modelUsageWireSchema), -}); - -// --------------------------------------------------------------------------- -// Client event schemas -// --------------------------------------------------------------------------- - -const agentStateSchema = z.enum(['initializing', 'idle', 'listening', 'thinking', 'speaking']); -const userStateSchema = z.enum(['speaking', 'listening', 'away']); - -export const clientAgentStateChangedSchema = z.object({ - type: z.literal('agent_state_changed'), - old_state: agentStateSchema, - new_state: agentStateSchema, - created_at: z.number(), -}); - -export const clientUserStateChangedSchema = z.object({ - type: z.literal('user_state_changed'), - old_state: userStateSchema, - new_state: userStateSchema, - created_at: z.number(), -}); - -export const clientConversationItemAddedSchema = z.object({ - type: z.literal('conversation_item_added'), - item: chatMessageWireSchema, - created_at: z.number(), -}); - -export const clientUserInputTranscribedSchema = z.object({ - type: z.literal('user_input_transcribed'), - transcript: z.string(), - is_final: z.boolean(), - language: 
z.string().nullable(), - created_at: z.number(), -}); - -export const clientFunctionToolsExecutedSchema = z.object({ - type: z.literal('function_tools_executed'), - function_calls: z.array(functionCallWireSchema), - function_call_outputs: z.array(functionCallOutputWireSchema.nullable()), - created_at: z.number(), -}); - -export const clientMetricsCollectedSchema = z.object({ - type: z.literal('metrics_collected'), - metrics: agentMetricsWireSchema, - created_at: z.number(), -}); - -export const clientErrorSchema = z.object({ - type: z.literal('error'), - message: z.string(), - created_at: z.number(), -}); - -export const clientUserOverlappingSpeechSchema = z.object({ - type: z.literal('user_overlapping_speech'), - is_interruption: z.boolean(), - created_at: z.number(), - sent_at: z.number(), - detection_delay: z.number(), - overlap_started_at: z.number().nullable(), -}); - -export const clientSessionUsageSchema = z.object({ - type: z.literal('session_usage'), - usage: agentSessionUsageWireSchema, - created_at: z.number(), -}); - -export const clientEventSchema = z.discriminatedUnion('type', [ - clientAgentStateChangedSchema, - clientUserStateChangedSchema, - clientConversationItemAddedSchema, - clientUserInputTranscribedSchema, - clientFunctionToolsExecutedSchema, - clientMetricsCollectedSchema, - clientErrorSchema, - clientUserOverlappingSpeechSchema, - clientSessionUsageSchema, -]); - -// --------------------------------------------------------------------------- -// RPC schemas -// --------------------------------------------------------------------------- - -export const sendMessageRequestSchema = z.object({ - text: z.string(), -}); - -export const streamRequestSchema = z.object({ - request_id: z.string(), - method: z.string(), - payload: z.string(), -}); - -export const streamResponseSchema = z.object({ - request_id: z.string(), - payload: z.string(), - error: z.string().nullable().optional(), -}); - -export const getSessionStateResponseSchema = z.object({ - 
agent_state: agentStateSchema, - user_state: userStateSchema, - agent_id: z.string(), - options: z.record(z.string(), z.unknown()), - created_at: z.number(), -}); - -export const getChatHistoryResponseSchema = z.object({ - items: z.array(chatItemWireSchema), -}); - -export const getAgentInfoResponseSchema = z.object({ - id: z.string(), - instructions: z.string().nullable(), - tools: z.array(z.string()), - chat_ctx: z.array(chatItemWireSchema), -}); - -export const sendMessageResponseSchema = z.object({ - items: z.array(chatItemWireSchema), -}); - -export const getRTCStatsResponseSchema = z.object({ - publisher_stats: z.array(z.record(z.string(), z.unknown())), - subscriber_stats: z.array(z.record(z.string(), z.unknown())), -}); - -export const getSessionUsageResponseSchema = z.object({ - usage: agentSessionUsageWireSchema, - created_at: z.number(), -}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index da693782b..c3e8fce94 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -100,6 +100,9 @@ importers: agents: dependencies: + '@bufbuild/protobuf': + specifier: ^1.10.0 + version: 1.10.1 '@ffmpeg-installer/ffmpeg': specifier: ^1.1.0 version: 1.1.0 @@ -107,8 +110,8 @@ importers: specifier: ^1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: ^1.43.0 - version: 1.43.0 + specifier: ^1.45.1 + version: 1.45.1 '@livekit/typed-emitter': specifier: ^3.0.0 version: 3.0.0 @@ -2020,8 +2023,8 @@ packages: cpu: [x64] os: [win32] - '@livekit/protocol@1.43.0': - resolution: {integrity: sha512-WCJ97fa4CBqPDh8pzdszOm/2xmelJ3Dx2vjKBlyb9BzmPQx1LjzVciP6uYFFMCMdrq2l1mjFQBXEz8Z20UCkyw==} + '@livekit/protocol@1.45.1': + resolution: {integrity: sha512-sr6p0TwKofHO5KW6kUzjq4hH2de4Al5scQo824xFnyI1XYo0qQn6fTG+bdr+Uj4EedjYAOqjezwUju5OErVIRA==} '@livekit/rtc-node-darwin-arm64@0.13.24': resolution: {integrity: sha512-gm5xOpGu6Rj/mNU2jEijcGhQGN2GdxV2dNYQm3NCKN7ow0BmMFZvXSCAWOWf+9oTutPXHnrc7EN1mt2v+lfqhA==} @@ -6269,7 +6272,7 @@ snapshots: '@livekit/noise-cancellation-win32-x64@0.1.9': optional: 
true - '@livekit/protocol@1.43.0': + '@livekit/protocol@1.45.1': dependencies: '@bufbuild/protobuf': 1.10.1 @@ -8632,14 +8635,14 @@ snapshots: livekit-server-sdk@2.13.3: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/protocol': 1.43.0 + '@livekit/protocol': 1.45.1 camelcase-keys: 9.1.3 jose: 5.2.4 livekit-server-sdk@2.14.1: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/protocol': 1.43.0 + '@livekit/protocol': 1.45.1 camelcase-keys: 9.1.3 jose: 5.2.4