Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6843,6 +6843,37 @@ describe('FactoryLoop', () => {
expect(factory.status().counters.errors).toBeUndefined()
})

it('replays an existing human Slack answer when a triage escalation watcher starts', async () => {
const mount = new CloudWritebackFakeMountClient({ [issuePath(32)]: issueFile(32) })
const replyPath = slackReplyFixturePath('C0FACTORY__factory-e2e', mount.threadTs, 'human-already-answered-32')
emitSlackReply(mount, replyPath, 'slack-human-already-answered-32', {
text: 'The cloud deploy path should auto-join the configured Slack channel before asking follow-up questions there.',
user: 'U123',
user_is_bot: false,
})
const fleet = new FakeFleetClient()
const factory = createFactory(config({ slack: slackConfig() }), {
mount,
fleet,
triage: new SlackStillEscalatingTriage({ rationale: 'Matched repository from Linear label.' }),
})

const result = await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(32), issueFile(32))))

expect(result.agents).toEqual([
{ name: 'ar-32-impl-pear', role: 'implementer' },
{ name: 'ar-32-review', role: 'reviewer' },
])
expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-32-impl-pear', 'ar-32-review'])
expect(slackAnswerInputs(fleet)).toEqual([
{ name: 'ar-32-impl-pear', data: '<integration-event source="slack" issue="AR-32">\nHuman reply in the Slack thread:\nThe cloud deploy path should auto-join the configured Slack channel before asking follow-up questions there.\n</integration-event>\r' },
])
expect(factory.status().counters.slackTriageAnswersReplayed).toBe(1)
expect(factory.status().counters.slackTriageAnswersDispatchedWithRemainingEscalation).toBe(1)
expect(factory.status().counters.slackTriageAnswersInjectedToAgents).toBe(1)
expect(factory.status().counters.errors).toBeUndefined()
})

it('ignores a human Slack thread reply after the issue has no in-flight implementer', async () => {
const mount = new CloudWritebackFakeMountClient({ [issuePath(21)]: issueFile(21) })
const fleet = new FakeFleetClient()
Expand Down
86 changes: 64 additions & 22 deletions src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class FactoryLoop implements Factory {
readonly #counters: Record<string, number> = {}
readonly #resumeInFlight = new Map<string, Promise<void>>()
readonly #slackWatchers = new Map<string, SlackThreadWatcher>()
readonly #slackWatcherStarts = new Map<string, Promise<void>>()
readonly #slackWatcherStarts = new Map<string, Promise<unknown>>()
#resolvedSlackChannelDir?: string
#slackChannelDirRefresh?: Promise<string | undefined>
// Agents we've already logged an ambiguous-PID-lookup warning for, so the
Expand Down Expand Up @@ -1317,9 +1317,9 @@ export class FactoryLoop implements Factory {

const escalationReason = triageEscalationReason(decision)
if (escalationReason) {
await this.#escalateTriageToSlack(decision, escalationReason, dryRun)
const replayedResult = await this.#escalateTriageToSlack(decision, escalationReason, dryRun)
this.#recordTriageEscalation(decision, escalationReason)
return { issue: decision.issue, agents: [], dryRun }
return replayedResult ?? { issue: decision.issue, agents: [], dryRun }
}

// TODO(AR-274 follow-up): short-circuit LLM triage once label-derived
Expand Down Expand Up @@ -3346,9 +3346,12 @@ export class FactoryLoop implements Factory {

const key = issueKey(record.issue)
const existingThread = await this.#state.getSlackThread(this.#workspaceId, key)
if (existingThread || this.#slackWatcherStarts.has(key)) {
const watcherStart = this.#slackWatcherStarts.get(key)
if (existingThread || watcherStart) {
try {
await this.#slackWatcherStarts.get(key)
if (watcherStart && !this.#slackWatchers.has(key)) {
await watcherStart
}
} catch {
// The initiator logs Slack watcher startup failures.
}
Expand Down Expand Up @@ -3392,7 +3395,7 @@ export class FactoryLoop implements Factory {
this.#recordSlackWritebackSuccess('dispatch-thread')
}

async #escalateTriageToSlack(decision: TriageDecision, reason: string, dryRun: boolean): Promise<void> {
async #escalateTriageToSlack(decision: TriageDecision, reason: string, dryRun: boolean): Promise<DispatchResult | undefined> {
if (dryRun) {
return
}
Expand All @@ -3411,9 +3414,12 @@ export class FactoryLoop implements Factory {

const key = issueKey(decision.issue)
const existingThread = await this.#state.getSlackThread(this.#workspaceId, key)
if (existingThread || this.#slackWatcherStarts.has(key)) {
const watcherStart = this.#slackWatcherStarts.get(key)
if (existingThread || watcherStart) {
try {
await this.#slackWatcherStarts.get(key)
if (watcherStart && !this.#slackWatchers.has(key)) {
await watcherStart
}
} catch {
// The initiator logs Slack watcher startup failures.
}
Expand All @@ -3429,7 +3435,7 @@ export class FactoryLoop implements Factory {
const start = this.#postAndWatchSlackEscalationThread(decision, reason)
this.#slackWatcherStarts.set(key, start)
try {
await start
return await start
} catch (error) {
this.#markSlackWritebackFailure('triage-escalation', error)
this.#logger.warn?.(`[factory] failed to establish Slack escalation thread for ${decision.issue.key}`, error)
Expand All @@ -3438,7 +3444,7 @@ export class FactoryLoop implements Factory {
}
}

async #postAndWatchSlackEscalationThread(decision: TriageDecision, reason: string): Promise<void> {
async #postAndWatchSlackEscalationThread(decision: TriageDecision, reason: string): Promise<DispatchResult | undefined> {
if (!this.#slack || !this.#config.slack) {
return
}
Expand All @@ -3453,11 +3459,12 @@ export class FactoryLoop implements Factory {
].join('\n'),
})
await this.#state.setSlackThread(this.#workspaceId, issueKey(decision.issue), root.threadId)
await this.#watchSlackThread(escalationWatchRecord(decision), root.threadId)
const replayedResult = await this.#watchSlackThread(escalationWatchRecord(decision), root.threadId)
this.#recordSlackWritebackSuccess('triage-escalation')
return replayedResult
}

async #watchSlackThread(record: InFlightIssue, threadId: string): Promise<void> {
async #watchSlackThread(record: InFlightIssue, threadId: string): Promise<DispatchResult | undefined> {
if (!this.#config.slack) {
return
}
Expand All @@ -3470,6 +3477,7 @@ export class FactoryLoop implements Factory {
const channelDir = await this.#slackChannelDir() ?? this.#config.slack.channel
const messagesPrefix = slackChannelMessagesPrefix(channelDir)
const preExistingPaths = new Set<string>()
const preExistingPathOrder: string[] = []
const seenReplies = new Set<string>()
const seenReplyMessages = new Set<string>()
let missingIdentityLogged = false
Expand All @@ -3485,6 +3493,7 @@ export class FactoryLoop implements Factory {
const path = changeEventPath(event)
if (path && path.startsWith(messagesPrefix)) {
preExistingPaths.add(path)
preExistingPathOrder.push(path)
}
}
} catch (error) {
Expand Down Expand Up @@ -3586,6 +3595,40 @@ export class FactoryLoop implements Factory {
await this.#boundedStopTeardown('Slack reply subscription unsubscribe', () => subscription?.unsubscribe())
},
})

return await this.#replayLatestSlackTriageAnswer(record, threadId, channelDir, preExistingPathOrder)
}

async #replayLatestSlackTriageAnswer(
record: InFlightIssue,
threadId: string,
channelDir: string,
preExistingPaths: readonly string[],
): Promise<DispatchResult | undefined> {
if (!isTriageEscalationWatchRecord(record)) {
return
}

let latest: SlackReply | undefined
for (const path of preExistingPaths) {
const reply = await this.#readSlackReply(path)
if (
reply?.isThreadReply &&
reply.threadTs === threadId &&
reply.channelDir === channelDir &&
!reply.isBot &&
reply.text.trim()
) {
latest = reply
}
}

if (!latest) {
return
}

this.#increment('slackTriageAnswersReplayed')
return await this.#routeSlackAnswerToImplementers(record, latest)
}

// Re-attach a live reply watcher to a dispatch/escalation thread that already
Expand Down Expand Up @@ -3653,7 +3696,7 @@ export class FactoryLoop implements Factory {
}
}

async #routeSlackAnswerToImplementers(record: InFlightIssue, reply: SlackReply): Promise<void> {
async #routeSlackAnswerToImplementers(record: InFlightIssue, reply: SlackReply): Promise<DispatchResult | undefined> {
if (!this.#config.slack) {
return
}
Expand All @@ -3667,8 +3710,7 @@ export class FactoryLoop implements Factory {
const liveRecord = (await this.#batch()).getIssue(record.issue)
if (!liveRecord || liveRecord.dryRun) {
if (isTriageEscalationWatchRecord(record)) {
await this.#handleTriageEscalationSlackAnswer(record, text)
return
return await this.#handleTriageEscalationSlackAnswer(record, text)
}
this.#increment('slackAnswersIgnoredNoInFlight')
return
Expand Down Expand Up @@ -3696,7 +3738,7 @@ export class FactoryLoop implements Factory {
}
}

async #handleTriageEscalationSlackAnswer(record: InFlightIssue, text: string): Promise<void> {
async #handleTriageEscalationSlackAnswer(record: InFlightIssue, text: string): Promise<DispatchResult | undefined> {
const issue = await this.#readIssue(record.issue.path)
if (!issue || !isInFactoryScope(issue, this.#config.safety) || !isRealLinearIssue(issue)) {
this.#increment('slackTriageAnswersIgnoredIssueUnavailable')
Expand Down Expand Up @@ -3726,9 +3768,9 @@ export class FactoryLoop implements Factory {
if (escalationReason) {
if (hasDispatchableRoute(decision)) {
this.#pendingSlackClarifications.set(issueKey(decision.issue), text)
await this.#startOrQueueSlackClarifiedDecision(dispatchAfterSlackClarification(decision, escalationReason))
const result = await this.#startOrQueueSlackClarifiedDecision(dispatchAfterSlackClarification(decision, escalationReason))
this.#increment('slackTriageAnswersDispatchedWithRemainingEscalation')
return
return result
}
this.#increment('slackTriageAnswersStillEscalated')
this.#logger.warn?.('[factory] Slack triage answer still leaves issue escalated', {
Expand All @@ -3739,15 +3781,15 @@ export class FactoryLoop implements Factory {
}

this.#pendingSlackClarifications.set(issueKey(decision.issue), text)
await this.#startOrQueueSlackClarifiedDecision(decision)
const result = await this.#startOrQueueSlackClarifiedDecision(decision)
this.#increment('slackTriageAnswersDispatched')
return result
}

async #startOrQueueSlackClarifiedDecision(decision: TriageDecision): Promise<void> {
async #startOrQueueSlackClarifiedDecision(decision: TriageDecision): Promise<DispatchResult | undefined> {
const batch = await this.#batch()
if (batch.canStart()) {
await this.dispatch(decision, { dryRun: this.#config.dryRun })
return
return await this.dispatch(decision, { dryRun: this.#config.dryRun })
}

if (batch.queue(decision, this.#config.dryRun)) {
Expand Down