feat: unified enrichment controller — brain_enrich MCP tool + 3 backends#112
Conversation
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (10)
📝 WalkthroughWalkthroughAdds end-to-end enrichment infrastructure: paced batch submission and orchestration scripts, launchd daemon config, content-hash deduplication, telemetry events, MCP tool/handler for enrichment, and comprehensive tests for behavior and stats. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Controller as enrichment_controller
participant DB as VectorStore(DB)
participant Gemini as GeminiAPI
participant Telemetry
Client->>Controller: enrich_realtime(limit, since_hours, rate_per_second)
Controller->>Telemetry: emit_enrichment_start(mode, limit, pid, hostname)
Controller->>DB: SELECT candidate chunks (eligible, not short)
DB-->>Controller: candidate list
loop per chunk
Controller->>Controller: _ensure_content_hash_column()
Controller->>Controller: _content_hash(chunk)
Controller->>DB: SELECT WHERE content_hash AND enriched_at IS NOT NULL
alt duplicate
DB-->>Controller: found
Controller->>Controller: result.skipped++
else unique
Controller->>Gemini: request enrichment
Gemini-->>Controller: enrichment response
Controller->>DB: UPDATE chunk with enrichment and content_hash
Controller->>Telemetry: emit_enrichment_complete(...) per run
Controller->>Controller: result.enriched++
end
Controller->>Controller: sleep(1 / rate_per_second)
end
Controller->>Telemetry: emit_enrichment_complete(attempted,enriched,skipped,failed,duration_ms)
Controller-->>Client: return EnrichmentResult{attempted,enriched,skipped,failed,errors}
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
| import apsw | ||
| from pathlib import Path | ||
|
|
||
| cp_db = Path.home() / '.local/share/brainlayer/enrichment_checkpoints.db' |
There was a problem hiding this comment.
🟢 Low scripts/backfill_orchestrate.sh:35
Phase 2's inline Python hardcodes the checkpoint database path to ~/.local/share/brainlayer/enrichment_checkpoints.db, while cloud_backfill.py uses get_db_path().with_name("enrichment_checkpoints.db") which respects the BRAINLAYER_DB environment variable. When BRAINLAYER_DB is set to a custom path, Phase 2 queries the wrong database — broken batches at the custom path aren't reset, leaving them incorrectly marked as completed with 0 imports. Pass the resolved path from cloud_backfill.py to Phase 2 instead of hardcoding.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file scripts/backfill_orchestrate.sh around line 35:
Phase 2's inline Python hardcodes the checkpoint database path to `~/.local/share/brainlayer/enrichment_checkpoints.db`, while `cloud_backfill.py` uses `get_db_path().with_name("enrichment_checkpoints.db")` which respects the `BRAINLAYER_DB` environment variable. When `BRAINLAYER_DB` is set to a custom path, Phase 2 queries the wrong database — broken batches at the custom path aren't reset, leaving them incorrectly marked as completed with 0 imports. Pass the resolved path from `cloud_backfill.py` to Phase 2 instead of hardcoding.
| def test_realtime_counts_failed_on_exception(monkeypatch): | ||
| from brainlayer import enrichment_controller as controller | ||
|
|
||
| store = MagicMock() | ||
| store.get_enrichment_candidates.return_value = [_candidate()] | ||
| monkeypatch.setattr(controller, "build_external_prompt", MagicMock(side_effect=RuntimeError("boom"))) | ||
| monkeypatch.setattr(controller, "Sanitizer", SimpleNamespace(from_env=lambda: SimpleNamespace())) | ||
| monkeypatch.setattr(controller.time, "sleep", lambda _: None) | ||
| monkeypatch.setattr(controller, "_get_gemini_client", lambda: _fake_gemini_client()) | ||
|
|
||
| result = controller.enrich_realtime(store, limit=1) | ||
| assert result.failed == 1 | ||
| assert "boom" in result.errors[0] |
There was a problem hiding this comment.
🟢 Low tests/test_enrichment_controller.py:551
test_realtime_counts_failed_on_exception patches build_external_prompt to raise, but the production code calls _is_duplicate_content first and skips the chunk when it returns true. MagicMock returns truthy values by default, so _is_duplicate_content may return true and skip the chunk entirely — the exception never fires and result.failed remains 0 instead of 1. Consider explicitly patching _is_duplicate_content to return False so the test reaches the exception path.
store.get_enrichment_candidates.return_value = [_candidate()]
monkeypatch.setattr(controller, "build_external_prompt", MagicMock(side_effect=RuntimeError("boom")))
monkeypatch.setattr(controller, "Sanitizer", SimpleNamespace(from_env=lambda: SimpleNamespace()))
+ monkeypatch.setattr(controller, "_is_duplicate_content", lambda s, c: False)
monkeypatch.setattr(controller.time, "sleep", lambda _: None)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file tests/test_enrichment_controller.py around lines 551-563:
`test_realtime_counts_failed_on_exception` patches `build_external_prompt` to raise, but the production code calls `_is_duplicate_content` first and skips the chunk when it returns true. `MagicMock` returns truthy values by default, so `_is_duplicate_content` may return true and skip the chunk entirely — the exception never fires and `result.failed` remains 0 instead of 1. Consider explicitly patching `_is_duplicate_content` to return `False` so the test reaches the exception path.
Evidence trail:
src/brainlayer/enrichment_controller.py lines 299-304 (duplicate check before build_external_prompt), lines 111-127 (_is_duplicate_content implementation using row[0] > 0); tests/test_enrichment_controller.py lines 551-563 (test_realtime_counts_failed_on_exception without _is_duplicate_content patch), lines 365-374 (test_realtime_skips_duplicate_content showing proper _is_duplicate_content patching pattern)
| if STATE_FILE.exists(): | ||
| submitted = json.loads(STATE_FILE.read_text()) |
There was a problem hiding this comment.
🟢 Low scripts/batch_submit_paced.py:28
If STATE_FILE exists but contains invalid JSON (e.g., empty file from an interrupted write on line 83), json.loads raises JSONDecodeError and crashes the script. The write on line 83 uses write_text without atomic rename, so a crash mid-write leaves a truncated file. Consider catching JSONDecodeError and treating a corrupted state file as empty, or use atomic write-then-rename to prevent truncation.
submitted = {}
if STATE_FILE.exists():
- submitted = json.loads(STATE_FILE.read_text())
+ try:
+ submitted = json.loads(STATE_FILE.read_text())
+ except json.JSONDecodeError:
+ submitted = {}🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file scripts/batch_submit_paced.py around lines 28-29:
If `STATE_FILE` exists but contains invalid JSON (e.g., empty file from an interrupted write on line 83), `json.loads` raises `JSONDecodeError` and crashes the script. The write on line 83 uses `write_text` without atomic rename, so a crash mid-write leaves a truncated file. Consider catching `JSONDecodeError` and treating a corrupted state file as empty, or use atomic write-then-rename to prevent truncation.
Evidence trail:
scripts/batch_submit_paced.py lines 27-29: `STATE_FILE = Path(__file__).parent / "backfill_data" / ".paced_state.json"` followed by `if STATE_FILE.exists(): submitted = json.loads(STATE_FILE.read_text())` - no JSONDecodeError handling.
scripts/batch_submit_paced.py line 83 (within the batch creation loop): `STATE_FILE.write_text(json.dumps(submitted, indent=2))` - direct write without atomic rename pattern.
Commit: REVIEWED_COMMIT
| def _backfill_content_hashes(store, limit: int = 1000) -> int: | ||
| """Backfill content_hash for chunks that don't have one yet. Returns count updated.""" | ||
| try: | ||
| cursor = store.conn.cursor() | ||
| rows = list( | ||
| cursor.execute( | ||
| "SELECT id, content FROM chunks WHERE content_hash IS NULL LIMIT ?", | ||
| (limit,), | ||
| ) | ||
| ) | ||
| count = 0 | ||
| for chunk_id, content in rows: | ||
| if content: | ||
| h = _content_hash(content) | ||
| cursor.execute("UPDATE chunks SET content_hash = ? WHERE id = ?", (h, chunk_id)) | ||
| count += 1 | ||
| return count | ||
| except Exception: | ||
| return 0 |
There was a problem hiding this comment.
🟡 Medium brainlayer/enrichment_controller.py:144
_backfill_content_hashes performs UPDATE statements but never calls store.conn.commit(), so all changes are silently rolled back when the connection closes. The function returns a count as if updates were persisted, but the database remains unchanged. Consider committing the transaction before returning.
def _backfill_content_hashes(store, limit: int = 1000) -> int:
"""Backfill content_hash for chunks that don't have one yet. Returns count updated."""
try:
cursor = store.conn.cursor()
rows = list(
cursor.execute(
"SELECT id, content FROM chunks WHERE content_hash IS NULL LIMIT ?",
(limit,),
)
)
count = 0
for chunk_id, content in rows:
if content:
h = _content_hash(content)
cursor.execute("UPDATE chunks SET content_hash = ? WHERE id = ?", (h, chunk_id))
count += 1
+ store.conn.commit()
return count
except Exception:
return 0🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file src/brainlayer/enrichment_controller.py around lines 144-162:
`_backfill_content_hashes` performs UPDATE statements but never calls `store.conn.commit()`, so all changes are silently rolled back when the connection closes. The function returns a count as if updates were persisted, but the database remains unchanged. Consider committing the transaction before returning.
Evidence trail:
src/brainlayer/enrichment_controller.py lines 144-163 (REVIEWED_COMMIT): Function `_backfill_content_hashes` executes `cursor.execute("UPDATE chunks SET content_hash = ? WHERE id = ?")` but has no `store.conn.commit()` call. Function returns `count` implying persistence. git_grep for 'isolation_level|autocommit' returns no results - connection uses default sqlite3 behavior requiring explicit commits.
There was a problem hiding this comment.
Not applicable — VectorStore uses APSW (not sqlite3), which runs in autocommit mode by default. Each UPDATE is committed immediately without an explicit commit() call.
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
| def test_enrichment_plist_has_correct_label(): | ||
| import xml.etree.ElementTree as ET | ||
| from pathlib import Path | ||
|
|
||
| plist_path = Path(__file__).parent.parent / "scripts" / "launchd" / "com.brainlayer.enrichment.plist" | ||
| tree = ET.parse(plist_path) | ||
| root = tree.getroot() | ||
| d = root.find("dict") | ||
|
|
||
| # Find the string element right after the Label key | ||
| elements = list(d) | ||
| for i, el in enumerate(elements): | ||
| if el.tag == "key" and el.text == "Label": | ||
| assert elements[i + 1].text == "com.brainlayer.enrichment" | ||
| break |
There was a problem hiding this comment.
🟢 Low tests/test_enrichment_controller.py:844
The for loop in test_enrichment_plist_has_correct_label exits without running any assertions if the plist lacks a Label key, so the test passes silently on malformed input. Add a failure path after the loop to ensure the key is actually found and validated.
- elements = list(d)
- for i, el in enumerate(elements):
- if el.tag == "key" and el.text == "Label":
- assert elements[i + 1].text == "com.brainlayer.enrichment"
- break🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file tests/test_enrichment_controller.py around lines 844-858:
The `for` loop in `test_enrichment_plist_has_correct_label` exits without running any assertions if the plist lacks a `Label` key, so the test passes silently on malformed input. Add a failure path after the loop to ensure the key is actually found and validated.
Evidence trail:
tests/test_enrichment_controller.py lines 844-859 at REVIEWED_COMMIT - The test function `test_enrichment_plist_has_correct_label` contains a for loop with an assertion only inside an `if` block. If the `Label` key is not found, the loop completes without executing any assertions and the test passes silently.
There was a problem hiding this comment.
Actionable comments posted: 16
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/backfill_orchestrate.sh`:
- Around line 26-62: Phase 2 currently contains one-time fix logic that
unconditionally updates enrichment_checkpoints rows with completed_at <
'2026-03-14T12:22:00'; extract this into a dedicated, idempotent migration
script (e.g., a one-off script that opens the same DB at cp_db, queries
enrichment_checkpoints for rows with status='completed' and completed_at <
'2026-03-14T12:22:00' using the same SQL used in the block, logs explicitly when
zero rows are found and exits without changing state, and when rows exist
performs the UPDATE inside a transaction and logs the exact number updated),
then remove the Phase 2 block from scripts/backfill_orchestrate.sh so the main
orchestrator no longer contains hardcoded historical timestamps. Ensure the
migration uses the same connection setup (apsw.Connection, setbusytimeout),
explicit logging of found vs updated counts, and is safe to re-run (idempotent).
- Around line 7-8: The PYTHON variable is hardcoded to
"$SCRIPT_DIR/../.venv/bin/python3" which may not exist; modify the script to
validate that path and fall back if missing: check existence/readability of
"$SCRIPT_DIR/../.venv/bin/python3" before exporting PYTHON, if missing try
locating a system python via "which python3" or "which python" and assign that
to PYTHON, and if no usable interpreter is found log a clear error and exit;
update any uses of PYTHON (e.g., invoking "$PYTHON" to run
BACKFILL/cloud_backfill.py) to rely on this validated variable.
- Around line 35-36: Replace the hardcoded cp_db path with the shared path
resolver: import and call paths.get_db_path() to obtain the DB path and use that
value when constructing cp_db and passing it to apsw.Connection (currently where
cp_db and apsw.Connection(...) are used); if the script uses an inline Python
heredoc where __file__ is unavailable, extract the DB-path resolution into a
small Python module or a callable that imports paths.get_db_path() so the script
can invoke it instead of hardcoding
"~/.local/share/brainlayer/enrichment_checkpoints.db".
In `@scripts/batch_submit_paced.py`:
- Line 23: The MODEL constant is hardcoded to "gemini-2.5-flash-lite" in
scripts/batch_submit_paced.py; make it configurable to match cloud_backfill.py's
default and allow overrides. Add a CLI argument (e.g., --model) or read an
environment variable (e.g., os.environ.get("MODEL")) and set MODEL from that
with a default of "gemini-2.5-flash" (or reuse cloud_backfill.py's default),
then replace the hardcoded MODEL usage with this configurable value; update any
function that references MODEL (e.g., the batch submit entrypoint) to use the
new configurable variable.
- Around line 82-83: The STATE_FILE.write_text call is not atomic and can
corrupt state if the process crashes; modify the code that updates submitted
(the dict named submitted and the STATE_FILE variable) to perform an atomic
write: serialize submitted to a string (json.dumps with indent) then write that
to a temporary file in the same directory (e.g., STATE_FILE.with_suffix(".tmp")
or similar) and atomically replace the original via os.replace or Path.replace;
ensure exceptions are handled so the temp file is cleaned up if needed and the
rename only occurs after the write completes successfully.
- Around line 63-66: The current except Exception as e block in
scripts/batch_submit_paced.py treats all failures as HTTP 429 rate-limits;
update it to inspect the exception (e) and only apply the 429 retry/backoff
logic when the error indicates an HTTP 429 response (e.g., check
e.response.status_code or isinstance(e, HTTPStatusError)/requests.HTTPError with
status==429); for other exceptions (authentication 401/403, client errors 4xx
other than 429, or non-HTTP exceptions like network/validation errors) log the
actual error (include e) and re-raise or fail immediately instead of sleeping
and retrying; keep the existing wait/backoff calculation (wait = min(30 *
(attempt + 1), 300)) and MAX_RETRIES usage for the 429 branch, and ensure the
printed message includes the real status or exception text rather than always
"429".
- Around line 21-22: Replace the fragile sys.argv lookups for DELAY and
MAX_RETRIES with proper argparse-based parsing: create an ArgumentParser and add
arguments "--delay" and "--max-retries" with type=int and defaults 45 and 10
respectively, then assign parsed values to DELAY and MAX_RETRIES; also add basic
validation (e.g., argparse's type check or a post-parse check to ensure
non-negative integers) so missing values or out-of-range inputs don't raise
IndexError or produce invalid state.
- Around line 91-98: The non-rate-limit error branch in
scripts/batch_submit_paced.py currently does "failed += 1; break" which falls
through to the pacing sleep; change that break to "continue" so that after
recording the failure (failed) the code immediately proceeds to the next file
and skips the pacing delay; keep the rate-limit handling path untouched so
rate-limit errors still fall through to the pacing/sleep logic; reference the
loop that uses MAX_RETRIES and the failed counter and the pacing sleep block.
- Around line 25-29: batch_submit_paced.py currently only reads STATE_FILE
(.paced_state.json) into the submitted dict so it can resubmit files already
recorded by cloud_backfill.py; update batch_submit_paced.py to first consult the
shared enrichment_checkpoints.db used by cloud_backfill.py for each candidate
file (or batch id) before submitting, and treat any DB-recorded file as
already-submitted (i.e., skip and add to submitted if not present); additionally
add a simple mutual-exclusion lock (create and atomically acquire a lock file at
start, release on exit) around the main submit loop to prevent concurrent runs
with cloud_backfill.py if the DB check cannot be used, referencing STATE_FILE,
submitted, .paced_state.json, and enrichment_checkpoints.db in your changes so
both checks/locking prevent double submission.
In `@scripts/launchd/com.brainlayer.enrichment.plist`:
- Around line 32-45: Update the EnvironmentVariables dict in the launchd plist
to include Groq as the primary enrichment backend: add keys GROQ_API_KEY and
BRAINLAYER_ENRICH_BACKEND with values __GROQ_API_KEY__ and "groq" respectively,
so the service prefers Groq and still retains GOOGLE_API_KEY for Gemini
fallback; ensure the new keys sit alongside the existing EnvironmentVariables
entries (e.g., PATH, PYTHONUNBUFFERED, BRAINLAYER_STALL_TIMEOUT, GOOGLE_API_KEY)
and confirm install.sh (which already fetches GROQ_API_KEY) will substitute the
placeholder.
In `@scripts/launchd/install.sh`:
- Line 97: Update the usage echo in the install.sh script so it lists both
legacy and current options: include "enrich" alongside "enrichment" in the
string emitted by echo "Usage: $0 [index|enrichment|checkpoint|all|remove]";
modify that echo invocation in scripts/launchd/install.sh (the usage echo) to
reflect both "enrich" and "enrichment" so the help text matches the accepted
legacy option.
In `@src/brainlayer/enrichment_controller.py`:
- Around line 144-162: The _backfill_content_hashes function currently swallows
all exceptions and doesn't retry on SQLITE_BUSY; update it to retry write
operations when sqlite raises sqlite3.OperationalError indicating a
locked/SQLITE_BUSY state: wrap the UPDATE (and optionally the SELECT→UPDATE
loop) in a retry loop that catches sqlite3.OperationalError, checks for
"database is locked" / SQLITE_BUSY, sleeps with a small backoff (e.g.,
exponential or fixed short delay) and retries a limited number of times, then
re-raises or return 0 if retries exhausted; ensure you reference the existing
cursor from store.conn.cursor(), the UPDATE statement ("UPDATE chunks SET
content_hash = ? WHERE id = ?"), and preserve the use of _content_hash for
computing h. Make retries only for the write (cursor.execute UPDATE) to avoid
masking other errors.
In `@src/brainlayer/mcp/__init__.py`:
- Line 54: The import for _brain_enrich is out of alphabetical order in the
module import block; open the __init__ import block and move the line "from
.enrich_handler import _brain_enrich" so it is alphabetically sorted among the
other relative imports (keep it with other local imports and preserve grouping),
then re-run ruff (or run `ruff check --fix src/`) to verify the import block is
formatted correctly.
In `@src/brainlayer/mcp/enrich_handler.py`:
- Line 40: Replace the deprecated call to asyncio.get_event_loop() in
enrich_handler (the line setting loop = asyncio.get_event_loop()) with
asyncio.get_running_loop(), ensuring this is invoked from within a running
coroutine similar to other MCP handlers (see tags_handler.get_running_loop
usage); update any surrounding logic in the function or method that assumes a
potentially non-running loop so it runs inside an async context and remove
reliance on deprecated behavior.
In `@src/brainlayer/telemetry.py`:
- Around line 143-195: The three public helpers emit_enrichment_start,
emit_enrichment_complete, and emit_enrichment_error in telemetry.py are unused
and duplicate private functions in enrichment_controller; either delete these
public functions or change enrichment_controller to call them: replace calls to
_emit_enrichment_start/_emit_enrichment_complete/_emit_enrichment_error with
emit_enrichment_start/emit_enrichment_complete/emit_enrichment_error, adjust
call sites to match signatures (mode, limit/pid for start; mode,
attempted,enriched,skipped,failed,duration_ms,error_count for complete;
mode,chunk_id,error for error), add the telemetry import(s) in
enrichment_controller, and remove the corresponding private functions from
enrichment_controller if you choose the refactor path so there is a single
shared implementation.
In `@tests/eval_mcp_brainlayer.json`:
- Line 13: The JSON key "source_of_truth" in tests/eval_mcp_brainlayer.json is
set to a hardcoded absolute developer path; replace it with a relative path
(e.g., relative to the repo root or the test runner working directory) or remove
the "source_of_truth" field entirely if not required by the eval runner, and
ensure any code that reads this value can resolve relative paths or fall back to
a configurable environment variable.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b1293abe-f128-4048-91ee-85d4537e1e18
📒 Files selected for processing (10)
scripts/backfill_orchestrate.shscripts/batch_submit_paced.pyscripts/launchd/com.brainlayer.enrichment.plistscripts/launchd/install.shsrc/brainlayer/enrichment_controller.pysrc/brainlayer/mcp/__init__.pysrc/brainlayer/mcp/enrich_handler.pysrc/brainlayer/telemetry.pytests/eval_mcp_brainlayer.jsontests/test_enrichment_controller.py
📜 Review details
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests
**/*.py: All scripts and CLI must usepaths.py:get_db_path()for database path resolution instead of hardcoding paths
Preserve verbatim content for message types:ai_code,stack_trace,user_messageduring classification and chunking
Skipnoisecontent entirely; summarizebuild_log; extract structure only fromdir_listingduring chunking
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
Implement retry logic onSQLITE_BUSYerrors; each worker must use its own database connection
Override enrichment backend viaBRAINLAYER_ENRICH_BACKENDenvironment variable (valid values:ollama,mlx,groq); default to Groq
Configure enrichment rate viaBRAINLAYER_ENRICH_RATEenvironment variable (default: 0.2 = 12 RPM)
Checkpoint WAL before and after bulk database operations:PRAGMA wal_checkpoint(FULL)
Drop FTS triggers before bulk deletes fromchunkstable; recreate after operation to avoid performance degradation
Batch deletes in 5-10K chunk sizes with WAL checkpoint every 3 batches
Default search queries must exclude lifecycle-managed chunks; useinclude_archived=Trueparameter to show history
Lint and format code withruff check src/ && ruff format src/
Session dedup coordination: SessionStart writes injected chunk_ids to/tmp/brainlayer_session_{id}.json; UserPromptSubmit skips already-injected chunks
Skip auto-search for prompts containing 'handoff' or 'session-handoff' keywords
Files:
src/brainlayer/mcp/__init__.pyscripts/batch_submit_paced.pysrc/brainlayer/mcp/enrich_handler.pysrc/brainlayer/telemetry.pytests/test_enrichment_controller.pysrc/brainlayer/enrichment_controller.py
**/*test*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use
pytestfor testing
Files:
tests/test_enrichment_controller.py
🧠 Learnings (7)
📓 Common learnings
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-14T02:20:54.656Z
Learning: Request codex review, cursor review, and bugbot review for BrainLayer PRs
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-26T15:46:16.139Z
Learning: Use Groq as primary enrichment backend (configured in launchd plist); fall back to Gemini, then Ollama as offline last-resort
📚 Learning: 2026-03-17T01:04:22.497Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-03-17T01:04:22.497Z
Learning: Applies to src/brainlayer/mcp/**/*.py and brain-bar/Sources/BrainBar/MCPRouter.swift: The 8 required MCP tools are `brain_search`, `brain_store`, `brain_recall`, `brain_entity`, `brain_expand`, `brain_update`, `brain_digest`, `brain_tags`. `brain_tags` is the 8th tool, replacing `brain_get_person`, as defined in the Phase B spec merged in PR `#72`. The Python MCP server already implements `brain_tags`. Legacy `brainlayer_*` aliases must be maintained for backward compatibility.
Applied to files:
src/brainlayer/mcp/__init__.pysrc/brainlayer/mcp/enrich_handler.pytests/eval_mcp_brainlayer.json
📚 Learning: 2026-03-26T15:46:16.139Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-26T15:46:16.139Z
Learning: Use Groq as primary enrichment backend (configured in launchd plist); fall back to Gemini, then Ollama as offline last-resort
Applied to files:
scripts/launchd/com.brainlayer.enrichment.plist
📚 Learning: 2026-03-26T15:46:16.139Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-26T15:46:16.139Z
Learning: Applies to **/*.py : Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default: 0.2 = 12 RPM)
Applied to files:
scripts/launchd/com.brainlayer.enrichment.plistsrc/brainlayer/mcp/enrich_handler.pysrc/brainlayer/telemetry.pysrc/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-22T15:55:22.017Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 100
File: src/brainlayer/enrichment_controller.py:175-199
Timestamp: 2026-03-22T15:55:22.017Z
Learning: In `src/brainlayer/enrichment_controller.py`, the `parallel` parameter in `enrich_local()` is intentionally kept in the function signature (currently unused, suppressed with `# noqa: ARG001`) for API stability. Parallel local enrichment via a thread pool or process pool is planned for a future iteration. Do not flag this as dead code requiring removal.
Applied to files:
src/brainlayer/mcp/enrich_handler.pysrc/brainlayer/telemetry.pysrc/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-26T15:46:16.139Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-26T15:46:16.139Z
Learning: Applies to **/*.py : Override enrichment backend via `BRAINLAYER_ENRICH_BACKEND` environment variable (valid values: `ollama`, `mlx`, `groq`); default to Groq
Applied to files:
src/brainlayer/mcp/enrich_handler.pysrc/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-26T15:46:16.139Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-26T15:46:16.139Z
Learning: Applies to **/*.py : All scripts and CLI must use `paths.py:get_db_path()` for database path resolution instead of hardcoding paths
Applied to files:
scripts/backfill_orchestrate.sh
🪛 GitHub Actions: CI
src/brainlayer/mcp/__init__.py
[error] 9-56: ruff check failed (I001): Import block is un-sorted or un-formatted. Run ruff check --fix (or organize imports) to resolve.
🔇 Additional comments (16)
scripts/launchd/install.sh (1)
83-89: LGTM!The migration logic correctly installs the new
enrichmentplist and cleans up the legacyenrichplist in theallcase. The error suppression ensures graceful handling when the old plist doesn't exist.src/brainlayer/mcp/__init__.py (2)
897-972: LGTM!The
brain_enrichtool registration is well-structured with comprehensive documentation explaining the three modes, features, and usage. The input schema correctly captures all parameters with appropriate types, enums, and defaults.
1179-1189: LGTM!The routing correctly passes all parameters to
_brain_enrichwith consistent defaults. Not wrapping with_with_timeoutis appropriate since enrichment operations can be long-running.scripts/launchd/com.brainlayer.enrichment.plist (1)
1-56: LGTM on plist structure.The LaunchAgent configuration is well-structured with appropriate settings:
- Hourly runs with
StartInterval=3600- Low priority via
Nice=15andProcessType=Background- Proper log paths and environment setup
src/brainlayer/telemetry.py (1)
185-195: LGTM on error truncation.The
emit_enrichment_errorhelper correctly truncates error messages to 300 characters (line 193), matching the pattern used for watcher errors (line 135).src/brainlayer/mcp/enrich_handler.py (2)
97-137: LGTM!The
_enrich_statsfunction correctly:
- Uses
store._read_cursor()for thread-safe read access- Handles division by zero at line 130
- Returns structured JSON with all relevant enrichment metrics
- Wraps everything in try/except with error result
78-90: LGTM!The result serialization correctly captures all enrichment counters and caps the error list to 10 items to prevent oversized responses.
tests/eval_mcp_brainlayer.json (2)
289-1334: LGTM on test case coverage!Comprehensive test cases covering:
- Happy path scenarios with semantic relevance assertions
- Error handling for missing/invalid parameters
- Latency budgets with p50/p95 thresholds
- Lock contention and fault injection scenarios
- Store-then-search roundtrip validation
- brain_digest exclusivity enforcement
5-11: 🧹 Nitpick | 🔵 TrivialConsider adding
brain_enrichto target_tools.The
target_toolslist doesn't includebrain_enrich, even though this PR introduces the tool. If this eval suite should cover the new enrichment functionality, add it to the list."target_tools": [ "brain_search", "brain_store", "brain_recall", "brain_expand", - "brain_digest" + "brain_digest", + "brain_enrich" ],⛔ Skipped due to learnings
Learnt from: EtanHey Repo: EtanHey/brainlayer PR: 0 File: :0-0 Timestamp: 2026-03-17T01:04:22.497Z Learning: Applies to src/brainlayer/mcp/**/*.py and brain-bar/Sources/BrainBar/MCPRouter.swift: The 8 required MCP tools are `brain_search`, `brain_store`, `brain_recall`, `brain_entity`, `brain_expand`, `brain_update`, `brain_digest`, `brain_tags`. `brain_tags` is the 8th tool, replacing `brain_get_person`, as defined in the Phase B spec merged in PR `#72`. The Python MCP server already implements `brain_tags`. Legacy `brainlayer_*` aliases must be maintained for backward compatibility.Learnt from: CR Repo: EtanHey/brainlayer PR: 0 File: AGENTS.md:0-0 Timestamp: 2026-03-14T02:20:54.656Z Learning: Treat retrieval correctness, write safety, and MCP stability as critical-path concerns in BrainLayer reviewsLearnt from: CR Repo: EtanHey/brainlayer PR: 0 File: CLAUDE.md:0-0 Timestamp: 2026-03-26T15:46:16.139Z Learning: BrainBar Swift daemon tools `brain_update`, `brain_expand`, `brain_tags` are STUB implementations returning fake success and saving nothing; do not rely on these toolstests/test_enrichment_controller.py (2)
1-8: LGTM!Excellent test coverage with 58 tests covering:
- All three enrichment backends (realtime/batch/local)
- Content-hash deduplication primitives
- Retry logic and rate limiting
- Telemetry emission
- MCP handler integration
- LaunchAgent plist validation
- Error handling and edge cases
844-879: LGTM on plist validation tests.The plist validation tests verify the LaunchAgent configuration correctness at test time, catching misconfigurations before deployment. Using
__file__parent navigation is acceptable for test fixtures.src/brainlayer/enrichment_controller.py (4)
33-38: LGTM on rate limiting configuration.Rate limits are correctly configured with environment variable overrides. The default
BRAINLAYER_ENRICH_RATE=0.2(12 RPM) aligns with coding guidelines.
201-210: LGTM on content hash persistence.Setting the content_hash after successful enrichment ensures future dedup works correctly. The silent exception handling is appropriate since this is a non-critical optimization.
269-333: LGTM on realtime enrichment updates.The function now includes:
- Proper rate limiting with env var override
- Telemetry bookends (start/complete/error)
- Content-hash deduplication to skip already-enriched content
- Duration tracking for observability
359-399: LGTM on local enrichment updates.Consistent with realtime mode: content-hash dedup, telemetry emission, and duration tracking are properly integrated.
scripts/backfill_orchestrate.sh (1)
1-4: LGTM!Good use of
set -euo pipefailfor strict error handling. Thetee -alogging pattern preserves both console output and log file. Withpipefail, the script correctly exits if any phase fails.Also applies to: 17-24
| PYTHON="$SCRIPT_DIR/../.venv/bin/python3" | ||
| BACKFILL="$SCRIPT_DIR/cloud_backfill.py" |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Hardcoded virtual environment path may not exist.
The script assumes Python is at ../.venv/bin/python3. Consider adding a fallback or validation:
♻️ Proposed validation
PYTHON="$SCRIPT_DIR/../.venv/bin/python3"
+if [[ ! -x "$PYTHON" ]]; then
+ echo "ERROR: Python not found at $PYTHON"
+ echo "Ensure virtual environment is set up: python3 -m venv .venv"
+ exit 1
+fi
BACKFILL="$SCRIPT_DIR/cloud_backfill.py"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| PYTHON="$SCRIPT_DIR/../.venv/bin/python3" | |
| BACKFILL="$SCRIPT_DIR/cloud_backfill.py" | |
| PYTHON="$SCRIPT_DIR/../.venv/bin/python3" | |
| if [[ ! -x "$PYTHON" ]]; then | |
| echo "ERROR: Python not found at $PYTHON" | |
| echo "Ensure virtual environment is set up: python3 -m venv .venv" | |
| exit 1 | |
| fi | |
| BACKFILL="$SCRIPT_DIR/cloud_backfill.py" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/backfill_orchestrate.sh` around lines 7 - 8, The PYTHON variable is
hardcoded to "$SCRIPT_DIR/../.venv/bin/python3" which may not exist; modify the
script to validate that path and fall back if missing: check
existence/readability of "$SCRIPT_DIR/../.venv/bin/python3" before exporting
PYTHON, if missing try locating a system python via "which python3" or "which
python" and assign that to PYTHON, and if no usable interpreter is found log a
clear error and exit; update any uses of PYTHON (e.g., invoking "$PYTHON" to run
BACKFILL/cloud_backfill.py) to rely on this validated variable.
| # PHASE 2: Fix "completed" batches that had 0 imports (broken download bug) | ||
| # The broken run completed all its batches at 2026-03-14T12:21 UTC. | ||
| # Current (correct) run completes batches after 2026-03-14T12:22 UTC. | ||
| log "" | ||
| log "Phase 2: Checking for incorrectly-completed batches (0 imports due to broken download)..." | ||
| "$PYTHON" -u - <<'PYEOF' 2>&1 | tee -a "$LOG" | ||
| import apsw | ||
| from pathlib import Path | ||
|
|
||
| cp_db = Path.home() / '.local/share/brainlayer/enrichment_checkpoints.db' | ||
| conn = apsw.Connection(str(cp_db)) | ||
| conn.setbusytimeout(10000) | ||
|
|
||
| # Find completed batches from the broken 3rd run (completed at 12:21 UTC) | ||
| # These had 0 imports due to Files.download(name=) bug that has since been fixed | ||
| rows = list(conn.cursor().execute(""" | ||
| SELECT batch_id, chunk_count, completed_at | ||
| FROM enrichment_checkpoints | ||
| WHERE status = 'completed' | ||
| AND completed_at < '2026-03-14T12:22:00' | ||
| """)) | ||
|
|
||
| print(f"Found {len(rows)} broken 'completed' batches (before fix at 12:22 UTC)") | ||
| if rows: | ||
| # Reset them to 'submitted' so --resume will re-process them | ||
| conn.cursor().execute(""" | ||
| UPDATE enrichment_checkpoints | ||
| SET status = 'submitted', completed_at = NULL | ||
| WHERE status = 'completed' | ||
| AND completed_at < '2026-03-14T12:22:00' | ||
| """) | ||
| print(f"Reset {len(rows)} batches to 'submitted'") | ||
| else: | ||
| print("No broken batches found - all completed batches imported correctly") | ||
|
|
||
| conn.close() | ||
| PYEOF |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Phase 2 contains one-time fix logic with hardcoded timestamp.
This phase resets batches completed before 2026-03-14T12:22:00 to address a historical bug. Once executed successfully, this logic becomes dead code. Consider:
- Adding a guard to skip if no matching rows exist (already done, but could log more explicitly)
- Removing this phase after the fix is confirmed applied
- Moving to a separate migration script
Would you like me to create a separate one-time migration script that can be removed after execution?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/backfill_orchestrate.sh` around lines 26 - 62, Phase 2 currently
contains one-time fix logic that unconditionally updates enrichment_checkpoints
rows with completed_at < '2026-03-14T12:22:00'; extract this into a dedicated,
idempotent migration script (e.g., a one-off script that opens the same DB at
cp_db, queries enrichment_checkpoints for rows with status='completed' and
completed_at < '2026-03-14T12:22:00' using the same SQL used in the block, logs
explicitly when zero rows are found and exits without changing state, and when
rows exist performs the UPDATE inside a transaction and logs the exact number
updated), then remove the Phase 2 block from scripts/backfill_orchestrate.sh so
the main orchestrator no longer contains hardcoded historical timestamps. Ensure
the migration uses the same connection setup (apsw.Connection, setbusytimeout),
explicit logging of found vs updated counts, and is safe to re-run (idempotent).
| cp_db = Path.home() / '.local/share/brainlayer/enrichment_checkpoints.db' | ||
| conn = apsw.Connection(str(cp_db)) |
There was a problem hiding this comment.
Hardcoded database path violates coding guidelines.
Line 35 hardcodes the path ~/.local/share/brainlayer/enrichment_checkpoints.db instead of using paths.py:get_db_path(). As per coding guidelines: "All scripts and CLI must use paths.py:get_db_path() for database path resolution instead of hardcoding paths."
🐛 Proposed fix to use paths.py
"$PYTHON" -u - <<'PYEOF' 2>&1 | tee -a "$LOG"
import apsw
from pathlib import Path
+import sys
+sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
+from brainlayer.paths import get_db_path
-cp_db = Path.home() / '.local/share/brainlayer/enrichment_checkpoints.db'
+# enrichment_checkpoints.db is in the same directory as the main DB
+cp_db = get_db_path().parent / 'enrichment_checkpoints.db'
conn = apsw.Connection(str(cp_db))Note: The inline Python heredoc may need adjustment since __file__ isn't available. Consider extracting this logic to a separate Python module.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/backfill_orchestrate.sh` around lines 35 - 36, Replace the hardcoded
cp_db path with the shared path resolver: import and call paths.get_db_path() to
obtain the DB path and use that value when constructing cp_db and passing it to
apsw.Connection (currently where cp_db and apsw.Connection(...) are used); if
the script uses an inline Python heredoc where __file__ is unavailable, extract
the DB-path resolution into a small Python module or a callable that imports
paths.get_db_path() so the script can invoke it instead of hardcoding
"~/.local/share/brainlayer/enrichment_checkpoints.db".
| DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45 | ||
| MAX_RETRIES = int(sys.argv[sys.argv.index("--max-retries") + 1]) if "--max-retries" in sys.argv else 10 |
There was a problem hiding this comment.
CLI argument parsing lacks bounds checking.
If --delay or --max-retries is provided without a value (e.g., as the last argument), sys.argv.index(...) + 1 will raise IndexError. Consider using argparse for robust CLI handling.
♻️ Proposed fix using argparse
+import argparse
import glob, json, os, sys, time
from pathlib import Path
...
-DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45
-MAX_RETRIES = int(sys.argv[sys.argv.index("--max-retries") + 1]) if "--max-retries" in sys.argv else 10
+parser = argparse.ArgumentParser(description="Paced batch submission for Gemini")
+parser.add_argument("--delay", type=int, default=45, help="Delay between submissions (seconds)")
+parser.add_argument("--max-retries", type=int, default=10, help="Max retries per batch")
+args = parser.parse_args()
+DELAY = args.delay
+MAX_RETRIES = args.max_retries🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/batch_submit_paced.py` around lines 21 - 22, Replace the fragile
sys.argv lookups for DELAY and MAX_RETRIES with proper argparse-based parsing:
create an ArgumentParser and add arguments "--delay" and "--max-retries" with
type=int and defaults 45 and 10 respectively, then assign parsed values to DELAY
and MAX_RETRIES; also add basic validation (e.g., argparse's type check or a
post-parse check to ensure non-negative integers) so missing values or
out-of-range inputs don't raise IndexError or produce invalid state.
|
|
||
| DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45 | ||
| MAX_RETRIES = int(sys.argv[sys.argv.index("--max-retries") + 1]) if "--max-retries" in sys.argv else 10 | ||
| MODEL = "gemini-2.5-flash-lite" |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Model differs from cloud_backfill.py and is not configurable.
This script hardcodes gemini-2.5-flash-lite while cloud_backfill.py defaults to gemini-2.5-flash (per context snippet 3, line 1070). Consider making the model configurable via CLI argument or environment variable for consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/batch_submit_paced.py` at line 23, The MODEL constant is hardcoded to
"gemini-2.5-flash-lite" in scripts/batch_submit_paced.py; make it configurable
to match cloud_backfill.py's default and allow overrides. Add a CLI argument
(e.g., --model) or read an environment variable (e.g., os.environ.get("MODEL"))
and set MODEL from that with a default of "gemini-2.5-flash" (or reuse
cloud_backfill.py's default), then replace the hardcoded MODEL usage with this
configurable value; update any function that references MODEL (e.g., the batch
submit entrypoint) to use the new configurable variable.
| def _backfill_content_hashes(store, limit: int = 1000) -> int: | ||
| """Backfill content_hash for chunks that don't have one yet. Returns count updated.""" | ||
| try: | ||
| cursor = store.conn.cursor() | ||
| rows = list( | ||
| cursor.execute( | ||
| "SELECT id, content FROM chunks WHERE content_hash IS NULL LIMIT ?", | ||
| (limit,), | ||
| ) | ||
| ) | ||
| count = 0 | ||
| for chunk_id, content in rows: | ||
| if content: | ||
| h = _content_hash(content) | ||
| cursor.execute("UPDATE chunks SET content_hash = ? WHERE id = ?", (h, chunk_id)) | ||
| count += 1 | ||
| return count | ||
| except Exception: | ||
| return 0 |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider adding retry logic for _backfill_content_hashes.
This function performs batch UPDATE operations but doesn't handle SQLITE_BUSY errors. Per coding guidelines, implement retry logic on SQLITE_BUSY errors for write operations. While the function is non-critical (returns 0 on error), silent failures could leave content_hash unpopulated.
Suggested enhancement
def _backfill_content_hashes(store, limit: int = 1000) -> int:
"""Backfill content_hash for chunks that don't have one yet. Returns count updated."""
+ import sqlite3
try:
cursor = store.conn.cursor()
rows = list(
cursor.execute(
"SELECT id, content FROM chunks WHERE content_hash IS NULL LIMIT ?",
(limit,),
)
)
count = 0
for chunk_id, content in rows:
if content:
h = _content_hash(content)
- cursor.execute("UPDATE chunks SET content_hash = ? WHERE id = ?", (h, chunk_id))
+ for attempt in range(3):
+ try:
+ cursor.execute("UPDATE chunks SET content_hash = ? WHERE id = ?", (h, chunk_id))
+ break
+ except sqlite3.OperationalError as e:
+ if "database is locked" in str(e) and attempt < 2:
+ time.sleep(0.1 * (attempt + 1))
+ continue
+ raise
count += 1
return count
except Exception:
return 0As per coding guidelines: "Implement retry logic on SQLITE_BUSY errors"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/enrichment_controller.py` around lines 144 - 162, The
_backfill_content_hashes function currently swallows all exceptions and doesn't
retry on SQLITE_BUSY; update it to retry write operations when sqlite raises
sqlite3.OperationalError indicating a locked/SQLITE_BUSY state: wrap the UPDATE
(and optionally the SELECT→UPDATE loop) in a retry loop that catches
sqlite3.OperationalError, checks for "database is locked" / SQLITE_BUSY, sleeps
with a small backoff (e.g., exponential or fixed short delay) and retries a
limited number of times, then re-raises or return 0 if retries exhausted; ensure
you reference the existing cursor from store.conn.cursor(), the UPDATE statement
("UPDATE chunks SET content_hash = ? WHERE id = ?"), and preserve the use of
_content_hash for computing h. Make retries only for the write (cursor.execute
UPDATE) to avoid masking other errors.
| # ── Enrichment-specific helpers ──────────────────────────────────────────────── | ||
|
|
||
|
|
||
| def emit_enrichment_start(mode: str, limit: int, pid: int | None = None) -> bool: | ||
| """Emit enrichment run start event.""" | ||
| return emit( | ||
| _DATASET_ENRICHMENT, | ||
| { | ||
| "_type": "start", | ||
| "mode": mode, | ||
| "limit": limit, | ||
| "pid": pid or os.getpid(), | ||
| "hostname": os.uname().nodename, | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| def emit_enrichment_complete( | ||
| mode: str, | ||
| attempted: int, | ||
| enriched: int, | ||
| skipped: int, | ||
| failed: int, | ||
| duration_ms: float, | ||
| error_count: int = 0, | ||
| ) -> bool: | ||
| """Emit enrichment run completion event.""" | ||
| return emit( | ||
| _DATASET_ENRICHMENT, | ||
| { | ||
| "_type": "complete", | ||
| "mode": mode, | ||
| "attempted": attempted, | ||
| "enriched": enriched, | ||
| "skipped": skipped, | ||
| "failed": failed, | ||
| "duration_ms": round(duration_ms, 1), | ||
| "error_count": error_count, | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| def emit_enrichment_error(mode: str, chunk_id: str, error: str) -> bool: | ||
| """Emit per-chunk enrichment error event.""" | ||
| return emit( | ||
| _DATASET_ENRICHMENT, | ||
| { | ||
| "_type": "error", | ||
| "mode": mode, | ||
| "chunk_id": chunk_id, | ||
| "error": error[:300], | ||
| }, | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if the public telemetry enrichment helpers are used anywhere
rg -n "emit_enrichment_start|emit_enrichment_complete|emit_enrichment_error" --type=py -g '!*test*' | grep -v "^src/brainlayer/telemetry.py" | grep -v "def emit_enrichment"Repository: EtanHey/brainlayer
Length of output: 1531
🏁 Script executed:
#!/bin/bash
# Check for imports of the public enrichment telemetry helpers
rg -n "from.*telemetry.*import|import.*telemetry" --type=py | head -20Repository: EtanHey/brainlayer
Length of output: 1741
🏁 Script executed:
#!/bin/bash
# Check if public helpers are used in test files
rg -n "emit_enrichment_start|emit_enrichment_complete|emit_enrichment_error" --type=py -g '*test*'Repository: EtanHey/brainlayer
Length of output: 1126
🏁 Script executed:
#!/bin/bash
# Get the actual implementation of the private helpers in enrichment_controller
sed -n '234,270p' src/brainlayer/enrichment_controller.pyRepository: EtanHey/brainlayer
Length of output: 1070
🏁 Script executed:
#!/bin/bash
# Get the public implementations from telemetry.py to compare
sed -n '143,195p' src/brainlayer/telemetry.pyRepository: EtanHey/brainlayer
Length of output: 1416
🏁 Script executed:
#!/bin/bash
# Check if public enrichment helpers are exported in __all__ or documented
rg -n "__all__|emit_enrichment" src/brainlayer/telemetry.py | head -30Repository: EtanHey/brainlayer
Length of output: 258
🏁 Script executed:
#!/bin/bash
# Check for any external usage of public helpers in the entire repo
rg "from brainlayer.telemetry import.*emit_enrichment|telemetry.emit_enrichment" --type=pyRepository: EtanHey/brainlayer
Length of output: 44
Remove unused public enrichment telemetry helpers or refactor controller to use them.
The public emit_enrichment_start, emit_enrichment_complete, and emit_enrichment_error functions are not used anywhere in the codebase. The enrichment_controller.py defines and uses its own private versions (_emit_enrichment_start, _emit_enrichment_complete, _emit_enrichment_error) instead. These public helpers are not exported in __all__ and have no external callers.
Either remove the public helpers from telemetry.py, or refactor the controller to use the public API instead of maintaining duplicate private helpers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/telemetry.py` around lines 143 - 195, The three public helpers
emit_enrichment_start, emit_enrichment_complete, and emit_enrichment_error in
telemetry.py are unused and duplicate private functions in
enrichment_controller; either delete these public functions or change
enrichment_controller to call them: replace calls to
_emit_enrichment_start/_emit_enrichment_complete/_emit_enrichment_error with
emit_enrichment_start/emit_enrichment_complete/emit_enrichment_error, adjust
call sites to match signatures (mode, limit/pid for start; mode,
attempted,enriched,skipped,failed,duration_ms,error_count for complete;
mode,chunk_id,error for error), add the telemetry import(s) in
enrichment_controller, and remove the corresponding private functions from
enrichment_controller if you choose the refactor path so there is a single
shared implementation.
| "brain_digest" | ||
| ], | ||
| "runner": "mcp_call", | ||
| "source_of_truth": "/Users/etanheyman/Gits/orchestrator/docs.local/plans/ecosystem-execution/phase-1-eval-foundation/findings.md", |
There was a problem hiding this comment.
Hardcoded local path won't work in CI or on other machines.
The source_of_truth path is an absolute path specific to one developer's machine. Consider using a relative path or removing this field if it's not essential for the eval runner.
- "source_of_truth": "/Users/etanheyman/Gits/orchestrator/docs.local/plans/ecosystem-execution/phase-1-eval-foundation/findings.md",
+ "source_of_truth": "docs/plans/phase-1-eval-foundation/findings.md",📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "source_of_truth": "/Users/etanheyman/Gits/orchestrator/docs.local/plans/ecosystem-execution/phase-1-eval-foundation/findings.md", | |
| "source_of_truth": "docs/plans/phase-1-eval-foundation/findings.md", |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/eval_mcp_brainlayer.json` at line 13, The JSON key "source_of_truth" in
tests/eval_mcp_brainlayer.json is set to a hardcoded absolute developer path;
replace it with a relative path (e.g., relative to the repo root or the test
runner working directory) or remove the "source_of_truth" field entirely if not
required by the eval runner, and ensure any code that reads this value can
resolve relative paths or fall back to a configurable environment variable.
|
nice consolidation, we went through a similar unification of scattered LaunchAgent plists into a single scheduling system. couple things from our experience: the content-hash dedup is critical, saved us from re-processing thousands of already-handled items. for the LaunchAgent plist, make sure you set ProcessType to Adaptive rather than Background, otherwise macOS aggressively throttles the agent and your rate limits won't behave as expected. also if the enrichment process runs longer than the LaunchAgent interval, you'll get overlapping invocations. we added a PID lock file check at the start of each run to skip if the previous invocation is still going. the 12 retries with exponential backoff sounds high - in practice we found 5 retries with jitter is enough for transient 429s and anything beyond that usually means the rate limit is structural, not transient. |
|
our LaunchAgent plist templates for scheduled automation tasks with PID locking and ProcessType configuration: https://github.com/m13v/social-autoposter/tree/main/launchd handles the overlapping invocation problem with lock files and configurable intervals. |
Single brain_enrich MCP tool replaces scattered enrichment scripts (enrichment-window.sh, enrichment-lazy.sh, enrich.sh). Three backends: - realtime: Gemini 2.5 Flash-Lite, single chunk, <600ms target - batch: Gemini Batch API, backlog processing, thinkingBudget=0 - local: MLX/Ollama backend, offline/privacy mode Features: content-hash dedup, per-backend rate limiting, retry with exponential backoff (12 retries, capped jitter), Axiom telemetry, unified LaunchAgent plist, CLI enrich command, 58 tests. Worker: track-A-R2-brainlayer Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…e text - Sort enrich_handler import alphabetically (I001 lint fix) - Use asyncio.get_running_loop() instead of deprecated get_event_loop() - Fix install.sh usage text to include legacy 'enrich' option Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
752121a to
b521013
Compare
|
|
||
| genai.configure(api_key=API_KEY) | ||
|
|
||
| DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45 |
There was a problem hiding this comment.
🟢 Low scripts/batch_submit_paced.py:21
If --delay is passed without a value (e.g., python script.py --delay), sys.argv.index("--delay") + 1 equals len(sys.argv), causing an IndexError. Consider using argparse or validating that a value follows the flag.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file scripts/batch_submit_paced.py around line 21:
If `--delay` is passed without a value (e.g., `python script.py --delay`), `sys.argv.index("--delay") + 1` equals `len(sys.argv)`, causing an `IndexError`. Consider using `argparse` or validating that a value follows the flag.
Evidence trail:
scripts/batch_submit_paced.py lines 21-22 at REVIEWED_COMMIT show argument parsing without bounds checking: `DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45`. When `--delay` is the last argument, `sys.argv.index("--delay") + 1` equals `len(sys.argv)`, and indexing that position raises IndexError.
| def test_realtime_rate_limit_sleeps_between_chunks(monkeypatch): | ||
| from brainlayer import enrichment_controller as controller | ||
|
|
||
| store = MagicMock() | ||
| store.get_enrichment_candidates.return_value = [_candidate("c1"), _candidate("c2"), _candidate("c3")] | ||
| monkeypatch.setattr(controller, "build_external_prompt", MagicMock(return_value=("prompt", SimpleNamespace()))) | ||
| monkeypatch.setattr(controller, "parse_enrichment", MagicMock(return_value={"summary": "s", "tags": []})) | ||
| monkeypatch.setattr(controller, "Sanitizer", SimpleNamespace(from_env=lambda: SimpleNamespace())) | ||
| monkeypatch.setattr(controller, "_get_gemini_client", lambda: _fake_gemini_client()) | ||
|
|
||
| sleeps = [] | ||
| monkeypatch.setattr(controller.time, "sleep", sleeps.append) | ||
|
|
||
| controller.enrich_realtime(store, limit=3, rate_per_second=2.0) | ||
|
|
||
| # Should sleep between chunks (not after the last one) | ||
| assert len(sleeps) == 2 | ||
| assert all(s == 0.5 for s in sleeps) # 1/2.0 = 0.5s |
There was a problem hiding this comment.
🟡 Medium tests/test_enrichment_controller.py:451
The test does not mock _is_duplicate_content, so enrich_realtime may skip chunks if the mock returns True. This causes len(sleeps) to be fewer than 2, making the assertion fail unpredictably. Consider patching _is_duplicate_content to always return False so every chunk triggers build_external_prompt and the sleep calls.
def test_realtime_rate_limit_sleeps_between_chunks(monkeypatch):
from brainlayer import enrichment_controller as controller
store = MagicMock()
store.get_enrichment_candidates.return_value = [_candidate("c1"), _candidate("c2"), _candidate("c3")]
monkeypatch.setattr(controller, "build_external_prompt", MagicMock(return_value=("prompt", SimpleNamespace())))
monkeypatch.setattr(controller, "parse_enrichment", MagicMock(return_value={"summary": "s", "tags": []}))
monkeypatch.setattr(controller, "Sanitizer", SimpleNamespace(from_env=lambda: SimpleNamespace()))
monkeypatch.setattr(controller, "_get_gemini_client", lambda: _fake_gemini_client())
+ monkeypatch.setattr(controller, "_is_duplicate_content", lambda s, c: False)
sleeps = []
monkeypatch.setattr(controller.time, "sleep", sleeps.append)
controller.enrich_realtime(store, limit=3, rate_per_second=2.0)
# Should sleep between chunks (not after the last one)
assert len(sleeps) == 2
assert all(s == 0.5 for s in sleeps) # 1/2.0 = 0.5s🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file tests/test_enrichment_controller.py around lines 451-468:
The test does not mock `_is_duplicate_content`, so `enrich_realtime` may skip chunks if the mock returns `True`. This causes `len(sleeps)` to be fewer than 2, making the assertion fail unpredictably. Consider patching `_is_duplicate_content` to always return `False` so every chunk triggers `build_external_prompt` and the sleep calls.
Evidence trail:
- tests/test_enrichment_controller.py lines 450-468: test `test_realtime_rate_limit_sleeps_between_chunks` does not mock `_is_duplicate_content`
- src/brainlayer/enrichment_controller.py lines 116-132: `_is_duplicate_content` implementation returns `row[0] > 0` which with MagicMock returns truthy MagicMock
- src/brainlayer/enrichment_controller.py lines 371-402: `enrich_realtime` skips chunks via `continue` when `_is_duplicate_content` returns truthy, and sleep only happens after non-skipped chunks
- tests/test_enrichment_controller.py lines 367-373: similar test `test_realtime_skips_duplicate_content` correctly patches `_is_duplicate_content`
| # ── Rate limiting tests ────────────────────────────────────────────────────── | ||
|
|
||
|
|
||
| def test_rate_limits_defaults(): |
There was a problem hiding this comment.
🟢 Low tests/test_enrichment_controller.py:443
test_rate_limits_defaults checks that RATE_LIMITS["realtime"] is greater than zero, but RATE_LIMITS is a module-level constant computed at import time from os.environ.get(). Since the test does not control environment variables before import, it will fail when BRAINLAYER_ENRICH_RATE is set to a non-default value even though the code is correct. Consider either reloading the module after patching the environment, or testing the import-time logic directly by checking os.environ.get("BRAINLAYER_ENRICH_RATE", "0.2") instead of the constant.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file tests/test_enrichment_controller.py around line 443:
`test_rate_limits_defaults` checks that `RATE_LIMITS["realtime"]` is greater than zero, but `RATE_LIMITS` is a module-level constant computed at import time from `os.environ.get()`. Since the test does not control environment variables before import, it will fail when `BRAINLAYER_ENRICH_RATE` is set to a non-default value even though the code is correct. Consider either reloading the module after patching the environment, or testing the import-time logic directly by checking `os.environ.get("BRAINLAYER_ENRICH_RATE", "0.2")` instead of the constant.
Evidence trail:
src/brainlayer/enrichment_controller.py lines 39-43: `RATE_LIMITS = { "realtime": float(os.environ.get("BRAINLAYER_ENRICH_RATE", "0.2")), ... }` - confirms RATE_LIMITS is computed at import time from environment variable.
tests/test_enrichment_controller.py lines 443-448: `def test_rate_limits_defaults(): from brainlayer.enrichment_controller import RATE_LIMITS; assert RATE_LIMITS["realtime"] > 0` - confirms test imports and asserts without controlling environment.
No monkeypatch or environment setup visible in the test function or surrounding context that would control BRAINLAYER_ENRICH_RATE before module import.
Summary
brain_enrichMCP tool replaces 3 scattered enrichment scripts (enrichment-window.sh,enrichment-lazy.sh,enrich.sh)BRAINLAYER_ENRICH_RATE,BRAINLAYER_LOCAL_RATE,BRAINLAYER_BATCH_RATE)com.brainlayer.enrichment.plistreplaces 2 separate plistsbrainlayer enrich:--mode realtime|batch|local,--stats,--limit,--since-hoursAcceptance Criteria (A-R2)
brain_enrichMCP tool replaces scattered scriptscom.brainlayer.enrichment.plistTest plan
pytest tests/test_enrichment_controller.py -v— 58/58 passruff checkclean on all modified filesbrainlayer enrich --mode realtime --limit 5 --statsscripts/launchd/install.sh enrichmentWorker: track-A-R2-brainlayer
🤖 Generated with Claude Code
Note
Add unified
brain_enrichMCP tool with realtime, batch, and local backendsbrain_enrichMCP tool in enrich_handler.py that routes enrichment calls to realtime, batch, or local modes and returns structured results (attempted/enriched/skipped/failed counts).enrich_realtimeandenrich_localin enrichment_controller.py to skip chunks whose normalized content has already been enriched.com.brainlayer.enrichlaunchd plist with a newcom.brainlayer.enrichmentplist running hourly; the install.shalltarget migrates automatically.Macroscope summarized b521013.
Summary by CodeRabbit
New Features
Tests