Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 285 additions & 0 deletions crates/codegraph-core/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,60 @@ pub struct DataflowEdge {
pub confidence: f64,
}

// ── Build-glue return types ────────────────────────────────────────────
//
// Plain data carriers returned to JS by the batched build-glue queries
// below. Each is marked #[napi(object)] so napi-rs converts it to a plain
// JS object in a single call, avoiding one FFI round-trip per field.

/// A single row from file_hashes.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct FileHashRow {
    /// File path exactly as stored in the file_hashes table.
    pub file: String,
    /// Content hash recorded for the file.
    pub hash: String,
    /// Modification time recorded for the file.
    /// NOTE(review): units (seconds vs milliseconds) are whatever the
    /// build pipeline wrote — not established in this file.
    pub mtime: i64,
    /// File size recorded for the file.
    pub size: i64,
}

/// Batched result of file_hashes table read.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct FileHashData {
    /// Whether the file_hashes table exists at all; when false, `rows`
    /// is empty and `max_mtime` is 0.
    pub exists: bool,
    /// Every row of file_hashes.
    pub rows: Vec<FileHashRow>,
    /// Largest mtime seen across `rows` (0 when there are no rows).
    pub max_mtime: i64,
}

/// Counts for pending analysis tables.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct PendingAnalysisCounts {
    /// Row count of cfg_blocks, or -1 when the table is missing.
    pub cfg_count: i64,
    /// Row count of dataflow, or -1 when the table is missing.
    pub dataflow_count: i64,
}

/// Batched node/edge counts for finalize.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct FinalizeCounts {
    /// Total rows in the nodes table (0 on query failure).
    pub node_count: i64,
    /// Total rows in the edges table (0 on query failure).
    pub edge_count: i64,
}

/// Batched advisory check results.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct AdvisoryCheckResult {
    /// Embeddings whose node_id no longer exists in nodes.
    pub orphaned_embeddings: i64,
    /// Value of embedding_meta key 'built_at', if present.
    pub embed_built_at: Option<String>,
    /// Exported non-file nodes with no cross-file 'calls' edge targeting them.
    pub unused_exports: i64,
}

/// Batched collect-files data.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct CollectFilesData {
    /// Number of entries in `files` (kept alongside for the JS caller).
    pub count: i64,
    /// All file paths stored in file_hashes.
    pub files: Vec<String>,
}

// ── NativeDatabase class ────────────────────────────────────────────────

/// Persistent rusqlite Connection wrapper exposed to JS via napi-rs.
Expand Down Expand Up @@ -974,6 +1028,237 @@ impl NativeDatabase {
Ok(roles_db::do_classify_incremental(conn, &changed_files).ok())
}

// ── Phase 6.18: Batched build-glue queries ──────────────────────────

/// Batched read of file_hashes table for detect-changes stage.
/// Returns table existence, all rows, and max mtime in a single napi call.
#[napi]
pub fn get_file_hash_data(&self) -> napi::Result<FileHashData> {
let conn = self.conn()?;
if !has_table(conn, "file_hashes") {
return Ok(FileHashData {
exists: false,
rows: vec![],
max_mtime: 0,
});
}
let mut stmt = conn
.prepare_cached("SELECT file, hash, mtime, size FROM file_hashes")
.map_err(|e| napi::Error::from_reason(format!("getFileHashData prepare failed: {e}")))?;
let mut rows = Vec::new();
let mut max_mtime: i64 = 0;
let mapped = stmt
.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
))
})
.map_err(|e| napi::Error::from_reason(format!("getFileHashData query failed: {e}")))?;
for r in mapped {
let (file, hash, mtime, size) =
r.map_err(|e| napi::Error::from_reason(format!("getFileHashData row: {e}")))?;
if mtime > max_mtime {
max_mtime = mtime;
}
rows.push(FileHashRow {
file,
hash,
mtime,
size,
});
}
Ok(FileHashData {
exists: true,
rows,
max_mtime,
})
}

/// Check pending analysis tables: returns counts for cfg_blocks and dataflow.
/// Tables that don't exist return -1 (distinguishes "missing" from "empty").
#[napi]
pub fn check_pending_analysis(&self) -> napi::Result<PendingAnalysisCounts> {
    let conn = self.conn()?;
    // Shared helper: COUNT(*) when the table exists, -1 otherwise.
    // A failed count query also collapses to -1, same as a missing table.
    let count_or_missing = |table: &str, sql: &str| -> i64 {
        if !has_table(conn, table) {
            return -1;
        }
        conn.query_row(sql, [], |r| r.get::<_, i64>(0)).unwrap_or(-1)
    };
    Ok(PendingAnalysisCounts {
        cfg_count: count_or_missing("cfg_blocks", "SELECT COUNT(*) FROM cfg_blocks"),
        dataflow_count: count_or_missing("dataflow", "SELECT COUNT(*) FROM dataflow"),
    })
}

/// Batch upsert file_hashes for metadata healing (mtime/size only updates).
#[napi]
pub fn heal_file_metadata(&self, entries: Vec<FileHashEntry>) -> napi::Result<u32> {
    // Nothing to heal: skip the transaction entirely.
    if entries.is_empty() {
        return Ok(0);
    }
    let conn = self.conn()?;
    let tx = conn
        .unchecked_transaction()
        .map_err(|e| napi::Error::from_reason(format!("heal tx failed: {e}")))?;
    let mut written = 0u32;
    {
        // Inner scope ends the statement's borrow of `tx` before commit.
        let mut upsert = tx
            .prepare_cached(
                "INSERT OR REPLACE INTO file_hashes (file, hash, mtime, size) VALUES (?1, ?2, ?3, ?4)",
            )
            .map_err(|e| napi::Error::from_reason(format!("heal prepare failed: {e}")))?;
        for entry in &entries {
            upsert
                .execute(params![entry.file, entry.hash, entry.mtime, entry.size])
                .map_err(|e| napi::Error::from_reason(format!("heal row failed: {e}")))?;
            written += 1;
        }
    }
    tx.commit()
        .map_err(|e| napi::Error::from_reason(format!("heal commit failed: {e}")))?;
    Ok(written)
}

/// Find files that have edges pointing to any of the changed files.
/// Returns deduplicated list of reverse-dependency file paths.
#[napi]
pub fn find_reverse_dependencies(&self, changed_files: Vec<String>) -> napi::Result<Vec<String>> {
if changed_files.is_empty() {
return Ok(vec![]);
}
let conn = self.conn()?;
let changed_set: std::collections::HashSet<&str> =
changed_files.iter().map(|s| s.as_str()).collect();
let mut result_set: std::collections::HashSet<String> = std::collections::HashSet::new();

let mut stmt = conn
.prepare_cached(
"SELECT DISTINCT n_src.file FROM edges e \
JOIN nodes n_src ON e.source_id = n_src.id \
JOIN nodes n_tgt ON e.target_id = n_tgt.id \
WHERE n_tgt.file = ?1 AND n_src.file != n_tgt.file AND n_src.kind != 'directory'",
)
.map_err(|e| napi::Error::from_reason(format!("reverseDeps prepare failed: {e}")))?;

for file in &changed_files {
let rows = stmt
.query_map(params![file], |row| row.get::<_, String>(0))
.map_err(|e| {
napi::Error::from_reason(format!("reverseDeps query failed: {e}"))
})?;
for row in rows {
if let Ok(dep_file) = row {
if !changed_set.contains(dep_file.as_str()) {
result_set.insert(dep_file);
}
}
}
}
let mut result_vec: Vec<String> = result_set.into_iter().collect();
result_vec.sort_unstable();
Ok(result_vec)
}

/// Get node and edge counts in a single napi call.
#[napi]
pub fn get_finalize_counts(&self) -> napi::Result<FinalizeCounts> {
    let conn = self.conn()?;
    // Either count falls back to 0 when the query fails (e.g. missing table).
    let count = |sql: &str| -> i64 {
        conn.query_row(sql, [], |r| r.get::<_, i64>(0)).unwrap_or(0)
    };
    Ok(FinalizeCounts {
        node_count: count("SELECT COUNT(*) FROM nodes"),
        edge_count: count("SELECT COUNT(*) FROM edges"),
    })
}

/// Run all advisory checks in a single napi call (orphaned embeddings,
/// stale embeddings, unused exports). Only called on full builds.
#[napi]
pub fn run_advisory_checks(&self, has_embeddings: bool) -> napi::Result<AdvisoryCheckResult> {
    let conn = self.conn()?;

    // Embedding checks only make sense when an embeddings table is present.
    let (orphaned_embeddings, embed_built_at) = if has_embeddings {
        // Embeddings whose node no longer exists in the graph.
        let orphaned = conn
            .query_row(
                "SELECT COUNT(*) FROM embeddings WHERE node_id NOT IN (SELECT id FROM nodes)",
                [],
                |r| r.get::<_, i64>(0),
            )
            .unwrap_or(0);
        // Timestamp recorded at the last embedding build, if any.
        let built_at = conn
            .query_row(
                "SELECT value FROM embedding_meta WHERE key = 'built_at'",
                [],
                |r| r.get::<_, String>(0),
            )
            .ok();
        (orphaned, built_at)
    } else {
        (0, None)
    };

    // Exported non-file nodes that no other file's node calls.
    let unused_exports = conn
        .query_row(
            "SELECT COUNT(*) FROM nodes \
             WHERE exported = 1 AND kind != 'file' \
             AND id NOT IN ( \
               SELECT DISTINCT e.target_id FROM edges e \
               JOIN nodes caller ON e.source_id = caller.id \
               JOIN nodes target ON e.target_id = target.id \
               WHERE e.kind = 'calls' AND caller.file != target.file \
             )",
            [],
            |r| r.get::<_, i64>(0),
        )
        .unwrap_or(0);

    Ok(AdvisoryCheckResult {
        orphaned_embeddings,
        embed_built_at,
        unused_exports,
    })
}

/// Get file_hashes count and all file paths in a single napi call.
/// Used by the fast-collect path in collect-files stage.
#[napi]
pub fn get_collect_files_data(&self) -> napi::Result<CollectFilesData> {
let conn = self.conn()?;
if !has_table(conn, "file_hashes") {
return Ok(CollectFilesData {
count: 0,
files: vec![],
});
}
let mut stmt = conn
.prepare_cached("SELECT file FROM file_hashes")
.map_err(|e| napi::Error::from_reason(format!("collectFiles prepare failed: {e}")))?;
let rows = stmt
.query_map([], |row| row.get::<_, String>(0))
.map_err(|e| napi::Error::from_reason(format!("collectFiles query failed: {e}")))?;
let files: Vec<String> = rows.filter_map(|r| r.ok()).collect();
let count = files.len() as i64;
Ok(CollectFilesData {
count,
files,
})
}

/// Cascade-delete all graph data for the specified files across all tables.
/// Order: dependent tables first (embeddings, cfg, dataflow, complexity,
/// metrics, ast_nodes), then edges, then nodes, then optionally file_hashes.
Expand Down
25 changes: 18 additions & 7 deletions src/domain/graph/builder/stages/collect-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,22 @@ function tryFastCollect(
ctx: PipelineContext,
): { files: string[]; directories: Set<string> } | null {
const { db, rootDir } = ctx;
const useNative = ctx.engineName === 'native' && !!ctx.nativeDb?.getCollectFilesData;

// 1. Check that file_hashes table exists and has entries
let dbFileCount: number;
try {
dbFileCount = (db.prepare('SELECT COUNT(*) as c FROM file_hashes').get() as { c: number }).c;
} catch {
return null;
let dbFiles: string[];
if (useNative) {
const data = ctx.nativeDb!.getCollectFilesData!();
dbFileCount = data.count;
dbFiles = data.files;
} else {
try {
dbFileCount = (db.prepare('SELECT COUNT(*) as c FROM file_hashes').get() as { c: number }).c;
} catch {
return null;
}
dbFiles = []; // deferred — loaded below only if needed
}
if (dbFileCount === 0) return null;

Expand All @@ -42,9 +51,11 @@ function tryFastCollect(
if (!hasEntries) return null;

// 3. Load existing file list from file_hashes (relative paths)
const dbFiles = (db.prepare('SELECT file FROM file_hashes').all() as Array<{ file: string }>).map(
(r) => r.file,
);
if (!useNative) {
dbFiles = (db.prepare('SELECT file FROM file_hashes').all() as Array<{ file: string }>).map(
(r) => r.file,
);
}

// 4. Apply journal deltas: remove deleted files, add new/changed files
const fileSet = new Set(dbFiles);
Expand Down
Loading
Loading