Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/anthropic/lib/bedrock/_stream_decoder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Iterator, AsyncIterator

from ..._utils import lru_cache
Expand Down Expand Up @@ -37,7 +38,7 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
yield ServerSentEvent(data=message, event=self._sse_event_type(message))

async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
"""Given an async iterator that yields lines, iterate over it & yield every event encountered"""
Expand All @@ -49,7 +50,25 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
yield ServerSentEvent(data=message, event=self._sse_event_type(message))

def _sse_event_type(self, message: str) -> str:
"""Return the SSE event type for a decoded Bedrock message string.

Bedrock wraps all SSE payloads (including error events) in its binary
event-stream framing and delivers them over HTTP 200. We need to
inspect the ``type`` field of the inner JSON so that error payloads are
surfaced as ``event="error"`` rather than ``event="completion"``. The
standard ``_streaming.py`` error-handling path already converts
``event="error"`` into the correct ``APIStatusError``.
"""
try:
data = json.loads(message)
if isinstance(data, dict) and data.get("type") == "error":
return "error"
except Exception:
pass
return "completion"

def _parse_message_from_event(self, event: EventStreamMessage) -> str | None:
response_dict = event.to_response_dict()
Expand Down