Merged
88 changes: 88 additions & 0 deletions scripts/backfill_orchestrate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env bash
# backfill_orchestrate.sh — Full pipeline orchestrator for Gemini batch enrichment
# Handles: resume submitted → fix broken-completed → resubmit failed+unsubmitted → resume again
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PYTHON="$SCRIPT_DIR/../.venv/bin/python3"
BACKFILL="$SCRIPT_DIR/cloud_backfill.py"
Comment on lines +7 to +8

🧹 Nitpick | 🔵 Trivial

Hardcoded virtual environment path may not exist.

The script assumes Python is at ../.venv/bin/python3. Consider adding a fallback or validation:

♻️ Proposed validation
 PYTHON="$SCRIPT_DIR/../.venv/bin/python3"
+if [[ ! -x "$PYTHON" ]]; then
+    echo "ERROR: Python not found at $PYTHON"
+    echo "Ensure virtual environment is set up: python3 -m venv .venv"
+    exit 1
+fi
 BACKFILL="$SCRIPT_DIR/cloud_backfill.py"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/backfill_orchestrate.sh` around lines 7 - 8, The PYTHON variable is
hardcoded to "$SCRIPT_DIR/../.venv/bin/python3" which may not exist; modify the
script to validate that path and fall back if missing: check
existence/readability of "$SCRIPT_DIR/../.venv/bin/python3" before exporting
PYTHON, if missing try locating a system python via "which python3" or "which
python" and assign that to PYTHON, and if no usable interpreter is found log a
clear error and exit; update any uses of PYTHON (e.g., invoking "$PYTHON" to run
BACKFILL/cloud_backfill.py) to rely on this validated variable.

LOG="/tmp/backfill_orchestrate.log"
API_KEY="${GOOGLE_API_KEY:-}"

if [[ -z "$API_KEY" ]]; then
    echo "ERROR: GOOGLE_API_KEY not set"
    exit 1
fi

log() { echo "[$(date '+%H:%M:%S')] $*" | tee -a "$LOG"; }

log "=== ORCHESTRATOR START ==="
cd "$SCRIPT_DIR/.."

# PHASE 1: Resume all submitted batches (may already be running)
log "Phase 1: Resume submitted batches..."
GOOGLE_API_KEY="$API_KEY" "$PYTHON" -u "$BACKFILL" --resume 2>&1 | tee -a "$LOG"

# PHASE 2: Fix "completed" batches that had 0 imports (broken download bug)
# The broken run completed all its batches at 2026-03-14T12:21 UTC.
# Current (correct) run completes batches after 2026-03-14T12:22 UTC.
log ""
log "Phase 2: Checking for incorrectly-completed batches (0 imports due to broken download)..."
"$PYTHON" -u - <<'PYEOF' 2>&1 | tee -a "$LOG"
import apsw
from pathlib import Path

cp_db = Path.home() / '.local/share/brainlayer/enrichment_checkpoints.db'

🟢 Low scripts/backfill_orchestrate.sh:35

Phase 2's inline Python hardcodes the checkpoint database path to ~/.local/share/brainlayer/enrichment_checkpoints.db, while cloud_backfill.py uses get_db_path().with_name("enrichment_checkpoints.db") which respects the BRAINLAYER_DB environment variable. When BRAINLAYER_DB is set to a custom path, Phase 2 queries the wrong database — broken batches at the custom path aren't reset, leaving them incorrectly marked as completed with 0 imports. Pass the resolved path from cloud_backfill.py to Phase 2 instead of hardcoding.


conn = apsw.Connection(str(cp_db))
Comment on lines +35 to +36

⚠️ Potential issue | 🟠 Major

Hardcoded database path violates coding guidelines.

Line 35 hardcodes the path ~/.local/share/brainlayer/enrichment_checkpoints.db instead of using paths.py:get_db_path(). As per coding guidelines: "All scripts and CLI must use paths.py:get_db_path() for database path resolution instead of hardcoding paths."

🐛 Proposed fix to use paths.py
 "$PYTHON" -u - <<'PYEOF' 2>&1 | tee -a "$LOG"
 import apsw
 from pathlib import Path
+import sys
+sys.path.insert(0, str(Path.cwd() / "src"))  # the orchestrator has already cd'ed to the repo root
+from brainlayer.paths import get_db_path
 
-cp_db = Path.home() / '.local/share/brainlayer/enrichment_checkpoints.db'
+# enrichment_checkpoints.db is in the same directory as the main DB
+cp_db = get_db_path().parent / 'enrichment_checkpoints.db'
 conn = apsw.Connection(str(cp_db))

Note: __file__ is not defined when Python reads its program from stdin, as this heredoc does, so the import path is derived from the working directory instead (the orchestrator runs cd "$SCRIPT_DIR/.." before Phase 2). Extracting this logic to a separate Python module would avoid the sys.path manipulation altogether.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/backfill_orchestrate.sh` around lines 35 - 36, Replace the hardcoded
cp_db path with the shared path resolver: import and call paths.get_db_path() to
obtain the DB path and use that value when constructing cp_db and passing it to
apsw.Connection (currently where cp_db and apsw.Connection(...) are used); if
the script uses an inline Python heredoc where __file__ is unavailable, extract
the DB-path resolution into a small Python module or a callable that imports
paths.get_db_path() so the script can invoke it instead of hardcoding
"~/.local/share/brainlayer/enrichment_checkpoints.db".
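
The path mismatch described in the two comments above can be sketched as follows. This is only an illustration, not the project's actual `paths.py`: `resolve_checkpoint_db` and `DEFAULT_DB` are hypothetical names, and the default main-database file name is assumed. The only behaviour taken from the review is that the checkpoint database sits next to the main database and that `BRAINLAYER_DB` overrides the location.

```python
import os
from pathlib import Path

# Assumed default location; the real default lives in brainlayer's paths.py.
DEFAULT_DB = Path.home() / ".local/share/brainlayer/brainlayer.db"


def resolve_checkpoint_db(env=None):
    """Return the checkpoint DB path next to the main DB (hypothetical helper)."""
    env = os.environ if env is None else env
    main_db = Path(env.get("BRAINLAYER_DB") or DEFAULT_DB)
    # Same directory as the main DB, fixed file name.
    return main_db.with_name("enrichment_checkpoints.db")


# With an override, the checkpoint DB follows the custom directory.
print(resolve_checkpoint_db({"BRAINLAYER_DB": "/data/custom/main.db"}))
# Without one, it falls back to the assumed default directory.
print(resolve_checkpoint_db({}))
```

A hardcoded `Path.home() / ...` matches only the second case, which is why Phase 2 would miss batches stored under a custom `BRAINLAYER_DB`.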

conn.setbusytimeout(10000)

# Find completed batches from the broken 3rd run (completed at 12:21 UTC)
# These had 0 imports due to Files.download(name=) bug that has since been fixed
rows = list(conn.cursor().execute("""
    SELECT batch_id, chunk_count, completed_at
    FROM enrichment_checkpoints
    WHERE status = 'completed'
      AND completed_at < '2026-03-14T12:22:00'
"""))

print(f"Found {len(rows)} broken 'completed' batches (before fix at 12:22 UTC)")
if rows:
    # Reset them to 'submitted' so --resume will re-process them
    conn.cursor().execute("""
        UPDATE enrichment_checkpoints
        SET status = 'submitted', completed_at = NULL
        WHERE status = 'completed'
          AND completed_at < '2026-03-14T12:22:00'
    """)
    print(f"Reset {len(rows)} batches to 'submitted'")
else:
    print("No broken batches found - all completed batches imported correctly")

conn.close()
PYEOF
Comment on lines +26 to +62

🧹 Nitpick | 🔵 Trivial

Phase 2 contains one-time fix logic with hardcoded timestamp.

This phase resets batches completed before 2026-03-14T12:22:00 to address a historical bug. Once executed successfully, this logic becomes dead code. Consider:

  1. Adding a guard to skip if no matching rows exist (already done, but could log more explicitly)
  2. Removing this phase after the fix is confirmed applied
  3. Moving to a separate migration script

Would you like me to create a separate one-time migration script that can be removed after execution?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/backfill_orchestrate.sh` around lines 26 - 62, Phase 2 currently
contains one-time fix logic that resets enrichment_checkpoints rows with
status='completed' and completed_at < '2026-03-14T12:22:00'; extract this into a dedicated,
idempotent migration script (e.g., a one-off script that opens the same DB at
cp_db, queries enrichment_checkpoints for rows with status='completed' and
completed_at < '2026-03-14T12:22:00' using the same SQL used in the block, logs
explicitly when zero rows are found and exits without changing state, and when
rows exist performs the UPDATE inside a transaction and logs the exact number
updated), then remove the Phase 2 block from scripts/backfill_orchestrate.sh so
the main orchestrator no longer contains hardcoded historical timestamps. Ensure
the migration uses the same connection setup (apsw.Connection, setbusytimeout),
explicit logging of found vs updated counts, and is safe to re-run (idempotent).
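
A minimal sketch of such an idempotent one-off migration is shown below. It uses the standard-library `sqlite3` module instead of `apsw` so that it runs anywhere, and the table schema in the demo is assumed beyond the column names that appear in the Phase 2 SQL. `reset_broken_batches` is a hypothetical name.

```python
import sqlite3

CUTOFF = "2026-03-14T12:22:00"  # timestamp taken from the Phase 2 block


def reset_broken_batches(conn, cutoff=CUTOFF):
    """Reset pre-cutoff 'completed' rows to 'submitted'; return the number changed."""
    with conn:  # commits on success, rolls back if the UPDATE raises
        cur = conn.execute(
            """
            UPDATE enrichment_checkpoints
            SET status = 'submitted', completed_at = NULL
            WHERE status = 'completed' AND completed_at < ?
            """,
            (cutoff,),
        )
        return cur.rowcount


# Demo against an in-memory table with made-up rows.
demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE enrichment_checkpoints "
    "(batch_id TEXT, chunk_count INTEGER, status TEXT, completed_at TEXT)"
)
demo.executemany(
    "INSERT INTO enrichment_checkpoints VALUES (?, ?, ?, ?)",
    [
        ("old", 10, "completed", "2026-03-14T12:21:30"),
        ("new", 10, "completed", "2026-03-14T12:40:00"),
    ],
)
print("first run reset:", reset_broken_batches(demo))
print("second run reset:", reset_broken_batches(demo))
```

Re-running is safe because rows that were already reset no longer match `status = 'completed'`, so the second call changes nothing.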


# PHASE 3: Resume the reset batches
log ""
log "Phase 3: Resume reset batches..."
GOOGLE_API_KEY="$API_KEY" "$PYTHON" -u "$BACKFILL" --resume 2>&1 | tee -a "$LOG"

# PHASE 4: Resubmit failed batches + submit unsubmitted JSONL files
log ""
log "Phase 4: Resubmit failed/unsubmitted batches..."
GOOGLE_API_KEY="$API_KEY" "$PYTHON" -u "$BACKFILL" --submit-only 2>&1 | tee -a "$LOG"

log ""
log "Phase 4 done. Waiting 60s for jobs to register before polling..."
sleep 60

# PHASE 5: Resume all newly submitted batches
log ""
log "Phase 5: Resume newly submitted batches (poll + import)..."
GOOGLE_API_KEY="$API_KEY" "$PYTHON" -u "$BACKFILL" --resume 2>&1 | tee -a "$LOG"

# Final stats
log ""
log "=== FINAL STATUS ==="
GOOGLE_API_KEY="$API_KEY" "$PYTHON" -u "$BACKFILL" --status 2>&1 | tee -a "$LOG"

log "=== ORCHESTRATOR COMPLETE ==="
106 changes: 106 additions & 0 deletions scripts/batch_submit_paced.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""Paced batch submission — submits one batch at a time with delays to avoid 429s.
Bypasses VectorStore entirely to avoid DB lock issues with BrainBar.

Usage: GOOGLE_API_KEY=... python3 scripts/batch_submit_paced.py [--delay 45] [--max-retries 5]
"""
import glob, json, os, sys, time
from pathlib import Path

try:
    import google.generativeai as genai
except ImportError:
    print("pip install google-generativeai"); sys.exit(1)

API_KEY = os.environ.get("GOOGLE_API_KEY")
if not API_KEY:
    print("ERROR: GOOGLE_API_KEY required"); sys.exit(1)

genai.configure(api_key=API_KEY)

DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45

🟢 Low scripts/batch_submit_paced.py:21

If --delay is passed without a value (e.g., python script.py --delay), sys.argv.index("--delay") + 1 equals len(sys.argv), causing an IndexError. Consider using argparse or validating that a value follows the flag.


Evidence trail:
scripts/batch_submit_paced.py lines 21-22 at REVIEWED_COMMIT show argument parsing without bounds checking: `DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45`. When `--delay` is the last argument, `sys.argv.index("--delay") + 1` equals `len(sys.argv)`, and indexing that position raises IndexError.

MAX_RETRIES = int(sys.argv[sys.argv.index("--max-retries") + 1]) if "--max-retries" in sys.argv else 10
Comment on lines +21 to +22

⚠️ Potential issue | 🟡 Minor

CLI argument parsing lacks bounds checking.

If --delay or --max-retries is provided without a value (e.g., as the last argument), indexing sys.argv at sys.argv.index(...) + 1 raises IndexError. Consider using argparse for robust CLI handling.

♻️ Proposed fix using argparse
+import argparse
 import glob, json, os, sys, time
 from pathlib import Path
 ...
-DELAY = int(sys.argv[sys.argv.index("--delay") + 1]) if "--delay" in sys.argv else 45
-MAX_RETRIES = int(sys.argv[sys.argv.index("--max-retries") + 1]) if "--max-retries" in sys.argv else 10
+parser = argparse.ArgumentParser(description="Paced batch submission for Gemini")
+parser.add_argument("--delay", type=int, default=45, help="Delay between submissions (seconds)")
+parser.add_argument("--max-retries", type=int, default=10, help="Max retries per batch")
+args = parser.parse_args()
+DELAY = args.delay
+MAX_RETRIES = args.max_retries
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/batch_submit_paced.py` around lines 21 - 22, Replace the fragile
sys.argv lookups for DELAY and MAX_RETRIES with proper argparse-based parsing:
create an ArgumentParser and add arguments "--delay" and "--max-retries" with
type=int and defaults 45 and 10 respectively, then assign parsed values to DELAY
and MAX_RETRIES; also add basic validation (e.g., argparse's type check or a
post-parse check to ensure non-negative integers) so missing values or
out-of-range inputs don't raise IndexError or produce invalid state.
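
The argparse approach with the non-negative check mentioned above could look roughly like this. `non_negative_int` and `parse_cli` are illustrative names, not part of the PR; the defaults (45 and 10) are the ones in the script.

```python
import argparse


def non_negative_int(text):
    """argparse type: accept only integers >= 0."""
    value = int(text)  # a ValueError here is reported by argparse as a usage error
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def parse_cli(argv=None):
    parser = argparse.ArgumentParser(description="Paced batch submission for Gemini")
    parser.add_argument("--delay", type=non_negative_int, default=45,
                        help="Delay between submissions (seconds)")
    parser.add_argument("--max-retries", type=non_negative_int, default=10,
                        help="Max retries per batch")
    return parser.parse_args(argv)


args = parse_cli(["--delay", "30"])
print(args.delay, args.max_retries)  # prints: 30 10
```

With this shape, `--delay` given without a value makes argparse exit with a usage message (status 2) rather than raising `IndexError`.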

MODEL = "gemini-2.5-flash-lite"

🧹 Nitpick | 🔵 Trivial

Model differs from cloud_backfill.py and is not configurable.

This script hardcodes gemini-2.5-flash-lite while cloud_backfill.py defaults to gemini-2.5-flash (per context snippet 3, line 1070). Consider making the model configurable via CLI argument or environment variable for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/batch_submit_paced.py` at line 23, The MODEL constant is hardcoded to
"gemini-2.5-flash-lite" in scripts/batch_submit_paced.py; make it configurable
to match cloud_backfill.py's default and allow overrides. Add a CLI argument
(e.g., --model) or read an environment variable (e.g., os.environ.get("MODEL"))
and set MODEL from that with a default of "gemini-2.5-flash" (or reuse
cloud_backfill.py's default), then replace the hardcoded MODEL usage with this
configurable value; update any function that references MODEL (e.g., the batch
submit entrypoint) to use the new configurable variable.


# Track what we've already submitted this run
STATE_FILE = Path(__file__).parent / "backfill_data" / ".paced_state.json"
submitted = {}
if STATE_FILE.exists():
    submitted = json.loads(STATE_FILE.read_text())
Comment on lines +28 to +29

🟢 Low scripts/batch_submit_paced.py:28

If STATE_FILE exists but contains invalid JSON (e.g., empty file from an interrupted write on line 83), json.loads raises JSONDecodeError and crashes the script. The write on line 83 uses write_text without atomic rename, so a crash mid-write leaves a truncated file. Consider catching JSONDecodeError and treating a corrupted state file as empty, or use atomic write-then-rename to prevent truncation.

 submitted = {}
 if STATE_FILE.exists():
-    submitted = json.loads(STATE_FILE.read_text())
+    try:
+        submitted = json.loads(STATE_FILE.read_text())
+    except json.JSONDecodeError:
+        submitted = {}

Evidence trail:
scripts/batch_submit_paced.py lines 27-29: `STATE_FILE = Path(__file__).parent / "backfill_data" / ".paced_state.json"` followed by `if STATE_FILE.exists(): submitted = json.loads(STATE_FILE.read_text())` - no JSONDecodeError handling.

scripts/batch_submit_paced.py line 83 (within the batch creation loop): `STATE_FILE.write_text(json.dumps(submitted, indent=2))` - direct write without atomic rename pattern.

Commit: REVIEWED_COMMIT
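
A tolerant loader along the lines suggested above might look like the following sketch. `load_state` is a hypothetical helper; treating a non-dict payload as empty is an extra assumption, not something the review asks for.

```python
import json
from pathlib import Path


def load_state(path):
    """Return the saved submissions dict, or {} if the file is missing or not valid JSON."""
    try:
        data = json.loads(Path(path).read_text())
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError:
        # A truncated or empty file from an interrupted write lands here.
        print(f"WARNING: {path} is not valid JSON, starting with empty state")
        return {}
    # Assumption: anything other than a JSON object is treated as no state.
    return data if isinstance(data, dict) else {}
```

Falling back to an empty dict means previously submitted files may be submitted again, so the warning is worth keeping visible in the output.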

Comment on lines +25 to +29

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if both scripts can see the same batch files and use different tracking
echo "=== batch_submit_paced.py state file ==="
cat scripts/backfill_data/.paced_state.json 2>/dev/null || echo "State file not found"

echo ""
echo "=== cloud_backfill.py checkpoint tracking ==="
rg -n "enrichment_checkpoints" scripts/cloud_backfill.py | head -20

echo ""
echo "=== Both scripts glob batch_*.jsonl ==="
rg -n "batch_\*\.jsonl" scripts/

Repository: EtanHey/brainlayer

Length of output: 801


🏁 Script executed:

#!/bin/bash

echo "=== EXPORT_DIR in cloud_backfill.py ==="
rg -n "EXPORT_DIR\s*=" scripts/cloud_backfill.py | head -5

echo ""
echo "=== EXPORT_DIR in cloud_stream.py ==="
rg -n "EXPORT_DIR\s*=" scripts/cloud_stream.py | head -5

echo ""
echo "=== Full context around batch file discovery in batch_submit_paced.py ==="
sed -n '30,35p' scripts/batch_submit_paced.py

echo ""
echo "=== Full context around batch file discovery in cloud_backfill.py ==="
sed -n '260,270p' scripts/cloud_backfill.py

echo ""
echo "=== Check for any lock files or coordination mechanisms ==="
rg -n "\.lock|flock|fcntl" scripts/ | grep -E "(batch_submit_paced|cloud_backfill)" || echo "No lock mechanisms found"

Repository: EtanHey/brainlayer

Length of output: 1238


🏁 Script executed:

#!/bin/bash

echo "=== What does batch_submit_paced.py do with batch files? (full submission logic) ==="
sed -n '40,98p' scripts/batch_submit_paced.py

echo ""
echo "=== What does cloud_backfill.py do in --submit-only mode? ==="
rg -n "submit.only|submit_only" scripts/cloud_backfill.py -A 3 -B 1 | head -40

echo ""
echo "=== Does batch_submit_paced.py reference enrichment_checkpoints.db? ==="
rg -n "enrichment_checkpoints|checkpoint" scripts/batch_submit_paced.py || echo "No checkpoint DB references found"

echo ""
echo "=== Check README or documentation for script usage patterns ==="
head -50 scripts/README.md 2>/dev/null || echo "README not found"

Repository: EtanHey/brainlayer

Length of output: 3649


🏁 Script executed:

#!/bin/bash

echo "=== batch_submit_paced.py header and docstring ==="
head -25 scripts/batch_submit_paced.py

echo ""
echo "=== cloud_backfill.py header/module docstring ==="
head -30 scripts/cloud_backfill.py | grep -A 20 "^#\|^\"\"\"" | head -20

echo ""
echo "=== Check for any usage notes or comments in both scripts ==="
rg -n "Note|WARNING|IMPORTANT|TODO|FIXME" scripts/batch_submit_paced.py scripts/cloud_backfill.py | grep -i "submit\|coordinate\|exclusive\|together"

Repository: EtanHey/brainlayer

Length of output: 1706


Dual-tracking with cloud_backfill.py creates double-submission risk.

Both scripts search scripts/backfill_data/batch_*.jsonl but use independent tracking: batch_submit_paced.py records submissions in .paced_state.json while cloud_backfill.py uses enrichment_checkpoints.db. Since batch_submit_paced.py does not check the database, it will resubmit files already submitted by cloud_backfill.py --submit-only, and vice versa. No coordination mechanism (locks, shared state) exists to prevent concurrent execution.

Given that batch_submit_paced.py is designed to "bypass VectorStore entirely to avoid DB lock issues," it should either:

  1. Check enrichment_checkpoints.db before submission to detect already-submitted files
  2. Use a lock file to ensure mutual exclusion with cloud_backfill.py
  3. Document that these scripts must never run against the same directory in parallel
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/batch_submit_paced.py` around lines 25 - 29, batch_submit_paced.py
currently only reads STATE_FILE (.paced_state.json) into the submitted dict so
it can resubmit files already recorded by cloud_backfill.py; update
batch_submit_paced.py to first consult the shared enrichment_checkpoints.db used
by cloud_backfill.py for each candidate file (or batch id) before submitting,
and treat any DB-recorded file as already-submitted (i.e., skip and add to
submitted if not present); additionally add a simple mutual-exclusion lock
(create and atomically acquire a lock file at start, release on exit) around the
main submit loop to prevent concurrent runs with cloud_backfill.py if the DB
check cannot be used, referencing STATE_FILE, submitted, .paced_state.json, and
enrichment_checkpoints.db in your changes so both checks/locking prevent double
submission.
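
The lock-file option can be sketched with an exclusive create, which is atomic on local filesystems. `submit_lock` and `AlreadyLocked` are hypothetical names, and both scripts would have to agree on the same lock path for this to help. A crash can leave a stale lock behind, which this sketch does not handle.

```python
import os
from contextlib import contextmanager


class AlreadyLocked(RuntimeError):
    """Raised when another process already holds the lock file."""


@contextmanager
def submit_lock(lock_path):
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise AlreadyLocked(f"another submitter holds {lock_path}") from None
    try:
        os.write(fd, str(os.getpid()).encode())  # record the owner for debugging
        yield
    finally:
        os.close(fd)
        os.unlink(lock_path)
```

Usage would wrap the main submit loop in `with submit_lock(path): ...`, so a second submitter started against the same directory fails fast instead of double-submitting.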


# Find all batch JSONL files
batch_files = sorted(glob.glob(str(Path(__file__).parent / "backfill_data" / "batch_*.jsonl")))
print(f"Total batch files: {len(batch_files)}")
print(f"Already submitted (this run): {len(submitted)}")
print(f"Delay between submissions: {DELAY}s")
print(f"Max retries per batch: {MAX_RETRIES}")
print(f"Model: {MODEL}")
print()

created = 0
skipped = 0
failed = 0

for i, fpath in enumerate(batch_files):
    fname = Path(fpath).name
    if fname in submitted:
        skipped += 1
        continue

    # Count chunks in this file
    with open(fpath) as f:
        chunks = sum(1 for _ in f)

    print(f"[{i+1}/{len(batch_files)}] {fname} ({chunks} chunks)")

    # Upload file
    for attempt in range(MAX_RETRIES):
        try:
            print(f" Uploading...", end="", flush=True)
            uploaded = genai.upload_file(fpath)
            print(f" ok ({uploaded.name})")
            break
        except Exception as e:
            wait = min(30 * (attempt + 1), 300)
            print(f" 429, waiting {wait}s (attempt {attempt+1}/{MAX_RETRIES})")
            time.sleep(wait)
Comment on lines +63 to +66

⚠️ Potential issue | 🟡 Minor

Upload retry assumes all errors are rate-limit 429s.

The exception handler treats every failure as a rate-limit error, printing "429" and retrying. Non-transient errors (authentication, network, malformed request) will be retried unnecessarily with misleading output.

🐛 Proposed fix to differentiate error types
+    uploaded = None
     for attempt in range(MAX_RETRIES):
 ...
         except Exception as e:
-            wait = min(30 * (attempt + 1), 300)
-            print(f" 429, waiting {wait}s (attempt {attempt+1}/{MAX_RETRIES})")
-            time.sleep(wait)
+            err_str = str(e)
+            if "429" in err_str or "RESOURCE_EXHAUSTED" in err_str:
+                wait = min(30 * (attempt + 1), 300)
+                print(f" rate-limited, waiting {wait}s (attempt {attempt+1}/{MAX_RETRIES})")
+                time.sleep(wait)
+            else:
+                print(f" ERROR: {e}")
+                failed += 1
+                break
     else:
         print(f" FAILED upload after {MAX_RETRIES} retries, skipping")
         failed += 1
         continue
+    if uploaded is None:
+        # Non-retryable upload error: skip job creation for this file
+        continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/batch_submit_paced.py` around lines 63 - 66, The current except
Exception as e block in scripts/batch_submit_paced.py treats all failures as
HTTP 429 rate-limits; update it to inspect the exception (e) and only apply the
429 retry/backoff logic when the error indicates an HTTP 429 response (e.g.,
check e.response.status_code or isinstance(e,
HTTPStatusError)/requests.HTTPError with status==429); for other exceptions
(authentication 401/403, client errors 4xx other than 429, or non-HTTP
exceptions like network/validation errors) log the actual error (include e) and
re-raise or fail immediately instead of sleeping and retrying; keep the existing
wait/backoff calculation (wait = min(30 * (attempt + 1), 300)) and MAX_RETRIES
usage for the 429 branch, and ensure the printed message includes the real
status or exception text rather than always "429".
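
One way to keep the retry policy in a single place is a small helper that retries only rate-limit errors and lets everything else propagate. This is a sketch under assumptions: `call_with_backoff`, `is_rate_limit`, and `RetriesExhausted` are hypothetical names, and the string match on "429" / "RESOURCE_EXHAUSTED" simply mirrors the check the script already uses for job creation.

```python
import time


class RetriesExhausted(Exception):
    """Raised when every attempt was rate-limited."""


def is_rate_limit(exc):
    text = str(exc)
    return "429" in text or "RESOURCE_EXHAUSTED" in text


def call_with_backoff(fn, max_retries, base_wait=30, max_wait=300, sleep=time.sleep):
    """Call fn(); retry with linear backoff on rate limits, re-raise anything else."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            if not is_rate_limit(exc):
                raise  # auth, network, or validation errors are not retried
            wait = min(base_wait * (attempt + 1), max_wait)
            print(f" rate-limited, waiting {wait}s (attempt {attempt + 1}/{max_retries})")
            sleep(wait)
    raise RetriesExhausted(f"gave up after {max_retries} rate-limited attempts")
```

The caller can then wrap the upload in one `try` block and skip the file on either exception, which avoids reaching job creation without an uploaded file.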

    else:
        print(f" FAILED upload after {MAX_RETRIES} retries, skipping")
        failed += 1
        continue

    # Create batch job
    for attempt in range(MAX_RETRIES):
        try:
            print(f" Creating batch job...", end="", flush=True)
            job = genai.batches.create(
                model=f"models/{MODEL}",
                src=uploaded.uri,
                config={"display_name": fname},
            )
            print(f" ok ({job.name}, {job.state})")
            submitted[fname] = {"job_name": job.name, "chunks": chunks, "time": time.strftime("%Y-%m-%dT%H:%M:%S")}
            STATE_FILE.write_text(json.dumps(submitted, indent=2))
Comment on lines +82 to +83

🧹 Nitpick | 🔵 Trivial

State file write is not atomic.

If the process crashes during write_text(), the state file could be corrupted or truncated, losing track of submitted jobs. Consider an atomic write: write to a temporary file in the same directory, then replace the original.

♻️ Proposed atomic write
-            STATE_FILE.write_text(json.dumps(submitted, indent=2))
+            # Atomic write to prevent corruption on crash
+            tmp = STATE_FILE.with_suffix('.tmp')
+            tmp.write_text(json.dumps(submitted, indent=2))
+            tmp.replace(STATE_FILE)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/batch_submit_paced.py` around lines 82 - 83, The
STATE_FILE.write_text call is not atomic and can corrupt state if the process
crashes; modify the code that updates submitted (the dict named submitted and
the STATE_FILE variable) to perform an atomic write: serialize submitted to a
string (json.dumps with indent) then write that to a temporary file in the same
directory (e.g., STATE_FILE.with_suffix(".tmp") or similar) and atomically
replace the original via os.replace or Path.replace; ensure exceptions are
handled so the temp file is cleaned up if needed and the rename only occurs
after the write completes successfully.
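
The write-then-replace pattern described above, including temp-file cleanup on failure, can be sketched as follows. `save_state_atomic` is a hypothetical helper name; `os.replace` is used because it overwrites an existing destination on both POSIX and Windows.

```python
import json
import os
from pathlib import Path


def save_state_atomic(path, state):
    """Write state as JSON to a sibling temp file, then swap it into place."""
    path = Path(path)
    tmp = path.with_name(path.name + ".tmp")  # same directory, so the swap stays on one filesystem
    try:
        with open(tmp, "w") as f:
            f.write(json.dumps(state, indent=2))
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp, path)
    except BaseException:
        tmp.unlink(missing_ok=True)  # leave no half-written temp file behind
        raise
```

Readers of the state file then see either the old complete contents or the new complete contents, never a truncated file.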

            created += 1
            break
        except Exception as e:
            if "429" in str(e) or "RESOURCE_EXHAUSTED" in str(e):
                wait = min(60 * (attempt + 1), 600)
                print(f" 429, waiting {wait}s (attempt {attempt+1}/{MAX_RETRIES})")
                time.sleep(wait)
            else:
                print(f" ERROR: {e}")
                failed += 1
                break
    else:
        print(f" FAILED create after {MAX_RETRIES} retries, skipping")
        failed += 1
        continue
Comment on lines +91 to +98

⚠️ Potential issue | 🟡 Minor

Non-rate-limit errors during job creation allow fall-through to delay.

When a non-rate-limit error occurs (lines 91-94), the loop breaks but execution continues to the pacing delay (lines 100-103) before moving to the next file. This is inconsistent with the retry exhaustion path (line 98 uses continue). Swapping the break for continue would not fix this: inside the except block, continue targets the inner retry loop and would retry the failed call immediately. A flag checked after the retry loop skips the delay instead.

♻️ Proposed fix for consistent flow
+    hard_error = False
     for attempt in range(MAX_RETRIES):
 ...
             else:
                 print(f" ERROR: {e}")
                 failed += 1
+                hard_error = True
                 break
     else:
         print(f"  FAILED create after {MAX_RETRIES} retries, skipping")
         failed += 1
         continue
+    if hard_error:
+        continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/batch_submit_paced.py` around lines 91 - 98, The non-rate-limit error
branch in scripts/batch_submit_paced.py currently does "failed += 1; break",
which leaves the retry loop and then falls through to the pacing sleep. Do not
replace that break with "continue": inside the except block it would target the
inner retry loop and retry the failed call immediately. Instead, set a flag
before the break and, after the retry loop, continue the outer file loop when
the flag is set so the pacing delay is skipped; keep the rate-limit handling
path and the retry-exhaustion for/else path untouched; reference the
loop that uses MAX_RETRIES and the failed counter and the pacing sleep block.
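
The control flow at issue here is easy to get wrong, because `break` and `continue` inside the `except` block bind to the inner retry loop, while `continue` in the `for ... else` branch binds to the outer file loop. The following stripped-down sketch, with hypothetical names and the backoff sleep omitted, shows a flag-based way to skip the pacing delay after a non-retryable error.

```python
def is_rate_limit(exc):
    text = str(exc)
    return "429" in text or "RESOURCE_EXHAUSTED" in text


def submit_all(files, create_job, max_retries, pace):
    """Return (created, failed); pace() runs only after a successful submission."""
    created = failed = 0
    for i, name in enumerate(files):
        hard_error = False
        for _attempt in range(max_retries):
            try:
                create_job(name)
            except Exception as exc:
                if is_rate_limit(exc):
                    continue  # inner loop: retry (backoff sleep omitted here)
                failed += 1
                hard_error = True
                break  # inner loop only; the for/else branch is skipped
            else:
                created += 1
                break
        else:
            failed += 1  # retries exhausted
            continue  # outer loop: next file, no pacing delay
        if hard_error:
            continue  # outer loop: next file, no pacing delay
        if i < len(files) - 1:
            pace()
    return created, failed
```

Moving the retry loop into its own function that returns a success flag would achieve the same result without the extra variable.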


    # Pace ourselves
    if i < len(batch_files) - 1:
        print(f" Waiting {DELAY}s before next...")
        time.sleep(DELAY)

print(f"\nDone! Created: {created}, Skipped (already done): {skipped}, Failed: {failed}")
print(f"State saved to {STATE_FILE}")
56 changes: 56 additions & 0 deletions scripts/launchd/com.brainlayer.enrichment.plist
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.brainlayer.enrichment</string>

<!-- Unified enrichment daemon — replaces com.brainlayer.enrich
Runs realtime enrichment hourly on recent chunks.
For batch/local modes, use the CLI or brain_enrich MCP tool. -->

<key>ProgramArguments</key>
<array>
<string>__BRAINLAYER_BIN__</string>
<string>enrich</string>
<string>--mode</string>
<string>realtime</string>
<string>--since-hours</string>
<string>24</string>
<string>--limit</string>
<string>50</string>
</array>

<key>StartInterval</key>
<integer>3600</integer>

<key>StandardOutPath</key>
<string>__HOME__/.local/share/brainlayer/logs/enrichment.log</string>
<key>StandardErrorPath</key>
<string>__HOME__/.local/share/brainlayer/logs/enrichment.err</string>

<key>EnvironmentVariables</key>
<dict>
<key>PATH</key>
<string>/usr/local/bin:/usr/bin:/bin:__HOME__/.local/bin</string>
<key>PYTHONUNBUFFERED</key>
<string>1</string>
<key>BRAINLAYER_STALL_TIMEOUT</key>
<string>300</string>
<key>GOOGLE_API_KEY</key>
<string>__GOOGLE_API_KEY__</string>
<!-- Optional: set regional endpoint for lower latency -->
<!-- <key>GOOGLE_CLOUD_REGION</key>
<string>us-central1</string> -->
</dict>
Comment on lines +32 to +45

🧹 Nitpick | 🔵 Trivial

Consider adding Groq as primary backend per project guidelines.

Based on learnings, Groq should be the primary enrichment backend with Gemini as fallback. The current plist only configures GOOGLE_API_KEY for Gemini. Consider adding the Groq backend configuration:

Suggested addition to EnvironmentVariables
<key>GROQ_API_KEY</key>
<string>__GROQ_API_KEY__</string>
<key>BRAINLAYER_ENRICH_BACKEND</key>
<string>groq</string>

Note: The install.sh script already retrieves GROQ_API_KEY from 1Password (line 18), so the placeholder substitution would work.

Based on learnings: "Use Groq as primary enrichment backend (configured in launchd plist); fall back to Gemini, then Ollama as offline last-resort"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/launchd/com.brainlayer.enrichment.plist` around lines 32 - 45, Update
the EnvironmentVariables dict in the launchd plist to include Groq as the
primary enrichment backend: add keys GROQ_API_KEY and BRAINLAYER_ENRICH_BACKEND
with values __GROQ_API_KEY__ and "groq" respectively, so the service prefers
Groq and still retains GOOGLE_API_KEY for Gemini fallback; ensure the new keys
sit alongside the existing EnvironmentVariables entries (e.g., PATH,
PYTHONUNBUFFERED, BRAINLAYER_STALL_TIMEOUT, GOOGLE_API_KEY) and confirm
install.sh (which already fetches GROQ_API_KEY) will substitute the placeholder.
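
The Groq, then Gemini, then Ollama ordering quoted above could be resolved from the environment roughly as follows. This is a hypothetical sketch: whether brainlayer actually reads `BRAINLAYER_ENRICH_BACKEND`, and how it selects a backend internally, is not shown in this PR, and `pick_backend` is an invented name.

```python
import os

# Fallback order quoted in the review comment.
FALLBACK_ORDER = ("groq", "gemini", "ollama")
# Assumed mapping from backend to the API key it needs; Ollama is local and needs none.
REQUIRED_KEY = {"groq": "GROQ_API_KEY", "gemini": "GOOGLE_API_KEY", "ollama": None}


def pick_backend(env=None):
    """Return the first usable backend, starting with the preferred one."""
    env = os.environ if env is None else env
    preferred = env.get("BRAINLAYER_ENRICH_BACKEND", FALLBACK_ORDER[0])
    order = [preferred] + [b for b in FALLBACK_ORDER if b != preferred]
    for backend in order:
        if backend not in REQUIRED_KEY:
            continue  # unknown name: ignore it and keep falling back
        key = REQUIRED_KEY[backend]
        if key is None or env.get(key):
            return backend
    return FALLBACK_ORDER[-1]


print(pick_backend({"GOOGLE_API_KEY": "example"}))  # prints: gemini
```

With only `GOOGLE_API_KEY` set, as in the current plist, this sketch resolves to Gemini; adding `GROQ_API_KEY` would move Groq to the front.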


<key>RunAtLoad</key>
<true/>

<key>Nice</key>
<integer>15</integer>

<key>ProcessType</key>
<string>Background</string>
</dict>
</plist>
14 changes: 11 additions & 3 deletions scripts/launchd/install.sh
Original file line number Diff line number Diff line change
@@ -70,23 +70,31 @@ case "${1:-all}" in
         install_plist index
         ;;
     enrich)
+        # Legacy — install old enrich plist
         install_plist enrich
         ;;
+    enrichment)
+        # New unified enrichment plist (replaces enrich)
+        install_plist enrichment
+        ;;
     checkpoint)
         install_plist wal-checkpoint
         ;;
     all)
         install_plist index
-        install_plist enrich
+        install_plist enrichment
         install_plist wal-checkpoint
+        # Remove old enrich plist if present
+        remove_plist enrich 2>/dev/null || true
         ;;
     remove)
         remove_plist index
-        remove_plist enrich
+        remove_plist enrich 2>/dev/null || true
+        remove_plist enrichment 2>/dev/null || true
         remove_plist wal-checkpoint
         ;;
     *)
-        echo "Usage: $0 [index|enrich|checkpoint|all|remove]"
+        echo "Usage: $0 [index|enrich|enrichment|checkpoint|all|remove]"
         exit 1
         ;;
 esac