Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions packages/factory-sdk/src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ class SpawnFailingFleetClient extends FakeFleetClient {
}
}

// Test double for a broker that never freed the agent's name when the agent
// exited (relay#1116-family): every resume attempt is recorded and then
// rejected with an http 500 "agent '<name>' already exists" error.
class ResumeNameCollisionFleetClient extends FakeFleetClient {
  override async resume(input: Parameters<FakeFleetClient['resume']>[0]): Promise<SpawnResult> {
    // Record the attempt before failing so tests can count resume calls.
    this.resumes.push(input)

    const name = input.name ?? input.sessionRef
    const message = `agent '${name}' already exists`
    const collision = new Error(message)

    // Attach the structured fields to the error; note that it is flagged
    // `retryable` even though it carries an "already exists" rejection.
    Object.assign(collision, {
      code: 'http_500',
      status: 500,
      retryable: true,
      data: { error: message, name, success: false },
    })

    // The method is async, so this surfaces as a rejected promise.
    throw collision
  }
}

class ManualClock {
value = 0

Expand Down Expand Up @@ -3092,6 +3107,25 @@ describe('FactoryLoop', () => {
expect(factory.status().inFlight).toEqual([])
})

// Regression test: when resume is rejected with "agent '<name>' already exists",
// the factory must attempt the resume only once, count the collision, and not
// report it as a hard error — even if the same agent exit is delivered again.
it('does not loop when resume hits a leaked broker name ("already exists")', async () => {
// Arrange: one issue (80) on the mount and a fleet client whose resume()
// always rejects with the "already exists" error.
const mount = new FakeMountClient({ [issuePath(80)]: issueFile(80) })
const fleet = new ResumeNameCollisionFleetClient()
fleet.setSessionRef('ar-80-review', 'session-review-80')
const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() })
const decision = await factory.triageIssue(parseLinearIssue(issuePath(80), issueFile(80)))

// Act: dispatch, then report a crash exit for the agent and let pending work settle.
await factory.dispatch(decision)
fleet.emitAgentExit('ar-80-review', 'crash')
await flush()
// A second exit event must NOT trigger another resume attempt.
fleet.emitAgentExit('ar-80-review', 'crash')
await flush()

// Assert: exactly one resume call reached the fleet, the collision counter
// was bumped once, and the error counter is zero (or absent).
expect(fleet.resumes).toHaveLength(1) // resumed once, collided, then short-circuited
expect(factory.status().counters.resumeNameCollisions).toBe(1)
expect(factory.status().counters.errors ?? 0).toBe(0) // not surfaced as a hard error
})

it('does not complete on an implementer exit when only a draft PR exists', async () => {
const issue = realIssueFile(256, ready, { title: 'Real implementer draft PR exit' })
const mount = new FakeMountClient({ [issuePath(256)]: issue })
Expand Down
29 changes: 29 additions & 0 deletions packages/factory-sdk/src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,24 @@ export class FactoryLoop implements Factory {
try {
await resume
this.#resumedExitKeys.add(resumeKey)
} catch (error) {
if (isAgentAlreadyExistsError(error)) {
Comment on lines +1606 to +1607

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Share the collision-swallowing promise with duplicate exits

When two exit callbacks for the same issue/name/sessionRef arrive before the first resume settles, the second callback takes the existing branch above and awaits the raw #resumeTrackedAgent promise. If the broker rejects with agent ... already exists, only the creator reaches this new isAgentAlreadyExistsError catch; any waiter still receives the rejection and the outer catch records a hard [factory] error, so replayed/concurrent exit delivery can still surface the 500 that this change is meant to suppress. Store a wrapped in-flight promise that swallows/counts this collision, or apply the same collision handling to waiters.

Useful? React with 👍 / 👎.

// The broker never released this agent's name on exit
// (relay#1116-family), so re-registering collides with the stuck
// name. The error is marked retryable but isn't — retrying just
// re-collides forever. Treat it as terminal for this name: record
// the resume key so subsequent exit events short-circuit, count it,
// and warn once instead of spamming a 500 stack trace. The external
// reaper / a broker restart reclaims the leaked name.
this.#resumedExitKeys.add(resumeKey)
this.#increment('resumeNameCollisions')
this.#logger.warn?.('[factory] resume skipped: broker still holds agent name (relay#1116); not retrying', {
issue: record.issue.key,
name,
})
} else {
throw error
}
} finally {
this.#resumeInFlight.delete(resumeKey)
}
Expand Down Expand Up @@ -2912,6 +2930,17 @@ const labelName = (value: unknown): string | undefined => {
// True when `reason` is one of the three strings that denote a completed
// issue: 'issue-done', 'done' or 'completed'. Anything else (including an
// omitted reason) is not a completion.
const isCompletionReason = (reason?: string): boolean => {
  switch (reason) {
    case 'issue-done':
    case 'done':
    case 'completed':
      return true
    default:
      return false
  }
}

// The broker rejects re-registering a name it never released on exit
// (relay#1116-family) with a 500 "agent '<name>' already exists". Detect it so
// resume can treat it as terminal rather than retrying the (falsely)
// "retryable" error forever.
//
// Every place the text may live is inspected, and a match in ANY of them
// counts:
//   - the structured payload (`error.data.error`),
//   - the message of an Error instance, or the `message` field of a plain
//     object (errors are not always Error instances once they have crossed a
//     serialization boundary),
//   - the thrown value itself when it is a bare string.
// Checking all candidates (instead of stopping at the first one present) keeps
// a payload with an unrelated `data.error` from hiding a matching message.
//
// NOTE(review): the pattern is intentionally left as the broad
// "already exists" match; confirm no other broker error uses that phrase.
const isAgentAlreadyExistsError = (error: unknown): boolean => {
  const record = asRecord(error)
  const data = asRecord(record?.data)
  const candidates: unknown[] = [
    stringValue(data?.error),
    error instanceof Error ? error.message : stringValue(record?.message),
    typeof error === 'string' ? error : undefined,
  ]
  return candidates.some((text) => typeof text === 'string' && /already exists/iu.test(text))
}
Comment on lines +2937 to +2942

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of isAgentAlreadyExistsError only checks error.message if error is an instance of Error. However, in JavaScript/TypeScript, errors can sometimes be thrown as plain objects, strings, or other custom shapes (especially when serialized/deserialized over the network or from API clients).

We can make this check significantly more robust and consistent by leveraging the existing describeError helper, which already handles Error instances, strings, and serializes plain objects to JSON.

Suggested change
const isAgentAlreadyExistsError = (error: unknown): boolean => {
const record = asRecord(error)
const data = asRecord(record?.data)
const message = stringValue(data?.error) ?? (error instanceof Error ? error.message : '')
return /already exists/iu.test(message)
}
const isAgentAlreadyExistsError = (error: unknown): boolean => {
const record = asRecord(error)
const data = asRecord(record?.data)
const message = stringValue(data?.error) ?? describeError(error).errorMessage
return /already exists/iu.test(message)
}


// Picks the restart policy for an agent spec. Implementers always get a fixed
// policy (up to 3 restarts using the 'resume' strategy), regardless of what the
// spec carries; every other role keeps whatever policy the spec already has,
// which may be undefined.
const defaultRestartPolicy = (spec: AgentSpec): AgentSpec['restartPolicy'] | undefined => {
  if (spec.role !== 'implementer') {
    return spec.restartPolicy
  }
  return { maxRestarts: 3, strategy: 'resume' } as AgentSpec['restartPolicy']
}

Expand Down
Loading