Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 108 additions & 5 deletions packages/factory-sdk/src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6650,8 +6650,8 @@ describe('FactoryLoop PR babysitter', () => {
expect(states.some((s) => s.stateId === humanReviewStateId)).toBe(false)
})

it('ignores a ready signal when the PR meta shows the PR already merged/closed', async () => {
const issue = realIssueFile(405, ready, { title: 'Real babysitter not ready' })
it('advances to Done when the PR merged before the babysitter ready signal', async () => {
const issue = realIssueFile(405, ready, { title: 'Real babysitter already merged' })
const mount = new FakeMountClient({ [issuePath(405)]: issue })
seedPrMeta(mount, 'AgentWorkforce/pear', 405, { state: 'closed', merged: true })
const fleet = new FakeFleetClient()
Expand All @@ -6669,12 +6669,115 @@ describe('FactoryLoop PR babysitter', () => {
await vi.waitFor(() => expect(fleet.spawns.map((s) => s.name)).toContain('ar-405-babysit'))

fleet.emitAgentMessage({ from: 'ar-405-babysit', target: 'factory', body: '[factory-pr-ready] AR-405' })
await flush()

await vi.waitFor(() => expect(factory.status().counters.done).toBe(1))
expect(factory.status().counters.humanReview).toBeUndefined()
expect(states.some((s) => s.stateId === humanReviewStateId)).toBe(false)
expect(factory.status().inFlight.map((ref) => ref.key)).toEqual(['AR-405'])
expect(fleet.releases).toEqual([])
expect(states.at(-1)).toEqual({ key: 'AR-405', stateId: done })
expect(factory.status().inFlight).toEqual([])
expect(fleet.releases.map((r) => r.name).sort()).toEqual(['ar-405-babysit', 'ar-405-impl', 'ar-405-review'])
})

// Scenario: the issue file on the mount is already in the Human Review state and
// the test never dispatches it, so the only trigger is a live PR metadata change
// event. A merged PR that references the issue must move it to Done.
it('advances a persisted Human Review issue to Done on a merged PR metadata event', async () => {
const issue = realIssueFile(408, humanReviewStateId, { title: 'Real merged after review' })
const mount = new FakeMountClient({ [issuePath(408)]: issue })
// Collects every state write made through the recording Linear client.
const states: Array<{ key: string; stateId: string }> = []
const factory = createFactory(babysitterConfig(), {
mount,
fleet: new FakeFleetClient(),
triage: new StaticTriage(),
linear: recordingLinear(states),
})

await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } })
try {
const prPath = '/github/repos/AgentWorkforce/pear/pulls/408/metadata.json'
// The PR references the issue only through its body ('Linear: AR-408');
// neither the title nor head_ref contains the issue key.
mount.files.set(prPath, {
content: {
number: 408,
state: 'closed',
merged: true,
title: 'Ship the review gate',
body: 'Linear: AR-408',
head_ref: 'release-review-gate',
url: 'https://github.com/AgentWorkforce/pear/pull/408',
},
})
mount.emit(changeEvent(prPath, 'pr-408-merged'))

// Wait for the merged-PR counter, then check that exactly one state write
// happened and that it targeted Done.
await vi.waitFor(() => expect(factory.status().counters.mergedPrAdvancedDone).toBe(1))
expect(states).toEqual([{ key: 'AR-408', stateId: done }])
expect(factory.status().counters.done).toBe(1)
} finally {
// Always stop the started factory, even when an assertion above throws.
await factory.stop()
}
})

// Scenario: the same merged PR file is announced twice. The first event must
// advance the Human Review issue to Done; the second must be counted as a
// suppressed duplicate and must not produce another state write.
it('suppresses duplicate merged PR events after advancing a Human Review issue', async () => {
const issue = realIssueFile(410, humanReviewStateId, { title: 'Real duplicate merge event' })
const mount = new FakeMountClient({ [issuePath(410)]: issue })
// Collects every state write made through the recording Linear client.
const states: Array<{ key: string; stateId: string }> = []
const factory = createFactory(babysitterConfig(), {
mount,
fleet: new FakeFleetClient(),
triage: new StaticTriage(),
linear: recordingLinear(states),
})

await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } })
try {
// Uses the 'by-id' PR path layout; here the issue key appears in the PR title
// ('AR-410: merged once') while head_ref does not contain it.
const prPath = '/github/repos/AgentWorkforce__pear/pulls/by-id/410.json'
mount.files.set(prPath, {
content: {
number: 410,
state: 'closed',
merged: true,
title: 'AR-410: merged once',
head_ref: 'review-gate',
},
})
// First event: wait until the advance has been counted before sending the
// duplicate, so the second event is observed after the first completed.
mount.emit(changeEvent(prPath, 'pr-410-merged-1'))
await vi.waitFor(() => expect(factory.status().counters.mergedPrAdvancedDone).toBe(1))

// Second event for the same path; the second argument differs from the first
// emit (presumably an event id — confirm against changeEvent).
mount.emit(changeEvent(prPath, 'pr-410-merged-2'))
await flush()

// Still exactly one Done write, and one duplicate recorded as suppressed.
expect(states).toEqual([{ key: 'AR-410', stateId: done }])
expect(factory.status().counters.mergedPrAdvanceDuplicatesSuppressed).toBe(1)
} finally {
// Always stop the started factory, even when an assertion above throws.
await factory.stop()
}
})

// Scenario: an issue is driven to Human Review through the babysitter ready
// signal, then its issue file is written again in the Ready state. The next
// runOnce() must count a terminal reopen and spawn the implementer a second time.
it('re-dispatches when an operator moves Human Review back to Ready for Agent', async () => {
// Fake timers are installed for this case only and restored in the finally block.
vi.useFakeTimers()
try {
const issue = realIssueFile(411, ready, { title: 'Real human review redispatch' })
const mount = new FakeMountClient({ [issuePath(411)]: issue })
// Seed PR metadata as open and non-draft.
seedPrMeta(mount, 'AgentWorkforce/pear', 411, { state: 'open', draft: false })
const fleet = new FakeFleetClient()
const factory = createFactory(babysitterConfig(), {
mount,
fleet,
triage: new StaticTriage(),
probePrResolver: async () => ({ repo: 'AgentWorkforce/pear', prNumber: 411 }),
})

// First pass: triage and dispatch directly (the factory is never start()ed here),
// let the implementer exit, wait for the babysitter spawn, then deliver its
// ready message and wait for the Human Review counter.
const decision = await factory.triageIssue(parseLinearIssue(issuePath(411), issue))
await factory.dispatch(decision)
fleet.emitAgentExit('ar-411-impl', 'worker_exited')
await vi.waitFor(() => expect(fleet.spawns.map((s) => s.name)).toContain('ar-411-babysit'))
fleet.emitAgentMessage({ from: 'ar-411-babysit', target: 'factory', body: '[factory-pr-ready] AR-411' })
await vi.waitFor(() => expect(factory.status().counters.humanReview).toBe(1))

// Simulate the operator move: the issue file is written in the Ready state
// again, then a single reconciliation pass is run.
mount.files.set(issuePath(411), { content: realIssueFile(411, ready, { title: 'Real human review redispatch' }) })
await factory.runOnce()

// One reopen recorded, and the implementer was spawned twice in total.
expect(factory.status().counters.dispatchTerminalReopened).toBe(1)
expect(fleet.spawns.filter((spawn) => spawn.name === 'ar-411-impl')).toHaveLength(2)
} finally {
vi.useRealTimers()
}
})

it('spawns the babysitter from a PR metadata change event (webhook-driven)', async () => {
Expand Down
121 changes: 110 additions & 11 deletions packages/factory-sdk/src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ export class FactoryLoop implements Factory {
readonly #dispatchAttempts = new Map<string, DispatchAttemptState>()
readonly #canonicalIssueStates = new Map<string, string>()
readonly #dispatchFailureReaperHandoffs = new Map<string, RegistryHandoffAgent>()
readonly #postMergeDoneAdvances = new Set<string>()
#slackDegraded = false
#slackDegradedReason: string | undefined
#slackWritebackFailureDegraded = false
Expand Down Expand Up @@ -322,6 +323,10 @@ export class FactoryLoop implements Factory {

await this.#backfillReadyIssues()
this.#subscription = this.#mount.subscribe([`${ISSUE_ROOT}/**/*.json`, LIVE_GITHUB_ISSUE_GLOB], (event) => {
if (isGithubPullFilePath(event.resource.path)) {
void this.#handlePrChange(event.resource.path)
return
}
if (isGithubIssueFilePath(event.resource.path)) {
void this.#handleGithubIssueChange(event.resource.path, { dryRun: this.#config.dryRun })
return
Expand Down Expand Up @@ -662,9 +667,7 @@ export class FactoryLoop implements Factory {

#prepareLiveEventForDrain(event: ChangeEvent, seenIssueKeys: Set<string>): string | undefined {
const path = event.resource.path
// PR change events only matter when the babysitter is enabled; ignore them
// otherwise so the legacy completion path is untouched.
const isPullPath = this.#config.babysitter.enabled && isGithubPullFilePath(path)
const isPullPath = isGithubPullFilePath(path)
if (!isIssueFilePath(path) && !isGithubIssueFilePath(path) && !isPullPath) {
return undefined
}
Expand Down Expand Up @@ -765,7 +768,7 @@ export class FactoryLoop implements Factory {
}

async #handlePreparedLiveChange(path: string): Promise<void> {
if (this.#config.babysitter.enabled && isGithubPullFilePath(path)) {
if (isGithubPullFilePath(path)) {
await this.#handlePrChange(path)
return
}
Expand Down Expand Up @@ -1516,7 +1519,9 @@ export class FactoryLoop implements Factory {

#recordCanonicalIssueState(issue: Pick<LinearIssue, 'key' | 'stateId'>): void {
const previousStateId = this.#canonicalIssueStates.get(issue.key)
if (previousStateId === this.#config.stateIds.done && issue.stateId === this.#config.stateIds.readyForAgent) {
const reopenedFromTerminal = previousStateId === this.#config.stateIds.done ||
previousStateId === this.#config.stateIds.humanReview
Comment on lines +1522 to +1523

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When an issue is ingested for the first time, previousStateId is undefined. If this.#config.stateIds.humanReview is also undefined (which is common if the operator has not configured it), the comparison previousStateId === this.#config.stateIds.humanReview will evaluate to true. This incorrectly flags the issue as reopenedFromTerminal and resets the dispatch attempts. Adding a guard to ensure previousStateId !== undefined prevents this bug.

    const reopenedFromTerminal = previousStateId !== undefined &&
      (previousStateId === this.#config.stateIds.done ||
       previousStateId === this.#config.stateIds.humanReview)

if (reopenedFromTerminal && issue.stateId === this.#config.stateIds.readyForAgent) {
const dispatchState = this.#dispatchAttempts.get(issue.key)
if (dispatchState?.terminal) {
dispatchState.attempts = 0
Expand Down Expand Up @@ -2367,13 +2372,21 @@ export class FactoryLoop implements Factory {
const headRef = snapshot.headRef ?? ''
const record = this.#batch.inFlight.find((candidate) =>
!candidate.dryRun && headRef && containsIssueKey(headRef, candidate.issue.key))

if (prMetaShowsMerged(snapshot)) {
await this.#advanceMergedPrToDone(snapshot, record)
return
}

if (!this.#config.babysitter.enabled) {
return
}

if (!record) {
return
}

if (snapshot.state && snapshot.state.trim().toUpperCase() !== 'OPEN') {
// Not open and not merged (merged PRs were already advanced to Done above):
// any further state change is left to the operator; nothing for the babysitter to spawn.
return
}
if (snapshot.draft) {
Expand All @@ -2384,6 +2397,77 @@ export class FactoryLoop implements Factory {
await this.#ensureBabysitter(record, { repo: `${parts.owner}/${parts.repo}`, prNumber: snapshot.number, url: snapshot.url, path })
}

async #advanceMergedPrToDone(snapshot: PullSnapshot, record?: InFlightIssue): Promise<void> {
if (record) {
await this.#completeIssue(record, { targetState: 'done', runMergeGate: false })
return
}

const issue = await this.#findMergeAdvanceIssueForPr(snapshot)
if (!issue) {
this.#increment('mergedPrAdvanceNoIssue')
return
}
const advanceKey = `${issue.key}:${snapshot.number}`
if (this.#postMergeDoneAdvances.has(advanceKey)) {
this.#increment('mergedPrAdvanceDuplicatesSuppressed')
return
}
this.#postMergeDoneAdvances.add(advanceKey)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The #postMergeDoneAdvances Set is used to suppress duplicate merge events, but it grows indefinitely as the long-running daemon processes more PRs over time. To prevent a slow memory leak, we should bound the Set's size by evicting the oldest entry when it exceeds a reasonable threshold.

    this.#postMergeDoneAdvances.add(advanceKey)
    if (this.#postMergeDoneAdvances.size > 5000) {
      const oldest = this.#postMergeDoneAdvances.values().next().value
      if (oldest !== undefined) {
        this.#postMergeDoneAdvances.delete(oldest)
      }
    }


try {
await this.#linear.setState(issue, this.#config.stateIds.done)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Complete matched in-flight issues through the batch

When a merged PR's branch name does not contain the issue key but its title/body does, record is undefined even though #findMergeAdvanceIssueForPr can still find the Agent Implementing issue from the mount. This direct setState marks Linear Done without going through #completeIssue, so the implementer/reviewer are not released and the batch slot remains in-flight. Please check the matched issue against #batch and complete that record before taking the direct persisted-state path.

Useful? React with 👍 / 👎.

this.#recordCanonicalIssueState({ key: issue.key, stateId: this.#config.stateIds.done })
this.#emit('writeback-verified', { issue: issueRef(issue), path: issue.path })
this.#increment('mergedPrAdvancedDone')
this.#increment('done')
this.#logger.info?.('[factory] merged PR advanced issue to Done', {
issue: issue.key,
prNumber: snapshot.number,
url: snapshot.url,
})

if (this.#slack && this.#config.slack && !await this.#shouldSkipSlackWriteback('merge-done-thread')) {
try {
const root = await this.#slack.postThread({
channel: this.#config.slack.channel,
text: `${issue.key}: PR merged; Linear state set to done.`,
})
await this.#slack.reply(root.threadId, `${issue.key}: Linear state set to done.`)
this.#recordSlackWritebackSuccess('merge-done-thread')
} catch (error) {
this.#markSlackWritebackFailure('merge-done-thread', error)
}
}
} catch (error) {
this.#postMergeDoneAdvances.delete(advanceKey)
this.#error(error, issueRef(issue))
}
}

async #findMergeAdvanceIssueForPr(snapshot: PullSnapshot): Promise<LinearIssue | undefined> {
const upstreamStates = new Set([
this.#config.stateIds.agentImplementing,
this.#config.stateIds.humanReview,
].filter((stateId): stateId is string => Boolean(stateId)))
for (const path of await this.#mount.listTree(ISSUE_ROOT)) {
if (!isIssueFilePath(path)) {
continue
}
const issue = await this.#readIssue(path)
if (!issue || !upstreamStates.has(issue.stateId)) {
continue
}
if (!isRealLinearIssue(issue) || !isInFactoryScope(issue, this.#config.safety)) {
continue
}
if (prSnapshotReferencesIssue(snapshot, issue.key)) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Restrict merged PR advancement to configured repos

Because the live/default subscription watches all /github/repos/** PR paths, this content-only issue-key match can advance a Human Review or Agent Implementing Linear issue from a merged PR in an unrelated mounted repository that merely mentions the same key. The existing PR resolvers limit candidates to configured repos; carry the repo from githubPullPathParts into this lookup and reject repos outside the factory route/config before accepting the match.

Useful? React with 👍 / 👎.

return issue
}
}
Comment on lines +2453 to +2467

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Currently, #findMergeAdvanceIssueForPr reads every issue file under ISSUE_ROOT (await this.#readIssue(path)) before checking whether it matches the PR snapshot. On mounts with many issues, this O(N) mount I/O on every merged-PR event will add significant latency to handling the event, because the reads are awaited one at a time.

Since keyFromPath(path) is a lightweight string operation, we can check whether the PR snapshot references the issue key before reading the file. This reduces the search to reading only the files whose key the PR actually references (typically one), assuming the key derived from the path matches issue.key.

    for (const path of await this.#mount.listTree(ISSUE_ROOT)) {
      if (!isIssueFilePath(path)) {
        continue
      }
      const issueKey = keyFromPath(path)
      if (!prSnapshotReferencesIssue(snapshot, issueKey)) {
        continue
      }
      const issue = await this.#readIssue(path)
      if (!issue || !upstreamStates.has(issue.stateId)) {
        continue
      }
      if (!isRealLinearIssue(issue) || !isInFactoryScope(issue, this.#config.safety)) {
        continue
      }
      return issue
    }

return undefined
}

// Safety net for a missed PR-open mount event: resolve the PR via the existing
// probe resolver and spawn the babysitter. Triggered by an implementer exiting
// after opening its PR (an event, not a poll).
Expand Down Expand Up @@ -2484,6 +2568,10 @@ export class FactoryLoop implements Factory {
if (snapshot) {
const guard = prMetaAllowsHumanReview(snapshot)
if (!guard.ok) {
if (prMetaShowsMerged(snapshot)) {
await this.#advanceMergedPrToDone(snapshot, record)
return
}
this.#increment('babysitterReadinessGuardBlocked')
this.#logger.info?.('[factory] babysitter ready signal ignored; PR meta not eligible', {
issue: record.issue.key,
Expand Down Expand Up @@ -2546,7 +2634,7 @@ export class FactoryLoop implements Factory {
return found
}

async #completeIssue(record: InFlightIssue, opts: { humanReview?: boolean } = {}): Promise<void> {
async #completeIssue(record: InFlightIssue, opts: { humanReview?: boolean; targetState?: 'configured' | 'done'; runMergeGate?: boolean } = {}): Promise<void> {
const completionKey = issueKey(record.issue)
if (this.#completionInFlight.has(completionKey)) {
return
Expand All @@ -2556,7 +2644,8 @@ export class FactoryLoop implements Factory {
// state AND configured its UUID; otherwise fall back to `done` (the legacy
// terminal) so an operator who sets terminalState: 'done' keeps it and the
// issue never gets stuck waiting on an unconfigured state.
const humanReview = opts.humanReview === true &&
const humanReview = opts.targetState !== 'done' &&
opts.humanReview === true &&
this.#config.terminalState === 'human-review' &&
Boolean(this.#config.stateIds.humanReview)
const targetState = humanReview ? this.#config.stateIds.humanReview! : this.#config.stateIds.done
Expand Down Expand Up @@ -2584,7 +2673,7 @@ export class FactoryLoop implements Factory {
// Only auto-merge on the `done` terminal path. Human Review parks the PR
// for an operator — the merge gate (which requires an APPROVED review)
// would refuse anyway, and we must not merge before the human has looked.
if (issue && !humanReview) {
if (issue && !humanReview && opts.runMergeGate !== false) {
await this.#runCompletionMergeGate(issue)
}

Expand Down Expand Up @@ -3671,7 +3760,7 @@ const githubPullPathParts = (path: string): { owner: string; repo: string; numbe
const isGithubPullFilePath = (path: string): boolean =>
githubPullPathParts(path) !== undefined

type PullSnapshot = { number: number; state?: string; headRef?: string; draft?: boolean; url?: string; title?: string; merged?: boolean }
type PullSnapshot = { number: number; state?: string; headRef?: string; draft?: boolean; url?: string; title?: string; body?: string; merged?: boolean }

const parsePullSnapshot = (content: unknown, fallbackNumber: number): PullSnapshot | undefined => {
const payload = wrappedPayload(content)
Expand All @@ -3684,10 +3773,20 @@ const parsePullSnapshot = (content: unknown, fallbackNumber: number): PullSnapsh
draft: booleanValue(payload.isDraft) ?? booleanValue(payload.draft),
url: stringValue(payload.url) ?? stringValue(payload.html_url),
title: stringValue(payload.title),
body: stringValue(payload.body),
merged: booleanValue(payload.merged),
}
}

// Reports whether a PR snapshot mentions the given issue key. The head ref and
// the title are checked with containsIssueKey; the body is checked with
// containsExplicitIssueReference. Missing fields are treated as empty strings,
// and the checks run in that order, stopping at the first match.
const prSnapshotReferencesIssue = (snapshot: PullSnapshot, issueKey: string): boolean => {
  if (containsIssueKey(snapshot.headRef ?? '', issueKey)) {
    return true
  }
  if (containsIssueKey(snapshot.title ?? '', issueKey)) {
    return true
  }
  return containsExplicitIssueReference(snapshot.body ?? '', issueKey)
}

// Reports whether the PR metadata marks the PR as merged: either the `merged`
// flag is exactly `true`, or the `state` string, once trimmed and upper-cased,
// equals 'MERGED'.
const prMetaShowsMerged = (snapshot: PullSnapshot): boolean => {
  if (snapshot.merged === true) {
    return true
  }
  const normalizedState = snapshot.state?.trim().toUpperCase()
  return normalizedState === 'MERGED'
}

// Guard applied to the babysitter's ready signal: the PR's own webhook-fed meta
// must still be eligible for human review. A missing `state` is treated as open
// (older mount layouts omit it). The CI/conflict/review verdict is NOT re-derived
Expand Down
Loading