From 1e91fd1abbb91584556df51de70dadb653447d38 Mon Sep 17 00:00:00 2001 From: xiami762 <> Date: Mon, 18 May 2026 13:12:49 +0800 Subject: [PATCH] fix(workflow): isolate nested tool context and stabilize delegate UI Prevent nested workflow tools from overwriting run_workflow metadata, report completed status for successful task/delegate results, reject stale running updates over terminal tool parts, and tighten delegate card rendering so run_workflow is not misclassified. Co-authored-by: Cursor --- flocks/session/message.py | 16 +++++++ flocks/tool/agent/delegate_task.py | 2 +- flocks/tool/task/run_workflow.py | 29 +++++++++++- flocks/tool/task/task.py | 2 +- tests/session/test_message_parts.py | 46 +++++++++++++++++++ tests/workflow/test_tool_run_workflow.py | 46 +++++++++++++++++++ .../common/DelegateTaskCard.test.tsx | 21 +++++++++ .../components/common/DelegateTaskCard.tsx | 5 +- 8 files changed, 163 insertions(+), 4 deletions(-) diff --git a/flocks/session/message.py b/flocks/session/message.py index 54e51de88..130e6b7f9 100644 --- a/flocks/session/message.py +++ b/flocks/session/message.py @@ -974,6 +974,22 @@ async def store_part(cls, session_id: str, message_id: str, part: PartType) -> P updated = False for i, existing in enumerate(parts_list): if existing.id == part.id: + existing_status = getattr(getattr(existing, "state", None), "status", None) + incoming_status = getattr(getattr(part, "state", None), "status", None) + if ( + isinstance(existing, ToolPart) + and isinstance(part, ToolPart) + and existing_status in {"completed", "error"} + and incoming_status in {"pending", "running"} + ): + log.debug("message.part.skip_terminal_tool_downgrade", { + "session_id": session_id, + "message_id": message_id, + "part_id": part.id, + "existing_status": existing_status, + "incoming_status": incoming_status, + }) + return existing parts_list[i] = part updated = True break diff --git a/flocks/tool/agent/delegate_task.py b/flocks/tool/agent/delegate_task.py index 8196555a8..77ff56d64 100644 --- a/flocks/tool/agent/delegate_task.py +++ b/flocks/tool/agent/delegate_task.py @@ -474,6 +474,6 @@ async def delegate_task_tool( loop_result=result, metadata=forwarder.final_metadata, ) - result_status = "running" if tool_result.success else "error" + result_status = "completed" if tool_result.success else "error" ctx.metadata({"title": description, "metadata": {**forwarder.final_metadata, "status": result_status}}) return tool_result diff --git a/flocks/tool/task/run_workflow.py b/flocks/tool/task/run_workflow.py index 542115c40..2feec283a 100644 --- a/flocks/tool/task/run_workflow.py +++ b/flocks/tool/task/run_workflow.py @@ -96,6 +96,32 @@ def _get_workflow_runtime(): _DESCRIPTION_CACHE_TTL: float = 60.0 # seconds +def _create_nested_tool_context(ctx: ToolContext) -> ToolContext: + """Create an isolated child ToolContext for workflow node tools. + + Workflow execution itself should surface workflow-level progress via + ``run_workflow`` metadata. Reusing the outer tool context inside workflow + nodes lets nested tools (notably ``task``) overwrite that metadata with + child-session progress, which makes the UI misclassify the outer workflow + card as a delegate/subagent card. + + We therefore clone the context but intentionally omit the metadata + callback, while preserving permissions, event publishing, abort signal, and + sandbox/session identity. + """ + return ToolContext( + session_id=ctx.session_id, + message_id=ctx.message_id, + agent=ctx.agent, + call_id=ctx.call_id, + extra=dict(ctx.extra), + abort_event=ctx.abort, + permission_callback=ctx._permission_callback, + metadata_callback=None, + event_publish_callback=ctx.event_publish_callback, + ) + + async def _build_description() -> str: """Build dynamic description with available workflows list (TTL-cached, 60 s).""" global _DESCRIPTION_CACHE, _DESCRIPTION_CACHE_AT @@ -565,6 +591,7 @@ def _on_step_complete(step_result: Any) -> None: }) execution_started_at = time.time() + nested_tool_ctx = _create_nested_tool_context(ctx) call_kwargs: Dict[str, Any] = { "workflow": workflow_source, "inputs": workflow_inputs, @@ -574,7 +601,7 @@ def _on_step_complete(step_result: Any) -> None: ), "timeout_s": timeout_s, "trace": trace, - "tool_context": ctx, + "tool_context": nested_tool_ctx, } # Backward-compatibility: older runtimes may not accept `use_llm`. diff --git a/flocks/tool/task/task.py b/flocks/tool/task/task.py index dbfa991c7..48e5c7242 100644 --- a/flocks/tool/task/task.py +++ b/flocks/tool/task/task.py @@ -355,7 +355,7 @@ async def task_tool( loop_result=result, metadata=forwarder.final_metadata, ) - result_status = "running" if tool_result.success else "error" + result_status = "completed" if tool_result.success else "error" ctx.metadata({ "title": description, "metadata": {**forwarder.final_metadata, "status": result_status}, diff --git a/tests/session/test_message_parts.py b/tests/session/test_message_parts.py index 0a38f4a83..89504ba62 100644 --- a/tests/session/test_message_parts.py +++ b/tests/session/test_message_parts.py @@ -524,6 +524,52 @@ async def test_store_completed_tool_part_caches_prompt_output_and_revision(self) assert Message.get_parts_revision(sid, msg.id) == 2 assert updated.state.metadata["llm_output_text"].startswith("{") + @pytest.mark.asyncio + async def test_store_part_does_not_downgrade_terminal_tool_state(self, monkeypatch): + sid = SID + "_terminal_guard" + msg = await Message.create(sid, MessageRole.ASSISTANT, "") + part_id = "part_terminal_guard" + + monkeypatch.setattr("flocks.session.message.Recorder.record_tool_state", AsyncMock()) + + completed_part = ToolPart( + id=part_id, + sessionID=sid, + messageID=msg.id, + callID="call_terminal_guard", + tool="task", + state=ToolStateCompleted( + input={"prompt": "run"}, + output="done", + title="task", + metadata={"sessionId": "ses_child_done"}, + time={"start": 1000, "end": 2000}, + ), + ) + stale_running_part = ToolPart( + id=part_id, + sessionID=sid, + messageID=msg.id, + callID="call_terminal_guard", + tool="task", + state=ToolStateRunning( + input={"prompt": "run"}, + title="task", + metadata={"sessionId": "ses_child_done", "status": "running"}, + time={"start": 1000}, + ), + ) + + await Message.store_part(sid, msg.id, completed_part) + stored = await Message.store_part(sid, msg.id, stale_running_part) + parts = await Message.parts(msg.id, sid) + tool_parts = [p for p in parts if p.type == "tool" and p.id == part_id] + + assert stored.state.status == "completed" + assert len(tool_parts) == 1 + assert tool_parts[0].state.status == "completed" + assert getattr(tool_parts[0].state, "time", {}).get("end") == 2000 + @pytest.mark.asyncio async def test_list_with_parts_keeps_message_info_separate(): diff --git a/tests/workflow/test_tool_run_workflow.py b/tests/workflow/test_tool_run_workflow.py index 7b37ef16a..d7762ebd2 100644 --- a/tests/workflow/test_tool_run_workflow.py +++ b/tests/workflow/test_tool_run_workflow.py @@ -83,6 +83,7 @@ def simple_workflow(): return { "id": "test-workflow-001", "name": "Test Workflow", + "start": "node-1", "metadata": {}, "nodes": [ { @@ -101,6 +102,7 @@ def workflow_with_requirements(): return { "id": "test-workflow-002", "name": "Test Workflow with Requirements", + "start": "node-1", "metadata": { "requirements": ["requests>=2.31,<3"] }, @@ -121,6 +123,7 @@ def workflow_with_inputs(): return { "id": "test-workflow-003", "name": "Test Workflow with Inputs", + "start": "node-1", "metadata": {}, "nodes": [ { @@ -336,6 +339,49 @@ def run_side_effect(**kwargs): record_result.assert_awaited_once() assert storage_write.await_count >= 1 assert any(update.get("workflow_execution_id") == "exec-registered" for update in metadata_updates) + + @pytest.mark.anyio + async def test_run_workflow_uses_isolated_child_tool_context( + self, + tool_context_with_permission, + simple_workflow, + ): + fake = FakeRunWorkflowResult( + status="SUCCEEDED", + run_id="run-isolated-ctx", + steps=1, + last_node_id="node-1", + outputs={"ok": True}, + history=[], + error=None, + ) + mock_run = Mock(name="run_workflow", return_value=fake) + + with patch.object( + run_workflow_module, + "_get_workflow_runtime", + return_value=_runtime_tuple(run_fn=mock_run), + ): + result = await ToolRegistry.execute( + "run_workflow", + ctx=tool_context_with_permission, + workflow=simple_workflow, + inputs={"name": "Flocks"}, + ) + + assert result.success is True + call_kwargs = mock_run.call_args.kwargs + nested_ctx = call_kwargs["tool_context"] + assert nested_ctx is not tool_context_with_permission + assert nested_ctx.session_id == tool_context_with_permission.session_id + assert nested_ctx.message_id == tool_context_with_permission.message_id + assert nested_ctx.agent == tool_context_with_permission.agent + assert nested_ctx.call_id == tool_context_with_permission.call_id + assert nested_ctx.extra == tool_context_with_permission.extra + assert nested_ctx.abort is tool_context_with_permission.abort + assert nested_ctx.event_publish_callback == tool_context_with_permission.event_publish_callback + assert nested_ctx._permission_callback == tool_context_with_permission._permission_callback + assert nested_ctx._metadata_callback is None @pytest.mark.anyio async def test_run_workflow_with_inputs(self, tool_context_with_permission, workflow_with_inputs): diff --git a/webui/src/components/common/DelegateTaskCard.test.tsx b/webui/src/components/common/DelegateTaskCard.test.tsx index 4710fb769..a50dbbefb 100644 --- a/webui/src/components/common/DelegateTaskCard.test.tsx +++ b/webui/src/components/common/DelegateTaskCard.test.tsx @@ -62,4 +62,25 @@ describe('shouldRenderDelegateTaskCard', () => { expect(shouldRenderDelegateTaskCard(part)).toBe(true); }); + + it('does not treat run_workflow with leaked child session metadata as a delegate task', () => { + const part = { + id: 'part-run-workflow', + type: 'tool', + tool: 'run_workflow', + state: { + status: 'running', + input: { + workflow: 'loop_host_forensics_fast', + }, + metadata: { + workflow_id: 'loop_host_forensics_fast', + workflow_execution_id: 'wf_exec_123', + sessionId: 'ses_child_leaked', + }, + }, + } as MessagePart; + + expect(shouldRenderDelegateTaskCard(part)).toBe(false); + }); }); diff --git a/webui/src/components/common/DelegateTaskCard.tsx b/webui/src/components/common/DelegateTaskCard.tsx index 1e9fdfa93..ef52ab20d 100644 --- a/webui/src/components/common/DelegateTaskCard.tsx +++ b/webui/src/components/common/DelegateTaskCard.tsx @@ -33,7 +33,10 @@ export function shouldRenderDelegateTaskCard(part: MessagePart): boolean { } const output = typeof state.output === 'string' ? state.output : undefined; - return !!extractSessionId(state.metadata, output); + if (!part.tool || part.tool === 'unknown') { + return !!extractSessionId(state.metadata, output); + } + return false; } interface ActivityStep {