Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3809,6 +3809,29 @@ describe('FactoryLoop', () => {
await factory.stop()
})

it('dedupes concurrent dispatches for the same Linear key across mirror paths', async () => {
const canonicalPath = issuePath(190)
const aliasPath = '/linear/issues/by-id/AR-190.json'
const issue = realIssueFile(190)
const mount = new DelayedIssueReadMount({
[canonicalPath]: issue,
[aliasPath]: issue,
}, 25)
const fleet = new FakeFleetClient()
const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() })
const canonicalDecision = await factory.triageIssue(parseLinearIssue(canonicalPath, issue))
const aliasDecision = await factory.triageIssue(parseLinearIssue(aliasPath, issue))

await Promise.all([
factory.dispatch(canonicalDecision),
factory.dispatch(aliasDecision),
])

expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-190-impl-pear', 'ar-190-review'])
expect(factory.status().counters.dispatchDuplicateSuppressed).toBe(1)
expect(factory.status().inFlight.map((inFlightIssue) => inFlightIssue.key)).toEqual(['AR-190'])
})

it('live subscription dispatches a newly-arrived in-scope ready issue from subscribe events', async () => {
const path = issuePath(25)
const mount = new FakeMountClient()
Expand Down
22 changes: 22 additions & 0 deletions src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ export class FactoryLoop implements Factory {
readonly #listeners = new Map<FactoryEvent, Set<Listener>>()
readonly #counters: Record<string, number> = {}
readonly #resumeInFlight = new Map<string, Promise<void>>()
readonly #dispatchInFlight = new Map<string, Promise<DispatchResult>>()
readonly #slackWatchers = new Map<string, SlackThreadWatcher>()
readonly #slackWatcherStarts = new Map<string, Promise<unknown>>()
#resolvedSlackChannelDir?: string
Expand Down Expand Up @@ -1288,6 +1289,27 @@ export class FactoryLoop implements Factory {
}

async dispatch(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise<DispatchResult> {
const dryRun = opts.dryRun ?? this.#config.dryRun
const phase = triageEscalationReason(decision) ? 'escalation' : 'dispatch'
const key = `${decision.issue.key}:${dryRun ? 'dry-run' : 'live'}:${phase}`
const inFlight = this.#dispatchInFlight.get(key)
if (inFlight) {
this.#increment('dispatchDuplicateSuppressed')
return inFlight
}

const dispatched = this.#dispatchUnlocked(decision, opts)
this.#dispatchInFlight.set(key, dispatched)
try {
return await dispatched
} finally {
if (this.#dispatchInFlight.get(key) === dispatched) {
this.#dispatchInFlight.delete(key)
}
}
}
Comment on lines 1291 to +1310

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If dispatch is called concurrently with different dryRun options (for example, one real run and one dry run), they will share the same in-flight promise because the cache key only uses the issue key. This can lead to a real dispatch being suppressed by a dry-run dispatch, preventing agents from actually being spawned.

To resolve this, calculate the effective dryRun value and include it in the #dispatchInFlight map key.

Suggested change
async dispatch(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise<DispatchResult> {
const key = issueKey(decision.issue)
const inFlight = this.#dispatchInFlight.get(key)
if (inFlight) {
this.#increment('dispatchDuplicateSuppressed')
return inFlight
}
const dispatched = this.#dispatchUnlocked(decision, opts)
this.#dispatchInFlight.set(key, dispatched)
try {
return await dispatched
} finally {
if (this.#dispatchInFlight.get(key) === dispatched) {
this.#dispatchInFlight.delete(key)
}
}
}
async dispatch(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise<DispatchResult> {
const dryRun = opts.dryRun ?? this.#config.dryRun
const key = issueKey(decision.issue) + ':' + dryRun
const inFlight = this.#dispatchInFlight.get(key)
if (inFlight) {
this.#increment('dispatchDuplicateSuppressed')
return inFlight
}
const dispatched = this.#dispatchUnlocked(decision, opts)
this.#dispatchInFlight.set(key, dispatched)
try {
return await dispatched
} finally {
if (this.#dispatchInFlight.get(key) === dispatched) {
this.#dispatchInFlight.delete(key)
}
}
}


async #dispatchUnlocked(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise<DispatchResult> {
const dryRun = opts.dryRun ?? this.#config.dryRun
const batch = await this.#batch()
const existingRecord = batch.getIssue(decision.issue)
Expand Down