Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 279 additions & 1 deletion packages/factory-sdk/src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it, vi } from 'vitest'
import { mkdtemp, readFile, rm } from 'node:fs/promises'
import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import type { BrokerEvent, SendMessageInput, SpawnPtyInput } from '@agent-relay/harness-driver'
Expand All @@ -11,6 +11,7 @@ import {
parseLinearIssue,
readFactoryInFlightRegistry,
readFactoryLoopHeartbeat,
reapFactoryOrphansOnce,
type FactoryConfig,
type TriageDecision,
type TriageEngine,
Expand Down Expand Up @@ -725,6 +726,283 @@ describe('FactoryLoop', () => {
}
})

it('hands spawned agents to the durable reaper registry when dispatch writeback fails', async () => {
  // Per-test scratch directory for the heartbeat and registry files; removed in `finally`.
  const scratchDir = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-registry-'))
  const heartbeatPath = join(scratchDir, 'heartbeat.json')
  const registryPath = join(scratchDir, 'registry.json')
  try {
    const mount = new FakeMountClient({ [issuePath(72)]: issueFile(72) })

    // Register a known session ref per agent name so the persisted entries can be matched below.
    const fleet = new FakeFleetClient()
    const sessionRefs = [
      ['ar-72-impl', 'session-ar-72-impl'],
      ['ar-72-review', 'session-ar-72-review'],
    ] as const
    for (const [agentName, sessionRef] of sessionRefs) {
      fleet.setSessionRef(agentName, sessionRef)
    }

    // Linear stub: setState always rejects, which is the writeback failure under test.
    const failingLinear: LinearWriteback = {
      postComment: async () => {},
      setState: async () => {
        throw new Error('Live state changed before writeback')
      },
      createIssue: async () => {
        throw new Error('not used')
      },
      verify: async () => true,
    }

    const loop = { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }
    const factory = createFactory(config({ loop }), {
      mount,
      fleet,
      triage: new StaticTriage(),
      linear: failingLinear,
      processFinder: async () => ({ status: 'missing' }),
      processIdentityReader: async () => undefined,
    })

    // Triage the fixture issue, then dispatch it; the dispatch must surface the setState error.
    const issue = parseLinearIssue(issuePath(72), issueFile(72))
    const triaged = await factory.triageIssue(issue)
    await expect(factory.dispatch(triaged)).rejects.toThrow('Live state changed before writeback')

    // Nothing is tracked in memory any more, and exactly one reaper handoff was counted.
    expect(factory.status().inFlight).toEqual([])
    expect(factory.status().counters.dispatchFailureReaperHandoffs).toBe(1)

    // Both agents must be present in the on-disk registry, without any recorded pids or processes.
    const persisted = await readFactoryInFlightRegistry(registryPath)
    const handedOff = (name: string, role: string, sessionRef: string) => ({
      name,
      role,
      sessionRef,
      issue: { key: 'AR-72', uuid: 'uuid-72', path: issuePath(72) },
      pids: [],
      processes: [],
    })
    expect(persisted?.agents).toMatchObject([
      handedOff('ar-72-impl', 'implementer', 'session-ar-72-impl'),
      handedOff('ar-72-review', 'reviewer', 'session-ar-72-review'),
    ])
  } finally {
    await rm(scratchDir, { recursive: true, force: true })
  }
})

it('preserves dispatch-failure handoffs through abandon and stop-time registry rewrites', async () => {
  // Per-test scratch directory for the heartbeat and registry files; removed in `finally`.
  const scratchDir = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-loop-registry-'))
  const heartbeatPath = join(scratchDir, 'heartbeat.json')
  const registryPath = join(scratchDir, 'registry.json')
  try {
    const mount = new FakeMountClient({ [issuePath(76)]: issueFile(76) })

    // Register a known session ref per agent name so the persisted entries can be matched below.
    const fleet = new FakeFleetClient()
    const sessionRefs = [
      ['ar-76-impl', 'session-ar-76-impl'],
      ['ar-76-review', 'session-ar-76-review'],
    ] as const
    for (const [agentName, sessionRef] of sessionRefs) {
      fleet.setSessionRef(agentName, sessionRef)
    }

    // Linear stub: setState always rejects, so the single loop iteration fails during writeback.
    const failingLinear: LinearWriteback = {
      postComment: async () => {},
      setState: async () => {
        throw new Error('setState 404')
      },
      createIssue: async () => {
        throw new Error('not used')
      },
      verify: async () => true,
    }

    const loop = { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }
    const factory = createFactory(config({ loop }), {
      mount,
      fleet,
      triage: new StaticTriage(),
      linear: failingLinear,
      processFinder: async () => ({ status: 'missing' }),
      processIdentityReader: async () => undefined,
    })

    // Unlike the direct-dispatch cases, this one drives the whole loop and expects it to reject.
    const loopRun = factory.runLoop()
    await expect(loopRun).rejects.toThrow('setState 404')

    expect(factory.status().inFlight).toEqual([])

    // The heartbeat on disk must read 'stopping' and reference the same registry file.
    const heartbeat = await readFactoryLoopHeartbeat(heartbeatPath)
    expect(heartbeat).toMatchObject({ status: 'stopping', registryPath })

    // The handed-off agents must still be in the registry after the loop has stopped.
    const persisted = await readFactoryInFlightRegistry(registryPath)
    const handedOff = (name: string, sessionRef: string) => ({ name, sessionRef, pids: [], processes: [] })
    expect(persisted?.agents).toMatchObject([
      handedOff('ar-76-impl', 'session-ar-76-impl'),
      handedOff('ar-76-review', 'session-ar-76-review'),
    ])
  } finally {
    await rm(scratchDir, { recursive: true, force: true })
  }
})

it('reaper consumes dispatch-failure handoff by resolving name-only agents without touching protected pids', async () => {
const root = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-reap-'))
const heartbeatPath = join(root, 'heartbeat.json')
const registryPath = join(root, 'registry.json')
try {
const mount = new FakeMountClient({ [issuePath(73)]: issueFile(73) })
const fleet = new FakeFleetClient()
fleet.setSessionRef('ar-73-impl', 'session-ar-73-impl')
fleet.setSessionRef('ar-73-review', 'session-ar-73-review')
const linear: LinearWriteback = {
async postComment() {},
async setState() {
throw new Error('setState 404')
},
async createIssue() {
throw new Error('not used')
},
async verify() {
return true
},
}
const factory = createFactory(config({
loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 1_000 },
}), {
mount,
fleet,
triage: new StaticTriage(),
linear,
processFinder: async () => ({ status: 'missing' }),
processIdentityReader: async () => undefined,
})

await expect(factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(73), issueFile(73)))))
.rejects.toThrow('setState 404')
await writeFile(heartbeatPath, JSON.stringify({
pid: process.pid,
status: 'running',
iteration: 1,
maxIterations: 1,
updatedAt: new Date(1_000).toISOString(),
updatedAtMs: 1_000,
registryPath,
}), 'utf8')

const killed: Array<{ pid: number; signal?: NodeJS.Signals | 0 }> = []
const alive = new Set([7_301, 7_302, 7_303, 68_009])
const report = await reapFactoryOrphansOnce({
heartbeatPath,
registryPath,
staleMs: 1_000,
nowMs: 3_500,
termGraceMs: 0,
fleet: {
protectedPids: async () => [68_009],
resolveAgentPid: async (name) => {
if (name === 'ar-73-impl') return { status: 'found', pid: 7_301 }
if (name === 'ar-73-review') return { status: 'found', pid: 68_009 }
return { status: 'unresolved' }
},
},
processFinder: async () => ({ status: 'missing' }),
readProcessIdentity: async (pid) => {
if (pid === 7_301) return { pid, startTime: 'start-7301', cmdline: 'node --agent-name ar-73-impl launcher' }
if (pid === 68_009) return { pid, startTime: 'broker-start', cmdline: 'node --agent-name ar-73-review broker' }
return undefined
},
readParentPid: async () => undefined,
readChildPids: async (pid) => pid === 7_301 ? [7_302, 7_303] : [],
kill: (pid, signal) => {
killed.push({ pid, signal })
if (!alive.has(pid)) throw Object.assign(new Error('not running'), { code: 'ESRCH' })
if (signal === 'SIGKILL') alive.delete(pid)
return true
},
})

expect(report.reaped.map((entry) => entry.pid)).toEqual([7_302, 7_303, 7_301])
expect(killed.some((entry) => entry.pid === 68_009 && entry.signal !== 0)).toBe(false)
expect(report.skipped).toContainEqual({ pid: 68_009, reason: 'protected pid' })
} finally {
Comment on lines +913 to +916

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Add an explicit zero-survivor assertion for the V1 reaper scenario.

This test verifies kill/reap reporting but doesn’t assert final process liveness. Add a post-condition that only protected broker PID 68_009 remains alive.

🔧 Suggested fix
       expect(report.reaped.map((entry) => entry.pid)).toEqual([7_302, 7_303, 7_301])
       expect(killed.some((entry) => entry.pid === 68_009 && entry.signal !== 0)).toBe(false)
       expect(report.skipped).toContainEqual({ pid: 68_009, reason: 'protected pid' })
+      expect(alive).toEqual(new Set([68_009]))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- expect(report.reaped.map((entry) => entry.pid)).toEqual([7_302, 7_303, 7_301])
- expect(killed.some((entry) => entry.pid === 68_009 && entry.signal !== 0)).toBe(false)
- expect(report.skipped).toContainEqual({ pid: 68_009, reason: 'protected pid' })
- } finally {
+ expect(report.reaped.map((entry) => entry.pid)).toEqual([7_302, 7_303, 7_301])
+ expect(killed.some((entry) => entry.pid === 68_009 && entry.signal !== 0)).toBe(false)
+ expect(report.skipped).toContainEqual({ pid: 68_009, reason: 'protected pid' })
+ expect(alive).toEqual(new Set([68_009]))
+ } finally {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/factory-sdk/src/orchestrator/factory.test.ts` around lines 966 -
969, add a final explicit assertion after the existing expectations in the V1
reaper test to verify process liveness: assert that the protected broker PID
68009 never receives a nonzero signal and that the set of alive PIDs equals
exactly [68009]; specifically, check the killed array (e.g., ensure
killed.every(e => e.pid !== 68009 || e.signal === 0)) and assert that the
test-local `alive` set, which the kill stub mutates, contains only 68009,
referencing the existing symbols alive, killed, and the protected pid 68009.

await rm(root, { recursive: true, force: true })
}
})

it('dispatch failure before spawn does not create an orphan handoff', async () => {
  // Per-test scratch directory for the heartbeat and registry files; removed in `finally`.
  const scratchDir = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-before-spawn-'))
  const heartbeatPath = join(scratchDir, 'heartbeat.json')
  const registryPath = join(scratchDir, 'registry.json')
  try {
    const mount = new FakeMountClient({ [issuePath(74)]: issueFile(74) })
    // NOTE(review): SpawnFailingFleetClient is defined outside this view; presumably its spawn
    // always fails, which is what the expected error message below implies.
    const fleet = new SpawnFailingFleetClient()

    const loop = { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }
    const deps = { mount, fleet, triage: new StaticTriage() }
    const factory = createFactory(config({ loop }), deps)

    // Triage the fixture issue, then dispatch it; the dispatch must reject with the spawn error.
    const issue = parseLinearIssue(issuePath(74), issueFile(74))
    const triaged = await factory.triageIssue(issue)
    await expect(factory.dispatch(triaged)).rejects.toThrow('Dispatch spawn failed for AR-74/ar-74-impl')

    // No agent is in flight, no handoff was counted, and no registry can be read back.
    expect(factory.status().inFlight).toEqual([])
    expect(factory.status().counters.dispatchFailureReaperHandoffs).toBeUndefined()
    const persisted = await readFactoryInFlightRegistry(registryPath)
    expect(persisted).toBeUndefined()
  } finally {
    await rm(scratchDir, { recursive: true, force: true })
  }
})

it('reaper reports unresolved dispatch-failure handoff pids instead of treating them as success', async () => {
  // Per-test scratch directory for the heartbeat and registry files; removed in `finally`.
  const scratchDir = await mkdtemp(join(tmpdir(), 'factory-dispatch-failure-unresolved-'))
  const heartbeatPath = join(scratchDir, 'heartbeat.json')
  const registryPath = join(scratchDir, 'registry.json')
  try {
    const mount = new FakeMountClient({ [issuePath(75)]: issueFile(75) })

    // Register a session ref for each of the two agent names used by issue 75.
    const fleet = new FakeFleetClient()
    const sessionRefs = [
      ['ar-75-impl', 'session-ar-75-impl'],
      ['ar-75-review', 'session-ar-75-review'],
    ] as const
    for (const [agentName, sessionRef] of sessionRefs) {
      fleet.setSessionRef(agentName, sessionRef)
    }

    // Linear stub: setState always rejects, which is the writeback failure that triggers the handoff.
    const failingLinear: LinearWriteback = {
      postComment: async () => {},
      setState: async () => {
        throw new Error('setState 404')
      },
      createIssue: async () => {
        throw new Error('not used')
      },
      verify: async () => true,
    }

    const loop = { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 1_000 }
    const factory = createFactory(config({ loop }), {
      mount,
      fleet,
      triage: new StaticTriage(),
      linear: failingLinear,
      processFinder: async () => ({ status: 'missing' }),
      processIdentityReader: async () => undefined,
    })

    const issue = parseLinearIssue(issuePath(75), issueFile(75))
    const triaged = await factory.triageIssue(issue)
    await expect(factory.dispatch(triaged)).rejects.toThrow('setState 404')

    // Hand-write a 'running' heartbeat stamped at 1_000 ms. The reaper below runs with
    // nowMs 3_500 and staleMs 1_000, so the heartbeat is 2_500 ms old from its point of view.
    const heartbeatAtMs = 1_000
    const heartbeatRecord = {
      pid: process.pid,
      status: 'running',
      iteration: 1,
      maxIterations: 1,
      updatedAt: new Date(heartbeatAtMs).toISOString(),
      updatedAtMs: heartbeatAtMs,
      registryPath,
    }
    await writeFile(heartbeatPath, JSON.stringify(heartbeatRecord), 'utf8')

    // Every agent name resolves to 'unresolved' and the process finder reports 'missing'.
    const report = await reapFactoryOrphansOnce({
      heartbeatPath,
      registryPath,
      staleMs: 1_000,
      nowMs: 3_500,
      fleet: { resolveAgentPid: async () => ({ status: 'unresolved' }) },
      processFinder: async () => ({ status: 'missing' }),
    })

    // Nothing is reaped, and each agent is listed as skipped with a "pid missing" reason.
    expect(report.reaped).toEqual([])
    const expectedSkips = [
      { reason: 'pid missing for ar-75-impl' },
      { reason: 'pid missing for ar-75-review' },
    ]
    expect(report.skipped).toEqual(expectedSkips)
  } finally {
    await rm(scratchDir, { recursive: true, force: true })
  }
})

it('reports missing or stale loop heartbeat as not live', () => {
expect(checkFactoryLoopLiveness(undefined, { nowMs: 2_000, staleMs: 1_000 })).toMatchObject({
ok: false,
Expand Down
Loading
Loading