From 5e7e9c28ac6bf9e02f95d6271dd63211b1e6e2fa Mon Sep 17 00:00:00 2001 From: pin <7534292+pintaste@users.noreply.github.com> Date: Wed, 20 May 2026 06:09:47 +0800 Subject: [PATCH] fix: route Bedrock error SSE events through existing error handler Bedrock delivers all event-stream frames over HTTP 200, including error payloads such as rate-limit or model-not-found responses. Previously, `AWSEventStreamDecoder` emitted every decoded frame as `ServerSentEvent(event="completion")`, so a payload like {"type": "error", "error": {"type": "rate_limit_error", ...}} was passed to `_process_response_data` and cast to the stream event union type. The cast produced a `RawMessageStartEvent` whose required `message` field was `None`, causing downstream code to raise `AttributeError: 'NoneType' object has no attribute 'model'` (or `'usage'`, etc.) instead of a clean `APIStatusError`. Fix: add `_sse_event_type()` which peeks at the decoded JSON and returns `"error"` when `type == "error"`. The `Stream.__stream__` / `AsyncStream.__stream__` methods in `_streaming.py` already contain a handler for `sse.event == "error"` that raises `_make_status_error`, so this routes the payload through the correct path without any additional changes to the streaming core. Fixes #1472 --- src/anthropic/lib/bedrock/_stream_decoder.py | 23 ++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/anthropic/lib/bedrock/_stream_decoder.py b/src/anthropic/lib/bedrock/_stream_decoder.py index 02e81a3ca..b3fe0dce0 100644 --- a/src/anthropic/lib/bedrock/_stream_decoder.py +++ b/src/anthropic/lib/bedrock/_stream_decoder.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING, Iterator, AsyncIterator from ..._utils import lru_cache @@ -37,7 +38,7 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: for event in event_stream_buffer: message = self._parse_message_from_event(event) if message: - yield ServerSentEvent(data=message, event="completion") + yield ServerSentEvent(data=message, event=self._sse_event_type(message)) async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]: """Given an async iterator that yields lines, iterate over it & yield every event encountered""" @@ -49,7 +50,25 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser for event in event_stream_buffer: message = self._parse_message_from_event(event) if message: - yield ServerSentEvent(data=message, event="completion") + yield ServerSentEvent(data=message, event=self._sse_event_type(message)) + + def _sse_event_type(self, message: str) -> str: + """Return the SSE event type for a decoded Bedrock message string. + + Bedrock wraps all SSE payloads (including error events) in its binary + event-stream framing and delivers them over HTTP 200. We need to + inspect the ``type`` field of the inner JSON so that error payloads are + surfaced as ``event="error"`` rather than ``event="completion"``. The + standard ``_streaming.py`` error-handling path already converts + ``event="error"`` into the correct ``APIStatusError``. + """ + try: + data = json.loads(message) + if isinstance(data, dict) and data.get("type") == "error": + return "error" + except Exception: + pass + return "completion" def _parse_message_from_event(self, event: EventStreamMessage) -> str | None: response_dict = event.to_response_dict()