Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/uipath/_cli/_evals/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,13 +513,16 @@ async def _execute_eval(
execution_id=execution_id,
)

await self.event_bus.publish(
EvaluationEvents.CREATE_EVAL_RUN,
EvalRunCreatedEvent(
execution_id=execution_id,
eval_item=eval_item,
),
)
# Only create eval run entry if NOT resuming from a checkpoint
# When resuming, the entry already exists from the suspend phase
if not self.context.resume:
await self.event_bus.publish(
EvaluationEvents.CREATE_EVAL_RUN,
EvalRunCreatedEvent(
execution_id=execution_id,
eval_item=eval_item,
),
)
agent_execution_output = await self.execute_runtime(
eval_item,
execution_id,
Expand Down
360 changes: 360 additions & 0 deletions tests/cli/eval/test_eval_runtime_suspend_resume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
"""Tests for UiPathEvalRuntime suspend/resume event publishing.

This module tests:
- Normal flow: CREATE_EVAL_RUN event is published
- Resume flow: CREATE_EVAL_RUN event is NOT published (only UPDATE)
- Ensures no duplicate eval run entries in StudioWeb
"""

from pathlib import Path
from typing import Any, AsyncGenerator
from unittest.mock import AsyncMock

import pytest
from uipath.core.tracing import UiPathTraceManager
from uipath.runtime import (
UiPathExecuteOptions,
UiPathRuntimeEvent,
UiPathRuntimeProtocol,
UiPathRuntimeResult,
UiPathRuntimeStatus,
UiPathStreamOptions,
)
from uipath.runtime.schema import UiPathRuntimeSchema

from uipath._cli._evals._evaluate import evaluate
from uipath._cli._evals._runtime import UiPathEvalContext
from uipath._events._event_bus import EventBus
from uipath._events._events import EvaluationEvents


class MockRuntimeSchema(UiPathRuntimeSchema):
"""Mock schema for testing."""

def __init__(self):
super().__init__(
filePath="test.py",
uniqueId="test",
type="agent",
input={"type": "object", "properties": {}},
output={"type": "object", "properties": {}},
)


class SuspendingRuntime:
"""Test runtime that returns SUSPENDED status."""

async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
# Simulate an agent that suspends
# Returning SUSPENDED status is sufficient for our test
return UiPathRuntimeResult(
output={},
status=UiPathRuntimeStatus.SUSPENDED,
)

async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
yield UiPathRuntimeResult(
output={},
status=UiPathRuntimeStatus.SUSPENDED,
)

async def get_schema(self) -> UiPathRuntimeSchema:
return MockRuntimeSchema()

async def dispose(self) -> None:
pass


class SuccessfulRuntime:
"""Test runtime that returns SUCCESSFUL status."""

async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
return UiPathRuntimeResult(
output={"result": "success"},
status=UiPathRuntimeStatus.SUCCESSFUL,
)

async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
yield UiPathRuntimeResult(
output={"result": "success"},
status=UiPathRuntimeStatus.SUCCESSFUL,
)

async def get_schema(self) -> UiPathRuntimeSchema:
return MockRuntimeSchema()

async def dispose(self) -> None:
pass


class MockFactory:
"""Mock factory for creating test runtimes."""

def __init__(self, runtime_creator):
self.runtime_creator = runtime_creator

def discover_entrypoints(self) -> list[str]:
return ["test"]

async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
return [await self.runtime_creator()]

async def new_runtime(
self, entrypoint: str, runtime_id: str, **kwargs
) -> UiPathRuntimeProtocol:
return await self.runtime_creator()

async def dispose(self) -> None:
pass


@pytest.fixture
def context():
"""Create eval context."""
context = UiPathEvalContext()
context.eval_set = str(
Path(__file__).parent / "evals" / "eval-sets" / "default.json"
)
return context


@pytest.fixture
def event_bus():
"""Create event bus with mocked publish method."""
bus = EventBus()
bus.publish = AsyncMock() # type: ignore[method-assign]
return bus


@pytest.fixture
def trace_manager():
"""Create trace manager."""
return UiPathTraceManager()


class TestNormalFlowCreateEvalRun:
"""Tests for normal execution flow - CREATE_EVAL_RUN should be published."""

async def test_publishes_create_eval_run_on_successful_execution(
self, context, event_bus, trace_manager
):
"""Test that CREATE_EVAL_RUN is published during normal successful execution."""

# Arrange
async def create_runtime():
return SuccessfulRuntime()

factory = MockFactory(create_runtime)

# Act
await evaluate(factory, trace_manager, context, event_bus)

# Assert
# Check that CREATE_EVAL_RUN was published
create_calls = [
call
for call in event_bus.publish.call_args_list
if call[0][0] == EvaluationEvents.CREATE_EVAL_RUN
]
assert len(create_calls) > 0, (
"CREATE_EVAL_RUN should be published in normal flow"
)

async def test_publishes_create_eval_run_on_suspend(
self, context, event_bus, trace_manager
):
"""Test that CREATE_EVAL_RUN is published when agent suspends."""

# Arrange
async def create_runtime():
return SuspendingRuntime()

factory = MockFactory(create_runtime)

# Act
await evaluate(factory, trace_manager, context, event_bus)

# Assert
# Check that CREATE_EVAL_RUN was published
create_calls = [
call
for call in event_bus.publish.call_args_list
if call[0][0] == EvaluationEvents.CREATE_EVAL_RUN
]
assert len(create_calls) > 0, (
"CREATE_EVAL_RUN should be published on suspend (initial execution)"
)


class TestResumeFlowSkipsCreateEvalRun:
"""Tests for resume flow - CREATE_EVAL_RUN should NOT be published."""

async def test_skips_create_eval_run_on_resume(
self, context, event_bus, trace_manager
):
"""Test that CREATE_EVAL_RUN is NOT published when resuming from checkpoint."""
# Arrange
context.resume = True # Set resume flag

async def create_runtime():
return SuccessfulRuntime()

factory = MockFactory(create_runtime)

# Act
await evaluate(factory, trace_manager, context, event_bus)

# Assert
# Check that CREATE_EVAL_RUN was NOT published
create_calls = [
call
for call in event_bus.publish.call_args_list
if call[0][0] == EvaluationEvents.CREATE_EVAL_RUN
]
assert len(create_calls) == 0, (
"CREATE_EVAL_RUN should NOT be published on resume"
)

async def test_publishes_update_eval_run_on_resume(
self, context, event_bus, trace_manager
):
"""Test that UPDATE_EVAL_RUN is still published when resuming."""
# Arrange
context.resume = True # Set resume flag

async def create_runtime():
return SuccessfulRuntime()

factory = MockFactory(create_runtime)

# Act
await evaluate(factory, trace_manager, context, event_bus)

# Assert
# Check that UPDATE_EVAL_RUN was published
update_calls = [
call
for call in event_bus.publish.call_args_list
if call[0][0] == EvaluationEvents.UPDATE_EVAL_RUN
]
assert len(update_calls) > 0, (
"UPDATE_EVAL_RUN should still be published on resume"
)

async def test_resume_flag_false_by_default(self, context):
"""Test that resume flag is False by default in context."""
assert context.resume is False, "resume should default to False"

async def test_no_duplicate_entries_on_resume(
self, context, event_bus, trace_manager
):
"""Test that resuming doesn't create duplicate entries (integration test concept).

This test verifies that when resume=True:
1. CREATE_EVAL_RUN is NOT called
2. Only UPDATE_EVAL_RUN is called

This ensures no duplicate entries in StudioWeb.
"""
# Arrange
context.resume = True

async def create_runtime():
return SuccessfulRuntime()

factory = MockFactory(create_runtime)

# Act
await evaluate(factory, trace_manager, context, event_bus)

# Assert
create_calls = [
call
for call in event_bus.publish.call_args_list
if call[0][0] == EvaluationEvents.CREATE_EVAL_RUN
]
update_calls = [
call
for call in event_bus.publish.call_args_list
if call[0][0] == EvaluationEvents.UPDATE_EVAL_RUN
]

assert len(create_calls) == 0, "Should NOT create new entry on resume"
assert len(update_calls) > 0, "Should update existing entry on resume"


class TestSuspendResumeLifecycle:
"""Tests for complete suspend/resume lifecycle."""

async def test_suspend_then_resume_lifecycle(self, context, trace_manager):
"""Test complete lifecycle: suspend (creates entry) then resume (updates entry)."""
# Phase 1: Suspend
event_bus_suspend = EventBus()
event_bus_suspend.publish = AsyncMock() # type: ignore[method-assign]

async def create_suspending_runtime():
return SuspendingRuntime()

factory_suspend = MockFactory(create_suspending_runtime)

await evaluate(factory_suspend, trace_manager, context, event_bus_suspend)

# Assert suspend phase
suspend_create_calls = [
call
for call in event_bus_suspend.publish.call_args_list
if call[0][0] == EvaluationEvents.CREATE_EVAL_RUN
]
suspend_update_calls = [
call
for call in event_bus_suspend.publish.call_args_list
if call[0][0] == EvaluationEvents.UPDATE_EVAL_RUN
]

assert len(suspend_create_calls) > 0, "Suspend phase should CREATE entry"
assert len(suspend_update_calls) > 0, (
"Suspend phase should UPDATE entry with triggers"
)

# Phase 2: Resume
context.resume = True
event_bus_resume = EventBus()
event_bus_resume.publish = AsyncMock() # type: ignore[method-assign]

async def create_successful_runtime():
return SuccessfulRuntime()

factory_resume = MockFactory(create_successful_runtime)

await evaluate(factory_resume, trace_manager, context, event_bus_resume)

# Assert resume phase
resume_create_calls = [
call
for call in event_bus_resume.publish.call_args_list
if call[0][0] == EvaluationEvents.CREATE_EVAL_RUN
]
resume_update_calls = [
call
for call in event_bus_resume.publish.call_args_list
if call[0][0] == EvaluationEvents.UPDATE_EVAL_RUN
]

assert len(resume_create_calls) == 0, "Resume phase should NOT CREATE new entry"
assert len(resume_update_calls) > 0, "Resume phase should UPDATE existing entry"