From e96ac446300ccaef0dc48584fa9112b707073e38 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 17 Jun 2026 14:53:59 -0700 Subject: [PATCH 1/3] feat(bigquery-analytics): workflow-node boundary events (#207 core) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Emit WORKFLOW_NODE_STARTING / WORKFLOW_NODE_COMPLETED from event- observation derivation, invocation-segment scoped. - Per-invocation contextvar accumulator (_workflow_nodes_ctx), keyed by (node.path, node.run_id); no global plugin state. - on_event_callback: on first event observed at a node, emit WORKFLOW_NODE_STARTING (carrying the C1 node envelope). Events with an empty node.path (non-workflow) are ignored — no synthetic node. - after_run_callback: pause-aware drain before INVOCATION_COMPLETED. A node is left open (no COMPLETED) when its last event carried long_running_tool_ids (paused leaf) or it is a strict path-ancestor of a paused node. Completed rows carry the node envelope, the node's last-event timestamp (via new EventData.timestamp_override, so STARTING->COMPLETED yields per-node duration), and attributes.adk.workflow_node_status = 'completed'. - Accumulator cleared in the after_run finally alongside the other per-invocation context vars. - Producer _EVENT_VIEW_DEFS gains typed views for both types. Scope: the 'failed' outcome is reserved for a follow-up once a reliable invocation-error signal exists (none is cleanly available today); this drain only ever stamps 'completed'. No WORKFLOW_NODE_PAUSED — open nodes are inferable from latest STARTING without COMPLETED within the query window. Tests: starting-once-per-node, empty-path-skip, completed-on-drain, paused-leaf-open, ancestor-of-paused-open, sibling-no-bleed. --- .../bigquery_agent_analytics_plugin.py | 178 +++++++++++++++++- .../test_bigquery_agent_analytics_plugin.py | 157 +++++++++++++++ 2 files changed, 334 insertions(+), 1 deletion(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 36d92bf781d..767a38dd32e 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -677,6 +677,37 @@ class BigQueryLoggerConfig: contextvars.ContextVar("_bq_analytics_active_invocation_id", default=None) ) +# Per-invocation workflow-node accumulator for WORKFLOW_NODE_STARTING / +# WORKFLOW_NODE_COMPLETED boundary derivation (#207). Invocation-local +# (contextvar, like _active_invocation_id_ctx) so concurrent async +# invocations stay isolated and no global plugin state is shared. Maps +# (node.path, node.run_id) -> _WorkflowNodeProgress for the current +# invocation; populated in on_event_callback, drained + cleared in +# after_run_callback. +_workflow_nodes_ctx: contextvars.ContextVar[ + Optional[dict[tuple[str, Optional[str]], "_WorkflowNodeProgress"]] +] = contextvars.ContextVar("_bq_analytics_workflow_nodes", default=None) + + +@dataclass +class _WorkflowNodeProgress: + """Tracking state for one workflow node within an invocation (#207). + + ``path`` / ``run_id`` / ``parent_run_id`` mirror the C1 node envelope + so the drained WORKFLOW_NODE_COMPLETED row can carry the same node + identity as the WORKFLOW_NODE_STARTING row. ``last_event_ts`` is the + float-epoch timestamp of the most recent event observed at this node + (used as the COMPLETED row's timestamp). ``last_event_long_running`` + records whether that last event carried ``long_running_tool_ids`` — + i.e. the node is paused, not complete. + """ + + path: str + run_id: Optional[str] + parent_run_id: Optional[str] + last_event_ts: float + last_event_long_running: bool + @dataclass class _SpanRecord: @@ -2078,6 +2109,31 @@ def _get_events_schema() -> list[bigquery.SchemaField]: "JSON_VALUE(attributes, '$.adk.pause_kind') AS pause_kind", "JSON_VALUE(attributes, '$.adk.function_call_id') AS function_call_id", ], + # Workflow-node boundaries (#207). Both rows carry the node envelope + # so STARTING <-> COMPLETED pair on + # (app_name, user_id, session_id, node_path, node_run_id). COMPLETED + # additionally carries the outcome discriminator + # (``completed``; ``failed`` reserved for the error-signal follow-up). + "WORKFLOW_NODE_STARTING": [ + "JSON_VALUE(attributes, '$.adk.node.path') AS node_path", + "JSON_VALUE(attributes, '$.adk.node.run_id') AS node_run_id", + ( + "JSON_VALUE(attributes, '$.adk.node.parent_run_id')" + " AS node_parent_run_id" + ), + ], + "WORKFLOW_NODE_COMPLETED": [ + "JSON_VALUE(attributes, '$.adk.node.path') AS node_path", + "JSON_VALUE(attributes, '$.adk.node.run_id') AS node_run_id", + ( + "JSON_VALUE(attributes, '$.adk.node.parent_run_id')" + " AS node_parent_run_id" + ), + ( + "JSON_VALUE(attributes, '$.adk.workflow_node_status')" + " AS workflow_node_status" + ), + ], } _VIEW_SQL_TEMPLATE = """\ @@ -2118,6 +2174,11 @@ class EventData: error_message: Optional[str] = None extra_attributes: dict[str, Any] = field(default_factory=dict) trace_id_override: Optional[str] = None + # Row-timestamp override. Used by drained rows that represent a past + # moment rather than "now" — e.g. WORKFLOW_NODE_COMPLETED, emitted in + # the after-run drain but stamped with the node's last-event time so + # per-node duration (STARTING -> COMPLETED) is preserved. + timestamp_override: Optional[datetime] = None # ADK 2.0 envelope: callbacks that hold the source Event pass it here # so ``_log_event`` can stamp ``attributes.adk.{source_event_id, node, # branch, scope, ...}``. Leave None for rows that don't originate from @@ -3181,7 +3242,7 @@ async def _log_event( if event_data is None: event_data = EventData() - timestamp = datetime.now(timezone.utc) + timestamp = event_data.timestamp_override or datetime.now(timezone.utc) if self.config.content_formatter: try: raw_content = self.config.content_formatter(raw_content, event_type) @@ -3322,6 +3383,109 @@ async def on_user_message_callback( ), ) + async def _track_workflow_node( + self, callback_ctx: CallbackContext, source_event: "Event" + ) -> None: + """Record a workflow-node observation; emit WORKFLOW_NODE_STARTING (#207). + + Invocation-segment scoped: keyed by ``(node.path, node.run_id)`` in + the per-invocation contextvar accumulator. On the first event seen at + a node, emits ``WORKFLOW_NODE_STARTING`` (carrying the C1 node + envelope from ``source_event``). Later events update the node's + last-seen timestamp and paused state for the after-run drain. + + Events with an empty ``node.path`` (the ``NodeInfo`` default for + non-workflow events) are ignored — no synthetic node is created. + """ + node_info = getattr(source_event, "node_info", None) + if node_info is None: + return + path = getattr(node_info, "path", "") or "" + if not path: + return # non-workflow event; never synthesize a node + run_id = getattr(node_info, "run_id", None) + parent_run_id = getattr(node_info, "parent_run_id", None) + last_running = bool(getattr(source_event, "long_running_tool_ids", None)) + ts = getattr(source_event, "timestamp", None) + + nodes = _workflow_nodes_ctx.get() + if nodes is None: + nodes = {} + _workflow_nodes_ctx.set(nodes) + key = (path, run_id) + existing = nodes.get(key) + if existing is None: + nodes[key] = _WorkflowNodeProgress( + path=path, + run_id=run_id, + parent_run_id=parent_run_id, + last_event_ts=ts, + last_event_long_running=last_running, + ) + await self._log_event( + "WORKFLOW_NODE_STARTING", + callback_ctx, + event_data=EventData(source_event=source_event), + ) + else: + if ts is not None: + existing.last_event_ts = ts + existing.last_event_long_running = last_running + + async def _drain_workflow_nodes(self, callback_ctx: CallbackContext) -> None: + """Emit WORKFLOW_NODE_COMPLETED for terminal nodes at invocation end (#207). + + Pause-aware drain: a node is left **open** (no COMPLETED) when its + last observed event carried ``long_running_tool_ids`` (paused leaf), + or when it is a strict path-ancestor of any such paused node (a + parent stays open while a descendant is suspended). The resume + segment emits a fresh STARTING and a later drain completes it. + + Each emitted COMPLETED carries the node envelope, the node's + last-event timestamp (so STARTING -> COMPLETED gives node duration), + and ``attributes.adk.workflow_node_status = 'completed'``. The + ``failed`` outcome is reserved for a follow-up once a reliable + invocation-error signal is available; this drain never emits it. + """ + nodes = _workflow_nodes_ctx.get() + if not nodes: + return + paused_paths = [ + prog.path for prog in nodes.values() if prog.last_event_long_running + ] + + def _is_open(prog: "_WorkflowNodeProgress") -> bool: + if prog.last_event_long_running: + return True # paused leaf + # strict path-ancestor of any paused node stays open + return any( + paused != prog.path and paused.startswith(prog.path + "/") + for paused in paused_paths + ) + + for prog in nodes.values(): + if _is_open(prog): + continue + await self._log_event( + "WORKFLOW_NODE_COMPLETED", + callback_ctx, + event_data=EventData( + timestamp_override=datetime.fromtimestamp( + prog.last_event_ts, timezone.utc + ) + if prog.last_event_ts is not None + else None, + adk_extras={ + "node": { + "path": prog.path, + "run_id": prog.run_id, + "parent_run_id": prog.parent_run_id, + }, + "workflow_node_status": "completed", + }, + ), + ) + @_safe_callback async def on_event_callback( self, @@ -3357,6 +3521,11 @@ async def on_event_callback( """ callback_ctx = CallbackContext(invocation_context) + # --- Workflow node boundary: emit WORKFLOW_NODE_STARTING on first + # observation of a node within this invocation (#207). Done first + # so the node's "start" precedes its own event rows. + await self._track_workflow_node(callback_ctx, event) + # --- State delta logging --- if event.actions.state_delta: await self._log_event( @@ -3661,6 +3830,12 @@ async def after_run_callback( callback_ctx = CallbackContext(invocation_context) trace_id = TraceManager.get_trace_id(callback_ctx) + # Drain workflow-node boundaries (#207) at this deterministic + # lifecycle point, BEFORE INVOCATION_COMPLETED, so the invocation + # row remains the last event of the segment. Paused / ancestor-of- + # paused nodes are left open by the drain. + await self._drain_workflow_nodes(callback_ctx) + # Pop the invocation-root span pushed by ensure_invocation_span. span_id, duration = TraceManager.pop_span() parent_span_id = TraceManager.get_current_span_id() @@ -3681,6 +3856,7 @@ async def after_run_callback( TraceManager.clear_stack() _active_invocation_id_ctx.set(None) _root_agent_name_ctx.set(None) + _workflow_nodes_ctx.set(None) # Ensure all logs are flushed before the agent returns. await self.flush() diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 7cc8e3600c2..f58ca3fc496 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8815,3 +8815,160 @@ async def test_matched_id_not_double_emitted_by_fallback( rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] assert len(pauses) == 1 + + +class TestWorkflowNodeBoundaries: + """#207 — WORKFLOW_NODE_STARTING / WORKFLOW_NODE_COMPLETED derivation. + + Drives the real on_event_callback (which records nodes + emits + STARTING) and the after-run drain helper (which emits COMPLETED for + terminal, non-paused nodes), then asserts the captured rows. + """ + + def _event(self, path, *, long_running=None, ts=1000.0): + return event_lib.Event( + author="agent", + content=types.Content(role="model", parts=[types.Part(text="x")]), + node_info=event_lib.NodeInfo(path=path), + long_running_tool_ids=long_running, + actions=event_actions_lib.EventActions(), + timestamp=ts, + ) + + async def _observe(self, plugin, ic, *events): + bigquery_agent_analytics_plugin._workflow_nodes_ctx.set(None) + bigquery_agent_analytics_plugin.TraceManager.push_span(ic) + for ev in events: + await plugin.on_event_callback(invocation_context=ic, event=ev) + + def _node_rows(self, rows, event_type): + out = [] + for r in rows: + if r["event_type"] != event_type: + continue + adk = json.loads(r["attributes"])["adk"] + out.append(adk) + return out + + @pytest.mark.asyncio + async def test_starting_emitted_once_per_node( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + await self._observe( + bq_plugin_inst, + invocation_context, + self._event("wf/A@1"), + self._event("wf/A@1"), # same node twice + self._event("wf/B@1"), + ) + await bq_plugin_inst.flush() + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + starts = self._node_rows(rows, "WORKFLOW_NODE_STARTING") + paths = sorted(s["node"]["path"] for s in starts) + assert paths == ["wf/A@1", "wf/B@1"] # A not re-emitted + + @pytest.mark.asyncio + async def test_empty_path_event_emits_no_node_row( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + await self._observe(bq_plugin_inst, invocation_context, self._event("")) + await bq_plugin_inst.flush() + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + assert not self._node_rows(rows, "WORKFLOW_NODE_STARTING") + + @pytest.mark.asyncio + async def test_completed_drained_for_terminal_node( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + await self._observe( + bq_plugin_inst, invocation_context, self._event("wf/A@1", ts=1234.5) + ) + await bq_plugin_inst._drain_workflow_nodes( + bigquery_agent_analytics_plugin.CallbackContext(invocation_context) + ) + await bq_plugin_inst.flush() + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + done = self._node_rows(rows, "WORKFLOW_NODE_COMPLETED") + assert len(done) == 1 + assert done[0]["node"]["path"] == "wf/A@1" + assert done[0]["workflow_node_status"] == "completed" + + @pytest.mark.asyncio + async def test_paused_leaf_not_completed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + await self._observe( + bq_plugin_inst, + invocation_context, + self._event("wf/A@1", long_running={"call-1"}), + ) + await bq_plugin_inst._drain_workflow_nodes( + bigquery_agent_analytics_plugin.CallbackContext(invocation_context) + ) + await bq_plugin_inst.flush() + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + assert not self._node_rows(rows, "WORKFLOW_NODE_COMPLETED") + + @pytest.mark.asyncio + async def test_ancestor_of_paused_not_completed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + # Parent wf/A@1, child wf/A@1/B@1; child pauses -> neither completes. + await self._observe( + bq_plugin_inst, + invocation_context, + self._event("wf/A@1"), + self._event("wf/A@1/B@1", long_running={"call-1"}), + ) + await bq_plugin_inst._drain_workflow_nodes( + bigquery_agent_analytics_plugin.CallbackContext(invocation_context) + ) + await bq_plugin_inst.flush() + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + assert not self._node_rows(rows, "WORKFLOW_NODE_COMPLETED") + + @pytest.mark.asyncio + async def test_sibling_no_bleed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + # A pauses; B is a separate terminal node -> B completes, A does not. + await self._observe( + bq_plugin_inst, + invocation_context, + self._event("wf/A@1", long_running={"call-1"}), + self._event("wf/B@1"), + ) + await bq_plugin_inst._drain_workflow_nodes( + bigquery_agent_analytics_plugin.CallbackContext(invocation_context) + ) + await bq_plugin_inst.flush() + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + done = sorted( + d["node"]["path"] + for d in self._node_rows(rows, "WORKFLOW_NODE_COMPLETED") + ) + assert done == ["wf/B@1"] From f879e7825ac615f80afcc6bf8f72c7d809f15e67 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Thu, 18 Jun 2026 11:30:22 -0700 Subject: [PATCH 2/3] test(bqaa): account for WORKFLOW_NODE_STARTING row in C1 envelope test test_envelope_node_with_parent_run_id fires a node-bearing STATE_DELTA event, which now also emits a WORKFLOW_NODE_STARTING row (#207). Switch the single-row capture to row-filtering and assert the C1 envelope on the STATE_DELTA row. --- .../plugins/test_bigquery_agent_analytics_plugin.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index f58ca3fc496..be162772046 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8380,11 +8380,13 @@ async def test_envelope_node_with_parent_run_id( await bq_plugin_inst.on_event_callback( invocation_context=invocation_context, event=event ) - await asyncio.sleep(0.01) - log_entry = await _get_captured_event_dict_async( - mock_write_client, dummy_arrow_schema - ) - adk = json.loads(log_entry["attributes"])["adk"] + await bq_plugin_inst.flush() + # The event also triggers a WORKFLOW_NODE_STARTING row (#207); the C1 + # envelope is asserted on the STATE_DELTA row this event produces. + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + state_rows = [r for r in rows if r["event_type"] == "STATE_DELTA"] + assert len(state_rows) == 1 + adk = json.loads(state_rows[0]["attributes"])["adk"] assert adk["node"]["path"] == "wf/A@1/B@2" assert adk["node"]["run_id"] == "2" assert adk["node"]["parent_run_id"] == "1" From fc217040fc33ad91df8744204475bff6d7ad7a3a Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Thu, 18 Jun 2026 13:31:20 -0700 Subject: [PATCH 3/3] fix(bqaa): coerce node last_event_ts to float for mypy getattr(source_event, 'timestamp', None) is typed Any | None, but _WorkflowNodeProgress.last_event_ts is float. Coerce at the boundary (fall back to time.time() when absent) so construction type-checks, and drop the now-dead None guards on the field. --- .../adk/plugins/bigquery_agent_analytics_plugin.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 767a38dd32e..73c313b9329 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -3406,7 +3406,8 @@ async def _track_workflow_node( run_id = getattr(node_info, "run_id", None) parent_run_id = getattr(node_info, "parent_run_id", None) last_running = bool(getattr(source_event, "long_running_tool_ids", None)) - ts = getattr(source_event, "timestamp", None) + raw_ts = getattr(source_event, "timestamp", None) + ts = float(raw_ts) if isinstance(raw_ts, (int, float)) else time.time() nodes = _workflow_nodes_ctx.get() if nodes is None: @@ -3428,8 +3429,7 @@ async def _track_workflow_node( event_data=EventData(source_event=source_event), ) else: - if ts is not None: - existing.last_event_ts = ts + existing.last_event_ts = ts existing.last_event_long_running = last_running async def _drain_workflow_nodes(self, callback_ctx: CallbackContext) -> None: @@ -3472,9 +3472,7 @@ def _is_open(prog: "_WorkflowNodeProgress") -> bool: event_data=EventData( timestamp_override=datetime.fromtimestamp( prog.last_event_ts, timezone.utc - ) - if prog.last_event_ts is not None - else None, + ), adk_extras={ "node": { "path": prog.path,