Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ Slack alert via `FACTORY_CANARY_SLACK_WEBHOOK` on failure. See
`scripts/com.agentrelay.factory-canary.plist.example` for an every-6h launchd
template.

### Slack questions

Set `slack.channel` to the Slack channel name, channel ID, or mounted channel
directory. For example, `factory`, `C1234567890`, and
`C1234567890__factory` are all accepted when the channel is present under the
relayfile Slack mount.

## Tell it what to work on

Two ways to hand the factory an issue — both are just labeling/titling, nothing
Expand Down
33 changes: 33 additions & 0 deletions src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6875,6 +6875,39 @@ describe('FactoryLoop', () => {
expect(mount.confirmedPaths.filter((path) => path.includes('/replies/'))).toEqual([])
})

it('resolves a configured Slack channel name to the mounted channel directory', async () => {
const channelDir = 'C0FACTORY__factory-e2e'
const mount = new ConfirmRecordingSlackMountClient({
[issuePath(35)]: issueFile(35),
[`/slack/channels/${channelDir}/meta.json`]: {},
})
const fleet = new FakeFleetClient()
const factory = createFactory(config({ slack: slackConfig('factory-e2e') }), {
mount,
fleet,
triage: new StaticTriage(),
})

await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(35), issueFile(35))))

const slackRoots = mount.writes.filter((write) => isSlackRootWritePath(write.path))
expect(slackRoots).toHaveLength(1)
expect(slackRoots[0]?.path).toMatch(new RegExp(`^/slack/channels/${channelDir}/messages/`))
expect(fleet.messages[0]?.text).toContain(`/slack/channels/${channelDir}/messages/${mount.threadTs.replace(/\./g, '_')}/replies/question.json`)

emitSlackReply(mount, slackReplyFixturePath(channelDir, mount.threadTs, 'human-answer-35'), 'human-answer-35', {
text: 'Use the mounted channel directory.',
user: 'U123',
user_is_bot: false,
})
await flush()
await flush()

expect(slackAnswerInputs(fleet)).toEqual([
{ name: 'ar-35-impl-pear', data: '<integration-event source="slack" issue="AR-35">\nHuman reply in the Slack thread:\nUse the mounted channel directory.\n</integration-event>\r' },
])
})

it('routes a mid-task agent question to the Slack dispatch thread and returns the human answer via sendInput', async () => {
const mount = new ConfirmRecordingSlackMountClient({ [issuePath(36)]: issueFile(36) })
const fleet = new FakeFleetClient()
Expand Down
132 changes: 106 additions & 26 deletions src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import type {
TriageDecision,
TriageEngine,
} from '../types'
import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../writeback'
import { MountGithubRead, MountLinearWriteback, MountSlackWriteback, slackChannelAliases, slackChannelSegment } from '../writeback'
import { asRecord, parseJsonContent, stableHash, wrappedPayload } from '../writeback/shared'
import { type InFlightIssue, issueKey, type TrackedAgent } from './batch-tracker'
import { findAgentProcessByName, readProcessIdentity, type AgentProcessFinder } from './process-identity'
Expand Down Expand Up @@ -194,6 +194,8 @@ export class FactoryLoop implements Factory {
readonly #resumeInFlight = new Map<string, Promise<void>>()
readonly #slackWatchers = new Map<string, SlackThreadWatcher>()
readonly #slackWatcherStarts = new Map<string, Promise<void>>()
#resolvedSlackChannelDir?: string
#slackChannelDirRefresh?: Promise<string | undefined>
// Agents we've already logged an ambiguous-PID-lookup warning for, so the
// reaper doesn't spam the same benign "ambiguous process lookup" line on every
// poll (a joined/cloud agent has no local PID to resolve — expected).
Expand Down Expand Up @@ -2833,12 +2835,15 @@ export class FactoryLoop implements Factory {

if (this.#slack && this.#config.slack && !await this.#shouldSkipSlackWriteback('merge-done-thread')) {
try {
const root = await this.#slack.postThread({
channel: this.#config.slack.channel,
text: `${issue.key}: PR merged; Linear state set to Done.`,
})
await this.#slack.reply(root.threadId, `${issue.key}: Linear state set to Done.`)
this.#recordSlackWritebackSuccess('merge-done-thread')
const channel = await this.#slackChannelDir()
if (channel) {
const root = await this.#slack.postThread({
channel,
text: `${issue.key}: PR merged; Linear state set to Done.`,
})
await this.#slack.reply(root.threadId, `${issue.key}: Linear state set to Done.`)
this.#recordSlackWritebackSuccess('merge-done-thread')
}
} catch (error) {
this.#markSlackWritebackFailure('merge-done-thread', error)
}
Expand Down Expand Up @@ -3084,21 +3089,24 @@ export class FactoryLoop implements Factory {

if (this.#slack && this.#config.slack && !await this.#shouldSkipSlackWriteback('completion-thread')) {
try {
const merged = opts.completionReason === 'pr-merged'
const completionText = merged
? `${record.issue.key}: PR merged; Linear state set to ${statusLabel}.`
: `${record.issue.key}: factory agents completed${humanReview ? '; awaiting human review' : ''}.\nStatus: ${statusLabel}\nMerge policy: ${this.#config.mergePolicy}`
const stateText = merged
? `${record.issue.key}: PR merged; Linear state set to ${statusLabel}.`
: humanReview
? `${record.issue.key}: awaiting human review; Linear state set to ${statusLabel}.`
: `${record.issue.key}: Linear state set to ${statusLabel}.`
const root = await this.#slack.postThread({
channel: this.#config.slack.channel,
text: completionText,
})
await this.#slack.reply(root.threadId, stateText)
this.#recordSlackWritebackSuccess('completion-thread')
const channel = await this.#slackChannelDir()
if (channel) {
const merged = opts.completionReason === 'pr-merged'
const completionText = merged
? `${record.issue.key}: PR merged; Linear state set to ${statusLabel}.`
: `${record.issue.key}: factory agents completed${humanReview ? '; awaiting human review' : ''}.\nStatus: ${statusLabel}\nMerge policy: ${this.#config.mergePolicy}`
const stateText = merged
? `${record.issue.key}: PR merged; Linear state set to ${statusLabel}.`
: humanReview
? `${record.issue.key}: awaiting human review; Linear state set to ${statusLabel}.`
: `${record.issue.key}: Linear state set to ${statusLabel}.`
const root = await this.#slack.postThread({
channel,
text: completionText,
})
await this.#slack.reply(root.threadId, stateText)
this.#recordSlackWritebackSuccess('completion-thread')
}
} catch (error) {
this.#markSlackWritebackFailure('completion-thread', error)
}
Expand Down Expand Up @@ -3362,7 +3370,7 @@ export class FactoryLoop implements Factory {
}

const root = await this.#slack.postThread({
channel: this.#config.slack.channel,
channel: await this.#slackChannelDir() ?? this.#config.slack.channel,
text: [
`${record.issue.key}: factory agents dispatched.`,
`State: ${result.stateId ?? 'dispatching'}`,
Expand Down Expand Up @@ -3427,7 +3435,7 @@ export class FactoryLoop implements Factory {

const issue = await this.#readIssue(decision.issue.path)
const root = await this.#slack.postThread({
channel: this.#config.slack.channel,
channel: await this.#slackChannelDir() ?? this.#config.slack.channel,
text: [
`${decision.issue.key}: factory triage escalation for ${issue?.title ?? decision.issue.key}`,
`Reason: ${reason}`,
Expand All @@ -3449,7 +3457,7 @@ export class FactoryLoop implements Factory {
return
}

const channelDir = this.#config.slack.channel
const channelDir = await this.#slackChannelDir() ?? this.#config.slack.channel
const messagesPrefix = slackChannelMessagesPrefix(channelDir)
const preExistingPaths = new Set<string>()
const seenReplies = new Set<string>()
Expand Down Expand Up @@ -3687,14 +3695,86 @@ export class FactoryLoop implements Factory {
return resolve(process.cwd(), '.integrations')
}

async #slackChannelDir(): Promise<string | undefined> {
if (!this.#config.slack) {
return undefined
}
if (this.#resolvedSlackChannelDir) {
return this.#resolvedSlackChannelDir
}
if (this.#slackChannelDirRefresh) {
return this.#slackChannelDirRefresh
}

this.#slackChannelDirRefresh = this.#resolveSlackChannelDir()
.finally(() => {
this.#slackChannelDirRefresh = undefined
})
return this.#slackChannelDirRefresh
}

async #resolveSlackChannelDir(): Promise<string | undefined> {
const configured = this.#config.slack?.channel.trim()
if (!configured) {
return undefined
}

const configuredSegment = slackChannelSegment(configured)
const configuredAliases = slackChannelAliases(configured)
if (configuredAliases.size === 0) {
return undefined
}

let paths: string[]
try {
paths = await this.#mount.listTree('/slack/channels')
} catch (error) {
this.#logger.warn?.('[factory] unable to resolve Slack channel from mount; using configured channel value', {
channel: configured,
error,
})
this.#resolvedSlackChannelDir = configuredSegment
return this.#resolvedSlackChannelDir
}

const channelDirs = [...new Set(paths
.map((path) => path.match(/^\/slack\/channels\/([^/]+)/u)?.[1])
.filter((channelDir): channelDir is string => Boolean(channelDir)))]
.sort()
const matches = channelDirs.filter((channelDir) => {
const aliases = slackChannelAliases(channelDir)
return [...aliases].some((alias) => configuredAliases.has(alias))
})

if (matches.length === 0) {
this.#logger.warn?.('[factory] Slack channel was not found in mount; using configured channel value', {
channel: configured,
})
this.#resolvedSlackChannelDir = configuredSegment
return this.#resolvedSlackChannelDir
}

const exact = matches.find((channelDir) => slackChannelSegment(channelDir).toLowerCase() === configuredSegment.toLowerCase())
this.#resolvedSlackChannelDir = exact ?? matches[0]
if (matches.length > 1 && !exact) {
this.#logger.warn?.('[factory] Slack channel name matched multiple mount directories; using first match', {
channel: configured,
selected: this.#resolvedSlackChannelDir,
matches,
})
}
return this.#resolvedSlackChannelDir
}

async #slackDispatchThreadFor(record: InFlightIssue): Promise<{ channel: string; threadId: string; mountRoot: string } | undefined> {
if (!this.#config.slack) {
return undefined
}

const threadId = await this.#state.getSlackThread(this.#workspaceId, issueKey(record.issue))
const channel = await this.#slackChannelDir() ?? this.#config.slack.channel
return threadId
? { channel: this.#config.slack.channel, threadId, mountRoot: this.#integrationsMountRoot() }
? { channel, threadId, mountRoot: this.#integrationsMountRoot() }
: undefined
}

Expand Down
2 changes: 2 additions & 0 deletions src/writeback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export type {
} from './linear'
export {
MountSlackWriteback,
slackChannelAliases,
slackChannelSegment,
} from './slack'
export type {
MountSlackWritebackConfig,
Expand Down
10 changes: 5 additions & 5 deletions src/writeback/slack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ interface ThreadRef {

const channelIdFromDir = (channelDir: string): string => channelDir.split('__')[0] ?? channelDir

const channelSegment = (value: string): string => {
export const slackChannelSegment = (value: string): string => {
const trimmed = value.trim().replace(/^#/u, '')
const match = trimmed.match(/(?:^|\/)slack\/channels\/([^/]+)/u)
return match?.[1] ?? trimmed.split('/')[0] ?? trimmed
}

const channelAliases = (value?: string): Set<string> => {
const segment = typeof value === 'string' ? channelSegment(value) : ''
export const slackChannelAliases = (value?: string): Set<string> => {
const segment = typeof value === 'string' ? slackChannelSegment(value) : ''
if (!segment) return new Set()

const aliases = new Set<string>()
Expand All @@ -45,8 +45,8 @@ const assertSlackChannelAllowed = (
targetChannel: string | undefined,
context: string,
): void => {
const configured = channelAliases(configuredChannel)
const target = channelAliases(targetChannel)
const configured = slackChannelAliases(configuredChannel)
const target = slackChannelAliases(targetChannel)
if (configured.size === 0) {
throw new Error(`Refusing Slack writeback for ${context}: configured factory-e2e channel is required`)
}
Expand Down