diff --git a/src/orchestrator/factory.test.ts b/src/orchestrator/factory.test.ts index 4a8470e..48d258e 100644 --- a/src/orchestrator/factory.test.ts +++ b/src/orchestrator/factory.test.ts @@ -3809,6 +3809,29 @@ describe('FactoryLoop', () => { await factory.stop() }) + it('dedupes concurrent dispatches for the same Linear key across mirror paths', async () => { + const canonicalPath = issuePath(190) + const aliasPath = '/linear/issues/by-id/AR-190.json' + const issue = realIssueFile(190) + const mount = new DelayedIssueReadMount({ + [canonicalPath]: issue, + [aliasPath]: issue, + }, 25) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + const canonicalDecision = await factory.triageIssue(parseLinearIssue(canonicalPath, issue)) + const aliasDecision = await factory.triageIssue(parseLinearIssue(aliasPath, issue)) + + await Promise.all([ + factory.dispatch(canonicalDecision), + factory.dispatch(aliasDecision), + ]) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-190-impl-pear', 'ar-190-review']) + expect(factory.status().counters.dispatchDuplicateSuppressed).toBe(1) + expect(factory.status().inFlight.map((inFlightIssue) => inFlightIssue.key)).toEqual(['AR-190']) + }) + it('live subscription dispatches a newly-arrived in-scope ready issue from subscribe events', async () => { const path = issuePath(25) const mount = new FakeMountClient() diff --git a/src/orchestrator/factory.ts b/src/orchestrator/factory.ts index de8af0a..099d874 100644 --- a/src/orchestrator/factory.ts +++ b/src/orchestrator/factory.ts @@ -192,6 +192,7 @@ export class FactoryLoop implements Factory { readonly #listeners = new Map>() readonly #counters: Record = {} readonly #resumeInFlight = new Map>() + readonly #dispatchInFlight = new Map>() readonly #slackWatchers = new Map() readonly #slackWatcherStarts = new Map>() #resolvedSlackChannelDir?: string @@ -1288,6 +1289,27 @@ export class FactoryLoop implements Factory { } async dispatch(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise { + const dryRun = opts.dryRun ?? this.#config.dryRun + const phase = triageEscalationReason(decision) ? 'escalation' : 'dispatch' + const key = `${decision.issue.key}:${dryRun ? 'dry-run' : 'live'}:${phase}` + const inFlight = this.#dispatchInFlight.get(key) + if (inFlight) { + this.#increment('dispatchDuplicateSuppressed') + return inFlight + } + + const dispatched = this.#dispatchUnlocked(decision, opts) + this.#dispatchInFlight.set(key, dispatched) + try { + return await dispatched + } finally { + if (this.#dispatchInFlight.get(key) === dispatched) { + this.#dispatchInFlight.delete(key) + } + } + } + + async #dispatchUnlocked(decision: TriageDecision, opts: { dryRun?: boolean } = {}): Promise { const dryRun = opts.dryRun ?? this.#config.dryRun const batch = await this.#batch() const existingRecord = batch.getIssue(decision.issue)