
feat(memory): async pipeline + per-tree label fixes + summariser split + memory tree enabled at app startup #997

Closed
sanil-23 wants to merge 5 commits into tinyhumansai:main from
sanil-23:feat/memory-tree-async-pipeline

Conversation

@sanil-23
Contributor

@sanil-23 sanil-23 commented Apr 27, 2026

Summary

  • Move memory-tree summariser/extractor/cascade work onto a SQLite-backed
    job queue so ingest no longer blocks on LLM calls.
  • Fix source-tree seals to populate entities/topics on every summary
    (was always empty), split the summariser ↔ extractor responsibility,
    and run the cascade as per-level jobs.
  • Fix topic-tree spawn backfill to honour the 30-day hotness recency
    window, and stop cross-polluting topic trees with auxiliary entities.
  • Fix global-tree daily/weekly/monthly nodes to inherit entity/topic
    labels via union from contributing summaries (was always empty), and
    add a manual trigger_digest RPC for catch-up.
  • Lower local-LLM summariser cap from 6000 → 3500 tokens (output
    quality falls off a cliff above that on small instruction-tuned
    models).
  • Land slack/gmail canonicalisers and composio provider updates that
    feed this pipeline.

Problem

Today's memory-tree ingest does extraction, embedding, source-tree
append, cascade-seal, summariser LLM call, AND topic-tree fan-out — all
synchronously inside ingest_chat. Every chunk that crosses the seal
budget pulls a summariser call into the ingest critical section, and
topic fan-out multiplies the cost.

Underneath the queueing problem there are three real bugs in the tree
mechanics:

  1. Source-tree summary nodes had no entities/topics indexed. The
    summariser interface returned Vec::new() for both fields by design
    — entity/topic minting was deferred to "the LLM summariser when it
    lands" but nothing actually populated the fields downstream. So
    mem_tree_entity_index had zero rows with node_kind='summary' and
    "summaries that mention Alice" was a query the index couldn't
    answer (see the query sketch after this list). The summariser was also being asked to emit canonical
    entities, which would have corrupted the index with surface forms
    ("Alice", "she") if it had succeeded.

  2. Topic-tree spawn backfill walked every historical leaf for an
    entity, ignoring the fact that hotness has a 30-day decay cliff.
    Leaves >30d old contribute zero to current hotness yet were
    summariser-folded on every spawn, burning LLM budget on content
    whose hotness signal already decayed.

  3. Global-tree daily/weekly/monthly nodes had no entity/topic labels.
    Same root cause as (1) — the digest summariser returned empty Vecs;
    nothing attached labels. Time-based recap queries worked, but
    "days that mentioned Alice" did not.

Plus an empirical quality regression on local Ollama: the summariser
prompt templated the token budget into the system message ("stay well
under N tokens") and uncapped the output ceiling at 6000 tokens. Small
instruction-tuned models produced curt, generic output when the
budget instruction dominated the prompt, AND drifted/repeated past
~3500 tokens as they tried to extend toward the ceiling.

Solution per tree kind

Source trees

  • Per-level seal jobs. Each seal handles exactly one level and
    enqueues a parent seal job if the parent buffer's gate is now met.
    Each level is its own crash-recovery checkpoint; each LLM call
    competes for a fresh slot from the global semaphore.
  • Summariser ↔ extractor split. The summariser now produces only
    content. Its JSON schema is just { "summary": "..." } — the
    entities/topics fields it used to emit were unreliable surface
    forms anyway. Token-budget templating dropped from the system prompt
    (clamp_to_budget enforces length post-generation instead). Output
    ceiling lowered from 6000 → 3500 tokens to stay under the local-LLM
    quality cliff.
  • LabelStrategy::ExtractFromContent runs a regex + LLM extractor
    (with emit_topics: true) on the new summary's content after the
    summariser returns, populates entities (canonical_ids) and topics
    (free-form labels), and indexes them via the existing
    index_summary_entity_ids_tx. Reuses the leaf-side extractor types
    — no parallel implementation (see the sketch after this list).
  • Summary-side topic_route. After a source-tree seal commits, the
    handler enqueues topic_route { NodeRef::Summary { summary_id } } so
    the summary's extracted entities feed the topic-tree spawn pipeline
    the same way leaves do.
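
A minimal sketch of the seal-time shapes described above. LabelStrategy, the
bare { "summary": "..." } schema, and LlmSummaryOutput come from this PR; the
exact variant spellings and the parse helper are illustrative assumptions,
not the real code.

use serde::Deserialize;

/// How a seal decides which entities/topics get attached to the new summary node.
pub enum LabelStrategy {
    /// Run the regex + LLM extractor over the freshly generated summary text (source trees).
    ExtractFromContent,
    /// Union the labels already present on the contributing inputs (global tree).
    UnionFromChildren,
    /// Attach no labels at all (topic trees, whose scope already pins the entity).
    Empty,
}

/// The summariser now returns content only; entities/topics are no longer its job.
#[derive(Debug, Deserialize)]
struct LlmSummaryOutput {
    summary: String,
}

fn parse_summary(body: &str) -> Option<String> {
    // Anything that is not a non-empty { "summary": "..." } falls back to the inert summariser.
    serde_json::from_str::<LlmSummaryOutput>(body)
        .ok()
        .map(|o| o.summary)
        .filter(|s| !s.trim().is_empty())
}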

Topic trees

  • 30-day backfill window (BACKFILL_WINDOW_DAYS = 30) aligned with
    hotness::recency_decay's cliff. Leaves older than 30 days are
    filtered out before append_leaf — they cannot have driven the spawn
    decision so backfilling them is wasted summariser cost. Older content
    remains queryable through source-tree retrieval and the entity index.
  • LabelStrategy::Empty on internal seals. The tree's scope already
    pins the canonical id this tree represents; inheriting auxiliary
    entities (e.g. Alice happens to appear in the phoenix-migration tree)
    would cross-pollinate unrelated topic trees and add noise to the
    entity index. Topic-tree summaries deliberately carry no labels.
  • NodeRef::{Leaf, Summary} payloads. topic_route and
    append_buffer now address either a chunk or a summary, so the same
    handler routes both leaf-level and summary-level entity hits into
    matching topic trees (see the sketch after this list).
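
A sketch of the two mechanics above under the names used in this PR
(BACKFILL_WINDOW_DAYS, NodeRef); the field names and the filter signature are
assumptions.

pub const BACKFILL_WINDOW_DAYS: i64 = 30;

/// A routable node: either a raw chunk (leaf) or a sealed summary.
pub enum NodeRef {
    Leaf { chunk_id: String },
    Summary { summary_id: String },
}

struct LeafCandidate {
    chunk_id: String,
    ts_ms: i64,
}

/// Drop leaves too old to have driven the spawn decision: they sit past
/// hotness::recency_decay's 30-day cliff, so folding them only burns summariser budget.
fn within_backfill_window(leaves: Vec<LeafCandidate>, now_ms: i64) -> Vec<LeafCandidate> {
    let cutoff_ms = now_ms - BACKFILL_WINDOW_DAYS * 24 * 60 * 60 * 1_000;
    leaves.into_iter().filter(|l| l.ts_ms >= cutoff_ms).collect()
}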

Global tree

  • LabelStrategy::UnionFromChildren for both the L0 daily fold (in
    end_of_day_digest) and the L1+ cascade. Each input is already
    labeled via the source-tree seal extractor, so union preserves
    "days/weeks/months that mentioned Alice" retrievability without an
    extra LLM call. Global is a sink; emergent themes are captured at
    the source level instead.
  • memory_tree.trigger_digest RPC. Optional date_iso (default:
    yesterday in UTC) lets admins manually fire the daily digest and
    catch up after process downtime. Idempotent on
    digest_daily:{date} — re-running for an already-emitted day is a
    no-op.
  • jobs::backfill_missing_digests(N) enqueues digests for the last
    N calendar days. Helper for "the scheduler missed days" recovery
    (see the sketch after this list).
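
A sketch of the catch-up mechanics above. The digest_daily:{date} dedupe-key
shape is from this PR; the helper signatures are simplifications, and this
version only builds keys rather than enqueuing jobs.

use chrono::{Duration, Utc};

/// Dedupe key that makes re-triggering an already-queued or already-emitted
/// day's digest a no-op.
fn digest_dedupe_key(date_iso: &str) -> String {
    format!("digest_daily:{date_iso}")
}

/// Dedupe keys for the last `n` calendar days (oldest first), e.g. to recover
/// after the scheduler missed days; the real helper enqueues one job per key.
fn backfill_digest_keys(n: u32) -> Vec<String> {
    let today = Utc::now().date_naive();
    (1..=i64::from(n))
        .rev()
        .map(|days_ago| {
            let date = today - Duration::days(days_ago);
            digest_dedupe_key(&date.format("%Y-%m-%d").to_string())
        })
        .collect()
}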

Async pipeline (the substrate for all three)

  • SQLite-backed mem_tree_jobs table with dedupe keys, exponential
    backoff retry, partial unique index excluding terminal states.
  • 3-worker pool with a global LLM-bound semaphore (cap=3) so the
    topic-route → spawn → backfill chain can't breach LLM concurrency
    even when it's transitively summarising.
  • New lifecycle_status column on mem_tree_chunks
    (pending_extraction → admitted | dropped → buffered) so admission
    can happen async without losing track of chunk state.
  • Daily scheduler enqueues digest_daily(yesterday) + flush_stale
    near UTC midnight; both are date-keyed for dedupe, so restarts are
    safe (see the sketch after this list).
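
A sketch of the queue substrate. The table name, the dedupe/terminal-state
behaviour, and the 3-slot LLM semaphore come from this PR; column names and
status spellings are assumptions.

use std::sync::Arc;
use tokio::sync::Semaphore;

// The partial unique index is what lets dedupe keys ignore terminal jobs:
// a done/failed/cancelled row never blocks re-enqueueing the same key.
const CREATE_MEM_TREE_JOBS: &str = "
CREATE TABLE IF NOT EXISTS mem_tree_jobs (
    id           INTEGER PRIMARY KEY,
    kind         TEXT NOT NULL,
    payload      TEXT NOT NULL,
    dedupe_key   TEXT,
    status       TEXT NOT NULL DEFAULT 'ready',
    attempts     INTEGER NOT NULL DEFAULT 0,
    run_after_ms INTEGER NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_mem_tree_jobs_dedupe
    ON mem_tree_jobs(dedupe_key)
    WHERE dedupe_key IS NOT NULL
      AND status NOT IN ('done', 'failed', 'cancelled');
";

/// One global gate bounds every LLM-touching job, however deep the
/// topic_route -> spawn -> backfill chain gets.
fn llm_gate() -> Arc<Semaphore> {
    Arc::new(Semaphore::new(3))
}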

Other

  • extract::build_summary_extractor is an honest builder returning
    Arc<dyn EntityExtractor>; no ScoringConfig thresholds
    masquerading as seal-time config.
  • Lenient day-of-week parser in email_clean::parse_message_date:
    real MTAs send mismatched day names, and chrono's strict rfc2822
    parser rejects these. We fall back to a manual format string after
    stripping the prefix (see the sketch after this list).
  • Slack/Gmail canonicalisers + composio providers + slack_ingestion
    module — the data path that feeds this pipeline.
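
A sketch of the lenient fallback, assuming an RFC 2822-style Date header:
try chrono's strict parser first, then retry without the (possibly wrong)
weekday prefix. The helper name and format string here are illustrative.

use chrono::{DateTime, FixedOffset};

fn parse_message_date_lenient(raw: &str) -> Option<DateTime<FixedOffset>> {
    // Strict path: well-formed RFC 2822 with a weekday that matches the date.
    if let Ok(dt) = DateTime::parse_from_rfc2822(raw) {
        return Some(dt);
    }
    // Lenient path: drop everything up to the first comma ("Tue, ") and parse the rest.
    let without_day = raw.split_once(',').map(|(_, rest)| rest.trim()).unwrap_or(raw);
    DateTime::parse_from_str(without_day, "%d %b %Y %H:%M:%S %z").ok()
}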

Submission Checklist

  • Unit tests — 16+ new tests across LabelStrategy variants
    (Extract / Union / Empty), summary topic_route enqueue,
    handle_append_buffer hydrating from a summary, manual digest
    trigger + backfill, lenient date parser. 522 tests passing
    across memory_tree (370), slack_ingestion (16), and
    composio::providers (136).
  • E2E / integration — N/A. Change is below the JSON-RPC
    boundary; existing JSON-RPC E2E tests for memory_tree continue
    to pass via the new async pipeline (results are
    eventually-consistent after drain_until_idle rather than
    synchronous).
  • Doc comments — module-level rustdoc on jobs/,
    LabelStrategy, NodeRef, build_summary_extractor; inline
    Why: comments on label semantics per tree kind, 30-day
    backfill window rationale, summariser cap rationale, dedupe-key
    composition.
  • Inline comments — grep-friendly stable prefixes
    ([memory_tree::jobs], [topic_tree::backfill],
    [source_tree::bucket_seal], etc.).

Impact

  • Runtime: desktop core only. No Tauri shell or app changes.
  • Migration: additive — new mem_tree_jobs table and
    lifecycle_status column on mem_tree_chunks (default 'admitted'
    so existing rows stay queryable). Idempotent
    add_column_if_missing guard so re-running on an already-migrated
    DB is a no-op (see the sketch after this list).
  • Behavior: ingest returns immediately after persisting chunks;
    summaries materialise asynchronously. Tests asserting summary
    appearance after ingest_chat use the new drain_until_idle
    helper.
  • Performance: ingest hot path drops from "extract + embed + seal +
    topic fan-out" to "fast score + persist + enqueue". Summariser
    output 42% shorter on average (3500 vs 6000 cap) → faster local
    Ollama generations and better quality on small models. Steady-state
    LLM call count is unchanged but bounded by the 3-slot semaphore.
  • Compatibility: only RPC addition is memory_tree.trigger_digest.
    Existing memory_tree.{ingest,list_chunks,get_chunk} unchanged.
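
A sketch of the idempotent migration guard mentioned above, assuming a
rusqlite connection; the helper name comes from this PR, its signature is an
assumption.

use rusqlite::Connection;

fn add_column_if_missing(
    conn: &Connection,
    table: &str,
    column: &str,
    ddl: &str, // e.g. "TEXT NOT NULL DEFAULT 'admitted'"
) -> rusqlite::Result<()> {
    // PRAGMA table_info returns one row per column; index 1 is the column name.
    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
    let exists = stmt
        .query_map([], |row| row.get::<_, String>(1))?
        .filter_map(Result::ok)
        .any(|name| name == column);
    if !exists {
        conn.execute_batch(&format!("ALTER TABLE {table} ADD COLUMN {column} {ddl};"))?;
    }
    Ok(())
}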

Related

  • Closes:
  • Follow-up PR(s)/TODOs:
    • Global-tree cascade → per-level jobs (currently still sync inside
      digest_daily) for parity with source/topic trees.
    • Queue inspection RPCs (list_jobs, queue_stats) — deferred since
      those belong on a generic admin namespace, not memory_tree.

Summary by CodeRabbit

  • New Features

    • Slack quota probe mode to measure rate-limit behavior.
    • Gmail page ingestion into the memory system.
    • Full async memory-tree pipeline: scheduler, worker pool, job queue, and manual digest trigger.
    • Ingest now enqueues async extraction/embedding and tracks chunk lifecycle states.
    • Labeling strategies for sealed summaries and opt-in LLM-emitted topics.
    • Topic-tree backfill window (30 days) and backfill tools.
  • Documentation

    • Added architectural diagram of the async memory-tree pipeline.
  • Chores

    • Updated .gitignore to exclude internal target directory.
  • Tests

    • Updated and added tests for async pipeline, labeling, extraction, and scheduling.

…ct, quantity

The previous LLM extractor schema only emitted person / organization /
location / event / product. That covers classic NER but leaves real
gaps for chat content:

* `datetime`   — "Friday", "Q2 2026", "EOD tomorrow"
                 (deadlines mentioned inside chunks)
* `technology` — "Rust", "OAuth", "Slack API", "nomic-embed"
                 (tools / frameworks / languages / services)
* `artifact`   — "PR tinyhumansai#934", "src/openhuman/...", "OH-42"
                 (code / ticket / doc references)
* `quantity`   — "$5K", "20/min", "10k tokens"
                 (amounts / metrics / money)

Without these kinds, the affected mentions either get dropped or fall
into Misc — both flatten retrieval. This adds them to the EntityKind
enum, the LLM extractor's allowed_kinds + parse_kind synonyms, and the
SYSTEM_PROMPT (with a Kinds guide and an explicit "skip URLs/emails —
those are extracted by regex" instruction so the model doesn't double
up with the mechanical extractors).
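
A sketch of the widened kind set; the four new variants are from this commit,
while the synonym table in parse_kind is an illustrative assumption.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityKind {
    Person,
    Organization,
    Location,
    Event,
    Product,
    // New in this commit:
    Datetime,   // "Friday", "Q2 2026", "EOD tomorrow"
    Technology, // "Rust", "OAuth", "Slack API", "nomic-embed"
    Artifact,   // PR / ticket / file references
    Quantity,   // "$5K", "20/min", "10k tokens"
    Misc,
}

/// Map the LLM's free-form kind strings (plus common synonyms) onto the enum.
fn parse_kind(raw: &str) -> EntityKind {
    match raw.trim().to_ascii_lowercase().as_str() {
        "person" | "people" => EntityKind::Person,
        "organization" | "org" | "company" => EntityKind::Organization,
        "location" | "place" => EntityKind::Location,
        "event" | "meeting" => EntityKind::Event,
        "product" => EntityKind::Product,
        "datetime" | "date" | "time" | "deadline" => EntityKind::Datetime,
        "technology" | "tool" | "framework" | "library" => EntityKind::Technology,
        "artifact" | "ticket" | "document" => EntityKind::Artifact,
        "quantity" | "amount" | "metric" | "money" => EntityKind::Quantity,
        _ => EntityKind::Misc,
    }
}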

The probe-ratelimit / probe-pacing-ms flags from the prior commit were
auto-formatted slightly; that change is included here.

Tests: extract suite 36/36, full memory_tree 337/337.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sanil-23 sanil-23 requested a review from a team April 27, 2026 21:59
Contributor

coderabbitai Bot commented Apr 27, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough


Adds an async SQLite-backed memory-tree job pipeline with scheduler, workers, and handlers; refactors ingest to enqueue extract jobs and persist chunk lifecycle; introduces LabelStrategy-driven sealing and label resolution changes; adds Gmail ingest and email-cleaning utilities; expands scoring/extraction behavior; exposes RPC trigger and related tests/docs.

Changes

Cohort / File(s) Summary
Job Pipeline
src/openhuman/memory/tree/jobs/mod.rs, src/openhuman/memory/tree/jobs/scheduler.rs, src/openhuman/memory/tree/jobs/store.rs, src/openhuman/memory/tree/jobs/types.rs, src/openhuman/memory/tree/jobs/handlers/mod.rs, src/openhuman/memory/tree/jobs/worker.rs, src/openhuman/memory/tree/jobs/testing.rs
New async job subsystem: scheduler, SQLite-backed queue with dedupe/locks/retries, typed payloads, worker pool with LLM semaphore, handlers (extract/append_buffer/seal/topic_route/digest_daily/flush_stale), and test helpers.
Ingest hot path & store
src/openhuman/memory/tree/ingest.rs, src/openhuman/memory/tree/store.rs
Ingest now fast-scores then persists chunks as PENDING_EXTRACTION, enqueues extract jobs; store schema gains lifecycle_status, mem_tree_jobs table, lifecycle APIs and helpers.
Sealing & Labeling
src/openhuman/memory/tree/source_tree/bucket_seal.rs, src/openhuman/memory/tree/source_tree/*, src/openhuman/memory/tree/source_tree/bucket_seal_tests.rs, src/openhuman/memory/tree/source_tree/flush.rs, src/openhuman/memory/tree/source_tree/mod.rs
Adds LabelStrategy, threads it through append/flush/seal, moves label resolution to strategy/hydrated inputs, adds append_leaf_deferred, and makes follow-up job enqueues atomic within seal transactions.
Handlers & Worker Flow
src/openhuman/memory/tree/jobs/handlers/mod.rs, src/openhuman/memory/tree/jobs/worker.rs
Handlers implement extract → score/embed/decide, append-buffer (transactional buffer upsert), seal (one-level with strategy & follow-ups), topic routing, daily digest, stale flush; worker claims/executes jobs with LLM-bound concurrency control.
Scheduler & RPC
src/openhuman/memory/tree/jobs/scheduler.rs, src/openhuman/memory/tree/rpc.rs, src/openhuman/memory/tree/schemas.rs, src/core/jsonrpc.rs
Wall-clock scheduler for daily jobs, trigger/backfill APIs, new RPC trigger_digest, and startup wiring to launch job subsystem.
Scoring & Extraction
src/openhuman/memory/tree/score/mod.rs, src/openhuman/memory/tree/score/extract/mod.rs, src/openhuman/memory/tree/score/extract/llm.rs, src/openhuman/memory/tree/score/extract/types.rs, src/openhuman/memory/tree/score/extract/llm_tests.rs, src/openhuman/memory/tree/score/store.rs
Adds score_chunks_fast (no LLM), summary-extractor builder with topics enabled, LLM extraction emits topics and new EntityKind variants, and node→entity listing API.
Global digest & sealing changes
src/openhuman/memory/tree/global_tree/seal.rs, src/openhuman/memory/tree/global_tree/digest.rs, src/openhuman/memory/tree/global_tree/digest_tests.rs, src/openhuman/memory/tree/global_tree/recap.rs
Global seal/digest now unions entities/topics from hydrated inputs (deduped) instead of using summariser output; tests updated.
Topic tree & backfill
src/openhuman/memory/tree/topic_tree/backfill.rs, src/openhuman/memory/tree/topic_tree/curator.rs, src/openhuman/memory/tree/topic_tree/routing.rs
Adds 30-day backfill window and deterministic backfill_at API, routing/backfill use LabelStrategy::Empty, tests adjusted for timestamp determinism.
Canonicalize & Gmail ingest
src/openhuman/memory/tree/canonicalize/email_clean.rs, src/openhuman/composio/providers/gmail/ingest.rs
New email cleaning/parsing utilities and Gmail page ingest module that buckets messages into threads, canonicalizes, and calls email ingest; includes unit tests.
Tests & retrieval updates
src/openhuman/memory/tree/retrieval/*, src/openhuman/memory/tree/retrieval/integration_test.rs
Tests updated to pass explicit LabelStrategy::Empty, wait for async job completion via drain_until_idle, and align timestamps for backfill coverage.
Misc: CLI, docs, exports
src/bin/slack_backfill.rs, docs/memory-tree-async-pipeline.excalidraw, .gitignore, src/openhuman/memory/tree/mod.rs
Slack backfill probe flags added, Excalidraw diagram added, .target-codex/ ignored, and jobs submodule exported.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Ingest
    participant JobStore as "JobStore (SQLite)"
    participant WorkerPool as "Worker Pool"
    participant Handler as "JobHandler"
    participant MemTree as "MemoryTree"

    Client->>Ingest: submit page/data
    Ingest->>Ingest: fast-score chunks
    Ingest->>MemTree: persist chunks (PENDING_EXTRACTION)
    Ingest->>JobStore: enqueue ExtractChunk jobs
    Ingest->>WorkerPool: wake_workers()

    WorkerPool->>JobStore: claim_next()
    JobStore-->>WorkerPool: job (running/locked)
    WorkerPool->>Handler: handle ExtractChunk
    Handler->>MemTree: score/embed & persist chunk updates
    Handler->>JobStore: enqueue AppendBuffer / TopicRoute / Seal
    Handler->>JobStore: mark_done()

    WorkerPool->>JobStore: claim_next()
    JobStore-->>WorkerPool: append_buffer job
    WorkerPool->>Handler: handle AppendBuffer
    Handler->>MemTree: hydrate & append L0 (BUFFERED)
    Handler->>JobStore: enqueue Seal
    Handler->>JobStore: mark_done()

    WorkerPool->>JobStore: claim_next()
    JobStore-->>WorkerPool: seal job
    WorkerPool->>Handler: handle Seal
    Handler->>MemTree: seal_one_level(strategy)
    Handler->>JobStore: enqueue follow-ups (topic/digest) as needed
    Handler->>JobStore: mark_done()

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~65 minutes

Possibly related PRs

Suggested reviewers

  • senamakel

"🐰 I hop through queues with quick-foot cheer,
Chunks queue, workers hum — async drawing near.
From Gmail threads to sealed-up lore,
Labels unioned, digests soar.
Small paws, big pipelines — hooray, we cheer!"

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check: ✅ Passed. Check skipped - CodeRabbit’s high-level summary is enabled.
  • Title check: ✅ Passed. The title comprehensively covers the main changes: async pipeline, label fixes, summariser split, and startup behavior. It accurately reflects the PR's scope.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
  • Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@sanil-23 sanil-23 changed the title feat(memory): async pipeline + per-tree label fixes + summariser split feat(memory): async pipeline + per-tree label fixes + summariser split + memory tree enabled at app startup Apr 27, 2026
@sanil-23 sanil-23 force-pushed the feat/memory-tree-async-pipeline branch from 605582f to 2e87c05 Compare April 27, 2026 22:03
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 18

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/openhuman/memory/tree/topic_tree/backfill.rs (1)

51-64: ⚠️ Potential issue | 🟠 Major

Don't resample time in the public wrapper.

The main caller already computes now_ms for the hotness decision, but this wrapper throws that away and calls Utc::now() again. Around the 30-day cutoff, that can create a topic tree from one timestamp and then immediately filter out the leaf that caused the spawn. Thread the caller's now_ms through instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/topic_tree/backfill.rs` around lines 51 - 64, The
public wrapper backfill_topic_tree currently resamples time with
Utc::now().timestamp_millis() before calling backfill_topic_tree_at, which
discards the caller's computed timestamp and can cause racey hotness filtering;
change backfill_topic_tree to accept a now_ms: i64 (or same type used elsewhere)
parameter and pass that straight through to backfill_topic_tree_at instead of
calling Utc::now().timestamp_millis(), and update all callsites to forward their
existing now_ms value to backfill_topic_tree so the same timestamp is used for
both spawn and leaf filtering.
src/openhuman/memory/tree/source_tree/bucket_seal.rs (1)

419-436: ⚠️ Potential issue | 🔴 Critical

Use tree.kind instead of hardcoding TreeKind::Source for persisted node.

Line 417 hardcodes tree_kind: TreeKind::Source, but the function receives trees of any kind (Source, Topic, Global). Topic trees calling this function will have their summaries incorrectly persisted as Source trees, corrupting the tree structure. Replace with tree.kind.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/source_tree/bucket_seal.rs` around lines 419 - 436,
The SummaryNode is being created with a hardcoded tree_kind: TreeKind::Source
which is incorrect for non-source trees; update the constructor for SummaryNode
(the node variable) to set tree_kind to the incoming tree's kind (use tree.kind)
instead of TreeKind::Source so summaries persist with the correct tree kind;
locate the SummaryNode creation in the bucket sealing logic (where node is
built) and replace the hardcoded enum with the tree.kind field.
🧹 Nitpick comments (9)
src/bin/slack_backfill.rs (1)

320-323: Extract shared rate-limit detection instead of duplicating string checks.

This duplicates the same marker logic already used in src/openhuman/composio/providers/slack/provider.rs (execute_with_retry). Please centralize the predicate to avoid drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/bin/slack_backfill.rs` around lines 320 - 323, Extract the repeated
rate-limit string checks into a single shared predicate (e.g., fn
is_rate_limited(msg: &str) -> bool) in a common module used by both Slack
callers; replace the duplicated block in slack_backfill.rs (the if that checks
err.contains("ratelimited") || err.contains("rate_limit") || err.contains("rate
limit")) with a call to is_rate_limited(&err) and update
src/openhuman/composio/providers/slack/provider.rs's execute_with_retry to call
the same helper so both sites use the centralized detection logic.
src/openhuman/memory/tree/canonicalize/email_clean.rs (1)

69-75: Variable naming here diverges from the repository convention.

Several locals use snake_case (e.g., quoted_run_start, quoted_run_len, max_chars, raw) instead of camelCase in this path.

As per coding guidelines: src/openhuman/**/*.rs: Use camelCase for variable names and keep domain mod.rs export-focused, with operational code in ops.rs, store.rs, types.rs, etc.

Also applies to: 117-123, 154-156, 221-224

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/canonicalize/email_clean.rs` around lines 69 - 75,
The local variable names in the email canonicalization function use snake_case
(e.g., quoted_run_start, quoted_run_len, max_chars, raw) which violates the
repo's camelCase convention for this module; rename these locals to camelCase
(e.g., quotedRunStart, quotedRunLen, maxChars, rawStr or rawData) and update
every reference inside the function (including the loop over s.split_inclusive
and all uses at the other noted locations around lines 117-123, 154-156,
221-224) to the new identifiers so compilation succeeds and the naming matches
the style guide; ensure any Option/usize/u32 types remain unchanged and tests
still pass.
src/openhuman/memory/tree/global_tree/seal.rs (1)

191-210: Consider extracting shared union helper.

The entity/topic union logic (lines 191-210) is duplicated verbatim in digest.rs (lines 165-176) and mirrors resolve_labels's UnionFromChildren branch in bucket_seal.rs. A small helper function could reduce duplication.

♻️ Optional: Extract shared helper
// Could live in source_tree/summariser/mod.rs or a shared utils module
pub fn union_labels_from_inputs(inputs: &[SummaryInput]) -> (Vec<String>, Vec<String>) {
    let mut entities_set: BTreeSet<String> = BTreeSet::new();
    let mut topics_set: BTreeSet<String> = BTreeSet::new();
    for inp in inputs {
        for e in &inp.entities {
            entities_set.insert(e.clone());
        }
        for t in &inp.topics {
            topics_set.insert(t.clone());
        }
    }
    (
        entities_set.into_iter().collect(),
        topics_set.into_iter().collect(),
    )
}

Also applies to: 233-234

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/global_tree/seal.rs` around lines 191 - 210,
Extract the duplicated entity/topic union logic into a shared helper (e.g., pub
fn union_labels_from_inputs(inputs: &[SummaryInput]) -> (Vec<String>,
Vec<String>)) and replace the verbatim loops in global_tree/seal.rs (the
node_entities/node_topics construction), digest.rs, and the UnionFromChildren
branch in bucket_seal.rs/resolve_labels with calls to that helper; ensure the
helper uses BTreeSet like the original to preserve ordering and returns
(entities, topics) as Vec<String>.
src/openhuman/memory/tree/topic_tree/curator.rs (1)

189-194: Test determinism can be tightened by using a single base timestamp.

Line 193 samples Utc::now() per call, so ordering is still time-of-execution dependent. Consider passing a base_now_ms from each test and deriving ts_ms from that for fully reproducible ordering.

♻️ Suggested tweak
-fn seed_leaf_for_entity(cfg: &Config, entity_id: &str, source_tree: &str, seq: u32) {
-    let ts_ms = Utc::now().timestamp_millis() - (seq as i64) * 1_000;
+fn seed_leaf_for_entity(
+    cfg: &Config,
+    entity_id: &str,
+    source_tree: &str,
+    seq: u32,
+    base_now_ms: i64,
+) {
+    let ts_ms = base_now_ms - (seq as i64) * 1_000;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/topic_tree/curator.rs` around lines 189 - 194, The
current seeding logic calls Utc::now() per iteration which makes test ordering
time-dependent; change it to accept a single base timestamp (e.g., base_now_ms
or base_now) from the caller/test and compute ts_ms = base_now_ms - (seq as i64)
* 1_000, then construct ts via Utc.timestamp_millis_opt(ts_ms).unwrap() (replace
the Utc::now() call near the ts_ms/ts computation); update the function
signature(s) that call this code to pass the deterministic base timestamp so
tests can control ordering.
src/openhuman/memory/tree/jobs/testing.rs (1)

8-14: Add a safety bound to prevent infinite test hangs.

Line 9 runs an unbounded loop. If a bug keeps producing immediately claimable jobs, CI can hang indefinitely. Add a max-iteration guard and fail fast with context.

🛠️ Suggested hardening
-use anyhow::Result;
+use anyhow::{bail, Result};
@@
 pub async fn drain_until_idle(config: &Config) -> Result<()> {
+    const MAX_DRAIN_ITERS: usize = 10_000;
+    let mut iters = 0usize;
     loop {
+        iters += 1;
+        if iters > MAX_DRAIN_ITERS {
+            bail!("drain_until_idle exceeded {} iterations", MAX_DRAIN_ITERS);
+        }
         if !super::worker::run_once(config).await? {
             break;
         }
     }
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/testing.rs` around lines 8 - 14, The loop in
drain_until_idle repeatedly calls super::worker::run_once(config) with no bound,
which can hang tests; add a max-iteration guard (e.g., a constant like
MAX_DRAIN_ITERATIONS) and increment a counter each loop, returning an Err with
contextual message (including the count and perhaps Config info) if the limit is
exceeded; keep the existing behavior of breaking when run_once returns false and
ensure the function still returns Ok(()) on normal completion.
src/openhuman/memory/tree/ingest.rs (1)

109-110: Avoid constructing the LLM extractor on the fast ingest path.

Line 109 still goes through ScoringConfig::from_config, but Line 110 immediately calls score_chunks_fast, which nulls out llm_extractor. That keeps the hot path paying the builder/fallback/logging cost for an extractor it can never use. A regex-only constructor or without_llm() helper would keep ingest closer to the PR objective here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/ingest.rs` around lines 109 - 110,
ScoringConfig::from_config constructs the full LLM extractor even though
score::score_chunks_fast immediately nulls llm_extractor; change the ingest hot
path to use a regex-only variant (e.g., add ScoringConfig::without_llm or
ScoringConfig::from_config_regex_only) and call that here instead of
ScoringConfig::from_config so the builder/fallback/logging for llm_extractor is
skipped; update any callers or tests that expect the full config, and ensure the
new constructor preserves all regex/scoring fields but leaves llm_extractor as
None.
src/openhuman/memory/tree/score/extract/mod.rs (1)

21-89: Move build_summary_extractor out of mod.rs.

This is new operational construction logic, not export wiring. Please put it in a dedicated module (ops.rs, builder.rs, etc.) and re-export it from mod.rs so this file stays registry/export-focused. As per coding guidelines, "keep domain mod.rs export-focused, with operational code in ops.rs, store.rs, types.rs, etc."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/extract/mod.rs` around lines 21 - 89, Move
the operational builder function build_summary_extractor out of mod.rs into a
new module file (e.g., ops.rs or builder.rs) and re-export it from mod.rs;
specifically, cut the entire build_summary_extractor implementation and place it
in the new module, add the required use/imports there (Arc, EntityExtractor,
Config, LlmExtractorConfig, LlmEntityExtractor, CompositeExtractor,
RegexEntityExtractor, log, std::time::Duration), and keep the function signature
and behavior unchanged, then in mod.rs add a pub use
...::build_summary_extractor; (or pub mod ops; pub use
ops::build_summary_extractor;) so mod.rs remains export-focused while the
operational construction lives in the new ops/builder module.
src/openhuman/memory/tree/jobs/scheduler.rs (1)

15-28: Background scheduler loop lacks graceful shutdown.

The spawned task loops forever with no mechanism to signal termination. If the application needs to shut down cleanly (e.g., during hot reload or graceful termination), this task will continue running until process exit.

Consider adding a cancellation token or watch channel that the loop checks alongside tokio::time::sleep.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/scheduler.rs` around lines 15 - 28, The
scheduler started by start(...) spawns a task that loops forever (STARTED,
enqueue_daily_jobs, next_sleep_duration) with no shutdown path; modify start to
take or create a cancellation/watch token (e.g., a tokio::sync::watch::Receiver
or a CancellationToken) and have the spawned task use tokio::select! to await
either the sleep future or a shutdown signal, calling enqueue_daily_jobs inside
the loop only while not cancelled and exiting the task when the token is
triggered; keep STARTED semantics but pass/clone the shutdown receiver into the
async move so the loop can break cleanly on shutdown.
src/openhuman/memory/tree/source_tree/bucket_seal.rs (1)

103-118: Redundant .sort() calls after BTreeSet collection.

BTreeSet::into_iter() already yields elements in sorted order. The explicit .sort() calls on lines 109 and 117 are no-ops.

♻️ Proposed simplification
             let mut entities: Vec<String> = canonical
                 .into_iter()
                 .map(|c| c.canonical_id)
                 .collect::<BTreeSet<_>>()
                 .into_iter()
                 .collect();
-            entities.sort();
             let mut topics: Vec<String> = extracted
                 .topics
                 .into_iter()
                 .map(|t| t.label)
                 .collect::<BTreeSet<_>>()
                 .into_iter()
                 .collect();
-            topics.sort();
             Ok((entities, topics))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/source_tree/bucket_seal.rs` around lines 103 - 118,
The code collects into a BTreeSet which already yields sorted elements, so
remove the redundant entities.sort() and topics.sort() calls; keep the existing
transformation that maps to canonical_id/label, collects into BTreeSet, then
converts to Vec<String> (e.g., via
.collect::<BTreeSet<_>>().into_iter().collect::<Vec<_>>()) so entities and
topics remain Vec<String> but without the no-op .sort() calls (refer to the
variables entities and topics in bucket_seal.rs).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/memory-tree-async-pipeline.excalidraw`:
- Around line 722-729: Update the scheduler diagram to show an explicit
manual/recovery branch: add a "manual trigger / backfill" node that points into
the same mem_tree_jobs pipeline and label its actions as
memory_tree.trigger_digest(yesterday|range) and
jobs::backfill_missing_digests(range), alongside the existing UTC daily tick ->
enqueue digest_daily(yesterday) + flush_stale(today) path; ensure the manual
node sits in the scheduler/background area and also add the same branch at the
other referenced location (around the block corresponding to lines 1317-1324) so
operators can see both automatic and on-demand digest flows.

In `@src/bin/slack_backfill.rs`:
- Around line 106-107: The probe_ratelimit Arg (probe_ratelimit: Option<u32>)
must be validated to a sane non-zero/max-capped range before being used to
allocate memory (avoid Vec::with_capacity(n as usize) blowing up); when
parsing/handling probe_ratelimit (where the CLI Arg is read and used) reject or
normalize values == 0 and excessively large values (e.g. cap at a defined
MAX_PROBE_RATE or return a user-facing error), and convert using a checked cast
(e.g. usize::try_from or explicit bounds check) to prevent overflow when calling
Vec::with_capacity; update the code paths that use probe_ratelimit (the
probe_ratelimit variable and the Vec::with_capacity call) to use the
validated/normalized value.
- Around line 261-291: The probe-ratelimit path (triggered by
cli.probe_ratelimit) runs before any connection filtering and therefore ignores
--connection; fix by adding a guard at the start of that probe block to detect
if cli.connection_id (or the connection flag variable) is set and return an
error (or bail) with a clear message that --probe-ratelimit cannot be used with
--connection, or alternatively move the probe logic to run after the existing
connection-selection logic so the probe is scoped to the selected connection;
update the block that builds list_resp / channel_id (the probe code using
client.execute_tool and list_resp) to respect this guard.
- Around line 324-327: The log currently serializes and prints the entire
provider payload via serde_json::to_string(&r.data) in the log::warn! inside the
rate-limit branch; instead avoid exposing raw message content by redacting
sensitive fields (e.g. remove "text", "blocks", "attachments", "message" keys)
or only logging non-sensitive identifiers (e.g. timestamp/ts, channel id, event
type) or a hashed/truncated fingerprint; implement this by building a small
redacted map/object from r.data (or cloning and removing those keys) and
serializing that for the log::warn! call (or log a concise message like
channel/ts/hash), leaving the log::warn! invocation and variables i and r.data
as the context to locate and replace the serialization.

In `@src/openhuman/composio/providers/gmail/ingest.rs`:
- Around line 187-212: The logs currently print the full sender PII via the
sender variable in the info and warn calls; before logging compute a redacted
identifier (e.g., mask the local-part or compute a short hash such as SHA-256
hex/truncated) into a new variable like redacted_sender and use that in both the
log::info and log::warn messages (keep thread_id unchanged for correlation);
update the code around the EmailThread creation and the ingest_email error
branch to log redacted_sender instead of sender so full email addresses are
never emitted.

In `@src/openhuman/memory/tree/canonicalize/email_clean.rs`:
- Around line 164-179: The documentation for md_escape says '#' should be
escaped but the match in function md_escape omits '#' so header markers are not
escaped; update the match arm in md_escape (and the duplicate implementation
around the other occurrence noted) to include '#' in the list of characters to
escape (i.e., treat '#' like '\\', '`', '*', '_', '|'), and adjust/add unit
tests to cover escaping of '#' to ensure header-like content is safely escaped.
- Around line 151-161: The truncate_body function (and the duplicate logic at
the other occurrence) currently takes max_chars characters then appends an
ellipsis, producing max_chars+1 length; update truncate_body so that after
trimming you check if trimmed.chars().count() <= max_chars return
trimmed.to_string(), otherwise if max_chars == 0 return "…".to_string() or
decide on behavior, but for normal positive max_chars collect only max_chars-1
characters (trimmed.chars().take(max_chars.saturating_sub(1))) and then
push('…') so the final string is at most max_chars characters; apply the same
change to the other similar truncation block (lines referenced in the review) to
keep behavior consistent.

In `@src/openhuman/memory/tree/global_tree/digest_tests.rs`:
- Around line 283-303: Reformat the helper block containing the
index_entity(...) and append_leaf(...) calls with rustfmt so the code matches
project style; run cargo fmt or rustfmt on the file and ensure the calls to
index_entity and append_leaf (and the surrounding struct literal for LeafRef)
are properly line-wrapped/indented to satisfy cargo fmt --check, then commit the
updated formatting.
- Around line 265-284: The test is seeding entity index rows with Some(scope),
but production treats that field as the source tree id so tests become
misleading; update the call to index_entity so it passes the actual source-tree
id from the chunk (use the chunk's id instead of scope) when seeding (e.g.,
replace the Some(scope) argument with the chunk's id representation used
elsewhere), keeping the rest of the CanonicalEntity/index_entity usage
unchanged.

In `@src/openhuman/memory/tree/ingest.rs`:
- Around line 229-237: The assertion block is not formatted to rustfmt standards
and causes CI failure; reformat this block (or run cargo fmt) so the assertions
are properly line-broken and aligned — specifically adjust the calls to
lookup_entity(...).unwrap().len(), list_chunks(&cfg,
&ListChunksQuery::default()).unwrap(), and get_chunk_embedding(&cfg,
&out.chunk_ids[0]).unwrap().is_some() so they follow rustfmt style (preserve the
existing checks using lookup_entity, ListChunksQuery, list_chunks, and
get_chunk_embedding) and ensure trailing commas/line breaks match rustfmt
output.

In `@src/openhuman/memory/tree/jobs/handlers/mod.rs`:
- Around line 172-212: The code updates chunk lifecycle
(chunk_store::set_chunk_lifecycle_status) before ensuring a seal job is
persisted, which can permanently leave chunks buffered if enqueue
(store::enqueue with NewJob::seal) fails; change the flow so that after
append_leaf_deferred returns true you enqueue the seal first (call
store::enqueue/NewJob::seal and wake workers on success) and only then update
the chunk lifecycle in chunk_store::set_chunk_lifecycle_status; adjust both
AppendTarget::Source and AppendTarget::Topic paths so lifecycle transitions
happen after a successful enqueue (or are done inside a transactional/atomic
commit that includes the enqueue) and ensure append_buffer/no-op re-append cases
still result in a pending seal when needed.

In `@src/openhuman/memory/tree/jobs/scheduler.rs`:
- Around line 117-129: The CI formatting error is due to the chained call to
Utc.with_ymd_and_hms(...).single().unwrap_or_else(...) in the scheduler code
(variables: now, tomorrow, next and the method with_ymd_and_hms/unwrap_or_else);
fix by running rustfmt (cargo fmt) to reformat the chain or manually reformat
that call chain to match rustfmt style (preserve the same logic: compute
tomorrow, call Utc.with_ymd_and_hms(...).single().unwrap_or_else(|| now +
ChronoDuration::hours(24))) so the formatting matches CI expectations.
- Around line 5-6: CI flagged Rust formatting on the chrono use statement; run
rustfmt (cargo fmt) and apply the standard formatting to the import line(s) in
src/openhuman/memory/tree/jobs/scheduler.rs so the use declaration for chrono
({Datelike, Duration as ChronoDuration, NaiveDate, TimeZone, Utc}) is formatted
to match project style (split into multiple lines or reordered according to
rustfmt) — update the use statement where it's declared so the file passes cargo
fmt.

In `@src/openhuman/memory/tree/jobs/types.rs`:
- Around line 103-105: CI reports a rustfmt formatting mismatch in the
JobStatus::is_terminal function; run rustfmt (cargo fmt) or reformat the
function so the matches! macro conforms to rustfmt style (update the is_terminal
implementation in types.rs to the formatted version) and re-run the formatter to
ensure the matches!(self, JobStatus::Done | JobStatus::Failed |
JobStatus::Cancelled) line matches CI expectations.

In `@src/openhuman/memory/tree/rpc.rs`:
- Around line 222-240: The RPC currently calls jobs::trigger_digest and returns
enqueued=true even if a digest has already been materialized; make the RPC
idempotent by date by checking for an existing digest before spawning the
blocking enqueue. Locate the block around jobs::trigger_digest (where job_id is
created and TriggerDigestResponse is built) and add a pre-check that queries the
persistence layer for an existing digest for the given date (using the existing
storage/config/db API available in this module or via config.clone()), and if a
digest exists return RpcOutcome::single_log with enqueued=false and job_id=None
(keeping date_iso), otherwise proceed to spawn_blocking to call
jobs::trigger_digest as now. Ensure the new check runs in the async context (or
on a blocking thread if it touches SQLite) and preserve the existing error
handling and logging format.

In `@src/openhuman/memory/tree/score/extract/llm.rs`:
- Around line 436-451: The parser currently always maps self.topics into
ExtractedTopic instances (using ExtractedTopic { label, score: 0.85 }) even when
the caller disabled topics; change the logic in the response parsing block that
processes self.topics to first check the emit_topics flag (e.g.,
self.emit_topics or the struct field that indicates the caller opted-in) and if
it's false return an empty Vec for topics instead of mapping any returned
values; update the code around the topics mapping that references self.topics
and ExtractedTopic so topics are only populated when the emit_topics flag is
true.

In `@src/openhuman/memory/tree/score/extract/mod.rs`:
- Around line 67-75: The info log inside the LlmEntityExtractor::new(cfg) match
arm currently logs the full endpoint (variable endpoint) which may contain
embedded credentials; update the log in the match arm around
LlmEntityExtractor::new to redact sensitive parts by parsing the endpoint URL
and logging only a safe summary (e.g., scheme + host and optionally path without
userinfo or query) instead of the raw endpoint string; locate the logging call
inside the match where log::info! is invoked and replace the endpoint argument
with the redacted summary returned from a small helper (or inline parsing) that
strips userinfo and query components.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f7609a11-0f42-4990-bf43-8a5360316b03

📥 Commits

Reviewing files that changed from the base of the PR and between f9c15bd and 605582f.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (40)
  • .gitignore
  • docs/memory-tree-async-pipeline.excalidraw
  • src/bin/slack_backfill.rs
  • src/core/jsonrpc.rs
  • src/openhuman/composio/providers/gmail/ingest.rs
  • src/openhuman/memory/tree/canonicalize/email_clean.rs
  • src/openhuman/memory/tree/global_tree/digest.rs
  • src/openhuman/memory/tree/global_tree/digest_tests.rs
  • src/openhuman/memory/tree/global_tree/recap.rs
  • src/openhuman/memory/tree/global_tree/seal.rs
  • src/openhuman/memory/tree/ingest.rs
  • src/openhuman/memory/tree/jobs/handlers/mod.rs
  • src/openhuman/memory/tree/jobs/mod.rs
  • src/openhuman/memory/tree/jobs/scheduler.rs
  • src/openhuman/memory/tree/jobs/store.rs
  • src/openhuman/memory/tree/jobs/testing.rs
  • src/openhuman/memory/tree/jobs/types.rs
  • src/openhuman/memory/tree/jobs/worker.rs
  • src/openhuman/memory/tree/mod.rs
  • src/openhuman/memory/tree/retrieval/drill_down.rs
  • src/openhuman/memory/tree/retrieval/global.rs
  • src/openhuman/memory/tree/retrieval/integration_test.rs
  • src/openhuman/memory/tree/retrieval/source.rs
  • src/openhuman/memory/tree/rpc.rs
  • src/openhuman/memory/tree/schemas.rs
  • src/openhuman/memory/tree/score/extract/llm.rs
  • src/openhuman/memory/tree/score/extract/llm_tests.rs
  • src/openhuman/memory/tree/score/extract/mod.rs
  • src/openhuman/memory/tree/score/extract/types.rs
  • src/openhuman/memory/tree/score/mod.rs
  • src/openhuman/memory/tree/score/store.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal_tests.rs
  • src/openhuman/memory/tree/source_tree/flush.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/source_tree/summariser/llm.rs
  • src/openhuman/memory/tree/store.rs
  • src/openhuman/memory/tree/topic_tree/backfill.rs
  • src/openhuman/memory/tree/topic_tree/curator.rs
  • src/openhuman/memory/tree/topic_tree/routing.rs

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/openhuman/memory/tree/source_tree/summariser/llm.rs (1)

391-395: ⚠️ Potential issue | 🟠 Major

Make summary required in the LLM payload.

#[serde(default)] turns {} or any legacy/wrong-shape JSON into a successful parse with summary = "", so this path emits an empty sealed summary instead of taking the inert fallback. That is silent data loss, not a soft-fail.

Suggested fix
 #[derive(Debug, Deserialize)]
 struct LlmSummaryOutput {
-    #[serde(default)]
     summary: String,
 }
         let parsed: LlmSummaryOutput = match serde_json::from_str(&envelope.message.content) {
             Ok(v) => v,
             Err(e) => {
                 log::warn!(
                     "[source_tree::summariser::llm] model returned non-JSON or wrong-shape \
                      body: {e}; content was: {} — falling back to inert",
                     truncate_for_log(&envelope.message.content, 400)
                 );
                 return self.fallback.summarise(inputs, ctx).await;
             }
         };
+        if parsed.summary.trim().is_empty() {
+            log::warn!(
+                "[source_tree::summariser::llm] model returned empty summary body — \
+                 falling back to inert for tree_id={} level={}",
+                ctx.tree_id,
+                ctx.target_level
+            );
+            return self.fallback.summarise(inputs, ctx).await;
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/source_tree/summariser/llm.rs` around lines 391 -
395, The LlmSummaryOutput struct currently uses #[serde(default)] on the summary
field which silently accepts missing or malformed payloads as summary = "",
causing silent data loss; remove the #[serde(default)] attribute from the
summary field in struct LlmSummaryOutput so deserialization fails if the LLM
payload lacks a summary, and update any calling code that assumed an empty
string fallback to handle the deserialization error or provide an explicit
fallback path instead.
♻️ Duplicate comments (2)
src/openhuman/memory/tree/score/extract/llm.rs (1)

436-451: ⚠️ Potential issue | 🟠 Major

Honor emit_topics in the response parser.

This still maps any returned topics array into ExtractedEntities even when the caller explicitly disabled topics. A model that emits extra keys anyway can inject topic labels into flows that opted out.

Suggested fix
-        let topics = self
-            .topics
-            .into_iter()
-            .filter_map(|raw| {
-                let label = raw.trim().to_string();
-                if label.is_empty() {
-                    None
-                } else {
-                    Some(ExtractedTopic { label, score: 0.85 })
-                }
-            })
-            .collect();
+        let topics = if cfg.emit_topics {
+            self.topics
+                .into_iter()
+                .filter_map(|raw| {
+                    let label = raw.trim().to_string();
+                    if label.is_empty() {
+                        None
+                    } else {
+                        Some(ExtractedTopic { label, score: 0.85 })
+                    }
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/extract/llm.rs` around lines 436 - 451, The
parser currently maps self.topics into ExtractedTopic regardless of whether
topics were requested; change the logic in the response parsing section that
builds topics (the block using self.topics and producing ExtractedTopic { label,
score: 0.85 }) to first check the emit_topics flag (e.g. self.emit_topics or the
appropriate request flag) and only perform the filter_map when emit_topics is
true, otherwise produce an empty list so any model-emitted topics are ignored
when the caller disabled emit_topics.
src/openhuman/memory/tree/rpc.rs (1)

227-246: ⚠️ Potential issue | 🟠 Major

Check for an existing digest before enqueueing.

This RPC is only dedupe-idempotent while a job is still ready/running. Once that day’s digest has already been materialized, calling this again can still enqueue a fresh no-op job and return enqueued=true, which doesn’t match the documented “idempotent by date” contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/rpc.rs` around lines 227 - 246, The RPC currently
calls jobs::trigger_digest(...) which may enqueue a no-op job even after that
day's digest is already materialized, breaking the “idempotent by date”
contract; add a pre-check before spawning the blocking task to detect an
already-materialized digest (e.g. call a new or existing jobs::digest_exists /
jobs::is_digest_materialized function with the same config and date) and if it
returns true return RpcOutcome::single_log with TriggerDigestResponse {
enqueued: false, job_id: None, date_iso } and an appropriate log message instead
of calling jobs::trigger_digest; alternatively, change jobs::trigger_digest to
return a distinct enum (e.g. SkippedAlreadyMaterialized) and handle that case
here so enqueued only becomes true when a real job was created.
🧹 Nitpick comments (1)
src/openhuman/memory/tree/jobs/types.rs (1)

215-227: FlushStalePayload::dedupe_key signature is asymmetric with other payloads.

Unlike other payload types where dedupe_key() derives the key solely from the payload's own fields, FlushStalePayload::dedupe_key(&self, date_iso: &str) requires an external date_iso parameter. This creates a risk of inconsistent dedupe keys if different callers pass different dates for the same payload.

Consider either:

  1. Adding date_iso: String as a field on FlushStalePayload
  2. Documenting clearly that the scheduler is the single caller responsible for providing a consistent date
♻️ Option 1: Add date_iso to the payload
 #[derive(Clone, Debug, Serialize, Deserialize, Default)]
 pub struct FlushStalePayload {
     /// Override the configured `DEFAULT_FLUSH_AGE_SECS`. Optional so the
     /// scheduler can enqueue with `None` and let the handler use the
     /// configured default.
     pub max_age_secs: Option<i64>,
+    /// UTC calendar date for dedupe key generation.
+    pub date_iso: String,
 }

 impl FlushStalePayload {
-    pub fn dedupe_key(&self, date_iso: &str) -> String {
-        format!("flush_stale:{date_iso}")
+    pub fn dedupe_key(&self) -> String {
+        format!("flush_stale:{}", self.date_iso)
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/types.rs` around lines 215 - 227, The dedupe
key is asymmetric because FlushStalePayload::dedupe_key currently takes an
external date_iso; change the payload to own the date by adding a new field pub
date_iso: String to FlushStalePayload, remove the dedupe_key(&self, date_iso:
&str) signature and make it dedupe_key(&self) -> String that returns
format!("flush_stale:{}", self.date_iso); then update all places that construct
FlushStalePayload (the scheduler/enqueue sites) to populate date_iso and any
tests to reflect the new field and signature so callers no longer provide a
separate date string.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/openhuman/memory/tree/ingest.rs`:
- Around line 132-146: The loop currently unconditionally sets every upserted
chunk to store::CHUNK_STATUS_PENDING_EXTRACTION and enqueues an ExtractChunk job
(NewJob::extract_chunk + jobs::enqueue_tx), which requeues chunks that have
already progressed; instead, read each chunk's current lifecycle before updating
(e.g., via a lookup/get in store) and only call
store::set_chunk_lifecycle_status_tx to PENDING and enqueue the ExtractChunk job
for chunks whose existing status is not one of the terminal/progressed states
(compare against store::CHUNK_STATUS_ADMITTED, CHUNK_STATUS_BUFFERED,
store::CHUNK_STATUS_SEALED); skip updating/enqueuing for chunks already
admitted/buffered/sealed so ingest replays are idempotent.

In `@src/openhuman/memory/tree/jobs/worker.rs`:
- Around line 26-60: The worker startup is rebuilding Config::default() inside
each worker loop which drops real settings; instead construct the real Config
once (the cfg you set before recover_stale_locks) and pass/clone that into each
spawned task (e.g., wrap cfg in Arc or implement Clone) so
run_once_with_semaphore gets the actual configured instance; remove the inner
Config::default() in the for loop and use the outer cfg when calling
run_once_with_semaphore and when spawning workers (symbols: STARTED,
WORKER_NOTIFY, WORKER_COUNT, cfg, recover_stale_locks, run_once_with_semaphore).

In `@src/openhuman/memory/tree/source_tree/bucket_seal.rs`:
- Around line 318-324: The sealing code in seal_one_level (and the other sealing
paths around the later ranges) incorrectly hard-codes SummaryContext.tree_kind
and persisted SummaryNode.tree_kind to TreeKind::Source; update those usages to
use the actual tree kind from the Tree being sealed (e.g., use tree.kind() or
tree.kind) so SummaryContext.tree_kind and the SummaryNode.tree_kind value
derive from the provided tree instead of always TreeKind::Source, and apply the
same change consistently in the other sealing branches referenced in this diff.
- Around line 588-595: The call to list_entity_ids_for_node is currently using
unwrap_or_default(), which silences DB/read errors and can permanently drop
entity labels; change this to propagate or handle the error instead of
defaulting: replace list_entity_ids_for_node(config, id).unwrap_or_default()
with a fallible call (e.g., let entities = list_entity_ids_for_node(config, id)?
) and adjust the surrounding function (the seal/commit function that builds
SummaryInput) to return a Result so failures abort the seal (or implement
explicit retry/log-and-return Err) so entity lookup errors are not treated as
“no entities” when constructing SummaryInput.

---

Outside diff comments:
In `@src/openhuman/memory/tree/source_tree/summariser/llm.rs`:
- Around line 391-395: The LlmSummaryOutput struct currently uses
#[serde(default)] on the summary field which silently accepts missing or
malformed payloads as summary = "", causing silent data loss; remove the
#[serde(default)] attribute from the summary field in struct LlmSummaryOutput so
deserialization fails if the LLM payload lacks a summary, and update any calling
code that assumed an empty string fallback to handle the deserialization error
or provide an explicit fallback path instead.

---

Duplicate comments:
In `@src/openhuman/memory/tree/rpc.rs`:
- Around line 227-246: The RPC currently calls jobs::trigger_digest(...) which
may enqueue a no-op job even after that day's digest is already materialized,
breaking the “idempotent by date” contract; add a pre-check before spawning the
blocking task to detect an already-materialized digest (e.g. call a new or
existing jobs::digest_exists / jobs::is_digest_materialized function with the
same config and date) and if it returns true return RpcOutcome::single_log with
TriggerDigestResponse { enqueued: false, job_id: None, date_iso } and an
appropriate log message instead of calling jobs::trigger_digest; alternatively,
change jobs::trigger_digest to return a distinct enum (e.g.
SkippedAlreadyMaterialized) and handle that case here so enqueued only becomes
true when a real job was created.

In `@src/openhuman/memory/tree/score/extract/llm.rs`:
- Around line 436-451: The parser currently maps self.topics into ExtractedTopic
regardless of whether topics were requested; change the logic in the response
parsing section that builds topics (the block using self.topics and producing
ExtractedTopic { label, score: 0.85 }) to first check the emit_topics flag (e.g.
self.emit_topics or the appropriate request flag) and only perform the
filter_map when emit_topics is true, otherwise produce an empty list so any
model-emitted topics are ignored when the caller disabled emit_topics.

---

Nitpick comments:
In `@src/openhuman/memory/tree/jobs/types.rs`:
- Around line 215-227: The dedupe key is asymmetric because
FlushStalePayload::dedupe_key currently takes an external date_iso; change the
payload to own the date by adding a new field pub date_iso: String to
FlushStalePayload, remove the dedupe_key(&self, date_iso: &str) signature and
make it dedupe_key(&self) -> String that returns format!("flush_stale:{}",
self.date_iso); then update all places that construct FlushStalePayload (the
scheduler/enqueue sites) to populate date_iso and any tests to reflect the new
field and signature so callers no longer provide a separate date string.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 68e72e9f-e8f2-4665-b59d-bd49c119beb4

📥 Commits

Reviewing files that changed from the base of the PR and between 605582f and 2e87c05.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • .gitignore
  • docs/memory-tree-async-pipeline.excalidraw
  • src/core/jsonrpc.rs
  • src/openhuman/composio/providers/gmail/ingest.rs
  • src/openhuman/memory/tree/canonicalize/email_clean.rs
  • src/openhuman/memory/tree/global_tree/digest.rs
  • src/openhuman/memory/tree/global_tree/digest_tests.rs
  • src/openhuman/memory/tree/global_tree/recap.rs
  • src/openhuman/memory/tree/global_tree/seal.rs
  • src/openhuman/memory/tree/ingest.rs
  • src/openhuman/memory/tree/jobs/handlers/mod.rs
  • src/openhuman/memory/tree/jobs/mod.rs
  • src/openhuman/memory/tree/jobs/scheduler.rs
  • src/openhuman/memory/tree/jobs/store.rs
  • src/openhuman/memory/tree/jobs/testing.rs
  • src/openhuman/memory/tree/jobs/types.rs
  • src/openhuman/memory/tree/jobs/worker.rs
  • src/openhuman/memory/tree/mod.rs
  • src/openhuman/memory/tree/retrieval/drill_down.rs
  • src/openhuman/memory/tree/retrieval/global.rs
  • src/openhuman/memory/tree/retrieval/integration_test.rs
  • src/openhuman/memory/tree/retrieval/source.rs
  • src/openhuman/memory/tree/rpc.rs
  • src/openhuman/memory/tree/schemas.rs
  • src/openhuman/memory/tree/score/extract/llm.rs
  • src/openhuman/memory/tree/score/extract/llm_tests.rs
  • src/openhuman/memory/tree/score/extract/mod.rs
  • src/openhuman/memory/tree/score/mod.rs
  • src/openhuman/memory/tree/score/store.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal_tests.rs
  • src/openhuman/memory/tree/source_tree/flush.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/source_tree/summariser/llm.rs
  • src/openhuman/memory/tree/store.rs
  • src/openhuman/memory/tree/topic_tree/backfill.rs
  • src/openhuman/memory/tree/topic_tree/curator.rs
  • src/openhuman/memory/tree/topic_tree/routing.rs
✅ Files skipped from review due to trivial changes (2)
  • .gitignore
  • src/openhuman/memory/tree/score/mod.rs
🚧 Files skipped from review as they are similar to previous changes (16)
  • src/openhuman/memory/tree/mod.rs
  • src/core/jsonrpc.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/jobs/testing.rs
  • src/openhuman/memory/tree/retrieval/drill_down.rs
  • src/openhuman/memory/tree/retrieval/source.rs
  • src/openhuman/memory/tree/global_tree/seal.rs
  • src/openhuman/memory/tree/topic_tree/curator.rs
  • src/openhuman/memory/tree/global_tree/digest.rs
  • src/openhuman/memory/tree/retrieval/global.rs
  • src/openhuman/memory/tree/jobs/mod.rs
  • docs/memory-tree-async-pipeline.excalidraw
  • src/openhuman/memory/tree/topic_tree/routing.rs
  • src/openhuman/memory/tree/store.rs
  • src/openhuman/memory/tree/retrieval/integration_test.rs
  • src/openhuman/memory/tree/source_tree/flush.rs

Comment thread src/openhuman/memory/tree/ingest.rs
Comment thread src/openhuman/memory/tree/jobs/worker.rs Outdated
Comment thread src/openhuman/memory/tree/source_tree/bucket_seal.rs
Comment thread src/openhuman/memory/tree/source_tree/bucket_seal.rs
@sanil-23 sanil-23 force-pushed the feat/memory-tree-async-pipeline branch from 2e87c05 to b00692c on April 27, 2026 at 23:03

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
src/openhuman/memory/tree/score/extract/mod.rs (1)

69-75: ⚠️ Potential issue | 🟠 Major

Don’t log raw extractor endpoints.

Line 70-Line 74 can leak credentials/query secrets if the configured URL contains them. Log model/timeout only, or a redacted endpoint summary.

Suggested fix
             log::info!(
-                "[memory_tree::extract] summary extractor: regex + LLM endpoint={} model={} \
+                "[memory_tree::extract] summary extractor: regex + LLM model={} \
                  timeout_ms={} emit_topics=true",
-                endpoint,
                 model,
                 timeout_ms
             );

As per coding guidelines, "Never log secrets or full PII."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/extract/mod.rs` around lines 69 - 75, The
current log in the summary extractor prints the raw endpoint variable (endpoint)
which can leak credentials; modify the log::info call in the extract module so
it does not emit the full endpoint: either remove endpoint from the message and
only log model and timeout_ms, or replace endpoint with a redacted summary
(e.g., scheme+host only or mask url::Url::username/password/userinfo) before
logging; update the call sites around the log::info invocation that reference
endpoint to use the redacted value or omit it.
src/openhuman/memory/tree/global_tree/digest_tests.rs (1)

283-290: ⚠️ Potential issue | 🟡 Minor

Use the actual source tree id when seeding index_entity.

Line 289 currently passes Some(scope), but this parameter is the tree_id slot. Seeding with scope can produce non-production-shaped index rows and hide tree-id-sensitive issues in digest/hydration paths.

Suggested fix
             index_entity(
                 cfg,
                 &e,
                 &chunk.id,
                 "leaf",
                 ts.timestamp_millis(),
-                Some(scope),
+                Some(&tree.id),
             )
             .unwrap();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/global_tree/digest_tests.rs` around lines 283 -
290, The call to index_entity is passing Some(scope) into the tree_id parameter,
which seeds index rows with scope instead of the actual source tree id; update
the invocation of index_entity (the function named index_entity) to pass the
correct source tree id value (e.g., the tree.id or whatever variable represents
the source tree identifier in the test setup) in place of Some(scope) so the
test seeds production-shaped index rows and exercises tree-id-sensitive
digest/hydration code paths.
src/openhuman/memory/tree/score/extract/llm.rs (1)

436-451: ⚠️ Potential issue | 🟠 Major

Honor emit_topics in the parser, not just the prompt.

Despite the comment, topics are still mapped unconditionally. If a model emits topics even when opt-out is requested, they leak into outputs.

Suggested fix
-        let topics = self
-            .topics
-            .into_iter()
-            .filter_map(|raw| {
-                let label = raw.trim().to_string();
-                if label.is_empty() {
-                    None
-                } else {
-                    Some(ExtractedTopic { label, score: 0.85 })
-                }
-            })
-            .collect();
+        let topics = if cfg.emit_topics {
+            self.topics
+                .into_iter()
+                .filter_map(|raw| {
+                    let label = raw.trim().to_string();
+                    if label.is_empty() {
+                        None
+                    } else {
+                        Some(ExtractedTopic { label, score: 0.85 })
+                    }
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/extract/llm.rs` around lines 436 - 451, The
topics mapping currently always converts self.topics into ExtractedTopic values
causing leaked topics even when the caller requested opt-out; update the parser
to honor the emit_topics setting by guarding the mapping: if the parser/struct
flag (e.g., self.emit_topics or an emit_topics parameter) is false, set topics
to an empty Vec immediately, otherwise perform the existing filter_map on
self.topics to produce ExtractedTopic instances; modify the block around the
topics variable (the code that currently assigns to let topics =
self.topics.into_iter().filter_map(...).collect();) to perform this conditional
check so topics are only populated when emit_topics is true.
🧹 Nitpick comments (1)
src/openhuman/memory/tree/jobs/scheduler.rs (1)

15-26: Scheduler loop has no graceful shutdown mechanism.

The spawned task loops forever with no way to signal termination. This is acceptable for a long-running daemon, but if the scheduler needs to be stopped (e.g., during tests or graceful shutdown), there's no cancellation token or shutdown signal.

Consider adding a CancellationToken or watch channel if graceful shutdown becomes necessary.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/scheduler.rs` around lines 15 - 26, The
scheduler spawned in start(Config) runs an infinite loop (inside
STARTED.call_once) and lacks a graceful shutdown mechanism; modify start and the
spawned task to accept and observe a CancellationToken or a watch channel (e.g.,
add a shutdown token parameter to start or capture a shared CancellationToken)
and check it inside the loop before/after calling enqueue_daily_jobs and before
sleeping (using token.cancelled().await or tokio::select! with the sleep and
shutdown signal) so the tokio::spawn task can exit cleanly; update call sites
that invoke start to provide the shutdown token and ensure enqueue_daily_jobs
and next_sleep_duration interplay with the token for prompt termination.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/core/jsonrpc.rs`:
- Around line 808-811: register_domain_subscribers currently calls
jobs::start(config.clone()) at bootstrap and, because it's Once-guarded, this pins
long-lived workers to the bootstrap-time workspace; to fix, stop starting
long-lived workers in register_domain_subscribers and instead register the
subscribers at user/login-scoped startup (e.g. move the startup call into
channels/runtime/startup.rs so it runs after the active workspace is selected),
or change the worker code started by jobs::start to re-resolve the active Config
on each poll before accessing the memory-tree DB; implement the
publish/subscribe pattern by publishing DomainEvent variants with
event_bus::publish_global(...) and move subscriber implementations into each
domain's bus.rs per-domain so they are initialized under the correct
user/workspace scope rather than at core bootstrap.

In `@src/openhuman/memory/tree/jobs/store.rs`:
- Around line 163-208: In mark_failed, do not log or store the raw error string;
sanitize/redact PII before using the error variable in log::warn!, log::info!
and the last_error DB parameter. Create or call a sanitizer (e.g.,
sanitize_error or redact_pii) to remove emails, prompt text, secrets and to
truncate long messages, then use the sanitized value in the two
conn.execute(...) params and the log messages (references: mark_failed, error
parameter, last_error column, mem_tree_jobs UPDATE, log::warn!, log::info!,
backoff_ms). Ensure the sanitizer is applied to both terminal-failure and retry
paths so no raw error is written or logged.
- Around line 77-91: The debug lines in the enqueue path log job.dedupe_key raw
(seen in the "enqueue suppressed by dedupe" and "enqueued" log calls) which can
leak PII; change both log::debug calls to avoid printing the raw dedupe_key by
either logging a redacted indicator (e.g. "present"/"none") or a fixed-length
hash of job.dedupe_key (e.g. SHA-256 hex) instead, keeping the rest of the
context (id, job.kind.as_str(), available_at) intact; update the code that
references job.dedupe_key in those log messages to use the new redaction/hash
helper so future logs never contain the raw key.

---

Duplicate comments:
In `@src/openhuman/memory/tree/global_tree/digest_tests.rs`:
- Around line 283-290: The call to index_entity is passing Some(scope) into the
tree_id parameter, which seeds index rows with scope instead of the actual
source tree id; update the invocation of index_entity (the function named
index_entity) to pass the correct source tree id value (e.g., the tree.id or
whatever variable represents the source tree identifier in the test setup) in
place of Some(scope) so the test seeds production-shaped index rows and
exercises tree-id-sensitive digest/hydration code paths.

In `@src/openhuman/memory/tree/score/extract/llm.rs`:
- Around line 436-451: The topics mapping currently always converts self.topics
into ExtractedTopic values causing leaked topics even when the caller requested
opt-out; update the parser to honor the emit_topics setting by guarding the
mapping: if the parser/struct flag (e.g., self.emit_topics or an emit_topics
parameter) is false, set topics to an empty Vec immediately, otherwise perform
the existing filter_map on self.topics to produce ExtractedTopic instances;
modify the block around the topics variable (the code that currently assigns to
let topics = self.topics.into_iter().filter_map(...).collect();) to perform this
conditional check so topics are only populated when emit_topics is true.

In `@src/openhuman/memory/tree/score/extract/mod.rs`:
- Around line 69-75: The current log in the summary extractor prints the raw
endpoint variable (endpoint) which can leak credentials; modify the log::info
call in the extract module so it does not emit the full endpoint: either remove
endpoint from the message and only log model and timeout_ms, or replace endpoint
with a redacted summary (e.g., scheme+host only or mask
url::Url::username/password/userinfo) before logging; update the call sites
around the log::info invocation that reference endpoint to use the redacted
value or omit it.

---

Nitpick comments:
In `@src/openhuman/memory/tree/jobs/scheduler.rs`:
- Around line 15-26: The scheduler spawned in start(Config) runs an infinite
loop (inside STARTED.call_once) and lacks a graceful shutdown mechanism; modify
start and the spawned task to accept and observe a CancellationToken or a watch
channel (e.g., add a shutdown token parameter to start or capture a shared
CancellationToken) and check it inside the loop before/after calling
enqueue_daily_jobs and before sleeping (using token.cancelled().await or
tokio::select! with the sleep and shutdown signal) so the tokio::spawn task can
exit cleanly; update call sites that invoke start to provide the shutdown token
and ensure enqueue_daily_jobs and next_sleep_duration interplay with the token
for prompt termination.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ed334870-f439-4f36-a809-3facfb2cbc3b

📥 Commits

Reviewing files that changed from the base of the PR and between 2e87c05 and b00692c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • .gitignore
  • docs/memory-tree-async-pipeline.excalidraw
  • src/core/jsonrpc.rs
  • src/openhuman/composio/providers/gmail/ingest.rs
  • src/openhuman/memory/tree/canonicalize/email_clean.rs
  • src/openhuman/memory/tree/global_tree/digest.rs
  • src/openhuman/memory/tree/global_tree/digest_tests.rs
  • src/openhuman/memory/tree/global_tree/recap.rs
  • src/openhuman/memory/tree/global_tree/seal.rs
  • src/openhuman/memory/tree/ingest.rs
  • src/openhuman/memory/tree/jobs/handlers/mod.rs
  • src/openhuman/memory/tree/jobs/mod.rs
  • src/openhuman/memory/tree/jobs/scheduler.rs
  • src/openhuman/memory/tree/jobs/store.rs
  • src/openhuman/memory/tree/jobs/testing.rs
  • src/openhuman/memory/tree/jobs/types.rs
  • src/openhuman/memory/tree/jobs/worker.rs
  • src/openhuman/memory/tree/mod.rs
  • src/openhuman/memory/tree/retrieval/drill_down.rs
  • src/openhuman/memory/tree/retrieval/global.rs
  • src/openhuman/memory/tree/retrieval/integration_test.rs
  • src/openhuman/memory/tree/retrieval/source.rs
  • src/openhuman/memory/tree/rpc.rs
  • src/openhuman/memory/tree/schemas.rs
  • src/openhuman/memory/tree/score/extract/llm.rs
  • src/openhuman/memory/tree/score/extract/llm_tests.rs
  • src/openhuman/memory/tree/score/extract/mod.rs
  • src/openhuman/memory/tree/score/mod.rs
  • src/openhuman/memory/tree/score/store.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal_tests.rs
  • src/openhuman/memory/tree/source_tree/flush.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/source_tree/summariser/llm.rs
  • src/openhuman/memory/tree/store.rs
  • src/openhuman/memory/tree/topic_tree/backfill.rs
  • src/openhuman/memory/tree/topic_tree/curator.rs
  • src/openhuman/memory/tree/topic_tree/routing.rs
✅ Files skipped from review due to trivial changes (3)
  • .gitignore
  • src/openhuman/memory/tree/topic_tree/curator.rs
  • docs/memory-tree-async-pipeline.excalidraw
🚧 Files skipped from review as they are similar to previous changes (12)
  • src/openhuman/memory/tree/mod.rs
  • src/openhuman/memory/tree/jobs/testing.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/retrieval/drill_down.rs
  • src/openhuman/memory/tree/schemas.rs
  • src/openhuman/memory/tree/jobs/worker.rs
  • src/openhuman/memory/tree/canonicalize/email_clean.rs
  • src/openhuman/memory/tree/jobs/types.rs
  • src/openhuman/memory/tree/source_tree/flush.rs
  • src/openhuman/memory/tree/source_tree/summariser/llm.rs
  • src/openhuman/composio/providers/gmail/ingest.rs
  • src/openhuman/memory/tree/topic_tree/backfill.rs

Comment thread src/core/jsonrpc.rs
Comment thread src/openhuman/memory/tree/jobs/store.rs
Comment thread src/openhuman/memory/tree/jobs/store.rs
…gest trigger

Move memory-tree work onto a SQLite-backed job queue so summariser /
extractor / cascade work runs out of band of ingest.

Pipeline shape after this change:

  ingest_chat → write chunk (lifecycle=pending_extraction)
              → enqueue extract_chunk

  worker (×3, LLM semaphore=3):
    extract_chunk    → LLM extract → admit/drop → enqueue append_buffer + topic_route
    append_buffer    → push to L0 → enqueue seal if gate met
    seal             → seal one level → enqueue parent seal if cascading
                       → for source trees: enqueue topic_route(Summary)
    topic_route      → maybe_spawn_topic_tree (counters + backfill if hot)
                       → enqueue per-topic append_buffer
    digest_daily     → end_of_day_digest(date)
    flush_stale      → enqueue seal jobs for stale buffers

  scheduler (1 task, daily UTC 00:05):
    enqueue digest_daily(yesterday) + flush_stale (dedupe-keyed by date)

Key additions:

  * mem_tree_jobs table with dedupe keys and per-status retry/backoff
  * lifecycle_status column on mem_tree_chunks
  * Per-level seal jobs (each level its own crash-recovery checkpoint)
  * LabelStrategy enum driving summary entity/topic population:
      - source-tree seal:    ExtractFromContent(regex+LLM emit_topics=true)
      - global L0 + L1+:     UnionFromChildren (no extra LLM call)
      - topic-tree seal:     Empty (scope already pins the topic)
  * NodeRef::{Leaf, Summary} payload generalisation so summary entities
    flow back into topic-tree spawn / route pipeline
  * 30-day backfill window on topic-tree spawn aligned with hotness
    recency cliff
  * memory_tree.trigger_digest RPC + jobs::backfill_missing_digests
    catch-up helper
  * extract::build_summary_extractor honest builder (no ScoringConfig
    misuse for seal-time extraction)
  * Lenient day-of-week parser in email_clean for real-world MTA quirks
  * Slack/Gmail canonicalisers, composio providers, slack_ingestion module

Tests: 522 passing across memory_tree, slack_ingestion, composio_providers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
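For readers skimming the commit message, a rough sketch of how a LabelStrategy enum like the one listed above could be shaped; this is reconstructed from the description, not copied from the diff, so variant details are assumptions.

// Reconstructed from the commit-message description of LabelStrategy; the
// real definition in the PR may differ.
#[allow(dead_code)]
enum LabelStrategy {
    // Source-tree seals: run the regex + LLM summary extractor over the
    // sealed content (emit_topics = true).
    ExtractFromContent,
    // Global daily/weekly/monthly nodes: union the labels of contributing
    // child summaries, no extra LLM call.
    UnionFromChildren,
    // Topic-tree seals: leave labels empty, the tree's scope already pins
    // the topic.
    Empty,
}

fn needs_llm_call(strategy: &LabelStrategy) -> bool {
    matches!(strategy, LabelStrategy::ExtractFromContent)
}

fn main() {
    assert!(needs_llm_call(&LabelStrategy::ExtractFromContent));
    assert!(!needs_llm_call(&LabelStrategy::UnionFromChildren));
}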
@sanil-23 sanil-23 force-pushed the feat/memory-tree-async-pipeline branch from b00692c to 4d917fa on April 27, 2026 at 23:17

@coderabbitai coderabbitai Bot left a comment

♻️ Duplicate comments (8)
src/openhuman/memory/tree/score/extract/mod.rs (1)

69-75: ⚠️ Potential issue | 🟠 Major

Don’t log the raw LLM endpoint.

If this URL carries embedded credentials or sensitive query params, this info! leaks them verbatim. Log a redacted summary instead, or omit the endpoint entirely.

As per coding guidelines, "Never log secrets or full PII."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/extract/mod.rs` around lines 69 - 75, The log
call in the summary extractor is currently emitting the raw LLM endpoint
(variable endpoint) which can contain credentials or sensitive query params;
change the logging to avoid leaking secrets by redacting or omitting the
endpoint: create or call a small sanitizer (e.g., redact_url or use url::Url to
strip userinfo and query) and log a redacted_endpoint (or just log the
host/origin) instead of endpoint, while keeping model and timeout_ms unchanged;
update the info! call in the summary extractor block that references endpoint so
it uses the sanitized value.
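A minimal sketch of the url::Url-based redaction the prompt suggests; redact_endpoint is a hypothetical helper name and the fallback strings are illustrative.

use url::Url;

// Keep only scheme + host (+ port); drop userinfo, path and query before logging.
fn redact_endpoint(raw: &str) -> String {
    match Url::parse(raw) {
        Ok(u) => {
            let host = u.host_str().unwrap_or("<unknown-host>");
            match u.port() {
                Some(p) => format!("{}://{}:{}", u.scheme(), host, p),
                None => format!("{}://{}", u.scheme(), host),
            }
        }
        Err(_) => "<unparseable endpoint>".to_string(),
    }
}

fn main() {
    // Prints "http://llm.internal:11434" instead of the credentialed URL.
    println!("{}", redact_endpoint("http://user:secret@llm.internal:11434/v1?key=abc"));
}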
src/openhuman/memory/tree/canonicalize/email_clean.rs (2)

151-161: ⚠️ Potential issue | 🟡 Minor

truncate_body exceeds its advertised limit.

For truncated inputs this returns max_chars + 1 characters, and max_chars == 0 still produces a non-empty string. Keep the ellipsis inside the budget.

🛠️ Proposed fix
 pub fn truncate_body(body: &str, max_chars: usize) -> String {
     let trimmed = body.trim();
+    if max_chars == 0 {
+        return String::new();
+    }
     if trimmed.chars().count() <= max_chars {
         return trimmed.to_string();
     }
-    let mut out: String = trimmed.chars().take(max_chars).collect();
+    let mut out: String = trimmed.chars().take(max_chars - 1).collect();
     out.push('…');
     out
 }
     fn truncate_body_adds_ellipsis() {
         let s = "x".repeat(2000);
         let t = truncate_body(&s, 1200);
         assert!(t.ends_with('…'));
-        assert_eq!(t.chars().count(), 1201);
+        assert_eq!(t.chars().count(), 1200);
     }

Also applies to: 321-326

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/canonicalize/email_clean.rs` around lines 151 -
161, truncate_body currently returns max_chars+1 characters when truncating and
emits an ellipsis even when max_chars == 0; fix this in truncate_body by
counting characters after trim, returning the trimmed string if its char count
<= max_chars, and if truncation is needed ensure the ellipsis fits inside the
budget: if max_chars == 0 return an empty String, otherwise collect
trimmed.chars().take(max_chars - 1) and then push '…' so the final length is <=
max_chars; update the same logic in the duplicate occurrence (lines ~321-326)
where the same truncation is implemented.

164-179: ⚠️ Potential issue | 🟡 Minor

Escape # here or fix the docs.

The docstring says header markers are escaped, but # currently passes through unchanged and can still break markdown rendering.

🛠️ Proposed fix
     for ch in s.chars() {
         match ch {
-            '\\' | '`' | '*' | '_' | '|' => {
+            '\\' | '`' | '*' | '_' | '|' | '#' => {
                 out.push('\\');
                 out.push(ch);
             }
     fn md_escape_handles_special_chars() {
         assert_eq!(md_escape("a*b_c"), "a\\*b\\_c");
         assert_eq!(md_escape("foo|bar"), "foo\\|bar");
+        assert_eq!(md_escape("# heading"), "\\# heading");
         assert_eq!(md_escape("line1\nline2"), "line1 line2");
         assert_eq!(md_escape("plain text"), "plain text");
     }

Also applies to: 336-341

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/canonicalize/email_clean.rs` around lines 164 -
179, The md_escape function currently fails to escape the '#' character despite
the docstring claiming header markers are escaped; update the match in md_escape
to include '#' alongside '\\', '`', '*', '_', '|' so that a backslash is
prepended before '#' and newlines still collapse to spaces, and apply the same
change to the other duplicate md_escape occurrence referenced (the identical
escape match around lines 336-341) so both implementations consistently escape
header markers.
src/openhuman/memory/tree/source_tree/bucket_seal.rs (1)

646-652: ⚠️ Potential issue | 🟠 Major

Don't silently drop entity labels when the index lookup fails.

list_entity_ids_for_node errors are swallowed by unwrap_or_default(), which treats any DB/read error as "this chunk has no entities." This can permanently strip labels from rolled-up summaries instead of retrying the seal.

Suggested fix
-        let entities = list_entity_ids_for_node(config, id).unwrap_or_default();
+        let entities = list_entity_ids_for_node(config, id)
+            .with_context(|| format!("list entity ids for chunk {id}"))?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/source_tree/bucket_seal.rs` around lines 646 - 652,
The code calls list_entity_ids_for_node(...).unwrap_or_default(), which hides
DB/read errors and treats them as "no entities"; instead propagate failures so
the seal can retry: remove unwrap_or_default and handle the Result from
list_entity_ids_for_node by returning or propagating the error from the
surrounding function (or converting it into a meaningful error), only treating
Ok(vec) as the entities for building
SummaryInput/LabelStrategy::UnionFromChildren; if you need soft-failure logging,
log the error with context (including node id) and then return Err to avoid
silently dropping labels.
src/openhuman/memory/tree/rpc.rs (1)

182-206: ⚠️ Potential issue | 🟡 Minor

Documentation overstates idempotency guarantee.

The docstring says "idempotent — the handler skips if a daily node already exists" but the actual behavior only dedupes against active jobs (ready/running). Once a digest job completes, calling again will successfully enqueue a new job that may re-summarize the same day's content.

Consider either:

  1. Adding a pre-check for an existing daily summary before enqueueing, or
  2. Updating the documentation to clarify the dedupe scope
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/rpc.rs` around lines 182 - 206, The docstring for
TriggerDigestRequest/TriggerDigestResponse overstates idempotency: the current
enqueue logic only dedupes against active jobs (ready/running) rather than
existing completed daily-summary nodes; either add a pre-check in the trigger
handler to query for an existing daily node for the target date and
refuse/enqueue accordingly (update the handler that computes the target date and
performs DB enqueue), or update the docstrings/comments on
TriggerDigestRequest/TriggerDigestResponse to explicitly state that
deduplication only prevents duplicate active jobs and completed summaries are
not considered idempotent; reference TriggerDigestRequest, TriggerDigestResponse
and the trigger handler/enqueue code path when making the change.
src/openhuman/memory/tree/jobs/store.rs (2)

77-91: ⚠️ Potential issue | 🟡 Minor

Raw dedupe keys logged may leak workspace/user data.

The dedupe_key can encode chunk/source identifiers which may contain PII. This was flagged in a prior review. Consider logging only presence ("present" vs "none") or a truncated hash instead.

As per coding guidelines: "Never log secrets or full PII."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/store.rs` around lines 77 - 91, The logs in
store.rs currently print the raw job.dedupe_key which may contain PII; update
the two debug log sites (the branches that check `inserted == 0` and the
subsequent enqueued log) to avoid printing the full dedupe_key: either log its
presence as "present"/"none" or compute a short deterministic fingerprint (e.g.,
a SHA256 or xxhash and log the first N hex chars) and log that instead along
with `job.kind.as_str()` and `id`/`available_at`; ensure the transformation is
done once (e.g., compute `let dedupe_indicator = ...` near where `job` is
available) and use `dedupe_indicator` in both debug messages so no raw key is
emitted.
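A minimal sketch of the fingerprint idea above, assuming the sha2 crate; dedupe_indicator and the 12-character prefix are illustrative choices, not existing project code.

use sha2::{Digest, Sha256};

// Log "none" or a short hash prefix of the dedupe key, never the raw key.
fn dedupe_indicator(key: Option<&str>) -> String {
    match key {
        None => "none".to_string(),
        Some(k) => {
            let digest = Sha256::digest(k.as_bytes());
            let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
            hex[..12].to_string()
        }
    }
}

fn main() {
    println!("{}", dedupe_indicator(Some("flush_stale:2026-04-27")));
    println!("{}", dedupe_indicator(None));
}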

180-198: ⚠️ Potential issue | 🟡 Minor

Error strings stored and logged without sanitization.

The error parameter in mark_failed is written directly to last_error in the database and logged. If upstream providers include prompt text, source scopes, or email addresses in error messages, this retains PII. This was flagged in a prior review.

As per coding guidelines: "Never log secrets or full PII."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/store.rs` around lines 180 - 198, The code
writes and logs the raw `error` in `mark_failed` (used in the DB UPDATE and
log::warn/info), which may contain PII; create and call a sanitizer (e.g.,
sanitize_error(&str) -> String) to redact emails, long prompt text, and other
sensitive tokens, and use the sanitized string both in the
`log::warn!/log::info!` messages and as the `last_error` parameter in the
`conn.execute` call (replace uses of `error` with the sanitized value); ensure
the sanitizer also truncates excessively long messages and leaves no raw PII in
logs or DB.
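A minimal sketch of a sanitize_error helper along the lines asked for above, assuming the regex crate; the email pattern and the 500-character cap are illustrative, not project policy.

use regex::Regex;

// Redact email addresses and cap the length before the message reaches
// logs or the last_error column.
fn sanitize_error(raw: &str) -> String {
    let email_re = Regex::new(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}").unwrap();
    let redacted = email_re.replace_all(raw, "<redacted-email>");
    let mut out: String = redacted.chars().take(500).collect();
    if redacted.chars().count() > 500 {
        out.push('…');
    }
    out
}

fn main() {
    println!("{}", sanitize_error("LLM call failed for alice@example.com: prompt was ..."));
}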
src/openhuman/composio/providers/gmail/ingest.rs (1)

187-212: ⚠️ Potential issue | 🟡 Minor

Sender email addresses are logged as PII.

Lines 188-189 and 208-209 log the full sender email address, which violates the coding guideline "Never log secrets or full PII." This was flagged in a prior review and remains unaddressed.

As per coding guidelines: "Never log secrets or full PII."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/composio/providers/gmail/ingest.rs` around lines 187 - 212, The
logs currently emit full sender email (variable sender) in the info and warn
calls around the ingest loop; replace that with a non-PII representation (e.g.,
mask the local-part, log only domain, or a one-way hash) and use that masked
value in both log::info and log::warn where sender is used (the log calls that
surround thread creation and the ingest_email error handling). Implement or call
a helper like mask_email(sender) or hash_identifier(sender) and pass the masked
result into the log messages and keep thread_id and other non-PII fields
unchanged.
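A minimal sketch of the mask_email helper suggested above; keeping the first character of the local part plus the domain is one illustrative choice, a one-way hash would work too.

// Mask the local part so log lines never carry a full address.
fn mask_email(addr: &str) -> String {
    match addr.split_once('@') {
        Some((local, domain)) => {
            let first = local.chars().next().unwrap_or('*');
            format!("{first}***@{domain}")
        }
        None => "<invalid-address>".to_string(),
    }
}

fn main() {
    // Prints "a***@example.com".
    println!("{}", mask_email("alice@example.com"));
}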
🧹 Nitpick comments (6)
src/openhuman/memory/tree/score/extract/mod.rs (1)

21-89: Move build_summary_extractor out of mod.rs.

This adds config parsing, construction policy, and fallback behavior directly into the module root. Re-export it from here, but keep the operational code in a sibling file so mod.rs stays lightweight.

As per coding guidelines, "keep domain mod.rs export-focused, with operational code in ops.rs, store.rs, types.rs, etc."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/extract/mod.rs` around lines 21 - 89, Move
the operational implementation of build_summary_extractor (including the
LlmExtractorConfig creation, endpoint/model parsing, timeout logic,
LlmEntityExtractor::new match, and CompositeExtractor/RegexEntityExtractor
fallback) into a new sibling file (e.g., ops.rs) and re-export the function from
mod.rs; leave mod.rs as an export-only module that pub use
crate::openhuman::memory::tree::score::extract::build_summary_extractor (or
equivalent) while keeping all logic and imports (LlmExtractorConfig,
LlmEntityExtractor, CompositeExtractor, RegexEntityExtractor) inside the new
file so mod.rs remains lightweight and only exposes the symbol.
src/openhuman/memory/tree/score/mod.rs (1)

343-351: Avoid hand-copying ScoringConfig here.

This fast path only wants llm_extractor = None, but rebuilding the whole struct manually makes it easy for future scoring knobs to get out of sync with the normal path.

♻️ Possible cleanup
- pub struct ScoringConfig {
+ #[derive(Clone)]
+ pub struct ScoringConfig {
     pub extractor: Arc<dyn EntityExtractor>,
     pub weights: SignalWeights,
     pub drop_threshold: f32,
     pub llm_extractor: Option<Arc<dyn EntityExtractor>>,
     pub definite_keep_threshold: f32,
     pub definite_drop_threshold: f32,
 }
 pub async fn score_chunks_fast(chunks: &[Chunk], cfg: &ScoringConfig) -> Result<Vec<ScoreResult>> {
-    let fast_cfg = ScoringConfig {
-        extractor: cfg.extractor.clone(),
-        weights: cfg.weights.clone(),
-        drop_threshold: cfg.drop_threshold,
-        llm_extractor: None,
-        definite_keep_threshold: cfg.definite_keep_threshold,
-        definite_drop_threshold: cfg.definite_drop_threshold,
-    };
+    let mut fast_cfg = cfg.clone();
+    fast_cfg.llm_extractor = None;
     score_chunks(chunks, &fast_cfg).await
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/score/mod.rs` around lines 343 - 351, The function
score_chunks_fast currently rebuilds a ScoringConfig by hand which risks
drifting fields out of sync; instead clone the existing cfg (use ScoringConfig's
Clone implementation) and simply set its llm_extractor field to None (e.g., let
mut fast_cfg = cfg.clone(); fast_cfg.llm_extractor = None), then use fast_cfg in
the fast path. This keeps score_chunks_fast, ScoringConfig, and the
llm_extractor handling in sync with future changes.
src/openhuman/memory/tree/jobs/testing.rs (1)

8-15: Fail fast if the queue never converges.

This loop has no bound, so a job that keeps re-enqueuing immediate work will hang the test process until CI times out. A small iteration cap or deadline would make those failures diagnosable.

♻️ Possible hardening
 pub async fn drain_until_idle(config: &Config) -> Result<()> {
-    loop {
+    const MAX_RUNS: usize = 10_000;
+    for _ in 0..MAX_RUNS {
         if !super::worker::run_once(config).await? {
-            break;
+            return Ok(());
         }
     }
-    Ok(())
+    anyhow::bail!("memory-tree job queue did not go idle after {MAX_RUNS} iterations")
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/testing.rs` around lines 8 - 15, The
drain_until_idle loop in drain_until_idle keeps trying run_once indefinitely;
add a safety bound and fail fast if it doesn't converge (e.g., introduce a
max_iterations counter or a deadline/timeout using tokio::time::Instant +
Duration or tokio::time::timeout) and return an Err when exceeded; update the
function to either accept a max_iterations or compute a deadline from Config,
increment/check the counter or deadline around calls to
super::worker::run_once(config).await? and return a clear error explaining that
the queue never became idle if the cap/deadline is reached.
src/openhuman/memory/tree/jobs/worker.rs (1)

68-71: Clarify single-slot semaphore intent.

run_once creates a fresh Semaphore(1) per call, which is fine for test/single-shot use but differs from the production pool's shared 3-slot semaphore. Consider a brief doc comment noting this is for test harnesses.

Suggested doc
+/// Single-shot job execution for tests / `drain_until_idle`. Uses a
+/// private 1-slot semaphore since there's no concurrency to coordinate.
 pub async fn run_once(config: &Config) -> Result<bool> {
     let llm_slots = Arc::new(Semaphore::new(1));
     run_once_with_semaphore(config, llm_slots).await
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/worker.rs` around lines 68 - 71, The run_once
function currently constructs a fresh Semaphore(1) per call which is intended
for test/single-shot usage and differs from the production shared 3-slot
semaphore; add a brief doc comment above pub async fn run_once(config: &Config)
-> Result<bool> clarifying that this function purposely uses a single-slot
Semaphore(1) for test harnesses or single-shot runs, and note that production
code uses the shared 3-slot semaphore managed elsewhere and should call
run_once_with_semaphore when the shared pool is available.
src/openhuman/memory/tree/ingest.rs (1)

119-127: Variable naming convention.

The coding guidelines specify camelCase for variable names in src/openhuman/**/*.rs, but chunks_for_store, results_for_store, and all_results use snake_case.

As per coding guidelines: "Use camelCase for variable names".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/ingest.rs` around lines 119 - 127, The variables
all_results, chunks_for_store, and results_for_store (and similarly config_owned
if present) violate the camelCase naming rule; rename them to camelCase (e.g.,
allResults, chunksForStore, resultsForStore, configOwned), update every use site
in this function/module (including the let bindings shown and any subsequent
references like the filter/count and cloned values), and run cargo build/tests
to ensure no remaining references to the old snake_case identifiers remain.
src/openhuman/memory/tree/jobs/handlers/mod.rs (1)

385-404: Consider bounding the number of seal jobs enqueued in flush_stale.

If list_stale_buffers returns a very large number of buffers (e.g., after extended downtime), this loop enqueues a seal job for each one synchronously, which could cause a spike in queue depth and memory pressure.

💡 Potential improvement

Consider adding a configurable limit to the number of seal jobs enqueued per flush_stale invocation, or batching the enqueues:

+const MAX_STALE_SEALS_PER_FLUSH: usize = 100;
+
 async fn handle_flush_stale(config: &Config, job: &Job) -> Result<()> {
     // ...
     let buffers =
         crate::openhuman::memory::tree::source_tree::store::list_stale_buffers(config, cutoff)?;
-    for buf in buffers {
+    for buf in buffers.into_iter().take(MAX_STALE_SEALS_PER_FLUSH) {
         // ...
     }
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/tree/jobs/handlers/mod.rs` around lines 385 - 404, In
handle_flush_stale, bound the number of seal jobs enqueued to avoid spikes: add
a configurable limit (e.g., in Config or FlushStalePayload) and iterate only up
to that limit when processing the vector returned by list_stale_buffers; for
each selected buf build SealPayload and call
store::enqueue(NewJob::seal(&seal)?), track whether any jobs were enqueued and
call super::worker::wake_workers() once if so; additionally consider enqueuing
in batches (e.g., chunking the selected buffers) if you want to limit concurrent
DB work—update handle_flush_stale, list_stale_buffers usage, and references to
SealPayload/NewJob::seal/store::enqueue accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/openhuman/composio/providers/gmail/ingest.rs`:
- Around line 187-212: The logs currently emit full sender email (variable
sender) in the info and warn calls around the ingest loop; replace that with a
non-PII representation (e.g., mask the local-part, log only domain, or a one-way
hash) and use that masked value in both log::info and log::warn where sender is
used (the log calls that surround thread creation and the ingest_email error
handling). Implement or call a helper like mask_email(sender) or
hash_identifier(sender) and pass the masked result into the log messages and
keep thread_id and other non-PII fields unchanged.

In `@src/openhuman/memory/tree/canonicalize/email_clean.rs`:
- Around line 151-161: truncate_body currently returns max_chars+1 characters
when truncating and emits an ellipsis even when max_chars == 0; fix this in
truncate_body by counting characters after trim, returning the trimmed string if
its char count <= max_chars, and if truncation is needed ensure the ellipsis
fits inside the budget: if max_chars == 0 return an empty String, otherwise
collect trimmed.chars().take(max_chars - 1) and then push '…' so the final
length is <= max_chars; update the same logic in the duplicate occurrence (lines
~321-326) where the same truncation is implemented.
- Around line 164-179: The md_escape function currently fails to escape the '#'
character despite the docstring claiming header markers are escaped; update the
match in md_escape to include '#' alongside '\\', '`', '*', '_', '|' so that a
backslash is prepended before '#' and newlines still collapse to spaces, and
apply the same change to the other duplicate md_escape occurrence referenced
(the identical escape match around lines 336-341) so both implementations
consistently escape header markers.

In `@src/openhuman/memory/tree/jobs/store.rs`:
- Around line 77-91: The logs in store.rs currently print the raw job.dedupe_key
which may contain PII; update the two debug log sites (the branches that check
`inserted == 0` and the subsequent enqueued log) to avoid printing the full
dedupe_key: either log its presence as "present"/"none" or compute a short
deterministic fingerprint (e.g., a SHA256 or xxhash and log the first N hex
chars) and log that instead along with `job.kind.as_str()` and
`id`/`available_at`; ensure the transformation is done once (e.g., compute `let
dedupe_indicator = ...` near where `job` is available) and use
`dedupe_indicator` in both debug messages so no raw key is emitted.
- Around line 180-198: The code writes and logs the raw `error` in `mark_failed`
(used in the DB UPDATE and log::warn/info), which may contain PII; create and
call a sanitizer (e.g., sanitize_error(&str) -> String) to redact emails, long
prompt text, and other sensitive tokens, and use the sanitized string both in
the `log::warn!/log::info!` messages and as the `last_error` parameter in the
`conn.execute` call (replace uses of `error` with the sanitized value); ensure
the sanitizer also truncates excessively long messages and leaves no raw PII in
logs or DB.

In `@src/openhuman/memory/tree/rpc.rs`:
- Around line 182-206: The docstring for
TriggerDigestRequest/TriggerDigestResponse overstates idempotency: the current
enqueue logic only dedupes against active jobs (ready/running) rather than
existing completed daily-summary nodes; either add a pre-check in the trigger
handler to query for an existing daily node for the target date and
refuse/enqueue accordingly (update the handler that computes the target date and
performs DB enqueue), or update the docstrings/comments on
TriggerDigestRequest/TriggerDigestResponse to explicitly state that
deduplication only prevents duplicate active jobs and completed summaries are
not considered idempotent; reference TriggerDigestRequest, TriggerDigestResponse
and the trigger handler/enqueue code path when making the change.

In `@src/openhuman/memory/tree/score/extract/mod.rs`:
- Around line 69-75: The log call in the summary extractor is currently emitting
the raw LLM endpoint (variable endpoint) which can contain credentials or
sensitive query params; change the logging to avoid leaking secrets by redacting
or omitting the endpoint: create or call a small sanitizer (e.g., redact_url or
use url::Url to strip userinfo and query) and log a redacted_endpoint (or just
log the host/origin) instead of endpoint, while keeping model and timeout_ms
unchanged; update the info! call in the summary extractor block that references
endpoint so it uses the sanitized value.

In `@src/openhuman/memory/tree/source_tree/bucket_seal.rs`:
- Around line 646-652: The code calls
list_entity_ids_for_node(...).unwrap_or_default(), which hides DB/read errors
and treats them as "no entities"; instead propagate failures so the seal can
retry: remove unwrap_or_default and handle the Result from
list_entity_ids_for_node by returning or propagating the error from the
surrounding function (or converting it into a meaningful error), only treating
Ok(vec) as the entities for building
SummaryInput/LabelStrategy::UnionFromChildren; if you need soft-failure logging,
log the error with context (including node id) and then return Err to avoid
silently dropping labels.

---

Nitpick comments:
In `@src/openhuman/memory/tree/ingest.rs`:
- Around line 119-127: The variables all_results, chunks_for_store, and
results_for_store (and similarly config_owned if present) violate the camelCase
naming rule; rename them to camelCase (e.g., allResults, chunksForStore,
resultsForStore, configOwned), update every use site in this function/module
(including the let bindings shown and any subsequent references like the
filter/count and cloned values), and run cargo build/tests to ensure no
references to the old snake_case identifiers remain.

In `@src/openhuman/memory/tree/jobs/handlers/mod.rs`:
- Around line 385-404: In handle_flush_stale, bound the number of seal jobs
enqueued to avoid spikes: add a configurable limit (e.g., in Config or
FlushStalePayload) and iterate only up to that limit when processing the vector
returned by list_stale_buffers; for each selected buf build SealPayload and call
store::enqueue(NewJob::seal(&seal)?), track whether any jobs were enqueued and
call super::worker::wake_workers() once if so; additionally consider enqueuing
in batches (e.g., chunking the selected buffers) if you want to limit concurrent
DB work—update handle_flush_stale, list_stale_buffers usage, and references to
SealPayload/NewJob::seal/store::enqueue accordingly.
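
One way to bound the fan-out, assuming a hypothetical `max_stale_flush_per_run` knob on Config and the `SealPayload`/`NewJob::seal`/`store::enqueue` shapes the comment refers to:

```rust
// Field names and signatures below are assumptions, not the real API.
let limit = config.max_stale_flush_per_run.unwrap_or(32);

let mut enqueued_any = false;
for buf in list_stale_buffers(config)?.into_iter().take(limit) {
    let seal = SealPayload {
        tree_id: buf.tree_id.clone(),
        level: buf.level,
    };
    store::enqueue(config, NewJob::seal(&seal)?)?;
    enqueued_any = true;
}

// Wake the worker pool once, not once per enqueued job.
if enqueued_any {
    super::worker::wake_workers();
}
```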

In `@src/openhuman/memory/tree/jobs/testing.rs`:
- Around line 8-15: The loop in drain_until_idle keeps trying
run_once indefinitely; add a safety bound and fail fast if it doesn't converge
(e.g., introduce a max_iterations counter or a deadline/timeout using
tokio::time::Instant + Duration or tokio::time::timeout) and return an Err when
exceeded; update the function to either accept a max_iterations or compute a
deadline from Config, increment/check the counter or deadline around calls to
super::worker::run_once(config).await? and return a clear error explaining that
the queue never became idle if the cap/deadline is reached.
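
A bounded version of the helper, assuming `run_once` returns `Ok(true)` while it still finds work (its `Result<bool>` signature appears later in these comments); the 1_000-iteration cap and the `anyhow::Result<()>` return type are illustrative:

```rust
pub async fn drain_until_idle(config: &Config) -> anyhow::Result<()> {
    const MAX_ITERATIONS: usize = 1_000;
    for _ in 0..MAX_ITERATIONS {
        // Ok(false) is assumed to mean "no ready job was found".
        let did_work = super::worker::run_once(config).await?;
        if !did_work {
            return Ok(());
        }
    }
    anyhow::bail!(
        "job queue never became idle after {MAX_ITERATIONS} run_once iterations; \
         a handler is probably re-enqueuing work forever"
    )
}
```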

In `@src/openhuman/memory/tree/jobs/worker.rs`:
- Around line 68-71: The run_once function currently constructs a fresh
Semaphore(1) per call which is intended for test/single-shot usage and differs
from the production shared 3-slot semaphore; add a brief doc comment above pub
async fn run_once(config: &Config) -> Result<bool> clarifying that this function
purposely uses a single-slot Semaphore(1) for test harnesses or single-shot
runs, and note that production code uses the shared 3-slot semaphore managed
elsewhere and should call run_once_with_semaphore when the shared pool is
available.
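
Possible wording for the doc comment; `run_once_with_semaphore` is the name used in the comment above, and the rest mirrors the described behaviour:

```rust
// Placed directly above `pub async fn run_once(config: &Config) -> Result<bool>`:

/// Single-shot drain used by test harnesses and manual tooling.
///
/// Deliberately constructs a fresh single-slot `Semaphore(1)` per call, so at most
/// one LLM-bound job runs at a time. Production workers share the 3-slot semaphore
/// managed elsewhere and should call `run_once_with_semaphore` when that shared
/// pool is available.
```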

In `@src/openhuman/memory/tree/score/extract/mod.rs`:
- Around line 21-89: Move the operational implementation of
build_summary_extractor (including the LlmExtractorConfig creation,
endpoint/model parsing, timeout logic, LlmEntityExtractor::new match, and
CompositeExtractor/RegexEntityExtractor fallback) into a new sibling file (e.g.,
ops.rs) and re-export the function from mod.rs; leave mod.rs as an export-only
module that pub use
crate::openhuman::memory::tree::score::extract::build_summary_extractor (or
equivalent) while keeping all logic and imports (LlmExtractorConfig,
LlmEntityExtractor, CompositeExtractor, RegexEntityExtractor) inside the new
file so mod.rs remains lightweight and only exposes the symbol.
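
The resulting mod.rs could be as small as the sketch below; the `llm`/`llm_tests` submodule names are taken from the files listed in this review, `ops` is the new sibling the comment proposes, and the visibility/`cfg(test)` choices are assumptions:

```rust
//! Entity/topic extractor construction for summary nodes.
//! Export-only: the implementation lives in ops.rs.

mod ops;

pub mod llm;
#[cfg(test)]
mod llm_tests;

pub use ops::build_summary_extractor;
```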

In `@src/openhuman/memory/tree/score/mod.rs`:
- Around line 343-351: The function score_chunks_fast currently rebuilds a
ScoringConfig by hand which risks drifting fields out of sync; instead clone the
existing cfg (use ScoringConfig's Clone implementation) and simply set its
llm_extractor field to None (e.g., let mut fast_cfg = cfg.clone();
fast_cfg.llm_extractor = None), then use fast_cfg in the fast path. This keeps
score_chunks_fast, ScoringConfig, and the llm_extractor handling in sync with
future changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2318c511-db0c-4960-b968-f8b25b056b97

📥 Commits

Reviewing files that changed from the base of the PR and between b00692c and 4d917fa.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • .gitignore
  • docs/memory-tree-async-pipeline.excalidraw
  • src/core/jsonrpc.rs
  • src/openhuman/composio/providers/gmail/ingest.rs
  • src/openhuman/memory/tree/canonicalize/email_clean.rs
  • src/openhuman/memory/tree/global_tree/digest.rs
  • src/openhuman/memory/tree/global_tree/digest_tests.rs
  • src/openhuman/memory/tree/global_tree/recap.rs
  • src/openhuman/memory/tree/global_tree/seal.rs
  • src/openhuman/memory/tree/ingest.rs
  • src/openhuman/memory/tree/jobs/handlers/mod.rs
  • src/openhuman/memory/tree/jobs/mod.rs
  • src/openhuman/memory/tree/jobs/scheduler.rs
  • src/openhuman/memory/tree/jobs/store.rs
  • src/openhuman/memory/tree/jobs/testing.rs
  • src/openhuman/memory/tree/jobs/types.rs
  • src/openhuman/memory/tree/jobs/worker.rs
  • src/openhuman/memory/tree/mod.rs
  • src/openhuman/memory/tree/retrieval/drill_down.rs
  • src/openhuman/memory/tree/retrieval/global.rs
  • src/openhuman/memory/tree/retrieval/integration_test.rs
  • src/openhuman/memory/tree/retrieval/source.rs
  • src/openhuman/memory/tree/rpc.rs
  • src/openhuman/memory/tree/schemas.rs
  • src/openhuman/memory/tree/score/extract/llm.rs
  • src/openhuman/memory/tree/score/extract/llm_tests.rs
  • src/openhuman/memory/tree/score/extract/mod.rs
  • src/openhuman/memory/tree/score/mod.rs
  • src/openhuman/memory/tree/score/store.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal.rs
  • src/openhuman/memory/tree/source_tree/bucket_seal_tests.rs
  • src/openhuman/memory/tree/source_tree/flush.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/source_tree/summariser/llm.rs
  • src/openhuman/memory/tree/store.rs
  • src/openhuman/memory/tree/topic_tree/backfill.rs
  • src/openhuman/memory/tree/topic_tree/curator.rs
  • src/openhuman/memory/tree/topic_tree/routing.rs
✅ Files skipped from review due to trivial changes (5)
  • .gitignore
  • src/openhuman/memory/tree/global_tree/recap.rs
  • src/openhuman/memory/tree/source_tree/mod.rs
  • src/openhuman/memory/tree/jobs/types.rs
  • docs/memory-tree-async-pipeline.excalidraw
🚧 Files skipped from review as they are similar to previous changes (9)
  • src/openhuman/memory/tree/mod.rs
  • src/openhuman/memory/tree/retrieval/source.rs
  • src/openhuman/memory/tree/retrieval/global.rs
  • src/core/jsonrpc.rs
  • src/openhuman/memory/tree/global_tree/digest_tests.rs
  • src/openhuman/memory/tree/topic_tree/routing.rs
  • src/openhuman/memory/tree/retrieval/integration_test.rs
  • src/openhuman/memory/tree/score/extract/llm.rs
  • src/openhuman/memory/tree/source_tree/summariser/llm.rs

…ration assertion

The memory_tree_trigger_digest controller was added in the async digest
work but tests/json_rpc_e2e.rs still expected only 3 registered methods,
causing json_rpc_memory_tree_end_to_end to fail with 4 != 3.

@graycyrus graycyrus left a comment


Summary

This is a well-engineered async pipeline refactor with solid design throughout — the SQLite-backed job queue, the crash-safe atomic enqueue patterns, the LabelStrategy enum, and the summariser/extractor split all follow the project's conventions closely. Module layout, mod.rs exports, and RpcOutcome usage are all correct. The 522-test suite is a strong baseline.

A few things worth addressing before merging:

Blockers / Majors (not covered by CodeRabbit)

  1. jobs/worker.rs:39 — recover_stale_locks failure is silently swallowed with let _ = .... If the DB is unavailable at startup the error is dropped, stale running rows stay running forever, and nothing surfaces to the operator. Log it at warn at minimum.

  2. jobs/worker.rs:91 — err.to_string() is stored in last_error and in the logs. It should be format!("{err:#}") to preserve the full anyhow chain, consistent with the "job failed" log line three lines above, which already uses {:#}.

  3. source_tree/bucket_seal.rs (~lines 376 and 433) — seal_one_level hardcodes TreeKind::Source in both the SummaryContext and the SummaryNode it constructs. The function is also called for topic trees (through handle_seal, cascade_all_from, flush). Topic-tree summaries will be written with tree_kind='source' in mem_tree_summaries, breaking any query filtering on tree_kind. Use tree.kind (already in scope via the tree: &Tree parameter) instead of the literal. CodeRabbit flagged line 342 but the node construction site further down also needs fixing.

  4. global_tree/seal.rs — The global-tree cascade is entirely synchronous inside handle_digest_daily. A digest job that triggers a weekly+monthly cascade holds the LLM semaphore for 2–3 sequential summariser calls, blocking one of the three worker slots for the full duration. The PR description notes the per-level split is a follow-up, which is fine, but a // Why: comment in handle_digest_daily would prevent a future reader from landing the same workaround twice.

Minors

  1. jobs/types.rs:2 — Module doc says "#TBD" for the issue number; never filled in.

  2. jobs/handlers/mod.rs — handle_topic_route returns silently when entity_ids is empty, with no log output. A debug! log on the no-op path would make operational diagnosis easier.

  3. src/bin/slack_backfill.rs — Confirm the [[bin]] entry exists in Cargo.toml (or that path auto-discovery covers it). The file itself is a reasonable exception to the "new functionality in a subdomain subdirectory" rule since all module logic lives in composio/providers/slack/.

  4. jobs/scheduler.rs:118 — next_sleep_duration's with_ymd_and_hms(...).single() fallback of now + 24h is fine because UTC has no DST gaps, but a comment explaining why .single() can't return None here would avoid future confusion.

Looks good

  • CLAUDE.md conventions followed: jobs/ subdirectory pattern, mod.rs is export-only, jsonrpc.rs change is pure bootstrapping, no new src/openhuman/ root-level files.
  • RpcOutcome used correctly in trigger_digest_rpc.
  • The dedupe partial-unique-index design in mem_tree_jobs is correct and well-tested.
  • Atomic enqueue in seal_one_level (parent cascade + topic_route in same tx as seal commit) properly eliminates the crash window.
  • 30-day backfill window aligned with hotness recency decay is the right call.
  • LabelStrategy::UnionFromChildren on global tree avoids extra LLM pass while preserving entity/topic retrieval. LabelStrategy::Empty on topic trees prevents cross-entity contamination — both are correct.


@graycyrus graycyrus left a comment


Inline comments with concrete suggestions for the issues flagged in the top-level review.

.get_or_init(|| Arc::new(Notify::new()))
.clone();
let llm_slots = Arc::new(Semaphore::new(3));
let _ = recover_stale_locks(&config);

[major] Silent error discard on recover_stale_locks.

If the DB is unavailable at startup, stale running rows stay locked forever with zero operator signal. This is the kind of silent failure that makes an outage hard to diagnose.

Suggested change
let _ = recover_stale_locks(&config);
if let Err(err) = recover_stale_locks(&config) {
log::warn!("[memory_tree::jobs] recover_stale_locks failed at startup: {err:#}");
}

mark_done(config, &job.id)?;
}
Err(err) => {
let message = err.to_string();

[major] err.to_string() loses the anyhow cause chain.

The log::warn! three lines above uses {err:#} (full chain), but the persisted last_error only stores the top-level message. When someone reads the job table to debug a failure, they lose the root cause.

Suggested change
let message = err.to_string();
let message = format!("{err:#}");

Comment thread src/openhuman/memory/tree/jobs/types.rs Outdated
@@ -0,0 +1,439 @@
//! Job types for the async memory-tree pipeline (#TBD).

[minor] #TBD placeholder — either fill in the issue number or drop the annotation.

Suggested change
//! Job types for the async memory-tree pipeline (#TBD).
//! Job types for the async memory-tree pipeline.

};

let entity_ids = score_store::list_entity_ids_for_node(config, &node_id)?;
if entity_ids.is_empty() {

[minor] Silent early return when entity_ids is empty.

This is the only handler that returns without logging on the no-op path. When debugging why a chunk produced no topic-tree fan-out, there is zero trace. A debug! log here would make that visible.

Suggested change
if entity_ids.is_empty() {
if entity_ids.is_empty() {
log::debug!("[memory_tree::jobs] topic_route no entities for node_id={node_id} — skipping");
return Ok(());
}

let tomorrow = now.date_naive() + ChronoDuration::days(1);
let next = Utc
.with_ymd_and_hms(tomorrow.year(), tomorrow.month(), tomorrow.day(), 0, 5, 0)
.single()

[minor] .single() always succeeds here because UTC has no DST gaps, but a reader unfamiliar with chrono might wonder if this can fail. A one-liner keeps future maintainers from second-guessing it.

Suggested change
.single()
// UTC has no DST — `single()` always returns `Some` for valid dates.
.single()

Comment thread src/openhuman/memory/tree/global_tree/seal.rs
sanil-23 and others added 2 commits April 28, 2026 17:28
Major
- jobs/worker.rs: log warn on `recover_stale_locks` startup failure
  instead of swallowing with `let _ = ...`. Stale `running` rows are
  now operator-visible at startup.
- jobs/worker.rs: persist full anyhow chain in `last_error` via
  `format!("{err:#}")` so readers of mem_tree_jobs see the root cause,
  matching the `{:#}` log format used three lines above.
- source_tree/bucket_seal.rs: stop hardcoding `TreeKind::Source` in
  `seal_one_level`'s `SummaryContext` and `SummaryNode`. The function
  also runs for topic trees (handle_seal, cascade_all_from, flush) —
  topic-tree summaries were being persisted with `tree_kind='source'`,
  breaking any query filtering on tree_kind. Use `tree.kind`.

Minor
- jobs/types.rs: drop `#TBD` placeholder from module doc.
- jobs/handlers/mod.rs: add `debug!` log on the no-op `entity_ids`
  empty path in `handle_topic_route` for diagnosis.
- jobs/scheduler.rs: comment why `.single()` cannot return `None`
  (UTC has no DST gaps).

Tests
- Add `topic_tree_seal_persists_topic_kind_not_source` regression
  test guarding the bucket_seal `tree.kind` fix.
- 371/371 memory::tree tests pass · 23/23 memory::tree::jobs tests
  pass · cargo fmt clean.

Cargo.lock: bump openhuman version 0.53.3 → 0.53.4 (matches Cargo.toml).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sanil-23

Thanks @graycyrus — addressed in 345d654:

Majors

  • worker.rs:39 — recover_stale_locks now logs at warn on failure
  • worker.rs:91 — last_error persists full anyhow chain via format!("{err:#}")
  • bucket_seal.rs:376/436 — seal_one_level uses tree.kind instead of hardcoded TreeKind::Source in both SummaryContext and SummaryNode. Added regression test topic_tree_seal_persists_topic_kind_not_source to guard the topic-tree path.

Minors

  • types.rs:1 — dropped #TBD
  • handlers/mod.rs — handle_topic_route now debug!-logs the empty entity_ids no-op path
  • scheduler.rs:119 — added comment explaining why .single() can't return None under UTC

Already in place

  • Cargo.toml:12-14 has the [[bin]] name = "slack-backfill" entry — no change needed.

Deferred

  • The global-tree sync-cascade // Why: comment is intentionally not in this commit; tracking it alongside the per-level-job follow-up noted in the PR description.

Quality: cargo fmt clean, 371/371 memory::tree + 23/23 memory::tree::jobs tests pass, 0 new clippy hits on touched files.
