Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,12 @@ async def _wait_for_eou() -> None:

await _wait_for_eou()

if self._session._user_turn_claims > 0:
# `AgentSession.claim_user_turn` is holding idle open
await self._session._user_turn_released.wait()
agent_active = wait_for_agent
user_active = wait_for_user

# -- Realtime Session events --

def _on_metrics_collected(
Expand Down
41 changes: 39 additions & 2 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import asyncio
import copy
import time
from collections.abc import AsyncIterable, Callable, Sequence
from contextlib import AbstractContextManager, nullcontext
from collections.abc import AsyncIterable, AsyncIterator, Callable, Sequence
from contextlib import AbstractContextManager, asynccontextmanager, nullcontext
from contextvars import Token
from dataclasses import dataclass
from types import TracebackType
Expand Down Expand Up @@ -452,6 +452,12 @@ def __init__(
self._closing: bool = False
self._job_context_cb_registered: bool = False

# count of active `claim_user_turn` scopes. while > 0, `wait_for_inactive`
# is held open and `user_state` is pinned to "speaking"
self._user_turn_claims: int = 0
self._user_turn_released: asyncio.Event = asyncio.Event()
self._user_turn_released.set()

self._global_run_state: RunResult | None = None
# TODO(theomonnom): need a better way to expose early assistant metrics
self._early_assistant_metrics: MetricsReport | None = None
Expand Down Expand Up @@ -1215,6 +1221,32 @@ def interrupt(self, *, force: bool = False) -> asyncio.Future[None]:

return self._activity.interrupt(force=force)

@asynccontextmanager
async def claim_user_turn(self) -> AsyncIterator[None]:
"""Declare a programmatic user-driven turn.

Pins ``user_state`` to ``"speaking"`` and holds ``wait_for_inactive``
open until release. On release, ``user_state`` is re-derived from the
audio path. Reentrant and session-scoped (survives handoff).

Use in custom ``text_input_cb`` or any flow that drives a user turn
across awaits.
"""
first = self._user_turn_claims == 0
self._user_turn_claims += 1
if first:
self._user_turn_released.clear()
self._update_user_state("speaking", last_speaking_time=time.time())
try:
yield
finally:
self._user_turn_claims -= 1
if self._user_turn_claims == 0:
self._user_turn_released.set()
activity = self._activity
speaking = activity is not None and not activity._user_silence_event.is_set()
self._update_user_state("speaking" if speaking else "listening")

def clear_user_turn(self) -> None:
# clear the transcription or input audio buffer of the user turn
if self._activity is None:
Expand Down Expand Up @@ -1525,6 +1557,11 @@ def _update_agent_state(
def _update_user_state(
self, state: UserState, *, last_speaking_time: float | None = None
) -> None:
# pinned to "speaking" while a `claim_user_turn` is active; voice
# transitions are recoverable from `_user_silence_event` on release
if self._user_turn_claims > 0 and state != "speaking":
return

if self._user_state == state:
return

Expand Down
5 changes: 3 additions & 2 deletions livekit-agents/livekit/agents/voice/room_io/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ class NoiseCancellationParams:


async def _default_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
await sess.interrupt()
sess.generate_reply(user_input=ev.text)
async with sess.claim_user_turn():
await sess.interrupt()
sess.generate_reply(user_input=ev.text)


@dataclass
Expand Down
Loading