From f1a607bb45a4985337902d7da72981b44f0452a9 Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Fri, 12 Jun 2026 16:32:56 +0530 Subject: [PATCH] feat(governance): GovernanceRuntime wrapper + runtime registry wiring Co-Authored-By: Claude Opus 4.8 --- docs/runtime-wrapper-extension.md | 85 ++ src/uipath/runtime/__init__.py | 7 + src/uipath/runtime/governance/wrapper.py | 999 +++++++++++++++++++++++ src/uipath/runtime/registry.py | 133 ++- src/uipath/runtime/wrapper.py | 54 ++ tests/test_dispose_isolation.py | 146 ++++ tests/test_registry.py | 186 ++++- tests/test_wrapper.py | 85 ++ tests/test_wrapper_internals.py | 422 ++++++++++ 9 files changed, 2093 insertions(+), 24 deletions(-) create mode 100644 docs/runtime-wrapper-extension.md create mode 100644 src/uipath/runtime/governance/wrapper.py create mode 100644 src/uipath/runtime/wrapper.py create mode 100644 tests/test_dispose_isolation.py create mode 100644 tests/test_wrapper.py create mode 100644 tests/test_wrapper_internals.py diff --git a/docs/runtime-wrapper-extension.md b/docs/runtime-wrapper-extension.md new file mode 100644 index 0000000..d077a18 --- /dev/null +++ b/docs/runtime-wrapper-extension.md @@ -0,0 +1,85 @@ +# Governance Integration Point + +`uipath-runtime` wraps runtimes with governance via a single direct +function, `apply_governance_wrapper`, gated by the +`EnablePythonGovernanceChecker` feature flag. + +Governance contracts (feature-flag, exceptions, models) live in +`uipath.core.governance` (in `uipath-core`); the runtime-side wrapper +lives here in `uipath.runtime.governance`. Runtime has **no separate +`uipath-governance` dependency** — the contracts namespace is always +available because `uipath-core` is already a hard dep. When the flag +is off, `uipath.runtime.governance.wrapper` is **not imported** — its +transitive cost stays off the startup path. + +## How it works + +``` +UiPathRuntimeFactoryRegistry.get(...) + ↓ returns +UiPathWrappedRuntimeFactory.new_runtime(...) + ↓ calls +apply_governance_wrapper(runtime, context, runtime_id) + ↓ + if _is_governance_enabled(): + from uipath.runtime.governance.wrapper import governance_wrapper # lazy + return governance_wrapper(runtime, context, runtime_id) + else: + return runtime # unwrapped, no governance import +``` + +## Feature flag + +| Setting | Effect | +|---|---| +| `FeatureFlags.configure_flags({"EnablePythonGovernanceChecker": True})` (typically via gitops) | Governance is applied | +| `UIPATH_FEATURE_EnablePythonGovernanceChecker=true` env var | Governance is applied (fallback when no programmatic config) | +| Neither set | Governance **not** applied; `uipath.runtime.governance.wrapper` is **not imported** | + +Resolution and fallback semantics come from `uipath-core`'s +`FeatureFlags.is_flag_enabled(..., default=False)`. Programmatic +configuration beats env var. + +## API + +```python +from uipath.runtime import ( + GOVERNANCE_FEATURE_FLAG, # "EnablePythonGovernanceChecker" + apply_governance_wrapper, # the call-site +) +``` + +`apply_governance_wrapper(runtime, context, runtime_id)` is an +`async` function. It returns the original runtime untouched when the +flag is off or when the wrapper itself raises — governance failures +must never break agent execution. + +## Why deferred-import matters + +When the flag is off, `apply_governance_wrapper` returns before the +`from uipath.runtime.governance.wrapper import governance_wrapper` line +ever runs. That keeps governance's transitive imports — audit, +evaluator, OpenTelemetry, the policy index — entirely off the startup +hot path. + +## Testing + +Force the flag on/off per test via `FeatureFlags`: + +```python +from uipath.core.feature_flags import FeatureFlags +from uipath.runtime.wrapper import GOVERNANCE_FEATURE_FLAG + +# Force enable +FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + +# Force disable +FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: False}) + +# Reset (typically in a teardown fixture) +FeatureFlags.reset_flags() +``` + +Use `sys.modules` patching to stub `uipath.runtime.governance.wrapper` +when you need to assert against the wrapper invocation without +actually importing it — see `tests/test_wrapper.py` for the fixture. diff --git a/src/uipath/runtime/__init__.py b/src/uipath/runtime/__init__.py index 1eed011..e7af5c5 100644 --- a/src/uipath/runtime/__init__.py +++ b/src/uipath/runtime/__init__.py @@ -43,6 +43,10 @@ ) from uipath.runtime.schema import UiPathRuntimeSchema from uipath.runtime.storage import UiPathRuntimeStorageProtocol +from uipath.runtime.wrapper import ( + GOVERNANCE_FEATURE_FLAG, + apply_governance_wrapper, +) __all__ = [ "UiPathExecuteOptions", @@ -73,4 +77,7 @@ "UiPathResumeTriggerName", "UiPathChatProtocol", "UiPathChatRuntime", + # Governance integration (direct, FF-gated, lazy import) + "GOVERNANCE_FEATURE_FLAG", + "apply_governance_wrapper", ] diff --git a/src/uipath/runtime/governance/wrapper.py b/src/uipath/runtime/governance/wrapper.py new file mode 100644 index 0000000..3b383ba --- /dev/null +++ b/src/uipath/runtime/governance/wrapper.py @@ -0,0 +1,999 @@ +"""SDK runtime wrapper integration with adapter-based framework support. + +This module provides the wrapper function that integrates with the +UiPath Runtime SDK's UiPathRuntimeWrapperRegistry. + +Architecture: + The wrapper automatically detects and wraps agents using framework-specific + adapters. This provides governance at all lifecycle hooks: + + - BEFORE_AGENT / AFTER_AGENT: Intercepted at runtime level in execute()/stream() + - BEFORE_MODEL / AFTER_MODEL / TOOL_CALL: Via framework-specific adapters + + Agent Detection Flow: + 1. GovernanceRuntime receives delegate runtime from SDK + 2. Extracts agent from delegate (looks for _agent, agent, _runnable, etc.) + 3. Uses AdapterRegistry.resolve() to find matching adapter + 4. Calls adapter.attach() to wrap agent with governance hooks + 5. Replaces original agent in delegate with governed version + + Supported Frameworks (via adapters): + - LangChain / LangGraph + - Microsoft AutoGen + - CrewAI + - LlamaIndex + - OpenAI Agents SDK + - PydanticAI + - Microsoft Semantic Kernel + - HuggingFace smolagents +""" + +from __future__ import annotations + +import logging +import threading +from contextvars import ContextVar, Token +from typing import Any, AsyncGenerator, Dict, Optional +from uuid import uuid4 + +from uipath.core.adapters import get_adapter_registry +from uipath.core.governance.config import is_governance_enabled +from uipath.core.governance.exceptions import GovernanceBlockException + +from uipath.runtime.base import UiPathRuntimeProtocol +from uipath.runtime.governance.config import EnforcementMode, get_enforcement_mode +from uipath.runtime.governance.delegation_guard import ( + install_delegation_guard, + uninstall_delegation_guard, +) +from uipath.runtime.governance.native.backend_client import set_agent_conversational +from uipath.runtime.governance.native.evaluator import GovernanceEvaluator +from uipath.runtime.governance.native.loader import ( + get_policy_index, + prefetch_policy_index, +) +from uipath.runtime.result import UiPathRuntimeResult + +logger = logging.getLogger(__name__) + +# Per-call-context model name set by GovernanceRuntime, read by adapters. +# A ContextVar keeps concurrent agent runs from stomping each other's +# value across threads and asyncio tasks. +_current_model_name: ContextVar[str] = ContextVar( + "_uipath_governance_current_model_name", default="" +) + + +def get_current_model_name() -> str: + """Get the model name captured from the runtime context.""" + return _current_model_name.get() + + +# Content keys we prioritize when walking dict-shaped agent payloads. +# Covers chat-message shapes (``{"content": "..."}``, ``{"text": "..."}``), +# the plural ``messages`` list (LangGraph state), and the OpenAI +# function-call shape (``{"arguments": ""}``). Any remaining keys +# are walked after these so the actual reply / latest user message leads +# the extracted blob. +_GOVERNANCE_CONTENT_KEYS: tuple[str, ...] = ( + "content", + "text", + "output", + "answer", + "messages", # plural — chat history list in LangGraph-style state + "message", + "result", + "arguments", + "thinking", +) + +# Total cap on the extracted governance-text blob. Sized for multi-turn +# chat where each turn can produce a couple KB and the latest content +# must fit even when the conversation history is long. Bounded so a +# runaway nested payload can't blow memory or dominate regex scan time. +_GOVERNANCE_TEXT_CAP = 64000 + +# Depth cap for the recursive walk. Anything beyond this is almost +# certainly framework plumbing, not user-facing content. +_GOVERNANCE_TEXT_MAX_DEPTH = 10 + + +def _extract_governable_text( + value: Any, + *, + budget: int = _GOVERNANCE_TEXT_CAP, + seen: set[int] | None = None, + depth: int = 0, + latest_only: bool = False, +) -> str: + """Pull governance-relevant text out of an arbitrary runtime payload. + + Replaces the prior ``str(value)[:2000]`` shortcut, which produced + ``"{'content': '...'}"``-style garbled prefixes for structured + outputs and ate into the budget with dict-syntax noise. Walks dicts + (prioritising :data:`_GOVERNANCE_CONTENT_KEYS`), lists, pydantic + models, and plain objects up to :data:`_GOVERNANCE_TEXT_MAX_DEPTH`, + joining text fragments with newlines. Non-text scalars and unknown + block types contribute nothing. Cycles and over-deep nesting are + skipped silently. + + **List ordering:** lists are walked in reverse, so the most recent + entry (latest chat-history message, latest assistant content block) + gets first claim on the budget. Long conversation histories no + longer crowd the latest user message out of the scanned blob. + + **latest_only:** when True (used by BEFORE_AGENT in conversational + agents), only the last item of any list is extracted — chat history + is reduced to the most recent message. The flag resets to False + when recursing into the chosen item so multi-block content within + that message is still fully extracted. + """ + if value is None or budget <= 0 or depth > _GOVERNANCE_TEXT_MAX_DEPTH: + return "" + if isinstance(value, str): + return value[:budget] + if isinstance(value, (bool, int, float)): + # Numeric / boolean scalars aren't governance text — skip them + # so dict walks don't pad the blob with ints / flags. + return "" + + # Pydantic / dataclass-like shapes are easier to walk via their + # dict form than via attribute introspection. If the first dumper + # raises (e.g. ``model_dump`` blows up on a partial pydantic v1 + # model), fall through to the next one rather than abandoning the + # whole pydantic/dataclass path. + for dumper in ("model_dump", "dict"): + fn = getattr(value, dumper, None) + if callable(fn): + try: + return _extract_governable_text( + fn(), + budget=budget, + seen=seen, + depth=depth + 1, + latest_only=latest_only, + ) + except Exception: # noqa: BLE001 - try the next dumper + continue + + obj_id = id(value) + if seen is None: + seen = set() + if obj_id in seen: + return "" + if isinstance(value, dict): + seen.add(obj_id) + # Walk recognized content keys first so the actual reply leads + # the extracted blob; any remaining keys follow. + keys: list[Any] = [k for k in _GOVERNANCE_CONTENT_KEYS if k in value] + keys.extend(k for k in value if k not in _GOVERNANCE_CONTENT_KEYS) + parts: list[str] = [] + remaining = budget + for key in keys: + if remaining <= 0: + break + piece = _extract_governable_text( + value[key], + budget=remaining, + seen=seen, + depth=depth + 1, + latest_only=latest_only, + ) + if piece: + parts.append(piece) + remaining -= len(piece) + 1 # +1 accounts for the newline + return "\n".join(parts) + if isinstance(value, (list, tuple)): + seen.add(obj_id) + # Reverse so the latest entry (new user message in conversation + # history, latest assistant content block) gets the budget first. + # When the caller asked for ``latest_only`` (BEFORE_AGENT in a + # conversational agent), stop after the first reversed item — + # i.e., evaluate only the latest input, not the whole history. + items = list(reversed(value)) + if latest_only: + items = items[:1] + parts = [] + remaining = budget + for item in items: + if remaining <= 0: + break + # Reset latest_only when recursing into the chosen item so + # multi-block content inside the latest message is walked + # in full (text + tool_use + thinking blocks all extracted). + piece = _extract_governable_text( + item, + budget=remaining, + seen=seen, + depth=depth + 1, + latest_only=False, + ) + if piece: + parts.append(piece) + remaining -= len(piece) + 1 + return "\n".join(parts) + + # Last-resort: walk public attributes on opaque objects (e.g. a + # framework-specific result class without model_dump/dict). + public = { + name: getattr(value, name) + for name in dir(value) + if not name.startswith("_") and not callable(getattr(value, name, None)) + } + if public: + return _extract_governable_text( + public, + budget=budget, + seen=seen, + depth=depth + 1, + latest_only=latest_only, + ) + return "" + + +class GovernanceRuntime: + """Runtime wrapper that adds governance evaluation at the runtime level. + + Automatically detects and wraps agents using the AdapterRegistry: + - LangChain / LangGraph + - Microsoft AutoGen + - CrewAI + - LlamaIndex + - OpenAI Agents SDK + - PydanticAI + - Microsoft Semantic Kernel + - HuggingFace smolagents + + Boundary hooks (BEFORE_AGENT, AFTER_AGENT) are handled at the runtime level. + Inner hooks (BEFORE_MODEL, TOOL_CALL, etc.) are handled by framework adapters + which are automatically attached to the agent when the runtime is created. + """ + + # Attributes to search for agent extraction (in priority order) + _AGENT_ATTRS = [ + "_agent", + "agent", + "_runnable", + "runnable", + "_graph", + "graph", + "_chain", + "chain", + "_crew", + "crew", + ] + + def __init__( + self, + delegate: UiPathRuntimeProtocol, + context: Any, + runtime_id: str, + ) -> None: + """Wrap ``delegate`` and prime per-run governance state. + + Captures the runtime/trace identifiers, resolves the agent name and + model from ``context``, and kicks off the background policy fetch. + """ + self._delegate = delegate + self._context = context + self._runtime_id = runtime_id + self._trace_id = str(uuid4()) + self._governed_agent: Any = None + self._original_agent: Any = None + self._agent_attr_name: str | None = None + self._agent_holder: Any = None # The object holding the agent attribute + self._init_failed = False # Track if initialization failed + self._model_name = "" + self._model_name_token: Token[str] | None = None + self._evaluator: GovernanceEvaluator | None = None + self._evaluator_ready: bool = False + self._evaluator_lock = threading.Lock() + self._adapter_registry: Any = None + + # Agent name is needed even in the FF-off path so dispose / log + # messages have something to print. Cheap and exception-free. + self._agent_name = "agent" + if context is not None and hasattr(context, "entrypoint"): + self._agent_name = context.entrypoint or "agent" + + # Extract model name and bind into the ContextVar so adapters + # running in this runtime's context see the right value under + # concurrent agents. We keep the token so dispose() can reset + # the var — without that, the value leaks into sibling tasks + # that inherit this context and outlives the runtime. + model_name = self._extract_model_name(delegate, context) + self._model_name_token: Token[str] | None = _current_model_name.set(model_name) + self._model_name = model_name + + # Determine whether this is a conversational or autonomous agent and + # record it before the policy prefetch fires, so the fetch can ask the + # server for the matching container key (conversational vs autonomous). + self._is_conversational = self._extract_is_conversational(delegate, context) + set_agent_conversational(self._is_conversational) + + # Fire the network-bound policy fetch in the background so it + # overlaps with the rest of the agent setup. The evaluator and + # adapter wrap are materialised lazily on the first hook fire + # (see _ensure_evaluator), which is where we wait for the + # prefetch to land. + self._evaluator: GovernanceEvaluator | None = None + self._evaluator_ready: bool = False + self._evaluator_lock = threading.Lock() + + # Governance feature flag gate (mirrors the runtime-side gate). + # When OFF, we short-circuit init: no prefetch, no adapter + # setup, no agent-start notification. All hook checks see + # _init_failed=True and no-op. + if not is_governance_enabled(): + self._init_failed = True + self._evaluator_ready = True # don't try to materialise later + logger.info( + "GovernanceRuntime initialized as no-op: governance feature " + "flag is OFF (agent='%s', runtime_id='%s')", + self._agent_name, + runtime_id, + ) + return + + try: + # Bind the model-name ContextVar so adapters running in this + # runtime's context see the right value under concurrent + # agents. The token is stashed so dispose() can reset the + # var — without that, the value leaks into sibling tasks + # that inherit this context and outlive the runtime. + model_name = self._extract_model_name(delegate, context) + self._model_name = model_name + self._model_name_token = _current_model_name.set(model_name) + + # Record agent-type before the policy prefetch fires so the + # fetch can ask the server for the matching container key + # (conversational vs autonomous). + set_agent_conversational( + self._extract_is_conversational(delegate, context) + ) + + # Fire the network-bound policy fetch in the background so + # it overlaps with the rest of the agent setup. The + # evaluator and adapter wrap are materialised lazily on the + # first hook fire (see _ensure_evaluator), which is where + # we wait for the prefetch to land. + prefetch_policy_index() + self._adapter_registry = get_adapter_registry() + + logger.info( + "GovernanceRuntime initialized (prefetching policy): agent='%s', " + "runtime_id='%s', model='%s', mode=%s, adapters=%d", + self._agent_name, + runtime_id, + model_name or "unknown", + get_enforcement_mode().value, + len(self._adapter_registry.get_all()), + ) + except Exception as e: + # Fail-safe: log error but don't break runtime initialization + self._init_failed = True + self._evaluator = None + self._adapter_registry = None + logger.warning( + "GovernanceRuntime initialization failed (continuing without governance): %s", + e, + ) + + def _extract_model_name( + self, + delegate: UiPathRuntimeProtocol, + context: Any, + ) -> str: + """Extract model name from delegate or context.""" + model_name = "" + + # Try _agent_definition.settings.model (LicensedRuntime pattern) + agent_def = getattr(delegate, "_agent_definition", None) + if agent_def: + settings = getattr(agent_def, "settings", None) + if settings: + model_name = getattr(settings, "model", None) or "" + + # Try direct attributes on delegate + if not model_name: + for attr in ["model", "model_name", "_model", "model_id"]: + val = getattr(delegate, attr, None) + if val: + model_name = str(val) + break + + # Try nested delegate chain (unwrap wrappers) + if not model_name: + inner = getattr(delegate, "_delegate", None) or getattr( + delegate, "delegate", None + ) + while inner and not model_name: + agent_def = getattr(inner, "_agent_definition", None) + if agent_def: + settings = getattr(agent_def, "settings", None) + if settings: + model_name = getattr(settings, "model", None) or "" + break + inner = getattr(inner, "_delegate", None) or getattr( + inner, "delegate", None + ) + + # Try context + if not model_name and context is not None: + for attr in ["model", "model_name", "model_id"]: + val = getattr(context, attr, None) + if val: + model_name = str(val) + break + + return model_name + + def _extract_is_conversational( + self, + delegate: UiPathRuntimeProtocol, + context: Any, + ) -> bool: + """Determine whether the hosted agent is conversational. + + Reads ``AgentDefinition.is_conversational`` off the delegate's + ``_agent_definition`` (the LicensedRuntime pattern, same source as + :meth:`_extract_model_name`), unwrapping nested delegates. Falls back + to the runtime context's conversation id when no agent definition is + reachable. Defaults to ``False`` (autonomous) — fail-safe: an + unknown agent is treated as autonomous, matching the server default. + """ + + def _from_agent_def(obj: Any) -> bool | None: + agent_def = getattr(obj, "_agent_definition", None) + if agent_def is None: + return None + value = getattr(agent_def, "is_conversational", None) + return bool(value) if value is not None else None + + # Delegate, then the unwrapped delegate chain. Depth cap mirrors + # :meth:`_extract_agent` — a pathological wrapper chain shouldn't + # turn this synchronous init into a loop. + node: Any = delegate + for _ in range(10): + if node is None: + break + result = _from_agent_def(node) + if result is not None: + return result + node = getattr(node, "_delegate", None) or getattr(node, "delegate", None) + + # Fallback: a populated conversation id implies a conversational run. + if context is not None: + conversation_id = getattr(context, "conversation_id", None) + if conversation_id: + return True + + return False + + def _wrap_delegate_agent(self) -> None: + """Extract agent from delegate and wrap with governance via adapters. + + This method: + 1. Searches delegate for known agent attributes + 2. Uses AdapterRegistry to find matching adapter + 3. Wraps agent with governance hooks + 4. Replaces original agent in delegate with governed version + + IMPORTANT: This method is fail-safe. Any error during adapter wrapping + will continue without inner hooks. It will NEVER break the execution flow. + BEFORE_AGENT/AFTER_AGENT checks at the runtime boundary still provide governance. + """ + try: + # Extract agent from delegate + agent, attr_name = self._extract_agent(self._delegate) + + if agent is None: + logger.debug( + "No agent found in delegate - continuing without inner hooks" + ) + return + + if self._adapter_registry is None: + # Defensive: should never happen because _ensure_evaluator + # is gated on _init_failed which is set with adapter_registry=None. + return + + # Find matching adapter + adapter = self._adapter_registry.resolve(agent) + if adapter is None: + logger.debug( + "No adapter found for agent type '%s' - continuing without inner hooks", + type(agent).__name__, + ) + return + + if self._evaluator is None: + # _wrap_delegate_agent is now called from _ensure_evaluator + # after the evaluator is materialised; this guard exists + # only for defensive symmetry with wrap_agent(). + logger.debug("Skipping adapter attach: evaluator not yet materialised") + return + + # Wrap agent with governance + governed = adapter.attach( + agent=agent, + agent_id=self._agent_name, + session_id=self._runtime_id, + evaluator=self._evaluator, + ) + + install_delegation_guard(agent) + + # Store references + self._original_agent = agent + self._governed_agent = governed + self._agent_attr_name = attr_name + + # Replace agent in delegate with governed version + if attr_name is not None: + self._replace_agent_in_delegate(governed, attr_name) + + logger.info( + "Agent wrapped with governance: type=%s, adapter=%s, attr=%s", + type(agent).__name__, + adapter.name, + attr_name, + ) + + except Exception as e: + # Catch-all: ensure we never break execution + logger.warning( + "Agent wrapping failed: %s - continuing without inner hooks", + e, + ) + + def _extract_agent(self, delegate: Any) -> tuple[Any, str | None]: + """Extract agent from delegate runtime. + + Searches known attribute names in priority order. + This method is fail-safe and will return (None, None) on any error. + + Returns: + Tuple of (agent, attribute_name) or (None, None) if not found + """ + try: + # First check direct attributes on delegate + for attr in self._AGENT_ATTRS: + try: + agent = getattr(delegate, attr, None) + if agent is not None: + logger.debug("Found agent at delegate.%s", attr) + return agent, attr + except Exception: + continue + + # Check nested delegate chain (unwrap wrapper layers) + inner = getattr(delegate, "_delegate", None) or getattr( + delegate, "delegate", None + ) + depth = 0 + while inner is not None and depth < 10: # Prevent infinite loops + for attr in self._AGENT_ATTRS: + try: + agent = getattr(inner, attr, None) + if agent is not None: + logger.debug( + "Found agent at nested delegate.%s (depth=%d)", + attr, + depth, + ) + # Return the inner delegate that holds the agent + self._agent_holder = inner + return agent, attr + except Exception: + continue + inner = getattr(inner, "_delegate", None) or getattr( + inner, "delegate", None + ) + depth += 1 + + except Exception as e: + logger.debug("Agent extraction failed: %s", e) + + return None, None + + def _replace_agent_in_delegate(self, governed: Any, attr_name: str) -> bool: + """Replace original agent in delegate with governed version. + + This method is fail-safe and will not raise exceptions. + + Args: + governed: The governed agent proxy + attr_name: Attribute name where agent was found + + Returns: + True if replacement succeeded, False otherwise + """ + # If we found agent in a nested delegate, use that + holder = getattr(self, "_agent_holder", None) or self._delegate + + try: + setattr(holder, attr_name, governed) + logger.debug("Replaced agent at %s.%s", type(holder).__name__, attr_name) + return True + except AttributeError: + # Some objects have read-only attributes + logger.debug( + "Cannot replace agent at %s.%s (read-only) - adapter hooks still active", + type(holder).__name__, + attr_name, + ) + return False + except Exception as e: + logger.debug( + "Failed to replace agent at %s.%s: %s - adapter hooks still active", + type(holder).__name__, + attr_name, + e, + ) + return False + + def wrap_agent(self, agent: Any, agent_id: str | None = None) -> Any: + """Wrap an agent with governance using the appropriate adapter. + + This method detects the agent framework and applies the correct adapter. + + Args: + agent: The agent to wrap + agent_id: Optional agent identifier (defaults to type name) + + Returns: + Governed agent proxy + """ + agent_id = agent_id or type(agent).__name__ + session_id = self._runtime_id + + if self._adapter_registry is None: + logger.warning( + "wrap_agent called but adapter registry not initialised " + "(governance likely disabled by feature flag); returning agent unwrapped" + ) + return agent + + # Find matching adapter + adapter = self._adapter_registry.resolve(agent) + if adapter is None: + logger.warning("No adapter found for agent type: %s", type(agent).__name__) + return agent + + if self._evaluator is None: + logger.warning( + "wrap_agent called before evaluator materialised; " + "ensuring evaluator is ready before attaching adapter" + ) + self._ensure_evaluator() + if self._evaluator is None: + logger.warning( + "Evaluator failed to materialise; returning agent unwrapped" + ) + return agent + + # Attach governance via adapter + governed = adapter.attach( + agent=agent, + agent_id=agent_id, + session_id=session_id, + evaluator=self._evaluator, + ) + + logger.info( + "Agent wrapped with governance: agent=%s, adapter=%s", + agent_id, + adapter.name, + ) + + return governed + + def _ensure_evaluator(self) -> None: + """Materialise the evaluator and attach adapters on first hook fire. + + Idempotent — subsequent calls are no-ops. The first call blocks + on the policy prefetch that ``__init__`` kicked off; the user + request was explicit that the wait happens here, not at init. + """ + if self._evaluator_ready or self._init_failed: + return + with self._evaluator_lock: + if self._evaluator_ready or self._init_failed: + return + try: + policy_index = get_policy_index() + self._evaluator = GovernanceEvaluator(policy_index) + self._wrap_delegate_agent() + logger.info( + "GovernanceRuntime ready: agent='%s', packs=%s, rules=%d, wrapped=%s", + self._agent_name, + policy_index.pack_names, + policy_index.total_rules, + self._governed_agent is not None, + ) + except Exception as exc: + self._init_failed = True + self._evaluator = None + logger.warning( + "Lazy evaluator materialisation failed; " + "governance disabled for this runtime: %s", + exc, + ) + finally: + self._evaluator_ready = True + + def _check_before_agent(self, input: Dict[str, Any] | None) -> None: + """Evaluate BEFORE_AGENT rules at runtime boundary. + + The evaluator owns audit emission and DENY-raising; this method + just primes the evaluator and propagates blocks. + + Fail-safe: Only GovernanceBlockException propagates. All other + errors are logged and execution continues. + """ + try: + self._ensure_evaluator() + if self._init_failed or self._evaluator is None: + return + + # In conversational agents the runtime ``input`` carries the + # full chat history (e.g. ``{"messages": [...]}``). Pass + # ``latest_only=True`` so governance evaluates the most + # recent user message and not the entire transcript. + agent_input = _extract_governable_text(input, latest_only=True) + + self._evaluator.evaluate_before_agent( + agent_input=agent_input, + agent_name=self._agent_name, + runtime_id=self._runtime_id, + trace_id=self._trace_id, + model_name=self._model_name, + ) + + except GovernanceBlockException: + raise # Allow intentional blocks to propagate + except Exception as e: + # Fail-safe: log and continue without blocking + logger.warning("BEFORE_AGENT governance check failed (continuing): %s", e) + + def _check_after_agent(self, output: Any) -> None: + """Evaluate AFTER_AGENT rules at runtime boundary. + + The evaluator owns audit emission and DENY-raising; this method + just primes the evaluator and propagates blocks. + + Fail-safe: Only GovernanceBlockException propagates. All other + errors are logged and execution continues. + """ + try: + self._ensure_evaluator() + if self._init_failed or self._evaluator is None: + return + + # Pull the agent's textual output for governance. UiPathRuntimeResult + # wraps the actual payload under ``.output``; everything else (raw + # dicts, strings, framework result objects) goes through the + # extractor directly. The extractor handles list-of-blocks, + # dict-content, pydantic, etc. — no more dict-repr garble. + payload: Any = getattr(output, "output", output) + agent_output = _extract_governable_text(payload) + + self._evaluator.evaluate_after_agent( + agent_output=agent_output, + agent_name=self._agent_name, + runtime_id=self._runtime_id, + trace_id=self._trace_id, + ) + + except GovernanceBlockException: + raise # Allow intentional blocks to propagate + except Exception as e: + # Fail-safe: log and continue without blocking + logger.warning("AFTER_AGENT governance check failed (continuing): %s", e) + + @property + def delegate(self) -> UiPathRuntimeProtocol: + """Return the wrapped runtime this proxy delegates to.""" + return self._delegate + + @property + def evaluator(self) -> GovernanceEvaluator | None: + """Get the governance evaluator (None until first hook fires).""" + return self._evaluator + + async def execute( + self, + input: Dict[str, Any] | None = None, + options: Optional[Any] = None, + ) -> Any: + """Execute with governance checks at runtime boundary.""" + # BEFORE_AGENT: Check input before any agent execution + self._check_before_agent(input) + + # Delegate to actual runtime + result = await self._delegate.execute(input, options) + + # AFTER_AGENT: Check output before returning + self._check_after_agent(result) + + return result + + async def stream( + self, + input: Dict[str, Any] | None = None, + options: Optional[Any] = None, + ) -> AsyncGenerator[Any, None]: + """Stream with governance checks at runtime boundary.""" + # BEFORE_AGENT: Check input before any agent execution + self._check_before_agent(input) + + # Delegate to actual runtime and collect final result. The + # terminal stream event is detected structurally via the + # Same-package class import — no cross-package shim needed. + # importing the concrete UiPathRuntimeResult class from + # uipath-runtime (which would create a core→runtime cycle). + final_result = None + async for event in self._delegate.stream(input, options): + if isinstance(event, UiPathRuntimeResult): + final_result = event + yield event + + # AFTER_AGENT: Check output after stream completes + if final_result is not None: + self._check_after_agent(final_result) + + async def get_schema(self) -> Any: + """Delegate to the wrapped runtime; governance has no schema layer.""" + return await self._delegate.get_schema() + + async def dispose(self) -> None: + """Dispose the runtime and restore original agent if wrapped. + + Each governance-side cleanup step is isolated so a single + failure can't strand later steps. ``self._delegate.dispose()`` + always runs last and is the only step whose exception is allowed + to propagate — that's the caller's contract with the wrapped + runtime. + """ + # Step 1: restore the original agent attribute. + if self._original_agent is not None and self._agent_attr_name is not None: + try: + holder = self._agent_holder or self._delegate + setattr(holder, self._agent_attr_name, self._original_agent) + logger.debug("Restored original agent on dispose") + except Exception as e: + logger.warning("Failed to restore original agent: %s", e) + + # Step 2: uninstall the delegation guard (its own try/except so + # a failed restore in step 1 doesn't skip the guard removal). + if self._original_agent is not None: + try: + uninstall_delegation_guard(self._original_agent) + except Exception as e: + logger.warning("Failed to uninstall delegation guard: %s", e) + + # Step 3: reset the model-name ContextVar so the value doesn't + # leak into sibling tasks that inherited this context. + if self._model_name_token is not None: + try: + _current_model_name.reset(self._model_name_token) + except ValueError: + # Token created in a different context — happens when + # dispose runs from a child task. Accept the leak rather + # than calling set(""), which would create yet another + # ContextVar level on top of the original. + logger.debug("Model-name ContextVar reset from foreign context") + except Exception as e: + logger.warning("Failed to reset model-name context: %s", e) + finally: + self._model_name_token = None + + # Step 4: delegate dispose — always last, exception propagates. + # The caller controls the contract with the wrapped runtime; we + # don't swallow its cleanup failures. + await self._delegate.dispose() + + def __getattr__(self, name: str) -> Any: + """Forward non-protocol attribute access to the wrapped runtime.""" + # Guard against recursion when __init__ raises before _delegate is + # bound, or against probes for our own private attributes that + # haven't been set yet. + if name.startswith("_"): + raise AttributeError(name) + try: + delegate = object.__getattribute__(self, "_delegate") + except AttributeError as exc: + raise AttributeError(name) from exc + return getattr(delegate, name) + + +def governance_wrapper( + runtime: UiPathRuntimeProtocol, + context: Any, + runtime_id: str, +) -> UiPathRuntimeProtocol: + """Wrapper function for UiPathRuntimeWrapperRegistry. + + Creates a GovernanceRuntime that wraps the given runtime with + compliance evaluation at each lifecycle hook. + + Args: + runtime: The runtime to wrap + context: Runtime context from the SDK (only ``entrypoint`` is read) + runtime_id: Unique identifier for this runtime instance + """ + if not is_governance_enabled(): + logger.debug( + "governance_wrapper: %s feature flag is OFF; returning unwrapped runtime", + "EnablePythonGovernanceChecker", + ) + return runtime + mode = get_enforcement_mode() + if mode == EnforcementMode.DISABLED: + logger.debug("Governance disabled - returning unwrapped runtime") + return runtime + return GovernanceRuntime(runtime, context, runtime_id) + + +def wrap_agent(agent: Any, agent_id: str | None = None) -> Any: + """Convenience function to wrap an agent with governance. + + Uses the AdapterRegistry to detect the framework and apply appropriate hooks. + + Args: + agent: The agent to wrap + agent_id: Optional agent identifier + + Returns: + Governed agent proxy + + Example: + from uipath.core.governance import wrap_agent + + governed_agent = wrap_agent(my_langchain_agent, "my-agent") + result = governed_agent.invoke({"input": "Hello"}) + """ + if not is_governance_enabled(): + logger.debug( + "wrap_agent: %s feature flag is OFF; returning unwrapped agent", + "EnablePythonGovernanceChecker", + ) + return agent + + mode = get_enforcement_mode() + if mode == EnforcementMode.DISABLED: + logger.debug("Governance disabled - returning unwrapped agent") + return agent + + agent_id = agent_id or type(agent).__name__ + session_id = str(uuid4()) + + # Get evaluator + policy_index = get_policy_index() + evaluator = GovernanceEvaluator(policy_index) + + # Find matching adapter + registry = get_adapter_registry() + adapter = registry.resolve(agent) + + if adapter is None: + logger.warning("No adapter found for agent type: %s", type(agent).__name__) + return agent + + # Attach governance + governed = adapter.attach( + agent=agent, + agent_id=agent_id, + session_id=session_id, + evaluator=evaluator, + ) + + logger.info( + "Agent wrapped: agent=%s, adapter=%s, packs=%s", + agent_id, + adapter.name, + policy_index.pack_names, + ) + + return governed diff --git a/src/uipath/runtime/registry.py b/src/uipath/runtime/registry.py index 032aee3..ffa7041 100644 --- a/src/uipath/runtime/registry.py +++ b/src/uipath/runtime/registry.py @@ -3,14 +3,90 @@ from pathlib import Path from typing import Callable, TypeAlias +from uipath.runtime.base import UiPathRuntimeProtocol from uipath.runtime.context import UiPathRuntimeContext -from uipath.runtime.factory import UiPathRuntimeFactoryProtocol +from uipath.runtime.factory import ( + UiPathRuntimeFactoryProtocol, + UiPathRuntimeFactorySettings, +) +from uipath.runtime.storage import UiPathRuntimeStorageProtocol +from uipath.runtime.wrapper import apply_governance_wrapper FactoryCallable: TypeAlias = Callable[ [UiPathRuntimeContext | None], UiPathRuntimeFactoryProtocol ] +class UiPathWrappedRuntimeFactory(UiPathRuntimeFactoryProtocol): + """Factory that delegates creation and applies governance to every runtime. + + Implements ``UiPathRuntimeFactoryProtocol`` so callers using the + protocol surface are unaffected. Non-protocol attribute access falls + through to the underlying factory via ``__getattr__``; the underlying + factory is also reachable directly via the :attr:`inner` property — + useful for callers that need the concrete registered type (e.g. + ``isinstance`` checks) and for tests. + """ + + def __init__( + self, + delegate: UiPathRuntimeFactoryProtocol, + context: UiPathRuntimeContext | None = None, + ) -> None: + """Initialize with the underlying factory and the runtime context.""" + self._delegate = delegate + self._context = context + + @property + def inner(self) -> UiPathRuntimeFactoryProtocol: + """Return the underlying registered factory. + + Use this when a caller needs the concrete factory type + (``isinstance`` checks, access to non-protocol public API). + Prefer passing ``apply_wrappers=False`` to + :meth:`UiPathRuntimeFactoryRegistry.get` when you want the + registry to return the concrete factory directly. + """ + return self._delegate + + def discover_entrypoints(self) -> list[str]: + """Delegate to the underlying factory.""" + return self._delegate.discover_entrypoints() + + async def new_runtime( + self, entrypoint: str, runtime_id: str, **kwargs + ) -> UiPathRuntimeProtocol: + """Create a runtime via the delegate and apply governance.""" + runtime = await self._delegate.new_runtime(entrypoint, runtime_id, **kwargs) + return await apply_governance_wrapper(runtime, self._context, runtime_id) + + async def get_storage(self) -> UiPathRuntimeStorageProtocol | None: + """Delegate to the underlying factory.""" + return await self._delegate.get_storage() + + async def get_settings(self) -> UiPathRuntimeFactorySettings | None: + """Delegate to the underlying factory.""" + return await self._delegate.get_settings() + + async def dispose(self) -> None: + """Delegate to the underlying factory.""" + return await self._delegate.dispose() + + def __getattr__(self, name: str): + """Forward attribute lookups to the delegate. + + Only invoked when normal attribute resolution fails on this + wrapper. Lets callers reach any non-protocol public API on the + registered concrete factory without knowing they hold a wrapper. + ``isinstance`` checks still see this class, not the delegate — + use :attr:`inner` or ``apply_wrappers=False`` for those. + """ + # Guard against recursion during construction (before _delegate is set). + if name == "_delegate": + raise AttributeError(name) + return getattr(self._delegate, name) + + class UiPathRuntimeFactoryRegistry: """Registry for UiPath runtime factories.""" @@ -41,6 +117,7 @@ def get( name: str | None = None, search_path: str = ".", context: UiPathRuntimeContext | None = None, + apply_wrappers: bool = True, ) -> UiPathRuntimeFactoryProtocol: """Get factory instance by name or auto-detect from config files. @@ -48,28 +125,52 @@ def get( name: Optional factory name search_path: Path to search for config files context: UiPathRuntimeContext to pass to factory + apply_wrappers: When True (default), the registered factory + is wrapped in :class:`UiPathWrappedRuntimeFactory` so + every runtime it produces passes through + :func:`apply_governance_wrapper`. Set False to obtain the + concrete registered factory unchanged — required for + callers that ``isinstance``-check the result or rely on + non-protocol public API. The wrapper exposes the + underlying factory via its :attr:`inner` property and + forwards unknown attribute access to it, but its type is + still :class:`UiPathWrappedRuntimeFactory`. Returns: - Factory instance + A :class:`UiPathRuntimeFactoryProtocol`; concretely a + :class:`UiPathWrappedRuntimeFactory` when + ``apply_wrappers=True``, otherwise the registered factory. """ + factory: UiPathRuntimeFactoryProtocol | None = None + if name: if name not in cls._factories: raise ValueError(f"Factory '{name}' not registered") factory_callable, _ = cls._factories[name] - return factory_callable(context) - - # Auto-detect based on config files in reverse registration order - search_dir = Path(search_path) - for factory_name in reversed(cls._registration_order): - factory_callable, config_file = cls._factories[factory_name] - if (search_dir / config_file).exists(): - return factory_callable(context) - - # Fallback to default - if cls._default_name is None: - raise ValueError("No default factory registered and no config file found") - factory_callable, _ = cls._factories[cls._default_name] - return factory_callable(context) + factory = factory_callable(context) + else: + # Auto-detect based on config files in reverse registration order + search_dir = Path(search_path) + for factory_name in reversed(cls._registration_order): + factory_callable, config_file = cls._factories[factory_name] + if (search_dir / config_file).exists(): + factory = factory_callable(context) + break + + # Fallback to default + if factory is None: + if cls._default_name is None: + raise ValueError( + "No default factory registered and no config file found" + ) + factory_callable, _ = cls._factories[cls._default_name] + factory = factory_callable(context) + + # Wrap factory to auto-apply runtime wrappers + if apply_wrappers: + factory = UiPathWrappedRuntimeFactory(factory, context) + + return factory @classmethod def set_default(cls, name: str) -> None: diff --git a/src/uipath/runtime/wrapper.py b/src/uipath/runtime/wrapper.py new file mode 100644 index 0000000..046673c --- /dev/null +++ b/src/uipath/runtime/wrapper.py @@ -0,0 +1,54 @@ +"""Governance integration for the runtime. + +Wraps a runtime with governance when the ``EnablePythonGovernanceChecker`` +feature flag is enabled. ``uipath.runtime.governance.wrapper`` is +imported lazily — only when the gate passes — so its transitive cost +(audit, evaluator, OTel, …) stays off the startup path when governance +is disabled. + +The feature flag name and gate function are re-exported from +``uipath.core.governance.config`` so there is a single source of truth. +""" + +from __future__ import annotations + +import logging + +from uipath.core.governance.config import ( + GOVERNANCE_FEATURE_FLAG, + is_governance_enabled, +) + +from uipath.runtime.base import UiPathRuntimeProtocol +from uipath.runtime.context import UiPathRuntimeContext + +logger = logging.getLogger(__name__) + +__all__ = ["GOVERNANCE_FEATURE_FLAG", "apply_governance_wrapper"] + + +async def apply_governance_wrapper( + runtime: "UiPathRuntimeProtocol", + context: "UiPathRuntimeContext | None", + runtime_id: str, +) -> "UiPathRuntimeProtocol": + """Wrap a runtime with governance when the feature flag is enabled. + + Returns the inner runtime unchanged when the flag is off or when + the wrapper itself raises — governance failures must never break + the agent run. + """ + if not is_governance_enabled(): + logger.debug( + "Skipping governance wrapper: %s feature flag is not enabled", + GOVERNANCE_FEATURE_FLAG, + ) + return runtime + + from uipath.runtime.governance.wrapper import governance_wrapper + + try: + return governance_wrapper(runtime, context, runtime_id) + except Exception as exc: + logger.warning("Failed to apply governance wrapper: %s", exc) + return runtime diff --git a/tests/test_dispose_isolation.py b/tests/test_dispose_isolation.py new file mode 100644 index 0000000..c0b87ad --- /dev/null +++ b/tests/test_dispose_isolation.py @@ -0,0 +1,146 @@ +"""Tests for step-isolated ``GovernanceRuntime.dispose()`` cleanup. + +A single failing step in dispose() must not strand the remaining steps. +``self._delegate.dispose()`` always runs last and is the only step whose +exception propagates. +""" + +from __future__ import annotations + +import asyncio + +import pytest +from uipath.core.feature_flags import FeatureFlags + +from uipath.runtime.governance import wrapper as wrapper_mod +from uipath.runtime.governance.wrapper import GovernanceRuntime, _current_model_name +from uipath.runtime.wrapper import GOVERNANCE_FEATURE_FLAG + + +@pytest.fixture(autouse=True) +def _enable_governance(): + """These tests exercise the post-FF-gate dispose path.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + yield + FeatureFlags.reset_flags() + + +class _Holder: + """Mutable attribute holder whose setattr can be configured to raise.""" + + def __init__(self) -> None: + self._raise_on_setattr = False + self._setattr_count = 0 + # Pre-create the attr so the test isn't tripped by the agent + # restore path being the "first" set. + object.__setattr__(self, "agent", None) + + def __setattr__(self, name: str, value) -> None: # type: ignore[no-untyped-def] + if name in {"_raise_on_setattr", "_setattr_count"}: + object.__setattr__(self, name, value) + return + object.__setattr__(self, "_setattr_count", self._setattr_count + 1) + if self._raise_on_setattr: + raise RuntimeError("restore failed") + object.__setattr__(self, name, value) + + +class _Delegate: + """Minimal delegate with an instrumented dispose().""" + + def __init__(self, *, raises: bool = False) -> None: + self._dispose_count = 0 + self._raises = raises + + async def dispose(self) -> None: + self._dispose_count += 1 + if self._raises: + raise RuntimeError("delegate dispose failed") + + +def _make_runtime( + *, + restore_raises: bool = False, + uninstall_raises: bool = False, + delegate_dispose_raises: bool = False, + monkeypatch: pytest.MonkeyPatch | None = None, +) -> tuple[GovernanceRuntime, _Holder, _Delegate, dict[str, int]]: + """Build a runtime whose three dispose steps each have a tunable failure.""" + counters = {"uninstall": 0} + delegate = _Delegate(raises=delegate_dispose_raises) + holder = _Holder() + holder._raise_on_setattr = restore_raises + + runtime = GovernanceRuntime(delegate=delegate, context=None, runtime_id="rid-1") + + # Inject the original-agent scaffolding so dispose tries to restore. + runtime._original_agent = object() + runtime._agent_attr_name = "agent" + runtime._agent_holder = holder + # Bind a fresh token so dispose has something to reset. + runtime._model_name_token = _current_model_name.set("model-x") + + def _fake_uninstall(_agent) -> None: + counters["uninstall"] += 1 + if uninstall_raises: + raise RuntimeError("uninstall failed") + + assert monkeypatch is not None + monkeypatch.setattr(wrapper_mod, "uninstall_delegation_guard", _fake_uninstall) + + return runtime, holder, delegate, counters + + +def test_dispose_runs_all_steps_when_each_succeeds( + monkeypatch: pytest.MonkeyPatch, +) -> None: + runtime, holder, delegate, counters = _make_runtime(monkeypatch=monkeypatch) + asyncio.run(runtime.dispose()) + assert holder._setattr_count == 1, "agent restore should have run once" + assert counters["uninstall"] == 1 + assert delegate._dispose_count == 1 + assert runtime._model_name_token is None, "token must be cleared after dispose" + + +def test_dispose_continues_when_restore_step_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Restore failure must not skip uninstall, var reset, or delegate dispose.""" + runtime, holder, delegate, counters = _make_runtime( + restore_raises=True, monkeypatch=monkeypatch, + ) + asyncio.run(runtime.dispose()) + assert holder._setattr_count == 1, "restore was attempted" + assert counters["uninstall"] == 1, "uninstall must still run" + assert delegate._dispose_count == 1, "delegate dispose must still run" + assert runtime._model_name_token is None + + +def test_dispose_continues_when_uninstall_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Uninstall failure must not skip var reset or delegate dispose.""" + runtime, holder, delegate, counters = _make_runtime( + uninstall_raises=True, monkeypatch=monkeypatch, + ) + asyncio.run(runtime.dispose()) + assert holder._setattr_count == 1 + assert counters["uninstall"] == 1 + assert delegate._dispose_count == 1 + assert runtime._model_name_token is None + + +def test_dispose_propagates_delegate_failure( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Delegate dispose failure surfaces to the caller — that's the contract.""" + runtime, holder, delegate, counters = _make_runtime( + delegate_dispose_raises=True, monkeypatch=monkeypatch, + ) + with pytest.raises(RuntimeError, match="delegate dispose failed"): + asyncio.run(runtime.dispose()) + # All preceding governance steps must still have run. + assert holder._setattr_count == 1 + assert counters["uninstall"] == 1 + assert delegate._dispose_count == 1 + assert runtime._model_name_token is None diff --git a/tests/test_registry.py b/tests/test_registry.py index 86eda5b..2aa7694 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,10 +1,15 @@ +import sys import tempfile +import types from pathlib import Path from typing import Any, AsyncGenerator, Optional, cast +from unittest.mock import MagicMock import pytest +from uipath.core.feature_flags import FeatureFlags from uipath.runtime import ( + GOVERNANCE_FEATURE_FLAG, UiPathExecuteOptions, UiPathRuntimeContext, UiPathRuntimeEvent, @@ -18,6 +23,7 @@ UiPathRuntimeStorageProtocol, UiPathStreamOptions, ) +from uipath.runtime.registry import UiPathWrappedRuntimeFactory class MockStorage(UiPathRuntimeStorageProtocol): @@ -236,7 +242,7 @@ def create_factory( UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") - factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph", apply_wrappers=False) assert isinstance(factory, MockLangGraphFactory) assert factory.name == "langgraph" @@ -252,7 +258,9 @@ def create_factory( UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") context = UiPathRuntimeContext.with_defaults(entrypoint="test") - factory = UiPathRuntimeFactoryRegistry.get(name="langgraph", context=context) + factory = UiPathRuntimeFactoryRegistry.get( + name="langgraph", context=context, apply_wrappers=False + ) assert isinstance(factory, MockLangGraphFactory) assert factory.context == context @@ -284,7 +292,9 @@ def create_langgraph( Path(temp_dir, "langgraph.json").touch() - factory = UiPathRuntimeFactoryRegistry.get(search_path=temp_dir) + factory = UiPathRuntimeFactoryRegistry.get( + search_path=temp_dir, apply_wrappers=False + ) assert isinstance(factory, MockLangGraphFactory) @@ -309,7 +319,9 @@ def create_llamaindex( Path(temp_dir, "llamaindex.json").touch() - factory = UiPathRuntimeFactoryRegistry.get(search_path=temp_dir) + factory = UiPathRuntimeFactoryRegistry.get( + search_path=temp_dir, apply_wrappers=False + ) assert isinstance(factory, MockLlamaIndexFactory) @@ -334,7 +346,9 @@ def create_langgraph( Path(temp_dir, "uipath.json").touch() - factory = UiPathRuntimeFactoryRegistry.get(search_path=temp_dir) + factory = UiPathRuntimeFactoryRegistry.get( + search_path=temp_dir, apply_wrappers=False + ) assert isinstance(factory, MockFunctionsFactory) @@ -357,7 +371,9 @@ def create_langgraph( ) UiPathRuntimeFactoryRegistry.set_default("functions") - factory = UiPathRuntimeFactoryRegistry.get(search_path=temp_dir) + factory = UiPathRuntimeFactoryRegistry.get( + search_path=temp_dir, apply_wrappers=False + ) assert isinstance(factory, MockFunctionsFactory) @@ -399,7 +415,9 @@ def create_langgraph( Path(temp_dir, "uipath.json").touch() Path(temp_dir, "langgraph.json").touch() - factory = UiPathRuntimeFactoryRegistry.get(search_path=temp_dir) + factory = UiPathRuntimeFactoryRegistry.get( + search_path=temp_dir, apply_wrappers=False + ) assert isinstance(factory, MockLangGraphFactory) @@ -430,7 +448,7 @@ def create_factory( UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") - factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph", apply_wrappers=False) runtime = await factory.new_runtime("agent", "runtime-1") assert isinstance(runtime, MockRuntime) assert runtime.name == "langgraph-agent" @@ -450,3 +468,155 @@ def create_factory( all_factories["malicious"] = "hack.json" assert "malicious" not in UiPathRuntimeFactoryRegistry.get_all() + + +# --------------------------------------------------------------------------- +# Wrapping behaviour (apply_wrappers=True) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def _reset_feature_flags(): + FeatureFlags.reset_flags() + yield + FeatureFlags.reset_flags() + + +@pytest.fixture +def fake_governance_module(monkeypatch): + """Install a stub ``uipath.runtime.governance.wrapper`` for the lazy import. + + ``apply_governance_wrapper`` imports ``governance_wrapper`` only when + the FF is on; this fixture lets us assert it was called without + triggering the real governance runtime (which would try to talk to + the policy backend). + """ + mock_wrapper = MagicMock(name="governance_wrapper") + module = types.ModuleType("uipath.runtime.governance.wrapper") + module.governance_wrapper = mock_wrapper + monkeypatch.setitem(sys.modules, "uipath.runtime.governance.wrapper", module) + return mock_wrapper + + +def test_get_returns_wrapped_factory_by_default(clean_registry): + """``get`` with default args wraps the registered factory.""" + + def create_factory( + context: UiPathRuntimeContext | None = None, + ) -> UiPathRuntimeFactoryProtocol: + return MockLangGraphFactory(context) + + UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") + + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + + assert isinstance(factory, UiPathWrappedRuntimeFactory) + assert isinstance(factory.inner, MockLangGraphFactory) + + +def test_wrapped_factory_attribute_fallthrough(clean_registry): + """Non-protocol attributes on the delegate are reachable via ``__getattr__``.""" + + def create_factory( + context: UiPathRuntimeContext | None = None, + ) -> UiPathRuntimeFactoryProtocol: + return MockLangGraphFactory(context) + + UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") + + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + + # `name` is not part of the protocol but is defined on the concrete factory. + assert factory.name == "langgraph" # type: ignore[attr-defined] + + +def test_wrapped_factory_delegates_protocol_methods(clean_registry): + """Protocol methods reach the underlying factory unchanged.""" + + def create_factory( + context: UiPathRuntimeContext | None = None, + ) -> UiPathRuntimeFactoryProtocol: + return MockLangGraphFactory(context) + + UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") + + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + + assert factory.discover_entrypoints() == ["agent", "workflow"] + + +@pytest.mark.asyncio +async def test_wrapped_factory_returns_inner_runtime_when_flag_off( + clean_registry, _reset_feature_flags, monkeypatch +): + """With the governance FF off, ``new_runtime`` returns the bare runtime.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: False}) + monkeypatch.delenv("UIPATH_FEATURE_" + GOVERNANCE_FEATURE_FLAG, raising=False) + + def create_factory( + context: UiPathRuntimeContext | None = None, + ) -> UiPathRuntimeFactoryProtocol: + return MockLangGraphFactory(context) + + UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") + + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + runtime = await factory.new_runtime("agent", "runtime-1") + + assert isinstance(runtime, MockRuntime) + assert runtime.name == "langgraph-agent" + + +@pytest.mark.asyncio +async def test_wrapped_factory_invokes_governance_when_flag_on( + clean_registry, _reset_feature_flags, fake_governance_module +): + """With the governance FF on, ``new_runtime`` routes through ``governance_wrapper``.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + + governed_sentinel = MagicMock(name="GovernedRuntime") + fake_governance_module.return_value = governed_sentinel + + captured: dict[str, Any] = {} + + def create_factory( + context: UiPathRuntimeContext | None = None, + ) -> UiPathRuntimeFactoryProtocol: + captured["context"] = context + return MockLangGraphFactory(context) + + UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") + + context = UiPathRuntimeContext.with_defaults(entrypoint="agent") + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph", context=context) + + result = await factory.new_runtime("agent", "runtime-42") + + assert result is governed_sentinel + fake_governance_module.assert_called_once() + call_args = fake_governance_module.call_args + # signature: governance_wrapper(runtime, context, runtime_id) + assert isinstance(call_args.args[0], MockRuntime) + assert call_args.args[1] is context + assert call_args.args[2] == "runtime-42" + + +@pytest.mark.asyncio +async def test_wrapped_factory_swallows_governance_exception( + clean_registry, _reset_feature_flags, fake_governance_module +): + """Governance failures must never break runtime creation.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + fake_governance_module.side_effect = RuntimeError("policy load failed") + + def create_factory( + context: UiPathRuntimeContext | None = None, + ) -> UiPathRuntimeFactoryProtocol: + return MockLangGraphFactory(context) + + UiPathRuntimeFactoryRegistry.register("langgraph", create_factory, "langgraph.json") + + factory = UiPathRuntimeFactoryRegistry.get(name="langgraph") + runtime = await factory.new_runtime("agent", "runtime-1") + + assert isinstance(runtime, MockRuntime) diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py new file mode 100644 index 0000000..7e2cbab --- /dev/null +++ b/tests/test_wrapper.py @@ -0,0 +1,85 @@ +"""Tests for the governance feature-flag gate at the runtime boundary. + +The FF-resolution logic itself lives in +``uipath.core.governance.config.is_governance_enabled`` and is tested +there. These tests only exercise the runtime's thin shim: +``apply_governance_wrapper`` defers to the gate, lazy-imports core's +``governance_wrapper`` only when the gate passes, and never lets +governance failures bring down the agent run. +""" + +from __future__ import annotations + +import sys +import types +from unittest.mock import MagicMock + +import pytest +from uipath.core.feature_flags import FeatureFlags + +from uipath.runtime.wrapper import ( + GOVERNANCE_FEATURE_FLAG, + apply_governance_wrapper, +) + + +@pytest.fixture(autouse=True) +def _reset_feature_flags(): + FeatureFlags.reset_flags() + yield + FeatureFlags.reset_flags() + + +@pytest.fixture +def fake_governance_module(monkeypatch): + """Install a stub ``uipath.runtime.governance.wrapper`` for the lazy import. + + Each test gets its own MagicMock as ``governance_wrapper`` so we can + assert call behaviour. Cleared by monkeypatch on teardown. + """ + mock_wrapper = MagicMock(name="governance_wrapper") + module = types.ModuleType("uipath.runtime.governance.wrapper") + module.governance_wrapper = mock_wrapper + + monkeypatch.setitem(sys.modules, "uipath.runtime.governance.wrapper", module) + return mock_wrapper + + +async def test_apply_governance_wrapper_returns_inner_when_flag_off( + monkeypatch, fake_governance_module +): + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: False}) + monkeypatch.delenv("UIPATH_FEATURE_" + GOVERNANCE_FEATURE_FLAG, raising=False) + inner = MagicMock(name="InnerRuntime") + + result = await apply_governance_wrapper(inner, None, "runtime-1") + + assert result is inner + fake_governance_module.assert_not_called() + + +async def test_apply_governance_wrapper_invokes_governance_when_flag_on( + fake_governance_module, +): + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + inner = MagicMock(name="InnerRuntime") + governed = MagicMock(name="GovernedRuntime") + fake_governance_module.return_value = governed + + result = await apply_governance_wrapper(inner, None, "runtime-1") + + assert result is governed + fake_governance_module.assert_called_once_with(inner, None, "runtime-1") + + +async def test_apply_governance_wrapper_swallows_wrapper_exception( + fake_governance_module, +): + """If governance_wrapper raises, fail-safe: return inner unchanged.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + fake_governance_module.side_effect = RuntimeError("boom") + inner = MagicMock(name="InnerRuntime") + + result = await apply_governance_wrapper(inner, None, "runtime-1") + + assert result is inner diff --git a/tests/test_wrapper_internals.py b/tests/test_wrapper_internals.py new file mode 100644 index 0000000..1ac6352 --- /dev/null +++ b/tests/test_wrapper_internals.py @@ -0,0 +1,422 @@ +"""Tests for ``GovernanceRuntime`` internal helpers. + +The integration paths (full execute/stream/dispose) are covered in +``test_dispose_isolation.py`` and ``test_text_extraction.py``. This +file pins the smaller helpers that don't have dedicated coverage: +model-name / conversational extraction, agent-attribute discovery, +adapter attach + replacement, ``__getattr__`` forwarding, and the +module-level ``governance_wrapper`` / ``wrap_agent`` entry points. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest +from uipath.core.feature_flags import FeatureFlags + +from uipath.runtime.governance import wrapper as wrapper_mod +from uipath.runtime.governance.config import ( + EnforcementMode, + reset_enforcement_mode, + set_enforcement_mode, +) +from uipath.runtime.governance.wrapper import ( + GovernanceRuntime, + _current_model_name, + get_current_model_name, + governance_wrapper, + wrap_agent, +) +from uipath.runtime.wrapper import GOVERNANCE_FEATURE_FLAG + + +@pytest.fixture +def _ff_on(): + """FF on AND mode=AUDIT — the path that actually wraps.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + set_enforcement_mode(EnforcementMode.AUDIT) + yield + FeatureFlags.reset_flags() + reset_enforcement_mode() + + +@pytest.fixture +def _ff_on_mode_disabled(): + """FF on but mode=DISABLED — wrappers should still short-circuit.""" + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: True}) + set_enforcement_mode(EnforcementMode.DISABLED) + yield + FeatureFlags.reset_flags() + reset_enforcement_mode() + + +@pytest.fixture +def _ff_off(): + FeatureFlags.configure_flags({GOVERNANCE_FEATURE_FLAG: False}) + yield + FeatureFlags.reset_flags() + + +def _runtime_init_off() -> GovernanceRuntime: + """Build a GovernanceRuntime with FF off — fastest construction path. + + The FF-off path returns at line 153 without touching the network, + so this is the right scaffold for testing per-method helpers + that don't depend on the evaluator being materialised. + """ + delegate = SimpleNamespace() + return GovernanceRuntime(delegate=delegate, context=None, runtime_id="rt") + + +# --------------------------------------------------------------------------- +# get_current_model_name — module-level ContextVar accessor +# --------------------------------------------------------------------------- + + +def test_get_current_model_name_default_empty() -> None: + """Unset ContextVar yields an empty string.""" + # Reset by binding empty. + tok = _current_model_name.set("") + try: + assert get_current_model_name() == "" + finally: + _current_model_name.reset(tok) + + +def test_get_current_model_name_returns_bound_value() -> None: + tok = _current_model_name.set("gpt-5") + try: + assert get_current_model_name() == "gpt-5" + finally: + _current_model_name.reset(tok) + + +# --------------------------------------------------------------------------- +# _extract_model_name — every fallback in order +# --------------------------------------------------------------------------- + + +def test_extract_model_name_from_agent_definition_settings() -> None: + """LicensedRuntime pattern: delegate._agent_definition.settings.model.""" + runtime = _runtime_init_off() + settings = SimpleNamespace(model="gpt-4o") + delegate = SimpleNamespace( + _agent_definition=SimpleNamespace(settings=settings), + ) + assert runtime._extract_model_name(delegate, None) == "gpt-4o" + + +def test_extract_model_name_from_delegate_attribute() -> None: + """Falls back to direct attributes on the delegate.""" + runtime = _runtime_init_off() + delegate = SimpleNamespace(model="claude-sonnet") + assert runtime._extract_model_name(delegate, None) == "claude-sonnet" + + +def test_extract_model_name_alternate_attribute_names() -> None: + runtime = _runtime_init_off() + for attr in ("model_name", "_model", "model_id"): + delegate = SimpleNamespace(**{attr: f"via-{attr}"}) + assert runtime._extract_model_name(delegate, None) == f"via-{attr}" + + +def test_extract_model_name_via_nested_delegate_chain() -> None: + """Unwraps wrapper layers via ``_delegate`` until it finds the definition.""" + runtime = _runtime_init_off() + settings = SimpleNamespace(model="haiku") + inner = SimpleNamespace(_agent_definition=SimpleNamespace(settings=settings)) + middle = SimpleNamespace(_delegate=inner) + outer = SimpleNamespace(_delegate=middle) + assert runtime._extract_model_name(outer, None) == "haiku" + + +def test_extract_model_name_via_public_delegate_attr() -> None: + """``delegate.delegate`` (public) is also walked as part of the chain.""" + runtime = _runtime_init_off() + settings = SimpleNamespace(model="opus") + inner = SimpleNamespace(_agent_definition=SimpleNamespace(settings=settings)) + outer = SimpleNamespace(delegate=inner) # public form + assert runtime._extract_model_name(outer, None) == "opus" + + +def test_extract_model_name_from_context_fallback() -> None: + """Last resort: read from the runtime context's attrs.""" + runtime = _runtime_init_off() + delegate = SimpleNamespace() # nothing extractable + ctx = SimpleNamespace(model_name="ctx-model") + assert runtime._extract_model_name(delegate, ctx) == "ctx-model" + + +def test_extract_model_name_returns_empty_when_unfindable() -> None: + runtime = _runtime_init_off() + assert runtime._extract_model_name(SimpleNamespace(), None) == "" + + +# --------------------------------------------------------------------------- +# _extract_is_conversational — agent_definition + context fallback + depth cap +# --------------------------------------------------------------------------- + + +def test_extract_is_conversational_from_agent_definition_true() -> None: + runtime = _runtime_init_off() + delegate = SimpleNamespace( + _agent_definition=SimpleNamespace(is_conversational=True) + ) + assert runtime._extract_is_conversational(delegate, None) is True + + +def test_extract_is_conversational_from_agent_definition_false() -> None: + runtime = _runtime_init_off() + delegate = SimpleNamespace( + _agent_definition=SimpleNamespace(is_conversational=False) + ) + assert runtime._extract_is_conversational(delegate, None) is False + + +def test_extract_is_conversational_from_nested_delegate() -> None: + runtime = _runtime_init_off() + inner = SimpleNamespace( + _agent_definition=SimpleNamespace(is_conversational=True) + ) + outer = SimpleNamespace(_delegate=inner) + assert runtime._extract_is_conversational(outer, None) is True + + +def test_extract_is_conversational_fallback_to_conversation_id() -> None: + """A populated ``conversation_id`` on the context implies a conv run.""" + runtime = _runtime_init_off() + delegate = SimpleNamespace() + ctx = SimpleNamespace(conversation_id="conv-abc") + assert runtime._extract_is_conversational(delegate, ctx) is True + + +def test_extract_is_conversational_default_false() -> None: + """No agent_def, no conversation_id → fail-safe to False (autonomous).""" + runtime = _runtime_init_off() + assert runtime._extract_is_conversational(SimpleNamespace(), None) is False + + +def test_extract_is_conversational_depth_cap_terminates() -> None: + """A pathological wrapper chain doesn't loop forever — capped at 10.""" + runtime = _runtime_init_off() + # Build a chain longer than the cap; none of the nodes carry an + # _agent_definition. Must return False without hanging. + node = SimpleNamespace() + for _ in range(20): + node = SimpleNamespace(_delegate=node) + assert runtime._extract_is_conversational(node, None) is False + + +def test_extract_is_conversational_ignores_none_value_on_agent_def() -> None: + """If ``is_conversational`` is None, look further up the chain.""" + runtime = _runtime_init_off() + inner = SimpleNamespace( + _agent_definition=SimpleNamespace(is_conversational=True) + ) + outer = SimpleNamespace( + _agent_definition=SimpleNamespace(is_conversational=None), + _delegate=inner, + ) + assert runtime._extract_is_conversational(outer, None) is True + + +# --------------------------------------------------------------------------- +# _extract_agent — finds agent via standard attribute names +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "attr_name", + ["_agent", "agent", "_runnable", "runnable", "_graph", "graph", "_chain", "chain", "_crew", "crew"], +) +def test_extract_agent_finds_via_known_attribute(attr_name: str) -> None: + runtime = _runtime_init_off() + agent_obj = object() + delegate = SimpleNamespace(**{attr_name: agent_obj}) + agent, name = runtime._extract_agent(delegate) + assert agent is agent_obj + assert name == attr_name + + +def test_extract_agent_returns_first_priority_attribute() -> None: + """``_agent`` wins over ``_graph`` when both are present.""" + runtime = _runtime_init_off() + delegate = SimpleNamespace(_agent="primary", _graph="fallback") + agent, name = runtime._extract_agent(delegate) + assert agent == "primary" + assert name == "_agent" + + +def test_extract_agent_via_nested_delegate_chain() -> None: + runtime = _runtime_init_off() + agent_obj = object() + inner = SimpleNamespace(_agent=agent_obj) + outer = SimpleNamespace(_delegate=inner) + agent, name = runtime._extract_agent(outer) + assert agent is agent_obj + assert name == "_agent" + # ``_agent_holder`` is set to the inner delegate so dispose can + # restore the original agent on the right object. + assert runtime._agent_holder is inner + + +def test_extract_agent_returns_none_when_not_found() -> None: + runtime = _runtime_init_off() + delegate = SimpleNamespace(unrelated="thing") + agent, name = runtime._extract_agent(delegate) + assert agent is None + assert name is None + + +def test_extract_agent_nested_depth_cap() -> None: + """A deep chain without any matching attribute terminates cleanly.""" + runtime = _runtime_init_off() + node = SimpleNamespace() + for _ in range(15): + node = SimpleNamespace(_delegate=node) + agent, name = runtime._extract_agent(node) + assert agent is None + assert name is None + + +# --------------------------------------------------------------------------- +# __getattr__ forwarding +# --------------------------------------------------------------------------- + + +def test_getattr_forwards_public_attributes_to_delegate() -> None: + """Unknown public attrs are forwarded to the wrapped runtime.""" + delegate = SimpleNamespace(custom_value="from-delegate", another=42) + runtime = GovernanceRuntime(delegate=delegate, context=None, runtime_id="rt") + # Direct attribute access falls through to __getattr__ when the + # name isn't on GovernanceRuntime itself. + assert runtime.custom_value == "from-delegate" + assert runtime.another == 42 + + +def test_getattr_raises_for_private_names() -> None: + """Private attribute names (``_foo``) are not forwarded — they raise.""" + delegate = SimpleNamespace(_should_not_forward="x") + runtime = GovernanceRuntime(delegate=delegate, context=None, runtime_id="rt") + with pytest.raises(AttributeError): + _ = runtime._should_not_forward + + +def test_getattr_raises_when_delegate_missing_attr() -> None: + """Attribute the delegate doesn't have raises AttributeError.""" + runtime = GovernanceRuntime( + delegate=SimpleNamespace(), context=None, runtime_id="rt" + ) + with pytest.raises(AttributeError): + _ = runtime.nonexistent_attr + + +# --------------------------------------------------------------------------- +# governance_wrapper module function — the gate +# --------------------------------------------------------------------------- + + +def test_governance_wrapper_returns_unwrapped_when_ff_off(_ff_off) -> None: + """FF off → returns the original runtime untouched.""" + original = SimpleNamespace(unique="marker") + result = governance_wrapper(original, context=None, runtime_id="rt") + assert result is original + + +def test_governance_wrapper_returns_unwrapped_when_mode_disabled( + _ff_on_mode_disabled, +) -> None: + """FF on but enforcement mode is DISABLED → returns runtime unchanged. + + The DISABLED short-circuit avoids per-call audit overhead for + tenants whose server has explicitly opted out of governance. + """ + original = SimpleNamespace() + result = governance_wrapper(original, context=None, runtime_id="rt") + assert result is original + assert not isinstance(result, GovernanceRuntime) + + +def test_governance_wrapper_returns_governance_runtime_when_ff_on(_ff_on) -> None: + """FF on + mode != DISABLED → wraps in GovernanceRuntime.""" + original = SimpleNamespace() + result = governance_wrapper(original, context=None, runtime_id="rt") + assert isinstance(result, GovernanceRuntime) + # The unwrapped runtime is accessible via .delegate. + assert result.delegate is original + + +# --------------------------------------------------------------------------- +# wrap_agent module function — direct adapter wrap path +# --------------------------------------------------------------------------- + + +def test_wrap_agent_returns_original_when_ff_off(_ff_off) -> None: + """FF off → returns the agent unchanged.""" + agent = SimpleNamespace(marker="x") + result = wrap_agent(agent, agent_id="a") + assert result is agent + + +def test_wrap_agent_returns_original_when_mode_disabled( + _ff_on_mode_disabled, +) -> None: + """FF on but mode=DISABLED → returns the agent unchanged.""" + agent = SimpleNamespace(marker="x") + result = wrap_agent(agent, agent_id="a") + assert result is agent + + +def test_wrap_agent_returns_original_when_no_adapter(_ff_on) -> None: + """FF on, mode=AUDIT, but no adapter matches → returns the agent unchanged.""" + agent = SimpleNamespace(marker="x") + fake_registry = MagicMock() + fake_registry.resolve.return_value = None + with patch.object(wrapper_mod, "get_adapter_registry", return_value=fake_registry): + with patch.object(wrapper_mod, "get_policy_index", return_value=MagicMock()): + result = wrap_agent(agent, agent_id="a") + assert result is agent + + +def test_wrap_agent_attaches_when_adapter_found(_ff_on) -> None: + """When an adapter is found, ``adapter.attach`` is called and result returned.""" + agent = SimpleNamespace(marker="x") + governed_agent = SimpleNamespace(governed=True) + fake_adapter = MagicMock() + fake_adapter.attach.return_value = governed_agent + fake_adapter.name = "test-adapter" + fake_registry = MagicMock() + fake_registry.resolve.return_value = fake_adapter + + with patch.object(wrapper_mod, "get_adapter_registry", return_value=fake_registry): + with patch.object(wrapper_mod, "get_policy_index", return_value=MagicMock()): + result = wrap_agent(agent, agent_id="my-agent") + + assert result is governed_agent + fake_adapter.attach.assert_called_once() + call_kwargs = fake_adapter.attach.call_args.kwargs + assert call_kwargs["agent_id"] == "my-agent" + # session_id is generated internally as a uuid4 string. + assert isinstance(call_kwargs["session_id"], str) + assert len(call_kwargs["session_id"]) > 0 + + +def test_wrap_agent_defaults_agent_id_to_class_name(_ff_on) -> None: + """When ``agent_id`` is None, falls back to ``type(agent).__name__``.""" + + class MyAgent: + marker = "x" + + agent = MyAgent() + fake_adapter = MagicMock() + fake_adapter.attach.return_value = SimpleNamespace() + fake_adapter.name = "test-adapter" + fake_registry = MagicMock() + fake_registry.resolve.return_value = fake_adapter + + with patch.object(wrapper_mod, "get_adapter_registry", return_value=fake_registry): + with patch.object(wrapper_mod, "get_policy_index", return_value=MagicMock()): + wrap_agent(agent) # no agent_id + assert fake_adapter.attach.call_args.kwargs["agent_id"] == "MyAgent"