Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@
"zod": "^3.25.76"
},
"dependencies": {
"@bufbuild/protobuf": "^1.10.0",
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"@livekit/mutex": "^1.1.1",
"@livekit/protocol": "^1.43.0",
"@livekit/protocol": "^1.45.1",
"@livekit/typed-emitter": "^3.0.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/api-logs": "^0.54.0",
Expand Down
1 change: 1 addition & 0 deletions agents/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ export const RPC_GET_AGENT_INFO = 'lk.agent.get_agent_info';
export const RPC_SEND_MESSAGE = 'lk.agent.send_message';
export const TOPIC_AGENT_REQUEST = 'lk.agent.request';
export const TOPIC_AGENT_RESPONSE = 'lk.agent.response';
export const TOPIC_SESSION_MESSAGES = 'lk.agent.session';
4 changes: 2 additions & 2 deletions agents/src/inference/interruption/http_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ export function createHttpTransport(
updateUserSpeakingSpan(entry);
}
const event: OverlappingSpeechEvent = {
type: 'user_overlapping_speech',
timestamp: Date.now(),
type: 'overlapping_speech',
detectedAt: Date.now(),
overlapStartedAt: overlapSpeechStartedAt,
isInterruption: entry.isInterruption,
speechInput: entry.speechInput,
Expand Down
2 changes: 1 addition & 1 deletion agents/src/inference/interruption/interruption_detector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { InterruptionStreamBase } from './interruption_stream.js';
import type { InterruptionOptions, OverlappingSpeechEvent } from './types.js';

type InterruptionCallbacks = {
user_overlapping_speech: (event: OverlappingSpeechEvent) => void;
overlapping_speech: (event: OverlappingSpeechEvent) => void;
metrics_collected: (metrics: InterruptionMetrics) => void;
error: (error: InterruptionDetectionError) => void;
};
Expand Down
8 changes: 4 additions & 4 deletions agents/src/inference/interruption/interruption_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ export class InterruptionStreamBase {
}
const e = latestEntry ?? InterruptionCacheEntry.default();
const event: OverlappingSpeechEvent = {
type: 'user_overlapping_speech',
timestamp: chunk.endedAt,
type: 'overlapping_speech',
detectedAt: chunk.endedAt,
isInterruption: false,
overlapStartedAt: this.overlapSpeechStartedAt,
speechInput: e.speechInput,
Expand Down Expand Up @@ -334,11 +334,11 @@ export class InterruptionStreamBase {

const eventEmitter = new TransformStream<OverlappingSpeechEvent, OverlappingSpeechEvent>({
transform: (chunk, controller) => {
this.model.emit('user_overlapping_speech', chunk);
this.model.emit('overlapping_speech', chunk);

const metrics: InterruptionMetrics = {
type: 'interruption_metrics',
timestamp: chunk.timestamp,
timestamp: chunk.detectedAt,
totalDuration: chunk.totalDurationInS * 1000,
predictionDuration: chunk.predictionDurationInS * 1000,
detectionDelay: chunk.detectionDelayInS * 1000,
Expand Down
4 changes: 2 additions & 2 deletions agents/src/inference/interruption/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import type { Span } from '@opentelemetry/api';

export interface OverlappingSpeechEvent {
type: 'user_overlapping_speech';
timestamp: number;
type: 'overlapping_speech';
detectedAt: number;
isInterruption: boolean;
totalDurationInS: number;
predictionDurationInS: number;
Expand Down
4 changes: 2 additions & 2 deletions agents/src/inference/interruption/ws_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ export function createWsTransport(
);

const event: OverlappingSpeechEvent = {
type: 'user_overlapping_speech',
timestamp: Date.now(),
type: 'overlapping_speech',
detectedAt: Date.now(),
isInterruption: true,
totalDurationInS: entry.totalDurationInS,
predictionDurationInS: entry.predictionDurationInS,
Expand Down
14 changes: 7 additions & 7 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ export class AgentActivity implements RecognitionHooks {
this.onError(ev);

private readonly onInterruptionOverlappingSpeech = (ev: OverlappingSpeechEvent): void => {
this.agentSession.emit(AgentSessionEventTypes.UserOverlappingSpeech, ev);
this.agentSession.emit(AgentSessionEventTypes.OverlappingSpeech, ev);
};

private readonly onInterruptionMetricsCollected = (ev: InterruptionMetrics): void => {
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
this.agentSession._usageCollector.collect(ev);
this.agentSession.emit(
AgentSessionEventTypes.MetricsCollected,
createMetricsCollectedEvent({ metrics: ev }),
Expand Down Expand Up @@ -698,6 +699,8 @@ export class AgentActivity implements RecognitionHooks {
}
}

this.agentSession._usageCollector.collect(ev);

this.agentSession.emit(
AgentSessionEventTypes.MetricsCollected,
createMetricsCollectedEvent({ metrics: ev }),
Expand Down Expand Up @@ -935,7 +938,7 @@ export class AgentActivity implements RecognitionHooks {
this.restoreInterruptionByAudioActivity();
this.interruptByAudioActivity();
if (this.audioRecognition) {
this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || ev.timestamp);
this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || ev.detectedAt);
}
}

Expand Down Expand Up @@ -2842,10 +2845,7 @@ export class AgentActivity implements RecognitionHooks {
await this._mainTask.cancelAndWait();
}
if (this.interruptionDetector) {
this.interruptionDetector.off(
'user_overlapping_speech',
this.onInterruptionOverlappingSpeech,
);
this.interruptionDetector.off('overlapping_speech', this.onInterruptionOverlappingSpeech);
this.interruptionDetector.off('metrics_collected', this.onInterruptionMetricsCollected);
this.interruptionDetector.off('error', this.onInterruptionError);
}
Expand Down Expand Up @@ -2888,7 +2888,7 @@ export class AgentActivity implements RecognitionHooks {
try {
const detector = new AdaptiveInterruptionDetector();

detector.on('user_overlapping_speech', this.onInterruptionOverlappingSpeech);
detector.on('overlapping_speech', this.onInterruptionOverlappingSpeech);
detector.on('metrics_collected', this.onInterruptionMetricsCollected);
detector.on('error', this.onInterruptionError);

Expand Down
26 changes: 13 additions & 13 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import type { VAD } from '../vad.js';
import type { Agent } from './agent.js';
import { AgentActivity } from './agent_activity.js';
import type { _TurnDetector } from './audio_recognition.js';
import { ClientEventsHandler } from './client_events.js';
import {
type AgentEvent,
AgentSessionEventTypes,
Expand All @@ -65,6 +64,7 @@ import {
} from './events.js';
import { AgentInput, AgentOutput } from './io.js';
import { RecorderIO } from './recorder_io/index.js';
import { RoomSessionTransport, SessionHost } from './remote_session.js';
import {
DEFAULT_TEXT_INPUT_CALLBACK,
RoomIO,
Expand Down Expand Up @@ -162,7 +162,7 @@ export type AgentSessionCallbacks = {
[AgentSessionEventTypes.SpeechCreated]: (ev: SpeechCreatedEvent) => void;
[AgentSessionEventTypes.Error]: (ev: ErrorEvent) => void;
[AgentSessionEventTypes.Close]: (ev: CloseEvent) => void;
[AgentSessionEventTypes.UserOverlappingSpeech]: (ev: OverlappingSpeechEvent) => void;
[AgentSessionEventTypes.OverlappingSpeech]: (ev: OverlappingSpeechEvent) => void;
};

export type AgentSessionOptions<UserData = UnknownUserData> = {
Expand Down Expand Up @@ -204,7 +204,7 @@ export class AgentSession<
private nextActivity?: AgentActivity;
private updateActivityTask?: Task<void>;
private started = false;
private clientEventsHandler?: ClientEventsHandler;
private sessionHost?: SessionHost;

private _chatCtx: ChatContext;
private _userData: UserData | undefined;
Expand Down Expand Up @@ -232,7 +232,8 @@ export class AgentSession<

private _interruptionDetection?: InterruptionOptions['mode'];

private _usageCollector: ModelUsageCollector = new ModelUsageCollector();
/** @internal */
_usageCollector: ModelUsageCollector = new ModelUsageCollector();

/** @internal */
_roomIO?: RoomIO;
Expand Down Expand Up @@ -322,9 +323,6 @@ export class AgentSession<
): boolean {
const eventData = args[0] as AgentEvent;
this._recordedEvents.push(eventData);
if (event === AgentSessionEventTypes.MetricsCollected) {
this._usageCollector.collect((eventData as MetricsCollectedEvent).metrics);
}
return super.emit(event, ...args);
}

Expand Down Expand Up @@ -422,9 +420,11 @@ export class AgentSession<

this._roomIO.start();

this.clientEventsHandler = new ClientEventsHandler(this, this._roomIO);
const transport = new RoomSessionTransport(room, this._roomIO);
this.sessionHost = new SessionHost(transport);
this.sessionHost.registerSession(this);
if (inputOptions?.textEnabled !== false) {
this.clientEventsHandler.registerTextInput(
this.sessionHost.registerTextInput(
inputOptions?.textInputCallback ?? DEFAULT_TEXT_INPUT_CALLBACK,
);
}
Expand Down Expand Up @@ -470,8 +470,8 @@ export class AgentSession<

await Promise.allSettled(tasks);

if (this.clientEventsHandler) {
await this.clientEventsHandler.start();
if (this.sessionHost) {
await this.sessionHost.start();
}

// Log used IO configuration
Expand Down Expand Up @@ -1154,8 +1154,8 @@ export class AgentSession<
this.output.audio = null;
this.output.transcription = null;

await this.clientEventsHandler?.close();
this.clientEventsHandler = undefined;
await this.sessionHost?.close();
this.sessionHost = undefined;

await this._roomIO?.close();
this._roomIO = undefined;
Expand Down
Loading