Skip to content

fix(anthropic): recreate stream on retry#5820

Open
he-yufeng wants to merge 1 commit into
livekit:mainfrom
he-yufeng:fix/anthropic-stream-retry
Open

fix(anthropic): recreate stream on retry#5820
he-yufeng wants to merge 1 commit into
livekit:mainfrom
he-yufeng:fix/anthropic-stream-retry

Conversation

@he-yufeng
Copy link
Copy Markdown

Summary

  • create a fresh Anthropic streaming request for each retry attempt
  • avoid re-awaiting the same coroutine after a transient stream creation failure
  • add a no-network regression test for the retry path

Fixes #5805.

Tests

  • uv run pytest tests\test_plugin_anthropic.py -q
  • uv run ruff check livekit-plugins\livekit-plugins-anthropic\livekit\plugins\anthropic\llm.py tests\test_plugin_anthropic.py
  • python -m py_compile livekit-plugins\livekit-plugins-anthropic\livekit\plugins\anthropic\llm.py tests\test_plugin_anthropic.py
  • git diff --check

@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@he-yufeng he-yufeng force-pushed the fix/anthropic-stream-retry branch from 2e0c716 to e260b5f Compare May 23, 2026 14:09
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 4 additional findings in Devin Review.

Open in Devin Review

Comment on lines 305 to +308
async def _run(self) -> None:
retryable = True
try:
if not self._anthropic_stream:
self._anthropic_stream = await self._awaitable_anthropic_stream

async with self._anthropic_stream as stream:
async with await self._create_anthropic_stream() as stream:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

πŸ”΄ Instance state not reset between retries in _run(), causing stale _ignoring_cot and _tool_call_id on retry

Now that retries actually work (the core purpose of this PR), _run() can be called multiple times by _main_task() (livekit-agents/livekit/agents/llm/llm.py:215-223). However, _run() does not reset instance state variables (_tool_call_id, _fnc_name, _fnc_raw_arguments, _ignoring_cot, _input_tokens, _output_tokens, etc.) at the start of each attempt. If a first attempt partially processes stream events before failing with a retryable error, the stale state carries into the retry.

Two concrete failure modes:

  1. _ignoring_cot stuck True: If the first attempt receives a <thinking> text delta (line 367-368) before failing, _ignoring_cot stays True on retry, silently dropping all text content until a </thinking> tag appears β€” which may never happen in the retry's fresh response.
  2. Stale _tool_call_id causes bogus tool calls: If the first attempt receives a content_block_start with type tool_use (lines 356-359) before failing, _tool_call_id remains set on retry. When the retry's text block hits content_block_stop (line 385), the if self._tool_call_id is not None check is True, emitting a spurious tool call with stale name/arguments.

Both of these are retryable scenarios (no ChatChunk was emitted, so retryable stays True). Other plugins like Mistral (livekit-plugins/livekit-plugins-mistralai/livekit/plugins/mistralai/llm.py:222-224) and OpenAI (livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/responses/llm.py:402-403) reset their per-attempt state at the top of _run().

Suggested change
async def _run(self) -> None:
retryable = True
try:
if not self._anthropic_stream:
self._anthropic_stream = await self._awaitable_anthropic_stream
async with self._anthropic_stream as stream:
async with await self._create_anthropic_stream() as stream:
async def _run(self) -> None:
# Reset per-attempt state so retries start clean
self._tool_call_id = None
self._fnc_name = None
self._fnc_raw_arguments = None
self._request_id = ""
self._ignoring_cot = False
self._input_tokens = 0
self._cache_creation_tokens = 0
self._cache_read_tokens = 0
self._output_tokens = 0
retryable = True
try:
async with await self._create_anthropic_stream() as stream:
Open in Devin Review

Was this helpful? React with πŸ‘ or πŸ‘Ž to provide feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

2 participants