diff --git a/src/orchestrator/factory.test.ts b/src/orchestrator/factory.test.ts index f27f152..4a8470e 100644 --- a/src/orchestrator/factory.test.ts +++ b/src/orchestrator/factory.test.ts @@ -6843,6 +6843,37 @@ describe('FactoryLoop', () => { expect(factory.status().counters.errors).toBeUndefined() }) + it('replays an existing human Slack answer when a triage escalation watcher starts', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(32)]: issueFile(32) }) + const replyPath = slackReplyFixturePath('C0FACTORY__factory-e2e', mount.threadTs, 'human-already-answered-32') + emitSlackReply(mount, replyPath, 'slack-human-already-answered-32', { + text: 'The cloud deploy path should auto-join the configured Slack channel before asking follow-up questions there.', + user: 'U123', + user_is_bot: false, + }) + const fleet = new FakeFleetClient() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new SlackStillEscalatingTriage({ rationale: 'Matched repository from Linear label.' }), + }) + + const result = await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(32), issueFile(32)))) + + expect(result.agents).toEqual([ + { name: 'ar-32-impl-pear', role: 'implementer' }, + { name: 'ar-32-review', role: 'reviewer' }, + ]) + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-32-impl-pear', 'ar-32-review']) + expect(slackAnswerInputs(fleet)).toEqual([ + { name: 'ar-32-impl-pear', data: '\nHuman reply in the Slack thread:\nThe cloud deploy path should auto-join the configured Slack channel before asking follow-up questions there.\n\r' }, + ]) + expect(factory.status().counters.slackTriageAnswersReplayed).toBe(1) + expect(factory.status().counters.slackTriageAnswersDispatchedWithRemainingEscalation).toBe(1) + expect(factory.status().counters.slackTriageAnswersInjectedToAgents).toBe(1) + expect(factory.status().counters.errors).toBeUndefined() + }) + it('ignores a human Slack thread reply after the issue has no in-flight implementer', async () => { const mount = new CloudWritebackFakeMountClient({ [issuePath(21)]: issueFile(21) }) const fleet = new FakeFleetClient() diff --git a/src/orchestrator/factory.ts b/src/orchestrator/factory.ts index bfd829b..de8af0a 100644 --- a/src/orchestrator/factory.ts +++ b/src/orchestrator/factory.ts @@ -193,7 +193,7 @@ export class FactoryLoop implements Factory { readonly #counters: Record = {} readonly #resumeInFlight = new Map>() readonly #slackWatchers = new Map() - readonly #slackWatcherStarts = new Map>() + readonly #slackWatcherStarts = new Map>() #resolvedSlackChannelDir?: string #slackChannelDirRefresh?: Promise // Agents we've already logged an ambiguous-PID-lookup warning for, so the @@ -1317,9 +1317,9 @@ export class FactoryLoop implements Factory { const escalationReason = triageEscalationReason(decision) if (escalationReason) { - await this.#escalateTriageToSlack(decision, escalationReason, dryRun) + const replayedResult = await this.#escalateTriageToSlack(decision, escalationReason, dryRun) this.#recordTriageEscalation(decision, escalationReason) - return { issue: decision.issue, agents: [], dryRun } + return replayedResult ?? { issue: decision.issue, agents: [], dryRun } } // TODO(AR-274 follow-up): short-circuit LLM triage once label-derived @@ -3346,9 +3346,12 @@ export class FactoryLoop implements Factory { const key = issueKey(record.issue) const existingThread = await this.#state.getSlackThread(this.#workspaceId, key) - if (existingThread || this.#slackWatcherStarts.has(key)) { + const watcherStart = this.#slackWatcherStarts.get(key) + if (existingThread || watcherStart) { try { - await this.#slackWatcherStarts.get(key) + if (watcherStart && !this.#slackWatchers.has(key)) { + await watcherStart + } } catch { // The initiator logs Slack watcher startup failures. } @@ -3392,7 +3395,7 @@ export class FactoryLoop implements Factory { this.#recordSlackWritebackSuccess('dispatch-thread') } - async #escalateTriageToSlack(decision: TriageDecision, reason: string, dryRun: boolean): Promise { + async #escalateTriageToSlack(decision: TriageDecision, reason: string, dryRun: boolean): Promise { if (dryRun) { return } @@ -3411,9 +3414,12 @@ export class FactoryLoop implements Factory { const key = issueKey(decision.issue) const existingThread = await this.#state.getSlackThread(this.#workspaceId, key) - if (existingThread || this.#slackWatcherStarts.has(key)) { + const watcherStart = this.#slackWatcherStarts.get(key) + if (existingThread || watcherStart) { try { - await this.#slackWatcherStarts.get(key) + if (watcherStart && !this.#slackWatchers.has(key)) { + await watcherStart + } } catch { // The initiator logs Slack watcher startup failures. } @@ -3429,7 +3435,7 @@ export class FactoryLoop implements Factory { const start = this.#postAndWatchSlackEscalationThread(decision, reason) this.#slackWatcherStarts.set(key, start) try { - await start + return await start } catch (error) { this.#markSlackWritebackFailure('triage-escalation', error) this.#logger.warn?.(`[factory] failed to establish Slack escalation thread for ${decision.issue.key}`, error) @@ -3438,7 +3444,7 @@ export class FactoryLoop implements Factory { } } - async #postAndWatchSlackEscalationThread(decision: TriageDecision, reason: string): Promise { + async #postAndWatchSlackEscalationThread(decision: TriageDecision, reason: string): Promise { if (!this.#slack || !this.#config.slack) { return } @@ -3453,11 +3459,12 @@ export class FactoryLoop implements Factory { ].join('\n'), }) await this.#state.setSlackThread(this.#workspaceId, issueKey(decision.issue), root.threadId) - await this.#watchSlackThread(escalationWatchRecord(decision), root.threadId) + const replayedResult = await this.#watchSlackThread(escalationWatchRecord(decision), root.threadId) this.#recordSlackWritebackSuccess('triage-escalation') + return replayedResult } - async #watchSlackThread(record: InFlightIssue, threadId: string): Promise { + async #watchSlackThread(record: InFlightIssue, threadId: string): Promise { if (!this.#config.slack) { return } @@ -3470,6 +3477,7 @@ export class FactoryLoop implements Factory { const channelDir = await this.#slackChannelDir() ?? this.#config.slack.channel const messagesPrefix = slackChannelMessagesPrefix(channelDir) const preExistingPaths = new Set() + const preExistingPathOrder: string[] = [] const seenReplies = new Set() const seenReplyMessages = new Set() let missingIdentityLogged = false @@ -3485,6 +3493,7 @@ export class FactoryLoop implements Factory { const path = changeEventPath(event) if (path && path.startsWith(messagesPrefix)) { preExistingPaths.add(path) + preExistingPathOrder.push(path) } } } catch (error) { @@ -3586,6 +3595,40 @@ export class FactoryLoop implements Factory { await this.#boundedStopTeardown('Slack reply subscription unsubscribe', () => subscription?.unsubscribe()) }, }) + + return await this.#replayLatestSlackTriageAnswer(record, threadId, channelDir, preExistingPathOrder) + } + + async #replayLatestSlackTriageAnswer( + record: InFlightIssue, + threadId: string, + channelDir: string, + preExistingPaths: readonly string[], + ): Promise { + if (!isTriageEscalationWatchRecord(record)) { + return + } + + let latest: SlackReply | undefined + for (const path of preExistingPaths) { + const reply = await this.#readSlackReply(path) + if ( + reply?.isThreadReply && + reply.threadTs === threadId && + reply.channelDir === channelDir && + !reply.isBot && + reply.text.trim() + ) { + latest = reply + } + } + + if (!latest) { + return + } + + this.#increment('slackTriageAnswersReplayed') + return await this.#routeSlackAnswerToImplementers(record, latest) } // Re-attach a live reply watcher to a dispatch/escalation thread that already @@ -3653,7 +3696,7 @@ export class FactoryLoop implements Factory { } } - async #routeSlackAnswerToImplementers(record: InFlightIssue, reply: SlackReply): Promise { + async #routeSlackAnswerToImplementers(record: InFlightIssue, reply: SlackReply): Promise { if (!this.#config.slack) { return } @@ -3667,8 +3710,7 @@ export class FactoryLoop implements Factory { const liveRecord = (await this.#batch()).getIssue(record.issue) if (!liveRecord || liveRecord.dryRun) { if (isTriageEscalationWatchRecord(record)) { - await this.#handleTriageEscalationSlackAnswer(record, text) - return + return await this.#handleTriageEscalationSlackAnswer(record, text) } this.#increment('slackAnswersIgnoredNoInFlight') return @@ -3696,7 +3738,7 @@ export class FactoryLoop implements Factory { } } - async #handleTriageEscalationSlackAnswer(record: InFlightIssue, text: string): Promise { + async #handleTriageEscalationSlackAnswer(record: InFlightIssue, text: string): Promise { const issue = await this.#readIssue(record.issue.path) if (!issue || !isInFactoryScope(issue, this.#config.safety) || !isRealLinearIssue(issue)) { this.#increment('slackTriageAnswersIgnoredIssueUnavailable') @@ -3726,9 +3768,9 @@ export class FactoryLoop implements Factory { if (escalationReason) { if (hasDispatchableRoute(decision)) { this.#pendingSlackClarifications.set(issueKey(decision.issue), text) - await this.#startOrQueueSlackClarifiedDecision(dispatchAfterSlackClarification(decision, escalationReason)) + const result = await this.#startOrQueueSlackClarifiedDecision(dispatchAfterSlackClarification(decision, escalationReason)) this.#increment('slackTriageAnswersDispatchedWithRemainingEscalation') - return + return result } this.#increment('slackTriageAnswersStillEscalated') this.#logger.warn?.('[factory] Slack triage answer still leaves issue escalated', { @@ -3739,15 +3781,15 @@ export class FactoryLoop implements Factory { } this.#pendingSlackClarifications.set(issueKey(decision.issue), text) - await this.#startOrQueueSlackClarifiedDecision(decision) + const result = await this.#startOrQueueSlackClarifiedDecision(decision) this.#increment('slackTriageAnswersDispatched') + return result } - async #startOrQueueSlackClarifiedDecision(decision: TriageDecision): Promise { + async #startOrQueueSlackClarifiedDecision(decision: TriageDecision): Promise { const batch = await this.#batch() if (batch.canStart()) { - await this.dispatch(decision, { dryRun: this.#config.dryRun }) - return + return await this.dispatch(decision, { dryRun: this.#config.dryRun }) } if (batch.queue(decision, this.#config.dryRun)) {