refactor: harden enrichment controller for Gemini Flex throughput #234
📝 Walkthrough
Adds per-store serialized write queues and token-bucket rate limiting; refactors enrichment flows to avoid worker-thread DB mutations by enqueueing writes; introduces a thread-safe WriteQueue, TokenBucket rate limiter, enhanced Gemini call handling with retry/backoff, and lifecycle coordination for store-scoped operations.
Sequence Diagram
sequenceDiagram
participant WT as Worker Thread
participant RateLim as TokenBucket
participant Gemini as Gemini API
participant Queue as WriteQueue
participant Main as Main Thread
participant DB as Database
rect rgba(100, 150, 200, 0.5)
Note over WT,DB: Realtime / batch enrichment flow
end
WT->>RateLim: acquire(1)
activate RateLim
RateLim-->>WT: token acquired (may wait)
deactivate RateLim
WT->>Gemini: generate_content(chunk) with retry/backoff
activate Gemini
Gemini-->>WT: enriched result
deactivate Gemini
WT->>Queue: submit(name, callback=apply_enrichment)
activate Queue
Queue-->>WT: Future enqueued
deactivate Queue
Main->>Queue: worker thread executes queued callback
activate Queue
Queue->>Main: callback runs
Main->>DB: persist enrichment (single-threaded writes)
deactivate Queue
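The flow in the diagram can be sketched roughly as follows. This is a minimal illustration and not the PR's `WriteQueue`: the class name `SingleWriterQueue`, its methods, and the demo function are hypothetical. It only shows the idea that many worker threads submit callbacks while a single thread executes them.

```python
import queue
import threading
from concurrent.futures import Future

_STOP = object()  # sentinel telling the writer thread to exit


class SingleWriterQueue:
    """Hypothetical stand-in for a per-store write queue: one thread runs every callback."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def submit(self, name: str, callback) -> Future:
        # The name is only a label for logging/debugging in this sketch.
        future: Future = Future()
        self._queue.put((name, callback, future))
        return future

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is _STOP:
                return
            _name, callback, future = item
            try:
                future.set_result(callback())
            except Exception as exc:  # surface the failure to the submitter
                future.set_exception(exc)

    def stop(self) -> None:
        self._queue.put(_STOP)
        self._worker.join()


def demo_serialized_writes(n_workers: int = 4):
    """Several worker threads enqueue writes; exactly one thread applies them."""
    writer_threads = set()
    rows = []
    write_queue = SingleWriterQueue()

    def worker(i: int) -> None:
        def apply_enrichment() -> None:
            writer_threads.add(threading.get_ident())
            rows.append(i)

        # Block until the queued write has been applied.
        write_queue.submit(f"apply-{i}", apply_enrichment).result()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    write_queue.stop()
    return sorted(rows), len(writer_threads)
```

In this sketch the submitting thread waits on the returned `Future`, so a failed write raises in the caller instead of being lost on the writer thread.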
Estimated Code Review Effort
🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
@codex review @coderabbitai This enrichment path now uses a per-store single writer queue because SQLite allows only one writer at a time and direct concurrent writes were creating lock contention under sustained Gemini Flex traffic. Gemini requests are also gated by a token bucket ( |
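To make the token-bucket gating concrete, here is a minimal sketch. It is not the PR's `TokenBucket`: the class name `SketchTokenBucket`, the injectable `clock` and `sleep` parameters, and the demo helper are assumptions for illustration. The rate of 5/s and burst of 10 are the defaults quoted in the review learnings.

```python
import threading
import time


class SketchTokenBucket:
    """Hypothetical token bucket: refills at `rate_per_second`, holds at most `burst` tokens."""

    def __init__(self, rate_per_second: float, burst: int,
                 clock=time.monotonic, sleep=time.sleep) -> None:
        self._rate = float(rate_per_second)
        self._capacity = float(burst)
        self._tokens = float(burst)  # start full so an initial burst is allowed
        self._clock = clock
        self._sleep = sleep
        self._updated = clock()
        self._lock = threading.Lock()

    def _refill(self) -> None:
        now = self._clock()
        self._tokens = min(self._capacity, self._tokens + (now - self._updated) * self._rate)
        self._updated = now

    def acquire(self, tokens: float = 1.0) -> float:
        """Block until `tokens` are available; return the total time spent waiting."""
        waited = 0.0
        while True:
            with self._lock:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return waited
                delay = (tokens - self._tokens) / self._rate
            self._sleep(delay)  # sleep outside the lock so other threads can proceed
            waited += delay


def demo_burst_then_wait():
    """Use a fake clock so the behaviour is deterministic: 10 free tokens, then a wait."""
    state = {"now": 0.0}

    def fake_clock() -> float:
        return state["now"]

    def fake_sleep(seconds: float) -> None:
        state["now"] += seconds

    bucket = SketchTokenBucket(5.0, 10, clock=fake_clock, sleep=fake_sleep)
    burst_waits = [bucket.acquire() for _ in range(10)]
    eleventh_wait = bucket.acquire()
    return burst_waits, eleventh_wait
```

With these numbers the first ten calls return immediately and the eleventh waits roughly one refill interval (1 / 5 s), which is the sustained cap the bucket is meant to enforce.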
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/brainlayer/enrichment_controller.py`:
- Around line 179-192: The current _ensure_enrichment_columns unconditionally
marks a store key ready even if the migration functions failed; change it so the
nested _ensure() returns a boolean that ANDs the results of
_ensure_content_hash_column(store) and _ensure_raw_entities_json_column(store),
capture the result of _submit_write(store, "ensure-enrichment-columns", _ensure)
(or the boolean outcome of the queued task) and only add key to
_ENRICHMENT_COLUMN_READY inside the _ENRICHMENT_COLUMN_LOCK when that result is
True; also ensure the underlying migration helpers (_ensure_content_hash_column
and _ensure_raw_entities_json_column) implement SQLITE_BUSY retry logic using a
per-worker DB connection so transient DDL conflicts retry rather than silently
failing.
In `@src/brainlayer/pipeline/write_queue.py`:
- Around line 58-74: The stop() method can return and clear _worker while the
old worker is still alive if queue.put(_STOP) fails or join() times out; update
stop() so it keeps attempting to enqueue the _STOP sentinel until it succeeds or
the deadline expires (don’t break out on queue.Full unless deadline reached),
then wait until either worker.is_alive() is false or the deadline passes (use
repeated join() or a loop checking is_alive before clearing); only set
self._worker = None under the lock if worker.is_alive() is False. Ensure you
reference the existing symbols stop, self._worker, self._queue, _STOP,
self._lock and worker.join when making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d8a16bad-5f35-4620-9503-90cc5c09a789
📒 Files selected for processing (7)
- src/brainlayer/enrichment_controller.py
- src/brainlayer/pipeline/rate_limiter.py
- src/brainlayer/pipeline/write_queue.py
- tests/test_enrichment_controller.py
- tests/test_enrichment_flex_integration.py
- tests/test_rate_limiter.py
- tests/test_write_queue.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Macroscope - Correctness Check
- GitHub Check: test (3.13)
- GitHub Check: test (3.11)
- GitHub Check: test (3.12)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests
**/*.py: Use `paths.py:get_db_path()` for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Files:
- tests/test_rate_limiter.py
- tests/test_write_queue.py
- tests/test_enrichment_flex_integration.py
- tests/test_enrichment_controller.py
- src/brainlayer/pipeline/rate_limiter.py
- src/brainlayer/pipeline/write_queue.py
- src/brainlayer/enrichment_controller.py
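The bulk-operation guideline above (checkpoint before and after, delete in batches, checkpoint every 3 batches) could look roughly like this. It is a sketch under assumptions: the helper names `checkpoint` and `bulk_delete` are hypothetical, the `chunks` table here has only an `id` column, and stopping enrichment workers and dropping FTS triggers are deliberately left out.

```python
import os
import sqlite3
import tempfile


def checkpoint(conn: sqlite3.Connection) -> None:
    # fetchall() drains the pragma's single result row so the statement finishes.
    conn.execute("PRAGMA wal_checkpoint(FULL)").fetchall()


def bulk_delete(conn: sqlite3.Connection, ids, batch_size: int = 5000,
                checkpoint_every: int = 3) -> int:
    """Delete `ids` from chunks in batches, checkpointing the WAL around the work."""
    checkpoint(conn)  # before
    deleted = 0
    for batch_no, start in enumerate(range(0, len(ids), batch_size), start=1):
        batch = ids[start:start + batch_size]
        cur = conn.executemany("DELETE FROM chunks WHERE id = ?", [(i,) for i in batch])
        conn.commit()  # one transaction per batch keeps each write lock short
        deleted += cur.rowcount
        if batch_no % checkpoint_every == 0:
            checkpoint(conn)  # periodic checkpoint to limit WAL growth
    checkpoint(conn)  # after
    return deleted


def demo_bulk_delete():
    """Run the helper against a throwaway WAL-mode database."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(os.path.join(tmp, "demo.db"))
        try:
            conn.execute("PRAGMA journal_mode=WAL").fetchall()
            conn.execute("CREATE TABLE chunks (id INTEGER PRIMARY KEY)")
            conn.executemany("INSERT INTO chunks (id) VALUES (?)", [(i,) for i in range(25)])
            conn.commit()
            deleted = bulk_delete(conn, list(range(25)), batch_size=10)
            remaining = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
        finally:
            conn.close()
    return deleted, remaining
```

The demo uses a batch size of 10 only to exercise the periodic checkpoint; the guideline's 5-10K range is the default here.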
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/brainlayer/**/*.py: Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve `ai_code`, `stack_trace`, and `user_message` verbatim; skip `noise` entries entirely and summarize `build_log` and `dir_listing` entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Implement `brain_supersede` with safety gate for personal data (journals, notes, health/finance); use soft-delete for `brain_archive` with timestamp
Add `supersedes` parameter to `brain_store` for atomic store-and-replace operations
Run linting and formatting with: `ruff check src/ && ruff format src/`
Run tests with `pytest`
Use `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations to prevent WAL bloat
Files:
- src/brainlayer/pipeline/rate_limiter.py
- src/brainlayer/pipeline/write_queue.py
- src/brainlayer/enrichment_controller.py
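The `SQLITE_BUSY` retry guideline above can be illustrated with a small wrapper. This is a sketch, not code from the repository: `run_with_busy_retry` and its parameters are hypothetical, and it detects a busy database by inspecting the message of `sqlite3.OperationalError` (Python reports a busy database as "database is locked"). Giving each worker its own connection is assumed to happen in the caller.

```python
import sqlite3
import time


def run_with_busy_retry(operation, attempts: int = 5, base_delay: float = 0.05,
                        sleep=time.sleep):
    """Call operation(); retry with exponential backoff while SQLite reports busy/locked."""
    for attempt in range(attempts):
        try:
            return operation()
        except sqlite3.OperationalError as exc:
            message = str(exc).lower()
            is_busy = "locked" in message or "busy" in message
            if not is_busy or attempt == attempts - 1:
                raise  # unrelated error, or retries exhausted
            sleep(base_delay * (2 ** attempt))


def demo_busy_retry():
    """Simulate two busy failures followed by success, recording the backoff delays."""
    calls = {"count": 0}
    delays = []

    def flaky_write() -> str:
        calls["count"] += 1
        if calls["count"] <= 2:
            raise sqlite3.OperationalError("database is locked")
        return "ok"

    result = run_with_busy_retry(flaky_write, sleep=delays.append)
    return result, calls["count"], delays
```

Errors other than busy/locked are re-raised immediately, so a genuine DDL failure is not hidden behind the retry loop.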
🧠 Learnings (12)
📓 Common learnings
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py` and `src/brainlayer/pipeline/rate_limiter.py`: Gemini API calls in the enrichment pipeline are gated by a token bucket rate limiter. The rate is controlled by `BRAINLAYER_ENRICH_RATE` (default `5/s`, burst `10`) to keep throughput inside the Gemini Flex intended envelope. This default supersedes the earlier 0.2 (12 RPM) default for the Gemini Flex integration path.
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`, `src/brainlayer/pipeline/write_queue.py`, and related enrichment pipeline files: A per-store single-writer queue is used for SQLite enrichment writes because SQLite allows only one writer at a time; direct concurrent writes caused lock contention under sustained Gemini Flex traffic. Do not flag serialized write patterns in this path as a performance concern — the queue is intentional.
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-01T01:24:44.281Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment backend priority: Groq (primary/cloud) → Gemini (fallback) → Ollama (offline last-resort), configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`: Google-genai SDK auto-retries are explicitly disabled (`http_options.retry_options.attempts = 1`) in the enrichment controller. Retry policy is intentionally owned at the application layer, not delegated to the SDK. Do not flag this as missing retry logic.
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T23:32:14.543Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment rate configurable via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Never run bulk database operations while enrichment is writing; this causes WAL bloat and potential freeze
📚 Learning: 2026-04-11T16:54:45.605Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py` and `src/brainlayer/pipeline/rate_limiter.py`: Gemini API calls in the enrichment pipeline are gated by a token bucket rate limiter. The rate is controlled by `BRAINLAYER_ENRICH_RATE` (default `5/s`, burst `10`) to keep throughput inside the Gemini Flex intended envelope. This default supersedes the earlier 0.2 (12 RPM) default for the Gemini Flex integration path.
Applied to files:
- tests/test_rate_limiter.py
- tests/test_enrichment_controller.py
- src/brainlayer/pipeline/rate_limiter.py
- src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-11T16:54:45.605Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`, `src/brainlayer/pipeline/write_queue.py`, and related enrichment pipeline files: A per-store single-writer queue is used for SQLite enrichment writes because SQLite allows only one writer at a time; direct concurrent writes caused lock contention under sustained Gemini Flex traffic. Do not flag serialized write patterns in this path as a performance concern — the queue is intentional.
Applied to files:
- tests/test_write_queue.py
- tests/test_enrichment_flex_integration.py
- src/brainlayer/pipeline/write_queue.py
- src/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-22T15:55:22.017Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 100
File: src/brainlayer/enrichment_controller.py:175-199
Timestamp: 2026-03-22T15:55:22.017Z
Learning: In `src/brainlayer/enrichment_controller.py`, the `parallel` parameter in `enrich_local()` is intentionally kept in the function signature (currently unused, suppressed with `# noqa: ARG001`) for API stability. Parallel local enrichment via a thread pool or process pool is planned for a future iteration. Do not flag this as dead code requiring removal.
Applied to files:
- tests/test_enrichment_flex_integration.py
- src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-11T16:54:45.605Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`: Google-genai SDK auto-retries are explicitly disabled (`http_options.retry_options.attempts = 1`) in the enrichment controller. Retry policy is intentionally owned at the application layer, not delegated to the SDK. Do not flag this as missing retry logic.
Applied to files:
tests/test_enrichment_controller.py
📚 Learning: 2026-04-01T01:24:44.281Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-01T01:24:44.281Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment backend priority: Groq (primary/cloud) → Gemini (fallback) → Ollama (offline last-resort), configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-03T11:43:08.915Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Applies to src/brainlayer/*bulk*.py : Before bulk database operations: stop enrichment workers, checkpoint WAL with `PRAGMA wal_checkpoint(FULL)`, drop FTS triggers before bulk deletes
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to **/*.py : When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-03T11:43:08.915Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Never run bulk database operations while enrichment is writing; this causes WAL bloat and potential freeze
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-02T23:32:14.543Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T23:32:14.543Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment rate configurable via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Applied to files:
src/brainlayer/enrichment_controller.py
def _ensure_enrichment_columns(store) -> None:
    key = _store_queue_key(store)
    with _ENRICHMENT_COLUMN_LOCK:
        if key in _ENRICHMENT_COLUMN_READY:
            return

    def _ensure() -> None:
        _ensure_content_hash_column(store)
        _ensure_raw_entities_json_column(store)

    _submit_write(store, "ensure-enrichment-columns", _ensure)

    with _ENRICHMENT_COLUMN_LOCK:
        _ENRICHMENT_COLUMN_READY.add(key)
Only cache “columns ready” after the migration actually succeeds.
Lines 185-192 ignore the bool returned by _ensure_content_hash_column() and _ensure_raw_entities_json_column(), then unconditionally add the store key to _ENRICHMENT_COLUMN_READY. A transient SQLITE_BUSY or DDL failure will therefore disable future retries for the rest of that store lifecycle, leaving dedup/raw-entity writes silently broken.
Proposed fix
 def _ensure_enrichment_columns(store) -> None:
     key = _store_queue_key(store)
     with _ENRICHMENT_COLUMN_LOCK:
         if key in _ENRICHMENT_COLUMN_READY:
             return

     def _ensure() -> None:
-        _ensure_content_hash_column(store)
-        _ensure_raw_entities_json_column(store)
+        if not _ensure_content_hash_column(store):
+            raise RuntimeError("failed to ensure chunks.content_hash")
+        if not _ensure_raw_entities_json_column(store):
+            raise RuntimeError("failed to ensure chunks.raw_entities_json")

     _submit_write(store, "ensure-enrichment-columns", _ensure)

     with _ENRICHMENT_COLUMN_LOCK:
         _ENRICHMENT_COLUMN_READY.add(key)
As per coding guidelines, `src/brainlayer/**/*.py`: “Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely.”
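The principle behind this comment, caching readiness only after the migration reports success, can be shown in isolation. The sketch below is illustrative only: `ensure_columns_once`, `_READY`, and the `migrate` callable are hypothetical stand-ins and do not reproduce the controller's `_submit_write` plumbing.

```python
import threading

_READY: set = set()            # hypothetical stand-in for the ready-key cache
_READY_LOCK = threading.Lock()


def ensure_columns_once(key, migrate) -> bool:
    """Run migrate() until it succeeds once for `key`; cache only on success."""
    with _READY_LOCK:
        if key in _READY:
            return True
    if not migrate():
        return False  # not cached, so the next call tries the migration again
    with _READY_LOCK:
        _READY.add(key)
    return True


def demo_retry_after_failed_migration():
    """The first migration attempt fails, the second succeeds, the third is skipped."""
    attempts = {"count": 0}

    def migrate() -> bool:
        attempts["count"] += 1
        return attempts["count"] >= 2  # fail once, then succeed

    outcomes = [ensure_columns_once("store-a", migrate) for _ in range(3)]
    return outcomes, attempts["count"]
```

Because a failed attempt leaves the key out of the cache, a transient failure costs one retry on the next call instead of disabling the migration for the rest of the store lifecycle.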
def stop(self, timeout: float = 5.0) -> None:
    with self._lock:
        worker = self._worker
    if worker is None:
        return
    deadline = time.monotonic() + max(timeout, 0.0)
    while worker.is_alive():
        try:
            self._queue.put(_STOP, timeout=0.05)
            break
        except queue.Full:
            if time.monotonic() >= deadline:
                break
    worker.join(timeout=timeout)
    if not worker.is_alive():
        with self._lock:
            self._worker = None
Don’t report shutdown success while the old write worker is still alive.
Lines 63-74 can return with worker.is_alive() still True when the queue stays full or join() times out. src/brainlayer/enrichment_controller.py then drops the registry entry and clears _STORE_CLOSING, so the next operation can start a second writer thread for the same store while the first one is still draining work. That breaks the single-writer guarantee and can reintroduce SQLite write contention.
Proposed fix
 def stop(self, timeout: float = 5.0) -> None:
     with self._lock:
         worker = self._worker
     if worker is None:
         return
     deadline = time.monotonic() + max(timeout, 0.0)
     while worker.is_alive():
         try:
             self._queue.put(_STOP, timeout=0.05)
             break
         except queue.Full:
             if time.monotonic() >= deadline:
-                break
-    worker.join(timeout=timeout)
-    if not worker.is_alive():
-        with self._lock:
-            self._worker = None
+                raise TimeoutError("write queue did not accept stop signal before timeout")
+    remaining = max(0.0, deadline - time.monotonic())
+    worker.join(timeout=remaining)
+    if worker.is_alive():
+        raise TimeoutError("write queue worker did not stop before timeout")
+    with self._lock:
+        if self._worker is worker:
+            self._worker = None
As per coding guidelines, `**/*.py`: “Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work.”
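A standalone version of the deadline-bounded shutdown makes the failure mode easier to see. This is a sketch: `stop_worker` is a hypothetical free function rather than the PR's `WriteQueue.stop`, and the lock-guarded clearing of `self._worker` is omitted.

```python
import queue
import threading
import time

_STOP = object()  # sentinel that tells the worker loop to exit


def stop_worker(q: queue.Queue, worker: threading.Thread, timeout: float = 5.0) -> None:
    """Enqueue the sentinel and join; raise TimeoutError if the worker outlives the deadline."""
    deadline = time.monotonic() + max(timeout, 0.0)
    while worker.is_alive():
        try:
            q.put(_STOP, timeout=0.05)
            break
        except queue.Full:
            if time.monotonic() >= deadline:
                raise TimeoutError("queue did not accept stop signal before timeout")
    worker.join(timeout=max(0.0, deadline - time.monotonic()))
    if worker.is_alive():
        raise TimeoutError("worker did not stop before timeout")


def demo_stop_reports_timeout():
    """A blocked worker with a full queue makes stop fail loudly instead of silently."""
    q: queue.Queue = queue.Queue(maxsize=1)
    release = threading.Event()

    def run() -> None:
        while True:
            item = q.get()
            if item is _STOP:
                return
            item()

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    q.put(release.wait)   # the worker picks this up and blocks until released
    q.put(lambda: None)   # fills the single-slot queue behind the blocked task

    try:
        stop_worker(q, worker, timeout=0.2)
        timed_out = False
    except TimeoutError:
        timed_out = True

    release.set()         # unblock the worker so a second stop can succeed
    stop_worker(q, worker, timeout=2.0)
    return timed_out, worker.is_alive()
```

Raising on timeout lets the caller keep its registry entry for the store, so a second writer thread is not started while the first is still draining work.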
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6179eabea7
for limiter_key in list(_RATE_LIMITER_REGISTRY):
    if limiter_key[0] == key:
        _RATE_LIMITER_REGISTRY.pop(limiter_key, None)
Preserve token bucket across operation cleanup
Removing every limiter for a store when `_STORE_OPERATION_COUNTS` hits zero resets the bucket state after each completed `enrich_single()` call, because that path opens and closes one operation per chunk. In the common sequential (non-overlapping) request pattern, each new call gets a fresh burst of 10 tokens, so the intended sustained cap (default 5/s) is effectively bypassed and Gemini traffic can spike far above the configured envelope. Keep limiter state alive across idle transitions (or expire it with a real idle TTL) instead of deleting it on every cleanup.
Fixed in 90e8b58e.
The limiter registry is no longer cleared during store cleanup, so sequential enrich_single() calls keep the same token-bucket state instead of regaining a fresh burst after every idle transition. I also added test_rate_limiter_survives_store_cleanup to lock that behavior in.
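The effect of the fix can be reproduced with a toy registry. Everything in this sketch is hypothetical (`SketchBucket`, `count_immediate_grants`, the frozen clock); it only contrasts dropping the limiter on cleanup with keeping it across sequential calls.

```python
class SketchBucket:
    """Hypothetical non-blocking token bucket driven by an injected clock."""

    def __init__(self, rate: float, burst: int, clock) -> None:
        self.rate = rate
        self.capacity = float(burst)
        self.tokens = float(burst)
        self.clock = clock
        self.updated = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


def count_immediate_grants(drop_on_cleanup: bool, calls: int = 3, per_call: int = 10) -> int:
    """Count requests granted without waiting across sequential calls at a frozen clock."""
    registry = {}

    def frozen_clock() -> float:
        return 0.0  # no time passes, so no tokens are ever refilled

    granted = 0
    for _ in range(calls):
        limiter = registry.get("store-a")
        if limiter is None:
            limiter = registry["store-a"] = SketchBucket(5.0, 10, frozen_clock)
        granted += sum(limiter.try_acquire() for _ in range(per_call))
        if drop_on_cleanup:
            registry.pop("store-a", None)  # old behaviour: the next call gets a fresh burst
    return granted
```

With the limiter dropped after every call, all 30 requests pass at a single instant; with the limiter kept, only the initial burst of 10 does.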
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/brainlayer/enrichment_controller.py (1)
525-529: 🧹 Nitpick | 🔵 Trivial
DDL migration inside `_apply_enrichment` may cause issues.
`_ensure_raw_entities_json_column(store)` is called here during enrichment application, but this function is also called in `_ensure_enrichment_columns`. If the column already exists (the common case), this is a no-op. However, if `_ensure_enrichment_columns` failed silently (per the issue at lines 179-192), this redundant check will also fail, causing entity data to be silently dropped.
Consider removing this call since `_ensure_enrichment_columns` is now called before any enrichment work, or ensure both paths handle failures consistently.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/brainlayer/enrichment_controller.py` around lines 525 - 529, The redundant call to _ensure_raw_entities_json_column(store) inside _apply_enrichment can mask earlier failures and lead to dropped entity data; remove that invocation from _apply_enrichment and rely on the prior _ensure_enrichment_columns call (which is run before enrichment work), or if you prefer to keep it, change it to a non-failing check that raises/logs on error consistently with _ensure_enrichment_columns; update/remove the block that calls store.conn.cursor().execute("UPDATE chunks SET raw_entities_json = ? WHERE id = ?", (json.dumps(entities), chunk["id"])) so it assumes the column exists only after _ensure_enrichment_columns has succeeded and do not swallow errors silently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/brainlayer/enrichment_controller.py`:
- Line 848: The call creating rate_limiter uses an inconsistent fallback value
(RATE_LIMITS.get("realtime", 0.2)) that contradicts the new default of 5.0;
update this call to use the same default (e.g., RATE_LIMITS.get("realtime",
5.0)) or remove the fallback and access RATE_LIMITS["realtime"] directly so
_get_store_rate_limiter(store, rate_per_second=...) and the RATE_LIMITS constant
remain consistent.
In `@tests/test_enrichment_flex_integration.py`:
- Around line 92-93: The test currently only calls store.close() in the finally
block, which can leave module-level registries (_WRITE_QUEUE_REGISTRY,
_RATE_LIMITER_REGISTRY, _ENRICHMENT_COLUMN_READY) populated after failures;
update the finally block to explicitly remove any entries related to the test
store by calling the module cleanup used in normal operation (e.g., invoke
_end_store_operation or directly pop/clear keys for those registries) after
calling store.close(), ensuring you reference the same identifiers
(_WRITE_QUEUE_REGISTRY, _RATE_LIMITER_REGISTRY, _ENRICHMENT_COLUMN_READY, and
_end_store_operation/enrich_single) so the test cannot leak state to other
tests.
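For the test-cleanup comment, one possible shape is a purge helper called from `finally`. The registries below are local stand-ins that reuse the names from the review, and `purge_store_state` is a hypothetical helper; the real registries' key shapes are assumed (the limiter registry is treated as keyed by tuples whose first element is the store key, as in the cleanup loop quoted earlier). A real helper would also need to stop the store's write queue worker.

```python
# Local stand-ins for the module-level registries named in the review comment.
_WRITE_QUEUE_REGISTRY: dict = {}
_RATE_LIMITER_REGISTRY: dict = {}
_ENRICHMENT_COLUMN_READY: set = set()


def purge_store_state(key) -> None:
    """Remove every registry entry that belongs to `key` (hypothetical test helper)."""
    _WRITE_QUEUE_REGISTRY.pop(key, None)
    for limiter_key in list(_RATE_LIMITER_REGISTRY):
        if limiter_key[0] == key:
            _RATE_LIMITER_REGISTRY.pop(limiter_key, None)
    _ENRICHMENT_COLUMN_READY.discard(key)


def demo_cleanup_after_failure() -> bool:
    """Even when the test body raises, the finally block leaves the registries empty."""
    key = "store-under-test"
    _WRITE_QUEUE_REGISTRY[key] = object()
    _RATE_LIMITER_REGISTRY[(key, 5.0)] = object()
    _ENRICHMENT_COLUMN_READY.add(key)
    try:
        try:
            raise AssertionError("simulated test failure")
        finally:
            purge_store_state(key)  # runs whether or not the body failed
    except AssertionError:
        pass
    return not (_WRITE_QUEUE_REGISTRY or _RATE_LIMITER_REGISTRY or _ENRICHMENT_COLUMN_READY)
```

In a pytest suite the same purge could live in a fixture's teardown, so each test starts from empty registries.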
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8626769c-8755-4988-86ac-c60770c5b5ec
📒 Files selected for processing (3)
- src/brainlayer/enrichment_controller.py
- tests/test_enrichment_flex_integration.py
- tests/test_write_queue.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (3.13)
- GitHub Check: test (3.12)
- GitHub Check: test (3.11)
🧠 Learnings (18)
📓 Common learnings
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-14T02:20:54.656Z
Learning: Request codex review, cursor review, and bugbot review for BrainLayer PRs
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`: Google-genai SDK auto-retries are explicitly disabled (`http_options.retry_options.attempts = 1`) in the enrichment controller. Retry policy is intentionally owned at the application layer, not delegated to the SDK. Do not flag this as missing retry logic.
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Never run bulk database operations while enrichment is writing; this causes WAL bloat and potential freeze
📚 Learning: 2026-04-11T16:54:45.605Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`, `src/brainlayer/pipeline/write_queue.py`, and related enrichment pipeline files: A per-store single-writer queue is used for SQLite enrichment writes because SQLite allows only one writer at a time; direct concurrent writes caused lock contention under sustained Gemini Flex traffic. Do not flag serialized write patterns in this path as a performance concern — the queue is intentional.
Applied to files:
tests/test_write_queue.py
tests/test_enrichment_flex_integration.py
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-22T15:55:22.017Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 100
File: src/brainlayer/enrichment_controller.py:175-199
Timestamp: 2026-03-22T15:55:22.017Z
Learning: In `src/brainlayer/enrichment_controller.py`, the `parallel` parameter in `enrich_local()` is intentionally kept in the function signature (currently unused, suppressed with `# noqa: ARG001`) for API stability. Parallel local enrichment via a thread pool or process pool is planned for a future iteration. Do not flag this as dead code requiring removal.
Applied to files:
tests/test_enrichment_flex_integration.py
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-01T01:24:44.281Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-01T01:24:44.281Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment backend priority: Groq (primary/cloud) → Gemini (fallback) → Ollama (offline last-resort), configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-11T16:54:45.605Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.605Z
Learning: Applies to `src/brainlayer/enrichment_controller.py` and `src/brainlayer/pipeline/rate_limiter.py`: Gemini API calls in the enrichment pipeline are gated by a token bucket rate limiter. The rate is controlled by `BRAINLAYER_ENRICH_RATE` (default `5/s`, burst `10`) to keep throughput inside the Gemini Flex intended envelope. This default supersedes the earlier 0.2 (12 RPM) default for the Gemini Flex integration path.
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-03T11:43:08.915Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Applies to src/brainlayer/*bulk*.py : Before bulk database operations: stop enrichment workers, checkpoint WAL with `PRAGMA wal_checkpoint(FULL)`, drop FTS triggers before bulk deletes
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to **/*.py : When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-03T11:43:08.915Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Never run bulk database operations while enrichment is writing; this causes WAL bloat and potential freeze
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Add `supersedes` parameter to `brain_store` for atomic store-and-replace operations
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-03T11:43:08.915Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:43:08.915Z
Learning: Applies to src/**/*.py : All database connections must retry on `SQLITE_BUSY`; each worker uses its own connection
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-14T02:20:54.656Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-14T02:20:54.656Z
Learning: Be aware of known BrainLayer issues: DB locking during enrichment and WAL growth up to 4.7GB
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-03-14T02:20:54.656Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-14T02:20:54.656Z
Learning: Applies to **/*.py : Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-04T15:22:02.740Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 198
File: hooks/brainlayer-prompt-search.py:241-259
Timestamp: 2026-04-04T15:22:02.740Z
Learning: In `hooks/brainlayer-prompt-search.py` (Python), `record_injection_event()` is explicitly best-effort telemetry: silent `except sqlite3.Error: pass` is intentional — table non-existence or lock failures are acceptable silent failures. `sqlite3.connect(timeout=2)` is the file-open timeout; `PRAGMA busy_timeout` governs per-statement lock-wait. The `DEADLINE_MS` (450ms) guard applies only to the FTS search phase, not to this side-channel write.
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Applied to files:
src/brainlayer/enrichment_controller.py
📚 Learning: 2026-04-02T23:32:14.543Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T23:32:14.543Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment rate configurable via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Applied to files:
src/brainlayer/enrichment_controller.py
🔇 Additional comments (15)
src/brainlayer/enrichment_controller.py (9)
179-192: Migration success not validated before caching "ready" state.
This concern was previously raised: the boolean results from `_ensure_content_hash_column()` and `_ensure_raw_entities_json_column()` are ignored, and the store key is unconditionally added to `_ENRICHMENT_COLUMN_READY`. A transient `SQLITE_BUSY` or DDL failure will disable future retries for this store's lifetime.
57-66: Well-structured per-store concurrency infrastructure.
The module-level registries with dedicated locks for write queues, rate limiters, and store operation tracking provide clean isolation per store. The condition variable pattern for coordinating shutdown (`_STORE_OPERATION_CONDITION`, `_STORE_CLOSING`) is appropriate for ensuring cleanup completes before new work begins.
132-172: Robust store lifecycle coordination with proper cleanup sequencing.
The `_begin_store_operation`/`_end_store_operation` pair correctly uses a condition variable to block new operations while shutdown is in progress. The cleanup logic (lines 156-172) properly:
- Removes and stops the write queue outside the lock
- Cleans up rate limiters for the store
- Clears column-ready state
- Uses a `finally` block to ensure `_STORE_CLOSING` is always cleared
262-265: SDK retry disabled correctly; application layer owns retry policy.
The `http_options.retry_options.attempts = 1` configuration aligns with the documented design decision to disable SDK auto-retries and manage retry behavior at the application layer via `_retry_with_backoff`. Based on learnings: "Google-genai SDK auto-retries are explicitly disabled... Retry policy is intentionally owned at the application layer."
213-245: Read-only chunk fetch correctly bypasses write queue.
The fallback to `store.get_chunk` when `_read_cursor` is unavailable ensures compatibility. Using direct SQL via `_read_cursor()` for reads aligns with the design goal of separating read and write paths.
Note: accessing `store._read_cursor()` (private method) couples this to the store's internal implementation. Consider whether a public read API on the store would be more maintainable long-term.
195-210: Rate limiter correctly implements token bucket with configurable rate and burst.
The per-store rate limiter with tuple key `(store_key, rate, burst)` allows different configurations per store. The defaults (`5/s`, burst `10`) align with documented design. Based on learnings: "Gemini API calls are gated by token bucket rate limiter... BRAINLAYER_ENRICH_RATE default is 5/s with burst 10."
598-657: `enrich_single` correctly integrates write queue and rate limiting.
The function properly:
- Wraps all work in `_begin_store_operation`/`_end_store_operation` with `try`/`finally`
- Uses `_get_chunk_readonly` for the read path
- Submits all DB mutations (`_mark_meta_research`, `_apply_enrichment`) through `_submit_write`
- Applies rate limiting to Gemini calls via `_generate_content_with_rate_limit`
763-802: Concurrent Gemini calls with serialized DB writes correctly implemented.
The `ThreadPoolExecutor` enables concurrent Gemini API calls (up to `ENRICH_CONCURRENCY`), while all DB mutations (`_mark_meta_research`, `_apply_enrichment`) go through `_submit_write` to the single-writer queue. The shared `rate_limiter` gates API throughput across all threads. This correctly implements the design of concurrent inference with serialized writes. Based on learnings: "Per-store single-writer queue is used for SQLite enrichment writes because SQLite allows only one writer at a time."
908-948: `enrich_local` correctly integrates write queue without rate limiting.
The function properly wraps operations with lifecycle management and routes all DB writes through `_submit_write`. No rate limiting is applied, which is correct since `RATE_LIMITS["local"]` defaults to `0` (line 50).
tests/test_enrichment_flex_integration.py (3)
35-54: Well-designed FakeClient for concurrent testing.
The `FakeClient` implementation correctly:
- Uses a lock to thread-safely record call timestamps
- Sleeps to simulate work (allowing rate limiter behavior to manifest)
- Returns a valid JSON response structure
72-75: Correct steady-state rate measurement.
Excluding the first 10 calls (burst allowance) and measuring the remaining 90 calls accurately tests the sustained rate. The 5.5 calls/sec threshold provides appropriate tolerance over the 5.0 target rate.
77-91: DB assertions correctly validate enrichment pipeline.
The assertions verify:
- `resolved_query` is not set to the summary value (it would be `NULL` since the mock response lacks `resolved_query`/`resolved_queries`)
- All 100 chunks have `summary = "serialized enrichment"`
This confirms both the enrichment parsing and the serialized write path are working correctly.
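The steady-state measurement described in the 72-75 comment can be sketched as a small helper. This is an illustration only: the function name `steady_state_rate` and the choice to divide the number of intervals by the elapsed time are assumptions, and the actual test may compute the rate differently.

```python
def steady_state_rate(timestamps, burst=10):
    """Return calls per second after discarding the first `burst` calls.

    `timestamps` are call times in seconds (for example from time.monotonic()).
    The burst allowance is dropped so only the sustained rate is measured.
    """
    ordered = sorted(timestamps)
    tail = ordered[burst:]
    if len(tail) < 2:
        raise ValueError("need at least two post-burst timestamps")
    elapsed = tail[-1] - tail[0]
    if elapsed <= 0:
        raise ValueError("post-burst timestamps span no time")
    # N timestamps delimit N - 1 intervals.
    return (len(tail) - 1) / elapsed
```

With 100 recorded calls and `burst=10`, the helper looks only at the last 90 timestamps, which is the window the review comment compares against the 5.5 calls/sec threshold.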
tests/test_write_queue.py (3)
80-112: Test correctly validates FIFO ordering of write queue.
The barrier ensures all threads contend simultaneously, and the lock inside `submit_value` ensures `submission_order` captures the actual submission sequence. The assertion `persisted_order == submission_order` validates that callbacks execute in submission order (FIFO).
114-141: Test validates queue durability across worker restart.
The test exercises the scenario where `crash_on_error=True` causes the worker to stop, but previously queued items remain and are processed after `start()` is called again. This is important for ensuring no writes are lost during transient failures.
143-152: Test correctly validates backpressure behavior.
Overriding `start` with a no-op prevents the worker from draining the queue, allowing the test to verify that `WriteQueueFullError` is raised when the queue is full. The assertion `not first_future.done()` confirms the item remains queued.
Minor note: the test doesn't call `stop()` on the queue, but since the worker never started, this is acceptable.
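The FIFO and backpressure properties these tests check can be illustrated with a minimal single-writer queue. This is a sketch under assumptions, not the code in `src/brainlayer/pipeline/write_queue.py`: the names `SketchWriteQueue` and `QueueFullError` are hypothetical, and features such as `crash_on_error` are left out.

```python
import queue
import threading
from concurrent.futures import Future


class QueueFullError(RuntimeError):
    """Raised when the bounded queue cannot accept another write."""


class SketchWriteQueue:
    """Run submitted callbacks one at a time, in order, on a single thread."""

    def __init__(self, maxsize=1000):
        self._items = queue.Queue(maxsize=maxsize)
        self._thread = None
        self._lock = threading.Lock()

    def start(self):
        with self._lock:
            if self._thread is None or not self._thread.is_alive():
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()

    def submit(self, callback):
        """Enqueue a callback and return a Future for its result."""
        future = Future()
        try:
            self._items.put_nowait((callback, future))
        except queue.Full:
            raise QueueFullError("write queue is full") from None
        return future

    def stop(self):
        with self._lock:
            thread, self._thread = self._thread, None
        if thread is not None:
            self._items.put(None)  # sentinel: worker exits after draining
            thread.join()

    def _run(self):
        while True:
            item = self._items.get()
            if item is None:
                return
            callback, future = item
            try:
                future.set_result(callback())
            except Exception as exc:  # surface the error to the caller
                future.set_exception(exc)
```

Because only the worker thread runs callbacks, a database connection used inside them is never written to from two threads at once, and callers observe failures by calling `Future.result()`.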
sanitizer = Sanitizer.from_env()
config = _build_gemini_config()
rate_limiter = _get_store_rate_limiter(store, rate_per_second=RATE_LIMITS.get("realtime", 0.2))
Inconsistent fallback rate value.
The fallback value 0.2 in RATE_LIMITS.get("realtime", 0.2) is inconsistent with the new default of 5.0 defined at line 48. Since RATE_LIMITS["realtime"] is always defined, this fallback is unreachable, but it could cause confusion if the code is refactored.
Suggested fix
- rate_limiter = _get_store_rate_limiter(store, rate_per_second=RATE_LIMITS.get("realtime", 0.2))
+ rate_limiter = _get_store_rate_limiter(store, rate_per_second=RATE_LIMITS["realtime"])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/enrichment_controller.py` at line 848, The call creating
rate_limiter uses an inconsistent fallback value (RATE_LIMITS.get("realtime",
0.2)) that contradicts the new default of 5.0; update this call to use the same
default (e.g., RATE_LIMITS.get("realtime", 5.0)) or remove the fallback and
access RATE_LIMITS["realtime"] directly so _get_store_rate_limiter(store,
rate_per_second=...) and the RATE_LIMITS constant remain consistent.
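For context on what `rate_per_second` and the burst setting control, a token bucket can be sketched as follows. This is a minimal illustration, not the code in `src/brainlayer/pipeline/rate_limiter.py`: the class name `SketchTokenBucket` and its injectable `clock` and `sleep` parameters are assumptions made so the example can be tested without real waiting.

```python
import threading
import time


class SketchTokenBucket:
    """Allow `burst` immediate calls, then refill at `rate_per_second`."""

    def __init__(self, rate_per_second, burst, clock=time.monotonic, sleep=time.sleep):
        if rate_per_second <= 0 or burst < 1:
            raise ValueError("rate_per_second must be > 0 and burst >= 1")
        self._rate = float(rate_per_second)
        self._capacity = float(burst)
        self._tokens = float(burst)  # start full so the initial burst passes
        self._clock = clock
        self._sleep = sleep
        self._updated = clock()
        self._lock = threading.Lock()

    def acquire(self, tokens=1):
        """Block until `tokens` are available, then consume them."""
        if tokens > self._capacity:
            raise ValueError("cannot acquire more tokens than the burst size")
        while True:
            with self._lock:
                now = self._clock()
                refill = (now - self._updated) * self._rate
                self._tokens = min(self._capacity, self._tokens + refill)
                self._updated = now
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                wait = (tokens - self._tokens) / self._rate
            # Sleep outside the lock so other threads can still refill and check.
            self._sleep(wait)
```

With a rate of `5.0` and burst `10`, the first ten `acquire()` calls return immediately and later calls are spaced about 0.2 seconds apart. With a rate of `0.2`, each call after the burst would wait about five seconds, which is why a stale fallback value would matter if it ever became reachable.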
finally:
    store.close()
🧹 Nitpick | 🔵 Trivial
Consider cleaning up controller module state after test.
The finally block closes the store, but module-level registries (_WRITE_QUEUE_REGISTRY, _RATE_LIMITER_REGISTRY, _ENRICHMENT_COLUMN_READY) may retain stale entries if tests don't complete normally. Since _end_store_operation is called per enrich_single and would normally clean up, this should be fine in the happy path, but adding explicit cleanup could improve test isolation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_enrichment_flex_integration.py` around lines 92 - 93, The test
currently only calls store.close() in the finally block, which can leave
module-level registries (_WRITE_QUEUE_REGISTRY, _RATE_LIMITER_REGISTRY,
_ENRICHMENT_COLUMN_READY) populated after failures; update the finally block to
explicitly remove any entries related to the test store by calling the module
cleanup used in normal operation (e.g., invoke _end_store_operation or directly
pop/clear keys for those registries) after calling store.close(), ensuring you
reference the same identifiers (_WRITE_QUEUE_REGISTRY, _RATE_LIMITER_REGISTRY,
_ENRICHMENT_COLUMN_READY, and _end_store_operation/enrich_single) so the test
cannot leak state to other tests.
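One way to do the explicit cleanup suggested above is a small helper that a test's `finally` block or a fixture could call. This is a sketch under stated assumptions: the helper name `purge_store_state` is hypothetical, and it assumes the write-queue registry is a dict keyed by store key, the rate-limiter registry is a dict keyed by `(store_key, rate, burst)` tuples, and the column-ready registry is a set. The real module state may be shaped differently.

```python
def purge_store_state(store_key, write_queues, rate_limiters, column_ready):
    """Drop every registry entry tied to one store.

    Returns the removed write queue (or None) so the caller can stop it
    outside any registry lock.
    """
    removed_queue = write_queues.pop(store_key, None)
    # Rate limiters are assumed to be keyed by (store_key, rate, burst).
    stale = [key for key in rate_limiters if key[0] == store_key]
    for key in stale:
        del rate_limiters[key]
    column_ready.discard(store_key)
    return removed_queue
```

Passing the registries in as arguments keeps the sketch independent of the module's private names; a real test would pass the module-level objects and hold whatever locks guard them.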
Summary
- `BRAINLAYER_ENRICH_RATE` with a default of `5/s` and burst `10`
Why
R86 Deep Research called out three architectural prerequisites before running Gemini Flash Flex inference at sustained rate against BrainLayer's SQLite-backed enrichment pipeline.
The current concurrent `enrich_single()` path can send at about `346.9 req/s` in the new integration RED test, versus the target ceiling of `<=5.5 req/s` after the initial burst. This PR closes that gap and removes direct multi-threaded write contention on SQLite.
Implementation Notes
- `src/brainlayer/enrichment_controller.py`
- `R84b-design.md` section 10.1 line 323 is stale and still references `src/brainlayer/pipeline/enrichment_controller.py`
- `google-genai` version is `1.63.0`; retry control is under `http_options.retry_options.attempts`, not a top-level `retry_options` client kwarg
- `/Users/etanheyman/Gits/orchestrator/collab/brainlayer-pr-a3-enrichment-refactor.md`
Tests
- `pytest tests/test_write_queue.py tests/test_rate_limiter.py -q`
- `pytest tests/test_enrichment_controller.py::test_store_lifecycle_waits_for_cleanup_before_new_operations tests/test_write_queue.py tests/test_rate_limiter.py -q`
- `pytest tests/test_enrichment_controller.py tests/test_write_queue.py tests/test_concurrent_enrichment.py tests/test_auto_enrich.py tests/test_rate_limiter.py tests/test_enrichment_flex_integration.py -q`
- `ruff check src/brainlayer/enrichment_controller.py src/brainlayer/pipeline/write_queue.py src/brainlayer/pipeline/rate_limiter.py tests/test_enrichment_controller.py tests/test_write_queue.py tests/test_rate_limiter.py tests/test_enrichment_flex_integration.py`
Full Suite Context
`pytest tests/ -q` in this local environment still reports:
- `origin/main` (`tests/test_eval_baselines.py`, `tests/test_vector_store.py`)
- (`sess-1`, `sess-123`) contaminating dedup state across long runs
Those are out of scope for PR-A3 and pre-existing to this refactor.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests
Note
Harden enrichment controller with per-store write queues and token-bucket rate limiting for Gemini Flex throughput
- `WriteQueue` to serialize all database writes through a per-store background worker thread, replacing direct calls in `enrich_single`, `enrich_realtime`, `enrich_batch`, and `enrich_local`.
- `TokenBucket` for per-store Gemini call pacing, replacing the fixed `time.sleep` loops previously used in `enrich_realtime` and `enrich_batch`.
- `_begin_store_operation`/`_end_store_operation` so write-queue workers are started on first use and cleanly shut down when all concurrent operations complete.
- (`attempts=1`) and routes retry logic entirely through the existing `_retry_with_backoff` helper, which now logs a warning with the planned delay before each sleep.
- `Future.result()` instead.
Macroscope summarized 90e8b58.
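The retry ownership described in this PR (SDK auto-retries off, application-level backoff that logs the planned delay before sleeping) could look roughly like the sketch below. It is not the project's `_retry_with_backoff`: the function name `retry_with_backoff`, its parameters, the exponential schedule, and the 10% jitter are all assumptions chosen for illustration.

```python
import logging
import random
import time

logger = logging.getLogger(__name__)


def retry_with_backoff(call, max_attempts=4, base_delay=1.0, max_delay=30.0,
                       retryable=(Exception,), sleep=time.sleep):
    """Call `call()` until it succeeds or `max_attempts` is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retryable as exc:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the last error
            # Exponential backoff capped at max_delay, plus up to 10% jitter.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            delay += random.uniform(0, delay * 0.1)
            logger.warning(
                "attempt %d/%d failed (%s); retrying in %.2fs",
                attempt, max_attempts, exc, delay,
            )
            sleep(delay)
```

Keeping a single retry loop at the application layer avoids multiplying attempts, which is what would happen if the SDK also retried each call internally.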