Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ per-file-ignores = {"src/brainlayer/cli/__init__.py" = ["F401"]}

[tool.pytest.ini_options]
asyncio_mode = "auto"
pythonpath = ["src"]
markers = [
"integration: tests that need the real production DB (268K+ chunks)",
"slow: tests that load ML models or take >30s",
Expand Down
138 changes: 138 additions & 0 deletions src/brainlayer/pipeline/enrichment_tiers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Tiered enrichment selectivity for BrainLayer.

Assigns each chunk a processing tier based on source, content type, and age:

T0 (IMMEDIATE): manual/digest — enrich immediately, highest priority
T1 (HOURLY): recent claude_code chunks — hourly local enrichment
T2 (LAZY): old claude_code backlog and unrecognised sources — lazy remote batch
T3 (EXPLICIT): youtube transcripts and noise content (any source) — only when explicitly triggered

Design note: tiers are IntEnum so T0 < T1 < T2 < T3 comparisons work naturally
and callers can filter "up to tier N" with `tier <= max_tier`.
"""

from datetime import datetime, timedelta, timezone
from enum import IntEnum
from typing import List, Optional, Set

# ── Tier definition ──────────────────────────────────────────────────────


class EnrichmentTier(IntEnum):
    """Processing tier of a chunk; a smaller value means higher priority.

    Members are plain integers, so ordering comparisons such as
    ``tier <= EnrichmentTier.T1_HOURLY`` work directly.
    """

    # manual / digest: enrich immediately
    T0_IMMEDIATE = 0
    # recent claude_code: hourly local run
    T1_HOURLY = 1
    # old claude_code backlog: lazy / remote batch
    T2_LAZY = 2
    # youtube: explicit request only
    T3_EXPLICIT = 3


# ── Constants ────────────────────────────────────────────────────────────

# Sources classified as T0 whatever their age.
T0_SOURCES: frozenset = frozenset(("manual", "digest"))

# Sources classified as T3 whatever their age.
T3_SOURCES: frozenset = frozenset(("youtube",))

# Width of the recency window in days: newer chunks are T1, older ones T2.
DEFAULT_RECENCY_DAYS: int = 7

# High-value content types enriched at T1 (and at T2 for the old backlog).
T1_CONTENT_TYPES: List[str] = [
    "ai_code",
    "stack_trace",
    "user_message",
    "assistant_text",
]

# Low-signal content types that are never enriched by default.
SKIP_CONTENT_TYPES: frozenset = frozenset(("noise",))

# The only sources subject to the T1/T2 recency gate. Any unrecognised
# source is sent to T2 (lazy backlog) rather than T1.
T1_T2_SOURCES: frozenset = frozenset(("claude_code",))


# ── Classifier ───────────────────────────────────────────────────────────


def classify_chunk_tier(
    source: str,
    content_type: str,
    created_at: Optional[str],
    recency_days: int = DEFAULT_RECENCY_DAYS,
) -> EnrichmentTier:
    """Return the enrichment tier for a chunk.

    Rules are evaluated in order and the first match wins:

    1. ``content_type`` in ``SKIP_CONTENT_TYPES`` -> T3 (even for a T0 source).
    2. ``source`` in ``T0_SOURCES`` -> T0.
    3. ``source`` in ``T3_SOURCES`` -> T3.
    4. ``source`` not in ``T1_T2_SOURCES`` -> T2.
    5. Otherwise T1 if ``created_at`` is within the recency window, else T2.

    Args:
        source: The chunk source field (e.g. "claude_code", "youtube", "manual").
        content_type: The chunk content_type field (e.g. "ai_code", "assistant_text").
        created_at: ISO timestamp string when the chunk was created, or None.
        recency_days: Window (days) for the T1 vs T2 split.

    Returns:
        EnrichmentTier for the chunk.
    """
    # Noise is never worth enriching by default, regardless of source.
    if content_type in SKIP_CONTENT_TYPES:
        return EnrichmentTier.T3_EXPLICIT

    # T0: always-on sources (manual brain_store, digested documents).
    if source in T0_SOURCES:
        return EnrichmentTier.T0_IMMEDIATE

    # T3: archival sources never touched by the default pipeline.
    if source in T3_SOURCES:
        return EnrichmentTier.T3_EXPLICIT

    # Only recognised T1/T2 sources participate in the recency gate.
    # Unknown sources default to lazy backlog (T2) rather than crowding T1.
    if source not in T1_T2_SOURCES:
        return EnrichmentTier.T2_LAZY

    # Recognised source: age alone decides between T1 and T2.
    if _is_recent(created_at, recency_days):
        return EnrichmentTier.T1_HOURLY
    return EnrichmentTier.T2_LAZY


def _is_recent(created_at: Optional[str], recency_days: int) -> bool:
    """Return True if ``created_at`` falls within the recency window.

    Args:
        created_at: ISO-8601 timestamp string, or None. A trailing "Z" is
            accepted as UTC. Timestamps without an offset are treated as UTC.
        recency_days: Size of the window in days, counted back from now (UTC).

    Returns:
        True when the timestamp is at or after ``now - recency_days``.
        False when ``created_at`` is None, not a string, or cannot be parsed.
    """
    if not isinstance(created_at, str):
        # None (missing timestamp) and any non-string value count as "not recent".
        return False

    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on;
    # normalise it to an explicit UTC offset so older interpreters parse it too.
    text = created_at
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"

    try:
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            dt = dt.replace(tzinfo=timezone.utc)
        cutoff = datetime.now(timezone.utc) - timedelta(days=recency_days)
        return dt >= cutoff
    except (ValueError, TypeError):
        # Unparseable input is treated as old rather than raising.
        return False


# ── Selectors ────────────────────────────────────────────────────────────


def get_tier_content_types(tier: EnrichmentTier) -> List[str]:
    """Return the content types to enrich for ``tier``.

    Every tier currently maps to the same high-value list, so ``tier`` is
    not consulted. The function exists so callers can parametrize queries
    without hard-coding content types. A new list is returned on each call.
    """
    return [*T1_CONTENT_TYPES]


def get_tier_source_filter(tier: EnrichmentTier) -> Set[str]:
    """Return the set of *allowed* sources for a given tier.

    Useful for building SQL IN clauses or filtering chunk lists. A new
    mutable set is returned on every call.

    T0 → T0_SOURCES ({manual, digest})
    T1 → T1_T2_SOURCES ({claude_code}; explicitly excludes youtube)
    T2 → T1_T2_SOURCES ({claude_code}; old backlog only)
    T3 → T3_SOURCES ({youtube})

    NOTE: classify_chunk_tier also assigns T2 to unrecognised sources and T3
    to noise content from any source. An allow-list cannot express either
    case, so filtering by these sets alone will not select those chunks.
    """
    if tier == EnrichmentTier.T0_IMMEDIATE:
        return set(T0_SOURCES)
    if tier in (EnrichmentTier.T1_HOURLY, EnrichmentTier.T2_LAZY):
        # Use the same constant as the classifier's recency gate so the two
        # cannot drift apart.
        return set(T1_T2_SOURCES)
    if tier == EnrichmentTier.T3_EXPLICIT:
        return set(T3_SOURCES)
    return set()  # pragma: no cover
225 changes: 225 additions & 0 deletions tests/test_enrichment_tiers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
"""TDD tests for tiered enrichment selectivity.

Tiers:
T0 (IMMEDIATE): manual/digest sources
T1 (HOURLY): recent claude_code chunks (ai_code, stack_trace, user_message, assistant_text)
T2 (LAZY): old claude_code backlog (older than recency_days)
Comment on lines +4 to +6
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Test docstring overstates T0 criteria.

The header says T0 includes decision/mistake types, but the classifier/test API has no memory_type input. Please align the docstring to the actual contract.

Suggested fix
-T0 (IMMEDIATE): manual/digest sources and high-signal types (decision, mistake)
+T0 (IMMEDIATE): manual/digest sources
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_enrichment_tiers.py` around lines 4 - 6, The docstring header "T0
(IMMEDIATE): manual/digest sources and high-signal types (decision, mistake)"
overstates T0 by referencing memory_type values not accepted by the
classifier/test API; update the docstring in tests/test_enrichment_tiers.py (the
header line starting with "T0 (IMMEDIATE):") to remove the parenthetical
"decision, mistake" or replace it with wording that matches the actual contract
(e.g., "manual/digest sources and high-signal types" or "manual/digest sources
where classifier does not accept memory_type"), ensuring the test description
matches the classifier/test API inputs.

T3 (EXPLICIT): youtube transcripts — only when explicitly requested
"""

from datetime import datetime, timedelta, timezone

from brainlayer.pipeline.enrichment_tiers import (
EnrichmentTier,
classify_chunk_tier,
get_tier_content_types,
get_tier_source_filter,
)

# ── Helpers ─────────────────────────────────────────────────────────────


def _dt(days_ago: int) -> str:
    """Build the ISO-8601 string for the UTC instant ``days_ago`` days before now."""
    now_utc = datetime.now(timezone.utc)
    return (now_utc - timedelta(days=days_ago)).isoformat()


# Shared timestamps, computed once at import time relative to "now":
# 3 days, 30 days and 365 days in the past.
RECENT, OLD, VERY_OLD = _dt(3), _dt(30), _dt(365)

# ── T0: IMMEDIATE — manual / digest sources ──────────────────────────────


def test_manual_source_is_tier0():
    """A recent chunk from the "manual" source is classified as T0."""
    result = classify_chunk_tier(
        source="manual",
        content_type="assistant_text",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T0_IMMEDIATE


def test_digest_source_is_tier0():
    """A recent chunk from the "digest" source is classified as T0."""
    result = classify_chunk_tier(
        source="digest",
        content_type="assistant_text",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T0_IMMEDIATE


def test_manual_source_is_tier0_regardless_of_age():
    """Age does not demote a "manual" chunk: a year-old one is still T0."""
    result = classify_chunk_tier(
        source="manual",
        content_type="assistant_text",
        created_at=VERY_OLD,
    )
    assert result == EnrichmentTier.T0_IMMEDIATE


def test_digest_source_is_tier0_regardless_of_age():
    """Age does not demote a "digest" chunk: a year-old one is still T0."""
    result = classify_chunk_tier(
        source="digest",
        content_type="ai_code",
        created_at=VERY_OLD,
    )
    assert result == EnrichmentTier.T0_IMMEDIATE


# ── T1: HOURLY — recent claude_code ──────────────────────────────────────


def test_recent_claude_code_ai_code_is_tier1():
    """Recent claude_code chunks of type ai_code land in T1."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="ai_code",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T1_HOURLY


def test_recent_claude_code_stack_trace_is_tier1():
    """Recent claude_code chunks of type stack_trace land in T1."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="stack_trace",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T1_HOURLY


def test_recent_claude_code_user_message_is_tier1():
    """Recent claude_code chunks of type user_message land in T1."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="user_message",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T1_HOURLY


def test_recent_claude_code_assistant_text_is_tier1():
    """Recent claude_code chunks of type assistant_text land in T1."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="assistant_text",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T1_HOURLY


def test_today_claude_code_is_tier1():
    """A claude_code chunk stamped "now" (0 days ago) counts as recent."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="ai_code",
        created_at=_dt(0),
    )
    assert result == EnrichmentTier.T1_HOURLY


def test_within_recency_window_is_tier1():
    """A claude_code chunk 6 days old is inside the default window, so T1.

    The exact 7-day boundary is deliberately not probed: the classifier reads
    'now' slightly after this test captures its timestamp, so a
    microsecond-precise check would be flaky.
    test_just_past_recency_window_is_tier2 covers the far side at 8 days.
    """
    six_days_ago = _dt(6)
    result = classify_chunk_tier(
        source="claude_code",
        content_type="ai_code",
        created_at=six_days_ago,
    )
    assert result == EnrichmentTier.T1_HOURLY


# ── T2: LAZY — old claude_code backlog ───────────────────────────────────


def test_old_claude_code_assistant_text_is_tier2():
    """A 30-day-old claude_code assistant_text chunk falls into the T2 backlog."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="assistant_text",
        created_at=OLD,
    )
    assert result == EnrichmentTier.T2_LAZY


def test_old_claude_code_user_message_is_tier2():
    """A 30-day-old claude_code user_message chunk falls into the T2 backlog."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="user_message",
        created_at=OLD,
    )
    assert result == EnrichmentTier.T2_LAZY


def test_very_old_claude_code_is_tier2():
    """A year-old claude_code chunk stays at T2; age never pushes it to T3."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="assistant_text",
        created_at=VERY_OLD,
    )
    assert result == EnrichmentTier.T2_LAZY


def test_just_past_recency_window_is_tier2():
    """A claude_code chunk 8 days old is outside the default window, so T2."""
    eight_days_ago = _dt(8)
    result = classify_chunk_tier(
        source="claude_code",
        content_type="assistant_text",
        created_at=eight_days_ago,
    )
    assert result == EnrichmentTier.T2_LAZY


# ── T3: EXPLICIT — youtube transcripts ──────────────────────────────────


def test_youtube_source_is_tier3():
    """A recent youtube chunk is T3 even though its age would qualify for T1."""
    result = classify_chunk_tier(
        source="youtube",
        content_type="assistant_text",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T3_EXPLICIT


def test_youtube_source_old_is_still_tier3():
    """A year-old youtube chunk is T3 as well."""
    result = classify_chunk_tier(
        source="youtube",
        content_type="assistant_text",
        created_at=VERY_OLD,
    )
    assert result == EnrichmentTier.T3_EXPLICIT


def test_unknown_source_defaults_to_tier2():
    """An unrecognised source is T2 even when the chunk is recent.

    Only claude_code goes through the recency gate that can yield T1. Any
    other unknown source is parked in the lazy backlog so it cannot crowd
    hourly enrichment.
    """
    result = classify_chunk_tier(
        source="unknown",
        content_type="assistant_text",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T2_LAZY


def test_unknown_source_old_is_tier2():
    """An old chunk from an unrecognised source is T2, same as a recent one."""
    result = classify_chunk_tier(
        source="unknown",
        content_type="assistant_text",
        created_at=OLD,
    )
    assert result == EnrichmentTier.T2_LAZY


# ── Noise content type — never enrich ────────────────────────────────────


def test_noise_content_type_is_tier3_explicit():
    """Noise content is T3 (explicit only) even for a recent claude_code chunk."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="noise",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T3_EXPLICIT


def test_noise_content_type_overrides_t0_source():
    """The noise check takes precedence over a T0 source: manual + noise is T3."""
    result = classify_chunk_tier(
        source="manual",
        content_type="noise",
        created_at=RECENT,
    )
    assert result == EnrichmentTier.T3_EXPLICIT


# ── None / missing created_at ────────────────────────────────────────────


def test_none_created_at_defaults_to_tier2_for_claude_code():
    """A claude_code chunk with no timestamp is treated as old, hence T2."""
    result = classify_chunk_tier(
        source="claude_code",
        content_type="assistant_text",
        created_at=None,
    )
    assert result == EnrichmentTier.T2_LAZY


# ── get_tier_content_types ───────────────────────────────────────────────


def test_get_tier_content_types_returns_high_value_for_t1():
    """The T1 content-type list contains each of the four core high-value types."""
    types = get_tier_content_types(EnrichmentTier.T1_HOURLY)
    for expected in ("ai_code", "stack_trace", "user_message", "assistant_text"):
        assert expected in types


# ── get_tier_source_filter ───────────────────────────────────────────────


def test_get_tier_source_filter_t1_excludes_youtube():
    """youtube must be absent from the T1 sources so it is never processed hourly."""
    t1_sources = get_tier_source_filter(EnrichmentTier.T1_HOURLY)
    assert "youtube" not in t1_sources


def test_get_tier_source_filter_t0_includes_manual_and_digest():
    """Both manual and digest appear in the T0 source filter."""
    t0_sources = get_tier_source_filter(EnrichmentTier.T0_IMMEDIATE)
    for expected in ("manual", "digest"):
        assert expected in t0_sources


def test_get_tier_source_filter_t3_only_youtube():
    """The T3 source filter is exactly {"youtube"}."""
    t3_sources = get_tier_source_filter(EnrichmentTier.T3_EXPLICIT)
    assert t3_sources == {"youtube"}


def test_tier_ordering():
    """Tiers sort by priority: T0 < T1 < T2 < T3 (lower number = higher priority)."""
    assert (
        EnrichmentTier.T0_IMMEDIATE
        < EnrichmentTier.T1_HOURLY
        < EnrichmentTier.T2_LAZY
        < EnrichmentTier.T3_EXPLICIT
    )
Loading