diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts
index d3906e67..6ed5adb1 100644
--- a/packages/factory-sdk/src/orchestrator/factory.test.ts
+++ b/packages/factory-sdk/src/orchestrator/factory.test.ts
@@ -6650,8 +6650,8 @@ describe('FactoryLoop PR babysitter', () => {
     expect(states.some((s) => s.stateId === humanReviewStateId)).toBe(false)
   })
 
-  it('ignores a ready signal when the PR meta shows the PR already merged/closed', async () => {
-    const issue = realIssueFile(405, ready, { title: 'Real babysitter not ready' })
+  it('advances to Done when the PR merged before the babysitter ready signal', async () => {
+    const issue = realIssueFile(405, ready, { title: 'Real babysitter already merged' })
     const mount = new FakeMountClient({ [issuePath(405)]: issue })
     seedPrMeta(mount, 'AgentWorkforce/pear', 405, { state: 'closed', merged: true })
     const fleet = new FakeFleetClient()
@@ -6669,12 +6669,115 @@ describe('FactoryLoop PR babysitter', () => {
 
     await vi.waitFor(() => expect(fleet.spawns.map((s) => s.name)).toContain('ar-405-babysit'))
     fleet.emitAgentMessage({ from: 'ar-405-babysit', target: 'factory', body: '[factory-pr-ready] AR-405' })
-    await flush()
+    await vi.waitFor(() => expect(factory.status().counters.done).toBe(1))
 
     expect(factory.status().counters.humanReview).toBeUndefined()
     expect(states.some((s) => s.stateId === humanReviewStateId)).toBe(false)
-    expect(factory.status().inFlight.map((ref) => ref.key)).toEqual(['AR-405'])
-    expect(fleet.releases).toEqual([])
+    expect(states.at(-1)).toEqual({ key: 'AR-405', stateId: done })
+    expect(factory.status().inFlight).toEqual([])
+    expect(fleet.releases.map((r) => r.name).sort()).toEqual(['ar-405-babysit', 'ar-405-impl', 'ar-405-review'])
+  })
+
+  it('advances a persisted Human Review issue to Done on a merged PR metadata event', async () => {
+    const issue = realIssueFile(408, humanReviewStateId, { title: 'Real merged after review' })
+    const mount = new FakeMountClient({ [issuePath(408)]: issue })
+    const states: Array<{ key: string; stateId: string }> = []
+    const factory = createFactory(babysitterConfig(), {
+      mount,
+      fleet: new FakeFleetClient(),
+      triage: new StaticTriage(),
+      linear: recordingLinear(states),
+    })
+
+    await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } })
+    try {
+      const prPath = '/github/repos/AgentWorkforce/pear/pulls/408/metadata.json'
+      mount.files.set(prPath, {
+        content: {
+          number: 408,
+          state: 'closed',
+          merged: true,
+          title: 'Ship the review gate',
+          body: 'Linear: AR-408',
+          head_ref: 'release-review-gate',
+          url: 'https://github.com/AgentWorkforce/pear/pull/408',
+        },
+      })
+      mount.emit(changeEvent(prPath, 'pr-408-merged'))
+
+      await vi.waitFor(() => expect(factory.status().counters.mergedPrAdvancedDone).toBe(1))
+      expect(states).toEqual([{ key: 'AR-408', stateId: done }])
+      expect(factory.status().counters.done).toBe(1)
+    } finally {
+      await factory.stop()
+    }
+  })
+
+  it('suppresses duplicate merged PR events after advancing a Human Review issue', async () => {
+    const issue = realIssueFile(410, humanReviewStateId, { title: 'Real duplicate merge event' })
+    const mount = new FakeMountClient({ [issuePath(410)]: issue })
+    const states: Array<{ key: string; stateId: string }> = []
+    const factory = createFactory(babysitterConfig(), {
+      mount,
+      fleet: new FakeFleetClient(),
+      triage: new StaticTriage(),
+      linear: recordingLinear(states),
+    })
+
+    await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } })
+    try {
+      const prPath = '/github/repos/AgentWorkforce__pear/pulls/by-id/410.json'
+      mount.files.set(prPath, {
+        content: {
+          number: 410,
+          state: 'closed',
+          merged: true,
+          title: 'AR-410: merged once',
+          head_ref: 'review-gate',
+        },
+      })
+      mount.emit(changeEvent(prPath, 'pr-410-merged-1'))
+      await vi.waitFor(() => expect(factory.status().counters.mergedPrAdvancedDone).toBe(1))
+
+      mount.emit(changeEvent(prPath, 'pr-410-merged-2'))
+      await flush()
+
+      expect(states).toEqual([{ key: 'AR-410', stateId: done }])
+      expect(factory.status().counters.mergedPrAdvanceDuplicatesSuppressed).toBe(1)
+    } finally {
+      await factory.stop()
+    }
+  })
+
+  it('re-dispatches when an operator moves Human Review back to Ready for Agent', async () => {
+    vi.useFakeTimers()
+    try {
+      const issue = realIssueFile(411, ready, { title: 'Real human review redispatch' })
+      const mount = new FakeMountClient({ [issuePath(411)]: issue })
+      seedPrMeta(mount, 'AgentWorkforce/pear', 411, { state: 'open', draft: false })
+      const fleet = new FakeFleetClient()
+      const factory = createFactory(babysitterConfig(), {
+        mount,
+        fleet,
+        triage: new StaticTriage(),
+        probePrResolver: async () => ({ repo: 'AgentWorkforce/pear', prNumber: 411 }),
+      })
+
+      const decision = await factory.triageIssue(parseLinearIssue(issuePath(411), issue))
+      await factory.dispatch(decision)
+      fleet.emitAgentExit('ar-411-impl', 'worker_exited')
+      await vi.waitFor(() => expect(fleet.spawns.map((s) => s.name)).toContain('ar-411-babysit'))
+      fleet.emitAgentMessage({ from: 'ar-411-babysit', target: 'factory', body: '[factory-pr-ready] AR-411' })
+      await vi.waitFor(() => expect(factory.status().counters.humanReview).toBe(1))
+
+      mount.files.set(issuePath(411), { content: realIssueFile(411, ready, { title: 'Real human review redispatch' }) })
+      await factory.runOnce()
+
+      expect(factory.status().counters.dispatchTerminalReopened).toBe(1)
+      expect(fleet.spawns.filter((spawn) => spawn.name === 'ar-411-impl')).toHaveLength(2)
+    } finally {
+      vi.useRealTimers()
+    }
   })
 
   it('spawns the babysitter from a PR metadata change event (webhook-driven)', async () => {
diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts
index 967ec109..879b74d8 100644
--- a/packages/factory-sdk/src/orchestrator/factory.ts
+++ b/packages/factory-sdk/src/orchestrator/factory.ts
@@ -197,6 +197,7 @@ export class FactoryLoop implements Factory {
   readonly #dispatchAttempts = new Map()
   readonly #canonicalIssueStates = new Map()
   readonly #dispatchFailureReaperHandoffs = new Map()
+  readonly #postMergeDoneAdvances = new Set()
   #slackDegraded = false
   #slackDegradedReason: string | undefined
   #slackWritebackFailureDegraded = false
@@ -322,6 +323,10 @@ export class FactoryLoop implements Factory {
     await this.#backfillReadyIssues()
 
     this.#subscription = this.#mount.subscribe([`${ISSUE_ROOT}/**/*.json`, LIVE_GITHUB_ISSUE_GLOB], (event) => {
+      if (isGithubPullFilePath(event.resource.path)) {
+        void this.#handlePrChange(event.resource.path)
+        return
+      }
       if (isGithubIssueFilePath(event.resource.path)) {
         void this.#handleGithubIssueChange(event.resource.path, { dryRun: this.#config.dryRun })
         return
@@ -662,9 +667,7 @@ export class FactoryLoop implements Factory {
 
   #prepareLiveEventForDrain(event: ChangeEvent, seenIssueKeys: Set): string | undefined {
     const path = event.resource.path
-    // PR change events only matter when the babysitter is enabled; ignore them
-    // otherwise so the legacy completion path is untouched.
-    const isPullPath = this.#config.babysitter.enabled && isGithubPullFilePath(path)
+    const isPullPath = isGithubPullFilePath(path)
     if (!isIssueFilePath(path) && !isGithubIssueFilePath(path) && !isPullPath) {
       return undefined
     }
@@ -765,7 +768,7 @@ export class FactoryLoop implements Factory {
   }
 
   async #handlePreparedLiveChange(path: string): Promise {
-    if (this.#config.babysitter.enabled && isGithubPullFilePath(path)) {
+    if (isGithubPullFilePath(path)) {
       await this.#handlePrChange(path)
       return
     }
@@ -1516,7 +1519,9 @@ export class FactoryLoop implements Factory {
 
   #recordCanonicalIssueState(issue: Pick): void {
     const previousStateId = this.#canonicalIssueStates.get(issue.key)
-    if (previousStateId === this.#config.stateIds.done && issue.stateId === this.#config.stateIds.readyForAgent) {
+    const reopenedFromTerminal = previousStateId !== undefined && // a first sighting must not match an unconfigured (undefined) humanReview id
+      (previousStateId === this.#config.stateIds.done || previousStateId === this.#config.stateIds.humanReview)
+    if (reopenedFromTerminal && issue.stateId === this.#config.stateIds.readyForAgent) {
       const dispatchState = this.#dispatchAttempts.get(issue.key)
       if (dispatchState?.terminal) {
         dispatchState.attempts = 0
@@ -2367,13 +2372,21 @@ export class FactoryLoop implements Factory {
 
     const headRef = snapshot.headRef ?? ''
     const record = this.#batch.inFlight.find((candidate) => !candidate.dryRun && headRef && containsIssueKey(headRef, candidate.issue.key))
+
+    if (prMetaShowsMerged(snapshot)) {
+      await this.#advanceMergedPrToDone(snapshot, record)
+      return
+    }
+
+    if (!this.#config.babysitter.enabled) {
+      return
+    }
+
     if (!record) {
       return
     }
 
     if (snapshot.state && snapshot.state.trim().toUpperCase() !== 'OPEN') {
-      // Closed/merged PR — completion of Human Review -> Done stays with the
-      // merge gate / the operator; nothing for the babysitter to spawn.
       return
     }
     if (snapshot.draft) {
@@ -2384,6 +2397,77 @@ export class FactoryLoop implements Factory {
     await this.#ensureBabysitter(record, { repo: `${parts.owner}/${parts.repo}`, prNumber: snapshot.number, url: snapshot.url, path })
   }
 
+  async #advanceMergedPrToDone(snapshot: PullSnapshot, record?: InFlightIssue): Promise {
+    if (record) {
+      await this.#completeIssue(record, { targetState: 'done', runMergeGate: false })
+      return
+    }
+
+    const issue = await this.#findMergeAdvanceIssueForPr(snapshot)
+    if (!issue) {
+      this.#increment('mergedPrAdvanceNoIssue')
+      return
+    }
+    const advanceKey = `${issue.key}:${snapshot.number}`
+    if (this.#postMergeDoneAdvances.has(advanceKey)) {
+      this.#increment('mergedPrAdvanceDuplicatesSuppressed')
+      return
+    }
+    this.#postMergeDoneAdvances.add(advanceKey)
+
+    try {
+      await this.#linear.setState(issue, this.#config.stateIds.done)
+      this.#recordCanonicalIssueState({ key: issue.key, stateId: this.#config.stateIds.done })
+      this.#emit('writeback-verified', { issue: issueRef(issue), path: issue.path })
+      this.#increment('mergedPrAdvancedDone')
+      this.#increment('done')
+      this.#logger.info?.('[factory] merged PR advanced issue to Done', {
+        issue: issue.key,
+        prNumber: snapshot.number,
+        url: snapshot.url,
+      })
+
+      if (this.#slack && this.#config.slack && !await this.#shouldSkipSlackWriteback('merge-done-thread')) {
+        try {
+          const root = await this.#slack.postThread({
+            channel: this.#config.slack.channel,
+            text: `${issue.key}: PR merged; Linear state set to done.`,
+          })
+          await this.#slack.reply(root.threadId, `${issue.key}: Linear state set to done.`)
+          this.#recordSlackWritebackSuccess('merge-done-thread')
+        } catch (error) {
+          this.#markSlackWritebackFailure('merge-done-thread', error)
+        }
+      }
+    } catch (error) {
+      this.#postMergeDoneAdvances.delete(advanceKey)
+      this.#error(error, issueRef(issue))
+    }
+  }
+
+  async #findMergeAdvanceIssueForPr(snapshot: PullSnapshot): Promise {
+    const upstreamStates = new Set([
+      this.#config.stateIds.agentImplementing,
+      this.#config.stateIds.humanReview,
+    ].filter((stateId): stateId is string => Boolean(stateId)))
+    for (const path of await this.#mount.listTree(ISSUE_ROOT)) {
+      if (!isIssueFilePath(path)) {
+        continue
+      }
+      const issue = await this.#readIssue(path)
+      if (!issue || !upstreamStates.has(issue.stateId)) {
+        continue
+      }
+      if (!isRealLinearIssue(issue) || !isInFactoryScope(issue, this.#config.safety)) {
+        continue
+      }
+      if (prSnapshotReferencesIssue(snapshot, issue.key)) {
+        return issue
+      }
+    }
+    return undefined
+  }
+
   // Safety net for a missed PR-open mount event: resolve the PR via the existing
   // probe resolver and spawn the babysitter. Triggered by an implementer exiting
   // after opening its PR (an event, not a poll).
@@ -2484,6 +2568,10 @@ export class FactoryLoop implements Factory {
     if (snapshot) {
       const guard = prMetaAllowsHumanReview(snapshot)
       if (!guard.ok) {
+        if (prMetaShowsMerged(snapshot)) {
+          await this.#advanceMergedPrToDone(snapshot, record)
+          return
+        }
         this.#increment('babysitterReadinessGuardBlocked')
         this.#logger.info?.('[factory] babysitter ready signal ignored; PR meta not eligible', {
           issue: record.issue.key,
@@ -2546,7 +2634,7 @@ export class FactoryLoop implements Factory {
     return found
   }
 
-  async #completeIssue(record: InFlightIssue, opts: { humanReview?: boolean } = {}): Promise {
+  async #completeIssue(record: InFlightIssue, opts: { humanReview?: boolean; targetState?: 'configured' | 'done'; runMergeGate?: boolean } = {}): Promise {
     const completionKey = issueKey(record.issue)
     if (this.#completionInFlight.has(completionKey)) {
       return
@@ -2556,7 +2644,8 @@ export class FactoryLoop implements Factory {
     // state AND configured its UUID; otherwise fall back to `done` (the legacy
     // terminal) so an operator who sets terminalState: 'done' keeps it and the
     // issue never gets stuck waiting on an unconfigured state.
-    const humanReview = opts.humanReview === true &&
+    const humanReview = opts.targetState !== 'done' &&
+      opts.humanReview === true &&
       this.#config.terminalState === 'human-review' &&
       Boolean(this.#config.stateIds.humanReview)
     const targetState = humanReview ? this.#config.stateIds.humanReview! : this.#config.stateIds.done
@@ -2584,7 +2673,7 @@ export class FactoryLoop implements Factory {
     // Only auto-merge on the `done` terminal path. Human Review parks the PR
     // for an operator — the merge gate (which requires an APPROVED review)
     // would refuse anyway, and we must not merge before the human has looked.
-    if (issue && !humanReview) {
+    if (issue && !humanReview && opts.runMergeGate !== false) {
       await this.#runCompletionMergeGate(issue)
     }
 
@@ -3671,7 +3760,7 @@ const githubPullPathParts = (path: string): { owner: string; repo: string; numbe
 
 const isGithubPullFilePath = (path: string): boolean => githubPullPathParts(path) !== undefined
 
-type PullSnapshot = { number: number; state?: string; headRef?: string; draft?: boolean; url?: string; title?: string; merged?: boolean }
+type PullSnapshot = { number: number; state?: string; headRef?: string; draft?: boolean; url?: string; title?: string; body?: string; merged?: boolean }
 
 const parsePullSnapshot = (content: unknown, fallbackNumber: number): PullSnapshot | undefined => {
   const payload = wrappedPayload(content)
@@ -3684,10 +3773,20 @@ const parsePullSnapshot = (content: unknown, fallbackNumber: number): PullSnapsh
     draft: booleanValue(payload.isDraft) ?? booleanValue(payload.draft),
     url: stringValue(payload.url) ?? stringValue(payload.html_url),
     title: stringValue(payload.title),
+    body: stringValue(payload.body),
     merged: booleanValue(payload.merged),
   }
 }
 
+const prSnapshotReferencesIssue = (snapshot: PullSnapshot, issueKey: string): boolean =>
+  containsIssueKey(snapshot.headRef ?? '', issueKey) ||
+  containsIssueKey(snapshot.title ?? '', issueKey) ||
+  containsExplicitIssueReference(snapshot.body ?? '', issueKey)
+
+const prMetaShowsMerged = (snapshot: PullSnapshot): boolean =>
+  snapshot.merged === true ||
+  snapshot.state?.trim().toUpperCase() === 'MERGED'
+
 // Guard applied to the babysitter's ready signal: the PR's own webhook-fed meta
 // must still be eligible for human review. A missing `state` is treated as open
 // (older mount layouts omit it). The CI/conflict/review verdict is NOT re-derived