def create_memory_processing_task(
    task_manager,
    needs_pruning: bool = False,
    prune_target: int = 100,
    item_word_limit: int = 150,
) -> str:
    """
    Create the background task that distills unprocessed events into memories.

    The task is created through ``task_manager.create_task`` in ``complex``
    mode with the ``file_operations`` action set and the ``memory-processor``
    skill. It is tagged with ``workflow_id="memory_processing"`` so that a
    task manager wired to a workflow-lock registry can release that lock when
    the task ends.

    The instruction text tells the agent to:
    - Read agent_file_system/EVENT_UNPROCESSED.md
    - Distill (not copy) lasting facts into agent_file_system/MEMORY.md
    - Keep every memory item within ``item_word_limit`` words
    - Optionally run the pruning phase once MEMORY.md has hit its item cap

    Args:
        task_manager: Object exposing ``create_task(...)``; its return value
            is passed straight back to the caller.
        needs_pruning: True when MEMORY.md has reached the max-items threshold
            and the task should also run the pruning phase after distillation.
        prune_target: Approximate number of oldest items the pruning phase
            should consolidate or drop. A value <= 0 disables the pruning
            directive even when ``needs_pruning`` is True, because asking the
            agent to remove "~0 items" is meaningless.
        item_word_limit: Maximum words allowed per distilled memory item. The
            default of 150 reproduces the previously hard-coded limit.

    Returns:
        The task ID of the created task.

    Raises:
        ValueError: If ``item_word_limit`` is smaller than 1.
    """
    if item_word_limit < 1:
        raise ValueError("item_word_limit must be >= 1")

    # Sentences are joined with a single space; with the default arguments the
    # result is byte-identical to the previously hard-coded instruction.
    sentences = [
        "SILENT BACKGROUND TASK - NEVER use send_message or run_python.",
        "Read agent_file_system/EVENT_UNPROCESSED.md.",
        "DISTILL (rewrite, don't copy) into agent_file_system/MEMORY.md.",
        "Format: [YYYY-MM-DD HH:MM:SS] [category] Subject predicate object.",
        "DISCARD 95%+ events. Agent messages and greetings are ALWAYS discarded.",
        f"Each memory item must be <= {item_word_limit} words.",
        "Use stream_edit only. Never write code.",
    ]

    if needs_pruning and prune_target > 0:
        # The wording keeps the literal phrase "Pruning phase" (the skill keys
        # its pruning mode on it) and states the goal relative to prune_target
        # instead of a fixed fraction, so it stays correct for any
        # max_items / prune_target configuration.
        sentences.extend(
            [
                "MEMORY.md has reached the item-count cap.",
                "After processing events, run the Pruning phase: consolidate or "
                f"remove the FIRST (oldest) ~{prune_target} items from the items "
                "section — they appear at the top, immediately after the header block.",
                "Merge related items about the same subject before dropping, "
                "then drop duplicates and low-utility items.",
                "Preserve high-utility items regardless of age.",
                "The header block must NOT be modified.",
                "Target: bring the total item count at least "
                f"{prune_target} below its current value.",
            ]
        )

    return task_manager.create_task(
        task_name="Process Memory Events",
        task_instruction=" ".join(sentences),
        mode="complex",
        action_sets=["file_operations"],
        selected_skills=["memory-processor"],
        workflow_id="memory_processing",
    )
class WorkflowLockManager:
    """Registry of exclusive locks for named background workflows.

    A *workflow* is any recurring background activity that must not run
    concurrently with another instance of itself (e.g. memory processing).
    Each workflow is identified by a stable, non-empty string, and at most one
    holder may own a given workflow id at a time.

    Typical usage::

        if not await locks.try_acquire("memory_processing"):
            logger.info("workflow already active; skipping")
            return

        try:
            task_id = task_manager.create_task(..., workflow_id="memory_processing")
        except Exception:
            # Release on any failure before the task takes ownership.
            await locks.release("memory_processing")
            raise

    Concurrency model: ``try_acquire`` and ``release`` contain no ``await``,
    so each call runs to completion without yielding to the event loop. That
    makes the check-and-set atomic for all coroutines sharing one loop, with
    no internal ``asyncio.Lock`` needed. It also means the manager can be
    constructed from synchronous code before any loop exists. The manager is
    NOT synchronised across OS threads.

    NOTE(review): locks are keyed by workflow id only; ownership is not
    tracked, so any caller that releases an id frees it for everyone.
    """

    def __init__(self) -> None:
        # Ids of the workflows currently held. Mutated only by the two
        # coroutine methods below.
        self._held: Set[str] = set()

    async def try_acquire(self, workflow_id: str) -> bool:
        """Attempt to acquire the lock for ``workflow_id`` without waiting.

        Args:
            workflow_id: Non-empty identifier of the workflow to lock.

        Returns:
            True if the lock was free and is now held; False if it was
            already held.

        Raises:
            ValueError: If ``workflow_id`` is empty (or otherwise falsy).
        """
        if not workflow_id:
            raise ValueError("workflow_id must be a non-empty string")
        # IMPORTANT: keep this check-and-add free of ``await``; a suspension
        # point between the two steps would let two callers both succeed.
        if workflow_id in self._held:
            return False
        self._held.add(workflow_id)
        return True

    async def release(self, workflow_id: str) -> None:
        """Release the lock for ``workflow_id``.

        Idempotent: releasing an id that is not held, or passing an empty id,
        is a silent no-op.
        """
        if not workflow_id:
            return
        self._held.discard(workflow_id)

    def is_locked(self, workflow_id: str) -> bool:
        """Return True iff ``workflow_id`` is currently held (non-blocking)."""
        return workflow_id in self._held

    def active_workflows(self) -> FrozenSet[str]:
        """Return an immutable snapshot of all currently-held workflow ids."""
        return frozenset(self._held)
MemoryPointer, + MemoryFileWatcher, + create_memory_processing_task, + WorkflowLockManager, +) from app.context_engine import ContextEngine from app.state.state_manager import StateManager from app.state.agent_state import STATE @@ -69,7 +75,12 @@ from app.gui.handler import GUIHandler from app.scheduler import SchedulerManager from app.proactive import initialize_proactive_manager, get_proactive_manager -from app.ui_layer.settings.memory_settings import is_memory_enabled +from app.ui_layer.settings.memory_settings import ( + is_memory_enabled, + _parse_memory_items, + get_memory_max_items, + get_memory_prune_target, +) from agent_core import profile, profile_loop, OperationCategory from agent_core import ( # Registries for dependency injection @@ -203,6 +214,11 @@ def __init__( ) self.action_router = ActionRouter(self.action_library, self.llm, self.context_engine) + # Workflow lock registry — prevents overlapping runs of named background + # workflows (e.g. memory processing, proactive cycle). Locks are released + # automatically when the owning task ends. + self.workflow_lock_manager = WorkflowLockManager() + self.task_manager = TaskManager( db_interface=self.db_interface, event_stream_manager=self.event_stream_manager, @@ -210,6 +226,7 @@ def __init__( llm_interface=self.llm, context_engine=self.context_engine, on_task_end_callback=self._cleanup_session_triggers, + workflow_lock_manager=self.workflow_lock_manager, ) # Bind task_manager so state_manager can look up tasks by session_id @@ -444,7 +461,11 @@ async def react(self, trigger: Trigger) -> None: # Memory Processing # ===================================== - def create_process_memory_task(self) -> Optional[str]: + def create_process_memory_task( + self, + needs_pruning: bool = False, + prune_target: int = 0, + ) -> Optional[str]: """ Create a task to process unprocessed events and move them to memory. @@ -455,6 +476,7 @@ def create_process_memory_task(self) -> Optional[str]: 3. 
Check for duplicate memories using memory_search 4. Write important, unique events to MEMORY.md 5. Clear processed events from EVENT_UNPROCESSED.md + 6. If needs_pruning, run the pruning phase on MEMORY.md afterwards Returns: The task ID of the created task, or None if memory is disabled. @@ -464,7 +486,10 @@ def create_process_memory_task(self) -> Optional[str]: logger.info("[MEMORY] Memory is disabled, skipping process memory task") return None - logger.info("[MEMORY] Creating process memory task") + logger.info( + "[MEMORY] Creating process memory task" + + (" with pruning phase" if needs_pruning else "") + ) # Enable skip_unprocessed_logging to prevent infinite loops # (events generated during memory processing won't be added to EVENT_UNPROCESSED.md) @@ -472,7 +497,11 @@ def create_process_memory_task(self) -> Optional[str]: self.event_stream_manager.set_skip_unprocessed_logging(True) # Create task using the memory-processor skill - task_id = create_memory_processing_task(self.task_manager) + task_id = create_memory_processing_task( + self.task_manager, + needs_pruning=needs_pruning, + prune_target=prune_target, + ) logger.info(f"[MEMORY] Process memory task created: {task_id}") return task_id @@ -549,41 +578,85 @@ async def _handle_memory_processing_trigger(self) -> bool: logger.info("[MEMORY] Memory is disabled, skipping memory processing trigger") return False - task_created = False + # Early-exit if there's nothing to process (avoid touching the lock for a no-op). 
+ unprocessed_file = AGENT_FILE_SYSTEM_PATH / "EVENT_UNPROCESSED.md" + if not unprocessed_file.exists(): + logger.debug("[MEMORY] EVENT_UNPROCESSED.md not found") + return False try: - # Check if there are events to process - unprocessed_file = AGENT_FILE_SYSTEM_PATH / "EVENT_UNPROCESSED.md" - if unprocessed_file.exists(): - content = unprocessed_file.read_text(encoding="utf-8") - lines = content.strip().split("\n") - event_lines = [l for l in lines if l.strip() and l.strip().startswith("[")] - - if event_lines: - logger.info(f"[MEMORY] Processing {len(event_lines)} unprocessed events") - task_id = self.create_process_memory_task() - - if task_id: - # Queue trigger to start the task - trigger = Trigger( - fire_at=time.time(), - priority=60, - next_action_description="Process unprocessed events into long-term memory", - session_id=task_id, - payload={}, + content = unprocessed_file.read_text(encoding="utf-8") + except Exception as e: + logger.warning(f"[MEMORY] Failed to read EVENT_UNPROCESSED.md: {e}") + return False + + event_lines = [ + l for l in content.strip().split("\n") + if l.strip() and l.strip().startswith("[") + ] + if not event_lines: + logger.info("[MEMORY] No unprocessed events to process") + return False + + # Acquire the exclusive workflow lock. If another memory-processing task + # is still running (e.g. a slow prior run when 3am fires), skip this + # trigger — the lock is released automatically by TaskManager._end_task. + if not await self.workflow_lock_manager.try_acquire("memory_processing"): + logger.info( + "[MEMORY] memory_processing workflow already active; skipping trigger" + ) + return False + + try: + # Count items in MEMORY.md to decide whether the pruning phase + # should run alongside event processing. 
+ max_items = get_memory_max_items() + needs_pruning = False + memory_file = AGENT_FILE_SYSTEM_PATH / "MEMORY.md" + if memory_file.exists(): + try: + memory_items = _parse_memory_items( + memory_file.read_text(encoding="utf-8") + ) + if len(memory_items) >= max_items: + needs_pruning = True + logger.info( + f"[MEMORY] MEMORY.md has {len(memory_items)} items " + f"(>= {max_items}); pruning phase will run" ) - await self.triggers.put(trigger) - logger.info(f"[MEMORY] Queued trigger for memory processing task: {task_id}") - task_created = True - else: - logger.info("[MEMORY] No unprocessed events to process") - else: - logger.debug("[MEMORY] EVENT_UNPROCESSED.md not found") + except Exception as e: + logger.warning(f"[MEMORY] Failed to count MEMORY.md items: {e}") + + logger.info(f"[MEMORY] Processing {len(event_lines)} unprocessed events") + task_id = self.create_process_memory_task( + needs_pruning=needs_pruning, + prune_target=get_memory_prune_target(), + ) + + if not task_id: + # Task was not created (e.g. memory disabled mid-trigger). Release + # the lock so the next trigger can try again. + await self.workflow_lock_manager.release("memory_processing") + return False + + # Queue trigger to start the task. Lock is now owned by the task and + # will be released by TaskManager when the task ends. + trigger = Trigger( + fire_at=time.time(), + priority=60, + next_action_description="Process unprocessed events into long-term memory", + session_id=task_id, + payload={}, + ) + await self.triggers.put(trigger) + logger.info(f"[MEMORY] Queued trigger for memory processing task: {task_id}") + return True except Exception as e: + # Anything went wrong before the task took ownership — release the lock. 
logger.warning(f"[MEMORY] Failed to process memory: {e}") - - return task_created + await self.workflow_lock_manager.release("memory_processing") + return False # ===================================== # Workflow Routing diff --git a/app/config/settings.json b/app/config/settings.json index 669d7ebd..72de6062 100644 --- a/app/config/settings.json +++ b/app/config/settings.json @@ -8,7 +8,10 @@ "enabled": true }, "memory": { - "enabled": true + "enabled": true, + "max_items": 200, + "prune_target": 135, + "item_word_limit": 150 }, "model": { "llm_provider": "byteplus", diff --git a/app/task/task_manager.py b/app/task/task_manager.py index cb338ea5..b237ce65 100644 --- a/app/task/task_manager.py +++ b/app/task/task_manager.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from app.llm import LLMInterface from app.context_engine import ContextEngine + from agent_core.core.impl.workflow_lock import WorkflowLockManager def _get_gui_mode() -> bool: @@ -91,6 +92,7 @@ def __init__( llm_interface: Optional["LLMInterface"] = None, context_engine: Optional["ContextEngine"] = None, on_task_end_callback: Optional[Callable[[str], Awaitable[None]]] = None, + workflow_lock_manager: Optional["WorkflowLockManager"] = None, ): super().__init__( db_interface=db_interface, @@ -118,6 +120,8 @@ def __init__( on_todo_transition=None, on_task_ended_chatserver=None, finalize_todos_chatserver=None, + # Workflow lock registry for auto-release on task end + workflow_lock_manager=workflow_lock_manager, ) diff --git a/app/ui_layer/settings/memory_settings.py b/app/ui_layer/settings/memory_settings.py index 86969943..9dc06f0a 100644 --- a/app/ui_layer/settings/memory_settings.py +++ b/app/ui_layer/settings/memory_settings.py @@ -23,6 +23,12 @@ r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+\[(\w+)\]\s+(.+)$' ) +# Memory size and length thresholds — live-read from settings.json via the +# getter functions below, so values can be tuned without a code change. 
# Fallback values for the memory thresholds read from settings.json. They are
# used when a key is missing or when the stored value is unusable.
_MEMORY_MAX_ITEMS_DEFAULT = 200
_MEMORY_PRUNE_TARGET_DEFAULT = 135
_MEMORY_ITEM_WORD_LIMIT_DEFAULT = 150


def _get_memory_int_setting(key: str, default: int) -> int:
    """Read a positive integer from the ``memory`` section of the settings.

    Settings are re-read on every call via ``_load_settings()`` so edits to
    settings.json take effect without a restart.

    Args:
        key: Name of the entry inside the ``memory`` section.
        default: Value returned when the section or key is missing, or when
            the stored value is not a positive integer.

    Returns:
        The configured value as an ``int`` (>= 1), or ``default``.
    """
    section = _load_settings().get("memory")
    if not isinstance(section, dict):
        # Covers both a missing section and a malformed one ("memory": null).
        return default

    raw = section.get(key, default)
    if isinstance(raw, bool):
        # bool is an int subclass; true/false is never a meaningful count.
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError, OverflowError):
        return default

    # Zero or negative thresholds would make the pruning logic degenerate
    # (e.g. max_items <= 0 means "always prune"), so treat them as invalid.
    return value if value >= 1 else default


def get_memory_max_items() -> int:
    """Upper bound on MEMORY.md item count before pruning kicks in."""
    return _get_memory_int_setting("max_items", _MEMORY_MAX_ITEMS_DEFAULT)


def get_memory_prune_target() -> int:
    """Approximate number of oldest items the pruning phase should remove."""
    return _get_memory_int_setting("prune_target", _MEMORY_PRUNE_TARGET_DEFAULT)


def get_memory_item_word_limit() -> int:
    """Maximum words allowed per distilled memory item."""
    return _get_memory_int_setting(
        "item_word_limit", _MEMORY_ITEM_WORD_LIMIT_DEFAULT
    )
+If a distillation would exceed 150 words, compress further: +- Drop filler, restatements, and incidental detail +- Keep only the lasting-value core: subject, predicate, object, key qualifier +- If still too long, split into two atomic memories OR drop the less-important half + +Never truncate mid-sentence; never end an item with `...`. + ### CRITICAL DISTILLATION RULES **Core principle:** Memory is for LASTING PERSONAL INSIGHTS that improve future interactions, not event logging. @@ -143,3 +153,58 @@ Only save the memory if it contains lasting value: ``` **Result:** 50 events → 1 memory, progress tracked via todos + +### Example: compressing a verbose event to <= 150 words + +**Input (raw, rambling):** +``` +[2026-03-12 10:04:22] [user message]: so i was thinking about the trip next month, we're still planning to fly to tokyo on april 18th, staying at the shinjuku hilton for six nights, my wife emma is coming, and we also want to try the sushi place called sukiyabashi jiro that my friend kenji recommended, oh and the flight is on ANA from LAX, departing 10:55am, i need to remember to pack the camera and charger, and i'm a bit anxious because last time i forgot my passport +``` + +**Output (<= 150 words, two atomic memories):** +``` +[2026-03-12 10:04:22] [event] John and Emma fly Tokyo on 2026-04-18 via ANA from LAX 10:55am, staying Shinjuku Hilton 6 nights +[2026-03-12 10:04:22] [preference] John wants to try Sukiyabashi Jiro sushi in Tokyo (recommended by Kenji) +``` + +Anxiety, packing reminder, and narrative framing are dropped — no lasting utility. + +## Pruning Mode + +Triggered when the task instruction contains a "Pruning phase" directive (MEMORY.md has reached the item-count cap). + +### Workflow + +Add these todos alongside the event-processing todos: + +``` +N+1. [pending] Read MEMORY.md header + oldest block +N+2. [pending] Consolidate/merge/drop oldest items +N+3. 
[pending] Replace oldest block in MEMORY.md +``` + +Execute AFTER event processing completes: + +1. `stream_read` MEMORY.md from line 11 (skip the header block) up to the oldest-N range indicated in the task instruction. +2. Decide, item by item, what to merge / drop / keep. See ranking heuristics below. The 150-word limit still applies to every merged item. +3. `stream_edit` MEMORY.md to replace the oldest block with the consolidated set. The `# Memory Log` / `## Overview` / `## Memory` header (lines 1-10) must remain intact. + +### Ranking heuristics (drop priority, highest first) + +1. Stale `[event]` items whose date has passed and have no lasting consequence. +2. Duplicate or near-duplicate facts about the same subject — keep the most recent/complete one; merge timestamps into a single canonical entry. +3. Weak-signal preferences or one-off observations that fail the Future Utility Test. +4. Superseded items (older preference that a newer item contradicts) — keep the newer one. + +### Never drop + +- Personal identity facts (name, role, relationships) +- Contact information +- Stated hard preferences (allergies, strict dislikes, required workflows) +- Active commitments and goals with a future date + +### Critical + +- No hard-coded "drop oldest N" rule — judge utility first, use age only as a tiebreaker. +- Target a final item count at least `prune_target` below the pre-prune count (the task instruction states the exact number). +- When in doubt, merge rather than drop.