Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,21 @@ async def requests_iter():
yield req

async def perform_write():
responses = await self.write_client.append_rows(requests_iter())
# The AppendRows streaming RPC does not auto-populate the
# request-routing header, so writes to any region other than
# the US multiregion fail with a "session not found" /
# stream-not-found error. Set the routing header explicitly
# (same as google.cloud.bigquery_storage_v1.writer) so the
# request reaches the region that owns the write stream.
responses = await self.write_client.append_rows(
requests_iter(),
metadata=(
(
"x-goog-request-params",
f"write_stream={self.write_stream}",
),
),
)
async for response in responses:
error = getattr(response, "error", None)
error_code = getattr(error, "code", None)
Expand Down
49 changes: 49 additions & 0 deletions tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,55 @@ async def test_event_denylist(
await asyncio.sleep(0.01)
mock_write_client.append_rows.assert_called_once()

@pytest.mark.asyncio
async def test_append_rows_sets_regional_routing_header(
self,
mock_write_client,
callback_context,
mock_auth_default,
mock_bq_client,
mock_to_arrow_schema,
dummy_arrow_schema,
mock_asyncio_to_thread,
):
"""Regression test for cross-region writes (issue #262).

The Storage Write API streaming AppendRows RPC does not
auto-populate the request-routing header, so writes to a dataset
outside the US multiregion (e.g. northamerica-northeast1) fail with
a "session not found" / stream-not-found error unless the header is
set explicitly. Assert the header is passed to append_rows so the
request reaches the region that owns the write stream.
"""
_ = mock_auth_default
_ = mock_bq_client
config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig()
async with managed_plugin(
PROJECT_ID,
DATASET_ID,
table_id=TABLE_ID,
config=config,
location="northamerica-northeast1",
) as plugin:
await plugin._ensure_started()
mock_write_client.append_rows.reset_mock()
llm_request = llm_request_lib.LlmRequest(
model="gemini-pro",
contents=[types.Content(parts=[types.Part(text="Prompt")])],
)
bigquery_agent_analytics_plugin.TraceManager.push_span(callback_context)
await plugin.before_model_callback(
callback_context=callback_context, llm_request=llm_request
)
await asyncio.sleep(0.01) # Allow background task to run
mock_write_client.append_rows.assert_called_once()
metadata = mock_write_client.append_rows.call_args.kwargs.get("metadata")
assert metadata is not None, "append_rows must receive routing metadata"
assert (
"x-goog-request-params",
f"write_stream={DEFAULT_STREAM_NAME}",
) in tuple(metadata)

@pytest.mark.asyncio
async def test_content_formatter(
self,
Expand Down
Loading