diff --git a/pyproject.toml b/pyproject.toml
index 1ae32e14..0c75ed59 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -106,6 +106,7 @@ per-file-ignores = {"src/brainlayer/cli/__init__.py" = ["F401"]}
 
 [tool.pytest.ini_options]
 asyncio_mode = "auto"
+pythonpath = ["src"]
 markers = [
     "integration: tests that need the real production DB (268K+ chunks)",
     "slow: tests that load ML models or take >30s",
diff --git a/src/brainlayer/pipeline/enrichment_tiers.py b/src/brainlayer/pipeline/enrichment_tiers.py
new file mode 100644
index 00000000..54bdf27f
--- /dev/null
+++ b/src/brainlayer/pipeline/enrichment_tiers.py
@@ -0,0 +1,160 @@
+"""Tiered enrichment selectivity for BrainLayer.
+
+Assigns each chunk a processing tier based on source, content type, and age:
+
+  T0 (IMMEDIATE): manual/digest — always enrich, highest priority
+  T1 (HOURLY):    recent claude_code chunks — hourly local enrichment
+  T2 (LAZY):      old claude_code backlog — lazy remote batch
+  T3 (EXPLICIT):  youtube transcripts — only when explicitly triggered
+
+Noise chunks are classified T3 whatever their source, and sources outside the
+lists above fall back to T2.
+
+Design note: tiers are IntEnum so T0 < T1 < T2 < T3 comparisons work naturally
+and callers can filter "up to tier N" with `tier <= max_tier`.
+"""
+
+from datetime import datetime, timedelta, timezone
+from enum import IntEnum
+from typing import List, Optional, Set
+
+# ── Tier definition ──────────────────────────────────────────────────────
+
+
+class EnrichmentTier(IntEnum):
+    """Processing tiers, ordered so that a lower value means higher priority."""
+
+    T0_IMMEDIATE = 0  # manual / digest — enrich immediately
+    T1_HOURLY = 1  # recent claude_code — hourly local run
+    T2_LAZY = 2  # old claude_code backlog — lazy / remote batch
+    T3_EXPLICIT = 3  # youtube — explicit request only
+
+
+# ── Constants ────────────────────────────────────────────────────────────
+
+# Sources that are T0 regardless of age (noise content is still demoted to T3).
+T0_SOURCES: frozenset = frozenset({"manual", "digest"})
+
+# Sources that are always T3 regardless of age or content type.
+T3_SOURCES: frozenset = frozenset({"youtube"})
+
+# Default recency window: chunks within this many days are T1, older are T2.
+DEFAULT_RECENCY_DAYS: int = 7
+
+# High-value content types enriched at T1 (and T2 for old backlog).
+T1_CONTENT_TYPES: List[str] = ["ai_code", "stack_trace", "user_message", "assistant_text"]
+
+# Low-signal content types the default pipeline never enriches (classified T3).
+SKIP_CONTENT_TYPES: frozenset = frozenset({"noise"})
+
+# Only these sources participate in the T1/T2 recency gate.
+# Unrecognised sources fall through to T2 (lazy backlog) rather than T1.
+T1_T2_SOURCES: frozenset = frozenset({"claude_code"})
+
+
+# ── Classifier ───────────────────────────────────────────────────────────
+
+
+def classify_chunk_tier(
+    source: str,
+    content_type: str,
+    created_at: Optional[str],
+    recency_days: int = DEFAULT_RECENCY_DAYS,
+) -> EnrichmentTier:
+    """Return the enrichment tier for a chunk.
+
+    Rules are checked in order and the first match wins: noise content (T3),
+    always-on sources (T0), archival sources (T3), unrecognised sources (T2),
+    then the recency split between T1 and T2.
+
+    Args:
+        source: The chunk source field (e.g. "claude_code", "youtube", "manual").
+        content_type: The chunk content_type field (e.g. "ai_code", "assistant_text").
+        created_at: ISO timestamp string when the chunk was created, or None.
+            Missing or unparseable values are treated as old (T2).
+        recency_days: Window (days) for T1 vs T2 split (default 7).
+
+    Returns:
+        EnrichmentTier for the chunk.
+    """
+    # Noise is never worth enriching, regardless of source.
+    if content_type in SKIP_CONTENT_TYPES:
+        return EnrichmentTier.T3_EXPLICIT
+
+    # T0: always-on sources (manual brain_store, digested documents)
+    if source in T0_SOURCES:
+        return EnrichmentTier.T0_IMMEDIATE
+
+    # T3: archival sources never touched by the default pipeline
+    if source in T3_SOURCES:
+        return EnrichmentTier.T3_EXPLICIT
+
+    # Only recognised T1/T2 sources participate in the recency gate.
+    # Unknown sources default to lazy backlog (T2) rather than crowding T1.
+    if source not in T1_T2_SOURCES:
+        return EnrichmentTier.T2_LAZY
+
+    # Claude code: age determines tier.
+    if _is_recent(created_at, recency_days):
+        return EnrichmentTier.T1_HOURLY
+    return EnrichmentTier.T2_LAZY
+
+
+def _is_recent(created_at: Optional[str], recency_days: int) -> bool:
+    """Return True if created_at falls within the recency window.
+
+    Naive timestamps are interpreted as UTC. A trailing "Z" (UTC designator) is
+    rewritten to "+00:00" first, because datetime.fromisoformat() only accepts
+    it from Python 3.11 onwards. Missing, non-string, or unparseable values
+    count as not recent.
+    """
+    if not isinstance(created_at, str):
+        return False
+    # Normalise the "Z" suffix so UTC timestamps parse on every Python version.
+    iso_text = created_at[:-1] + "+00:00" if created_at[-1:] in ("Z", "z") else created_at
+    try:
+        dt = datetime.fromisoformat(iso_text)
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        cutoff = datetime.now(timezone.utc) - timedelta(days=recency_days)
+        return dt >= cutoff
+    except (ValueError, TypeError):
+        return False
+
+
+# ── Selectors ────────────────────────────────────────────────────────────
+
+
+def get_tier_content_types(tier: EnrichmentTier) -> List[str]:
+    """Return the content types relevant for a given tier.
+
+    All tiers currently share the same high-value content type list, so `tier`
+    is not consulted yet. This function exists so callers can parametrize
+    queries without hard-coding content types. A fresh list is returned on
+    every call, so callers may mutate it freely.
+    """
+    return list(T1_CONTENT_TYPES)
+
+
+def get_tier_source_filter(tier: EnrichmentTier) -> Set[str]:
+    """Return the set of *allowed* sources for a given tier.
+
+    Useful for building SQL IN clauses or filtering chunk lists.
+
+      T0 → {manual, digest}
+      T1 → {claude_code}  (explicitly excludes youtube)
+      T2 → {claude_code}  (old backlog only)
+      T3 → {youtube}
+
+    Note: classify_chunk_tier() also assigns T2 to unrecognised sources and T3
+    to noise chunks from any source; those cannot be listed here, so this
+    filter alone does not reproduce the classifier.
+    """
+    if tier == EnrichmentTier.T0_IMMEDIATE:
+        return set(T0_SOURCES)
+    if tier in (EnrichmentTier.T1_HOURLY, EnrichmentTier.T2_LAZY):
+        # T1 and T2 share their sources; only chunk age separates them.
+        return set(T1_T2_SOURCES)
+    if tier == EnrichmentTier.T3_EXPLICIT:
+        return set(T3_SOURCES)
+    return set()  # pragma: no cover
diff --git a/tests/test_enrichment_tiers.py b/tests/test_enrichment_tiers.py
new file mode 100644
index 00000000..ca4033ac
--- /dev/null
+++ b/tests/test_enrichment_tiers.py
@@ -0,0 +1,225 @@
+"""TDD tests for tiered enrichment selectivity.
+
+Tiers:
+  T0 (IMMEDIATE): manual/digest sources — always enriched, regardless of age
+  T1 (HOURLY):    recent claude_code chunks (ai_code, stack_trace, user_message, assistant_text)
+  T2 (LAZY):      old claude_code backlog (older than recency_days) and unrecognised sources
+  T3 (EXPLICIT):  youtube transcripts and noise chunks — only when explicitly requested
+"""
+
+from datetime import datetime, timedelta, timezone
+
+from brainlayer.pipeline.enrichment_tiers import (
+    EnrichmentTier,
+    classify_chunk_tier,
+    get_tier_content_types,
+    get_tier_source_filter,
+)
+
+# ── Helpers ─────────────────────────────────────────────────────────────
+
+
+def _dt(days_ago: int) -> str:
+    """Return ISO timestamp N days ago in UTC."""
+    return (datetime.now(timezone.utc) - timedelta(days=days_ago)).isoformat()
+
+
+RECENT = _dt(3)
+OLD = _dt(30)
+VERY_OLD = _dt(365)
+
+# ── T0: IMMEDIATE — manual / digest sources ──────────────────────────────
+
+
+def test_manual_source_is_tier0():
+    """Manually stored chunks (brain_store) must always be tier 0."""
+    tier = classify_chunk_tier(source="manual", content_type="assistant_text", created_at=RECENT)
+    assert tier == EnrichmentTier.T0_IMMEDIATE
+
+
+def test_digest_source_is_tier0():
+    """Digested documents must always be tier 0."""
+    tier = classify_chunk_tier(source="digest", content_type="assistant_text", created_at=RECENT)
+    assert tier == EnrichmentTier.T0_IMMEDIATE
+
+
+def test_manual_source_is_tier0_regardless_of_age():
+    """Manual chunks should be T0 even if they are old."""
+    tier = classify_chunk_tier(source="manual", content_type="assistant_text", created_at=VERY_OLD)
+    assert tier == EnrichmentTier.T0_IMMEDIATE
+
+
+def test_digest_source_is_tier0_regardless_of_age():
+    """Digest chunks should be T0 even if they are old."""
+    tier = classify_chunk_tier(source="digest", content_type="ai_code", created_at=VERY_OLD)
+    assert tier == EnrichmentTier.T0_IMMEDIATE
+
+
+# ── T1: HOURLY — recent claude_code ──────────────────────────────────────
+
+
+def test_recent_claude_code_ai_code_is_tier1():
+    """Recent ai_code from claude_code sessions is tier 1."""
+    tier = classify_chunk_tier(source="claude_code", content_type="ai_code", created_at=RECENT)
+    assert tier == EnrichmentTier.T1_HOURLY
+
+
+def test_recent_claude_code_stack_trace_is_tier1():
+    """Recent stack_trace from claude_code sessions is tier 1."""
+    tier = classify_chunk_tier(source="claude_code", content_type="stack_trace", created_at=RECENT)
+    assert tier == EnrichmentTier.T1_HOURLY
+
+
+def test_recent_claude_code_user_message_is_tier1():
+    """Recent user_message from claude_code sessions is tier 1."""
+    tier = classify_chunk_tier(source="claude_code", content_type="user_message", created_at=RECENT)
+    assert tier == EnrichmentTier.T1_HOURLY
+
+
+def test_recent_claude_code_assistant_text_is_tier1():
+    """Recent assistant_text from claude_code sessions is tier 1."""
+    tier = classify_chunk_tier(source="claude_code", content_type="assistant_text", created_at=RECENT)
+    assert tier == EnrichmentTier.T1_HOURLY
+
+
+def test_today_claude_code_is_tier1():
+    """Chunks from today (0 days ago) are recent."""
+    today = _dt(0)
+    tier = classify_chunk_tier(source="claude_code", content_type="ai_code", created_at=today)
+    assert tier == EnrichmentTier.T1_HOURLY
+
+
+def test_within_recency_window_is_tier1():
+    """Chunks well within the recency window (6 days) are T1.
+
+    The boundary (7 days) is not tested at microsecond precision here because
+    classify_chunk_tier evaluates 'now' slightly after the timestamp is captured.
+    test_just_past_recency_window_is_tier2 covers the other side at 8 days.
+    """
+    at_boundary = _dt(6)  # NOTE: despite the name, 6 days is inside the window, not on the 7-day boundary
+    tier = classify_chunk_tier(source="claude_code", content_type="ai_code", created_at=at_boundary)
+    assert tier == EnrichmentTier.T1_HOURLY
+
+
+# ── T2: LAZY — old claude_code backlog ───────────────────────────────────
+
+
+def test_old_claude_code_assistant_text_is_tier2():
+    """Old assistant_text backlog (>7 days) from claude_code is tier 2."""
+    tier = classify_chunk_tier(source="claude_code", content_type="assistant_text", created_at=OLD)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+def test_old_claude_code_user_message_is_tier2():
+    """Old user_message backlog from claude_code is tier 2."""
+    tier = classify_chunk_tier(source="claude_code", content_type="user_message", created_at=OLD)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+def test_very_old_claude_code_is_tier2():
+    """Very old claude_code chunks (>1 year) are still only T2, not T3."""
+    tier = classify_chunk_tier(source="claude_code", content_type="assistant_text", created_at=VERY_OLD)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+def test_just_past_recency_window_is_tier2():
+    """Chunks just past the recency window (8 days) are T2."""
+    just_past = _dt(8)
+    tier = classify_chunk_tier(source="claude_code", content_type="assistant_text", created_at=just_past)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+# ── T3: EXPLICIT — youtube transcripts ──────────────────────────────────
+
+
+def test_youtube_source_is_tier3():
+    """YouTube transcript chunks are always tier 3 regardless of content type or age."""
+    tier = classify_chunk_tier(source="youtube", content_type="assistant_text", created_at=RECENT)
+    assert tier == EnrichmentTier.T3_EXPLICIT
+
+
+def test_youtube_source_old_is_still_tier3():
+    """Old YouTube chunks stay T3."""
+    tier = classify_chunk_tier(source="youtube", content_type="assistant_text", created_at=VERY_OLD)
+    assert tier == EnrichmentTier.T3_EXPLICIT
+
+
+def test_unknown_source_defaults_to_tier2():
+    """Unknown/unrecognized source defaults to T2 (lazy backlog), regardless of age.
+
+    Only claude_code participates in T1 (recency-gated). Unrecognised sources
+    are treated as lazy backlog to prevent them from crowding hourly enrichment.
+    """
+    tier = classify_chunk_tier(source="unknown", content_type="assistant_text", created_at=RECENT)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+def test_unknown_source_old_is_tier2():
+    """Old unknown source is also T2 (same as recent unknown)."""
+    tier = classify_chunk_tier(source="unknown", content_type="assistant_text", created_at=OLD)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+# ── Noise content type — never enrich ────────────────────────────────────
+
+
+def test_noise_content_type_is_tier3_explicit():
+    """Noise chunks should never be enriched by default (T3 = explicit only)."""
+    tier = classify_chunk_tier(source="claude_code", content_type="noise", created_at=RECENT)
+    assert tier == EnrichmentTier.T3_EXPLICIT
+
+
+def test_noise_content_type_overrides_t0_source():
+    """Even a T0 source should yield T3 if content_type is noise."""
+    tier = classify_chunk_tier(source="manual", content_type="noise", created_at=RECENT)
+    assert tier == EnrichmentTier.T3_EXPLICIT
+
+
+# ── None / missing created_at ────────────────────────────────────────────
+
+
+def test_none_created_at_defaults_to_tier2_for_claude_code():
+    """Chunks with no created_at timestamp are treated as old (T2 for claude_code)."""
+    tier = classify_chunk_tier(source="claude_code", content_type="assistant_text", created_at=None)
+    assert tier == EnrichmentTier.T2_LAZY
+
+
+# ── get_tier_content_types ───────────────────────────────────────────────
+
+
+def test_get_tier_content_types_returns_high_value_for_t1():
+    """T1 content types should include the core high-value types."""
+    types = get_tier_content_types(EnrichmentTier.T1_HOURLY)
+    assert "ai_code" in types
+    assert "stack_trace" in types
+    assert "user_message" in types
+    assert "assistant_text" in types
+
+
+# ── get_tier_source_filter ───────────────────────────────────────────────
+
+
+def test_get_tier_source_filter_t1_excludes_youtube():
+    """T1 source filter must exclude youtube so it's not processed hourly."""
+    sources = get_tier_source_filter(EnrichmentTier.T1_HOURLY)
+    assert "youtube" not in sources
+
+
+def test_get_tier_source_filter_t0_includes_manual_and_digest():
+    """T0 source filter must include manual and digest sources."""
+    sources = get_tier_source_filter(EnrichmentTier.T0_IMMEDIATE)
+    assert "manual" in sources
+    assert "digest" in sources
+
+
+def test_get_tier_source_filter_t3_only_youtube():
+    """T3 source filter should only include youtube."""
+    sources = get_tier_source_filter(EnrichmentTier.T3_EXPLICIT)
+    assert sources == {"youtube"}
+
+
+def test_tier_ordering():
+    """Lower tier numbers should be higher priority (T0 < T1 < T2 < T3)."""
+    assert EnrichmentTier.T0_IMMEDIATE < EnrichmentTier.T1_HOURLY
+    assert EnrichmentTier.T1_HOURLY < EnrichmentTier.T2_LAZY
+    assert EnrichmentTier.T2_LAZY < EnrichmentTier.T3_EXPLICIT