diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 6ca5c5e9..12fcca02 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest' -import { mkdtemp, readFile, rm } from 'node:fs/promises' +import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' import type { BrokerEvent, SendMessageInput, SpawnPtyInput } from '@agent-relay/harness-driver' @@ -11,6 +11,7 @@ import { parseLinearIssue, readFactoryInFlightRegistry, readFactoryLoopHeartbeat, + reapFactoryOrphansOnce, type FactoryConfig, type TriageDecision, type TriageEngine, @@ -725,6 +726,283 @@ describe('FactoryLoop', () => { } }) + it('hands spawned agents to the durable reaper registry when dispatch writeback fails', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-registry-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(72)]: issueFile(72) }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-72-impl', 'session-ar-72-impl') + fleet.setSessionRef('ar-72-review', 'session-ar-72-review') + const linear: LinearWriteback = { + async postComment() {}, + async setState() { + throw new Error('Live state changed before writeback') + }, + async createIssue() { + throw new Error('not used') + }, + async verify() { + return true + }, + } + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }, + }), { + mount, + fleet, + triage: new StaticTriage(), + linear, + processFinder: async () => ({ status: 'missing' }), + processIdentityReader: async () => undefined, + }) + + await expect(factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(72), issueFile(72))))) + .rejects.toThrow('Live state changed before writeback') + + expect(factory.status().inFlight).toEqual([]) + expect(factory.status().counters.dispatchFailureReaperHandoffs).toBe(1) + const registry = await readFactoryInFlightRegistry(registryPath) + expect(registry?.agents).toMatchObject([ + { + name: 'ar-72-impl', + role: 'implementer', + sessionRef: 'session-ar-72-impl', + issue: { key: 'AR-72', uuid: 'uuid-72', path: issuePath(72) }, + pids: [], + processes: [], + }, + { + name: 'ar-72-review', + role: 'reviewer', + sessionRef: 'session-ar-72-review', + issue: { key: 'AR-72', uuid: 'uuid-72', path: issuePath(72) }, + pids: [], + processes: [], + }, + ]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('preserves dispatch-failure handoffs through abandon and stop-time registry rewrites', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-loop-registry-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(76)]: issueFile(76) }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-76-impl', 'session-ar-76-impl') + fleet.setSessionRef('ar-76-review', 'session-ar-76-review') + const linear: LinearWriteback = { + async postComment() {}, + async setState() { + throw new Error('setState 404') + }, + async createIssue() { + throw new Error('not used') + }, + async verify() { + return true + }, + } + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }, + }), { + mount, + fleet, + triage: new StaticTriage(), + linear, + processFinder: async () => ({ status: 'missing' }), + processIdentityReader: async () => undefined, + }) + + await expect(factory.runLoop()).rejects.toThrow('setState 404') + + expect(factory.status().inFlight).toEqual([]) + const heartbeat = await readFactoryLoopHeartbeat(heartbeatPath) + expect(heartbeat).toMatchObject({ status: 'stopping', registryPath }) + const registry = await readFactoryInFlightRegistry(registryPath) + expect(registry?.agents).toMatchObject([ + { name: 'ar-76-impl', sessionRef: 'session-ar-76-impl', pids: [], processes: [] }, + { name: 'ar-76-review', sessionRef: 'session-ar-76-review', pids: [], processes: [] }, + ]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('reaper consumes dispatch-failure handoff by resolving name-only agents without touching protected pids', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-reap-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(73)]: issueFile(73) }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-73-impl', 'session-ar-73-impl') + fleet.setSessionRef('ar-73-review', 'session-ar-73-review') + const linear: LinearWriteback = { + async postComment() {}, + async setState() { + throw new Error('setState 404') + }, + async createIssue() { + throw new Error('not used') + }, + async verify() { + return true + }, + } + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 1_000 }, + }), { + mount, + fleet, + triage: new StaticTriage(), + linear, + processFinder: async () => ({ status: 'missing' }), + processIdentityReader: async () => undefined, + }) + + await expect(factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(73), issueFile(73))))) + .rejects.toThrow('setState 404') + await writeFile(heartbeatPath, JSON.stringify({ + pid: process.pid, + status: 'running', + iteration: 1, + maxIterations: 1, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + registryPath, + }), 'utf8') + + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const alive = new Set([7_301, 7_302, 7_303, 68_009]) + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_500, + termGraceMs: 0, + fleet: { + protectedPids: async () => [68_009], + resolveAgentPid: async (name) => { + if (name === 'ar-73-impl') return { status: 'found', pid: 7_301 } + if (name === 'ar-73-review') return { status: 'found', pid: 68_009 } + return { status: 'unresolved' } + }, + }, + processFinder: async () => ({ status: 'missing' }), + readProcessIdentity: async (pid) => { + if (pid === 7_301) return { pid, startTime: 'start-7301', cmdline: 'node --agent-name ar-73-impl launcher' } + if (pid === 68_009) return { pid, startTime: 'broker-start', cmdline: 'node --agent-name ar-73-review broker' } + return undefined + }, + readParentPid: async () => undefined, + readChildPids: async (pid) => pid === 7_301 ? [7_302, 7_303] : [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!alive.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') alive.delete(pid) + return true + }, + }) + + expect(report.reaped.map((entry) => entry.pid)).toEqual([7_302, 7_303, 7_301]) + expect(killed.some((entry) => entry.pid === 68_009 && entry.signal !== 0)).toBe(false) + expect(report.skipped).toContainEqual({ pid: 68_009, reason: 'protected pid' }) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('dispatch failure before spawn does not create an orphan handoff', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-before-spawn-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(74)]: issueFile(74) }) + const fleet = new SpawnFailingFleetClient() + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }, + }), { mount, fleet, triage: new StaticTriage() }) + + await expect(factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(74), issueFile(74))))) + .rejects.toThrow('Dispatch spawn failed for AR-74/ar-74-impl') + + expect(factory.status().inFlight).toEqual([]) + expect(factory.status().counters.dispatchFailureReaperHandoffs).toBeUndefined() + expect(await readFactoryInFlightRegistry(registryPath)).toBeUndefined() + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('reaper reports unresolved dispatch-failure handoff pids instead of treating them as success', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-unresolved-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(75)]: issueFile(75) }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-75-impl', 'session-ar-75-impl') + fleet.setSessionRef('ar-75-review', 'session-ar-75-review') + const linear: LinearWriteback = { + async postComment() {}, + async setState() { + throw new Error('setState 404') + }, + async createIssue() { + throw new Error('not used') + }, + async verify() { + return true + }, + } + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 1_000 }, + }), { + mount, + fleet, + triage: new StaticTriage(), + linear, + processFinder: async () => ({ status: 'missing' }), + processIdentityReader: async () => undefined, + }) + + await expect(factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(75), issueFile(75))))) + .rejects.toThrow('setState 404') + await writeFile(heartbeatPath, JSON.stringify({ + pid: process.pid, + status: 'running', + iteration: 1, + maxIterations: 1, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + registryPath, + }), 'utf8') + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_500, + fleet: { resolveAgentPid: async () => ({ status: 'unresolved' }) }, + processFinder: async () => ({ status: 'missing' }), + }) + + expect(report.reaped).toEqual([]) + expect(report.skipped).toEqual([ + { reason: 'pid missing for ar-75-impl' }, + { reason: 'pid missing for ar-75-review' }, + ]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + it('reports missing or stale loop heartbeat as not live', () => { expect(checkFactoryLoopLiveness(undefined, { nowMs: 2_000, staleMs: 1_000 })).toMatchObject({ ok: false, diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 7b12fd64..077372ce 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -50,6 +50,11 @@ type FactoryEvent = 'issue-queued' | 'dispatched' | 'issue-done' | 'writeback-ve type Listener = (payload: FactoryEventPayload) => void type SlackThreadWatcher = { stop(): Promise } type TerminationRoots = { pids: number[]; status: AgentPidResolution['status'] } +type RegistryHandoffAgent = { + issue: IssueRef + name: string + tracked: TrackedAgent +} type DispatchAttemptState = { attempts: number inFlight: boolean @@ -117,6 +122,7 @@ export class FactoryLoop implements Factory { readonly #slackWatchers = new Map() readonly #slackWatcherStarts = new Map>() readonly #dispatchAttempts = new Map() + readonly #dispatchFailureReaperHandoffs = new Map() #slackDegraded = false #slackDegradedReason: string | undefined #slackWritebackFailureDegraded = false @@ -497,10 +503,19 @@ export class FactoryLoop implements Factory { return record.result } + const spawnedForReaperHandoff: RegistryHandoffAgent[] = [] try { const agents: DispatchResult['agents'] = [] for (const spec of [...decision.implementers, decision.reviewer]) { const spawned = await this.#spawnAgent(record, spec, dryRun) + const tracked = record.agents.get(spawned.name) + if (tracked) { + spawnedForReaperHandoff.push({ + issue: record.issue, + name: spawned.name, + tracked: cloneTrackedAgent(tracked), + }) + } agents.push({ name: spawned.name, role: spec.role }) } await this.#writeInFlightRegistry() @@ -535,6 +550,7 @@ export class FactoryLoop implements Factory { await this.#sendCriticalReviewerMessage(record) return result } catch (error) { + await this.#persistDispatchFailureReaperHandoff(record, spawnedForReaperHandoff) this.#recordDispatchFailure(decision.issue) this.#batch.abandon(decision.issue) this.#error(error, decision.issue) @@ -860,6 +876,34 @@ export class FactoryLoop implements Factory { } } + async #persistDispatchFailureReaperHandoff(record: InFlightIssue, handoffAgents: RegistryHandoffAgent[]): Promise { + if (record.dryRun || handoffAgents.length === 0) { + return + } + + try { + for (const agent of handoffAgents) { + this.#dispatchFailureReaperHandoffs.set(registryHandoffKey(agent.issue, agent.name), agent) + } + await this.#writeInFlightRegistry() + this.#increment('dispatchFailureReaperHandoffs') + this.#logger.warn?.('[factory] persisted dispatch-failed agents for orphan reaper', { + issue: record.issue, + agents: handoffAgents.map((agent) => agent.name).sort(), + }) + } catch (error) { + this.#increment('dispatchFailureReaperHandoffFailures') + for (const agent of handoffAgents) { + this.#dispatchFailureReaperHandoffs.delete(registryHandoffKey(agent.issue, agent.name)) + } + this.#logger.error?.('[factory] failed to persist dispatch-failed agents for orphan reaper', { + issue: record.issue, + error, + }) + this.#error(error, record.issue) + } + } + async #writeInFlightRegistry( path = this.#config.loop.registryPath, heartbeatPath = this.#config.loop.heartbeatPath, @@ -867,29 +911,42 @@ export class FactoryLoop implements Factory { ): Promise { const updatedAtMs = this.#clock.now() const agents: FactoryInFlightRegistryAgent[] = [] + const seenAgents = new Set() + const appendAgent = async (issue: IssueRef, agentName: string, tracked: TrackedAgent): Promise => { + const key = registryHandoffKey(issue, agentName) + if (seenAgents.has(key)) { + return + } + seenAgents.add(key) + const { pids } = await this.#terminationRoots(agentName, tracked) + const processes = [] + for (const pid of pids) { + const identity = await this.#processIdentityReader(pid) + if (identity && identity.cmdline.includes(agentName)) { + processes.push({ ...identity, agentName }) + } + } + agents.push({ + name: agentName, + role: tracked.spec.role, + issue, + sessionRef: tracked.sessionRef, + pids, + processes, + }) + } + if (!empty) { for (const record of this.#batch.inFlight) { if (record.dryRun) continue for (const [agentName, tracked] of record.agents) { - const { pids } = await this.#terminationRoots(agentName, tracked) - const processes = [] - for (const pid of pids) { - const identity = await this.#processIdentityReader(pid) - if (identity && identity.cmdline.includes(agentName)) { - processes.push({ ...identity, agentName }) - } - } - agents.push({ - name: agentName, - role: tracked.spec.role, - issue: record.issue, - sessionRef: tracked.sessionRef, - pids, - processes, - }) + await appendAgent(record.issue, agentName, tracked) } } } + for (const agent of this.#dispatchFailureReaperHandoffs.values()) { + await appendAgent(agent.issue, agent.name, agent.tracked) + } const registry: FactoryInFlightRegistry = { pid: process.pid, @@ -2001,6 +2058,15 @@ const contextualError = (context: string, error: unknown): Error => { return wrapped } +const registryHandoffKey = (issue: IssueRef, agentName: string): string => + `${issueKey(issue)}:${agentName}` + +const cloneTrackedAgent = (tracked: TrackedAgent): TrackedAgent => ({ + spec: { ...tracked.spec }, + result: tracked.result ? { ...tracked.result } : undefined, + sessionRef: tracked.sessionRef, +}) + const parseSlackReply = (path: string, content: unknown, botUserId: string): SlackReply | undefined => { const raw = asRecord(parseJsonContent(content)) ?? {} const payload = wrappedPayload(raw)