diff --git a/tests/test_normalize_project.py b/tests/test_normalize_project.py new file mode 100644 index 00000000..df0df3d1 --- /dev/null +++ b/tests/test_normalize_project.py @@ -0,0 +1,61 @@ +"""Tests for project name normalization in MCP server.""" + +import os + +from brainlayer.mcp import normalize_project_name + + +class TestNormalizeProjectName: + """Test Claude Code path encoding → clean project name.""" + + def test_none_returns_none(self): + assert normalize_project_name(None) is None + + def test_empty_returns_none(self): + assert normalize_project_name("") is None + assert normalize_project_name(" ") is None + assert normalize_project_name("-") is None + + def test_simple_name_passthrough(self): + """Already-clean names pass through unchanged.""" + assert normalize_project_name("golems") == "golems" + + def test_claude_code_encoded_path(self): + """Standard Claude Code path encoding decodes correctly.""" + # This test requires the actual filesystem to have ~/Gits/golems + result = normalize_project_name("-Users-etanheyman-Gits-golems") + assert result == "golems" + + def test_desktop_gits_path(self): + """Old Desktop/Gits paths decode correctly.""" + # Old path — directory may not exist, but first segment is returned + result = normalize_project_name("-Users-etanheyman-Desktop-Gits-golems") + # Either finds the dir or falls back to first segment + assert result is not None + + def test_compound_name_with_dashes(self): + """Compound project names with dashes resolve via filesystem check.""" + # Only works if the directory actually exists + home = os.path.expanduser("~") + gits_dir = os.path.join(home, "Gits") + if os.path.isdir(gits_dir): + for entry in os.listdir(gits_dir): + if "-" in entry and os.path.isdir(os.path.join(gits_dir, entry)): + # Test that this compound name resolves correctly + encoded = f"-Users-{os.path.basename(home)}-Gits-{entry}" + # The encoding is just dashes, same as the name + # Just verify it doesn't crash + 
result = normalize_project_name(encoded) + assert result is not None + break + + def test_worktree_suffix_stripped(self): + """Worktree suffixes are stripped.""" + assert normalize_project_name("golems-nightshift-1770775282043") == "golems" + assert normalize_project_name("golems-haiku-1770775282043") == "golems" + assert normalize_project_name("golems-worktree-1770775282043") == "golems" + + def test_no_gits_segment_returns_none(self): + """Paths without 'Gits' segment return None.""" + result = normalize_project_name("-Users-etanheyman-Documents-stuff") + assert result is None diff --git a/tests/test_paths.py b/tests/test_paths.py new file mode 100644 index 00000000..c49f02de --- /dev/null +++ b/tests/test_paths.py @@ -0,0 +1,50 @@ +"""Tests for brainlayer.paths — DB path resolution.""" + +import os +from unittest.mock import patch + +from brainlayer.paths import get_db_path + + +class TestGetDbPath: + """Test DB path resolution order.""" + + def test_env_var_override(self, tmp_path): + """BRAINLAYER_DB env var takes highest priority.""" + db_path = tmp_path / "custom.db" + with patch.dict(os.environ, {"BRAINLAYER_DB": str(db_path)}): + assert get_db_path() == db_path + + def test_legacy_path_if_exists(self, tmp_path): + """Legacy zikaron path used when it exists.""" + legacy = tmp_path / "zikaron.db" + legacy.touch() + with ( + patch.dict(os.environ, {}, clear=True), + patch("brainlayer.paths._LEGACY_DB_PATH", legacy), + patch("brainlayer.paths._CANONICAL_DB_PATH", tmp_path / "brainlayer.db"), + ): + # Remove env var if set + os.environ.pop("BRAINLAYER_DB", None) + assert get_db_path() == legacy + + def test_canonical_path_fresh_install(self, tmp_path): + """Canonical path used when no legacy DB exists.""" + canonical = tmp_path / "brainlayer" / "brainlayer.db" + legacy = tmp_path / "nonexistent" / "zikaron.db" + with ( + patch.dict(os.environ, {}, clear=True), + patch("brainlayer.paths._LEGACY_DB_PATH", legacy), + patch("brainlayer.paths._CANONICAL_DB_PATH", 
canonical), + ): + os.environ.pop("BRAINLAYER_DB", None) + result = get_db_path() + assert result == canonical + assert canonical.parent.exists() # Parent dir created + + def test_real_db_exists(self): + """The real production DB exists at the resolved path.""" + from brainlayer.paths import DEFAULT_DB_PATH + + assert DEFAULT_DB_PATH.exists(), f"DB not found at {DEFAULT_DB_PATH}" + assert DEFAULT_DB_PATH.stat().st_size > 1_000_000, "DB too small — might be empty" diff --git a/tests/test_phase3_qa.py b/tests/test_phase3_qa.py new file mode 100644 index 00000000..4663ef95 --- /dev/null +++ b/tests/test_phase3_qa.py @@ -0,0 +1,399 @@ +"""Phase 4 QA: Test Phase 3 core fixes — date filtering, project normalization, metadata. + +Tests cover: +1. paths.py — DB path resolution logic +2. Date filtering — date_from/date_to in search and hybrid_search +3. Project name normalization — Claude Code paths, worktrees, clean names +4. Search metadata — created_at and source in results +5. Chunk boundaries and created_at storage — sentence-aware splitting, upsert_chunks +""" + +import os +from pathlib import Path +from unittest.mock import patch + +import pytest + +from brainlayer.mcp import normalize_project_name +from brainlayer.paths import get_db_path +from brainlayer.vector_store import VectorStore + +# ============================================================================ +# 1. 
DB Path Resolution +# ============================================================================ + + +class TestDBPathResolution: + """Test paths.py resolves database location correctly.""" + + def test_env_var_override(self, tmp_path): + """BRAINLAYER_DB env var should take priority over everything.""" + custom = str(tmp_path / "custom.db") + with patch.dict(os.environ, {"BRAINLAYER_DB": custom}): + assert get_db_path() == Path(custom) + + def test_legacy_path_preferred_over_canonical(self, tmp_path): + """Legacy zikaron path should be used if it exists.""" + with patch.dict(os.environ, {}, clear=True): + # Remove env var if set + os.environ.pop("BRAINLAYER_DB", None) + with patch("brainlayer.paths._LEGACY_DB_PATH", tmp_path / "legacy.db"): + # Create the file so .exists() returns True + (tmp_path / "legacy.db").touch() + result = get_db_path() + assert result == tmp_path / "legacy.db" + + def test_canonical_path_when_legacy_missing(self, tmp_path): + """Canonical path should be used for fresh installs.""" + with patch.dict(os.environ, {}, clear=True): + os.environ.pop("BRAINLAYER_DB", None) + with patch("brainlayer.paths._LEGACY_DB_PATH", tmp_path / "nonexistent.db"): + with patch("brainlayer.paths._CANONICAL_DB_PATH", tmp_path / "canonical.db"): + result = get_db_path() + assert result == tmp_path / "canonical.db" + + +# ============================================================================ +# 2. 
Date Filtering +# ============================================================================ + + +@pytest.fixture +def store_with_dates(tmp_path): + """Store with chunks that have different created_at dates.""" + db_path = tmp_path / "test_dates.db" + store = VectorStore(db_path) + + chunks = [ + { + "id": "old-chunk", + "content": "Authentication using JWT tokens for the API", + "metadata": {"role": "assistant"}, + "source_file": "/session/old.jsonl", + "project": "my-project", + "content_type": "assistant_text", + "char_count": 50, + "source": "claude_code", + "created_at": "2026-01-15T10:00:00+00:00", + }, + { + "id": "mid-chunk", + "content": "Database migration strategy for PostgreSQL upgrade", + "metadata": {"role": "assistant"}, + "source_file": "/session/mid.jsonl", + "project": "my-project", + "content_type": "assistant_text", + "char_count": 55, + "source": "claude_code", + "created_at": "2026-02-01T10:00:00+00:00", + }, + { + "id": "new-chunk", + "content": "Deploy React app to Vercel with environment variables", + "metadata": {"role": "assistant"}, + "source_file": "/session/new.jsonl", + "project": "my-project", + "content_type": "assistant_text", + "char_count": 55, + "source": "claude_code", + "created_at": "2026-02-15T10:00:00+00:00", + }, + ] + embeddings = [[float(i) / 1024] * 1024 for i in range(3)] + store.upsert_chunks(chunks, embeddings) + return store + + +class TestDateFiltering: + """Test date_from/date_to search parameters.""" + + def test_search_date_from(self, store_with_dates): + """date_from should exclude chunks before the date.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.search( + query_embedding=query_embedding, + n_results=10, + date_from="2026-02-01", + ) + ids = results["ids"][0] + assert "old-chunk" not in ids + assert "mid-chunk" in ids + assert "new-chunk" in ids + + def test_search_date_to(self, store_with_dates): + """date_to should exclude chunks after the date (exclusive comparison).""" + 
query_embedding = [0.001] * 1024 + # date_to uses string comparison: "2026-02-01T10:00:00" > "2026-02-01" + # So date_to="2026-02-02" includes Feb 1 but excludes Feb 15 + results = store_with_dates.search( + query_embedding=query_embedding, + n_results=10, + date_to="2026-02-02", + ) + ids = results["ids"][0] + assert "old-chunk" in ids + assert "mid-chunk" in ids + assert "new-chunk" not in ids + + def test_search_date_range(self, store_with_dates): + """date_from + date_to should filter to a specific range.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.search( + query_embedding=query_embedding, + n_results=10, + date_from="2026-01-20", + date_to="2026-02-10", + ) + ids = results["ids"][0] + assert "old-chunk" not in ids + assert "mid-chunk" in ids + assert "new-chunk" not in ids + + def test_search_no_date_filter_returns_all(self, store_with_dates): + """No date filter should return all chunks.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.search( + query_embedding=query_embedding, + n_results=10, + ) + assert len(results["ids"][0]) == 3 + + def test_hybrid_search_date_from(self, store_with_dates): + """Hybrid search should also respect date_from.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.hybrid_search( + query_embedding=query_embedding, + query_text="authentication JWT", + n_results=10, + date_from="2026-02-01", + ) + # Old chunk about auth/JWT should be excluded by date + ids = results["ids"][0] + assert "old-chunk" not in ids + + def test_hybrid_search_date_to(self, store_with_dates): + """Hybrid search should also respect date_to.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.hybrid_search( + query_embedding=query_embedding, + query_text="deploy React", + n_results=10, + date_to="2026-02-10", + ) + ids = results["ids"][0] + assert "new-chunk" not in ids + + +# ============================================================================ +# 3. 
Project Name Normalization +# ============================================================================ + + +class TestProjectNameNormalization: + """Test normalize_project_name handles Claude Code paths correctly.""" + + def test_none_input(self): + assert normalize_project_name(None) is None + + def test_empty_string(self): + assert normalize_project_name("") is None + + def test_dash_only(self): + assert normalize_project_name("-") is None + + def test_whitespace(self): + assert normalize_project_name(" ") is None + + def test_clean_name_passthrough(self): + """Already-clean project names should pass through.""" + assert normalize_project_name("golems") == "golems" + + def test_worktree_suffix_stripped(self): + """Worktree suffixes should be removed.""" + assert normalize_project_name("golems-nightshift-1770775282043") == "golems" + assert normalize_project_name("golems-haiku-1234567890") == "golems" + assert normalize_project_name("golems-worktree-9999999999") == "golems" + + def test_claude_code_simple_path(self, tmp_path): + """Simple Claude Code path: -Users-name-Gits-projectname.""" + # Create a mock directory structure + gits_dir = tmp_path / "Gits" + gits_dir.mkdir() + (gits_dir / "myproject").mkdir() + + # The encoded input below is a fixed string, so os.path.isdir is patched to + # accept any candidate path ending in "/myproject" + with patch("os.path.isdir") as mock_isdir: + mock_isdir.side_effect = lambda p: p.endswith("/myproject") + result = normalize_project_name("-Users-etanheyman-Gits-myproject") + # Should try to match "myproject" against filesystem + # Since we mock isdir to return True for myproject, it should return it + assert result == "myproject" + + def test_claude_code_compound_name(self): + """Compound names like rudy-monorepo should try filesystem lookup.""" + # When os.path.isdir returns True for "rudy-monorepo", use the compound name + with patch("os.path.isdir") as mock_isdir: + mock_isdir.side_effect = lambda p: "rudy-monorepo" in p + result = 
normalize_project_name("-Users-etanheyman-Gits-rudy-monorepo") + assert result == "rudy-monorepo" + + def test_claude_code_compound_name_fallback(self): + """When no directory matches, fall back to first segment.""" + with patch("os.path.isdir", return_value=False): + result = normalize_project_name("-Users-etanheyman-Gits-rudy-monorepo") + assert result == "rudy" + + def test_claude_code_desktop_gits(self): + """Old-style Desktop/Gits path should work too.""" + with patch("os.path.isdir") as mock_isdir: + mock_isdir.side_effect = lambda p: "golems" in p + result = normalize_project_name("-Users-etanheyman-Desktop-Gits-golems") + assert result == "golems" + + def test_no_gits_segment(self): + """Paths without 'Gits' should return None.""" + result = normalize_project_name("-Users-etanheyman-Documents-stuff") + assert result is None + + +# ============================================================================ +# 4. Search Metadata +# ============================================================================ + + +class TestSearchMetadata: + """Test that search results include created_at and source.""" + + def test_search_results_include_created_at(self, store_with_dates): + """Search results should include created_at in metadata.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.search( + query_embedding=query_embedding, + n_results=3, + ) + for meta in results["metadatas"][0]: + assert "created_at" in meta, f"Missing created_at in {meta}" + + def test_search_results_include_source(self, store_with_dates): + """Search results should include source in metadata.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.search( + query_embedding=query_embedding, + n_results=3, + ) + for meta in results["metadatas"][0]: + assert "source" in meta, f"Missing source in {meta}" + assert meta["source"] == "claude_code" + + def test_hybrid_search_results_include_metadata(self, store_with_dates): + """Hybrid search results should also include 
created_at and source.""" + query_embedding = [0.001] * 1024 + results = store_with_dates.hybrid_search( + query_embedding=query_embedding, + query_text="database", + n_results=3, + ) + for meta in results["metadatas"][0]: + assert "created_at" in meta + assert "source" in meta + + +# ============================================================================ +# 5. Chunk Boundary Improvements +# ============================================================================ + + +class TestChunkBoundaries: + """Test sentence-aware chunking from Phase 3.""" + + def test_sentence_splitting(self): + """Chunks should split at sentence boundaries, not mid-sentence.""" + from brainlayer.pipeline.chunk import _split_at_sentences + + text = "First sentence here. Second sentence here. Third sentence here. Fourth sentence is longer and has more words." + chunks = _split_at_sentences(text, target_size=50) + # Each chunk should end at a sentence boundary (or be the last chunk) + for chunk in chunks[:-1]: + assert chunk.rstrip().endswith((".", "!", "?")), ( + f"Chunk doesn't end at sentence boundary: '{chunk}'" + ) + + def test_short_text_not_split(self): + """Text shorter than target should not be split.""" + from brainlayer.pipeline.chunk import _split_at_sentences + + text = "Short text." + chunks = _split_at_sentences(text, target_size=2000) + assert len(chunks) == 1 + assert chunks[0] == text + + def test_long_paragraph_splits_at_sentences(self): + """chunk_content should split long paragraphs at sentence boundaries.""" + from brainlayer.pipeline.chunk import chunk_content + from brainlayer.pipeline.classify import ClassifiedContent, ContentType, ContentValue + + # Create a long paragraph that exceeds TARGET_CHUNK_SIZE (2000 chars) + sentences = [f"This is sentence number {i} with some padding words to make it longer." 
for i in range(60)] + long_text = " ".join(sentences) # ~4200 chars + + classified = ClassifiedContent( + content=long_text, + content_type=ContentType.ASSISTANT_TEXT, + value=ContentValue.HIGH, + metadata={"role": "assistant"}, + ) + chunks = chunk_content(classified) + assert len(chunks) > 1, "Long text should be split into multiple chunks" + + +# ============================================================================ +# 6. Created_at in upsert_chunks +# ============================================================================ + + +class TestCreatedAtUpsert: + """Test that created_at is properly stored during chunk insertion.""" + + def test_created_at_stored(self, tmp_path): + """Chunks with created_at should have it stored in the DB.""" + store = VectorStore(tmp_path / "test.db") + chunks = [ + { + "id": "dated-chunk", + "content": "Test content with a date", + "metadata": {}, + "source_file": "/test.jsonl", + "project": "test", + "content_type": "user_message", + "char_count": 30, + "source": "claude_code", + "created_at": "2026-02-19T10:00:00+00:00", + } + ] + store.upsert_chunks(chunks, [[0.1] * 1024]) + + cursor = store.conn.cursor() + row = list(cursor.execute("SELECT created_at FROM chunks WHERE id = 'dated-chunk'")) + assert row[0][0] == "2026-02-19T10:00:00+00:00" + + def test_created_at_null_when_missing(self, tmp_path): + """Chunks without created_at should have NULL in the DB.""" + store = VectorStore(tmp_path / "test.db") + chunks = [ + { + "id": "undated-chunk", + "content": "Test content without a date", + "metadata": {}, + "source_file": "/test.jsonl", + "project": "test", + "content_type": "user_message", + "char_count": 30, + "source": "claude_code", + } + ] + store.upsert_chunks(chunks, [[0.1] * 1024]) + + cursor = store.conn.cursor() + row = list(cursor.execute("SELECT created_at FROM chunks WHERE id = 'undated-chunk'")) + assert row[0][0] is None diff --git a/tests/test_vector_store.py b/tests/test_vector_store.py index 
e68bd325..f5c6c4e1 100644 --- a/tests/test_vector_store.py +++ b/tests/test_vector_store.py @@ -1,143 +1,146 @@ -"""Test vector store functionality (FTS5, hybrid search, context view).""" +"""Tests for vector_store.py — date filtering, search metadata, schema.""" import pytest -from brainlayer.vector_store import VectorStore, serialize_f32 - - -@pytest.fixture -def store(tmp_path): - """Create a temporary vector store.""" - db_path = tmp_path / "test.db" - return VectorStore(db_path) - - -@pytest.fixture -def populated_store(store): - """Store with sample data for testing.""" - chunks = [ - { - "id": "chunk-1", - "content": "How to implement OTP authentication in Python", - "metadata": {"role": "user"}, - "source_file": "/session/conv1.jsonl", - "project": "my-project", - "content_type": "user_message", - "char_count": 50, - "source": "claude_code", - }, - { - "id": "chunk-2", - "content": "Here is the OTP implementation using pyotp library", - "metadata": {"role": "assistant"}, - "source_file": "/session/conv1.jsonl", - "project": "my-project", - "content_type": "ai_code", - "char_count": 55, - "source": "claude_code", - }, - { - "id": "chunk-3", - "content": "React useEffect cleanup function for websockets", - "metadata": {"role": "user"}, - "source_file": "/session/conv2.jsonl", - "project": "app-a", - "content_type": "user_message", - "char_count": 50, - "source": "claude_code", - }, - ] - # Use 1024-dim fake embeddings - embeddings = [[float(i) / 1024] * 1024 for i in range(3)] - store.upsert_chunks(chunks, embeddings) - return store - - -def test_fts5_table_created(store): - """FTS5 virtual table should exist after init.""" - cursor = store.conn.cursor() - tables = [row[0] for row in cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")] - assert "chunks_fts" in tables - - -def test_fts5_auto_populated_on_insert(populated_store): - """FTS5 should be populated via triggers when chunks are inserted.""" - cursor = populated_store.conn.cursor() - 
fts_count = list(cursor.execute("SELECT COUNT(*) FROM chunks_fts"))[0][0] - chunk_count = list(cursor.execute("SELECT COUNT(*) FROM chunks"))[0][0] - assert fts_count == chunk_count - assert fts_count == 3 - - -def test_fts5_keyword_search(populated_store): - """FTS5 should find exact keyword matches.""" - cursor = populated_store.conn.cursor() - results = list(cursor.execute("SELECT chunk_id FROM chunks_fts WHERE chunks_fts MATCH 'OTP' ORDER BY rank")) - # Both chunk-1 and chunk-2 contain "OTP" - ids = [r[0] for r in results] - assert "chunk-1" in ids - assert "chunk-2" in ids - assert "chunk-3" not in ids - - -def test_hybrid_search_returns_results(populated_store): - """Hybrid search should return results combining semantic + keyword.""" - query_embedding = [0.001] * 1024 - results = populated_store.hybrid_search( - query_embedding=query_embedding, - query_text="OTP", - n_results=5, - ) - assert "documents" in results - assert "metadatas" in results - assert "distances" in results - assert len(results["documents"][0]) > 0 - - -def test_hybrid_search_respects_project_filter(populated_store): - """Hybrid search should filter by project.""" - query_embedding = [0.001] * 1024 - results = populated_store.hybrid_search( - query_embedding=query_embedding, - query_text="implementation", - n_results=5, - project_filter="app-a", - ) - for meta in results["metadatas"][0]: - assert meta.get("project") == "app-a" - - -def test_get_context_with_conversation(populated_store): - """Context view should return surrounding chunks.""" - cursor = populated_store.conn.cursor() - # Manually set conversation_id and position - cursor.execute("UPDATE chunks SET conversation_id = '/session/conv1.jsonl', position = 0 WHERE id = 'chunk-1'") - cursor.execute("UPDATE chunks SET conversation_id = '/session/conv1.jsonl', position = 1 WHERE id = 'chunk-2'") - - result = populated_store.get_context("chunk-1", before=0, after=5) - assert result["target"] is not None - assert result["target"]["id"] 
== "chunk-1" - assert len(result["context"]) == 2 # chunk-1 and chunk-2 - # chunk-1 should be marked as target - target_chunks = [c for c in result["context"] if c.get("is_target")] - assert len(target_chunks) == 1 - - -def test_get_context_missing_chunk(populated_store): - """Context view should handle missing chunk gracefully.""" - result = populated_store.get_context("nonexistent-id") - assert result.get("error") == "Chunk not found" - - -def test_get_context_no_conversation_id(populated_store): - """Context view should handle chunks without conversation_id.""" - result = populated_store.get_context("chunk-3") - assert "error" in result - assert "no conversation context" in result["error"].lower() - - -def test_serialize_f32(): - """serialize_f32 should produce correct byte length.""" - vec = [1.0, 2.0, 3.0] - data = serialize_f32(vec) - assert len(data) == 12 # 3 floats * 4 bytes +from brainlayer.paths import DEFAULT_DB_PATH +from brainlayer.vector_store import VectorStore + + +@pytest.fixture(scope="module") +def store(): + """Module-scoped VectorStore opened on the production DB for integration tests.""" + s = VectorStore(DEFAULT_DB_PATH) + yield s + s.close() + + +class TestSchema: + """Verify the DB schema has required columns.""" + + def test_created_at_column_exists(self, store): + """created_at column exists in chunks table.""" + cursor = store.conn.cursor() + cols = list(cursor.execute("PRAGMA table_info(chunks)")) + col_names = [c[1] for c in cols] + assert "created_at" in col_names + + def test_created_at_coverage(self, store): + """At least 99% of chunks should have created_at (from backfill).""" + cursor = store.conn.cursor() + total = list(cursor.execute("SELECT COUNT(*) FROM chunks"))[0][0] + with_date = list(cursor.execute("SELECT COUNT(*) FROM chunks WHERE created_at IS NOT NULL"))[0][0] + coverage = with_date / total if total > 0 else 0 + assert coverage >= 0.99, f"Only {coverage:.1%} of chunks have created_at (expected >= 99%)" + + def test_source_column_exists(self, 
store): + """source column exists in chunks table.""" + cursor = store.conn.cursor() + cols = list(cursor.execute("PRAGMA table_info(chunks)")) + col_names = [c[1] for c in cols] + assert "source" in col_names + + +class TestDateFiltering: + """Test date filtering in search queries.""" + + def test_search_with_date_from(self, store): + """Search with date_from filter returns only recent results.""" + from brainlayer.embeddings import get_embedding_model + + model = get_embedding_model() + query_emb = model.embed_query("test query") + + results = store.hybrid_search( + query_embedding=query_emb, + query_text="test query", + n_results=5, + date_from="2026-02-15", + ) + docs = results["documents"][0] + # Should return results (we have data from Feb 2026) + # The key test is that it doesn't crash + assert isinstance(docs, list) + + def test_search_with_date_to(self, store): + """Search with date_to filter works.""" + from brainlayer.embeddings import get_embedding_model + + model = get_embedding_model() + query_emb = model.embed_query("test query") + + results = store.hybrid_search( + query_embedding=query_emb, + query_text="test query", + n_results=5, + date_to="2026-01-01", + ) + # Should not crash, may return empty if no old data + assert isinstance(results["documents"][0], list) + + def test_search_with_date_range(self, store): + """Search with both date_from and date_to works.""" + from brainlayer.embeddings import get_embedding_model + + model = get_embedding_model() + query_emb = model.embed_query("authentication") + + results = store.hybrid_search( + query_embedding=query_emb, + query_text="authentication", + n_results=5, + date_from="2026-02-01", + date_to="2026-02-28", + ) + assert isinstance(results["documents"][0], list) + + +class TestSearchMetadata: + """Test that search results include proper metadata.""" + + def test_results_have_created_at(self, store): + """Search results include created_at in metadata.""" + from brainlayer.embeddings import 
get_embedding_model + + model = get_embedding_model() + query_emb = model.embed_query("function implementation") + + results = store.hybrid_search( + query_embedding=query_emb, + query_text="function implementation", + n_results=3, + ) + if results["documents"][0]: + meta = results["metadatas"][0][0] + assert "created_at" in meta, "Search results should include created_at" + + def test_results_have_source(self, store): + """Search results include source in metadata.""" + from brainlayer.embeddings import get_embedding_model + + model = get_embedding_model() + query_emb = model.embed_query("function implementation") + + results = store.hybrid_search( + query_embedding=query_emb, + query_text="function implementation", + n_results=3, + ) + if results["documents"][0]: + meta = results["metadatas"][0][0] + assert "source" in meta or "source_file" in meta, "Results should include source info" + + +class TestStats: + """Test get_stats returns expected data.""" + + def test_stats_have_projects(self, store): + stats = store.get_stats() + assert len(stats["projects"]) > 0 + + def test_stats_have_content_types(self, store): + stats = store.get_stats() + assert len(stats["content_types"]) > 0 + + def test_stats_total_chunks(self, store): + stats = store.get_stats() + assert stats["total_chunks"] > 200_000 # We have 268K+