diff --git a/packages/factory-sdk/src/config/schema.test.ts b/packages/factory-sdk/src/config/schema.test.ts index af79dac4..85169018 100644 --- a/packages/factory-sdk/src/config/schema.test.ts +++ b/packages/factory-sdk/src/config/schema.test.ts @@ -34,6 +34,7 @@ describe('FactoryConfigSchema', () => { expect(parsed.slack).toEqual({ channel: 'C123', style: 'threaded-summarized', + botUserId: 'U0B2596R7EZ', }) expect(parsed.mergePolicy).toBe('never') expect(parsed.stateIds).toEqual(LINEAR_STATE_IDS) diff --git a/packages/factory-sdk/src/config/schema.ts b/packages/factory-sdk/src/config/schema.ts index a7af7b4f..86a5839d 100644 --- a/packages/factory-sdk/src/config/schema.ts +++ b/packages/factory-sdk/src/config/schema.ts @@ -32,6 +32,7 @@ export const FactoryConfigSchema = z.object({ slack: z.object({ channel: z.string(), style: z.literal('threaded-summarized').default('threaded-summarized'), + botUserId: z.string().default('U0B2596R7EZ'), }).optional(), mergePolicy: z.enum(['never', 'on-green-with-review']).default('never'), stateIds: z.object({ diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts index c7d39d65..55b307d0 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts @@ -61,6 +61,7 @@ export type RelayFileClientLike = export class RelayfileCloudMountClient implements MountClient { readonly workspaceId: string + readonly writebackTransport = 'relayfile-cloud' readonly #client: RelayFileClientLike readonly #tokenProvider: TokenProvider diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index cf738ad6..b628fbbe 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -52,6 +52,12 @@ const realIssueFile = (n: number, stateId = ready, overrides: Record ({ + channel, + style: 'threaded-summarized' as const, + botUserId: 'U0B2596R7EZ', +}) + const flush = async () => { await new Promise((resolve) => setTimeout(resolve, 0)) } @@ -120,6 +126,85 @@ class ThrowingWatermarkMount extends FakeMountClient { } } +class RecordingSlack implements SlackWriteback { + readonly roots: Array<{ channel: string; text: string }> = [] + readonly replies: Array<{ threadId: string; text: string }> = [] + threadId = '1780751612.176219' + failPostThread = false + failReplies = 0 + + async postThread(root: { channel: string; text: string }): Promise<{ threadId: string }> { + if (this.failPostThread) { + throw new Error('slack post failed') + } + + this.roots.push(root) + return { threadId: this.threadId } + } + + async reply(threadId: string, text: string): Promise { + if (this.failReplies > 0) { + this.failReplies -= 1 + throw new Error('slack reply failed') + } + + this.replies.push({ threadId, text }) + } +} + +class CloudWritebackFakeMountClient extends FakeMountClient { + constructor(initialFiles: Record = {}, readonly threadTs = '1780751612.176219') { + super(initialFiles) + } + + override async writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise { + await super.writeFile(path, content, opts) + if (isSlackRootWritePath(path)) { + this.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + payload: { + ...record(content), + ts: this.threadTs, + thread_ts: this.threadTs, + }, + }, + }) + } + } +} + +class ConfirmRecordingSlackMountClient extends CloudWritebackFakeMountClient { + readonly confirmedPaths: string[] = [] + + override async confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { + this.confirmedPaths.push(path) + return super.confirmWrite(path, opts) + } +} + +class FailFirstSlackReplyMountClient extends CloudWritebackFakeMountClient { + failedReply = false + + override async confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { + if (!this.failedReply && path.includes('/replies/')) { + this.failedReply = true + return 'failed' + } + return super.confirmWrite(path, opts) + } +} + +class FailSlackRootMountClient extends CloudWritebackFakeMountClient { + override async confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { + if (isSlackRootWritePath(path)) { + return 'failed' + } + return super.confirmWrite(path, opts) + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -763,7 +848,17 @@ describe('FactoryLoop', () => { it('closes a synthetic probe PR after done writebacks and before release when mergePolicy is never', async () => { const order: string[] = [] - const mount = new FakeMountClient({ + class OrderedSlackMountClient extends CloudWritebackFakeMountClient { + override async writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise { + if (isSlackRootWritePath(path)) { + order.push('slack-root') + } else if (path.includes('/replies/')) { + order.push('slack-reply') + } + await super.writeFile(path, content, opts) + } + } + const mount = new OrderedSlackMountClient({ [issuePath(18)]: issueFile(18), '/github/repos/AgentWorkforce__pear/pulls/by-id/18.json': { provider: 'github', @@ -798,22 +893,12 @@ describe('FactoryLoop', () => { return true }, } - const slack: SlackWriteback = { - async postThread() { - order.push('slack-root') - return { threadId: 'thread-1' } - }, - async reply() { - order.push('slack-reply') - }, - } const closeInputs: Array> = [] - const factory = createFactory(config({ slack: { channel: 'C0FACTORY', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig('C0FACTORY') }), { mount, fleet, triage: new StaticTriage(), linear, - slack, probeCloser: async (input) => { order.push('probe-close') closeInputs.push(input) @@ -893,9 +978,365 @@ describe('FactoryLoop', () => { expect(closeInputs).toEqual([]) expect(fleet.releases.map((release) => release.name)).toEqual(['ar-23-impl', 'ar-23-review']) }) + + it('watches the in-flight factory Slack thread and replies to a human status request with live state, roster, and PR', async () => { + const mount = new ConfirmRecordingSlackMountClient({ [issuePath(24)]: issueFile(24) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + probePrResolver: async () => ({ repo: 'AgentWorkforce/pear', prNumber: 240 }), + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(24), issueFile(24)))) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-1'), 'slack-human-1', { + text: 'status?', + user: 'U123', + user_name: 'human', + user_is_bot: false, + }) + await flush() + await flush() + + const replies = slackReplyWrites(mount) + expect(slack.replies).toEqual([]) + expect(replies).toHaveLength(1) + expect(replies[0]?.content.thread_ts).toBe(slack.threadId) + expect(replies[0]?.content.text).toContain('AR-24') + expect(replies[0]?.content.text).toContain(implementing) + expect(replies[0]?.content.text).toContain('ar-24-impl') + expect(replies[0]?.content.text).toContain('ar-24-review') + expect(replies[0]?.content.text).toContain('https://github.com/AgentWorkforce/pear/pull/240') + expect(mount.confirmedPaths.filter((path) => path.includes('/replies/'))).toEqual([replies[0]?.path]) + }) + + it('watches top-level inbound Slack thread replies keyed by real reply ts', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(32)]: issueFile(32) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(32), issueFile(32)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', '1780751619.000001', 'slack-human-top-level', { + text: 'status?', + thread_ts: slack.threadId, + user: 'U123', + user_is_bot: false, + }) + await flush() + await flush() + + const replies = slackReplyWrites(mount) + expect(slack.replies).toEqual([]) + expect(replies).toHaveLength(1) + expect(replies[0]?.content.thread_ts).toBe(slack.threadId) + }) + + it.each([ + ['user_is_bot marker', { user: 'U-BOT-MIRROR', user_is_bot: true }], + ['configured bot user id', { user: 'U0B2596R7EZ', user_is_bot: false }], + ])('degraded self-ignore: inbound %s is ignored', async (_name, marker) => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(33)]: issueFile(33) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(33), issueFile(33)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', `1780751620.${marker.user === 'U0B2596R7EZ' ? '000002' : '000001'}`, `slack-self-${marker.user}`, { + text: 'status?', + thread_ts: slack.threadId, + ...marker, + }) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toEqual([]) + }) + + it('degraded thread/channel guard: off-thread, mismatched-thread, and wrong-channel replies are skipped', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(34)]: issueFile(34) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(34), issueFile(34)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', '1780751621.000001', 'slack-mismatched-thread', { + text: 'wrong parent', + thread_ts: '1780759999.000001', + user: 'U123', + user_is_bot: false, + }) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', '1780759999.000001', 'human-off-thread'), 'slack-off-thread', { + text: 'wrong nested parent', + user: 'U123', + user_is_bot: false, + }) + emitSlackTopLevelMessage(mount, 'C0PRODUCT__general', '1780751621.000002', 'slack-wrong-channel', { + text: 'right parent wrong channel', + thread_ts: slack.threadId, + user: 'U123', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toEqual([]) + }) + + it('degraded positive control: genuine human reply in the watched thread is answered', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(35)]: issueFile(35) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(35), issueFile(35)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', '1780751622.000001', 'slack-human-positive', { + text: 'status?', + thread_ts: slack.threadId, + user: 'U-HUMAN', + user_is_bot: false, + }) + await flush() + await flush() + + const replies = slackReplyWrites(mount) + expect(replies).toHaveLength(1) + expect(replies[0]?.content.text).toContain('AR-35') + }) + + it('ignores the factory bot own Slack replies to avoid self-response loops', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(25)]: issueFile(25) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(25), issueFile(25)))) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'bot-1'), 'slack-bot-1', { + text: 'AR-25: Ready for Agent', + user: 'U0B2596R7EZ', + user_name: 'file_by_agent_relay', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toEqual([]) + }) + + it('does not respond to Slack replies outside the watched factory-e2e issue thread', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(26)]: issueFile(26) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(26), issueFile(26)))) + emitSlackReply(mount, slackReplyFixturePath('C0PRODUCT__general', slack.threadId, 'human-product'), 'slack-product-1', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', '1780751613.000001', 'human-other-thread'), 'slack-other-thread-1', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', slack.threadId, 'slack-parent-root', { + text: 'root parent mirror', + thread_ts: slack.threadId, + user: 'U123', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toEqual([]) + }) + + it('connects Slack reply watchers from now and does not reprocess pre-existing thread replies', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(27)]: issueFile(27) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const oldPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'old-human') + emitSlackReply(mount, oldPath, 'slack-old-human', { + text: 'old status?', + user: 'U123', + user_is_bot: false, + }) + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(27), issueFile(27)))) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toEqual([]) + + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'new-human'), 'slack-new-human', { + text: 'new status?', + user: 'U456', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toHaveLength(1) + }) + + it('dedupes duplicate inbound Slack reply delivery by event identity and content', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(28)]: issueFile(28) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + const replyPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-duplicate') + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(28), issueFile(28)))) + emitSlackReply(mount, replyPath, 'slack-duplicate-human', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + mount.emit(changeEvent(replyPath, 'slack-duplicate-human')) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toHaveLength(1) + }) + + it('treats Slack dispatch thread startup as best-effort after agents are dispatched', async () => { + const mount = new FailSlackRootMountClient({ [issuePath(29)]: issueFile(29) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + }) + + const result = await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(29), issueFile(29)))) + + expect(result.agents.map((agent) => agent.name)).toEqual(['ar-29-impl', 'ar-29-review']) + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-29-impl', 'ar-29-review']) + }) + + it('continues processing Slack reply events after one response fails', async () => { + const mount = new FailFirstSlackReplyMountClient({ [issuePath(30)]: issueFile(30) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(30), issueFile(30)))) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-fails'), 'slack-human-fails', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-next'), 'slack-human-next', { + text: 'status again?', + user: 'U456', + user_is_bot: false, + }) + await flush() + await flush() + + const replies = slackReplyWrites(mount) + expect(replies).toHaveLength(2) + expect(replies[1]?.content.text).toContain('AR-30') + }) + + it('uses numeric Slack reply event ids without dropping fresh low-seq replies', async () => { + const mount = new CloudWritebackFakeMountClient({ [issuePath(31)]: issueFile(31) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const warnings: unknown[][] = [] + const logger = { + warn: (...args: unknown[]) => { + warnings.push(args) + }, + error: () => undefined, + debug: () => undefined, + } + mount.emit(changeEvent('/slack/channels/C0FACTORY__factory-e2e/messages/other/replies/old.json', 1)) + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + logger, + }) + const replyPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-numeric') + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(31), issueFile(31)))) + mount.files.set(replyPath, { + content: { + provider: 'slack', + objectType: 'message', + objectId: 'slack-human-numeric', + payload: { + channel: 'C0FACTORY', + thread_ts: slack.threadId, + ts: 'slack-human-numeric', + text: 'status?', + user: 'U123', + user_is_bot: false, + }, + }, + }) + mount.emit(changeEvent(replyPath, 1)) + await flush() + await flush() + + expect(slackReplyWrites(mount)).toHaveLength(1) + expect(warnings.flat()).not.toContain('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') + }) }) -const changeEvent = (path: string, id: string, occurredAt = new Date().toISOString()) => ({ +const changeEvent = (path: string, id: string | number, occurredAt = new Date().toISOString()) => ({ id, workspace: 'factory-test', type: 'relayfile.changed', @@ -909,3 +1350,68 @@ const changeEvent = (path: string, id: string, occurredAt = new Date().toISOStri summary: {}, expand: async () => ({ level: 'summary', path, summary: {} }), }) as unknown as ChangeEvent + +const slackReplyFixturePath = (channelDir: string, threadId: string, replyId: string): string => + `/slack/channels/${channelDir}/messages/${threadId.replace(/\./g, '_')}/replies/${replyId}.json` + +const slackTopLevelMessageFixturePath = (channelDir: string, messageTs: string): string => + `/slack/channels/${channelDir}/messages/${messageTs.replace(/\./g, '_')}/meta.json` + +const emitSlackTopLevelMessage = ( + mount: FakeMountClient, + channelDir: string, + messageTs: string, + id: string, + payload: Record, +): void => { + const path = slackTopLevelMessageFixturePath(channelDir, messageTs) + const channel = channelDir.split('__')[0] + mount.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + objectId: id, + payload: { + channel, + ts: messageTs, + ...payload, + }, + }, + }) + mount.emit(changeEvent(path, id)) +} + +const emitSlackReply = ( + mount: FakeMountClient, + path: string, + id: string, + payload: Record, +): void => { + const threadTs = path.match(/\/messages\/([^/]+)\/replies\//u)?.[1]?.replace(/_/g, '.') + const channel = path.match(/^\/slack\/channels\/([^/]+)\//u)?.[1]?.split('__')[0] + mount.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + objectId: id, + payload: { + channel, + thread_ts: threadTs, + ts: id, + ...payload, + }, + }, + }) + mount.emit(changeEvent(path, id)) +} + +const isSlackRootWritePath = (path: string): boolean => + /^\/slack\/channels\/[^/]+\/messages\/[^/]+\.json$/u.test(path) + +const slackReplyWrites = (mount: FakeMountClient): Array<{ path: string; content: { text?: string; thread_ts?: string } }> => + mount.writes + .filter((write) => write.path.includes('/replies/')) + .map((write) => ({ path: write.path, content: record(write.content) as { text?: string; thread_ts?: string } })) + +const record = (value: unknown): Record => + value !== null && typeof value === 'object' && !Array.isArray(value) ? value as Record : {} diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index d90399c1..92a898ed 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -22,11 +22,20 @@ import type { TriageEngine, } from '../types' import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../writeback' -import { asRecord, parseJsonContent, wrappedPayload } from '../writeback/shared' +import { asRecord, parseJsonContent, stableHash, wrappedPayload } from '../writeback/shared' import { BatchTracker, type InFlightIssue, issueKey } from './batch-tracker' type FactoryEvent = 'issue-queued' | 'dispatched' | 'issue-done' | 'writeback-verified' | 'error' type Listener = (payload: FactoryEventPayload) => void +type SlackThreadWatcher = { stop(): Promise } +type SlackReply = { + channelDir: string + threadTs: string + messageTs: string + isThreadReply: boolean + isBot: boolean + raw: Record +} const ISSUE_ROOT = '/linear/issues' const READY_EVENTS_LIMIT = 100 @@ -39,6 +48,8 @@ const STATE_NAME_TO_ID: Record = { Done: LINEAR_STATE_IDS.done, 'In Planning': LINEAR_STATE_IDS.inPlanning, } +const SLACK_REPLY_EVENTS_LIMIT = 100 +const SLACK_REPLY_POLL_INTERVAL_MS = 5_000 const realClock: Clock = { now: () => Date.now(), @@ -67,6 +78,9 @@ export class FactoryLoop implements Factory { readonly #criticalMessages = new Map[0] }>() readonly #resumeInFlight = new Map>() readonly #resumedExitKeys = new Set() + readonly #slackThreadIds = new Map() + readonly #slackWatchers = new Map() + readonly #slackWatcherStarts = new Map>() #subscription?: Subscription #livePollTimer?: ReturnType #livePollInFlight = false @@ -90,7 +104,7 @@ export class FactoryLoop implements Factory { stateIds: config.stateIds, safety: config.safety, }) - this.#slack = ports.slack ?? (config.slack ? MountSlackWriteback(ports.mount, config.slack) : undefined) + this.#slack = config.slack ? MountSlackWriteback(ports.mount, config.slack) : ports.slack void (ports.github ?? MountGithubRead(ports.mount)) this.#mergeGate = ports.mergeGate ?? new GithubMergeGate() this.#probeCloser = ports.probeCloser ?? closeProbePr @@ -152,6 +166,10 @@ export class FactoryLoop implements Factory { this.#livePollInFlight = false await this.#subscription?.unsubscribe() this.#subscription = undefined + await Promise.all([...this.#slackWatchers.values()].map((watcher) => watcher.stop())) + this.#slackWatchers.clear() + this.#slackThreadIds.clear() + this.#slackWatcherStarts.clear() this.#offAgentExit?.() this.#offDeliveryFailed?.() this.#offAgentExit = undefined @@ -416,6 +434,7 @@ export class FactoryLoop implements Factory { record.result = result this.#increment('dispatched') this.#emit('dispatched', { issue: decision.issue, result }) + await this.#ensureSlackDispatchThread(record, result) await this.#sendCriticalReviewerMessage(record) return result } catch (error) { @@ -731,6 +750,7 @@ export class FactoryLoop implements Factory { this.#increment('done') this.#emit('issue-done', { issue: record.issue }) + await this.#stopSlackWatcher(record.issue) const next = this.#batch.complete(record.issue) if (next) { await this.dispatch(next.decision, { dryRun: next.dryRun }) @@ -756,6 +776,201 @@ export class FactoryLoop implements Factory { this.#counters[name] = (this.#counters[name] ?? 0) + 1 } + async #ensureSlackDispatchThread(record: InFlightIssue, result: DispatchResult): Promise { + if (!this.#slack || !this.#config.slack || result.dryRun) { + return + } + + const key = issueKey(record.issue) + if (this.#slackThreadIds.has(key) || this.#slackWatcherStarts.has(key)) { + try { + await this.#slackWatcherStarts.get(key) + } catch { + // The initiator logs Slack watcher startup failures. + } + return + } + + const start = this.#postAndWatchSlackDispatchThread(record, result) + this.#slackWatcherStarts.set(key, start) + try { + await start + } catch (error) { + this.#logger.warn?.(`[factory] failed to establish Slack dispatch thread for ${record.issue.key}`, error) + } finally { + this.#slackWatcherStarts.delete(key) + } + } + + async #postAndWatchSlackDispatchThread(record: InFlightIssue, result: DispatchResult): Promise { + if (!this.#slack || !this.#config.slack) { + return + } + + const root = await this.#slack.postThread({ + channel: this.#config.slack.channel, + text: [ + `${record.issue.key}: factory agents dispatched.`, + `State: ${result.stateId ?? 'dispatching'}`, + `Agents: ${result.agents.map((agent) => agent.name).join(', ') || 'none'}`, + ].join('\n'), + }) + this.#slackThreadIds.set(issueKey(record.issue), root.threadId) + await this.#watchSlackThread(record, root.threadId) + } + + async #watchSlackThread(record: InFlightIssue, threadId: string): Promise { + if (!this.#config.slack) { + return + } + + const key = issueKey(record.issue) + if (this.#slackWatchers.has(key)) { + return + } + + const channelDir = this.#config.slack.channel + const messagesPrefix = slackChannelMessagesPrefix(channelDir) + const preExistingPaths = new Set() + const seenReplies = new Set() + let missingIdentityLogged = false + let cursor: string | undefined + let stopped = false + + const markPreExisting = async (): Promise => { + try { + const page = await this.#mount.getEvents({ limit: SLACK_REPLY_EVENTS_LIMIT }) + cursor = page.nextCursor ?? undefined + for (const event of page.events) { + if (event.resource.path.startsWith(messagesPrefix)) { + preExistingPaths.add(event.resource.path) + } + } + } catch (error) { + this.#logger.warn?.('[factory] unable to seed Slack reply watcher event cursor', error) + } + } + + const handle = async (event: ChangeEvent): Promise => { + try { + if (stopped || !event.resource.path.startsWith(messagesPrefix)) { + return + } + + const eventKey = eventIdentity(event) + if (!eventKey) { + if (!missingIdentityLogged) { + missingIdentityLogged = true + this.#logger.warn?.('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') + } + } + + if (preExistingPaths.has(event.resource.path)) { + return + } + + const reply = await this.#readSlackReply(event.resource.path) + if (!reply || !reply.isThreadReply || reply.threadTs !== threadId || reply.channelDir !== channelDir) { + return + } + + const replyKey = `${eventKey ?? event.resource.path}:${stableHash(JSON.stringify(reply.raw))}` + if (seenReplies.has(replyKey)) { + this.#logger.debug?.('[factory] suppressed duplicate Slack reply payload', { issue: record.issue.key, path: event.resource.path }) + return + } + seenReplies.add(replyKey) + + if (reply.isBot) { + return + } + + await this.#respondToSlackStatus(record, threadId) + } catch (error) { + this.#logger.error?.('[factory] failed to handle Slack reply event', error) + } + } + + await markPreExisting() + + let subscription: Subscription | undefined + try { + subscription = this.#mount.subscribe([`${messagesPrefix}**`], (event) => { + void handle(event) + }) + } catch (error) { + this.#logger.warn?.('[factory] Slack reply subscribe failed; relying on event polling', error) + } + + const poll = async (): Promise => { + while (!stopped) { + try { + const page = await this.#mount.getEvents({ cursor, limit: SLACK_REPLY_EVENTS_LIMIT }) + cursor = page.nextCursor ?? cursor + for (const event of page.events) { + await handle(event) + } + } catch (error) { + this.#logger.warn?.('[factory] Slack reply polling failed', error) + } + await unrefDelay(SLACK_REPLY_POLL_INTERVAL_MS) + } + } + void poll() + + this.#slackWatchers.set(key, { + stop: async () => { + stopped = true + await subscription?.unsubscribe() + }, + }) + } + + async #stopSlackWatcher(issue: IssueRef): Promise { + const key = issueKey(issue) + const watcher = this.#slackWatchers.get(key) + this.#slackWatchers.delete(key) + this.#slackThreadIds.delete(key) + await watcher?.stop() + } + + async #readSlackReply(path: string): Promise { + try { + const { content } = await this.#mount.readFile(path) + return parseSlackReply(path, content, this.#config.slack?.botUserId ?? 'U0B2596R7EZ') + } catch (error) { + this.#logger.warn?.(`Unable to read Slack reply ${path}`, error) + return undefined + } + } + + async #respondToSlackStatus(record: InFlightIssue, threadId: string): Promise { + if (!this.#slack || !this.#config.slack) { + return + } + + const issue = await this.#readIssue(record.issue.path) + if (!issue || !isInFactoryScope(issue, this.#config.safety)) { + return + } + + const [roster, probe] = await Promise.all([ + this.#fleet.roster(), + this.#probePrResolver(issue), + ]) + const activeAgents = new Set(record.agents.keys()) + const liveAgents = roster.agents + .map((agent) => agent.name) + .filter((name) => activeAgents.has(name)) + .sort() + + await this.#slack.reply(threadId, [ + `${issue.key}: ${issueStateLabel(issue)}`, + `Agents: ${liveAgents.join(', ') || [...activeAgents].sort().join(', ') || 'none'}`, + `PR: ${probe ? githubPrUrl(probe.repo, probe.prNumber) : 'not found yet'}`, + ].join('\n')) + } + async #closeProbeIfRequired(issue: LinearIssue): Promise { if (this.#config.mergePolicy !== 'never' || !this.#isSyntheticProbeIssue(issue)) { return @@ -1071,3 +1286,56 @@ const isCompletionReason = (reason?: string): boolean => const defaultRestartPolicy = (spec: AgentSpec): AgentSpec['restartPolicy'] | undefined => spec.role === 'implementer' ? { maxRestarts: 3, strategy: 'resume' } as AgentSpec['restartPolicy'] : spec.restartPolicy + +const slackPayloadTs = (threadId: string): string => threadId.replace(/_/g, '.') + +const slackChannelMessagesPrefix = (channelDir: string): string => `/slack/channels/${channelDir}/messages/` + +const eventIdentity = (event: ChangeEvent): string | undefined => { + const record = event as unknown as Record + const rawId = record.id ?? record.event_id ?? record.seq + const id = typeof rawId === 'string' || typeof rawId === 'number' ? String(rawId) : undefined + return id ? `event:${id}` : undefined +} + +const parseSlackReply = (path: string, content: unknown, botUserId: string): SlackReply | undefined => { + const raw = asRecord(parseJsonContent(content)) ?? {} + const payload = wrappedPayload(raw) + const channelDir = path.match(/^\/slack\/channels\/([^/]+)\//u)?.[1] ?? '' + const pathMatch = path.match(/^\/slack\/channels\/[^/]+\/messages\/([^/]+)(?:\/replies\/([^/]+))?/u) + const parentFromPath = pathMatch?.[2] ? slackPayloadTs(pathMatch[1]) : undefined + const messageFromPath = slackPayloadTs(pathMatch?.[2] ?? pathMatch?.[1] ?? '') + const messageTs = stringValue(payload.ts) ?? messageFromPath + const threadTs = stringValue(payload.thread_ts) ?? parentFromPath + if (!channelDir || !threadTs || !messageTs) { + return undefined + } + + return { + channelDir, + threadTs, + messageTs, + isThreadReply: Boolean(parentFromPath) || threadTs !== messageTs, + isBot: isOwnSlackBotReply(payload, botUserId), + raw, + } +} + +const isOwnSlackBotReply = (payload: Record, botUserId: string): boolean => + payload.user_is_bot === true || + stringValue(payload.user) === botUserId + +const issueStateLabel = (issue: LinearIssue): string => { + const name = issue.state?.name?.trim() + if (name && issue.stateId) { + return `${name} (${issue.stateId})` + } + return name || issue.stateId || 'unknown state' +} + +const githubPrUrl = (repo: string, prNumber: number): string => `https://github.com/${repo}/pull/${prNumber}` + +const unrefDelay = (ms: number): Promise => new Promise((resolve) => { + const timer = setTimeout(resolve, ms) + timer.unref?.() +}) diff --git a/packages/factory-sdk/src/ports/mount.ts b/packages/factory-sdk/src/ports/mount.ts index 5a2ee426..93a41871 100644 --- a/packages/factory-sdk/src/ports/mount.ts +++ b/packages/factory-sdk/src/ports/mount.ts @@ -20,6 +20,7 @@ export interface EventPage { } export interface MountClient { + readonly writebackTransport?: 'relayfile-cloud' | 'test' readFile(path: string): Promise<{ content: unknown; revision?: string }> writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise deleteFile(path: string): Promise diff --git a/packages/factory-sdk/src/testing/fakes.ts b/packages/factory-sdk/src/testing/fakes.ts index 2b9898c3..e66d8dea 100644 --- a/packages/factory-sdk/src/testing/fakes.ts +++ b/packages/factory-sdk/src/testing/fakes.ts @@ -16,6 +16,7 @@ type ExitListener = (name: string, reason?: string) => void type DeliveryFailedListener = (info: { to: string; msgId?: string; reason?: string }) => void export class FakeMountClient implements MountClient { + readonly writebackTransport = 'test' readonly files = new Map() readonly writes: Array<{ path: string; content: unknown }> = [] readonly deletes: string[] = [] diff --git a/packages/factory-sdk/src/writeback/slack.ts b/packages/factory-sdk/src/writeback/slack.ts index 06eda71c..3e79e6ff 100644 --- a/packages/factory-sdk/src/writeback/slack.ts +++ b/packages/factory-sdk/src/writeback/slack.ts @@ -1,6 +1,6 @@ import { slackMessagePath, slackReplyPath } from '../constants/slack' import type { MountClient } from '../ports' -import { safePathSegment, stableHash, trimToLines } from './shared' +import { safePathSegment, stableHash, trimToLines, wrappedPayload } from './shared' export interface MountSlackWritebackConfig { channel?: string @@ -65,13 +65,19 @@ const rootClientId = (prefix: string, channelDir: string, text: string): string const replyClientId = (prefix: string, threadId: string, text: string): string => `${safePathSegment(prefix)}-reply-${safePathSegment(threadId)}-${stableHash(text)}` -const confirmPath = async (mount: MountClient, path: string): Promise => { +const confirmPath = async (mount: MountClient, path: string): Promise => { const confirmation = await mount.confirmWrite(path, { timeoutMs: 90_000 }) if (confirmation !== 'acked') { throw new Error(`Writeback not acked for ${path}: ${confirmation}`) } - await mount.readFile(path) + return (await mount.readFile(path)).content +} + +const slackTsFromContent = (content: unknown): string | undefined => { + const payload = wrappedPayload(content) + const ts = stringValue(payload.ts) ?? stringValue(payload.thread_ts) + return ts && /^\d+\.\d+$/u.test(ts) ? ts : undefined } export const MountSlackWriteback = ( @@ -80,9 +86,15 @@ export const MountSlackWriteback = ( ) => { const threads = new Map() const prefix = slackCfg.clientIdPrefix ?? 'factory' + const assertCloudWriteback = (): void => { + if (mount.writebackTransport !== 'relayfile-cloud' && mount.writebackTransport !== 'test') { + throw new Error('Slack writeback requires RelayfileCloudMountClient cloud writeback transport') + } + } return { async postThread(root: { channel: string; text: string }): Promise<{ threadId: string }> { + assertCloudWriteback() const channelDir = slackCfg.channelDir ?? root.channel ?? slackCfg.channel assertSlackChannelAllowed(slackCfg.channel, root.channel, 'postThread') assertSlackChannelAllowed(slackCfg.channel, channelDir, 'postThread path') @@ -93,13 +105,18 @@ export const MountSlackWriteback = ( const path = slackMessagePath(channelDir, clientId) await mount.writeFile(path, { channelId, text }, { guarded: true }) - await confirmPath(mount, path) + const content = await confirmPath(mount, path) + const threadTs = slackTsFromContent(content) ?? clientId - threads.set(clientId, { channelDir, channelId, threadTs: clientId }) - return { threadId: clientId } + threads.set(threadTs, { channelDir, channelId, threadTs }) + if (threadTs !== clientId) { + threads.set(clientId, { channelDir, channelId, threadTs }) + } + return { threadId: threadTs } }, async reply(threadId: string, text: string): Promise { + assertCloudWriteback() const fallbackChannelDir = slackCfg.channelDir ?? slackCfg.channel assertSlackChannelAllowed(slackCfg.channel, fallbackChannelDir, 'reply') const ref = threads.get(threadId) ?? ( @@ -125,3 +142,5 @@ export const MountSlackWriteback = ( }, } } + +const stringValue = (value: unknown): string | undefined => typeof value === 'string' ? value : undefined diff --git a/packages/factory-sdk/src/writeback/writeback.test.ts b/packages/factory-sdk/src/writeback/writeback.test.ts index 09b8ac88..483d2bc6 100644 --- a/packages/factory-sdk/src/writeback/writeback.test.ts +++ b/packages/factory-sdk/src/writeback/writeback.test.ts @@ -4,6 +4,7 @@ import { FactoryConfigSchema } from '../config/schema' import { linearCommentPath } from '../constants/linear' import { slackReplyPath } from '../constants/slack' import { createFactory, linearCommentName, MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../index' +import type { MountClient } from '../ports' import type { LinearIssue } from '../types' import { FakeFleetClient, FakeMountClient } from '../testing' @@ -448,6 +449,83 @@ describe('MountSlackWriteback', () => { }) }) + it('returns the real Slack parent ts from the acked thread root when available', async () => { + class AckedSlackMountClient extends FakeMountClient { + override async writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise { + await super.writeFile(path, content, opts) + this.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + payload: { + channel: 'C0FACTORY', + ts: '1780751612.176219', + text: 'Factory update', + }, + }, + }) + } + } + const mount = new AckedSlackMountClient() + const slack = MountSlackWriteback(mount, { + channel: 'C0FACTORY__factory-e2e', + channelDir: 'C0FACTORY__factory-e2e', + clientIdPrefix: 'factory-e2e', + }) + + const root = await slack.postThread({ + channel: 'C0FACTORY__factory-e2e', + text: 'Factory update', + }) + await slack.reply(root.threadId, 'Factory reply') + + expect(root.threadId).toBe('1780751612.176219') + expect(mount.writes[1]?.path).toContain('/slack/channels/C0FACTORY__factory-e2e/messages/1780751612_176219/replies/') + expect(mount.writes[1]?.content).toMatchObject({ + channelId: 'C0FACTORY', + thread_ts: '1780751612.176219', + text: 'Factory reply', + }) + }) + + it('refuses Slack writes over a local mirror mount even if file writes appear acked', async () => { + const writes: Array<{ path: string; content: unknown }> = [] + const localMirrorMount: MountClient = { + async readFile() { + return { content: {} } + }, + async writeFile(path, content) { + writes.push({ path, content }) + }, + async deleteFile() { + throw new Error('local mirror delete is not used') + }, + async listTree() { + return [] + }, + subscribe() { + return { unsubscribe: async () => undefined } + }, + async getEvents() { + return { events: [] } + }, + async confirmWrite() { + return 'acked' + }, + async ensureSubRoot() { + return 'ready' + }, + } + const slack = MountSlackWriteback(localMirrorMount, { + channel: 'C0FACTORY__factory-e2e', + channelDir: 'C0FACTORY__factory-e2e', + }) + + await expect(slack.reply('1780751612.176219', 'Factory reply')) + .rejects.toThrow(/requires RelayfileCloudMountClient cloud writeback transport/) + expect(writes).toEqual([]) + }) + it('surfaces non-acked thread writes even when local read-back succeeds', async () => { class TimeoutAfterWriteMountClient extends FakeMountClient { override async confirmWrite(