Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions flocks/session/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,22 @@ async def store_part(cls, session_id: str, message_id: str, part: PartType) -> P
updated = False
for i, existing in enumerate(parts_list):
if existing.id == part.id:
existing_status = getattr(getattr(existing, "state", None), "status", None)
incoming_status = getattr(getattr(part, "state", None), "status", None)
if (
isinstance(existing, ToolPart)
and isinstance(part, ToolPart)
and existing_status in {"completed", "error"}
and incoming_status in {"pending", "running"}
):
log.debug("message.part.skip_terminal_tool_downgrade", {
"session_id": session_id,
"message_id": message_id,
"part_id": part.id,
"existing_status": existing_status,
"incoming_status": incoming_status,
})
return existing
parts_list[i] = part
updated = True
break
Expand Down
2 changes: 1 addition & 1 deletion flocks/tool/agent/delegate_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,6 @@ async def delegate_task_tool(
loop_result=result,
metadata=forwarder.final_metadata,
)
result_status = "running" if tool_result.success else "error"
result_status = "completed" if tool_result.success else "error"
ctx.metadata({"title": description, "metadata": {**forwarder.final_metadata, "status": result_status}})
return tool_result
29 changes: 28 additions & 1 deletion flocks/tool/task/run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,32 @@ def _get_workflow_runtime():
_DESCRIPTION_CACHE_TTL: float = 60.0 # seconds


def _create_nested_tool_context(ctx: ToolContext) -> ToolContext:
    """Build a child ToolContext for tools that run inside workflow nodes.

    Progress for a workflow run is meant to be reported at the workflow
    level, through the ``run_workflow`` metadata. If node tools shared the
    outer context, a nested tool (``task`` in particular) could replace that
    metadata with child-session progress, and the UI would then treat the
    outer workflow card as a delegate/subagent card.

    The child therefore inherits the session id, message id, agent, call id,
    abort signal, permission callback and event-publish callback from
    ``ctx``, receives a shallow copy of ``ctx.extra``, and is created with
    no metadata callback.
    """
    inherited = {
        "session_id": ctx.session_id,
        "message_id": ctx.message_id,
        "agent": ctx.agent,
        "call_id": ctx.call_id,
        # Shallow copy so the child gets its own top-level mapping.
        "extra": dict(ctx.extra),
        "abort_event": ctx.abort,
        "permission_callback": ctx._permission_callback,
        # Intentionally dropped: nested tools must not write outer metadata.
        "metadata_callback": None,
        "event_publish_callback": ctx.event_publish_callback,
    }
    return ToolContext(**inherited)


async def _build_description() -> str:
"""Build dynamic description with available workflows list (TTL-cached, 60 s)."""
global _DESCRIPTION_CACHE, _DESCRIPTION_CACHE_AT
Expand Down Expand Up @@ -565,6 +591,7 @@ def _on_step_complete(step_result: Any) -> None:
})
execution_started_at = time.time()

nested_tool_ctx = _create_nested_tool_context(ctx)
call_kwargs: Dict[str, Any] = {
"workflow": workflow_source,
"inputs": workflow_inputs,
Expand All @@ -574,7 +601,7 @@ def _on_step_complete(step_result: Any) -> None:
),
"timeout_s": timeout_s,
"trace": trace,
"tool_context": ctx,
"tool_context": nested_tool_ctx,
}

# Backward-compatibility: older runtimes may not accept `use_llm`.
Expand Down
2 changes: 1 addition & 1 deletion flocks/tool/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ async def task_tool(
loop_result=result,
metadata=forwarder.final_metadata,
)
result_status = "running" if tool_result.success else "error"
result_status = "completed" if tool_result.success else "error"
ctx.metadata({
"title": description,
"metadata": {**forwarder.final_metadata, "status": result_status},
Expand Down
46 changes: 46 additions & 0 deletions tests/session/test_message_parts.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,52 @@ async def test_store_completed_tool_part_caches_prompt_output_and_revision(self)
assert Message.get_parts_revision(sid, msg.id) == 2
assert updated.state.metadata["llm_output_text"].startswith("{")

@pytest.mark.asyncio
async def test_store_part_does_not_downgrade_terminal_tool_state(self, monkeypatch):
    """A late ``running`` update must not replace a stored ``completed`` tool part."""
    session_id = SID + "_terminal_guard"
    message = await Message.create(session_id, MessageRole.ASSISTANT, "")
    shared_part_id = "part_terminal_guard"

    # Swap the recorder hook for an AsyncMock for the duration of the test.
    monkeypatch.setattr("flocks.session.message.Recorder.record_tool_state", AsyncMock())

    # Both parts describe the same tool call; only their state differs.
    identity = {
        "id": shared_part_id,
        "sessionID": session_id,
        "messageID": message.id,
        "callID": "call_terminal_guard",
        "tool": "task",
    }
    finished_state = ToolStateCompleted(
        input={"prompt": "run"},
        output="done",
        title="task",
        metadata={"sessionId": "ses_child_done"},
        time={"start": 1000, "end": 2000},
    )
    finished_part = ToolPart(**identity, state=finished_state)
    late_state = ToolStateRunning(
        input={"prompt": "run"},
        title="task",
        metadata={"sessionId": "ses_child_done", "status": "running"},
        time={"start": 1000},
    )
    late_running_part = ToolPart(**identity, state=late_state)

    # Store the terminal state first, then replay the stale running update.
    await Message.store_part(session_id, message.id, finished_part)
    returned = await Message.store_part(session_id, message.id, late_running_part)
    all_parts = await Message.parts(message.id, session_id)
    matching = [p for p in all_parts if p.type == "tool" and p.id == shared_part_id]

    assert returned.state.status == "completed"
    assert len(matching) == 1
    survivor = matching[0]
    assert survivor.state.status == "completed"
    assert getattr(survivor.state, "time", {}).get("end") == 2000


@pytest.mark.asyncio
async def test_list_with_parts_keeps_message_info_separate():
Expand Down
46 changes: 46 additions & 0 deletions tests/workflow/test_tool_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def simple_workflow():
return {
"id": "test-workflow-001",
"name": "Test Workflow",
"start": "node-1",
"metadata": {},
"nodes": [
{
Expand All @@ -101,6 +102,7 @@ def workflow_with_requirements():
return {
"id": "test-workflow-002",
"name": "Test Workflow with Requirements",
"start": "node-1",
"metadata": {
"requirements": ["requests>=2.31,<3"]
},
Expand All @@ -121,6 +123,7 @@ def workflow_with_inputs():
return {
"id": "test-workflow-003",
"name": "Test Workflow with Inputs",
"start": "node-1",
"metadata": {},
"nodes": [
{
Expand Down Expand Up @@ -336,6 +339,49 @@ def run_side_effect(**kwargs):
record_result.assert_awaited_once()
assert storage_write.await_count >= 1
assert any(update.get("workflow_execution_id") == "exec-registered" for update in metadata_updates)

@pytest.mark.anyio
async def test_run_workflow_uses_isolated_child_tool_context(
    self,
    tool_context_with_permission,
    simple_workflow,
):
    """The runtime receives a cloned ToolContext that has no metadata callback."""
    outer_ctx = tool_context_with_permission
    runtime_result = FakeRunWorkflowResult(
        status="SUCCEEDED",
        run_id="run-isolated-ctx",
        steps=1,
        last_node_id="node-1",
        outputs={"ok": True},
        history=[],
        error=None,
    )
    run_stub = Mock(name="run_workflow", return_value=runtime_result)
    runtime_patch = patch.object(
        run_workflow_module,
        "_get_workflow_runtime",
        return_value=_runtime_tuple(run_fn=run_stub),
    )

    with runtime_patch:
        outcome = await ToolRegistry.execute(
            "run_workflow",
            ctx=outer_ctx,
            workflow=simple_workflow,
            inputs={"name": "Flocks"},
        )

    assert outcome.success is True

    # Inspect the context object that was actually handed to the runtime.
    passed_ctx = run_stub.call_args.kwargs["tool_context"]
    assert passed_ctx is not outer_ctx
    # These fields must be carried over from the outer context.
    for field_name in ("session_id", "message_id", "agent", "call_id", "extra"):
        assert getattr(passed_ctx, field_name) == getattr(outer_ctx, field_name)
    assert passed_ctx.abort is outer_ctx.abort
    assert passed_ctx.event_publish_callback == outer_ctx.event_publish_callback
    assert passed_ctx._permission_callback == outer_ctx._permission_callback
    # The metadata callback is the one thing that must not be inherited.
    assert passed_ctx._metadata_callback is None

@pytest.mark.anyio
async def test_run_workflow_with_inputs(self, tool_context_with_permission, workflow_with_inputs):
Expand Down
21 changes: 21 additions & 0 deletions webui/src/components/common/DelegateTaskCard.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,25 @@ describe('shouldRenderDelegateTaskCard', () => {

expect(shouldRenderDelegateTaskCard(part)).toBe(true);
});

it('does not treat run_workflow with leaked child session metadata as a delegate task', () => {
  // A run_workflow tool part whose metadata also carries a child sessionId.
  const workflowId = 'loop_host_forensics_fast';
  const workflowPart = {
    id: 'part-run-workflow',
    type: 'tool',
    tool: 'run_workflow',
    state: {
      status: 'running',
      input: {
        workflow: workflowId,
      },
      metadata: {
        workflow_id: workflowId,
        workflow_execution_id: 'wf_exec_123',
        sessionId: 'ses_child_leaked',
      },
    },
  } as MessagePart;

  const rendersAsDelegate = shouldRenderDelegateTaskCard(workflowPart);

  expect(rendersAsDelegate).toBe(false);
});
});
5 changes: 4 additions & 1 deletion webui/src/components/common/DelegateTaskCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ export function shouldRenderDelegateTaskCard(part: MessagePart): boolean {
}

const output = typeof state.output === 'string' ? state.output : undefined;
return !!extractSessionId(state.metadata, output);
if (!part.tool || part.tool === 'unknown') {
return !!extractSessionId(state.metadata, output);
}
return false;
}

interface ActivityStep {
Expand Down