diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 90bd50628c3..dc3bdc567f0 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -1171,7 +1171,21 @@ async def requests_iter(): yield req async def perform_write(): - responses = await self.write_client.append_rows(requests_iter()) + # The AppendRows streaming RPC does not auto-populate the + # request-routing header, so writes to any region other than + # the US multiregion fail with a "session not found" / + # stream-not-found error. Set the routing header explicitly + # (same as google.cloud.bigquery_storage_v1.writer) so the + # request reaches the region that owns the write stream. + responses = await self.write_client.append_rows( + requests_iter(), + metadata=( + ( + "x-goog-request-params", + f"write_stream={self.write_stream}", + ), + ), + ) async for response in responses: error = getattr(response, "error", None) error_code = getattr(error, "code", None) diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index c6d47539d14..d91398cf9a8 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -635,6 +635,55 @@ async def test_event_denylist( await asyncio.sleep(0.01) mock_write_client.append_rows.assert_called_once() + @pytest.mark.asyncio + async def test_append_rows_sets_regional_routing_header( + self, + mock_write_client, + callback_context, + mock_auth_default, + mock_bq_client, + mock_to_arrow_schema, + dummy_arrow_schema, + mock_asyncio_to_thread, + ): + """Regression test for cross-region writes (issue #262). + + The Storage Write API streaming AppendRows RPC does not + auto-populate the request-routing header, so writes to a dataset + outside the US multiregion (e.g. northamerica-northeast1) fail with + a "session not found" / stream-not-found error unless the header is + set explicitly. Assert the header is passed to append_rows so the + request reaches the region that owns the write stream. + """ + _ = mock_auth_default + _ = mock_bq_client + config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + async with managed_plugin( + PROJECT_ID, + DATASET_ID, + table_id=TABLE_ID, + config=config, + location="northamerica-northeast1", + ) as plugin: + await plugin._ensure_started() + mock_write_client.append_rows.reset_mock() + llm_request = llm_request_lib.LlmRequest( + model="gemini-pro", + contents=[types.Content(parts=[types.Part(text="Prompt")])], + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(callback_context) + await plugin.before_model_callback( + callback_context=callback_context, llm_request=llm_request + ) + await asyncio.sleep(0.01) # Allow background task to run + mock_write_client.append_rows.assert_called_once() + metadata = mock_write_client.append_rows.call_args.kwargs.get("metadata") + assert metadata is not None, "append_rows must receive routing metadata" + assert ( + "x-goog-request-params", + f"write_stream={DEFAULT_STREAM_NAME}", + ) in tuple(metadata) + @pytest.mark.asyncio async def test_content_formatter( self,