Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,81 @@ def _model_disables_prefill(model: str) -> bool:
return any(model.startswith(p) for p in _NO_PREFILL_PATTERNS)


_THINKING_OPEN = "<thinking>"
_THINKING_CLOSE = "</thinking>"


def _partial_tag_suffix_len(text: str, tag: str) -> int:
"""Length of the longest suffix of ``text`` that is a (non-empty) prefix of ``tag``.

Used to hold back the tail of a streamed chunk that might still grow into ``tag``
on the next delta (e.g. ``"</"`` could become ``"</thinking>"``).
"""
for size in range(min(len(text), len(tag) - 1), 0, -1):
if tag.startswith(text[-size:]):
return size
return 0


class _ThinkingTagFilter:
"""Strip ``<thinking>...</thinking>`` chain-of-thought spans from a streamed text.

Anthropic models may emit chain-of-thought wrapped in ``<thinking>`` tags when tools
are provided. The tags can be split across streaming deltas, so a stateful scanner is
required: a naive per-delta check leaves the parser stuck and silently drops the actual
answer (and therefore the audio that would be synthesized from it).
"""

def __init__(self) -> None:
self._inside = False
self._buf = ""

def push(self, text: str) -> str:
self._buf += text
out: list[str] = []

while self._buf:
if not self._inside:
idx = self._buf.find(_THINKING_OPEN)
if idx == -1:
# emit everything except a possible partial opening tag at the tail
hold = _partial_tag_suffix_len(self._buf, _THINKING_OPEN)
if hold:
out.append(self._buf[:-hold])
self._buf = self._buf[-hold:]
else:
out.append(self._buf)
self._buf = ""
break

out.append(self._buf[:idx])
self._buf = self._buf[idx + len(_THINKING_OPEN) :]
self._inside = True
else:
idx = self._buf.find(_THINKING_CLOSE)
if idx == -1:
# still thinking; keep only a possible partial closing tag at the tail
hold = _partial_tag_suffix_len(self._buf, _THINKING_CLOSE)
self._buf = self._buf[-hold:] if hold else ""
break

self._buf = self._buf[idx + len(_THINKING_CLOSE) :]
self._inside = False

return "".join(out)

def flush(self) -> str:
"""Return any buffered text that is not part of a thinking span."""
if self._inside:
self._buf = ""
return ""

# a dangling partial opening tag never completed: it was real text
out = self._buf
self._buf = ""
return out
Comment on lines +112 to +121
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 _ThinkingTagFilter.flush() does not reset _inside flag, causing silent text loss across content blocks

When flush() is called at content_block_stop while _inside is True (i.e., a <thinking> tag was opened but never closed within that block), the method clears _buf but leaves _inside = True. Any subsequent text content blocks in the same stream will have ALL their text silently dropped because push() still treats incoming text as part of a thinking span.

Reproduction trace showing text permanently lost
f = _ThinkingTagFilter()
f.push('<thinking>reasoning...')  # _inside becomes True
f.flush()  # content_block_stop: clears _buf but _inside stays True

# Subsequent text block
f.push('The actual answer')  # returns '' — silently dropped!
f.flush()  # returns '' — answer permanently lost

While this requires the model to emit an unclosed <thinking> tag (uncommon in practice), the consequence is severe when triggered: the assistant's spoken answer is entirely suppressed, producing silence from TTS. The fix should reset self._inside = False inside flush() (or at least when called from the content_block_stop handler at llm.py:467).

Suggested change
def flush(self) -> str:
"""Return any buffered text that is not part of a thinking span."""
if self._inside:
self._buf = ""
return ""
# a dangling partial opening tag never completed: it was real text
out = self._buf
self._buf = ""
return out
def flush(self) -> str:
"""Return any buffered text that is not part of a thinking span."""
if self._inside:
self._buf = ""
self._inside = False
return ""
# a dangling partial opening tag never completed: it was real text
out = self._buf
self._buf = ""
return out
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.



@dataclass
class _LLMOptions:
model: str | ChatModels
Expand Down Expand Up @@ -293,7 +368,8 @@ def __init__(
self._fnc_raw_arguments: str | None = None

self._request_id: str = ""
self._ignoring_cot = False # ignore chain of thought
# strips <thinking> chain-of-thought spans that Claude may emit when tools are set
self._thinking_filter = _ThinkingTagFilter()
self._input_tokens = 0
self._cache_creation_tokens = 0
self._cache_read_tokens = 0
Expand All @@ -312,6 +388,17 @@ async def _run(self) -> None:
self._event_ch.send_nowait(chat_chunk)
retryable = False

# flush any text held back by the thinking-tag filter (e.g. a trailing
# fragment that never completed into a real tag)
if text := self._thinking_filter.flush():
self._event_ch.send_nowait(
llm.ChatChunk(
id=self._request_id,
delta=llm.ChoiceDelta(content=text, role="assistant"),
)
)
retryable = False

# https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching#tracking-cache-performance
prompt_token = (
self._input_tokens + self._cache_creation_tokens + self._cache_read_tokens
Expand Down Expand Up @@ -360,17 +447,10 @@ def _parse_event(self, event: anthropic.types.RawMessageStreamEvent) -> llm.Chat
elif event.type == "content_block_delta":
delta = event.delta
if delta.type == "text_delta":
text = delta.text

if self._tools is not None:
# anthropic may inject COC when using functions
if text.startswith("<thinking>"):
self._ignoring_cot = True
elif self._ignoring_cot and "</thinking>" in text:
text = text.split("</thinking>")[-1]
self._ignoring_cot = False

if self._ignoring_cot:
# anthropic may inject chain-of-thought wrapped in <thinking> tags when
# tools are provided; strip it without dropping the actual answer
text = self._thinking_filter.push(delta.text)
if not text:
return None

return llm.ChatChunk(
Expand All @@ -382,6 +462,13 @@ def _parse_event(self, event: anthropic.types.RawMessageStreamEvent) -> llm.Chat
self._fnc_raw_arguments += delta.partial_json

elif event.type == "content_block_stop":
if self._tool_call_id is None:
# flush any text held back while scanning for a (possibly partial) tag
if text := self._thinking_filter.flush():
return llm.ChatChunk(
id=self._request_id,
delta=llm.ChoiceDelta(content=text, role="assistant"),
)
if self._tool_call_id is not None:
assert self._fnc_name is not None
assert self._fnc_raw_arguments is not None
Expand Down
117 changes: 117 additions & 0 deletions tests/test_plugin_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ def _make_llm(**kwargs):
return LLM(api_key="sk-ant-test", **kwargs)


def _filter_thinking(deltas: list[str]) -> str:
"""Stream ``deltas`` through the thinking-tag filter and return the emitted text."""
from livekit.plugins.anthropic.llm import _ThinkingTagFilter

fltr = _ThinkingTagFilter()
out = [fltr.push(text) for text in deltas]
out.append(fltr.flush())
return "".join(out)


class TestHttpxTimeoutDefaults:
def test_default_read_timeout_is_generous(self) -> None:
"""Default read timeout must accommodate adaptive-thinking pauses (≥30 s)."""
Expand Down Expand Up @@ -57,3 +67,110 @@ def test_explicit_client_bypasses_timeout_param(self) -> None:
# timeout= argument should have no effect here
llm = _make_llm(client=tight_client, timeout=httpx.Timeout(5.0, read=999.0))
assert llm._client._client.timeout.read == 1.0


class TestThinkingTagStripping:
"""When tools are attached, Claude may wrap chain-of-thought in <thinking> tags.

Those tags must be stripped from the streamed text, but the actual assistant
response (the part that feeds TTS) must always be emitted — even when the tags
are split across streaming deltas, which is the normal token-by-token case.
"""

def test_plain_answer_passes_through(self) -> None:
assert _filter_thinking(["Hello", " there", "!"]) == "Hello there!"

def test_thinking_block_stripped_tags_on_own_deltas(self) -> None:
got = _filter_thinking(["<thinking>", "the user said hi", "</thinking>", "Hello there!"])
assert got == "Hello there!"

def test_answer_emitted_when_closing_tag_split_across_deltas(self) -> None:
# regression: the closing tag streamed as "</" + "thinking>" used to leave the
# parser permanently in "ignoring" mode, dropping the entire spoken answer so
# no audio was ever published.
got = _filter_thinking(["<thinking>", "reasoning", "</", "thinking>", "Hello there!"])
assert got == "Hello there!"

def test_answer_emitted_when_opening_tag_split_across_deltas(self) -> None:
got = _filter_thinking(["<", "thinking>", "reasoning", "</thinking>", "Hello there!"])
assert got == "Hello there!"

def test_thinking_block_with_leading_whitespace(self) -> None:
got = _filter_thinking(["\n<thinking>", "reasoning", "</thinking>", "Hi!"])
assert got == "\nHi!"

def test_text_resembling_tag_is_not_dropped(self) -> None:
# text that merely looks like the start of a tag must not be silently swallowed
assert _filter_thinking(["1 < 2 is true"]) == "1 < 2 is true"

def test_partial_opening_tag_at_end_is_flushed(self) -> None:
# a dangling "<th" that never completes into a real tag must not be lost
assert _filter_thinking(["all good", "<th"]) == "all good<th"


class TestParseEventThinkingIntegration:
"""End-to-end check that ``_parse_event`` emits the spoken answer with tools attached."""

async def test_split_closing_tag_still_emits_answer(self) -> None:
import anthropic.types as at

from livekit.agents.llm.chat_context import ChatContext
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS
from livekit.plugins.anthropic.llm import LLMStream

async def _never() -> None:
return None

never = _never()
stream = LLMStream(
_make_llm(),
anthropic_stream=never,
chat_ctx=ChatContext.empty(),
tools=["smoke"],
conn_options=DEFAULT_API_CONNECT_OPTIONS,
)
# we only drive _parse_event manually; tear down the background _run task
await stream.aclose()
never.close() # the cancelled _run never awaited it

events = [
at.RawContentBlockStartEvent(
type="content_block_start",
index=0,
content_block=at.TextBlock(type="text", text="", citations=None),
),
at.RawContentBlockDeltaEvent(
type="content_block_delta",
index=0,
delta=at.TextDelta(type="text_delta", text="<thinking>"),
),
at.RawContentBlockDeltaEvent(
type="content_block_delta",
index=0,
delta=at.TextDelta(type="text_delta", text="the user said hi"),
),
at.RawContentBlockDeltaEvent(
type="content_block_delta",
index=0,
delta=at.TextDelta(type="text_delta", text="</"),
),
at.RawContentBlockDeltaEvent(
type="content_block_delta",
index=0,
delta=at.TextDelta(type="text_delta", text="thinking>"),
),
at.RawContentBlockDeltaEvent(
type="content_block_delta",
index=0,
delta=at.TextDelta(type="text_delta", text="Hello there!"),
),
at.RawContentBlockStopEvent(type="content_block_stop", index=0),
]

emitted = ""
for ev in events:
chunk = stream._parse_event(ev)
if chunk is not None and chunk.delta is not None and chunk.delta.content:
emitted += chunk.delta.content

assert emitted == "Hello there!"
Loading