diff --git a/docs/content/docs/(core)/cortex.mdx b/docs/content/docs/(core)/cortex.mdx index 41aef7c5f..f2a77cf30 100644 --- a/docs/content/docs/(core)/cortex.mdx +++ b/docs/content/docs/(core)/cortex.mdx @@ -1,6 +1,6 @@ --- title: Cortex -description: System-level observer that maintains memory bulletins and monitors agent health. +description: System-level observer that maintains memory bulletins, runs maintenance, and monitors agent health. --- # The Cortex @@ -84,26 +84,37 @@ These serve different purposes: The bulletin doesn't replace recall — it reduces how often recall is needed. A channel that already knows the user's name, their current project, and recent decisions from the bulletin doesn't need to spawn a branch for basic context. -## Future Responsibilities +## Current Responsibilities -The bulletin is the cortex's first and most impactful responsibility. The following capabilities are designed but not yet implemented: +The bulletin is still the cortex's most visible responsibility, but it is no longer the only one. ### System Health Monitoring -The cortex monitors running processes and keeps the system clean: +The cortex already supervises long-running process state: + +- **Worker supervision** — detect hanging workers, apply timeout policies, and clean up detached worker state +- **Branch supervision** — detect stale branches and enforce timeout/cancellation policy +- **Circuit breakers** — recurring bulletin refresh and maintenance failures back off automatically instead of retrying forever + +### Memory Maintenance + +The cortex also runs periodic graph hygiene: + +- **Decay** — reduce importance for stale non-identity memories +- **Prune** — delete memories that have fallen below the configured importance floor and age threshold +- **Merge** — combine near-duplicate memories and rewire graph associations atomically + +## Future Responsibilities -- **Worker supervision** — detect hanging workers, kill error loops, clean up stale state -- **Branch supervision** — kill stale branches, track latency trends -- **Channel health** — flag channels approaching context limits faster than compactors can manage -- **Circuit breakers** — after 3 consecutive failures of the same type, disable the failing component +The remaining cortex roadmap is about richer cross-system inference, not basic supervision: ### Memory Coherence -The cortex sees memory activity across all channels and maintains the graph: +The cortex sees memory activity across all channels and can grow into deeper graph stewardship: -- **Consolidation** — merge overlapping memories, create cross-channel associations -- **Maintenance** — decay old memories, prune low-importance orphans, recompute centrality -- **Observations** — generate observation-type memories from cross-channel patterns +- **Consolidation** — merge overlapping memories across channels and create cross-channel associations +- **Observations** — generate observation-type memories from recurring patterns +- **Higher-order scoring** — extend maintenance beyond the current decay/prune/merge pass with richer graph analysis ### The Signal Bus @@ -138,7 +149,7 @@ The cortex is the only singleton in the system (per agent). There's one cortex p **Compactors** are per-channel, programmatic monitors. They watch one channel's context size and trigger compaction workers. They're not LLM processes. -**The cortex** is an LLM-assisted process that sees across all channels. It doesn't manage context size (that's the compactor's job). It manages the memory bulletin, and will eventually handle memory coherence and system health. +**The cortex** is an LLM-assisted process that sees across all channels. It doesn't manage context size (that's the compactor's job). It manages the memory bulletin, supervises background health state, and runs periodic memory maintenance. Deeper memory coherence work still grows from here. ## Interactive Cortex Chat @@ -155,7 +166,7 @@ This makes cortex chat a practical control-room interface for troubleshooting, v ```toml [defaults.cortex] -# Base tick interval for health monitoring (future). +# Base tick interval for health monitoring and maintenance scheduling. tick_interval_secs = 30 # How often to regenerate the memory bulletin. @@ -172,6 +183,21 @@ branch_timeout_secs = 60 # Consecutive failures before circuit breaker trips. circuit_breaker_threshold = 3 + +# Interval between maintenance passes. +maintenance_interval_secs = 3600 + +# Per-day decay applied during maintenance. +maintenance_decay_rate = 0.05 + +# Importance floor for pruning. +maintenance_prune_threshold = 0.1 + +# Minimum age before pruning can delete a memory. +maintenance_min_age_days = 30 + +# Similarity threshold for duplicate merges. +maintenance_merge_similarity_threshold = 0.95 ``` ## Warmup API diff --git a/docs/content/docs/(core)/memory.mdx b/docs/content/docs/(core)/memory.mdx index 6577bba84..b1edf63ea 100644 --- a/docs/content/docs/(core)/memory.mdx +++ b/docs/content/docs/(core)/memory.mdx @@ -150,7 +150,7 @@ Importance is influenced by: - **Recency** -- recent memories score higher; old memories decay - **Graph centrality** -- memories with many strong connections to other memories are more important -A background maintenance process runs periodically to decay old memories, prune memories that have fallen below a threshold, merge near-duplicates, and recompute graph centrality scores. +A background maintenance process runs periodically to decay old memories, prune memories that have fallen below a threshold, and merge near-duplicates. Identity and permanent-tagged memories are exempt from decay and pruning. They always survive. @@ -202,6 +202,5 @@ A periodic background process handles graph hygiene: - **Decay** -- reduce importance of old, unaccessed memories - **Prune** -- delete memories below an importance floor (identity/permanent exempt) - **Merge** -- combine near-duplicate memories (>0.95 similarity) -- **Reindex** -- recompute graph centrality scores -This is a scheduled job managed by the cortex. It runs in workers, doesn't block anything, and keeps the graph healthy over time. +This is a scheduled job managed by the cortex. It runs as an internal background task in the cortex loop, doesn't block channels, and keeps the graph healthy over time. diff --git a/docs/design-docs/cortex-implementation.md b/docs/design-docs/cortex-implementation.md index f18b6ead3..7edc4544f 100644 --- a/docs/design-docs/cortex-implementation.md +++ b/docs/design-docs/cortex-implementation.md @@ -1,6 +1,6 @@ # Cortex Implementation Plan -The cortex is designed to be the system's self-awareness — supervising processes, maintaining memory coherence, and generating the memory bulletin. Phase 1 plumbing and Phase 2 health supervision are now live; maintenance and consolidation phases remain. +The cortex is designed to be the system's self-awareness — supervising processes, maintaining memory coherence, and generating the memory bulletin. Phase 1 plumbing, Phase 2 health supervision, and Phase 3 maintenance are now live; consolidation remains. This doc covers the path from "bulletin generator" to "full system supervisor." @@ -16,8 +16,8 @@ This doc covers the path from "bulletin generator" to "full system supervisor." - `Signal` enum — aligned to current `ProcessEvent` surface (worker/branch/tool/memory/compaction/task/link events). - `CortexHook` — all methods return `Continue` with trace logging. -**Implemented but never called:** -- `memory/maintenance.rs` — `apply_decay()` and `prune_memories()` work. `merge_similar_memories()` is a stub returning `Ok(0)`. +**Implemented and wired into the cortex loop:** +- `memory/maintenance.rs` — `apply_decay()`, `prune_memories()`, and `merge_similar_memories()` are implemented and wired into the cortex loop. **Wired through config:** - `tick_interval_secs` and `bulletin_interval_secs` are read by the running cortex loop and hot-reload during runtime. @@ -124,27 +124,31 @@ Per tick, cortex maintains runtime maps for workers, branches, branch latency, a - Success resets the counter/tripped flag for that key. - Breaker state is in-memory only in Phase 2 and resets on process restart. -## Phase 3: Memory Maintenance +## Phase 3: Memory Maintenance (Implemented) -Wire up the maintenance code that already exists in `memory/maintenance.rs`. +Memory maintenance is now wired into the cortex loop. -### Schedule maintenance in the tick loop +### Scheduling and control -- Add `maintenance_interval_secs` to `CortexConfig` (default: 3600) -- On the maintenance tick, call `run_maintenance()` with the current `MaintenanceConfig` -- Log the `MaintenanceReport` (decayed, pruned, merged counts) -- Respect hot-reload — `MaintenanceConfig` values should come from `CortexConfig` +- `maintenance_interval_secs` is part of `CortexConfig` and hot-reloads at runtime +- The cortex tick loop schedules `run_maintenance_with_cancel()` with the current `MaintenanceConfig` +- Maintenance reports log decayed, pruned, and merged counts +- Invalid maintenance config values are rejected at load/update time: + - `maintenance_interval_secs >= 1` + - `maintenance_decay_rate`, `maintenance_prune_threshold`, `maintenance_merge_similarity_threshold` in `[0.0, 1.0]` + - `maintenance_min_age_days >= 0` +- Maintenance runs with timeout, graceful cancellation, forced abort fallback, and a recurring-failure circuit breaker -### Finish `merge_similar_memories()` +### Merge behavior -- Query LanceDB for high-similarity memory pairs above `merge_similarity_threshold` (default 0.95) -- Keep the higher-importance memory as the survivor -- Merge content from the lower-importance memory into the survivor -- Create `Updates` association from survivor to merged memory -- Transfer associations from the merged memory to the survivor -- Soft-delete the merged memory +- LanceDB similarity search drives near-duplicate detection above `merge_similarity_threshold` (default 0.95) +- The higher-importance memory stays the survivor +- Survivor content is updated, associations are rewired atomically in SQLite, and an `Updates` edge is preserved +- The merged memory is soft-forgotten and its embedding is removed +- The survivor embedding is recomputed after merge +- Per-pass work is bounded (candidate and merge caps) to avoid unbounded scans on large corpora -**End state:** Memories decay over time, low-importance orphans get pruned, near-duplicates get merged. All on autopilot. +**End state:** Memories decay over time, low-importance orphans get pruned, and near-duplicates get merged on autopilot under cortex supervision. ## Phase 4: Memory Consolidation (LLM-Assisted) diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index 8e350e6e1..27009f3d8 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -17,6 +17,7 @@ use crate::agent::worker::Worker; use crate::error::Result; use crate::hooks::CortexHook; use crate::llm::SpacebotModel; +use crate::memory::maintenance as memory_maintenance; use crate::memory::search::{SearchConfig, SearchMode, SearchSort}; use crate::memory::types::{Association, MemoryType, RelationType}; use crate::tasks::{TaskStatus, UpdateTaskInput}; @@ -86,6 +87,12 @@ const BULLETIN_REFRESH_FAILURE_BACKOFF_BASE_SECS: u64 = 30; const BULLETIN_REFRESH_FAILURE_BACKOFF_MAX_SECS: u64 = 600; const BULLETIN_REFRESH_CIRCUIT_OPEN_THRESHOLD: u32 = 3; const BULLETIN_REFRESH_CIRCUIT_OPEN_SECS: u64 = 1800; +const MAINTENANCE_CIRCUIT_OPEN_THRESHOLD: usize = 3; +const MAINTENANCE_CIRCUIT_OPEN_SECS: u64 = 1800; +const MAINTENANCE_TASK_TIMEOUT_MIN_SECS: u64 = 300; +const MAINTENANCE_TASK_TIMEOUT_MAX_SECS: u64 = 3_600; +const MAINTENANCE_TASK_TIMEOUT_MULTIPLIER: u64 = 6; +const MAINTENANCE_TASK_CANCEL_GRACE_SECS: u64 = 30; fn bulletin_refresh_failure_backoff(consecutive_failures: u32) -> Duration { let exponent = consecutive_failures.saturating_sub(1).min(5); @@ -138,6 +145,83 @@ fn maybe_close_bulletin_refresh_circuit( true } +fn record_maintenance_failure( + maintenance_consecutive_failures: &mut usize, + maintenance_disabled_at: &mut Option, + now: Instant, +) -> bool { + *maintenance_consecutive_failures = maintenance_consecutive_failures.saturating_add(1); + if *maintenance_consecutive_failures >= MAINTENANCE_CIRCUIT_OPEN_THRESHOLD + && maintenance_disabled_at.is_none() + { + *maintenance_disabled_at = Some(now); + return true; + } + false +} + +fn maybe_close_maintenance_circuit( + maintenance_consecutive_failures: &mut usize, + maintenance_disabled_at: &mut Option, + now: Instant, +) -> bool { + let Some(disabled_at) = *maintenance_disabled_at else { + return false; + }; + if now.duration_since(disabled_at) < Duration::from_secs(MAINTENANCE_CIRCUIT_OPEN_SECS) { + return false; + } + + *maintenance_consecutive_failures = 0; + *maintenance_disabled_at = None; + true +} + +fn maintenance_task_timeout(maintenance_interval_secs: u64) -> Duration { + let interval_secs = maintenance_interval_secs.max(1); + let derived_secs = interval_secs.saturating_mul(MAINTENANCE_TASK_TIMEOUT_MULTIPLIER); + let bounded_secs = derived_secs.clamp( + MAINTENANCE_TASK_TIMEOUT_MIN_SECS, + MAINTENANCE_TASK_TIMEOUT_MAX_SECS, + ); + Duration::from_secs(bounded_secs) +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum MaintenanceTimeoutAction { + None, + RequestCancel, + ForceAbort, +} + +fn maintenance_timeout_action( + now: Instant, + started_at: Instant, + timeout: Duration, + cancel_requested_at: Option, + forced_abort_issued: bool, +) -> MaintenanceTimeoutAction { + if now.duration_since(started_at) < timeout { + return MaintenanceTimeoutAction::None; + } + + if cancel_requested_at.is_none() { + return MaintenanceTimeoutAction::RequestCancel; + } + + if forced_abort_issued { + return MaintenanceTimeoutAction::None; + } + + if now.duration_since(cancel_requested_at.unwrap()) + >= Duration::from_secs(MAINTENANCE_TASK_CANCEL_GRACE_SECS) + { + return MaintenanceTimeoutAction::ForceAbort; + } + + MaintenanceTimeoutAction::None +} + fn has_completed_initial_warmup(status: &crate::config::WarmupStatus) -> bool { status.last_refresh_unix_ms.is_some() && matches!(status.state, crate::config::WarmupState::Warm) @@ -1673,9 +1757,19 @@ async fn run_cortex_loop( let mut last_lag_warning_memory: Option = None; let mut memory_event_stream_open = true; let mut refresh_task: Option> = None; + let mut maintenance_task: Option< + tokio::task::JoinHandle>, + > = None; + let mut maintenance_task_started_at: Option = None; + let mut maintenance_task_cancel_tx: Option> = None; + let mut maintenance_task_cancel_requested_at: Option = None; + let mut maintenance_task_forced_abort_issued = false; + let mut maintenance_consecutive_failures: usize = 0; + let mut maintenance_disabled_at: Option = None; let mut bulletin_refresh_failures: u32 = 0; let mut bulletin_refresh_circuit_open = false; let mut next_bulletin_refresh_allowed_at = Instant::now(); + let mut last_maintenance = Instant::now(); loop { tokio::select! { @@ -1704,6 +1798,9 @@ async fn run_cortex_loop( if let Some(task) = refresh_task.take() { task.abort(); } + if let Some(task) = maintenance_task.take() { + task.abort(); + } return Ok(()); } CortexReceiverOutcome::DisableStream => unreachable!("control stream cannot disable itself"), @@ -1732,6 +1829,9 @@ async fn run_cortex_loop( if let Some(task) = refresh_task.take() { task.abort(); } + if let Some(task) = maintenance_task.take() { + task.abort(); + } return Ok(()); } CortexReceiverOutcome::DisableStream => { @@ -1744,6 +1844,9 @@ async fn run_cortex_loop( tracing::warn!(%error, "cortex health tick failed"); } + let cortex_config = **cortex.deps.runtime_config.cortex.load(); + let now = Instant::now(); + if refresh_task .as_ref() .is_some_and(tokio::task::JoinHandle::is_finished) @@ -1812,7 +1915,138 @@ async fn run_cortex_loop( } } - let cortex_config = **cortex.deps.runtime_config.cortex.load(); + if maintenance_task + .as_ref() + .is_some_and(tokio::task::JoinHandle::is_finished) + && let Some(task) = maintenance_task.take() + { + maintenance_task_started_at = None; + maintenance_task_cancel_tx = None; + maintenance_task_cancel_requested_at = None; + maintenance_task_forced_abort_issued = false; + match task.await { + Ok(Ok(report)) => { + if maintenance_consecutive_failures > 0 || maintenance_disabled_at.is_some() + { + tracing::info!( + previous_failures = maintenance_consecutive_failures, + "cortex maintenance circuit reset after successful run" + ); + } + maintenance_consecutive_failures = 0; + maintenance_disabled_at = None; + logger.log( + "maintenance_completed", + "Memory maintenance completed", + Some(serde_json::json!({ + "decayed": report.decayed, + "pruned": report.pruned, + "merged": report.merged, + })), + ); + } + Ok(Err(error)) => { + let now = Instant::now(); + let circuit_opened = record_maintenance_failure( + &mut maintenance_consecutive_failures, + &mut maintenance_disabled_at, + now, + ); + tracing::warn!(%error, "cortex maintenance failed"); + if circuit_opened { + tracing::warn!( + failures = maintenance_consecutive_failures, + cooldown_secs = MAINTENANCE_CIRCUIT_OPEN_SECS, + "cortex maintenance circuit opened after consecutive failures" + ); + } + } + Err(error) => { + let now = Instant::now(); + let circuit_opened = record_maintenance_failure( + &mut maintenance_consecutive_failures, + &mut maintenance_disabled_at, + now, + ); + if error.is_cancelled() { + tracing::warn!( + %error, + "cortex maintenance task was cancelled before completion" + ); + } else if error.is_panic() { + tracing::warn!(%error, "cortex maintenance task panicked"); + } else { + tracing::warn!(%error, "cortex maintenance task failed"); + } + if circuit_opened { + tracing::warn!( + failures = maintenance_consecutive_failures, + cooldown_secs = MAINTENANCE_CIRCUIT_OPEN_SECS, + "cortex maintenance circuit opened after task failures" + ); + } + } + } + } + + if let Some(started_at) = maintenance_task_started_at { + let timeout = maintenance_task_timeout(cortex_config.maintenance_interval_secs); + let action = maintenance_timeout_action( + now, + started_at, + timeout, + maintenance_task_cancel_requested_at, + maintenance_task_forced_abort_issued, + ); + match action { + MaintenanceTimeoutAction::None => {} + MaintenanceTimeoutAction::RequestCancel => { + if let Some(cancel_tx) = maintenance_task_cancel_tx.as_ref() { + cancel_tx.send(true).ok(); + } + maintenance_task_cancel_requested_at = Some(now); + tracing::warn!( + elapsed_secs = started_at.elapsed().as_secs(), + timeout_secs = timeout.as_secs(), + "cortex maintenance task timed out; requesting graceful cancel" + ); + logger.log( + "maintenance_timeout", + "Memory maintenance timeout requested", + Some(serde_json::json!({ + "elapsed_secs": started_at.elapsed().as_secs(), + "timeout_secs": timeout.as_secs(), + "maintenance_interval_secs": cortex_config.maintenance_interval_secs, + "graceful_cancel": true, + })), + ); + } + MaintenanceTimeoutAction::ForceAbort => { + if let Some(task) = maintenance_task.as_ref() { + task.abort(); + } + maintenance_task_cancel_requested_at = Some(now); + maintenance_task_forced_abort_issued = true; + tracing::warn!( + elapsed_secs = started_at.elapsed().as_secs(), + timeout_secs = timeout.as_secs(), + grace_secs = MAINTENANCE_TASK_CANCEL_GRACE_SECS, + "cortex maintenance task did not stop gracefully; forcing abort" + ); + logger.log( + "maintenance_timeout", + "Memory maintenance forced abort", + Some(serde_json::json!({ + "elapsed_secs": started_at.elapsed().as_secs(), + "timeout_secs": timeout.as_secs(), + "maintenance_interval_secs": cortex_config.maintenance_interval_secs, + "forced_abort": true, + })), + ); + } + } + } + let bulletin_interval = Duration::from_secs(cortex_config.bulletin_interval_secs.max(1)); let now = Instant::now(); if maybe_close_bulletin_refresh_circuit( @@ -1823,6 +2057,13 @@ async fn run_cortex_loop( ) { tracing::info!("cortex bulletin refresh circuit closed; retries re-enabled"); } + if maybe_close_maintenance_circuit( + &mut maintenance_consecutive_failures, + &mut maintenance_disabled_at, + now, + ) { + tracing::info!("cortex maintenance circuit closed; retries re-enabled"); + } if refresh_task.is_none() && !bulletin_refresh_circuit_open && last_bulletin_refresh.elapsed() >= bulletin_interval @@ -1834,6 +2075,55 @@ async fn run_cortex_loop( )); } + if last_maintenance.elapsed() >= Duration::from_secs( + cortex_config.maintenance_interval_secs.max(1), + ) { + if maintenance_task.is_none() && maintenance_disabled_at.is_none() { + maintenance_task_started_at = Some(Instant::now()); + let maintenance_config = memory_maintenance::MaintenanceConfig { + prune_threshold: cortex_config.maintenance_prune_threshold, + decay_rate: cortex_config.maintenance_decay_rate, + min_age_days: cortex_config.maintenance_min_age_days, + merge_similarity_threshold: cortex_config + .maintenance_merge_similarity_threshold, + }; + let memory_search = cortex.deps.memory_search.clone(); + logger.log( + "maintenance_started", + "Memory maintenance started", + Some(serde_json::json!({ + "decay_rate": maintenance_config.decay_rate, + "prune_threshold": maintenance_config.prune_threshold, + "min_age_days": maintenance_config.min_age_days, + "merge_similarity_threshold": maintenance_config.merge_similarity_threshold, + })), + ); + let (maintenance_cancel_tx, maintenance_cancel_rx) = + tokio::sync::watch::channel(false); + maintenance_task = Some(tokio::spawn(async move { + memory_maintenance::run_maintenance_with_cancel( + memory_search.store(), + memory_search.embedding_table(), + memory_search.embedding_model_arc(), + &maintenance_config, + maintenance_cancel_rx, + ) + .await + })); + maintenance_task_cancel_tx = Some(maintenance_cancel_tx); + maintenance_task_cancel_requested_at = None; + maintenance_task_forced_abort_issued = false; + } else if maintenance_disabled_at.is_some() { + tracing::debug!( + failures = maintenance_consecutive_failures, + cooldown_secs = MAINTENANCE_CIRCUIT_OPEN_SECS, + "maintenance scheduling skipped while maintenance circuit is open" + ); + } + + last_maintenance = Instant::now(); + } + let updated_tick_interval_secs = cortex_config.tick_interval_secs.max(1); if updated_tick_interval_secs != tick_interval_secs { tick_interval_secs = updated_tick_interval_secs; @@ -3252,14 +3542,16 @@ async fn fetch_memories_for_association( mod tests { use super::{ BULLETIN_REFRESH_CIRCUIT_OPEN_SECS, BULLETIN_REFRESH_CIRCUIT_OPEN_THRESHOLD, BranchTracker, - BulletinRefreshOutcome, CortexReceiverOutcome, HealthRuntimeState, ReceiverClosedBehavior, + BulletinRefreshOutcome, CortexReceiverOutcome, HealthRuntimeState, + MAINTENANCE_TASK_CANCEL_GRACE_SECS, MaintenanceTimeoutAction, ReceiverClosedBehavior, Signal, WorkerTracker, apply_cancelled_warmup_status, build_kill_targets, claim_detached_completion, detached_timeout_transition, handle_cortex_receiver_result, has_completed_initial_warmup, is_cancelled_control_result, is_terminal_control_result, - maybe_close_bulletin_refresh_circuit, maybe_generate_bulletin_under_lock, - parse_structured_success_flag, push_signal_into_buffer, record_bulletin_refresh_failure, - should_execute_warmup, should_generate_bulletin_from_bulletin_loop, signal_from_event, - summarize_signal_text, take_lagged_control_flag, + maintenance_task_timeout, maintenance_timeout_action, maybe_close_bulletin_refresh_circuit, + maybe_generate_bulletin_under_lock, parse_structured_success_flag, push_signal_into_buffer, + record_bulletin_refresh_failure, should_execute_warmup, + should_generate_bulletin_from_bulletin_loop, signal_from_event, summarize_signal_text, + take_lagged_control_flag, }; use crate::ProcessEvent; use crate::agent::process_control::ControlActionResult; @@ -3802,6 +4094,58 @@ mod tests { ); } + #[test] + fn maintenance_task_timeout_bounds() { + assert_eq!(maintenance_task_timeout(1).as_secs(), 300); + assert_eq!(maintenance_task_timeout(100).as_secs(), 600); + assert_eq!(maintenance_task_timeout(600).as_secs(), 3_600); + assert_eq!(maintenance_task_timeout(2_000).as_secs(), 3_600); + assert_eq!(maintenance_task_timeout(0).as_secs(), 300); + } + + #[test] + fn maintenance_timeout_action_progresses_from_none_to_cancel_to_abort() { + let now = Instant::now(); + let started_at = now - Duration::from_secs(1); + let timeout = Duration::from_secs(3); + let grace = Duration::from_secs(MAINTENANCE_TASK_CANCEL_GRACE_SECS); + + assert_eq!( + maintenance_timeout_action( + started_at + Duration::from_secs(1), + started_at, + timeout, + None, + false + ), + MaintenanceTimeoutAction::None + ); + assert_eq!( + maintenance_timeout_action(started_at + timeout, started_at, timeout, None, false), + MaintenanceTimeoutAction::RequestCancel + ); + assert_eq!( + maintenance_timeout_action( + started_at + timeout + grace, + started_at, + timeout, + Some(started_at + timeout), + false + ), + MaintenanceTimeoutAction::ForceAbort + ); + assert_eq!( + maintenance_timeout_action( + started_at + timeout + grace + Duration::from_secs(1), + started_at, + timeout, + Some(started_at + timeout), + true + ), + MaintenanceTimeoutAction::None, + ); + } + #[test] fn detached_timeout_transition_requeues_until_limit_then_quarantines() { let metadata = serde_json::json!({}); diff --git a/src/agent/cortex_chat.rs b/src/agent/cortex_chat.rs index c2dec868d..e95338bdf 100644 --- a/src/agent/cortex_chat.rs +++ b/src/agent/cortex_chat.rs @@ -200,6 +200,8 @@ impl PromptHook for CortexChatHook { ) -> HookAction { let guard_action = self.spacebot_hook.guard_tool_result(tool_name, result); if !matches!(guard_action, HookAction::Continue) { + self.spacebot_hook + .record_tool_result_metrics(tool_name, internal_call_id); return guard_action; } let preview = crate::tools::truncate_utf8_ellipsis(result, 200); diff --git a/src/agent/process_control.rs b/src/agent/process_control.rs index 6e6c2b5bd..587b805e2 100644 --- a/src/agent/process_control.rs +++ b/src/agent/process_control.rs @@ -3,7 +3,7 @@ use crate::agent::channel::WeakChannelControlHandle; use crate::{AgentId, BranchId, ChannelId, WorkerId}; use std::collections::HashMap; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::{AtomicU8, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -18,6 +18,26 @@ pub const DETACHED_WORKER_LIFECYCLE_COMPLETING: u8 = 1; pub const DETACHED_WORKER_LIFECYCLE_KILLING: u8 = 2; pub const DETACHED_WORKER_LIFECYCLE_TERMINAL: u8 = 3; +#[derive(Debug, Clone)] +pub struct DetachedWorkerControlSnapshot { + pub worker_id: WorkerId, + pub agent_id: AgentId, + pub task_number: i64, + pub lifecycle: u8, +} + +#[derive(Clone)] +struct ChannelControlEntry { + handle: WeakChannelControlHandle, + registration_id: u64, +} + +enum ChannelLookupResult { + Found(crate::agent::channel::ChannelControlHandle), + Stale(u64), + Missing, +} + pub struct DetachedWorkerControl { pub worker_id: WorkerId, pub agent_id: AgentId, @@ -45,8 +65,9 @@ impl DetachedWorkerControl { } pub struct ProcessControlRegistry { - channels: tokio::sync::RwLock>, + channels: tokio::sync::RwLock>, detached_workers: tokio::sync::RwLock>, + next_channel_registration: AtomicU64, } impl Default for ProcessControlRegistry { @@ -60,21 +81,43 @@ impl ProcessControlRegistry { Self { channels: tokio::sync::RwLock::new(HashMap::new()), detached_workers: tokio::sync::RwLock::new(HashMap::new()), + next_channel_registration: AtomicU64::new(1), } } - pub async fn register_channel(&self, channel_id: ChannelId, handle: WeakChannelControlHandle) { - self.channels.write().await.insert(channel_id, handle); + pub async fn register_channel( + &self, + channel_id: ChannelId, + handle: WeakChannelControlHandle, + ) -> u64 { + let registration_id = self + .next_channel_registration + .fetch_add(1, Ordering::AcqRel); + self.channels.write().await.insert( + channel_id, + ChannelControlEntry { + handle, + registration_id, + }, + ); + registration_id } - pub async fn unregister_channel(&self, channel_id: &ChannelId) -> bool { - self.channels.write().await.remove(channel_id).is_some() + pub async fn unregister_channel(&self, channel_id: &ChannelId, registration_id: u64) -> bool { + let mut channels = self.channels.write().await; + let should_remove = channels + .get(channel_id) + .is_some_and(|entry| entry.registration_id == registration_id); + if should_remove { + channels.remove(channel_id); + } + should_remove } pub async fn prune_dead_channels(&self) -> usize { let mut channels = self.channels.write().await; let before = channels.len(); - channels.retain(|_, handle| handle.upgrade().is_some()); + channels.retain(|_, entry| entry.handle.upgrade().is_some()); before.saturating_sub(channels.len()) } @@ -93,21 +136,33 @@ impl ProcessControlRegistry { .is_some() } - async fn lookup_channel_handle( - &self, - channel_id: &ChannelId, - ) -> std::result::Result { - let mut channels = self.channels.write().await; - let Some(weak_handle) = channels.get(channel_id).cloned() else { - return Err(ControlActionResult::NotFound); - }; + pub async fn detached_worker_snapshots(&self) -> Vec { + let workers = self.detached_workers.read().await; + workers + .values() + .map(|control| DetachedWorkerControlSnapshot { + worker_id: control.worker_id, + agent_id: control.agent_id.clone(), + task_number: control.task_number, + lifecycle: control.lifecycle.load(Ordering::Acquire), + }) + .collect() + } - let Some(handle) = weak_handle.upgrade() else { - channels.remove(channel_id); - return Err(ControlActionResult::NotFound); + async fn lookup_channel_handle(&self, channel_id: &ChannelId) -> ChannelLookupResult { + let handle_entry = { + let channels = self.channels.read().await; + let Some(handle_entry) = channels.get(channel_id).cloned() else { + return ChannelLookupResult::Missing; + }; + + handle_entry }; - Ok(handle) + match handle_entry.handle.upgrade() { + Some(handle) => ChannelLookupResult::Found(handle), + None => ChannelLookupResult::Stale(handle_entry.registration_id), + } } pub async fn cancel_channel_worker( @@ -116,12 +171,19 @@ impl ProcessControlRegistry { worker_id: WorkerId, reason: &str, ) -> ControlActionResult { - let handle = match self.lookup_channel_handle(channel_id).await { - Ok(handle) => handle, - Err(result) => return result, - }; - - handle.cancel_worker_with_reason(worker_id, reason).await + for _ in 0..2 { + match self.lookup_channel_handle(channel_id).await { + ChannelLookupResult::Found(handle) => { + return handle.cancel_worker_with_reason(worker_id, reason).await; + } + ChannelLookupResult::Stale(registration_id) => { + self.remove_stale_channel_if_matches(channel_id, registration_id) + .await; + } + ChannelLookupResult::Missing => return ControlActionResult::NotFound, + } + } + ControlActionResult::NotFound } pub async fn cancel_channel_branch( @@ -130,12 +192,36 @@ impl ProcessControlRegistry { branch_id: BranchId, reason: &str, ) -> ControlActionResult { - let handle = match self.lookup_channel_handle(channel_id).await { - Ok(handle) => handle, - Err(result) => return result, - }; + for _ in 0..2 { + match self.lookup_channel_handle(channel_id).await { + ChannelLookupResult::Found(handle) => { + return handle.cancel_branch_with_reason(branch_id, reason).await; + } + ChannelLookupResult::Stale(registration_id) => { + self.remove_stale_channel_if_matches(channel_id, registration_id) + .await; + } + ChannelLookupResult::Missing => return ControlActionResult::NotFound, + } + } + ControlActionResult::NotFound + } + + async fn remove_stale_channel_if_matches( + &self, + channel_id: &ChannelId, + expected_registration_id: u64, + ) -> bool { + let mut channels = self.channels.write().await; + let should_remove = channels + .get(channel_id) + .is_some_and(|current| current.registration_id == expected_registration_id); - handle.cancel_branch_with_reason(branch_id, reason).await + if should_remove { + channels.remove(channel_id); + } + + should_remove } pub async fn cancel_detached_worker( @@ -186,8 +272,9 @@ impl ProcessControlRegistry { mod tests { use super::{ ControlActionResult, DETACHED_WORKER_LIFECYCLE_ACTIVE, DetachedWorkerControl, - ProcessControlRegistry, + DetachedWorkerControlSnapshot, ProcessControlRegistry, }; + use crate::agent::channel::WeakChannelControlHandle; use std::sync::Arc; use std::sync::atomic::{AtomicU8, Ordering}; @@ -216,7 +303,7 @@ mod tests { async fn prune_dead_channels_removes_stale_entries() { let registry = ProcessControlRegistry::new(); let channel_id: crate::ChannelId = Arc::from("channel-1"); - registry + let registration_id = registry .register_channel( channel_id.clone(), crate::agent::channel::WeakChannelControlHandle::dangling(), @@ -226,7 +313,42 @@ mod tests { let pruned = registry.prune_dead_channels().await; assert_eq!(pruned, 1); - assert!(!registry.unregister_channel(&channel_id).await); + assert!( + !registry + .unregister_channel(&channel_id, registration_id) + .await + ); + } + + #[tokio::test] + async fn stale_channel_entry_cleanup_only_removes_matching_registration_id() { + let registry = ProcessControlRegistry::new(); + let channel_id: crate::ChannelId = Arc::from("channel-stale-race"); + let stale_handle = WeakChannelControlHandle::dangling(); + + let stale_registration_id = registry + .register_channel(channel_id.clone(), stale_handle) + .await; + + let active_registration_id = registry + .register_channel(channel_id.clone(), WeakChannelControlHandle::dangling()) + .await; + + assert!( + !registry + .remove_stale_channel_if_matches(&channel_id, stale_registration_id) + .await + ); + assert!( + !registry + .unregister_channel(&channel_id, stale_registration_id) + .await + ); + assert!( + registry + .unregister_channel(&channel_id, active_registration_id) + .await + ); } #[tokio::test] @@ -254,6 +376,30 @@ mod tests { ); } + #[tokio::test] + async fn cancel_stale_channel_entry_prunes_then_returns_not_found() { + let registry = ProcessControlRegistry::new(); + let channel_id: crate::ChannelId = Arc::from("stale-channel"); + let worker_id = uuid::Uuid::new_v4(); + + let registration_id = registry + .register_channel(channel_id.clone(), WeakChannelControlHandle::dangling()) + .await; + + assert_eq!( + registry + .cancel_channel_worker(&channel_id, worker_id, "test") + .await, + ControlActionResult::NotFound + ); + assert!( + !registry + .unregister_channel(&channel_id, registration_id) + .await, + "stale entry should be pruned during cancellation retry path" + ); + } + #[tokio::test] async fn cancel_detached_worker_is_single_winner_and_idempotent() { let registry = ProcessControlRegistry::new(); @@ -283,4 +429,31 @@ mod tests { super::DETACHED_WORKER_LIFECYCLE_KILLING ); } + + #[tokio::test] + async fn detached_worker_snapshots_capture_state() { + let registry = ProcessControlRegistry::new(); + let worker_id = uuid::Uuid::new_v4(); + let lifecycle = Arc::new(AtomicU8::new(DETACHED_WORKER_LIFECYCLE_ACTIVE)); + let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel(); + + registry + .register_detached_worker(DetachedWorkerControl::new( + worker_id, + Arc::from("agent"), + 99, + cancel_tx, + lifecycle, + )) + .await; + + let snapshots = registry.detached_worker_snapshots().await; + assert_eq!(snapshots.len(), 1); + + let snapshot: DetachedWorkerControlSnapshot = snapshots[0].clone(); + assert_eq!(snapshot.worker_id, worker_id); + assert_eq!(snapshot.agent_id.as_ref(), "agent"); + assert_eq!(snapshot.task_number, 99); + assert_eq!(snapshot.lifecycle, DETACHED_WORKER_LIFECYCLE_ACTIVE); + } } diff --git a/src/api/config.rs b/src/api/config.rs index a1d2a8b84..59a5f4101 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -38,6 +38,7 @@ pub(super) struct CompactionSection { #[derive(Serialize, Debug)] pub(super) struct CortexSection { tick_interval_secs: u64, + maintenance_interval_secs: u64, worker_timeout_secs: u64, branch_timeout_secs: u64, detached_worker_timeout_retry_limit: u8, @@ -46,6 +47,10 @@ pub(super) struct CortexSection { bulletin_interval_secs: u64, bulletin_max_words: usize, bulletin_max_turns: usize, + maintenance_decay_rate: f32, + maintenance_prune_threshold: f32, + maintenance_min_age_days: i64, + maintenance_merge_similarity_threshold: f32, } #[derive(Serialize, Debug)] @@ -189,6 +194,7 @@ pub(super) struct CompactionUpdate { #[derive(Deserialize, Debug)] pub(super) struct CortexUpdate { tick_interval_secs: Option, + maintenance_interval_secs: Option, worker_timeout_secs: Option, branch_timeout_secs: Option, detached_worker_timeout_retry_limit: Option, @@ -197,6 +203,10 @@ pub(super) struct CortexUpdate { bulletin_interval_secs: Option, bulletin_max_words: Option, bulletin_max_turns: Option, + maintenance_decay_rate: Option, + maintenance_prune_threshold: Option, + maintenance_min_age_days: Option, + maintenance_merge_similarity_threshold: Option, } #[derive(Deserialize, Debug)] @@ -305,6 +315,7 @@ pub(super) async fn get_agent_config( }, cortex: CortexSection { tick_interval_secs: cortex.tick_interval_secs, + maintenance_interval_secs: cortex.maintenance_interval_secs, worker_timeout_secs: cortex.worker_timeout_secs, branch_timeout_secs: cortex.branch_timeout_secs, detached_worker_timeout_retry_limit: cortex.detached_worker_timeout_retry_limit, @@ -313,6 +324,10 @@ pub(super) async fn get_agent_config( bulletin_interval_secs: cortex.bulletin_interval_secs, bulletin_max_words: cortex.bulletin_max_words, bulletin_max_turns: cortex.bulletin_max_turns, + maintenance_decay_rate: cortex.maintenance_decay_rate, + maintenance_prune_threshold: cortex.maintenance_prune_threshold, + maintenance_min_age_days: cortex.maintenance_min_age_days, + maintenance_merge_similarity_threshold: cortex.maintenance_merge_similarity_threshold, }, warmup: WarmupSection { enabled: warmup.enabled, @@ -451,7 +466,13 @@ pub(super) async fn update_agent_config( update_discord_table(&mut doc, discord)?; } - tokio::fs::write(&config_path, doc.to_string()) + let updated_content = doc.to_string(); + if let Err(error) = crate::config::Config::validate_toml(&updated_content) { + tracing::warn!(%error, "rejected config API update due to invalid resulting TOML"); + return Err(StatusCode::BAD_REQUEST); + } + + tokio::fs::write(&config_path, updated_content) .await .map_err(|error| { tracing::warn!(%error, "failed to write config.toml"); @@ -635,6 +656,32 @@ fn update_compaction_table( Ok(()) } +fn validate_maintenance_unit_interval(name: &str, value: f32) -> Result<(), StatusCode> { + if !value.is_finite() || !(0.0..=1.0).contains(&value) { + tracing::warn!( + field = name, + value, + "invalid maintenance value in config update" + ); + return Err(StatusCode::BAD_REQUEST); + } + Ok(()) +} + +fn to_i64_from_u64(field: &'static str, value: u64) -> Result { + i64::try_from(value).map_err(|_| { + tracing::warn!(field, value, "config value exceeds i64 range"); + StatusCode::BAD_REQUEST + }) +} + +fn to_i64_from_usize(field: &'static str, value: usize) -> Result { + i64::try_from(value).map_err(|_| { + tracing::warn!(field, value, "config value exceeds i64 range"); + StatusCode::BAD_REQUEST + }) +} + fn update_cortex_table( doc: &mut toml_edit::DocumentMut, agent_idx: usize, @@ -643,32 +690,63 @@ fn update_cortex_table( let agent = get_agent_table_mut(doc, agent_idx)?; let table = get_or_create_subtable(agent, "cortex")?; if let Some(v) = cortex.tick_interval_secs { - table["tick_interval_secs"] = toml_edit::value(v as i64); + table["tick_interval_secs"] = toml_edit::value(to_i64_from_u64("tick_interval_secs", v)?); } if let Some(v) = cortex.worker_timeout_secs { - table["worker_timeout_secs"] = toml_edit::value(v as i64); + table["worker_timeout_secs"] = toml_edit::value(to_i64_from_u64("worker_timeout_secs", v)?); } if let Some(v) = cortex.branch_timeout_secs { - table["branch_timeout_secs"] = toml_edit::value(v as i64); + table["branch_timeout_secs"] = toml_edit::value(to_i64_from_u64("branch_timeout_secs", v)?); } if let Some(v) = cortex.detached_worker_timeout_retry_limit { - table["detached_worker_timeout_retry_limit"] = toml_edit::value(v as i64); + table["detached_worker_timeout_retry_limit"] = toml_edit::value(i64::from(v)); } if let Some(v) = cortex.supervisor_kill_budget_per_tick { table["supervisor_kill_budget_per_tick"] = - toml_edit::value(i64::try_from(v).map_err(|_| StatusCode::BAD_REQUEST)?); + toml_edit::value(to_i64_from_usize("supervisor_kill_budget_per_tick", v)?); } if let Some(v) = cortex.circuit_breaker_threshold { - table["circuit_breaker_threshold"] = toml_edit::value(v as i64); + table["circuit_breaker_threshold"] = toml_edit::value(i64::from(v)); } if let Some(v) = cortex.bulletin_interval_secs { - table["bulletin_interval_secs"] = toml_edit::value(v as i64); + table["bulletin_interval_secs"] = + toml_edit::value(to_i64_from_u64("bulletin_interval_secs", v)?); } if let Some(v) = cortex.bulletin_max_words { - table["bulletin_max_words"] = toml_edit::value(v as i64); + table["bulletin_max_words"] = toml_edit::value(to_i64_from_usize("bulletin_max_words", v)?); } if let Some(v) = cortex.bulletin_max_turns { - table["bulletin_max_turns"] = toml_edit::value(v as i64); + table["bulletin_max_turns"] = toml_edit::value(to_i64_from_usize("bulletin_max_turns", v)?); + } + if let Some(v) = cortex.maintenance_interval_secs { + if v == 0 { + tracing::warn!("maintenance_interval_secs must be >= 1"); + return Err(StatusCode::BAD_REQUEST); + } + table["maintenance_interval_secs"] = + toml_edit::value(to_i64_from_u64("maintenance_interval_secs", v)?); + } + if let Some(v) = cortex.maintenance_decay_rate { + validate_maintenance_unit_interval("maintenance_decay_rate", v)?; + table["maintenance_decay_rate"] = toml_edit::value(v as f64); + } + if let Some(v) = cortex.maintenance_prune_threshold { + validate_maintenance_unit_interval("maintenance_prune_threshold", v)?; + table["maintenance_prune_threshold"] = toml_edit::value(v as f64); + } + if let Some(v) = cortex.maintenance_min_age_days { + if v < 0 { + tracing::warn!( + maintenance_min_age_days = v, + "maintenance_min_age_days must be >= 0" + ); + return Err(StatusCode::BAD_REQUEST); + } + table["maintenance_min_age_days"] = toml_edit::value(v); + } + if let Some(v) = cortex.maintenance_merge_similarity_threshold { + validate_maintenance_unit_interval("maintenance_merge_similarity_threshold", v)?; + table["maintenance_merge_similarity_threshold"] = toml_edit::value(v as f64); } Ok(()) } @@ -1000,7 +1078,7 @@ id = "main" } #[test] - fn test_update_cortex_table_rejects_large_usize_value() { + fn test_update_cortex_table_rejects_large_numeric_values() { let mut doc: toml_edit::DocumentMut = r#" [[agents]] id = "main" @@ -1012,6 +1090,7 @@ id = "main" find_or_create_agent_table(&mut doc, "main").expect("failed to find/create agent"); let update = CortexUpdate { tick_interval_secs: None, + maintenance_interval_secs: None, worker_timeout_secs: None, branch_timeout_secs: None, detached_worker_timeout_retry_limit: None, @@ -1020,6 +1099,10 @@ id = "main" bulletin_interval_secs: None, bulletin_max_words: None, bulletin_max_turns: None, + maintenance_decay_rate: None, + maintenance_prune_threshold: None, + maintenance_min_age_days: None, + maintenance_merge_similarity_threshold: None, }; let result = update_cortex_table(&mut doc, agent_idx, &update); @@ -1028,5 +1111,258 @@ id = "main" } else { assert!(result.is_ok()); } + + let overflow_u64_update = CortexUpdate { + tick_interval_secs: Some(u64::MAX), + maintenance_interval_secs: None, + worker_timeout_secs: None, + branch_timeout_secs: None, + detached_worker_timeout_retry_limit: None, + supervisor_kill_budget_per_tick: None, + circuit_breaker_threshold: None, + bulletin_interval_secs: None, + bulletin_max_words: None, + bulletin_max_turns: None, + maintenance_decay_rate: None, + maintenance_prune_threshold: None, + maintenance_min_age_days: None, + maintenance_merge_similarity_threshold: None, + }; + assert_eq!( + update_cortex_table(&mut doc, agent_idx, &overflow_u64_update), + Err(StatusCode::BAD_REQUEST) + ); + } + + #[test] + fn test_update_cortex_table_rejects_invalid_maintenance_values() { + let mut doc: toml_edit::DocumentMut = r#" +[[agents]] +id = "main" +"# + .parse() + .expect("failed to parse test TOML"); + + let agent_idx = + find_or_create_agent_table(&mut doc, "main").expect("failed to find/create agent"); + + let invalid_decay = CortexUpdate { + tick_interval_secs: None, + maintenance_interval_secs: None, + worker_timeout_secs: None, + branch_timeout_secs: None, + detached_worker_timeout_retry_limit: None, + supervisor_kill_budget_per_tick: None, + circuit_breaker_threshold: None, + bulletin_interval_secs: None, + bulletin_max_words: None, + bulletin_max_turns: None, + maintenance_decay_rate: Some(1.1), + maintenance_prune_threshold: None, + maintenance_min_age_days: None, + maintenance_merge_similarity_threshold: None, + }; + assert_eq!( + update_cortex_table(&mut doc, agent_idx, &invalid_decay), + Err(StatusCode::BAD_REQUEST) + ); + + let invalid_min_age = CortexUpdate { + tick_interval_secs: None, + maintenance_interval_secs: None, + worker_timeout_secs: None, + branch_timeout_secs: None, + detached_worker_timeout_retry_limit: None, + supervisor_kill_budget_per_tick: None, + circuit_breaker_threshold: None, + bulletin_interval_secs: None, + bulletin_max_words: None, + bulletin_max_turns: None, + maintenance_decay_rate: None, + maintenance_prune_threshold: None, + maintenance_min_age_days: Some(-1), + maintenance_merge_similarity_threshold: None, + }; + assert_eq!( + update_cortex_table(&mut doc, agent_idx, &invalid_min_age), + Err(StatusCode::BAD_REQUEST) + ); + + let invalid_interval = CortexUpdate { + tick_interval_secs: None, + maintenance_interval_secs: Some(0), + worker_timeout_secs: None, + branch_timeout_secs: None, + detached_worker_timeout_retry_limit: None, + supervisor_kill_budget_per_tick: None, + circuit_breaker_threshold: None, + bulletin_interval_secs: None, + bulletin_max_words: None, + bulletin_max_turns: None, + maintenance_decay_rate: None, + maintenance_prune_threshold: None, + maintenance_min_age_days: None, + maintenance_merge_similarity_threshold: None, + }; + assert_eq!( + update_cortex_table(&mut doc, agent_idx, &invalid_interval), + Err(StatusCode::BAD_REQUEST) + ); + } + + #[test] + fn test_update_cortex_table_writes_values() { + let mut doc: toml_edit::DocumentMut = r#" +[[agents]] +id = "main" +"# + .parse() + .expect("failed to parse test TOML"); + + let agent_idx = + find_or_create_agent_table(&mut doc, "main").expect("failed to find/create agent"); + let update = CortexUpdate { + tick_interval_secs: Some(45), + maintenance_interval_secs: Some(3_600), + worker_timeout_secs: Some(321), + branch_timeout_secs: Some(12), + detached_worker_timeout_retry_limit: Some(3), + supervisor_kill_budget_per_tick: Some(12), + circuit_breaker_threshold: Some(6), + bulletin_interval_secs: Some(120), + bulletin_max_words: Some(4000), + bulletin_max_turns: Some(5), + maintenance_decay_rate: Some(0.16), + maintenance_prune_threshold: Some(0.17), + maintenance_min_age_days: Some(15), + maintenance_merge_similarity_threshold: Some(0.98), + }; + + update_cortex_table(&mut doc, agent_idx, &update).expect("failed to update cortex"); + + let agent = doc + .get("agents") + .and_then(|item| item.as_array_of_tables()) + .and_then(|agents| agents.get(agent_idx)) + .expect("missing agent table"); + let cortex = agent + .get("cortex") + .and_then(|item| item.as_table()) + .expect("missing cortex table"); + + assert_eq!(cortex["tick_interval_secs"].as_integer(), Some(45)); + assert_eq!( + cortex["maintenance_interval_secs"].as_integer(), + Some(3_600) + ); + assert_eq!(cortex["worker_timeout_secs"].as_integer(), Some(321)); + assert_eq!(cortex["branch_timeout_secs"].as_integer(), Some(12)); + assert_eq!( + cortex["detached_worker_timeout_retry_limit"].as_integer(), + Some(3) + ); + assert_eq!( + cortex["supervisor_kill_budget_per_tick"].as_integer(), + Some(12) + ); + assert_eq!(cortex["bulletin_interval_secs"].as_integer(), Some(120)); + assert_eq!(cortex["bulletin_max_words"].as_integer(), Some(4000)); + assert_eq!(cortex["bulletin_max_turns"].as_integer(), Some(5)); + assert!((cortex["maintenance_decay_rate"].as_float().unwrap_or(0.0) - 0.16).abs() < 1e-6); + assert!( + (cortex["maintenance_prune_threshold"] + .as_float() + .unwrap_or(0.0) + - 0.17) + .abs() + < 1e-6 + ); + assert_eq!(cortex["maintenance_min_age_days"].as_integer(), Some(15)); + assert!( + (cortex["maintenance_merge_similarity_threshold"] + .as_float() + .unwrap_or(0.0) + - 0.98) + .abs() + < 1e-6 + ); + assert_eq!(cortex["circuit_breaker_threshold"].as_integer(), Some(6)); + } + + #[test] + fn test_update_cortex_table_partial_update_only_sets_requested_keys() { + let mut doc: toml_edit::DocumentMut = r#" +[[agents]] +id = "main" +"# + .parse() + .expect("failed to parse test TOML"); + + let agent_idx = + find_or_create_agent_table(&mut doc, "main").expect("failed to find/create agent"); + let initial = CortexUpdate { + tick_interval_secs: Some(45), + maintenance_interval_secs: None, + worker_timeout_secs: None, + branch_timeout_secs: None, + detached_worker_timeout_retry_limit: None, + supervisor_kill_budget_per_tick: None, + circuit_breaker_threshold: None, + bulletin_interval_secs: None, + bulletin_max_words: None, + bulletin_max_turns: None, + maintenance_decay_rate: None, + maintenance_prune_threshold: None, + maintenance_min_age_days: None, + maintenance_merge_similarity_threshold: None, + }; + + update_cortex_table(&mut doc, agent_idx, &initial).expect("failed to apply initial update"); + + let second = CortexUpdate { + tick_interval_secs: Some(60), + maintenance_interval_secs: Some(4_800), + worker_timeout_secs: None, + branch_timeout_secs: None, + detached_worker_timeout_retry_limit: None, + supervisor_kill_budget_per_tick: None, + circuit_breaker_threshold: None, + bulletin_interval_secs: None, + bulletin_max_words: None, + bulletin_max_turns: None, + maintenance_decay_rate: Some(0.2), + maintenance_prune_threshold: None, + maintenance_min_age_days: None, + maintenance_merge_similarity_threshold: Some(0.85), + }; + + update_cortex_table(&mut doc, agent_idx, &second).expect("failed to apply partial update"); + + let agent = doc + .get("agents") + .and_then(|item| item.as_array_of_tables()) + .and_then(|agents| agents.get(agent_idx)) + .expect("missing agent table"); + let cortex = agent + .get("cortex") + .and_then(|item| item.as_table()) + .expect("missing cortex table"); + + assert_eq!(cortex["tick_interval_secs"].as_integer(), Some(60)); + assert_eq!( + cortex["maintenance_interval_secs"].as_integer(), + Some(4_800) + ); + assert!((cortex["maintenance_decay_rate"].as_float().unwrap_or(0.0) - 0.2).abs() < 1e-6); + assert!( + (cortex["maintenance_merge_similarity_threshold"] + .as_float() + .unwrap_or(0.0) + - 0.85) + .abs() + < 1e-6 + ); + assert!(cortex.get("worker_timeout_secs").is_none()); + assert!(cortex.get("maintenance_prune_threshold").is_none()); } } diff --git a/src/config.rs b/src/config.rs index dea4b597c..11318cab8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -985,6 +985,9 @@ tick_interval_secs = 45 detached_worker_timeout_retry_limit = 4 supervisor_kill_budget_per_tick = 12 bulletin_max_words = 1200 +maintenance_interval_secs = 1200 +maintenance_prune_threshold = 0.21 +maintenance_min_age_days = 17 [[agents]] id = "main" @@ -993,6 +996,7 @@ id = "main" branch_timeout_secs = 77 supervisor_kill_budget_per_tick = 3 association_max_per_pass = 55 +maintenance_decay_rate = 0.33 "#; let parsed: TomlConfig = toml::from_str(toml).expect("failed to parse test TOML"); let config = Config::from_toml(parsed, PathBuf::from(".")).expect("failed to build Config"); @@ -1005,15 +1009,95 @@ association_max_per_pass = 55 ); assert_eq!(config.defaults.cortex.supervisor_kill_budget_per_tick, 12); assert_eq!(config.defaults.cortex.bulletin_max_words, 1200); + assert_eq!(config.defaults.cortex.maintenance_interval_secs, 1200); + assert_eq!(config.defaults.cortex.maintenance_prune_threshold, 0.21); + assert_eq!(config.defaults.cortex.maintenance_min_age_days, 17); assert_eq!(resolved.cortex.tick_interval_secs, 45); assert_eq!(resolved.cortex.branch_timeout_secs, 77); assert_eq!(resolved.cortex.detached_worker_timeout_retry_limit, 4); assert_eq!(resolved.cortex.supervisor_kill_budget_per_tick, 3); assert_eq!(resolved.cortex.bulletin_max_words, 1200); + assert_eq!(resolved.cortex.maintenance_interval_secs, 1200); + assert_eq!(resolved.cortex.maintenance_decay_rate, 0.33); + assert_eq!(resolved.cortex.maintenance_prune_threshold, 0.21); + assert_eq!(resolved.cortex.maintenance_min_age_days, 17); + assert_eq!(resolved.cortex.maintenance_merge_similarity_threshold, 0.95); assert_eq!(resolved.cortex.association_max_per_pass, 55); } + #[test] + fn test_cortex_maintenance_config_rejects_invalid_ranges() { + let invalid_threshold = r#" +[defaults.cortex] +maintenance_prune_threshold = 1.2 +"#; + let parsed: TomlConfig = + toml::from_str(invalid_threshold).expect("failed to parse invalid threshold TOML"); + assert!( + Config::from_toml(parsed, PathBuf::from(".")).is_err(), + "expected invalid maintenance_prune_threshold to be rejected" + ); + + let invalid_min_age = r#" +[defaults.cortex] +maintenance_min_age_days = -3 +"#; + let parsed: TomlConfig = + toml::from_str(invalid_min_age).expect("failed to parse invalid min age TOML"); + assert!( + Config::from_toml(parsed, PathBuf::from(".")).is_err(), + "expected negative maintenance_min_age_days to be rejected" + ); + + let invalid_agent_override = r#" +[[agents]] +id = "main" + +[agents.cortex] +maintenance_decay_rate = -0.1 +"#; + let parsed: TomlConfig = + toml::from_str(invalid_agent_override).expect("failed to parse invalid agent TOML"); + assert!( + Config::from_toml(parsed, PathBuf::from(".")).is_err(), + "expected invalid agent maintenance_decay_rate to be rejected" + ); + + let invalid_interval = r#" +[defaults.cortex] +maintenance_interval_secs = 0 +"#; + let parsed: TomlConfig = + toml::from_str(invalid_interval).expect("failed to parse invalid interval TOML"); + assert!( + Config::from_toml(parsed, PathBuf::from(".")).is_err(), + "expected maintenance_interval_secs = 0 to be rejected" + ); + + let invalid_merge_similarity_low = r#" +[defaults.cortex] +maintenance_merge_similarity_threshold = -0.1 +"#; + let parsed: TomlConfig = toml::from_str(invalid_merge_similarity_low) + .expect("failed to parse invalid low merge similarity TOML"); + assert!( + Config::from_toml(parsed, PathBuf::from(".")).is_err(), + "expected low maintenance_merge_similarity_threshold to be rejected" + ); + + let invalid_merge_similarity_high = r#" +[defaults.cortex] +maintenance_merge_similarity_threshold = 1.1 +"#; + let parsed: TomlConfig = toml::from_str(invalid_merge_similarity_high) + .expect("failed to parse invalid high merge similarity TOML"); + assert!( + Config::from_toml(parsed, PathBuf::from(".")).is_err(), + "expected high maintenance_merge_similarity_threshold to be rejected" + ); + } + #[test] fn test_work_readiness_requires_warm_state() { let readiness = evaluate_work_readiness( diff --git a/src/config/load.rs b/src/config/load.rs index f39b3ecc9..3da09e61f 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -142,8 +142,17 @@ fn resolve_close_policy( } impl CortexConfig { - fn resolve(overrides: TomlCortexConfig, defaults: CortexConfig) -> CortexConfig { - CortexConfig { + fn resolve(overrides: TomlCortexConfig, defaults: CortexConfig) -> Result { + let maintenance_interval_secs = overrides + .maintenance_interval_secs + .unwrap_or(defaults.maintenance_interval_secs); + if maintenance_interval_secs < 1 { + return Err( + ConfigError::Invalid("maintenance_interval_secs must be >= 1".to_string()).into(), + ); + } + + let config = CortexConfig { tick_interval_secs: overrides .tick_interval_secs .unwrap_or(defaults.tick_interval_secs), @@ -171,6 +180,19 @@ impl CortexConfig { bulletin_max_turns: overrides .bulletin_max_turns .unwrap_or(defaults.bulletin_max_turns), + maintenance_interval_secs, + maintenance_decay_rate: overrides + .maintenance_decay_rate + .unwrap_or(defaults.maintenance_decay_rate), + maintenance_prune_threshold: overrides + .maintenance_prune_threshold + .unwrap_or(defaults.maintenance_prune_threshold), + maintenance_min_age_days: overrides + .maintenance_min_age_days + .unwrap_or(defaults.maintenance_min_age_days), + maintenance_merge_similarity_threshold: overrides + .maintenance_merge_similarity_threshold + .unwrap_or(defaults.maintenance_merge_similarity_threshold), association_interval_secs: overrides .association_interval_secs .unwrap_or(defaults.association_interval_secs), @@ -183,7 +205,9 @@ impl CortexConfig { association_max_per_pass: overrides .association_max_per_pass .unwrap_or(defaults.association_max_per_pass), - } + }; + config.validate_maintenance_bounds()?; + Ok(config) } } @@ -1418,6 +1442,7 @@ impl Config { .defaults .cortex .map(|c| CortexConfig::resolve(c, base_defaults.cortex)) + .transpose()? .unwrap_or(base_defaults.cortex), warmup: toml .defaults @@ -1634,7 +1659,10 @@ impl Config { .unwrap_or(defaults.ingestion.poll_interval_secs), chunk_size: ig.chunk_size.unwrap_or(defaults.ingestion.chunk_size), }), - cortex: a.cortex.map(|c| CortexConfig::resolve(c, defaults.cortex)), + cortex: a + .cortex + .map(|c| CortexConfig::resolve(c, defaults.cortex)) + .transpose()?, warmup: a.warmup.map(|w| WarmupConfig { enabled: w.enabled.unwrap_or(defaults.warmup.enabled), eager_embedding_load: w diff --git a/src/config/toml_schema.rs b/src/config/toml_schema.rs index d105a7384..969333053 100644 --- a/src/config/toml_schema.rs +++ b/src/config/toml_schema.rs @@ -351,6 +351,11 @@ pub(super) struct TomlCortexConfig { pub(super) bulletin_interval_secs: Option, pub(super) bulletin_max_words: Option, pub(super) bulletin_max_turns: Option, + pub(super) maintenance_interval_secs: Option, + pub(super) maintenance_decay_rate: Option, + pub(super) maintenance_prune_threshold: Option, + pub(super) maintenance_min_age_days: Option, + pub(super) maintenance_merge_similarity_threshold: Option, pub(super) association_interval_secs: Option, pub(super) association_similarity_threshold: Option, pub(super) association_updates_threshold: Option, diff --git a/src/config/types.rs b/src/config/types.rs index 88fd09497..6c6111f66 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -841,6 +841,16 @@ pub struct CortexConfig { pub bulletin_max_words: usize, /// Max LLM turns for bulletin generation. pub bulletin_max_turns: usize, + /// Interval in seconds between memory maintenance passes. + pub maintenance_interval_secs: u64, + /// Per-day decay applied to memory importance during maintenance. + pub maintenance_decay_rate: f32, + /// Minimum importance score for non-identity memories to avoid pruning. + pub maintenance_prune_threshold: f32, + /// Minimum age in days before a memory becomes prune-eligible. + pub maintenance_min_age_days: i64, + /// Similarity threshold above which memories are merged as near-duplicates. + pub maintenance_merge_similarity_threshold: f32, /// Interval in seconds between association passes. pub association_interval_secs: u64, /// Minimum cosine similarity to create a RelatedTo edge. @@ -863,6 +873,11 @@ impl Default for CortexConfig { bulletin_interval_secs: 3600, bulletin_max_words: 1500, bulletin_max_turns: 15, + maintenance_interval_secs: 3600, + maintenance_decay_rate: 0.05, + maintenance_prune_threshold: 0.1, + maintenance_min_age_days: 30, + maintenance_merge_similarity_threshold: 0.95, association_interval_secs: 300, association_similarity_threshold: 0.85, association_updates_threshold: 0.95, @@ -871,6 +886,44 @@ impl Default for CortexConfig { } } +impl CortexConfig { + /// Validate maintenance tuning bounds used by pruning/merge logic. + pub fn validate_maintenance_bounds(&self) -> Result<()> { + validate_unit_interval_f32("maintenance_decay_rate", self.maintenance_decay_rate)?; + validate_unit_interval_f32( + "maintenance_prune_threshold", + self.maintenance_prune_threshold, + )?; + validate_unit_interval_f32( + "maintenance_merge_similarity_threshold", + self.maintenance_merge_similarity_threshold, + )?; + if self.maintenance_min_age_days < 0 { + return Err(ConfigError::Invalid(format!( + "maintenance_min_age_days must be >= 0, got {}", + self.maintenance_min_age_days + )) + .into()); + } + if self.maintenance_interval_secs == 0 { + return Err( + ConfigError::Invalid("maintenance_interval_secs must be >= 1".to_string()).into(), + ); + } + Ok(()) + } +} + +fn validate_unit_interval_f32(name: &str, value: f32) -> Result<()> { + if !value.is_finite() || !(0.0..=1.0).contains(&value) { + return Err(ConfigError::Invalid(format!( + "{name} must be finite and between 0.0 and 1.0, got {value}" + )) + .into()); + } + Ok(()) +} + /// Warmup configuration. #[derive(Debug, Clone, Copy)] pub struct WarmupConfig { diff --git a/src/main.rs b/src/main.rs index 713802d78..1c09da7d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1762,7 +1762,7 @@ async fn run( agent.config.logs_dir(), snapshot_store, ); - agent + let channel_registration_id = agent .deps .process_control_registry .register_channel(channel.id.clone(), channel.control_handle().downgrade()) @@ -1859,7 +1859,7 @@ async fn run( let scoped_channel_id: spacebot::ChannelId = Arc::from(cleanup_channel_id.as_str()); process_control_registry - .unregister_channel(&scoped_channel_id) + .unregister_channel(&scoped_channel_id, channel_registration_id) .await; api_state_for_cleanup .unregister_channel_status(&cleanup_channel_id) @@ -2049,7 +2049,7 @@ async fn run( agent.config.logs_dir(), snapshot_store, ); - agent + let channel_registration_id = agent .deps .process_control_registry .register_channel(channel.id.clone(), channel.control_handle().downgrade()) @@ -2105,7 +2105,7 @@ async fn run( let scoped_channel_id: spacebot::ChannelId = Arc::from(cleanup_channel_id.as_str()); process_control_registry - .unregister_channel(&scoped_channel_id) + .unregister_channel(&scoped_channel_id, channel_registration_id) .await; api_state_for_cleanup .unregister_channel_status(&cleanup_channel_id) diff --git a/src/memory/maintenance.rs b/src/memory/maintenance.rs index 64143ac62..67b935497 100644 --- a/src/memory/maintenance.rs +++ b/src/memory/maintenance.rs @@ -1,8 +1,21 @@ //! Memory maintenance: decay, prune, merge, reindex. use crate::error::Result; -use crate::memory::MemoryStore; -use crate::memory::types::MemoryType; +use crate::memory::{EmbeddingModel, EmbeddingTable, Memory, MemoryStore, MemoryType}; +use anyhow::Context; + +use sqlx::Row; +use sqlx::sqlite::SqliteRow; +use tokio::sync::watch; + +use std::collections::HashSet; +use std::future::Future; +use std::sync::Arc; + +const MAX_MAINTENANCE_MERGE_SOURCE_MEMORIES: i64 = 2_000; +const MAX_MAINTENANCE_MERGES_PER_PASS: usize = 500; +const MAX_MAINTENANCE_SIMILAR_CANDIDATES: usize = 25; +const MAX_MERGED_MEMORY_CONTENT_BYTES: usize = 50_000; /// Maintenance configuration. #[derive(Debug, Clone)] @@ -31,25 +44,63 @@ impl Default for MaintenanceConfig { /// Run maintenance tasks on the memory store. pub async fn run_maintenance( memory_store: &MemoryStore, + embedding_table: &EmbeddingTable, + embedding_model: &Arc, + config: &MaintenanceConfig, +) -> Result { + let (_maintenance_cancel_tx, maintenance_cancel_rx) = watch::channel(false); + run_maintenance_with_cancel( + memory_store, + embedding_table, + embedding_model, + config, + maintenance_cancel_rx, + ) + .await +} + +/// Run maintenance tasks with a cancellation signal. +/// +/// The signal allows maintenance to exit quickly when the caller decides to stop it. +pub async fn run_maintenance_with_cancel( + memory_store: &MemoryStore, + embedding_table: &EmbeddingTable, + embedding_model: &Arc, config: &MaintenanceConfig, + mut maintenance_cancel_rx: watch::Receiver, ) -> Result { let mut report = MaintenanceReport::default(); + check_maintenance_cancellation(&mut maintenance_cancel_rx).await?; + validate_maintenance_config(config)?; // Apply decay to all non-identity memories // Fields are assigned sequentially because the values are async — can't use struct literal. #[allow(clippy::field_reassign_with_default)] { - report.decayed = apply_decay(memory_store, config.decay_rate).await?; - report.pruned = prune_memories(memory_store, config).await?; - report.merged = - merge_similar_memories(memory_store, config.merge_similarity_threshold).await?; + report.decayed = + apply_decay(memory_store, config.decay_rate, &mut maintenance_cancel_rx).await?; + report.pruned = prune_memories(memory_store, config, &mut maintenance_cancel_rx).await?; + report.merged = merge_similar_memories( + memory_store, + embedding_table, + embedding_model, + config.merge_similarity_threshold, + &mut maintenance_cancel_rx, + ) + .await?; } Ok(report) } /// Apply importance decay based on recency and access patterns. -async fn apply_decay(memory_store: &MemoryStore, decay_rate: f32) -> Result { +async fn apply_decay( + memory_store: &MemoryStore, + decay_rate: f32, + maintenance_cancel_rx: &mut watch::Receiver, +) -> Result { + check_maintenance_cancellation(maintenance_cancel_rx).await?; + // Get all non-identity memories let all_types: Vec<_> = MemoryType::ALL .iter() @@ -60,9 +111,15 @@ async fn apply_decay(memory_store: &MemoryStore, decay_rate: f32) -> Result Result 0.01 { memory.importance = new_importance.clamp(0.0, 1.0); memory.updated_at = now; - memory_store.update(&memory).await?; + maintenance_cancelable_op(maintenance_cancel_rx, memory_store.update(&memory)) + .await?; decayed_count += 1; } } @@ -92,30 +150,40 @@ async fn apply_decay(memory_store: &MemoryStore, decay_rate: f32) -> Result Result { +async fn prune_memories( + memory_store: &MemoryStore, + config: &MaintenanceConfig, + maintenance_cancel_rx: &mut watch::Receiver, +) -> Result { + check_maintenance_cancellation(maintenance_cancel_rx).await?; + let now = chrono::Utc::now(); let min_age = chrono::Duration::days(config.min_age_days); let cutoff_date = now - min_age; // Get all memories below threshold that are old enough - let candidates = sqlx::query( - r#" + let candidates: Vec = maintenance_cancelable_op( + maintenance_cancel_rx, + sqlx::query( + r#" SELECT id FROM memories WHERE importance < ? AND memory_type != 'identity' AND created_at < ? "#, + ) + .bind(config.prune_threshold) + .bind(cutoff_date) + .fetch_all(memory_store.pool()), ) - .bind(config.prune_threshold) - .bind(cutoff_date) - .fetch_all(memory_store.pool()) .await?; let mut pruned_count = 0; for row in candidates { - let id: String = sqlx::Row::try_get(&row, "id")?; - memory_store.delete(&id).await?; + let id: String = row.try_get("id")?; + check_maintenance_cancellation(maintenance_cancel_rx).await?; + maintenance_cancelable_op(maintenance_cancel_rx, memory_store.delete(&id)).await?; pruned_count += 1; } @@ -124,16 +192,288 @@ async fn prune_memories(memory_store: &MemoryStore, config: &MaintenanceConfig) /// Merge near-duplicate memories. async fn merge_similar_memories( - _memory_store: &MemoryStore, + memory_store: &MemoryStore, + embedding_table: &EmbeddingTable, + embedding_model: &Arc, similarity_threshold: f32, + maintenance_cancel_rx: &mut watch::Receiver, ) -> Result { - // For now, this is a placeholder - // Full implementation would: - // 1. Find pairs of memories with high embedding similarity - // 2. Merge them, keeping the higher importance one - // 3. Update associations to point to the merged memory - let _ = similarity_threshold; - Ok(0) + let memory_ids = fetch_candidate_memory_ids(memory_store, maintenance_cancel_rx).await?; + if memory_ids.is_empty() { + return Ok(0); + } + + let mut merged_count = 0_usize; + let mut merged_memory_ids = HashSet::new(); + + for source_id in memory_ids { + if merged_count >= MAX_MAINTENANCE_MERGES_PER_PASS { + break; + } + check_maintenance_cancellation(maintenance_cancel_rx).await?; + + if merged_memory_ids.contains(&source_id) { + continue; + } + + let Some(source_memory) = + maintenance_cancelable_op(maintenance_cancel_rx, memory_store.load(&source_id)).await? + else { + continue; + }; + if source_memory.forgotten { + continue; + } + let source_id = source_memory.id.clone(); + if merged_memory_ids.contains(&source_id) { + continue; + } + + let similar = maintenance_cancelable_op( + maintenance_cancel_rx, + embedding_table.find_similar( + &source_memory.id, + similarity_threshold, + MAX_MAINTENANCE_SIMILAR_CANDIDATES, + ), + ) + .await + .with_context(|| { + format!( + "failed to lookup similar memories during maintenance for source memory {}", + source_memory.id + ) + })?; + + if similar.is_empty() { + continue; + } + + let mut active_survivor = source_memory; + let mut source_merged = false; + + for (candidate_id, _similarity) in similar { + if merged_count >= MAX_MAINTENANCE_MERGES_PER_PASS { + break; + } + check_maintenance_cancellation(maintenance_cancel_rx).await?; + if merged_memory_ids.contains(&candidate_id) || candidate_id == active_survivor.id { + continue; + } + + let Some(candidate_memory) = + maintenance_cancelable_op(maintenance_cancel_rx, memory_store.load(&candidate_id)) + .await? + else { + continue; + }; + if candidate_memory.forgotten { + continue; + } + + let (winner, loser) = choose_merge_pair(&active_survivor, &candidate_memory); + let merged_survivor = merge_pair( + memory_store, + embedding_table, + embedding_model, + &winner, + &loser, + maintenance_cancel_rx, + ) + .await?; + merged_memory_ids.insert(loser.id.clone()); + merged_count += 1; + + if loser.id == source_id { + source_merged = true; + break; + } + + active_survivor = merged_survivor; + } + + if source_merged { + continue; + } + } + + Ok(merged_count) +} + +fn choose_merge_pair(first: &Memory, second: &Memory) -> (Memory, Memory) { + let first_wins = first.importance > second.importance + || (first.importance == second.importance && first.id < second.id); + + if first_wins { + (first.clone(), second.clone()) + } else { + (second.clone(), first.clone()) + } +} + +fn merged_memory_content(winner: String, loser: &str) -> String { + let winner_trimmed = winner.trim_end(); + let loser_trimmed = loser.trim_end(); + + if loser_trimmed.is_empty() { + return winner_trimmed.to_string(); + } + + if winner_trimmed.contains(loser_trimmed) { + return winner_trimmed.to_string(); + } + + let merged = if winner_trimmed.is_empty() { + loser_trimmed.to_string() + } else { + format!("{winner_trimmed}\n\n{loser_trimmed}") + }; + + if merged.len() <= MAX_MERGED_MEMORY_CONTENT_BYTES { + return merged; + } + + let boundary = merged.floor_char_boundary(MAX_MERGED_MEMORY_CONTENT_BYTES); + merged[..boundary].to_string() +} + +async fn merge_pair( + memory_store: &MemoryStore, + embedding_table: &EmbeddingTable, + embedding_model: &Arc, + survivor: &Memory, + merged: &Memory, + maintenance_cancel_rx: &mut watch::Receiver, +) -> Result { + check_maintenance_cancellation(maintenance_cancel_rx).await?; + + let mut updated_survivor = survivor.clone(); + updated_survivor.content = merged_memory_content(updated_survivor.content, &merged.content); + updated_survivor.updated_at = chrono::Utc::now(); + + maintenance_cancelable_op( + maintenance_cancel_rx, + memory_store.merge_memories_atomic(&updated_survivor, merged), + ) + .await?; + + let updated_survivor_embedding = maintenance_cancelable_op( + maintenance_cancel_rx, + embedding_model.embed_one(&updated_survivor.content), + ) + .await?; + maintenance_cancelable_op( + maintenance_cancel_rx, + embedding_table.delete(&updated_survivor.id), + ) + .await?; + maintenance_cancelable_op( + maintenance_cancel_rx, + embedding_table.store( + &updated_survivor.id, + &updated_survivor.content, + &updated_survivor_embedding, + ), + ) + .await?; + maintenance_cancelable_op(maintenance_cancel_rx, embedding_table.delete(&merged.id)).await?; + Ok(updated_survivor) +} + +async fn fetch_candidate_memory_ids( + memory_store: &MemoryStore, + maintenance_cancel_rx: &mut watch::Receiver, +) -> Result> { + check_maintenance_cancellation(maintenance_cancel_rx).await?; + + let rows: Vec = maintenance_cancelable_op( + maintenance_cancel_rx, + sqlx::query( + "SELECT id FROM memories WHERE forgotten = 0 ORDER BY importance DESC, created_at DESC, id ASC LIMIT ?", + ) + .bind(MAX_MAINTENANCE_MERGE_SOURCE_MEMORIES) + .fetch_all(memory_store.pool()), + ) + .await + .with_context(|| "failed to fetch candidate memories for maintenance")?; + + let ids: Vec = rows + .into_iter() + .map(|row| { + let memory_id: String = row.get("id"); + memory_id + }) + .collect(); + + Ok(ids) +} + +fn validate_maintenance_config(config: &MaintenanceConfig) -> Result<()> { + validate_unit_interval("prune_threshold", config.prune_threshold)?; + validate_unit_interval("decay_rate", config.decay_rate)?; + validate_unit_interval( + "merge_similarity_threshold", + config.merge_similarity_threshold, + )?; + if config.min_age_days < 0 { + return Err(anyhow::anyhow!( + "maintenance min_age_days must be >= 0, got {}", + config.min_age_days + ) + .into()); + } + Ok(()) +} + +fn validate_unit_interval(name: &str, value: f32) -> Result<()> { + if !value.is_finite() || !(0.0..=1.0).contains(&value) { + return Err(anyhow::anyhow!( + "maintenance {name} must be finite and between 0.0 and 1.0, got {value}" + ) + .into()); + } + Ok(()) +} + +async fn check_maintenance_cancellation( + maintenance_cancel_rx: &mut watch::Receiver, +) -> Result<()> { + if *maintenance_cancel_rx.borrow() { + return Err(anyhow::anyhow!("memory maintenance cancelled").into()); + } + + if maintenance_cancel_rx + .has_changed() + .map_err(|error| anyhow::anyhow!(error))? + { + maintenance_cancel_rx + .changed() + .await + .map_err(|error| anyhow::anyhow!(error))?; + if *maintenance_cancel_rx.borrow() { + return Err(anyhow::anyhow!("memory maintenance cancelled").into()); + } + } + + Ok(()) +} + +async fn maintenance_cancelable_op( + maintenance_cancel_rx: &mut watch::Receiver, + operation: impl Future>, +) -> Result +where + E: Into, +{ + check_maintenance_cancellation(maintenance_cancel_rx).await?; + + tokio::select! { + biased; + _ = maintenance_cancel_rx.changed() => { + Err(anyhow::anyhow!("memory maintenance cancelled").into()) + } + result = operation => result.map_err(Into::into), + } } /// Maintenance report. @@ -143,3 +483,407 @@ pub struct MaintenanceReport { pub pruned: usize, pub merged: usize, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::memory::{Association, RelationType}; + use std::sync::{Arc, OnceLock}; + use tempfile::tempdir; + use tokio::time::Duration; + + fn shared_embedding_model() -> Arc { + static MODEL: OnceLock> = OnceLock::new(); + Arc::clone(MODEL.get_or_init(|| { + let cache_dir = std::env::temp_dir().join("spacebot-test-embedding-cache"); + std::fs::create_dir_all(&cache_dir).expect("failed to create embedding cache dir"); + Arc::new( + crate::memory::EmbeddingModel::new(&cache_dir) + .expect("failed to initialize embedding model"), + ) + })) + } + + async fn create_memory_with_embedding( + store: &MemoryStore, + embedding_table: &crate::memory::lance::EmbeddingTable, + content: &str, + memory_type: MemoryType, + importance: f32, + embedding: Vec, + ) -> Memory { + let memory = Memory::new(content, memory_type).with_importance(importance); + store.save(&memory).await.expect("failed to save memory"); + + embedding_table + .store(&memory.id, &memory.content, &embedding) + .await + .expect("failed to store embedding"); + + memory + } + + #[tokio::test] + async fn merges_near_duplicate_memories_and_transfers_associations() { + let store = MemoryStore::connect_in_memory().await; + + let dir = tempdir().expect("failed to create temp dir"); + let lance_conn = lancedb::connect(dir.path().to_str().expect("temp path")) + .execute() + .await + .expect("failed to connect to lancedb"); + let embedding_table = crate::memory::EmbeddingTable::open_or_create(&lance_conn) + .await + .expect("failed to create embedding table"); + + let survivor = create_memory_with_embedding( + &store, + &embedding_table, + "rust memory maintenance", + MemoryType::Fact, + 0.9, + vec![1.0; 384], + ) + .await; + + let duplicate = create_memory_with_embedding( + &store, + &embedding_table, + "rust memory maintenance updated", + MemoryType::Fact, + 0.4, + vec![1.0; 384], + ) + .await; + + let related = create_memory_with_embedding( + &store, + &embedding_table, + "related memory", + MemoryType::Fact, + 0.7, + vec![0.0; 384], + ) + .await; + + store + .create_association(&Association::new( + &duplicate.id, + &related.id, + RelationType::RelatedTo, + )) + .await + .expect("failed to create related-to association"); + + store + .create_association(&Association::new( + &related.id, + &duplicate.id, + RelationType::PartOf, + )) + .await + .expect("failed to create part-of association"); + + let config = super::MaintenanceConfig { + prune_threshold: 0.2, + decay_rate: 0.05, + min_age_days: 30, + merge_similarity_threshold: 0.95, + }; + + let embedding_model = shared_embedding_model(); + let report = run_maintenance(&store, &embedding_table, &embedding_model, &config) + .await + .expect("maintenance should succeed"); + + assert_eq!(report.merged, 1); + + let updated_survivor = store + .load(&survivor.id) + .await + .expect("failed to load survivor") + .expect("survivor should exist"); + assert_eq!(updated_survivor.id, survivor.id); + assert!( + updated_survivor + .content + .contains("rust memory maintenance updated") + ); + + let forgotten_duplicate = store + .load(&duplicate.id) + .await + .expect("failed to load duplicate") + .expect("duplicate should still exist"); + assert!(forgotten_duplicate.forgotten); + + let duplicate_embeddings = embedding_table + .find_similar(&duplicate.id, 0.0, 10) + .await + .expect("failed to search for missing duplicate embeddings"); + assert!(duplicate_embeddings.is_empty()); + + let survivor_associations = store + .get_associations(&survivor.id) + .await + .expect("failed to fetch survivor associations"); + + let has_updates = survivor_associations.iter().any(|assoc| { + assoc.source_id == survivor.id + && assoc.target_id == duplicate.id + && assoc.relation_type == RelationType::Updates + }); + assert!(has_updates); + + assert!( + survivor_associations + .iter() + .any(|assoc| { assoc.source_id == survivor.id && assoc.target_id == related.id }) + ); + + assert!( + survivor_associations + .iter() + .any(|assoc| assoc.source_id == related.id && assoc.target_id == survivor.id) + ); + + let duplicate_associations = store + .get_associations(&duplicate.id) + .await + .expect("failed to load duplicate associations"); + assert_eq!(duplicate_associations.len(), 1); + assert_eq!( + duplicate_associations[0].source_id, survivor.id, + "only expected survivor updates edge to duplicate after merge" + ); + assert_eq!(duplicate_associations[0].target_id, duplicate.id); + assert_eq!( + duplicate_associations[0].relation_type, + RelationType::Updates + ); + } + + #[tokio::test] + async fn merges_multiple_duplicates_into_one_survivor_in_single_pass() { + let store = MemoryStore::connect_in_memory().await; + + let dir = tempdir().expect("failed to create temp dir"); + let lance_conn = lancedb::connect(dir.path().to_str().expect("temp path")) + .execute() + .await + .expect("failed to connect to lancedb"); + let embedding_table = crate::memory::EmbeddingTable::open_or_create(&lance_conn) + .await + .expect("failed to create embedding table"); + + let survivor = create_memory_with_embedding( + &store, + &embedding_table, + "durable rust maintenance note", + MemoryType::Fact, + 0.9, + vec![1.0; 384], + ) + .await; + + let duplicate_a = create_memory_with_embedding( + &store, + &embedding_table, + "durable rust maintenance note update A", + MemoryType::Fact, + 0.6, + vec![1.0; 384], + ) + .await; + let duplicate_b = create_memory_with_embedding( + &store, + &embedding_table, + "durable rust maintenance note update B", + MemoryType::Fact, + 0.5, + vec![1.0; 384], + ) + .await; + + let related_a = create_memory_with_embedding( + &store, + &embedding_table, + "related A", + MemoryType::Fact, + 0.7, + { + let mut embedding = vec![0.0; 384]; + embedding[0] = 1.0; + embedding + }, + ) + .await; + let related_b = create_memory_with_embedding( + &store, + &embedding_table, + "related B", + MemoryType::Fact, + 0.7, + { + let mut embedding = vec![0.0; 384]; + embedding[1] = 1.0; + embedding + }, + ) + .await; + + store + .create_association(&Association::new( + &duplicate_a.id, + &related_a.id, + RelationType::RelatedTo, + )) + .await + .expect("failed to create duplicate_a association"); + store + .create_association(&Association::new( + &related_b.id, + &duplicate_b.id, + RelationType::PartOf, + )) + .await + .expect("failed to create duplicate_b association"); + + let embedding_model = shared_embedding_model(); + let report = run_maintenance( + &store, + &embedding_table, + &embedding_model, + &MaintenanceConfig { + prune_threshold: 0.2, + decay_rate: 0.05, + min_age_days: 30, + merge_similarity_threshold: 0.95, + }, + ) + .await + .expect("maintenance should succeed"); + + assert_eq!(report.merged, 2); + + let refreshed_survivor = store + .load(&survivor.id) + .await + .expect("failed to load survivor") + .expect("survivor should exist"); + assert!( + refreshed_survivor + .content + .contains("durable rust maintenance note update A") + ); + assert!( + refreshed_survivor + .content + .contains("durable rust maintenance note update B") + ); + + for duplicate_id in [&duplicate_a.id, &duplicate_b.id] { + let duplicate = store + .load(duplicate_id) + .await + .expect("failed to load duplicate") + .expect("duplicate should exist"); + assert!(duplicate.forgotten); + + let duplicate_embeddings = embedding_table + .find_similar(duplicate_id, 0.0, 10) + .await + .expect("failed to search duplicate embeddings"); + assert!(duplicate_embeddings.is_empty()); + } + + let survivor_associations = store + .get_associations(&survivor.id) + .await + .expect("failed to load survivor associations"); + assert!( + survivor_associations.iter().any(|association| { + association.source_id == survivor.id && association.target_id == related_a.id + }), + "expected duplicate_a associations to rewire to survivor" + ); + assert!( + survivor_associations.iter().any(|association| { + association.source_id == related_b.id && association.target_id == survivor.id + }), + "expected duplicate_b associations to rewire to survivor" + ); + } + + #[tokio::test] + async fn run_maintenance_with_cancel_stops_when_cancel_requested() { + let store = MemoryStore::connect_in_memory().await; + + let dir = tempdir().expect("failed to create temp dir"); + let lance_conn = lancedb::connect(dir.path().to_str().expect("temp path")) + .execute() + .await + .expect("failed to connect to lancedb"); + let embedding_table = crate::memory::EmbeddingTable::open_or_create(&lance_conn) + .await + .expect("failed to create embedding table"); + + let (_cancel_tx, maintenance_cancel_rx) = tokio::sync::watch::channel(true); + let embedding_model = shared_embedding_model(); + let result = run_maintenance_with_cancel( + &store, + &embedding_table, + &embedding_model, + &MaintenanceConfig::default(), + maintenance_cancel_rx, + ) + .await; + + assert!( + result.is_err(), + "maintenance should stop immediately when cancellation is requested" + ); + assert!( + result + .as_ref() + .unwrap_err() + .to_string() + .contains("memory maintenance cancelled"), + "expected explicit maintenance cancellation error" + ); + + tokio::time::sleep(Duration::from_millis(10)).await; + } + + #[tokio::test] + async fn run_maintenance_rejects_invalid_configuration_ranges() { + let store = MemoryStore::connect_in_memory().await; + let dir = tempdir().expect("failed to create temp dir"); + let lance_conn = lancedb::connect(dir.path().to_str().expect("temp path")) + .execute() + .await + .expect("failed to connect to lancedb"); + let embedding_table = crate::memory::EmbeddingTable::open_or_create(&lance_conn) + .await + .expect("failed to create embedding table"); + + let invalid_config = MaintenanceConfig { + prune_threshold: 0.2, + decay_rate: 0.05, + min_age_days: -1, + merge_similarity_threshold: 0.95, + }; + + let embedding_model = shared_embedding_model(); + let result = + run_maintenance(&store, &embedding_table, &embedding_model, &invalid_config).await; + assert!(result.is_err(), "expected invalid config to fail"); + assert!( + result + .as_ref() + .unwrap_err() + .to_string() + .contains("min_age_days must be >= 0") + ); + } +} diff --git a/src/memory/store.rs b/src/memory/store.rs index 31c656cb5..36b104a81 100644 --- a/src/memory/store.rs +++ b/src/memory/store.rs @@ -259,6 +259,131 @@ impl MemoryStore { Ok(result.rows_affected() > 0) } + /// Merge one memory into a survivor with atomic SQLite updates. + /// + /// This updates survivor content/metadata, rewires associations, records an + /// Updates edge, and marks the merged memory forgotten in one transaction. + pub async fn merge_memories_atomic( + &self, + updated_survivor: &Memory, + merged_memory: &Memory, + ) -> Result<()> { + let mut transaction = self + .pool + .begin() + .await + .with_context(|| "failed to start memory merge transaction")?; + + sqlx::query( + r#" + UPDATE memories + SET content = ?, memory_type = ?, importance = ?, updated_at = ?, + last_accessed_at = ?, access_count = ?, source = ?, channel_id = ?, + forgotten = ? + WHERE id = ? + "#, + ) + .bind(&updated_survivor.content) + .bind(updated_survivor.memory_type.to_string()) + .bind(updated_survivor.importance) + .bind(updated_survivor.updated_at) + .bind(updated_survivor.last_accessed_at) + .bind(updated_survivor.access_count) + .bind(&updated_survivor.source) + .bind(updated_survivor.channel_id.as_ref().map(|id| id.as_ref())) + .bind(updated_survivor.forgotten) + .bind(&updated_survivor.id) + .execute(&mut *transaction) + .await + .with_context(|| format!("failed to update survivor memory {}", updated_survivor.id))?; + + sqlx::query( + r#" + INSERT INTO associations (id, source_id, target_id, relation_type, weight, created_at) + SELECT + lower(hex(randomblob(16))), + CASE WHEN source_id = ?2 THEN ?1 ELSE source_id END, + CASE WHEN target_id = ?2 THEN ?1 ELSE target_id END, + relation_type, + weight, + created_at + FROM associations + WHERE (source_id = ?2 OR target_id = ?2) + AND source_id != ?1 + AND CASE WHEN source_id = ?2 THEN ?1 ELSE source_id END != CASE WHEN target_id = ?2 THEN ?1 ELSE target_id END + ON CONFLICT(source_id, target_id, relation_type) DO UPDATE SET + weight = excluded.weight + "#, + ) + .bind(&updated_survivor.id) + .bind(&merged_memory.id) + .execute(&mut *transaction) + .await + .with_context(|| { + format!( + "failed to rewire associations while merging {} into {}", + merged_memory.id, updated_survivor.id + ) + })?; + + sqlx::query("DELETE FROM associations WHERE source_id = ? OR target_id = ?") + .bind(&merged_memory.id) + .bind(&merged_memory.id) + .execute(&mut *transaction) + .await + .with_context(|| { + format!( + "failed to delete associations for merged memory {}", + merged_memory.id + ) + })?; + + let updates_association = Association::new( + &updated_survivor.id, + &merged_memory.id, + RelationType::Updates, + ) + .with_weight(1.0); + sqlx::query( + r#" + INSERT INTO associations (id, source_id, target_id, relation_type, weight, created_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(source_id, target_id, relation_type) DO UPDATE SET + weight = excluded.weight + "#, + ) + .bind(&updates_association.id) + .bind(&updates_association.source_id) + .bind(&updates_association.target_id) + .bind(updates_association.relation_type.to_string()) + .bind(updates_association.weight) + .bind(updates_association.created_at) + .execute(&mut *transaction) + .await + .with_context(|| { + format!( + "failed to create updates association {} -> {}", + updated_survivor.id, merged_memory.id + ) + })?; + + sqlx::query( + "UPDATE memories SET forgotten = 1, updated_at = ? WHERE id = ? AND forgotten = 0", + ) + .bind(chrono::Utc::now()) + .bind(&merged_memory.id) + .execute(&mut *transaction) + .await + .with_context(|| format!("failed to forget merged memory {}", merged_memory.id))?; + + transaction + .commit() + .await + .with_context(|| "failed to commit memory merge transaction")?; + + Ok(()) + } + /// Create an association between two memories. pub async fn create_association(&self, association: &Association) -> Result<()> { sqlx::query( @@ -310,6 +435,18 @@ impl MemoryStore { Ok(associations) } + /// Delete all associations referencing this memory. + pub async fn delete_associations_for_memory(&self, memory_id: &str) -> Result { + let result = sqlx::query("DELETE FROM associations WHERE source_id = ? OR target_id = ?") + .bind(memory_id) + .bind(memory_id) + .execute(&self.pool) + .await + .with_context(|| format!("failed to delete associations for memory {}", memory_id))?; + + Ok(result.rows_affected()) + } + /// Get all associations where both source and target are in the provided set. /// Used by the graph view to fetch edges between a known set of visible nodes. pub async fn get_associations_between( diff --git a/tests/maintenance.rs b/tests/maintenance.rs new file mode 100644 index 000000000..8bdf566ab --- /dev/null +++ b/tests/maintenance.rs @@ -0,0 +1,239 @@ +//! Memory maintenance integration coverage. + +use spacebot::memory::maintenance::{run_maintenance, run_maintenance_with_cancel}; +use spacebot::memory::{MemoryStore, RelationType, maintenance::MaintenanceConfig}; +use std::sync::{Arc, OnceLock}; +use tempfile::tempdir; +use tokio::sync::watch; + +fn shared_embedding_model() -> Arc { + static MODEL: OnceLock> = OnceLock::new(); + Arc::clone(MODEL.get_or_init(|| { + let cache_dir = std::env::temp_dir().join("spacebot-test-embedding-cache"); + std::fs::create_dir_all(&cache_dir).expect("failed to create embedding cache dir"); + Arc::new( + spacebot::memory::EmbeddingModel::new(&cache_dir) + .expect("failed to initialize embedding model"), + ) + })) +} + +async fn make_memory_maintenance_fixture() -> ( + std::sync::Arc, + spacebot::memory::EmbeddingTable, + Arc, + tempfile::TempDir, +) { + let options = sqlx::sqlite::SqliteConnectOptions::new() + .in_memory(true) + .create_if_missing(true); + let pool = sqlx::pool::PoolOptions::::new() + .max_connections(1) + .connect_with(options) + .await + .expect("failed to connect in-memory db"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("failed to run migrations"); + + let store: std::sync::Arc = MemoryStore::new(pool); + + let dir = tempdir().expect("failed to create temp dir"); + let lance_conn = lancedb::connect(dir.path().to_str().expect("temp path")) + .execute() + .await + .expect("failed to connect to lancedb"); + let embedding_table = spacebot::memory::EmbeddingTable::open_or_create(&lance_conn) + .await + .expect("failed to create embedding table"); + + (store, embedding_table, shared_embedding_model(), dir) +} + +#[tokio::test] +async fn maintenance_run_merges_duplicate_memory_and_links_updates_edge() { + let (store, embedding_table, embedding_model, _dir_guard) = + make_memory_maintenance_fixture().await; + + let survivor = { + let memory = spacebot::memory::Memory::new( + "phase3 maintenance survivor", + spacebot::memory::MemoryType::Fact, + ) + .with_importance(0.9); + store + .save(&memory) + .await + .expect("failed to save survivor memory"); + embedding_table + .store(&memory.id, &memory.content, &vec![1.0; 384]) + .await + .expect("failed to store survivor embedding"); + memory + }; + + let duplicate = { + let memory = spacebot::memory::Memory::new( + "phase3 maintenance survivor updated", + spacebot::memory::MemoryType::Fact, + ) + .with_importance(0.4); + store + .save(&memory) + .await + .expect("failed to save duplicate memory"); + embedding_table + .store(&memory.id, &memory.content, &vec![1.0; 384]) + .await + .expect("failed to store duplicate embedding"); + memory + }; + + let related = { + let memory = + spacebot::memory::Memory::new("related memory", spacebot::memory::MemoryType::Fact) + .with_importance(0.8); + store + .save(&memory) + .await + .expect("failed to save related memory"); + embedding_table + .store(&memory.id, &memory.content, &vec![0.0; 384]) + .await + .expect("failed to store related embedding"); + memory + }; + + store + .create_association(&spacebot::memory::Association::new( + &duplicate.id, + &related.id, + spacebot::memory::RelationType::RelatedTo, + )) + .await + .expect("failed to create related association"); + + store + .create_association(&spacebot::memory::Association::new( + &related.id, + &duplicate.id, + spacebot::memory::RelationType::PartOf, + )) + .await + .expect("failed to create part-of association"); + + let report = run_maintenance( + &store, + &embedding_table, + &embedding_model, + &MaintenanceConfig { + prune_threshold: 0.2, + decay_rate: 0.05, + min_age_days: 30, + merge_similarity_threshold: 0.95, + }, + ) + .await + .expect("maintenance should succeed"); + + assert_eq!(report.merged, 1); + + let survivor_assocs = store + .get_associations(&survivor.id) + .await + .expect("failed to load survivor associations"); + let has_updates = survivor_assocs.iter().any(|association| { + association.source_id == survivor.id + && association.target_id == duplicate.id + && association.relation_type == RelationType::Updates + }); + assert!( + has_updates, + "survivor must keep updates association to merged memory" + ); + + let duplicate_assocs = store + .get_associations(&duplicate.id) + .await + .expect("failed to load duplicate associations"); + assert_eq!(duplicate_assocs.len(), 1); + assert_eq!(duplicate_assocs[0].source_id, survivor.id); + assert_eq!(duplicate_assocs[0].relation_type, RelationType::Updates); + + let forgotten = store + .load(&duplicate.id) + .await + .expect("failed to load duplicate memory") + .expect("duplicate should still exist"); + assert!(forgotten.forgotten); + + let similar_to_merged = embedding_table + .find_similar(&duplicate.id, 0.0, 10) + .await + .expect("failed to query merged embedding"); + assert!(similar_to_merged.is_empty()); +} + +#[tokio::test] +async fn maintenance_run_can_be_cancelled() { + let (store, embedding_table, embedding_model, _dir_guard) = + make_memory_maintenance_fixture().await; + let maintenance_config = MaintenanceConfig::default(); + + let (cancel_tx, cancel_rx) = watch::channel(false); + cancel_tx.send_replace(true); + + let maintenance_task = tokio::spawn(async move { + run_maintenance_with_cancel( + &store, + &embedding_table, + &embedding_model, + &maintenance_config, + cancel_rx, + ) + .await + }); + + let result = maintenance_task + .await + .expect("maintenance task should have completed"); + + assert!( + result.is_err(), + "maintenance should stop after cancellation is requested" + ); + let error_message = result.unwrap_err().to_string(); + assert!( + error_message.contains("memory maintenance cancelled"), + "expected cancellation error, got: {}", + error_message, + ); +} + +#[tokio::test] +async fn maintenance_config_validation_rejects_negative_min_age() { + let (store, embedding_table, embedding_model, _dir_guard) = + make_memory_maintenance_fixture().await; + + let result = run_maintenance( + &store, + &embedding_table, + &embedding_model, + &MaintenanceConfig { + prune_threshold: 0.2, + decay_rate: 0.05, + min_age_days: -5, + merge_similarity_threshold: 0.95, + }, + ) + .await; + + assert!(result.is_err(), "negative min_age_days must be rejected"); + let error_message = result.unwrap_err().to_string(); + assert!( + error_message.contains("min_age_days must be >= 0"), + "unexpected error message: {}", + error_message + ); +}