Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions src/brainlayer/mcp/search_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import asyncio

import apsw
from mcp.types import TextContent

# Retry settings for DB lock resilience on reads
# Maximum number of hybrid_search attempts, counting the first try.
_RETRY_MAX_ATTEMPTS = 3
# Delay before the first retry; the retry loop doubles it on each further attempt.
_retry_delay = 0.1 # base delay in seconds (exposed for test patching)

from ._shared import (
_build_compact_result,
_error_result,
Expand Down Expand Up @@ -225,7 +230,7 @@ async def _search(
# Backward compat: accept old 'format' kwarg
format: str | None = None,
):
"""Execute a hybrid search query (semantic + keyword via RRF)."""
"""Execute a hybrid search query (semantic + keyword via RRF). Retries on BusyError."""
try:
# Backward compat: old 'format' kwarg overrides 'detail'
if format is not None:
Expand Down Expand Up @@ -255,26 +260,44 @@ async def _search(
elif source:
source_filter = source
else:
source_filter = None # AIDEV-NOTE: was "claude_code" — excluded brain_store ("manual") chunks. Default to all sources.
source_filter = (
None # AIDEV-NOTE: was "claude_code" — excluded brain_store ("manual") chunks. Default to all sources.
)

if entity_id and not source:
source_filter = None

results = store.hybrid_search(
query_embedding=query_embedding,
query_text=query,
n_results=num_results,
project_filter=normalized_project,
content_type_filter=content_type,
source_filter=source_filter,
tag_filter=tag,
intent_filter=intent,
importance_min=importance_min,
date_from=date_from,
date_to=date_to,
sentiment_filter=sentiment,
entity_id=entity_id,
)
# Retry hybrid_search on BusyError — WAL reads shouldn't block but
# they can during checkpoint or when enrichment holds exclusive lock.
results = None
for attempt in range(_RETRY_MAX_ATTEMPTS):
try:
results = store.hybrid_search(
query_embedding=query_embedding,
query_text=query,
n_results=num_results,
project_filter=normalized_project,
content_type_filter=content_type,
source_filter=source_filter,
tag_filter=tag,
intent_filter=intent,
importance_min=importance_min,
date_from=date_from,
date_to=date_to,
sentiment_filter=sentiment,
entity_id=entity_id,
)
break
except Exception as e:
is_lock = isinstance(e, apsw.BusyError) or "locked" in str(e).lower() or "busy" in str(e).lower()
if is_lock and attempt < _RETRY_MAX_ATTEMPTS - 1:
delay = _retry_delay * (2**attempt)
logger.warning(
"Search BusyError (attempt %d/%d), retrying in %.2fs", attempt + 1, _RETRY_MAX_ATTEMPTS, delay
)
await asyncio.sleep(delay)
continue
raise # Non-lock error or retries exhausted

if not results["documents"][0]:
empty = {"query": query, "total": 0, "results": []}
Expand Down
166 changes: 103 additions & 63 deletions src/brainlayer/mcp/store_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import json

import apsw
from mcp.types import CallToolResult, TextContent

from ._shared import (
Expand All @@ -15,6 +16,11 @@
logger,
)

# Retry settings for DB lock resilience
# Maximum number of attempts per operation, counting the first try.
_RETRY_MAX_ATTEMPTS = 4
# Delay before the first retry; the retry loop doubles it on each further attempt.
_retry_delay = 0.15 # base delay in seconds (exposed for test patching)
# Maximum number of lines kept in the pending-store queue file; when exceeded,
# the oldest lines are dropped.
_QUEUE_MAX_SIZE = 100


async def _brain_digest(
content: str,
Expand Down Expand Up @@ -98,69 +104,80 @@ async def _brain_update(
importance: int | None = None,
merge_chunk_ids: list[str] | None = None,
):
"""Update, archive, or merge memories."""
try:
store = _get_vector_store()

if action == "archive":
ok = store.archive_chunk(chunk_id)
if not ok:
return _error_result(f"Chunk not found: {chunk_id}")
return [TextContent(type="text", text=json.dumps({"action": "archived", "chunk_id": chunk_id}))]

elif action == "update":
existing = store.get_chunk(chunk_id)
if not existing:
return _error_result(f"Chunk not found: {chunk_id}")

embedding = None
if content is not None:
loop = asyncio.get_running_loop()
model = _get_embedding_model()
embedding = await loop.run_in_executor(None, model.embed_query, content)

ok = store.update_chunk(
chunk_id=chunk_id,
content=content,
tags=tags,
importance=float(importance) if importance is not None else None,
embedding=embedding,
)
if not ok:
return _error_result(f"Update failed for: {chunk_id}")

result = {"action": "updated", "chunk_id": chunk_id, "fields": []}
if content is not None:
result["fields"].append("content")
if tags is not None:
result["fields"].append("tags")
if importance is not None:
result["fields"].append("importance")
return [TextContent(type="text", text=json.dumps(result))]

elif action == "merge":
if not merge_chunk_ids:
return _error_result("merge requires merge_chunk_ids (the duplicates to archive)")
keeper = store.get_chunk(chunk_id)
if not keeper:
return _error_result(f"Keeper chunk not found: {chunk_id}")
archived = []
failed = []
for dup_id in merge_chunk_ids:
ok = store.archive_chunk(dup_id)
if ok:
archived.append(dup_id)
else:
failed.append(dup_id)
result = {"action": "merged", "kept": chunk_id, "archived": archived, "failed": failed}
return [TextContent(type="text", text=json.dumps(result))]

else:
return _error_result(f"Unknown action: {action}. Use update, archive, or merge.")
"""Update, archive, or merge memories. Retries on BusyError."""
last_err = None
for attempt in range(_RETRY_MAX_ATTEMPTS):
try:
store = _get_vector_store()

Comment on lines +111 to +112
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use a per-operation DB connection in the retry path.

At Line 111, _brain_update still relies on _get_vector_store() (shared singleton). Under concurrent workers, this keeps lock contention concentrated on one connection and undermines the busy-retry design.

As per coding guidelines: "Handle concurrency by retrying on SQLITE_BUSY errors; each worker should use its own database connection".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/brainlayer/mcp/store_handler.py` around lines 111 - 112, _brain_update
currently calls the shared singleton _get_vector_store(), causing contention;
change _brain_update so that each retry attempt opens its own DB
connection/object instead of reusing _get_vector_store() (e.g., create a new
VectorStore/connection instance inside the retry loop), run the operation, and
always close/cleanup that connection in a finally block; ensure the SQLITE_BUSY
retry logic recreates a fresh per-attempt connection so concurrent workers do
not share a single connection.

if action == "archive":
ok = store.archive_chunk(chunk_id)
if not ok:
return _error_result(f"Chunk not found: {chunk_id}")
return [TextContent(type="text", text=json.dumps({"action": "archived", "chunk_id": chunk_id}))]

elif action == "update":
existing = store.get_chunk(chunk_id)
if not existing:
return _error_result(f"Chunk not found: {chunk_id}")

embedding = None
if content is not None:
loop = asyncio.get_running_loop()
model = _get_embedding_model()
embedding = await loop.run_in_executor(None, model.embed_query, content)

ok = store.update_chunk(
chunk_id=chunk_id,
content=content,
tags=tags,
importance=float(importance) if importance is not None else None,
embedding=embedding,
)
if not ok:
return _error_result(f"Update failed for: {chunk_id}")

result = {"action": "updated", "chunk_id": chunk_id, "fields": []}
if content is not None:
result["fields"].append("content")
if tags is not None:
result["fields"].append("tags")
if importance is not None:
result["fields"].append("importance")
return [TextContent(type="text", text=json.dumps(result))]

elif action == "merge":
if not merge_chunk_ids:
return _error_result("merge requires merge_chunk_ids (the duplicates to archive)")
keeper = store.get_chunk(chunk_id)
if not keeper:
return _error_result(f"Keeper chunk not found: {chunk_id}")
archived = []
failed = []
for dup_id in merge_chunk_ids:
ok = store.archive_chunk(dup_id)
if ok:
archived.append(dup_id)
else:
failed.append(dup_id)
result = {"action": "merged", "kept": chunk_id, "archived": archived, "failed": failed}
return [TextContent(type="text", text=json.dumps(result))]

else:
return _error_result(f"Unknown action: {action}. Use update, archive, or merge.")

except Exception as e:
logger.error("brain_update failed: %s", e)
return _error_result(f"brain_update error: {e}")
except Exception as e:
is_lock_error = isinstance(e, apsw.BusyError) or "locked" in str(e).lower() or "busy" in str(e).lower()
if is_lock_error and attempt < _RETRY_MAX_ATTEMPTS - 1:
delay = _retry_delay * (2**attempt)
logger.warning(
"brain_update BusyError (attempt %d/%d), retrying in %.2fs", attempt + 1, _RETRY_MAX_ATTEMPTS, delay
)
await asyncio.sleep(delay)
last_err = e
continue
logger.error("brain_update failed: %s", e)
return _error_result(f"brain_update error: {e}")


def _get_pending_store_path():
Expand All @@ -171,12 +188,35 @@ def _get_pending_store_path():


def _queue_store(item: dict) -> None:
    """Buffer a store request to JSONL when DB is locked.

    The item is appended as a single JSON line to the pending-store file.
    After the append, the file is capped at ``_QUEUE_MAX_SIZE`` lines: if it
    holds more, the oldest lines are dropped and the file is rewritten
    through a temporary sibling file.

    Args:
        item: JSON-serializable store request to buffer.

    Returns:
        None. Enforcing the size cap is best-effort: a failure while trimming
        is logged and never propagated, because the item itself has already
        been appended to the queue.
    """
    path = _get_pending_store_path()
    path.parent.mkdir(parents=True, exist_ok=True)

    # Append first so the item is persisted even if trimming fails below.
    # json.dumps emits ASCII by default, so the explicit encoding is safe.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(item) + "\n")

    # Enforce max size — read, keep the newest lines, rewrite via a temp file.
    tmp = path.with_suffix(".tmp")
    try:
        lines = path.read_text(encoding="utf-8").strip().splitlines()
        if len(lines) > _QUEUE_MAX_SIZE:
            trimmed = lines[-_QUEUE_MAX_SIZE:]
            tmp.write_text("\n".join(trimmed) + "\n", encoding="utf-8")
            # Path.replace overwrites an existing target on every platform;
            # Path.rename raises on Windows when the target already exists.
            tmp.replace(path)
            logger.warning(
                "Pending store queue trimmed: %d -> %d (dropped %d oldest)",
                len(lines),
                _QUEUE_MAX_SIZE,
                len(lines) - _QUEUE_MAX_SIZE,
            )
    except Exception as e:
        # Non-critical — the queue still works, just unbounded. Log instead of
        # swallowing silently so unbounded growth can be diagnosed.
        logger.warning("Pending store queue trim failed (queue left untrimmed): %s", e)
        try:
            # Do not leave a half-written temp file behind.
            tmp.unlink(missing_ok=True)
        except OSError:
            pass


def _flush_pending_stores(store, embed_fn) -> int:
"""Flush pending-stores.jsonl (FIFO). Returns count flushed."""
Expand Down
Loading