diff --git a/super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js b/super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js new file mode 100644 index 000000000..cf7025567 --- /dev/null +++ b/super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js @@ -0,0 +1,41 @@ +/** + * 023 — kg_edges composite indexes (session_id, source_id) / (session_id, target_id). + * + * Speeds the GET /api/db/sessions/:sessionKey/kg/raw-sources/:nodeId fallback's + * one-hop neighbor CTE, which scans kg_edges by (session-scoped) source_id OR + * target_id. The pre-existing single-column indexes + (session_id, edge_type) + * don't cover the (session_id, source_id|target_id) access pattern. + * + * CONCURRENTLY: non-locking build, safe to apply to a large live kg_edges + * during deploy. Requires running outside a transaction — hence pgm.noTransaction(). + * The boot path (ensureKnowledgeGraphSchema in src/db/postgres.js) creates the + * same indexes plainly; that runs pre-listen so locking is irrelevant there. 
+ */ + +export const up = (pgm) => { + pgm.noTransaction(); // CREATE INDEX CONCURRENTLY cannot run inside a transaction block + pgm.createIndex('kg_edges', ['session_id', 'source_id'], { + name: 'idx_kg_edges_session_source', + ifNotExists: true, + concurrently: true, + }); + pgm.createIndex('kg_edges', ['session_id', 'target_id'], { + name: 'idx_kg_edges_session_target', + ifNotExists: true, + concurrently: true, + }); +}; + +export const down = (pgm) => { + pgm.noTransaction(); + pgm.dropIndex('kg_edges', ['session_id', 'source_id'], { + name: 'idx_kg_edges_session_source', + ifExists: true, + concurrently: true, + }); + pgm.dropIndex('kg_edges', ['session_id', 'target_id'], { + name: 'idx_kg_edges_session_target', + ifExists: true, + concurrently: true, + }); +}; diff --git a/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql new file mode 100644 index 000000000..63396aa73 --- /dev/null +++ b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql @@ -0,0 +1,2 @@ +-- 024_sessions-kg-phase4b-repaired.down.sql +ALTER TABLE sessions DROP COLUMN IF EXISTS kg_phase4b_repaired; diff --git a/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql new file mode 100644 index 000000000..68ebb219d --- /dev/null +++ b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql @@ -0,0 +1,8 @@ +-- 024_sessions-kg-phase4b-repaired.up.sql +-- One-shot marker for the reconciliation Phase 4b source-provenance repair +-- (embedding / KG-build ordering-race safety-net). Set true after a repair +-- attempt so a session that legitimately yields zero matches is not +-- re-attempted on every scan. Mirrors the column added to +-- ensureKnowledgeGraphSchema()'s sessions ALTERs in src/db/postgres.js. 
+ +ALTER TABLE sessions ADD COLUMN IF NOT EXISTS kg_phase4b_repaired BOOLEAN DEFAULT FALSE; diff --git a/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs b/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs new file mode 100644 index 000000000..162969817 --- /dev/null +++ b/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs @@ -0,0 +1,243 @@ +/** + * Backfill source_chunk_embeddings from an on-disk raw-source archive. + * + * Why this exists: the live SourceEmbeddingDispatcher ran as the no-op stub + * for historical sessions (RAW_SOURCE_EMBEDDING did not resolve true at + * runtime), so source_chunk_embeddings is empty even though the raw-source + * archive was written. The /kg/raw-sources route depends on this table via + * Phase 4b provenance (source_hash). This script reproduces the dispatcher's + * process() pipeline (read -> chunk -> embed -> INSERT) offline, against the + * archive already on disk. + * + * Mirrors src/utils/rawSource/SourceEmbeddingDispatcher.js process() exactly: + * - same SourceStorage.read (decompress + integrity check) + * - same SourceChunker.chunk (NUL-stripped — see Workstream A) + * - same embeddingService.embedDocuments (Gemini, 3072-dim, RETRIEVAL_DOCUMENT) + * - same 11-column INSERT + per-source DELETE for idempotency + * + * SCOPE: backfill-only, for HISTORICAL sessions that completed before the live + * fixes (flags.env loading + embedding/Phase-4b race fix). New live sessions + * self-populate source_chunk_embeddings — they do NOT need this script. After + * backfilling, run scripts/run-phase4b.mjs to write the explicit + * kg_provenance.source_hash rows the kg/raw-sources route prefers. 
+ * + * Usage: + * node scripts/backfill-source-chunk-embeddings.mjs [SESSION_KEY] [--dry-run] [--concurrency=5] [--force] + * (SESSION_KEY defaults to Cardinal: 2026-05-22-1779484021) + */ + +import 'dotenv/config'; +import fs from 'fs'; +import readline from 'readline'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); +const REPO_ROOT = path.resolve(__dirname, '..'); + +import { createSourceStorage } from '../src/utils/rawSource/SourceStorage.js'; +import { chunk as chunkSource } from '../src/utils/rawSource/SourceChunker.js'; +import { embedDocuments, initEmbeddingService } from '../src/utils/embeddingService.js'; + +// Mirror the service's source_type derivation (rawSource/index.js). +const SOURCE_TYPE_BY_TOOL = { fetch_document: 'document', exa_web_search: 'exa_result' }; +const SOURCE_TYPE_ENUM = new Set(['document', 'exa_result', 'web_search', 'unknown']); +function inferSourceType(toolName) { + if (!toolName) return 'unknown'; + const t = SOURCE_TYPE_BY_TOOL[toolName] || 'unknown'; + return SOURCE_TYPE_ENUM.has(t) ? t : 'unknown'; +} + +// ── args ────────────────────────────────────────────────────────────── +const args = process.argv.slice(2); +const sessionKey = args.find(a => !a.startsWith('--')) || '2026-05-22-1779484021'; +const dryRun = args.includes('--dry-run'); +const concArg = args.find(a => a.startsWith('--concurrency=')); +const concurrency = Math.max(1, (concArg ? Number.parseInt(concArg.split('=')[1], 10) : 5) || 5); // non-numeric/0 → 5, negative → 1; a NaN/0 worker count would spawn zero workers and report DONE without processing anything + +const sessionDir = path.join(REPO_ROOT, 'reports', sessionKey); +const poolDir = path.join(sessionDir, 'raw-sources'); +const manifestPath = path.join(sessionDir, 'raw-sources-manifest.ndjson'); + +async function main() { + console.log(`[backfill] session_key=${sessionKey} dryRun=${dryRun} concurrency=${concurrency}`); + + if (!fs.existsSync(manifestPath)) { + console.error(`[backfill] manifest not found: ${manifestPath}`); + process.exit(1); + } + + // 1. 
Init embedding service (REQUIRED — embedDocuments returns null without it) + await initEmbeddingService(); + // Probe the service is live before doing work + const probe = await embedDocuments(['healthcheck'], ['probe']); + if (!probe || !probe[0] || probe[0].length === 0) { + console.error('[backfill] embedding service not available (GEMINI_API_KEY missing/invalid). Aborting.'); + process.exit(1); + } + console.log(`[backfill] embedding service OK (dim=${probe[0].length})`); + + // 2. DB pool + session UUID + const { Pool } = await import('pg'); + const pgvector = await import('pgvector/pg'); + const pool = new Pool({ connectionString: process.env.PG_CONNECTION_STRING }); + + const sess = await pool.query('SELECT id FROM sessions WHERE session_key = $1 LIMIT 1', [sessionKey]); + const sessionUuid = sess.rows[0]?.id; + if (!sessionUuid) { + console.error(`[backfill] no sessions row for session_key=${sessionKey}. Aborting.`); + await pool.end(); + process.exit(1); + } + console.log(`[backfill] session_id (uuid)=${sessionUuid}`); + + // 3. 
Read manifest, dedupe by hash (keep first occurrence's metadata) + const byHash = new Map(); + const rl = readline.createInterface({ input: fs.createReadStream(manifestPath) }); + for await (const line of rl) { + if (!line.trim()) continue; + let row; + try { row = JSON.parse(line); } catch { continue; } + if (!row.hash) continue; + if (!byHash.has(row.hash)) byHash.set(row.hash, row); + } + const sources = [...byHash.values()]; + console.log(`[backfill] ${sources.length} unique sources to process`); + + // Resume support: skip sources already embedded for this session (unless --force) + const force = args.includes('--force'); + const existingHashes = new Set(); + if (!force && !dryRun) { + const ex = await pool.query( + 'SELECT DISTINCT source_hash FROM source_chunk_embeddings WHERE session_id = $1', + [sessionUuid], + ); + for (const r of ex.rows) existingHashes.add(r.source_hash); + if (existingHashes.size > 0) console.log(`[backfill] resume: ${existingHashes.size} sources already present, will skip`); + } + + // PostgreSQL text/varchar cannot store NUL (0x00). Strip it from any value + // we persist. The live dispatcher has the same constraint — see findings. + const stripNul = (s) => (typeof s === 'string' ? s.replace(/\0/g, '') : s); + + const storage = createSourceStorage({ poolDir }); + + // 4. 
Process each source (mirror dispatcher process()) + const stats = { processed: 0, inserted: 0, chunks: 0, skipped: 0, skippedEmpty: 0, readFail: 0, embedFail: 0, insertFail: 0 }; + + async function processSource(row) { + const hash = row.hash; + if (existingHashes.has(hash)) { stats.skipped++; return; } + const ext = row.ext || 'text'; + const toolName = row.tool_name || null; + const sourceType = inferSourceType(toolName); + const url = row.url || null; + + // read + let content; + try { + const body = await storage.read(hash, ext); + content = body.toString('utf-8'); + } catch (err) { + stats.readFail++; + console.warn(`[backfill] read failed ${hash.slice(0, 12)}: ${err.message}`); + return; + } + if (!content || content.trim().length === 0) { stats.skippedEmpty++; return; } + + // chunk + const chunks = chunkSource(content, toolName, url); + if (chunks.length === 0) { stats.skippedEmpty++; return; } + + // embed + const texts = chunks.map(c => c.content); + const titles = chunks.map(c => c.header); + const embeddings = await embedDocuments(texts, titles); + if (!embeddings || embeddings.length !== chunks.length) { + stats.embedFail++; + console.warn(`[backfill] embed mismatch ${hash.slice(0, 12)}: expected ${chunks.length} got ${embeddings?.length}`); + return; + } + + if (dryRun) { + stats.processed++; stats.chunks += chunks.length; + return; + } + + // INSERT (per-source transaction; DELETE first for idempotency) — mirrors dispatcher + const client = await pool.connect(); + try { + await client.query('BEGIN'); + await client.query( + 'DELETE FROM source_chunk_embeddings WHERE source_hash = $1 AND session_id = $2', + [hash, sessionUuid], + ); + const valuePlaceholders = []; + const params = []; + const COLS = 11; + for (let i = 0; i < chunks.length; i++) { + const o = i * COLS; + valuePlaceholders.push( + `($${o + 1}, $${o + 2}, $${o + 3}, $${o + 4}, $${o + 5}, $${o + 6}, $${o + 7}, $${o + 8}, $${o + 9}, $${o + 10}, $${o + 11})`, + ); + params.push( + hash, + 
sessionUuid, + i, + stripNul(chunks[i].header) || null, + stripNul(chunks[i].content), + sourceType || null, + toolName || null, + pgvector.default.toSql(embeddings[i]), + 'gemini-embedding-2-preview', + 'RETRIEVAL_DOCUMENT', + Math.ceil(chunks[i].content.length / 4), + ); + } + await client.query( + `INSERT INTO source_chunk_embeddings + (source_hash, session_id, chunk_index, chunk_header, chunk_content, + source_type, tool_name, embedding, model, task_type, token_count) + VALUES ${valuePlaceholders.join(', ')}`, + params, + ); + await client.query('COMMIT'); + stats.inserted++; stats.chunks += chunks.length; + } catch (txErr) { + await client.query('ROLLBACK').catch(() => {}); + stats.insertFail++; + console.warn(`[backfill] INSERT failed ${hash.slice(0, 12)}: ${txErr.message}`); + } finally { + client.release(); + } + stats.processed++; + } + + // bounded concurrency + let idx = 0; + async function worker() { + while (idx < sources.length) { + const myIdx = idx++; + await processSource(sources[myIdx]); + if (stats.processed % 25 === 0 && stats.processed > 0) { + console.log(`[backfill] progress: ${stats.processed}/${sources.length} sources, ${stats.chunks} chunks`); + } + } + } + await Promise.all(Array.from({ length: concurrency }, worker)); + + console.log('\n[backfill] DONE'); + console.log(JSON.stringify(stats, null, 2)); + + if (!dryRun) { + const total = await pool.query( + 'SELECT COUNT(*)::int n FROM source_chunk_embeddings WHERE session_id = $1', + [sessionUuid], + ); + console.log(`[backfill] source_chunk_embeddings rows for session: ${total.rows[0].n}`); + } + + await pool.end(); +} + +main().catch(err => { console.error('[backfill] fatal:', err); process.exit(1); }); diff --git a/super-legal-mcp-refactored/scripts/run-phase4b.mjs b/super-legal-mcp-refactored/scripts/run-phase4b.mjs new file mode 100644 index 000000000..7bb2715ac --- /dev/null +++ b/super-legal-mcp-refactored/scripts/run-phase4b.mjs @@ -0,0 +1,68 @@ +/** + * run-phase4b.mjs — 
standalone Phase 4b (source-evidence) runner for ONE session. + * + * Writes explicit `kg_provenance.source_hash` rows by cosine-matching KG node + * labels against `source_chunk_embeddings`, WITHOUT a full 16-phase KG rebuild + * (a full rebuild would duplicate ~all provenance — `upsertProvenance` has no + * ON CONFLICT). Idempotent: re-running yields the same row count. + * + * Prerequisites: `source_chunk_embeddings` must already be populated for the + * session (live embedding, or scripts/backfill-source-chunk-embeddings.mjs for + * historical sessions). Phase 4b has NO Phase 4c dependency — it embeds labels + * fresh, so it needs only the embedding service + GEMINI_API_KEY/GOOGLE_API_KEY. + * + * Usage: + * node scripts/run-phase4b.mjs SESSION_KEY + * (e.g., node scripts/run-phase4b.mjs 2026-05-22-1779484021) + */ + +import 'dotenv/config'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import dotenv from 'dotenv'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); +// Ensure flags.env is loaded (GEMINI/GOOGLE key may live in .env; flags here for parity). NOTE(review): static imports are hoisted, so the modules imported below evaluate BEFORE this call — anything that reads flags at module-eval time will not see flags.env (same hazard src/server/loadFlagsEnv.js documents); confirm none do. +dotenv.config({ path: path.join(__dirname, '../flags.env') }); + +import { initEmbeddingService } from '../src/utils/embeddingService.js'; +import { repairSourceProvenance } from '../src/utils/knowledgeGraph/repairSourceProvenance.js'; + +async function main() { + const sessionKey = process.argv.slice(2).find(a => !a.startsWith('--')); + if (!sessionKey) { + console.error('Usage: node scripts/run-phase4b.mjs SESSION_KEY'); + process.exit(1); + } + + // Phase 4b embeds node labels live → service must be initialized first. 
+ await initEmbeddingService(); + + const { Pool } = await import('pg'); + const pool = new Pool({ connectionString: process.env.PG_CONNECTION_STRING }); + + try { + const sess = await pool.query('SELECT id FROM sessions WHERE session_key = $1 LIMIT 1', [sessionKey]); + const sessionUuid = sess.rows[0]?.id; + if (!sessionUuid) { + console.error(`[run-phase4b] no sessions row for session_key=${sessionKey}`); + process.exit(1); + } + + const emb = await pool.query( + 'SELECT COUNT(*)::int AS n FROM source_chunk_embeddings WHERE session_id = $1', + [sessionUuid], + ); + console.log(`[run-phase4b] session_key=${sessionKey} uuid=${sessionUuid} source_chunk_embeddings=${emb.rows[0].n}`); + if (emb.rows[0].n === 0) { + console.warn('[run-phase4b] source_chunk_embeddings is empty — Phase 4b will produce nothing. Backfill first.'); + } + + const result = await repairSourceProvenance(pool, sessionUuid); + console.log(`[run-phase4b] DONE — deleted ${result.deleted} prior rows, now ${result.provenanceRows} source_chunk provenance rows`); + } finally { + await pool.end(); + } +} + +main().catch(err => { console.error('[run-phase4b] fatal:', err); process.exit(1); }); diff --git a/super-legal-mcp-refactored/src/config/featureFlags.js b/super-legal-mcp-refactored/src/config/featureFlags.js index 648557121..0bc68570d 100644 --- a/super-legal-mcp-refactored/src/config/featureFlags.js +++ b/super-legal-mcp-refactored/src/config/featureFlags.js @@ -293,6 +293,38 @@ export const featureFlags = { TRANSCRIPT_SIDECAR_WRITE: envBool(process.env.TRANSCRIPT_SIDECAR_WRITE, false), }; +/** + * Boot-time flag dependency validation. Surfaces silent misconfigurations + * where a flag is ON but a prerequisite is missing — the class of failure + * that left RAW_SOURCE_EMBEDDING inert in production (flag true in flags.env, + * dependency unmet, dispatcher silently no-op). Returns an array of warning + * strings; callers log them. Never throws. 
+ */ +export function validateFlags(flags = featureFlags, env = process.env) { + const warnings = []; + const hasGeminiKey = !!(env.GEMINI_API_KEY || env.GOOGLE_API_KEY); + + if (flags.RAW_SOURCE_EMBEDDING) { + if (!flags.RAW_SOURCE_ARCHIVE) { + warnings.push('RAW_SOURCE_EMBEDDING=true but RAW_SOURCE_ARCHIVE=false — no raw sources are archived to embed; embedding will produce nothing.'); + } + if (!flags.HOOK_DB_PERSISTENCE) { + warnings.push('RAW_SOURCE_EMBEDDING=true but HOOK_DB_PERSISTENCE=false — source_chunk_embeddings cannot be written.'); + } + if (!hasGeminiKey) { + warnings.push('RAW_SOURCE_EMBEDDING=true but neither GEMINI_API_KEY nor GOOGLE_API_KEY is set — embedDocuments() returns null, embedding silently no-ops.'); + } + } + + // Silent-divergence case: archive on, embedding off. Archive runs but raw + // sources are never embedded — exactly the Cardinal symptom. + if (flags.RAW_SOURCE_ARCHIVE && !flags.RAW_SOURCE_EMBEDDING) { + warnings.push('RAW_SOURCE_ARCHIVE=true but RAW_SOURCE_EMBEDDING=false — sources are archived but not embedded; kg/raw-sources provenance will rely on the semantic fallback only.'); + } + + return warnings; +} + /** * Parse WRAPPED_SUBAGENT_ALLOWLIST into a Set of agent names. 
Multiple call * sites (agents: IIFE filter, MCP server registration, system prompt) consume diff --git a/super-legal-mcp-refactored/src/db/postgres.js b/super-legal-mcp-refactored/src/db/postgres.js index eff5e51e1..48f0ad88c 100644 --- a/super-legal-mcp-refactored/src/db/postgres.js +++ b/super-legal-mcp-refactored/src/db/postgres.js @@ -677,6 +677,10 @@ const SESSIONS_RECONCILIATION_DDL = ` ALTER TABLE sessions ADD COLUMN IF NOT EXISTS artifacts_build_attempts INTEGER DEFAULT 0; ALTER TABLE sessions ADD COLUMN IF NOT EXISTS last_artifacts_build_attempt_at TIMESTAMPTZ; ALTER TABLE sessions ADD COLUMN IF NOT EXISTS artifacts_build_last_error TEXT; + -- One-shot marker for the reconciliation Phase 4b source-provenance repair + -- (embedding/KG-build race safety-net). Set true after a repair attempt so a + -- session that legitimately yields zero matches is not re-attempted forever. + ALTER TABLE sessions ADD COLUMN IF NOT EXISTS kg_phase4b_repaired BOOLEAN DEFAULT FALSE; CREATE INDEX IF NOT EXISTS idx_sessions_kg_pending ON sessions(created_at DESC, kg_build_attempts) @@ -973,6 +977,12 @@ const KG_SCHEMA_DDL = ` CREATE INDEX IF NOT EXISTS idx_kg_edges_source ON kg_edges(source_id); CREATE INDEX IF NOT EXISTS idx_kg_edges_target ON kg_edges(target_id); CREATE INDEX IF NOT EXISTS idx_kg_edges_session ON kg_edges(session_id, edge_type); + -- Composite (session_id, source_id/target_id) for the kg/raw-sources fallback + -- neighbor CTE (one-hop edge lookup scoped to a session). Plain (non-CONCURRENT) + -- here is safe: ensure*Schema runs pre-listen, so there is no live traffic to + -- lock. Live deploys against a large table use migration 023 (CONCURRENTLY). 
+ CREATE INDEX IF NOT EXISTS idx_kg_edges_session_source ON kg_edges(session_id, source_id); + CREATE INDEX IF NOT EXISTS idx_kg_edges_session_target ON kg_edges(session_id, target_id); CREATE TABLE IF NOT EXISTS kg_evolution ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/super-legal-mcp-refactored/src/server/claude-sdk-server.js b/super-legal-mcp-refactored/src/server/claude-sdk-server.js index add02515c..ebfd5d97e 100644 --- a/super-legal-mcp-refactored/src/server/claude-sdk-server.js +++ b/super-legal-mcp-refactored/src/server/claude-sdk-server.js @@ -7,9 +7,14 @@ // Set Claude Code max output tokens before any SDK imports process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS = '64000'; +// MUST be first: loads flags.env (+ .env) into process.env BEFORE the +// hoisted static import of featureFlags.js evaluates. Without this, flags +// defined only in flags.env (e.g. RAW_SOURCE_EMBEDDING) silently default to +// off in local dev. No-op under bootstrap.js (prod). See loadFlagsEnv.js. 
+import './loadFlagsEnv.js'; + import express from 'express'; import cors from 'cors'; -import dotenv from 'dotenv'; import fs from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; @@ -23,7 +28,7 @@ import { CODE_EXECUTION_MODELS } from '../config/catalogDisplay/index.js'; import { createSdkStreamHandler } from '../utils/sdkStreamHandler.js'; -import { featureFlags, resolveModelId } from '../config/featureFlags.js'; +import { featureFlags, resolveModelId, validateFlags } from '../config/featureFlags.js'; // Both sources imported, MODULAR_SUBAGENTS flag selects which one is used import { getLegalSubagents as getLegacySubagents, SUBAGENT_SYSTEM_PROMPT_SECTION as LEGACY_PROMPT_SECTION } from '../config/legalSubagents.js'; import { getLegalSubagents as getModularSubagents, SUBAGENT_SYSTEM_PROMPT_SECTION as MODULAR_PROMPT_SECTION, getSubagent } from '../config/legalSubagents/index.js'; @@ -108,11 +113,10 @@ import { CircuitBreaker } from '../utils/circuitBreaker.js'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); -// dotenv.config() is handled by bootstrap.js in production (Dockerfile CMD). -// Guard preserves direct execution for local dev: node src/server/claude-sdk-server.js -if (!process.env.ANTHROPIC_API_KEY) { - dotenv.config({ path: path.join(__dirname, '../../.env') }); -} +// Env loading is handled by ./loadFlagsEnv.js (first import above) for local +// dev and by bootstrap.js (which also loads flags.env + Secret Manager) in +// production. No dotenv.config() here — it would run after featureFlags.js +// has already evaluated (static-import hoisting). 
// --- Environment Validation --- const REQUIRED_ENV = ['ANTHROPIC_API_KEY']; @@ -130,6 +134,23 @@ for (const key of RECOMMENDED_ENV) { } } +// --- Feature flag visibility + dependency validation (B2/B3) --- +// Surfaces the resolved state of the raw-source/embedding flags at boot and +// warns on silent misconfigurations (the class that left RAW_SOURCE_EMBEDDING +// inert: flag true in flags.env but a prerequisite unmet → dispatcher no-op). +console.log('[Startup] Feature flags:', JSON.stringify({ + RAW_SOURCE_ARCHIVE: featureFlags.RAW_SOURCE_ARCHIVE, + RAW_SOURCE_EMBEDDING: featureFlags.RAW_SOURCE_EMBEDDING, + HOOK_DB_PERSISTENCE: featureFlags.HOOK_DB_PERSISTENCE, + EMBEDDING_PERSISTENCE: featureFlags.EMBEDDING_PERSISTENCE, + KNOWLEDGE_GRAPH: featureFlags.KNOWLEDGE_GRAPH, + SESSION_RECONCILIATION: featureFlags.SESSION_RECONCILIATION, + gemini_key_present: !!(process.env.GEMINI_API_KEY || process.env.GOOGLE_API_KEY), +})); +for (const w of validateFlags()) { + console.warn(`[Startup] flag-validation: ${w}`); +} + initSdkMetrics(); // Initialize hook DB schema at startup (blocks before app.listen, idempotent DDL) @@ -166,8 +187,12 @@ if (featureFlags.CITATION_CHAT && featureFlags.HOOK_DB_PERSISTENCE) { } } -// Initialize embedding schema (pgvector) — requires HOOK_DB_PERSISTENCE -if (featureFlags.EMBEDDING_PERSISTENCE && featureFlags.HOOK_DB_PERSISTENCE) { +// Initialize embedding schema (pgvector) — requires HOOK_DB_PERSISTENCE. +// Also init when RAW_SOURCE_EMBEDDING is on: raw-source embedding and Phase 4b +// source-evidence linkage need the embedding service even if report-embedding +// (EMBEDDING_PERSISTENCE) is off. Without this, embedDocuments() silently +// returns null and the embedding/Phase-4b path no-ops. 
+if ((featureFlags.EMBEDDING_PERSISTENCE || featureFlags.RAW_SOURCE_EMBEDDING) && featureFlags.HOOK_DB_PERSISTENCE) { try { const { ensureEmbeddingSchema } = await import('../db/postgres.js'); const schemaReady = await ensureEmbeddingSchema(); diff --git a/super-legal-mcp-refactored/src/server/loadFlagsEnv.js b/super-legal-mcp-refactored/src/server/loadFlagsEnv.js new file mode 100644 index 000000000..111ee86f4 --- /dev/null +++ b/super-legal-mcp-refactored/src/server/loadFlagsEnv.js @@ -0,0 +1,30 @@ +/** + * Side-effect env loader — MUST be the first import in any server entrypoint + * that reads feature flags via static import. + * + * Why a separate module (not inline dotenv in the server file): ES module + * static imports are hoisted and evaluated BEFORE any statement in the + * importing file's body. featureFlags.js reads process.env at module-eval + * time, so a `dotenv.config()` call inside claude-sdk-server.js body runs + * too late — featureFlags has already frozen its values from a bare + * process.env. By doing the load here and importing THIS module first, + * flags.env lands in process.env before featureFlags.js evaluates. + * + * Production goes through bootstrap.js (which loads flags.env before the + * dynamic import of the server) — this module is then a harmless no-op + * (dotenv does not overwrite already-present keys). For local dev + * (`node src/server/claude-sdk-server.js`), this is what gives flags.env + * the same effect it has in production — closing the gap where + * RAW_SOURCE_EMBEDDING (and other flags.env-only flags) silently defaulted. + */ +import dotenv from 'dotenv'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); + +// flags.env is the single source of truth for feature flags (local + prod). 
+dotenv.config({ path: path.join(__dirname, '../../flags.env') }); +// .env supplies secrets in local dev; no-overwrite preserves anything already +// set by the shell or by bootstrap.js/Secret Manager in production. +dotenv.config({ path: path.join(__dirname, '../../.env') }); diff --git a/super-legal-mcp-refactored/src/utils/hookDBBridge.js b/super-legal-mcp-refactored/src/utils/hookDBBridge.js index 424df625b..948d59f67 100644 --- a/super-legal-mcp-refactored/src/utils/hookDBBridge.js +++ b/super-legal-mcp-refactored/src/utils/hookDBBridge.js @@ -1297,6 +1297,34 @@ async function persistHookEvent(pool, sessionCache, hookName, input, result) { kgMeta.missing_reports = CRITICAL_REPORTS.filter(k => !presentKeys.includes(k)); kgMeta.reports_timeout = true; } + // Wait for the fire-and-forget raw-source embedding queue to + // drain before the KG build, so Phase 4b (source-evidence + // linkage) sees a populated source_chunk_embeddings table + // instead of racing ahead of it and writing zero source_hash + // provenance. Mirrors the reports-poll above. Heuristic: count + // non-zero AND unchanged across two consecutive checks (queue + // drained), or cap. count-still-0-at-cap means the session + // embedded no raw sources — proceed; Phase 4b correctly skips. 
+ if (featureFlags.RAW_SOURCE_EMBEDDING) { + try { + const MAX_EMB_POLLS = 12; // 12 × 5s = 60s cap (background task) + let prevCount = -1; + for (let i = 0; i < MAX_EMB_POLLS; i++) { + const er = await pool.query( + `SELECT COUNT(*)::int AS cnt FROM source_chunk_embeddings WHERE session_id = $1`, + [kgSessionId] + ); + const cnt = er.rows[0].cnt; + if (cnt > 0 && cnt === prevCount) break; // drained (stable, non-zero) + prevCount = cnt; + if (i < MAX_EMB_POLLS - 1) await new Promise(r => setTimeout(r, 5000)); + } + if (prevCount <= 0) kgMeta.source_embeddings_empty = true; + else kgMeta.source_embeddings_count = prevCount; + } catch (err) { + console.warn('[KG] source_chunk_embeddings stabilization poll failed (proceeding):', err.message); + } + } const { buildSessionKnowledgeGraph } = await import('./knowledgeGraphExtractor.js'); const kgResult = await buildSessionKnowledgeGraph(pool, kgSessionId, kgSessionKey); try { diff --git a/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js b/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js index 58701ec82..ed21ab0a8 100644 --- a/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js +++ b/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js @@ -781,12 +781,17 @@ async function phase4b_sourceEvidence(pool, sessionId, evolutionLog) { ); for (const match of matches.rows) { + // source_key is varchar(200): truncate (long chunk_headers overflowed and + // were silently dropped by upsertProvenance's catch). Strip NUL from both + // text fields — PostgreSQL rejects 0x00 in text/varchar. + const sourceKey = (match.chunk_header || 'raw_source').replace(/\0/g, '').slice(0, 200); + const rawText = match.chunk_content ? 
match.chunk_content.replace(/\0/g, '').slice(0, 300) : null; await upsertProvenance(pool, sessionId, node.id, null, { source_type: 'source_chunk', - source_key: match.chunk_header || 'raw_source', + source_key: sourceKey, source_hash: match.source_hash, extraction_method: 'embedding_similarity', - raw_text: match.chunk_content?.slice(0, 300) || null, + raw_text: rawText, }); provenanceCount++; } diff --git a/super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js b/super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js new file mode 100644 index 000000000..eef9c32cd --- /dev/null +++ b/super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js @@ -0,0 +1,42 @@ +/** + * repairSourceProvenance — re-run Phase 4b (source-evidence linkage) for a + * single session, idempotently. + * + * Phase 4b (`phase4b_sourceEvidence`) embeds KG node labels and cosine-matches + * them against `source_chunk_embeddings`, writing `kg_provenance` rows with + * `source_type='source_chunk'`. It uses a plain INSERT (no ON CONFLICT), so a + * naive re-run would duplicate rows. This helper first DELETEs the prior + * phase4b output for the session (a targeted filter — `source_type='source_chunk'` + * is written ONLY by Phase 4b), then re-runs it. Safe to call repeatedly. + * + * Shared by: + * - scripts/run-phase4b.mjs (CLI: one-shot repair / Cardinal) + * - src/utils/sessionReconciliation.js (background safety-net for the race) + * + * The embedding service must already be initialized (server boot, or the CLI + * calls initEmbeddingService() before invoking this) — Phase 4b calls + * embedDocuments() internally, which returns null on an uninitialized client. 
+ * + * @param {import('pg').Pool} pool + * @param {string} sessionUuid sessions.id (UUID), NOT the session_key string + * @returns {Promise<{ deleted: number, provenanceRows: number }>} + */ +export async function repairSourceProvenance(pool, sessionUuid) { + if (!sessionUuid) throw new Error('repairSourceProvenance: sessionUuid required'); + + // Idempotency: clear prior Phase 4b output for this session before re-running. + const del = await pool.query( + `DELETE FROM kg_provenance WHERE session_id = $1 AND source_type = 'source_chunk'`, + [sessionUuid], + ); + + const { phase4b_sourceEvidence } = await import('./kgPhases1to5.js'); + await phase4b_sourceEvidence(pool, sessionUuid, []); // evolutionLog unused for output + + const after = await pool.query( + `SELECT COUNT(*)::int AS n FROM kg_provenance WHERE session_id = $1 AND source_type = 'source_chunk'`, + [sessionUuid], + ); + + return { deleted: del.rowCount || 0, provenanceRows: after.rows[0].n }; +} diff --git a/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js b/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js index 40c93e528..d7973a5ce 100644 --- a/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js +++ b/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js @@ -306,6 +306,15 @@ export function chunk(content, toolName, url = null) { const chunks = strategyFn(content); - // Ensure no empty chunks returned - return chunks.filter(c => c.content && c.content.trim().length > 0); + // Strip NUL (\0) — PostgreSQL text/varchar reject it ("invalid byte sequence + // for encoding UTF8: 0x00"). Raw API/PDF responses can carry embedded nulls; + // without this, the downstream source_chunk_embeddings INSERT fails and the + // chunk is silently dropped. Same defensive strip as kgPhase4cNodeEmbeddings. + // Re-filter empties last, since stripping can empty a chunk. + return chunks + .map(c => ({ + header: typeof c.header === 'string' ? 
c.header.replace(/\0/g, '') : c.header, + content: typeof c.content === 'string' ? c.content.replace(/\0/g, '') : c.content, + })) + .filter(c => c.content && c.content.trim().length > 0); } diff --git a/super-legal-mcp-refactored/src/utils/sessionReconciliation.js b/super-legal-mcp-refactored/src/utils/sessionReconciliation.js index fc3a43514..9f8fd2637 100644 --- a/super-legal-mcp-refactored/src/utils/sessionReconciliation.js +++ b/super-legal-mcp-refactored/src/utils/sessionReconciliation.js @@ -37,6 +37,7 @@ import { fileURLToPath } from 'url'; import { getPool } from '../db/postgres.js'; import { featureFlags } from '../config/featureFlags.js'; import { backgroundTasks } from './hookDBBridge.js'; +import { repairSourceProvenance } from './knowledgeGraph/repairSourceProvenance.js'; import { recordReconciliationScan, recordReconciliationRebuild, @@ -549,13 +550,73 @@ async function reconcile() { } } + // Phase 4b source-provenance repair — safety-net for the embedding/ + // KG-build ordering race. A session can be kg_status='built' yet have + // zero source_chunk provenance if its raw-source embeddings landed AFTER + // the build's stabilization-poll cap. Re-run Phase 4b once (idempotent + // DELETE+insert). One-shot kg_phase4b_repaired marker prevents re-running + // forever on sessions that legitimately yield zero matches. + let p4bRows = { rowCount: 0, rows: [] }; + if (featureFlags.RAW_SOURCE_EMBEDDING) { + // Probe the embedding service FIRST. Phase 4b embeds node labels; if the + // service is unavailable (missing API key / not initialized) it writes 0 + // rows. Skip the pass WITHOUT setting the one-shot marker so the session + // is retried once the service is healthy — instead of burning the marker + // on a false "zero matches" result (the silent-no-op class this fix targets). 
+ let embedReady = false; + try { + const { embedDocuments } = await import('./embeddingService.js'); + const probe = await embedDocuments(['_recon_probe']); + embedReady = !!(probe && probe[0] && probe[0].length > 0); + } catch { embedReady = false; } + + if (!embedReady) { + console.warn('[Recon] phase4b repair skipped — embedding service unavailable'); + } else { + try { + p4bRows = await pool.query(` + SELECT s.id, s.session_key + FROM sessions s + WHERE s.kg_status = 'built' + AND s.status = 'completed' + AND COALESCE(s.kg_phase4b_repaired, false) = false + AND s.created_at >= NOW() - INTERVAL '${MAX_SESSION_AGE_HOURS} hours' + AND EXISTS (SELECT 1 FROM source_chunk_embeddings e WHERE e.session_id = s.id) + AND NOT EXISTS (SELECT 1 FROM kg_provenance p WHERE p.session_id = s.id AND p.source_type = 'source_chunk') + ORDER BY s.created_at DESC + LIMIT $1`, + [SCAN_BATCH_LIMIT]); + } catch (p4bErr) { + // Column missing (migration not yet applied) etc. — skip, don't abort sweep. + console.warn('[Recon] phase4b candidate query failed:', p4bErr.code || p4bErr.message); + p4bRows = { rowCount: 0, rows: [] }; + } + + for (const session of p4bRows.rows) { + try { + const r = await repairSourceProvenance(pool, session.id); + // Service is confirmed live (probed above), so a zero-match result + // is a true terminal state — set the one-shot marker so we don't + // re-embed labels every scan for a session with no matches >0.70. 
+ await pool.query('UPDATE sessions SET kg_phase4b_repaired = true WHERE id = $1', [session.id]); + console.log(`[Recon] phase4b ${session.session_key}: ${r.provenanceRows} source_chunk provenance rows`); + recordReconciliationRebuild('kg', 'ok'); + } catch (err) { + console.warn(`[Recon] phase4b ${session.session_key} threw:`, err.message); + recordReconciliationRebuild('kg', 'failed'); + } + } + } + } + span?.setAttribute?.('scan.kg_candidates', kgRows.rowCount); span?.setAttribute?.('scan.artifact_candidates', artRows.rowCount); span?.setAttribute?.('scan.xlsx_candidates', xlsxRows.rowCount); + span?.setAttribute?.('scan.phase4b_candidates', p4bRows.rowCount); recordReconciliationScan('ok'); - if (kgRows.rowCount + artRows.rowCount + xlsxRows.rowCount > 0) { - console.log(`[SessionReconciliation] Scan complete — kg:${kgRows.rowCount} artifacts:${artRows.rowCount} xlsx:${xlsxRows.rowCount}`); + if (kgRows.rowCount + artRows.rowCount + xlsxRows.rowCount + p4bRows.rowCount > 0) { + console.log(`[SessionReconciliation] Scan complete — kg:${kgRows.rowCount} artifacts:${artRows.rowCount} xlsx:${xlsxRows.rowCount} phase4b:${p4bRows.rowCount}`); } }); } catch (err) { diff --git a/super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js b/super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js new file mode 100644 index 000000000..0e7a6c789 --- /dev/null +++ b/super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js @@ -0,0 +1,58 @@ +/** + * SourceChunker NUL-byte stripping (Workstream A). + * + * Raw API/PDF responses can carry embedded NUL (0x00) bytes. PostgreSQL + * text/varchar columns reject them ("invalid byte sequence for encoding + * UTF8: 0x00"), which silently dropped 14/367 Cardinal sources from + * source_chunk_embeddings. chunk() must strip NUL from content and header. 
+ */ + +import { test } from 'node:test'; +import assert from 'node:assert/strict'; +import { chunk } from '../../src/utils/rawSource/SourceChunker.js'; + +const NUL = String.fromCharCode(0); + +test('strips NUL from chunk content', () => { + const content = `First paragraph with a ${NUL} embedded null byte.\n\nSecond paragraph also ${NUL}has one.`; + const chunks = chunk(content, 'fetch_document', null); + assert.ok(chunks.length > 0, 'should produce chunks'); + for (const c of chunks) { + assert.ok(!c.content.includes(NUL), `content must not contain NUL: ${JSON.stringify(c.content)}`); + assert.ok(typeof c.header !== 'string' || !c.header.includes(NUL), 'header must not contain NUL'); + } +}); + +test('strips NUL from JSON-source content', () => { + const json = JSON.stringify({ a: `value with ${NUL} null`, b: `another ${NUL}` }); + const chunks = chunk(json, 'search_sec_filings', null); + for (const c of chunks) { + assert.ok(!c.content.includes(NUL), 'JSON chunk content must not contain NUL'); + } +}); + +test('a chunk that is ONLY a NUL byte is dropped (empty after strip)', () => { + // Paragraph split: a paragraph consisting solely of NUL becomes empty and + // must be filtered out, not returned as an empty chunk. 
+ const content = `Real paragraph one.\n\n${NUL}\n\nReal paragraph two.`; + const chunks = chunk(content, 'fetch_document', null); + assert.ok(chunks.length >= 1); + for (const c of chunks) { + assert.ok(c.content.trim().length > 0, 'no empty chunks'); + assert.ok(!c.content.includes(NUL), 'no NUL survives'); + } +}); + +test('content with no NUL is unchanged in substance', () => { + const content = `Clean paragraph one.\n\nClean paragraph two.`; + const chunks = chunk(content, 'fetch_document', null); + const joined = chunks.map(c => c.content).join(' '); + assert.ok(joined.includes('Clean paragraph one')); + assert.ok(joined.includes('Clean paragraph two')); +}); + +test('empty / non-string input returns empty array (unchanged contract)', () => { + assert.deepEqual(chunk('', 'fetch_document', null), []); + assert.deepEqual(chunk(null, 'fetch_document', null), []); + assert.deepEqual(chunk(NUL, 'fetch_document', null), [], 'pure-NUL input yields no chunks'); +}); diff --git a/super-legal-mcp-refactored/test/sdk/validate-flags.test.js b/super-legal-mcp-refactored/test/sdk/validate-flags.test.js new file mode 100644 index 000000000..f7afa4076 --- /dev/null +++ b/super-legal-mcp-refactored/test/sdk/validate-flags.test.js @@ -0,0 +1,52 @@ +/** + * validateFlags() dependency-validation truth table (Workstream B3). + * + * Guards the silent-misconfiguration class that left RAW_SOURCE_EMBEDDING + * inert in production (flag true, prerequisite unmet, dispatcher no-op). 
+ */ + +import { test } from 'node:test'; +import assert from 'node:assert/strict'; +import { validateFlags } from '../../src/config/featureFlags.js'; + +const base = { + RAW_SOURCE_ARCHIVE: true, + RAW_SOURCE_EMBEDDING: true, + HOOK_DB_PERSISTENCE: true, +}; +const keyEnv = { GEMINI_API_KEY: 'x' }; + +test('all prerequisites met → no warnings', () => { + assert.deepEqual(validateFlags(base, keyEnv), []); +}); + +test('GOOGLE_API_KEY alone satisfies the key requirement', () => { + assert.deepEqual(validateFlags(base, { GOOGLE_API_KEY: 'x' }), []); +}); + +test('embedding on, archive off → warns', () => { + const w = validateFlags({ ...base, RAW_SOURCE_ARCHIVE: false }, keyEnv); + assert.ok(w.some(s => /RAW_SOURCE_ARCHIVE=false/.test(s))); +}); + +test('embedding on, HOOK_DB off → warns', () => { + const w = validateFlags({ ...base, HOOK_DB_PERSISTENCE: false }, keyEnv); + assert.ok(w.some(s => /HOOK_DB_PERSISTENCE=false/.test(s))); +}); + +test('embedding on, no API key → warns', () => { + const w = validateFlags(base, {}); + assert.ok(w.some(s => /GEMINI_API_KEY nor GOOGLE_API_KEY/.test(s))); +}); + +test('archive on, embedding off → silent-divergence warning', () => { + const w = validateFlags({ ...base, RAW_SOURCE_EMBEDDING: false }, keyEnv); + assert.ok(w.some(s => /archived but not embedded/.test(s))); +}); + +test('both archive + embedding off → no warnings (feature simply off)', () => { + assert.deepEqual( + validateFlags({ RAW_SOURCE_ARCHIVE: false, RAW_SOURCE_EMBEDDING: false, HOOK_DB_PERSISTENCE: true }, keyEnv), + [], + ); +});