diff --git a/src/daemon/agent-registry.test.ts b/src/daemon/agent-registry.test.ts new file mode 100644 index 000000000..5390dac6e --- /dev/null +++ b/src/daemon/agent-registry.test.ts @@ -0,0 +1,97 @@ +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { describe, it, expect } from 'vitest'; +import { AgentRegistry } from './agent-registry.js'; + +function makeTempDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'agent-registry-')); +} + +describe('AgentRegistry', () => { + it('creates and persists agent records', () => { + const dir = makeTempDir(); + const registry = new AgentRegistry(dir); + + const created = registry.registerOrUpdate({ + name: 'alice', + cli: 'claude', + workingDirectory: '/tmp/alice', + }); + + expect(created.id).toBeTruthy(); + expect(created.firstSeen).toBeTruthy(); + expect(created.messagesSent).toBe(0); + expect(created.messagesReceived).toBe(0); + + registry.recordSend('alice'); + registry.recordReceive('alice'); + + const agentsPath = path.join(dir, 'agents.json'); + const fileData = JSON.parse(fs.readFileSync(agentsPath, 'utf-8')); + const fileAgent = fileData.agents.find((a: any) => a.name === 'alice'); + expect(fileAgent.messagesSent).toBe(1); + expect(fileAgent.messagesReceived).toBe(1); + + const registryReloaded = new AgentRegistry(dir); + const [loaded] = registryReloaded.getAgents(); + expect(loaded.name).toBe('alice'); + expect(loaded.messagesSent).toBe(1); + expect(loaded.messagesReceived).toBe(1); + }); + + it('updates metadata on re-register', () => { + const dir = makeTempDir(); + const registry = new AgentRegistry(dir); + + registry.registerOrUpdate({ + name: 'bob', + cli: 'claude', + workingDirectory: '/tmp/one', + }); + const first = registry.getAgents()[0]; + + registry.registerOrUpdate({ + name: 'bob', + cli: 'gemini', + workingDirectory: '/tmp/two', + }); + const [updated] = registry.getAgents(); + + expect(updated.firstSeen).toBe(first.firstSeen); + expect(updated.cli).toBe('gemini'); + expect(updated.workingDirectory).toBe('/tmp/two'); + expect(new Date(updated.lastSeen).getTime()).toBeGreaterThanOrEqual(new Date(first.lastSeen).getTime()); + }); + + it('handles malformed agents.json gracefully', () => { + const dir = makeTempDir(); + const agentsPath = path.join(dir, 'agents.json'); + fs.writeFileSync(agentsPath, '{bad json', 'utf-8'); + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + const registry = new AgentRegistry(dir); + expect(registry.getAgents()).toEqual([]); + expect(errorSpy).toHaveBeenCalled(); + + errorSpy.mockRestore(); + }); + + it('logs write failures without throwing', () => { + const dir = makeTempDir(); + const registry = new AgentRegistry(dir); + registry.registerOrUpdate({ name: 'carol' }); + + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + const writeSpy = vi.spyOn(fs, 'writeFileSync').mockImplementationOnce(() => { + throw new Error('disk full'); + }); + + // Trigger a save + registry.recordSend('carol'); + + expect(errorSpy).toHaveBeenCalled(); + writeSpy.mockRestore(); + errorSpy.mockRestore(); + }); +}); diff --git a/src/daemon/agent-registry.ts b/src/daemon/agent-registry.ts new file mode 100644 index 000000000..295ba02a7 --- /dev/null +++ b/src/daemon/agent-registry.ts @@ -0,0 +1,178 @@ +/** + * Agent Registry + * Persists agent metadata across daemon restarts. + */ + +import fs from 'node:fs'; +import path from 'node:path'; +import { v4 as uuid } from 'uuid'; + +export interface AgentRecord { + id: string; + name: string; + cli?: string; + workingDirectory?: string; + firstSeen: string; + lastSeen: string; + messagesSent: number; + messagesReceived: number; +} + +type AgentInput = { + name: string; + cli?: string; + workingDirectory?: string; +}; + +export class AgentRegistry { + private registryPath: string; + private agents: Map = new Map(); // name -> record + + constructor(teamDir: string) { + this.registryPath = path.join(teamDir, 'agents.json'); + this.ensureDir(teamDir); + this.load(); + } + + /** + * Register or update an agent, refreshing lastSeen and metadata. + */ + registerOrUpdate(agent: AgentInput): AgentRecord { + const now = new Date().toISOString(); + const existing = this.agents.get(agent.name); + + if (existing) { + const updated: AgentRecord = { + ...existing, + cli: agent.cli ?? existing.cli, + workingDirectory: agent.workingDirectory ?? existing.workingDirectory, + lastSeen: now, + }; + this.agents.set(agent.name, updated); + this.save(); + return updated; + } + + const record: AgentRecord = { + id: `agent-${uuid()}`, + name: agent.name, + cli: agent.cli, + workingDirectory: agent.workingDirectory, + firstSeen: now, + lastSeen: now, + messagesSent: 0, + messagesReceived: 0, + }; + + this.agents.set(agent.name, record); + this.save(); + return record; + } + + /** + * Increment sent counter for an agent. + */ + recordSend(agentName: string): void { + const record = this.ensureRecord(agentName); + record.messagesSent += 1; + record.lastSeen = new Date().toISOString(); + this.agents.set(agentName, record); + this.save(); + } + + /** + * Increment received counter for an agent. + */ + recordReceive(agentName: string): void { + const record = this.ensureRecord(agentName); + record.messagesReceived += 1; + record.lastSeen = new Date().toISOString(); + this.agents.set(agentName, record); + this.save(); + } + + /** + * Touch lastSeen for an agent (e.g., on disconnect). + */ + touch(agentName: string): void { + const record = this.ensureRecord(agentName); + record.lastSeen = new Date().toISOString(); + this.agents.set(agentName, record); + this.save(); + } + + /** + * Get a snapshot of all agents. + */ + getAgents(): AgentRecord[] { + return Array.from(this.agents.values()); + } + + private ensureRecord(agentName: string): AgentRecord { + const existing = this.agents.get(agentName); + if (existing) return existing; + + const now = new Date().toISOString(); + const record: AgentRecord = { + id: `agent-${uuid()}`, + name: agentName, + firstSeen: now, + lastSeen: now, + messagesSent: 0, + messagesReceived: 0, + }; + + this.agents.set(agentName, record); + return record; + } + + private ensureDir(dir: string): void { + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } + } + + private load(): void { + if (!fs.existsSync(this.registryPath)) { + return; + } + + try { + const data = JSON.parse(fs.readFileSync(this.registryPath, 'utf-8')); + const rawAgents = Array.isArray(data?.agents) + ? data.agents + : typeof data?.agents === 'object' && data?.agents !== null + ? Object.values(data.agents) + : []; + + for (const raw of rawAgents) { + if (!raw?.name) continue; + const record: AgentRecord = { + id: raw.id ?? `agent-${uuid()}`, + name: raw.name, + cli: raw.cli, + workingDirectory: raw.workingDirectory, + firstSeen: raw.firstSeen ?? new Date().toISOString(), + lastSeen: raw.lastSeen ?? new Date().toISOString(), + messagesSent: typeof raw.messagesSent === 'number' ? raw.messagesSent : 0, + messagesReceived: typeof raw.messagesReceived === 'number' ? raw.messagesReceived : 0, + }; + this.agents.set(record.name, record); + } + } catch (err) { + console.error('[registry] Failed to load agents.json:', err); + } + } + + private save(): void { + try { + fs.writeFileSync( + this.registryPath, + JSON.stringify({ agents: this.getAgents() }, null, 2), + 'utf-8' + ); + } catch (err) { + console.error('[registry] Failed to write agents.json:', err); + } + } +} diff --git a/src/daemon/connection.ts b/src/daemon/connection.ts index 9a3c417bb..ed6c66a37 100644 --- a/src/daemon/connection.ts +++ b/src/daemon/connection.ts @@ -43,6 +43,7 @@ export class Connection { private _state: ConnectionState = 'CONNECTING'; private _agentName?: string; private _cli?: string; + private _workingDirectory?: string; private _sessionId: string; private _resumeToken: string; @@ -83,6 +84,10 @@ export class Connection { return this._cli; } + get workingDirectory(): string | undefined { + return this._workingDirectory; + } + get sessionId(): string { return this._sessionId; } @@ -139,6 +144,7 @@ export class Connection { this._agentName = envelope.payload.agent; this._cli = envelope.payload.cli; + this._workingDirectory = envelope.payload.workingDirectory; // Check for session resume if (envelope.payload.session?.resume_token) { diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 53d41273f..2fe9acec0 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -1,3 +1,4 @@ export * from './server.js'; export * from './router.js'; export * from './connection.js'; +export * from './agent-registry.js'; diff --git a/src/daemon/router.ts b/src/daemon/router.ts index e0b10a0b4..46629d7a3 100644 --- a/src/daemon/router.ts +++ b/src/daemon/router.ts @@ -12,11 +12,13 @@ import { PROTOCOL_VERSION, } from '../protocol/types.js'; import type { StorageAdapter } from '../storage/adapter.js'; +import type { AgentRegistry } from './agent-registry.js'; export interface RoutableConnection { id: string; agentName?: string; cli?: string; + workingDirectory?: string; sessionId: string; close(): void; send(envelope: Envelope): boolean; @@ -53,10 +55,12 @@ export class Router { private subscriptions: Map> = new Map(); // topic -> Set private pendingDeliveries: Map = new Map(); // deliverId -> pending private deliveryOptions: DeliveryReliabilityOptions; + private registry?: AgentRegistry; - constructor(options: { storage?: StorageAdapter; delivery?: Partial } = {}) { + constructor(options: { storage?: StorageAdapter; delivery?: Partial; registry?: AgentRegistry } = {}) { this.storage = options.storage; this.deliveryOptions = { ...DEFAULT_DELIVERY_OPTIONS, ...options.delivery }; + this.registry = options.registry; } /** @@ -73,6 +77,11 @@ export class Router { this.connections.delete(existing.id); } this.agents.set(connection.agentName, connection); + this.registry?.registerOrUpdate({ + name: connection.agentName, + cli: connection.cli, + workingDirectory: connection.workingDirectory, + }); } } @@ -131,6 +140,8 @@ export class Router { return; } + this.registry?.recordSend(senderName); + const to = envelope.to; const topic = envelope.topic; @@ -165,6 +176,7 @@ export class Router { this.persistDeliverEnvelope(deliver); if (sent) { this.trackDelivery(target, deliver); + this.registry?.recordReceive(to); } return sent; } @@ -191,6 +203,7 @@ export class Router { this.persistDeliverEnvelope(deliver); if (sent) { this.trackDelivery(target, deliver); + this.registry?.recordReceive(agentName); } } } diff --git a/src/daemon/server.ts b/src/daemon/server.ts index 92f42a39f..04bebb137 100644 --- a/src/daemon/server.ts +++ b/src/daemon/server.ts @@ -10,6 +10,9 @@ import { Connection, type ConnectionConfig, DEFAULT_CONFIG } from './connection. import { Router } from './router.js'; import type { Envelope, SendPayload } from '../protocol/types.js'; import { createStorageAdapter, type StorageAdapter, type StorageConfig } from '../storage/adapter.js'; +import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; +import { getProjectPaths } from '../utils/project-namespace.js'; +import { AgentRegistry } from './agent-registry.js'; export interface DaemonConfig extends ConnectionConfig { socketPath: string; @@ -38,6 +41,7 @@ export class Daemon { private connections: Set = new Set(); private storage?: StorageAdapter; private storageInitialized = false; + private registry?: AgentRegistry; constructor(config: Partial = {}) { this.config = { ...DEFAULT_DAEMON_CONFIG, ...config }; @@ -48,6 +52,9 @@ export class Daemon { if (!this.config.teamDir) { this.config.teamDir = path.dirname(this.config.socketPath); } + if (this.config.teamDir) { + this.registry = new AgentRegistry(this.config.teamDir); + } // Storage is initialized lazily in start() to support async createStorageAdapter this.server = net.createServer(this.handleConnection.bind(this)); } @@ -56,18 +63,15 @@ export class Daemon { * Write current agents to agents.json for dashboard consumption. */ private writeAgentsFile(): void { - if (!this.config.teamDir) return; - const agentsPath = path.join(this.config.teamDir, 'agents.json'); - const agents = this.router.getAgents().map(name => { - const connection = this.router.getConnection(name); - return { - name, - cli: connection?.cli, - connectedAt: new Date().toISOString(), - }; - }); + if (!this.registry) return; + // The registry persists on every update; this is a no-op helper for symmetry. + const agents = this.registry.getAgents(); try { - fs.writeFileSync(agentsPath, JSON.stringify({ agents }, null, 2)); + fs.writeFileSync( + path.join(this.config.teamDir ?? path.dirname(this.config.socketPath), 'agents.json'), + JSON.stringify({ agents }, null, 2), + 'utf-8' + ); } catch (err) { console.error('[daemon] Failed to write agents.json:', err); } @@ -89,7 +93,7 @@ export class Daemon { this.storage = await createStorageAdapter(storagePath, this.config.storageConfig); } - this.router = new Router({ storage: this.storage }); + this.router = new Router({ storage: this.storage, registry: this.registry }); this.storageInitialized = true; } @@ -188,7 +192,27 @@ export class Daemon { if (connection.agentName) { this.router.register(connection); console.log(`[daemon] Agent registered: ${connection.agentName}`); - this.writeAgentsFile(); + if (connection.agentName) { + this.registry?.registerOrUpdate({ + name: connection.agentName, + cli: connection.cli, + workingDirectory: connection.workingDirectory, + }); + this.writeAgentsFile(); + } + + // Record session start + if (this.storage instanceof SqliteStorageAdapter) { + const projectPaths = getProjectPaths(); + this.storage.startSession({ + id: connection.sessionId, + agentName: connection.agentName, + cli: connection.cli, + projectId: projectPaths.projectId, + projectRoot: projectPaths.projectRoot, + startedAt: Date.now(), + }).catch(err => console.error('[daemon] Failed to record session start:', err)); + } } }; @@ -196,14 +220,32 @@ export class Daemon { console.log(`[daemon] Connection closed: ${connection.agentName ?? connection.id}`); this.connections.delete(connection); this.router.unregister(connection); + if (connection.agentName) { + this.registry?.touch(connection.agentName); + } this.writeAgentsFile(); + + // Record session end (disconnect - agent may still mark it closed explicitly) + if (this.storage instanceof SqliteStorageAdapter) { + this.storage.endSession(connection.sessionId, { closedBy: 'disconnect' }) + .catch(err => console.error('[daemon] Failed to record session end:', err)); + } }; connection.onError = (error: Error) => { console.error(`[daemon] Connection error: ${error.message}`); this.connections.delete(connection); this.router.unregister(connection); + if (connection.agentName) { + this.registry?.touch(connection.agentName); + } this.writeAgentsFile(); + + // Record session end on error + if (this.storage instanceof SqliteStorageAdapter) { + this.storage.endSession(connection.sessionId, { closedBy: 'error' }) + .catch(err => console.error('[daemon] Failed to record session end:', err)); + } }; } diff --git a/src/dashboard/server.ts b/src/dashboard/server.ts index 192f167df..91864733e 100644 --- a/src/dashboard/server.ts +++ b/src/dashboard/server.ts @@ -27,6 +27,33 @@ interface Message { id: string; // unique-ish id } +interface SessionInfo { + id: string; + agentName: string; + cli?: string; + startedAt: string; + endedAt?: string; + duration?: string; + messageCount: number; + summary?: string; + /** + * true if session is still active (endedAt is not set). + * Note: This is determined solely by endedAt, regardless of how the session + * was closed (agent explicit close, disconnect, or error via closedBy field). + */ + isActive: boolean; + /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined */ + closedBy?: 'agent' | 'disconnect' | 'error'; +} + +interface AgentSummary { + agentName: string; + lastUpdated: string; + currentTask?: string; + completedTasks?: string[]; + context?: string; +} + export async function startDashboard(port: number, dataDir: string, dbPath?: string): Promise { console.log('Starting dashboard...'); console.log('__dirname:', __dirname); @@ -157,9 +184,53 @@ export async function startDashboard(port: number, dataDir: string, dbPath?: str return allMessages; }; + const formatDuration = (startMs: number, endMs?: number): string => { + const end = endMs ?? Date.now(); + const durationMs = end - startMs; + const minutes = Math.floor(durationMs / 60000); + const hours = Math.floor(minutes / 60); + if (hours > 0) { + return `${hours}h ${minutes % 60}m`; + } + return `${minutes}m`; + }; + + const getRecentSessions = async (): Promise => { + if (storage && storage instanceof SqliteStorageAdapter) { + const sessions = await storage.getRecentSessions(20); + return sessions.map(s => ({ + id: s.id, + agentName: s.agentName, + cli: s.cli, + startedAt: new Date(s.startedAt).toISOString(), + endedAt: s.endedAt ? new Date(s.endedAt).toISOString() : undefined, + duration: formatDuration(s.startedAt, s.endedAt), + messageCount: s.messageCount, + summary: s.summary, + isActive: !s.endedAt, // Active if no end time + closedBy: s.closedBy, + })); + } + return []; + }; + + const getAgentSummaries = async (): Promise => { + if (storage && storage instanceof SqliteStorageAdapter) { + const summaries = await storage.getAllAgentSummaries(); + return summaries.map(s => ({ + agentName: s.agentName, + lastUpdated: new Date(s.lastUpdated).toISOString(), + currentTask: s.currentTask, + completedTasks: s.completedTasks, + context: s.context, + })); + } + return []; + }; + const getAllData = async () => { const team = getTeamData(); - if (!team) return { agents: [], messages: [], activity: [] }; + if (!team) return { agents: [], messages: [], activity: [], sessions: [], summaries: [] }; const agentsMap = new Map(); const allMessages: Message[] = await getMessages(team.agents); @@ -202,10 +273,18 @@ export async function startDashboard(port: number, dataDir: string, dbPath?: str } }); + // Fetch sessions and summaries in parallel + const [sessions, summaries] = await Promise.all([ + getRecentSessions(), + getAgentSummaries(), + ]); + return { agents: Array.from(agentsMap.values()), messages: allMessages, - activity: allMessages // For now, activity log is just the message log + activity: allMessages, // For now, activity log is just the message log + sessions, + summaries, }; }; diff --git a/src/protocol/types.ts b/src/protocol/types.ts index 02cd71eac..b1256ac7e 100644 --- a/src/protocol/types.ts +++ b/src/protocol/types.ts @@ -49,6 +49,8 @@ export interface HelloPayload { }; /** Optional hint about which CLI the agent is using (claude, codex, gemini, etc.) */ cli?: string; + /** Optional working directory hint for registry/dashboard */ + workingDirectory?: string; session?: { resume_token?: string; }; @@ -75,6 +77,8 @@ export interface SendPayload { export interface SendMeta { requires_ack?: boolean; ttl_ms?: number; + importance?: number; // 0-100, 100 is highest + replyTo?: string; // Correlation ID for replies } export interface DeliveryInfo { diff --git a/src/storage/adapter.ts b/src/storage/adapter.ts index 8e6f675ff..573d2fe0c 100644 --- a/src/storage/adapter.ts +++ b/src/storage/adapter.ts @@ -37,12 +37,56 @@ export interface MessageQuery { urgentOnly?: boolean; } +export interface StoredSession { + id: string; + agentName: string; + cli?: string; + projectId?: string; + projectRoot?: string; + startedAt: number; + endedAt?: number; + messageCount: number; + summary?: string; + /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined (still active) */ + closedBy?: 'agent' | 'disconnect' | 'error'; +} + +export interface SessionQuery { + agentName?: string; + projectId?: string; + since?: number; + limit?: number; +} + +export interface AgentSummary { + agentName: string; + projectId?: string; + lastUpdated: number; + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; +} + export interface StorageAdapter { init(): Promise; saveMessage(message: StoredMessage): Promise; getMessages(query?: MessageQuery): Promise; getMessageById?(id: string): Promise; close?(): Promise; + + // Session management (optional - for adapters that support it) + startSession?(session: Omit): Promise; + endSession?(sessionId: string, options?: { summary?: string; closedBy?: 'agent' | 'disconnect' | 'error' }): Promise; + getSessions?(query?: SessionQuery): Promise; + getRecentSessions?(limit?: number): Promise; + incrementSessionMessageCount?(sessionId: string): Promise; + + // Agent summaries (optional - for adapters that support it) + saveAgentSummary?(summary: Omit): Promise; + getAgentSummary?(agentName: string): Promise; + getAllAgentSummaries?(): Promise; } /** diff --git a/src/storage/sqlite-adapter.test.ts b/src/storage/sqlite-adapter.test.ts index e0e82bef0..d8cdba3b2 100644 --- a/src/storage/sqlite-adapter.test.ts +++ b/src/storage/sqlite-adapter.test.ts @@ -126,4 +126,179 @@ describe('SqliteStorageAdapter', () => { const rows = await adapter.getMessages(); expect(rows.map(r => r.id)).toEqual(['fallback-1']); }); + + describe('Session Management', () => { + it('starts and retrieves a session', async () => { + const sessionId = 'session-1'; + await adapter.startSession({ + id: sessionId, + agentName: 'TestAgent', + cli: 'claude', + projectId: 'proj-123', + projectRoot: '/home/test/project', + startedAt: Date.now(), + }); + + const sessions = await adapter.getSessions(); + expect(sessions).toHaveLength(1); + expect(sessions[0]).toMatchObject({ + id: sessionId, + agentName: 'TestAgent', + cli: 'claude', + projectId: 'proj-123', + messageCount: 0, + }); + expect(sessions[0].endedAt).toBeUndefined(); + }); + + it('ends a session with closedBy reason', async () => { + const sessionId = 'session-2'; + await adapter.startSession({ + id: sessionId, + agentName: 'Agent2', + startedAt: Date.now() - 5000, + }); + + await adapter.endSession(sessionId, { + summary: 'Completed auth module', + closedBy: 'agent', + }); + + const sessions = await adapter.getSessions(); + expect(sessions[0]).toMatchObject({ + id: sessionId, + summary: 'Completed auth module', + closedBy: 'agent', + }); + expect(sessions[0].endedAt).toBeDefined(); + }); + + it('increments session message count', async () => { + const sessionId = 'session-3'; + await adapter.startSession({ + id: sessionId, + agentName: 'Agent3', + startedAt: Date.now(), + }); + + await adapter.incrementSessionMessageCount(sessionId); + await adapter.incrementSessionMessageCount(sessionId); + await adapter.incrementSessionMessageCount(sessionId); + + const sessions = await adapter.getSessions(); + expect(sessions[0].messageCount).toBe(3); + }); + + it('filters sessions by agentName and projectId', async () => { + const now = Date.now(); + await adapter.startSession({ id: 's1', agentName: 'Alice', projectId: 'p1', startedAt: now - 2000 }); + await adapter.startSession({ id: 's2', agentName: 'Bob', projectId: 'p1', startedAt: now - 1000 }); + await adapter.startSession({ id: 's3', agentName: 'Alice', projectId: 'p2', startedAt: now }); + + const aliceSessions = await adapter.getSessions({ agentName: 'Alice' }); + expect(aliceSessions.map(s => s.id)).toEqual(['s3', 's1']); + + const p1Sessions = await adapter.getSessions({ projectId: 'p1' }); + expect(p1Sessions.map(s => s.id)).toEqual(['s2', 's1']); + }); + + it('getRecentSessions returns limited results', async () => { + for (let i = 0; i < 5; i++) { + await adapter.startSession({ + id: `recent-${i}`, + agentName: 'Agent', + startedAt: Date.now() + i * 100, + }); + } + + const recent = await adapter.getRecentSessions(3); + expect(recent).toHaveLength(3); + expect(recent[0].id).toBe('recent-4'); // Most recent first + }); + }); + + describe('Agent Summaries', () => { + it('saves and retrieves an agent summary', async () => { + await adapter.saveAgentSummary({ + agentName: 'SummaryAgent', + projectId: 'proj-1', + currentTask: 'Implementing auth', + completedTasks: ['setup', 'database'], + decisions: ['Use JWT'], + context: 'Working on login flow', + files: ['src/auth.ts', 'src/login.ts'], + }); + + const summary = await adapter.getAgentSummary('SummaryAgent'); + expect(summary).not.toBeNull(); + expect(summary).toMatchObject({ + agentName: 'SummaryAgent', + projectId: 'proj-1', + currentTask: 'Implementing auth', + completedTasks: ['setup', 'database'], + decisions: ['Use JWT'], + context: 'Working on login flow', + files: ['src/auth.ts', 'src/login.ts'], + }); + expect(summary!.lastUpdated).toBeDefined(); + }); + + it('returns null for non-existent agent summary', async () => { + const summary = await adapter.getAgentSummary('NonExistent'); + expect(summary).toBeNull(); + }); + + it('updates existing summary on save', async () => { + await adapter.saveAgentSummary({ + agentName: 'UpdateAgent', + currentTask: 'Task 1', + }); + + await adapter.saveAgentSummary({ + agentName: 'UpdateAgent', + currentTask: 'Task 2', + completedTasks: ['Task 1'], + }); + + const summary = await adapter.getAgentSummary('UpdateAgent'); + expect(summary?.currentTask).toBe('Task 2'); + expect(summary?.completedTasks).toEqual(['Task 1']); + }); + + it('getAllAgentSummaries returns all summaries ordered by lastUpdated', async () => { + await adapter.saveAgentSummary({ agentName: 'Agent1', currentTask: 'T1' }); + await new Promise(r => setTimeout(r, 10)); // Small delay for different timestamps + await adapter.saveAgentSummary({ agentName: 'Agent2', currentTask: 'T2' }); + await new Promise(r => setTimeout(r, 10)); + await adapter.saveAgentSummary({ agentName: 'Agent3', currentTask: 'T3' }); + + const summaries = await adapter.getAllAgentSummaries(); + expect(summaries).toHaveLength(3); + expect(summaries[0].agentName).toBe('Agent3'); // Most recent first + expect(summaries[2].agentName).toBe('Agent1'); // Oldest last + }); + }); + + describe('getMessageById', () => { + it('retrieves message by exact ID', async () => { + await adapter.saveMessage(makeMessage({ id: 'exact-id-123', body: 'hello' })); + + const msg = await adapter.getMessageById('exact-id-123'); + expect(msg).not.toBeNull(); + expect(msg?.body).toBe('hello'); + }); + + it('retrieves message by ID prefix', async () => { + await adapter.saveMessage(makeMessage({ id: 'prefix-abc-xyz-123', body: 'world' })); + + const msg = await adapter.getMessageById('prefix-abc'); + expect(msg).not.toBeNull(); + expect(msg?.id).toBe('prefix-abc-xyz-123'); + }); + + it('returns null for non-existent message', async () => { + const msg = await adapter.getMessageById('does-not-exist'); + expect(msg).toBeNull(); + }); + }); }); diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index 94c2ffc1e..c3e05b286 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -1,12 +1,23 @@ import path from 'node:path'; import fs from 'node:fs'; import { createRequire } from 'node:module'; -import { type MessageQuery, type StorageAdapter, type StoredMessage, type MessageStatus } from './adapter.js'; +import { + type AgentSummary, + type MessageQuery, + type MessageStatus, + type SessionQuery, + type StorageAdapter, + type StoredMessage, + type StoredSession, +} from './adapter.js'; export interface SqliteAdapterOptions { dbPath: string; } +// Re-export types for backwards compatibility +export type { StoredSession, SessionQuery } from './adapter.js'; + type SqliteDriverName = 'better-sqlite3' | 'node'; interface SqliteStatement { @@ -26,6 +37,7 @@ export class SqliteStorageAdapter implements StorageAdapter { private dbPath: string; private db?: SqliteDatabase; private insertStmt?: SqliteStatement; + private insertSessionStmt?: SqliteStatement; private driver?: SqliteDriverName; constructor(options: SqliteAdapterOptions) { @@ -90,13 +102,13 @@ export class SqliteStorageAdapter implements StorageAdapter { ); } - // Check if table exists and get columns for migration decisions - const tableExists = (this.db.prepare( + // Check if messages table exists for migration decisions + const messagesTableExists = this.db.prepare( "SELECT name FROM sqlite_master WHERE type='table' AND name='messages'" - ).get() as { name: string } | undefined); + ).get() as { name: string } | undefined; - if (!tableExists) { - // Fresh install: create table with all columns + if (!messagesTableExists) { + // Fresh install: create messages table with all columns this.db.exec(` CREATE TABLE messages ( id TEXT PRIMARY KEY, @@ -141,6 +153,40 @@ export class SqliteStorageAdapter implements StorageAdapter { } } + // Create sessions table (IF NOT EXISTS is safe here - no new columns to migrate) + this.db.exec(` + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + agent_name TEXT NOT NULL, + cli TEXT, + project_id TEXT, + project_root TEXT, + started_at INTEGER NOT NULL, + ended_at INTEGER, + message_count INTEGER DEFAULT 0, + summary TEXT, + closed_by TEXT + ); + CREATE INDEX IF NOT EXISTS idx_sessions_agent ON sessions (agent_name); + CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions (started_at); + CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions (project_id); + `); + + // Create agent_summaries table (IF NOT EXISTS is safe here - no new columns to migrate) + this.db.exec(` + CREATE TABLE IF NOT EXISTS agent_summaries ( + agent_name TEXT PRIMARY KEY, + project_id TEXT, + last_updated INTEGER NOT NULL, + current_task TEXT, + completed_tasks TEXT, + decisions TEXT, + context TEXT, + files TEXT + ); + CREATE INDEX IF NOT EXISTS idx_summaries_updated ON agent_summaries (last_updated); + `); + this.insertStmt = this.db.prepare(` INSERT OR REPLACE INTO messages (id, ts, sender, recipient, topic, kind, body, data, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent) @@ -288,4 +334,206 @@ export class SqliteStorageAdapter implements StorageAdapter { this.db = undefined; } } + + // ============ Session Management ============ + + async startSession(session: Omit): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + INSERT OR REPLACE INTO sessions + (id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + stmt.run( + session.id, + session.agentName, + session.cli ?? null, + session.projectId ?? null, + session.projectRoot ?? null, + session.startedAt, + session.endedAt ?? null, + 0, + session.summary ?? null + ); + } + + /** + * End a session and optionally set a summary. + * + * Note: The summary uses COALESCE(?, summary) - if a summary was previously + * set (e.g., during startSession or a prior endSession call), passing null/undefined + * for summary will preserve the existing value rather than clearing it. + * To explicitly clear a summary, pass an empty string. + */ + async endSession( + sessionId: string, + options?: { summary?: string; closedBy?: 'agent' | 'disconnect' | 'error' } + ): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + UPDATE sessions + SET ended_at = ?, summary = COALESCE(?, summary), closed_by = ? + WHERE id = ? + `); + + stmt.run( + Date.now(), + options?.summary ?? null, + options?.closedBy ?? null, + sessionId + ); + } + + async incrementSessionMessageCount(sessionId: string): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + UPDATE sessions SET message_count = message_count + 1 WHERE id = ? + `); + + stmt.run(sessionId); + } + + async getSessions(query: SessionQuery = {}): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const clauses: string[] = []; + const params: unknown[] = []; + + if (query.agentName) { + clauses.push('agent_name = ?'); + params.push(query.agentName); + } + if (query.projectId) { + clauses.push('project_id = ?'); + params.push(query.projectId); + } + if (query.since) { + clauses.push('started_at >= ?'); + params.push(query.since); + } + + const where = clauses.length ? `WHERE ${clauses.join(' AND ')}` : ''; + const limit = query.limit ?? 50; + + const stmt = this.db.prepare(` + SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary, closed_by + FROM sessions + ${where} + ORDER BY started_at DESC + LIMIT ? + `); + + const rows = stmt.all(...params, limit); + return rows.map((row: any) => ({ + id: row.id, + agentName: row.agent_name, + cli: row.cli ?? undefined, + projectId: row.project_id ?? undefined, + projectRoot: row.project_root ?? undefined, + startedAt: row.started_at, + endedAt: row.ended_at ?? undefined, + messageCount: row.message_count, + summary: row.summary ?? undefined, + closedBy: row.closed_by ?? undefined, + })); + } + + async getRecentSessions(limit: number = 10): Promise { + return this.getSessions({ limit }); + } + + // ============ Agent Summaries ============ + + async saveAgentSummary(summary: { + agentName: string; + projectId?: string; + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; + }): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + INSERT OR REPLACE INTO agent_summaries + (agent_name, project_id, last_updated, current_task, completed_tasks, decisions, context, files) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `); + + stmt.run( + summary.agentName, + summary.projectId ?? null, + Date.now(), + summary.currentTask ?? null, + summary.completedTasks ? JSON.stringify(summary.completedTasks) : null, + summary.decisions ? JSON.stringify(summary.decisions) : null, + summary.context ?? null, + summary.files ? JSON.stringify(summary.files) : null + ); + } + + async getAgentSummary(agentName: string): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + SELECT agent_name, project_id, last_updated, current_task, completed_tasks, decisions, context, files + FROM agent_summaries + WHERE agent_name = ? + `); + + const row: any = stmt.get(agentName); + if (!row) return null; + + return { + agentName: row.agent_name, + projectId: row.project_id ?? undefined, + lastUpdated: row.last_updated, + currentTask: row.current_task ?? undefined, + completedTasks: row.completed_tasks ? JSON.parse(row.completed_tasks) : undefined, + decisions: row.decisions ? JSON.parse(row.decisions) : undefined, + context: row.context ?? undefined, + files: row.files ? JSON.parse(row.files) : undefined, + }; + } + + async getAllAgentSummaries(): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + SELECT agent_name, project_id, last_updated, current_task, completed_tasks, decisions, context, files + FROM agent_summaries + ORDER BY last_updated DESC + `); + + const rows = stmt.all(); + return rows.map((row: any) => ({ + agentName: row.agent_name, + projectId: row.project_id ?? undefined, + lastUpdated: row.last_updated, + currentTask: row.current_task ?? undefined, + completedTasks: row.completed_tasks ? JSON.parse(row.completed_tasks) : undefined, + decisions: row.decisions ? JSON.parse(row.decisions) : undefined, + context: row.context ?? undefined, + files: row.files ? JSON.parse(row.files) : undefined, + })); + } } diff --git a/src/utils/project-namespace.ts b/src/utils/project-namespace.ts index c0ad207c8..dcc9e5c44 100644 --- a/src/utils/project-namespace.ts +++ b/src/utils/project-namespace.ts @@ -9,8 +9,32 @@ import crypto from 'node:crypto'; import path from 'node:path'; import fs from 'node:fs'; +import os from 'node:os'; -const BASE_DIR = '/tmp/agent-relay'; +/** + * Get the base directory for agent-relay data. + * Priority: + * 1. AGENT_RELAY_DATA_DIR environment variable + * 2. XDG_DATA_HOME/agent-relay (Linux/macOS standard) + * 3. ~/.agent-relay (fallback) + */ +function getBaseDir(): string { + // Explicit override + if (process.env.AGENT_RELAY_DATA_DIR) { + return process.env.AGENT_RELAY_DATA_DIR; + } + + // XDG Base Directory Specification + const xdgDataHome = process.env.XDG_DATA_HOME; + if (xdgDataHome) { + return path.join(xdgDataHome, 'agent-relay'); + } + + // Default: ~/.agent-relay + return path.join(os.homedir(), '.agent-relay'); +} + +const BASE_DIR = getBaseDir(); /** * Generate a short hash of a path for namespacing diff --git a/src/wrapper/client.ts b/src/wrapper/client.ts index 88fcdba33..bbdf4a9bc 100644 --- a/src/wrapper/client.ts +++ b/src/wrapper/client.ts @@ -25,6 +25,8 @@ export interface ClientConfig { agentName: string; /** Optional CLI identifier to surface to the dashboard */ cli?: string; + /** Optional working directory to surface in registry/dashboard */ + workingDirectory?: string; reconnect: boolean; maxReconnectAttempts: number; reconnectDelayMs: number; @@ -73,6 +75,11 @@ export class RelayClient { return this.config.agentName; } + /** Get the session ID assigned by the server */ + get currentSessionId(): string | undefined { + return this.sessionId; + } + /** * Connect to the relay daemon. */ @@ -251,6 +258,7 @@ export class RelayClient { payload: { agent: this.config.agentName, cli: this.config.cli, + workingDirectory: this.config.workingDirectory, capabilities: { ack: true, resume: true, diff --git a/src/wrapper/parser.test.ts b/src/wrapper/parser.test.ts index f89279cd9..7b03e31e0 100644 --- a/src/wrapper/parser.test.ts +++ b/src/wrapper/parser.test.ts @@ -3,7 +3,7 @@ */ import { describe, it, expect, beforeEach } from 'vitest'; -import { OutputParser, formatIncomingMessage } from './parser.js'; +import { OutputParser, formatIncomingMessage, parseSummaryFromOutput, parseSessionEndFromOutput, parseRelayMetadataFromOutput } from './parser.js'; describe('OutputParser', () => { let parser: OutputParser; @@ -620,3 +620,183 @@ describe('formatIncomingMessage', () => { expect(result).toBe('\n[MSG] from agent1: Line 1\nLine 2\nLine 3\n'); }); }); + +describe('parseSummaryFromOutput', () => { + it('parses valid JSON summary block', () => { + const output = `Some output +[[SUMMARY]] +{ + "currentTask": "Implementing auth", + "context": "Working on login flow", + "files": ["src/auth.ts"] +} +[[/SUMMARY]] +More output`; + + const summary = parseSummaryFromOutput(output); + expect(summary).not.toBeNull(); + expect(summary).toEqual({ + currentTask: 'Implementing auth', + context: 'Working on login flow', + files: ['src/auth.ts'], + }); + }); + + it('parses summary with all fields', () => { + const output = `[[SUMMARY]]{"currentTask":"Task 1","completedTasks":["T0"],"decisions":["Use JWT"],"context":"Auth work","files":["a.ts","b.ts"]}[[/SUMMARY]]`; + + const summary = parseSummaryFromOutput(output); + expect(summary).toEqual({ + currentTask: 'Task 1', + completedTasks: ['T0'], + decisions: ['Use JWT'], + context: 'Auth work', + files: ['a.ts', 'b.ts'], + }); + }); + + it('returns null when no summary block exists', () => { + const output = 'Just regular output without any summary block'; + + const summary = parseSummaryFromOutput(output); + expect(summary).toBeNull(); + }); + + it('returns null for invalid JSON', () => { + const output = '[[SUMMARY]]not valid json[[/SUMMARY]]'; + + const summary = parseSummaryFromOutput(output); + expect(summary).toBeNull(); + }); + + it('handles empty summary block', () => { + const output = '[[SUMMARY]]{}[[/SUMMARY]]'; + + const summary = parseSummaryFromOutput(output); + expect(summary).toEqual({}); + }); +}); + +describe('parseSessionEndFromOutput', () => { + it('parses valid JSON session end block', () => { + const output = `Some output +[[SESSION_END]] +{ + "summary": "Completed auth module", + "completedTasks": ["login", "logout"] +} +[[/SESSION_END]] +More output`; + + const result = parseSessionEndFromOutput(output); + expect(result).not.toBeNull(); + expect(result).toEqual({ + summary: 'Completed auth module', + completedTasks: ['login', 'logout'], + }); + }); + + it('parses empty session end block', () => { + const output = '[[SESSION_END]][[/SESSION_END]]'; + + const result = parseSessionEndFromOutput(output); + expect(result).toEqual({}); + }); + + it('parses session end with only summary', () => { + const output = '[[SESSION_END]]{"summary":"All done!"}[[/SESSION_END]]'; + + const result = parseSessionEndFromOutput(output); + expect(result).toEqual({ summary: 'All done!' }); + }); + + it('treats non-JSON content as plain summary', () => { + const output = '[[SESSION_END]]Work completed successfully[[/SESSION_END]]'; + + const result = parseSessionEndFromOutput(output); + expect(result).toEqual({ summary: 'Work completed successfully' }); + }); + + it('returns null when no session end block exists', () => { + const output = 'Regular output without session end'; + + const result = parseSessionEndFromOutput(output); + expect(result).toBeNull(); + }); + + it('handles multiline plain text summary', () => { + const output = `[[SESSION_END]] +Completed the following: +- Feature A +- Feature B +[[/SESSION_END]]`; + + const result = parseSessionEndFromOutput(output); + expect(result?.summary).toContain('Completed the following:'); + expect(result?.summary).toContain('Feature A'); + }); +}); + +describe('parseRelayMetadataFromOutput', () => { + it('parses valid metadata block', () => { + const output = `Some output +[[RELAY_METADATA]] +{ + "subject": "Task update", + "importance": 80, + "replyTo": "msg-abc123", + "ackRequired": true +} +[[/RELAY_METADATA]] +More output`; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(true); + expect(result.metadata).toEqual({ + subject: 'Task update', + importance: 80, + replyTo: 'msg-abc123', + ackRequired: true, + }); + expect(result.rawContent).toContain('"subject"'); + }); + + it('returns not found when no metadata block exists', () => { + const output = 'Regular output without any metadata block'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(false); + expect(result.valid).toBe(false); + expect(result.metadata).toBeNull(); + expect(result.rawContent).toBeNull(); + }); + + it('returns invalid for malformed JSON', () => { + const output = '[[RELAY_METADATA]]not valid json[[/RELAY_METADATA]]'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(false); + expect(result.metadata).toBeNull(); + expect(result.rawContent).toBe('not valid json'); + }); + + it('handles empty metadata block', () => { + const output = '[[RELAY_METADATA]]{}[[/RELAY_METADATA]]'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(true); + expect(result.metadata).toEqual({}); + }); + + it('parses metadata with partial fields', () => { + const output = '[[RELAY_METADATA]]{"subject":"Quick note"}[[/RELAY_METADATA]]'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(true); + expect(result.metadata).toEqual({ subject: 'Quick note' }); + }); +}); diff --git a/src/wrapper/parser.ts b/src/wrapper/parser.ts index accc33a49..f4bf8a351 100644 --- a/src/wrapper/parser.ts +++ b/src/wrapper/parser.ts @@ -23,6 +23,7 @@ export interface ParsedCommand { /** Optional thread ID for grouping related messages */ thread?: string; raw: string; + meta?: ParsedMessageMetadata; } export interface ParserOptions { @@ -45,8 +46,9 @@ const DEFAULT_OPTIONS: Required = { // Static patterns (not prefix-dependent) const BLOCK_END = /\[\[\/RELAY\]\]/; -const CODE_FENCE = /^```/; -// Continuation helpers +const BLOCK_METADATA_START = '[[RELAY_METADATA]]'; +const BLOCK_METADATA_END = /\[\[\/RELAY_METADATA\]\]/; +const CODE_FENCE = /^```/;// Continuation helpers const BULLET_OR_NUMBERED_LIST = /^[ \t]*([\-*•◦‣⏺◆◇○□■]|[0-9]+[.)])\s+/; const PROMPTISH_LINE = /^[\s]*[>$%#➜›»][\s]*$/; const RELAY_INJECTION_PREFIX = /^\s*Relay message from /; @@ -99,6 +101,8 @@ export class OutputParser { private inCodeFence = false; private inBlock = false; private blockBuffer = ''; + private blockType: 'RELAY' | 'RELAY_METADATA' | null = null; + private lastParsedMetadata: ParsedMessageMetadata | null = null; // Dynamic patterns based on prefix configuration private inlineRelayPattern: RegExp; @@ -133,17 +137,17 @@ export class OutputParser { let output = ''; // If we're inside a block, accumulate until we see the end - if (this.inBlock) { - return this.parseInBlockMode(data, commands); + if (this.inBlock && this.blockType) { + return this.parseInBlockMode(data, commands, this.blockType); } - // Find [[RELAY]] that's at the start of a line (or start of input) - // and NOT inside a code fence - const blockStartIdx = this.findBlockStart(data); + // Find [[RELAY_METADATA]] or [[RELAY]] that's at the start of a line + const blockStart = this.findBlockStart(data); - if (this.options.enableBlock && blockStartIdx !== -1) { - const before = data.substring(0, blockStartIdx); - const after = data.substring(blockStartIdx + '[[RELAY]]'.length); + if (this.options.enableBlock && blockStart.index !== -1 && blockStart.identifier) { + const blockStartIdentifier = blockStart.identifier; + const before = data.substring(0, blockStart.index); + const after = data.substring(blockStart.index + blockStartIdentifier.length); // Output everything before the block start if (before) { @@ -153,6 +157,7 @@ export class OutputParser { // Enter block mode this.inBlock = true; + this.blockType = blockStartIdentifier === BLOCK_METADATA_START ? 'RELAY_METADATA' : 'RELAY'; this.blockBuffer = after; // Check size limit before processing @@ -160,12 +165,14 @@ export class OutputParser { console.error('[parser] Block too large, discarding'); this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; return { commands, output }; } // Check if block ends in same chunk - if (BLOCK_END.test(this.blockBuffer)) { - const blockResult = this.finishBlock(); + const blockEndPattern = this.blockType === 'RELAY_METADATA' ? BLOCK_METADATA_END : BLOCK_END; + if (blockEndPattern.test(this.blockBuffer)) { + const blockResult = this.finishBlock(this.blockType); if (blockResult.command) { commands.push(blockResult.command); } @@ -186,66 +193,76 @@ export class OutputParser { } /** - * Find [[RELAY]] that's at the start of a line and not inside a code fence. - * Returns the index, or -1 if not found. + * Find [[RELAY_METADATA]] or [[RELAY]] that's at the start of a line and not inside a code fence. + * Returns the index and identifier, or -1 and null if not found. */ - private findBlockStart(data: string): number { + private findBlockStart(data: string): { index: number; identifier: string | null } { // Track code fence state through the data let inFence = this.inCodeFence; let searchStart = 0; + // Prioritize RELAY_METADATA over RELAY + const blockIdentifiers = [BLOCK_METADATA_START, '[[RELAY]]']; + while (searchStart < data.length) { - // Look for next [[RELAY]] or code fence - const relayIdx = data.indexOf('[[RELAY]]', searchStart); - const fenceIdx = data.indexOf('```', searchStart); + let earliestBlockIdx = -1; + let earliestBlockIdentifier: string | null = null; + + for (const identifier of blockIdentifiers) { + const currentBlockIdx = data.indexOf(identifier, searchStart); + if (currentBlockIdx !== -1 && (earliestBlockIdx === -1 || currentBlockIdx < earliestBlockIdx)) { + earliestBlockIdx = currentBlockIdx; + earliestBlockIdentifier = identifier; + } + } - // No more [[RELAY]] found - if (relayIdx === -1) { + // No more blocks found + if (earliestBlockIdx === -1) { // Still update code fence state for remaining data + let fenceIdx = data.indexOf('```', searchStart); while (fenceIdx !== -1) { - const nextFence = data.indexOf('```', searchStart); - if (nextFence === -1) break; inFence = !inFence; - searchStart = nextFence + 3; + searchStart = fenceIdx + 3; + fenceIdx = data.indexOf('```', searchStart); } - return -1; + return { index: -1, identifier: null }; } - // Process any code fences before this [[RELAY]] + // Process any code fences before this block let tempIdx = searchStart; while (true) { const nextFence = data.indexOf('```', tempIdx); - if (nextFence === -1 || nextFence >= relayIdx) break; + if (nextFence === -1 || nextFence >= earliestBlockIdx) break; inFence = !inFence; tempIdx = nextFence + 3; } - // If we're inside a code fence, skip this [[RELAY]] + // If we're inside a code fence, skip this block if (inFence) { - searchStart = relayIdx + 9; // Skip past [[RELAY]] + searchStart = earliestBlockIdx + (earliestBlockIdentifier?.length ?? 0); // Skip past the block continue; } - // Check if [[RELAY]] is at start of a line - if (relayIdx === 0) { - return 0; // At very start + // Check if block is at start of a line + if (earliestBlockIdx === 0) { + return { index: 0, identifier: earliestBlockIdentifier }; // At very start } // Look backwards for the start of line - const beforeRelay = data.substring(0, relayIdx); - const lastNewline = beforeRelay.lastIndexOf('\n'); - const lineStart = beforeRelay.substring(lastNewline + 1); + const beforeBlock = data.substring(0, earliestBlockIdx); + const lastNewline = beforeBlock.lastIndexOf('\n'); + const lineStart = beforeBlock.substring(lastNewline + 1); - // Must be only whitespace before [[RELAY]] on this line + // Must be only whitespace before block on this line if (/^\s*$/.test(lineStart)) { - return relayIdx; + return { index: earliestBlockIdx, identifier: earliestBlockIdentifier }; } // Not at start of line, keep searching - searchStart = relayIdx + 9; + searchStart = earliestBlockIdx + (earliestBlockIdentifier?.length ?? 0); } - return -1; + return { index: -1, identifier: null }; } /** @@ -395,7 +412,7 @@ export class OutputParser { /** * Parse while inside a [[RELAY]] block - buffer until we see [[/RELAY]]. */ - private parseInBlockMode(data: string, commands: ParsedCommand[]): { commands: ParsedCommand[]; output: string } { + private parseInBlockMode(data: string, commands: ParsedCommand[], blockType: 'RELAY' | 'RELAY_METADATA'): { commands: ParsedCommand[]; output: string } { this.blockBuffer += data; // Check size limit @@ -403,12 +420,14 @@ export class OutputParser { console.error('[parser] Block too large, discarding'); this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; return { commands, output: '' }; } // Check for block end - if (BLOCK_END.test(this.blockBuffer)) { - const result = this.finishBlock(); + const blockEndPattern = blockType === 'RELAY_METADATA' ? BLOCK_METADATA_END : BLOCK_END; + if (blockEndPattern.test(this.blockBuffer)) { + const result = this.finishBlock(blockType); if (result.command) { commands.push(result.command); } @@ -499,37 +518,59 @@ export class OutputParser { * Finish processing a block and extract command. * Returns the command (if valid) and any remaining content after [[/RELAY]]. */ - private finishBlock(): { command: ParsedCommand | null; remaining: string | null } { - const endIdx = this.blockBuffer.indexOf('[[/RELAY]]'); + private finishBlock(blockType: 'RELAY' | 'RELAY_METADATA'): { command: ParsedCommand | null; remaining: string | null; metadata: ParsedMessageMetadata | null } { + const blockEndIdentifier = blockType === 'RELAY_METADATA' ? BLOCK_METADATA_END.source : BLOCK_END.source; + const endIdx = this.blockBuffer.indexOf(blockEndIdentifier.replace(/\\/g, '')); // Remove regex escapes for indexOf const jsonStr = this.blockBuffer.substring(0, endIdx).trim(); - const remaining = this.blockBuffer.substring(endIdx + '[[/RELAY]]'.length) || null; + const remaining = this.blockBuffer.substring(endIdx + blockEndIdentifier.replace(/\\/g, '').length) || null; this.inBlock = false; this.blockBuffer = ''; - - try { - const parsed = JSON.parse(jsonStr); - - // Validate required fields - if (!parsed.to || !parsed.type) { - console.error('[parser] Block missing required fields (to, type)'); - return { command: null, remaining }; + this.blockType = null; + + if (blockType === 'RELAY_METADATA') { + try { + const metadata = JSON.parse(jsonStr) as ParsedMessageMetadata; + this.lastParsedMetadata = metadata; + return { command: null, remaining, metadata }; + } catch (err) { + console.error('[parser] Invalid JSON in RELAY_METADATA block:', err); + this.lastParsedMetadata = null; + return { command: null, remaining, metadata: null }; } + } else { // blockType === 'RELAY' + try { + const parsed = JSON.parse(jsonStr); + + // Validate required fields + if (!parsed.to || !parsed.type) { + console.error('[parser] Block missing required fields (to, type)'); + this.lastParsedMetadata = null; // Clear metadata even if RELAY block is invalid + return { command: null, remaining, metadata: null }; + } - return { - command: { + const command: ParsedCommand = { to: parsed.to, kind: parsed.type as PayloadKind, body: parsed.body ?? parsed.text ?? '', data: parsed.data, thread: parsed.thread || undefined, raw: jsonStr, - }, - remaining, - }; - } catch (err) { - console.error('[parser] Invalid JSON in block:', err); - return { command: null, remaining }; + meta: this.lastParsedMetadata || undefined, // Attach last parsed metadata + }; + + this.lastParsedMetadata = null; // Clear after use + + return { + command, + remaining, + metadata: null, + }; + } catch (err) { + console.error('[parser] Invalid JSON in RELAY block:', err); + this.lastParsedMetadata = null; + return { command: null, remaining, metadata: null }; + } } } @@ -540,6 +581,8 @@ export class OutputParser { const result = this.parse('\n'); this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; + this.lastParsedMetadata = null; this.inCodeFence = false; return result; } @@ -550,6 +593,8 @@ export class OutputParser { reset(): void { this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; + this.lastParsedMetadata = null; this.inCodeFence = false; } } @@ -561,3 +606,153 @@ export function formatIncomingMessage(from: string, body: string, kind: PayloadK const prefix = kind === 'thinking' ? '[THINKING]' : '[MSG]'; return `\n${prefix} from ${from}: ${body}\n`; } + +/** + * Parsed message metadata block from agent output. + */ +export interface ParsedMessageMetadata { + subject?: string; + importance?: number; + replyTo?: string; + ackRequired?: boolean; +} + +/** + * Result of attempting to parse a RELAY_METADATA block. + */ +export interface MetadataParseResult { + found: boolean; + valid: boolean; + metadata: ParsedMessageMetadata | null; + rawContent: string | null; // Raw block content for deduplication +} + +/** + * Parse [[RELAY_METADATA]]...[[/RELAY_METADATA]] blocks from agent output. + * Agents can output metadata to enhance messages. + * + * Format: + * [[RELAY_METADATA]] + * { + * "subject": "Task update", + * "importance": 80, + * "replyTo": "msg-abc123", + * "ackRequired": true + * } + * [[/RELAY_METADATA]] + */ +export function parseRelayMetadataFromOutput(output: string): MetadataParseResult { + const match = output.match(/\[\[RELAY_METADATA\]\]([\s\S]*?)\[\[\/RELAY_METADATA\]\]/); + + if (!match) { + return { found: false, valid: false, metadata: null, rawContent: null }; + } + + const rawContent = match[1].trim(); + + try { + const metadata = JSON.parse(rawContent) as ParsedMessageMetadata; + return { found: true, valid: true, metadata, rawContent }; + } catch { + return { found: true, valid: false, metadata: null, rawContent }; + } +} + +/** + * Parsed summary block from agent output. + */ +export interface ParsedSummary { + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; +} + +/** + * Result of attempting to parse a SUMMARY block. + */ +export interface SummaryParseResult { + found: boolean; + valid: boolean; + summary: ParsedSummary | null; + rawContent: string | null; // Raw block content for deduplication +} + +/** + * Parse [[SUMMARY]]...[[/SUMMARY]] blocks from agent output. + * Agents can output summaries to keep a running context of their work. + * + * Format: + * [[SUMMARY]] + * { + * "currentTask": "Working on auth module", + * "context": "Completed login flow, now implementing logout", + * "files": ["src/auth.ts", "src/session.ts"] + * } + * [[/SUMMARY]] + */ +export function parseSummaryFromOutput(output: string): ParsedSummary | null { + const result = parseSummaryWithDetails(output); + return result.summary; +} + +/** + * Parse SUMMARY block with full details for deduplication. + * Returns raw content to allow caller to dedupe before logging errors. + */ +export function parseSummaryWithDetails(output: string): SummaryParseResult { + const match = output.match(/\[\[SUMMARY\]\]([\s\S]*?)\[\[\/SUMMARY\]\]/); + + if (!match) { + return { found: false, valid: false, summary: null, rawContent: null }; + } + + const rawContent = match[1].trim(); + + try { + const summary = JSON.parse(rawContent) as ParsedSummary; + return { found: true, valid: true, summary, rawContent }; + } catch { + return { found: true, valid: false, summary: null, rawContent }; + } +} + +/** + * Session end marker from agent output. + */ +export interface SessionEndMarker { + summary?: string; + completedTasks?: string[]; +} + +/** + * Parse [[SESSION_END]]...[[/SESSION_END]] blocks from agent output. + * Agents output this to explicitly mark their session as complete. + * + * Format: + * [[SESSION_END]] + * {"summary": "Completed auth module implementation", "completedTasks": ["login", "logout"]} + * [[/SESSION_END]] + * + * Or simply: [[SESSION_END]][[/SESSION_END]] for a clean close without summary. + */ +export function parseSessionEndFromOutput(output: string): SessionEndMarker | null { + const match = output.match(/\[\[SESSION_END\]\]([\s\S]*?)\[\[\/SESSION_END\]\]/); + + if (!match) { + return null; + } + + const content = match[1].trim(); + if (!content) { + return {}; // Empty marker = session ended without summary + } + + try { + return JSON.parse(content) as SessionEndMarker; + } catch { + // If not valid JSON, treat the content as a plain summary string + return { summary: content }; + } +} diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index 37f70a405..2517d6ba6 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -15,9 +15,11 @@ import { exec, execSync, spawn, ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; import { RelayClient } from './client.js'; -import { OutputParser, type ParsedCommand } from './parser.js'; +import { OutputParser, type ParsedCommand, parseSummaryWithDetails, parseSessionEndFromOutput, ParsedMessageMetadata } from './parser.js'; import { InboxManager } from './inbox.js'; import type { SendPayload } from '../protocol/types.js'; +import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; +import { getProjectPaths } from '../utils/project-namespace.js'; const execAsync = promisify(exec); const escapeRegex = (str: string): string => str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); @@ -84,6 +86,8 @@ export class TmuxWrapper { private client: RelayClient; private parser: OutputParser; private inbox?: InboxManager; + private storage?: SqliteStorageAdapter; + private storageReady: Promise; // Resolves true if storage initialized, false if failed private running = false; private pollTimer?: NodeJS.Timeout; private attachProcess?: ChildProcess; @@ -100,6 +104,9 @@ export class TmuxWrapper { private lastDebugLog = 0; private cliType: 'claude' | 'codex' | 'gemini' | 'other'; private relayPrefix: string; + private lastSummaryHash = ''; // Dedup summary saves + private lastSummaryRawContent = ''; // Dedup invalid JSON error logging + private sessionEndProcessed = false; // Track if we've already processed session end constructor(config: TmuxWrapperConfig) { this.config = { @@ -139,6 +146,7 @@ export class TmuxWrapper { agentName: config.name, socketPath: config.socketPath, cli: this.cliType, + workingDirectory: this.config.cwd ?? process.cwd(), }); this.parser = new OutputParser({ prefix: this.relayPrefix }); @@ -151,6 +159,16 @@ export class TmuxWrapper { }); } + // Initialize storage for session/summary persistence + const projectPaths = getProjectPaths(); + this.storage = new SqliteStorageAdapter({ dbPath: projectPaths.dbPath }); + // Initialize asynchronously (don't block constructor) - methods await storageReady + this.storageReady = this.storage.init().then(() => true).catch(err => { + this.logStderr(`Failed to initialize storage: ${err.message}`, true); + this.storage = undefined; + return false; + }); + // Handle incoming messages from relay this.client.onMessage = (from: string, payload: SendPayload, messageId: string) => { this.handleIncomingMessage(from, payload, messageId); @@ -258,6 +276,7 @@ export class TmuxWrapper { 'setw -g alternate-screen on', // Ensure alternate screen works // Pass through mouse scroll to application in alternate screen mode 'set -ga terminal-overrides ",xterm*:Tc"', + 'set -g status-left-length 100', // Provide ample space for agent name in status bar ]; // Add mouse mode if enabled (allows scroll passthrough to CLI apps) @@ -323,8 +342,9 @@ export class TmuxWrapper { const instructions = [ `[Agent Relay] You are "${this.config.name}" - connected for real-time messaging.`, `SEND: ${this.relayPrefix}AgentName message (or ${this.relayPrefix}* to broadcast)`, - `RECEIVE: "Relay message from X [id]: content"`, - `TRUNCATED: Run "agent-relay read " if message seems cut off`, + `RECEIVE: Messages appear as "Relay message from X [id]: content"`, + `SUMMARY: Periodically output [[SUMMARY]]{"currentTask":"...","context":"..."}[[/SUMMARY]] to track progress`, + `END: Output [[SESSION_END]]{"summary":"..."}[[/SESSION_END]] when your task is complete`, ].join(' | '); try { @@ -464,6 +484,12 @@ export class TmuxWrapper { this.sendRelayCommand(cmd); } + // Check for [[SUMMARY]] blocks and save to storage + this.parseSummaryAndSave(cleanContent); + + // Check for [[SESSION_END]] blocks to explicitly close session + this.parseSessionEndAndClose(cleanContent); + this.updateActivityState(); // Also check for injection opportunity @@ -597,6 +623,103 @@ export class TmuxWrapper { } } + /** + * Parse [[SUMMARY]] blocks from output and save to storage. + * Agents can output summaries to maintain running context: + * + * [[SUMMARY]] + * {"currentTask": "Implementing auth", "context": "Completed login flow"} + * [[/SUMMARY]] + */ + private parseSummaryAndSave(content: string): void { + const result = parseSummaryWithDetails(content); + + // No SUMMARY block found + if (!result.found) return; + + // Dedup based on raw content - prevents repeated error logging for same invalid JSON + if (result.rawContent === this.lastSummaryRawContent) return; + this.lastSummaryRawContent = result.rawContent || ''; + + // Invalid JSON - log error once (deduped above) + if (!result.valid) { + this.logStderr('[parser] Invalid JSON in SUMMARY block'); + return; + } + + const summary = result.summary!; + + // Dedup valid summaries - don't save same summary twice + const summaryHash = JSON.stringify(summary); + if (summaryHash === this.lastSummaryHash) return; + this.lastSummaryHash = summaryHash; + + // Wait for storage to be ready before saving + this.storageReady.then(ready => { + if (!ready || !this.storage) { + this.logStderr('Cannot save summary: storage not initialized'); + return; + } + + const projectPaths = getProjectPaths(); + this.storage.saveAgentSummary({ + agentName: this.config.name, + projectId: projectPaths.projectId, + currentTask: summary.currentTask, + completedTasks: summary.completedTasks, + decisions: summary.decisions, + context: summary.context, + files: summary.files, + }).then(() => { + this.logStderr(`Saved agent summary: ${summary.currentTask || 'updated context'}`); + }).catch(err => { + this.logStderr(`Failed to save summary: ${err.message}`, true); + }); + }); + } + + /** + * Parse [[SESSION_END]] blocks from output and close session explicitly. + * Agents output this to mark their work session as complete: + * + * [[SESSION_END]] + * {"summary": "Completed auth module", "completedTasks": ["login", "logout"]} + * [[/SESSION_END]] + */ + private parseSessionEndAndClose(content: string): void { + if (this.sessionEndProcessed) return; // Only process once per session + + const sessionEnd = parseSessionEndFromOutput(content); + if (!sessionEnd) return; + + // Get session ID from client connection - if not available yet, don't set flag + // so we can retry when sessionId becomes available + const sessionId = this.client.currentSessionId; + if (!sessionId) { + this.logStderr('Cannot close session: no session ID yet, will retry'); + return; + } + + this.sessionEndProcessed = true; + + // Wait for storage to be ready before attempting to close session + this.storageReady.then(ready => { + if (!ready || !this.storage) { + this.logStderr('Cannot close session: storage not initialized'); + return; + } + + this.storage.endSession(sessionId, { + summary: sessionEnd.summary, + closedBy: 'agent', + }).then(() => { + this.logStderr(`Session closed by agent: ${sessionEnd.summary || 'complete'}`); + }).catch(err => { + this.logStderr(`Failed to close session: ${err.message}`, true); + }); + }); + } + /** * Handle incoming message from relay */ @@ -646,7 +769,14 @@ export class TmuxWrapper { this.logStderr(`Injecting message from ${msg.from} (cli: ${this.cliType})`); try { - const sanitizedBody = msg.body.replace(/[\r\n]+/g, ' ').trim(); + let sanitizedBody = msg.body.replace(/[\r\n]+/g, ' ').trim(); + + // Gemini interprets certain keywords (While, For, If, etc.) as shell commands + // Wrap in backticks to prevent shell keyword interpretation + if (this.cliType === 'gemini') { + sanitizedBody = `\`${sanitizedBody.replace(/`/g, "'")}\``; + } + // Short message ID for display (first 8 chars) const shortId = msg.messageId.substring(0, 8); @@ -672,6 +802,21 @@ export class TmuxWrapper { await this.sleep(30); } + // For Gemini: check if we're at a shell prompt ($) vs chat prompt (>) + // If at shell prompt, skip injection to avoid shell command execution + if (this.cliType === 'gemini') { + const lastLine = await this.getLastLine(); + const cleanLine = this.stripAnsi(lastLine).trim(); + if (/^\$\s*$/.test(cleanLine) || /^\s*\$\s*$/.test(cleanLine)) { + this.logStderr('Gemini at shell prompt, skipping injection to avoid shell execution'); + // Re-queue the message for later + this.messageQueue.unshift(msg); + this.isInjecting = false; + setTimeout(() => this.checkForInjectionOpportunity(), 2000); + return; + } + } + // Standard injection for all CLIs including Gemini // Format: Relay message from Sender [abc12345]: content const injection = `Relay message from ${msg.from} ${idTag}: ${sanitizedBody}${truncationHint}`; @@ -722,6 +867,16 @@ export class TmuxWrapper { return new Promise(r => setTimeout(r, ms)); } + /** + * Reset session-specific state for wrapper reuse. + * Call this when starting a new session with the same wrapper instance. + */ + resetSessionState(): void { + this.sessionEndProcessed = false; + this.lastSummaryHash = ''; + this.lastSummaryRawContent = ''; + } + /** * Get the prompt pattern for the current CLI type. */ @@ -851,6 +1006,9 @@ export class TmuxWrapper { this.running = false; this.activityState = 'disconnected'; + // Reset session state for potential reuse + this.resetSessionState(); + // Stop polling if (this.pollTimer) { clearInterval(this.pollTimer);