From f211a9ea9e468468a86db1a7ffc81c8acdc0323d Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Bind Date: Mon, 7 Jul 2025 23:46:46 +0530 Subject: [PATCH 1/2] feat(server): Improve event consumer error handling --- src/a2a/server/events/event_consumer.py | 5 +++++ tests/server/events/test_event_consumer.py | 23 +++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 1fe5c3f36..c3df3a237 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -4,6 +4,8 @@ from collections.abc import AsyncGenerator +from pydantic import ValidationError + from a2a.server.events.event_queue import Event, EventQueue from a2a.types import ( InternalError, @@ -138,6 +140,9 @@ async def consume_all(self) -> AsyncGenerator[Event]: # python 3.12 and get a queue empty error on an open queue if self.queue.is_closed(): break + except ValidationError as e: + logger.error(f"Invalid event format received: {e}") + continue except Exception as e: logger.error( f'Stopping event consumption due to exception: {e}' diff --git a/tests/server/events/test_event_consumer.py b/tests/server/events/test_event_consumer.py index 9afad3632..01d9fa152 100644 --- a/tests/server/events/test_event_consumer.py +++ b/tests/server/events/test_event_consumer.py @@ -1,9 +1,9 @@ import asyncio - from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import pytest +from pydantic import ValidationError from a2a.server.events.event_consumer import EventConsumer, QueueClosed from a2a.server.events.event_queue import EventQueue @@ -343,3 +343,24 @@ def test_agent_task_callback_no_exception(event_consumer: EventConsumer): assert event_consumer._exception is None # Should remain None mock_task.exception.assert_called_once() + + +@pytest.mark.asyncio +async def test_consume_all_handles_validation_error( + event_consumer: EventConsumer, mock_event_queue: AsyncMock +): + """Test that consume_all gracefully handles a pydantic.ValidationError.""" + # Simulate dequeue_event raising a ValidationError + mock_event_queue.dequeue_event.side_effect = [ + ValidationError.from_exception_data(title="Test Error", line_errors=[]), + asyncio.CancelledError, # To stop the loop for the test + ] + + with patch("a2a.server.events.event_consumer.logger.error") as logger_error_mock: + with pytest.raises(asyncio.CancelledError): + async for _ in event_consumer.consume_all(): + pass + + # Check that the specific error was logged and the consumer continued + logger_error_mock.assert_called_once() + assert "Invalid event format received" in logger_error_mock.call_args[0][0] From 88fce4b90a89606e150b056fbf380c4926198076 Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Bind Date: Mon, 7 Jul 2025 23:53:56 +0530 Subject: [PATCH 2/2] feat(server): Improve event consumer error handling --- tests/server/events/test_event_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/events/test_event_consumer.py b/tests/server/events/test_event_consumer.py index 01d9fa152..4765e82c4 100644 --- a/tests/server/events/test_event_consumer.py +++ b/tests/server/events/test_event_consumer.py @@ -353,7 +353,7 @@ async def test_consume_all_handles_validation_error( # Simulate dequeue_event raising a ValidationError mock_event_queue.dequeue_event.side_effect = [ ValidationError.from_exception_data(title="Test Error", line_errors=[]), - asyncio.CancelledError, # To stop the loop for the test + asyncio.CancelledError # To stop the loop for the test ] with patch("a2a.server.events.event_consumer.logger.error") as logger_error_mock: