From a01758a56c90db8636e23a3a5e4d9de358b029bf Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 1 Jun 2026 15:59:22 -0700 Subject: [PATCH] fix(bigquery-analytics): stop exporting plugin-owned OTel spans (#94) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Agent Engine telemetry is enabled (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY=true) and Cloud Trace export is configured on the global OTel tracer provider, the BigQuery Agent Analytics plugin causes every instrumented operation to appear as TWO spans in Cloud Trace — the framework's real span plus a duplicate plugin-owned span. Reported in GoogleCloudPlatform/BigQuery-Agent-Analytics-SDK#94 (also at haiyuan-eng-google#94) by a regular BQAA + Agent Engine user. Root cause ---------- TraceManager.push_span() called tracer.start_span(...) purely as an ID carrier, so the plugin could populate BigQuery span_id / parent_span_id columns. The author was aware of one related class of pitfall (re-parenting the framework's spans — fixed by not attaching the plugin span to the ambient context, see #4561), but that guard does not stop the plugin span from going through the configured SpanProcessor → BatchSpanProcessor → Cloud Trace exporter. Once Agent Engine wires the global provider to Cloud Trace, every push/pop pair is exported. The plugin already tracked every parent/child relationship on its own contextvar-backed _SpanRecord stack — the OTel span object was incidental to correctness. Fix (scoped to TraceManager methods + _SpanRecord) ------------------------------------------------- * _SpanRecord no longer holds an OTel span object. It carries span_id, trace_id, owns_span, start_time_ns, first_token_time. * push_span generates a 16-hex span_id directly and inherits trace_id by precedence: top of internal stack → ambient OTel span → freshly generated 32-hex. No tracer.start_span call. * attach_current_span extracts trace_id/span_id from the ambient span (the existing path the framework already supports) and stores them as plain strings — no longer holds the OTel object. * pop_span / clear_stack drop the .end()/.start_time machinery since there is no OTel span to end. * get_trace_id / get_start_time read directly from the record. The signatures of push_span / attach_current_span / pop_span / clear_stack / get_trace_id / get_start_time are unchanged. Module-level `tracer = trace.get_tracer(...)` is retained for ABI compat (currently unused by plugin code; can be removed in a follow-up if no external consumers are identified). attach_current_span() is otherwise untouched — it only observed the ambient span; it never created one. That path was already correct and remains so. Cross-system correlation ------------------------ BigQuery rows still carry trace_id, inherited from the ambient Agent Engine / Runner span when present. Joining `agent_events` to Cloud Trace by trace_id continues to work end-to-end. Tests ----- Three existing tests that were directly asserting the bug (test_otel_integration, test_otel_integration_real_provider, test_clear_stack_ends_owned_spans) are rewritten as inverted regression guards: * test_push_pop_does_not_call_tracer_start_span * test_push_pop_does_not_export_spans_through_real_provider * test_clear_stack_does_not_export_spans Each asserts that the corresponding code path does NOT export an OTel span via an InMemorySpanExporter wired to a real provider, or does NOT invoke tracer.start_span via a mock spy. Four new tests added to lock in the lifecycle / inheritance contracts the plugin must keep: * test_push_span_inherits_ambient_trace_id — when an ambient OTel span exists (the Agent Engine pattern), the plugin's trace_id matches it. * test_llm_request_response_share_span_id_contract — the paired LLM_REQUEST / LLM_RESPONSE rows share one span_id (the documented BQAA join contract). * test_tool_starting_completed_share_span_id_contract — same invariant for the tool lifecycle pair. * test_streaming_llm_response_shares_span_id_until_final_contract — multiple partial LLM_RESPONSE callbacks reuse the same span_id and only the final fire pops the span. Prevents a future "dedupe" of streaming rows from breaking the contract. 226/229 plugin tests pass (6 skipped for unrelated optional deps); pyink + isort clean. Refs: - haiyuan-eng-google/BigQuery-Agent-Analytics-SDK#94 (reported by a customer) - google/adk-python#4561 (prior fix for span-hierarchy re-parenting) - google/adk-python#4645 (prior fix for trace_id fracture) --- .../bigquery_agent_analytics_plugin.py | 174 +++++------ .../test_bigquery_agent_analytics_plugin.py | 295 ++++++++++++++---- 2 files changed, 315 insertions(+), 154 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index dc3bdc567f0..0e1dddcec37 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -634,20 +634,35 @@ class BigQueryLoggerConfig: @dataclass class _SpanRecord: - """A single record on the unified span stack. - - Consolidates span, id, ownership, and timing into one object - so all stacks stay in sync by construction. - - Note: The plugin intentionally does NOT attach its spans to the - ambient OTel context (no ``context.attach``). This prevents the - plugin from corrupting the framework's span hierarchy when an - external OTel exporter (e.g. ``opentelemetry-instrumentation-vertexai``) - is active. See https://github.com/google/adk-python/issues/4561. + """A single record on the BQAA plugin's internal span stack. + + Stores the IDs and timing the plugin needs to populate BigQuery + ``span_id`` / ``parent_span_id`` / ``trace_id`` / ``latency_ms`` + columns. Crucially, no OpenTelemetry ``Span`` object is held. + + Background — prior approach and the bug it caused: + The previous implementation created real OTel spans via + ``tracer.start_span(...)`` purely as ID carriers. When the host + application has an OTel exporter configured (notably Agent Engine + with ``GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY=true``), those + plugin-owned spans were exported to Cloud Trace alongside the + framework's real spans — producing a duplicate-span view for + every BQAA-instrumented operation. See haiyuan-eng-google/BQAA-SDK#94. + + The plugin already tracked all parent / child relationships on + this internal stack, so the OTel span object was incidental to + correctness. We now store ``trace_id`` directly on each record + (inherited from the ambient OTel span when present, generated + otherwise) and skip span creation entirely. Cross-system + correlation with Cloud Trace still works via ``trace_id`` + inheritance. + + ``attach_current_span`` (which observes the ambient span without + owning one) is unaffected by this change. """ - span: trace.Span span_id: str + trace_id: str owns_span: bool start_time_ns: int first_token_time: Optional[float] = None @@ -689,17 +704,16 @@ def init_trace(callback_context: CallbackContext) -> None: @staticmethod def get_trace_id(callback_context: CallbackContext) -> Optional[str]: - """Gets the trace ID from the current span or invocation_id.""" + """Gets the trace ID from the current span stack or invocation_id.""" records = _span_records_ctx.get() if records: - current_span = records[-1].span - if current_span.get_span_context().is_valid: - return format(current_span.get_span_context().trace_id, "032x") + return records[-1].trace_id - # Fallback to OTel context - current_span = trace.get_current_span() - if current_span.get_span_context().is_valid: - return format(current_span.get_span_context().trace_id, "032x") + # Fallback to ambient OTel context (e.g. callbacks fired before + # any plugin span was pushed). + ambient_ctx = trace.get_current_span().get_span_context() + if ambient_ctx.is_valid: + return format(ambient_ctx.trace_id, "032x") return callback_context.invocation_id @@ -708,47 +722,48 @@ def push_span( callback_context: CallbackContext, span_name: Optional[str] = "adk-span", ) -> str: - """Starts a new span and pushes it onto the stack. - - The span is created but NOT attached to the ambient OTel context, - so it cannot corrupt the framework's own span hierarchy. The - plugin tracks span_id / parent_span_id internally via its own - contextvar stack. - - If OTel is not configured (returning non-recording spans), a UUID - fallback is generated to ensure span_id and parent_span_id are - populated in BigQuery logs. + """Pushes a BQAA-internal span record onto the stack. + + No OpenTelemetry span is created — see ``_SpanRecord`` for + background. The record carries everything the plugin needs to + populate BigQuery columns: + + * ``span_id`` — newly generated 16-hex string. + * ``trace_id`` — inherited by precedence: + 1. Top of the existing internal stack (keeps every push + within an invocation under one trace_id). + 2. Ambient OTel span when valid (e.g. the framework's Runner + span, or an Agent Engine root span) — keeps BigQuery rows + joinable to Cloud Trace via the shared ``trace_id``. + 3. A fresh 32-hex value (no ambient context, e.g. unit tests + or non-OTel runtimes). + * ``start_time_ns`` — for the eventual ``latency_ms`` on pop. + + ``span_name`` is preserved on the signature for API stability but + is no longer used (no OTel span name is set). """ + del span_name # No-op: kept for API stability; no OTel span is created. TraceManager.init_trace(callback_context) - # Create the span without attaching it to the ambient context. - # This avoids re-parenting framework spans like ``call_llm`` - # or ``execute_tool``. See #4561. - # - # If the internal stack already has a span, create the new span - # as a child so it shares the same trace_id. Without this, each - # ``start_span`` would be an independent root with its own - # trace_id — causing trace_id fracture (see #4645). records = TraceManager._get_records() - parent_ctx = None - if records and records[-1].span.get_span_context().is_valid: - parent_ctx = trace.set_span_in_context(records[-1].span) - span = tracer.start_span(span_name, context=parent_ctx) - - if span.get_span_context().is_valid: - span_id_str = format(span.get_span_context().span_id, "016x") + if records: + trace_id = records[-1].trace_id else: - span_id_str = uuid.uuid4().hex + ambient_ctx = trace.get_current_span().get_span_context() + if ambient_ctx.is_valid: + trace_id = format(ambient_ctx.trace_id, "032x") + else: + trace_id = uuid.uuid4().hex # 32 hex chars + + span_id_str = uuid.uuid4().hex[:16] record = _SpanRecord( - span=span, span_id=span_id_str, + trace_id=trace_id, owns_span=True, start_time_ns=time.time_ns(), ) - - new_records = list(records) + [record] - _span_records_ctx.set(new_records) + _span_records_ctx.set(list(records) + [record]) return span_id_str @@ -756,30 +771,31 @@ def push_span( def attach_current_span( callback_context: CallbackContext, ) -> str: - """Records the current OTel span on the stack without owning it. + """Records the ambient OTel span's IDs on the stack without owning it. - The span is NOT re-attached to the ambient context; it is only - tracked internally for span_id / parent_span_id resolution. + No OTel span is created or attached. This path captures the + ambient span's ``trace_id`` / ``span_id`` so plugin-emitted + BigQuery rows correlate with whatever Cloud Trace / external + exporter the host is already running. """ TraceManager.init_trace(callback_context) - span = trace.get_current_span() - - if span.get_span_context().is_valid: - span_id_str = format(span.get_span_context().span_id, "016x") + ambient_ctx = trace.get_current_span().get_span_context() + if ambient_ctx.is_valid: + span_id_str = format(ambient_ctx.span_id, "016x") + trace_id = format(ambient_ctx.trace_id, "032x") else: - span_id_str = uuid.uuid4().hex + span_id_str = uuid.uuid4().hex[:16] + trace_id = uuid.uuid4().hex record = _SpanRecord( - span=span, span_id=span_id_str, + trace_id=trace_id, owns_span=False, start_time_ns=time.time_ns(), ) - records = TraceManager._get_records() - new_records = list(records) + [record] - _span_records_ctx.set(new_records) + _span_records_ctx.set(list(records) + [record]) return span_id_str @@ -828,10 +844,10 @@ def ensure_invocation_span( @staticmethod def pop_span() -> tuple[Optional[str], Optional[int]]: - """Ends the current span and pops it from the stack. + """Pops the top span record from the internal stack. - No ambient OTel context is detached because we never attached - one in the first place (see ``push_span``). + Returns ``(span_id, duration_ms)``. No OTel span is ended + because the plugin no longer creates one (see ``_SpanRecord``). """ records = _span_records_ctx.get() if not records: @@ -841,29 +857,13 @@ def pop_span() -> tuple[Optional[str], Optional[int]]: record = new_records.pop() _span_records_ctx.set(new_records) - # Calculate duration - duration_ms = None - otel_start = getattr(record.span, "start_time", None) - if isinstance(otel_start, (int, float)) and otel_start: - duration_ms = int((time.time_ns() - otel_start) / 1_000_000) - else: - duration_ms = int((time.time_ns() - record.start_time_ns) / 1_000_000) - - if record.owns_span: - record.span.end() - + duration_ms = int((time.time_ns() - record.start_time_ns) / 1_000_000) return record.span_id, duration_ms @staticmethod def clear_stack() -> None: """Clears all span records. Safety net for cross-invocation cleanup.""" - records = _span_records_ctx.get() - if records: - # End any owned spans to avoid OTel resource leaks. - for record in reversed(records): - if record.owns_span: - record.span.end() - _span_records_ctx.set([]) + _span_records_ctx.set([]) @staticmethod def get_current_span_and_parent() -> tuple[Optional[str], Optional[str]]: @@ -894,19 +894,11 @@ def get_root_agent_name() -> Optional[str]: @staticmethod def get_start_time(span_id: str) -> Optional[float]: - """Gets start time of a span by ID.""" + """Gets start time of a span by ID (seconds since epoch).""" records = _span_records_ctx.get() if records: for record in reversed(records): if record.span_id == span_id: - # Try OTel span start_time first - otel_start = getattr(record.span, "start_time", None) - if ( - record.span.get_span_context().is_valid - and isinstance(otel_start, (int, float)) - and otel_start - ): - return otel_start / 1_000_000_000.0 return record.start_time_ns / 1_000_000_000.0 return None diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index d91398cf9a8..46cd233ef28 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -2274,53 +2274,55 @@ class LocalIncident: assert content_json["result"]["kpi_missed"][0]["kpi"] == "latency" @pytest.mark.asyncio - async def test_otel_integration( + async def test_push_pop_does_not_call_tracer_start_span( self, callback_context, ): - """Verifies OpenTelemetry integration in TraceManager.""" - # Mock the tracer and span + """Regression guard for the duplicate-Cloud-Trace bug (issue #94). + + The plugin must NOT call ``tracer.start_span(...)`` from + ``push_span`` / ``pop_span``. Any owned OTel span goes through + the globally configured exporter (e.g. Cloud Trace via Agent + Engine telemetry) and surfaces as a duplicate span next to the + framework's real one. The plugin's internal stack is sufficient + for ``span_id`` / ``parent_span_id`` / ``trace_id`` resolution + without creating an exportable span. + """ mock_tracer = mock.Mock() - mock_span = mock.Mock() - mock_context = mock.Mock() - # Setup mock IDs (128-bit trace_id, 64-bit span_id) - trace_id_int = 0x12345678123456781234567812345678 - span_id_int = 0x1234567812345678 - mock_context.trace_id = trace_id_int - mock_context.span_id = span_id_int - mock_context.is_valid = True - mock_span.get_span_context.return_value = mock_context - mock_span.start_time = 1234567890000000000 # Mock start time in ns - mock_tracer.start_span.return_value = mock_span - # Patch the global tracer in the plugin module with mock.patch( - "google.adk.plugins.bigquery_agent_analytics_plugin.tracer", mock_tracer + "google.adk.plugins.bigquery_agent_analytics_plugin.tracer", + mock_tracer, ): - # Test push_span span_id = bigquery_agent_analytics_plugin.TraceManager.push_span( callback_context, "test_span" ) - mock_tracer.start_span.assert_called_with("test_span", context=None) - assert span_id == format(span_id_int, "016x") - # Test get_trace_id - # We need to mock trace.get_current_span() to return our mock span - # because push_span calls trace.attach(), which affects the global context - with mock.patch( - "opentelemetry.trace.get_current_span", return_value=mock_span - ): - trace_id = bigquery_agent_analytics_plugin.TraceManager.get_trace_id( - callback_context - ) - assert trace_id == format(trace_id_int, "032x") - # Test pop_span - # pop_span calls span.end() - bigquery_agent_analytics_plugin.TraceManager.pop_span() - mock_span.end.assert_called_once() + assert isinstance(span_id, str) and len(span_id) == 16 + + trace_id = bigquery_agent_analytics_plugin.TraceManager.get_trace_id( + callback_context + ) + assert isinstance(trace_id, str) and len(trace_id) == 32 + + popped_span_id, _duration_ms = ( + bigquery_agent_analytics_plugin.TraceManager.pop_span() + ) + assert popped_span_id == span_id + + mock_tracer.start_span.assert_not_called() @pytest.mark.asyncio - async def test_otel_integration_real_provider(self, callback_context): - """Verifies TraceManager with a real OpenTelemetry TracerProvider.""" - # Setup OTEL with in-memory exporter + async def test_push_pop_does_not_export_spans_through_real_provider( + self, callback_context + ): + """End-to-end regression guard against #94 with a real OTel + provider + in-memory exporter. + + Wires an ``InMemorySpanExporter`` to a real ``TracerProvider``, + drives a push/pop cycle through ``TraceManager``, and asserts + that **zero** spans were exported. Pre-fix behavior was to + export one span per push/pop pair — visible to Cloud Trace as + duplicate spans alongside the framework's real ones. + """ # pylint: disable=g-import-not-at-top from opentelemetry.sdk import trace as trace_sdk from opentelemetry.sdk.trace import export as trace_export @@ -2329,36 +2331,185 @@ async def test_otel_integration_real_provider(self, callback_context): # pylint: enable=g-import-not-at-top provider = trace_sdk.TracerProvider() exporter = in_memory_span_exporter.InMemorySpanExporter() - processor = trace_export.SimpleSpanProcessor(exporter) - provider.add_span_processor(processor) - tracer = provider.get_tracer("test_tracer") - # Patch the global tracer in the plugin module + provider.add_span_processor(trace_export.SimpleSpanProcessor(exporter)) + real_tracer = provider.get_tracer("test_tracer") + with mock.patch( - "google.adk.plugins.bigquery_agent_analytics_plugin.tracer", tracer + "google.adk.plugins.bigquery_agent_analytics_plugin.tracer", + real_tracer, ): - # 1. Start a span span_id = bigquery_agent_analytics_plugin.TraceManager.push_span( callback_context, "test_span" ) - # Verify a span was started but not ended - current_spans = exporter.get_finished_spans() - assert not current_spans - # Verify we can retrieve the trace ID + assert exporter.get_finished_spans() == () + trace_id = bigquery_agent_analytics_plugin.TraceManager.get_trace_id( callback_context ) - assert trace_id is not None - # 2. End the span + assert trace_id is not None and len(trace_id) == 32 + popped_span_id, _ = ( bigquery_agent_analytics_plugin.TraceManager.pop_span() ) assert popped_span_id == span_id - # Verify span is now finished and exported - finished_spans = exporter.get_finished_spans() - assert len(finished_spans) == 1 - assert finished_spans[0].name == "test_span" - assert format(finished_spans[0].context.span_id, "016x") == span_id - assert format(finished_spans[0].context.trace_id, "032x") == trace_id + + assert exporter.get_finished_spans() == (), ( + "Plugin must not export OTel spans; any owned span would" + " surface as a duplicate in Cloud Trace alongside the" + " framework's real spans (issue #94)." + ) + + provider.shutdown() + + @pytest.mark.asyncio + async def test_push_span_inherits_ambient_trace_id(self, callback_context): + """When the host has an ambient OTel span (e.g. Agent Engine's + Runner span), the plugin's ``trace_id`` MUST inherit from it so + BigQuery rows correlate with the host's Cloud Trace entries via + a shared ``trace_id``. + """ + # pylint: disable=g-import-not-at-top + from opentelemetry import trace as otel_trace + from opentelemetry.sdk import trace as trace_sdk + + # pylint: enable=g-import-not-at-top + provider = trace_sdk.TracerProvider() + host_tracer = provider.get_tracer("host_tracer") + + # Clear any state on the plugin's contextvar stack. + bigquery_agent_analytics_plugin._span_records_ctx.set(None) + + with host_tracer.start_as_current_span("ambient-host-span") as host_span: + expected_trace_id = format(host_span.get_span_context().trace_id, "032x") + + # Plugin pushes its first internal span inside the ambient span. + bigquery_agent_analytics_plugin.TraceManager.push_span( + callback_context, "bqaa-span" + ) + + plugin_trace_id = ( + bigquery_agent_analytics_plugin.TraceManager.get_trace_id( + callback_context + ) + ) + assert plugin_trace_id == expected_trace_id, ( + "Plugin must inherit ambient trace_id so BigQuery rows join" + " to Cloud Trace via the same trace_id" + ) + + # Nested plugin push also stays under the ambient trace_id. + bigquery_agent_analytics_plugin.TraceManager.push_span( + callback_context, "bqaa-nested" + ) + assert ( + bigquery_agent_analytics_plugin.TraceManager.get_trace_id( + callback_context + ) + == expected_trace_id + ) + + bigquery_agent_analytics_plugin.TraceManager.clear_stack() + provider.shutdown() + del otel_trace # unused; imported for symmetry with provider setup + + @pytest.mark.asyncio + async def test_llm_request_response_share_span_id_contract( + self, callback_context + ): + """Lifecycle contract: ``LLM_REQUEST`` and ``LLM_RESPONSE`` for the + same model call share one ``span_id`` and one ``trace_id``. + + Models the structural pattern the real callbacks use: + * ``before_model_callback`` calls ``push_span(...)`` and writes + ``LLM_REQUEST`` with the returned ``span_id``. + * ``after_model_callback`` calls ``get_current_span_id()`` / + ``pop_span()`` and writes ``LLM_RESPONSE`` with the same + ``span_id``. + + A future change must not split this pair onto two different + ``span_id``s — that would break the documented BigQuery query + shape and the BQAA join contract. + """ + bigquery_agent_analytics_plugin._span_records_ctx.set(None) + TM = bigquery_agent_analytics_plugin.TraceManager + + # before_model_callback path. + pushed_span_id = TM.push_span(callback_context, "llm_request") + request_trace_id = TM.get_trace_id(callback_context) + + # after_model_callback (final chunk) path. + response_top_of_stack = TM.get_current_span_id() + popped_span_id, _duration_ms = TM.pop_span() + response_trace_id = TM.get_trace_id(callback_context) + + assert response_top_of_stack == pushed_span_id + assert popped_span_id == pushed_span_id + # trace_id resolved on the response side may have to fall back + # past the now-empty stack — but if it does resolve, it must + # match what the request observed. An empty-stack fallback to + # invocation_id is acceptable here; what we are guarding against + # is the *pair* drifting onto two structurally different ids. + if response_trace_id is not None and len(response_trace_id) == 32: + assert response_trace_id == request_trace_id + + @pytest.mark.asyncio + async def test_tool_starting_completed_share_span_id_contract( + self, callback_context + ): + """Lifecycle contract: ``TOOL_STARTING`` and ``TOOL_COMPLETED`` for + the same tool call share one ``span_id``. + + Same shape as the LLM pair above — push on before, pop on after, + same id on both sides. + """ + bigquery_agent_analytics_plugin._span_records_ctx.set(None) + TM = bigquery_agent_analytics_plugin.TraceManager + + # before_tool_callback path. + pushed_span_id = TM.push_span(callback_context, "tool") + starting_trace_id = TM.get_trace_id(callback_context) + + # after_tool_callback path. + popped_span_id, _duration_ms = TM.pop_span() + + assert popped_span_id == pushed_span_id + assert isinstance(starting_trace_id, str) and len(starting_trace_id) == 32 + + @pytest.mark.asyncio + async def test_streaming_llm_response_shares_span_id_until_final_contract( + self, callback_context + ): + """Streaming-response contract. + + On a streaming LLM call, ``after_model_callback`` is fired once + per partial chunk *plus* once for the final chunk. Partial fires + do NOT pop the span (see ``after_model_callback:3354-3363``) — + they only read ``get_current_span_id()`` and record first-token + timing. Only the final fire calls ``pop_span()``. + + All resulting ``LLM_RESPONSE`` rows therefore share one + ``span_id`` (the same as the paired ``LLM_REQUEST``). A future + change must not "dedupe" the partial rows by switching to a fresh + span id per chunk — those rows are real and intentional. + """ + bigquery_agent_analytics_plugin._span_records_ctx.set(None) + TM = bigquery_agent_analytics_plugin.TraceManager + + pushed_span_id = TM.push_span(callback_context, "llm_request") + + # Simulate three partial chunks: each callback observes the same + # span_id at top of stack and does NOT pop. + for _ in range(3): + assert TM.get_current_span_id() == pushed_span_id + + # Final chunk: pop_span returns the same id and a populated + # latency. + popped_span_id, duration_ms = TM.pop_span() + assert popped_span_id == pushed_span_id + assert duration_ms is not None and duration_ms >= 0 + + # Stack must be empty after the final chunk. + assert TM.get_current_span_id() is None @pytest.mark.asyncio async def test_keyword_identifiers_emission_default( @@ -5902,8 +6053,10 @@ async def test_trace_id_continuity_no_ambient_span(self, callback_context): TM = bigquery_agent_analytics_plugin.TraceManager - # Create a real TracerProvider and patch the plugin's module-level - # tracer so push_span creates valid spans with proper trace_ids. + # Wire a real TracerProvider with an in-memory exporter so we can + # also assert the plugin path does NOT export anything through it. + # (push_span no longer creates OTel spans — see _SpanRecord; the + # exporter is here as a regression guard, not a span source.) exporter = InMemorySpanExporter() provider = SdkProvider() provider.add_span_processor(SimpleSpanProcessor(exporter)) @@ -6225,8 +6378,11 @@ async def test_starting_completed_same_span_with_ambient( assert len(agent_starting) == 1 assert len(agent_completed) == 1 - # Both events must share the same span_id (the ambient - # invoke_agent span) — no plugin-synthetic override. + # Both events must share the same span_id (the plugin-internal + # agent span pushed by before_agent_callback and popped by + # after_agent_callback). The lifecycle-pair invariant holds + # regardless of whether the id comes from a plugin-minted hex + # string or an ambient OTel span. assert agent_starting[0]["span_id"] == agent_completed[0]["span_id"] assert ( agent_starting[0]["parent_span_id"] @@ -6411,8 +6567,16 @@ def test_ensure_invocation_span_clears_stale_records(self, callback_context): provider.shutdown() - def test_clear_stack_ends_owned_spans(self, callback_context): - """clear_stack() ends all owned spans.""" + def test_clear_stack_does_not_export_spans(self, callback_context): + """``clear_stack()`` clears the internal records but does NOT + export any OTel spans (issue #94 regression guard). + + Pre-fix, ``clear_stack()`` called ``record.span.end()`` for every + owned record, which delivered the now-finished span to whatever + exporter the host had wired — duplicating it next to the + framework's real span in Cloud Trace. Post-fix the plugin owns + no OTel span at all; ``clear_stack()`` only resets the contextvar. + """ from opentelemetry.sdk.trace import TracerProvider as SdkProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter @@ -6433,6 +6597,8 @@ def test_clear_stack_ends_owned_spans(self, callback_context): records = list(bigquery_agent_analytics_plugin._span_records_ctx.get()) assert all(r.owns_span for r in records) + # No exported spans yet (the plugin never creates any). + assert exporter.get_finished_spans() == () TM.clear_stack() @@ -6440,9 +6606,12 @@ def test_clear_stack_ends_owned_spans(self, callback_context): result = bigquery_agent_analytics_plugin._span_records_ctx.get() assert result == [] - # Both owned spans should have been ended (exported). - exported = exporter.get_finished_spans() - assert len(exported) == 2 + # Still no exported spans — the regression guard for #94. + assert exporter.get_finished_spans() == (), ( + "clear_stack() must not export OTel spans; any owned span" + " would surface as a duplicate in Cloud Trace alongside the" + " framework's real spans (issue #94)." + ) provider.shutdown()