From 159f185f48934b71c3a43e5f38b0a7f66901a73a Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sat, 30 May 2026 01:15:49 -0700 Subject: [PATCH] fix(bigquery-analytics): route Storage Write API appends to the dataset's region MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Storage Write API `AppendRows` streaming RPC does not auto-populate the request-routing header, so the plugin's writes were always routed to the US multiregion. Writes to a dataset in any other region (e.g. northamerica-northeast1) failed with a "session not found" / stream-not-found error and no rows were ever written — which surfaced to users as session_id (and every other column) failing to propagate. Set the `x-goog-request-params: write_stream=` routing header on the append_rows call, matching what google.cloud.bigquery_storage_v1.writer does internally, so requests reach the region that owns the write stream. US-multiregion behavior is unchanged. Adds a regression test asserting the routing header is passed. Fixes GoogleCloudPlatform/BigQuery-Agent-Analytics-SDK#262 --- .../bigquery_agent_analytics_plugin.py | 16 +++++- .../test_bigquery_agent_analytics_plugin.py | 49 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 90bd50628c3..dc3bdc567f0 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -1171,7 +1171,21 @@ async def requests_iter(): yield req async def perform_write(): - responses = await self.write_client.append_rows(requests_iter()) + # The AppendRows streaming RPC does not auto-populate the + # request-routing header, so writes to any region other than + # the US multiregion fail with a "session not found" / + # stream-not-found error. Set the routing header explicitly + # (same as google.cloud.bigquery_storage_v1.writer) so the + # request reaches the region that owns the write stream. + responses = await self.write_client.append_rows( + requests_iter(), + metadata=( + ( + "x-goog-request-params", + f"write_stream={self.write_stream}", + ), + ), + ) async for response in responses: error = getattr(response, "error", None) error_code = getattr(error, "code", None) diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index c6d47539d14..d91398cf9a8 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -635,6 +635,55 @@ async def test_event_denylist( await asyncio.sleep(0.01) mock_write_client.append_rows.assert_called_once() + @pytest.mark.asyncio + async def test_append_rows_sets_regional_routing_header( + self, + mock_write_client, + callback_context, + mock_auth_default, + mock_bq_client, + mock_to_arrow_schema, + dummy_arrow_schema, + mock_asyncio_to_thread, + ): + """Regression test for cross-region writes (issue #262). + + The Storage Write API streaming AppendRows RPC does not + auto-populate the request-routing header, so writes to a dataset + outside the US multiregion (e.g. northamerica-northeast1) fail with + a "session not found" / stream-not-found error unless the header is + set explicitly. Assert the header is passed to append_rows so the + request reaches the region that owns the write stream. + """ + _ = mock_auth_default + _ = mock_bq_client + config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + async with managed_plugin( + PROJECT_ID, + DATASET_ID, + table_id=TABLE_ID, + config=config, + location="northamerica-northeast1", + ) as plugin: + await plugin._ensure_started() + mock_write_client.append_rows.reset_mock() + llm_request = llm_request_lib.LlmRequest( + model="gemini-pro", + contents=[types.Content(parts=[types.Part(text="Prompt")])], + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(callback_context) + await plugin.before_model_callback( + callback_context=callback_context, llm_request=llm_request + ) + await asyncio.sleep(0.01) # Allow background task to run + mock_write_client.append_rows.assert_called_once() + metadata = mock_write_client.append_rows.call_args.kwargs.get("metadata") + assert metadata is not None, "append_rows must receive routing metadata" + assert ( + "x-goog-request-params", + f"write_stream={DEFAULT_STREAM_NAME}", + ) in tuple(metadata) + @pytest.mark.asyncio async def test_content_formatter( self,