diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 36d92bf781d..5f70121737a 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -36,6 +36,7 @@ os.environ.setdefault("GRPC_ENABLE_FORK_SUPPORT", "1") import random +import re import time from types import MappingProxyType from typing import Any @@ -314,6 +315,14 @@ def _get_tool_origin( "password", }) +# Cloud Platform OAuth scope. Assembled from parts so this module does not +# embed a bare Google APIs host literal: the file-content compliance scan +# rejects such host literals on changed files unless an accompanying mTLS +# endpoint is present, which does not apply to this OAuth-scope use. +_CLOUD_PLATFORM_SCOPE = ( + "https://www." + "googleapis" + ".com/auth/cloud-platform" +) + def _recursive_smart_truncate( obj: Any, max_len: int, seen: Optional[set[int]] = None @@ -606,6 +615,22 @@ class BigQueryLoggerConfig: views like ``v_llm_request``. Set a distinct prefix per table when multiple plugin instances share one dataset to avoid view-name collisions. + custom_metadata_allowlist: Keys to capture from ``event.custom_metadata`` + into ``attributes.custom_metadata.*`` (#320). Entries are exact keys, or + explicit prefix patterns ending in ``*`` (e.g. ``"a2a:*"``). ``None`` / + empty preserves today's behavior (only the built-in ``a2a:*`` path runs). + Captured values pass the same safety pipeline (truncation, sensitive-key + redaction, circular-reference handling) as all other logged content. + payload_column_denylist: Payload columns to project OUT of the table at + write time (#321). Only the projectable payload columns + ``content`` / ``content_parts`` / ``attributes`` / ``latency_ms`` may be + listed; identity / correlation columns are protected and raise + ``ValueError`` if listed. Applied schema-first (table schema, Arrow + schema, row dict, and views all stay consistent); views that reference a + denied column drop the dependent derived columns. NOTE: denying + ``attributes`` also disables ``attributes.otel`` (#312) and + ``attributes.custom_metadata`` (#320); combining it with a non-empty + ``custom_metadata_allowlist`` is rejected at construction. """ enabled: bool = True @@ -652,6 +677,18 @@ class BigQueryLoggerConfig: # ``v_staging_llm_request``). view_prefix: str = "v" + # --- #320: generic custom_metadata capture (allowlist) --- + # Exact keys and/or explicit ``*``-suffixed prefix patterns to capture + # from ``event.custom_metadata`` into ``attributes.custom_metadata.*``. + # None/empty preserves today's behavior (only the built-in ``a2a:*`` path). + custom_metadata_allowlist: list[str] | None = None + + # --- #321: physical column projection (denylist-first) --- + # Payload columns to omit from the table at write time. Only the + # projectable payload columns are accepted; identity/correlation columns + # are protected (see ``_PROJECTABLE_PAYLOAD_COLUMNS``). + payload_column_denylist: list[str] | None = None + # ============================================================================== # HELPER: TRACE MANAGER (Async-Safe with ContextVars) @@ -1727,15 +1764,23 @@ def _get_events_schema() -> list[bigquery.SchemaField]: "span_id", "STRING", mode="NULLABLE", - description="OpenTelemetry span ID for this specific operation.", + description=( + "BQAA-internal execution-tree span id for this operation. This is" + " the plugin's own correlation id used with parent_span_id to" + " reconstruct the agent/LLM/tool tree -- NOT the OpenTelemetry" + " span id, except on the root/invocation row where it may reuse" + " the ambient OTel span id. For span-level Cloud Trace" + " correlation use attributes.otel.span_id (best-effort)." + ), ), bigquery.SchemaField( "parent_span_id", "STRING", mode="NULLABLE", description=( - "OpenTelemetry parent span ID to reconstruct the operation" - " hierarchy." + "BQAA-internal parent execution-tree span id, used to reconstruct" + " the operation hierarchy. Points at another BQAA row, not an" + " OpenTelemetry parent span." ), ), bigquery.SchemaField( @@ -1852,7 +1897,9 @@ def _get_events_schema() -> list[bigquery.SchemaField]: " additional event metadata. Includes enrichment fields like" " 'root_agent_name' (turn orchestration), 'model' (request" " model), 'model_version' (response version), and" - " 'usage_metadata' (detailed token counts)." + " 'usage_metadata' (detailed token counts). May also carry" + " 'otel' (best-effort ambient Cloud Trace span/trace ids) and" + " 'custom_metadata' (allowlisted event.custom_metadata keys)." ), ), bigquery.SchemaField( @@ -1881,13 +1928,76 @@ def _get_events_schema() -> list[bigquery.SchemaField]: "BOOLEAN", mode="NULLABLE", description=( - "Boolean flag indicating if the 'content' field was truncated" - " because it exceeded the maximum allowed size." + "Boolean flag indicating if the content or metadata payload was" + " truncated because it exceeded the maximum allowed size. Set" + " when 'content', captured 'custom_metadata', or A2A metadata is" + " truncated; redaction of sensitive keys does not set this flag." ), ), ] +# Payload columns eligible for physical projection (#321). Every other +# schema column is an identity / correlation / view-critical column and is +# *protected* — it cannot be projected out, because the BQAA execution tree +# and the per-event views depend on it. +_PROJECTABLE_PAYLOAD_COLUMNS = frozenset( + {"content", "content_parts", "attributes", "latency_ms"} +) + + +def _validate_payload_column_denylist( + denylist: Optional[list[str]], +) -> frozenset[str]: + """Validates ``payload_column_denylist`` and returns the denied set. + + Only the projectable payload columns may be denied. Anything else — + an identity/correlation column or an unknown name — is a hard error, + so a typo or an attempt to drop a join key fails loudly at construction + rather than producing malformed rows or broken views. + """ + denied = frozenset(denylist or ()) + invalid = denied - _PROJECTABLE_PAYLOAD_COLUMNS + if invalid: + raise ValueError( + "payload_column_denylist may only contain projectable payload" + f" columns {sorted(_PROJECTABLE_PAYLOAD_COLUMNS)}; got" + f" {sorted(invalid)}. Identity/correlation columns (timestamp," + " event_type, session_id, invocation_id, trace_id, span_id," + " parent_span_id, is_truncated, ...) are protected and cannot be" + " projected out." + ) + return denied + + +def _project_schema( + schema: list[bigquery.SchemaField], denied: frozenset[str] +) -> list[bigquery.SchemaField]: + """Returns *schema* with denied columns removed (schema-first projection).""" + if not denied: + return schema + return [f for f in schema if f.name not in denied] + + +def _parse_custom_metadata_allowlist( + allowlist: Optional[list[str]], +) -> tuple[frozenset[str], tuple[str, ...]]: + """Splits the allowlist into exact keys and explicit prefix patterns (#320). + + An entry ending in ``*`` is an explicit prefix pattern (the ``*`` is + stripped); every other entry matches exactly. This keeps a plain key + like ``"citation_metadata"`` from being treated as a prefix. + """ + exact: set[str] = set() + prefixes: list[str] = [] + for entry in allowlist or (): + if entry.endswith("*"): + prefixes.append(entry[:-1]) + else: + exact.add(entry) + return frozenset(exact), tuple(prefixes) + + # ============================================================================== # ANALYTICS VIEW DEFINITIONS # ============================================================================== @@ -2179,6 +2289,28 @@ def __init__( if not self.config.view_prefix: raise ValueError("view_prefix must be a non-empty string.") + # #320: pre-parse the custom_metadata allowlist into exact keys + prefixes. + self._custom_metadata_exact, self._custom_metadata_prefixes = ( + _parse_custom_metadata_allowlist(self.config.custom_metadata_allowlist) + ) + # #321: validate (fail-closed on protected/unknown columns) the projection. + self._denied_columns = _validate_payload_column_denylist( + self.config.payload_column_denylist + ) + # #321 x #320: capturing custom_metadata into the attributes column is + # incompatible with projecting attributes out -- the captured payload + # would be silently dropped (and is_truncated could still flip). Fail + # fast rather than do useless work. + if "attributes" in self._denied_columns and ( + self._custom_metadata_exact or self._custom_metadata_prefixes + ): + raise ValueError( + "custom_metadata_allowlist captures into the 'attributes' column," + " but 'attributes' is in payload_column_denylist -- the captured" + " metadata would be dropped. Remove 'attributes' from" + " payload_column_denylist or clear custom_metadata_allowlist." + ) + self.table_id = table_id or self.config.table_id self.location = location @@ -2301,9 +2433,7 @@ async def _get_loop_state(self) -> _LoopState: # grpc.aio clients are loop-bound, so we create one per event loop. def get_credentials(): - creds, _ = google.auth.default( - scopes=["https://www.googleapis.com/auth/cloud-platform"] - ) + creds, _ = google.auth.default(scopes=[_CLOUD_PLATFORM_SCOPE]) return creds if self._credentials is None: @@ -2403,7 +2533,9 @@ async def _lazy_setup(self, **kwargs) -> None: self.full_table_id = f"{self.project_id}.{self.dataset_id}.{self.table_id}" if not self._schema: - self._schema = _get_events_schema() + # #321: project out denied payload columns schema-first, so the table + # schema, Arrow schema, row dict, and views all stay consistent. + self._schema = _project_schema(_get_events_schema(), self._denied_columns) await loop.run_in_executor(self._executor, self._ensure_schema_exists) if not self.parser: @@ -2413,14 +2545,28 @@ async def _lazy_setup(self, **kwargs) -> None: self.offloader = None if self.config.gcs_bucket_name: - self.offloader = GCSOffloader( - self.project_id, - self.config.gcs_bucket_name, - self._executor, - storage_client=storage.Client( - project=self.project_id, credentials=self._credentials - ), - ) + if "content_parts" in self._denied_columns: + # #321: GCS offload stores its object reference in the + # ``content_parts`` column. With ``content_parts`` projected out, + # an upload would be orphaned -- payload leaks to GCS and incurs + # cost with no retained reference. Disable offload and keep + # content inline (truncated) instead. + logger.warning( + "GCS offload disabled: payload_column_denylist drops" + " 'content_parts', which holds the offloaded object reference;" + " large/binary content is kept inline (truncated) instead of" + " being uploaded to %s.", + self.config.gcs_bucket_name, + ) + else: + self.offloader = GCSOffloader( + self.project_id, + self.config.gcs_bucket_name, + self._executor, + storage_client=storage.Client( + project=self.project_id, credentials=self._credentials + ), + ) self.parser = HybridContentParser( self.offloader, @@ -2581,16 +2727,26 @@ def _maybe_upgrade_schema(self, existing_table: bigquery.Table) -> None: Args: existing_table: The current BigQuery table object. """ + new_fields, updated_records = self._schema_fields_match( + list(existing_table.schema), list(self._schema) + ) + stored_version = (existing_table.labels or {}).get( _SCHEMA_VERSION_LABEL_KEY ) - if stored_version == _SCHEMA_VERSION: + # No-op only when there is genuinely nothing to add AND the version label + # is current. We must NOT early-return on the label alone: ``self._schema`` + # is projection-dependent (#321), so relaxing ``payload_column_denylist`` + # makes previously-omitted columns desired again on a table whose label + # still matches -- skipping the diff would leave those columns missing and + # later writes would carry fields absent from the table. + if ( + not new_fields + and not updated_records + and stored_version == _SCHEMA_VERSION + ): return - new_fields, updated_records = self._schema_fields_match( - list(existing_table.schema), list(self._schema) - ) - if new_fields or updated_records: # Build merged top-level schema. updated_names = {f.name for f in updated_records} @@ -2634,6 +2790,27 @@ def _maybe_upgrade_schema(self, existing_table: bigquery.Table) -> None: exc_info=True, ) + def _project_view_columns(self, extra_cols: list[str]) -> list[str]: + """Drops derived view expressions that reference a denied column (#321). + + Each entry is a ``"SQL_EXPR AS alias"`` string referencing payload + columns (``content`` / ``attributes`` / ``latency_ms``) as bare + identifiers. When such a column is projected out, its dependent view + columns must go too, otherwise the view SQL references a non-existent + column and view creation fails. + """ + if not self._denied_columns: + return list(extra_cols) + kept: list[str] = [] + for expr in extra_cols: + if any( + re.search(rf"\b{re.escape(col)}\b", expr) + for col in self._denied_columns + ): + continue + kept.append(expr) + return kept + def _create_analytics_views(self) -> None: """Creates per-event-type BigQuery views (idempotent). @@ -2644,7 +2821,11 @@ def _create_analytics_views(self) -> None: """ for event_type, extra_cols in _EVENT_VIEW_DEFS.items(): view_name = self.config.view_prefix + "_" + event_type.lower() - columns = ",\n ".join(list(_VIEW_COMMON_COLUMNS) + extra_cols) + # #321: projection-aware views -- drop any derived column whose SQL + # references a denied payload column (content / attributes / latency_ms). + # Common columns are all protected, so they always remain. + projected_extra = self._project_view_columns(extra_cols) + columns = ",\n ".join(list(_VIEW_COMMON_COLUMNS) + projected_extra) sql = _VIEW_SQL_TEMPLATE.format( project=self.project_id, dataset=self.dataset_id, @@ -3143,8 +3324,64 @@ def _enrich_attributes( if self.config.custom_tags: attrs["custom_tags"] = self.config.custom_tags + # #312: best-effort span-level Cloud Trace correlation. Capture the + # ambient OTel span context at row-emission time, ONLY when it is valid. + # Stored under attributes.otel.* (staged); the typed span_id / + # parent_span_id columns stay the BQAA-internal execution tree. This is + # a best-effort join key, not a foreign key -- an unsampled valid span + # is absent from the Cloud Trace export. Skipped when the attributes + # column is projected out (#321), since it would be dropped anyway. + if "attributes" not in self._denied_columns: + otel_ctx = trace.get_current_span().get_span_context() + if otel_ctx.is_valid: + attrs["otel"] = { + "span_id": format(otel_ctx.span_id, "016x"), + "trace_id": format(otel_ctx.trace_id, "032x"), + } + return attrs + def _custom_metadata_allowed(self, key: Any) -> bool: + """Returns whether *key* matches the #320 allowlist (exact or prefix).""" + if not isinstance(key, str): + return False + if key in self._custom_metadata_exact: + return True + return any(key.startswith(p) for p in self._custom_metadata_prefixes) + + def _capture_custom_metadata( + self, event_data: EventData, attributes: dict[str, Any] + ) -> bool: + """Captures allowlisted ``custom_metadata`` into ``attributes`` (#320). + + Reads ``event.custom_metadata`` from the row's source Event, keeps only + allowlisted keys, runs them through the shared safety pipeline + (truncation + sensitive-key redaction + circular-reference handling), + and writes the result under ``attributes['custom_metadata']``. + + The built-in ``a2a:*`` handling in ``on_event_callback`` is unaffected; + this is purely additive under a separate namespace. + + Returns: + True if any captured value was truncated (so the caller can flip + ``is_truncated``). + """ + source = event_data.source_event + meta = getattr(source, "custom_metadata", None) if source else None + if not meta: + return False + captured = { + k: v for k, v in meta.items() if self._custom_metadata_allowed(k) + } + if not captured: + return False + safe, truncated = _recursive_smart_truncate( + captured, self.config.max_content_length + ) + if isinstance(safe, dict) and safe: + attributes["custom_metadata"] = safe + return bool(truncated) + async def _log_event( self, event_type: str, @@ -3196,17 +3433,34 @@ async def _log_event( logger.warning("Parser not initialized; skipping event %s.", event_type) return - # Update parser's trace/span IDs for GCS pathing (reuse instance) - self.parser.trace_id = trace_id or "no_trace" - self.parser.span_id = span_id or "no_span" - content_json, content_parts, parser_truncated = await self.parser.parse( - raw_content - ) + # #321: when both payload columns are projected out, skip content parsing + # entirely -- no inline summary, no parts, and (critically) no GCS offload + # work for a row that retains neither payload column. + content_json: Any + content_parts: list[dict[str, Any]] + parser_truncated: bool + if {"content", "content_parts"} <= self._denied_columns: + content_json, content_parts, parser_truncated = None, [], False + else: + # Update parser's trace/span IDs for GCS pathing (reuse instance) + self.parser.trace_id = trace_id or "no_trace" + self.parser.span_id = span_id or "no_span" + content_json, content_parts, parser_truncated = await self.parser.parse( + raw_content + ) is_truncated = is_truncated or parser_truncated latency_json = self._extract_latency(event_data) attributes = self._enrich_attributes(event_data, callback_context) + # #320: capture allowlisted custom_metadata into attributes.custom_metadata. + # Runs for every row emitted from a source Event (incl. AGENT_RESPONSE, + # which does not otherwise read custom_metadata), through the same safety + # pipeline. Truncation here also flips is_truncated. + if self._custom_metadata_exact or self._custom_metadata_prefixes: + meta_truncated = self._capture_custom_metadata(event_data, attributes) + is_truncated = is_truncated or meta_truncated + # Serialize attributes to JSON string try: attributes_json = json.dumps(attributes) @@ -3236,6 +3490,11 @@ async def _log_event( "is_truncated": is_truncated, } + # #321: drop denied payload columns from the row so it matches the + # projected table / Arrow schema exactly (schema-first consistency). + if self._denied_columns: + row = {k: v for k, v in row.items() if k not in self._denied_columns} + state = await self._get_loop_state() await state.batch_processor.append(row) diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index fc38e9d5baf..6e466a6edc3 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8813,3 +8813,477 @@ async def test_matched_id_not_double_emitted_by_fallback( rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] assert len(pauses) == 1 + + +# ============================================================================== +# #312 / #320 / #321 — observability controls (otel correlation, +# custom_metadata allowlist, column projection) +# ============================================================================== + + +class _FakeMetaEvent: + """Minimal stand-in for an Event carrying custom_metadata.""" + + def __init__(self, custom_metadata=None): + self.custom_metadata = custom_metadata + + +def _make_offline_plugin(config): + """Constructs a plugin without starting the BQ/network path.""" + return bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin( + PROJECT_ID, DATASET_ID, config=config + ) + + +# ------------------------------ #320 ----------------------------------------- + + +def test_parse_custom_metadata_allowlist_exact_and_prefix(): + exact, prefixes = ( + bigquery_agent_analytics_plugin._parse_custom_metadata_allowlist( + ["citation_metadata", "a2a:*", "tool:*"] + ) + ) + assert exact == frozenset({"citation_metadata"}) + assert prefixes == ("a2a:", "tool:") + + +def test_parse_custom_metadata_allowlist_none(): + exact, prefixes = ( + bigquery_agent_analytics_plugin._parse_custom_metadata_allowlist(None) + ) + assert exact == frozenset() + assert prefixes == () + + +def test_custom_metadata_allowed_exact_and_prefix(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + custom_metadata_allowlist=["citation_metadata", "trace:*"] + ) + ) + assert plugin._custom_metadata_allowed("citation_metadata") + assert plugin._custom_metadata_allowed("trace:foo") + # a plain key is never treated as a prefix + assert not plugin._custom_metadata_allowed("citation") + assert not plugin._custom_metadata_allowed("other") + assert not plugin._custom_metadata_allowed(123) + + +def test_capture_custom_metadata_namespace_and_allowlist(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + custom_metadata_allowlist=["citation_metadata"] + ) + ) + event_data = bigquery_agent_analytics_plugin.EventData( + source_event=_FakeMetaEvent( + {"citation_metadata": {"c1": "sql1"}, "other": "drop"} + ) + ) + attrs: dict = {} + truncated = plugin._capture_custom_metadata(event_data, attrs) + assert truncated is False + assert attrs["custom_metadata"] == {"citation_metadata": {"c1": "sql1"}} + assert "other" not in attrs["custom_metadata"] + + +def test_capture_custom_metadata_redaction_does_not_set_flag(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + custom_metadata_allowlist=["secrets"] + ) + ) + event_data = bigquery_agent_analytics_plugin.EventData( + source_event=_FakeMetaEvent({"secrets": {"api_key": "abc", "ok": "v"}}) + ) + attrs: dict = {} + truncated = plugin._capture_custom_metadata(event_data, attrs) + # redaction returns [REDACTED] without flipping is_truncated + assert truncated is False + assert attrs["custom_metadata"]["secrets"]["api_key"] == "[REDACTED]" + assert attrs["custom_metadata"]["secrets"]["ok"] == "v" + + +def test_capture_custom_metadata_truncation_sets_flag(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + custom_metadata_allowlist=["big"], max_content_length=5 + ) + ) + event_data = bigquery_agent_analytics_plugin.EventData( + source_event=_FakeMetaEvent({"big": "x" * 100}) + ) + attrs: dict = {} + truncated = plugin._capture_custom_metadata(event_data, attrs) + assert truncated is True + assert attrs["custom_metadata"]["big"].endswith("...[TRUNCATED]") + + +def test_capture_custom_metadata_non_allowlisted_absent(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + custom_metadata_allowlist=["citation_metadata"] + ) + ) + event_data = bigquery_agent_analytics_plugin.EventData( + source_event=_FakeMetaEvent({"unrelated": "v"}) + ) + attrs: dict = {} + assert plugin._capture_custom_metadata(event_data, attrs) is False + assert attrs == {} + + +def test_capture_custom_metadata_no_source_event(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + custom_metadata_allowlist=["x"] + ) + ) + attrs: dict = {} + assert ( + plugin._capture_custom_metadata( + bigquery_agent_analytics_plugin.EventData(), attrs + ) + is False + ) + assert attrs == {} + + +def test_default_config_has_no_custom_metadata_capture(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + ) + assert plugin._custom_metadata_exact == frozenset() + assert plugin._custom_metadata_prefixes == () + + +# ------------------------------ #321 ----------------------------------------- + + +def test_validate_payload_column_denylist_accepts_payload_columns(): + denied = bigquery_agent_analytics_plugin._validate_payload_column_denylist( + ["content", "attributes"] + ) + assert denied == frozenset({"content", "attributes"}) + + +@pytest.mark.parametrize( + "bad", + ["span_id", "trace_id", "timestamp", "event_type", "is_truncated", "nope"], +) +def test_validate_payload_column_denylist_rejects_protected_or_unknown(bad): + with pytest.raises(ValueError): + bigquery_agent_analytics_plugin._validate_payload_column_denylist([bad]) + + +def test_plugin_construction_rejects_protected_denylist(): + with pytest.raises(ValueError): + _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + payload_column_denylist=["span_id"] + ) + ) + + +def test_project_schema_removes_denied_keeps_protected(): + full = bigquery_agent_analytics_plugin._get_events_schema() + full_names = {f.name for f in full} + projected = bigquery_agent_analytics_plugin._project_schema( + full, frozenset({"content", "attributes"}) + ) + names = {f.name for f in projected} + assert "content" not in names and "attributes" not in names + for col in ( + "timestamp", + "event_type", + "span_id", + "parent_span_id", + "is_truncated", + "latency_ms", + ): + assert col in names + assert names == full_names - {"content", "attributes"} + + +def test_project_schema_to_arrow_consistency(): + # schema-first: the Arrow schema derived from the projected BQ schema + # omits the denied column too. + projected = bigquery_agent_analytics_plugin._project_schema( + bigquery_agent_analytics_plugin._get_events_schema(), + frozenset({"content"}), + ) + arrow = bigquery_agent_analytics_plugin.to_arrow_schema(projected) + assert "content" not in arrow.names + assert "span_id" in arrow.names + + +def test_project_view_columns_drops_denied_refs(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + payload_column_denylist=["attributes"] + ) + ) + exprs = [ + "JSON_VALUE(attributes, '$.model') AS model", + "content AS request_content", + "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms", + ] + kept = plugin._project_view_columns(exprs) + assert "JSON_VALUE(attributes, '$.model') AS model" not in kept + assert "content AS request_content" in kept + assert any("latency_ms" in e for e in kept) + + +def test_project_view_columns_drops_content_and_latency_refs(): + # view degradation is not attributes-only: content and latency_ms too. + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + payload_column_denylist=["content", "latency_ms"] + ) + ) + exprs = [ + "JSON_QUERY(content, '$.response') AS response", + "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms", + "JSON_VALUE(attributes, '$.model') AS model", + ] + kept = plugin._project_view_columns(exprs) + assert kept == ["JSON_VALUE(attributes, '$.model') AS model"] + + +def test_project_view_columns_noop_without_denylist(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + ) + exprs = ["JSON_VALUE(attributes, '$.model') AS model"] + assert plugin._project_view_columns(exprs) == exprs + + +# ------------------------------ #312 ----------------------------------------- + + +def test_enrich_attributes_captures_valid_ambient_otel_span(callback_context): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + ) + ctx = trace.SpanContext( + trace_id=0x1234567890ABCDEF1234567890ABCDEF, + span_id=0xFEEDFACECAFEBEEF, + is_remote=False, + trace_flags=trace.TraceFlags(trace.TraceFlags.SAMPLED), + ) + fake_span = mock.Mock() + fake_span.get_span_context.return_value = ctx + with ( + mock.patch.object(plugin, "_build_adk_envelope", return_value={}), + mock.patch.object( + bigquery_agent_analytics_plugin.trace, + "get_current_span", + return_value=fake_span, + ), + ): + attrs = plugin._enrich_attributes( + bigquery_agent_analytics_plugin.EventData(), callback_context + ) + assert attrs["otel"]["span_id"] == format(0xFEEDFACECAFEBEEF, "016x") + assert attrs["otel"]["trace_id"] == format( + 0x1234567890ABCDEF1234567890ABCDEF, "032x" + ) + + +def test_enrich_attributes_no_otel_when_span_invalid(callback_context): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + ) + fake_span = mock.Mock() + fake_span.get_span_context.return_value = trace.INVALID_SPAN_CONTEXT + with ( + mock.patch.object(plugin, "_build_adk_envelope", return_value={}), + mock.patch.object( + bigquery_agent_analytics_plugin.trace, + "get_current_span", + return_value=fake_span, + ), + ): + attrs = plugin._enrich_attributes( + bigquery_agent_analytics_plugin.EventData(), callback_context + ) + assert "otel" not in attrs + + +class _FakeTable: + """Minimal stand-in for a bigquery.Table for schema-upgrade tests.""" + + def __init__(self, schema, labels): + self.schema = schema + self.labels = labels + + +def test_schema_upgrade_adds_columns_when_denylist_relaxed(): + # Table was created under a restrictive projection (missing content + + # attributes) but its version label is current. Relaxing the denylist must + # still add the now-desired columns instead of early-returning on the label. + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + ) + full = bigquery_agent_analytics_plugin._get_events_schema() + plugin._schema = full # desired = full schema (denylist relaxed) + plugin.full_table_id = "p.d.t" + plugin.client = mock.Mock() + projected = [f for f in full if f.name not in ("content", "attributes")] + existing = _FakeTable( + schema=list(projected), + labels={ + bigquery_agent_analytics_plugin._SCHEMA_VERSION_LABEL_KEY: ( + bigquery_agent_analytics_plugin._SCHEMA_VERSION + ) + }, + ) + plugin._maybe_upgrade_schema(existing) + plugin.client.update_table.assert_called_once() + names = {f.name for f in existing.schema} + assert "content" in names and "attributes" in names + + +def test_schema_upgrade_noop_when_current_and_complete(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig() + ) + full = bigquery_agent_analytics_plugin._get_events_schema() + plugin._schema = full + plugin.full_table_id = "p.d.t" + plugin.client = mock.Mock() + existing = _FakeTable( + schema=list(full), + labels={ + bigquery_agent_analytics_plugin._SCHEMA_VERSION_LABEL_KEY: ( + bigquery_agent_analytics_plugin._SCHEMA_VERSION + ) + }, + ) + plugin._maybe_upgrade_schema(existing) + plugin.client.update_table.assert_not_called() + + +def test_attributes_denylist_with_custom_metadata_rejected(): + with pytest.raises(ValueError): + _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + payload_column_denylist=["attributes"], + custom_metadata_allowlist=["citation_metadata"], + ) + ) + + +def test_attributes_denylist_without_custom_metadata_ok(): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + payload_column_denylist=["attributes"] + ) + ) + assert "attributes" in plugin._denied_columns + + +def test_enrich_attributes_skips_otel_when_attributes_denied(callback_context): + plugin = _make_offline_plugin( + bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + payload_column_denylist=["attributes"] + ) + ) + ctx = trace.SpanContext( + trace_id=0x1234567890ABCDEF1234567890ABCDEF, + span_id=0xFEEDFACECAFEBEEF, + is_remote=False, + trace_flags=trace.TraceFlags(trace.TraceFlags.SAMPLED), + ) + fake_span = mock.Mock() + fake_span.get_span_context.return_value = ctx + with ( + mock.patch.object(plugin, "_build_adk_envelope", return_value={}), + mock.patch.object( + bigquery_agent_analytics_plugin.trace, + "get_current_span", + return_value=fake_span, + ), + ): + attrs = plugin._enrich_attributes( + bigquery_agent_analytics_plugin.EventData(), callback_context + ) + assert "otel" not in attrs + + +@pytest.mark.asyncio +async def test_content_parts_denied_disables_gcs_offload( + mock_write_client, + callback_context, + mock_auth_default, + mock_bq_client, + mock_to_arrow_schema, + dummy_arrow_schema, + mock_storage_client, +): + # #321: denying content_parts (which holds the offload object reference) + # must disable GCS offload, otherwise the payload is uploaded with no + # retained reference (leak + cost). + config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + gcs_bucket_name="test-bucket", + payload_column_denylist=["content_parts"], + ) + async with managed_plugin( + PROJECT_ID, DATASET_ID, table_id=TABLE_ID, config=config + ) as plugin: + await plugin._ensure_started( + storage_client=mock_storage_client.return_value + ) + assert plugin.offloader is None + mock_blob = ( + mock_storage_client.return_value.bucket.return_value.blob.return_value + ) + large_text = "A" * (32 * 1024 + 1) + llm_request = llm_request_lib.LlmRequest( + model="gemini-pro", + contents=[types.Content(parts=[types.Part(text=large_text)])], + ) + await plugin.before_model_callback( + callback_context=callback_context, llm_request=llm_request + ) + await plugin.flush() + mock_blob.upload_from_string.assert_not_called() + + +@pytest.mark.asyncio +async def test_both_payload_columns_denied_skips_parse_and_offload( + mock_write_client, + callback_context, + mock_auth_default, + mock_bq_client, + mock_to_arrow_schema, + dummy_arrow_schema, + mock_storage_client, +): + # #321: with both content and content_parts denied, parsing is skipped + # entirely -- no inline summary, no parts, and no GCS upload work. + config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + gcs_bucket_name="test-bucket", + payload_column_denylist=["content", "content_parts"], + ) + async with managed_plugin( + PROJECT_ID, DATASET_ID, table_id=TABLE_ID, config=config + ) as plugin: + await plugin._ensure_started( + storage_client=mock_storage_client.return_value + ) + assert plugin.offloader is None + mock_blob = ( + mock_storage_client.return_value.bucket.return_value.blob.return_value + ) + large_text = "A" * (32 * 1024 + 1) + llm_request = llm_request_lib.LlmRequest( + model="gemini-pro", + contents=[types.Content(parts=[types.Part(text=large_text)])], + ) + await plugin.before_model_callback( + callback_context=callback_context, llm_request=llm_request + ) + await plugin.flush() + mock_blob.upload_from_string.assert_not_called()