Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 175 additions & 1 deletion src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,37 @@ class BigQueryLoggerConfig:
contextvars.ContextVar("_bq_analytics_active_invocation_id", default=None)
)

# Per-invocation workflow-node accumulator used to derive the
# WORKFLOW_NODE_STARTING / WORKFLOW_NODE_COMPLETED boundary rows (#207).
#
# Held in a contextvar (like _active_invocation_id_ctx) rather than on the
# plugin instance, so concurrent async invocations stay isolated and no
# global plugin state is shared.
#
# Value: None until the first workflow-node event of the invocation is
# tracked, then a dict mapping (node.path, node.run_id) ->
# _WorkflowNodeProgress. Entries are added/updated by _track_workflow_node
# (called from on_event_callback), read by _drain_workflow_nodes, and the
# variable is reset to None in after_run_callback.
_workflow_nodes_ctx: contextvars.ContextVar[
    Optional[dict[tuple[str, Optional[str]], "_WorkflowNodeProgress"]]
] = contextvars.ContextVar("_bq_analytics_workflow_nodes", default=None)


@dataclass
class _WorkflowNodeProgress:
    """Mutable per-node bookkeeping for workflow boundary rows (#207).

    One instance exists per ``(path, run_id)`` pair observed during an
    invocation. It is created when the node's first event is seen (the
    moment ``WORKFLOW_NODE_STARTING`` is emitted) and updated in place on
    every later event at the same node. At invocation end the after-run
    drain reads it to decide whether to emit ``WORKFLOW_NODE_COMPLETED``
    and with which identity and timestamp.
    """

    # Node identity, mirroring the C1 node envelope so that the drained
    # COMPLETED row carries the same identity as its STARTING row.
    path: str
    run_id: Optional[str]
    parent_run_id: Optional[str]
    # Float-epoch timestamp of the most recent event observed at this
    # node; becomes the COMPLETED row's timestamp.
    last_event_ts: float
    # True when that most recent event carried ``long_running_tool_ids``,
    # i.e. the node is paused rather than complete.
    last_event_long_running: bool


@dataclass
class _SpanRecord:
Expand Down Expand Up @@ -2078,6 +2109,31 @@ def _get_events_schema() -> list[bigquery.SchemaField]:
"JSON_VALUE(attributes, '$.adk.pause_kind') AS pause_kind",
"JSON_VALUE(attributes, '$.adk.function_call_id') AS function_call_id",
],
# Workflow-node boundaries (#207). Both rows carry the node envelope
# so STARTING <-> COMPLETED pair on
# (app_name, user_id, session_id, node_path, node_run_id). COMPLETED
# additionally carries the outcome discriminator
# (``completed``; ``failed`` reserved for the error-signal follow-up).
"WORKFLOW_NODE_STARTING": [
"JSON_VALUE(attributes, '$.adk.node.path') AS node_path",
"JSON_VALUE(attributes, '$.adk.node.run_id') AS node_run_id",
(
"JSON_VALUE(attributes, '$.adk.node.parent_run_id')"
" AS node_parent_run_id"
),
],
"WORKFLOW_NODE_COMPLETED": [
"JSON_VALUE(attributes, '$.adk.node.path') AS node_path",
"JSON_VALUE(attributes, '$.adk.node.run_id') AS node_run_id",
(
"JSON_VALUE(attributes, '$.adk.node.parent_run_id')"
" AS node_parent_run_id"
),
(
"JSON_VALUE(attributes, '$.adk.workflow_node_status')"
" AS workflow_node_status"
),
],
}

_VIEW_SQL_TEMPLATE = """\
Expand Down Expand Up @@ -2118,6 +2174,11 @@ class EventData:
error_message: Optional[str] = None
extra_attributes: dict[str, Any] = field(default_factory=dict)
trace_id_override: Optional[str] = None
# Row-timestamp override. Used by drained rows that represent a past
# moment rather than "now" — e.g. WORKFLOW_NODE_COMPLETED, emitted in
# the after-run drain but stamped with the node's last-event time so
# per-node duration (STARTING -> COMPLETED) is preserved.
timestamp_override: Optional[datetime] = None
# ADK 2.0 envelope: callbacks that hold the source Event pass it here
# so ``_log_event`` can stamp ``attributes.adk.{source_event_id, node,
# branch, scope, ...}``. Leave None for rows that don't originate from
Expand Down Expand Up @@ -3181,7 +3242,7 @@ async def _log_event(
if event_data is None:
event_data = EventData()

timestamp = datetime.now(timezone.utc)
timestamp = event_data.timestamp_override or datetime.now(timezone.utc)
if self.config.content_formatter:
try:
raw_content = self.config.content_formatter(raw_content, event_type)
Expand Down Expand Up @@ -3322,6 +3383,107 @@ async def on_user_message_callback(
),
)

async def _track_workflow_node(
    self, callback_ctx: CallbackContext, source_event: "Event"
) -> None:
    """Record a workflow-node observation; emit WORKFLOW_NODE_STARTING (#207).

    Invocation-segment scoped: nodes are keyed by ``(node.path,
    node.run_id)`` in the per-invocation contextvar accumulator. On the
    first event seen at a node, a ``WORKFLOW_NODE_STARTING`` row is
    emitted (carrying the C1 node envelope from ``source_event``). Later
    events at the same node only refresh its last-seen timestamp and
    paused flag, which the after-run drain consumes.

    The STARTING row is stamped with the event's own timestamp rather
    than the wall clock at logging time. The drained
    ``WORKFLOW_NODE_COMPLETED`` row uses the node's last *event*
    timestamp, so stamping STARTING from the same clock keeps
    STARTING <= COMPLETED; otherwise a node with a single event would
    get a COMPLETED row that is earlier than its STARTING row.

    Events without ``node_info`` or with an empty ``node.path`` are
    ignored; no synthetic node is created for them.

    Args:
      callback_ctx: Callback context forwarded to ``_log_event``.
      source_event: The event being observed; read via ``getattr`` for
        ``node_info``, ``long_running_tool_ids`` and ``timestamp``.
    """
    node_info = getattr(source_event, "node_info", None)
    if node_info is None:
        return
    path = getattr(node_info, "path", "") or ""
    if not path:
        return  # non-workflow event; never synthesize a node
    run_id = getattr(node_info, "run_id", None)
    parent_run_id = getattr(node_info, "parent_run_id", None)
    last_running = bool(getattr(source_event, "long_running_tool_ids", None))
    raw_ts = getattr(source_event, "timestamp", None)
    # Fall back to "now" when the event has no numeric timestamp.
    ts = float(raw_ts) if isinstance(raw_ts, (int, float)) else time.time()

    nodes = _workflow_nodes_ctx.get()
    if nodes is None:
        # Lazily create the accumulator on the first tracked node event.
        nodes = {}
        _workflow_nodes_ctx.set(nodes)
    key = (path, run_id)
    existing = nodes.get(key)
    if existing is not None:
        # Already started in this segment: just refresh drain inputs.
        existing.last_event_ts = ts
        existing.last_event_long_running = last_running
        return

    # Register before awaiting so the node counts as started even while
    # the STARTING row is still being written.
    nodes[key] = _WorkflowNodeProgress(
        path=path,
        run_id=run_id,
        parent_run_id=parent_run_id,
        last_event_ts=ts,
        last_event_long_running=last_running,
    )
    try:
        started_at = datetime.fromtimestamp(ts, timezone.utc)
    except (OverflowError, OSError, ValueError):
        # Unrepresentable epoch value: leave the override unset so
        # _log_event falls back to the current time instead of failing.
        started_at = None
    await self._log_event(
        "WORKFLOW_NODE_STARTING",
        callback_ctx,
        event_data=EventData(
            source_event=source_event,
            timestamp_override=started_at,
        ),
    )

async def _drain_workflow_nodes(self, callback_ctx: CallbackContext) -> None:
    """Emit WORKFLOW_NODE_COMPLETED for finished nodes at invocation end (#207).

    The drain is pause-aware. A tracked node is skipped (left open, no
    COMPLETED row) when either:

    * its last observed event carried ``long_running_tool_ids`` (it is a
      paused leaf), or
    * some paused node's path starts with this node's path plus ``"/"``
      (it is a strict path-ancestor of a suspended descendant).

    Every other node gets one ``WORKFLOW_NODE_COMPLETED`` row carrying
    the node envelope, the node's last-event time as the row timestamp
    (so STARTING -> COMPLETED gives node duration), and
    ``attributes.adk.workflow_node_status = 'completed'``. The ``failed``
    status is reserved for a follow-up and is never emitted here.

    Does nothing when no workflow node was tracked for this invocation.

    Args:
      callback_ctx: Callback context forwarded to ``_log_event``.
    """
    tracked = _workflow_nodes_ctx.get()
    if not tracked:
        return

    # Paths of all currently paused nodes, collected once up front.
    suspended = [
        node.path for node in tracked.values() if node.last_event_long_running
    ]

    for node in tracked.values():
        if node.last_event_long_running:
            continue  # paused leaf stays open
        descendant_prefix = node.path + "/"
        if any(path.startswith(descendant_prefix) for path in suspended):
            continue  # ancestor of a paused node stays open

        envelope = {
            "node": {
                "path": node.path,
                "run_id": node.run_id,
                "parent_run_id": node.parent_run_id,
            },
            "workflow_node_status": "completed",
        }
        finished_at = datetime.fromtimestamp(node.last_event_ts, timezone.utc)
        await self._log_event(
            "WORKFLOW_NODE_COMPLETED",
            callback_ctx,
            event_data=EventData(
                timestamp_override=finished_at,
                adk_extras=envelope,
            ),
        )

@_safe_callback
async def on_event_callback(
self,
Expand Down Expand Up @@ -3357,6 +3519,11 @@ async def on_event_callback(
"""
callback_ctx = CallbackContext(invocation_context)

# --- Workflow node boundary: emit WORKFLOW_NODE_STARTING on first
# observation of a node within this invocation (#207). Done first
# so the node's "start" precedes its own event rows.
await self._track_workflow_node(callback_ctx, event)

# --- State delta logging ---
if event.actions.state_delta:
await self._log_event(
Expand Down Expand Up @@ -3661,6 +3828,12 @@ async def after_run_callback(
callback_ctx = CallbackContext(invocation_context)
trace_id = TraceManager.get_trace_id(callback_ctx)

# Drain workflow-node boundaries (#207) at this deterministic
# lifecycle point, BEFORE INVOCATION_COMPLETED, so the invocation
# row remains the last event of the segment. Paused / ancestor-of-
# paused nodes are left open by the drain.
await self._drain_workflow_nodes(callback_ctx)

# Pop the invocation-root span pushed by ensure_invocation_span.
span_id, duration = TraceManager.pop_span()
parent_span_id = TraceManager.get_current_span_id()
Expand All @@ -3681,6 +3854,7 @@ async def after_run_callback(
TraceManager.clear_stack()
_active_invocation_id_ctx.set(None)
_root_agent_name_ctx.set(None)
_workflow_nodes_ctx.set(None)
# Ensure all logs are flushed before the agent returns.
await self.flush()

Expand Down
Loading
Loading