Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions livekit-agents/livekit/agents/utils/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@
merge_frames = rtc.combine_audio_frames


def silence_frame(duration: float, sample_rate: int, num_channels: int = 1) -> rtc.AudioFrame:
    """Build an all-zero ``rtc.AudioFrame`` for the requested duration and format.

    The per-channel sample count is ``duration * sample_rate`` truncated by
    ``int()``. The payload holds two zero bytes per sample per channel.
    """
    samples_per_channel = int(duration * sample_rate)
    # Two zero bytes for every sample of every channel.
    zeroed = b"\x00\x00" * samples_per_channel * num_channels
    return rtc.AudioFrame(
        data=zeroed,
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=samples_per_channel,
    )


def silence_frame_like(frame: rtc.AudioFrame) -> rtc.AudioFrame:
    """Build an all-zero ``rtc.AudioFrame`` with the same format as ``frame``.

    The result copies ``frame``'s channel count, per-channel sample count and
    sample rate; only the payload differs (two zero bytes per sample per channel).
    """
    channels = frame.num_channels
    per_channel = frame.samples_per_channel
    zeroed = b"\x00\x00" * per_channel * channels
    return rtc.AudioFrame(
        data=zeroed,
        sample_rate=frame.sample_rate,
        num_channels=channels,
        samples_per_channel=per_channel,
    )


def calculate_audio_duration(frames: AudioBuffer) -> float:
"""
Calculate the total duration of audio frames.
Expand Down
17 changes: 11 additions & 6 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,14 +1029,19 @@ def push_audio(self, frame: rtc.AudioFrame) -> None:

should_discard: bool = aec_warmup_active or uninterruptible_speech_active

if not should_discard:
if self._rt_session is not None:
self._rt_session.push_audio(frame)
# When discarding, substitute silence on the paths that would otherwise
# see contaminated/echoed audio (STT, realtime model) so the downstream
# stream stays continuous. VAD, AMD and the interruption detector keep
# receiving the real frame so they can still react to the user.
stt_frame: rtc.AudioFrame | None = None
if should_discard:
stt_frame = utils.audio.silence_frame_like(frame)

if self._rt_session is not None:
self._rt_session.push_audio(stt_frame if stt_frame is not None else frame)

# Always forward to _audio_recognition for VAD, even when discarding STT/LLM
# VAD needs frames to detect speech end and update user state correctly
if self._audio_recognition is not None:
self._audio_recognition.push_audio(frame, skip_stt=should_discard)
self._audio_recognition.push_audio(frame, stt_frame=stt_frame)

def push_video(self, frame: rtc.VideoFrame) -> None:
if not self._started:
Expand Down
24 changes: 12 additions & 12 deletions livekit-agents/livekit/agents/voice/audio_recognition.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,13 +541,19 @@ def _should_hold_stt_event(self, ev: stt.SpeechEvent) -> bool:

return False

def push_audio(self, frame: rtc.AudioFrame, *, skip_stt: bool = False) -> None:
def push_audio(self, frame: rtc.AudioFrame, *, stt_frame: rtc.AudioFrame | None = None) -> None:
"""Forward an audio frame to STT, VAD, AMD and the interruption detector.

When ``stt_frame`` is provided, it is sent to the STT pipeline in place of
``frame`` (e.g. a silence substitute during AEC warmup or uninterruptible
speech). VAD, AMD and the interruption channel always receive ``frame``.
"""
if self._input_started_at is None:
self._input_started_at = time.time() - frame.duration

self._sample_rate = frame.sample_rate
if not skip_stt and self._stt_pipeline is not None:
self._stt_pipeline.audio_ch.send_nowait(frame)
if self._stt_pipeline is not None:
self._stt_pipeline.audio_ch.send_nowait(stt_frame if stt_frame is not None else frame)

if self._vad_ch is not None:
self._vad_ch.send_nowait(frame)
Expand Down Expand Up @@ -737,16 +743,10 @@ async def _commit_user_turn() -> None:

# flush the stt by pushing silence
if audio_detached and self._sample_rate:
num_samples = int(self._sample_rate * 0.2)
silence_frame = rtc.AudioFrame(
b"\x00\x00" * num_samples,
sample_rate=self._sample_rate,
num_channels=1,
samples_per_channel=num_samples,
)
num_frames = max(0, int(math.ceil(stt_flush_duration / silence_frame.duration)))
silence = utils.audio.silence_frame(0.2, self._sample_rate)
num_frames = max(0, int(math.ceil(stt_flush_duration / silence.duration)))
for _ in range(num_frames):
self.push_audio(silence_frame)
self.push_audio(silence)

# wait for the final transcript to be available
try:
Expand Down
11 changes: 1 addition & 10 deletions livekit-agents/livekit/agents/voice/recorder_io/recorder_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from livekit import rtc

from ...log import logger
from ...utils.audio import silence_frame as _create_silence_frame
from .. import io

if TYPE_CHECKING:
Expand Down Expand Up @@ -533,16 +534,6 @@ def clear_buffer(self) -> None:
self.next_in_chain.clear_buffer()


def _create_silence_frame(duration: float, sample_rate: int, num_channels: int) -> rtc.AudioFrame:
    """Return an ``rtc.AudioFrame`` whose payload is entirely zero bytes.

    ``duration * sample_rate`` (truncated by ``int()``) gives the number of
    samples per channel; each sample of each channel contributes two zero bytes.
    """
    frame_samples = int(duration * sample_rate)
    payload = b"\x00\x00" * frame_samples * num_channels
    return rtc.AudioFrame(
        data=payload,
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=frame_samples,
    )


def _split_frame(frame: rtc.AudioFrame, position: float) -> tuple[rtc.AudioFrame, rtc.AudioFrame]:
if position <= 0.0:
return rtc.AudioFrame(
Expand Down
9 changes: 3 additions & 6 deletions livekit-agents/livekit/agents/voice/turn.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ class InterruptionOptions(TypedDict, total=False):
speech classified as a backchannel by the adaptive detector is suppressed
(events flagged as interruptions still pass through). Use a tuple to apply
different values for start and end separately. ``None`` disables. Defaults
to ``(1.0, 3.5)``. End value should be higher to account for STT transcript
timestamp inaccuracy."""
to ``(1.0, 1.0)``. End value accounts for STT transcript timestamp
inaccuracy."""


_INTERRUPTION_DEFAULTS: InterruptionOptions = {
Expand All @@ -121,10 +121,7 @@ class InterruptionOptions(TypedDict, total=False):
"min_words": 0,
"resume_false_interruption": True,
"false_interruption_timeout": 2.0,
"backchannel_boundary": (
1.0,
3.5, # higher value for the end as STT timestamps aren't very reliable
),
"backchannel_boundary": (1.0, 1.0),
}


Expand Down
1 change: 1 addition & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ unit-tests:
tests/test_room.py \
tests/test_room_io.py \
tests/test_audio_recognition_handoff.py \
tests/test_audio_recognition_push_audio.py \
tests/test_utils/test_audio_array_buffer.py \
tests/test_utils/test_bounded_dict.py \
tests/test_interruption/test_overlapping_speech_event.py \
Expand Down
75 changes: 75 additions & 0 deletions tests/test_audio_recognition_push_audio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

from unittest.mock import MagicMock

from livekit import rtc
from livekit.agents.voice.audio_recognition import AudioRecognition


def _make_frame(byte: int = 0x11, samples: int = 160, sample_rate: int = 16000) -> rtc.AudioFrame:
    """Build a single-channel ``rtc.AudioFrame`` whose payload repeats ``byte``.

    Each of the ``samples`` samples is encoded as the two-byte pair
    ``(byte, byte)``, so frames built with different ``byte`` values carry
    different payloads.
    """
    one_sample = bytes([byte, byte])
    payload = one_sample * samples
    return rtc.AudioFrame(
        data=payload,
        samples_per_channel=samples,
        num_channels=1,
        sample_rate=sample_rate,
    )


def _make_recognition() -> AudioRecognition:
    """Return a bare ``AudioRecognition`` carrying only the state these tests touch.

    ``__init__`` is bypassed via ``object.__new__``; the bookkeeping fields start
    as ``None`` and every downstream consumer is replaced by a ``MagicMock``.
    """
    stub = object.__new__(AudioRecognition)

    # Bookkeeping fields start unset.
    stub._input_started_at = None  # type: ignore[attr-defined]
    stub._sample_rate = None  # type: ignore[attr-defined]

    # Downstream consumers are mocks so the tests can inspect what they received.
    for consumer in ("_stt_pipeline", "_vad_ch", "_interruption_ch", "_session"):
        setattr(stub, consumer, MagicMock())

    return stub


def test_push_audio_routes_real_frame_everywhere_by_default() -> None:
    """Without ``stt_frame``, STT, VAD, AMD and interruption all get the same frame."""
    recognition = _make_recognition()
    real = _make_frame()

    recognition.push_audio(real)

    sinks = (
        recognition._stt_pipeline.audio_ch.send_nowait,
        recognition._vad_ch.send_nowait,
        recognition._session.amd.push_audio,
        recognition._interruption_ch.send_nowait,
    )
    for sink in sinks:
        sink.assert_called_once_with(real)


def test_push_audio_substitutes_stt_frame_only_on_stt_path() -> None:
    """A supplied ``stt_frame`` replaces the real frame for STT and nowhere else."""
    recognition = _make_recognition()
    real_frame = _make_frame(byte=0x11)
    substitute = _make_frame(byte=0x00)

    recognition.push_audio(real_frame, stt_frame=substitute)

    # Only the STT pipeline is handed the substitute (silent) frame.
    recognition._stt_pipeline.audio_ch.send_nowait.assert_called_once_with(substitute)

    # Every other consumer still sees the real frame.
    untouched_sinks = (
        recognition._vad_ch.send_nowait,
        recognition._session.amd.push_audio,
        recognition._interruption_ch.send_nowait,
    )
    for sink in untouched_sinks:
        sink.assert_called_once_with(real_frame)


def test_push_audio_skips_optional_consumers_when_unset() -> None:
    """``push_audio`` must not raise when every downstream consumer is ``None``."""
    recognition = _make_recognition()
    for consumer in ("_stt_pipeline", "_vad_ch", "_interruption_ch"):
        setattr(recognition, consumer, None)
    recognition._session.amd = None

    # The test passes as long as this call returns without raising.
    recognition.push_audio(_make_frame())


def test_push_audio_records_sample_rate_and_input_start() -> None:
    """The first pushed frame sets ``_sample_rate`` and ``_input_started_at``."""
    expected_rate = 24000
    recognition = _make_recognition()

    recognition.push_audio(_make_frame(sample_rate=expected_rate))

    assert recognition._sample_rate == expected_rate  # type: ignore[attr-defined]
    assert recognition._input_started_at is not None  # type: ignore[attr-defined]
Loading