From 92c44571464a37eddb6f4c4d01910cb1d6033e17 Mon Sep 17 00:00:00 2001 From: Number531 <120485065+Number531@users.noreply.github.com> Date: Fri, 29 May 2026 00:08:44 -0400 Subject: [PATCH 1/2] =?UTF-8?q?fix(kg):=20deterministic=20raw-source=20pro?= =?UTF-8?q?venance=20=E2=80=94=20NUL=20strip,=20flag=20resolution,=20embed?= =?UTF-8?q?ding/Phase-4b=20race?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Durable backend correction so kg/raw-sources provenance populates reliably on live sessions instead of depending on a one-off backfill + the frontend's semantic fallback. Root causes (all confirmed): a fire-and-forget embedding queue racing the post-session KG build, RAW_SOURCE_EMBEDDING silently inert in dev (flags.env never loaded before the hoisted featureFlags import), and a NUL-byte INSERT that silently dropped sources. A. SourceChunker.chunk() strips NUL (0x00) from chunk content/header — Postgres rejects 0x00; 14/367 Cardinal sources were silently dropped. Mirrors the kgPhase4cNodeEmbeddings precedent. + source-chunker-nul.test.js. B. loadFlagsEnv.js (first import) loads flags.env before featureFlags evaluates, fixing the dev hoisting gap. Boot '[Startup] Feature flags' banner + validateFlags() warns on RAW_SOURCE_EMBEDDING prerequisites (ARCHIVE/HOOK_DB/GEMINI||GOOGLE) and the archive-on/embedding-off divergence. + validate-flags.test.js. C. hookDBBridge SessionEnd: stabilization poll waits for source_chunk_embeddings to drain (count>0 & stable, 60s cap, proceed-on-cap) before the KG build, so Phase 4b sees populated embeddings. Reconciliation safety-net re-runs Phase 4b for built-but-unlinked sessions, gated by one-shot kg_phase4b_repaired marker. D. scripts/run-phase4b.mjs + repairSourceProvenance(): idempotent DELETE + re-run of Phase 4b for one session (no full-rebuild provenance duplication). Hardened phase4b provenance write: truncate source_key to varchar(200) and strip NUL (previously dropped ~6 rows/run silently). E. kg_edges composite indexes (session_id, source_id)/(session_id, target_id) for the fallback neighbor CTE — boot path (plain, pre-listen) + migration 023 (CONCURRENTLY) for live deploys. G. Commit scripts/backfill-source-chunk-embeddings.mjs as the historical-session backfill tool. Rollout: RAW_SOURCE_EMBEDDING stays ON; RAW_SOURCE_EMBEDDING=false is the instant rollback. Route-contract hardening (linkage_mode, threshold) is a separate PR on frontend-revamp. 424/424 KG unit tests pass; Cardinal repaired (171 source_chunk provenance rows, idempotent re-run verified). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migrations/023_kg-edges-composite-idx.js | 41 +++++++++++ .../024_sessions-kg-phase4b-repaired.down.sql | 2 + .../024_sessions-kg-phase4b-repaired.up.sql | 8 +++ .../backfill-source-chunk-embeddings.mjs | Bin 0 -> 9871 bytes .../scripts/run-phase4b.mjs | 68 ++++++++++++++++++ .../src/config/featureFlags.js | 32 +++++++++ super-legal-mcp-refactored/src/db/postgres.js | 10 +++ .../src/server/claude-sdk-server.js | 35 +++++++-- .../src/server/loadFlagsEnv.js | 30 ++++++++ .../src/utils/hookDBBridge.js | 28 ++++++++ .../src/utils/knowledgeGraph/kgPhases1to5.js | 9 ++- .../knowledgeGraph/repairSourceProvenance.js | 42 +++++++++++ .../src/utils/rawSource/SourceChunker.js | 13 +++- .../src/utils/sessionReconciliation.js | 48 ++++++++++++- .../test/sdk/source-chunker-nul.test.js | 58 +++++++++++++++ .../test/sdk/validate-flags.test.js | 52 ++++++++++++++ 16 files changed, 463 insertions(+), 13 deletions(-) create mode 100644 super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js create mode 100644 super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql create mode 100644 super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql create mode 100644 super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs create mode 100644 super-legal-mcp-refactored/scripts/run-phase4b.mjs create mode 100644 super-legal-mcp-refactored/src/server/loadFlagsEnv.js create mode 100644 super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js create mode 100644 super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js create mode 100644 super-legal-mcp-refactored/test/sdk/validate-flags.test.js diff --git a/super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js b/super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js new file mode 100644 index 000000000..cf7025567 --- /dev/null +++ b/super-legal-mcp-refactored/migrations/023_kg-edges-composite-idx.js @@ -0,0 +1,41 @@ +/** + * 023 — kg_edges composite indexes (session_id, source_id) / (session_id, target_id). + * + * Speeds the GET /api/db/sessions/:sessionKey/kg/raw-sources/:nodeId fallback's + * one-hop neighbor CTE, which scans kg_edges by (session-scoped) source_id OR + * target_id. The pre-existing single-column indexes + (session_id, edge_type) + * don't cover the (session_id, source_id|target_id) access pattern. + * + * CONCURRENTLY: non-locking build, safe to apply to a large live kg_edges + * during deploy. Requires running outside a transaction — hence pgm.noTransaction(). + * The boot path (ensureKnowledgeGraphSchema in src/db/postgres.js) creates the + * same indexes plainly; that runs pre-listen so locking is irrelevant there. + */ + +export const up = (pgm) => { + pgm.noTransaction(); // CREATE INDEX CONCURRENTLY cannot run inside a transaction block + pgm.createIndex('kg_edges', ['session_id', 'source_id'], { + name: 'idx_kg_edges_session_source', + ifNotExists: true, + concurrently: true, + }); + pgm.createIndex('kg_edges', ['session_id', 'target_id'], { + name: 'idx_kg_edges_session_target', + ifNotExists: true, + concurrently: true, + }); +}; + +export const down = (pgm) => { + pgm.noTransaction(); + pgm.dropIndex('kg_edges', ['session_id', 'source_id'], { + name: 'idx_kg_edges_session_source', + ifExists: true, + concurrently: true, + }); + pgm.dropIndex('kg_edges', ['session_id', 'target_id'], { + name: 'idx_kg_edges_session_target', + ifExists: true, + concurrently: true, + }); +}; diff --git a/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql new file mode 100644 index 000000000..63396aa73 --- /dev/null +++ b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.down.sql @@ -0,0 +1,2 @@ +-- 024_sessions-kg-phase4b-repaired.down.sql +ALTER TABLE sessions DROP COLUMN IF EXISTS kg_phase4b_repaired; diff --git a/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql new file mode 100644 index 000000000..68ebb219d --- /dev/null +++ b/super-legal-mcp-refactored/migrations/024_sessions-kg-phase4b-repaired.up.sql @@ -0,0 +1,8 @@ +-- 024_sessions-kg-phase4b-repaired.up.sql +-- One-shot marker for the reconciliation Phase 4b source-provenance repair +-- (embedding / KG-build ordering-race safety-net). Set true after a repair +-- attempt so a session that legitimately yields zero matches is not +-- re-attempted on every scan. Mirrors the column added to +-- ensureKnowledgeGraphSchema()'s sessions ALTERs in src/db/postgres.js. + +ALTER TABLE sessions ADD COLUMN IF NOT EXISTS kg_phase4b_repaired BOOLEAN DEFAULT FALSE; diff --git a/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs b/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs new file mode 100644 index 0000000000000000000000000000000000000000..89b0bfdfd61c611434c8774641b12e57a7c3cfa7 GIT binary patch literal 9871 zcmdT~>rxv>7T(`}isLG8G>WAW5IYwLc41H)Z9pOj)|Fx@c^I{%F{2q}dIr!Es`3zd z!aT`-=XCeX2t<cebF`7F|;ZX=!RZQLk|v4Dm` zyX%M^M0yeA;Y?+qmQbK&Ws~oSp1< zN8O`$-GhT(|2>VPF(qkEnbK*D1M{p<6y!`OE0R2#t7Z!lJ}D$KJzwNYQr9X0H)%1Q z2~(A{Ov?&PH(>cDi}GA0KAl3!&egP2E!C8zMXqS97AhHQs1CGnL>^qmims!8X&ui3 zt?1PyEwU7c21%&Mvqr$HS(7PalMb_J!IEL!xCoU|7EEDP*w9Q3FY!uyOFcuFkcP!vB{{tBy_!c!)S@pozS=@O%v*HQJw54l|G9TKIvDJq9d-Mst0Xr!+hH0P z^TcW$Vk}hVLg=7-*gfrvw?|{RU<#KCmynA^8}1K|yW4bWL$sYH@v!4*D=StxguPSQ_0kuHijfUjH6eFPt@bMUaFaZ3HcDJ2TLNi-P8_GxdDtBeUc zIl)nj!`8Y!ysJps3l5i8m;M}X^sQxMbfuPWC{HC8ji~B&5l3Mpk$*KERk7vQLJQ$} zW5Q@{*uvW;D$~x}&osQzI%<-RA*w{A?l^v}E!!e{yVRs!!|QpwJ`&ypBW}&)ZZ4kltCqWUz?I7ZQkik?zyiHphTVJ*}UbnZl+M8c}_06lVUu|q{ zvd?v%J&WK$Sx${H`_Xj=^e54@vE!akESxtcx_ZcQCV(MG-*C6p0szg-q=5!sRxj@0 z9(=WZt0hADa*6EM|Wr=rfz^eg-Hc35SE;t)y(RGj` ziy}3YcpW8Ubqj26`c~gIx1RoV+#S99X>>Xm976LjwL-=ojV%;zvyT~6Y_yQ@gVBw; z9BIWF67To{>;&g+(7m%{opQH-b_B8sfMuw1@4QZ}z8ki!2;-y%43VTaN#g?co)k$a z@HPu;Mu;Su8jVb5il5 zxISIs#x0jFVUiZfc$;3_xu;A-&SE0YlW|z?ZGr(Jge?&QchjdHpt`D-Af?oaXiK_3 zp7l<;2U29N;yY@0MjTB^fj;X7ZjSDNq8vG!f|~&J*5SV=C*#oLEWI?vjXMlofwWb& z#2dqlrgVd%76jQGIm>`O2o52?$_~JGrh+)1Ne$hAqndHw63K zwI8cwiiE$ryGwBLb;@tlT%k-_rMkFR3$BAGmX64K-#zN}d!xPM-sp$!Pc)Cv>?PAq zlw1dKG)4iscZsmTBT4g58mV%ChtBwzgTW7|ucA4;@PSbGP3v?}{gzJ$?}+0LC9-v4 zIy>tfl#0HiV}5&I>e|#XUSl!kxGC*1y;dRmnl;PJi_U}fOw^=DrthHyNs|O5kiCa! zXOii5<)_KDfBb&5Kj`6l#f~NT11ds6u=Ds|gCGvkRRJQaQLv$efr-TOj zyOY6@lTy=%@4F{mcgs){n-# zPV#YW*?I@6V@ZO}9sJ7x0K}vPvJW2POU+x@lI$%e3`A-)ut!itgvaF(qlhhQ65QL~39$V;OXsj|v-O=y=@Zb;XzD=Atxw<(fsIQ z!KrjO!QY$G2$}g=6c&JhagaX32VJh7lIJ4?&ownLCz(`oVbYW%a(_?q`Wt zb>i#j--_OFE_=%M+|A4t7Y%E+-aj%ZPpmL8Y9Zb8Y!x;)SmY{+<5cIlm$lJKx*gytk)L)=zaP!zDiOWJ+0Gu1eGbN32sToH~bx@56 z8FWlz$ZfdTG(!QGmjf{_iJ>!_XV}p|i=#2A zTf~ewoxgRaC_@`u_G&1*MomU~AhxD42T-wBab;^W79@J*bKS#{L@EGE7^fm{E~7`D zMxub039=11U{US307DaQTJ1}PYHx$vY*bK~jmeF;BZky?&gQ{RxVgqZvCiYQTr-BBUFkf-s%6^9SsaoRNTDy*KM>43vYA#jlYmNT7xyI@z+wJ-e3~?-B zzA8z|`7ki6RjxT@z$G!S%FT938}iD@a+zWV?GT7V!;ObUCp>=0ZD`95CWYm^%g<9& zE_sb2pR~U=u!w@*CLa%-g0l!sSvo$U_AKV04veH%XQkuXyfZQ+c*g&@Rg;Q;A zHSYy3vJ6u#%|pbp#UN)gIiQyHtRh%l_R=QWhZXm#6NtC2dV)}fp2f}OM&jPwb8yHD z>zDSd76y%GaMB{4!5R749Zj(O*Gx#*)RqXE*_Nvr-g$kAlkU?x$a##D4a=?RjQ+TcF>fuPqfy%HAGoGvWNMqO zvbG^Q=Y{2zr&i+-S&ww!Cu#ne^Xn{);q2_z=TfFtNRGlfn6G=WKR6s>{pO}Kn$%7V zxe*?3?BKs|S6v#9UcRjS+HsL$d4#VLOwiFHQ-aia^B0Ald8Ph{K6)=++@hhcFa&x10y@@{Fm6A1`qU#e~Jz`YsQDeN;pH^hh zo78W(2I3;g!sB^#;afm*`R-3vJG5I_CXOVT)+WvRTL;kV4p`VBgKxhZaq{~)_9~13 zgUvdz##Fv$R-dfeTWu7s*O9uZxhlfo@i)$u_ecC$Csa{g5mqz@ov7$)N0%;cQr{m3 z;YSN8z5eOIH5UR+wO*oTxRsCSWrQIFTgK5x^2Nj57<;)p%XD2wz(*6=T>hE_DMb>h z4u`!hn&Y!zjF*AF8o6~>Dj9_sG|HHdLluAUmH4bX1a3bHYF|dd{DGr+U-Mhztd5^j zZR=ode{h7!OoeL(!(GYy7g4>ei}KrUy)s&N;K|_d@ZH}24~?cToxF#EFmm`E7x6~i zItza>D_UFsNlhi(J*ukk`H{bn#+W)-1}v&)DpvS<>=J}=I>TH&?O~H^&n{s&%xt&_ zb6*7TR&aC7&F}_`_@LD10j$dGRGjE$hO&^n2=~_1es532ptux6nQbg z-+4R@vFrip?P^m&z^V8vsLVu+V;BqYPyS~lJ;v`DMOyhVptqN0!P4jN(P$3Mta-a` z3j)v^b@NvmkYGJJ0UG~GYP08qLBCrKL);n}N0}BjnkX!CC**`@Ffm&AoQ=zZr(qC9jW6EowakO6~~pO8}_5Y=UFcia>5k>nFB^ zlvuD{$n#=5ImKi1^~Xj%Iw%SI(gx<^OMU)7!9+9O2Dovzwd0y{{+c5nSfuU$4;H<^ AfdBvi literal 0 HcmV?d00001 diff --git a/super-legal-mcp-refactored/scripts/run-phase4b.mjs b/super-legal-mcp-refactored/scripts/run-phase4b.mjs new file mode 100644 index 000000000..7bb2715ac --- /dev/null +++ b/super-legal-mcp-refactored/scripts/run-phase4b.mjs @@ -0,0 +1,68 @@ +/** + * run-phase4b.mjs — standalone Phase 4b (source-evidence) runner for ONE session. + * + * Writes explicit `kg_provenance.source_hash` rows by cosine-matching KG node + * labels against `source_chunk_embeddings`, WITHOUT a full 16-phase KG rebuild + * (a full rebuild would duplicate ~all provenance — `upsertProvenance` has no + * ON CONFLICT). Idempotent: re-running yields the same row count. + * + * Prerequisites: `source_chunk_embeddings` must already be populated for the + * session (live embedding, or scripts/backfill-source-chunk-embeddings.mjs for + * historical sessions). Phase 4b has NO Phase 4c dependency — it embeds labels + * fresh, so it needs only the embedding service + GEMINI_API_KEY/GOOGLE_API_KEY. + * + * Usage: + * node scripts/run-phase4b.mjs + * (e.g., node scripts/run-phase4b.mjs 2026-05-22-1779484021) + */ + +import 'dotenv/config'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import dotenv from 'dotenv'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); +// Ensure flags.env is loaded (GEMINI/GOOGLE key may live in .env; flags here for parity). +dotenv.config({ path: path.join(__dirname, '../flags.env') }); + +import { initEmbeddingService } from '../src/utils/embeddingService.js'; +import { repairSourceProvenance } from '../src/utils/knowledgeGraph/repairSourceProvenance.js'; + +async function main() { + const sessionKey = process.argv.slice(2).find(a => !a.startsWith('--')); + if (!sessionKey) { + console.error('Usage: node scripts/run-phase4b.mjs '); + process.exit(1); + } + + // Phase 4b embeds node labels live → service must be initialized first. + await initEmbeddingService(); + + const { Pool } = await import('pg'); + const pool = new Pool({ connectionString: process.env.PG_CONNECTION_STRING }); + + try { + const sess = await pool.query('SELECT id FROM sessions WHERE session_key = $1 LIMIT 1', [sessionKey]); + const sessionUuid = sess.rows[0]?.id; + if (!sessionUuid) { + console.error(`[run-phase4b] no sessions row for session_key=${sessionKey}`); + process.exit(1); + } + + const emb = await pool.query( + 'SELECT COUNT(*)::int AS n FROM source_chunk_embeddings WHERE session_id = $1', + [sessionUuid], + ); + console.log(`[run-phase4b] session_key=${sessionKey} uuid=${sessionUuid} source_chunk_embeddings=${emb.rows[0].n}`); + if (emb.rows[0].n === 0) { + console.warn('[run-phase4b] source_chunk_embeddings is empty — Phase 4b will produce nothing. Backfill first.'); + } + + const result = await repairSourceProvenance(pool, sessionUuid); + console.log(`[run-phase4b] DONE — deleted ${result.deleted} prior rows, now ${result.provenanceRows} source_chunk provenance rows`); + } finally { + await pool.end(); + } +} + +main().catch(err => { console.error('[run-phase4b] fatal:', err); process.exit(1); }); diff --git a/super-legal-mcp-refactored/src/config/featureFlags.js b/super-legal-mcp-refactored/src/config/featureFlags.js index 648557121..0bc68570d 100644 --- a/super-legal-mcp-refactored/src/config/featureFlags.js +++ b/super-legal-mcp-refactored/src/config/featureFlags.js @@ -293,6 +293,38 @@ export const featureFlags = { TRANSCRIPT_SIDECAR_WRITE: envBool(process.env.TRANSCRIPT_SIDECAR_WRITE, false), }; +/** + * Boot-time flag dependency validation. Surfaces silent misconfigurations + * where a flag is ON but a prerequisite is missing — the class of failure + * that left RAW_SOURCE_EMBEDDING inert in production (flag true in flags.env, + * dependency unmet, dispatcher silently no-op). Returns an array of warning + * strings; callers log them. Never throws. + */ +export function validateFlags(flags = featureFlags, env = process.env) { + const warnings = []; + const hasGeminiKey = !!(env.GEMINI_API_KEY || env.GOOGLE_API_KEY); + + if (flags.RAW_SOURCE_EMBEDDING) { + if (!flags.RAW_SOURCE_ARCHIVE) { + warnings.push('RAW_SOURCE_EMBEDDING=true but RAW_SOURCE_ARCHIVE=false — no raw sources are archived to embed; embedding will produce nothing.'); + } + if (!flags.HOOK_DB_PERSISTENCE) { + warnings.push('RAW_SOURCE_EMBEDDING=true but HOOK_DB_PERSISTENCE=false — source_chunk_embeddings cannot be written.'); + } + if (!hasGeminiKey) { + warnings.push('RAW_SOURCE_EMBEDDING=true but neither GEMINI_API_KEY nor GOOGLE_API_KEY is set — embedDocuments() returns null, embedding silently no-ops.'); + } + } + + // Silent-divergence case: archive on, embedding off. Archive runs but raw + // sources are never embedded — exactly the Cardinal symptom. + if (flags.RAW_SOURCE_ARCHIVE && !flags.RAW_SOURCE_EMBEDDING) { + warnings.push('RAW_SOURCE_ARCHIVE=true but RAW_SOURCE_EMBEDDING=false — sources are archived but not embedded; kg/raw-sources provenance will rely on the semantic fallback only.'); + } + + return warnings; +} + /** * Parse WRAPPED_SUBAGENT_ALLOWLIST into a Set of agent names. Multiple call * sites (agents: IIFE filter, MCP server registration, system prompt) consume diff --git a/super-legal-mcp-refactored/src/db/postgres.js b/super-legal-mcp-refactored/src/db/postgres.js index eff5e51e1..48f0ad88c 100644 --- a/super-legal-mcp-refactored/src/db/postgres.js +++ b/super-legal-mcp-refactored/src/db/postgres.js @@ -677,6 +677,10 @@ const SESSIONS_RECONCILIATION_DDL = ` ALTER TABLE sessions ADD COLUMN IF NOT EXISTS artifacts_build_attempts INTEGER DEFAULT 0; ALTER TABLE sessions ADD COLUMN IF NOT EXISTS last_artifacts_build_attempt_at TIMESTAMPTZ; ALTER TABLE sessions ADD COLUMN IF NOT EXISTS artifacts_build_last_error TEXT; + -- One-shot marker for the reconciliation Phase 4b source-provenance repair + -- (embedding/KG-build race safety-net). Set true after a repair attempt so a + -- session that legitimately yields zero matches is not re-attempted forever. + ALTER TABLE sessions ADD COLUMN IF NOT EXISTS kg_phase4b_repaired BOOLEAN DEFAULT FALSE; CREATE INDEX IF NOT EXISTS idx_sessions_kg_pending ON sessions(created_at DESC, kg_build_attempts) @@ -973,6 +977,12 @@ const KG_SCHEMA_DDL = ` CREATE INDEX IF NOT EXISTS idx_kg_edges_source ON kg_edges(source_id); CREATE INDEX IF NOT EXISTS idx_kg_edges_target ON kg_edges(target_id); CREATE INDEX IF NOT EXISTS idx_kg_edges_session ON kg_edges(session_id, edge_type); + -- Composite (session_id, source_id/target_id) for the kg/raw-sources fallback + -- neighbor CTE (one-hop edge lookup scoped to a session). Plain (non-CONCURRENT) + -- here is safe: ensure*Schema runs pre-listen, so there is no live traffic to + -- lock. Live deploys against a large table use migration 023 (CONCURRENTLY). + CREATE INDEX IF NOT EXISTS idx_kg_edges_session_source ON kg_edges(session_id, source_id); + CREATE INDEX IF NOT EXISTS idx_kg_edges_session_target ON kg_edges(session_id, target_id); CREATE TABLE IF NOT EXISTS kg_evolution ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/super-legal-mcp-refactored/src/server/claude-sdk-server.js b/super-legal-mcp-refactored/src/server/claude-sdk-server.js index add02515c..c4a791f72 100644 --- a/super-legal-mcp-refactored/src/server/claude-sdk-server.js +++ b/super-legal-mcp-refactored/src/server/claude-sdk-server.js @@ -7,9 +7,14 @@ // Set Claude Code max output tokens before any SDK imports process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS = '64000'; +// MUST be first: loads flags.env (+ .env) into process.env BEFORE the +// hoisted static import of featureFlags.js evaluates. Without this, flags +// defined only in flags.env (e.g. RAW_SOURCE_EMBEDDING) silently default to +// off in local dev. No-op under bootstrap.js (prod). See loadFlagsEnv.js. +import './loadFlagsEnv.js'; + import express from 'express'; import cors from 'cors'; -import dotenv from 'dotenv'; import fs from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; @@ -23,7 +28,7 @@ import { CODE_EXECUTION_MODELS } from '../config/catalogDisplay/index.js'; import { createSdkStreamHandler } from '../utils/sdkStreamHandler.js'; -import { featureFlags, resolveModelId } from '../config/featureFlags.js'; +import { featureFlags, resolveModelId, validateFlags } from '../config/featureFlags.js'; // Both sources imported, MODULAR_SUBAGENTS flag selects which one is used import { getLegalSubagents as getLegacySubagents, SUBAGENT_SYSTEM_PROMPT_SECTION as LEGACY_PROMPT_SECTION } from '../config/legalSubagents.js'; import { getLegalSubagents as getModularSubagents, SUBAGENT_SYSTEM_PROMPT_SECTION as MODULAR_PROMPT_SECTION, getSubagent } from '../config/legalSubagents/index.js'; @@ -108,11 +113,10 @@ import { CircuitBreaker } from '../utils/circuitBreaker.js'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); -// dotenv.config() is handled by bootstrap.js in production (Dockerfile CMD). -// Guard preserves direct execution for local dev: node src/server/claude-sdk-server.js -if (!process.env.ANTHROPIC_API_KEY) { - dotenv.config({ path: path.join(__dirname, '../../.env') }); -} +// Env loading is handled by ./loadFlagsEnv.js (first import above) for local +// dev and by bootstrap.js (which also loads flags.env + Secret Manager) in +// production. No dotenv.config() here — it would run after featureFlags.js +// has already evaluated (static-import hoisting). // --- Environment Validation --- const REQUIRED_ENV = ['ANTHROPIC_API_KEY']; @@ -130,6 +134,23 @@ for (const key of RECOMMENDED_ENV) { } } +// --- Feature flag visibility + dependency validation (B2/B3) --- +// Surfaces the resolved state of the raw-source/embedding flags at boot and +// warns on silent misconfigurations (the class that left RAW_SOURCE_EMBEDDING +// inert: flag true in flags.env but a prerequisite unmet → dispatcher no-op). +console.log('[Startup] Feature flags:', JSON.stringify({ + RAW_SOURCE_ARCHIVE: featureFlags.RAW_SOURCE_ARCHIVE, + RAW_SOURCE_EMBEDDING: featureFlags.RAW_SOURCE_EMBEDDING, + HOOK_DB_PERSISTENCE: featureFlags.HOOK_DB_PERSISTENCE, + EMBEDDING_PERSISTENCE: featureFlags.EMBEDDING_PERSISTENCE, + KNOWLEDGE_GRAPH: featureFlags.KNOWLEDGE_GRAPH, + SESSION_RECONCILIATION: featureFlags.SESSION_RECONCILIATION, + gemini_key_present: !!(process.env.GEMINI_API_KEY || process.env.GOOGLE_API_KEY), +})); +for (const w of validateFlags()) { + console.warn(`[Startup] flag-validation: ${w}`); +} + initSdkMetrics(); // Initialize hook DB schema at startup (blocks before app.listen, idempotent DDL) diff --git a/super-legal-mcp-refactored/src/server/loadFlagsEnv.js b/super-legal-mcp-refactored/src/server/loadFlagsEnv.js new file mode 100644 index 000000000..111ee86f4 --- /dev/null +++ b/super-legal-mcp-refactored/src/server/loadFlagsEnv.js @@ -0,0 +1,30 @@ +/** + * Side-effect env loader — MUST be the first import in any server entrypoint + * that reads feature flags via static import. + * + * Why a separate module (not inline dotenv in the server file): ES module + * static imports are hoisted and evaluated BEFORE any statement in the + * importing file's body. featureFlags.js reads process.env at module-eval + * time, so a `dotenv.config()` call inside claude-sdk-server.js body runs + * too late — featureFlags has already frozen its values from a bare + * process.env. By doing the load here and importing THIS module first, + * flags.env lands in process.env before featureFlags.js evaluates. + * + * Production goes through bootstrap.js (which loads flags.env before the + * dynamic import of the server) — this module is then a harmless no-op + * (dotenv does not overwrite already-present keys). For local dev + * (`node src/server/claude-sdk-server.js`), this is what gives flags.env + * the same effect it has in production — closing the gap where + * RAW_SOURCE_EMBEDDING (and other flags.env-only flags) silently defaulted. + */ +import dotenv from 'dotenv'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); + +// flags.env is the single source of truth for feature flags (local + prod). +dotenv.config({ path: path.join(__dirname, '../../flags.env') }); +// .env supplies secrets in local dev; no-overwrite preserves anything already +// set by the shell or by bootstrap.js/Secret Manager in production. +dotenv.config({ path: path.join(__dirname, '../../.env') }); diff --git a/super-legal-mcp-refactored/src/utils/hookDBBridge.js b/super-legal-mcp-refactored/src/utils/hookDBBridge.js index 424df625b..948d59f67 100644 --- a/super-legal-mcp-refactored/src/utils/hookDBBridge.js +++ b/super-legal-mcp-refactored/src/utils/hookDBBridge.js @@ -1297,6 +1297,34 @@ async function persistHookEvent(pool, sessionCache, hookName, input, result) { kgMeta.missing_reports = CRITICAL_REPORTS.filter(k => !presentKeys.includes(k)); kgMeta.reports_timeout = true; } + // Wait for the fire-and-forget raw-source embedding queue to + // drain before the KG build, so Phase 4b (source-evidence + // linkage) sees a populated source_chunk_embeddings table + // instead of racing ahead of it and writing zero source_hash + // provenance. Mirrors the reports-poll above. Heuristic: count + // non-zero AND unchanged across two consecutive checks (queue + // drained), or cap. count-still-0-at-cap means the session + // embedded no raw sources — proceed; Phase 4b correctly skips. + if (featureFlags.RAW_SOURCE_EMBEDDING) { + try { + const MAX_EMB_POLLS = 12; // 12 × 5s = 60s cap (background task) + let prevCount = -1; + for (let i = 0; i < MAX_EMB_POLLS; i++) { + const er = await pool.query( + `SELECT COUNT(*)::int AS cnt FROM source_chunk_embeddings WHERE session_id = $1`, + [kgSessionId] + ); + const cnt = er.rows[0].cnt; + if (cnt > 0 && cnt === prevCount) break; // drained (stable, non-zero) + prevCount = cnt; + if (i < MAX_EMB_POLLS - 1) await new Promise(r => setTimeout(r, 5000)); + } + if (prevCount <= 0) kgMeta.source_embeddings_empty = true; + else kgMeta.source_embeddings_count = prevCount; + } catch (err) { + console.warn('[KG] source_chunk_embeddings stabilization poll failed (proceeding):', err.message); + } + } const { buildSessionKnowledgeGraph } = await import('./knowledgeGraphExtractor.js'); const kgResult = await buildSessionKnowledgeGraph(pool, kgSessionId, kgSessionKey); try { diff --git a/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js b/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js index 58701ec82..ed21ab0a8 100644 --- a/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js +++ b/super-legal-mcp-refactored/src/utils/knowledgeGraph/kgPhases1to5.js @@ -781,12 +781,17 @@ async function phase4b_sourceEvidence(pool, sessionId, evolutionLog) { ); for (const match of matches.rows) { + // source_key is varchar(200): truncate (long chunk_headers overflowed and + // were silently dropped by upsertProvenance's catch). Strip NUL from both + // text fields — PostgreSQL rejects 0x00 in text/varchar. + const sourceKey = (match.chunk_header || 'raw_source').replace(/\0/g, '').slice(0, 200); + const rawText = match.chunk_content ? match.chunk_content.replace(/\0/g, '').slice(0, 300) : null; await upsertProvenance(pool, sessionId, node.id, null, { source_type: 'source_chunk', - source_key: match.chunk_header || 'raw_source', + source_key: sourceKey, source_hash: match.source_hash, extraction_method: 'embedding_similarity', - raw_text: match.chunk_content?.slice(0, 300) || null, + raw_text: rawText, }); provenanceCount++; } diff --git a/super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js b/super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js new file mode 100644 index 000000000..eef9c32cd --- /dev/null +++ b/super-legal-mcp-refactored/src/utils/knowledgeGraph/repairSourceProvenance.js @@ -0,0 +1,42 @@ +/** + * repairSourceProvenance — re-run Phase 4b (source-evidence linkage) for a + * single session, idempotently. + * + * Phase 4b (`phase4b_sourceEvidence`) embeds KG node labels and cosine-matches + * them against `source_chunk_embeddings`, writing `kg_provenance` rows with + * `source_type='source_chunk'`. It uses a plain INSERT (no ON CONFLICT), so a + * naive re-run would duplicate rows. This helper first DELETEs the prior + * phase4b output for the session (a targeted filter — `source_type='source_chunk'` + * is written ONLY by Phase 4b), then re-runs it. Safe to call repeatedly. + * + * Shared by: + * - scripts/run-phase4b.mjs (CLI: one-shot repair / Cardinal) + * - src/utils/sessionReconciliation.js (background safety-net for the race) + * + * The embedding service must already be initialized (server boot, or the CLI + * calls initEmbeddingService() before invoking this) — Phase 4b calls + * embedDocuments() internally, which returns null on an uninitialized client. + * + * @param {import('pg').Pool} pool + * @param {string} sessionUuid sessions.id (UUID), NOT the session_key string + * @returns {Promise<{ deleted: number, provenanceRows: number }>} + */ +export async function repairSourceProvenance(pool, sessionUuid) { + if (!sessionUuid) throw new Error('repairSourceProvenance: sessionUuid required'); + + // Idempotency: clear prior Phase 4b output for this session before re-running. + const del = await pool.query( + `DELETE FROM kg_provenance WHERE session_id = $1 AND source_type = 'source_chunk'`, + [sessionUuid], + ); + + const { phase4b_sourceEvidence } = await import('./kgPhases1to5.js'); + await phase4b_sourceEvidence(pool, sessionUuid, []); // evolutionLog unused for output + + const after = await pool.query( + `SELECT COUNT(*)::int AS n FROM kg_provenance WHERE session_id = $1 AND source_type = 'source_chunk'`, + [sessionUuid], + ); + + return { deleted: del.rowCount || 0, provenanceRows: after.rows[0].n }; +} diff --git a/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js b/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js index 40c93e528..d7973a5ce 100644 --- a/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js +++ b/super-legal-mcp-refactored/src/utils/rawSource/SourceChunker.js @@ -306,6 +306,15 @@ export function chunk(content, toolName, url = null) { const chunks = strategyFn(content); - // Ensure no empty chunks returned - return chunks.filter(c => c.content && c.content.trim().length > 0); + // Strip NUL (\0) — PostgreSQL text/varchar reject it ("invalid byte sequence + // for encoding UTF8: 0x00"). Raw API/PDF responses can carry embedded nulls; + // without this, the downstream source_chunk_embeddings INSERT fails and the + // chunk is silently dropped. Same defensive strip as kgPhase4cNodeEmbeddings. + // Re-filter empties last, since stripping can empty a chunk. + return chunks + .map(c => ({ + header: typeof c.header === 'string' ? c.header.replace(/\0/g, '') : c.header, + content: typeof c.content === 'string' ? c.content.replace(/\0/g, '') : c.content, + })) + .filter(c => c.content && c.content.trim().length > 0); } diff --git a/super-legal-mcp-refactored/src/utils/sessionReconciliation.js b/super-legal-mcp-refactored/src/utils/sessionReconciliation.js index fc3a43514..d8b6cf6fd 100644 --- a/super-legal-mcp-refactored/src/utils/sessionReconciliation.js +++ b/super-legal-mcp-refactored/src/utils/sessionReconciliation.js @@ -37,6 +37,7 @@ import { fileURLToPath } from 'url'; import { getPool } from '../db/postgres.js'; import { featureFlags } from '../config/featureFlags.js'; import { backgroundTasks } from './hookDBBridge.js'; +import { repairSourceProvenance } from './knowledgeGraph/repairSourceProvenance.js'; import { recordReconciliationScan, recordReconciliationRebuild, @@ -549,13 +550,56 @@ async function reconcile() { } } + // Phase 4b source-provenance repair — safety-net for the embedding/ + // KG-build ordering race. A session can be kg_status='built' yet have + // zero source_chunk provenance if its raw-source embeddings landed AFTER + // the build's stabilization-poll cap. Re-run Phase 4b once (idempotent + // DELETE+insert). One-shot kg_phase4b_repaired marker prevents re-running + // forever on sessions that legitimately yield zero matches. + let p4bRows = { rowCount: 0, rows: [] }; + if (featureFlags.RAW_SOURCE_EMBEDDING) { + try { + p4bRows = await pool.query(` + SELECT s.id, s.session_key + FROM sessions s + WHERE s.kg_status = 'built' + AND s.status = 'completed' + AND COALESCE(s.kg_phase4b_repaired, false) = false + AND s.created_at >= NOW() - INTERVAL '${MAX_SESSION_AGE_HOURS} hours' + AND EXISTS (SELECT 1 FROM source_chunk_embeddings e WHERE e.session_id = s.id) + AND NOT EXISTS (SELECT 1 FROM kg_provenance p WHERE p.session_id = s.id AND p.source_type = 'source_chunk') + ORDER BY s.created_at DESC + LIMIT $1`, + [SCAN_BATCH_LIMIT]); + } catch (p4bErr) { + // Column missing (migration not yet applied) etc. — skip, don't abort sweep. + console.warn('[Recon] phase4b candidate query failed:', p4bErr.code || p4bErr.message); + p4bRows = { rowCount: 0, rows: [] }; + } + + for (const session of p4bRows.rows) { + try { + const r = await repairSourceProvenance(pool, session.id); + // One-shot marker: set regardless of match count so a zero-match + // session is not re-attempted every scan. + await pool.query('UPDATE sessions SET kg_phase4b_repaired = true WHERE id = $1', [session.id]); + console.log(`[Recon] phase4b ${session.session_key}: ${r.provenanceRows} source_chunk provenance rows`); + recordReconciliationRebuild('kg', 'ok'); + } catch (err) { + console.warn(`[Recon] phase4b ${session.session_key} threw:`, err.message); + recordReconciliationRebuild('kg', 'failed'); + } + } + } + span?.setAttribute?.('scan.kg_candidates', kgRows.rowCount); span?.setAttribute?.('scan.artifact_candidates', artRows.rowCount); span?.setAttribute?.('scan.xlsx_candidates', xlsxRows.rowCount); + span?.setAttribute?.('scan.phase4b_candidates', p4bRows.rowCount); recordReconciliationScan('ok'); - if (kgRows.rowCount + artRows.rowCount + xlsxRows.rowCount > 0) { - console.log(`[SessionReconciliation] Scan complete — kg:${kgRows.rowCount} artifacts:${artRows.rowCount} xlsx:${xlsxRows.rowCount}`); + if (kgRows.rowCount + artRows.rowCount + xlsxRows.rowCount + p4bRows.rowCount > 0) { + console.log(`[SessionReconciliation] Scan complete — kg:${kgRows.rowCount} artifacts:${artRows.rowCount} xlsx:${xlsxRows.rowCount} phase4b:${p4bRows.rowCount}`); } }); } catch (err) { diff --git a/super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js b/super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js new file mode 100644 index 000000000..0e7a6c789 --- /dev/null +++ b/super-legal-mcp-refactored/test/sdk/source-chunker-nul.test.js @@ -0,0 +1,58 @@ +/** + * SourceChunker NUL-byte stripping (Workstream A). + * + * Raw API/PDF responses can carry embedded NUL (0x00) bytes. PostgreSQL + * text/varchar columns reject them ("invalid byte sequence for encoding + * UTF8: 0x00"), which silently dropped 14/367 Cardinal sources from + * source_chunk_embeddings. chunk() must strip NUL from content and header. + */ + +import { test } from 'node:test'; +import assert from 'node:assert/strict'; +import { chunk } from '../../src/utils/rawSource/SourceChunker.js'; + +const NUL = String.fromCharCode(0); + +test('strips NUL from chunk content', () => { + const content = `First paragraph with a ${NUL} embedded null byte.\n\nSecond paragraph also ${NUL}has one.`; + const chunks = chunk(content, 'fetch_document', null); + assert.ok(chunks.length > 0, 'should produce chunks'); + for (const c of chunks) { + assert.ok(!c.content.includes(NUL), `content must not contain NUL: ${JSON.stringify(c.content)}`); + assert.ok(typeof c.header !== 'string' || !c.header.includes(NUL), 'header must not contain NUL'); + } +}); + +test('strips NUL from JSON-source content', () => { + const json = JSON.stringify({ a: `value with ${NUL} null`, b: `another ${NUL}` }); + const chunks = chunk(json, 'search_sec_filings', null); + for (const c of chunks) { + assert.ok(!c.content.includes(NUL), 'JSON chunk content must not contain NUL'); + } +}); + +test('a chunk that is ONLY a NUL byte is dropped (empty after strip)', () => { + // Paragraph split: a paragraph consisting solely of NUL becomes empty and + // must be filtered out, not returned as an empty chunk. + const content = `Real paragraph one.\n\n${NUL}\n\nReal paragraph two.`; + const chunks = chunk(content, 'fetch_document', null); + assert.ok(chunks.length >= 1); + for (const c of chunks) { + assert.ok(c.content.trim().length > 0, 'no empty chunks'); + assert.ok(!c.content.includes(NUL), 'no NUL survives'); + } +}); + +test('content with no NUL is unchanged in substance', () => { + const content = `Clean paragraph one.\n\nClean paragraph two.`; + const chunks = chunk(content, 'fetch_document', null); + const joined = chunks.map(c => c.content).join(' '); + assert.ok(joined.includes('Clean paragraph one')); + assert.ok(joined.includes('Clean paragraph two')); +}); + +test('empty / non-string input returns empty array (unchanged contract)', () => { + assert.deepEqual(chunk('', 'fetch_document', null), []); + assert.deepEqual(chunk(null, 'fetch_document', null), []); + assert.deepEqual(chunk(NUL, 'fetch_document', null), [], 'pure-NUL input yields no chunks'); +}); diff --git a/super-legal-mcp-refactored/test/sdk/validate-flags.test.js b/super-legal-mcp-refactored/test/sdk/validate-flags.test.js new file mode 100644 index 000000000..f7afa4076 --- /dev/null +++ b/super-legal-mcp-refactored/test/sdk/validate-flags.test.js @@ -0,0 +1,52 @@ +/** + * validateFlags() dependency-validation truth table (Workstream B3). + * + * Guards the silent-misconfiguration class that left RAW_SOURCE_EMBEDDING + * inert in production (flag true, prerequisite unmet, dispatcher no-op). + */ + +import { test } from 'node:test'; +import assert from 'node:assert/strict'; +import { validateFlags } from '../../src/config/featureFlags.js'; + +const base = { + RAW_SOURCE_ARCHIVE: true, + RAW_SOURCE_EMBEDDING: true, + HOOK_DB_PERSISTENCE: true, +}; +const keyEnv = { GEMINI_API_KEY: 'x' }; + +test('all prerequisites met → no warnings', () => { + assert.deepEqual(validateFlags(base, keyEnv), []); +}); + +test('GOOGLE_API_KEY alone satisfies the key requirement', () => { + assert.deepEqual(validateFlags(base, { GOOGLE_API_KEY: 'x' }), []); +}); + +test('embedding on, archive off → warns', () => { + const w = validateFlags({ ...base, RAW_SOURCE_ARCHIVE: false }, keyEnv); + assert.ok(w.some(s => /RAW_SOURCE_ARCHIVE=false/.test(s))); +}); + +test('embedding on, HOOK_DB off → warns', () => { + const w = validateFlags({ ...base, HOOK_DB_PERSISTENCE: false }, keyEnv); + assert.ok(w.some(s => /HOOK_DB_PERSISTENCE=false/.test(s))); +}); + +test('embedding on, no API key → warns', () => { + const w = validateFlags(base, {}); + assert.ok(w.some(s => /GEMINI_API_KEY nor GOOGLE_API_KEY/.test(s))); +}); + +test('archive on, embedding off → silent-divergence warning', () => { + const w = validateFlags({ ...base, RAW_SOURCE_EMBEDDING: false }, keyEnv); + assert.ok(w.some(s => /archived but not embedded/.test(s))); +}); + +test('both archive + embedding off → no warnings (feature simply off)', () => { + assert.deepEqual( + validateFlags({ RAW_SOURCE_ARCHIVE: false, RAW_SOURCE_EMBEDDING: false, HOOK_DB_PERSISTENCE: true }, keyEnv), + [], + ); +}); From b44f10cb5be33a725eb9dcdee8f2d60cb031edca Mon Sep 17 00:00:00 2001 From: Number531 <120485065+Number531@users.noreply.github.com> Date: Fri, 29 May 2026 00:20:20 -0400 Subject: [PATCH 2/2] =?UTF-8?q?fix(kg):=20review=20follow-ups=20=E2=80=94?= =?UTF-8?q?=20embedding-service=20availability=20before=20phase4b=20repair?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses gaps found in self-review of 684b293a: - CRITICAL: reconciliation could burn the one-shot kg_phase4b_repaired marker when the embedding service was unavailable (e.g. RAW_SOURCE_EMBEDDING=true but EMBEDDING_PERSISTENCE=false → initEmbeddingService never ran → embedDocuments null → Phase 4b wrote 0 rows → marker set → never retried). Two-part fix: (a) broaden the boot init gate to also initialize the embedding service when RAW_SOURCE_EMBEDDING is on (not only EMBEDDING_PERSISTENCE); (b) probe the embedding service before the reconciliation repair pass and skip WITHOUT setting the marker when it's unavailable, so the session is retried once the service is healthy. The marker is only set when the service is confirmed live (a true zero-match terminal state). - MEDIUM: backfill-source-chunk-embeddings.mjs had a raw NUL byte embedded in the stripNul regex literal (made the file non-UTF8). Replaced with the /\0/g escape. 424/424 KG unit tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../backfill-source-chunk-embeddings.mjs | Bin 9871 -> 9872 bytes .../src/server/claude-sdk-server.js | 8 +- .../src/utils/sessionReconciliation.js | 73 +++++++++++------- 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs b/super-legal-mcp-refactored/scripts/backfill-source-chunk-embeddings.mjs index 89b0bfdfd61c611434c8774641b12e57a7c3cfa7..1629698171dae4795e4bb3aed12d5f5073679379 100644 GIT binary patch delta 15 WcmeD8o#4A6M2soMU~{OLl>h)NeFVw? delta 14 VcmbQ>+wZ#}M2wMPbC{Tw001Te1U>)& diff --git a/super-legal-mcp-refactored/src/server/claude-sdk-server.js b/super-legal-mcp-refactored/src/server/claude-sdk-server.js index c4a791f72..ebfd5d97e 100644 --- a/super-legal-mcp-refactored/src/server/claude-sdk-server.js +++ b/super-legal-mcp-refactored/src/server/claude-sdk-server.js @@ -187,8 +187,12 @@ if (featureFlags.CITATION_CHAT && featureFlags.HOOK_DB_PERSISTENCE) { } } -// Initialize embedding schema (pgvector) — requires HOOK_DB_PERSISTENCE -if (featureFlags.EMBEDDING_PERSISTENCE && featureFlags.HOOK_DB_PERSISTENCE) { +// Initialize embedding schema (pgvector) — requires HOOK_DB_PERSISTENCE. +// Also init when RAW_SOURCE_EMBEDDING is on: raw-source embedding and Phase 4b +// source-evidence linkage need the embedding service even if report-embedding +// (EMBEDDING_PERSISTENCE) is off. Without this, embedDocuments() silently +// returns null and the embedding/Phase-4b path no-ops. +if ((featureFlags.EMBEDDING_PERSISTENCE || featureFlags.RAW_SOURCE_EMBEDDING) && featureFlags.HOOK_DB_PERSISTENCE) { try { const { ensureEmbeddingSchema } = await import('../db/postgres.js'); const schemaReady = await ensureEmbeddingSchema(); diff --git a/super-legal-mcp-refactored/src/utils/sessionReconciliation.js b/super-legal-mcp-refactored/src/utils/sessionReconciliation.js index d8b6cf6fd..9f8fd2637 100644 --- a/super-legal-mcp-refactored/src/utils/sessionReconciliation.js +++ b/super-legal-mcp-refactored/src/utils/sessionReconciliation.js @@ -558,36 +558,53 @@ async function reconcile() { // forever on sessions that legitimately yield zero matches. let p4bRows = { rowCount: 0, rows: [] }; if (featureFlags.RAW_SOURCE_EMBEDDING) { + // Probe the embedding service FIRST. Phase 4b embeds node labels; if the + // service is unavailable (missing API key / not initialized) it writes 0 + // rows. Skip the pass WITHOUT setting the one-shot marker so the session + // is retried once the service is healthy — instead of burning the marker + // on a false "zero matches" result (the silent-no-op class this fix targets). + let embedReady = false; try { - p4bRows = await pool.query(` - SELECT s.id, s.session_key - FROM sessions s - WHERE s.kg_status = 'built' - AND s.status = 'completed' - AND COALESCE(s.kg_phase4b_repaired, false) = false - AND s.created_at >= NOW() - INTERVAL '${MAX_SESSION_AGE_HOURS} hours' - AND EXISTS (SELECT 1 FROM source_chunk_embeddings e WHERE e.session_id = s.id) - AND NOT EXISTS (SELECT 1 FROM kg_provenance p WHERE p.session_id = s.id AND p.source_type = 'source_chunk') - ORDER BY s.created_at DESC - LIMIT $1`, - [SCAN_BATCH_LIMIT]); - } catch (p4bErr) { - // Column missing (migration not yet applied) etc. — skip, don't abort sweep. - console.warn('[Recon] phase4b candidate query failed:', p4bErr.code || p4bErr.message); - p4bRows = { rowCount: 0, rows: [] }; - } - - for (const session of p4bRows.rows) { + const { embedDocuments } = await import('./embeddingService.js'); + const probe = await embedDocuments(['_recon_probe']); + embedReady = !!(probe && probe[0] && probe[0].length > 0); + } catch { embedReady = false; } + + if (!embedReady) { + console.warn('[Recon] phase4b repair skipped — embedding service unavailable'); + } else { try { - const r = await repairSourceProvenance(pool, session.id); - // One-shot marker: set regardless of match count so a zero-match - // session is not re-attempted every scan. - await pool.query('UPDATE sessions SET kg_phase4b_repaired = true WHERE id = $1', [session.id]); - console.log(`[Recon] phase4b ${session.session_key}: ${r.provenanceRows} source_chunk provenance rows`); - recordReconciliationRebuild('kg', 'ok'); - } catch (err) { - console.warn(`[Recon] phase4b ${session.session_key} threw:`, err.message); - recordReconciliationRebuild('kg', 'failed'); + p4bRows = await pool.query(` + SELECT s.id, s.session_key + FROM sessions s + WHERE s.kg_status = 'built' + AND s.status = 'completed' + AND COALESCE(s.kg_phase4b_repaired, false) = false + AND s.created_at >= NOW() - INTERVAL '${MAX_SESSION_AGE_HOURS} hours' + AND EXISTS (SELECT 1 FROM source_chunk_embeddings e WHERE e.session_id = s.id) + AND NOT EXISTS (SELECT 1 FROM kg_provenance p WHERE p.session_id = s.id AND p.source_type = 'source_chunk') + ORDER BY s.created_at DESC + LIMIT $1`, + [SCAN_BATCH_LIMIT]); + } catch (p4bErr) { + // Column missing (migration not yet applied) etc. — skip, don't abort sweep. + console.warn('[Recon] phase4b candidate query failed:', p4bErr.code || p4bErr.message); + p4bRows = { rowCount: 0, rows: [] }; + } + + for (const session of p4bRows.rows) { + try { + const r = await repairSourceProvenance(pool, session.id); + // Service is confirmed live (probed above), so a zero-match result + // is a true terminal state — set the one-shot marker so we don't + // re-embed labels every scan for a session with no matches >0.70. + await pool.query('UPDATE sessions SET kg_phase4b_repaired = true WHERE id = $1', [session.id]); + console.log(`[Recon] phase4b ${session.session_key}: ${r.provenanceRows} source_chunk provenance rows`); + recordReconciliationRebuild('kg', 'ok'); + } catch (err) { + console.warn(`[Recon] phase4b ${session.session_key} threw:`, err.message); + recordReconciliationRebuild('kg', 'failed'); + } } } }