Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions src/server/infra/process/query-log-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import type {QueryLogEntry} from '../../core/domain/entities/query-log-entry.js'
import type {TaskInfo} from '../../core/domain/transport/task-info.js'
import type {QueryExecutorResult} from '../../core/interfaces/executor/i-query-executor.js'
import type {ITaskLifecycleHook} from '../../core/interfaces/process/i-task-lifecycle-hook.js'
import type {IQueryLogStore} from '../../core/interfaces/storage/i-query-log-store.js'

import {getProjectDataDir} from '../../utils/path-utils.js'
import {transportLog} from '../../utils/process-logger.js'
import {FileQueryLogStore} from '../storage/file-query-log-store.js'

// ── Internal state ────────────────────────────────────────────────────────────

type TaskState = {
/** Cached initial entry — used in onTaskCompleted/onTaskError to avoid a getById round-trip. */
entry: QueryLogEntry
projectPath: string
/** Metadata from QueryExecutor, set by setQueryResult(). Undefined until called. */
queryResult?: QueryExecutorResult
}

const QUERY_TASK_TYPES: ReadonlySet<string> = new Set(['query'])

// ── QueryLogHandler ──────────────────────────────────────────────────────────

/**
* Lifecycle hook that transparently logs query task execution.
*
* Wired into TaskRouter via lifecycleHooks[]. Writes log entries to
* per-project FileQueryLogStore. All I/O errors are swallowed — logging
* must never block or affect query task execution.
*
* Key difference from CurateLogHandler: no onToolResult accumulation.
* Query metadata (tier, timing, matchedDocs, searchMetadata) arrives via
* setQueryResult() called after QueryExecutor.executeWithAgent() returns.
*/
export class QueryLogHandler implements ITaskLifecycleHook {
/** Active task count per projectPath — used to evict idle stores. */
private readonly activeTaskCount = new Map<string, number>()
/** Per-project store cache (one store per projectPath). Evicted when no active tasks remain. */
private readonly stores = new Map<string, IQueryLogStore>()
/** In-memory state per active task. Cleared on cleanup(). */
private readonly tasks = new Map<string, TaskState>()

constructor(private readonly createStore?: (projectPath: string) => IQueryLogStore) {}

cleanup(taskId: string): void {
const state = this.tasks.get(taskId)
this.tasks.delete(taskId)

if (state) {
const remaining = (this.activeTaskCount.get(state.projectPath) ?? 1) - 1
if (remaining <= 0) {
this.activeTaskCount.delete(state.projectPath)
this.stores.delete(state.projectPath)
} else {
this.activeTaskCount.set(state.projectPath, remaining)
}
}
}

async onTaskCancelled(taskId: string, _task: TaskInfo): Promise<void> {
const state = this.tasks.get(taskId)
if (!state) return

const store = this.getOrCreateStore(state.projectPath)

const updated: QueryLogEntry = {
...state.entry,
completedAt: Date.now(),
matchedDocs: state.queryResult?.matchedDocs ?? state.entry.matchedDocs,
searchMetadata: state.queryResult?.searchMetadata,
status: 'cancelled',
tier: state.queryResult?.tier,
timing: state.queryResult?.timing,
}

await store.save(updated).catch((error: unknown) => {
transportLog(
`QueryLogHandler: failed to save cancelled entry for ${taskId}: ${error instanceof Error ? error.message : String(error)}`,
)
})
}

async onTaskCompleted(taskId: string, result: string, _task: TaskInfo): Promise<void> {
const state = this.tasks.get(taskId)
if (!state) return

const store = this.getOrCreateStore(state.projectPath)

const updated: QueryLogEntry = {
...state.entry,
completedAt: Date.now(),
matchedDocs: state.queryResult?.matchedDocs ?? state.entry.matchedDocs,
response: result.length > 0 ? result : undefined,
searchMetadata: state.queryResult?.searchMetadata,
status: 'completed',
tier: state.queryResult?.tier,
timing: state.queryResult?.timing,
}

await store.save(updated).catch((error: unknown) => {
transportLog(
`QueryLogHandler: failed to save completed entry for ${taskId}: ${error instanceof Error ? error.message : String(error)}`,
)
})
}

async onTaskCreate(task: TaskInfo): Promise<void | {logId?: string}> {
if (!QUERY_TASK_TYPES.has(task.type)) return
if (!task.projectPath) return

const store = this.getOrCreateStore(task.projectPath)
const logId = await store.getNextId().catch((error: unknown) => {
transportLog(
`QueryLogHandler: getNextId failed for ${task.taskId}: ${error instanceof Error ? error.message : String(error)}`,
)
})
if (!logId) return

const entry: QueryLogEntry = {
id: logId,
matchedDocs: [],
query: task.content,
startedAt: task.createdAt,
status: 'processing',
taskId: task.taskId,
}

// MEMORY-FIRST: Set in-memory state BEFORE disk write so setQueryResult can access it immediately.
// Caching `entry` here lets onTaskCompleted/onTaskError rebuild the final entry
// without a getById round-trip — so completion is never lost even if this initial
// save fails.
this.tasks.set(task.taskId, {entry, projectPath: task.projectPath})
this.activeTaskCount.set(task.projectPath, (this.activeTaskCount.get(task.projectPath) ?? 0) + 1)

// Fire-and-forget disk I/O — logId is already known and returned.
store.save(entry).catch((error: unknown) => {
transportLog(
`QueryLogHandler: failed to save processing entry for ${task.taskId}: ${error instanceof Error ? error.message : String(error)}`,
)
})

return {logId}
}

async onTaskError(taskId: string, errorMessage: string, _task: TaskInfo): Promise<void> {
const state = this.tasks.get(taskId)
if (!state) return

const store = this.getOrCreateStore(state.projectPath)

const updated: QueryLogEntry = {
...state.entry,
completedAt: Date.now(),
error: errorMessage,
matchedDocs: state.queryResult?.matchedDocs ?? state.entry.matchedDocs,
searchMetadata: state.queryResult?.searchMetadata,
status: 'error',
tier: state.queryResult?.tier,
timing: state.queryResult?.timing,
}

await store.save(updated).catch((error: unknown) => {
transportLog(
`QueryLogHandler: failed to save error entry for ${taskId}: ${error instanceof Error ? error.message : String(error)}`,
)
})
}

/**
* Store query execution metadata for later finalization.
* Called by agent-process after QueryExecutor.executeWithAgent() returns.
* Synchronous — no I/O. Metadata is merged into the final entry on completion.
*/
setQueryResult(taskId: string, result: QueryExecutorResult): void {
const state = this.tasks.get(taskId)
if (!state) return
state.queryResult = result
}

private getOrCreateStore(projectPath: string): IQueryLogStore {
const existing = this.stores.get(projectPath)
if (existing) return existing

const store = this.createStore
? this.createStore(projectPath)
: new FileQueryLogStore({baseDir: getProjectDataDir(projectPath)})

this.stores.set(projectPath, store)
return store
}
}
Loading
Loading