From cff639c0ac4abb80aa7a0a163758280c1a11f301 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 13 Jun 2026 22:29:08 +0200 Subject: [PATCH 1/4] Add PR-state completion sweep --- .../src/orchestrator/factory.test.ts | 284 ++++++++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 114 ++++++- 2 files changed, 394 insertions(+), 4 deletions(-) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 06d2ebdf..b7891b0a 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -84,6 +84,22 @@ const realMergeIssueFile = (n: number, stateId = ready) => realIssueFile(n, stat title: `Real product issue ${n}`, }) +const prFile = ( + number: number, + payload: { title?: string; body?: string; head_ref?: string; isDraft?: boolean } = {}, +) => ({ + provider: 'github', + objectType: 'pull_request', + objectId: String(number), + payload: { + number, + title: payload.title ?? `AR-${number}: test PR`, + body: payload.body ?? '', + head_ref: payload.head_ref ?? `ar-${number}-test`, + isDraft: payload.isDraft, + }, +}) + const slackConfig = (channel = 'C0FACTORY__factory-e2e') => ({ channel, style: 'threaded-summarized' as const, @@ -3076,6 +3092,274 @@ describe('FactoryLoop', () => { expect(alive).toEqual(new Set()) }) + it('PR-state sweep completes wedged synthetic issues, frees batch slots, and dispatches queued work', async () => { + const mount = new FakeMountClient({ + [issuePath(351)]: issueFile(351), + [issuePath(352)]: issueFile(352), + [issuePath(353)]: issueFile(353), + '/github/repos/AgentWorkforce__pear/pulls/by-id/351.json': prFile(351, { + title: 'Add isOdd factory SDK util', + body: 'Linear: AR-351', + head_ref: 'ar-351-is-odd-v11b', + }), + '/github/repos/AgentWorkforce__pear/pulls/by-id/352.json': prFile(352, { + title: 'AR-352: add square utility', + body: '', + head_ref: 'square-util', + }), + }) + const fleet = new FakeFleetClient() + for (const n of [351, 352, 353]) { + fleet.setSessionRef(`ar-${n}-impl`, `session-ar-${n}-impl`) + fleet.setSessionRef(`ar-${n}-review`, `session-ar-${n}-review`) + } + const closeInputs: Array> = [] + const factory = createFactory(config({ batchSize: 2 }), { + mount, + fleet, + triage: new StaticTriage(), + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.runOnce() + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-351', 'AR-352']) + expect(factory.status().queued.map((issue) => issue.key)).toEqual(['AR-353']) + + for (const n of [351, 352]) { + for (const role of ['impl', 'review']) { + fleet.emitAgentExit(`ar-${n}-${role}`, 'worker_exited') + await flush() + fleet.emitAgentExit(`ar-${n}-${role}`, 'worker_exited') + await flush() + } + } + + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-351', 'AR-352']) + expect(fleet.spawns.map((spawn) => spawn.name)).not.toContain('ar-353-impl') + + await factory.runLoop({ maxIterations: 1 }) + + expect(closeInputs).toEqual([ + { repo: 'AgentWorkforce/pear', prNumber: 351, expectedIssueKey: 'AR-351', requireTitleMarker: false }, + { repo: 'AgentWorkforce/pear', prNumber: 352, expectedIssueKey: 'AR-352', requireTitleMarker: false }, + ]) + expect(fleet.releases.filter((release) => release.reason === 'issue-done').map((release) => release.name)).toEqual([ + 'ar-351-impl', + 'ar-351-review', + 'ar-352-impl', + 'ar-352-review', + ]) + expect(fleet.spawns.map((spawn) => spawn.name)).toContain('ar-353-impl') + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-353']) + expect(factory.status().queued).toEqual([]) + expect(factory.status().counters.completionSweepCompleted).toBe(2) + + await factory.runLoop({ maxIterations: 1 }) + expect(closeInputs).toHaveLength(2) + expect(factory.status().counters.done).toBe(2) + }) + + it('live mode runs the PR-state completion sweep timer', async () => { + vi.useFakeTimers() + try { + const mount = new FakeMountClient({ + [issuePath(241)]: issueFile(241), + '/github/repos/AgentWorkforce__pear/pulls/by-id/241.json': prFile(241, { + title: 'Add live completion sweep coverage', + body: '', + head_ref: 'ar-241-live-completion-sweep', + }), + }) + const fleet = new FakeFleetClient() + const closeInputs: unknown[] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(241), issueFile(241)))) + await vi.advanceTimersByTimeAsync(0) + + expect(closeInputs).toEqual([{ + repo: 'AgentWorkforce/pear', + prNumber: 241, + expectedIssueKey: 'AR-241', + requireTitleMarker: false, + }]) + expect(factory.status().inFlight).toEqual([]) + await factory.stop() + } finally { + vi.useRealTimers() + } + }) + + it('coalesces concurrent PR sweep and agent-exit completion triggers', async () => { + const mount = new FakeMountClient({ + [issuePath(354)]: issueFile(354), + '/github/repos/AgentWorkforce__pear/pulls/by-id/354.json': prFile(354, { + title: 'Add idempotent completion coverage', + body: '', + head_ref: 'ar-354-idempotent-completion', + }), + }) + const fleet = new FakeFleetClient() + const closeInputs: Array> = [] + let releaseProbeClose!: () => void + const probeCloseBlocked = new Promise((release) => { + releaseProbeClose = release + }) + let resolveProbeCloseStarted!: () => void + const probeCloseStarted = new Promise((resolve) => { + resolveProbeCloseStarted = resolve + }) + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + linear: stateOnlyLinear(mount), + probeCloser: async (input) => { + closeInputs.push(input) + resolveProbeCloseStarted() + await probeCloseBlocked + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(354), issueFile(354)))) + const sweep = factory.runLoop({ maxIterations: 1 }) + await probeCloseStarted + fleet.emitAgentExit('ar-354-impl', 'issue-done') + await flush() + releaseProbeClose() + await sweep + + expect(closeInputs).toEqual([{ + repo: 'AgentWorkforce/pear', + prNumber: 354, + expectedIssueKey: 'AR-354', + requireTitleMarker: false, + }]) + expect(fleet.releases.filter((release) => release.reason === 'issue-done').map((release) => release.name)).toEqual([ + 'ar-354-impl', + 'ar-354-review', + ]) + expect(factory.status().counters.done).toBe(1) + expect(factory.status().inFlight).toEqual([]) + }) + + it('PR-state sweep releases a real issue under mergePolicy never without merging', async () => { + const mount = new FakeMountClient({ + [issuePath(240)]: realMergeIssueFile(240), + '/github/repos/AgentWorkforce__pear/pulls/by-id/240.json': prFile(240, { + title: 'Real product issue 240', + body: 'Linear: AR-240', + head_ref: 'ar-240-real-fix', + }), + }) + const fleet = new FakeFleetClient() + const gate = new ScriptedGithubMergeGate([readyMergeVerdict('green-sha')]) + const factory = createFactory(config({ + mergePolicy: 'never', + safety: { requireTitlePrefix: 'Real', requireTeamKey: 'AR' }, + }), { + mount, + fleet, + triage: new StaticTriage(), + linear: stateOnlyLinear(mount), + mergeGate: gate, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(240), realMergeIssueFile(240)))) + await factory.runLoop({ maxIterations: 1 }) + + expect(fleet.releases.map((release) => release.name)).toEqual(['ar-240-impl', 'ar-240-review']) + expect(gate.checks).toEqual([]) + expect(gate.merges).toEqual([]) + expect(factory.status().inFlight).toEqual([]) + expect(mount.writes).toContainEqual({ path: issuePath(240), content: { stateId: done } }) + }) + + it('PR-state sweep merges a real issue only when policy, checks, review, and head are ready', async () => { + const mount = new FakeMountClient({ + [issuePath(242)]: realMergeIssueFile(242), + '/github/repos/AgentWorkforce__pear/pulls/by-id/242.json': prFile(242, { + title: 'Real product issue 242', + body: '', + head_ref: 'ar-242-real-fix', + }), + }) + const fleet = new FakeFleetClient() + const gate = new ScriptedGithubMergeGate([readyMergeVerdict('green-approved-sha')]) + const factory = createFactory(config({ + mergePolicy: 'on-green-with-review', + safety: { requireTitlePrefix: 'Real', requireTeamKey: 'AR' }, + }), { + mount, + fleet, + triage: new StaticTriage(), + linear: stateOnlyLinear(mount), + mergeGate: gate, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(242), realMergeIssueFile(242)))) + await factory.runLoop({ maxIterations: 1 }) + + expect(gate.merges).toEqual([{ + repo: 'AgentWorkforce/pear', + number: 242, + expectedHeadSha: 'green-approved-sha', + }]) + expect(factory.status().counters.mergeGateMerged).toBe(1) + expect(factory.status().inFlight).toEqual([]) + }) + + it('PR-state sweep does not complete on a wrong PR or draft PR', async () => { + const mount = new FakeMountClient({ + [issuePath(250)]: issueFile(250), + [issuePath(251)]: issueFile(251), + '/github/repos/AgentWorkforce__pear/pulls/by-id/250.json': prFile(250, { + title: 'Unrelated cleanup', + body: 'This merely mentions ar-250, but is not the issue PR.', + head_ref: 'docs-cleanup', + }), + '/github/repos/AgentWorkforce__pear/pulls/by-id/251.json': prFile(251, { + title: 'AR-251: draft work', + body: '', + head_ref: 'ar-251-draft-work', + isDraft: true, + }), + }) + const fleet = new FakeFleetClient() + const closeInputs: unknown[] = [] + const factory = createFactory(config({ batchSize: 2 }), { + mount, + fleet, + triage: new StaticTriage(), + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.runOnce() + await factory.runLoop({ maxIterations: 1 }) + + expect(closeInputs).toEqual([]) + expect(fleet.releases.filter((release) => release.reason === 'issue-done')).toEqual([]) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-250', 'AR-251']) + expect(factory.status().counters.completionSweepCompleted).toBeUndefined() + expect(factory.status().counters.completionSweepDraftPr).toBe(1) + }) + it('closes synthetic probe PRs even when real auto-merge is enabled', async () => { const markedMount = new FakeMountClient({ [issuePath(19)]: issueFile(19) }) const markedFleet = new FakeFleetClient() diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index fb9e84d2..802af549 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -63,6 +63,7 @@ type DispatchAttemptState = { terminal: boolean backoffUntilMs: number } +type ResolvedIssuePr = { repo: string; prNumber: number; draft?: boolean } type SlackReply = { channelDir: string threadTs: string @@ -77,6 +78,8 @@ const READY_EVENTS_LIMIT = 100 const LIVE_ISSUE_GLOB = `${ISSUE_ROOT}/**` const LIVE_DEDUPE_LIMIT = 5_000 const LIVE_EVENT_DRAIN_BATCH_SIZE = 5 +const COMPLETION_SWEEP_INTERVAL_MS = 15_000 +const COMPLETION_SWEEP_BATCH_SIZE = 2 const STATE_NAME_TO_ID: Record = { 'Ready for Agent': LINEAR_STATE_IDS.readyForAgent, 'Agent Implementing': LINEAR_STATE_IDS.agentImplementing, @@ -156,6 +159,9 @@ export class FactoryLoop implements Factory { readonly #liveEventQueue: ChangeEvent[] = [] #liveEventDrainScheduled = false #liveEventDrainActive = false + #completionSweepTimer?: ReturnType + #completionSweepActive = false + readonly #completionInFlight = new Set() readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void @@ -224,6 +230,7 @@ export class FactoryLoop implements Factory { this.#started = true try { await this.#startLiveSubscription(opts.liveSubscription) + this.#scheduleCompletionSweep(0) return } catch (error) { this.#started = false @@ -242,12 +249,15 @@ export class FactoryLoop implements Factory { async stop(): Promise { this.#started = false this.#stopping = true + if (this.#completionSweepTimer) clearTimeout(this.#completionSweepTimer) + this.#completionSweepTimer = undefined await this.#stopLiveHeartbeat('stopping') await this.#releaseInFlightAgents('factory-stopped') if (this.#livePollTimer) clearTimeout(this.#livePollTimer) this.#livePollTimer = undefined this.#livePollInFlight = false this.#liveEventQueue.length = 0 + this.#completionInFlight.clear() const subscription = this.#subscription this.#subscription = undefined await this.#boundedStopTeardown('factory subscription unsubscribe', () => subscription?.unsubscribe()) @@ -551,6 +561,92 @@ export class FactoryLoop implements Factory { await this.#handleChange(path, { requireRealIssue: true }) } + #scheduleCompletionSweep(delayMs = COMPLETION_SWEEP_INTERVAL_MS): void { + if (!this.#started || this.#completionSweepTimer || this.#completionSweepActive) { + return + } + this.#completionSweepTimer = setTimeout(() => { + this.#completionSweepTimer = undefined + void this.#sweepPrStateCompletions('live-timer') + .catch((error: unknown) => { + this.#increment('completionSweepErrors') + this.#logger.warn?.('[factory] PR completion sweep failed', error) + }) + .finally(() => { + if (this.#started) this.#scheduleCompletionSweep() + }) + }, delayMs) + this.#completionSweepTimer.unref?.() + } + + async #sweepPrStateCompletions(reason: 'live-timer' | 'run-loop'): Promise { + if (this.#completionSweepActive) { + return + } + this.#completionSweepActive = true + try { + const records = this.#batch.inFlight + .filter((record) => !record.dryRun && !this.#completionInFlight.has(issueKey(record.issue))) + if (records.length === 0) { + return + } + + this.#increment('completionSweepRuns') + for (let index = 0; index < records.length; index += COMPLETION_SWEEP_BATCH_SIZE) { + const candidates = await Promise.all( + records.slice(index, index + COMPLETION_SWEEP_BATCH_SIZE).map(async (record) => { + const issue = await this.#readIssue(record.issue.path) + if (!issue || !isInFactoryScope(issue, this.#config.safety)) { + return undefined + } + const pr = await this.#completionPrForIssue(issue) + if (!pr) { + this.#increment('completionSweepMissingPr') + return undefined + } + if (pr.draft) { + this.#increment('completionSweepDraftPr') + return undefined + } + return { record, pr } + }), + ) + + for (const candidate of candidates) { + if (!candidate || this.#batch.getIssue(candidate.record.issue) !== candidate.record) { + continue + } + this.#increment('completionSweepCompleted') + this.#logger.info?.('[factory] PR completion sweep completing issue', { + issue: candidate.record.issue.key, + repo: candidate.pr.repo, + prNumber: candidate.pr.prNumber, + reason, + }) + // workaround for relay#1116: agents often exit as worker_exited after opening a PR, + // so PR state is the primary completion signal that frees the batch slot. + await this.#completeIssue(candidate.record) + } + + await this.#refreshLiveHeartbeatIfDue() + if (index + COMPLETION_SWEEP_BATCH_SIZE < records.length) { + await liveEventYield() + } + } + } finally { + this.#completionSweepActive = false + } + } + + async #completionPrForIssue(issue: LinearIssue): Promise { + if (this.#customProbePrResolver) { + return this.#probePrResolver(issue) + } + return resolveIssuePrFromMount(this.#mount, this.#config, issue, { + titleMarker: FACTORY_E2E_MARKER, + }) + } + async runOnce(opts: { dryRun?: boolean } = {}): Promise { const dryRun = opts.dryRun ?? this.#config.dryRun const paths = await this.#readyIssuePaths() @@ -619,6 +715,7 @@ export class FactoryLoop implements Factory { for (let iteration = 0; iteration < maxIterations; iteration += 1) { await this.#writeLoopHeartbeat(heartbeatPath, registryPath, 'running', iteration, maxIterations) try { + await this.#sweepPrStateCompletions('run-loop') reports.push(await this.runOnce({ dryRun: opts.dryRun })) consecutiveFailures = 0 } catch (error) { @@ -1537,6 +1634,11 @@ export class FactoryLoop implements Factory { } async #completeIssue(record: InFlightIssue): Promise { + const completionKey = issueKey(record.issue) + if (this.#completionInFlight.has(completionKey)) { + return + } + this.#completionInFlight.add(completionKey) try { const issue = await this.#readIssue(record.issue.path) if (issue) { @@ -1573,6 +1675,8 @@ export class FactoryLoop implements Factory { await this.#writeInFlightRegistry() } catch (error) { this.#error(error, record.issue) + } finally { + this.#completionInFlight.delete(completionKey) } } @@ -2161,8 +2265,8 @@ const resolveIssuePrFromMount = async ( config: FactoryConfig, issue: LinearIssue, opts: { requireTitleMarker?: boolean; titleMarker?: string } = {}, -): Promise<{ repo: string; prNumber: number } | undefined> => { - const candidates: Array<{ repo: string; prNumber: number; score: number }> = [] +): Promise => { + const candidates: Array = [] for (const repo of reposFromConfig(config)) { for (const path of await mount.listTree(githubPullRoot(repo))) { if (!path.endsWith('.json')) continue @@ -2171,7 +2275,7 @@ const resolveIssuePrFromMount = async ( ? issuePrMatchScore(pr, issue, opts.titleMarker ?? config.safety.requireTitlePrefix, opts) : 0 if (!pr || score <= 0) continue - candidates.push({ repo, prNumber: pr.number, score }) + candidates.push({ repo, prNumber: pr.number, draft: pr.draft, score }) } } @@ -2196,7 +2300,7 @@ const githubPullRoot = (repo: string): string => { const readProbePrCandidate = async ( mount: MountClient, path: string, -): Promise<{ number: number; title: string; body: string; headRef: string } | undefined> => { +): Promise<{ number: number; title: string; body: string; headRef: string; draft?: boolean } | undefined> => { try { const payload = wrappedPayload((await mount.readFile(path)).content) const number = typeof payload.number === 'number' @@ -2208,6 +2312,7 @@ const readProbePrCandidate = async ( title: stringValue(payload.title) ?? '', body: stringValue(payload.body) ?? '', headRef: refName(payload.headRef) ?? refName(payload.head) ?? stringValue(payload.head_ref) ?? '', + draft: booleanValue(payload.isDraft) ?? booleanValue(payload.draft), } } catch { return undefined @@ -2268,6 +2373,7 @@ export const keyFromPath = (path: string): string => const uuidFromPath = (path: string): string | undefined => path.split('__')[1]?.replace(/\.json$/, '') const stringValue = (value: unknown): string | undefined => typeof value === 'string' ? value : undefined +const booleanValue = (value: unknown): boolean | undefined => typeof value === 'boolean' ? value : undefined const stateNameToId = (name: string | undefined): string | undefined => name ? STATE_NAME_TO_ID[name] : undefined From 1f478a0645695990ed105ac5084857eabcec77b2 Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Sat, 13 Jun 2026 20:56:58 +0000 Subject: [PATCH 2/4] chore: apply pr-reviewer fixes for #287 --- .../src/orchestrator/factory.test.ts | 42 +++++++++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 1 + 2 files changed, 43 insertions(+) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index b7891b0a..8b7d62e5 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -3202,6 +3202,47 @@ describe('FactoryLoop', () => { } }) + it('backfill-and-subscribe mode runs the PR-state completion sweep timer', async () => { + vi.useFakeTimers() + try { + const mount = new FakeMountClient({ + [issuePath(243)]: issueFile(243), + '/github/repos/AgentWorkforce__pear/pulls/by-id/243.json': prFile(243, { + title: 'Add default start completion sweep coverage', + body: '', + head_ref: 'ar-243-default-completion-sweep', + }), + }) + const fleet = new FakeFleetClient() + const closeInputs: unknown[] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.start() + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-243']) + + await vi.advanceTimersByTimeAsync(0) + + expect(closeInputs).toEqual([{ + repo: 'AgentWorkforce/pear', + prNumber: 243, + expectedIssueKey: 'AR-243', + requireTitleMarker: false, + }]) + expect(factory.status().inFlight).toEqual([]) + await factory.stop() + } finally { + vi.useRealTimers() + } + }) + it('coalesces concurrent PR sweep and agent-exit completion triggers', async () => { const mount = new FakeMountClient({ [issuePath(354)]: issueFile(354), @@ -3357,6 +3398,7 @@ describe('FactoryLoop', () => { expect(fleet.releases.filter((release) => release.reason === 'issue-done')).toEqual([]) expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-250', 'AR-251']) expect(factory.status().counters.completionSweepCompleted).toBeUndefined() + expect(factory.status().counters.completionSweepMissingPr).toBe(1) expect(factory.status().counters.completionSweepDraftPr).toBe(1) }) diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 802af549..498b8ae1 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -244,6 +244,7 @@ export class FactoryLoop implements Factory { void this.#handleChange(event.resource.path) }) this.#started = true + this.#scheduleCompletionSweep(0) } async stop(): Promise { From ca112aa9550ad2ea5624ccc60dd6183f8abb6658 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 13 Jun 2026 23:03:19 +0200 Subject: [PATCH 3/4] Add gh fallback for factory PR discovery --- packages/factory-sdk/src/cli/fleet.ts | 9 +- packages/factory-sdk/src/github/index.ts | 1 + packages/factory-sdk/src/github/merge-gate.ts | 2 +- .../src/github/probe-closer.test.ts | 26 ++ .../factory-sdk/src/github/probe-closer.ts | 11 +- packages/factory-sdk/src/index.ts | 1 + .../src/orchestrator/factory.test.ts | 236 ++++++++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 140 ++++++++++- packages/factory-sdk/src/types.ts | 5 +- 9 files changed, 419 insertions(+), 12 deletions(-) diff --git a/packages/factory-sdk/src/cli/fleet.ts b/packages/factory-sdk/src/cli/fleet.ts index 16eefe19..b564f036 100644 --- a/packages/factory-sdk/src/cli/fleet.ts +++ b/packages/factory-sdk/src/cli/fleet.ts @@ -9,6 +9,7 @@ import { closeProbePr, createFactory, createFleet, + defaultGhRunner, isInFactoryScope, parseLinearIssue, reapFactoryOrphansOnce, @@ -18,6 +19,7 @@ import { type FactoryConfig, type FleetBackend, type FleetClient, + type GhRunner, type MountClient, type ProbeCloser, type RelayfileCloudMountClientConfig, @@ -34,6 +36,7 @@ interface FleetCliDeps { stdout?: Pick stderr?: Pick probeCloser?: ProbeCloser + probePrGhRunner?: GhRunner now?: () => number stopSignalProcessLike?: Pick daemonExit?: (code: number) => void @@ -128,7 +131,11 @@ export async function runFleetCli(argv: string[], deps: FleetCliDeps = {}): Prom return 0 } const mount = await buildMount(loaded, deps) - const factory = (deps.createFactory ?? createFactory)(loaded.config, { mount, fleet }) + const factory = (deps.createFactory ?? createFactory)(loaded.config, { + mount, + fleet, + probePrGhRunner: deps.probePrGhRunner ?? defaultGhRunner, + }) return await runFactoryCommand(command, factory, mount, fleet, loaded.config, globals, out, deps) } } diff --git a/packages/factory-sdk/src/github/index.ts b/packages/factory-sdk/src/github/index.ts index 697f7dca..21f7e8fa 100644 --- a/packages/factory-sdk/src/github/index.ts +++ b/packages/factory-sdk/src/github/index.ts @@ -1,6 +1,7 @@ export { GhCliGithubMergeGate, GithubMergeGate, + defaultGhRunner, evaluateGithubMergeGate, } from './merge-gate' export { diff --git a/packages/factory-sdk/src/github/merge-gate.ts b/packages/factory-sdk/src/github/merge-gate.ts index fec6f402..39df380c 100644 --- a/packages/factory-sdk/src/github/merge-gate.ts +++ b/packages/factory-sdk/src/github/merge-gate.ts @@ -172,7 +172,7 @@ export function evaluateGithubMergeGate( } } -const defaultGhRunner: GhRunner = async (args) => { +export const defaultGhRunner: GhRunner = async (args) => { const { stdout, stderr } = await execFileAsync('gh', args, { maxBuffer: 1024 * 1024 }) return { stdout, stderr } } diff --git a/packages/factory-sdk/src/github/probe-closer.test.ts b/packages/factory-sdk/src/github/probe-closer.test.ts index 6160edf8..e7790a2a 100644 --- a/packages/factory-sdk/src/github/probe-closer.test.ts +++ b/packages/factory-sdk/src/github/probe-closer.test.ts @@ -118,6 +118,32 @@ describe('closeProbePr', () => { ]) }) + it('treats an already-closed probe PR as idempotent success', async () => { + const calls: string[][] = [] + const runner: GhRunner = async (args) => { + calls.push(args) + return { + stdout: JSON.stringify({ + state: 'CLOSED', + headRefName: 'ar-229-is-positive', + title: 'Add isPositive util', + body: '', + }), + } + } + + await expect(closeProbePr({ + repo: 'AgentWorkforce/pear', + prNumber: 279, + expectedIssueKey: 'AR-229', + requireTitleMarker: false, + runner, + })).resolves.toEqual({ repo: 'AgentWorkforce/pear', prNumber: 279, state: 'CLOSED' }) + expect(calls.map((args) => args.slice(0, 3))).toEqual([ + ['pr', 'view', '279'], + ]) + }) + it('refuses a probe that is not tied to the expected issue key before closing', async () => { const calls: string[][] = [] const runner: GhRunner = async (args) => { diff --git a/packages/factory-sdk/src/github/probe-closer.ts b/packages/factory-sdk/src/github/probe-closer.ts index ee76c7b8..697db739 100644 --- a/packages/factory-sdk/src/github/probe-closer.ts +++ b/packages/factory-sdk/src/github/probe-closer.ts @@ -24,7 +24,10 @@ export interface CloseProbePrResult { export async function closeProbePr(input: CloseProbePrInput): Promise { const run = input.runner ?? defaultGhRunner const before = await viewPr(run, input) - assertOpenProbe(before, input) + const beforeState = assertClosableProbe(before, input) + if (beforeState === 'CLOSED') { + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + } await run(['pr', 'close', String(input.prNumber), '--repo', input.repo]) @@ -53,9 +56,10 @@ const viewPr = async (run: GhRunner, input: CloseProbePrInput): Promise, input: CloseProbePrInput): void => { +const assertClosableProbe = (live: Record, input: CloseProbePrInput): 'OPEN' | 'CLOSED' => { const state = stringValue(live.state) - if (normalizeState(state) !== 'OPEN') { + const normalized = normalizeState(state) + if (normalized !== 'OPEN' && normalized !== 'CLOSED') { throw new Error(`Refusing to close probe PR #${input.prNumber}: live state is ${state ?? 'unknown'}`) } @@ -69,6 +73,7 @@ const assertOpenProbe = (live: Record, input: CloseProbePrInput if ((input.requireTitleMarker ?? true) && !hasFactoryE2eMarker(title)) { throw new Error(`Refusing to close probe PR #${input.prNumber}: missing ${FACTORY_E2E_MARKER} probe marker`) } + return normalized } const hasFactoryE2eMarker = (title: string): boolean => diff --git a/packages/factory-sdk/src/index.ts b/packages/factory-sdk/src/index.ts index 11755230..2bac73c9 100644 --- a/packages/factory-sdk/src/index.ts +++ b/packages/factory-sdk/src/index.ts @@ -45,6 +45,7 @@ export { GhCliGithubMergeGate, GithubMergeGate, closeProbePr, + defaultGhRunner, evaluateGithubMergeGate, } from './github' export type { diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 8b7d62e5..48cb749b 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -8,6 +8,7 @@ import type { BrokerEvent, SendMessageInput, SpawnPtyInput } from '@agent-relay/ import { FactoryConfigSchema, checkFactoryLoopLiveness, + closeProbePr, createFactory, parseLinearIssue, readFactoryInFlightRegistry, @@ -100,6 +101,18 @@ const prFile = ( }, }) +const ghPr = ( + number: number, + payload: { title?: string; body?: string; headRefName?: string; isDraft?: boolean; state?: string } = {}, +) => ({ + number, + title: payload.title ?? `AR-${number}: test PR`, + body: payload.body ?? '', + headRefName: payload.headRefName ?? `ar-${number}-test`, + isDraft: payload.isDraft ?? false, + state: payload.state ?? 'OPEN', +}) + const slackConfig = (channel = 'C0FACTORY__factory-e2e') => ({ channel, style: 'threaded-summarized' as const, @@ -3402,6 +3415,229 @@ describe('FactoryLoop', () => { expect(factory.status().counters.completionSweepDraftPr).toBe(1) }) + it('PR-state sweep resolves fresh PRs through gh when the mount is missing them', async () => { + const mount = new FakeMountClient({ + [issuePath(355)]: issueFile(355), + [issuePath(356)]: issueFile(356), + [issuePath(357)]: issueFile(357), + }) + const fleet = new FakeFleetClient() + const closeInputs: Array> = [] + const ghCalls: string[][] = [] + const factory = createFactory(config({ batchSize: 2 }), { + mount, + fleet, + triage: new StaticTriage(), + probePrGhRunner: async (args) => { + ghCalls.push(args) + return { + stdout: JSON.stringify([ + ghPr(880, { + title: 'Fix PR-state completion sweep', + body: 'Regression fixture mentions AR-355 but is not its PR.', + headRefName: 'factory-sdk-pr-state-completion-sb-impl3', + state: 'OPEN', + }), + ghPr(855, { + title: 'Add old isOdd util', + body: '', + headRefName: 'ar-355-is-odd-v1', + state: 'CLOSED', + }), + ghPr(856, { + title: 'Add fresh isOdd util', + body: '', + headRefName: 'ar-355-is-odd-v2', + state: 'OPEN', + }), + ghPr(857, { + title: 'AR-356: add square utility', + body: '', + headRefName: 'ar-356-square', + state: 'OPEN', + }), + ]), + } + }, + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.runOnce() + await factory.runLoop({ maxIterations: 1 }) + + expect(ghCalls).toHaveLength(2) + expect(ghCalls.every((args) => args.includes('--repo') && args.includes('AgentWorkforce/pear'))).toBe(true) + expect(closeInputs).toEqual([ + { repo: 'AgentWorkforce/pear', prNumber: 856, expectedIssueKey: 'AR-355', requireTitleMarker: false }, + { repo: 'AgentWorkforce/pear', prNumber: 857, expectedIssueKey: 'AR-356', requireTitleMarker: false }, + ]) + expect(fleet.spawns.map((spawn) => spawn.name)).toContain('ar-357-impl') + expect(factory.status().counters.probePrGhResolveHits).toBe(2) + }) + + it('gh PR fallback rejects fuzzy over-matches and numeric-prefix collisions', async () => { + const mount = new FakeMountClient({ [issuePath(229)]: issueFile(229) }) + const fleet = new FakeFleetClient() + const closeInputs: unknown[] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + probePrGhRunner: async () => ({ + stdout: JSON.stringify([ + ghPr(287, { + title: 'Add PR-state completion sweep', + body: 'This fix PR mentions AR-229 in tests but is not its issue PR.', + headRefName: 'factory-sdk-pr-state-completion-sb-impl3', + state: 'OPEN', + }), + ghPr(291, { + title: 'AR-22: wrong issue', + body: 'Linear: AR-22', + headRefName: 'ar-22-9-not-229', + state: 'OPEN', + }), + ghPr(292, { + title: 'AR-229-1: wrong child issue', + body: '', + headRefName: 'ar-229-1-is-positive', + state: 'OPEN', + }), + ghPr(293, { + title: 'AR-2290: wrong prefix', + body: '', + headRefName: 'ar-2290-is-positive', + state: 'OPEN', + }), + ]), + }), + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(229), issueFile(229)))) + await factory.runLoop({ maxIterations: 1 }) + + expect(closeInputs).toEqual([]) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-229']) + expect(factory.status().counters.completionSweepMissingPr).toBe(1) + }) + + it('gh PR fallback fails closed when gh is unavailable', async () => { + const mount = new FakeMountClient({ [issuePath(358)]: issueFile(358) }) + const fleet = new FakeFleetClient() + const closeInputs: unknown[] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + probePrGhRunner: async () => { + throw new Error('gh auth missing') + }, + probeCloser: async (input) => { + closeInputs.push(input) + return { repo: input.repo, prNumber: input.prNumber, state: 'CLOSED' } + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(358), issueFile(358)))) + await factory.runLoop({ maxIterations: 1 }) + + expect(closeInputs).toEqual([]) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-358']) + expect(factory.status().counters.completionSweepMissingPr).toBe(1) + expect(factory.status().counters.done).toBeUndefined() + }) + + it('gh PR fallback skips draft PRs and backs off repeated unresolved lookups', async () => { + const clock = new ManualClock() + const mount = new FakeMountClient({ [issuePath(359)]: issueFile(359) }) + const fleet = new FakeFleetClient() + const ghCalls: string[][] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + clock, + probePrGhRunner: async (args) => { + ghCalls.push(args) + return { + stdout: JSON.stringify([ + ghPr(859, { + title: 'AR-359: draft work', + body: '', + headRefName: 'ar-359-draft-work', + isDraft: true, + state: 'OPEN', + }), + ]), + } + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(359), issueFile(359)))) + await factory.runLoop({ maxIterations: 1 }) + await factory.runLoop({ maxIterations: 1 }) + expect(ghCalls).toHaveLength(1) + expect(factory.status().counters.probePrGhBackoffSkips).toBe(1) + expect(factory.status().counters.completionSweepDraftPr).toBe(1) + + clock.advance(60_000) + await factory.runLoop({ maxIterations: 1 }) + expect(ghCalls).toHaveLength(2) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-359']) + }) + + it('treats already-closed gh-resolved probe PRs as completed instead of re-wedging', async () => { + const mount = new FakeMountClient({ [issuePath(360)]: issueFile(360) }) + const fleet = new FakeFleetClient() + const closeViewCalls: string[][] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + probePrGhRunner: async () => ({ + stdout: JSON.stringify([ + ghPr(860, { + title: 'Add already closed probe work', + body: '', + headRefName: 'ar-360-closed-work', + state: 'CLOSED', + }), + ]), + }), + probeCloser: (input) => closeProbePr({ + ...input, + runner: async (args) => { + closeViewCalls.push(args) + return { + stdout: JSON.stringify({ + state: 'CLOSED', + title: 'Add already closed probe work', + body: '', + headRefName: 'ar-360-closed-work', + }), + } + }, + }), + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(360), issueFile(360)))) + await factory.runLoop({ maxIterations: 1 }) + + expect(closeViewCalls).toHaveLength(1) + expect(closeViewCalls[0]).toContain('view') + expect(fleet.releases.map((release) => release.reason)).toEqual(['issue-done', 'issue-done']) + expect(factory.status().inFlight).toEqual([]) + expect(factory.status().counters.done).toBe(1) + expect(factory.status().counters.errors).toBeUndefined() + }) + it('closes synthetic probe PRs even when real auto-merge is enabled', async () => { const markedMount = new FakeMountClient({ [issuePath(19)]: issueFile(19) }) const markedFleet = new FakeFleetClient() diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 498b8ae1..583a3791 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -3,7 +3,7 @@ import { dirname } from 'node:path' import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { LINEAR_STATE_IDS, linearByStatePath } from '../constants/linear' -import { GithubMergeGate, closeProbePr, type GithubMergeGate as GithubMergeGatePort } from '../github' +import { GithubMergeGate, closeProbePr, type GhRunner, type GithubMergeGate as GithubMergeGatePort } from '../github' import type { AgentPidResolution, AgentSpec, @@ -80,6 +80,8 @@ const LIVE_DEDUPE_LIMIT = 5_000 const LIVE_EVENT_DRAIN_BATCH_SIZE = 5 const COMPLETION_SWEEP_INTERVAL_MS = 15_000 const COMPLETION_SWEEP_BATCH_SIZE = 2 +const PROBE_PR_GH_BACKOFF_MS = 60_000 +const PROBE_PR_GH_CANDIDATE_LIMIT = 200 const STATE_NAME_TO_ID: Record = { 'Ready for Agent': LINEAR_STATE_IDS.readyForAgent, 'Agent Implementing': LINEAR_STATE_IDS.agentImplementing, @@ -106,7 +108,6 @@ const realClock: Clock = { now: () => Date.now(), sleep: (ms) => new Promise((resolve) => setTimeout(resolve, ms)), } - export function createFactory(config: FactoryConfig, ports: FactoryPorts): Factory { return new FactoryLoop(FactoryConfigSchema.parse(config), ports) } @@ -122,6 +123,7 @@ export class FactoryLoop implements Factory { readonly #probeCloser: ProbeCloser readonly #probePrResolver: ProbePrResolver readonly #customProbePrResolver: boolean + readonly #probePrGhRunner: GhRunner readonly #logger: Logger readonly #clock: Clock readonly #processIdentityReader: typeof readProcessIdentity @@ -162,6 +164,8 @@ export class FactoryLoop implements Factory { #completionSweepTimer?: ReturnType #completionSweepActive = false readonly #completionInFlight = new Set() + readonly #probePrGhBackoffUntilMs = new Map() + readonly #probePrResolvedCache = new Map() readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void @@ -184,7 +188,8 @@ export class FactoryLoop implements Factory { this.#mergeGate = ports.mergeGate ?? new GithubMergeGate() this.#probeCloser = ports.probeCloser ?? closeProbePr this.#customProbePrResolver = Boolean(ports.probePrResolver) - this.#probePrResolver = ports.probePrResolver ?? ((issue) => resolveIssuePrFromMount(this.#mount, this.#config, issue)) + this.#probePrGhRunner = ports.probePrGhRunner ?? failClosedGhRunner + this.#probePrResolver = ports.probePrResolver ?? ((issue) => this.#resolveIssuePr(issue)) this.#logger = ports.logger ?? console this.#clock = ports.clock ?? realClock this.#processIdentityReader = ports.processIdentityReader ?? readProcessIdentity @@ -607,6 +612,7 @@ export class FactoryLoop implements Factory { } if (pr.draft) { this.#increment('completionSweepDraftPr') + this.#probePrGhBackoffUntilMs.set(issue.key, this.#clock.now() + PROBE_PR_GH_BACKOFF_MS) return undefined } return { record, pr } @@ -643,11 +649,48 @@ export class FactoryLoop implements Factory { if (this.#customProbePrResolver) { return this.#probePrResolver(issue) } - return resolveIssuePrFromMount(this.#mount, this.#config, issue, { + return this.#resolveIssuePr(issue, { titleMarker: FACTORY_E2E_MARKER, }) } + async #resolveIssuePr( + issue: LinearIssue, + opts: { requireTitleMarker?: boolean; titleMarker?: string } = {}, + ): Promise { + const key = issue.key + const now = this.#clock.now() + const cached = this.#probePrResolvedCache.get(key) + if (cached && cached.expiresAtMs > now) { + return cached.pr + } + + const mountPr = await resolveIssuePrFromMount(this.#mount, this.#config, issue, opts) + if (mountPr) { + return mountPr + } + + const backoffUntil = this.#probePrGhBackoffUntilMs.get(key) ?? 0 + if (backoffUntil > now) { + this.#increment('probePrGhBackoffSkips') + return undefined + } + + const ghPr = await resolveIssuePrFromGh(this.#probePrGhRunner, this.#config, issue, opts, this.#logger) + this.#increment('probePrGhResolveAttempts') + if (ghPr) { + this.#probePrGhBackoffUntilMs.delete(key) + if (!ghPr.draft) { + this.#probePrResolvedCache.set(key, { pr: ghPr, expiresAtMs: now + PROBE_PR_GH_BACKOFF_MS }) + } + this.#increment('probePrGhResolveHits') + return ghPr + } + + this.#probePrGhBackoffUntilMs.set(key, now + PROBE_PR_GH_BACKOFF_MS) + return undefined + } + async runOnce(opts: { dryRun?: boolean } = {}): Promise { const dryRun = opts.dryRun ?? this.#config.dryRun const paths = await this.#readyIssuePaths() @@ -1678,6 +1721,8 @@ export class FactoryLoop implements Factory { this.#error(error, record.issue) } finally { this.#completionInFlight.delete(completionKey) + this.#probePrGhBackoffUntilMs.delete(completionKey) + this.#probePrResolvedCache.delete(completionKey) } } @@ -2093,7 +2138,7 @@ export class FactoryLoop implements Factory { async #closeSyntheticProbeIfPresent(issue: LinearIssue): Promise { const probe = this.#customProbePrResolver ? await this.#probePrResolver(issue) - : await resolveIssuePrFromMount(this.#mount, this.#config, issue, { + : await this.#resolveIssuePr(issue, { titleMarker: FACTORY_E2E_MARKER, }) if (!probe) { @@ -2283,6 +2328,69 @@ const resolveIssuePrFromMount = async ( return candidates.sort((a, b) => b.score - a.score || b.prNumber - a.prNumber)[0] } +const resolveIssuePrFromGh = async ( + run: GhRunner, + config: FactoryConfig, + issue: LinearIssue, + opts: { requireTitleMarker?: boolean; titleMarker?: string } = {}, + logger?: Logger, +): Promise => { + const candidates: Array = [] + for (const repo of reposFromConfig(config)) { + let payload: unknown + try { + const result = await run([ + 'pr', + 'list', + '--repo', + repo, + '--state', + 'all', + '--json', + 'number,title,body,headRefName,isDraft,state', + '--limit', + String(PROBE_PR_GH_CANDIDATE_LIMIT), + ]) + if (!result.stdout.trim()) { + logger?.warn?.('[factory] gh PR resolver returned empty output', { issue: issue.key, repo }) + continue + } + payload = parseJsonContent(result.stdout) + } catch (error) { + logger?.warn?.('[factory] gh PR resolver failed', { issue: issue.key, repo, error }) + continue + } + + if (!Array.isArray(payload)) { + logger?.warn?.('[factory] gh PR resolver returned non-array payload', { issue: issue.key, repo }) + continue + } + if (payload.length >= PROBE_PR_GH_CANDIDATE_LIMIT) { + logger?.warn?.('[factory] gh PR resolver hit candidate limit', { issue: issue.key, repo, limit: PROBE_PR_GH_CANDIDATE_LIMIT }) + } + + for (const entry of payload) { + const pr = ghProbePrCandidate(entry) + if (!pr || !containsIssueKey(pr.headRef, issue.key)) continue + const score = issuePrMatchScore(pr, issue, opts.titleMarker ?? config.safety.requireTitlePrefix, opts) + if (score <= 0) continue + candidates.push({ + repo, + prNumber: pr.number, + draft: pr.draft, + score, + open: normalizePrState(pr.state) === 'OPEN', + }) + } + } + + return candidates.sort((a, b) => + b.score - a.score || + Number(b.open) - Number(a.open) || + b.prNumber - a.prNumber + )[0] +} + const reposFromConfig = (config: FactoryConfig): string[] => { const repos = new Set([ ...Object.values(config.repos.byLabel), @@ -2320,6 +2428,23 @@ const readProbePrCandidate = async ( } } +const ghProbePrCandidate = ( + value: unknown, +): { number: number; title: string; body: string; headRef: string; draft?: boolean; state?: string } | undefined => { + const payload = asRecord(value) + if (!payload) return undefined + const number = numberValue(payload.number) + if (typeof number !== 'number' || !Number.isInteger(number) || number <= 0) return undefined + return { + number, + title: stringValue(payload.title) ?? '', + body: stringValue(payload.body) ?? '', + headRef: stringValue(payload.headRefName) ?? '', + draft: booleanValue(payload.isDraft), + state: stringValue(payload.state), + } +} + const issuePrMatchScore = ( pr: { title: string; body: string; headRef: string }, issue: LinearIssue, @@ -2337,6 +2462,10 @@ const issuePrMatchScore = ( const hasTitlePrefix = (title: string, marker: string): boolean => title === marker || title.startsWith(`${marker} `) +const normalizePrState = (state?: string): string | undefined => state?.toUpperCase() + +const failClosedGhRunner: GhRunner = async () => ({ stdout: '[]' }) + const ISSUE_KEY_PATTERN = /^[A-Z]+-\d+$/u const isIssuePathUnderRoot = (path: string): boolean => @@ -2375,6 +2504,7 @@ const uuidFromPath = (path: string): string | undefined => path.split('__')[1]?. const stringValue = (value: unknown): string | undefined => typeof value === 'string' ? value : undefined const booleanValue = (value: unknown): boolean | undefined => typeof value === 'boolean' ? value : undefined +const numberValue = (value: unknown): number | undefined => typeof value === 'number' ? value : undefined const stateNameToId = (name: string | undefined): string | undefined => name ? STATE_NAME_TO_ID[name] : undefined diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 9f04ac0f..ba8c27f8 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -2,7 +2,7 @@ import type { FactoryConfig } from './config/schema' import type { AgentSpec, FleetClient, GithubRead, LinearWriteback, MountClient, SlackWriteback } from './ports' import type { Clock, Logger } from './ports/system' import type { CloseProbePrInput, CloseProbePrResult } from './github/probe-closer' -import type { GithubMergeGate } from './github/merge-gate' +import type { GhRunner, GithubMergeGate } from './github/merge-gate' import type { AgentProcessFinder, ProcessIdentity } from './orchestrator/process-identity' export interface FactoryPorts { @@ -15,6 +15,7 @@ export interface FactoryPorts { mergeGate?: GithubMergeGate probeCloser?: ProbeCloser probePrResolver?: ProbePrResolver + probePrGhRunner?: GhRunner logger?: Logger clock?: Clock processIdentityReader?: (pid: number) => Promise @@ -196,7 +197,7 @@ export interface PrSummary { filesChanged?: string[] } -export type ProbePrRef = Pick +export type ProbePrRef = Pick & { draft?: boolean } export type ProbePrResolver = (issue: LinearIssue) => Promise From b4d4c49e191338df2d9e5e0b07dbf399aaa035f5 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 13 Jun 2026 23:19:32 +0200 Subject: [PATCH 4/4] Cover gh resolver not-found backoff --- .../src/orchestrator/factory.test.ts | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 48cb749b..1275d5ac 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -3554,6 +3554,47 @@ describe('FactoryLoop', () => { expect(factory.status().counters.done).toBeUndefined() }) + it('gh PR fallback backs off repeated not-found lookups', async () => { + const clock = new ManualClock() + const mount = new FakeMountClient({ [issuePath(361)]: issueFile(361) }) + const fleet = new FakeFleetClient() + const ghCalls: string[][] = [] + const factory = createFactory(config(), { + mount, + fleet, + triage: new StaticTriage(), + clock, + probePrGhRunner: async (args) => { + ghCalls.push(args) + return { + stdout: JSON.stringify([ + ghPr(871, { + title: 'Unrelated factory fix', + body: 'Mentions AR-361 in a loose sentence only.', + headRefName: 'factory-sdk-pr-state-completion-sb-impl3', + state: 'OPEN', + }), + ]), + } + }, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(361), issueFile(361)))) + await factory.runLoop({ maxIterations: 1 }) + await factory.runLoop({ maxIterations: 1 }) + + expect(ghCalls).toHaveLength(1) + expect(factory.status().counters.probePrGhBackoffSkips).toBe(1) + expect(factory.status().counters.completionSweepMissingPr).toBe(2) + + clock.advance(60_000) + await factory.runLoop({ maxIterations: 1 }) + + expect(ghCalls).toHaveLength(2) + expect(factory.status().inFlight.map((issue) => issue.key)).toEqual(['AR-361']) + expect(factory.status().counters.done).toBeUndefined() + }) + it('gh PR fallback skips draft PRs and backs off repeated unresolved lookups', async () => { const clock = new ManualClock() const mount = new FakeMountClient({ [issuePath(359)]: issueFile(359) })