From 489daa66722d28be3e83bab47a60a152baa866e4 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 12:02:12 +0200 Subject: [PATCH 1/5] Add bidirectional Slack status watcher --- .../src/orchestrator/factory.test.ts | 185 ++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 267 +++++++++++++++++- 2 files changed, 451 insertions(+), 1 deletion(-) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index cf738ad6..ad01accb 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -120,6 +120,21 @@ class ThrowingWatermarkMount extends FakeMountClient { } } +class RecordingSlack implements SlackWriteback { + readonly roots: Array<{ channel: string; text: string }> = [] + readonly replies: Array<{ threadId: string; text: string }> = [] + threadId = '1780751612.176219' + + async postThread(root: { channel: string; text: string }): Promise<{ threadId: string }> { + this.roots.push(root) + return { threadId: this.threadId } + } + + async reply(threadId: string, text: string): Promise { + this.replies.push({ threadId, text }) + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -893,6 +908,149 @@ describe('FactoryLoop', () => { expect(closeInputs).toEqual([]) expect(fleet.releases.map((release) => release.name)).toEqual(['ar-23-impl', 'ar-23-review']) }) + + it('watches the in-flight factory Slack thread and replies to a human status request with live state, roster, and PR', async () => { + const mount = new FakeMountClient({ [issuePath(24)]: issueFile(24) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + probePrResolver: async () => ({ repo: 'AgentWorkforce/pear', prNumber: 240 }), + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(24), issueFile(24)))) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-1'), 'slack-human-1', { + text: 'status?', + user: 'U123', + user_name: 'human', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + expect(slack.replies[0]).toMatchObject({ threadId: slack.threadId }) + expect(slack.replies[0]?.text).toContain('AR-24') + expect(slack.replies[0]?.text).toContain(implementing) + expect(slack.replies[0]?.text).toContain('ar-24-impl') + expect(slack.replies[0]?.text).toContain('ar-24-review') + expect(slack.replies[0]?.text).toContain('https://github.com/AgentWorkforce/pear/pull/240') + }) + + it('ignores the factory bot own Slack replies to avoid self-response loops', async () => { + const mount = new FakeMountClient({ [issuePath(25)]: issueFile(25) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(25), issueFile(25)))) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'bot-1'), 'slack-bot-1', { + text: 'AR-25: Ready for Agent', + user: 'BFACTORY', + user_name: 'file_by_agent_relay', + user_is_bot: true, + bot_id: 'BFACTORY', + }) + await flush() + await flush() + + expect(slack.replies).toEqual([]) + }) + + it('does not respond to Slack replies outside the watched factory-e2e issue thread', async () => { + const mount = new FakeMountClient({ [issuePath(26)]: issueFile(26) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(26), issueFile(26)))) + emitSlackReply(mount, slackReplyFixturePath('C0PRODUCT__general', slack.threadId, 'human-product'), 'slack-product-1', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', '1780751613.000001', 'human-other-thread'), 'slack-other-thread-1', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toEqual([]) + }) + + it('connects Slack reply watchers from now and does not reprocess pre-existing thread replies', async () => { + const mount = new FakeMountClient({ [issuePath(27)]: issueFile(27) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const oldPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'old-human') + emitSlackReply(mount, oldPath, 'slack-old-human', { + text: 'old status?', + user: 'U123', + user_is_bot: false, + }) + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(27), issueFile(27)))) + await flush() + await flush() + + expect(slack.replies).toEqual([]) + + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'new-human'), 'slack-new-human', { + text: 'new status?', + user: 'U456', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + }) + + it('dedupes duplicate inbound Slack reply delivery by event identity and content', async () => { + const mount = new FakeMountClient({ [issuePath(28)]: issueFile(28) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + const replyPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-duplicate') + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(28), issueFile(28)))) + emitSlackReply(mount, replyPath, 'slack-duplicate-human', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + mount.emit(changeEvent(replyPath, 'slack-duplicate-human')) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + }) }) const changeEvent = (path: string, id: string, occurredAt = new Date().toISOString()) => ({ @@ -909,3 +1067,30 @@ const changeEvent = (path: string, id: string, occurredAt = new Date().toISOStri summary: {}, expand: async () => ({ level: 'summary', path, summary: {} }), }) as unknown as ChangeEvent + +const slackReplyFixturePath = (channelDir: string, threadId: string, replyId: string): string => + `/slack/channels/${channelDir}/messages/${threadId.replace(/\./g, '_')}/replies/${replyId}.json` + +const emitSlackReply = ( + mount: FakeMountClient, + path: string, + id: string, + payload: Record, +): void => { + const threadTs = path.match(/\/messages\/([^/]+)\/replies\//u)?.[1]?.replace(/_/g, '.') + const channel = path.match(/^\/slack\/channels\/([^/]+)\//u)?.[1]?.split('__')[0] + mount.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + objectId: id, + payload: { + channel, + thread_ts: threadTs, + ts: id, + ...payload, + }, + }, + }) + mount.emit(changeEvent(path, id)) +} diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index d90399c1..3ae2ddf8 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -1,5 +1,6 @@ import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { LINEAR_STATE_IDS, linearByStatePath } from '../constants/linear' +import { slackReplyPath } from '../constants/slack' import { GithubMergeGate, closeProbePr, type GithubMergeGate as GithubMergeGatePort } from '../github' import type { AgentSpec, ChangeEvent, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' import type { Clock, Logger } from '../ports/system' @@ -22,11 +23,18 @@ import type { TriageEngine, } from '../types' import { MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../writeback' -import { asRecord, parseJsonContent, wrappedPayload } from '../writeback/shared' +import { asRecord, parseJsonContent, stableHash, wrappedPayload } from '../writeback/shared' import { BatchTracker, type InFlightIssue, issueKey } from './batch-tracker' type FactoryEvent = 'issue-queued' | 'dispatched' | 'issue-done' | 'writeback-verified' | 'error' type Listener = (payload: FactoryEventPayload) => void +type SlackThreadWatcher = { stop(): Promise } +type SlackReply = { + channelDir: string + threadTs: string + isBot: boolean + raw: Record +} const ISSUE_ROOT = '/linear/issues' const READY_EVENTS_LIMIT = 100 @@ -39,6 +47,8 @@ const STATE_NAME_TO_ID: Record = { Done: LINEAR_STATE_IDS.done, 'In Planning': LINEAR_STATE_IDS.inPlanning, } +const SLACK_REPLY_EVENTS_LIMIT = 100 +const SLACK_REPLY_POLL_INTERVAL_MS = 5_000 const realClock: Clock = { now: () => Date.now(), @@ -67,6 +77,9 @@ export class FactoryLoop implements Factory { readonly #criticalMessages = new Map[0] }>() readonly #resumeInFlight = new Map>() readonly #resumedExitKeys = new Set() + readonly #slackThreadIds = new Map() + readonly #slackWatchers = new Map() + readonly #slackWatcherStarts = new Map>() #subscription?: Subscription #livePollTimer?: ReturnType #livePollInFlight = false @@ -152,6 +165,10 @@ export class FactoryLoop implements Factory { this.#livePollInFlight = false await this.#subscription?.unsubscribe() this.#subscription = undefined + await Promise.all([...this.#slackWatchers.values()].map((watcher) => watcher.stop())) + this.#slackWatchers.clear() + this.#slackThreadIds.clear() + this.#slackWatcherStarts.clear() this.#offAgentExit?.() this.#offDeliveryFailed?.() this.#offAgentExit = undefined @@ -416,6 +433,7 @@ export class FactoryLoop implements Factory { record.result = result this.#increment('dispatched') this.#emit('dispatched', { issue: decision.issue, result }) + await this.#ensureSlackDispatchThread(record, result) await this.#sendCriticalReviewerMessage(record) return result } catch (error) { @@ -731,6 +749,7 @@ export class FactoryLoop implements Factory { this.#increment('done') this.#emit('issue-done', { issue: record.issue }) + await this.#stopSlackWatcher(record.issue) const next = this.#batch.complete(record.issue) if (next) { await this.dispatch(next.decision, { dryRun: next.dryRun }) @@ -756,6 +775,198 @@ export class FactoryLoop implements Factory { this.#counters[name] = (this.#counters[name] ?? 0) + 1 } + async #ensureSlackDispatchThread(record: InFlightIssue, result: DispatchResult): Promise { + if (!this.#slack || !this.#config.slack || result.dryRun) { + return + } + + const key = issueKey(record.issue) + if (this.#slackThreadIds.has(key) || this.#slackWatcherStarts.has(key)) { + await this.#slackWatcherStarts.get(key) + return + } + + const start = this.#postAndWatchSlackDispatchThread(record, result) + this.#slackWatcherStarts.set(key, start) + try { + await start + } finally { + this.#slackWatcherStarts.delete(key) + } + } + + async #postAndWatchSlackDispatchThread(record: InFlightIssue, result: DispatchResult): Promise { + if (!this.#slack || !this.#config.slack) { + return + } + + const root = await this.#slack.postThread({ + channel: this.#config.slack.channel, + text: [ + `${record.issue.key}: factory agents dispatched.`, + `State: ${result.stateId ?? 'dispatching'}`, + `Agents: ${result.agents.map((agent) => agent.name).join(', ') || 'none'}`, + ].join('\n'), + }) + this.#slackThreadIds.set(issueKey(record.issue), root.threadId) + await this.#watchSlackThread(record, root.threadId) + } + + async #watchSlackThread(record: InFlightIssue, threadId: string): Promise { + if (!this.#config.slack) { + return + } + + const key = issueKey(record.issue) + if (this.#slackWatchers.has(key)) { + return + } + + const channelDir = this.#config.slack.channel + const replyPrefix = slackReplyPrefix(channelDir, threadId) + const preExistingEvents = new Set() + const preExistingPaths = new Set() + const seenReplies = new Set() + let missingIdentityLogged = false + let cursor: string | undefined + let stopped = false + + const markPreExisting = async (): Promise => { + try { + const page = await this.#mount.getEvents({ limit: SLACK_REPLY_EVENTS_LIMIT }) + cursor = page.nextCursor ?? undefined + for (const event of page.events) { + const eventKey = eventIdentity(event) + if (eventKey) { + preExistingEvents.add(eventKey) + } + if (event.resource.path.startsWith(replyPrefix)) { + preExistingPaths.add(event.resource.path) + } + } + } catch (error) { + this.#logger.warn?.('[factory] unable to seed Slack reply watcher event cursor', error) + } + } + + const handle = async (event: ChangeEvent): Promise => { + if (stopped || !event.resource.path.startsWith(replyPrefix)) { + return + } + + const eventKey = eventIdentity(event) + if (!eventKey) { + if (!missingIdentityLogged) { + missingIdentityLogged = true + this.#logger.warn?.('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') + } + } else if (preExistingEvents.has(eventKey)) { + return + } + + if (preExistingPaths.has(event.resource.path)) { + return + } + + const reply = await this.#readSlackReply(event.resource.path) + if (!reply || reply.threadTs !== threadId || reply.channelDir !== channelDir) { + return + } + + const replyKey = `${eventKey ?? event.resource.path}:${stableHash(JSON.stringify(reply.raw))}` + if (seenReplies.has(replyKey)) { + this.#logger.debug?.('[factory] suppressed duplicate Slack reply payload', { issue: record.issue.key, path: event.resource.path }) + return + } + seenReplies.add(replyKey) + + if (reply.isBot) { + return + } + + await this.#respondToSlackStatus(record, threadId) + } + + await markPreExisting() + + let subscription: Subscription | undefined + try { + subscription = this.#mount.subscribe([`${replyPrefix}**`], (event) => { + void handle(event) + }) + } catch (error) { + this.#logger.warn?.('[factory] Slack reply subscribe failed; relying on event polling', error) + } + + const poll = async (): Promise => { + while (!stopped) { + try { + const page = await this.#mount.getEvents({ cursor, limit: SLACK_REPLY_EVENTS_LIMIT }) + cursor = page.nextCursor ?? cursor + for (const event of page.events) { + await handle(event) + } + } catch (error) { + this.#logger.warn?.('[factory] Slack reply polling failed', error) + } + await unrefDelay(SLACK_REPLY_POLL_INTERVAL_MS) + } + } + void poll() + + this.#slackWatchers.set(key, { + stop: async () => { + stopped = true + await subscription?.unsubscribe() + }, + }) + } + + async #stopSlackWatcher(issue: IssueRef): Promise { + const key = issueKey(issue) + const watcher = this.#slackWatchers.get(key) + this.#slackWatchers.delete(key) + this.#slackThreadIds.delete(key) + await watcher?.stop() + } + + async #readSlackReply(path: string): Promise { + try { + const { content } = await this.#mount.readFile(path) + return parseSlackReply(path, content) + } catch (error) { + this.#logger.warn?.(`Unable to read Slack reply ${path}`, error) + return undefined + } + } + + async #respondToSlackStatus(record: InFlightIssue, threadId: string): Promise { + if (!this.#slack || !this.#config.slack) { + return + } + + const issue = await this.#readIssue(record.issue.path) + if (!issue || !isInFactoryScope(issue, this.#config.safety)) { + return + } + + const [roster, probe] = await Promise.all([ + this.#fleet.roster(), + this.#probePrResolver(issue), + ]) + const activeAgents = new Set(record.agents.keys()) + const liveAgents = roster.agents + .map((agent) => agent.name) + .filter((name) => activeAgents.has(name)) + .sort() + + await this.#slack.reply(threadId, [ + `${issue.key}: ${issueStateLabel(issue)}`, + `Agents: ${liveAgents.join(', ') || [...activeAgents].sort().join(', ') || 'none'}`, + `PR: ${probe ? githubPrUrl(probe.repo, probe.prNumber) : 'not found yet'}`, + ].join('\n')) + } + async #closeProbeIfRequired(issue: LinearIssue): Promise { if (this.#config.mergePolicy !== 'never' || !this.#isSyntheticProbeIssue(issue)) { return @@ -1071,3 +1282,57 @@ const isCompletionReason = (reason?: string): boolean => const defaultRestartPolicy = (spec: AgentSpec): AgentSpec['restartPolicy'] | undefined => spec.role === 'implementer' ? { maxRestarts: 3, strategy: 'resume' } as AgentSpec['restartPolicy'] : spec.restartPolicy + +const slackPathTs = (threadTs: string): string => threadTs.replace(/\./g, '_') + +const slackPayloadTs = (threadId: string): string => threadId.replace(/_/g, '.') + +const slackReplyPrefix = (channelDir: string, threadId: string): string => { + const marker = '__reply_watcher_marker__' + return slackReplyPath(channelDir, slackPathTs(threadId), marker).replace(`${marker}.json`, '') +} + +const eventIdentity = (event: ChangeEvent): string | undefined => { + const record = event as unknown as Record + const id = stringValue(record.id) ?? stringValue(record.event_id) ?? stringValue(record.seq) + return id ? `event:${id}` : undefined +} + +const parseSlackReply = (path: string, content: unknown): SlackReply | undefined => { + const raw = asRecord(parseJsonContent(content)) ?? {} + const payload = wrappedPayload(raw) + const channelDir = path.match(/^\/slack\/channels\/([^/]+)\//u)?.[1] ?? '' + const threadFromPath = path.match(/^\/slack\/channels\/[^/]+\/messages\/([^/]+)\/replies\//u)?.[1] ?? '' + const threadTs = stringValue(payload.thread_ts) ?? slackPayloadTs(threadFromPath) + if (!channelDir || !threadTs) { + return undefined + } + + return { + channelDir, + threadTs, + isBot: isOwnSlackBotReply(payload), + raw, + } +} + +const isOwnSlackBotReply = (payload: Record): boolean => + payload.user_is_bot === true || + Boolean(stringValue(payload.bot_id)) || + stringValue(payload.subtype) === 'bot_message' || + /agent[-_\s]?relay|relayfile|factory/u.test(stringValue(payload.user_name)?.toLowerCase() ?? '') + +const issueStateLabel = (issue: LinearIssue): string => { + const name = issue.state?.name?.trim() + if (name && issue.stateId) { + return `${name} (${issue.stateId})` + } + return name || issue.stateId || 'unknown state' +} + +const githubPrUrl = (repo: string, prNumber: number): string => `https://github.com/${repo}/pull/${prNumber}` + +const unrefDelay = (ms: number): Promise => new Promise((resolve) => { + const timer = setTimeout(resolve, ms) + timer.unref?.() +}) From 70992942cc2894eec3208718264d7018a6a2dfe0 Mon Sep 17 00:00:00 2001 From: "agent-relay-code[bot]" Date: Fri, 12 Jun 2026 10:14:37 +0000 Subject: [PATCH 2/5] chore: apply pr-reviewer fixes for #246 --- .../src/orchestrator/factory.test.ts | 107 +++++++++++++++++- .../factory-sdk/src/orchestrator/factory.ts | 74 ++++++------ 2 files changed, 145 insertions(+), 36 deletions(-) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index ad01accb..0c12031e 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -124,13 +124,24 @@ class RecordingSlack implements SlackWriteback { readonly roots: Array<{ channel: string; text: string }> = [] readonly replies: Array<{ threadId: string; text: string }> = [] threadId = '1780751612.176219' + failPostThread = false + failReplies = 0 async postThread(root: { channel: string; text: string }): Promise<{ threadId: string }> { + if (this.failPostThread) { + throw new Error('slack post failed') + } + this.roots.push(root) return { threadId: this.threadId } } async reply(threadId: string, text: string): Promise { + if (this.failReplies > 0) { + this.failReplies -= 1 + throw new Error('slack reply failed') + } + this.replies.push({ threadId, text }) } } @@ -1051,9 +1062,103 @@ describe('FactoryLoop', () => { expect(slack.replies).toHaveLength(1) }) + + it('treats Slack dispatch thread startup as best-effort after agents are dispatched', async () => { + const mount = new FakeMountClient({ [issuePath(29)]: issueFile(29) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + slack.failPostThread = true + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + const result = await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(29), issueFile(29)))) + + expect(result.agents.map((agent) => agent.name)).toEqual(['ar-29-impl', 'ar-29-review']) + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-29-impl', 'ar-29-review']) + }) + + it('continues processing Slack reply events after one response fails', async () => { + const mount = new FakeMountClient({ [issuePath(30)]: issueFile(30) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + slack.failReplies = 1 + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(30), issueFile(30)))) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-fails'), 'slack-human-fails', { + text: 'status?', + user: 'U123', + user_is_bot: false, + }) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-next'), 'slack-human-next', { + text: 'status again?', + user: 'U456', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + expect(slack.replies[0]?.text).toContain('AR-30') + }) + + it('uses numeric Slack reply event ids without dropping fresh low-seq replies', async () => { + const mount = new FakeMountClient({ [issuePath(31)]: issueFile(31) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const warnings: unknown[][] = [] + const logger = { + warn: (...args: unknown[]) => { + warnings.push(args) + }, + error: () => undefined, + debug: () => undefined, + } + mount.emit(changeEvent('/slack/channels/C0FACTORY__factory-e2e/messages/other/replies/old.json', 1)) + const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + logger, + }) + const replyPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'human-numeric') + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(31), issueFile(31)))) + mount.files.set(replyPath, { + content: { + provider: 'slack', + objectType: 'message', + objectId: 'slack-human-numeric', + payload: { + channel: 'C0FACTORY', + thread_ts: slack.threadId, + ts: 'slack-human-numeric', + text: 'status?', + user: 'U123', + user_is_bot: false, + }, + }, + }) + mount.emit(changeEvent(replyPath, 1)) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + expect(warnings.flat()).not.toContain('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') + }) }) -const changeEvent = (path: string, id: string, occurredAt = new Date().toISOString()) => ({ +const changeEvent = (path: string, id: string | number, occurredAt = new Date().toISOString()) => ({ id, workspace: 'factory-test', type: 'relayfile.changed', diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 3ae2ddf8..bccd4c94 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -782,7 +782,11 @@ export class FactoryLoop implements Factory { const key = issueKey(record.issue) if (this.#slackThreadIds.has(key) || this.#slackWatcherStarts.has(key)) { - await this.#slackWatcherStarts.get(key) + try { + await this.#slackWatcherStarts.get(key) + } catch { + // The initiator logs Slack watcher startup failures. + } return } @@ -790,6 +794,8 @@ export class FactoryLoop implements Factory { this.#slackWatcherStarts.set(key, start) try { await start + } catch (error) { + this.#logger.warn?.(`[factory] failed to establish Slack dispatch thread for ${record.issue.key}`, error) } finally { this.#slackWatcherStarts.delete(key) } @@ -824,7 +830,6 @@ export class FactoryLoop implements Factory { const channelDir = this.#config.slack.channel const replyPrefix = slackReplyPrefix(channelDir, threadId) - const preExistingEvents = new Set() const preExistingPaths = new Set() const seenReplies = new Set() let missingIdentityLogged = false @@ -836,10 +841,6 @@ export class FactoryLoop implements Factory { const page = await this.#mount.getEvents({ limit: SLACK_REPLY_EVENTS_LIMIT }) cursor = page.nextCursor ?? undefined for (const event of page.events) { - const eventKey = eventIdentity(event) - if (eventKey) { - preExistingEvents.add(eventKey) - } if (event.resource.path.startsWith(replyPrefix)) { preExistingPaths.add(event.resource.path) } @@ -850,41 +851,43 @@ export class FactoryLoop implements Factory { } const handle = async (event: ChangeEvent): Promise => { - if (stopped || !event.resource.path.startsWith(replyPrefix)) { - return - } + try { + if (stopped || !event.resource.path.startsWith(replyPrefix)) { + return + } - const eventKey = eventIdentity(event) - if (!eventKey) { - if (!missingIdentityLogged) { - missingIdentityLogged = true - this.#logger.warn?.('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') + const eventKey = eventIdentity(event) + if (!eventKey) { + if (!missingIdentityLogged) { + missingIdentityLogged = true + this.#logger.warn?.('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') + } } - } else if (preExistingEvents.has(eventKey)) { - return - } - if (preExistingPaths.has(event.resource.path)) { - return - } + if (preExistingPaths.has(event.resource.path)) { + return + } - const reply = await this.#readSlackReply(event.resource.path) - if (!reply || reply.threadTs !== threadId || reply.channelDir !== channelDir) { - return - } + const reply = await this.#readSlackReply(event.resource.path) + if (!reply || reply.threadTs !== threadId || reply.channelDir !== channelDir) { + return + } - const replyKey = `${eventKey ?? event.resource.path}:${stableHash(JSON.stringify(reply.raw))}` - if (seenReplies.has(replyKey)) { - this.#logger.debug?.('[factory] suppressed duplicate Slack reply payload', { issue: record.issue.key, path: event.resource.path }) - return - } - seenReplies.add(replyKey) + const replyKey = `${eventKey ?? event.resource.path}:${stableHash(JSON.stringify(reply.raw))}` + if (seenReplies.has(replyKey)) { + this.#logger.debug?.('[factory] suppressed duplicate Slack reply payload', { issue: record.issue.key, path: event.resource.path }) + return + } + seenReplies.add(replyKey) - if (reply.isBot) { - return - } + if (reply.isBot) { + return + } - await this.#respondToSlackStatus(record, threadId) + await this.#respondToSlackStatus(record, threadId) + } catch (error) { + this.#logger.error?.('[factory] failed to handle Slack reply event', error) + } } await markPreExisting() @@ -1294,7 +1297,8 @@ const slackReplyPrefix = (channelDir: string, threadId: string): string => { const eventIdentity = (event: ChangeEvent): string | undefined => { const record = event as unknown as Record - const id = stringValue(record.id) ?? stringValue(record.event_id) ?? stringValue(record.seq) + const rawId = record.id ?? record.event_id ?? record.seq + const id = typeof rawId === 'string' || typeof rawId === 'number' ? String(rawId) : undefined return id ? `event:${id}` : undefined } From 6b60d309d352667feee665fdcfa44424f33fb1f0 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 13:00:28 +0200 Subject: [PATCH 3/5] Re-key Slack watcher on real thread ts --- .../factory-sdk/src/config/schema.test.ts | 1 + packages/factory-sdk/src/config/schema.ts | 1 + .../src/orchestrator/factory.test.ts | 87 ++++++++++++++++--- .../factory-sdk/src/orchestrator/factory.ts | 43 +++++---- packages/factory-sdk/src/writeback/slack.ts | 24 +++-- .../src/writeback/writeback.test.ts | 39 +++++++++ 6 files changed, 155 insertions(+), 40 deletions(-) diff --git a/packages/factory-sdk/src/config/schema.test.ts b/packages/factory-sdk/src/config/schema.test.ts index af79dac4..85169018 100644 --- a/packages/factory-sdk/src/config/schema.test.ts +++ b/packages/factory-sdk/src/config/schema.test.ts @@ -34,6 +34,7 @@ describe('FactoryConfigSchema', () => { expect(parsed.slack).toEqual({ channel: 'C123', style: 'threaded-summarized', + botUserId: 'U0B2596R7EZ', }) expect(parsed.mergePolicy).toBe('never') expect(parsed.stateIds).toEqual(LINEAR_STATE_IDS) diff --git a/packages/factory-sdk/src/config/schema.ts b/packages/factory-sdk/src/config/schema.ts index a7af7b4f..86a5839d 100644 --- a/packages/factory-sdk/src/config/schema.ts +++ b/packages/factory-sdk/src/config/schema.ts @@ -32,6 +32,7 @@ export const FactoryConfigSchema = z.object({ slack: z.object({ channel: z.string(), style: z.literal('threaded-summarized').default('threaded-summarized'), + botUserId: z.string().default('U0B2596R7EZ'), }).optional(), mergePolicy: z.enum(['never', 'on-green-with-review']).default('never'), stateIds: z.object({ diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 0c12031e..9992bf69 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -52,6 +52,12 @@ const realIssueFile = (n: number, stateId = ready, overrides: Record ({ + channel, + style: 'threaded-summarized' as const, + botUserId: 'U0B2596R7EZ', +}) + const flush = async () => { await new Promise((resolve) => setTimeout(resolve, 0)) } @@ -834,7 +840,7 @@ describe('FactoryLoop', () => { }, } const closeInputs: Array> = [] - const factory = createFactory(config({ slack: { channel: 'C0FACTORY', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig('C0FACTORY') }), { mount, fleet, triage: new StaticTriage(), @@ -924,7 +930,7 @@ describe('FactoryLoop', () => { const mount = new FakeMountClient({ [issuePath(24)]: issueFile(24) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -951,11 +957,36 @@ describe('FactoryLoop', () => { expect(slack.replies[0]?.text).toContain('https://github.com/AgentWorkforce/pear/pull/240') }) + it('watches top-level inbound Slack thread replies keyed by real reply ts', async () => { + const mount = new FakeMountClient({ [issuePath(32)]: issueFile(32) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(32), issueFile(32)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', '1780751619.000001', 'slack-human-top-level', { + text: 'status?', + thread_ts: slack.threadId, + user: 'U123', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + expect(slack.replies[0]).toMatchObject({ threadId: slack.threadId }) + }) + it('ignores the factory bot own Slack replies to avoid self-response loops', async () => { const mount = new FakeMountClient({ [issuePath(25)]: issueFile(25) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -965,10 +996,9 @@ describe('FactoryLoop', () => { await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(25), issueFile(25)))) emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'bot-1'), 'slack-bot-1', { text: 'AR-25: Ready for Agent', - user: 'BFACTORY', + user: 'U0B2596R7EZ', user_name: 'file_by_agent_relay', - user_is_bot: true, - bot_id: 'BFACTORY', + user_is_bot: false, }) await flush() await flush() @@ -980,7 +1010,7 @@ describe('FactoryLoop', () => { const mount = new FakeMountClient({ [issuePath(26)]: issueFile(26) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -998,6 +1028,12 @@ describe('FactoryLoop', () => { user: 'U123', user_is_bot: false, }) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', slack.threadId, 'slack-parent-root', { + text: 'root parent mirror', + thread_ts: slack.threadId, + user: 'U123', + user_is_bot: false, + }) await flush() await flush() @@ -1014,7 +1050,7 @@ describe('FactoryLoop', () => { user: 'U123', user_is_bot: false, }) - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -1042,7 +1078,7 @@ describe('FactoryLoop', () => { const mount = new FakeMountClient({ [issuePath(28)]: issueFile(28) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -1068,7 +1104,7 @@ describe('FactoryLoop', () => { const fleet = new FakeFleetClient() const slack = new RecordingSlack() slack.failPostThread = true - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -1086,7 +1122,7 @@ describe('FactoryLoop', () => { const fleet = new FakeFleetClient() const slack = new RecordingSlack() slack.failReplies = 1 - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -1124,7 +1160,7 @@ describe('FactoryLoop', () => { debug: () => undefined, } mount.emit(changeEvent('/slack/channels/C0FACTORY__factory-e2e/messages/other/replies/old.json', 1)) - const factory = createFactory(config({ slack: { channel: 'C0FACTORY__factory-e2e', style: 'threaded-summarized' } }), { + const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), @@ -1176,6 +1212,33 @@ const changeEvent = (path: string, id: string | number, occurredAt = new Date(). const slackReplyFixturePath = (channelDir: string, threadId: string, replyId: string): string => `/slack/channels/${channelDir}/messages/${threadId.replace(/\./g, '_')}/replies/${replyId}.json` +const slackTopLevelMessageFixturePath = (channelDir: string, messageTs: string): string => + `/slack/channels/${channelDir}/messages/${messageTs.replace(/\./g, '_')}/meta.json` + +const emitSlackTopLevelMessage = ( + mount: FakeMountClient, + channelDir: string, + messageTs: string, + id: string, + payload: Record, +): void => { + const path = slackTopLevelMessageFixturePath(channelDir, messageTs) + const channel = channelDir.split('__')[0] + mount.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + objectId: id, + payload: { + channel, + ts: messageTs, + ...payload, + }, + }, + }) + mount.emit(changeEvent(path, id)) +} + const emitSlackReply = ( mount: FakeMountClient, path: string, diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index bccd4c94..3c392a31 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -1,6 +1,5 @@ import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { LINEAR_STATE_IDS, linearByStatePath } from '../constants/linear' -import { slackReplyPath } from '../constants/slack' import { GithubMergeGate, closeProbePr, type GithubMergeGate as GithubMergeGatePort } from '../github' import type { AgentSpec, ChangeEvent, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' import type { Clock, Logger } from '../ports/system' @@ -32,6 +31,8 @@ type SlackThreadWatcher = { stop(): Promise } type SlackReply = { channelDir: string threadTs: string + messageTs: string + isThreadReply: boolean isBot: boolean raw: Record } @@ -829,7 +830,7 @@ export class FactoryLoop implements Factory { } const channelDir = this.#config.slack.channel - const replyPrefix = slackReplyPrefix(channelDir, threadId) + const messagesPrefix = slackChannelMessagesPrefix(channelDir) const preExistingPaths = new Set() const seenReplies = new Set() let missingIdentityLogged = false @@ -841,7 +842,7 @@ export class FactoryLoop implements Factory { const page = await this.#mount.getEvents({ limit: SLACK_REPLY_EVENTS_LIMIT }) cursor = page.nextCursor ?? undefined for (const event of page.events) { - if (event.resource.path.startsWith(replyPrefix)) { + if (event.resource.path.startsWith(messagesPrefix)) { preExistingPaths.add(event.resource.path) } } @@ -852,7 +853,7 @@ export class FactoryLoop implements Factory { const handle = async (event: ChangeEvent): Promise => { try { - if (stopped || !event.resource.path.startsWith(replyPrefix)) { + if (stopped || !event.resource.path.startsWith(messagesPrefix)) { return } @@ -869,7 +870,7 @@ export class FactoryLoop implements Factory { } const reply = await this.#readSlackReply(event.resource.path) - if (!reply || reply.threadTs !== threadId || reply.channelDir !== channelDir) { + if (!reply || !reply.isThreadReply || reply.threadTs !== threadId || reply.channelDir !== channelDir) { return } @@ -894,7 +895,7 @@ export class FactoryLoop implements Factory { let subscription: Subscription | undefined try { - subscription = this.#mount.subscribe([`${replyPrefix}**`], (event) => { + subscription = this.#mount.subscribe([`${messagesPrefix}**`], (event) => { void handle(event) }) } catch (error) { @@ -936,7 +937,7 @@ export class FactoryLoop implements Factory { async #readSlackReply(path: string): Promise { try { const { content } = await this.#mount.readFile(path) - return parseSlackReply(path, content) + return parseSlackReply(path, content, this.#config.slack?.botUserId ?? 'U0B2596R7EZ') } catch (error) { this.#logger.warn?.(`Unable to read Slack reply ${path}`, error) return undefined @@ -1286,14 +1287,9 @@ const isCompletionReason = (reason?: string): boolean => const defaultRestartPolicy = (spec: AgentSpec): AgentSpec['restartPolicy'] | undefined => spec.role === 'implementer' ? { maxRestarts: 3, strategy: 'resume' } as AgentSpec['restartPolicy'] : spec.restartPolicy -const slackPathTs = (threadTs: string): string => threadTs.replace(/\./g, '_') - const slackPayloadTs = (threadId: string): string => threadId.replace(/_/g, '.') -const slackReplyPrefix = (channelDir: string, threadId: string): string => { - const marker = '__reply_watcher_marker__' - return slackReplyPath(channelDir, slackPathTs(threadId), marker).replace(`${marker}.json`, '') -} +const slackChannelMessagesPrefix = (channelDir: string): string => `/slack/channels/${channelDir}/messages/` const eventIdentity = (event: ChangeEvent): string | undefined => { const record = event as unknown as Record @@ -1302,29 +1298,32 @@ const eventIdentity = (event: ChangeEvent): string | undefined => { return id ? `event:${id}` : undefined } -const parseSlackReply = (path: string, content: unknown): SlackReply | undefined => { +const parseSlackReply = (path: string, content: unknown, botUserId: string): SlackReply | undefined => { const raw = asRecord(parseJsonContent(content)) ?? {} const payload = wrappedPayload(raw) const channelDir = path.match(/^\/slack\/channels\/([^/]+)\//u)?.[1] ?? '' - const threadFromPath = path.match(/^\/slack\/channels\/[^/]+\/messages\/([^/]+)\/replies\//u)?.[1] ?? '' - const threadTs = stringValue(payload.thread_ts) ?? slackPayloadTs(threadFromPath) - if (!channelDir || !threadTs) { + const pathMatch = path.match(/^\/slack\/channels\/[^/]+\/messages\/([^/]+)(?:\/replies\/([^/]+))?/u) + const parentFromPath = pathMatch?.[2] ? slackPayloadTs(pathMatch[1]) : undefined + const messageFromPath = slackPayloadTs(pathMatch?.[2] ?? pathMatch?.[1] ?? '') + const messageTs = stringValue(payload.ts) ?? messageFromPath + const threadTs = stringValue(payload.thread_ts) ?? parentFromPath + if (!channelDir || !threadTs || !messageTs) { return undefined } return { channelDir, threadTs, - isBot: isOwnSlackBotReply(payload), + messageTs, + isThreadReply: Boolean(parentFromPath) || threadTs !== messageTs, + isBot: isOwnSlackBotReply(payload, botUserId), raw, } } -const isOwnSlackBotReply = (payload: Record): boolean => +const isOwnSlackBotReply = (payload: Record, botUserId: string): boolean => payload.user_is_bot === true || - Boolean(stringValue(payload.bot_id)) || - stringValue(payload.subtype) === 'bot_message' || - /agent[-_\s]?relay|relayfile|factory/u.test(stringValue(payload.user_name)?.toLowerCase() ?? '') + stringValue(payload.user) === botUserId const issueStateLabel = (issue: LinearIssue): string => { const name = issue.state?.name?.trim() diff --git a/packages/factory-sdk/src/writeback/slack.ts b/packages/factory-sdk/src/writeback/slack.ts index 06eda71c..0b5e3a00 100644 --- a/packages/factory-sdk/src/writeback/slack.ts +++ b/packages/factory-sdk/src/writeback/slack.ts @@ -1,6 +1,6 @@ import { slackMessagePath, slackReplyPath } from '../constants/slack' import type { MountClient } from '../ports' -import { safePathSegment, stableHash, trimToLines } from './shared' +import { safePathSegment, stableHash, trimToLines, wrappedPayload } from './shared' export interface MountSlackWritebackConfig { channel?: string @@ -65,13 +65,19 @@ const rootClientId = (prefix: string, channelDir: string, text: string): string const replyClientId = (prefix: string, threadId: string, text: string): string => `${safePathSegment(prefix)}-reply-${safePathSegment(threadId)}-${stableHash(text)}` -const confirmPath = async (mount: MountClient, path: string): Promise => { +const confirmPath = async (mount: MountClient, path: string): Promise => { const confirmation = await mount.confirmWrite(path, { timeoutMs: 90_000 }) if (confirmation !== 'acked') { throw new Error(`Writeback not acked for ${path}: ${confirmation}`) } - await mount.readFile(path) + return (await mount.readFile(path)).content +} + +const slackTsFromContent = (content: unknown): string | undefined => { + const payload = wrappedPayload(content) + const ts = stringValue(payload.ts) ?? stringValue(payload.thread_ts) + return ts && /^\d+\.\d+$/u.test(ts) ? ts : undefined } export const MountSlackWriteback = ( @@ -93,10 +99,14 @@ export const MountSlackWriteback = ( const path = slackMessagePath(channelDir, clientId) await mount.writeFile(path, { channelId, text }, { guarded: true }) - await confirmPath(mount, path) + const content = await confirmPath(mount, path) + const threadTs = slackTsFromContent(content) ?? clientId - threads.set(clientId, { channelDir, channelId, threadTs: clientId }) - return { threadId: clientId } + threads.set(threadTs, { channelDir, channelId, threadTs }) + if (threadTs !== clientId) { + threads.set(clientId, { channelDir, channelId, threadTs }) + } + return { threadId: threadTs } }, async reply(threadId: string, text: string): Promise { @@ -125,3 +135,5 @@ export const MountSlackWriteback = ( }, } } + +const stringValue = (value: unknown): string | undefined => typeof value === 'string' ? value : undefined diff --git a/packages/factory-sdk/src/writeback/writeback.test.ts b/packages/factory-sdk/src/writeback/writeback.test.ts index 09b8ac88..f829a0c3 100644 --- a/packages/factory-sdk/src/writeback/writeback.test.ts +++ b/packages/factory-sdk/src/writeback/writeback.test.ts @@ -448,6 +448,45 @@ describe('MountSlackWriteback', () => { }) }) + it('returns the real Slack parent ts from the acked thread root when available', async () => { + class AckedSlackMountClient extends FakeMountClient { + override async writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise { + await super.writeFile(path, content, opts) + this.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + payload: { + channel: 'C0FACTORY', + ts: '1780751612.176219', + text: 'Factory update', + }, + }, + }) + } + } + const mount = new AckedSlackMountClient() + const slack = MountSlackWriteback(mount, { + channel: 'C0FACTORY__factory-e2e', + channelDir: 'C0FACTORY__factory-e2e', + clientIdPrefix: 'factory-e2e', + }) + + const root = await slack.postThread({ + channel: 'C0FACTORY__factory-e2e', + text: 'Factory update', + }) + await slack.reply(root.threadId, 'Factory reply') + + expect(root.threadId).toBe('1780751612.176219') + expect(mount.writes[1]?.path).toContain('/slack/channels/C0FACTORY__factory-e2e/messages/1780751612_176219/replies/') + expect(mount.writes[1]?.content).toMatchObject({ + channelId: 'C0FACTORY', + thread_ts: '1780751612.176219', + text: 'Factory reply', + }) + }) + it('surfaces non-acked thread writes even when local read-back succeeds', async () => { class TimeoutAfterWriteMountClient extends FakeMountClient { override async confirmWrite( From 0780aaf3ce5ad2c5fa121688d33517549c8f0fd0 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 13:03:18 +0200 Subject: [PATCH 4/5] Add degraded Slack watcher safety tests --- .../src/orchestrator/factory.test.ts | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 9992bf69..24c9882a 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -982,6 +982,92 @@ describe('FactoryLoop', () => { expect(slack.replies[0]).toMatchObject({ threadId: slack.threadId }) }) + it.each([ + ['user_is_bot marker', { user: 'U-BOT-MIRROR', user_is_bot: true }], + ['configured bot user id', { user: 'U0B2596R7EZ', user_is_bot: false }], + ])('degraded self-ignore: inbound %s is ignored', async (_name, marker) => { + const mount = new FakeMountClient({ [issuePath(33)]: issueFile(33) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(33), issueFile(33)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', `1780751620.${marker.user === 'U0B2596R7EZ' ? '000002' : '000001'}`, `slack-self-${marker.user}`, { + text: 'status?', + thread_ts: slack.threadId, + ...marker, + }) + await flush() + await flush() + + expect(slack.replies).toEqual([]) + }) + + it('degraded thread/channel guard: off-thread, mismatched-thread, and wrong-channel replies are skipped', async () => { + const mount = new FakeMountClient({ [issuePath(34)]: issueFile(34) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(34), issueFile(34)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', '1780751621.000001', 'slack-mismatched-thread', { + text: 'wrong parent', + thread_ts: '1780759999.000001', + user: 'U123', + user_is_bot: false, + }) + emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', '1780759999.000001', 'human-off-thread'), 'slack-off-thread', { + text: 'wrong nested parent', + user: 'U123', + user_is_bot: false, + }) + emitSlackTopLevelMessage(mount, 'C0PRODUCT__general', '1780751621.000002', 'slack-wrong-channel', { + text: 'right parent wrong channel', + thread_ts: slack.threadId, + user: 'U123', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toEqual([]) + }) + + it('degraded positive control: genuine human reply in the watched thread is answered', async () => { + const mount = new FakeMountClient({ [issuePath(35)]: issueFile(35) }) + const fleet = new FakeFleetClient() + const slack = new RecordingSlack() + const factory = createFactory(config({ slack: slackConfig() }), { + mount, + fleet, + triage: new StaticTriage(), + slack, + }) + + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(35), issueFile(35)))) + emitSlackTopLevelMessage(mount, 'C0FACTORY__factory-e2e', '1780751622.000001', 'slack-human-positive', { + text: 'status?', + thread_ts: slack.threadId, + user: 'U-HUMAN', + user_is_bot: false, + }) + await flush() + await flush() + + expect(slack.replies).toHaveLength(1) + expect(slack.replies[0]?.text).toContain('AR-35') + }) + it('ignores the factory bot own Slack replies to avoid self-response loops', async () => { const mount = new FakeMountClient({ [issuePath(25)]: issueFile(25) }) const fleet = new FakeFleetClient() From fd5228157a3bb400bd4e402d76d13a5612cc3686 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 13:44:50 +0200 Subject: [PATCH 5/5] Force Slack responder onto cloud writeback --- .../src/mount/relayfile-cloud-mount-client.ts | 1 + .../src/orchestrator/factory.test.ts | 163 ++++++++++++------ .../factory-sdk/src/orchestrator/factory.ts | 2 +- packages/factory-sdk/src/ports/mount.ts | 1 + packages/factory-sdk/src/testing/fakes.ts | 1 + packages/factory-sdk/src/writeback/slack.ts | 7 + .../src/writeback/writeback.test.ts | 39 +++++ 7 files changed, 165 insertions(+), 49 deletions(-) diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts index c7d39d65..55b307d0 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts @@ -61,6 +61,7 @@ export type RelayFileClientLike = export class RelayfileCloudMountClient implements MountClient { readonly workspaceId: string + readonly writebackTransport = 'relayfile-cloud' readonly #client: RelayFileClientLike readonly #tokenProvider: TokenProvider diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 24c9882a..b628fbbe 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -152,6 +152,59 @@ class RecordingSlack implements SlackWriteback { } } +class CloudWritebackFakeMountClient extends FakeMountClient { + constructor(initialFiles: Record = {}, readonly threadTs = '1780751612.176219') { + super(initialFiles) + } + + override async writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise { + await super.writeFile(path, content, opts) + if (isSlackRootWritePath(path)) { + this.files.set(path, { + content: { + provider: 'slack', + objectType: 'message', + payload: { + ...record(content), + ts: this.threadTs, + thread_ts: this.threadTs, + }, + }, + }) + } + } +} + +class ConfirmRecordingSlackMountClient extends CloudWritebackFakeMountClient { + readonly confirmedPaths: string[] = [] + + override async confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { + this.confirmedPaths.push(path) + return super.confirmWrite(path, opts) + } +} + +class FailFirstSlackReplyMountClient extends CloudWritebackFakeMountClient { + failedReply = false + + override async confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { + if (!this.failedReply && path.includes('/replies/')) { + this.failedReply = true + return 'failed' + } + return super.confirmWrite(path, opts) + } +} + +class FailSlackRootMountClient extends CloudWritebackFakeMountClient { + override async confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { + if (isSlackRootWritePath(path)) { + return 'failed' + } + return super.confirmWrite(path, opts) + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -795,7 +848,17 @@ describe('FactoryLoop', () => { it('closes a synthetic probe PR after done writebacks and before release when mergePolicy is never', async () => { const order: string[] = [] - const mount = new FakeMountClient({ + class OrderedSlackMountClient extends CloudWritebackFakeMountClient { + override async writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise { + if (isSlackRootWritePath(path)) { + order.push('slack-root') + } else if (path.includes('/replies/')) { + order.push('slack-reply') + } + await super.writeFile(path, content, opts) + } + } + const mount = new OrderedSlackMountClient({ [issuePath(18)]: issueFile(18), '/github/repos/AgentWorkforce__pear/pulls/by-id/18.json': { provider: 'github', @@ -830,22 +893,12 @@ describe('FactoryLoop', () => { return true }, } - const slack: SlackWriteback = { - async postThread() { - order.push('slack-root') - return { threadId: 'thread-1' } - }, - async reply() { - order.push('slack-reply') - }, - } const closeInputs: Array> = [] const factory = createFactory(config({ slack: slackConfig('C0FACTORY') }), { mount, fleet, triage: new StaticTriage(), linear, - slack, probeCloser: async (input) => { order.push('probe-close') closeInputs.push(input) @@ -927,7 +980,7 @@ describe('FactoryLoop', () => { }) it('watches the in-flight factory Slack thread and replies to a human status request with live state, roster, and PR', async () => { - const mount = new FakeMountClient({ [issuePath(24)]: issueFile(24) }) + const mount = new ConfirmRecordingSlackMountClient({ [issuePath(24)]: issueFile(24) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -948,17 +1001,20 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) - expect(slack.replies[0]).toMatchObject({ threadId: slack.threadId }) - expect(slack.replies[0]?.text).toContain('AR-24') - expect(slack.replies[0]?.text).toContain(implementing) - expect(slack.replies[0]?.text).toContain('ar-24-impl') - expect(slack.replies[0]?.text).toContain('ar-24-review') - expect(slack.replies[0]?.text).toContain('https://github.com/AgentWorkforce/pear/pull/240') + const replies = slackReplyWrites(mount) + expect(slack.replies).toEqual([]) + expect(replies).toHaveLength(1) + expect(replies[0]?.content.thread_ts).toBe(slack.threadId) + expect(replies[0]?.content.text).toContain('AR-24') + expect(replies[0]?.content.text).toContain(implementing) + expect(replies[0]?.content.text).toContain('ar-24-impl') + expect(replies[0]?.content.text).toContain('ar-24-review') + expect(replies[0]?.content.text).toContain('https://github.com/AgentWorkforce/pear/pull/240') + expect(mount.confirmedPaths.filter((path) => path.includes('/replies/'))).toEqual([replies[0]?.path]) }) it('watches top-level inbound Slack thread replies keyed by real reply ts', async () => { - const mount = new FakeMountClient({ [issuePath(32)]: issueFile(32) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(32)]: issueFile(32) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -978,15 +1034,17 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) - expect(slack.replies[0]).toMatchObject({ threadId: slack.threadId }) + const replies = slackReplyWrites(mount) + expect(slack.replies).toEqual([]) + expect(replies).toHaveLength(1) + expect(replies[0]?.content.thread_ts).toBe(slack.threadId) }) it.each([ ['user_is_bot marker', { user: 'U-BOT-MIRROR', user_is_bot: true }], ['configured bot user id', { user: 'U0B2596R7EZ', user_is_bot: false }], ])('degraded self-ignore: inbound %s is ignored', async (_name, marker) => { - const mount = new FakeMountClient({ [issuePath(33)]: issueFile(33) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(33)]: issueFile(33) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -1005,11 +1063,11 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toEqual([]) + expect(slackReplyWrites(mount)).toEqual([]) }) it('degraded thread/channel guard: off-thread, mismatched-thread, and wrong-channel replies are skipped', async () => { - const mount = new FakeMountClient({ [issuePath(34)]: issueFile(34) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(34)]: issueFile(34) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -1040,11 +1098,11 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toEqual([]) + expect(slackReplyWrites(mount)).toEqual([]) }) it('degraded positive control: genuine human reply in the watched thread is answered', async () => { - const mount = new FakeMountClient({ [issuePath(35)]: issueFile(35) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(35)]: issueFile(35) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -1064,12 +1122,13 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) - expect(slack.replies[0]?.text).toContain('AR-35') + const replies = slackReplyWrites(mount) + expect(replies).toHaveLength(1) + expect(replies[0]?.content.text).toContain('AR-35') }) it('ignores the factory bot own Slack replies to avoid self-response loops', async () => { - const mount = new FakeMountClient({ [issuePath(25)]: issueFile(25) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(25)]: issueFile(25) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -1089,11 +1148,11 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toEqual([]) + expect(slackReplyWrites(mount)).toEqual([]) }) it('does not respond to Slack replies outside the watched factory-e2e issue thread', async () => { - const mount = new FakeMountClient({ [issuePath(26)]: issueFile(26) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(26)]: issueFile(26) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -1123,11 +1182,11 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toEqual([]) + expect(slackReplyWrites(mount)).toEqual([]) }) it('connects Slack reply watchers from now and does not reprocess pre-existing thread replies', async () => { - const mount = new FakeMountClient({ [issuePath(27)]: issueFile(27) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(27)]: issueFile(27) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const oldPath = slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'old-human') @@ -1147,7 +1206,7 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toEqual([]) + expect(slackReplyWrites(mount)).toEqual([]) emitSlackReply(mount, slackReplyFixturePath('C0FACTORY__factory-e2e', slack.threadId, 'new-human'), 'slack-new-human', { text: 'new status?', @@ -1157,11 +1216,11 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) + expect(slackReplyWrites(mount)).toHaveLength(1) }) it('dedupes duplicate inbound Slack reply delivery by event identity and content', async () => { - const mount = new FakeMountClient({ [issuePath(28)]: issueFile(28) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(28)]: issueFile(28) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const factory = createFactory(config({ slack: slackConfig() }), { @@ -1182,19 +1241,16 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) + expect(slackReplyWrites(mount)).toHaveLength(1) }) it('treats Slack dispatch thread startup as best-effort after agents are dispatched', async () => { - const mount = new FakeMountClient({ [issuePath(29)]: issueFile(29) }) + const mount = new FailSlackRootMountClient({ [issuePath(29)]: issueFile(29) }) const fleet = new FakeFleetClient() - const slack = new RecordingSlack() - slack.failPostThread = true const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, triage: new StaticTriage(), - slack, }) const result = await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(29), issueFile(29)))) @@ -1204,10 +1260,9 @@ describe('FactoryLoop', () => { }) it('continues processing Slack reply events after one response fails', async () => { - const mount = new FakeMountClient({ [issuePath(30)]: issueFile(30) }) + const mount = new FailFirstSlackReplyMountClient({ [issuePath(30)]: issueFile(30) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() - slack.failReplies = 1 const factory = createFactory(config({ slack: slackConfig() }), { mount, fleet, @@ -1229,12 +1284,13 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) - expect(slack.replies[0]?.text).toContain('AR-30') + const replies = slackReplyWrites(mount) + expect(replies).toHaveLength(2) + expect(replies[1]?.content.text).toContain('AR-30') }) it('uses numeric Slack reply event ids without dropping fresh low-seq replies', async () => { - const mount = new FakeMountClient({ [issuePath(31)]: issueFile(31) }) + const mount = new CloudWritebackFakeMountClient({ [issuePath(31)]: issueFile(31) }) const fleet = new FakeFleetClient() const slack = new RecordingSlack() const warnings: unknown[][] = [] @@ -1275,7 +1331,7 @@ describe('FactoryLoop', () => { await flush() await flush() - expect(slack.replies).toHaveLength(1) + expect(slackReplyWrites(mount)).toHaveLength(1) expect(warnings.flat()).not.toContain('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') }) }) @@ -1348,3 +1404,14 @@ const emitSlackReply = ( }) mount.emit(changeEvent(path, id)) } + +const isSlackRootWritePath = (path: string): boolean => + /^\/slack\/channels\/[^/]+\/messages\/[^/]+\.json$/u.test(path) + +const slackReplyWrites = (mount: FakeMountClient): Array<{ path: string; content: { text?: string; thread_ts?: string } }> => + mount.writes + .filter((write) => write.path.includes('/replies/')) + .map((write) => ({ path: write.path, content: record(write.content) as { text?: string; thread_ts?: string } })) + +const record = (value: unknown): Record => + value !== null && typeof value === 'object' && !Array.isArray(value) ? value as Record : {} diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 3c392a31..92a898ed 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -104,7 +104,7 @@ export class FactoryLoop implements Factory { stateIds: config.stateIds, safety: config.safety, }) - this.#slack = ports.slack ?? (config.slack ? MountSlackWriteback(ports.mount, config.slack) : undefined) + this.#slack = config.slack ? MountSlackWriteback(ports.mount, config.slack) : ports.slack void (ports.github ?? MountGithubRead(ports.mount)) this.#mergeGate = ports.mergeGate ?? new GithubMergeGate() this.#probeCloser = ports.probeCloser ?? closeProbePr diff --git a/packages/factory-sdk/src/ports/mount.ts b/packages/factory-sdk/src/ports/mount.ts index 5a2ee426..93a41871 100644 --- a/packages/factory-sdk/src/ports/mount.ts +++ b/packages/factory-sdk/src/ports/mount.ts @@ -20,6 +20,7 @@ export interface EventPage { } export interface MountClient { + readonly writebackTransport?: 'relayfile-cloud' | 'test' readFile(path: string): Promise<{ content: unknown; revision?: string }> writeFile(path: string, content: unknown, opts?: { guarded?: boolean }): Promise deleteFile(path: string): Promise diff --git a/packages/factory-sdk/src/testing/fakes.ts b/packages/factory-sdk/src/testing/fakes.ts index 2b9898c3..e66d8dea 100644 --- a/packages/factory-sdk/src/testing/fakes.ts +++ b/packages/factory-sdk/src/testing/fakes.ts @@ -16,6 +16,7 @@ type ExitListener = (name: string, reason?: string) => void type DeliveryFailedListener = (info: { to: string; msgId?: string; reason?: string }) => void export class FakeMountClient implements MountClient { + readonly writebackTransport = 'test' readonly files = new Map() readonly writes: Array<{ path: string; content: unknown }> = [] readonly deletes: string[] = [] diff --git a/packages/factory-sdk/src/writeback/slack.ts b/packages/factory-sdk/src/writeback/slack.ts index 0b5e3a00..3e79e6ff 100644 --- a/packages/factory-sdk/src/writeback/slack.ts +++ b/packages/factory-sdk/src/writeback/slack.ts @@ -86,9 +86,15 @@ export const MountSlackWriteback = ( ) => { const threads = new Map() const prefix = slackCfg.clientIdPrefix ?? 'factory' + const assertCloudWriteback = (): void => { + if (mount.writebackTransport !== 'relayfile-cloud' && mount.writebackTransport !== 'test') { + throw new Error('Slack writeback requires RelayfileCloudMountClient cloud writeback transport') + } + } return { async postThread(root: { channel: string; text: string }): Promise<{ threadId: string }> { + assertCloudWriteback() const channelDir = slackCfg.channelDir ?? root.channel ?? slackCfg.channel assertSlackChannelAllowed(slackCfg.channel, root.channel, 'postThread') assertSlackChannelAllowed(slackCfg.channel, channelDir, 'postThread path') @@ -110,6 +116,7 @@ export const MountSlackWriteback = ( }, async reply(threadId: string, text: string): Promise { + assertCloudWriteback() const fallbackChannelDir = slackCfg.channelDir ?? slackCfg.channel assertSlackChannelAllowed(slackCfg.channel, fallbackChannelDir, 'reply') const ref = threads.get(threadId) ?? ( diff --git a/packages/factory-sdk/src/writeback/writeback.test.ts b/packages/factory-sdk/src/writeback/writeback.test.ts index f829a0c3..483d2bc6 100644 --- a/packages/factory-sdk/src/writeback/writeback.test.ts +++ b/packages/factory-sdk/src/writeback/writeback.test.ts @@ -4,6 +4,7 @@ import { FactoryConfigSchema } from '../config/schema' import { linearCommentPath } from '../constants/linear' import { slackReplyPath } from '../constants/slack' import { createFactory, linearCommentName, MountGithubRead, MountLinearWriteback, MountSlackWriteback } from '../index' +import type { MountClient } from '../ports' import type { LinearIssue } from '../types' import { FakeFleetClient, FakeMountClient } from '../testing' @@ -487,6 +488,44 @@ describe('MountSlackWriteback', () => { }) }) + it('refuses Slack writes over a local mirror mount even if file writes appear acked', async () => { + const writes: Array<{ path: string; content: unknown }> = [] + const localMirrorMount: MountClient = { + async readFile() { + return { content: {} } + }, + async writeFile(path, content) { + writes.push({ path, content }) + }, + async deleteFile() { + throw new Error('local mirror delete is not used') + }, + async listTree() { + return [] + }, + subscribe() { + return { unsubscribe: async () => undefined } + }, + async getEvents() { + return { events: [] } + }, + async confirmWrite() { + return 'acked' + }, + async ensureSubRoot() { + return 'ready' + }, + } + const slack = MountSlackWriteback(localMirrorMount, { + channel: 'C0FACTORY__factory-e2e', + channelDir: 'C0FACTORY__factory-e2e', + }) + + await expect(slack.reply('1780751612.176219', 'Factory reply')) + .rejects.toThrow(/requires RelayfileCloudMountClient cloud writeback transport/) + expect(writes).toEqual([]) + }) + it('surfaces non-acked thread writes even when local read-back succeeds', async () => { class TimeoutAfterWriteMountClient extends FakeMountClient { override async confirmWrite(