Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ chroma_db_memory/
core/chroma_db_actions/
**/logs/
agent_file_system/workspace/
agent_file_system/action_outputs/
craftbot/external_tools/
craftbot/generated_task_document/
**/__pycache__/
Expand Down
4 changes: 4 additions & 0 deletions agent_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@
from agent_core.core.impl.action import (
ActionExecutor,
ActionLibrary,
ActionOutputRecord,
ActionOutputStore,
ActionRouter,
ActionManager,
set_gui_execute_hook,
Expand Down Expand Up @@ -367,6 +369,8 @@
# Implementations
"ActionExecutor",
"ActionLibrary",
"ActionOutputRecord",
"ActionOutputStore",
"ActionRouter",
"ActionManager",
"set_gui_execute_hook",
Expand Down
7 changes: 7 additions & 0 deletions agent_core/core/impl/action/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
set_gui_execute_hook,
)
from agent_core.core.impl.action.library import ActionLibrary
from agent_core.core.impl.action.output_store import (
ActionOutputRecord,
ActionOutputStore,
)
from agent_core.core.impl.action.router import ActionRouter, _is_visible_in_mode
from agent_core.core.impl.action.manager import (
ActionManager,
Expand All @@ -31,6 +35,9 @@
"set_gui_execute_hook",
# Library
"ActionLibrary",
# Output store
"ActionOutputRecord",
"ActionOutputStore",
# Router
"ActionRouter",
"_is_visible_in_mode",
Expand Down
30 changes: 23 additions & 7 deletions agent_core/core/impl/action/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,32 @@ def _atomic_action_venv_process(
timeout=timeout,
)

return {
"stdout": proc.stdout.strip(),
"stderr": proc.stderr.strip(),
"returncode": proc.returncode,
}
stdout = proc.stdout.strip()
stderr = proc.stderr.strip()

if proc.returncode != 0:
err = stderr or f"Action exited with code {proc.returncode}"
return {"status": "error", "message": err}

# The sandbox script prints ``json.dumps(result)`` to stdout, so
# the action's logical output is the inner dict — not the raw
# subprocess wrapper. Parse it here so downstream consumers
# (event stream, ActionOutputStore, $ref navigation) see the
# action's real shape (e.g. ``{"status", "stdout", "stderr"}``)
# rather than ``{"stdout": <json string>, ...}``. Mirrors the
# behaviour of ``_atomic_action_internal_subprocess``.
if not stdout:
return {"status": "success", "output": ""}

try:
return json.loads(stdout)
except json.JSONDecodeError:
return {"status": "success", "output": stdout}

except subprocess.TimeoutExpired:
return {"stdout": "", "stderr": "Execution timed out", "returncode": -1}
return {"status": "error", "message": "Execution timed out"}
except Exception as e:
return {"stdout": "", "stderr": f"Execution failed: {e}", "returncode": -1}
return {"status": "error", "message": f"Execution failed: {e}"}
finally:
_restore_worker_stdio(saved_stdout, saved_stderr)

Expand Down
82 changes: 78 additions & 4 deletions agent_core/core/impl/action/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
from agent_core.core.protocols.context import ContextEngineProtocol
from agent_core.core.protocols.state import StateManagerProtocol
from agent_core.core.impl.action.executor import ActionExecutor
from agent_core.core.impl.action.output_store import ActionOutputStore, make_key
from agent_core.core.impl.action.ref_resolver import (
render_output_for_event_stream,
resolve_refs,
)
from agent_core.utils.logger import logger

# ============================================================================
Expand Down Expand Up @@ -120,6 +125,7 @@ def __init__(
on_action_start: Optional[OnActionStartHook] = None,
on_action_end: Optional[OnActionEndHook] = None,
get_parent_id: Optional[GetParentIdHook] = None,
action_output_store: Optional[ActionOutputStore] = None,
):
"""
Build an ActionManager that can execute and track actions.
Expand All @@ -134,6 +140,11 @@ def __init__(
on_action_start: Optional hook called when action starts.
on_action_end: Optional hook called when action ends.
get_parent_id: Optional hook to resolve parent_id from task context.
action_output_store: Deterministic per-session archive of action
outputs. When supplied (and a ``session_id`` is available for the
call), every invocation is persisted, the LLM's ``$ref`` markers in
parameters are resolved against it before the handler runs, and big
outputs collapse to a shape summary in the event stream.
"""
self.action_library = action_library
self.llm_interface = llm_interface
Expand All @@ -142,6 +153,7 @@ def __init__(
self.context_engine = context_engine
self.state_manager = state_manager
self.executor = ActionExecutor()
self.action_output_store = action_output_store

# Track in-flight actions
self._inflight: Dict[str, Dict] = {}
Expand Down Expand Up @@ -236,6 +248,10 @@ async def execute_action(
logger.debug(f"[INPUT DATA] {input_data}")
run_id = str(uuid.uuid4())
started_at = datetime.utcnow().isoformat()
# Stable reference key the LLM uses in any future ``$ref`` for this
# invocation. Computed up-front so action_start, action_end, and the
# archive all stamp the same value, removing the LLM's need to guess.
action_key = make_key(action.name, run_id)

# Resolve parent_id using hook if available
if not parent_id and self._get_parent_id:
Expand Down Expand Up @@ -268,11 +284,22 @@ async def execute_action(
# Log to event stream
# Only pass session_id when is_running_task=True (task stream exists)
# When no task exists, use global stream by not passing task_id
pretty_input = _to_pretty_json(input_data)
#
# Strip ``_``-prefixed plumbing keys (e.g. ``_session_id``) from what
# we log. They are internal channels used to hand context to the
# action sandbox; surfacing them in the event stream would leak
# internal state into every downstream action-selection prompt.
pretty_input = _to_pretty_json(
{k: v for k, v in input_data.items() if not k.startswith("_")}
)
self._log_event_stream(
is_gui_task=is_gui_task,
event_type="action_start",
event=f"Running action {action.name} with input: {pretty_input}.",
event=(
f"Running action {action_key} with input: {pretty_input}. "
f"(Reference this run later via "
f'{{"$ref": "{action_key}", "path": "..."}})'
),
display_message=f"Running {action.display_name}",
action_name=action.name,
# Always pass session_id when present so the event_stream_manager can route
Expand All @@ -284,6 +311,22 @@ async def execute_action(
session_id=session_id,
)

# Resolve ``$ref`` markers the LLM may have placed in parameters.
# The event-stream log above intentionally records the *unresolved*
# input so the agent's history shows the compact references that were
# actually emitted; only the handler-side ``input_data`` carries the
# materialised values. Resolution is best-effort: if it raises, a
# warning is logged and the handler receives the unresolved input.
if self.action_output_store and session_id:
try:
input_data = resolve_refs(
input_data, self.action_output_store, session_id
)
except Exception as exc:
logger.warning(
f"[ACTION] Failed to resolve $ref markers for {action.name} "
f"(session_id={session_id}): {exc}"
)

logger.debug(f"Starting execution of action {action.name}...")

try:
Expand Down Expand Up @@ -373,6 +416,29 @@ async def execute_action(
# 3. Persist final state
# ────────────────────────────────────────────────────────────────

# Archive every invocation that has both a store and a session_id: one
# file per invocation at a deterministic path.
# Big outputs collapse to a shape summary + ``$ref`` instructions in
# the event stream; the full payload lives on disk for the LLM to
# navigate via ``$ref`` on subsequent steps.
archive_record = None
archive_path = None
if self.action_output_store and session_id:
archive_record = self.action_output_store.record(
session_id=session_id,
action_name=action.name,
run_id=run_id,
outputs=outputs if isinstance(outputs, dict) else {"value": outputs},
started_at=started_at,
ended_at=ended_at,
status=status,
)
if archive_record is not None:
archive_path = str(
self.action_output_store.record_path(
session_id, action.name, run_id
)
)

logger.info(f"Action {action.name} completed with status: {status}.")

# Log to event stream
Expand All @@ -381,11 +447,19 @@ async def execute_action(
display_status = (
"failed" if (status == "error" or output_has_error) else "completed"
)
pretty_output = _to_pretty_json(outputs)
pretty_output = render_output_for_event_stream(
outputs,
file_path=archive_path,
record_key=archive_record.key if archive_record else None,
)
self._log_event_stream(
is_gui_task=is_gui_task,
event_type="action_end",
event=f"Action {action.name} completed with output: {pretty_output}.",
event=(
f"Action {action_key} completed with output: {pretty_output}. "
f"(Reference this output via "
f'{{"$ref": "{action_key}", "path": "..."}})'
),
display_message=f"{action.display_name} → {display_status}",
action_name=action.name,
# Always pass session_id when present so the event_stream_manager can route
Expand Down
Loading