Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* 023 — kg_edges composite indexes (session_id, source_id) / (session_id, target_id).
*
* Speeds the GET /api/db/sessions/:sessionKey/kg/raw-sources/:nodeId fallback's
* one-hop neighbor CTE, which scans kg_edges by (session-scoped) source_id OR
* target_id. The pre-existing single-column indexes + (session_id, edge_type)
* don't cover the (session_id, source_id|target_id) access pattern.
*
* CONCURRENTLY: non-locking build, safe to apply to a large live kg_edges
* during deploy. Requires running outside a transaction — hence pgm.noTransaction().
* The boot path (ensureKnowledgeGraphSchema in src/db/postgres.js) creates the
* same indexes plainly; that runs pre-listen so locking is irrelevant there.
*/

export const up = (pgm) => {
pgm.noTransaction(); // CREATE INDEX CONCURRENTLY cannot run inside a transaction block
pgm.createIndex('kg_edges', ['session_id', 'source_id'], {
name: 'idx_kg_edges_session_source',
ifNotExists: true,
concurrently: true,
});
pgm.createIndex('kg_edges', ['session_id', 'target_id'], {
name: 'idx_kg_edges_session_target',
ifNotExists: true,
concurrently: true,
});
};

export const down = (pgm) => {
pgm.noTransaction();
pgm.dropIndex('kg_edges', ['session_id', 'source_id'], {
name: 'idx_kg_edges_session_source',
ifExists: true,
concurrently: true,
});
pgm.dropIndex('kg_edges', ['session_id', 'target_id'], {
name: 'idx_kg_edges_session_target',
ifExists: true,
concurrently: true,
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- 024_sessions-kg-phase4b-repaired.down.sql
ALTER TABLE sessions DROP COLUMN IF EXISTS kg_phase4b_repaired;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- 024_sessions-kg-phase4b-repaired.up.sql
-- One-shot marker for the reconciliation Phase 4b source-provenance repair
-- (embedding / KG-build ordering-race safety-net). Set true after a repair
-- attempt so a session that legitimately yields zero matches is not
-- re-attempted on every scan. Mirrors the column added to
-- ensureKnowledgeGraphSchema()'s sessions ALTERs in src/db/postgres.js.

ALTER TABLE sessions ADD COLUMN IF NOT EXISTS kg_phase4b_repaired BOOLEAN DEFAULT FALSE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/**
* Backfill source_chunk_embeddings from an on-disk raw-source archive.
*
* Why this exists: the live SourceEmbeddingDispatcher ran as the no-op stub
* for historical sessions (RAW_SOURCE_EMBEDDING did not resolve true at
* runtime), so source_chunk_embeddings is empty even though the raw-source
* archive was written. The /kg/raw-sources route depends on this table via
* Phase 4b provenance (source_hash). This script reproduces the dispatcher's
* process() pipeline (read -> chunk -> embed -> INSERT) offline, against the
* archive already on disk.
*
* Mirrors src/utils/rawSource/SourceEmbeddingDispatcher.js process() exactly:
* - same SourceStorage.read (decompress + integrity check)
* - same SourceChunker.chunk (NUL-stripped — see Workstream A)
* - same embeddingService.embedDocuments (Gemini, 3072-dim, RETRIEVAL_DOCUMENT)
* - same 11-column INSERT + per-source DELETE for idempotency
*
* SCOPE: backfill-only, for HISTORICAL sessions that completed before the live
* fixes (flags.env loading + embedding/Phase-4b race fix). New live sessions
* self-populate source_chunk_embeddings — they do NOT need this script. After
* backfilling, run scripts/run-phase4b.mjs <session_key> to write the explicit
* kg_provenance.source_hash rows the kg/raw-sources route prefers.
*
* Usage:
* node scripts/backfill-source-chunk-embeddings.mjs <session_key> [--dry-run] [--concurrency=5] [--force]
* (defaults to Cardinal: 2026-05-22-1779484021)
*/

import 'dotenv/config';
import fs from 'fs';
import readline from 'readline';
import path from 'path';
import { fileURLToPath } from 'url';

const __dirname = path.dirname(fileURLToPath(import.meta.url));
const REPO_ROOT = path.resolve(__dirname, '..');

import { createSourceStorage } from '../src/utils/rawSource/SourceStorage.js';
import { chunk as chunkSource } from '../src/utils/rawSource/SourceChunker.js';
import { embedDocuments, initEmbeddingService } from '../src/utils/embeddingService.js';

// Mirror the service's source_type derivation (rawSource/index.js).
const SOURCE_TYPE_BY_TOOL = { fetch_document: 'document', exa_web_search: 'exa_result' };
const SOURCE_TYPE_ENUM = new Set(['document', 'exa_result', 'web_search', 'unknown']);
function inferSourceType(toolName) {
if (!toolName) return 'unknown';
const t = SOURCE_TYPE_BY_TOOL[toolName] || 'unknown';
return SOURCE_TYPE_ENUM.has(t) ? t : 'unknown';
}

// ── args ──────────────────────────────────────────────────────────────
const args = process.argv.slice(2);
const sessionKey = args.find(a => !a.startsWith('--')) || '2026-05-22-1779484021';
const dryRun = args.includes('--dry-run');
const concArg = args.find(a => a.startsWith('--concurrency='));
const concurrency = concArg ? parseInt(concArg.split('=')[1], 10) : 5;

const sessionDir = path.join(REPO_ROOT, 'reports', sessionKey);
const poolDir = path.join(sessionDir, 'raw-sources');
const manifestPath = path.join(sessionDir, 'raw-sources-manifest.ndjson');

async function main() {
console.log(`[backfill] session_key=${sessionKey} dryRun=${dryRun} concurrency=${concurrency}`);

if (!fs.existsSync(manifestPath)) {
console.error(`[backfill] manifest not found: ${manifestPath}`);
process.exit(1);
}

// 1. Init embedding service (REQUIRED — embedDocuments returns null without it)
await initEmbeddingService();
// Probe the service is live before doing work
const probe = await embedDocuments(['healthcheck'], ['probe']);
if (!probe || !probe[0] || probe[0].length === 0) {
console.error('[backfill] embedding service not available (GEMINI_API_KEY missing/invalid). Aborting.');
process.exit(1);
}
console.log(`[backfill] embedding service OK (dim=${probe[0].length})`);

// 2. DB pool + session UUID
const { Pool } = await import('pg');
const pgvector = await import('pgvector/pg');
const pool = new Pool({ connectionString: process.env.PG_CONNECTION_STRING });

const sess = await pool.query('SELECT id FROM sessions WHERE session_key = $1 LIMIT 1', [sessionKey]);
const sessionUuid = sess.rows[0]?.id;
if (!sessionUuid) {
console.error(`[backfill] no sessions row for session_key=${sessionKey}. Aborting.`);
await pool.end();
process.exit(1);
}
console.log(`[backfill] session_id (uuid)=${sessionUuid}`);

// 3. Read manifest, dedupe by hash (keep first occurrence's metadata)
const byHash = new Map();
const rl = readline.createInterface({ input: fs.createReadStream(manifestPath) });
for await (const line of rl) {
if (!line.trim()) continue;
let row;
try { row = JSON.parse(line); } catch { continue; }
if (!row.hash) continue;
if (!byHash.has(row.hash)) byHash.set(row.hash, row);
}
const sources = [...byHash.values()];
console.log(`[backfill] ${sources.length} unique sources to process`);

// Resume support: skip sources already embedded for this session (unless --force)
const force = args.includes('--force');
const existingHashes = new Set();
if (!force && !dryRun) {
const ex = await pool.query(
'SELECT DISTINCT source_hash FROM source_chunk_embeddings WHERE session_id = $1',
[sessionUuid],
);
for (const r of ex.rows) existingHashes.add(r.source_hash);
if (existingHashes.size > 0) console.log(`[backfill] resume: ${existingHashes.size} sources already present, will skip`);
}

// PostgreSQL text/varchar cannot store NUL (0x00). Strip it from any value
// we persist. The live dispatcher has the same constraint — see findings.
const stripNul = (s) => (typeof s === 'string' ? s.replace(/\0/g, '') : s);

const storage = createSourceStorage({ poolDir });

// 4. Process each source (mirror dispatcher process())
const stats = { processed: 0, inserted: 0, chunks: 0, skipped: 0, skippedEmpty: 0, readFail: 0, embedFail: 0, insertFail: 0 };

async function processSource(row) {
const hash = row.hash;
if (existingHashes.has(hash)) { stats.skipped++; return; }
const ext = row.ext || 'text';
const toolName = row.tool_name || null;
const sourceType = inferSourceType(toolName);
const url = row.url || null;

// read
let content;
try {
const body = await storage.read(hash, ext);
content = body.toString('utf-8');
} catch (err) {
stats.readFail++;
console.warn(`[backfill] read failed ${hash.slice(0, 12)}: ${err.message}`);
return;
}
if (!content || content.trim().length === 0) { stats.skippedEmpty++; return; }

// chunk
const chunks = chunkSource(content, toolName, url);
if (chunks.length === 0) { stats.skippedEmpty++; return; }

// embed
const texts = chunks.map(c => c.content);
const titles = chunks.map(c => c.header);
const embeddings = await embedDocuments(texts, titles);
if (!embeddings || embeddings.length !== chunks.length) {
stats.embedFail++;
console.warn(`[backfill] embed mismatch ${hash.slice(0, 12)}: expected ${chunks.length} got ${embeddings?.length}`);
return;
}

if (dryRun) {
stats.processed++; stats.chunks += chunks.length;
return;
}

// INSERT (per-source transaction; DELETE first for idempotency) — mirrors dispatcher
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(
'DELETE FROM source_chunk_embeddings WHERE source_hash = $1 AND session_id = $2',
[hash, sessionUuid],
);
const valuePlaceholders = [];
const params = [];
const COLS = 11;
for (let i = 0; i < chunks.length; i++) {
const o = i * COLS;
valuePlaceholders.push(
`($${o + 1}, $${o + 2}, $${o + 3}, $${o + 4}, $${o + 5}, $${o + 6}, $${o + 7}, $${o + 8}, $${o + 9}, $${o + 10}, $${o + 11})`,
);
params.push(
hash,
sessionUuid,
i,
stripNul(chunks[i].header) || null,
stripNul(chunks[i].content),
sourceType || null,
toolName || null,
pgvector.default.toSql(embeddings[i]),
'gemini-embedding-2-preview',
'RETRIEVAL_DOCUMENT',
Math.ceil(chunks[i].content.length / 4),
);
}
await client.query(
`INSERT INTO source_chunk_embeddings
(source_hash, session_id, chunk_index, chunk_header, chunk_content,
source_type, tool_name, embedding, model, task_type, token_count)
VALUES ${valuePlaceholders.join(', ')}`,
params,
);
await client.query('COMMIT');
stats.inserted++; stats.chunks += chunks.length;
} catch (txErr) {
await client.query('ROLLBACK').catch(() => {});
stats.insertFail++;
console.warn(`[backfill] INSERT failed ${hash.slice(0, 12)}: ${txErr.message}`);
} finally {
client.release();
}
stats.processed++;
}

// bounded concurrency
let idx = 0;
async function worker() {
while (idx < sources.length) {
const myIdx = idx++;
await processSource(sources[myIdx]);
if (stats.processed % 25 === 0 && stats.processed > 0) {
console.log(`[backfill] progress: ${stats.processed}/${sources.length} sources, ${stats.chunks} chunks`);
}
}
}
await Promise.all(Array.from({ length: concurrency }, worker));

console.log('\n[backfill] DONE');
console.log(JSON.stringify(stats, null, 2));

if (!dryRun) {
const total = await pool.query(
'SELECT COUNT(*)::int n FROM source_chunk_embeddings WHERE session_id = $1',
[sessionUuid],
);
console.log(`[backfill] source_chunk_embeddings rows for session: ${total.rows[0].n}`);
}

await pool.end();
}

main().catch(err => { console.error('[backfill] fatal:', err); process.exit(1); });
68 changes: 68 additions & 0 deletions super-legal-mcp-refactored/scripts/run-phase4b.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* run-phase4b.mjs — standalone Phase 4b (source-evidence) runner for ONE session.
*
* Writes explicit `kg_provenance.source_hash` rows by cosine-matching KG node
* labels against `source_chunk_embeddings`, WITHOUT a full 16-phase KG rebuild
* (a full rebuild would duplicate ~all provenance — `upsertProvenance` has no
* ON CONFLICT). Idempotent: re-running yields the same row count.
*
* Prerequisites: `source_chunk_embeddings` must already be populated for the
* session (live embedding, or scripts/backfill-source-chunk-embeddings.mjs for
* historical sessions). Phase 4b has NO Phase 4c dependency — it embeds labels
* fresh, so it needs only the embedding service + GEMINI_API_KEY/GOOGLE_API_KEY.
*
* Usage:
* node scripts/run-phase4b.mjs <session_key>
* (e.g., node scripts/run-phase4b.mjs 2026-05-22-1779484021)
*/

import 'dotenv/config';
import path from 'path';
import { fileURLToPath } from 'url';
import dotenv from 'dotenv';

const __dirname = path.dirname(fileURLToPath(import.meta.url));
// Ensure flags.env is loaded (GEMINI/GOOGLE key may live in .env; flags here for parity).
dotenv.config({ path: path.join(__dirname, '../flags.env') });

import { initEmbeddingService } from '../src/utils/embeddingService.js';
import { repairSourceProvenance } from '../src/utils/knowledgeGraph/repairSourceProvenance.js';

async function main() {
const sessionKey = process.argv.slice(2).find(a => !a.startsWith('--'));
if (!sessionKey) {
console.error('Usage: node scripts/run-phase4b.mjs <session_key>');
process.exit(1);
}

// Phase 4b embeds node labels live → service must be initialized first.
await initEmbeddingService();

const { Pool } = await import('pg');
const pool = new Pool({ connectionString: process.env.PG_CONNECTION_STRING });

try {
const sess = await pool.query('SELECT id FROM sessions WHERE session_key = $1 LIMIT 1', [sessionKey]);
const sessionUuid = sess.rows[0]?.id;
if (!sessionUuid) {
console.error(`[run-phase4b] no sessions row for session_key=${sessionKey}`);
process.exit(1);
}

const emb = await pool.query(
'SELECT COUNT(*)::int AS n FROM source_chunk_embeddings WHERE session_id = $1',
[sessionUuid],
);
console.log(`[run-phase4b] session_key=${sessionKey} uuid=${sessionUuid} source_chunk_embeddings=${emb.rows[0].n}`);
if (emb.rows[0].n === 0) {
console.warn('[run-phase4b] source_chunk_embeddings is empty — Phase 4b will produce nothing. Backfill first.');
}

const result = await repairSourceProvenance(pool, sessionUuid);
console.log(`[run-phase4b] DONE — deleted ${result.deleted} prior rows, now ${result.provenanceRows} source_chunk provenance rows`);
} finally {
await pool.end();
}
}

main().catch(err => { console.error('[run-phase4b] fatal:', err); process.exit(1); });
Loading