From 5c62ecd32f84b81841db9824b622a8f7a46887f6 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 20:58:18 +0200 Subject: [PATCH 1/5] Terminate factory agent processes on release --- .../src/fleet/internal-fleet-client.test.ts | 58 ++++- .../src/fleet/internal-fleet-client.ts | 73 +++++- .../src/orchestrator/factory.test.ts | 245 +++++++++++++++++- .../factory-sdk/src/orchestrator/factory.ts | 70 ++++- .../src/orchestrator/reaper.test.ts | 62 ++++- .../factory-sdk/src/orchestrator/reaper.ts | 160 ++++++++++-- packages/factory-sdk/src/ports/fleet.ts | 1 + packages/factory-sdk/src/types.ts | 3 + 8 files changed, 632 insertions(+), 40 deletions(-) diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts index ba861812..302f6d51 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts @@ -5,6 +5,7 @@ import type { BrokerEvent, SendMessageInput, SpawnPtyInput } from '@agent-relay/ import { InternalFleetClient, type HarnessDriverClientLike } from './internal-fleet-client' class FakeHarnessDriverClient implements HarnessDriverClientLike { + brokerPid: number | undefined readonly spawned: SpawnPtyInput[] = [] readonly released: Array<{ name: string; reason?: string }> = [] readonly sent: SendMessageInput[] = [] @@ -14,12 +15,13 @@ class FakeHarnessDriverClient implements HarnessDriverClientLike { readonly exitListeners = new Set<(agent: { name: string; sessionId?: string }) => void>() connectEventsCalls = 0 - agents: Array<{ name: string }> = [] + agents: Array<{ name: string; pid?: number }> = [] nextSessionRef = 'session-1' + nextPid: number | undefined - async spawnPty(input: SpawnPtyInput): Promise<{ name: string; session_ref: string }> { + async spawnPty(input: SpawnPtyInput): Promise<{ name: string; session_ref: string; pid?: number }> { this.spawned.push(input) - this.agents.push({ name: input.name }) + this.agents.push({ name: input.name, pid: this.nextPid }) return { name: input.name, session_ref: this.nextSessionRef } } @@ -28,7 +30,7 @@ class FakeHarnessDriverClient implements HarnessDriverClientLike { return { name } } - async listAgents(): Promise> { + async listAgents(): Promise> { return this.agents } @@ -119,6 +121,54 @@ describe('InternalFleetClient', () => { ]) }) + it('falls back to roster PID when the immediate spawn handle omits pid', async () => { + const harness = new FakeHarnessDriverClient() + harness.nextPid = 901969 + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) + + await expect( + fleet.spawn({ + name: 'ar-1-impl', + capability: 'spawn:codex', + node: 'self', + }), + ).resolves.toEqual({ name: 'ar-1-impl', sessionRef: 'session-1', pid: 901969 }) + }) + + it('retries roster PID lookup when broker spawned-list registration lags spawn ack', async () => { + vi.useFakeTimers() + try { + const harness = new FakeHarnessDriverClient() + harness.nextPid = 901969 + let listCalls = 0 + harness.listAgents = async () => { + listCalls += 1 + return listCalls === 1 ? [{ name: 'ar-1-impl' }] : harness.agents + } + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) + + const spawned = fleet.spawn({ + name: 'ar-1-impl', + capability: 'spawn:codex', + node: 'self', + }) + await vi.advanceTimersByTimeAsync(75) + + await expect(spawned).resolves.toEqual({ name: 'ar-1-impl', sessionRef: 'session-1', pid: 901969 }) + expect(listCalls).toBe(2) + } finally { + vi.useRealTimers() + } + }) + + it('surfaces the broker pid as protected process state', async () => { + const harness = new FakeHarnessDriverClient() + harness.brokerPid = 68009 + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) + + await expect(fleet.protectedPids()).resolves.toEqual([68009]) + }) + it('maps claude capability and per-spawn cwd', async () => { const harness = new FakeHarnessDriverClient() const fleet = new InternalFleetClient({ client: harness, cwd: '/default' }) diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.ts index 48544c47..24b7ec5d 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.ts @@ -1,4 +1,6 @@ import { HarnessDriverClient } from '@agent-relay/harness-driver' +import { readFile } from 'node:fs/promises' +import { join } from 'node:path' import type { BrokerEvent, ListAgent, SendMessageInput, SpawnPtyInput } from '@agent-relay/harness-driver' @@ -7,13 +9,14 @@ import type { Logger } from '../ports/system' type SpawnedHandleLike = { name: string; sessionId?: string; session_ref?: string; sessionRef?: string; pid?: number } type HarnessEventListener = (event: BrokerEvent) => void -type DriverAgentLike = { name: string; sessionId?: string } +type DriverAgentLike = { name: string; sessionId?: string; pid?: number } type DriverDeliveryEventLike = BrokerEvent export interface HarnessDriverClientLike { + readonly brokerPid?: number spawnPty(input: SpawnPtyInput): Promise release(name: string, reason?: string): Promise<{ name: string }> - listAgents(): Promise>> + listAgents(): Promise>> sendMessage(input: SendMessageInput): Promise<{ event_id: string; targets?: string[] }> sendInput(name: string, data: string): Promise connectEvents?(sinceSeq?: number): void @@ -49,10 +52,13 @@ const selfNode: RosterEntry['nodes'][number] = { capabilities: ['spawn:claude', 'spawn:codex'], live: true, } +const PID_RESOLVE_ATTEMPTS = 3 +const PID_RESOLVE_BACKOFF_MS = 75 export class InternalFleetClient implements FleetClient { readonly #client: HarnessDriverClientLike readonly #cwd?: string + readonly #connectionPath?: string readonly #resumeCapability: Capability readonly #logger?: Logger readonly #agentExitListeners = new Set() @@ -72,6 +78,7 @@ export class InternalFleetClient implements FleetClient { constructor(options: InternalFleetClientOptions = {}) { this.#cwd = options.cwd + this.#connectionPath = options.connectionPath this.#resumeCapability = options.resumeCapability ?? 'spawn:codex' this.#logger = options.logger this.#client = options.client ?? HarnessDriverClient.connect({ cwd: options.cwd, connectionPath: options.connectionPath }) @@ -93,7 +100,7 @@ export class InternalFleetClient implements FleetClient { this.#clearAgentExitLatch(handle.name) - return spawnResultFrom(handle) + return spawnResultFrom(handle, await this.#pidForAgent(handle)) } async resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise { @@ -109,7 +116,7 @@ export class InternalFleetClient implements FleetClient { this.#clearAgentExitLatch(handle.name) - return { ...spawnResultFrom(handle), sessionRef: sessionRefFrom(handle) ?? input.sessionRef } + return { ...spawnResultFrom(handle, await this.#pidForAgent(handle)), sessionRef: sessionRefFrom(handle) ?? input.sessionRef } } async release(name: string, reason?: string): Promise { @@ -124,6 +131,52 @@ export class InternalFleetClient implements FleetClient { } } + async protectedPids(): Promise { + const pids = new Set() + if (Number.isInteger(this.#client.brokerPid) && this.#client.brokerPid! > 0) { + pids.add(this.#client.brokerPid!) + } + const connectionPid = await this.#connectionFilePid() + if (connectionPid) { + pids.add(connectionPid) + } + return [...pids].sort((a, b) => a - b) + } + + async #pidForAgent(handle: SpawnedHandleLike): Promise { + if (typeof handle.pid === 'number') { + return handle.pid + } + + try { + for (let attempt = 1; attempt <= PID_RESOLVE_ATTEMPTS; attempt += 1) { + const agent = (await this.#client.listAgents()).find((candidate) => candidate.name === handle.name) + if (typeof agent?.pid === 'number') { + return agent.pid + } + if (attempt < PID_RESOLVE_ATTEMPTS) { + await sleep(PID_RESOLVE_BACKOFF_MS) + } + } + return undefined + } catch (error) { + this.#logger?.warn?.('[factory-sdk] unable to resolve spawned agent pid from roster', error) + return undefined + } + } + + async #connectionFilePid(): Promise { + const path = this.#connectionPath ?? connectionPathForCwd(this.#cwd) + if (!path) return undefined + try { + const parsed = JSON.parse(await readFile(path, 'utf8')) as { pid?: unknown } + const pid = parsed.pid + return typeof pid === 'number' && Number.isInteger(pid) && pid > 0 ? pid : undefined + } catch { + return undefined + } + } + async sendMessage(input: SendInput): Promise { await this.#client.sendMessage(messageInputFrom(input)) } @@ -351,6 +404,14 @@ export class InternalFleetClient implements FleetClient { } } +function connectionPathForCwd(cwd: string | undefined): string | undefined { + const stateDir = process.env.AGENT_RELAY_STATE_DIR + if (stateDir) return join(stateDir, 'connection.json') + return cwd ? join(cwd, '.agentworkforce', 'relay', 'connection.json') : undefined +} + +const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) + function assertSelfNode(node: SpawnInput['node']): void { if (node && node !== 'self') { throw new Error(`InternalFleetClient only supports node 'self' tonight; received ${node}`) @@ -361,11 +422,11 @@ function sessionRefFrom(handle: SpawnedHandleLike): string | undefined { return handle.session_ref ?? handle.sessionRef ?? handle.sessionId } -function spawnResultFrom(handle: SpawnedHandleLike): SpawnResult { +function spawnResultFrom(handle: SpawnedHandleLike, resolvedPid = handle.pid): SpawnResult { const result: SpawnResult = { name: handle.name } const sessionRef = sessionRefFrom(handle) if (sessionRef) result.sessionRef = sessionRef - if (typeof handle.pid === 'number') result.pid = handle.pid + if (typeof resolvedPid === 'number') result.pid = resolvedPid return result } diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index e00de802..bac59524 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest' import { mkdtemp, readFile, rm } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' +import type { BrokerEvent, SendMessageInput, SpawnPtyInput } from '@agent-relay/harness-driver' import { FactoryConfigSchema, @@ -19,6 +20,7 @@ import { FakeFleetClient, FakeMountClient } from '../testing' import type { CloseProbePrInput, LinearIssue } from '../index' import { BatchTracker } from './batch-tracker' import { keyFromPath } from './factory' +import { InternalFleetClient, type HarnessDriverClientLike } from '../fleet/internal-fleet-client' const ready = 'b9bec744-b60c-4745-8022-d90d6ab59ae3' const implementing = '39b9881d-1196-4c95-8b80-a20f0c7263f7' @@ -185,6 +187,83 @@ class PidFleetClient extends FakeFleetClient { } } +class CapturedPidFleetClient extends FakeFleetClient { + readonly plans: Map + + constructor(plans: SpawnResult[]) { + super() + this.plans = new Map(plans.map((plan) => [plan.name, plan])) + } + + override async spawn(input: SpawnInput): Promise { + this.spawns.push(input) + const planned = this.plans.get(input.name) + return { + name: input.name, + sessionRef: planned?.sessionRef ?? `session-${input.name}`, + pid: planned?.pid, + pids: planned?.pids, + } + } +} + +class RosterPidHarnessClient implements HarnessDriverClientLike { + readonly brokerPid = 68009 + readonly spawned: SpawnPtyInput[] = [] + readonly releases: Array<{ name: string; reason?: string }> = [] + readonly sent: SendMessageInput[] = [] + readonly inputs: Array<{ name: string; data: string }> = [] + readonly eventListeners = new Set<(event: BrokerEvent) => void>() + readonly agents = new Map() + readonly pidsByName = new Map() + + async spawnPty(input: SpawnPtyInput): Promise<{ name: string; session_ref: string }> { + this.spawned.push(input) + this.agents.set(input.name, { name: input.name, pid: this.pidsByName.get(input.name) }) + return { name: input.name, session_ref: `session-${input.name}` } + } + + async release(name: string, reason?: string): Promise<{ name: string }> { + this.releases.push({ name, reason }) + this.agents.delete(name) + return { name } + } + + async listAgents(): Promise> { + return [...this.agents.values()] + } + + async sendMessage(input: SendMessageInput): Promise<{ event_id: string; targets?: string[] }> { + this.sent.push(input) + const eventId = `event-${this.sent.length}` + this.emit({ kind: 'delivery_injected', event_id: eventId, name: input.to } as BrokerEvent) + return { event_id: eventId, targets: [input.to] } + } + + async sendInput(name: string, data: string): Promise { + this.inputs.push({ name, data }) + } + + connectEvents(): void {} + + onEvent(listener: (event: BrokerEvent) => void): () => void { + this.eventListeners.add(listener) + return () => { + this.eventListeners.delete(listener) + } + } + + addListener(): () => void { + return () => {} + } + + emit(event: BrokerEvent): void { + for (const listener of this.eventListeners) { + listener(event) + } + } +} + class CountingEventsMount extends FakeMountClient { getEventsCalls = 0 @@ -760,8 +839,29 @@ describe('FactoryLoop', () => { it('stop releases each in-flight factory-dispatched agent', async () => { const mount = new FakeMountClient({ [issuePath(60)]: issueFile(60) }) - const fleet = new FakeFleetClient() - const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const fleet = new CapturedPidFleetClient([ + { name: 'ar-60-impl', sessionRef: 'session-901969', pid: 901969 }, + { name: 'ar-60-review', sessionRef: 'session-902338', pid: 902338 }, + ]) + const children = new Map([ + [901969, [901970]], + [902338, [902339]], + ]) + const alive = new Set([901969, 901970, 902338, 902339]) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + readChildPids: async (pid) => children.get(pid) ?? [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!alive.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') alive.delete(pid) + return true + }, + }) await fleet.spawn({ name: 'external-worker', capability: 'spawn:codex', task: 'external', model: 'codex' }) await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(60), issueFile(60)))) @@ -771,6 +871,103 @@ describe('FactoryLoop', () => { { name: 'ar-60-impl', reason: 'factory-stopped' }, { name: 'ar-60-review', reason: 'factory-stopped' }, ]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 901969, + 901970, + 902338, + 902339, + ]) + expect(killed.filter((entry) => entry.signal === 'SIGKILL').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 901969, + 901970, + 902338, + 902339, + ]) + expect(alive).toEqual(new Set()) + }) + + it('stop terminates trees using roster PID fallback when spawn ack omits pid', async () => { + const mount = new FakeMountClient({ [issuePath(63)]: issueFile(63) }) + const harness = new RosterPidHarnessClient() + harness.pidsByName.set('ar-63-impl', 901969) + harness.pidsByName.set('ar-63-review', 902338) + const fleet = new InternalFleetClient({ client: harness, cwd: '/work/pear' }) + const brokerParentPid = 68009 + const children = new Map([ + [901969, [901970, brokerParentPid]], + [902338, [902339]], + ]) + const alive = new Set([brokerParentPid, 901969, 901970, 902338, 902339]) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + readChildPids: async (pid) => children.get(pid) ?? [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!alive.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') alive.delete(pid) + return true + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(63), issueFile(63)))) + await factory.stop() + + expect(harness.spawned).toHaveLength(2) + expect(harness.releases).toEqual([ + { name: 'ar-63-impl', reason: 'factory-stopped' }, + { name: 'ar-63-review', reason: 'factory-stopped' }, + ]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 901969, + 901970, + 902338, + 902339, + ]) + expect(killed.some((entry) => entry.pid === brokerParentPid)).toBe(false) + expect(alive).toEqual(new Set([brokerParentPid])) + }) + + it('stop reports missing terminate roots instead of silently certifying a no-op', async () => { + const mount = new FakeMountClient({ [issuePath(65)]: issueFile(65) }) + const fleet = new CapturedPidFleetClient([ + { name: 'ar-65-impl', sessionRef: 'session-ar-65-impl' }, + { name: 'ar-65-review', sessionRef: 'session-ar-65-review' }, + ]) + const errors: unknown[][] = [] + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + readChildPids: async () => [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + logger: { + error: (...args: unknown[]) => errors.push(args), + warn: () => undefined, + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(65), issueFile(65)))) + await factory.stop() + + expect(fleet.releases).toEqual([ + { name: 'ar-65-impl', reason: 'factory-stopped' }, + { name: 'ar-65-review', reason: 'factory-stopped' }, + ]) + expect(killed).toEqual([]) + expect(factory.status().counters.agentTerminateMissingPid).toBe(2) + expect(errors).toEqual([ + ['[factory] no pid available to terminate ar-65-impl during stop', expect.objectContaining({ agentName: 'ar-65-impl' })], + ['[factory] no pid available to terminate ar-65-review during stop', expect.objectContaining({ agentName: 'ar-65-review' })], + ]) }) it('stop swallows one release failure and still releases others plus tears down listeners', async () => { @@ -1528,6 +1725,50 @@ describe('FactoryLoop', () => { ]) }) + it('completion releases and terminates tracked pair process trees', async () => { + const mount = new FakeMountClient({ [issuePath(64)]: issueFile(64) }) + const fleet = new CapturedPidFleetClient([ + { name: 'ar-64-impl', sessionRef: 'session-901969', pid: 901969 }, + { name: 'ar-64-review', sessionRef: 'session-902338', pid: 902338 }, + ]) + const children = new Map([[901969, [901970]]]) + const alive = new Set([901969, 901970, 902338]) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + readChildPids: async (pid) => children.get(pid) ?? [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!alive.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') alive.delete(pid) + return true + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(64), issueFile(64)))) + fleet.emitAgentExit('ar-64-impl', 'issue-done') + await vi.waitFor(() => expect(killed.filter((entry) => entry.signal === 'SIGKILL')).toHaveLength(3)) + + expect(fleet.releases).toEqual([ + { name: 'ar-64-impl', reason: 'issue-done' }, + { name: 'ar-64-review', reason: 'issue-done' }, + ]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 901969, + 901970, + 902338, + ]) + expect(killed.filter((entry) => entry.signal === 'SIGKILL').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 901969, + 901970, + 902338, + ]) + expect(alive).toEqual(new Set()) + }) + it('does not close probes for non-never merge policies', async () => { const markedMount = new FakeMountClient({ [issuePath(19)]: issueFile(19) }) const markedFleet = new FakeFleetClient() diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 777fe343..686da9a6 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -32,8 +32,9 @@ import type { } from '../types' import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../writeback' import { asRecord, parseJsonContent, stableHash, wrappedPayload } from '../writeback/shared' -import { BatchTracker, type InFlightIssue, issueKey } from './batch-tracker' +import { BatchTracker, type InFlightIssue, issueKey, type TrackedAgent } from './batch-tracker' import { readProcessIdentity } from './process-identity' +import { terminatePids } from './reaper' type FactoryEvent = 'issue-queued' | 'dispatched' | 'issue-done' | 'writeback-verified' | 'error' type Listener = (payload: FactoryEventPayload) => void @@ -91,6 +92,9 @@ export class FactoryLoop implements Factory { readonly #logger: Logger readonly #clock: Clock readonly #processIdentityReader: typeof readProcessIdentity + readonly #kill: (pid: number, signal?: NodeJS.Signals | 0) => boolean + readonly #readChildPids: ((pid: number) => Promise) | undefined + readonly #terminationGraceMs: number | undefined readonly #batch: BatchTracker readonly #listeners = new Map>() readonly #counters: Record = {} @@ -137,6 +141,9 @@ export class FactoryLoop implements Factory { this.#logger = ports.logger ?? console this.#clock = ports.clock ?? realClock this.#processIdentityReader = ports.processIdentityReader ?? readProcessIdentity + this.#kill = ports.kill ?? process.kill + this.#readChildPids = ports.readChildPids + this.#terminationGraceMs = ports.terminationGraceMs this.#batch = new BatchTracker(config.batchSize) this.#wireFleetEvents() } @@ -731,24 +738,69 @@ export class FactoryLoop implements Factory { } async #releaseInFlightAgents(reason: string): Promise { - const agentNames = new Set() + const agents = new Map() for (const record of this.#batch.inFlight) { if (record.dryRun) { continue } - for (const agentName of record.agents.keys()) { - agentNames.add(agentName) + for (const [agentName, tracked] of record.agents) { + agents.set(agentName, tracked) } } - await Promise.all([...agentNames].map(async (agentName) => { + await this.#releaseAndTerminateAgents([...agents], reason, 'stop') + await this.#writeInFlightRegistry(undefined, undefined, true) + } + + async #releaseAndTerminateAgents( + agents: Array<[string, TrackedAgent]>, + reason: string, + context: 'stop' | 'completion', + ): Promise { + const protectedPids = await this.#protectedPids() + await Promise.all(agents.map(async ([agentName, tracked]) => { + const pids = pidsFromSpawnResult(tracked.result) + if (pids.length === 0) { + this.#increment('agentTerminateMissingPid') + this.#logger.error?.(`[factory] no pid available to terminate ${agentName} during ${context}`, { + agentName, + reason, + sessionRef: tracked.sessionRef, + }) + } + try { await this.#fleet.release(agentName, reason) } catch (error) { - this.#logger.warn?.(`[factory] failed to release ${agentName} during stop`, error) + this.#logger.warn?.(`[factory] failed to release ${agentName} during ${context}`, error) + } + + if (pids.length === 0) { + return + } + + const report = await terminatePids(pids, { + kill: this.#kill, + readChildPids: this.#readChildPids, + sleep: this.#clock.sleep, + termGraceMs: this.#terminationGraceMs, + protectedPids, + }) + for (const skipped of report.skipped) { + if (skipped.reason !== 'pid not running') { + this.#logger.warn?.(`[factory] failed to terminate pid ${skipped.pid} for ${agentName} during ${context}`, skipped.reason) + } } })) - await this.#writeInFlightRegistry(undefined, undefined, true) + } + + async #protectedPids(): Promise { + try { + return await this.#fleet.protectedPids?.() ?? [] + } catch (error) { + this.#logger.warn?.('[factory] failed to resolve protected fleet pids', error) + return [] + } } async #writeInFlightRegistry( @@ -1070,9 +1122,7 @@ export class FactoryLoop implements Factory { await this.#closeProbeIfRequired(issue) } - for (const agent of record.agents.keys()) { - await this.#fleet.release(agent, 'issue-done') - } + await this.#releaseAndTerminateAgents([...record.agents], 'issue-done', 'completion') this.#increment('done') this.#emit('issue-done', { issue: record.issue }) diff --git a/packages/factory-sdk/src/orchestrator/reaper.test.ts b/packages/factory-sdk/src/orchestrator/reaper.test.ts index 56d592e2..d025fc9b 100644 --- a/packages/factory-sdk/src/orchestrator/reaper.test.ts +++ b/packages/factory-sdk/src/orchestrator/reaper.test.ts @@ -5,7 +5,7 @@ import { join } from 'node:path' import { describe, expect, it, vi } from 'vitest' import type { FactoryInFlightRegistry, FactoryLoopHeartbeat } from '../types' -import { FactoryReaper, reapFactoryOrphansOnce } from './reaper' +import { FactoryReaper, reapFactoryOrphansOnce, terminatePids } from './reaper' const writeJson = async (path: string, value: unknown): Promise => { await writeFile(path, `${JSON.stringify(value, null, 2)}\n`, 'utf8') @@ -60,6 +60,7 @@ describe('factory reaper', () => { expect(report.reaped).toEqual([{ pid: 111, signals: ['SIGTERM', 'SIGKILL'] }]) expect(killed).toEqual([ + { pid: 111, signal: 0 }, { pid: 111, signal: 0 }, { pid: 111, signal: 'SIGTERM' }, { pid: 111, signal: 0 }, @@ -71,6 +72,65 @@ describe('factory reaper', () => { } }) + it('terminates a whole process tree before killing the parent root', async () => { + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const brokerParentPid = 68009 + const live = new Set([brokerParentPid, 901969, 901970, 901971, 901972]) + const children = new Map([ + [901969, [901970, 901971, brokerParentPid]], + [901970, [901972]], + ]) + + const report = await terminatePids([901969], { + termGraceMs: 0, + protectedPids: [brokerParentPid], + readChildPids: async (pid) => children.get(pid) ?? [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!live.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') live.delete(pid) + return true + }, + }) + + expect(report.terminated.map((entry) => entry.pid)).toEqual([901972, 901970, 901971, 901969]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid)).toEqual([ + 901972, + 901970, + 901971, + 901969, + ]) + expect(killed.some((entry) => entry.pid === brokerParentPid)).toBe(false) + expect(live).toEqual(new Set([brokerParentPid])) + }) + + it('continues terminating later PIDs when an earlier PID is already gone', async () => { + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const live = new Set([222]) + + const report = await terminatePids([111, 222], { + termGraceMs: 0, + readChildPids: async () => [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!live.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') live.delete(pid) + return true + }, + }) + + expect(report.skipped).toEqual([{ pid: 111, reason: 'pid not running' }]) + expect(report.terminated).toEqual([{ pid: 222, signals: ['SIGTERM', 'SIGKILL'] }]) + expect(killed).toEqual([ + { pid: 111, signal: 0 }, + { pid: 222, signal: 0 }, + { pid: 222, signal: 'SIGTERM' }, + { pid: 222, signal: 0 }, + { pid: 222, signal: 'SIGKILL' }, + ]) + expect(live).toEqual(new Set()) + }) + it('does not kill a recycled PID whose current identity no longer matches the registry', async () => { const root = await mkdtemp(join(tmpdir(), 'factory-reaper-reuse-')) try { diff --git a/packages/factory-sdk/src/orchestrator/reaper.ts b/packages/factory-sdk/src/orchestrator/reaper.ts index 0fa3cd79..b72e3f73 100644 --- a/packages/factory-sdk/src/orchestrator/reaper.ts +++ b/packages/factory-sdk/src/orchestrator/reaper.ts @@ -1,4 +1,6 @@ import { readFile } from 'node:fs/promises' +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' import { parseJsonContent } from '../writeback/shared' import type { Clock, Logger } from '../ports' @@ -6,6 +8,8 @@ import type { FactoryInFlightRegistry, FactoryInFlightRegistryProcess } from '.. import { checkFactoryLoopLiveness, readFactoryLoopHeartbeat } from './factory' import { readProcessIdentity, type ProcessIdentity } from './process-identity' +const execFileAsync = promisify(execFile) + export interface FactoryReaperOptions { heartbeatPath: string registryPath: string @@ -26,6 +30,20 @@ export interface FactoryReaperReport { skipped: Array<{ pid?: number; reason: string }> } +export interface TerminatePidsOptions { + termGraceMs?: number + sleep?: (ms: number) => Promise + kill?: (pid: number, signal?: NodeJS.Signals | 0) => boolean + readChildPids?: (pid: number) => Promise + readParentPid?: (pid: number) => Promise + protectedPids?: number[] +} + +export interface TerminatePidsReport { + terminated: Array<{ pid: number; signals: Array }> + skipped: Array<{ pid: number; reason: string }> +} + export async function readFactoryInFlightRegistry(path: string): Promise { try { return parseJsonContent(await readFile(path, 'utf8')) as FactoryInFlightRegistry @@ -34,6 +52,119 @@ export async function readFactoryInFlightRegistry(path: string): Promise { + const kill = opts.kill ?? process.kill + const sleep = opts.sleep ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms))) + const termGraceMs = opts.termGraceMs ?? 1_000 + const terminated: TerminatePidsReport['terminated'] = [] + const skipped: TerminatePidsReport['skipped'] = [] + const orderedPids = await pidTreePostOrder(pids, opts.readChildPids ?? readChildPids) + const protectedPids = await protectedPidSet(opts.protectedPids ?? [], opts.readParentPid ?? readParentPid) + + for (const pid of orderedPids) { + if (protectedPids.has(pid)) { + skipped.push({ pid, reason: 'protected pid' }) + continue + } + if (!isPidLive(pid, kill)) { + skipped.push({ pid, reason: 'pid not running' }) + continue + } + + const signals: Array = [] + try { + kill(pid, 'SIGTERM') + signals.push('SIGTERM') + if (termGraceMs > 0) { + await sleep(termGraceMs) + } + if (isPidLive(pid, kill)) { + kill(pid, 'SIGKILL') + signals.push('SIGKILL') + } + terminated.push({ pid, signals }) + } catch (error) { + skipped.push({ pid, reason: error instanceof Error ? error.message : String(error) }) + } + } + + return { terminated, skipped } +} + +async function protectedPidSet( + explicitPids: number[], + readParent: (pid: number) => Promise, +): Promise> { + const protectedPids = new Set() + for (const pid of explicitPids) { + if (Number.isInteger(pid) && pid > 0) protectedPids.add(pid) + } + let current: number | undefined = process.pid + while (current && current > 0 && !protectedPids.has(current)) { + protectedPids.add(current) + try { + current = await readParent(current) + } catch { + break + } + } + return protectedPids +} + +async function pidTreePostOrder( + roots: number[], + readChildren: (pid: number) => Promise, +): Promise { + const ordered: number[] = [] + const seen = new Set() + const visit = async (pid: number): Promise => { + if (!Number.isInteger(pid) || pid <= 0 || seen.has(pid)) return + seen.add(pid) + let children: number[] = [] + try { + children = await readChildren(pid) + } catch { + children = [] + } + for (const child of children.sort((a, b) => a - b)) { + await visit(child) + } + ordered.push(pid) + } + + for (const pid of [...new Set(roots)].sort((a, b) => a - b)) { + await visit(pid) + } + return ordered +} + +async function readChildPids(pid: number): Promise { + try { + const { stdout } = await execFileAsync('pgrep', ['-P', String(pid)]) + return parsePidList(stdout) + } catch (error) { + return parsePidList((error as { stdout?: string }).stdout) + } +} + +async function readParentPid(pid: number): Promise { + try { + const { stdout } = await execFileAsync('ps', ['-o', 'ppid=', '-p', String(pid)]) + const parent = Number(stdout.trim()) + return Number.isInteger(parent) && parent > 0 ? parent : undefined + } catch { + return undefined + } +} + +const parsePidList = (stdout: string | Buffer | undefined): number[] => { + const text = Buffer.isBuffer(stdout) ? stdout.toString('utf8') : stdout ?? '' + return text + .split(/\s+/u) + .map((value) => Number(value)) + .filter((pid) => Number.isInteger(pid) && pid > 0) +} + export async function reapFactoryOrphansOnce(opts: FactoryReaperOptions): Promise { const heartbeat = await readFactoryLoopHeartbeat(opts.heartbeatPath) const liveness = checkFactoryLoopLiveness(heartbeat, { nowMs: opts.nowMs ?? opts.clock?.now(), staleMs: opts.staleMs }) @@ -46,12 +177,11 @@ export async function reapFactoryOrphansOnce(opts: FactoryReaperOptions): Promis return { stale: true, reason: 'registry missing', reaped: [], skipped: [{ reason: 'registry missing' }] } } - const kill = opts.kill ?? process.kill - const sleep = opts.clock?.sleep ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms))) const termGraceMs = opts.termGraceMs ?? 1_000 const processes = registryProcesses(registry) const reaped: FactoryReaperReport['reaped'] = [] const skipped: FactoryReaperReport['skipped'] = [] + const kill = opts.kill ?? process.kill for (const processInfo of processes) { const { pid } = processInfo @@ -65,21 +195,17 @@ export async function reapFactoryOrphansOnce(opts: FactoryReaperOptions): Promis continue } - const signals: Array = [] - try { - kill(pid, 'SIGTERM') - signals.push('SIGTERM') - if (termGraceMs > 0) { - await sleep(termGraceMs) - } - if (isPidLive(pid, kill)) { - kill(pid, 'SIGKILL') - signals.push('SIGKILL') - } - reaped.push({ pid, signals }) - opts.logger?.warn?.('[factory-reaper] reaped orphaned factory pid', { pid, signals }) - } catch (error) { - skipped.push({ pid, reason: error instanceof Error ? error.message : String(error) }) + const report = await terminatePids([pid], { + kill, + sleep: opts.clock?.sleep, + termGraceMs, + }) + for (const entry of report.terminated) { + reaped.push(entry) + opts.logger?.warn?.('[factory-reaper] reaped orphaned factory pid', { pid: entry.pid, signals: entry.signals }) + } + for (const entry of report.skipped) { + skipped.push(entry) } } diff --git a/packages/factory-sdk/src/ports/fleet.ts b/packages/factory-sdk/src/ports/fleet.ts index 2e13c5c6..c83e8e26 100644 --- a/packages/factory-sdk/src/ports/fleet.ts +++ b/packages/factory-sdk/src/ports/fleet.ts @@ -33,6 +33,7 @@ export interface FleetClient { resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise release(name: string, reason?: string): Promise roster(): Promise + protectedPids?(): Promise sendMessage(input: SendInput): Promise waitForInjected?(input: SendInput, opts?: { timeoutMs?: number }): Promise<{ eventId: string; targets: string[] }> sendInput?(name: string, data: string): Promise diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 7fa21731..19e54752 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -18,6 +18,9 @@ export interface FactoryPorts { logger?: Logger clock?: Clock processIdentityReader?: (pid: number) => Promise + kill?: (pid: number, signal?: NodeJS.Signals | 0) => boolean + readChildPids?: (pid: number) => Promise + terminationGraceMs?: number } export interface Factory { From 182b91cf66d470cfb80bd9d4788d9a9eeefc6d01 Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Fri, 12 Jun 2026 19:40:36 +0000 Subject: [PATCH 2/5] chore: apply pr-reviewer fixes for #263 --- packages/factory-sdk/src/fleet/internal-fleet-client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.ts index 24b7ec5d..9fc13442 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.ts @@ -16,7 +16,7 @@ export interface HarnessDriverClientLike { readonly brokerPid?: number spawnPty(input: SpawnPtyInput): Promise release(name: string, reason?: string): Promise<{ name: string }> - listAgents(): Promise>> + listAgents(): Promise & { pid?: number }>> sendMessage(input: SendMessageInput): Promise<{ event_id: string; targets?: string[] }> sendInput(name: string, data: string): Promise connectEvents?(sinceSeq?: number): void From 2d5ab0dd261ebd9b8e1f26f2b6626d15e8e3036f Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Fri, 12 Jun 2026 20:04:13 +0000 Subject: [PATCH 3/5] chore: apply pr-reviewer fixes for #263 --- .../src/orchestrator/factory.test.ts | 51 +++++++++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 36 +++++++------ 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index bac59524..74605357 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -931,6 +931,57 @@ describe('FactoryLoop', () => { expect(alive).toEqual(new Set([brokerParentPid])) }) + it('stop discovers child pids before releasing broker sessions', async () => { + const mount = new FakeMountClient({ [issuePath(66)]: issueFile(66) }) + const fleet = new CapturedPidFleetClient([ + { name: 'ar-66-impl', sessionRef: 'session-901969', pid: 901969 }, + { name: 'ar-66-review', sessionRef: 'session-902338', pid: 902338 }, + ]) + const released = new Set() + const originalRelease = fleet.release.bind(fleet) + fleet.release = async (name, reason) => { + released.add(name) + await originalRelease(name, reason) + } + const children = new Map([ + [901969, { agent: 'ar-66-impl', pids: [901970] }], + [902338, { agent: 'ar-66-review', pids: [902339] }], + ]) + const alive = new Set([901969, 901970, 902338, 902339]) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + readChildPids: async (pid) => { + const child = children.get(pid) + return child && !released.has(child.agent) ? child.pids : [] + }, + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!alive.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + if (signal === 'SIGKILL') alive.delete(pid) + return true + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(66), issueFile(66)))) + await factory.stop() + + expect(fleet.releases).toEqual([ + { name: 'ar-66-impl', reason: 'factory-stopped' }, + { name: 'ar-66-review', reason: 'factory-stopped' }, + ]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 901969, + 901970, + 902338, + 902339, + ]) + expect(alive).toEqual(new Set()) + }) + it('stop reports missing terminate roots instead of silently certifying a no-op', async () => { const mount = new FakeMountClient({ [issuePath(65)]: issueFile(65) }) const fleet = new CapturedPidFleetClient([ diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 686da9a6..89220b1e 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -758,7 +758,7 @@ export class FactoryLoop implements Factory { context: 'stop' | 'completion', ): Promise { const protectedPids = await this.#protectedPids() - await Promise.all(agents.map(async ([agentName, tracked]) => { + for (const [agentName, tracked] of agents) { const pids = pidsFromSpawnResult(tracked.result) if (pids.length === 0) { this.#increment('agentTerminateMissingPid') @@ -769,29 +769,27 @@ export class FactoryLoop implements Factory { }) } + if (pids.length > 0) { + const report = await terminatePids(pids, { + kill: this.#kill, + readChildPids: this.#readChildPids, + sleep: this.#clock.sleep, + termGraceMs: this.#terminationGraceMs, + protectedPids, + }) + for (const skipped of report.skipped) { + if (skipped.reason !== 'pid not running') { + this.#logger.warn?.(`[factory] failed to terminate pid ${skipped.pid} for ${agentName} during ${context}`, skipped.reason) + } + } + } + try { await this.#fleet.release(agentName, reason) } catch (error) { this.#logger.warn?.(`[factory] failed to release ${agentName} during ${context}`, error) } - - if (pids.length === 0) { - return - } - - const report = await terminatePids(pids, { - kill: this.#kill, - readChildPids: this.#readChildPids, - sleep: this.#clock.sleep, - termGraceMs: this.#terminationGraceMs, - protectedPids, - }) - for (const skipped of report.skipped) { - if (skipped.reason !== 'pid not running') { - this.#logger.warn?.(`[factory] failed to terminate pid ${skipped.pid} for ${agentName} during ${context}`, skipped.reason) - } - } - })) + } } async #protectedPids(): Promise { From b142e92379d11a42c390e2afd8cc9ebeeadb83f3 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 23:35:26 +0200 Subject: [PATCH 4/5] Resolve factory agent pid at termination --- .../src/fleet/internal-fleet-client.test.ts | 23 +++++-------------- .../src/fleet/internal-fleet-client.ts | 12 ++++------ .../src/orchestrator/factory.test.ts | 12 ++++++---- .../factory-sdk/src/orchestrator/factory.ts | 17 +++++++++++++- packages/factory-sdk/src/ports/fleet.ts | 1 + 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts index 302f6d51..1336560c 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts @@ -121,40 +121,29 @@ describe('InternalFleetClient', () => { ]) }) - it('falls back to roster PID when the immediate spawn handle omits pid', async () => { + it('resolves an agent PID from the broker roster', async () => { const harness = new FakeHarnessDriverClient() - harness.nextPid = 901969 + harness.agents = [{ name: 'ar-1-impl', pid: 901969 }] const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) - await expect( - fleet.spawn({ - name: 'ar-1-impl', - capability: 'spawn:codex', - node: 'self', - }), - ).resolves.toEqual({ name: 'ar-1-impl', sessionRef: 'session-1', pid: 901969 }) + await expect(fleet.resolveAgentPid('ar-1-impl')).resolves.toBe(901969) }) it('retries roster PID lookup when broker spawned-list registration lags spawn ack', async () => { vi.useFakeTimers() try { const harness = new FakeHarnessDriverClient() - harness.nextPid = 901969 let listCalls = 0 harness.listAgents = async () => { listCalls += 1 - return listCalls === 1 ? [{ name: 'ar-1-impl' }] : harness.agents + return listCalls === 1 ? [{ name: 'ar-1-impl' }] : [{ name: 'ar-1-impl', pid: 901969 }] } const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) - const spawned = fleet.spawn({ - name: 'ar-1-impl', - capability: 'spawn:codex', - node: 'self', - }) + const resolved = fleet.resolveAgentPid('ar-1-impl') await vi.advanceTimersByTimeAsync(75) - await expect(spawned).resolves.toEqual({ name: 'ar-1-impl', sessionRef: 'session-1', pid: 901969 }) + await expect(resolved).resolves.toBe(901969) expect(listCalls).toBe(2) } finally { vi.useRealTimers() diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.ts index 9fc13442..53af3761 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.ts @@ -100,7 +100,7 @@ export class InternalFleetClient implements FleetClient { this.#clearAgentExitLatch(handle.name) - return spawnResultFrom(handle, await this.#pidForAgent(handle)) + return spawnResultFrom(handle) } async resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise { @@ -116,7 +116,7 @@ export class InternalFleetClient implements FleetClient { this.#clearAgentExitLatch(handle.name) - return { ...spawnResultFrom(handle, await this.#pidForAgent(handle)), sessionRef: sessionRefFrom(handle) ?? input.sessionRef } + return { ...spawnResultFrom(handle), sessionRef: sessionRefFrom(handle) ?? input.sessionRef } } async release(name: string, reason?: string): Promise { @@ -143,14 +143,10 @@ export class InternalFleetClient implements FleetClient { return [...pids].sort((a, b) => a - b) } - async #pidForAgent(handle: SpawnedHandleLike): Promise { - if (typeof handle.pid === 'number') { - return handle.pid - } - + async resolveAgentPid(name: string): Promise { try { for (let attempt = 1; attempt <= PID_RESOLVE_ATTEMPTS; attempt += 1) { - const agent = (await this.#client.listAgents()).find((candidate) => candidate.name === handle.name) + const agent = (await this.#client.listAgents()).find((candidate) => candidate.name === name) if (typeof agent?.pid === 'number') { return agent.pid } diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 74605357..992cb734 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -219,7 +219,7 @@ class RosterPidHarnessClient implements HarnessDriverClientLike { async spawnPty(input: SpawnPtyInput): Promise<{ name: string; session_ref: string }> { this.spawned.push(input) - this.agents.set(input.name, { name: input.name, pid: this.pidsByName.get(input.name) }) + this.agents.set(input.name, { name: input.name }) return { name: input.name, session_ref: `session-${input.name}` } } @@ -230,7 +230,7 @@ class RosterPidHarnessClient implements HarnessDriverClientLike { } async listAgents(): Promise> { - return [...this.agents.values()] + return [...this.agents.values()].map((agent) => ({ ...agent, pid: this.pidsByName.get(agent.name) })) } async sendMessage(input: SendMessageInput): Promise<{ event_id: string; targets?: string[] }> { @@ -889,8 +889,6 @@ describe('FactoryLoop', () => { it('stop terminates trees using roster PID fallback when spawn ack omits pid', async () => { const mount = new FakeMountClient({ [issuePath(63)]: issueFile(63) }) const harness = new RosterPidHarnessClient() - harness.pidsByName.set('ar-63-impl', 901969) - harness.pidsByName.set('ar-63-review', 902338) const fleet = new InternalFleetClient({ client: harness, cwd: '/work/pear' }) const brokerParentPid = 68009 const children = new Map([ @@ -914,6 +912,12 @@ describe('FactoryLoop', () => { }) await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(63), issueFile(63)))) + expect(await harness.listAgents()).toEqual([ + { name: 'ar-63-impl', pid: undefined }, + { name: 'ar-63-review', pid: undefined }, + ]) + harness.pidsByName.set('ar-63-impl', 901969) + harness.pidsByName.set('ar-63-review', 902338) await factory.stop() expect(harness.spawned).toHaveLength(2) diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 89220b1e..2ed46722 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -759,7 +759,7 @@ export class FactoryLoop implements Factory { ): Promise { const protectedPids = await this.#protectedPids() for (const [agentName, tracked] of agents) { - const pids = pidsFromSpawnResult(tracked.result) + const pids = await this.#terminationRoots(agentName, tracked) if (pids.length === 0) { this.#increment('agentTerminateMissingPid') this.#logger.error?.(`[factory] no pid available to terminate ${agentName} during ${context}`, { @@ -792,6 +792,21 @@ export class FactoryLoop implements Factory { } } + async #terminationRoots(agentName: string, tracked: TrackedAgent): Promise { + const pids = pidsFromSpawnResult(tracked.result) + if (pids.length > 0) { + return pids + } + + try { + const pid = await this.#fleet.resolveAgentPid?.(agentName) + return Number.isInteger(pid) && pid! > 0 ? [pid!] : [] + } catch (error) { + this.#logger.warn?.(`[factory] failed to resolve pid for ${agentName}`, error) + return [] + } + } + async #protectedPids(): Promise { try { return await this.#fleet.protectedPids?.() ?? [] diff --git a/packages/factory-sdk/src/ports/fleet.ts b/packages/factory-sdk/src/ports/fleet.ts index c83e8e26..a81d3cd7 100644 --- a/packages/factory-sdk/src/ports/fleet.ts +++ b/packages/factory-sdk/src/ports/fleet.ts @@ -33,6 +33,7 @@ export interface FleetClient { resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise release(name: string, reason?: string): Promise roster(): Promise + resolveAgentPid?(name: string): Promise protectedPids?(): Promise sendMessage(input: SendInput): Promise waitForInjected?(input: SendInput, opts?: { timeoutMs?: number }): Promise<{ eventId: string; targets: string[] }> From e1ede3fca3bcbcf6ffd41a95a0c9ef58a2d6e3b4 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 23:55:38 +0200 Subject: [PATCH 5/5] Persist resolved factory agent pids in registry --- packages/factory-sdk/src/cli/fleet.ts | 4 +- .../src/fleet/internal-fleet-client.test.ts | 24 +- .../src/fleet/internal-fleet-client.ts | 14 +- .../src/orchestrator/factory.test.ts | 205 +++++++++- .../factory-sdk/src/orchestrator/factory.ts | 83 +++-- .../src/orchestrator/process-identity.test.ts | 60 +++ .../src/orchestrator/process-identity.ts | 136 +++++++ .../src/orchestrator/reaper.test.ts | 350 ++++++++++++++++++ .../factory-sdk/src/orchestrator/reaper.ts | 110 +++++- packages/factory-sdk/src/ports/fleet.ts | 7 +- packages/factory-sdk/src/ports/index.ts | 1 + packages/factory-sdk/src/types.ts | 3 +- 12 files changed, 944 insertions(+), 53 deletions(-) create mode 100644 packages/factory-sdk/src/orchestrator/process-identity.test.ts diff --git a/packages/factory-sdk/src/cli/fleet.ts b/packages/factory-sdk/src/cli/fleet.ts index c2cd1199..dbc4172d 100644 --- a/packages/factory-sdk/src/cli/fleet.ts +++ b/packages/factory-sdk/src/cli/fleet.ts @@ -109,7 +109,7 @@ export async function runFleetCli(argv: string[], deps: FleetCliDeps = {}): Prom if (!loaded) throw new Error('factory command requires config') const mount = await buildMount(loaded, deps) const factory = createFactory(loaded.config, { mount, fleet }) - return await runFactoryCommand(command, factory, mount, loaded.config, globals, out) + return await runFactoryCommand(command, factory, mount, fleet, loaded.config, globals, out) } } } catch (error) { @@ -185,6 +185,7 @@ async function runFactoryCommand( command: Extract, factory: Factory, mount: MountClient, + fleet: FleetClient, config: FactoryConfig, globals: GlobalOptions, out: Pick, @@ -217,6 +218,7 @@ async function runFactoryCommand( heartbeatPath: config.loop.heartbeatPath, registryPath: config.loop.registryPath, staleMs: config.loop.heartbeatStaleMs, + fleet, })) return 0 } diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts index 1336560c..1d2b9a55 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts @@ -126,7 +126,7 @@ describe('InternalFleetClient', () => { harness.agents = [{ name: 'ar-1-impl', pid: 901969 }] const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) - await expect(fleet.resolveAgentPid('ar-1-impl')).resolves.toBe(901969) + await expect(fleet.resolveAgentPid('ar-1-impl')).resolves.toEqual({ status: 'found', pid: 901969 }) }) it('retries roster PID lookup when broker spawned-list registration lags spawn ack', async () => { @@ -143,13 +143,33 @@ describe('InternalFleetClient', () => { const resolved = fleet.resolveAgentPid('ar-1-impl') await vi.advanceTimersByTimeAsync(75) - await expect(resolved).resolves.toBe(901969) + await expect(resolved).resolves.toEqual({ status: 'found', pid: 901969 }) expect(listCalls).toBe(2) } finally { vi.useRealTimers() } }) + it('distinguishes absent agents from live agents with no PID', async () => { + vi.useFakeTimers() + try { + const absentHarness = new FakeHarnessDriverClient() + const absentFleet = new InternalFleetClient({ client: absentHarness, cwd: '/worktree' }) + const absent = absentFleet.resolveAgentPid('ar-1-impl') + await vi.advanceTimersByTimeAsync(150) + await expect(absent).resolves.toEqual({ status: 'missing' }) + + const unresolvedHarness = new FakeHarnessDriverClient() + unresolvedHarness.agents = [{ name: 'ar-1-impl' }] + const unresolvedFleet = new InternalFleetClient({ client: unresolvedHarness, cwd: '/worktree' }) + const unresolved = unresolvedFleet.resolveAgentPid('ar-1-impl') + await vi.advanceTimersByTimeAsync(150) + await expect(unresolved).resolves.toEqual({ status: 'unresolved' }) + } finally { + vi.useRealTimers() + } + }) + it('surfaces the broker pid as protected process state', async () => { const harness = new FakeHarnessDriverClient() harness.brokerPid = 68009 diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.ts index 53af3761..d3559ecd 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.ts @@ -4,7 +4,7 @@ import { join } from 'node:path' import type { BrokerEvent, ListAgent, SendMessageInput, SpawnPtyInput } from '@agent-relay/harness-driver' -import type { Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports/fleet' +import type { AgentPidResolution, Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports/fleet' import type { Logger } from '../ports/system' type SpawnedHandleLike = { name: string; sessionId?: string; session_ref?: string; sessionRef?: string; pid?: number } @@ -143,21 +143,25 @@ export class InternalFleetClient implements FleetClient { return [...pids].sort((a, b) => a - b) } - async resolveAgentPid(name: string): Promise { + async resolveAgentPid(name: string): Promise { try { + let sawAgent = false for (let attempt = 1; attempt <= PID_RESOLVE_ATTEMPTS; attempt += 1) { const agent = (await this.#client.listAgents()).find((candidate) => candidate.name === name) + if (agent) { + sawAgent = true + } if (typeof agent?.pid === 'number') { - return agent.pid + return { status: 'found', pid: agent.pid } } if (attempt < PID_RESOLVE_ATTEMPTS) { await sleep(PID_RESOLVE_BACKOFF_MS) } } - return undefined + return sawAgent ? { status: 'unresolved' } : { status: 'missing' } } catch (error) { this.#logger?.warn?.('[factory-sdk] unable to resolve spawned agent pid from roster', error) - return undefined + return { status: 'unresolved' } } } diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 992cb734..6ca5c5e9 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -177,16 +177,6 @@ class ReleaseFailingFleetClient extends FakeFleetClient { } } -class PidFleetClient extends FakeFleetClient { - nextPid = 9_000 - - override async spawn(input: SpawnInput): Promise { - this.spawns.push(input) - const pid = this.nextPid++ - return { name: input.name, sessionRef: `session-${pid}`, pid } - } -} - class CapturedPidFleetClient extends FakeFleetClient { readonly plans: Map @@ -207,6 +197,23 @@ class CapturedPidFleetClient extends FakeFleetClient { } } +class UnresolvedPidFleetClient extends FakeFleetClient { + async resolveAgentPid(_name: string): Promise<{ status: 'unresolved' }> { + return { status: 'unresolved' } + } +} + +class FoundPidFleetClient extends FakeFleetClient { + constructor(readonly pidsByName: Map) { + super() + } + + async resolveAgentPid(name: string): Promise<{ status: 'found'; pid: number } | { status: 'missing' }> { + const pid = this.pidsByName.get(name) + return pid ? { status: 'found', pid } : { status: 'missing' } + } +} + class RosterPidHarnessClient implements HarnessDriverClientLike { readonly brokerPid = 68009 readonly spawned: SpawnPtyInput[] = [] @@ -643,7 +650,10 @@ describe('FactoryLoop', () => { const registryPath = join(root, 'registry.json') try { const mount = new FakeMountClient({ [issuePath(62)]: issueFile(62) }) - const fleet = new PidFleetClient() + const harness = new RosterPidHarnessClient() + harness.pidsByName.set('ar-62-impl', 9_000) + harness.pidsByName.set('ar-62-review', 9_001) + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) const factory = createFactory(config({ loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }, }), { @@ -659,6 +669,8 @@ describe('FactoryLoop', () => { await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(62), issueFile(62)))) + expect(harness.spawned).toHaveLength(2) + expect(harness.spawned.every((spawn) => spawn.name.startsWith('ar-62-'))).toBe(true) const registry = await readFactoryInFlightRegistry(registryPath) expect(registry).toMatchObject({ pid: process.pid, @@ -683,6 +695,36 @@ describe('FactoryLoop', () => { } }) + it('persists registry agent names when broker PID registration is still pending', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-loop-registry-pending-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(63)]: issueFile(63) }) + const harness = new RosterPidHarnessClient() + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }, + }), { + mount, + fleet, + triage: new StaticTriage(), + processIdentityReader: async () => undefined, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(63), issueFile(63)))) + + const registry = await readFactoryInFlightRegistry(registryPath) + expect(registry?.agents).toMatchObject([ + { name: 'ar-63-impl', pids: [], processes: [] }, + { name: 'ar-63-review', pids: [], processes: [] }, + ]) + await factory.stop() + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + it('reports missing or stale loop heartbeat as not live', () => { expect(checkFactoryLoopLiveness(undefined, { nowMs: 2_000, staleMs: 1_000 })).toMatchObject({ ok: false, @@ -1025,6 +1067,147 @@ describe('FactoryLoop', () => { ]) }) + it('stop falls back to a ps-discovered agent process when broker PID is unresolved', async () => { + const mount = new FakeMountClient({ [issuePath(67)]: issueFile(67) }) + const fleet = new UnresolvedPidFleetClient() + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + processFinder: async (agentName) => ({ + status: 'found', + identity: { + pid: agentName === 'ar-67-impl' ? 906700 : 906701, + startTime: `start-${agentName}`, + cmdline: `node --agent-name ${agentName}`, + }, + }), + readChildPids: async () => [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(67), issueFile(67)))) + await factory.stop() + + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid).sort((a, b) => a - b)).toEqual([ + 906700, + 906701, + ]) + expect(factory.status().counters.agentTerminateMissingPid).toBeUndefined() + }) + + it('stop uses the anchored launcher root even when the broker resolves a worker child', async () => { + const mount = new FakeMountClient({ [issuePath(69)]: issueFile(69) }) + const fleet = new FoundPidFleetClient(new Map([ + ['ar-69-impl', 906910], + ['ar-69-review', 906911], + ])) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const children = new Map([ + [906900, [906910]], + [906901, [906911]], + ]) + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + processFinder: async (agentName) => ({ + status: 'found', + identity: { + pid: agentName === 'ar-69-impl' ? 906900 : 906901, + startTime: `launcher-${agentName}`, + cmdline: `node --agent-name ${agentName} launcher`, + }, + }), + readChildPids: async (pid) => children.get(pid) ?? [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(69), issueFile(69)))) + await factory.stop() + + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid)).toEqual([ + 906910, + 906900, + 906911, + 906901, + ]) + expect(factory.status().counters.agentTerminateMissingPid).toBeUndefined() + }) + + it('stop treats unresolved broker PID with no ps match as process-less', async () => { + const mount = new FakeMountClient({ [issuePath(68)]: issueFile(68) }) + const fleet = new UnresolvedPidFleetClient() + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const errors: unknown[][] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + processFinder: async () => ({ status: 'missing' }), + readChildPids: async () => [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + logger: { + error: (...args: unknown[]) => errors.push(args), + warn: () => undefined, + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(68), issueFile(68)))) + await factory.stop() + + expect(killed).toEqual([]) + expect(factory.status().counters.agentTerminateMissingPid).toBeUndefined() + expect(errors).toEqual([]) + }) + + it('stop does not count an already-exited agent as a missing live PID', async () => { + const mount = new FakeMountClient({ [issuePath(66)]: issueFile(66) }) + const harness = new RosterPidHarnessClient() + harness.pidsByName.set('ar-66-impl', 906600) + harness.pidsByName.set('ar-66-review', 906601) + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const errors: unknown[][] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + terminationGraceMs: 0, + readChildPids: async () => [], + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + logger: { + error: (...args: unknown[]) => errors.push(args), + warn: () => undefined, + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(66), issueFile(66)))) + harness.agents.clear() + harness.pidsByName.clear() + await factory.stop() + + expect(killed).toEqual([]) + expect(factory.status().counters.agentTerminateMissingPid).toBeUndefined() + expect(errors).toEqual([]) + }) + it('stop swallows one release failure and still releases others plus tears down listeners', async () => { const mount = new TrackingEventsMount({ [issuePath(61)]: issueFile(61) }) const fleet = new ReleaseFailingFleetClient(new Set(['ar-61-impl'])) diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 2ed46722..7b12fd64 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -4,7 +4,17 @@ import { dirname } from 'node:path' import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { LINEAR_STATE_IDS, linearByStatePath } from '../constants/linear' import { GithubMergeGate, closeProbePr, type GithubMergeGate as GithubMergeGatePort } from '../github' -import type { AgentSpec, ChangeEvent, FleetClient, LinearWriteback, MountClient, ProviderSyncStatus, SlackWriteback, Subscription } from '../ports' +import type { + AgentPidResolution, + AgentSpec, + ChangeEvent, + FleetClient, + LinearWriteback, + MountClient, + ProviderSyncStatus, + SlackWriteback, + Subscription, +} from '../ports' import type { Clock, Logger } from '../ports/system' import { isInFactoryScope } from '../safety/factory-scope' import { renderAgentTask } from '../dispatch/templates' @@ -33,12 +43,13 @@ import type { import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../writeback' import { asRecord, parseJsonContent, stableHash, wrappedPayload } from '../writeback/shared' import { BatchTracker, type InFlightIssue, issueKey, type TrackedAgent } from './batch-tracker' -import { readProcessIdentity } from './process-identity' +import { findAgentProcessByName, readProcessIdentity, type AgentProcessFinder } from './process-identity' import { terminatePids } from './reaper' type FactoryEvent = 'issue-queued' | 'dispatched' | 'issue-done' | 'writeback-verified' | 'error' type Listener = (payload: FactoryEventPayload) => void type SlackThreadWatcher = { stop(): Promise } +type TerminationRoots = { pids: number[]; status: AgentPidResolution['status'] } type DispatchAttemptState = { attempts: number inFlight: boolean @@ -92,6 +103,7 @@ export class FactoryLoop implements Factory { readonly #logger: Logger readonly #clock: Clock readonly #processIdentityReader: typeof readProcessIdentity + readonly #processFinder: AgentProcessFinder readonly #kill: (pid: number, signal?: NodeJS.Signals | 0) => boolean readonly #readChildPids: ((pid: number) => Promise) | undefined readonly #terminationGraceMs: number | undefined @@ -141,6 +153,10 @@ export class FactoryLoop implements Factory { this.#logger = ports.logger ?? console this.#clock = ports.clock ?? realClock this.#processIdentityReader = ports.processIdentityReader ?? readProcessIdentity + this.#processFinder = ports.processFinder ?? ((agentName, opts) => findAgentProcessByName(agentName, { + readProcessIdentity: this.#processIdentityReader, + protectedPids: opts?.protectedPids, + })) this.#kill = ports.kill ?? process.kill this.#readChildPids = ports.readChildPids this.#terminationGraceMs = ports.terminationGraceMs @@ -487,6 +503,7 @@ export class FactoryLoop implements Factory { const spawned = await this.#spawnAgent(record, spec, dryRun) agents.push({ name: spawned.name, role: spec.role }) } + await this.#writeInFlightRegistry() const comment = dispatchComment(decision, agents) if (!dryRun) { @@ -759,8 +776,8 @@ export class FactoryLoop implements Factory { ): Promise { const protectedPids = await this.#protectedPids() for (const [agentName, tracked] of agents) { - const pids = await this.#terminationRoots(agentName, tracked) - if (pids.length === 0) { + const roots = await this.#terminationRoots(agentName, tracked, protectedPids) + if (roots.pids.length === 0 && roots.status === 'unresolved') { this.#increment('agentTerminateMissingPid') this.#logger.error?.(`[factory] no pid available to terminate ${agentName} during ${context}`, { agentName, @@ -769,8 +786,8 @@ export class FactoryLoop implements Factory { }) } - if (pids.length > 0) { - const report = await terminatePids(pids, { + if (roots.pids.length > 0) { + const report = await terminatePids(roots.pids, { kill: this.#kill, readChildPids: this.#readChildPids, sleep: this.#clock.sleep, @@ -792,18 +809,45 @@ export class FactoryLoop implements Factory { } } - async #terminationRoots(agentName: string, tracked: TrackedAgent): Promise { + async #terminationRoots(agentName: string, tracked: TrackedAgent, protectedPids: number[] = []): Promise { const pids = pidsFromSpawnResult(tracked.result) + if (!this.#fleet.resolveAgentPid) { + return pids.length > 0 ? { pids, status: 'found' } : { pids: [], status: 'unresolved' } + } + + const scan = await this.#processFinder(agentName, { protectedPids }) + if ( + scan.status === 'found' && + Number.isInteger(scan.identity.pid) && + scan.identity.pid > 0 && + scan.identity.cmdline.includes(agentName) + ) { + return { pids: [scan.identity.pid], status: 'found' } + } + if (scan.status === 'ambiguous') { + this.#logger.warn?.(`[factory] ambiguous process lookup for ${agentName}`) + return { pids: [], status: 'unresolved' } + } + if (pids.length > 0) { - return pids + return { pids, status: 'found' } } try { - const pid = await this.#fleet.resolveAgentPid?.(agentName) - return Number.isInteger(pid) && pid! > 0 ? [pid!] : [] + const resolution = await this.#fleet.resolveAgentPid?.(agentName) + if (!resolution) { + return { pids: [], status: 'unresolved' } + } + if (resolution.status === 'found' && Number.isInteger(resolution.pid) && resolution.pid > 0) { + return { pids: [resolution.pid], status: 'found' } + } + if (resolution.status === 'unresolved' && scan.status === 'missing') { + return { pids: [], status: 'missing' } + } + return { pids: [], status: resolution.status } } catch (error) { this.#logger.warn?.(`[factory] failed to resolve pid for ${agentName}`, error) - return [] + return { pids: [], status: 'unresolved' } } } @@ -827,7 +871,7 @@ export class FactoryLoop implements Factory { for (const record of this.#batch.inFlight) { if (record.dryRun) continue for (const [agentName, tracked] of record.agents) { - const pids = pidsFromSpawnResult(tracked.result) + const { pids } = await this.#terminationRoots(agentName, tracked) const processes = [] for (const pid of pids) { const identity = await this.#processIdentityReader(pid) @@ -858,10 +902,6 @@ export class FactoryLoop implements Factory { await writeFile(path, `${JSON.stringify(registry, null, 2)}\n`, 'utf8') } - #hasInFlightPids(): boolean { - return this.#batch.inFlight.some(recordHasPids) - } - async #spawnAgent(record: InFlightIssue, spec: AgentSpec, dryRun: boolean): Promise<{ name: string }> { const invocationId = this.#batch.invocationIdFor(record.issue, spec) const existing = record.agents.get(spec.name) @@ -910,9 +950,6 @@ export class FactoryLoop implements Factory { ) } this.#batch.recordSpawn(record, spec, invocationId, result) - if (pidsFromSpawnResult(result).length > 0) { - await this.#writeInFlightRegistry() - } return { name: result.name } } @@ -1141,14 +1178,11 @@ export class FactoryLoop implements Factory { this.#emit('issue-done', { issue: record.issue }) await this.#stopSlackWatcher(record.issue) this.#recordDispatchTerminal(record.issue) - const completedHadPids = recordHasPids(record) const next = this.#batch.complete(record.issue) - if (completedHadPids || this.#hasInFlightPids()) { - await this.#writeInFlightRegistry() - } if (next) { await this.dispatch(next.decision, { dryRun: next.dryRun }) } + await this.#writeInFlightRegistry() } catch (error) { this.#error(error, record.issue) } @@ -1603,9 +1637,6 @@ export function isRealLinearIssue(issue: LinearIssue): boolean { const issueRef = (issue: LinearIssue): IssueRef => ({ uuid: issue.uuid, key: issue.key, path: issue.path }) -const recordHasPids = (record: InFlightIssue): boolean => - [...record.agents.values()].some((agent) => pidsFromSpawnResult(agent.result).length > 0) - const pidsFromSpawnResult = (result: { pid?: number; pids?: number[] } | undefined): number[] => { const pids = new Set() for (const pid of result?.pids ?? []) { diff --git a/packages/factory-sdk/src/orchestrator/process-identity.test.ts b/packages/factory-sdk/src/orchestrator/process-identity.test.ts new file mode 100644 index 00000000..dbdd82a9 --- /dev/null +++ b/packages/factory-sdk/src/orchestrator/process-identity.test.ts @@ -0,0 +1,60 @@ +import { describe, expect, it } from 'vitest' + +import { findAgentProcessByName, type ProcessIdentity } from './process-identity' + +describe('process identity lookup', () => { + it('matches an anchored agent-name token and rejects loose substrings', async () => { + const identities = new Map([ + [101, { pid: 101, startTime: 'start-101', cmdline: 'node --agent-name ar-1-impl worker' }], + [102, { pid: 102, startTime: 'start-102', cmdline: 'node --agent-name ar-1-impl-extra worker' }], + [103, { pid: 103, startTime: 'start-103', cmdline: 'node --agent-name ar-10-impl worker' }], + ]) + + await expect(findAgentProcessByName('ar-1-impl', { + listPidsByCommand: async () => [101, 102, 103], + readProcessIdentity: async (pid) => identities.get(pid), + })).resolves.toEqual({ + status: 'found', + identity: identities.get(101), + }) + }) + + it('fails closed when no anchored agent-name process exists', async () => { + await expect(findAgentProcessByName('ar-2-impl', { + listPidsByCommand: async () => [201], + readProcessIdentity: async (pid) => ({ pid, startTime: 'start-201', cmdline: 'node --agent-name ar-20-impl worker' }), + })).resolves.toEqual({ status: 'missing' }) + }) + + it('fails closed when multiple anchored agent-name processes match', async () => { + await expect(findAgentProcessByName('ar-3-impl', { + listPidsByCommand: async () => [301, 302], + readProcessIdentity: async (pid) => ({ pid, startTime: `start-${pid}`, cmdline: 'node --agent-name ar-3-impl worker' }), + readParentPid: async () => undefined, + })).resolves.toEqual({ status: 'ambiguous' }) + }) + + it('allows multiple anchored matches only when they form one agent tree and returns the launcher root', async () => { + const identities = new Map([ + [501, { pid: 501, startTime: 'launcher-start', cmdline: 'node --agent-name ar-5-review launcher' }], + [502, { pid: 502, startTime: 'worker-start', cmdline: 'node --agent-name ar-5-review worker' }], + ]) + + await expect(findAgentProcessByName('ar-5-review', { + listPidsByCommand: async () => [501, 502], + readProcessIdentity: async (pid) => identities.get(pid), + readParentPid: async (pid) => pid === 502 ? 501 : undefined, + })).resolves.toEqual({ + status: 'found', + identity: identities.get(501), + }) + }) + + it('excludes protected PIDs from scan results', async () => { + await expect(findAgentProcessByName('ar-4-impl', { + protectedPids: [401], + listPidsByCommand: async () => [401], + readProcessIdentity: async (pid) => ({ pid, startTime: 'broker-start', cmdline: 'node --agent-name ar-4-impl broker' }), + })).resolves.toEqual({ status: 'missing' }) + }) +}) diff --git a/packages/factory-sdk/src/orchestrator/process-identity.ts b/packages/factory-sdk/src/orchestrator/process-identity.ts index 5aff5051..70547adf 100644 --- a/packages/factory-sdk/src/orchestrator/process-identity.ts +++ b/packages/factory-sdk/src/orchestrator/process-identity.ts @@ -7,6 +7,24 @@ const execFileAsync = promisify(execFile) export type ProcessIdentity = Pick +export type AgentProcessLookupResult = + | { status: 'found'; identity: ProcessIdentity } + | { status: 'missing' } + | { status: 'ambiguous' } + +export interface AgentProcessFinderOptions { + protectedPids?: number[] +} + +export type AgentProcessFinder = (agentName: string, opts?: AgentProcessFinderOptions) => Promise + +export interface AgentProcessLookupOptions { + listPidsByCommand?: (pattern: string) => Promise + readProcessIdentity?: (pid: number) => Promise + readParentPid?: (pid: number) => Promise + protectedPids?: number[] +} + export async function readProcessIdentity(pid: number): Promise { try { const { stdout } = await execFileAsync('ps', ['-p', String(pid), '-o', 'lstart=', '-o', 'command=']) @@ -19,3 +37,121 @@ export async function readProcessIdentity(pid: number): Promise { + const readIdentity = opts.readProcessIdentity ?? readProcessIdentity + const readParent = opts.readParentPid ?? readParentPid + const protectedPids = new Set(opts.protectedPids ?? []) + const pids = await (opts.listPidsByCommand ?? listPidsByCommand)(agentNameCommandPattern(agentName)) + const matches: ProcessIdentity[] = [] + for (const pid of [...new Set(pids)].sort((a, b) => a - b)) { + if (protectedPids.has(pid)) { + continue + } + const identity = await readIdentity(pid) + if (identity && agentCommandLineMatches(identity.cmdline, agentName)) { + matches.push(identity) + } + } + if (matches.length === 0) return { status: 'missing' } + const root = await coherentAgentRoot(matches, readParent) + return root ? { status: 'found', identity: root } : { status: 'ambiguous' } +} + +function agentNameCommandPattern(agentName: string): string { + return `(^|[[:space:]])--agent-name[[:space:]=]+${escapeExtendedRegex(agentName)}([[:space:]]|$)` +} + +function agentCommandLineMatches(cmdline: string, agentName: string): boolean { + const escaped = escapeRegExp(agentName) + return new RegExp(`(^|\\s)--agent-name(\\s+|=)${escaped}(\\s|$)`, 'u').test(cmdline) +} + +async function coherentAgentRoot( + matches: ProcessIdentity[], + readParent: (pid: number) => Promise, +): Promise { + if (matches.length === 1) return matches[0] + const byPid = new Map(matches.map((identity) => [identity.pid, identity])) + const roots: ProcessIdentity[] = [] + for (const identity of matches) { + if (!await hasMatchedAncestor(identity.pid, byPid, readParent)) { + roots.push(identity) + } + } + if (roots.length !== 1) return undefined + const root = roots[0]! + for (const identity of matches) { + if (identity.pid === root.pid) continue + if (!await reachesRoot(identity.pid, root.pid, readParent)) { + return undefined + } + } + return root +} + +async function hasMatchedAncestor( + pid: number, + matches: Map, + readParent: (pid: number) => Promise, +): Promise { + let current = pid + const seen = new Set([pid]) + for (;;) { + const parent = await readParent(current) + if (!parent || seen.has(parent)) return false + if (matches.has(parent)) return true + seen.add(parent) + current = parent + } +} + +async function reachesRoot( + pid: number, + rootPid: number, + readParent: (pid: number) => Promise, +): Promise { + let current = pid + const seen = new Set([pid]) + for (;;) { + const parent = await readParent(current) + if (!parent || seen.has(parent)) return false + if (parent === rootPid) return true + seen.add(parent) + current = parent + } +} + +async function listPidsByCommand(pattern: string): Promise { + try { + const { stdout } = await execFileAsync('pgrep', ['-f', pattern]) + return parsePidList(stdout) + } catch (error) { + return parsePidList((error as { stdout?: string }).stdout) + } +} + +async function readParentPid(pid: number): Promise { + try { + const { stdout } = await execFileAsync('ps', ['-o', 'ppid=', '-p', String(pid)]) + const parent = Number(stdout.trim()) + return Number.isInteger(parent) && parent > 0 ? parent : undefined + } catch { + return undefined + } +} + +const parsePidList = (stdout: string | Buffer | undefined): number[] => { + const text = Buffer.isBuffer(stdout) ? stdout.toString('utf8') : stdout ?? '' + return text + .split(/\s+/u) + .map((value) => Number(value)) + .filter((pid) => Number.isInteger(pid) && pid > 0) +} + +const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/gu, '\\$&') + +const escapeExtendedRegex = (value: string): string => value.replace(/[.[\]()*+?{}|^$\\]/gu, '\\$&') diff --git a/packages/factory-sdk/src/orchestrator/reaper.test.ts b/packages/factory-sdk/src/orchestrator/reaper.test.ts index d025fc9b..c90e70c0 100644 --- a/packages/factory-sdk/src/orchestrator/reaper.test.ts +++ b/packages/factory-sdk/src/orchestrator/reaper.test.ts @@ -6,6 +6,7 @@ import { describe, expect, it, vi } from 'vitest' import type { FactoryInFlightRegistry, FactoryLoopHeartbeat } from '../types' import { FactoryReaper, reapFactoryOrphansOnce, terminatePids } from './reaper' +import type { FleetClient } from '../ports' const writeJson = async (path: string, value: unknown): Promise => { await writeFile(path, `${JSON.stringify(value, null, 2)}\n`, 'utf8') @@ -161,6 +162,355 @@ describe('factory reaper', () => { } }) + it('resolves and reaps registry agents that only persisted a name before factory crash', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-9-impl', + role: 'implementer', + issue: { key: 'AR-9', uuid: 'uuid-9', path: '/linear/issues/AR-9__uuid-9.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const live = new Set([9090]) + const fleet: Pick = { + resolveAgentPid: async (name) => name === 'ar-9-impl' + ? { status: 'found', pid: 9090 } + : { status: 'missing' }, + } + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet, + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!live.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + return true + }, + readProcessIdentity: async (pid) => ({ pid, startTime: 'started-9090', cmdline: 'node ar-9-impl worker' }), + }) + + expect(report.reaped).toEqual([{ pid: 9090, signals: ['SIGTERM', 'SIGKILL'] }]) + expect(killed).toEqual([ + { pid: 9090, signal: 0 }, + { pid: 9090, signal: 0 }, + { pid: 9090, signal: 'SIGTERM' }, + { pid: 9090, signal: 0 }, + { pid: 9090, signal: 'SIGKILL' }, + ]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('uses the anchored launcher root before a broker-resolved worker child for name-only registry agents', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-launcher-primary-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-15-impl', + role: 'implementer', + issue: { key: 'AR-15', uuid: 'uuid-15', path: '/linear/issues/AR-15__uuid-15.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const live = new Set([15_015, 15_016]) + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet: { resolveAgentPid: async () => ({ status: 'found', pid: 15_016 }) }, + processFinder: async () => { + return { status: 'found', identity: { pid: 15_015, startTime: 'launcher', cmdline: 'node --agent-name ar-15-impl launcher' } } + }, + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!live.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + return true + }, + readProcessIdentity: async (pid) => ({ + pid, + startTime: pid === 15_015 ? 'launcher' : 'worker', + cmdline: `node --agent-name ar-15-impl ${pid === 15_015 ? 'launcher' : 'worker'}`, + }), + readChildPids: async (pid) => pid === 15_015 ? [15_016] : [], + }) + + expect(report.reaped).toEqual([ + { pid: 15_016, signals: ['SIGTERM', 'SIGKILL'] }, + { pid: 15_015, signals: ['SIGTERM', 'SIGKILL'] }, + ]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid)).toEqual([15_016, 15_015]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('falls back to a ps-discovered process when a name-only registry agent has unresolved broker PID', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-ps-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-12-impl', + role: 'implementer', + issue: { key: 'AR-12', uuid: 'uuid-12', path: '/linear/issues/AR-12__uuid-12.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const live = new Set([12_012]) + const fleet: Pick = { + resolveAgentPid: async () => ({ status: 'unresolved' }), + } + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet, + processFinder: async () => ({ + status: 'found', + identity: { pid: 12_012, startTime: 'started-12012', cmdline: 'node --agent-name ar-12-impl worker' }, + }), + kill: (pid, signal) => { + killed.push({ pid, signal }) + if (!live.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' }) + return true + }, + readChildPids: async () => [], + readParentPid: async () => undefined, + readProcessIdentity: async (pid) => ({ pid, startTime: 'started-12012', cmdline: 'node --agent-name ar-12-impl worker' }), + }) + + expect(report.reaped).toEqual([{ pid: 12_012, signals: ['SIGTERM', 'SIGKILL'] }]) + expect(killed.filter((entry) => entry.signal === 'SIGTERM').map((entry) => entry.pid)).toEqual([12_012]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('skips a name-only registry agent when broker PID and ps fallback are both missing', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-missing-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-13-impl', + role: 'implementer', + issue: { key: 'AR-13', uuid: 'uuid-13', path: '/linear/issues/AR-13__uuid-13.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet: { resolveAgentPid: async () => ({ status: 'unresolved' }) }, + processFinder: async () => ({ status: 'missing' }), + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + }) + + expect(report.reaped).toEqual([]) + expect(report.skipped).toEqual([{ reason: 'pid missing for ar-13-impl' }]) + expect(killed).toEqual([]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('rejects a ps-discovered fallback process whose identity does not match the agent', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-ps-foreign-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-14-impl', + role: 'implementer', + issue: { key: 'AR-14', uuid: 'uuid-14', path: '/linear/issues/AR-14__uuid-14.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet: { resolveAgentPid: async () => ({ status: 'unresolved' }) }, + processFinder: async () => ({ + status: 'found', + identity: { pid: 14_014, startTime: 'foreign-start', cmdline: 'node --agent-name foreign worker' }, + }), + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + }) + + expect(report.reaped).toEqual([]) + expect(report.skipped).toEqual([{ pid: 14_014, reason: 'pid identity mismatch' }]) + expect(killed).toEqual([]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('does not reap a name-only registry agent when the resolved PID identity does not match', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-reuse-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-10-impl', + role: 'implementer', + issue: { key: 'AR-10', uuid: 'uuid-10', path: '/linear/issues/AR-10__uuid-10.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const fleet: Pick = { + resolveAgentPid: async (name) => name === 'ar-10-impl' + ? { status: 'found', pid: 10_010 } + : { status: 'missing' }, + } + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet, + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + readProcessIdentity: async (pid) => ({ pid, startTime: 'foreign-start', cmdline: 'node foreign-worker' }), + }) + + expect(report.reaped).toEqual([]) + expect(report.skipped).toEqual([{ pid: 10_010, reason: 'pid identity mismatch' }]) + expect(killed).toEqual([]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('does not signal a name-only registry agent that resolves to a protected broker PID', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-reaper-name-only-protected-')) + try { + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + await writeJson(heartbeatPath, heartbeat(1_000)) + await writeJson(registryPath, { + pid: 100, + updatedAt: new Date(1_000).toISOString(), + updatedAtMs: 1_000, + agents: [{ + name: 'ar-11-impl', + role: 'implementer', + issue: { key: 'AR-11', uuid: 'uuid-11', path: '/linear/issues/AR-11__uuid-11.json' }, + pids: [], + processes: [], + }], + } satisfies FactoryInFlightRegistry) + const brokerPid = 68_009 + const brokerChildPid = 990_099 + const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = [] + const fleet: Pick = { + protectedPids: async () => [brokerPid], + resolveAgentPid: async (name) => name === 'ar-11-impl' + ? { status: 'found', pid: brokerPid } + : { status: 'missing' }, + } + + const report = await reapFactoryOrphansOnce({ + heartbeatPath, + registryPath, + staleMs: 1_000, + nowMs: 3_001, + termGraceMs: 0, + fleet, + kill: (pid, signal) => { + killed.push({ pid, signal }) + return true + }, + readChildPids: async (pid) => pid === brokerPid ? [brokerChildPid] : [], + readParentPid: async () => undefined, + readProcessIdentity: async (pid) => ({ pid, startTime: 'broker-start', cmdline: `node ar-11-impl broker ${pid}` }), + }) + + expect(report.reaped).toEqual([]) + expect(report.skipped).toEqual([{ pid: brokerPid, reason: 'protected pid' }]) + expect(killed).toEqual([ + { pid: brokerPid, signal: 0 }, + ]) + expect(killed.some((entry) => entry.pid === brokerChildPid)).toBe(false) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + it('does not reap while the heartbeat is fresh', async () => { const root = await mkdtemp(join(tmpdir(), 'factory-reaper-fresh-')) try { diff --git a/packages/factory-sdk/src/orchestrator/reaper.ts b/packages/factory-sdk/src/orchestrator/reaper.ts index b72e3f73..7e0b7de0 100644 --- a/packages/factory-sdk/src/orchestrator/reaper.ts +++ b/packages/factory-sdk/src/orchestrator/reaper.ts @@ -3,10 +3,10 @@ import { execFile } from 'node:child_process' import { promisify } from 'node:util' import { parseJsonContent } from '../writeback/shared' -import type { Clock, Logger } from '../ports' -import type { FactoryInFlightRegistry, FactoryInFlightRegistryProcess } from '../types' +import type { Clock, FleetClient, Logger } from '../ports' +import type { FactoryInFlightRegistry, FactoryInFlightRegistryAgent, FactoryInFlightRegistryProcess } from '../types' import { checkFactoryLoopLiveness, readFactoryLoopHeartbeat } from './factory' -import { readProcessIdentity, type ProcessIdentity } from './process-identity' +import { findAgentProcessByName, readProcessIdentity, type AgentProcessFinder, type ProcessIdentity } from './process-identity' const execFileAsync = promisify(execFile) @@ -19,7 +19,11 @@ export interface FactoryReaperOptions { clock?: Clock logger?: Logger kill?: (pid: number, signal?: NodeJS.Signals | 0) => boolean + readChildPids?: (pid: number) => Promise + readParentPid?: (pid: number) => Promise readProcessIdentity?: (pid: number) => Promise + processFinder?: AgentProcessFinder + fleet?: Pick } @@ -58,8 +62,8 @@ export async function terminatePids(pids: number[], opts: TerminatePidsOptions = const termGraceMs = opts.termGraceMs ?? 1_000 const terminated: TerminatePidsReport['terminated'] = [] const skipped: TerminatePidsReport['skipped'] = [] - const orderedPids = await pidTreePostOrder(pids, opts.readChildPids ?? readChildPids) const protectedPids = await protectedPidSet(opts.protectedPids ?? [], opts.readParentPid ?? readParentPid) + const orderedPids = await pidTreePostOrder(pids, opts.readChildPids ?? readChildPids, protectedPids) for (const pid of orderedPids) { if (protectedPids.has(pid)) { @@ -114,12 +118,17 @@ async function protectedPidSet( async function pidTreePostOrder( roots: number[], readChildren: (pid: number) => Promise, + protectedPids = new Set(), ): Promise { const ordered: number[] = [] const seen = new Set() const visit = async (pid: number): Promise => { if (!Number.isInteger(pid) || pid <= 0 || seen.has(pid)) return seen.add(pid) + if (protectedPids.has(pid)) { + ordered.push(pid) + return + } let children: number[] = [] try { children = await readChildren(pid) @@ -178,10 +187,11 @@ export async function reapFactoryOrphansOnce(opts: FactoryReaperOptions): Promis } const termGraceMs = opts.termGraceMs ?? 1_000 - const processes = registryProcesses(registry) const reaped: FactoryReaperReport['reaped'] = [] const skipped: FactoryReaperReport['skipped'] = [] const kill = opts.kill ?? process.kill + const protectedPids = await reaperProtectedPids(opts) + const processes = await registryProcesses(registry, opts, skipped, protectedPids) for (const processInfo of processes) { const { pid } = processInfo @@ -199,6 +209,9 @@ export async function reapFactoryOrphansOnce(opts: FactoryReaperOptions): Promis kill, sleep: opts.clock?.sleep, termGraceMs, + protectedPids, + readChildPids: opts.readChildPids, + readParentPid: opts.readParentPid, }) for (const entry of report.terminated) { reaped.push(entry) @@ -254,7 +267,21 @@ export class FactoryReaper { } } -const registryProcesses = (registry: FactoryInFlightRegistry): FactoryInFlightRegistryProcess[] => { +async function reaperProtectedPids(opts: FactoryReaperOptions): Promise { + try { + return await opts.fleet?.protectedPids?.() ?? [] + } catch (error) { + opts.logger?.warn?.('[factory-reaper] failed to resolve protected fleet pids', error) + return [] + } +} + +async function registryProcesses( + registry: FactoryInFlightRegistry, + opts: FactoryReaperOptions, + skipped: FactoryReaperReport['skipped'], + protectedPids: number[], +): Promise { const processes = new Map() for (const agent of registry.agents ?? []) { for (const processInfo of agent.processes ?? []) { @@ -262,10 +289,81 @@ const registryProcesses = (registry: FactoryInFlightRegistry): FactoryInFlightRe processes.set(processInfo.pid, processInfo) } } + if (!agentHasPersistedPid(agent)) { + const resolved = await resolveRegistryAgentProcess(agent, opts, skipped, protectedPids) + if (resolved) { + processes.set(resolved.pid, resolved) + } + } } return [...processes.values()].sort((a, b) => a.pid - b.pid) } +const agentHasPersistedPid = (agent: FactoryInFlightRegistryAgent): boolean => + [...(agent.processes ?? []), ...(agent.pids ?? []).map((pid) => ({ pid }))] + .some((entry) => Number.isInteger(entry.pid) && entry.pid > 0) + +async function resolveRegistryAgentProcess( + agent: FactoryInFlightRegistryAgent, + opts: FactoryReaperOptions, + skipped: FactoryReaperReport['skipped'], + protectedPids: number[], +): Promise { + const scanned = await scanRegistryAgentProcess(agent, opts, protectedPids) + if (scanned.status === 'found') { + return scanned.process + } + if (scanned.status === 'ambiguous') { + skipped.push({ reason: `pid ambiguous for ${agent.name}` }) + return undefined + } + if (scanned.status === 'identity-mismatch') { + skipped.push({ pid: scanned.pid, reason: 'pid identity mismatch' }) + return undefined + } + + const resolution = await opts.fleet?.resolveAgentPid?.(agent.name) + if (!resolution) { + skipped.push({ reason: `pid missing for ${agent.name}` }) + return undefined + } + if (resolution.status !== 'found') { + skipped.push({ reason: resolution.status === 'unresolved' ? `pid missing for ${agent.name}` : `pid ${resolution.status} for ${agent.name}` }) + return undefined + } + const identity = await (opts.readProcessIdentity ?? readProcessIdentity)(resolution.pid) + if (!identity || !identity.cmdline.includes(agent.name)) { + skipped.push({ pid: resolution.pid, reason: 'pid identity mismatch' }) + return undefined + } + return { ...identity, agentName: agent.name } +} + +type RegistryAgentScan = + | { status: 'found'; process: FactoryInFlightRegistryProcess } + | { status: 'missing' } + | { status: 'ambiguous' } + | { status: 'identity-mismatch'; pid: number } + +async function scanRegistryAgentProcess( + agent: FactoryInFlightRegistryAgent, + opts: FactoryReaperOptions, + protectedPids: number[], +): Promise { + const lookup = await (opts.processFinder ?? ((agentName) => findAgentProcessByName(agentName, { + readProcessIdentity: opts.readProcessIdentity, + readParentPid: opts.readParentPid, + protectedPids, + })))(agent.name, { protectedPids }) + if (lookup.status === 'found') { + if (!lookup.identity.cmdline.includes(agent.name)) { + return { status: 'identity-mismatch', pid: lookup.identity.pid } + } + return { status: 'found', process: { ...lookup.identity, agentName: agent.name } } + } + return { status: lookup.status } +} + const isPidLive = (pid: number, kill: (pid: number, signal?: NodeJS.Signals | 0) => boolean): boolean => { try { kill(pid, 0) diff --git a/packages/factory-sdk/src/ports/fleet.ts b/packages/factory-sdk/src/ports/fleet.ts index a81d3cd7..cc18770c 100644 --- a/packages/factory-sdk/src/ports/fleet.ts +++ b/packages/factory-sdk/src/ports/fleet.ts @@ -26,6 +26,11 @@ export interface RosterEntry { nodes: Array<{ name: string; capabilities: Capability[]; live: boolean }> } +export type AgentPidResolution = + | { status: 'found'; pid: number } + | { status: 'missing' } + | { status: 'unresolved' } + export type SendInput = { to: string; text: string; from?: string; data?: Record } export interface FleetClient { @@ -33,7 +38,7 @@ export interface FleetClient { resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise release(name: string, reason?: string): Promise roster(): Promise - resolveAgentPid?(name: string): Promise + resolveAgentPid?(name: string): Promise protectedPids?(): Promise sendMessage(input: SendInput): Promise waitForInjected?(input: SendInput, opts?: { timeoutMs?: number }): Promise<{ eventId: string; targets: string[] }> diff --git a/packages/factory-sdk/src/ports/index.ts b/packages/factory-sdk/src/ports/index.ts index aa721051..d2072a97 100644 --- a/packages/factory-sdk/src/ports/index.ts +++ b/packages/factory-sdk/src/ports/index.ts @@ -7,6 +7,7 @@ export type { Subscription, } from './mount' export type { + AgentPidResolution, AgentSpec, Capability, FleetClient, diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 19e54752..08e20af5 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -3,7 +3,7 @@ import type { AgentSpec, FleetClient, GithubRead, LinearWriteback, MountClient, import type { Clock, Logger } from './ports/system' import type { CloseProbePrInput, CloseProbePrResult } from './github/probe-closer' import type { GithubMergeGate } from './github/merge-gate' -import type { ProcessIdentity } from './orchestrator/process-identity' +import type { AgentProcessFinder, ProcessIdentity } from './orchestrator/process-identity' export interface FactoryPorts { mount: MountClient @@ -18,6 +18,7 @@ export interface FactoryPorts { logger?: Logger clock?: Clock processIdentityReader?: (pid: number) => Promise + processFinder?: AgentProcessFinder kill?: (pid: number, signal?: NodeJS.Signals | 0) => boolean readChildPids?: (pid: number) => Promise terminationGraceMs?: number