diff --git a/src/server/infra/process/query-log-handler.ts b/src/server/infra/process/query-log-handler.ts
new file mode 100644
index 000000000..8b1c67dee
--- /dev/null
+++ b/src/server/infra/process/query-log-handler.ts
@@ -0,0 +1,192 @@
+import type {QueryLogEntry} from '../../core/domain/entities/query-log-entry.js'
+import type {TaskInfo} from '../../core/domain/transport/task-info.js'
+import type {QueryExecutorResult} from '../../core/interfaces/executor/i-query-executor.js'
+import type {ITaskLifecycleHook} from '../../core/interfaces/process/i-task-lifecycle-hook.js'
+import type {IQueryLogStore} from '../../core/interfaces/storage/i-query-log-store.js'
+
+import {getProjectDataDir} from '../../utils/path-utils.js'
+import {transportLog} from '../../utils/process-logger.js'
+import {FileQueryLogStore} from '../storage/file-query-log-store.js'
+
+// ── Internal state ──────────────────────────────────────────────────────────
+
+type TaskState = {
+  /** Cached initial entry — used in onTaskCompleted/onTaskError to avoid a getById round-trip. */
+  entry: QueryLogEntry
+  projectPath: string
+  /** Metadata from QueryExecutor, set by setQueryResult(). Undefined until called. */
+  queryResult?: QueryExecutorResult
+}
+
+const QUERY_TASK_TYPES: ReadonlySet<string> = new Set(['query'])
+
+// ── QueryLogHandler ─────────────────────────────────────────────────────────
+
+/**
+ * Lifecycle hook that transparently logs query task execution.
+ *
+ * Wired into TaskRouter via lifecycleHooks[]. Writes log entries to
+ * per-project FileQueryLogStore. All I/O errors are swallowed — logging
+ * must never block or affect query task execution.
+ *
+ * Key difference from CurateLogHandler: no onToolResult accumulation.
+ * Query metadata (tier, timing, matchedDocs, searchMetadata) arrives via
+ * setQueryResult() called after QueryExecutor.executeWithAgent() returns.
+ */
+export class QueryLogHandler implements ITaskLifecycleHook {
+  /** Active task count per projectPath — used to evict idle stores. */
+  private readonly activeTaskCount = new Map<string, number>()
+  /** Per-project store cache (one store per projectPath). Evicted when no active tasks remain. */
+  private readonly stores = new Map<string, IQueryLogStore>()
+  /** In-memory state per active task. Cleared on cleanup(). */
+  private readonly tasks = new Map<string, TaskState>()
+
+  constructor(private readonly createStore?: (projectPath: string) => IQueryLogStore) {}
+
+  cleanup(taskId: string): void {
+    const state = this.tasks.get(taskId)
+    this.tasks.delete(taskId)
+
+    if (state) {
+      const remaining = (this.activeTaskCount.get(state.projectPath) ?? 1) - 1
+      if (remaining <= 0) {
+        this.activeTaskCount.delete(state.projectPath)
+        this.stores.delete(state.projectPath)
+      } else {
+        this.activeTaskCount.set(state.projectPath, remaining)
+      }
+    }
+  }
+
+  async onTaskCancelled(taskId: string, _task: TaskInfo): Promise<void> {
+    const state = this.tasks.get(taskId)
+    if (!state) return
+
+    const store = this.getOrCreateStore(state.projectPath)
+
+    const updated: QueryLogEntry = {
+      ...state.entry,
+      completedAt: Date.now(),
+      matchedDocs: state.queryResult?.matchedDocs ?? state.entry.matchedDocs,
+      searchMetadata: state.queryResult?.searchMetadata,
+      status: 'cancelled',
+      tier: state.queryResult?.tier,
+      timing: state.queryResult?.timing,
+    }
+
+    await store.save(updated).catch((error: unknown) => {
+      transportLog(
+        `QueryLogHandler: failed to save cancelled entry for ${taskId}: ${error instanceof Error ? error.message : String(error)}`,
+      )
+    })
+  }
+
+  async onTaskCompleted(taskId: string, result: string, _task: TaskInfo): Promise<void> {
+    const state = this.tasks.get(taskId)
+    if (!state) return
+
+    const store = this.getOrCreateStore(state.projectPath)
+
+    const updated: QueryLogEntry = {
+      ...state.entry,
+      completedAt: Date.now(),
+      matchedDocs: state.queryResult?.matchedDocs ?? state.entry.matchedDocs,
+      response: result.length > 0 ? result : undefined,
+      searchMetadata: state.queryResult?.searchMetadata,
+      status: 'completed',
+      tier: state.queryResult?.tier,
+      timing: state.queryResult?.timing,
+    }
+
+    await store.save(updated).catch((error: unknown) => {
+      transportLog(
+        `QueryLogHandler: failed to save completed entry for ${taskId}: ${error instanceof Error ? error.message : String(error)}`,
+      )
+    })
+  }
+
+  async onTaskCreate(task: TaskInfo): Promise<{logId: string} | undefined> {
+    if (!QUERY_TASK_TYPES.has(task.type)) return
+    if (!task.projectPath) return
+
+    const store = this.getOrCreateStore(task.projectPath)
+    const logId = await store.getNextId().catch((error: unknown) => {
+      transportLog(
+        `QueryLogHandler: getNextId failed for ${task.taskId}: ${error instanceof Error ? error.message : String(error)}`,
+      )
+    })
+    if (!logId) return
+
+    const entry: QueryLogEntry = {
+      id: logId,
+      matchedDocs: [],
+      query: task.content,
+      startedAt: task.createdAt,
+      status: 'processing',
+      taskId: task.taskId,
+    }
+
+    // MEMORY-FIRST: Set in-memory state BEFORE disk write so setQueryResult can access it immediately.
+    // Caching `entry` here lets onTaskCompleted/onTaskError rebuild the final entry
+    // without a getById round-trip — so completion is never lost even if this initial
+    // save fails.
+    this.tasks.set(task.taskId, {entry, projectPath: task.projectPath})
+    this.activeTaskCount.set(task.projectPath, (this.activeTaskCount.get(task.projectPath) ?? 0) + 1)
+
+    // Fire-and-forget disk I/O — logId is already known and returned.
+    store.save(entry).catch((error: unknown) => {
+      transportLog(
+        `QueryLogHandler: failed to save processing entry for ${task.taskId}: ${error instanceof Error ? error.message : String(error)}`,
+      )
+    })
+
+    return {logId}
+  }
+
+  async onTaskError(taskId: string, errorMessage: string, _task: TaskInfo): Promise<void> {
+    const state = this.tasks.get(taskId)
+    if (!state) return
+
+    const store = this.getOrCreateStore(state.projectPath)
+
+    const updated: QueryLogEntry = {
+      ...state.entry,
+      completedAt: Date.now(),
+      error: errorMessage,
+      matchedDocs: state.queryResult?.matchedDocs ?? state.entry.matchedDocs,
+      searchMetadata: state.queryResult?.searchMetadata,
+      status: 'error',
+      tier: state.queryResult?.tier,
+      timing: state.queryResult?.timing,
+    }
+
+    await store.save(updated).catch((error: unknown) => {
+      transportLog(
+        `QueryLogHandler: failed to save error entry for ${taskId}: ${error instanceof Error ? error.message : String(error)}`,
+      )
+    })
+  }
+
+  /**
+   * Store query execution metadata for later finalization.
+   * Called by agent-process after QueryExecutor.executeWithAgent() returns.
+   * Synchronous — no I/O. Metadata is merged into the final entry on completion.
+   */
+  setQueryResult(taskId: string, result: QueryExecutorResult): void {
+    const state = this.tasks.get(taskId)
+    if (!state) return
+    state.queryResult = result
+  }
+
+  private getOrCreateStore(projectPath: string): IQueryLogStore {
+    const existing = this.stores.get(projectPath)
+    if (existing) return existing
+
+    const store = this.createStore
+      ? this.createStore(projectPath)
+      : new FileQueryLogStore({baseDir: getProjectDataDir(projectPath)})
+
+    this.stores.set(projectPath, store)
+    return store
+  }
+}
diff --git a/test/unit/infra/process/query-log-handler.test.ts b/test/unit/infra/process/query-log-handler.test.ts
new file mode 100644
index 000000000..adadc3da6
--- /dev/null
+++ b/test/unit/infra/process/query-log-handler.test.ts
@@ -0,0 +1,311 @@
+import {expect} from 'chai'
+import {createSandbox, type SinonSandbox, type SinonStub} from 'sinon'
+
+import type {TaskInfo} from '../../../../src/server/core/domain/transport/task-info.js'
+import type {QueryExecutorResult} from '../../../../src/server/core/interfaces/executor/i-query-executor.js'
+import type {IQueryLogStore} from '../../../../src/server/core/interfaces/storage/i-query-log-store.js'
+
+import {TIER_DIRECT_SEARCH} from '../../../../src/server/core/domain/entities/query-log-entry.js'
+import {QueryLogHandler} from '../../../../src/server/infra/process/query-log-handler.js'
+
+// ============================================================================
+// Helpers
+// ============================================================================
+
+function makeTask(overrides: Partial<TaskInfo> = {}): TaskInfo {
+  return {
+    clientId: 'client-1',
+    content: 'what is caching?',
+    createdAt: Date.now(),
+    projectPath: '/app',
+    taskId: 'task-abc',
+    type: 'query',
+    ...overrides,
+  }
+}
+
+function makeStore(sandbox: SinonSandbox): IQueryLogStore & {
+  getById: SinonStub
+  getNextId: SinonStub
+  list: SinonStub
+  save: SinonStub
+} {
+  return {
+    getById: sandbox.stub().resolves(null),
+    getNextId: sandbox.stub().resolves('qry-1000'),
+    list: sandbox.stub().resolves([]),
+    save: sandbox.stub().resolves(),
+  }
+}
+
+function makeQueryResult(overrides: Partial<QueryExecutorResult> = {}): QueryExecutorResult {
+  return {
+    matchedDocs: [{path: 'design/caching.md', score: 0.95, title: 'Caching Strategy'}],
+    response: 'Caching uses Redis for hot data...',
+    searchMetadata: {resultCount: 3, topScore: 0.95, totalFound: 10},
+    tier: TIER_DIRECT_SEARCH,
+    timing: {durationMs: 450},
+    ...overrides,
+  }
+}
+
+// ============================================================================
+// Tests
+// ============================================================================
+
+describe('QueryLogHandler', () => {
+  let sandbox: SinonSandbox
+  let store: ReturnType<typeof makeStore>
+  let handler: QueryLogHandler
+
+  beforeEach(() => {
+    sandbox = createSandbox()
+    store = makeStore(sandbox)
+    handler = new QueryLogHandler(() => store)
+  })
+
+  afterEach(() => {
+    sandbox.restore()
+  })
+
+  // ── onTaskCreate ───────────────────────────────────────────────────────────
+
+  describe('onTaskCreate', () => {
+    it('should create processing entry for query task and return logId', async () => {
+      const task = makeTask()
+      const result = await handler.onTaskCreate(task)
+
+      expect(result).to.deep.equal({logId: 'qry-1000'})
+      expect(store.save.calledOnce).to.be.true
+
+      const savedEntry = store.save.firstCall.args[0]
+      expect(savedEntry.id).to.equal('qry-1000')
+      expect(savedEntry.status).to.equal('processing')
+      expect(savedEntry.query).to.equal('what is caching?')
+      expect(savedEntry.taskId).to.equal('task-abc')
+      expect(savedEntry.matchedDocs).to.deep.equal([])
+      expect(savedEntry.startedAt).to.equal(task.createdAt)
+    })
+
+    it('should ignore non-query task types', async () => {
+      const curateResult = await handler.onTaskCreate(makeTask({type: 'curate'}))
+      const folderResult = await handler.onTaskCreate(makeTask({type: 'curate-folder'}))
+
+      expect(curateResult).to.be.undefined
+      expect(folderResult).to.be.undefined
+      expect(store.save.called).to.be.false
+    })
+
+    it('should ignore tasks without projectPath', async () => {
+      const result = await handler.onTaskCreate(makeTask({projectPath: undefined}))
+
+      expect(result).to.be.undefined
+      expect(store.save.called).to.be.false
+    })
+
+    it('should return undefined if getNextId throws', async () => {
+      store.getNextId.rejects(new Error('disk full'))
+
+      const result = await handler.onTaskCreate(makeTask())
+
+      expect(result).to.be.undefined
+    })
+
+    it('should return logId even if save fails (memory-first)', async () => {
+      store.save.rejects(new Error('write error'))
+
+      const task = makeTask()
+      const result = await handler.onTaskCreate(task)
+
+      expect(result).to.deep.equal({logId: 'qry-1000'})
+
+      // Prove memory-first: onTaskCompleted still works despite initial save failure
+      store.save.resolves() // Reset save to succeed for completion
+      await handler.onTaskCompleted('task-abc', 'response text', task)
+
+      // Last save call (completion) should have status: 'completed'
+      expect(store.save.lastCall.args[0].status).to.equal('completed')
+    })
+  })
+
+  // ── setQueryResult + onTaskCompleted ─────────────────────────────────────
+
+  describe('setQueryResult + onTaskCompleted', () => {
+    it('should save completed entry with all metadata from setQueryResult', async () => {
+      const task = makeTask()
+      await handler.onTaskCreate(task)
+      handler.setQueryResult('task-abc', makeQueryResult())
+
+      await handler.onTaskCompleted('task-abc', 'Caching uses Redis...', task)
+
+      const savedEntry = store.save.lastCall.args[0]
+      // Status + completion fields
+      expect(savedEntry.status).to.equal('completed')
+      expect(savedEntry.completedAt).to.be.a('number')
+      expect(savedEntry.response).to.equal('Caching uses Redis...')
+      // Base fields preserved from processing entry
+      expect(savedEntry.id).to.equal('qry-1000')
+      expect(savedEntry.query).to.equal('what is caching?')
+      expect(savedEntry.taskId).to.equal('task-abc')
+      expect(savedEntry.startedAt).to.equal(task.createdAt)
+      // Metadata merged from setQueryResult
+      expect(savedEntry.tier).to.equal(TIER_DIRECT_SEARCH)
+      expect(savedEntry.timing).to.deep.equal({durationMs: 450})
+      expect(savedEntry.matchedDocs).to.deep.equal([
+        {path: 'design/caching.md', score: 0.95, title: 'Caching Strategy'},
+      ])
+      expect(savedEntry.searchMetadata).to.deep.equal({resultCount: 3, topScore: 0.95, totalFound: 10})
+    })
+
+    it('should gracefully degrade if setQueryResult was never called', async () => {
+      const task = makeTask()
+      await handler.onTaskCreate(task)
+
+      // Skip setQueryResult — simulate case where metadata never arrived
+      await handler.onTaskCompleted('task-abc', 'fallback response', task)
+
+      const savedEntry = store.save.lastCall.args[0]
+      expect(savedEntry.status).to.equal('completed')
+      expect(savedEntry.response).to.equal('fallback response')
+      // All metadata fields degrade to undefined/empty
+      expect(savedEntry.tier).to.be.undefined
+      expect(savedEntry.timing).to.be.undefined
+      expect(savedEntry.searchMetadata).to.be.undefined
+      expect(savedEntry.matchedDocs).to.deep.equal([])
+      // Base fields still preserved
+      expect(savedEntry.id).to.equal('qry-1000')
+      expect(savedEntry.query).to.equal('what is caching?')
+      expect(savedEntry.taskId).to.equal('task-abc')
+    })
+  })
+
+  // ── onTaskError ──────────────────────────────────────────────────────────
+
+  describe('onTaskError', () => {
+    it('should save error entry with metadata when setQueryResult was called', async () => {
+      const task = makeTask()
+      await handler.onTaskCreate(task)
+      handler.setQueryResult('task-abc', makeQueryResult())
+
+      await handler.onTaskError('task-abc', 'LLM failed', task)
+
+      const savedEntry = store.save.lastCall.args[0]
+      expect(savedEntry.status).to.equal('error')
+      expect(savedEntry.error).to.equal('LLM failed')
+      expect(savedEntry.completedAt).to.be.a('number')
+      // Base fields preserved
+      expect(savedEntry.id).to.equal('qry-1000')
+      expect(savedEntry.query).to.equal('what is caching?')
+      expect(savedEntry.taskId).to.equal('task-abc')
+      // Metadata preserved from setQueryResult
+      expect(savedEntry.tier).to.equal(TIER_DIRECT_SEARCH)
+      expect(savedEntry.timing).to.deep.equal({durationMs: 450})
+      expect(savedEntry.matchedDocs).to.deep.equal([
+        {path: 'design/caching.md', score: 0.95, title: 'Caching Strategy'},
+      ])
+      expect(savedEntry.searchMetadata).to.deep.equal({resultCount: 3, topScore: 0.95, totalFound: 10})
+    })
+
+    it('should gracefully degrade if setQueryResult was never called', async () => {
+      const task = makeTask()
+      await handler.onTaskCreate(task)
+
+      // No setQueryResult — error before executor returned
+      await handler.onTaskError('task-abc', 'Agent crashed', task)
+
+      const savedEntry = store.save.lastCall.args[0]
+      expect(savedEntry.status).to.equal('error')
+      expect(savedEntry.error).to.equal('Agent crashed')
+      expect(savedEntry.completedAt).to.be.a('number')
+      // Metadata degrades to undefined/empty
+      expect(savedEntry.tier).to.be.undefined
+      expect(savedEntry.timing).to.be.undefined
+      expect(savedEntry.searchMetadata).to.be.undefined
+      expect(savedEntry.matchedDocs).to.deep.equal([])
+    })
+  })
+
+  // ── onTaskCancelled ──────────────────────────────────────────────────────
+
+  describe('onTaskCancelled', () => {
+    it('should save cancelled entry with base fields and degraded metadata', async () => {
+      const task = makeTask()
+      await handler.onTaskCreate(task)
+
+      await handler.onTaskCancelled('task-abc', task)
+
+      const savedEntry = store.save.lastCall.args[0]
+      expect(savedEntry.status).to.equal('cancelled')
+      expect(savedEntry.completedAt).to.be.a('number')
+      // Base fields preserved
+      expect(savedEntry.id).to.equal('qry-1000')
+      expect(savedEntry.query).to.equal('what is caching?')
+      expect(savedEntry.taskId).to.equal('task-abc')
+      expect(savedEntry.startedAt).to.equal(task.createdAt)
+      // Metadata degrades — setQueryResult was never called
+      expect(savedEntry.tier).to.be.undefined
+      expect(savedEntry.timing).to.be.undefined
+      expect(savedEntry.searchMetadata).to.be.undefined
+      expect(savedEntry.matchedDocs).to.deep.equal([])
+    })
+  })
+
+  // ── cleanup ──────────────────────────────────────────────────────────────
+
+  describe('cleanup', () => {
+    it('should remove in-memory task state', async () => {
+      const task = makeTask()
+      await handler.onTaskCreate(task)
+
+      handler.cleanup('task-abc')
+
+      // After cleanup, onTaskCompleted should be a no-op (state is gone)
+      await handler.onTaskCompleted('task-abc', 'response', task)
+
+      // Only the initial processing save should exist, no completion save
+      expect(store.save.callCount).to.equal(1)
+      expect(store.save.firstCall.args[0].status).to.equal('processing')
+    })
+
+    it('should evict store when last task for project is cleaned up', async () => {
+      let factoryCallCount = 0
+      const trackingHandler = new QueryLogHandler(() => {
+        factoryCallCount++
+        return makeStore(sandbox)
+      })
+
+      // Two tasks for same project
+      await trackingHandler.onTaskCreate(makeTask({taskId: 'task-1'}))
+      await trackingHandler.onTaskCreate(makeTask({taskId: 'task-2'}))
+      expect(factoryCallCount).to.equal(1) // Shared store
+
+      // Cleanup first — store still alive for task-2
+      trackingHandler.cleanup('task-1')
+
+      // Cleanup second — store should be evicted
+      trackingHandler.cleanup('task-2')
+
+      // New task for same project — factory called again (fresh store)
+      await trackingHandler.onTaskCreate(makeTask({taskId: 'task-3'}))
+      expect(factoryCallCount).to.equal(2)
+    })
+  })
+
+  // ── store sharing ────────────────────────────────────────────────────────
+
+  describe('store sharing', () => {
+    it('should share one store for concurrent tasks on same project', async () => {
+      let factoryCallCount = 0
+      const trackingHandler = new QueryLogHandler(() => {
+        factoryCallCount++
+        return makeStore(sandbox)
+      })
+
+      await trackingHandler.onTaskCreate(makeTask({taskId: 'task-1'}))
+      await trackingHandler.onTaskCreate(makeTask({taskId: 'task-2'}))
+      await trackingHandler.onTaskCreate(makeTask({taskId: 'task-3'}))
+
+      expect(factoryCallCount).to.equal(1)
+    })
+  })
+})