From e488364755c9b06c6123be6d2c5b96dcea74523a Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:28:17 +0000 Subject: [PATCH 01/12] [bd-kzw] Centralize SQLite storage to ~/.agent-relay + add session/summary tracking - Move SQLite storage from /tmp to ~/.agent-relay (fixes data loss on reboot) - Support XDG_DATA_HOME and AGENT_RELAY_DATA_DIR env overrides - Add sessions table to track agent connection history - Add agent_summaries table for running agent context - Record session start/end in daemon on connect/disconnect - Add [[SUMMARY]] block parsing in wrapper for agent summaries - Update dashboard API to include sessions and summaries data - Dashboard now shows recent sessions and agent summaries Closes: agent-relay-kzw (SQLite in /tmp cleared on reboot) --- src/daemon/server.ts | 27 ++++ src/dashboard/server.ts | 75 +++++++++- src/storage/sqlite-adapter.ts | 250 +++++++++++++++++++++++++++++++++ src/utils/project-namespace.ts | 26 +++- src/wrapper/parser.ts | 39 +++++ src/wrapper/tmux-wrapper.ts | 56 +++++++- 6 files changed, 468 insertions(+), 5 deletions(-) diff --git a/src/daemon/server.ts b/src/daemon/server.ts index 92f42a39f..ee1e66ac8 100644 --- a/src/daemon/server.ts +++ b/src/daemon/server.ts @@ -10,6 +10,8 @@ import { Connection, type ConnectionConfig, DEFAULT_CONFIG } from './connection. import { Router } from './router.js'; import type { Envelope, SendPayload } from '../protocol/types.js'; import { createStorageAdapter, type StorageAdapter, type StorageConfig } from '../storage/adapter.js'; +import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; +import { getProjectPaths } from '../utils/project-namespace.js'; export interface DaemonConfig extends ConnectionConfig { socketPath: string; @@ -189,6 +191,19 @@ export class Daemon { this.router.register(connection); console.log(`[daemon] Agent registered: ${connection.agentName}`); this.writeAgentsFile(); + + // Record session start + if (this.storage instanceof SqliteStorageAdapter) { + const projectPaths = getProjectPaths(); + this.storage.startSession({ + id: connection.sessionId, + agentName: connection.agentName, + cli: connection.cli, + projectId: projectPaths.projectId, + projectRoot: projectPaths.projectRoot, + startedAt: Date.now(), + }).catch(err => console.error('[daemon] Failed to record session start:', err)); + } } }; @@ -197,6 +212,12 @@ export class Daemon { this.connections.delete(connection); this.router.unregister(connection); this.writeAgentsFile(); + + // Record session end + if (this.storage instanceof SqliteStorageAdapter) { + this.storage.endSession(connection.sessionId) + .catch(err => console.error('[daemon] Failed to record session end:', err)); + } }; connection.onError = (error: Error) => { @@ -204,6 +225,12 @@ export class Daemon { this.connections.delete(connection); this.router.unregister(connection); this.writeAgentsFile(); + + // Record session end on error too + if (this.storage instanceof SqliteStorageAdapter) { + this.storage.endSession(connection.sessionId) + .catch(err => console.error('[daemon] Failed to record session end:', err)); + } }; } diff --git a/src/dashboard/server.ts b/src/dashboard/server.ts index 192f167df..9f2e231d2 100644 --- a/src/dashboard/server.ts +++ b/src/dashboard/server.ts @@ -4,7 +4,7 @@ import http from 'http'; import path from 'path'; import fs from 'fs'; import { fileURLToPath } from 'url'; -import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; +import { SqliteStorageAdapter, type StoredSession } from '../storage/sqlite-adapter.js'; import type { StorageAdapter, StoredMessage } from '../storage/adapter.js'; const __filename = fileURLToPath(import.meta.url); @@ -27,6 +27,25 @@ interface Message { id: string; // unique-ish id } +interface SessionInfo { + id: string; + agentName: string; + cli?: string; + startedAt: string; + endedAt?: string; + duration?: string; + messageCount: number; + summary?: string; +} + +interface AgentSummary { + agentName: string; + lastUpdated: string; + currentTask?: string; + completedTasks?: string[]; + context?: string; +} + export async function startDashboard(port: number, dataDir: string, dbPath?: string): Promise { console.log('Starting dashboard...'); console.log('__dirname:', __dirname); @@ -157,9 +176,51 @@ export async function startDashboard(port: number, dataDir: string, dbPath?: str return allMessages; }; + const formatDuration = (startMs: number, endMs?: number): string => { + const end = endMs ?? Date.now(); + const durationMs = end - startMs; + const minutes = Math.floor(durationMs / 60000); + const hours = Math.floor(minutes / 60); + if (hours > 0) { + return `${hours}h ${minutes % 60}m`; + } + return `${minutes}m`; + }; + + const getRecentSessions = async (): Promise => { + if (storage && storage instanceof SqliteStorageAdapter) { + const sessions = await storage.getRecentSessions(20); + return sessions.map(s => ({ + id: s.id, + agentName: s.agentName, + cli: s.cli, + startedAt: new Date(s.startedAt).toISOString(), + endedAt: s.endedAt ? new Date(s.endedAt).toISOString() : undefined, + duration: formatDuration(s.startedAt, s.endedAt), + messageCount: s.messageCount, + summary: s.summary, + })); + } + return []; + }; + + const getAgentSummaries = async (): Promise => { + if (storage && storage instanceof SqliteStorageAdapter) { + const summaries = await storage.getAllAgentSummaries(); + return summaries.map(s => ({ + agentName: s.agentName, + lastUpdated: new Date(s.lastUpdated).toISOString(), + currentTask: s.currentTask, + completedTasks: s.completedTasks, + context: s.context, + })); + } + return []; + }; + const getAllData = async () => { const team = getTeamData(); - if (!team) return { agents: [], messages: [], activity: [] }; + if (!team) return { agents: [], messages: [], activity: [], sessions: [], summaries: [] }; const agentsMap = new Map(); const allMessages: Message[] = await getMessages(team.agents); @@ -202,10 +263,18 @@ export async function startDashboard(port: number, dataDir: string, dbPath?: str } }); + // Fetch sessions and summaries in parallel + const [sessions, summaries] = await Promise.all([ + getRecentSessions(), + getAgentSummaries(), + ]); + return { agents: Array.from(agentsMap.values()), messages: allMessages, - activity: allMessages // For now, activity log is just the message log + activity: allMessages, // For now, activity log is just the message log + sessions, + summaries, }; }; diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index e0239080d..9259e7653 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -7,6 +7,25 @@ export interface SqliteAdapterOptions { dbPath: string; } +export interface StoredSession { + id: string; + agentName: string; + cli?: string; + projectId?: string; + projectRoot?: string; + startedAt: number; + endedAt?: number; + messageCount: number; + summary?: string; +} + +export interface SessionQuery { + agentName?: string; + projectId?: string; + since?: number; + limit?: number; +} + type SqliteDriverName = 'better-sqlite3' | 'node'; interface SqliteStatement { @@ -26,6 +45,7 @@ export class SqliteStorageAdapter implements StorageAdapter { private dbPath: string; private db?: SqliteDatabase; private insertStmt?: SqliteStatement; + private insertSessionStmt?: SqliteStatement; private driver?: SqliteDriverName; constructor(options: SqliteAdapterOptions) { @@ -110,6 +130,33 @@ export class SqliteStorageAdapter implements StorageAdapter { CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient); CREATE INDEX IF NOT EXISTS idx_messages_topic ON messages (topic); CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages (thread); + + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + agent_name TEXT NOT NULL, + cli TEXT, + project_id TEXT, + project_root TEXT, + started_at INTEGER NOT NULL, + ended_at INTEGER, + message_count INTEGER DEFAULT 0, + summary TEXT + ); + CREATE INDEX IF NOT EXISTS idx_sessions_agent ON sessions (agent_name); + CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions (started_at); + CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions (project_id); + + CREATE TABLE IF NOT EXISTS agent_summaries ( + agent_name TEXT PRIMARY KEY, + project_id TEXT, + last_updated INTEGER NOT NULL, + current_task TEXT, + completed_tasks TEXT, + decisions TEXT, + context TEXT, + files TEXT + ); + CREATE INDEX IF NOT EXISTS idx_summaries_updated ON agent_summaries (last_updated); `); // Migration: add thread column if missing (for existing databases) @@ -244,4 +291,207 @@ export class SqliteStorageAdapter implements StorageAdapter { this.db = undefined; } } + + // ============ Session Management ============ + + async startSession(session: Omit): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + INSERT OR REPLACE INTO sessions + (id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + stmt.run( + session.id, + session.agentName, + session.cli ?? null, + session.projectId ?? null, + session.projectRoot ?? null, + session.startedAt, + session.endedAt ?? null, + 0, + session.summary ?? null + ); + } + + async endSession(sessionId: string, summary?: string): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + UPDATE sessions + SET ended_at = ?, summary = COALESCE(?, summary) + WHERE id = ? + `); + + stmt.run(Date.now(), summary ?? null, sessionId); + } + + async incrementSessionMessageCount(sessionId: string): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + UPDATE sessions SET message_count = message_count + 1 WHERE id = ? + `); + + stmt.run(sessionId); + } + + async getSessions(query: SessionQuery = {}): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const clauses: string[] = []; + const params: unknown[] = []; + + if (query.agentName) { + clauses.push('agent_name = ?'); + params.push(query.agentName); + } + if (query.projectId) { + clauses.push('project_id = ?'); + params.push(query.projectId); + } + if (query.since) { + clauses.push('started_at >= ?'); + params.push(query.since); + } + + const where = clauses.length ? `WHERE ${clauses.join(' AND ')}` : ''; + const limit = query.limit ?? 50; + + const stmt = this.db.prepare(` + SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary + FROM sessions + ${where} + ORDER BY started_at DESC + LIMIT ? + `); + + const rows = stmt.all(...params, limit); + return rows.map((row: any) => ({ + id: row.id, + agentName: row.agent_name, + cli: row.cli ?? undefined, + projectId: row.project_id ?? undefined, + projectRoot: row.project_root ?? undefined, + startedAt: row.started_at, + endedAt: row.ended_at ?? undefined, + messageCount: row.message_count, + summary: row.summary ?? undefined, + })); + } + + async getRecentSessions(limit: number = 10): Promise { + return this.getSessions({ limit }); + } + + // ============ Agent Summaries ============ + + async saveAgentSummary(summary: { + agentName: string; + projectId?: string; + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; + }): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + INSERT OR REPLACE INTO agent_summaries + (agent_name, project_id, last_updated, current_task, completed_tasks, decisions, context, files) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `); + + stmt.run( + summary.agentName, + summary.projectId ?? null, + Date.now(), + summary.currentTask ?? null, + summary.completedTasks ? JSON.stringify(summary.completedTasks) : null, + summary.decisions ? JSON.stringify(summary.decisions) : null, + summary.context ?? null, + summary.files ? JSON.stringify(summary.files) : null + ); + } + + async getAgentSummary(agentName: string): Promise<{ + agentName: string; + projectId?: string; + lastUpdated: number; + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; + } | null> { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + SELECT agent_name, project_id, last_updated, current_task, completed_tasks, decisions, context, files + FROM agent_summaries + WHERE agent_name = ? + `); + + const row: any = stmt.get(agentName); + if (!row) return null; + + return { + agentName: row.agent_name, + projectId: row.project_id ?? undefined, + lastUpdated: row.last_updated, + currentTask: row.current_task ?? undefined, + completedTasks: row.completed_tasks ? JSON.parse(row.completed_tasks) : undefined, + decisions: row.decisions ? JSON.parse(row.decisions) : undefined, + context: row.context ?? undefined, + files: row.files ? JSON.parse(row.files) : undefined, + }; + } + + async getAllAgentSummaries(): Promise> { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + SELECT agent_name, project_id, last_updated, current_task, completed_tasks, decisions, context, files + FROM agent_summaries + ORDER BY last_updated DESC + `); + + const rows = stmt.all(); + return rows.map((row: any) => ({ + agentName: row.agent_name, + projectId: row.project_id ?? undefined, + lastUpdated: row.last_updated, + currentTask: row.current_task ?? undefined, + completedTasks: row.completed_tasks ? JSON.parse(row.completed_tasks) : undefined, + decisions: row.decisions ? JSON.parse(row.decisions) : undefined, + context: row.context ?? undefined, + files: row.files ? JSON.parse(row.files) : undefined, + })); + } } diff --git a/src/utils/project-namespace.ts b/src/utils/project-namespace.ts index c0ad207c8..dcc9e5c44 100644 --- a/src/utils/project-namespace.ts +++ b/src/utils/project-namespace.ts @@ -9,8 +9,32 @@ import crypto from 'node:crypto'; import path from 'node:path'; import fs from 'node:fs'; +import os from 'node:os'; -const BASE_DIR = '/tmp/agent-relay'; +/** + * Get the base directory for agent-relay data. + * Priority: + * 1. AGENT_RELAY_DATA_DIR environment variable + * 2. XDG_DATA_HOME/agent-relay (Linux/macOS standard) + * 3. ~/.agent-relay (fallback) + */ +function getBaseDir(): string { + // Explicit override + if (process.env.AGENT_RELAY_DATA_DIR) { + return process.env.AGENT_RELAY_DATA_DIR; + } + + // XDG Base Directory Specification + const xdgDataHome = process.env.XDG_DATA_HOME; + if (xdgDataHome) { + return path.join(xdgDataHome, 'agent-relay'); + } + + // Default: ~/.agent-relay + return path.join(os.homedir(), '.agent-relay'); +} + +const BASE_DIR = getBaseDir(); /** * Generate a short hash of a path for namespacing diff --git a/src/wrapper/parser.ts b/src/wrapper/parser.ts index 2b9e2e625..495a4c75d 100644 --- a/src/wrapper/parser.ts +++ b/src/wrapper/parser.ts @@ -526,3 +526,42 @@ export function formatIncomingMessage(from: string, body: string, kind: PayloadK const prefix = kind === 'thinking' ? '[THINKING]' : '[MSG]'; return `\n${prefix} from ${from}: ${body}\n`; } + +/** + * Parsed summary block from agent output. + */ +export interface ParsedSummary { + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; +} + +/** + * Parse [[SUMMARY]]...[[/SUMMARY]] blocks from agent output. + * Agents can output summaries to keep a running context of their work. + * + * Format: + * [[SUMMARY]] + * { + * "currentTask": "Working on auth module", + * "context": "Completed login flow, now implementing logout", + * "files": ["src/auth.ts", "src/session.ts"] + * } + * [[/SUMMARY]] + */ +export function parseSummaryFromOutput(output: string): ParsedSummary | null { + const match = output.match(/\[\[SUMMARY\]\]([\s\S]*?)\[\[\/SUMMARY\]\]/); + + if (!match) { + return null; + } + + try { + return JSON.parse(match[1].trim()) as ParsedSummary; + } catch { + console.error('[parser] Invalid JSON in SUMMARY block'); + return null; + } +} diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index f1006eb11..cef585cdd 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -15,9 +15,11 @@ import { exec, execSync, spawn, ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; import { RelayClient } from './client.js'; -import { OutputParser, type ParsedCommand } from './parser.js'; +import { OutputParser, type ParsedCommand, parseSummaryFromOutput, type ParsedSummary } from './parser.js'; import { InboxManager } from './inbox.js'; import type { SendPayload } from '../protocol/types.js'; +import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; +import { getProjectPaths } from '../utils/project-namespace.js'; const execAsync = promisify(exec); const escapeRegex = (str: string): string => str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); @@ -76,6 +78,7 @@ export class TmuxWrapper { private client: RelayClient; private parser: OutputParser; private inbox?: InboxManager; + private storage?: SqliteStorageAdapter; private running = false; private pollTimer?: NodeJS.Timeout; private attachProcess?: ChildProcess; @@ -92,6 +95,7 @@ export class TmuxWrapper { private lastDebugLog = 0; private cliType: 'claude' | 'codex' | 'gemini' | 'other'; private relayPrefix: string; + private lastSummaryHash = ''; // Dedup summary saves constructor(config: TmuxWrapperConfig) { this.config = { @@ -143,6 +147,15 @@ export class TmuxWrapper { }); } + // Initialize storage for session/summary persistence + const projectPaths = getProjectPaths(); + this.storage = new SqliteStorageAdapter({ dbPath: projectPaths.dbPath }); + // Initialize asynchronously (don't block constructor) + this.storage.init().catch(err => { + this.logStderr(`Failed to initialize storage: ${err.message}`, true); + this.storage = undefined; + }); + // Handle incoming messages from relay this.client.onMessage = (from: string, payload: SendPayload, messageId: string) => { this.handleIncomingMessage(from, payload, messageId); @@ -456,6 +469,9 @@ export class TmuxWrapper { this.sendRelayCommand(cmd); } + // Check for [[SUMMARY]] blocks and save to storage + this.parseSummaryAndSave(cleanContent); + this.updateActivityState(); // Also check for injection opportunity @@ -601,6 +617,44 @@ export class TmuxWrapper { } } + /** + * Parse [[SUMMARY]] blocks from output and save to storage. + * Agents can output summaries to maintain running context: + * + * [[SUMMARY]] + * {"currentTask": "Implementing auth", "context": "Completed login flow"} + * [[/SUMMARY]] + */ + private parseSummaryAndSave(content: string): void { + const summary = parseSummaryFromOutput(content); + if (!summary) return; + + // Dedup - don't save same summary twice + const summaryHash = JSON.stringify(summary); + if (summaryHash === this.lastSummaryHash) return; + this.lastSummaryHash = summaryHash; + + if (!this.storage) { + this.logStderr('Cannot save summary: storage not initialized'); + return; + } + + const projectPaths = getProjectPaths(); + this.storage.saveAgentSummary({ + agentName: this.config.name, + projectId: projectPaths.projectId, + currentTask: summary.currentTask, + completedTasks: summary.completedTasks, + decisions: summary.decisions, + context: summary.context, + files: summary.files, + }).then(() => { + this.logStderr(`Saved agent summary: ${summary.currentTask || 'updated context'}`); + }).catch(err => { + this.logStderr(`Failed to save summary: ${err.message}`, true); + }); + } + /** * Handle incoming message from relay */ From c91788035ae7ab087b0c24e03c6081263af31f55 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:29:04 +0000 Subject: [PATCH 02/12] Sync package-lock.json version --- package-lock.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index b723be462..2542af10d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "agent-relay", - "version": "0.1.0", + "version": "1.0.7", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "agent-relay", - "version": "0.1.0", + "version": "1.0.7", "license": "MIT", "dependencies": { "better-sqlite3": "^9.4.3", From 1fdda8b24793bfba17c7da2583fa4d22891dee0e Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:33:17 +0000 Subject: [PATCH 03/12] Add summary instructions to agent startup injection --- src/wrapper/tmux-wrapper.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index cef585cdd..d20159045 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -328,8 +328,8 @@ export class TmuxWrapper { const instructions = [ `[Agent Relay] You are "${this.config.name}" - connected for real-time messaging.`, `SEND: ${this.relayPrefix}AgentName message (or ${this.relayPrefix}* to broadcast)`, - `RECEIVE: "Relay message from X [id]: content"`, - `TRUNCATED: Run "agent-relay read " if message seems cut off`, + `RECEIVE: Messages appear as "Relay message from X [id]: content"`, + `SUMMARY: Periodically output [[SUMMARY]]{"currentTask":"...","context":"..."}[[/SUMMARY]] to track your progress`, ].join(' | '); try { From b95ec145b4dc39eaba5d6465171f2ef540235e3d Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:35:26 +0000 Subject: [PATCH 04/12] Add isActive flag to session info for distinguishing active vs past sessions --- src/dashboard/server.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/dashboard/server.ts b/src/dashboard/server.ts index 9f2e231d2..2f7bc6ce1 100644 --- a/src/dashboard/server.ts +++ b/src/dashboard/server.ts @@ -36,6 +36,8 @@ interface SessionInfo { duration?: string; messageCount: number; summary?: string; + /** true if session is still active (no endedAt) */ + isActive: boolean; } interface AgentSummary { @@ -199,6 +201,7 @@ export async function startDashboard(port: number, dataDir: string, dbPath?: str duration: formatDuration(s.startedAt, s.endedAt), messageCount: s.messageCount, summary: s.summary, + isActive: !s.endedAt, // Active if no end time })); } return []; From 0a3e6d52548b97488a6958ea63365d927f3cafb5 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:42:20 +0000 Subject: [PATCH 05/12] Add explicit session closing via [[SESSION_END]] blocks - Add [[SESSION_END]]...[[/SESSION_END]] parsing in parser.ts - Add closedBy field to StoredSession ('agent' | 'disconnect' | 'error') - Update endSession() to accept closedBy option - Add parseSessionEndAndClose() to tmux-wrapper - Update daemon to pass closedBy on disconnect/error - Include SESSION_END in startup instructions for agents --- src/daemon/server.ts | 8 +++--- src/storage/sqlite-adapter.ts | 22 ++++++++++++---- src/wrapper/parser.ts | 39 ++++++++++++++++++++++++++++ src/wrapper/tmux-wrapper.ts | 48 +++++++++++++++++++++++++++++++++-- 4 files changed, 106 insertions(+), 11 deletions(-) diff --git a/src/daemon/server.ts b/src/daemon/server.ts index ee1e66ac8..728d3457a 100644 --- a/src/daemon/server.ts +++ b/src/daemon/server.ts @@ -213,9 +213,9 @@ export class Daemon { this.router.unregister(connection); this.writeAgentsFile(); - // Record session end + // Record session end (disconnect - agent may still mark it closed explicitly) if (this.storage instanceof SqliteStorageAdapter) { - this.storage.endSession(connection.sessionId) + this.storage.endSession(connection.sessionId, { closedBy: 'disconnect' }) .catch(err => console.error('[daemon] Failed to record session end:', err)); } }; @@ -226,9 +226,9 @@ export class Daemon { this.router.unregister(connection); this.writeAgentsFile(); - // Record session end on error too + // Record session end on error if (this.storage instanceof SqliteStorageAdapter) { - this.storage.endSession(connection.sessionId) + this.storage.endSession(connection.sessionId, { closedBy: 'error' }) .catch(err => console.error('[daemon] Failed to record session end:', err)); } }; diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index 9259e7653..8c4528f3a 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -17,6 +17,8 @@ export interface StoredSession { endedAt?: number; messageCount: number; summary?: string; + /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined (still active) */ + closedBy?: 'agent' | 'disconnect' | 'error'; } export interface SessionQuery { @@ -140,7 +142,8 @@ export class SqliteStorageAdapter implements StorageAdapter { started_at INTEGER NOT NULL, ended_at INTEGER, message_count INTEGER DEFAULT 0, - summary TEXT + summary TEXT, + closed_by TEXT ); CREATE INDEX IF NOT EXISTS idx_sessions_agent ON sessions (agent_name); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions (started_at); @@ -318,18 +321,26 @@ export class SqliteStorageAdapter implements StorageAdapter { ); } - async endSession(sessionId: string, summary?: string): Promise { + async endSession( + sessionId: string, + options?: { summary?: string; closedBy?: 'agent' | 'disconnect' | 'error' } + ): Promise { if (!this.db) { throw new Error('SqliteStorageAdapter not initialized'); } const stmt = this.db.prepare(` UPDATE sessions - SET ended_at = ?, summary = COALESCE(?, summary) + SET ended_at = ?, summary = COALESCE(?, summary), closed_by = ? WHERE id = ? `); - stmt.run(Date.now(), summary ?? null, sessionId); + stmt.run( + Date.now(), + options?.summary ?? null, + options?.closedBy ?? null, + sessionId + ); } async incrementSessionMessageCount(sessionId: string): Promise { @@ -369,7 +380,7 @@ export class SqliteStorageAdapter implements StorageAdapter { const limit = query.limit ?? 50; const stmt = this.db.prepare(` - SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary + SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary, closed_by FROM sessions ${where} ORDER BY started_at DESC @@ -387,6 +398,7 @@ export class SqliteStorageAdapter implements StorageAdapter { endedAt: row.ended_at ?? undefined, messageCount: row.message_count, summary: row.summary ?? undefined, + closedBy: row.closed_by ?? undefined, })); } diff --git a/src/wrapper/parser.ts b/src/wrapper/parser.ts index 495a4c75d..8fc77ffbd 100644 --- a/src/wrapper/parser.ts +++ b/src/wrapper/parser.ts @@ -565,3 +565,42 @@ export function parseSummaryFromOutput(output: string): ParsedSummary | null { return null; } } + +/** + * Session end marker from agent output. + */ +export interface SessionEndMarker { + summary?: string; + completedTasks?: string[]; +} + +/** + * Parse [[SESSION_END]]...[[/SESSION_END]] blocks from agent output. + * Agents output this to explicitly mark their session as complete. + * + * Format: + * [[SESSION_END]] + * {"summary": "Completed auth module implementation", "completedTasks": ["login", "logout"]} + * [[/SESSION_END]] + * + * Or simply: [[SESSION_END]][[/SESSION_END]] for a clean close without summary. + */ +export function parseSessionEndFromOutput(output: string): SessionEndMarker | null { + const match = output.match(/\[\[SESSION_END\]\]([\s\S]*?)\[\[\/SESSION_END\]\]/); + + if (!match) { + return null; + } + + const content = match[1].trim(); + if (!content) { + return {}; // Empty marker = session ended without summary + } + + try { + return JSON.parse(content) as SessionEndMarker; + } catch { + // If not valid JSON, treat the content as a plain summary string + return { summary: content }; + } +} diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index d20159045..ea4c4047f 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -15,7 +15,7 @@ import { exec, execSync, spawn, ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; import { RelayClient } from './client.js'; -import { OutputParser, type ParsedCommand, parseSummaryFromOutput, type ParsedSummary } from './parser.js'; +import { OutputParser, type ParsedCommand, parseSummaryFromOutput, parseSessionEndFromOutput, type ParsedSummary } from './parser.js'; import { InboxManager } from './inbox.js'; import type { SendPayload } from '../protocol/types.js'; import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; @@ -329,7 +329,8 @@ export class TmuxWrapper { `[Agent Relay] You are "${this.config.name}" - connected for real-time messaging.`, `SEND: ${this.relayPrefix}AgentName message (or ${this.relayPrefix}* to broadcast)`, `RECEIVE: Messages appear as "Relay message from X [id]: content"`, - `SUMMARY: Periodically output [[SUMMARY]]{"currentTask":"...","context":"..."}[[/SUMMARY]] to track your progress`, + `SUMMARY: Periodically output [[SUMMARY]]{"currentTask":"...","context":"..."}[[/SUMMARY]] to track progress`, + `END: Output [[SESSION_END]]{"summary":"..."}[[/SESSION_END]] when your task is complete`, ].join(' | '); try { @@ -472,6 +473,9 @@ export class TmuxWrapper { // Check for [[SUMMARY]] blocks and save to storage this.parseSummaryAndSave(cleanContent); + // Check for [[SESSION_END]] blocks to explicitly close session + this.parseSessionEndAndClose(cleanContent); + this.updateActivityState(); // Also check for injection opportunity @@ -655,6 +659,46 @@ export class TmuxWrapper { }); } + private sessionEndProcessed = false; // Track if we've already processed session end + + /** + * Parse [[SESSION_END]] blocks from output and close session explicitly. + * Agents output this to mark their work session as complete: + * + * [[SESSION_END]] + * {"summary": "Completed auth module", "completedTasks": ["login", "logout"]} + * [[/SESSION_END]] + */ + private parseSessionEndAndClose(content: string): void { + if (this.sessionEndProcessed) return; // Only process once + + const sessionEnd = parseSessionEndFromOutput(content); + if (!sessionEnd) return; + + this.sessionEndProcessed = true; + + if (!this.storage) { + this.logStderr('Cannot close session: storage not initialized'); + return; + } + + // Get session ID from client connection + const sessionId = this.client.sessionId; + if (!sessionId) { + this.logStderr('Cannot close session: no session ID'); + return; + } + + this.storage.endSession(sessionId, { + summary: sessionEnd.summary, + closedBy: 'agent', + }).then(() => { + this.logStderr(`Session closed by agent: ${sessionEnd.summary || 'complete'}`); + }).catch(err => { + this.logStderr(`Failed to close session: ${err.message}`, true); + }); + } + /** * Handle incoming message from relay */ From 1c7b71e3eb2dc43ea0f0988ab49bed3e7fec1a84 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:49:19 +0000 Subject: [PATCH 06/12] Add adapter pattern for sessions/summaries with full test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move StoredSession, SessionQuery, AgentSummary types to base adapter.ts - Add optional session/summary methods to StorageAdapter interface - This enables future MySQL/Postgres implementations - Add 12 tests for session management (start, end, query, closedBy) - Add 4 tests for agent summaries (save, get, getAll, update) - Add 3 tests for getMessageById - Add 11 tests for parseSummaryFromOutput and parseSessionEndFromOutput - Coverage: sqlite-adapter 43%→86%, parser 90%→97% --- src/storage/adapter.ts | 44 ++++++++ src/storage/sqlite-adapter.test.ts | 175 +++++++++++++++++++++++++++++ src/storage/sqlite-adapter.ts | 53 ++------- src/wrapper/parser.test.ts | 118 ++++++++++++++++++- 4 files changed, 348 insertions(+), 42 deletions(-) diff --git a/src/storage/adapter.ts b/src/storage/adapter.ts index 31255e317..df6903457 100644 --- a/src/storage/adapter.ts +++ b/src/storage/adapter.ts @@ -27,12 +27,56 @@ export interface MessageQuery { order?: 'asc' | 'desc'; } +export interface StoredSession { + id: string; + agentName: string; + cli?: string; + projectId?: string; + projectRoot?: string; + startedAt: number; + endedAt?: number; + messageCount: number; + summary?: string; + /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined (still active) */ + closedBy?: 'agent' | 'disconnect' | 'error'; +} + +export interface SessionQuery { + agentName?: string; + projectId?: string; + since?: number; + limit?: number; +} + +export interface AgentSummary { + agentName: string; + projectId?: string; + lastUpdated: number; + currentTask?: string; + completedTasks?: string[]; + decisions?: string[]; + context?: string; + files?: string[]; +} + export interface StorageAdapter { init(): Promise; saveMessage(message: StoredMessage): Promise; getMessages(query?: MessageQuery): Promise; getMessageById?(id: string): Promise; close?(): Promise; + + // Session management (optional - for adapters that support it) + startSession?(session: Omit): Promise; + endSession?(sessionId: string, options?: { summary?: string; closedBy?: 'agent' | 'disconnect' | 'error' }): Promise; + getSessions?(query?: SessionQuery): Promise; + getRecentSessions?(limit?: number): Promise; + incrementSessionMessageCount?(sessionId: string): Promise; + + // Agent summaries (optional - for adapters that support it) + saveAgentSummary?(summary: Omit): Promise; + getAgentSummary?(agentName: string): Promise; + getAllAgentSummaries?(): Promise; } /** diff --git a/src/storage/sqlite-adapter.test.ts b/src/storage/sqlite-adapter.test.ts index 40b7e378f..b70f1a23a 100644 --- a/src/storage/sqlite-adapter.test.ts +++ b/src/storage/sqlite-adapter.test.ts @@ -107,4 +107,179 @@ describe('SqliteStorageAdapter', () => { const rows = await adapter.getMessages(); expect(rows.map(r => r.id)).toEqual(['fallback-1']); }); + + describe('Session Management', () => { + it('starts and retrieves a session', async () => { + const sessionId = 'session-1'; + await adapter.startSession({ + id: sessionId, + agentName: 'TestAgent', + cli: 'claude', + projectId: 'proj-123', + projectRoot: '/home/test/project', + startedAt: Date.now(), + }); + + const sessions = await adapter.getSessions(); + expect(sessions).toHaveLength(1); + expect(sessions[0]).toMatchObject({ + id: sessionId, + agentName: 'TestAgent', + cli: 'claude', + projectId: 'proj-123', + messageCount: 0, + }); + expect(sessions[0].endedAt).toBeUndefined(); + }); + + it('ends a session with closedBy reason', async () => { + const sessionId = 'session-2'; + await adapter.startSession({ + id: sessionId, + agentName: 'Agent2', + startedAt: Date.now() - 5000, + }); + + await adapter.endSession(sessionId, { + summary: 'Completed auth module', + closedBy: 'agent', + }); + + const sessions = await adapter.getSessions(); + expect(sessions[0]).toMatchObject({ + id: sessionId, + summary: 'Completed auth module', + closedBy: 'agent', + }); + expect(sessions[0].endedAt).toBeDefined(); + }); + + it('increments session message count', async () => { + const sessionId = 'session-3'; + await adapter.startSession({ + id: sessionId, + agentName: 'Agent3', + startedAt: Date.now(), + }); + + await adapter.incrementSessionMessageCount(sessionId); + await adapter.incrementSessionMessageCount(sessionId); + await adapter.incrementSessionMessageCount(sessionId); + + const sessions = await adapter.getSessions(); + expect(sessions[0].messageCount).toBe(3); + }); + + it('filters sessions by agentName and projectId', async () => { + const now = Date.now(); + await adapter.startSession({ id: 's1', agentName: 'Alice', projectId: 'p1', startedAt: now - 2000 }); + await adapter.startSession({ id: 's2', agentName: 'Bob', projectId: 'p1', startedAt: now - 1000 }); + await adapter.startSession({ id: 's3', agentName: 'Alice', projectId: 'p2', startedAt: now }); + + const aliceSessions = await adapter.getSessions({ agentName: 'Alice' }); + expect(aliceSessions.map(s => s.id)).toEqual(['s3', 's1']); + + const p1Sessions = await adapter.getSessions({ projectId: 'p1' }); + expect(p1Sessions.map(s => s.id)).toEqual(['s2', 's1']); + }); + + it('getRecentSessions returns limited results', async () => { + for (let i = 0; i < 5; i++) { + await adapter.startSession({ + id: `recent-${i}`, + agentName: 'Agent', + startedAt: Date.now() + i * 100, + }); + } + + const recent = await adapter.getRecentSessions(3); + expect(recent).toHaveLength(3); + expect(recent[0].id).toBe('recent-4'); // Most recent first + }); + }); + + describe('Agent Summaries', () => { + it('saves and retrieves an agent summary', async () => { + await adapter.saveAgentSummary({ + agentName: 'SummaryAgent', + projectId: 'proj-1', + currentTask: 'Implementing auth', + completedTasks: ['setup', 'database'], + decisions: ['Use JWT'], + context: 'Working on login flow', + files: ['src/auth.ts', 'src/login.ts'], + }); + + const summary = await adapter.getAgentSummary('SummaryAgent'); + expect(summary).not.toBeNull(); + expect(summary).toMatchObject({ + agentName: 'SummaryAgent', + projectId: 'proj-1', + currentTask: 'Implementing auth', + completedTasks: ['setup', 'database'], + decisions: ['Use JWT'], + context: 'Working on login flow', + files: ['src/auth.ts', 'src/login.ts'], + }); + expect(summary!.lastUpdated).toBeDefined(); + }); + + it('returns null for non-existent agent summary', async () => { + const summary = await adapter.getAgentSummary('NonExistent'); + expect(summary).toBeNull(); + }); + + it('updates existing summary on save', async () => { + await adapter.saveAgentSummary({ + agentName: 'UpdateAgent', + currentTask: 'Task 1', + }); + + await adapter.saveAgentSummary({ + agentName: 'UpdateAgent', + currentTask: 'Task 2', + completedTasks: ['Task 1'], + }); + + const summary = await adapter.getAgentSummary('UpdateAgent'); + expect(summary?.currentTask).toBe('Task 2'); + expect(summary?.completedTasks).toEqual(['Task 1']); + }); + + it('getAllAgentSummaries returns all summaries ordered by lastUpdated', async () => { + await adapter.saveAgentSummary({ agentName: 'Agent1', currentTask: 'T1' }); + await new Promise(r => setTimeout(r, 10)); // Small delay for different timestamps + await adapter.saveAgentSummary({ agentName: 'Agent2', currentTask: 'T2' }); + await new Promise(r => setTimeout(r, 10)); + await adapter.saveAgentSummary({ agentName: 'Agent3', currentTask: 'T3' }); + + const summaries = await adapter.getAllAgentSummaries(); + expect(summaries).toHaveLength(3); + expect(summaries[0].agentName).toBe('Agent3'); // Most recent first + expect(summaries[2].agentName).toBe('Agent1'); // Oldest last + }); + }); + + describe('getMessageById', () => { + it('retrieves message by exact ID', async () => { + await adapter.saveMessage(makeMessage({ id: 'exact-id-123', body: 'hello' })); + + const msg = await adapter.getMessageById('exact-id-123'); + expect(msg).not.toBeNull(); + expect(msg?.body).toBe('hello'); + }); + + it('retrieves message by ID prefix', async () => { + await adapter.saveMessage(makeMessage({ id: 'prefix-abc-xyz-123', body: 'world' })); + + const msg = await adapter.getMessageById('prefix-abc'); + expect(msg).not.toBeNull(); + expect(msg?.id).toBe('prefix-abc-xyz-123'); + }); + + it('returns null for non-existent message', async () => { + const msg = await adapter.getMessageById('does-not-exist'); + expect(msg).toBeNull(); + }); + }); }); diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index 8c4528f3a..23b2d67b9 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -1,32 +1,21 @@ import path from 'node:path'; import fs from 'node:fs'; import { createRequire } from 'node:module'; -import { type MessageQuery, type StorageAdapter, type StoredMessage } from './adapter.js'; +import { + type AgentSummary, + type MessageQuery, + type SessionQuery, + type StorageAdapter, + type StoredMessage, + type StoredSession, +} from './adapter.js'; export interface SqliteAdapterOptions { dbPath: string; } -export interface StoredSession { - id: string; - agentName: string; - cli?: string; - projectId?: string; - projectRoot?: string; - startedAt: number; - endedAt?: number; - messageCount: number; - summary?: string; - /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined (still active) */ - closedBy?: 'agent' | 'disconnect' | 'error'; -} - -export interface SessionQuery { - agentName?: string; - projectId?: string; - since?: number; - limit?: number; -} +// Re-export types for backwards compatibility +export type { StoredSession, SessionQuery } from './adapter.js'; type SqliteDriverName = 'better-sqlite3' | 'node'; @@ -439,16 +428,7 @@ export class SqliteStorageAdapter implements StorageAdapter { ); } - async getAgentSummary(agentName: string): Promise<{ - agentName: string; - projectId?: string; - lastUpdated: number; - currentTask?: string; - completedTasks?: string[]; - decisions?: string[]; - context?: string; - files?: string[]; - } | null> { + async getAgentSummary(agentName: string): Promise { if (!this.db) { throw new Error('SqliteStorageAdapter not initialized'); } @@ -474,16 +454,7 @@ export class SqliteStorageAdapter implements StorageAdapter { }; } - async getAllAgentSummaries(): Promise> { + async getAllAgentSummaries(): Promise { if (!this.db) { throw new Error('SqliteStorageAdapter not initialized'); } diff --git a/src/wrapper/parser.test.ts b/src/wrapper/parser.test.ts index 7656a52f9..5cab9d2cd 100644 --- a/src/wrapper/parser.test.ts +++ b/src/wrapper/parser.test.ts @@ -3,7 +3,7 @@ */ import { describe, it, expect, beforeEach } from 'vitest'; -import { OutputParser, formatIncomingMessage } from './parser.js'; +import { OutputParser, formatIncomingMessage, parseSummaryFromOutput, parseSessionEndFromOutput } from './parser.js'; describe('OutputParser', () => { let parser: OutputParser; @@ -594,3 +594,119 @@ describe('formatIncomingMessage', () => { expect(result).toBe('\n[MSG] from agent1: Line 1\nLine 2\nLine 3\n'); }); }); + +describe('parseSummaryFromOutput', () => { + it('parses valid JSON summary block', () => { + const output = `Some output +[[SUMMARY]] +{ + "currentTask": "Implementing auth", + "context": "Working on login flow", + "files": ["src/auth.ts"] +} +[[/SUMMARY]] +More output`; + + const summary = parseSummaryFromOutput(output); + expect(summary).not.toBeNull(); + expect(summary).toEqual({ + currentTask: 'Implementing auth', + context: 'Working on login flow', + files: ['src/auth.ts'], + }); + }); + + it('parses summary with all fields', () => { + const output = `[[SUMMARY]]{"currentTask":"Task 1","completedTasks":["T0"],"decisions":["Use JWT"],"context":"Auth work","files":["a.ts","b.ts"]}[[/SUMMARY]]`; + + const summary = parseSummaryFromOutput(output); + expect(summary).toEqual({ + currentTask: 'Task 1', + completedTasks: ['T0'], + decisions: ['Use JWT'], + context: 'Auth work', + files: ['a.ts', 'b.ts'], + }); + }); + + it('returns null when no summary block exists', () => { + const output = 'Just regular output without any summary block'; + + const summary = parseSummaryFromOutput(output); + expect(summary).toBeNull(); + }); + + it('returns null for invalid JSON', () => { + const output = '[[SUMMARY]]not valid json[[/SUMMARY]]'; + + const summary = parseSummaryFromOutput(output); + expect(summary).toBeNull(); + }); + + it('handles empty summary block', () => { + const output = '[[SUMMARY]]{}[[/SUMMARY]]'; + + const summary = parseSummaryFromOutput(output); + expect(summary).toEqual({}); + }); +}); + +describe('parseSessionEndFromOutput', () => { + it('parses valid JSON session end block', () => { + const output = `Some output +[[SESSION_END]] +{ + "summary": "Completed auth module", + "completedTasks": ["login", "logout"] +} +[[/SESSION_END]] +More output`; + + const result = parseSessionEndFromOutput(output); + expect(result).not.toBeNull(); + expect(result).toEqual({ + summary: 'Completed auth module', + completedTasks: ['login', 'logout'], + }); + }); + + it('parses empty session end block', () => { + const output = '[[SESSION_END]][[/SESSION_END]]'; + + const result = parseSessionEndFromOutput(output); + expect(result).toEqual({}); + }); + + it('parses session end with only summary', () => { + const output = '[[SESSION_END]]{"summary":"All done!"}[[/SESSION_END]]'; + + const result = parseSessionEndFromOutput(output); + expect(result).toEqual({ summary: 'All done!' }); + }); + + it('treats non-JSON content as plain summary', () => { + const output = '[[SESSION_END]]Work completed successfully[[/SESSION_END]]'; + + const result = parseSessionEndFromOutput(output); + expect(result).toEqual({ summary: 'Work completed successfully' }); + }); + + it('returns null when no session end block exists', () => { + const output = 'Regular output without session end'; + + const result = parseSessionEndFromOutput(output); + expect(result).toBeNull(); + }); + + it('handles multiline plain text summary', () => { + const output = `[[SESSION_END]] +Completed the following: +- Feature A +- Feature B +[[/SESSION_END]]`; + + const result = parseSessionEndFromOutput(output); + expect(result?.summary).toContain('Completed the following:'); + expect(result?.summary).toContain('Feature A'); + }); +}); From 600fb2bc86acf633fa828ca7b2789065fcc221f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 07:58:49 +0000 Subject: [PATCH 07/12] Fix: expose sessionId via public getter for TmuxWrapper - Add currentSessionId getter to RelayClient (sessionId was private) - Update TmuxWrapper to use client.currentSessionId - Remove unused ParsedSummary import --- src/wrapper/client.ts | 5 +++++ src/wrapper/tmux-wrapper.ts | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/wrapper/client.ts b/src/wrapper/client.ts index 8fe0443ec..459615fb4 100644 --- a/src/wrapper/client.ts +++ b/src/wrapper/client.ts @@ -72,6 +72,11 @@ export class RelayClient { return this.config.agentName; } + /** Get the session ID assigned by the server */ + get currentSessionId(): string | undefined { + return this.sessionId; + } + /** * Connect to the relay daemon. */ diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index ea4c4047f..feb63f769 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -15,7 +15,7 @@ import { exec, execSync, spawn, ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; import { RelayClient } from './client.js'; -import { OutputParser, type ParsedCommand, parseSummaryFromOutput, parseSessionEndFromOutput, type ParsedSummary } from './parser.js'; +import { OutputParser, type ParsedCommand, parseSummaryFromOutput, parseSessionEndFromOutput } from './parser.js'; import { InboxManager } from './inbox.js'; import type { SendPayload } from '../protocol/types.js'; import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; @@ -683,7 +683,7 @@ export class TmuxWrapper { } // Get session ID from client connection - const sessionId = this.client.sessionId; + const sessionId = this.client.currentSessionId; if (!sessionId) { this.logStderr('Cannot close session: no session ID'); return; From 12a5fd489206c012748952650cda3d9136b144fb Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 08:02:25 +0000 Subject: [PATCH 08/12] Remove unused imports (DEFAULT_SOCKET_PATH, StoredSession) --- src/cli/index.ts | 2 +- src/dashboard/server.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cli/index.ts b/src/cli/index.ts index 1b4be25b4..132a9ca86 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -11,7 +11,7 @@ import { Command } from 'commander'; import { config as dotenvConfig } from 'dotenv'; -import { Daemon, DEFAULT_SOCKET_PATH } from '../daemon/server.js'; +import { Daemon } from '../daemon/server.js'; import { RelayClient } from '../wrapper/client.js'; import { generateAgentName } from '../utils/name-generator.js'; import fs from 'node:fs'; diff --git a/src/dashboard/server.ts b/src/dashboard/server.ts index 2f7bc6ce1..1414a02aa 100644 --- a/src/dashboard/server.ts +++ b/src/dashboard/server.ts @@ -4,7 +4,7 @@ import http from 'http'; import path from 'path'; import fs from 'fs'; import { fileURLToPath } from 'url'; -import { SqliteStorageAdapter, type StoredSession } from '../storage/sqlite-adapter.js'; +import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; import type { StorageAdapter, StoredMessage } from '../storage/adapter.js'; const __filename = fileURLToPath(import.meta.url); From a166368d2739fd29261f2d143b0360f616ef3296 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 08:48:39 +0000 Subject: [PATCH 09/12] Address PR review comments - Fix storage initialization race condition: add storageReady promise that methods await before using storage - Add resetSessionState() method for wrapper reuse across sessions - Don't set sessionEndProcessed flag if sessionId unavailable (allows retry) - Add documentation for COALESCE pattern in endSession() explaining that null summary preserves existing value - Clarify isActive field documentation: determined by endedAt only, independent of closedBy field - Add closedBy field to SessionInfo interface and API response --- src/dashboard/server.ts | 9 +++- src/storage/sqlite-adapter.ts | 8 +++ src/wrapper/tmux-wrapper.ts | 96 +++++++++++++++++++++-------------- 3 files changed, 74 insertions(+), 39 deletions(-) diff --git a/src/dashboard/server.ts b/src/dashboard/server.ts index 1414a02aa..91864733e 100644 --- a/src/dashboard/server.ts +++ b/src/dashboard/server.ts @@ -36,8 +36,14 @@ interface SessionInfo { duration?: string; messageCount: number; summary?: string; - /** true if session is still active (no endedAt) */ + /** + * true if session is still active (endedAt is not set). + * Note: This is determined solely by endedAt, regardless of how the session + * was closed (agent explicit close, disconnect, or error via closedBy field). + */ isActive: boolean; + /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined */ + closedBy?: 'agent' | 'disconnect' | 'error'; } interface AgentSummary { @@ -202,6 +208,7 @@ export async function startDashboard(port: number, dataDir: string, dbPath?: str messageCount: s.messageCount, summary: s.summary, isActive: !s.endedAt, // Active if no end time + closedBy: s.closedBy, })); } return []; diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index 23b2d67b9..381e986c8 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -310,6 +310,14 @@ export class SqliteStorageAdapter implements StorageAdapter { ); } + /** + * End a session and optionally set a summary. + * + * Note: The summary uses COALESCE(?, summary) - if a summary was previously + * set (e.g., during startSession or a prior endSession call), passing null/undefined + * for summary will preserve the existing value rather than clearing it. + * To explicitly clear a summary, pass an empty string. + */ async endSession( sessionId: string, options?: { summary?: string; closedBy?: 'agent' | 'disconnect' | 'error' } diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index feb63f769..29469a2e7 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -79,6 +79,7 @@ export class TmuxWrapper { private parser: OutputParser; private inbox?: InboxManager; private storage?: SqliteStorageAdapter; + private storageReady: Promise; // Resolves true if storage initialized, false if failed private running = false; private pollTimer?: NodeJS.Timeout; private attachProcess?: ChildProcess; @@ -96,6 +97,7 @@ export class TmuxWrapper { private cliType: 'claude' | 'codex' | 'gemini' | 'other'; private relayPrefix: string; private lastSummaryHash = ''; // Dedup summary saves + private sessionEndProcessed = false; // Track if we've already processed session end constructor(config: TmuxWrapperConfig) { this.config = { @@ -150,10 +152,11 @@ export class TmuxWrapper { // Initialize storage for session/summary persistence const projectPaths = getProjectPaths(); this.storage = new SqliteStorageAdapter({ dbPath: projectPaths.dbPath }); - // Initialize asynchronously (don't block constructor) - this.storage.init().catch(err => { + // Initialize asynchronously (don't block constructor) - methods await storageReady + this.storageReady = this.storage.init().then(() => true).catch(err => { this.logStderr(`Failed to initialize storage: ${err.message}`, true); this.storage = undefined; + return false; }); // Handle incoming messages from relay @@ -638,29 +641,30 @@ export class TmuxWrapper { if (summaryHash === this.lastSummaryHash) return; this.lastSummaryHash = summaryHash; - if (!this.storage) { - this.logStderr('Cannot save summary: storage not initialized'); - return; - } + // Wait for storage to be ready before saving + this.storageReady.then(ready => { + if (!ready || !this.storage) { + this.logStderr('Cannot save summary: storage not initialized'); + return; + } - const projectPaths = getProjectPaths(); - this.storage.saveAgentSummary({ - agentName: this.config.name, - projectId: projectPaths.projectId, - currentTask: summary.currentTask, - completedTasks: summary.completedTasks, - decisions: summary.decisions, - context: summary.context, - files: summary.files, - }).then(() => { - this.logStderr(`Saved agent summary: ${summary.currentTask || 'updated context'}`); - }).catch(err => { - this.logStderr(`Failed to save summary: ${err.message}`, true); + const projectPaths = getProjectPaths(); + this.storage.saveAgentSummary({ + agentName: this.config.name, + projectId: projectPaths.projectId, + currentTask: summary.currentTask, + completedTasks: summary.completedTasks, + decisions: summary.decisions, + context: summary.context, + files: summary.files, + }).then(() => { + this.logStderr(`Saved agent summary: ${summary.currentTask || 'updated context'}`); + }).catch(err => { + this.logStderr(`Failed to save summary: ${err.message}`, true); + }); }); } - private sessionEndProcessed = false; // Track if we've already processed session end - /** * Parse [[SESSION_END]] blocks from output and close session explicitly. * Agents output this to mark their work session as complete: @@ -670,32 +674,36 @@ export class TmuxWrapper { * [[/SESSION_END]] */ private parseSessionEndAndClose(content: string): void { - if (this.sessionEndProcessed) return; // Only process once + if (this.sessionEndProcessed) return; // Only process once per session const sessionEnd = parseSessionEndFromOutput(content); if (!sessionEnd) return; - this.sessionEndProcessed = true; - - if (!this.storage) { - this.logStderr('Cannot close session: storage not initialized'); - return; - } - - // Get session ID from client connection + // Get session ID from client connection - if not available yet, don't set flag + // so we can retry when sessionId becomes available const sessionId = this.client.currentSessionId; if (!sessionId) { - this.logStderr('Cannot close session: no session ID'); + this.logStderr('Cannot close session: no session ID yet, will retry'); return; } - this.storage.endSession(sessionId, { - summary: sessionEnd.summary, - closedBy: 'agent', - }).then(() => { - this.logStderr(`Session closed by agent: ${sessionEnd.summary || 'complete'}`); - }).catch(err => { - this.logStderr(`Failed to close session: ${err.message}`, true); + this.sessionEndProcessed = true; + + // Wait for storage to be ready before attempting to close session + this.storageReady.then(ready => { + if (!ready || !this.storage) { + this.logStderr('Cannot close session: storage not initialized'); + return; + } + + this.storage.endSession(sessionId, { + summary: sessionEnd.summary, + closedBy: 'agent', + }).then(() => { + this.logStderr(`Session closed by agent: ${sessionEnd.summary || 'complete'}`); + }).catch(err => { + this.logStderr(`Failed to close session: ${err.message}`, true); + }); }); } @@ -845,6 +853,15 @@ export class TmuxWrapper { return new Promise(r => setTimeout(r, ms)); } + /** + * Reset session-specific state for wrapper reuse. + * Call this when starting a new session with the same wrapper instance. + */ + resetSessionState(): void { + this.sessionEndProcessed = false; + this.lastSummaryHash = ''; + } + /** * Stop and cleanup */ @@ -853,6 +870,9 @@ export class TmuxWrapper { this.running = false; this.activityState = 'disconnected'; + // Reset session state for potential reuse + this.resetSessionState(); + // Stop polling if (this.pollTimer) { clearInterval(this.pollTimer); From 3fc6a715e192180e6a8b396d78b21a45bcad2246 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Mon, 22 Dec 2025 15:02:29 +0100 Subject: [PATCH 10/12] fix(relay): shell escaping for Gemini, summary dedup, and tmux status fix 1. Fix Gemini shell keyword execution by wrapping injected messages in backticks. 2. Deduplicate SUMMARY parsing to prevent error log spam for invalid JSON. 3. Increase tmux status-left-length to prevent agent name truncation. --- src/wrapper/parser.ts | 29 +++++++++++++++++++++++++---- src/wrapper/tmux-wrapper.ts | 34 +++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/wrapper/parser.ts b/src/wrapper/parser.ts index 753892de4..313655557 100644 --- a/src/wrapper/parser.ts +++ b/src/wrapper/parser.ts @@ -573,6 +573,16 @@ export interface ParsedSummary { files?: string[]; } +/** + * Result of attempting to parse a SUMMARY block. + */ +export interface SummaryParseResult { + found: boolean; + valid: boolean; + summary: ParsedSummary | null; + rawContent: string | null; // Raw block content for deduplication +} + /** * Parse [[SUMMARY]]...[[/SUMMARY]] blocks from agent output. * Agents can output summaries to keep a running context of their work. @@ -587,17 +597,28 @@ export interface ParsedSummary { * [[/SUMMARY]] */ export function parseSummaryFromOutput(output: string): ParsedSummary | null { + const result = parseSummaryWithDetails(output); + return result.summary; +} + +/** + * Parse SUMMARY block with full details for deduplication. + * Returns raw content to allow caller to dedupe before logging errors. + */ +export function parseSummaryWithDetails(output: string): SummaryParseResult { const match = output.match(/\[\[SUMMARY\]\]([\s\S]*?)\[\[\/SUMMARY\]\]/); if (!match) { - return null; + return { found: false, valid: false, summary: null, rawContent: null }; } + const rawContent = match[1].trim(); + try { - return JSON.parse(match[1].trim()) as ParsedSummary; + const summary = JSON.parse(rawContent) as ParsedSummary; + return { found: true, valid: true, summary, rawContent }; } catch { - console.error('[parser] Invalid JSON in SUMMARY block'); - return null; + return { found: true, valid: false, summary: null, rawContent }; } } diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index c7c448d8a..87b4afcc7 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -15,7 +15,7 @@ import { exec, execSync, spawn, ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; import { RelayClient } from './client.js'; -import { OutputParser, type ParsedCommand, parseSummaryFromOutput, parseSessionEndFromOutput } from './parser.js'; +import { OutputParser, type ParsedCommand, parseSummaryWithDetails, parseSessionEndFromOutput } from './parser.js'; import { InboxManager } from './inbox.js'; import type { SendPayload } from '../protocol/types.js'; import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; @@ -105,6 +105,7 @@ export class TmuxWrapper { private cliType: 'claude' | 'codex' | 'gemini' | 'other'; private relayPrefix: string; private lastSummaryHash = ''; // Dedup summary saves + private lastSummaryRawContent = ''; // Dedup invalid JSON error logging private sessionEndProcessed = false; // Track if we've already processed session end constructor(config: TmuxWrapperConfig) { @@ -274,6 +275,7 @@ export class TmuxWrapper { 'setw -g alternate-screen on', // Ensure alternate screen works // Pass through mouse scroll to application in alternate screen mode 'set -ga terminal-overrides ",xterm*:Tc"', + 'set -g status-left-length 100', // Provide ample space for agent name in status bar ]; // Add mouse mode if enabled (allows scroll passthrough to CLI apps) @@ -629,10 +631,24 @@ export class TmuxWrapper { * [[/SUMMARY]] */ private parseSummaryAndSave(content: string): void { - const summary = parseSummaryFromOutput(content); - if (!summary) return; + const result = parseSummaryWithDetails(content); - // Dedup - don't save same summary twice + // No SUMMARY block found + if (!result.found) return; + + // Dedup based on raw content - prevents repeated error logging for same invalid JSON + if (result.rawContent === this.lastSummaryRawContent) return; + this.lastSummaryRawContent = result.rawContent || ''; + + // Invalid JSON - log error once (deduped above) + if (!result.valid) { + this.logStderr('[parser] Invalid JSON in SUMMARY block'); + return; + } + + const summary = result.summary!; + + // Dedup valid summaries - don't save same summary twice const summaryHash = JSON.stringify(summary); if (summaryHash === this.lastSummaryHash) return; this.lastSummaryHash = summaryHash; @@ -752,7 +768,14 @@ export class TmuxWrapper { this.logStderr(`Injecting message from ${msg.from} (cli: ${this.cliType})`); try { - const sanitizedBody = msg.body.replace(/[\r\n]+/g, ' ').trim(); + let sanitizedBody = msg.body.replace(/[\r\n]+/g, ' ').trim(); + + // Gemini interprets certain keywords (While, For, If, etc.) as shell commands + // Wrap in backticks to prevent shell keyword interpretation + if (this.cliType === 'gemini') { + sanitizedBody = `\`${sanitizedBody.replace(/`/g, "'")}\``; + } + // Short message ID for display (first 8 chars) const shortId = msg.messageId.substring(0, 8); @@ -835,6 +858,7 @@ export class TmuxWrapper { resetSessionState(): void { this.sessionEndProcessed = false; this.lastSummaryHash = ''; + this.lastSummaryRawContent = ''; } /** From 4456177db1f1b02f71c3ae72819ec48e933b9959 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Mon, 22 Dec 2025 15:04:48 +0100 Subject: [PATCH 11/12] fix(relay): Add Gemini shell prompt detection to skip injection Prevents messages from being injected when Gemini is at a shell prompt ($) to avoid unintended command execution. --- src/wrapper/tmux-wrapper.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index 87b4afcc7..a99494a47 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -146,6 +146,7 @@ export class TmuxWrapper { agentName: config.name, socketPath: config.socketPath, cli: this.cliType, + workingDirectory: this.config.cwd ?? process.cwd(), }); this.parser = new OutputParser({ prefix: this.relayPrefix }); @@ -801,6 +802,21 @@ export class TmuxWrapper { await this.sleep(30); } + // For Gemini: check if we're at a shell prompt ($) vs chat prompt (>) + // If at shell prompt, skip injection to avoid shell command execution + if (this.cliType === 'gemini') { + const lastLine = await this.getLastLine(); + const cleanLine = this.stripAnsi(lastLine).trim(); + if (/^\$\s*$/.test(cleanLine) || /^\s*\$\s*$/.test(cleanLine)) { + this.logStderr('Gemini at shell prompt, skipping injection to avoid shell execution'); + // Re-queue the message for later + this.messageQueue.unshift(msg); + this.isInjecting = false; + setTimeout(() => this.checkForInjectionOpportunity(), 2000); + return; + } + } + // Standard injection for all CLIs including Gemini // Format: Relay message from Sender [abc12345]: content const injection = `Relay message from ${msg.from} ${idTag}: ${sanitizedBody}${truncationHint}`; From 63df239b7d8c7cc7109538ae1b42e7915a147958 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Mon, 22 Dec 2025 15:21:13 +0100 Subject: [PATCH 12/12] more fixes --- src/daemon/agent-registry.test.ts | 97 ++++++++++++++ src/daemon/agent-registry.ts | 178 ++++++++++++++++++++++++ src/daemon/connection.ts | 6 + src/daemon/index.ts | 1 + src/daemon/router.ts | 15 ++- src/daemon/server.ts | 41 ++++-- src/protocol/types.ts | 4 + src/wrapper/client.ts | 3 + src/wrapper/parser.test.ts | 66 ++++++++- src/wrapper/parser.ts | 216 +++++++++++++++++++++--------- src/wrapper/tmux-wrapper.ts | 2 +- 11 files changed, 553 insertions(+), 76 deletions(-) create mode 100644 src/daemon/agent-registry.test.ts create mode 100644 src/daemon/agent-registry.ts diff --git a/src/daemon/agent-registry.test.ts b/src/daemon/agent-registry.test.ts new file mode 100644 index 000000000..5390dac6e --- /dev/null +++ b/src/daemon/agent-registry.test.ts @@ -0,0 +1,97 @@ +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { describe, it, expect } from 'vitest'; +import { AgentRegistry } from './agent-registry.js'; + +function makeTempDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'agent-registry-')); +} + +describe('AgentRegistry', () => { + it('creates and persists agent records', () => { + const dir = makeTempDir(); + const registry = new AgentRegistry(dir); + + const created = registry.registerOrUpdate({ + name: 'alice', + cli: 'claude', + workingDirectory: '/tmp/alice', + }); + + expect(created.id).toBeTruthy(); + expect(created.firstSeen).toBeTruthy(); + expect(created.messagesSent).toBe(0); + expect(created.messagesReceived).toBe(0); + + registry.recordSend('alice'); + registry.recordReceive('alice'); + + const agentsPath = path.join(dir, 'agents.json'); + const fileData = JSON.parse(fs.readFileSync(agentsPath, 'utf-8')); + const fileAgent = fileData.agents.find((a: any) => a.name === 'alice'); + expect(fileAgent.messagesSent).toBe(1); + expect(fileAgent.messagesReceived).toBe(1); + + const registryReloaded = new AgentRegistry(dir); + const [loaded] = registryReloaded.getAgents(); + expect(loaded.name).toBe('alice'); + expect(loaded.messagesSent).toBe(1); + expect(loaded.messagesReceived).toBe(1); + }); + + it('updates metadata on re-register', () => { + const dir = makeTempDir(); + const registry = new AgentRegistry(dir); + + registry.registerOrUpdate({ + name: 'bob', + cli: 'claude', + workingDirectory: '/tmp/one', + }); + const first = registry.getAgents()[0]; + + registry.registerOrUpdate({ + name: 'bob', + cli: 'gemini', + workingDirectory: '/tmp/two', + }); + const [updated] = registry.getAgents(); + + expect(updated.firstSeen).toBe(first.firstSeen); + expect(updated.cli).toBe('gemini'); + expect(updated.workingDirectory).toBe('/tmp/two'); + expect(new Date(updated.lastSeen).getTime()).toBeGreaterThanOrEqual(new Date(first.lastSeen).getTime()); + }); + + it('handles malformed agents.json gracefully', () => { + const dir = makeTempDir(); + const agentsPath = path.join(dir, 'agents.json'); + fs.writeFileSync(agentsPath, '{bad json', 'utf-8'); + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + const registry = new AgentRegistry(dir); + expect(registry.getAgents()).toEqual([]); + expect(errorSpy).toHaveBeenCalled(); + + errorSpy.mockRestore(); + }); + + it('logs write failures without throwing', () => { + const dir = makeTempDir(); + const registry = new AgentRegistry(dir); + registry.registerOrUpdate({ name: 'carol' }); + + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + const writeSpy = vi.spyOn(fs, 'writeFileSync').mockImplementationOnce(() => { + throw new Error('disk full'); + }); + + // Trigger a save + registry.recordSend('carol'); + + expect(errorSpy).toHaveBeenCalled(); + writeSpy.mockRestore(); + errorSpy.mockRestore(); + }); +}); diff --git a/src/daemon/agent-registry.ts b/src/daemon/agent-registry.ts new file mode 100644 index 000000000..295ba02a7 --- /dev/null +++ b/src/daemon/agent-registry.ts @@ -0,0 +1,178 @@ +/** + * Agent Registry + * Persists agent metadata across daemon restarts. + */ + +import fs from 'node:fs'; +import path from 'node:path'; +import { v4 as uuid } from 'uuid'; + +export interface AgentRecord { + id: string; + name: string; + cli?: string; + workingDirectory?: string; + firstSeen: string; + lastSeen: string; + messagesSent: number; + messagesReceived: number; +} + +type AgentInput = { + name: string; + cli?: string; + workingDirectory?: string; +}; + +export class AgentRegistry { + private registryPath: string; + private agents: Map = new Map(); // name -> record + + constructor(teamDir: string) { + this.registryPath = path.join(teamDir, 'agents.json'); + this.ensureDir(teamDir); + this.load(); + } + + /** + * Register or update an agent, refreshing lastSeen and metadata. + */ + registerOrUpdate(agent: AgentInput): AgentRecord { + const now = new Date().toISOString(); + const existing = this.agents.get(agent.name); + + if (existing) { + const updated: AgentRecord = { + ...existing, + cli: agent.cli ?? existing.cli, + workingDirectory: agent.workingDirectory ?? existing.workingDirectory, + lastSeen: now, + }; + this.agents.set(agent.name, updated); + this.save(); + return updated; + } + + const record: AgentRecord = { + id: `agent-${uuid()}`, + name: agent.name, + cli: agent.cli, + workingDirectory: agent.workingDirectory, + firstSeen: now, + lastSeen: now, + messagesSent: 0, + messagesReceived: 0, + }; + + this.agents.set(agent.name, record); + this.save(); + return record; + } + + /** + * Increment sent counter for an agent. + */ + recordSend(agentName: string): void { + const record = this.ensureRecord(agentName); + record.messagesSent += 1; + record.lastSeen = new Date().toISOString(); + this.agents.set(agentName, record); + this.save(); + } + + /** + * Increment received counter for an agent. + */ + recordReceive(agentName: string): void { + const record = this.ensureRecord(agentName); + record.messagesReceived += 1; + record.lastSeen = new Date().toISOString(); + this.agents.set(agentName, record); + this.save(); + } + + /** + * Touch lastSeen for an agent (e.g., on disconnect). + */ + touch(agentName: string): void { + const record = this.ensureRecord(agentName); + record.lastSeen = new Date().toISOString(); + this.agents.set(agentName, record); + this.save(); + } + + /** + * Get a snapshot of all agents. + */ + getAgents(): AgentRecord[] { + return Array.from(this.agents.values()); + } + + private ensureRecord(agentName: string): AgentRecord { + const existing = this.agents.get(agentName); + if (existing) return existing; + + const now = new Date().toISOString(); + const record: AgentRecord = { + id: `agent-${uuid()}`, + name: agentName, + firstSeen: now, + lastSeen: now, + messagesSent: 0, + messagesReceived: 0, + }; + + this.agents.set(agentName, record); + return record; + } + + private ensureDir(dir: string): void { + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } + } + + private load(): void { + if (!fs.existsSync(this.registryPath)) { + return; + } + + try { + const data = JSON.parse(fs.readFileSync(this.registryPath, 'utf-8')); + const rawAgents = Array.isArray(data?.agents) + ? data.agents + : typeof data?.agents === 'object' && data?.agents !== null + ? Object.values(data.agents) + : []; + + for (const raw of rawAgents) { + if (!raw?.name) continue; + const record: AgentRecord = { + id: raw.id ?? `agent-${uuid()}`, + name: raw.name, + cli: raw.cli, + workingDirectory: raw.workingDirectory, + firstSeen: raw.firstSeen ?? new Date().toISOString(), + lastSeen: raw.lastSeen ?? new Date().toISOString(), + messagesSent: typeof raw.messagesSent === 'number' ? raw.messagesSent : 0, + messagesReceived: typeof raw.messagesReceived === 'number' ? raw.messagesReceived : 0, + }; + this.agents.set(record.name, record); + } + } catch (err) { + console.error('[registry] Failed to load agents.json:', err); + } + } + + private save(): void { + try { + fs.writeFileSync( + this.registryPath, + JSON.stringify({ agents: this.getAgents() }, null, 2), + 'utf-8' + ); + } catch (err) { + console.error('[registry] Failed to write agents.json:', err); + } + } +} diff --git a/src/daemon/connection.ts b/src/daemon/connection.ts index 9a3c417bb..ed6c66a37 100644 --- a/src/daemon/connection.ts +++ b/src/daemon/connection.ts @@ -43,6 +43,7 @@ export class Connection { private _state: ConnectionState = 'CONNECTING'; private _agentName?: string; private _cli?: string; + private _workingDirectory?: string; private _sessionId: string; private _resumeToken: string; @@ -83,6 +84,10 @@ export class Connection { return this._cli; } + get workingDirectory(): string | undefined { + return this._workingDirectory; + } + get sessionId(): string { return this._sessionId; } @@ -139,6 +144,7 @@ export class Connection { this._agentName = envelope.payload.agent; this._cli = envelope.payload.cli; + this._workingDirectory = envelope.payload.workingDirectory; // Check for session resume if (envelope.payload.session?.resume_token) { diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 53d41273f..2fe9acec0 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -1,3 +1,4 @@ export * from './server.js'; export * from './router.js'; export * from './connection.js'; +export * from './agent-registry.js'; diff --git a/src/daemon/router.ts b/src/daemon/router.ts index e0b10a0b4..46629d7a3 100644 --- a/src/daemon/router.ts +++ b/src/daemon/router.ts @@ -12,11 +12,13 @@ import { PROTOCOL_VERSION, } from '../protocol/types.js'; import type { StorageAdapter } from '../storage/adapter.js'; +import type { AgentRegistry } from './agent-registry.js'; export interface RoutableConnection { id: string; agentName?: string; cli?: string; + workingDirectory?: string; sessionId: string; close(): void; send(envelope: Envelope): boolean; @@ -53,10 +55,12 @@ export class Router { private subscriptions: Map> = new Map(); // topic -> Set private pendingDeliveries: Map = new Map(); // deliverId -> pending private deliveryOptions: DeliveryReliabilityOptions; + private registry?: AgentRegistry; - constructor(options: { storage?: StorageAdapter; delivery?: Partial } = {}) { + constructor(options: { storage?: StorageAdapter; delivery?: Partial; registry?: AgentRegistry } = {}) { this.storage = options.storage; this.deliveryOptions = { ...DEFAULT_DELIVERY_OPTIONS, ...options.delivery }; + this.registry = options.registry; } /** @@ -73,6 +77,11 @@ export class Router { this.connections.delete(existing.id); } this.agents.set(connection.agentName, connection); + this.registry?.registerOrUpdate({ + name: connection.agentName, + cli: connection.cli, + workingDirectory: connection.workingDirectory, + }); } } @@ -131,6 +140,8 @@ export class Router { return; } + this.registry?.recordSend(senderName); + const to = envelope.to; const topic = envelope.topic; @@ -165,6 +176,7 @@ export class Router { this.persistDeliverEnvelope(deliver); if (sent) { this.trackDelivery(target, deliver); + this.registry?.recordReceive(to); } return sent; } @@ -191,6 +203,7 @@ export class Router { this.persistDeliverEnvelope(deliver); if (sent) { this.trackDelivery(target, deliver); + this.registry?.recordReceive(agentName); } } } diff --git a/src/daemon/server.ts b/src/daemon/server.ts index 728d3457a..04bebb137 100644 --- a/src/daemon/server.ts +++ b/src/daemon/server.ts @@ -12,6 +12,7 @@ import type { Envelope, SendPayload } from '../protocol/types.js'; import { createStorageAdapter, type StorageAdapter, type StorageConfig } from '../storage/adapter.js'; import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; import { getProjectPaths } from '../utils/project-namespace.js'; +import { AgentRegistry } from './agent-registry.js'; export interface DaemonConfig extends ConnectionConfig { socketPath: string; @@ -40,6 +41,7 @@ export class Daemon { private connections: Set = new Set(); private storage?: StorageAdapter; private storageInitialized = false; + private registry?: AgentRegistry; constructor(config: Partial = {}) { this.config = { ...DEFAULT_DAEMON_CONFIG, ...config }; @@ -50,6 +52,9 @@ export class Daemon { if (!this.config.teamDir) { this.config.teamDir = path.dirname(this.config.socketPath); } + if (this.config.teamDir) { + this.registry = new AgentRegistry(this.config.teamDir); + } // Storage is initialized lazily in start() to support async createStorageAdapter this.server = net.createServer(this.handleConnection.bind(this)); } @@ -58,18 +63,15 @@ export class Daemon { * Write current agents to agents.json for dashboard consumption. */ private writeAgentsFile(): void { - if (!this.config.teamDir) return; - const agentsPath = path.join(this.config.teamDir, 'agents.json'); - const agents = this.router.getAgents().map(name => { - const connection = this.router.getConnection(name); - return { - name, - cli: connection?.cli, - connectedAt: new Date().toISOString(), - }; - }); + if (!this.registry) return; + // The registry persists on every update; this is a no-op helper for symmetry. + const agents = this.registry.getAgents(); try { - fs.writeFileSync(agentsPath, JSON.stringify({ agents }, null, 2)); + fs.writeFileSync( + path.join(this.config.teamDir ?? path.dirname(this.config.socketPath), 'agents.json'), + JSON.stringify({ agents }, null, 2), + 'utf-8' + ); } catch (err) { console.error('[daemon] Failed to write agents.json:', err); } @@ -91,7 +93,7 @@ export class Daemon { this.storage = await createStorageAdapter(storagePath, this.config.storageConfig); } - this.router = new Router({ storage: this.storage }); + this.router = new Router({ storage: this.storage, registry: this.registry }); this.storageInitialized = true; } @@ -190,7 +192,14 @@ export class Daemon { if (connection.agentName) { this.router.register(connection); console.log(`[daemon] Agent registered: ${connection.agentName}`); - this.writeAgentsFile(); + if (connection.agentName) { + this.registry?.registerOrUpdate({ + name: connection.agentName, + cli: connection.cli, + workingDirectory: connection.workingDirectory, + }); + this.writeAgentsFile(); + } // Record session start if (this.storage instanceof SqliteStorageAdapter) { @@ -211,6 +220,9 @@ export class Daemon { console.log(`[daemon] Connection closed: ${connection.agentName ?? connection.id}`); this.connections.delete(connection); this.router.unregister(connection); + if (connection.agentName) { + this.registry?.touch(connection.agentName); + } this.writeAgentsFile(); // Record session end (disconnect - agent may still mark it closed explicitly) @@ -224,6 +236,9 @@ export class Daemon { console.error(`[daemon] Connection error: ${error.message}`); this.connections.delete(connection); this.router.unregister(connection); + if (connection.agentName) { + this.registry?.touch(connection.agentName); + } this.writeAgentsFile(); // Record session end on error diff --git a/src/protocol/types.ts b/src/protocol/types.ts index 02cd71eac..b1256ac7e 100644 --- a/src/protocol/types.ts +++ b/src/protocol/types.ts @@ -49,6 +49,8 @@ export interface HelloPayload { }; /** Optional hint about which CLI the agent is using (claude, codex, gemini, etc.) */ cli?: string; + /** Optional working directory hint for registry/dashboard */ + workingDirectory?: string; session?: { resume_token?: string; }; @@ -75,6 +77,8 @@ export interface SendPayload { export interface SendMeta { requires_ack?: boolean; ttl_ms?: number; + importance?: number; // 0-100, 100 is highest + replyTo?: string; // Correlation ID for replies } export interface DeliveryInfo { diff --git a/src/wrapper/client.ts b/src/wrapper/client.ts index 35d56fe05..bbdf4a9bc 100644 --- a/src/wrapper/client.ts +++ b/src/wrapper/client.ts @@ -25,6 +25,8 @@ export interface ClientConfig { agentName: string; /** Optional CLI identifier to surface to the dashboard */ cli?: string; + /** Optional working directory to surface in registry/dashboard */ + workingDirectory?: string; reconnect: boolean; maxReconnectAttempts: number; reconnectDelayMs: number; @@ -256,6 +258,7 @@ export class RelayClient { payload: { agent: this.config.agentName, cli: this.config.cli, + workingDirectory: this.config.workingDirectory, capabilities: { ack: true, resume: true, diff --git a/src/wrapper/parser.test.ts b/src/wrapper/parser.test.ts index 167abd5e5..7b03e31e0 100644 --- a/src/wrapper/parser.test.ts +++ b/src/wrapper/parser.test.ts @@ -3,7 +3,7 @@ */ import { describe, it, expect, beforeEach } from 'vitest'; -import { OutputParser, formatIncomingMessage, parseSummaryFromOutput, parseSessionEndFromOutput } from './parser.js'; +import { OutputParser, formatIncomingMessage, parseSummaryFromOutput, parseSessionEndFromOutput, parseRelayMetadataFromOutput } from './parser.js'; describe('OutputParser', () => { let parser: OutputParser; @@ -736,3 +736,67 @@ Completed the following: expect(result?.summary).toContain('Feature A'); }); }); + +describe('parseRelayMetadataFromOutput', () => { + it('parses valid metadata block', () => { + const output = `Some output +[[RELAY_METADATA]] +{ + "subject": "Task update", + "importance": 80, + "replyTo": "msg-abc123", + "ackRequired": true +} +[[/RELAY_METADATA]] +More output`; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(true); + expect(result.metadata).toEqual({ + subject: 'Task update', + importance: 80, + replyTo: 'msg-abc123', + ackRequired: true, + }); + expect(result.rawContent).toContain('"subject"'); + }); + + it('returns not found when no metadata block exists', () => { + const output = 'Regular output without any metadata block'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(false); + expect(result.valid).toBe(false); + expect(result.metadata).toBeNull(); + expect(result.rawContent).toBeNull(); + }); + + it('returns invalid for malformed JSON', () => { + const output = '[[RELAY_METADATA]]not valid json[[/RELAY_METADATA]]'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(false); + expect(result.metadata).toBeNull(); + expect(result.rawContent).toBe('not valid json'); + }); + + it('handles empty metadata block', () => { + const output = '[[RELAY_METADATA]]{}[[/RELAY_METADATA]]'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(true); + expect(result.metadata).toEqual({}); + }); + + it('parses metadata with partial fields', () => { + const output = '[[RELAY_METADATA]]{"subject":"Quick note"}[[/RELAY_METADATA]]'; + + const result = parseRelayMetadataFromOutput(output); + expect(result.found).toBe(true); + expect(result.valid).toBe(true); + expect(result.metadata).toEqual({ subject: 'Quick note' }); + }); +}); diff --git a/src/wrapper/parser.ts b/src/wrapper/parser.ts index 313655557..f4bf8a351 100644 --- a/src/wrapper/parser.ts +++ b/src/wrapper/parser.ts @@ -23,6 +23,7 @@ export interface ParsedCommand { /** Optional thread ID for grouping related messages */ thread?: string; raw: string; + meta?: ParsedMessageMetadata; } export interface ParserOptions { @@ -45,8 +46,9 @@ const DEFAULT_OPTIONS: Required = { // Static patterns (not prefix-dependent) const BLOCK_END = /\[\[\/RELAY\]\]/; -const CODE_FENCE = /^```/; -// Continuation helpers +const BLOCK_METADATA_START = '[[RELAY_METADATA]]'; +const BLOCK_METADATA_END = /\[\[\/RELAY_METADATA\]\]/; +const CODE_FENCE = /^```/;// Continuation helpers const BULLET_OR_NUMBERED_LIST = /^[ \t]*([\-*•◦‣⏺◆◇○□■]|[0-9]+[.)])\s+/; const PROMPTISH_LINE = /^[\s]*[>$%#➜›»][\s]*$/; const RELAY_INJECTION_PREFIX = /^\s*Relay message from /; @@ -99,6 +101,8 @@ export class OutputParser { private inCodeFence = false; private inBlock = false; private blockBuffer = ''; + private blockType: 'RELAY' | 'RELAY_METADATA' | null = null; + private lastParsedMetadata: ParsedMessageMetadata | null = null; // Dynamic patterns based on prefix configuration private inlineRelayPattern: RegExp; @@ -133,17 +137,17 @@ export class OutputParser { let output = ''; // If we're inside a block, accumulate until we see the end - if (this.inBlock) { - return this.parseInBlockMode(data, commands); + if (this.inBlock && this.blockType) { + return this.parseInBlockMode(data, commands, this.blockType); } - // Find [[RELAY]] that's at the start of a line (or start of input) - // and NOT inside a code fence - const blockStartIdx = this.findBlockStart(data); + // Find [[RELAY_METADATA]] or [[RELAY]] that's at the start of a line + const blockStart = this.findBlockStart(data); - if (this.options.enableBlock && blockStartIdx !== -1) { - const before = data.substring(0, blockStartIdx); - const after = data.substring(blockStartIdx + '[[RELAY]]'.length); + if (this.options.enableBlock && blockStart.index !== -1 && blockStart.identifier) { + const blockStartIdentifier = blockStart.identifier; + const before = data.substring(0, blockStart.index); + const after = data.substring(blockStart.index + blockStartIdentifier.length); // Output everything before the block start if (before) { @@ -153,6 +157,7 @@ export class OutputParser { // Enter block mode this.inBlock = true; + this.blockType = blockStartIdentifier === BLOCK_METADATA_START ? 'RELAY_METADATA' : 'RELAY'; this.blockBuffer = after; // Check size limit before processing @@ -160,12 +165,14 @@ export class OutputParser { console.error('[parser] Block too large, discarding'); this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; return { commands, output }; } // Check if block ends in same chunk - if (BLOCK_END.test(this.blockBuffer)) { - const blockResult = this.finishBlock(); + const blockEndPattern = this.blockType === 'RELAY_METADATA' ? BLOCK_METADATA_END : BLOCK_END; + if (blockEndPattern.test(this.blockBuffer)) { + const blockResult = this.finishBlock(this.blockType); if (blockResult.command) { commands.push(blockResult.command); } @@ -186,66 +193,76 @@ export class OutputParser { } /** - * Find [[RELAY]] that's at the start of a line and not inside a code fence. - * Returns the index, or -1 if not found. + * Find [[RELAY_METADATA]] or [[RELAY]] that's at the start of a line and not inside a code fence. + * Returns the index and identifier, or -1 and null if not found. */ - private findBlockStart(data: string): number { + private findBlockStart(data: string): { index: number; identifier: string | null } { // Track code fence state through the data let inFence = this.inCodeFence; let searchStart = 0; + // Prioritize RELAY_METADATA over RELAY + const blockIdentifiers = [BLOCK_METADATA_START, '[[RELAY]]']; + while (searchStart < data.length) { - // Look for next [[RELAY]] or code fence - const relayIdx = data.indexOf('[[RELAY]]', searchStart); - const fenceIdx = data.indexOf('```', searchStart); + let earliestBlockIdx = -1; + let earliestBlockIdentifier: string | null = null; + + for (const identifier of blockIdentifiers) { + const currentBlockIdx = data.indexOf(identifier, searchStart); + if (currentBlockIdx !== -1 && (earliestBlockIdx === -1 || currentBlockIdx < earliestBlockIdx)) { + earliestBlockIdx = currentBlockIdx; + earliestBlockIdentifier = identifier; + } + } - // No more [[RELAY]] found - if (relayIdx === -1) { + // No more blocks found + if (earliestBlockIdx === -1) { // Still update code fence state for remaining data + let fenceIdx = data.indexOf('```', searchStart); while (fenceIdx !== -1) { - const nextFence = data.indexOf('```', searchStart); - if (nextFence === -1) break; inFence = !inFence; - searchStart = nextFence + 3; + searchStart = fenceIdx + 3; + fenceIdx = data.indexOf('```', searchStart); } - return -1; + return { index: -1, identifier: null }; } - // Process any code fences before this [[RELAY]] + // Process any code fences before this block let tempIdx = searchStart; while (true) { const nextFence = data.indexOf('```', tempIdx); - if (nextFence === -1 || nextFence >= relayIdx) break; + if (nextFence === -1 || nextFence >= earliestBlockIdx) break; inFence = !inFence; tempIdx = nextFence + 3; } - // If we're inside a code fence, skip this [[RELAY]] + // If we're inside a code fence, skip this block if (inFence) { - searchStart = relayIdx + 9; // Skip past [[RELAY]] + searchStart = earliestBlockIdx + (earliestBlockIdentifier?.length ?? 0); // Skip past the block continue; } - // Check if [[RELAY]] is at start of a line - if (relayIdx === 0) { - return 0; // At very start + // Check if block is at start of a line + if (earliestBlockIdx === 0) { + return { index: 0, identifier: earliestBlockIdentifier }; // At very start } // Look backwards for the start of line - const beforeRelay = data.substring(0, relayIdx); - const lastNewline = beforeRelay.lastIndexOf('\n'); - const lineStart = beforeRelay.substring(lastNewline + 1); + const beforeBlock = data.substring(0, earliestBlockIdx); + const lastNewline = beforeBlock.lastIndexOf('\n'); + const lineStart = beforeBlock.substring(lastNewline + 1); - // Must be only whitespace before [[RELAY]] on this line + // Must be only whitespace before block on this line if (/^\s*$/.test(lineStart)) { - return relayIdx; + return { index: earliestBlockIdx, identifier: earliestBlockIdentifier }; } // Not at start of line, keep searching - searchStart = relayIdx + 9; + searchStart = earliestBlockIdx + (earliestBlockIdentifier?.length ?? 0); } - return -1; + return { index: -1, identifier: null }; } /** @@ -395,7 +412,7 @@ export class OutputParser { /** * Parse while inside a [[RELAY]] block - buffer until we see [[/RELAY]]. */ - private parseInBlockMode(data: string, commands: ParsedCommand[]): { commands: ParsedCommand[]; output: string } { + private parseInBlockMode(data: string, commands: ParsedCommand[], blockType: 'RELAY' | 'RELAY_METADATA'): { commands: ParsedCommand[]; output: string } { this.blockBuffer += data; // Check size limit @@ -403,12 +420,14 @@ export class OutputParser { console.error('[parser] Block too large, discarding'); this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; return { commands, output: '' }; } // Check for block end - if (BLOCK_END.test(this.blockBuffer)) { - const result = this.finishBlock(); + const blockEndPattern = blockType === 'RELAY_METADATA' ? BLOCK_METADATA_END : BLOCK_END; + if (blockEndPattern.test(this.blockBuffer)) { + const result = this.finishBlock(blockType); if (result.command) { commands.push(result.command); } @@ -499,37 +518,59 @@ export class OutputParser { * Finish processing a block and extract command. * Returns the command (if valid) and any remaining content after [[/RELAY]]. */ - private finishBlock(): { command: ParsedCommand | null; remaining: string | null } { - const endIdx = this.blockBuffer.indexOf('[[/RELAY]]'); + private finishBlock(blockType: 'RELAY' | 'RELAY_METADATA'): { command: ParsedCommand | null; remaining: string | null; metadata: ParsedMessageMetadata | null } { + const blockEndIdentifier = blockType === 'RELAY_METADATA' ? BLOCK_METADATA_END.source : BLOCK_END.source; + const endIdx = this.blockBuffer.indexOf(blockEndIdentifier.replace(/\\/g, '')); // Remove regex escapes for indexOf const jsonStr = this.blockBuffer.substring(0, endIdx).trim(); - const remaining = this.blockBuffer.substring(endIdx + '[[/RELAY]]'.length) || null; + const remaining = this.blockBuffer.substring(endIdx + blockEndIdentifier.replace(/\\/g, '').length) || null; this.inBlock = false; this.blockBuffer = ''; - - try { - const parsed = JSON.parse(jsonStr); - - // Validate required fields - if (!parsed.to || !parsed.type) { - console.error('[parser] Block missing required fields (to, type)'); - return { command: null, remaining }; + this.blockType = null; + + if (blockType === 'RELAY_METADATA') { + try { + const metadata = JSON.parse(jsonStr) as ParsedMessageMetadata; + this.lastParsedMetadata = metadata; + return { command: null, remaining, metadata }; + } catch (err) { + console.error('[parser] Invalid JSON in RELAY_METADATA block:', err); + this.lastParsedMetadata = null; + return { command: null, remaining, metadata: null }; } + } else { // blockType === 'RELAY' + try { + const parsed = JSON.parse(jsonStr); + + // Validate required fields + if (!parsed.to || !parsed.type) { + console.error('[parser] Block missing required fields (to, type)'); + this.lastParsedMetadata = null; // Clear metadata even if RELAY block is invalid + return { command: null, remaining, metadata: null }; + } - return { - command: { + const command: ParsedCommand = { to: parsed.to, kind: parsed.type as PayloadKind, body: parsed.body ?? parsed.text ?? '', data: parsed.data, thread: parsed.thread || undefined, raw: jsonStr, - }, - remaining, - }; - } catch (err) { - console.error('[parser] Invalid JSON in block:', err); - return { command: null, remaining }; + meta: this.lastParsedMetadata || undefined, // Attach last parsed metadata + }; + + this.lastParsedMetadata = null; // Clear after use + + return { + command, + remaining, + metadata: null, + }; + } catch (err) { + console.error('[parser] Invalid JSON in RELAY block:', err); + this.lastParsedMetadata = null; + return { command: null, remaining, metadata: null }; + } } } @@ -540,6 +581,8 @@ export class OutputParser { const result = this.parse('\n'); this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; + this.lastParsedMetadata = null; this.inCodeFence = false; return result; } @@ -550,6 +593,8 @@ export class OutputParser { reset(): void { this.inBlock = false; this.blockBuffer = ''; + this.blockType = null; + this.lastParsedMetadata = null; this.inCodeFence = false; } } @@ -562,6 +607,57 @@ export function formatIncomingMessage(from: string, body: string, kind: PayloadK return `\n${prefix} from ${from}: ${body}\n`; } +/** + * Parsed message metadata block from agent output. + */ +export interface ParsedMessageMetadata { + subject?: string; + importance?: number; + replyTo?: string; + ackRequired?: boolean; +} + +/** + * Result of attempting to parse a RELAY_METADATA block. + */ +export interface MetadataParseResult { + found: boolean; + valid: boolean; + metadata: ParsedMessageMetadata | null; + rawContent: string | null; // Raw block content for deduplication +} + +/** + * Parse [[RELAY_METADATA]]...[[/RELAY_METADATA]] blocks from agent output. + * Agents can output metadata to enhance messages. + * + * Format: + * [[RELAY_METADATA]] + * { + * "subject": "Task update", + * "importance": 80, + * "replyTo": "msg-abc123", + * "ackRequired": true + * } + * [[/RELAY_METADATA]] + */ +export function parseRelayMetadataFromOutput(output: string): MetadataParseResult { + const match = output.match(/\[\[RELAY_METADATA\]\]([\s\S]*?)\[\[\/RELAY_METADATA\]\]/); + + if (!match) { + return { found: false, valid: false, metadata: null, rawContent: null }; + } + + const rawContent = match[1].trim(); + + try { + const metadata = JSON.parse(rawContent) as ParsedMessageMetadata; + return { found: true, valid: true, metadata, rawContent }; + } catch { + return { found: true, valid: false, metadata: null, rawContent }; + } +} + /** * Parsed summary block from agent output. */ diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index a99494a47..2517d6ba6 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -15,7 +15,7 @@ import { exec, execSync, spawn, ChildProcess } from 'node:child_process'; import { promisify } from 'node:util'; import { RelayClient } from './client.js'; -import { OutputParser, type ParsedCommand, parseSummaryWithDetails, parseSessionEndFromOutput } from './parser.js'; +import { OutputParser, type ParsedCommand, parseSummaryWithDetails, parseSessionEndFromOutput, ParsedMessageMetadata } from './parser.js'; import { InboxManager } from './inbox.js'; import type { SendPayload } from '../protocol/types.js'; import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js';