From bba5b4e01ecbcda11656bce3ba62dd5e866f641c Mon Sep 17 00:00:00 2001 From: sven Date: Fri, 22 Aug 2025 21:14:56 +0900 Subject: [PATCH 1/5] fix: resolve streaming endpoint deadlock by pre-consuming request body - Fix infinite hang issue in /v1/message:stream endpoint - Pre-consume request.body() in _handle_streaming_request to prevent deadlock - EventSourceResponse context was causing request.body() consumption to block - Add comprehensive error handling for body consumption failures - Add regression tests for streaming endpoint request body handling Fixes deadlock where request.body() consumption inside EventSourceResponse context caused the event loop to hang indefinitely. Breaking changes: None Backward compatibility: Maintained --- src/a2a/server/apps/rest/rest_adapter.py | 7 + .../server/apps/rest/test_rest_fastapi_app.py | 130 ++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/src/a2a/server/apps/rest/rest_adapter.py b/src/a2a/server/apps/rest/rest_adapter.py index 0860825bf..6203a9e51 100644 --- a/src/a2a/server/apps/rest/rest_adapter.py +++ b/src/a2a/server/apps/rest/rest_adapter.py @@ -120,6 +120,13 @@ async def _handle_streaming_request( method: Callable[[Request, ServerCallContext], AsyncIterable[Any]], request: Request, ) -> EventSourceResponse: + # Pre-consume and cache the request body to prevent deadlock in streaming context + # This is required because Starlette's request.body() can only be consumed once, + # and attempting to consume it after EventSourceResponse starts causes deadlock + try: + await request.body() + except (ValueError, RuntimeError, OSError) as e: + logger.warning(f'Failed to pre-consume request body: {e}') call_context = self._context_builder.build(request) async def event_generator( diff --git a/tests/server/apps/rest/test_rest_fastapi_app.py b/tests/server/apps/rest/test_rest_fastapi_app.py index c5ea89c40..0f4603e64 100644 --- a/tests/server/apps/rest/test_rest_fastapi_app.py +++ b/tests/server/apps/rest/test_rest_fastapi_app.py @@ -34,6 +34,27 @@ async def agent_card() -> AgentCard: mock_agent_card = MagicMock(spec=AgentCard) mock_agent_card.url = 'http://mockurl.com' mock_agent_card.supports_authenticated_extended_card = False + + # Mock the capabilities object with streaming disabled + mock_capabilities = MagicMock() + mock_capabilities.streaming = False + mock_agent_card.capabilities = mock_capabilities + + return mock_agent_card + + +@pytest.fixture +async def streaming_agent_card() -> AgentCard: + """Agent card that supports streaming for testing streaming endpoints.""" + mock_agent_card = MagicMock(spec=AgentCard) + mock_agent_card.url = 'http://mockurl.com' + mock_agent_card.supports_authenticated_extended_card = False + + # Mock the capabilities object with streaming enabled + mock_capabilities = MagicMock() + mock_capabilities.streaming = True + mock_agent_card.capabilities = mock_capabilities + return mock_agent_card @@ -42,6 +63,25 @@ async def request_handler() -> RequestHandler: return MagicMock(spec=RequestHandler) +@pytest.fixture +async def streaming_app( + streaming_agent_card: AgentCard, request_handler: RequestHandler +) -> FastAPI: + """Builds the FastAPI application for testing streaming endpoints.""" + + return A2ARESTFastAPIApplication( + streaming_agent_card, request_handler + ).build(agent_card_url='/well-known/agent.json', rpc_url='') + + +@pytest.fixture +async def streaming_client(streaming_app: FastAPI) -> AsyncClient: + """HTTP client for the streaming FastAPI application.""" + return AsyncClient( + transport=ASGITransport(app=streaming_app), base_url='http://test' + ) + + @pytest.fixture async def app( agent_card: AgentCard, request_handler: RequestHandler @@ -222,5 +262,95 @@ async def test_send_message_success_task( assert expected_response == actual_response +@pytest.mark.anyio +async def test_streaming_message_request_body_consumption( + streaming_client: AsyncClient, request_handler: MagicMock +) -> None: + """Test that streaming endpoint properly handles request body consumption. + + This test verifies the fix for the deadlock issue where request.body() + was being consumed inside the EventSourceResponse context, causing + the application to hang indefinitely. + """ + + # Mock the async generator response from the request handler + async def mock_stream_response(): + """Mock streaming response generator.""" + yield Message( + message_id='stream_msg_1', + role=Role.agent, + parts=[Part(TextPart(text='First streaming response'))], + ) + yield Message( + message_id='stream_msg_2', + role=Role.agent, + parts=[Part(TextPart(text='Second streaming response'))], + ) + + request_handler.on_message_send_stream.return_value = mock_stream_response() + + # Create a valid streaming request + request = a2a_pb2.SendMessageRequest( + request=a2a_pb2.Message( + message_id='test_stream_msg', + role=a2a_pb2.ROLE_USER, + content=[a2a_pb2.Part(text='Test streaming message')], + ), + configuration=a2a_pb2.SendMessageConfiguration(), + ) + + # This should not hang indefinitely (previously it would due to the deadlock) + response = await streaming_client.post( + '/v1/message:stream', + json=json_format.MessageToDict(request), + headers={'Accept': 'text/event-stream'}, + timeout=10.0, # Reasonable timeout to prevent hanging in tests + ) + + # The response should be successful + response.raise_for_status() + assert response.status_code == 200 + assert 'text/event-stream' in response.headers.get('content-type', '') + + # Verify that the request handler was called + request_handler.on_message_send_stream.assert_called_once() + + +@pytest.mark.anyio +async def test_streaming_endpoint_with_invalid_content_type( + streaming_client: AsyncClient, request_handler: MagicMock +) -> None: + """Test streaming endpoint behavior with invalid content type.""" + + async def mock_stream_response(): + yield Message( + message_id='stream_msg_1', + role=Role.agent, + parts=[Part(TextPart(text='Response'))], + ) + + request_handler.on_message_send_stream.return_value = mock_stream_response() + + request = a2a_pb2.SendMessageRequest( + request=a2a_pb2.Message( + message_id='test_stream_msg', + role=a2a_pb2.ROLE_USER, + content=[a2a_pb2.Part(text='Test message')], + ), + configuration=a2a_pb2.SendMessageConfiguration(), + ) + + # Send request without proper event-stream headers + response = await streaming_client.post( + '/v1/message:stream', + json=json_format.MessageToDict(request), + timeout=10.0, + ) + + # Should still succeed (the adapter handles content-type internally) + response.raise_for_status() + assert response.status_code == 200 + + if __name__ == '__main__': pytest.main([__file__]) From fda49ce1f5d70aa172dc07b929757803c76828a1 Mon Sep 17 00:00:00 2001 From: sven Date: Fri, 22 Aug 2025 21:26:33 +0900 Subject: [PATCH 2/5] feat: add server error --- src/a2a/server/apps/rest/rest_adapter.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/a2a/server/apps/rest/rest_adapter.py b/src/a2a/server/apps/rest/rest_adapter.py index 6203a9e51..41c374679 100644 --- a/src/a2a/server/apps/rest/rest_adapter.py +++ b/src/a2a/server/apps/rest/rest_adapter.py @@ -39,7 +39,7 @@ rest_error_handler, rest_stream_error_handler, ) -from a2a.utils.errors import ServerError +from a2a.utils.errors import InvalidRequestError, ServerError logger = logging.getLogger(__name__) @@ -126,7 +126,13 @@ async def _handle_streaming_request( try: await request.body() except (ValueError, RuntimeError, OSError) as e: - logger.warning(f'Failed to pre-consume request body: {e}') + logger.warning('Failed to pre-consume request body: %s', e) + raise ServerError( + error=InvalidRequestError( + message=f'Failed to pre-consume request body: {e}' + ) + ) + call_context = self._context_builder.build(request) async def event_generator( From ce0c04fbcfd1b36dd159f662366d5c738aeab836 Mon Sep 17 00:00:00 2001 From: sven Date: Fri, 22 Aug 2025 21:27:42 +0900 Subject: [PATCH 3/5] fix: linter error --- src/a2a/server/apps/rest/rest_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/a2a/server/apps/rest/rest_adapter.py b/src/a2a/server/apps/rest/rest_adapter.py index 41c374679..a72c66705 100644 --- a/src/a2a/server/apps/rest/rest_adapter.py +++ b/src/a2a/server/apps/rest/rest_adapter.py @@ -131,7 +131,7 @@ async def _handle_streaming_request( error=InvalidRequestError( message=f'Failed to pre-consume request body: {e}' ) - ) + ) from e call_context = self._context_builder.build(request) From 3eff685aca9947f3546cdf56eb20a20c24f70472 Mon Sep 17 00:00:00 2001 From: youngchannel Date: Sat, 23 Aug 2025 12:10:29 +0900 Subject: [PATCH 4/5] Update tests/server/apps/rest/test_rest_fastapi_app.py Co-authored-by: Holt Skinner <13262395+holtskinner@users.noreply.github.com> --- tests/server/apps/rest/test_rest_fastapi_app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/apps/rest/test_rest_fastapi_app.py b/tests/server/apps/rest/test_rest_fastapi_app.py index 0f4603e64..3010c3a56 100644 --- a/tests/server/apps/rest/test_rest_fastapi_app.py +++ b/tests/server/apps/rest/test_rest_fastapi_app.py @@ -71,7 +71,7 @@ async def streaming_app( return A2ARESTFastAPIApplication( streaming_agent_card, request_handler - ).build(agent_card_url='/well-known/agent.json', rpc_url='') + ).build(agent_card_url='/well-known/agent-card.json', rpc_url='') @pytest.fixture From 1f5e1804698cbf823b7c7db5cb409096ebbe382c Mon Sep 17 00:00:00 2001 From: Holt Skinner <13262395+holtskinner@users.noreply.github.com> Date: Mon, 25 Aug 2025 19:52:19 +0100 Subject: [PATCH 5/5] Update src/a2a/server/apps/rest/rest_adapter.py --- src/a2a/server/apps/rest/rest_adapter.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/a2a/server/apps/rest/rest_adapter.py b/src/a2a/server/apps/rest/rest_adapter.py index a72c66705..c34d10fa4 100644 --- a/src/a2a/server/apps/rest/rest_adapter.py +++ b/src/a2a/server/apps/rest/rest_adapter.py @@ -126,7 +126,6 @@ async def _handle_streaming_request( try: await request.body() except (ValueError, RuntimeError, OSError) as e: - logger.warning('Failed to pre-consume request body: %s', e) raise ServerError( error=InvalidRequestError( message=f'Failed to pre-consume request body: {e}'