Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
)
from agent_core.core.impl.llm import LLMCallType
from agent_core.core.impl.trigger import TriggerQueue
from agent_core.core.impl.workflow_lock import WorkflowLockManager
from agent_core.core.impl.event_stream import (
EventStream,
EventStreamManager,
Expand Down Expand Up @@ -369,6 +370,7 @@
"create_memory_processing_task",
"LLMCallType",
"TriggerQueue",
"WorkflowLockManager",
"EventStream",
"EventStreamManager",
# Prompts - Registry
Expand Down
44 changes: 34 additions & 10 deletions agent_core/core/impl/memory/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,11 @@ def _compute_content_hash(content: str) -> str:
# ───────────────────────────── Task Creation Helper ─────────────────────────────


def create_memory_processing_task(task_manager) -> str:
def create_memory_processing_task(
task_manager,
needs_pruning: bool = False,
prune_target: int = 100,
) -> str:
"""
Create a task to process unprocessed events into distilled memories.

Expand All @@ -884,26 +888,46 @@ def create_memory_processing_task(task_manager) -> str:
- Check for duplicate memories
- Write to MEMORY.md in strict format
- Clear processed events
- Optionally prune MEMORY.md when it has grown past the configured cap

Args:
task_manager: The TaskManager instance to create the task with
needs_pruning: True when MEMORY.md has reached the max-items threshold
and the task should also run the pruning phase after distillation.
prune_target: Approximate number of oldest items the pruning phase
should consolidate or drop.

Returns:
The task ID of the created task
"""
instruction = (
"SILENT BACKGROUND TASK - NEVER use send_message or run_python. "
"Read agent_file_system/EVENT_UNPROCESSED.md. "
"DISTILL (rewrite, don't copy) into agent_file_system/MEMORY.md. "
"Format: [YYYY-MM-DD HH:MM:SS] [category] Subject predicate object. "
"DISCARD 95%+ events. Agent messages and greetings are ALWAYS discarded. "
"Each memory item must be <= 150 words. "
"Use stream_edit only. Never write code."
)

if needs_pruning:
instruction += (
f" MEMORY.md has reached the item-count cap. After processing events, "
f"run the Pruning phase: remove the FIRST (oldest) ~{prune_target} items "
f"from the items section — they appear at the top, immediately after the header block. "
f"Merge related items about the same subject before dropping, then drop duplicates "
f"and low-utility items. Preserve high-utility items regardless of age. "
f"The header block must NOT be modified. Keep only the newest items (bottom of file). "
f"Target: remove at least {prune_target} items so only the latest 1/3 remain."
)

return task_manager.create_task(
task_name="Process Memory Events",
task_instruction=(
"SILENT BACKGROUND TASK - NEVER use send_message or run_python. "
"Read agent_file_system/EVENT_UNPROCESSED.md. "
"DISTILL (rewrite, don't copy) into agent_file_system/MEMORY.md. "
"Format: [YYYY-MM-DD HH:MM:SS] [category] Subject predicate object. "
"DISCARD 95%+ events. Agent messages and greetings are ALWAYS discarded. "
"Use stream_edit only. Never write code."
),
task_instruction=instruction,
mode="complex",
action_sets=["file_operations"],
selected_skills=["memory-processor"]
selected_skills=["memory-processor"],
workflow_id="memory_processing",
)


Expand Down
20 changes: 20 additions & 0 deletions agent_core/core/impl/task/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

if TYPE_CHECKING:
from agent_core.core.state.base import StateManagerBase
from agent_core.core.impl.workflow_lock import WorkflowLockManager

# Set up logger - use shared agent_core logger for consistency
from agent_core.utils.logger import logger
Expand Down Expand Up @@ -106,6 +107,8 @@ def __init__(
on_todo_transition: Optional[OnTodoTransitionHook] = None,
on_task_ended_chatserver: Optional[OnTaskEndedChatserverHook] = None,
finalize_todos_chatserver: Optional[FinalizeTodosChatserverHook] = None,
# Workflow-lock registry for auto-release on task end
workflow_lock_manager: Optional["WorkflowLockManager"] = None,
):
"""
Initialize the task manager.
Expand Down Expand Up @@ -177,6 +180,9 @@ def __init__(
self._on_task_ended_chatserver = on_task_ended_chatserver
self._finalize_todos_chatserver = finalize_todos_chatserver

# Workflow-lock registry (optional)
self.workflow_lock_manager = workflow_lock_manager

@property
def active(self) -> Optional[Task]:
"""Current session's task.
Expand Down Expand Up @@ -228,6 +234,7 @@ def create_task(
session_id: Optional[str] = None,
original_query: Optional[str] = None,
original_platform: Optional[str] = None,
workflow_id: Optional[str] = None,
) -> str:
"""
Create a new task without LLM planning.
Expand Down Expand Up @@ -283,6 +290,7 @@ def create_task(
selected_skills=selected_skills or [],
conversation_id=conversation_id,
source_platform=original_platform,
workflow_id=workflow_id,
)

self.tasks[task_id] = task
Expand Down Expand Up @@ -627,6 +635,18 @@ async def _end_task(
if self.state_manager:
self.state_manager.on_task_ended(task, status, summary)

# Release any workflow lock this task was holding. Runs regardless of
# terminal status (completed / error / cancelled) so a crashed task
# never leaves its workflow wedged.
if self.workflow_lock_manager and task.workflow_id:
try:
await self.workflow_lock_manager.release(task.workflow_id)
except Exception as e:
logger.warning(
f"[TaskManager] Failed to release workflow lock "
f"'{task.workflow_id}' for task {task.id}: {e}"
)

# Remove task from dict and clean up event stream
self.tasks.pop(task.id, None)
if self._current_session_id == task.id:
Expand Down
6 changes: 6 additions & 0 deletions agent_core/core/impl/workflow_lock/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""Workflow lock registry — prevents overlapping execution of named workflows."""

from agent_core.core.impl.workflow_lock.manager import WorkflowLockManager

__all__ = ["WorkflowLockManager"]
67 changes: 67 additions & 0 deletions agent_core/core/impl/workflow_lock/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""WorkflowLockManager — exclusive locks for named background workflows.

A *workflow* is any recurring background activity that must not run concurrently
with another instance of itself (e.g. memory processing, proactive cycles).
Each workflow is identified by a stable string. At most one task may own a
given workflow lock at a time.

Typical usage:

if not await locks.try_acquire("memory_processing"):
logger.info("workflow already active; skipping")
return

try:
task_id = task_manager.create_task(..., workflow_id="memory_processing")
# TaskManager auto-releases the lock in its _end_task funnel when the
# task terminates (completed / error / cancelled).
except Exception:
# Release on any failure before the task takes ownership.
await locks.release("memory_processing")
raise

The manager is safe for concurrent callers inside a single asyncio event loop
because every mutation is guarded by an internal ``asyncio.Lock``.
"""

from __future__ import annotations

import asyncio
from typing import FrozenSet, Set


class WorkflowLockManager:
"""Registry of exclusive locks for named background workflows."""

def __init__(self) -> None:
self._held: Set[str] = set()
self._mutex = asyncio.Lock()

async def try_acquire(self, workflow_id: str) -> bool:
"""Attempt to acquire the lock for ``workflow_id``.

Returns True on success, False if another holder already owns it.
"""
if not workflow_id:
raise ValueError("workflow_id must be a non-empty string")
async with self._mutex:
if workflow_id in self._held:
return False
self._held.add(workflow_id)
return True

async def release(self, workflow_id: str) -> None:
"""Release the lock for ``workflow_id``. Idempotent."""
if not workflow_id:
return
async with self._mutex:
self._held.discard(workflow_id)

def is_locked(self, workflow_id: str) -> bool:
"""Non-blocking check — True iff a holder currently owns ``workflow_id``."""
return workflow_id in self._held

def active_workflows(self) -> FrozenSet[str]:
"""Snapshot of all currently-held workflow ids."""
return frozenset(self._held)
5 changes: 5 additions & 0 deletions agent_core/core/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class Task:
waiting_for_user_reply: bool = False
# Platform that started (or most recently resumed) this task — outbound messages route here
source_platform: Optional[str] = None
# Named background workflow this task runs on behalf of (e.g. "memory_processing").
# When set, the TaskManager auto-releases the corresponding lock on task end.
workflow_id: Optional[str] = None

def get_current_todo(self) -> Optional[TodoItem]:
"""
Expand Down Expand Up @@ -117,6 +120,7 @@ def to_dict(self) -> Dict[str, Any]:
"chatserver_action_id": self.chatserver_action_id,
"waiting_for_user_reply": self.waiting_for_user_reply,
"source_platform": self.source_platform,
"workflow_id": self.workflow_id,
}

@classmethod
Expand Down Expand Up @@ -144,4 +148,5 @@ def from_dict(cls, data: Dict[str, Any]) -> "Task":
chatserver_action_id=data.get("chatserver_action_id"),
waiting_for_user_reply=data.get("waiting_for_user_reply", False),
source_platform=data.get("source_platform"),
workflow_id=data.get("workflow_id"),
)
Loading