Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations

import os
from collections.abc import Awaitable
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Literal, cast

Expand Down Expand Up @@ -244,27 +244,31 @@ def chat(
content[-1]["cache_control"] = CACHE_CONTROL_EPHEMERAL # type: ignore
break

if beta_flag:
stream = self._client.beta.messages.create(
betas=[beta_flag],
messages=messages, # type: ignore[arg-type]
model=self._opts.model,
stream=True,
timeout=conn_options.timeout,
**extra,
)
else:
stream = self._client.messages.create(
messages=messages,
model=self._opts.model,
stream=True,
timeout=conn_options.timeout,
**extra,
)
async def create_anthropic_stream() -> anthropic.AsyncStream[
    anthropic.types.RawMessageStreamEvent
]:
    """Issue a streaming ``messages.create`` request and return its stream.

    Every call sends a new request, so the returned stream is never shared
    between invocations.

    Returns:
        The awaited result of ``messages.create(..., stream=True)``, cast to
        ``anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]``.
    """
    if beta_flag:
        # A beta flag is set: use the beta messages endpoint and forward it.
        stream = await self._client.beta.messages.create(
            betas=[beta_flag],
            messages=messages,  # type: ignore[arg-type]
            model=self._opts.model,
            stream=True,
            timeout=conn_options.timeout,
            **extra,
        )
    else:
        stream = await self._client.messages.create(
            messages=messages,
            model=self._opts.model,
            stream=True,
            timeout=conn_options.timeout,
            **extra,
        )
    # Both branches are narrowed to one stream type for the caller.
    return cast(anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent], stream)

return LLMStream(
self,
anthropic_stream=stream, # type: ignore[arg-type]
create_anthropic_stream=create_anthropic_stream,
chat_ctx=chat_ctx,
tools=tools or [],
conn_options=conn_options,
Expand All @@ -276,16 +280,15 @@ def __init__(
self,
llm: LLM,
*,
anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]],
create_anthropic_stream: Callable[
[], Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]]
],
chat_ctx: llm.ChatContext,
tools: list[Tool],
conn_options: APIConnectOptions,
) -> None:
super().__init__(llm, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options)
self._awaitable_anthropic_stream = anthropic_stream
self._anthropic_stream: (
anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
) = None
self._create_anthropic_stream = create_anthropic_stream

# current function call that we're waiting for full completion (args are streamed)
self._tool_call_id: str | None = None
Expand All @@ -302,10 +305,7 @@ def __init__(
async def _run(self) -> None:
retryable = True
try:
if not self._anthropic_stream:
self._anthropic_stream = await self._awaitable_anthropic_stream

async with self._anthropic_stream as stream:
async with await self._create_anthropic_stream() as stream:
Comment on lines 305 to +308
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Instance state not reset between retries in _run(), causing stale _ignoring_cot and _tool_call_id on retry

Now that retries actually work (the core purpose of this PR), _run() can be called multiple times by _main_task() (livekit-agents/livekit/agents/llm/llm.py:215-223). However, _run() does not reset instance state variables (_tool_call_id, _fnc_name, _fnc_raw_arguments, _ignoring_cot, _input_tokens, _output_tokens, etc.) at the start of each attempt. If a first attempt partially processes stream events before failing with a retryable error, the stale state carries into the retry.

Two concrete failure modes:

  1. _ignoring_cot stuck True: If the first attempt receives a <thinking> text delta (line 367-368) before failing, _ignoring_cot stays True on retry, silently dropping all text content until a </thinking> tag appears — which may never happen in the retry's fresh response.
  2. Stale _tool_call_id causes bogus tool calls: If the first attempt receives a content_block_start with type tool_use (lines 356-359) before failing, _tool_call_id remains set on retry. When the retry's text block hits content_block_stop (line 385), the if self._tool_call_id is not None check is True, emitting a spurious tool call with stale name/arguments.

Both of these are retryable scenarios (no ChatChunk was emitted, so retryable stays True). Other plugins like Mistral (livekit-plugins/livekit-plugins-mistralai/livekit/plugins/mistralai/llm.py:222-224) and OpenAI (livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/responses/llm.py:402-403) reset their per-attempt state at the top of _run().

Suggested change
async def _run(self) -> None:
retryable = True
try:
if not self._anthropic_stream:
self._anthropic_stream = await self._awaitable_anthropic_stream
async with self._anthropic_stream as stream:
async with await self._create_anthropic_stream() as stream:
async def _run(self) -> None:
# Reset per-attempt state so retries start clean
self._tool_call_id = None
self._fnc_name = None
self._fnc_raw_arguments = None
self._request_id = ""
self._ignoring_cot = False
self._input_tokens = 0
self._cache_creation_tokens = 0
self._cache_read_tokens = 0
self._output_tokens = 0
retryable = True
try:
async with await self._create_anthropic_stream() as stream:
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

async for event in stream:
chat_chunk = self._parse_event(event)
if chat_chunk is not None:
Expand Down
48 changes: 48 additions & 0 deletions tests/test_plugin_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from __future__ import annotations

import httpx
import pytest

from livekit.agents import APIConnectOptions, llm
from livekit.plugins.anthropic.llm import LLMStream


def _make_llm(**kwargs):
Expand Down Expand Up @@ -57,3 +61,47 @@ def test_explicit_client_bypasses_timeout_param(self) -> None:
# timeout= argument should have no effect here
llm = _make_llm(client=tight_client, timeout=httpx.Timeout(5.0, read=999.0))
assert llm._client._client.timeout.read == 1.0


class _EmptyAnthropicStream:
    """Stand-in stream: an async context manager and async iterator with no items."""

    async def __aenter__(self):
        # Entering the context hands back the stream object itself.
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # A falsy return value means exceptions from the body are not suppressed.
        return None

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Always exhausted: iteration ends before producing any event.
        raise StopAsyncIteration


class TestAnthropicStreamRetry:
    """Checks how ``LLMStream`` behaves when creating the Anthropic stream fails."""

    @pytest.mark.asyncio
    async def test_retry_creates_a_fresh_stream_awaitable(self) -> None:
        """The stream factory is invoked again after the first awaitable raises."""
        calls = 0

        async def failing_stream():
            raise RuntimeError("transient connect failure")

        async def empty_stream():
            return _EmptyAnthropicStream()

        def create_stream():
            # The first call returns a coroutine that raises when awaited;
            # every later call returns one that resolves to an empty stream.
            nonlocal calls
            calls += 1
            return failing_stream() if calls == 1 else empty_stream()

        stream = LLMStream(
            _make_llm(),
            create_anthropic_stream=create_stream,
            chat_ctx=llm.ChatContext.empty(),
            tools=[],
            conn_options=APIConnectOptions(max_retry=1, retry_interval=0),
        )

        response = await stream.collect()

        # Two factory calls: the failing attempt and the one that succeeded.
        assert calls == 2
        assert response.usage is not None
Loading