From 85e165ad563c419f56d9e668c047d4e470d502c2 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Thu, 11 Jun 2026 21:44:59 +0200 Subject: [PATCH 1/3] Add factory orchestrator and live merge gate --- .../src/fleet/internal-fleet-client.test.ts | 19 + .../src/fleet/internal-fleet-client.ts | 5 +- .../src/fleet/relay-fleet-client.ts | 4 +- packages/factory-sdk/src/github/index.ts | 12 + .../factory-sdk/src/github/merge-gate.test.ts | 87 +++ packages/factory-sdk/src/github/merge-gate.ts | 168 +++++ packages/factory-sdk/src/index.ts | 53 +- .../src/orchestrator/batch-tracker.ts | 162 +++++ .../src/orchestrator/factory.test.ts | 228 +++++++ .../factory-sdk/src/orchestrator/factory.ts | 591 ++++++++++++++++++ .../factory-sdk/src/orchestrator/index.ts | 3 + packages/factory-sdk/src/ports/fleet.ts | 2 +- packages/factory-sdk/src/testing/fakes.ts | 5 +- packages/factory-sdk/src/types.ts | 10 +- packages/factory-sdk/src/writeback/github.ts | 68 +- .../src/writeback/writeback.test.ts | 17 +- 16 files changed, 1339 insertions(+), 95 deletions(-) create mode 100644 packages/factory-sdk/src/github/index.ts create mode 100644 packages/factory-sdk/src/github/merge-gate.test.ts create mode 100644 packages/factory-sdk/src/github/merge-gate.ts create mode 100644 packages/factory-sdk/src/orchestrator/batch-tracker.ts create mode 100644 packages/factory-sdk/src/orchestrator/factory.test.ts create mode 100644 packages/factory-sdk/src/orchestrator/factory.ts create mode 100644 packages/factory-sdk/src/orchestrator/index.ts diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts index 4d76ebee..9b501ea5 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.test.ts @@ -144,6 +144,25 @@ describe('InternalFleetClient', () => { }) }) + it('resumes with the per-agent capability when provided', async () => { + const harness = new FakeHarnessDriverClient() + const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' }) + + await fleet.resume({ + name: 'ar-1-review', + sessionRef: 'review-session', + node: 'self', + capability: 'spawn:claude', + }) + + expect(harness.spawned[0]).toMatchObject({ + name: 'ar-1-review', + cli: 'claude', + cwd: '/worktree', + continueFrom: 'review-session', + }) + }) + it('rejects non-self placement for the internal single-node backend', async () => { const fleet = new InternalFleetClient({ client: new FakeHarnessDriverClient() }) diff --git a/packages/factory-sdk/src/fleet/internal-fleet-client.ts b/packages/factory-sdk/src/fleet/internal-fleet-client.ts index a95aba52..72160265 100644 --- a/packages/factory-sdk/src/fleet/internal-fleet-client.ts +++ b/packages/factory-sdk/src/fleet/internal-fleet-client.ts @@ -90,12 +90,13 @@ export class InternalFleetClient implements FleetClient { return { name: handle.name, sessionRef: sessionRefFrom(handle) } } - async resume(input: { name?: string; sessionRef: string; node?: 'self' | string }): Promise { + async resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise { assertSelfNode(input.node) const handle = await this.#client.spawnPty({ name: input.name ?? input.sessionRef, - cli: capabilityCli[this.#resumeCapability], + // followups [fleet→W6]: W6 owns resume-vs-respawn and passes the per-agent capability. + cli: capabilityCli[input.capability ?? this.#resumeCapability], cwd: this.#cwd, continueFrom: input.sessionRef, }) diff --git a/packages/factory-sdk/src/fleet/relay-fleet-client.ts b/packages/factory-sdk/src/fleet/relay-fleet-client.ts index 9da1713d..5a01c1bb 100644 --- a/packages/factory-sdk/src/fleet/relay-fleet-client.ts +++ b/packages/factory-sdk/src/fleet/relay-fleet-client.ts @@ -1,4 +1,4 @@ -import type { FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports' +import type { Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports' const notImplemented = () => new Error('RelayFleetClient not implemented — see relay#1056') @@ -8,7 +8,7 @@ export class RelayFleetClient implements FleetClient { throw notImplemented() } - async resume(_input: { name?: string; sessionRef: string; node?: 'self' | string }): Promise { + async resume(_input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise { throw notImplemented() } diff --git a/packages/factory-sdk/src/github/index.ts b/packages/factory-sdk/src/github/index.ts new file mode 100644 index 00000000..c8eba1d8 --- /dev/null +++ b/packages/factory-sdk/src/github/index.ts @@ -0,0 +1,12 @@ +export { + GhCliGithubMergeGate, + GithubMergeGate, + evaluateGithubMergeGate, +} from './merge-gate' +export type { + GhRunner, + GhRunResult, + GithubMergeGateInput, + GithubMergeGateVerdict, + GithubMergeGate as GithubMergeGatePort, +} from './merge-gate' diff --git a/packages/factory-sdk/src/github/merge-gate.test.ts b/packages/factory-sdk/src/github/merge-gate.test.ts new file mode 100644 index 00000000..5a5c7f81 --- /dev/null +++ b/packages/factory-sdk/src/github/merge-gate.test.ts @@ -0,0 +1,87 @@ +import { describe, expect, it } from 'vitest' + +import { GhCliGithubMergeGate, evaluateGithubMergeGate, type GhRunner } from './merge-gate' + +const input = { + repo: 'AgentWorkforce/pear', + number: 123, + expectedHeadSha: 'abc123', +} + +const live = (overrides: Record = {}) => ({ + mergeable: 'MERGEABLE', + mergeStateStatus: 'CLEAN', + headRefOid: 'abc123', + statusCheckRollup: [ + { name: 'test', conclusion: 'SUCCESS' }, + ], + ...overrides, +}) + +describe('GithubMergeGate', () => { + it('returns READY only for MERGEABLE+CLEAN, matching head, and all checks SUCCESS', async () => { + const gate = new GhCliGithubMergeGate(async () => ({ stdout: JSON.stringify(live()) })) + + await expect(gate.check(input)).resolves.toMatchObject({ + verdict: 'READY', + ready: true, + }) + }) + + it('refuses stale mount-clean snapshots when live GitHub contradicts readiness', () => { + const staleMountSnapshot = { + mergeable: 'MERGEABLE', + mergeStateStatus: 'CLEAN', + headRefOid: 'abc123', + statusCheckRollup: [{ conclusion: 'SUCCESS' }], + } + void staleMountSnapshot + + expect(evaluateGithubMergeGate(input, live({ + mergeable: 'CONFLICTING', + mergeStateStatus: 'UNSTABLE', + headRefOid: 'def456', + statusCheckRollup: [{ conclusion: 'FAILURE' }], + }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + }) + + it('fails closed when gh returns UNKNOWN, errors, or partial output', async () => { + const unknown = new GhCliGithubMergeGate(async () => ({ + stdout: JSON.stringify(live({ mergeable: 'UNKNOWN', mergeStateStatus: 'UNKNOWN' })), + })) + await expect(unknown.check(input)).resolves.toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + + const errorRunner: GhRunner = async () => { + throw new Error('gh timed out') + } + await expect(new GhCliGithubMergeGate(errorRunner).check(input)).resolves.toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + + const partial = new GhCliGithubMergeGate(async () => ({ + stdout: JSON.stringify({ mergeable: 'MERGEABLE', mergeStateStatus: 'CLEAN' }), + })) + await expect(partial.check(input)).resolves.toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + }) + + it('refuses missing or non-success status checks', () => { + expect(evaluateGithubMergeGate(input, live({ statusCheckRollup: [] }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + expect(evaluateGithubMergeGate(input, live({ statusCheckRollup: [{ conclusion: 'FAILURE' }] }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + }) +}) diff --git a/packages/factory-sdk/src/github/merge-gate.ts b/packages/factory-sdk/src/github/merge-gate.ts new file mode 100644 index 00000000..f689fe40 --- /dev/null +++ b/packages/factory-sdk/src/github/merge-gate.ts @@ -0,0 +1,168 @@ +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' + +const execFileAsync = promisify(execFile) + +export interface GhRunResult { + stdout: string + stderr?: string +} + +export type GhRunner = (args: string[]) => Promise + +export interface GithubMergeGateInput { + repo: string + number: number + expectedHeadSha: string +} + +export interface GithubMergeGateVerdict { + verdict: 'READY' | 'REFUSE' + ready: boolean + reason: string + live: { + mergeable?: string + mergeStateStatus?: string + headRefOid?: string + checkStates: string[] + } +} + +export interface GithubMergeGate { + check(input: GithubMergeGateInput): Promise +} + +export class GhCliGithubMergeGate implements GithubMergeGate { + readonly #run: GhRunner + + constructor(run: GhRunner = defaultGhRunner) { + this.#run = run + } + + async check(input: GithubMergeGateInput): Promise { + try { + const result = await this.#run([ + 'pr', + 'view', + String(input.number), + '--repo', + input.repo, + '--json', + 'mergeable,mergeStateStatus,statusCheckRollup,headRefOid', + ]) + if (result.stdout.trim().length === 0) { + return refuse('gh returned empty output', { checkStates: [] }) + } + + return evaluateGithubMergeGate(input, parseGhJson(result.stdout)) + } catch (error) { + return refuse(`gh merge gate failed: ${error instanceof Error ? error.message : String(error)}`, { + checkStates: [], + }) + } + } +} + +export const GithubMergeGate = GhCliGithubMergeGate + +export function evaluateGithubMergeGate( + input: GithubMergeGateInput, + live: unknown, +): GithubMergeGateVerdict { + const record = asRecord(live) + const mergeable = stringValue(record.mergeable) + const mergeStateStatus = stringValue(record.mergeStateStatus) + const headRefOid = stringValue(record.headRefOid) + const statusCheckRollup = Array.isArray(record.statusCheckRollup) ? record.statusCheckRollup : undefined + const checkStates = statusCheckRollup ? checkStatesFromRollup(statusCheckRollup) : [] + + if (!mergeable || !mergeStateStatus || !headRefOid || !statusCheckRollup) { + return refuse('missing required live GitHub merge fields', { + mergeable, + mergeStateStatus, + headRefOid, + checkStates, + }) + } + + if (mergeable === 'UNKNOWN' || mergeStateStatus === 'UNKNOWN') { + return refuse('GitHub mergeability is still unknown', { mergeable, mergeStateStatus, headRefOid, checkStates }) + } + + if (headRefOid !== input.expectedHeadSha) { + return refuse(`head moved: expected ${input.expectedHeadSha}, live ${headRefOid ?? 'unknown'}`, { + mergeable, + mergeStateStatus, + headRefOid, + checkStates, + }) + } + + if (mergeable !== 'MERGEABLE') { + return refuse(`mergeable is ${mergeable ?? 'unknown'}`, { mergeable, mergeStateStatus, headRefOid, checkStates }) + } + + if (mergeStateStatus !== 'CLEAN') { + return refuse(`merge state is ${mergeStateStatus ?? 'unknown'}`, { mergeable, mergeStateStatus, headRefOid, checkStates }) + } + + if (checkStates.length === 0) { + return refuse('no successful status checks observed', { mergeable, mergeStateStatus, headRefOid, checkStates }) + } + + const failing = checkStates.filter((state) => state !== 'SUCCESS') + if (failing.length > 0) { + return refuse(`checks not successful: ${failing.join(', ')}`, { mergeable, mergeStateStatus, headRefOid, checkStates }) + } + + return { + verdict: 'READY', + ready: true, + reason: 'MERGEABLE+CLEAN with matching head and successful checks', + live: { mergeable, mergeStateStatus, headRefOid, checkStates }, + } +} + +const defaultGhRunner: GhRunner = async (args) => { + const { stdout, stderr } = await execFileAsync('gh', args, { maxBuffer: 1024 * 1024 }) + return { stdout, stderr } +} + +const parseGhJson = (stdout: string): unknown => JSON.parse(stdout) + +const refuse = (reason: string, live: GithubMergeGateVerdict['live']): GithubMergeGateVerdict => ({ + verdict: 'REFUSE', + ready: false, + reason, + live, +}) + +const checkStatesFromRollup = (value: unknown): string[] => { + if (!Array.isArray(value)) { + return [] + } + + return value.map((entry) => { + const record = asRecord(entry) + const conclusion = stringValue(record.conclusion) + if (conclusion) { + return conclusion + } + + const state = stringValue(record.state) + if (state) { + return state + } + + const status = stringValue(record.status) + return status === 'COMPLETED' ? 'SUCCESS' : status ?? 'UNKNOWN' + }) +} + +const asRecord = (value: unknown): Record => + value !== null && typeof value === 'object' && !Array.isArray(value) + ? value as Record + : {} + +const stringValue = (value: unknown): string | undefined => + typeof value === 'string' ? value : undefined diff --git a/packages/factory-sdk/src/index.ts b/packages/factory-sdk/src/index.ts index c26deb0c..fe6a5474 100644 --- a/packages/factory-sdk/src/index.ts +++ b/packages/factory-sdk/src/index.ts @@ -1,43 +1,4 @@ -import type { Factory, FactoryPorts } from './types' import type { FactoryConfig } from './config/schema' -import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from './writeback' - -const notImplemented = (method: string) => new Error(`Factory.${method} not implemented`) - -export function createFactory(config: FactoryConfig, ports: FactoryPorts): Factory { - const resolvedPorts: FactoryPorts = { - ...ports, - linear: ports.linear ?? MountLinearWriteback(ports.mount, config.stateIds), - slack: ports.slack ?? MountSlackWriteback(ports.mount, config.slack), - github: ports.github ?? MountGithubRead(ports.mount), - } - - void resolvedPorts - - return { - async start(): Promise { - throw notImplemented('start') - }, - async stop(): Promise { - throw notImplemented('stop') - }, - async runOnce(): Promise { - throw notImplemented('runOnce') - }, - async triageIssue(): Promise { - throw notImplemented('triageIssue') - }, - async dispatch(): Promise { - throw notImplemented('dispatch') - }, - status() { - return { inFlight: [], queued: [], counters: {} } - }, - on() { - return () => {} - }, - } -} export type { FactoryConfig } from './config/schema' export { FactoryConfigSchema } from './config/schema' @@ -73,6 +34,20 @@ export type { InternalFleetClientOptions, } from './fleet/internal-fleet-client' export { RelayFleetClient } from './fleet/relay-fleet-client' +export { + GhCliGithubMergeGate, + GithubMergeGate, + evaluateGithubMergeGate, +} from './github' +export type { + GhRunner, + GhRunResult, + GithubMergeGateInput, + GithubMergeGatePort, + GithubMergeGateVerdict, +} from './github' +export { BatchTracker, createFactory, FactoryLoop, issueKey, parseLinearIssue } from './orchestrator' +export type { InFlightIssue, QueuedIssue, TrackedAgent } from './orchestrator' export { HeuristicTriage, LlmTriage, diff --git a/packages/factory-sdk/src/orchestrator/batch-tracker.ts b/packages/factory-sdk/src/orchestrator/batch-tracker.ts new file mode 100644 index 00000000..55f1aead --- /dev/null +++ b/packages/factory-sdk/src/orchestrator/batch-tracker.ts @@ -0,0 +1,162 @@ +import type { AgentSpec, SpawnResult } from '../ports' +import type { DispatchResult, IssueRef, TriageDecision } from '../types' + +export interface TrackedAgent { + spec: AgentSpec + result?: SpawnResult + sessionRef?: string +} + +export interface InFlightIssue { + issue: IssueRef + decision: TriageDecision + dryRun: boolean + agents: Map + invocationIds: Set + result?: DispatchResult +} + +export interface QueuedIssue { + issue: IssueRef + decision: TriageDecision + dryRun: boolean +} + +const stableHash = (input: string): string => { + let hash = 0x811c9dc5 + for (let index = 0; index < input.length; index += 1) { + hash ^= input.charCodeAt(index) + hash = Math.imul(hash, 0x01000193) + } + + return (hash >>> 0).toString(36) +} + +export class BatchTracker { + readonly #limit: number + readonly #inFlight = new Map() + readonly #queued = new Map() + readonly #invocationIds = new Set() + + constructor(batchSize: number) { + this.#limit = Math.max(1, Math.min(5, Math.trunc(batchSize))) + } + + get size(): number { + return this.#inFlight.size + } + + get inFlight(): InFlightIssue[] { + return [...this.#inFlight.values()] + } + + get queued(): QueuedIssue[] { + return [...this.#queued.values()] + } + + getIssue(issue: IssueRef): InFlightIssue | undefined { + return this.#inFlight.get(issueKey(issue)) + } + + getIssueByAgent(name: string): InFlightIssue | undefined { + return this.inFlight.find((record) => record.agents.has(name)) + } + + isInFlight(issue: IssueRef): boolean { + return this.#inFlight.has(issueKey(issue)) + } + + isQueued(issue: IssueRef): boolean { + return this.#queued.has(issueKey(issue)) + } + + canStart(): boolean { + return this.#inFlight.size < this.#limit + } + + start(decision: TriageDecision, dryRun: boolean): InFlightIssue | undefined { + const key = issueKey(decision.issue) + const existing = this.#inFlight.get(key) + if (existing) { + return existing + } + + if (!this.canStart()) { + this.queue(decision, dryRun) + return undefined + } + + const record: InFlightIssue = { + issue: decision.issue, + decision, + dryRun, + agents: new Map(), + invocationIds: new Set(), + } + this.#inFlight.set(key, record) + this.#queued.delete(key) + return record + } + + queue(decision: TriageDecision, dryRun: boolean): boolean { + const key = issueKey(decision.issue) + if (this.#inFlight.has(key) || this.#queued.has(key)) { + return false + } + + this.#queued.set(key, { issue: decision.issue, decision, dryRun }) + return true + } + + complete(issue: IssueRef): QueuedIssue | undefined { + const key = issueKey(issue) + const record = this.#inFlight.get(key) + if (record) { + for (const invocationId of record.invocationIds) { + this.#invocationIds.delete(invocationId) + } + } + this.#inFlight.delete(key) + + if (!this.canStart()) { + return undefined + } + + const next = this.#queued.values().next().value as QueuedIssue | undefined + if (next) { + this.#queued.delete(issueKey(next.issue)) + } + + return next + } + + invocationIdFor(issue: IssueRef, spec: AgentSpec): string { + return spec.invocationId ?? `factory:${issue.key}:${stableHash(`${issue.uuid}:${spec.role}:${spec.name}:${spec.repo}`)}` + } + + shouldSpawn(record: InFlightIssue, invocationId: string): boolean { + return !record.invocationIds.has(invocationId) && !this.#invocationIds.has(invocationId) + } + + recordSpawn(record: InFlightIssue, spec: AgentSpec, invocationId: string, result: SpawnResult): void { + record.invocationIds.add(invocationId) + this.#invocationIds.add(invocationId) + record.agents.set(result.name, { + spec: { ...spec, invocationId }, + result, + sessionRef: result.sessionRef ?? spec.sessionRef, + }) + } + + recordDryRun(record: InFlightIssue, spec: AgentSpec, invocationId: string): void { + record.invocationIds.add(invocationId) + this.#invocationIds.add(invocationId) + record.agents.set(spec.name, { + spec: { ...spec, invocationId }, + result: { name: spec.name, sessionRef: spec.sessionRef }, + sessionRef: spec.sessionRef, + }) + } +} + +export const issueKey = (issue: IssueRef): string => `${issue.key}:${issue.uuid}:${issue.path}` diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts new file mode 100644 index 00000000..0fc0c8a1 --- /dev/null +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -0,0 +1,228 @@ +import { describe, expect, it } from 'vitest' + +import { FactoryConfigSchema, createFactory, parseLinearIssue, type FactoryConfig, type TriageDecision, type TriageEngine } from '../index' +import { FakeFleetClient, FakeMountClient } from '../testing' +import type { LinearIssue } from '../types' + +const ready = 'b9bec744-b60c-4745-8022-d90d6ab59ae3' +const implementing = '39b9881d-1196-4c95-8b80-a20f0c7263f7' + +const config = (overrides: Partial = {}): FactoryConfig => FactoryConfigSchema.parse({ + workspaceId: 'factory-test', + repos: { + byLabel: { pear: 'AgentWorkforce/pear' }, + clonePaths: { 'AgentWorkforce/pear': '/work/pear' }, + default: 'AgentWorkforce/pear', + }, + batchSize: 2, + ...overrides, +}) + +const issuePath = (n: number) => `/linear/issues/AR-${n}__uuid-${n}.json` + +const issuePayload = (n: number, stateId = ready) => ({ + id: `uuid-${n}`, + identifier: `AR-${n}`, + title: `Fix factory issue ${n}`, + description: 'Implement the requested fix in packages/factory-sdk/src/orchestrator/factory.ts and verify it with tests.', + stateId, + labels: [{ name: 'pear' }], + project: { name: 'Factory' }, + state: { id: stateId, name: stateId === ready ? 'Ready for Agent' : 'Implementing' }, +}) + +const issueFile = (n: number, stateId = ready) => ({ + provider: 'linear', + objectType: 'issue', + objectId: `uuid-${n}`, + payload: issuePayload(n, stateId), +}) + +const flush = async () => { + await new Promise((resolve) => setTimeout(resolve, 0)) +} + +class StaticTriage implements TriageEngine { + async triage(issue: LinearIssue): Promise { + const number = issue.key.match(/\d+/)?.[0] ?? '0' + return { + issue: { uuid: issue.uuid, key: issue.key, path: issue.path }, + routes: [{ repo: 'AgentWorkforce/pear', clonePath: '/work/pear', rationale: 'test route' }], + scope: 'single', + implementers: [{ + name: `ar-${number}-impl`, + role: 'implementer', + capability: 'spawn:codex', + model: 'codex', + task: `Implement ${issue.key}`, + repo: 'AgentWorkforce/pear', + clonePath: '/work/pear', + node: 'self', + }], + reviewer: { + name: `ar-${number}-review`, + role: 'reviewer', + capability: 'spawn:claude', + model: 'claude', + task: `Review ${issue.key}`, + repo: 'AgentWorkforce/pear', + clonePath: '/work/pear', + node: 'self', + }, + thin: false, + confidence: 'high', + rationale: 'static test decision', + } + } +} + +describe('FactoryLoop', () => { + it('parses wrapped Linear issue records', () => { + expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ + uuid: 'uuid-1', + key: 'AR-1', + title: 'Fix factory issue 1', + stateId: ready, + labels: ['pear'], + project: 'Factory', + }) + }) + + it('runOnce caps active issues, skips stale state, and pulls queued work after completion', async () => { + const mount = new FakeMountClient({ + [issuePath(1)]: issueFile(1), + [issuePath(2)]: issueFile(2), + [issuePath(3)]: issueFile(3), + [issuePath(4)]: issueFile(4, implementing), + }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-1-impl', 'session-impl-1') + fleet.setSessionRef('ar-1-review', 'session-review-1') + fleet.setSessionRef('ar-2-impl', 'session-impl-2') + fleet.setSessionRef('ar-2-review', 'session-review-2') + fleet.setSessionRef('ar-3-impl', 'session-impl-3') + fleet.setSessionRef('ar-3-review', 'session-review-3') + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + const report = await factory.runOnce() + + expect(report.pulled.map((issue) => issue.key)).toEqual(['AR-1', 'AR-2', 'AR-3', 'AR-4']) + expect(report.dispatched.map((result) => result.issue.key)).toEqual(['AR-1', 'AR-2']) + expect(report.skipped).toContainEqual({ issue: { uuid: 'uuid-3', key: 'AR-3', path: issuePath(3) }, reason: 'queued or escalated' }) + expect(report.skipped).toContainEqual({ issue: { uuid: 'uuid-4', key: 'AR-4', path: issuePath(4) }, reason: 'live state is not ready-for-agent' }) + expect(fleet.spawns).toHaveLength(4) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-1', 'AR-2']) + expect(factory.status().queued.map((issue) => issue.key)).toEqual(['AR-3']) + expect(mount.writes.some((write) => write.path === issuePath(1) && (write.content as { stateId?: string }).stateId === implementing)).toBe(true) + + fleet.emitAgentExit('ar-1-impl', 'issue-done') + await flush() + + expect(fleet.releases.map((release) => release.name)).toEqual(['ar-1-impl', 'ar-1-review']) + expect(fleet.spawns.map((spawn) => spawn.name)).toContain('ar-3-impl') + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-2', 'AR-3']) + expect(factory.status().queued).toEqual([]) + }) + + it('dedupes repeated dispatch by stable invocation id', async () => { + const mount = new FakeMountClient({ [issuePath(5)]: issueFile(5) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const decision = await factory.triageIssue(parseLinearIssue(issuePath(5), issueFile(5))) + + await factory.dispatch(decision) + await factory.dispatch(decision) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-5-impl', 'ar-5-review']) + expect(new Set(fleet.spawns.map((spawn) => spawn.invocationId)).size).toBe(2) + }) + + it('resumes exited open agents by sessionRef with the original capability', async () => { + const mount = new FakeMountClient({ [issuePath(6)]: issueFile(6) }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-6-review', 'session-review-6') + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const decision = await factory.triageIssue(parseLinearIssue(issuePath(6), issueFile(6))) + + await factory.dispatch(decision) + fleet.emitAgentExit('ar-6-review', 'crash') + await flush() + + expect(fleet.resumes).toEqual([{ + name: 'ar-6-review', + sessionRef: 'session-review-6', + node: 'self', + capability: 'spawn:claude', + }]) + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-6-impl', 'ar-6-review']) + }) + + it('coalesces duplicate exit callbacks for the same open issue, agent, and sessionRef', async () => { + const mount = new FakeMountClient({ [issuePath(10)]: issueFile(10) }) + const fleet = new FakeFleetClient() + fleet.setSessionRef('ar-10-review', 'session-review-10') + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const decision = await factory.triageIssue(parseLinearIssue(issuePath(10), issueFile(10))) + + await factory.dispatch(decision) + fleet.emitAgentExit('ar-10-review', 'exited') + fleet.emitAgentExit('ar-10-review', 'crashed') + await flush() + fleet.emitAgentExit('ar-10-review', 'code:1') + await flush() + + expect(fleet.resumes).toEqual([{ + name: 'ar-10-review', + sessionRef: 'session-review-10', + node: 'self', + capability: 'spawn:claude', + }]) + }) + + it('fresh-spawns on exit only when sessionRef is absent', async () => { + const mount = new FakeMountClient({ [issuePath(7)]: issueFile(7) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const decision = await factory.triageIssue(parseLinearIssue(issuePath(7), issueFile(7))) + + await factory.dispatch(decision) + fleet.emitAgentExit('ar-7-impl', 'crash') + await flush() + + expect(fleet.resumes).toEqual([]) + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-7-impl', 'ar-7-review', 'ar-7-impl']) + expect(fleet.spawns.at(-1)?.invocationId).toContain(':restart:') + }) + + it('emits an escalation on delivery_failed for an in-flight agent', async () => { + const mount = new FakeMountClient({ [issuePath(8)]: issueFile(8) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const errors: unknown[] = [] + factory.on('error', (payload) => errors.push(payload)) + await factory.start() + + const decision = await factory.triageIssue(parseLinearIssue(issuePath(8), issueFile(8))) + await factory.dispatch(decision) + fleet.emitDeliveryFailed({ to: 'ar-8-review', reason: 'dead-lettered' }) + await flush() + + expect(errors).toHaveLength(1) + expect(errors[0]).toMatchObject({ issue: { key: 'AR-8' } }) + await factory.stop() + }) + + it('emits error and rejects when writeback verification fails', async () => { + const mount = new FakeMountClient({ [issuePath(9)]: issueFile(9) }) + mount.setConfirmWrite(issuePath(9), 'failed') + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const errors: unknown[] = [] + factory.on('error', (payload) => errors.push(payload)) + const decision = await factory.triageIssue(parseLinearIssue(issuePath(9), issueFile(9))) + + await expect(factory.dispatch(decision)).rejects.toThrow('Writeback not acked') + expect(errors).toHaveLength(1) + expect(errors[0]).toMatchObject({ issue: { key: 'AR-9' } }) + }) +}) diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts new file mode 100644 index 00000000..12871d9f --- /dev/null +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -0,0 +1,591 @@ +import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' +import { linearByStatePath } from '../constants/linear' +import { GithubMergeGate, type GithubMergeGate as GithubMergeGatePort } from '../github/merge-gate' +import type { AgentSpec, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' +import type { Clock, Logger } from '../ports/system' +import { HeuristicTriage, TieredTriage } from '../triage' +import type { + DispatchResult, + Factory, + FactoryEventPayload, + FactoryPorts, + FactoryStatus, + IssueRef, + IterationReport, + LinearIssue, + TriageDecision, + TriageEngine, +} from '../types' +import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../writeback' +import { asRecord, parseJsonContent, wrappedPayload } from '../writeback/shared' +import { BatchTracker, type InFlightIssue, issueKey } from './batch-tracker' + +type FactoryEvent = 'issue-queued' | 'dispatched' | 'issue-done' | 'writeback-verified' | 'error' +type Listener = (payload: FactoryEventPayload) => void + +const ISSUE_ROOT = '/linear/issues' +const READY_EVENTS_LIMIT = 100 + +const realClock: Clock = { + now: () => Date.now(), + sleep: (ms) => new Promise((resolve) => setTimeout(resolve, ms)), +} + +export function createFactory(config: FactoryConfig, ports: FactoryPorts): Factory { + return new FactoryLoop(FactoryConfigSchema.parse(config), ports) +} + +export class FactoryLoop implements Factory { + readonly #config: FactoryConfig + readonly #mount: MountClient + readonly #fleet: FleetClient + readonly #triage: TriageEngine + readonly #linear: LinearWriteback + readonly #slack?: SlackWriteback + readonly #mergeGate: GithubMergeGatePort + readonly #logger: Logger + readonly #clock: Clock + readonly #batch: BatchTracker + readonly #listeners = new Map>() + readonly #counters: Record = {} + readonly #criticalMessages = new Map[0] }>() + readonly #resumeInFlight = new Map>() + readonly #resumedExitKeys = new Set() + #subscription?: Subscription + #offAgentExit?: () => void + #offDeliveryFailed?: () => void + #started = false + + constructor(config: FactoryConfig, ports: FactoryPorts) { + this.#config = config + this.#mount = ports.mount + this.#fleet = ports.fleet + this.#triage = ports.triage ?? new TieredTriage(new HeuristicTriage()) + this.#linear = ports.linear ?? MountLinearWriteback(ports.mount, config.stateIds) + this.#slack = ports.slack ?? (config.slack ? MountSlackWriteback(ports.mount, config.slack) : undefined) + void (ports.github ?? MountGithubRead(ports.mount)) + this.#mergeGate = ports.mergeGate ?? new GithubMergeGate() + this.#logger = ports.logger ?? console + this.#clock = ports.clock ?? realClock + this.#batch = new BatchTracker(config.batchSize) + this.#wireFleetEvents() + } + + async start(): Promise { + if (this.#started) { + return + } + + const ready = await this.#mount.ensureSubRoot(ISSUE_ROOT, { timeoutMs: 90_000 }) + if (ready !== 'ready') { + this.#error(new Error(`${ISSUE_ROOT} sub-root is not mounted`)) + return + } + + this.#wireFleetEvents() + + await this.#backfillReadyIssues() + this.#subscription = this.#mount.subscribe([`${ISSUE_ROOT}/**/*.json`], (event) => { + void this.#handleChange(event.resource.path) + }) + this.#started = true + } + + async stop(): Promise { + this.#started = false + await this.#subscription?.unsubscribe() + this.#subscription = undefined + this.#offAgentExit?.() + this.#offDeliveryFailed?.() + this.#offAgentExit = undefined + this.#offDeliveryFailed = undefined + } + + async runOnce(opts: { dryRun?: boolean } = {}): Promise { + const dryRun = opts.dryRun ?? this.#config.dryRun + const paths = await this.#readyIssuePaths() + const pulled: IssueRef[] = [] + const triaged: TriageDecision[] = [] + const dispatched: DispatchResult[] = [] + const skipped: IterationReport['skipped'] = [] + + for (const path of paths) { + const issue = await this.#readIssue(path) + if (!issue) { + continue + } + + pulled.push(issueRef(issue)) + if (this.#batch.isInFlight(issue) || this.#batch.isQueued(issue)) { + skipped.push({ issue: issueRef(issue), reason: 'already tracked' }) + continue + } + + if (issue.stateId !== this.#config.stateIds.readyForAgent) { + skipped.push({ issue: issueRef(issue), reason: 'live state is not ready-for-agent' }) + continue + } + + const decision = await this.triageIssue(issue) + triaged.push(decision) + const result = await this.dispatch(decision, { dryRun }) + if (result.agents.length === 0 && !dryRun) { + skipped.push({ issue: decision.issue, reason: 'queued or escalated' }) + } else { + dispatched.push(result) + } + } + + return { pulled, triaged, dispatched, skipped, dryRun } + } + + async triageIssue(issue: LinearIssue): Promise { + return this.#triage.triage(issue, { + config: this.#config, + repoMap: repoMapFromConfig(this.#config), + }) + } + + async dispatch(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise { + const dryRun = opts.dryRun ?? this.#config.dryRun + if (decision.confidence === 'low') { + const error = new Error(`Low-confidence triage for ${decision.issue.key}; escalation required`) + this.#error(error, decision.issue) + return { issue: decision.issue, agents: [], dryRun } + } + + const record = this.#batch.start(decision, dryRun) + if (!record) { + this.#increment('queued') + this.#emit('issue-queued', { issue: decision.issue }) + return { issue: decision.issue, agents: [], dryRun } + } + + if (record.result) { + return record.result + } + + try { + const agents: DispatchResult['agents'] = [] + for (const spec of [...decision.implementers, decision.reviewer]) { + const spawned = await this.#spawnAgent(record, spec, dryRun) + agents.push({ name: spawned.name, role: spec.role }) + } + + const comment = dispatchComment(decision, agents) + if (!dryRun) { + const issue = await this.#readIssue(decision.issue.path) + if (!issue || issue.stateId !== this.#config.stateIds.readyForAgent) { + throw new Error(`Live state changed before writeback for ${decision.issue.key}`) + } + await this.#linear.postComment(issue, comment) + await this.#linear.setState(issue, this.#config.stateIds.agentImplementing) + this.#emit('writeback-verified', { issue: decision.issue, path: issue.path }) + } + + const result = { + issue: decision.issue, + agents, + comments: [comment], + stateId: dryRun ? undefined : this.#config.stateIds.agentImplementing, + dryRun, + } + record.result = result + this.#increment('dispatched') + this.#emit('dispatched', { issue: decision.issue, result }) + await this.#sendCriticalReviewerMessage(record) + return result + } catch (error) { + this.#error(error, decision.issue) + throw error + } + } + + status(): FactoryStatus { + return { + inFlight: this.#batch.inFlight.map((record) => record.issue), + queued: this.#batch.queued.map((queued) => queued.issue), + counters: { ...this.#counters }, + } + } + + on(event: FactoryEvent, listener: Listener): () => void { + let listeners = this.#listeners.get(event) + if (!listeners) { + listeners = new Set() + this.#listeners.set(event, listeners) + } + listeners.add(listener) + return () => { + listeners?.delete(listener) + } + } + + #wireFleetEvents(): void { + if (!this.#offAgentExit) { + this.#offAgentExit = this.#fleet.onAgentExit((name, reason) => { + void this.#handleAgentExit(name, reason) + }) + } + if (!this.#offDeliveryFailed) { + this.#offDeliveryFailed = this.#fleet.onDeliveryFailed?.((info) => { + void this.#handleDeliveryFailed(info) + }) + } + } + + async #backfillReadyIssues(): Promise { + const page = await this.#mount.getEvents({ limit: READY_EVENTS_LIMIT }) + const eventPaths = page.events.map((event) => event.resource.path).filter(isIssueFilePath) + const treePaths = await this.#readyIssuePaths() + for (const path of new Set([...eventPaths, ...treePaths])) { + await this.#handleChange(path) + } + } + + async #handleChange(path: string): Promise { + if (!isIssueFilePath(path)) { + return + } + + try { + const issue = await this.#readIssue(path) + if (issue?.stateId !== this.#config.stateIds.readyForAgent) { + return + } + + if (this.#batch.isInFlight(issue) || this.#batch.isQueued(issue)) { + return + } + + const decision = await this.triageIssue(issue) + if (decision.confidence === 'low') { + this.#error(new Error(`Low-confidence triage for ${decision.issue.key}; escalation required`), decision.issue) + return + } + + if (!this.#batch.canStart()) { + if (this.#batch.queue(decision, this.#config.dryRun)) { + this.#emit('issue-queued', { issue: decision.issue }) + } + } + } catch (error) { + this.#error(error) + } + } + + async #readyIssuePaths(): Promise { + const paths = new Set() + for (const path of await this.#mount.listTree(ISSUE_ROOT)) { + if (isIssueFilePath(path)) { + paths.add(path) + } + } + for (const path of await this.#mount.listTree(linearByStatePath('ready-for-agent'))) { + if (isIssueFilePath(path)) { + paths.add(path) + } + } + return [...paths].sort() + } + + async #readIssue(path: string): Promise { + try { + const { content } = await this.#mount.readFile(path) + return parseLinearIssue(path, content) + } catch (error) { + this.#logger.warn?.(`Unable to read issue ${path}`, error) + return undefined + } + } + + async #spawnAgent(record: InFlightIssue, spec: AgentSpec, dryRun: boolean): Promise<{ name: string }> { + const invocationId = this.#batch.invocationIdFor(record.issue, spec) + const existing = record.agents.get(spec.name) + if (existing) { + return { name: existing.result?.name ?? spec.name } + } + + if (!this.#batch.shouldSpawn(record, invocationId)) { + return { name: spec.name } + } + + if (dryRun) { + this.#batch.recordDryRun(record, spec, invocationId) + return { name: spec.name } + } + + const roster = await this.#fleet.roster() + if (roster.agents.some((agent) => agent.name === spec.name)) { + this.#batch.recordSpawn(record, spec, invocationId, { name: spec.name, sessionRef: spec.sessionRef }) + return { name: spec.name } + } + + const result = await this.#fleet.spawn({ + name: spec.name, + capability: spec.capability, + node: spec.node ?? 'self', + task: spec.task, + model: spec.model, + cwd: spec.clonePath, + sessionRef: spec.sessionRef, + invocationId, + restartPolicy: spec.restartPolicy ?? defaultRestartPolicy(spec), + channel: spec.channel, + }) + this.#batch.recordSpawn(record, spec, invocationId, result) + return { name: result.name } + } + + async #handleAgentExit(name: string, reason?: string): Promise { + const record = this.#batch.getIssueByAgent(name) + if (!record) { + return + } + + if (isCompletionReason(reason)) { + await this.#completeIssue(record) + return + } + + const tracked = record.agents.get(name) + if (!tracked || record.dryRun) { + return + } + + try { + if (tracked.sessionRef) { + const resumeKey = `${issueKey(record.issue)}:${name}:${tracked.sessionRef}` + if (this.#resumedExitKeys.has(resumeKey)) { + return + } + + const existing = this.#resumeInFlight.get(resumeKey) + if (existing) { + await existing + return + } + + const resume = this.#resumeTrackedAgent(record, name, tracked) + this.#resumeInFlight.set(resumeKey, resume) + try { + await resume + this.#resumedExitKeys.add(resumeKey) + } finally { + this.#resumeInFlight.delete(resumeKey) + } + } else { + const invocationId = `${this.#batch.invocationIdFor(record.issue, tracked.spec)}:restart:${this.#clock.now()}` + const result = await this.#fleet.spawn({ + name: tracked.spec.name, + capability: tracked.spec.capability, + node: tracked.spec.node ?? 'self', + task: tracked.spec.task, + model: tracked.spec.model, + cwd: tracked.spec.clonePath, + sessionRef: tracked.spec.sessionRef, + invocationId, + restartPolicy: defaultRestartPolicy(tracked.spec), + channel: tracked.spec.channel, + }) + this.#batch.recordSpawn(record, tracked.spec, invocationId, result) + } + } catch (error) { + this.#error(error, record.issue) + } + } + + async #resumeTrackedAgent( + record: InFlightIssue, + name: string, + tracked: NonNullable>, + ): Promise { + if (!tracked.sessionRef) { + return + } + + const result = await this.#fleet.resume({ + name, + sessionRef: tracked.sessionRef, + node: tracked.spec.node ?? 'self', + capability: tracked.spec.capability, + }) + tracked.result = result + tracked.sessionRef = result.sessionRef ?? tracked.sessionRef + record.agents.delete(name) + record.agents.set(result.name, tracked) + } + + async #handleDeliveryFailed(info: { to: string; msgId?: string; reason?: string }): Promise { + const critical = this.#criticalMessages.get(info.msgId ?? '') + const record = this.#batch.getIssueByAgent(info.to) + const issue = critical?.issue ?? record?.issue + const error = new Error(`Critical delivery failed to ${info.to}${info.reason ? `: ${info.reason}` : ''}`) + this.#error(error, issue) + + if (critical && this.#fleet.waitForInjected) { + try { + const ack = await this.#fleet.waitForInjected(critical.input, { timeoutMs: 90_000 }) + this.#criticalMessages.set(ack.eventId, critical) + } catch (retryError) { + this.#error(retryError, critical.issue) + } + } + } + + async #sendCriticalReviewerMessage(record: InFlightIssue): Promise { + if (!this.#fleet.waitForInjected) { + return + } + + const reviewer = [...record.agents.values()].find((agent) => agent.spec.role === 'reviewer') + if (!reviewer) { + return + } + + const input = { + to: reviewer.result?.name ?? reviewer.spec.name, + text: `Review is queued for ${record.issue.key}. Watch implementer PR handoff and report readiness.`, + from: 'factory', + data: { issue: record.issue }, + } + const ack = await this.#fleet.waitForInjected(input, { timeoutMs: 90_000 }) + this.#criticalMessages.set(ack.eventId, { issue: record.issue, input }) + } + + async #completeIssue(record: InFlightIssue): Promise { + try { + const issue = await this.#readIssue(record.issue.path) + if (issue) { + await this.#linear.setState(issue, this.#config.stateIds.done) + this.#emit('writeback-verified', { issue: record.issue, path: issue.path }) + } + + if (this.#slack && this.#config.slack) { + const root = await this.#slack.postThread({ + channel: this.#config.slack.channel, + text: `${record.issue.key}: factory agents completed.\nStatus: done\nMerge policy: ${this.#config.mergePolicy}`, + }) + await this.#slack.reply(root.threadId, `${record.issue.key}: Linear state set to done.`) + } + void this.#mergeGate + + for (const agent of record.agents.keys()) { + await this.#fleet.release(agent, 'issue-done') + } + + this.#increment('done') + this.#emit('issue-done', { issue: record.issue }) + const next = this.#batch.complete(record.issue) + if (next) { + await this.dispatch(next.decision, { dryRun: next.dryRun }) + } + } catch (error) { + this.#error(error, record.issue) + } + } + + #emit(event: FactoryEvent, payload: FactoryEventPayload): void { + for (const listener of this.#listeners.get(event) ?? []) { + listener(payload) + } + } + + #error(error: unknown, issue?: IssueRef): void { + this.#increment('errors') + this.#logger.error?.('[factory] error', error) + this.#emit('error', { error, issue }) + } + + #increment(name: string): void { + this.#counters[name] = (this.#counters[name] ?? 0) + 1 + } +} + +export function parseLinearIssue(path: string, content: unknown): LinearIssue { + const parsed = parseJsonContent(content) + const payload = wrappedPayload(parsed) + const wrapper = asRecord(parsed) ?? {} + const state = asRecord(payload.state) + const labels = Array.isArray(payload.labels) + ? payload.labels.map(labelName).filter((label): label is string => Boolean(label)) + : [] + const project = recordName(payload.project) + const team = recordName(payload.team) + const assignee = recordName(payload.assignee) + const key = stringValue(payload.identifier) ?? keyFromPath(path) + const uuid = stringValue(payload.id) ?? stringValue(wrapper.objectId) ?? uuidFromPath(path) ?? key + const stateId = stringValue(payload.stateId) ?? stringValue(state?.id) ?? '' + + return { + uuid, + key, + title: stringValue(payload.title) ?? '', + description: stringValue(payload.description) ?? '', + stateId, + state: state ? { name: stringValue(state.name) ?? '' } : undefined, + labels, + project, + team, + assignee, + path, + raw: asRecord(parsed) ?? payload, + } +} + +const issueRef = (issue: LinearIssue): IssueRef => ({ uuid: issue.uuid, key: issue.key, path: issue.path }) + +const dispatchComment = (decision: TriageDecision, agents: DispatchResult['agents']): string => [ + `Factory dispatch for ${decision.issue.key}`, + `Implementers: ${agents.filter((agent) => agent.role === 'implementer').map((agent) => agent.name).join(', ') || 'none'}`, + `Reviewer: ${agents.find((agent) => agent.role === 'reviewer')?.name ?? 'none'}`, +].join('\n') + +const repoMapFromConfig = (config: FactoryConfig) => { + const repos = new Set([ + ...Object.values(config.repos.byLabel), + ...Object.values(config.repos.byProject), + ...config.repos.keywordRules.map((rule) => rule.repo), + config.repos.default, + ].filter((repo): repo is string => Boolean(repo))) + + return [...repos].map((repo) => ({ + repo, + clonePath: config.repos.clonePaths[repo], + source: 'default' as const, + })) +} + +const isIssueFilePath = (path: string): boolean => + path.startsWith(`${ISSUE_ROOT}/`) && + path.endsWith('.json') && + !path.includes('/comments/') && + !path.includes('/by-state/') + +const keyFromPath = (path: string): string => path.split('/').at(-1)?.split('__')[0] ?? path + +const uuidFromPath = (path: string): string | undefined => path.split('__')[1]?.replace(/\.json$/, '') + +const stringValue = (value: unknown): string | undefined => typeof value === 'string' ? value : undefined + +const recordName = (value: unknown): string | undefined => { + if (typeof value === 'string') { + return value + } + const record = asRecord(value) + return stringValue(record?.name) ?? stringValue(record?.key) ?? stringValue(record?.id) +} + +const labelName = (value: unknown): string | undefined => { + if (typeof value === 'string') { + return value + } + const record = asRecord(value) + return stringValue(record?.name) +} + +const isCompletionReason = (reason?: string): boolean => + reason === 'issue-done' || reason === 'done' || reason === 'completed' + +const defaultRestartPolicy = (spec: AgentSpec): AgentSpec['restartPolicy'] | undefined => + spec.role === 'implementer' ? { maxRestarts: 3, strategy: 'resume' } as AgentSpec['restartPolicy'] : spec.restartPolicy diff --git a/packages/factory-sdk/src/orchestrator/index.ts b/packages/factory-sdk/src/orchestrator/index.ts new file mode 100644 index 00000000..1bf1e82d --- /dev/null +++ b/packages/factory-sdk/src/orchestrator/index.ts @@ -0,0 +1,3 @@ +export { BatchTracker, issueKey } from './batch-tracker' +export type { InFlightIssue, QueuedIssue, TrackedAgent } from './batch-tracker' +export { createFactory, FactoryLoop, parseLinearIssue } from './factory' diff --git a/packages/factory-sdk/src/ports/fleet.ts b/packages/factory-sdk/src/ports/fleet.ts index d7dab7ef..e2477fcc 100644 --- a/packages/factory-sdk/src/ports/fleet.ts +++ b/packages/factory-sdk/src/ports/fleet.ts @@ -28,7 +28,7 @@ export type SendInput = { to: string; text: string; from?: string; data?: Record export interface FleetClient { spawn(input: SpawnInput): Promise - resume(input: { name?: string; sessionRef: string; node?: 'self' | string }): Promise + resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise release(name: string, reason?: string): Promise roster(): Promise sendMessage(input: SendInput): Promise diff --git a/packages/factory-sdk/src/testing/fakes.ts b/packages/factory-sdk/src/testing/fakes.ts index 4d88b8ff..648fa88d 100644 --- a/packages/factory-sdk/src/testing/fakes.ts +++ b/packages/factory-sdk/src/testing/fakes.ts @@ -9,6 +9,7 @@ import type { SpawnResult, SubscribeOptions, Subscription, + Capability, } from '../ports' type ExitListener = (name: string, reason?: string) => void @@ -107,7 +108,7 @@ export class FakeMountClient implements MountClient { export class FakeFleetClient implements FleetClient { readonly spawns: SpawnInput[] = [] - readonly resumes: Array<{ name?: string; sessionRef: string; node?: 'self' | string }> = [] + readonly resumes: Array<{ name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }> = [] readonly releases: Array<{ name: string; reason?: string }> = [] readonly messages: SendInput[] = [] @@ -122,7 +123,7 @@ export class FakeFleetClient implements FleetClient { return { name: input.name, sessionRef: this.#sessionRefs.get(input.name) ?? input.sessionRef } } - async resume(input: { name?: string; sessionRef: string; node?: 'self' | string }): Promise { + async resume(input: { name?: string; sessionRef: string; node?: 'self' | string; capability?: Capability }): Promise { this.resumes.push(input) const name = input.name ?? input.sessionRef this.#agents.add(name) diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 9fbd5dc2..0035c1d4 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -1,6 +1,7 @@ import type { FactoryConfig } from './config/schema' import type { AgentSpec, FleetClient, GithubRead, LinearWriteback, MountClient, SlackWriteback } from './ports' import type { Clock, Logger } from './ports/system' +import type { GithubMergeGate } from './github/merge-gate' export interface FactoryPorts { mount: MountClient @@ -9,6 +10,7 @@ export interface FactoryPorts { linear?: LinearWriteback slack?: SlackWriteback github?: GithubRead + mergeGate?: GithubMergeGate logger?: Logger clock?: Clock } @@ -107,8 +109,10 @@ export interface PrSummary { number: number title?: string url?: string + /** Advisory only: mount snapshots can lag live GitHub state. Never use this for merge readiness. */ state?: string - status?: string - checks?: Array<{ name: string; status: string; conclusion?: string }> - mergeable?: boolean | 'unknown' + headRef?: string + baseRef?: string + author?: string + filesChanged?: string[] } diff --git a/packages/factory-sdk/src/writeback/github.ts b/packages/factory-sdk/src/writeback/github.ts index 7d9f2a49..aff7348c 100644 --- a/packages/factory-sdk/src/writeback/github.ts +++ b/packages/factory-sdk/src/writeback/github.ts @@ -18,52 +18,46 @@ const repoDir = (repo: string): string => { const prPath = (repo: string, number: number): string => `/github/repos/${repoDir(repo)}/pulls/by-id/${number}.json` -const checksFromPayload = (payload: Record): Array<{ name: string; status: string; conclusion?: string }> | undefined => { - const checks = payload.checks ?? payload.check_runs ?? payload.status_checks - if (!Array.isArray(checks)) { - return undefined - } - - return checks.map((check, index) => { - const record = asRecord(check) ?? {} - const name = typeof record.name === 'string' - ? record.name - : typeof record.context === 'string' - ? record.context - : `check-${index + 1}` - const status = typeof record.status === 'string' - ? record.status - : typeof record.state === 'string' - ? record.state - : 'unknown' - const conclusion = typeof record.conclusion === 'string' - ? record.conclusion - : undefined - - return { name, status, conclusion } - }) -} - export const MountGithubRead = (mount: MountClient) => ({ async getPr(repo: string, number: number): Promise { const { content } = await mount.readFile(prPath(repo, number)) const payload = wrappedPayload(content) - const state = typeof payload.state === 'string' ? payload.state : undefined - const mergeable = typeof payload.mergeable === 'boolean' - ? payload.mergeable - : payload.mergeable === 'unknown' - ? 'unknown' - : undefined return { repo, - number, + number: numberValue(payload.number) ?? number, title: typeof payload.title === 'string' ? payload.title : undefined, url: typeof payload.url === 'string' ? payload.url : undefined, - state, - status: state, - checks: checksFromPayload(payload), - mergeable, + state: typeof payload.state === 'string' ? payload.state : undefined, + headRef: refName(payload.headRef) ?? refName(payload.head) ?? stringValue(payload.head_ref), + baseRef: refName(payload.baseRef) ?? refName(payload.base) ?? stringValue(payload.base_ref), + author: refName(payload.author) ?? stringValue(payload.user), + filesChanged: filesChanged(payload.files_changed ?? payload.filesChanged ?? payload.files), } }, }) + +const stringValue = (value: unknown): string | undefined => + typeof value === 'string' ? value : undefined + +const numberValue = (value: unknown): number | undefined => + typeof value === 'number' ? value : undefined + +const refName = (value: unknown): string | undefined => { + if (typeof value === 'string') { + return value + } + const record = asRecord(value) + return stringValue(record?.name) ?? stringValue(record?.ref) ?? stringValue(record?.login) +} + +const filesChanged = (value: unknown): string[] | undefined => { + if (!Array.isArray(value)) { + return undefined + } + + const files = value + .map((entry) => typeof entry === 'string' ? entry : stringValue(asRecord(entry)?.path) ?? stringValue(asRecord(entry)?.filename)) + .filter((entry): entry is string => Boolean(entry)) + return files.length > 0 ? files : undefined +} diff --git a/packages/factory-sdk/src/writeback/writeback.test.ts b/packages/factory-sdk/src/writeback/writeback.test.ts index e18f8cb4..37cd1bf2 100644 --- a/packages/factory-sdk/src/writeback/writeback.test.ts +++ b/packages/factory-sdk/src/writeback/writeback.test.ts @@ -195,10 +195,10 @@ describe('MountGithubRead', () => { title: 'Add direct-proxy writeback fast path', state: 'open', url: 'https://github.com/AgentWorkforce/cloud/pull/2086', - checks: [ - { name: 'test', status: 'completed', conclusion: 'success' }, - ], - mergeable: 'unknown', + headRef: { name: 'factory-sdk/w4' }, + baseRef: { name: 'main' }, + author: { login: 'factory-bot' }, + filesChanged: [{ path: 'packages/factory-sdk/src/writeback/github.ts' }], }, }, }) @@ -210,11 +210,10 @@ describe('MountGithubRead', () => { title: 'Add direct-proxy writeback fast path', url: 'https://github.com/AgentWorkforce/cloud/pull/2086', state: 'open', - status: 'open', - checks: [ - { name: 'test', status: 'completed', conclusion: 'success' }, - ], - mergeable: 'unknown', + headRef: 'factory-sdk/w4', + baseRef: 'main', + author: 'factory-bot', + filesChanged: ['packages/factory-sdk/src/writeback/github.ts'], }) }) }) From 96363e87f8ad00f7b2c768172d7892a8e1498894 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Thu, 11 Jun 2026 21:57:44 +0200 Subject: [PATCH 2/3] Fix factory start dispatch and invocation tests --- .../src/orchestrator/factory.test.ts | 127 ++++++++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 18 ++- packages/factory-sdk/src/testing/fakes.ts | 2 + 3 files changed, 146 insertions(+), 1 deletion(-) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 0fc0c8a1..a411ac57 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -1,8 +1,10 @@ import { describe, expect, it } from 'vitest' import { FactoryConfigSchema, createFactory, parseLinearIssue, type FactoryConfig, type TriageDecision, type TriageEngine } from '../index' +import type { ChangeEvent } from '../ports' import { FakeFleetClient, FakeMountClient } from '../testing' import type { LinearIssue } from '../types' +import { BatchTracker } from './batch-tracker' const ready = 'b9bec744-b60c-4745-8022-d90d6ab59ae3' const implementing = '39b9881d-1196-4c95-8b80-a20f0c7263f7' @@ -124,6 +126,95 @@ describe('FactoryLoop', () => { expect(factory.status().queued).toEqual([]) }) + it('start backfills ready issues and dispatches when capacity is available', async () => { + const mount = new FakeMountClient({ [issuePath(11)]: issueFile(11) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-11-impl', 'ar-11-review']) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-11']) + expect(factory.status().queued).toEqual([]) + await factory.stop() + }) + + it('start queues and emits issue-queued when backfill exceeds batch capacity', async () => { + const mount = new FakeMountClient({ + [issuePath(15)]: issueFile(15), + [issuePath(16)]: issueFile(16), + }) + const fleet = new FakeFleetClient() + const factory = createFactory(config({ batchSize: 1 }), { mount, fleet, triage: new StaticTriage() }) + const queued: string[] = [] + factory.on('issue-queued', (payload) => { + if ('issue' in payload && payload.issue) { + queued.push(payload.issue.key) + } + }) + + await factory.start() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-15-impl', 'ar-15-review']) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-15']) + expect(factory.status().queued.map((issue) => issue.key)).toEqual(['AR-16']) + expect(queued).toEqual(['AR-16']) + await factory.stop() + }) + + it('coalesces concurrent starts into one subscription and dispatch pass', async () => { + const mount = new FakeMountClient({ [issuePath(12)]: issueFile(12) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await Promise.all([factory.start(), factory.start()]) + + expect(mount.subscribeCount).toBe(1) + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-12-impl', 'ar-12-review']) + await factory.stop() + }) + + it('dedupes duplicate subscribe events for an already tracked issue', async () => { + const mount = new FakeMountClient({ [issuePath(17)]: issueFile(17) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start() + mount.emit(changeEvent(issuePath(17), 'event-duplicate-1')) + mount.emit(changeEvent(issuePath(17), 'event-duplicate-2')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-17-impl', 'ar-17-review']) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-17']) + await factory.stop() + }) + + it('BatchTracker blocks duplicate invocation ids within and across issue records', async () => { + const tracker = new BatchTracker(5) + const decisionA = await new StaticTriage().triage(parseLinearIssue(issuePath(12), issueFile(12))) + const decisionB = await new StaticTriage().triage(parseLinearIssue(issuePath(13), issueFile(13))) + const recordA = tracker.start(decisionA, false) + const recordB = tracker.start(decisionB, false) + const specA = decisionA.implementers[0] + const specB = decisionB.implementers[0] + const invocationId = 'shared-invocation' + + expect(recordA).toBeDefined() + expect(recordB).toBeDefined() + expect(tracker.shouldSpawn(recordA!, invocationId)).toBe(true) + + tracker.recordSpawn(recordA!, specA, invocationId, { name: specA.name }) + + expect(tracker.shouldSpawn(recordA!, invocationId)).toBe(false) + expect(tracker.shouldSpawn(recordB!, invocationId)).toBe(false) + + tracker.complete(decisionA.issue) + + expect(tracker.shouldSpawn(recordB!, invocationId)).toBe(true) + tracker.recordSpawn(recordB!, specB, invocationId, { name: specB.name }) + expect(tracker.shouldSpawn(recordB!, invocationId)).toBe(false) + }) + it('dedupes repeated dispatch by stable invocation id', async () => { const mount = new FakeMountClient({ [issuePath(5)]: issueFile(5) }) const fleet = new FakeFleetClient() @@ -137,6 +228,27 @@ describe('FactoryLoop', () => { expect(new Set(fleet.spawns.map((spawn) => spawn.invocationId)).size).toBe(2) }) + it('dedupes dispatch spawns that retry the same invocation id under different agent names', async () => { + const mount = new FakeMountClient({ [issuePath(14)]: issueFile(14) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const decision = await factory.triageIssue(parseLinearIssue(issuePath(14), issueFile(14))) + const sharedInvocationId = 'retry-same-invocation' + const duplicateDecision: TriageDecision = { + ...decision, + implementers: [ + { ...decision.implementers[0], invocationId: sharedInvocationId }, + { ...decision.implementers[0], name: 'ar-14-impl-retry', invocationId: sharedInvocationId }, + ], + reviewer: { ...decision.reviewer, invocationId: 'reviewer-invocation' }, + } + + await factory.dispatch(duplicateDecision) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-14-impl', 'ar-14-review']) + expect(fleet.spawns.map((spawn) => spawn.invocationId)).toEqual([sharedInvocationId, 'reviewer-invocation']) + }) + it('resumes exited open agents by sessionRef with the original capability', async () => { const mount = new FakeMountClient({ [issuePath(6)]: issueFile(6) }) const fleet = new FakeFleetClient() @@ -226,3 +338,18 @@ describe('FactoryLoop', () => { expect(errors[0]).toMatchObject({ issue: { key: 'AR-9' } }) }) }) + +const changeEvent = (path: string, id: string) => ({ + id, + workspace: 'factory-test', + type: 'relayfile.changed', + occurredAt: new Date(0).toISOString(), + resource: { + path, + kind: 'file', + id: path, + provider: 'linear', + }, + summary: {}, + expand: async () => ({ level: 'summary', path, summary: {} }), +}) as unknown as ChangeEvent diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 12871d9f..f2d125b9 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -54,6 +54,7 @@ export class FactoryLoop implements Factory { #subscription?: Subscription #offAgentExit?: () => void #offDeliveryFailed?: () => void + #starting?: Promise #started = false constructor(config: FactoryConfig, ports: FactoryPorts) { @@ -76,6 +77,19 @@ export class FactoryLoop implements Factory { return } + if (this.#starting) { + return this.#starting + } + + this.#starting = this.#start() + try { + await this.#starting + } finally { + this.#starting = undefined + } + } + + async #start(): Promise { const ready = await this.#mount.ensureSubRoot(ISSUE_ROOT, { timeoutMs: 90_000 }) if (ready !== 'ready') { this.#error(new Error(`${ISSUE_ROOT} sub-root is not mounted`)) @@ -264,7 +278,9 @@ export class FactoryLoop implements Factory { return } - if (!this.#batch.canStart()) { + if (this.#batch.canStart()) { + await this.dispatch(decision, { dryRun: this.#config.dryRun }) + } else { if (this.#batch.queue(decision, this.#config.dryRun)) { this.#emit('issue-queued', { issue: decision.issue }) } diff --git a/packages/factory-sdk/src/testing/fakes.ts b/packages/factory-sdk/src/testing/fakes.ts index 648fa88d..7589a761 100644 --- a/packages/factory-sdk/src/testing/fakes.ts +++ b/packages/factory-sdk/src/testing/fakes.ts @@ -18,6 +18,7 @@ type DeliveryFailedListener = (info: { to: string; msgId?: string; reason?: stri export class FakeMountClient implements MountClient { readonly files = new Map() readonly writes: Array<{ path: string; content: unknown }> = [] + subscribeCount = 0 #subscribers = new Set<(event: ChangeEvent) => void>() #events: ChangeEvent[] = [] @@ -51,6 +52,7 @@ export class FakeMountClient implements MountClient { } subscribe(_globs: string[], onChange: (event: ChangeEvent) => void, _opts?: SubscribeOptions): Subscription { + this.subscribeCount += 1 this.#subscribers.add(onChange) return { From d977ba39f0e12cc717941e71e94c867570bfe123 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Thu, 11 Jun 2026 22:14:13 +0200 Subject: [PATCH 3/3] Relax merge gate advisory checks --- .../factory-sdk/src/github/merge-gate.test.ts | 38 ++++++++++++++++++- packages/factory-sdk/src/github/merge-gate.ts | 14 ++++--- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/packages/factory-sdk/src/github/merge-gate.test.ts b/packages/factory-sdk/src/github/merge-gate.test.ts index 5a5c7f81..935841e3 100644 --- a/packages/factory-sdk/src/github/merge-gate.test.ts +++ b/packages/factory-sdk/src/github/merge-gate.test.ts @@ -19,7 +19,7 @@ const live = (overrides: Record = {}) => ({ }) describe('GithubMergeGate', () => { - it('returns READY only for MERGEABLE+CLEAN, matching head, and all checks SUCCESS', async () => { + it('returns READY only for MERGEABLE+CLEAN, matching head, and no blocking checks', async () => { const gate = new GhCliGithubMergeGate(async () => ({ stdout: JSON.stringify(live()) })) await expect(gate.check(input)).resolves.toMatchObject({ @@ -28,6 +28,28 @@ describe('GithubMergeGate', () => { }) }) + it('returns READY for MERGEABLE+CLEAN with neutral, skipped, or expected advisory checks', () => { + expect(evaluateGithubMergeGate(input, live({ + statusCheckRollup: [ + { name: 'required', conclusion: 'SUCCESS' }, + { name: 'advisory-neutral', conclusion: 'NEUTRAL' }, + { name: 'advisory-skipped', conclusion: 'SKIPPED' }, + { name: 'expected-but-nonblocking', conclusion: 'EXPECTED' }, + ], + }))).toMatchObject({ + verdict: 'READY', + ready: true, + }) + }) + + it('refuses when the live head differs from the expected head sha', () => { + expect(evaluateGithubMergeGate(input, live({ headRefOid: 'different-sha' }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + reason: expect.stringMatching(/head moved/), + }) + }) + it('refuses stale mount-clean snapshots when live GitHub contradicts readiness', () => { const staleMountSnapshot = { mergeable: 'MERGEABLE', @@ -74,7 +96,7 @@ describe('GithubMergeGate', () => { }) }) - it('refuses missing or non-success status checks', () => { + it('refuses missing, blocking, pending, or unknown status checks', () => { expect(evaluateGithubMergeGate(input, live({ statusCheckRollup: [] }))).toMatchObject({ verdict: 'REFUSE', ready: false, @@ -83,5 +105,17 @@ describe('GithubMergeGate', () => { verdict: 'REFUSE', ready: false, }) + expect(evaluateGithubMergeGate(input, live({ statusCheckRollup: [{ status: 'IN_PROGRESS' }] }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + expect(evaluateGithubMergeGate(input, live({ statusCheckRollup: [{ conclusion: 'UNKNOWN' }] }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) + expect(evaluateGithubMergeGate(input, live({ statusCheckRollup: [{ status: 'COMPLETED' }] }))).toMatchObject({ + verdict: 'REFUSE', + ready: false, + }) }) }) diff --git a/packages/factory-sdk/src/github/merge-gate.ts b/packages/factory-sdk/src/github/merge-gate.ts index f689fe40..09e2eb7f 100644 --- a/packages/factory-sdk/src/github/merge-gate.ts +++ b/packages/factory-sdk/src/github/merge-gate.ts @@ -110,15 +110,15 @@ export function evaluateGithubMergeGate( return refuse('no successful status checks observed', { mergeable, mergeStateStatus, headRefOid, checkStates }) } - const failing = checkStates.filter((state) => state !== 'SUCCESS') - if (failing.length > 0) { - return refuse(`checks not successful: ${failing.join(', ')}`, { mergeable, mergeStateStatus, headRefOid, checkStates }) + const blocking = checkStates.filter(isBlockingCheckState) + if (blocking.length > 0) { + return refuse(`checks not merge-ready: ${blocking.join(', ')}`, { mergeable, mergeStateStatus, headRefOid, checkStates }) } return { verdict: 'READY', ready: true, - reason: 'MERGEABLE+CLEAN with matching head and successful checks', + reason: 'MERGEABLE+CLEAN with matching head and no blocking checks', live: { mergeable, mergeStateStatus, headRefOid, checkStates }, } } @@ -155,10 +155,14 @@ const checkStatesFromRollup = (value: unknown): string[] => { } const status = stringValue(record.status) - return status === 'COMPLETED' ? 'SUCCESS' : status ?? 'UNKNOWN' + return status ?? 'UNKNOWN' }) } +const nonBlockingCheckStates = new Set(['SUCCESS', 'NEUTRAL', 'SKIPPED', 'EXPECTED']) + +const isBlockingCheckState = (state: string): boolean => !nonBlockingCheckStates.has(state) + const asRecord = (value: unknown): Record => value !== null && typeof value === 'object' && !Array.isArray(value) ? value as Record