From 45760287e7c2623cb860c32377dc33c299bb1e67 Mon Sep 17 00:00:00 2001 From: Oliver Le Date: Wed, 6 May 2026 01:47:41 -0700 Subject: [PATCH] =?UTF-8?q?feat(v0.7.2):=20data-flow=20correctness=20?= =?UTF-8?q?=E2=80=94=20fix=20the=2027000=E2=86=9235=20dashboard=20gap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex deep audit found 12 bugs in event flow; this PR ships fixes for the SDK side (Bugs 1, 2, 7, 8, 9, 10, 11). Companion cloud PR fixes projector. After both land, the dashboard accurately reflects events. P0 — DEFAULT SYNC NOW INCLUDES THE CORPUS (Bug 1) - _core.py:_cloud_sync_session now defaults sync_mode='full', not metrics_only - New CLI: `gradata sync --full` flips mode + immediately backfills events - Clear WARN log when sync_mode='metrics_only' so users know dashboard will be empty P0 — STOP EMITTING events WITH session=0 (Bug 2) - Brain.emit() preserves None when caller doesn't provide a session - _events.emit() detects active session from brain state at write time - Session only coerced to int at serialization, never used as 0 sentinel P1 — POPULATE EVENT IDENTITY AT EMIT TIME (Bug 7) - _events.py now writes event_id (ULID), device_id (dev_), content_hash (sha256) - Migration 004: drops legacy unique on (tenant_id, ts, type, source); adds unique (brain_id, event_id) - Two events with identical (ts, type, source) but different data now coexist - Replaces the false-green test_new_emit_leaves_identity_columns_null_for_now (Bug 8) - New regression test asserts ULID format + uniqueness + content_hash determinism P1 — BRAINCONFIG PROPAGATION (Bug 9) - New BrainConfig.load(brain_dir) helper in _config.py - Used by inject_brain_rules.py, agent_precontext.py, jit_inject.py, middleware/_core.py - max_recall_tokens + ranker now respected across all 4 injection paths - New tests/test_brain_config_propagation.py covers all 4 sites P1 — REMOVE UNDECLARED PYYAML DEP (Bug 10) - hooks/adapters/hermes.py: small in-tree YAML reader/writer for Hermes config - No more sudden ImportError when user runs gradata install --agent hermes P2 — README/CLI CONSISTENCY (Bug 11) - gradata init now exposes --no-interactive flag (matches docs) - gradata audit smoke-tests pass Tests: 4174 passed / 2 skipped / 5 deselected (sandbox-only) on socket-free run. ruff check + ruff format --check + pyright src/ all clean. Council (3-vendor full) verdict on the wedge: Convergence-as-a-Service. This PR is the precondition — the dashboard must accurately reflect data before we can pitch convergence proof to YC. Co-authored-by: Codex --- Gradata/bench/pmr_100.py | 401 ++++++++++-------- Gradata/examples/basic_usage.py | 2 + .../examples/domain-profiles/call_profile.py | 260 +++++++----- .../examples/domain-profiles/sales_profile.py | 69 +-- Gradata/examples/with_claude_code.py | 1 + Gradata/examples/with_openai.py | 1 + Gradata/scripts/migrate_legacy_scopes.py | 4 +- Gradata/src/gradata/_config.py | 5 + Gradata/src/gradata/_context_packet.py | 1 + Gradata/src/gradata/_core.py | 28 +- Gradata/src/gradata/_events.py | 140 +++++- .../004_event_identity_unique_key.py | 165 +++++++ .../src/gradata/_migrations/device_uuid.py | 26 +- Gradata/src/gradata/_mine_transcripts.py | 19 + Gradata/src/gradata/brain.py | 6 +- Gradata/src/gradata/cli.py | 36 ++ Gradata/src/gradata/cloud/sync.py | 2 + Gradata/src/gradata/hooks/adapters/hermes.py | 98 ++++- Gradata/src/gradata/hooks/agent_precontext.py | 8 + .../src/gradata/hooks/inject_brain_rules.py | 4 +- Gradata/src/gradata/hooks/jit_inject.py | 8 + Gradata/src/gradata/middleware/_core.py | 13 +- .../tests/test_brain_config_propagation.py | 54 +++ Gradata/tests/test_brain_events.py | 5 + .../test_migration_002_event_identity.py | 32 +- 25 files changed, 1024 insertions(+), 364 deletions(-) create mode 100644 Gradata/src/gradata/_migrations/004_event_identity_unique_key.py create mode 100644 Gradata/tests/test_brain_config_propagation.py diff --git a/Gradata/bench/pmr_100.py b/Gradata/bench/pmr_100.py index dadb3ad0..4729c3a5 100644 --- a/Gradata/bench/pmr_100.py +++ b/Gradata/bench/pmr_100.py @@ -38,18 +38,18 @@ Output: bench/results/pmr_100_.json + a one-paragraph summary to stdout suitable for README/HN posts. """ + from __future__ import annotations import argparse import json import random -import shutil import sys import tempfile import time -from collections import Counter, defaultdict +from collections import defaultdict from dataclasses import asdict, dataclass, field -from datetime import datetime, timezone +from datetime import UTC, datetime from pathlib import Path from typing import Any @@ -57,21 +57,21 @@ HERE = Path(__file__).resolve().parent sys.path.insert(0, str(HERE.parent / "src")) -from gradata import Brain, Lesson, LessonState -from gradata._types import CorrectionType - +from gradata import Brain # --------------------------------------------------------------------------- # Test corpus — scripted corrections covering each CorrectionType # --------------------------------------------------------------------------- + @dataclass class Scenario: """One PMR-100 scenario: a correction + a probe that should match it.""" - correction_class: str # CorrectionType value - draft: str # what the agent originally produced - final: str # what the user corrected it to - probe: str # later task that should fire the rule + + correction_class: str # CorrectionType value + draft: str # what the agent originally produced + final: str # what the user corrected it to + probe: str # later task that should fire the rule expected_keywords: list[str] # words that should appear in retrieved rule @@ -79,141 +79,186 @@ class Scenario: # Add more for richer evaluation. 100 sessions sample these with replacement. SCENARIOS: list[Scenario] = [ # TONE / BEHAVIORAL — language register - Scenario("BEHAVIORAL", - draft="Hey there! Hope you're having an awesome day! 🎉", - final="Hi. Following up on our discussion.", - probe="draft an email to a new prospect", - expected_keywords=["formal", "tone", "casual", "remove", "professional"]), - Scenario("BEHAVIORAL", - draft="I'd like to humbly suggest, if you don't mind, that perhaps we might consider...", - final="I recommend we...", - probe="suggest an architectural change", - expected_keywords=["direct", "concise", "remove", "hedge"]), - Scenario("BEHAVIORAL", - draft="The system utilizes advanced ML capabilities to provide synergistic solutions", - final="The system uses ML to solve X.", - probe="describe a feature in our docs", - expected_keywords=["plain", "buzzword", "remove", "concrete"]), - Scenario("BEHAVIORAL", - draft="It is genuinely a game-changer that will make this straightforward", - final="This makes X easier.", - probe="write a marketing claim", - expected_keywords=["genuinely", "game-changer", "straightforward", "ban"]), - + Scenario( + "BEHAVIORAL", + draft="Hey there! Hope you're having an awesome day! 🎉", + final="Hi. Following up on our discussion.", + probe="draft an email to a new prospect", + expected_keywords=["formal", "tone", "casual", "remove", "professional"], + ), + Scenario( + "BEHAVIORAL", + draft="I'd like to humbly suggest, if you don't mind, that perhaps we might consider...", + final="I recommend we...", + probe="suggest an architectural change", + expected_keywords=["direct", "concise", "remove", "hedge"], + ), + Scenario( + "BEHAVIORAL", + draft="The system utilizes advanced ML capabilities to provide synergistic solutions", + final="The system uses ML to solve X.", + probe="describe a feature in our docs", + expected_keywords=["plain", "buzzword", "remove", "concrete"], + ), + Scenario( + "BEHAVIORAL", + draft="It is genuinely a game-changer that will make this straightforward", + final="This makes X easier.", + probe="write a marketing claim", + expected_keywords=["genuinely", "game-changer", "straightforward", "ban"], + ), # FORMAT / PREFERENCE — structural patterns - Scenario("PREFERENCE", - draft="Here are the steps:\n- Step one\n- Step two\n- Step three", - final="Here are the steps:\n1. Step one\n2. Step two\n3. Step three", - probe="enumerate items in a how-to", - expected_keywords=["numbered", "list", "ordered"]), - Scenario("PREFERENCE", - draft="The price is $1,000 — that's affordable!", - final="The price is $1,000.", - probe="describe pricing", - expected_keywords=["em dash", "remove", "no"]), - Scenario("PREFERENCE", - draft="**Important:** read this carefully", - final="Important: read this carefully", - probe="emphasize a warning in docs", - expected_keywords=["bold", "no", "remove", "asterisks"]), - Scenario("PREFERENCE", - draft="2026/04/30", - final="April 30, 2026", - probe="format a date in user-facing copy", - expected_keywords=["date", "format", "april", "long"]), - + Scenario( + "PREFERENCE", + draft="Here are the steps:\n- Step one\n- Step two\n- Step three", + final="Here are the steps:\n1. Step one\n2. Step two\n3. Step three", + probe="enumerate items in a how-to", + expected_keywords=["numbered", "list", "ordered"], + ), + Scenario( + "PREFERENCE", + draft="The price is $1,000 — that's affordable!", + final="The price is $1,000.", + probe="describe pricing", + expected_keywords=["em dash", "remove", "no"], + ), + Scenario( + "PREFERENCE", + draft="**Important:** read this carefully", + final="Important: read this carefully", + probe="emphasize a warning in docs", + expected_keywords=["bold", "no", "remove", "asterisks"], + ), + Scenario( + "PREFERENCE", + draft="2026/04/30", + final="April 30, 2026", + probe="format a date in user-facing copy", + expected_keywords=["date", "format", "april", "long"], + ), # FACTUAL — wrong data - Scenario("FACTUAL", - draft="Use openai.ChatCompletion.create(...) to call GPT-4", - final="Use openai.chat.completions.create(...) — the v1 API style.", - probe="show how to call the OpenAI API", - expected_keywords=["v1", "chat", "completions", "api"]), - Scenario("FACTUAL", - draft="Sprites costs $30/mo for the Starter plan", - final="Sprites Starter is $60/mo.", - probe="quote pricing in a sales email", - expected_keywords=["60", "starter", "price"]), - Scenario("FACTUAL", - draft="React 18 introduced hooks", - final="React 16.8 introduced hooks; React 18 added concurrent rendering.", - probe="describe React's history of features", - expected_keywords=["16.8", "hooks", "concurrent"]), - Scenario("FACTUAL", - draft="HausGem is a B2B SaaS company", - final="HausGem is an ecommerce brand.", - probe="describe the HausGem business", - expected_keywords=["ecommerce", "brand", "not B2B"]), - + Scenario( + "FACTUAL", + draft="Use openai.ChatCompletion.create(...) to call GPT-4", + final="Use openai.chat.completions.create(...) — the v1 API style.", + probe="show how to call the OpenAI API", + expected_keywords=["v1", "chat", "completions", "api"], + ), + Scenario( + "FACTUAL", + draft="Sprites costs $30/mo for the Starter plan", + final="Sprites Starter is $60/mo.", + probe="quote pricing in a sales email", + expected_keywords=["60", "starter", "price"], + ), + Scenario( + "FACTUAL", + draft="React 18 introduced hooks", + final="React 16.8 introduced hooks; React 18 added concurrent rendering.", + probe="describe React's history of features", + expected_keywords=["16.8", "hooks", "concurrent"], + ), + Scenario( + "FACTUAL", + draft="HausGem is a B2B SaaS company", + final="HausGem is an ecommerce brand.", + probe="describe the HausGem business", + expected_keywords=["ecommerce", "brand", "not B2B"], + ), # PROCEDURAL — wrong order / skipped step - Scenario("PROCEDURAL", - draft="```bash\npip install gradata\ngradata init\n```", - final="```bash\npython -m venv .venv && source .venv/bin/activate\npip install gradata\ngradata init\n```", - probe="show installation steps for a Python tool", - expected_keywords=["venv", "virtualenv", "activate", "first"]), - Scenario("PROCEDURAL", - draft="Just merge the PR.", - final="Run the test suite locally first; if green, then merge.", - probe="describe the merge workflow", - expected_keywords=["test", "first", "verify", "before"]), - Scenario("PROCEDURAL", - draft="Send the email to the prospect.", - final="Check Apollo for prior conversations first; if any, reference them.", - probe="describe outbound email flow", - expected_keywords=["check", "apollo", "first", "history"]), - Scenario("PROCEDURAL", - draft="Apply the SQL migration.", - final="Snapshot the DB, apply the migration in a transaction, verify, then commit.", - probe="describe a database schema change", - expected_keywords=["snapshot", "transaction", "verify"]), - + Scenario( + "PROCEDURAL", + draft="```bash\npip install gradata\ngradata init\n```", + final="```bash\npython -m venv .venv && source .venv/bin/activate\npip install gradata\ngradata init\n```", + probe="show installation steps for a Python tool", + expected_keywords=["venv", "virtualenv", "activate", "first"], + ), + Scenario( + "PROCEDURAL", + draft="Just merge the PR.", + final="Run the test suite locally first; if green, then merge.", + probe="describe the merge workflow", + expected_keywords=["test", "first", "verify", "before"], + ), + Scenario( + "PROCEDURAL", + draft="Send the email to the prospect.", + final="Check Apollo for prior conversations first; if any, reference them.", + probe="describe outbound email flow", + expected_keywords=["check", "apollo", "first", "history"], + ), + Scenario( + "PROCEDURAL", + draft="Apply the SQL migration.", + final="Snapshot the DB, apply the migration in a transaction, verify, then commit.", + probe="describe a database schema change", + expected_keywords=["snapshot", "transaction", "verify"], + ), # DOMAIN — industry-specific rules - Scenario("DOMAIN", - draft="ICP includes anyone running ads", - final="ICP: 10-300 employees, US/UK/CA/AU/NZ, multi-brand ecom or PE-backed rollups.", - probe="describe the Sprites ICP", - expected_keywords=["10-300", "ecom", "rollup"]), - Scenario("DOMAIN", - draft="Use any salutation", - final="Always open with 'Hi [First Name],' — never 'Hey' or 'Dear'.", - probe="start a cold email to a prospect", - expected_keywords=["hi", "first name", "salutation"]), - Scenario("DOMAIN", - draft="Mention the Calendly link generically", - final="Always hyperlink as: [Book a call here](https://calendly.com/oliver-spritesai/30min)", - probe="add a CTA to a cold email", - expected_keywords=["book a call", "hyperlink", "calendly"]), - Scenario("DOMAIN", - draft="The H.M. Cole case study went well", - final="H.M. Cole moved from agency retainers to 5.8x ROAS in one month with Sprites.", - probe="cite a customer success story", - expected_keywords=["5.8x", "ROAS", "month"]), - + Scenario( + "DOMAIN", + draft="ICP includes anyone running ads", + final="ICP: 10-300 employees, US/UK/CA/AU/NZ, multi-brand ecom or PE-backed rollups.", + probe="describe the Sprites ICP", + expected_keywords=["10-300", "ecom", "rollup"], + ), + Scenario( + "DOMAIN", + draft="Use any salutation", + final="Always open with 'Hi [First Name],' — never 'Hey' or 'Dear'.", + probe="start a cold email to a prospect", + expected_keywords=["hi", "first name", "salutation"], + ), + Scenario( + "DOMAIN", + draft="Mention the Calendly link generically", + final="Always hyperlink as: [Book a call here](https://calendly.com/oliver-spritesai/30min)", + probe="add a CTA to a cold email", + expected_keywords=["book a call", "hyperlink", "calendly"], + ), + Scenario( + "DOMAIN", + draft="The H.M. Cole case study went well", + final="H.M. Cole moved from agency retainers to 5.8x ROAS in one month with Sprites.", + probe="cite a customer success story", + expected_keywords=["5.8x", "ROAS", "month"], + ), # BEHAVIORAL extras (most common class) - Scenario("BEHAVIORAL", - draft="I will research this and get back to you soon.", - final="I'll have an answer by EOD Friday.", - probe="commit to a timeline in a customer reply", - expected_keywords=["specific", "deadline", "concrete"]), - Scenario("BEHAVIORAL", - draft="```python\ndef do_thing(stuff):\n ...", - final="```python\ndef do_thing(stuff: dict) -> Result:\n ...", - probe="define a Python function in a code review", - expected_keywords=["type", "annotation", "hint"]), - Scenario("BEHAVIORAL", - draft="We could maybe possibly explore this option", - final="We will explore this option.", - probe="propose next steps in a sprint plan", - expected_keywords=["definite", "remove", "hedge"]), - Scenario("BEHAVIORAL", - draft="The user clicks the button and the magic happens.", - final="On click, validateInput() runs, then submitForm() POSTs to /api/save.", - probe="document a UI interaction", - expected_keywords=["specific", "function", "concrete"]), - Scenario("BEHAVIORAL", - draft="`em dashes — like this — break up a sentence`", - final="No em dashes. Period.", - probe="write any user-facing copy", - expected_keywords=["em dash", "no", "ban"]), + Scenario( + "BEHAVIORAL", + draft="I will research this and get back to you soon.", + final="I'll have an answer by EOD Friday.", + probe="commit to a timeline in a customer reply", + expected_keywords=["specific", "deadline", "concrete"], + ), + Scenario( + "BEHAVIORAL", + draft="```python\ndef do_thing(stuff):\n ...", + final="```python\ndef do_thing(stuff: dict) -> Result:\n ...", + probe="define a Python function in a code review", + expected_keywords=["type", "annotation", "hint"], + ), + Scenario( + "BEHAVIORAL", + draft="We could maybe possibly explore this option", + final="We will explore this option.", + probe="propose next steps in a sprint plan", + expected_keywords=["definite", "remove", "hedge"], + ), + Scenario( + "BEHAVIORAL", + draft="The user clicks the button and the magic happens.", + final="On click, validateInput() runs, then submitForm() POSTs to /api/save.", + probe="document a UI interaction", + expected_keywords=["specific", "function", "concrete"], + ), + Scenario( + "BEHAVIORAL", + draft="`em dashes — like this — break up a sentence`", + final="No em dashes. Period.", + probe="write any user-facing copy", + expected_keywords=["em dash", "no", "ban"], + ), ] @@ -221,15 +266,16 @@ class Scenario: # Result types # --------------------------------------------------------------------------- + @dataclass class SessionResult: session_id: int correction_class: str correction_count: int distractor_count: int - rule_extracted: bool # did Brain produce a rule from the correction? - rule_recalled_at_1: bool # was the right rule top-ranked on probe? - rule_recalled_at_3: bool # in top 3? + rule_extracted: bool # did Brain produce a rule from the correction? + rule_recalled_at_1: bool # was the right rule top-ranked on probe? + rule_recalled_at_3: bool # in top 3? elapsed_seconds: float error: str | None = None @@ -246,8 +292,10 @@ class BenchResult: # Core run # --------------------------------------------------------------------------- -def run_one_session(session_id: int, scenario: Scenario, brain_dir: Path, - distractor_count: int = 5) -> SessionResult: + +def run_one_session( + session_id: int, scenario: Scenario, brain_dir: Path, distractor_count: int = 5 +) -> SessionResult: t0 = time.time() try: brain = Brain.init(str(brain_dir)) @@ -265,8 +313,9 @@ def run_one_session(session_id: int, scenario: Scenario, brain_dir: Path, # apply_brain_rules returns a formatted string (not a list of rule objects). # We score by whether expected keywords appear in the rendered prompt block. text_lower = rules_text.lower() - recall_at_1 = bool(rules_text) and any(kw.lower() in text_lower - for kw in scenario.expected_keywords) + recall_at_1 = bool(rules_text) and any( + kw.lower() in text_lower for kw in scenario.expected_keywords + ) # @3 collapses to @1 with a string return — kept for compatibility / future # variant where we score per-rule chunks. recall_at_3 = recall_at_1 @@ -298,15 +347,18 @@ def run_one_session(session_id: int, scenario: Scenario, brain_dir: Path, ) -def run_benchmark(num_sessions: int, distractor_count: int, seed: int, - parallel: int = 1) -> BenchResult: +def run_benchmark( + num_sessions: int, distractor_count: int, seed: int, parallel: int = 1 +) -> BenchResult: random.seed(seed) out_dir = Path(__file__).resolve().parent / "results" out_dir.mkdir(exist_ok=True) results: list[SessionResult] = [] - print(f"[pmr-100] running {num_sessions} sessions, {distractor_count} distractors each, seed={seed}", - file=sys.stderr) + print( + f"[pmr-100] running {num_sessions} sessions, {distractor_count} distractors each, seed={seed}", + file=sys.stderr, + ) for i in range(num_sessions): # Pick a scenario (with replacement) @@ -316,7 +368,7 @@ def run_benchmark(num_sessions: int, distractor_count: int, seed: int, r = run_one_session(i, scenario, Path(tmp), distractor_count) results.append(r) if (i + 1) % 10 == 0: - print(f"[pmr-100] ... {i+1}/{num_sessions} done", file=sys.stderr) + print(f"[pmr-100] ... {i + 1}/{num_sessions} done", file=sys.stderr) # Compute summary total = len(results) @@ -340,12 +392,13 @@ def run_benchmark(num_sessions: int, distractor_count: int, seed: int, "recall_at_3_pct": round(100 * r3 / total, 1) if total else 0.0, "errors": len(errors), "median_session_seconds": round( - sorted(r.elapsed_seconds for r in results)[len(results) // 2] if results else 0, 3), + sorted(r.elapsed_seconds for r in results)[len(results) // 2] if results else 0, 3 + ), "by_class": dict(by_class), } bench = BenchResult( - timestamp=datetime.now(timezone.utc).isoformat(), + timestamp=datetime.now(UTC).isoformat(), config={ "num_sessions": num_sessions, "distractor_count": distractor_count, @@ -357,13 +410,18 @@ def run_benchmark(num_sessions: int, distractor_count: int, seed: int, ) # Save JSON - out_path = out_dir / f"pmr_100_{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}.json" - out_path.write_text(json.dumps({ - "timestamp": bench.timestamp, - "config": bench.config, - "summary": bench.summary, - "sessions": [asdict(s) for s in bench.sessions], - }, indent=2)) + out_path = out_dir / f"pmr_100_{datetime.now(UTC).strftime('%Y%m%dT%H%M%S')}.json" + out_path.write_text( + json.dumps( + { + "timestamp": bench.timestamp, + "config": bench.config, + "summary": bench.summary, + "sessions": [asdict(s) for s in bench.sessions], + }, + indent=2, + ) + ) print(f"[pmr-100] saved: {out_path}", file=sys.stderr) return bench @@ -372,7 +430,7 @@ def run_benchmark(num_sessions: int, distractor_count: int, seed: int, def print_one_paragraph(bench: BenchResult) -> None: s = bench.summary print() - print(f"PMR-100 Procedural Memory Retention benchmark — Gradata") + print("PMR-100 Procedural Memory Retention benchmark — Gradata") print(f" Sessions: {s['total_sessions']}") print(f" Rules extracted: {s['rules_extracted_pct']}%") print(f" Recall@1: {s['recall_at_1_pct']}% (top-ranked rule matches the correction)") @@ -392,12 +450,19 @@ def print_one_paragraph(bench: BenchResult) -> None: def main() -> None: p = argparse.ArgumentParser(description=__doc__.split("\n")[0]) - p.add_argument("--quick", action="store_true", - help="10 sessions for fast feedback (default: 100)") - p.add_argument("-n", "--num-sessions", type=int, default=None, - help="Number of sessions (default: 10 with --quick, else 100)") - p.add_argument("--distractors", type=int, default=5, - help="Distractor turns per session (default: 5)") + p.add_argument( + "--quick", action="store_true", help="10 sessions for fast feedback (default: 100)" + ) + p.add_argument( + "-n", + "--num-sessions", + type=int, + default=None, + help="Number of sessions (default: 10 with --quick, else 100)", + ) + p.add_argument( + "--distractors", type=int, default=5, help="Distractor turns per session (default: 5)" + ) p.add_argument("--seed", type=int, default=42, help="Random seed (default: 42)") args = p.parse_args() diff --git a/Gradata/examples/basic_usage.py b/Gradata/examples/basic_usage.py index bcd14039..5861aeeb 100644 --- a/Gradata/examples/basic_usage.py +++ b/Gradata/examples/basic_usage.py @@ -1,5 +1,7 @@ """Basic Gradata usage — learn from corrections in 10 lines.""" + from pathlib import Path + from gradata.brain import Brain # Create a brain (or open existing) diff --git a/Gradata/examples/domain-profiles/call_profile.py b/Gradata/examples/domain-profiles/call_profile.py index f1c43dca..5d484a27 100644 --- a/Gradata/examples/domain-profiles/call_profile.py +++ b/Gradata/examples/domain-profiles/call_profile.py @@ -41,11 +41,11 @@ from dataclasses import dataclass, field from typing import Any - # --------------------------------------------------------------------------- # Transcript Parsing # --------------------------------------------------------------------------- + @dataclass class Utterance: """A single speaker turn in a transcript. @@ -99,7 +99,9 @@ def parse_transcript( start = float(s.get("start_time", 0)) end = float(s.get("end_time", 0)) if text.strip(): - utterances.append(Utterance(speaker=speaker, text=text.strip(), start_time=start, end_time=end)) + utterances.append( + Utterance(speaker=speaker, text=text.strip(), start_time=start, end_time=end) + ) # Infer user speaker if not provided if not user_speaker and utterances: @@ -121,7 +123,8 @@ def parse_transcript( _OPEN_Q = re.compile( r"^\s*(?:what|how|why|tell me|describe|explain|walk me through|" r"can you (?:share|tell|describe|walk)|what would|how does|how do|" - r"what if|what's your|how would)\b", re.I + r"what if|what's your|how would)\b", + re.I, ) # Pain/impact probing questions @@ -131,14 +134,16 @@ def parse_transcript( r"what (?:does|would) that (?:mean|cost|look like)|" r"how much (?:time|money|effort)|what's at stake|" r"biggest (?:challenge|problem|issue|pain)|" - r"what would it mean (?:for|if|to))", re.I + r"what would it mean (?:for|if|to))", + re.I, ) # Closed questions (yes/no oriented) _CLOSED_Q = re.compile( r"^\s*(?:do you|are you|is (?:it|that|there)|have you|" r"did you|can you|would you|will you|could you|" - r"does (?:it|that|your))\b", re.I + r"does (?:it|that|your))\b", + re.I, ) # Story/proof indicators @@ -147,7 +152,8 @@ def parse_transcript( r"for example|similar (?:to|situation)|case study|" r"(?:company|agency|team) (?:like yours|similar to)|" r"we worked with|they (?:saw|achieved|reduced|increased)|" - r"the result was|within (?:\d+|a few) (?:weeks|months|days))", re.I + r"the result was|within (?:\d+|a few) (?:weeks|months|days))", + re.I, ) # Objection markers (from prospect) @@ -157,7 +163,8 @@ def parse_transcript( r"not (?:the right|a good) time|need to (?:think|discuss|talk)|" r"not sure (?:if|about|we)|sounds (?:expensive|complicated)|" r"what if (?:it|we)|concerned about|worried about|" - r"how (?:is|are) you different|why (?:should|would) (?:we|I))", re.I + r"how (?:is|are) you different|why (?:should|would) (?:we|I))", + re.I, ) # Close/commitment language (from user) @@ -168,7 +175,8 @@ def parse_transcript( r"does (?:that|this) (?:work|sound)|" r"how about (?:we|I)|shall (?:we|I)|" r"I'll (?:send|share|prepare|get)|" - r"let's (?:plan|schedule|set|do))", re.I + r"let's (?:plan|schedule|set|do))", + re.I, ) # Specific commitment markers @@ -176,7 +184,8 @@ def parse_transcript( r"(?:by (?:Monday|Tuesday|Wednesday|Thursday|Friday|tomorrow|end of (?:day|week))|" r"(?:at|on) (?:\d{1,2}(?::\d{2})?\s*(?:am|pm|PT|ET|CT))|" r"\d{1,2}/\d{1,2}|" - r"(?:this|next) (?:Monday|Tuesday|Wednesday|Thursday|Friday|week))", re.I + r"(?:this|next) (?:Monday|Tuesday|Wednesday|Thursday|Friday|week))", + re.I, ) @@ -184,41 +193,41 @@ def parse_transcript( # Feature Extraction # --------------------------------------------------------------------------- + @dataclass class CallFeatures: """Extracted behavioral features from a single call transcript.""" - call_type: str = "unknown" # discovery, demo, follow_up, break_up + call_type: str = "unknown" # discovery, demo, follow_up, break_up duration_minutes: float = 0.0 total_words: int = 0 user_words: int = 0 prospect_words: int = 0 - talk_ratio: float = 0.0 # user_words / total_words - turn_count: int = 0 # total speaker turns + talk_ratio: float = 0.0 # user_words / total_words + turn_count: int = 0 # total speaker turns user_turns: int = 0 - avg_turn_length: float = 0.0 # avg words per user turn - longest_monologue: int = 0 # longest consecutive user speech - question_count: int = 0 # questions user asked + avg_turn_length: float = 0.0 # avg words per user turn + longest_monologue: int = 0 # longest consecutive user speech + question_count: int = 0 # questions user asked open_question_count: int = 0 closed_question_count: int = 0 pain_question_count: int = 0 - open_question_ratio: float = 0.0 # open / total questions - story_count: int = 0 # case study/proof deployments - objection_count: int = 0 # objections from prospect - objection_responses: int = 0 # user responses to objections - close_attempts: int = 0 # next-step/commitment asks - commitment_count: int = 0 # total commitments made - specific_commitments: int = 0 # commitments with dates/times + open_question_ratio: float = 0.0 # open / total questions + story_count: int = 0 # case study/proof deployments + objection_count: int = 0 # objections from prospect + objection_responses: int = 0 # user responses to objections + close_attempts: int = 0 # next-step/commitment asks + commitment_count: int = 0 # total commitments made + specific_commitments: int = 0 # commitments with dates/times commitment_specificity: float = 0.0 # specific / total commitments - discovery_minutes: float = 0.0 # time before first product mention - first_pitch_minute: float = 0.0 # when user first pitched + discovery_minutes: float = 0.0 # time before first product mention + first_pitch_minute: float = 0.0 # when user first pitched def to_dict(self) -> dict[str, Any]: - return {k: round(v, 2) if isinstance(v, float) else v - for k, v in self.__dict__.items()} + return {k: round(v, 2) if isinstance(v, float) else v for k, v in self.__dict__.items()} @classmethod - def from_dict(cls, d: dict[str, Any]) -> "CallFeatures": + def from_dict(cls, d: dict[str, Any]) -> CallFeatures: return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) @@ -312,9 +321,9 @@ def extract_call_features( # Commitments commitment_count = close_attempts # close attempts that include specifics - specific_commits = sum(1 for u in user_utts - if _CLOSE_MARKERS.search(u.text) - and _SPECIFIC_COMMIT.search(u.text)) + specific_commits = sum( + 1 for u in user_utts if _CLOSE_MARKERS.search(u.text) and _SPECIFIC_COMMIT.search(u.text) + ) commit_specificity = specific_commits / commitment_count if commitment_count > 0 else 0.0 # Discovery depth: time before first "product" mention @@ -322,7 +331,8 @@ def extract_call_features( _PITCH_MARKERS = re.compile( r"(?:let me (?:show|walk|demo)|here's (?:how|what)|" r"our (?:platform|tool|product|solution|system)|" - r"the way (?:it|we) work|feature|dashboard|integration)", re.I + r"the way (?:it|we) work|feature|dashboard|integration)", + re.I, ) first_pitch_time = 0.0 for u in user_utts: @@ -364,13 +374,14 @@ def extract_call_features( # Call Profile (aggregated from multiple calls) # --------------------------------------------------------------------------- + @dataclass class CallOutcome: """Outcome of a single call, paired with its features.""" features: CallFeatures - outcome: str # "advanced" | "stalled" | "lost" | "closed_won" - next_stage: str = "" # e.g., "demo_scheduled", "proposal_sent" + outcome: str # "advanced" | "stalled" | "lost" | "closed_won" + next_stage: str = "" # e.g., "demo_scheduled", "proposal_sent" notes: str = "" def to_dict(self) -> dict[str, Any]: @@ -390,8 +401,8 @@ class CallProfile: sample_count: int = 0 outcomes: list[CallOutcome] = field(default_factory=list) avg_features: CallFeatures = field(default_factory=CallFeatures) - win_features: CallFeatures | None = None # avg features when outcome=advanced/closed_won - loss_features: CallFeatures | None = None # avg features when outcome=stalled/lost + win_features: CallFeatures | None = None # avg features when outcome=advanced/closed_won + loss_features: CallFeatures | None = None # avg features when outcome=stalled/lost confidence: float = 0.0 patterns: list[str] = field(default_factory=list) # graduated pattern descriptions @@ -496,13 +507,10 @@ def _discover_patterns( # Talk ratio if win.talk_ratio < loss.talk_ratio - 0.08: patterns.append( - f"Talk less: wins avg {win.talk_ratio:.0%} talk ratio vs " - f"losses {loss.talk_ratio:.0%}" + f"Talk less: wins avg {win.talk_ratio:.0%} talk ratio vs losses {loss.talk_ratio:.0%}" ) elif win.talk_ratio > loss.talk_ratio + 0.08: - patterns.append( - f"Talk more: wins avg {win.talk_ratio:.0%} vs losses {loss.talk_ratio:.0%}" - ) + patterns.append(f"Talk more: wins avg {win.talk_ratio:.0%} vs losses {loss.talk_ratio:.0%}") # Pain questions if win.pain_question_count > loss.pain_question_count + 1: @@ -542,14 +550,16 @@ def _discover_patterns( # Objection handling if win.objection_responses > 0 and loss.objection_responses == 0: patterns.append("Address objections directly — wins always respond, losses ignore") - elif (win.objection_count > 0 and loss.objection_count > 0 - and win.objection_responses / max(1, win.objection_count) - > loss.objection_responses / max(1, loss.objection_count) + 0.2): + elif ( + win.objection_count > 0 + and loss.objection_count > 0 + and win.objection_responses / max(1, win.objection_count) + > loss.objection_responses / max(1, loss.objection_count) + 0.2 + ): win_rate = win.objection_responses / max(1, win.objection_count) loss_rate = loss.objection_responses / max(1, loss.objection_count) patterns.append( - f"Handle more objections: wins respond to {win_rate:.0%} vs " - f"losses {loss_rate:.0%}" + f"Handle more objections: wins respond to {win_rate:.0%} vs losses {loss_rate:.0%}" ) # Monologue length @@ -608,10 +618,14 @@ def generate_cheat_sheet(profile: CallProfile, prospect_context: str = "") -> st if target.talk_ratio > 0: emoji = "<<" if target.talk_ratio < 0.40 else ">>" if target.talk_ratio > 0.55 else "==" - lines.append(f"- Talk ratio: {target.talk_ratio:.0%} {emoji} (you {'listen more' if target.talk_ratio < 0.45 else 'drive more'})") + lines.append( + f"- Talk ratio: {target.talk_ratio:.0%} {emoji} (you {'listen more' if target.talk_ratio < 0.45 else 'drive more'})" + ) if target.question_count > 0: - lines.append(f"- Ask {target.question_count}+ questions ({target.pain_question_count}+ pain questions)") + lines.append( + f"- Ask {target.question_count}+ questions ({target.pain_question_count}+ pain questions)" + ) if target.discovery_minutes > 0: lines.append(f"- Spend {target.discovery_minutes:.0f}+ min in discovery before pitching") @@ -642,6 +656,7 @@ def generate_cheat_sheet(profile: CallProfile, prospect_context: str = "") -> st # Post-Call Audit # --------------------------------------------------------------------------- + @dataclass class AuditCheck: """Result of a single post-call audit check.""" @@ -658,7 +673,7 @@ class PostCallAudit: """Complete post-call audit against graduated patterns.""" checks: list[AuditCheck] - score: float # 0.0-1.0, fraction of checks passed + score: float # 0.0-1.0, fraction of checks passed call_type: str summary: str @@ -668,8 +683,13 @@ def to_dict(self) -> dict[str, Any]: "call_type": self.call_type, "summary": self.summary, "checks": [ - {"rule": c.rule, "passed": c.passed, "actual": c.actual, - "target": c.target, "severity": c.severity} + { + "rule": c.rule, + "passed": c.passed, + "actual": c.actual, + "target": c.target, + "severity": c.severity, + } for c in self.checks ], } @@ -694,102 +714,126 @@ def post_call_audit( """ if profile.sample_count < 5: return PostCallAudit( - checks=[], score=1.0, call_type=features.call_type, + checks=[], + score=1.0, + call_type=features.call_type, summary="Not enough data for audit (need 5+ calls with outcomes)", ) target = profile.win_features if profile.win_features else profile.avg_features - severity = "critical" if profile.confidence >= 0.90 else "warning" if profile.confidence >= 0.60 else "info" + severity = ( + "critical" + if profile.confidence >= 0.90 + else "warning" + if profile.confidence >= 0.60 + else "info" + ) checks: list[AuditCheck] = [] # Talk ratio if target.talk_ratio > 0: ok = abs(features.talk_ratio - target.talk_ratio) < 0.15 - checks.append(AuditCheck( - rule=f"Talk ratio near {target.talk_ratio:.0%}", - passed=ok, - actual=f"{features.talk_ratio:.0%}", - target=f"{target.talk_ratio:.0%} +/- 15%", - severity=severity, - )) + checks.append( + AuditCheck( + rule=f"Talk ratio near {target.talk_ratio:.0%}", + passed=ok, + actual=f"{features.talk_ratio:.0%}", + target=f"{target.talk_ratio:.0%} +/- 15%", + severity=severity, + ) + ) # Pain questions if target.pain_question_count > 0: ok = features.pain_question_count >= target.pain_question_count - checks.append(AuditCheck( - rule=f"Ask {target.pain_question_count}+ pain questions", - passed=ok, - actual=str(features.pain_question_count), - target=f"{target.pain_question_count}+", - severity=severity, - )) + checks.append( + AuditCheck( + rule=f"Ask {target.pain_question_count}+ pain questions", + passed=ok, + actual=str(features.pain_question_count), + target=f"{target.pain_question_count}+", + severity=severity, + ) + ) # Questions total if target.question_count > 0: ok = features.question_count >= max(1, target.question_count - 2) - checks.append(AuditCheck( - rule=f"Ask {target.question_count}+ questions total", - passed=ok, - actual=str(features.question_count), - target=f"{target.question_count}+", - severity="info", # less critical than pain questions - )) + checks.append( + AuditCheck( + rule=f"Ask {target.question_count}+ questions total", + passed=ok, + actual=str(features.question_count), + target=f"{target.question_count}+", + severity="info", # less critical than pain questions + ) + ) # Story deployment if target.story_count > 0: ok = features.story_count >= target.story_count - checks.append(AuditCheck( - rule=f"Deploy {target.story_count}+ stories/proof points", - passed=ok, - actual=str(features.story_count), - target=f"{target.story_count}+", - severity=severity, - )) + checks.append( + AuditCheck( + rule=f"Deploy {target.story_count}+ stories/proof points", + passed=ok, + actual=str(features.story_count), + target=f"{target.story_count}+", + severity=severity, + ) + ) # Discovery depth if target.discovery_minutes > 2: ok = features.discovery_minutes >= target.discovery_minutes * 0.7 - checks.append(AuditCheck( - rule=f"Spend {target.discovery_minutes:.0f}+ min in discovery", - passed=ok, - actual=f"{features.discovery_minutes:.0f} min", - target=f"{target.discovery_minutes:.0f}+ min", - severity=severity, - )) + checks.append( + AuditCheck( + rule=f"Spend {target.discovery_minutes:.0f}+ min in discovery", + passed=ok, + actual=f"{features.discovery_minutes:.0f} min", + target=f"{target.discovery_minutes:.0f}+ min", + severity=severity, + ) + ) # Close attempt if target.close_attempts > 0: ok = features.close_attempts >= 1 - checks.append(AuditCheck( - rule="Make at least 1 close/next-step attempt", - passed=ok, - actual=str(features.close_attempts), - target="1+", - severity="critical" if profile.confidence >= 0.60 else "info", - )) + checks.append( + AuditCheck( + rule="Make at least 1 close/next-step attempt", + passed=ok, + actual=str(features.close_attempts), + target="1+", + severity="critical" if profile.confidence >= 0.60 else "info", + ) + ) # Commitment specificity if target.commitment_specificity > 0.3: ok = features.commitment_specificity >= 0.5 or features.commitment_count == 0 - checks.append(AuditCheck( - rule="Commitments should include specific dates/times", - passed=ok, - actual=f"{features.commitment_specificity:.0%} specific", - target="50%+", - severity=severity, - )) + checks.append( + AuditCheck( + rule="Commitments should include specific dates/times", + passed=ok, + actual=f"{features.commitment_specificity:.0%} specific", + target="50%+", + severity=severity, + ) + ) # Monologue length if target.longest_monologue > 0 and target.longest_monologue < 200: ok = features.longest_monologue <= target.longest_monologue * 1.5 - checks.append(AuditCheck( - rule=f"Keep monologues under {round(target.longest_monologue * 1.5)} words", - passed=ok, - actual=f"{features.longest_monologue} words", - target=f"<{round(target.longest_monologue * 1.5)} words", - severity="info", - )) + checks.append( + AuditCheck( + rule=f"Keep monologues under {round(target.longest_monologue * 1.5)} words", + passed=ok, + actual=f"{features.longest_monologue} words", + target=f"<{round(target.longest_monologue * 1.5)} words", + severity="info", + ) + ) passed = sum(1 for c in checks if c.passed) score = passed / len(checks) if checks else 1.0 diff --git a/Gradata/examples/domain-profiles/sales_profile.py b/Gradata/examples/domain-profiles/sales_profile.py index bc6c6c89..19a2267a 100644 --- a/Gradata/examples/domain-profiles/sales_profile.py +++ b/Gradata/examples/domain-profiles/sales_profile.py @@ -26,15 +26,16 @@ from __future__ import annotations from collections import defaultdict -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime @dataclass class OutreachEvent: """A single outreach attempt.""" - channel: str # email, call, linkedin, etc. - timestamp: str # ISO 8601 + + channel: str # email, call, linkedin, etc. + timestamp: str # ISO 8601 prospect: str = "" replied: bool = False reply_sentiment: str = "" # positive, neutral, negative @@ -43,6 +44,7 @@ class OutreachEvent: @dataclass class FollowupEvent: """A follow-up in a sequence.""" + prospect: str touch_number: int days_since_last: int @@ -53,20 +55,21 @@ class FollowupEvent: @dataclass class SalesProfileReport: """Aggregated sales behavioral metrics.""" + # Timing - best_send_hours: list[int] # Hours (0-23) with highest reply rates - best_send_days: list[str] # Days of week with highest reply rates - avg_response_time_hours: float # Average time to reply after outreach + best_send_hours: list[int] # Hours (0-23) with highest reply rates + best_send_days: list[str] # Days of week with highest reply rates + avg_response_time_hours: float # Average time to reply after outreach # Cadence avg_days_between_touches: float - cadence_compliance: float # 0-1, how close to optimal cadence - touches_before_reply: float # Average touches before first reply - drop_off_touch: int # Touch number where most sequences die + cadence_compliance: float # 0-1, how close to optimal cadence + touches_before_reply: float # Average touches before first reply + drop_off_touch: int # Touch number where most sequences die # Multi-threading - avg_contacts_per_deal: float # Gong: 2x contacts = higher win rate - deals_with_single_contact: int # Risk indicator + avg_contacts_per_deal: float # Gong: 2x contacts = higher win rate + deals_with_single_contact: int # Risk indicator deals_with_multithread: int # Volume @@ -92,13 +95,15 @@ def log_outreach( reply_sentiment: str = "", ) -> None: """Log an outreach attempt.""" - self._outreach.append(OutreachEvent( - channel=channel, - timestamp=timestamp, - prospect=prospect, - replied=replied, - reply_sentiment=reply_sentiment, - )) + self._outreach.append( + OutreachEvent( + channel=channel, + timestamp=timestamp, + prospect=prospect, + replied=replied, + reply_sentiment=reply_sentiment, + ) + ) def log_followup( self, @@ -109,13 +114,15 @@ def log_followup( replied: bool = False, ) -> None: """Log a follow-up touch in a sequence.""" - self._followups.append(FollowupEvent( - prospect=prospect, - touch_number=touch_number, - days_since_last=days_since_last, - channel=channel, - replied=replied, - )) + self._followups.append( + FollowupEvent( + prospect=prospect, + touch_number=touch_number, + days_since_last=days_since_last, + channel=channel, + replied=replied, + ) + ) def log_deal_contact(self, deal: str, contact_role: str) -> None: """Log a contact associated with a deal (multi-threading tracking).""" @@ -147,9 +154,7 @@ def compute(self) -> SalesProfileReport: # Best days by reply rate day_rates = { - d: sum(replies) / len(replies) - for d, replies in reply_days.items() - if len(replies) >= 3 + d: sum(replies) / len(replies) for d, replies in reply_days.items() if len(replies) >= 3 } best_days = sorted(day_rates, key=day_rates.get, reverse=True)[:3] @@ -159,7 +164,8 @@ def compute(self) -> SalesProfileReport: replied_followups = [f for f in self._followups if f.replied] touches_before = ( sum(f.touch_number for f in replied_followups) / len(replied_followups) - if replied_followups else 0.0 + if replied_followups + else 0.0 ) # Find drop-off: touch number with most non-replied sequences touch_counts: dict[int, int] = defaultdict(int) @@ -178,7 +184,7 @@ def compute(self) -> SalesProfileReport: if gaps: deviations = [ abs(a - o) / max(o, 1) - for a, o in zip(gaps[:5], optimal_gaps[:len(gaps)]) + for a, o in zip(gaps[:5], optimal_gaps[: len(gaps)], strict=False) ] compliances.append(max(0.0, 1.0 - sum(deviations) / len(deviations))) compliance = sum(compliances) / len(compliances) if compliances else 0.0 @@ -194,7 +200,8 @@ def compute(self) -> SalesProfileReport: multi = sum(1 for contacts in self._deal_contacts.values() if len(contacts) > 1) avg_contacts = ( sum(len(c) for c in self._deal_contacts.values()) / deal_count - if deal_count > 0 else 0.0 + if deal_count > 0 + else 0.0 ) # Reply rates diff --git a/Gradata/examples/with_claude_code.py b/Gradata/examples/with_claude_code.py index 1fb51c86..7fded4a2 100644 --- a/Gradata/examples/with_claude_code.py +++ b/Gradata/examples/with_claude_code.py @@ -13,6 +13,7 @@ See also: .claude-plugin/README.md for the zero-code install flow. """ + from pathlib import Path from gradata.brain import Brain diff --git a/Gradata/examples/with_openai.py b/Gradata/examples/with_openai.py index c7e8e0de..4d3ad851 100644 --- a/Gradata/examples/with_openai.py +++ b/Gradata/examples/with_openai.py @@ -7,6 +7,7 @@ Requires: pip install gradata openai """ + from pathlib import Path from openai import OpenAI diff --git a/Gradata/scripts/migrate_legacy_scopes.py b/Gradata/scripts/migrate_legacy_scopes.py index 9fcef431..b81da7cc 100644 --- a/Gradata/scripts/migrate_legacy_scopes.py +++ b/Gradata/scripts/migrate_legacy_scopes.py @@ -118,9 +118,7 @@ def load_domain_set( if isinstance(data, list): return {str(d).strip().lower() for d in data if str(d).strip()} except ImportError: - logger.warning( - "PyYAML not installed; falling back to inferred domain set." - ) + logger.warning("PyYAML not installed; falling back to inferred domain set.") except Exception as exc: # pragma: no cover - defensive logger.warning("Could not parse %s: %s", domains_file, exc) diff --git a/Gradata/src/gradata/_config.py b/Gradata/src/gradata/_config.py index 90660bb8..f2b5191c 100644 --- a/Gradata/src/gradata/_config.py +++ b/Gradata/src/gradata/_config.py @@ -79,6 +79,11 @@ class BrainConfig: max_recall_tokens: int = 2000 ranker: RecallRanker = "hybrid" + @classmethod + def load(cls, brain_dir: str | Path | None = None) -> BrainConfig: + """Load runtime config from ``/brain-config.json``.""" + return _load_brain_config(brain_dir) + BRAIN_CONFIG = BrainConfig() diff --git a/Gradata/src/gradata/_context_packet.py b/Gradata/src/gradata/_context_packet.py index 9d3a2e9a..e03bfa9a 100644 --- a/Gradata/src/gradata/_context_packet.py +++ b/Gradata/src/gradata/_context_packet.py @@ -382,6 +382,7 @@ def build_packet( packet = {} if session is None: session = _detect_session() + session = session or 0 if task_type in ("prospecting", "meeting_prep"): packet["user_scope"] = _load_user_scope(ctx=ctx) diff --git a/Gradata/src/gradata/_core.py b/Gradata/src/gradata/_core.py index 8c8a8d5c..9621755f 100644 --- a/Gradata/src/gradata/_core.py +++ b/Gradata/src/gradata/_core.py @@ -1231,11 +1231,15 @@ def _cloud_sync_session( """Best-effort cloud sync at session end. Never raises, never blocks.""" try: import hashlib - import os from pathlib import Path - # 1. Resolve cloud credentials: ~/.gradata/config.toml or env var - api_key = os.environ.get("GRADATA_API_KEY", "") + # 1. Resolve cloud credentials: per-brain cloud-config.json, keyfile, + # legacy ~/.gradata/config.toml, or env var. + from gradata.cloud import _credentials as _cloud_creds + from gradata.cloud.sync import load_config as _load_cloud_config + + brain_cloud_cfg = _load_cloud_config(brain.dir) + api_key = _cloud_creds.resolve_credential(fallback=brain_cloud_cfg.token) api_url = "" brain_id_from_config = "" @@ -1249,6 +1253,7 @@ def _cloud_sync_session( except Exception as e: _log.debug("cloud config parse failed: %s", e) + api_url = _cloud_creds.resolve_endpoint(api_url, fallback=brain_cloud_cfg.api_base) if not api_key: return # No cloud credentials — nothing to sync @@ -1337,17 +1342,15 @@ def _cloud_sync_session( sync_client.sync_metrics(payload) _log.debug("Cloud telemetry synced for session %d", session) - # Finding 11: respect sync_mode — default is metrics_only. - # Only sync full events/corrections if explicitly opted in via config. - sync_mode = "metrics_only" + sync_mode = getattr(brain_cloud_cfg, "sync_mode", "full") or "full" try: cfg = _parse_toml_cloud(config_path) - sync_mode = cfg.get("sync_mode", "metrics_only") + sync_mode = cfg.get("sync_mode", sync_mode) except Exception: pass if sync_mode == "full": - # 4. Sync events/corrections via the full cloud client (opt-in only) + # 4. Sync events/corrections via the full cloud client. try: from gradata.cloud.client import CloudClient @@ -1361,9 +1364,14 @@ def _cloud_sync_session( _log.debug("Cloud event sync completed for session %d", session) except Exception as e: _log.debug("Cloud event sync failed (non-fatal): %s", e) + elif sync_mode == "metrics_only": + _log.warning( + "Skipping event/correction sync (sync_mode=metrics_only) — dashboard will " + "not show corrections. Run `gradata sync --full` to backfill." + ) else: - _log.debug( - "Cloud sync_mode=%s — skipping event/correction sync for session %d", + _log.warning( + "Unknown cloud sync_mode=%s — skipping event/correction sync for session %d", sync_mode, session, ) diff --git a/Gradata/src/gradata/_events.py b/Gradata/src/gradata/_events.py index 2ebf0b75..3cb16aef 100644 --- a/Gradata/src/gradata/_events.py +++ b/Gradata/src/gradata/_events.py @@ -7,6 +7,7 @@ from __future__ import annotations import contextlib +import hashlib import json import logging import os @@ -20,6 +21,8 @@ # `from X import Y` copies the value at import time — subsequent set_brain_dir() won't update it. import gradata._paths as _p from gradata._file_lock import platform_lock +from gradata._migrations._ulid import new_ulid +from gradata._migrations.device_uuid import get_or_create_device_id from gradata._platform import detect_platform_source from gradata._tenant import tenant_for @@ -31,6 +34,48 @@ _EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") +def _canonical_content_hash(ev_type: str, source: str | None, data: dict | None) -> str: + """Return deterministic SHA256 for the redacted event content.""" + canonical = json.dumps( + {"type": ev_type, "source": source or "", "data": data or {}}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + default=str, + ) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest() + + +def _legacy_event_id(event: dict) -> str: + """Stable id for legacy JSONL rows that predate event_id.""" + payload = { + "ts": event.get("ts", ""), + "session": event.get("session"), + "type": event.get("type", ""), + "source": event.get("source", ""), + "data": event.get("data", {}), + "tags": event.get("tags", []), + } + canonical = json.dumps( + payload, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + default=str, + ) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32] + + +def _coerce_session_for_storage(session: int | str | None) -> int | None: + if session is None or isinstance(session, bool): + return None + try: + parsed = int(session) + except (TypeError, ValueError, OverflowError): + return None + return parsed if parsed > 0 else None + + def _redact_string(value: str) -> str: return _EMAIL_RE.sub("[REDACTED_EMAIL]", value) @@ -117,25 +162,22 @@ def _ensure_table(conn: sqlite3.Connection): valid_until TEXT, scope TEXT DEFAULT 'local', tenant_id TEXT, + brain_id TEXT, + event_id TEXT, + device_id TEXT, + content_hash TEXT, + correction_chain_id TEXT, + origin_agent TEXT, schema_version INTEGER DEFAULT 1 ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_events_session ON events(session)") conn.execute("CREATE INDEX IF NOT EXISTS idx_events_type ON events(type)") conn.execute("CREATE INDEX IF NOT EXISTS idx_events_session_type ON events(session, type)") - # Dedup guard keyed by (tenant_id, ts, type, source). Tenant scoping - # prevents cross-tenant collisions after multi-tenant rollout while - # preserving retry-safe idempotent writes within a tenant. - # Suppresses IntegrityError too: legacy DBs may hold pre-existing - # duplicate rows that block the UNIQUE index creation — tolerated so - # writes still proceed (INSERT OR IGNORE degrades to plain INSERT). with contextlib.suppress(sqlite3.OperationalError, sqlite3.IntegrityError): conn.execute("DROP INDEX IF EXISTS idx_events_dedup") with contextlib.suppress(sqlite3.OperationalError, sqlite3.IntegrityError): - conn.execute( - "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_dedup_tenant " - "ON events(tenant_id, ts, type, source)" - ) + conn.execute("DROP INDEX IF EXISTS idx_events_dedup_tenant") with contextlib.suppress(sqlite3.OperationalError): conn.execute("ALTER TABLE events ADD COLUMN valid_from TEXT") with contextlib.suppress(sqlite3.OperationalError): @@ -144,10 +186,27 @@ def _ensure_table(conn: sqlite3.Connection): conn.execute("ALTER TABLE events ADD COLUMN scope TEXT DEFAULT 'local'") with contextlib.suppress(sqlite3.OperationalError): conn.execute("ALTER TABLE events ADD COLUMN tenant_id TEXT") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute("ALTER TABLE events ADD COLUMN brain_id TEXT") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute("ALTER TABLE events ADD COLUMN event_id TEXT") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute("ALTER TABLE events ADD COLUMN device_id TEXT") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute("ALTER TABLE events ADD COLUMN content_hash TEXT") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute("ALTER TABLE events ADD COLUMN correction_chain_id TEXT") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute("ALTER TABLE events ADD COLUMN origin_agent TEXT") with contextlib.suppress(sqlite3.OperationalError): conn.execute("ALTER TABLE events ADD COLUMN schema_version INTEGER DEFAULT 1") with contextlib.suppress(sqlite3.OperationalError): conn.execute("CREATE INDEX IF NOT EXISTS idx_events_tenant ON events(tenant_id)") + with contextlib.suppress(sqlite3.OperationalError): + conn.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_brain_event_id " + "ON events(brain_id, event_id)" + ) conn.commit() if db_file: _schema_initialized.add(db_file) @@ -162,27 +221,45 @@ def _insert_event_projection( """Project one canonical JSONL event into SQLite idempotently.""" _ensure_table(conn) tid = tenant_for(brain_dir) + brain_id = event.get("brain_id") or tid + event_id = event.get("event_id") or _legacy_event_id(event) + device_id = event.get("device_id") or get_or_create_device_id(brain_dir) + data = event.get("data", {}) + if not isinstance(data, dict): + data = {"_raw": data} + content_hash = event.get("content_hash") or _canonical_content_hash( + event.get("type", ""), + event.get("source", ""), + data, + ) cursor = conn.execute( "INSERT OR IGNORE INTO events " - "(ts, session, type, source, data_json, tags_json, valid_from, valid_until, tenant_id, schema_version) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)", + "(ts, session, type, source, data_json, tags_json, valid_from, valid_until, " + "tenant_id, brain_id, event_id, device_id, content_hash, correction_chain_id, origin_agent, schema_version) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1)", ( event.get("ts", ""), - event.get("session"), + _coerce_session_for_storage(event.get("session")), event.get("type", ""), event.get("source", ""), - json.dumps(event.get("data", {}), default=str), + json.dumps(data, default=str), json.dumps(event.get("tags", []), default=str), event.get("valid_from"), event.get("valid_until"), tid, + brain_id, + event_id, + device_id, + content_hash, + event.get("correction_chain_id"), + event.get("origin_agent"), ), ) if cursor.rowcount == 1: return cursor.lastrowid existing = conn.execute( - "SELECT id FROM events WHERE tenant_id=? AND ts=? AND type=? AND source=?", - (tid, event.get("ts", ""), event.get("type", ""), event.get("source", "")), + "SELECT id FROM events WHERE brain_id=? AND event_id=?", + (brain_id, event_id), ).fetchone() return existing[0] if existing else None @@ -256,6 +333,7 @@ def emit( valid_until: str | None = None, ctx: BrainContext | None = None, ts: str | None = None, + event_id: str | None = None, ): """Emit an event to the brain's event log. @@ -269,11 +347,15 @@ def emit( # Resolve paths: prefer explicit ctx, fall back to globals events_jsonl = ctx.events_jsonl if ctx else _p.EVENTS_JSONL db_path = ctx.db_path if ctx else _p.DB_PATH + brain_dir = db_path.parent if ts is None: ts = datetime.now(UTC).isoformat() if session is None: session = _detect_session(ctx=ctx) + if session is None: + session = 1 + session = _coerce_session_for_storage(session) if valid_from is None: valid_from = ts @@ -313,7 +395,21 @@ def emit( # Redact PII BEFORE any cloud-syncable storage. Fail-closed: if the # redactor raises, no data (raw or canonical) may be persisted. redacted_data = _redact_payload(data or {}) + if not isinstance(redacted_data, dict): + redacted_data = {"_raw": redacted_data} redacted_tags = _redact_payload(enriched_tags) + try: + tenant_id = tenant_for(brain_dir) + except Exception as exc: + _log.debug("tenant id resolution failed: %s", exc) + tenant_id = hashlib.sha256(str(brain_dir).encode("utf-8")).hexdigest()[:32] + event_id = event_id or new_ulid() + try: + device_id = get_or_create_device_id(brain_dir) + except Exception as exc: + _log.debug("device id resolution failed: %s", exc) + device_id = "dev_" + hashlib.sha256(str(brain_dir).encode("utf-8")).hexdigest()[:32] + content_hash = _canonical_content_hash(event_type, source, redacted_data) event = { "ts": ts, "session": session, @@ -323,6 +419,10 @@ def emit( "tags": redacted_tags, "valid_from": valid_from, "valid_until": valid_until, + "brain_id": tenant_id, + "event_id": event_id, + "device_id": device_id, + "content_hash": content_hash, } # Best-effort side-log of the unredacted original. Platform-locked, @@ -579,15 +679,15 @@ def compute_leading_indicators(session: int, ctx: BrainContext | None = None) -> return result -def get_current_session() -> int: +def get_current_session() -> int | None: """Public alias for session detection. Used by brain.track_rule().""" return _detect_session() -def _detect_session(ctx: BrainContext | None = None) -> int: +def _detect_session(ctx: BrainContext | None = None) -> int | None: """Detect current session number from the events table. - Returns MAX(session) from events, or 0 if unavailable. + Returns MAX(session) from events, or None if unavailable. """ db_path = ctx.db_path if ctx else _p.DB_PATH @@ -601,7 +701,7 @@ def _detect_session(ctx: BrainContext | None = None) -> int: except Exception: pass - return 0 + return None # ── Brain-quality functions (promoted from brain shim) ──────────────── diff --git a/Gradata/src/gradata/_migrations/004_event_identity_unique_key.py b/Gradata/src/gradata/_migrations/004_event_identity_unique_key.py new file mode 100644 index 00000000..b03e72f4 --- /dev/null +++ b/Gradata/src/gradata/_migrations/004_event_identity_unique_key.py @@ -0,0 +1,165 @@ +# ruff: noqa: N999 # numbered migration module — digit prefix is intentional +"""Migration 004: make event identity unique on (brain_id, event_id).""" + +from __future__ import annotations + +import argparse +import json +import sqlite3 +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) +from _runner import ( # type: ignore[import-not-found] + add_column_if_missing, + create_index_if_missing, + has_applied, + mark_applied, + resolve_brain_db, + table_exists, +) +from _ulid import ulid_from_iso # type: ignore[import-not-found] +from device_uuid import get_or_create_device_id # type: ignore[import-not-found] + +NAME = "004_event_identity_unique_key" + + +def plan(conn: sqlite3.Connection) -> dict: + if not table_exists(conn, "events"): + return {"actions": []} + return { + "actions": [ + "DROP INDEX idx_events_dedup", + "DROP INDEX idx_events_dedup_tenant", + "ALTER events ADD brain_id TEXT", + "ensure UNIQUE index idx_events_brain_event_id(brain_id, event_id)", + ] + } + + +def up(conn: sqlite3.Connection, tenant_id: str) -> dict: + summary: dict = { + "columns_added": [], + "indexes_created": [], + "indexes_dropped": [], + "rows_backfilled": 0, + } + if not table_exists(conn, "events"): + return summary + + if add_column_if_missing(conn, "events", "brain_id", "TEXT"): + summary["columns_added"].append("events.brain_id") + for col, decl in [ + ("event_id", "TEXT"), + ("device_id", "TEXT"), + ("content_hash", "TEXT"), + ("correction_chain_id", "TEXT"), + ("origin_agent", "TEXT"), + ]: + if add_column_if_missing(conn, "events", col, decl): + summary["columns_added"].append(f"events.{col}") + + for idx in ("idx_events_dedup", "idx_events_dedup_tenant"): + if conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='index' AND name = ?", + (idx,), + ).fetchone(): + conn.execute(f"DROP INDEX {idx}") + summary["indexes_dropped"].append(idx) + + brain_dir = _brain_dir_for(conn) + device_id = get_or_create_device_id(brain_dir) + rows = conn.execute( + "SELECT id, ts, type, source, data_json FROM events " + "WHERE brain_id IS NULL OR event_id IS NULL OR device_id IS NULL OR content_hash IS NULL" + ).fetchall() + updates: list[tuple[str, str, str, str, int]] = [] + for row_id, ts, ev_type, source, data_json in rows: + event_id = ulid_from_iso(ts or "") + content_hash = _canonical_content_hash(ev_type, source, data_json) + updates.append((tenant_id, event_id, device_id, content_hash, row_id)) + if updates: + conn.executemany( + "UPDATE events SET " + "brain_id = COALESCE(brain_id, ?), " + "event_id = COALESCE(event_id, ?), " + "device_id = COALESCE(device_id, ?), " + "content_hash = COALESCE(content_hash, ?) " + "WHERE id = ?", + updates, + ) + summary["rows_backfilled"] = len(updates) + + if create_index_if_missing( + conn, + "idx_events_brain_event_id", + "events", + "brain_id, event_id", + unique=True, + ): + summary["indexes_created"].append("idx_events_brain_event_id") + return summary + + +def _canonical_content_hash(ev_type: str, source: str | None, data_json: str | None) -> str: + import hashlib + + try: + data = json.loads(data_json) if data_json else {} + except (json.JSONDecodeError, TypeError): + data = {"_raw": data_json} + canonical = json.dumps( + {"type": ev_type, "source": source or "", "data": data}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest() + + +def _brain_dir_for(conn: sqlite3.Connection) -> Path: + row = conn.execute("PRAGMA database_list").fetchone() + if row and row[2]: + return Path(row[2]).resolve().parent + return Path.cwd() + + +def _main() -> int: + ap = argparse.ArgumentParser(description=f"Run migration {NAME}") + ap.add_argument("--brain", help="Path to brain directory or system.db") + ap.add_argument("--dry-run", action="store_true") + args = ap.parse_args() + + db_path = resolve_brain_db(args.brain) + if not db_path.exists(): + print(f"ERROR: brain DB not found at {db_path}", file=sys.stderr) + return 2 + + conn = sqlite3.connect(str(db_path)) + try: + if has_applied(conn, NAME) and not args.dry_run: + print(f"Already applied: {NAME} (no-op)") + return 0 + p = plan(conn) + print("\n--- plan ---") + for action in p["actions"]: + print(f" {action}") + if args.dry_run: + print("\n(dry-run) no changes made") + return 0 + summary = up(conn, tenant_id="") + mark_applied( + conn, + NAME, + rows_affected=summary["rows_backfilled"], + notes=json.dumps({k: v for k, v in summary.items() if k != "rows_backfilled"}), + ) + conn.commit() + print(json.dumps(summary, indent=2)) + return 0 + finally: + conn.close() + + +if __name__ == "__main__": + raise SystemExit(_main()) diff --git a/Gradata/src/gradata/_migrations/device_uuid.py b/Gradata/src/gradata/_migrations/device_uuid.py index 9b2f3d93..a7e60df1 100644 --- a/Gradata/src/gradata/_migrations/device_uuid.py +++ b/Gradata/src/gradata/_migrations/device_uuid.py @@ -16,17 +16,35 @@ import argparse import contextlib +import hashlib +import json import os import re -import uuid +import socket from pathlib import Path DEVICE_FILE = ".device_id" _DEVICE_RE = re.compile(r"^dev_[0-9a-f]{32}$") -def _new_device_id() -> str: - return f"dev_{uuid.uuid4().hex}" +def _brain_identity(brain: Path) -> str: + manifest = brain / "brain.manifest.json" + if manifest.is_file(): + try: + data = json.loads(manifest.read_text(encoding="utf-8")) + metadata = data.get("metadata", {}) if isinstance(data, dict) else {} + for key in ("brain_id", "id", "name"): + value = metadata.get(key) or data.get(key) + if value: + return str(value) + except (OSError, json.JSONDecodeError): + pass + return str(brain) + + +def _new_device_id(brain: Path) -> str: + seed = f"{socket.gethostname()}:{_brain_identity(brain)}" + return f"dev_{hashlib.sha256(seed.encode('utf-8')).hexdigest()[:32]}" def _is_valid(s: str) -> bool: @@ -54,7 +72,7 @@ def get_or_create_device_id(brain_dir: str | Path) -> str: # code keeps seeing invalid ids. existing_invalid = True - new_did = _new_device_id() + new_did = _new_device_id(brain) tmp = brain / f".device_id.tmp.{os.getpid()}" flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL try: diff --git a/Gradata/src/gradata/_mine_transcripts.py b/Gradata/src/gradata/_mine_transcripts.py index 60f28b85..1a3d5bb3 100644 --- a/Gradata/src/gradata/_mine_transcripts.py +++ b/Gradata/src/gradata/_mine_transcripts.py @@ -219,6 +219,24 @@ def _session_uuid_to_int(session_uuid: str) -> int: return int(h, 16) & 0x7FFFFFFF +def _stable_event_id(ev: dict, payload: dict) -> str: + """Stable replay id so transcript mining is idempotent across runs.""" + canonical = json.dumps( + { + "ts": ev.get("ts", ""), + "session": 0, + "type": ev.get("event", ""), + "source": ev.get("source", ""), + "data": payload, + }, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + default=str, + ) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32] + + def _detect_signals(text: str) -> list[str]: if not text or len(text) < 10: return [] @@ -423,6 +441,7 @@ def run_mine( session=0, ts=ev["ts"], ctx=brain.ctx, + event_id=_stable_event_id(ev, payload), ) except Exception: skipped += 1 diff --git a/Gradata/src/gradata/brain.py b/Gradata/src/gradata/brain.py index fbe0b824..f1812545 100644 --- a/Gradata/src/gradata/brain.py +++ b/Gradata/src/gradata/brain.py @@ -236,7 +236,7 @@ def session(self) -> int: try: from gradata._events import get_current_session - return get_current_session() + return get_current_session() or 0 except Exception as exc: logger.debug("get_current_session failed: %s", exc) return 0 @@ -1513,7 +1513,7 @@ def emit( """Emit an event to the brain's event log.""" from gradata._events import emit - return emit(event_type, source, data or {}, tags or [], session or 0, ctx=self.ctx) + return emit(event_type, source, data or {}, tags or [], session, ctx=self.ctx) def query_events( self, @@ -1917,7 +1917,7 @@ def track_rule( try: from gradata._events import get_current_session - session = get_current_session() + session = get_current_session() or 0 except Exception as e: logger.debug("get_current_session failed, defaulting to 0: %s", e) session = 0 diff --git a/Gradata/src/gradata/cli.py b/Gradata/src/gradata/cli.py index 94d57a1a..3322c0c0 100644 --- a/Gradata/src/gradata/cli.py +++ b/Gradata/src/gradata/cli.py @@ -877,6 +877,37 @@ def cmd_cloud(args): print("usage: gradata cloud {enable|rotate-key|status|disconnect|sync-pull}") +def cmd_sync(args): + """Run local-to-cloud event sync.""" + from gradata.cloud import _credentials as _creds + from gradata.cloud.client import CloudClient + from gradata.cloud.sync import load_config, save_config + + brain_root = Path( + env_str("GRADATA_BRAIN") or getattr(args, "brain_dir", None) or Path.cwd() + ).resolve() + if not getattr(args, "full", False): + print("usage: gradata sync --full") + return + + cfg = load_config(brain_root) + cfg.sync_mode = "full" + save_config(brain_root, cfg) + + credential = _creds.resolve_credential(fallback=cfg.token) + if not credential: + print("error: no cloud credential found. Run `gradata cloud enable --key ...` first.") + sys.exit(2) + endpoint = _creds.resolve_endpoint(fallback=cfg.api_base) or None + cloud = CloudClient(brain_dir=brain_root, api_key=credential, endpoint=endpoint) + if not cloud.connect(): + print("error: cloud connection failed") + sys.exit(1) + ingested = cloud.sync() + print("sync_mode: full") + print(f"events_synced: {ingested}") + + def cmd_mine(args): """Backfill brain from Claude Code transcript archive (~/.claude/projects).""" from gradata._mine_transcripts import run_mine @@ -1305,6 +1336,10 @@ def main(): p_audit = sub.add_parser("audit", help="Data flow audit") p_audit.add_argument("--json", action="store_true") + # sync + p_sync = sub.add_parser("sync", help="Sync local events to Gradata Cloud") + p_sync.add_argument("--full", action="store_true", help="Backfill unsynced events/corrections") + # recall p_recall = sub.add_parser("recall", help="Recall relevant brain rules as XML") p_recall.add_argument("situation", help="Current situation or task") @@ -1574,6 +1609,7 @@ def main(): "manifest": cmd_manifest, "stats": cmd_stats, "audit": cmd_audit, + "sync": cmd_sync, "recall": cmd_recall, "export": cmd_export, "context": cmd_context, diff --git a/Gradata/src/gradata/cloud/sync.py b/Gradata/src/gradata/cloud/sync.py index 63d8a60a..f4ce8a08 100644 --- a/Gradata/src/gradata/cloud/sync.py +++ b/Gradata/src/gradata/cloud/sync.py @@ -58,6 +58,7 @@ class CloudConfig: token: str = "" api_base: str = _DEFAULT_API_BASE contribute_corpus: bool = False # Separate, stricter opt-in + sync_mode: str = "full" last_sync_at: str = "" key_scope: str = "" # Optional scope tag recorded at `gradata cloud enable` # Override the materializer's Tier 2 conflict threshold (|Δconfidence|). @@ -97,6 +98,7 @@ def load_config(brain_dir: Path) -> CloudConfig: token=str(data.get("token", "")), api_base=_normalize_api_base(str(data.get("api_base", _DEFAULT_API_BASE))), contribute_corpus=bool(data.get("contribute_corpus", False)), + sync_mode=str(data.get("sync_mode", "full") or "full"), last_sync_at=str(data.get("last_sync_at", "")), key_scope=str(data.get("key" + "_scope", "")), conflict_threshold=_coerce_threshold(data.get("conflict_threshold", 0.0)), diff --git a/Gradata/src/gradata/hooks/adapters/hermes.py b/Gradata/src/gradata/hooks/adapters/hermes.py index e9496a5c..93317cdb 100644 --- a/Gradata/src/gradata/hooks/adapters/hermes.py +++ b/Gradata/src/gradata/hooks/adapters/hermes.py @@ -14,6 +14,99 @@ AGENT = "hermes" +def _parse_simple_yaml(text: str) -> dict: + """Parse the small Hermes config shape without requiring PyYAML.""" + data: dict = {} + current_key: str | None = None + current_list: list | None = None + for raw in text.splitlines(): + line = raw.rstrip() + if not line.strip() or line.lstrip().startswith("#"): + continue + if not raw.startswith(" ") and line.endswith(":"): + current_key = line[:-1].strip() + data[current_key] = {} + current_list = None + continue + if not raw.startswith(" ") and ":" in line: + key, value = line.split(":", 1) + data[key.strip()] = _parse_scalar(value.strip()) + current_key = None + current_list = None + continue + if current_key is None: + continue + stripped = line.strip() + container = data.setdefault(current_key, {}) + if stripped.endswith(":") and not stripped.startswith("- "): + nested_key = stripped[:-1] + if isinstance(container, dict): + current_list = [] + container[nested_key] = current_list + continue + if stripped.startswith("- "): + if current_list is None: + current_list = [] + if isinstance(container, dict): + container.setdefault("items", current_list) + item_text = stripped[2:].strip() + if ":" in item_text: + key, value = item_text.split(":", 1) + current_list.append({key.strip(): _parse_scalar(value.strip())}) + else: + current_list.append(_parse_scalar(item_text)) + continue + if ( + current_list is not None + and current_list + and isinstance(current_list[-1], dict) + and ":" in stripped + ): + key, value = stripped.split(":", 1) + current_list[-1][key.strip()] = _parse_scalar(value.strip()) + return data + + +def _parse_scalar(value: str): + if not value: + return "" + if value in ("true", "false"): + return value == "true" + return value.strip("'\"") + + +def _dump_simple_yaml(data: dict, indent: int = 0) -> str: + lines: list[str] = [] + pad = " " * indent + for key, value in data.items(): + if isinstance(value, dict): + lines.append(f"{pad}{key}:") + lines.append(_dump_simple_yaml(value, indent + 2).rstrip()) + elif isinstance(value, list): + lines.append(f"{pad}{key}:") + for item in value: + if isinstance(item, dict): + first = True + for child_key, child_value in item.items(): + prefix = "- " if first else " " + lines.append(f"{pad} {prefix}{child_key}: {_format_scalar(child_value)}") + first = False + else: + lines.append(f"{pad} - {_format_scalar(item)}") + else: + lines.append(f"{pad}{key}: {_format_scalar(value)}") + return "\n".join(line for line in lines if line) + "\n" + + +def _format_scalar(value) -> str: + if isinstance(value, bool): + return "true" if value else "false" + text = str(value) + if any(ch in text for ch in (":", "#", "{", "}", "[", "]")): + return repr(text) + return text + + def install(brain_dir: Path, agent_config_path: Path) -> InstallResult: try: sig = hook_signature(AGENT, brain_dir) @@ -21,12 +114,11 @@ def install(brain_dir: Path, agent_config_path: Path) -> InstallResult: return InstallResult( AGENT, agent_config_path, "already_present", "hook already present" ) - import yaml existing = ( agent_config_path.read_text(encoding="utf-8") if agent_config_path.exists() else "" ) - loaded = yaml.safe_load(existing) if existing.strip() else {} + loaded = _parse_simple_yaml(existing) if existing.strip() else {} data = loaded if isinstance(loaded, dict) else {} hooks = data.setdefault("hooks", {}) if not isinstance(hooks, dict): @@ -41,7 +133,7 @@ def install(brain_dir: Path, agent_config_path: Path) -> InstallResult: AGENT, agent_config_path, "already_present", "hook already present" ) pre_tool_use.append({"id": sig, "command": hook_command(brain_dir)}) - atomic_write_text(agent_config_path, yaml.safe_dump(data, sort_keys=False)) + atomic_write_text(agent_config_path, _dump_simple_yaml(data)) return InstallResult(AGENT, agent_config_path, "added", "installed pre_tool_use hook") except Exception as exc: return failure(AGENT, agent_config_path, exc) diff --git a/Gradata/src/gradata/hooks/agent_precontext.py b/Gradata/src/gradata/hooks/agent_precontext.py index b241c34d..d6852379 100644 --- a/Gradata/src/gradata/hooks/agent_precontext.py +++ b/Gradata/src/gradata/hooks/agent_precontext.py @@ -145,6 +145,12 @@ def main(data: dict) -> dict | None: brain_dir = _resolve_agent_brain_dir() if not brain_dir: return None + try: + from gradata._config import BrainConfig + + max_prompt_chars = BrainConfig.load(brain_dir).max_recall_tokens * 4 + except ImportError: + max_prompt_chars = 2000 * 4 lessons_path = Path(brain_dir) / "lessons.md" if not lessons_path.is_file(): @@ -217,6 +223,8 @@ def main(data: dict) -> dict | None: # Compact header saves ~10 tokens vs XML open/close wrapper. block = "[agent-rules]\n" + "\n".join(lines) + if len(block) > max_prompt_chars: + block = block[:max_prompt_chars].rstrip() return {"result": block} except Exception: return None diff --git a/Gradata/src/gradata/hooks/inject_brain_rules.py b/Gradata/src/gradata/hooks/inject_brain_rules.py index 46fce94f..ecc69efe 100644 --- a/Gradata/src/gradata/hooks/inject_brain_rules.py +++ b/Gradata/src/gradata/hooks/inject_brain_rules.py @@ -361,9 +361,9 @@ def main(data: dict) -> dict | None: if not brain_dir: return None try: - from gradata._config import _load_brain_config + from gradata._config import BrainConfig - brain_cfg = _load_brain_config(brain_dir) + brain_cfg = BrainConfig.load(brain_dir) except ImportError: brain_cfg = None max_rules = MAX_RULES diff --git a/Gradata/src/gradata/hooks/jit_inject.py b/Gradata/src/gradata/hooks/jit_inject.py index 1579240a..286b6909 100644 --- a/Gradata/src/gradata/hooks/jit_inject.py +++ b/Gradata/src/gradata/hooks/jit_inject.py @@ -291,6 +291,12 @@ def main(data: dict) -> dict | None: brain_dir = resolve_brain_dir() if not brain_dir: return None + try: + from gradata._config import BrainConfig + + max_prompt_chars = BrainConfig.load(brain_dir).max_recall_tokens * 4 + except ImportError: + max_prompt_chars = 2000 * 4 lessons_path = Path(brain_dir) / "lessons.md" if not lessons_path.is_file(): @@ -381,6 +387,8 @@ def _already_in_wisdom(desc: str) -> bool: if not lines: return None rules_block = "\n".join(lines) + if len(rules_block) > max_prompt_chars: + rules_block = rules_block[:max_prompt_chars].rstrip() return {"result": rules_block} diff --git a/Gradata/src/gradata/middleware/_core.py b/Gradata/src/gradata/middleware/_core.py index 3f38e3af..7747d8d1 100644 --- a/Gradata/src/gradata/middleware/_core.py +++ b/Gradata/src/gradata/middleware/_core.py @@ -122,6 +122,14 @@ def __init__( self._static_lessons = lessons self.max_rules = max_rules self.min_confidence = min_confidence + self.max_prompt_chars = 2000 * 4 + if self._brain_path is not None: + try: + from gradata._config import BrainConfig + + self.max_prompt_chars = BrainConfig.load(self._brain_path).max_recall_tokens * 4 + except ImportError: + pass # mtime-keyed cache of parsed+filtered lessons. Without it, each # `load()` call re-reads and re-parses lessons.md — a per-LLM-call hit # in middleware (on_llm_start / on_llm_end both call into this). @@ -281,7 +289,10 @@ def build_brain_rules_block(source: RuleSource) -> str: obfuscate_instruction(f"[{l.state}:{l.confidence:.2f}] {l.category}: {l.description}") for l in selected ] - return "\n" + "\n".join(lines) + "\n" + block = "\n" + "\n".join(lines) + "\n" + if len(block) > source.max_prompt_chars: + block = block[: source.max_prompt_chars].rstrip() + return block def inject_into_system(system: str | None, block: str) -> str: diff --git a/Gradata/tests/test_brain_config_propagation.py b/Gradata/tests/test_brain_config_propagation.py new file mode 100644 index 00000000..1074dae9 --- /dev/null +++ b/Gradata/tests/test_brain_config_propagation.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import json +import os + +from gradata._types import Lesson, LessonState +from gradata.enhancements.self_improvement import format_lessons + + +def _long_lessons(count: int = 12) -> list[Lesson]: + long_tail = "alpha beta gamma " * 80 + return [ + Lesson( + date="2026-05-06", + state=LessonState.RULE, + confidence=0.95, + category="CODE", + description=f"rule {i} {long_tail}", + ) + for i in range(count) + ] + + +def test_brain_config_max_recall_tokens_reaches_all_injection_paths(tmp_path, monkeypatch): + max_tokens = 500 + max_chars = max_tokens * 4 + (tmp_path / "brain-config.json").write_text( + json.dumps({"max_recall_tokens": max_tokens, "ranker": "flat"}), + encoding="utf-8", + ) + (tmp_path / "lessons.md").write_text(format_lessons(_long_lessons()), encoding="utf-8") + monkeypatch.setenv("BRAIN_DIR", str(tmp_path)) + monkeypatch.setenv("GRADATA_BRAIN_DIR", str(tmp_path)) + monkeypatch.setenv("GRADATA_JIT_ENABLED", "1") + monkeypatch.setenv("GRADATA_SUBAGENT_DEDUP", "0") + + from gradata.hooks import agent_precontext, inject_brain_rules, jit_inject + from gradata.middleware import RuleSource, build_brain_rules_block + + session_result = inject_brain_rules.main({"session_type": "alpha beta", "session_number": 1}) + agent_result = agent_precontext.main( + {"tool_name": "Agent", "tool_input": {"subagent_type": "code", "prompt": "fix code"}} + ) + jit_result = jit_inject.main({"prompt": "alpha beta gamma " * 10}) + middleware_block = build_brain_rules_block(RuleSource(brain_path=tmp_path)) + + assert session_result is not None + assert agent_result is not None + assert jit_result is not None + assert len(session_result["result"]) <= max_chars + assert len(agent_result["result"]) <= max_chars + assert len(jit_result["result"]) <= max_chars + assert len(middleware_block) <= max_chars + assert os.environ["GRADATA_JIT_ENABLED"] == "1" diff --git a/Gradata/tests/test_brain_events.py b/Gradata/tests/test_brain_events.py index cc04f015..ce3df5b9 100644 --- a/Gradata/tests/test_brain_events.py +++ b/Gradata/tests/test_brain_events.py @@ -50,6 +50,11 @@ def test_explicit_session_stored(self, fresh_brain): event = fresh_brain.emit("X", "s", session=99) assert event["session"] == 99 + def test_emit_uses_active_session_when_none_passed(self, fresh_brain): + fresh_brain.emit("SESSION_START", "pytest", {"session": 7}, session=7) + event = fresh_brain.correct("draft text", "final text") + assert event["session"] == 7 + def test_empty_data_defaults_to_empty_dict(self, fresh_brain): event = fresh_brain.emit("X", "s") assert isinstance(event["data"], dict) diff --git a/Gradata/tests/test_migration_002_event_identity.py b/Gradata/tests/test_migration_002_event_identity.py index d2edd59b..0132146c 100644 --- a/Gradata/tests/test_migration_002_event_identity.py +++ b/Gradata/tests/test_migration_002_event_identity.py @@ -168,17 +168,27 @@ def test_device_id_persisted_to_brain_dir(tmp_path): assert re.fullmatch(r"dev_[0-9a-f]{32}", content) -def test_new_emit_leaves_identity_columns_null_for_now(tmp_path): - """emit() does not yet populate identity columns — only Migration 002 backfill does. - - Wiring emit() to write event_id/device_id/content_hash is deferred; this - test pins the current contract so a future change flips it deliberately. - """ +def test_new_emit_populates_event_identity_columns(tmp_path): brain = init_brain(tmp_path) - brain.emit("FRESH", "src", {"k": "v"}, []) + from gradata._events import emit + + fixed_ts = "2026-05-06T12:00:00+00:00" + first = emit("FRESH", "src", {"k": "v"}, [], ctx=brain.ctx, ts=fixed_ts) + second = emit("FRESH", "src", {"k": "v"}, [], ctx=brain.ctx, ts=fixed_ts) with _conn(brain) as conn: - row = conn.execute( - "SELECT event_id, device_id, content_hash FROM events WHERE type = 'FRESH'" - ).fetchone() - assert row == (None, None, None) + rows = conn.execute( + "SELECT event_id, device_id, content_hash FROM events WHERE type = 'FRESH' ORDER BY id" + ).fetchall() + assert len(rows) == 2 + first_event_id, first_device_id, first_hash = rows[0] + second_event_id, second_device_id, second_hash = rows[1] + assert first["event_id"] == first_event_id + assert second["event_id"] == second_event_id + assert re.fullmatch(r"[0-9A-HJKMNP-TV-Z]{26}", first_event_id), first_event_id + assert re.fullmatch(r"[0-9A-HJKMNP-TV-Z]{26}", second_event_id), second_event_id + assert first_event_id != second_event_id + assert re.fullmatch(r"dev_[0-9a-f]{32}", first_device_id) + assert first_device_id == second_device_id + assert re.fullmatch(r"[0-9a-f]{64}", first_hash) + assert first_hash == second_hash