diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 8234b11a..dcd54cfb 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -9,6 +9,7 @@ import { integrationSubscriptionSummaries, localWatchEventPathsForFilename, localWatchRootsFor, + relayfileSdkPathFiltersFor, resetIntegrationEventTelemetryForTests } from '../integration-event-bridge.ts' import type { ConnectedIntegration } from '../integrations.ts' @@ -128,6 +129,7 @@ function makeHarness( content: string encoding: 'utf-8' | 'base64' } + failReadFile?: boolean sendDelayMs?: number onSendStart?: (activeSends: number) => void waitForDeliveryNeverSettles?: boolean @@ -164,6 +166,7 @@ function makeHarness( }, async readFile(workspaceId, path) { readFileCalls.push({ workspaceId, path }) + if (options.failReadFile) throw new Error('remote file not ready') return options.readFileResponse?.(workspaceId, path) ?? { path, revision: 'rev-1', @@ -224,6 +227,22 @@ beforeEach(() => { delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG }) +test('relayfile sdk path filters broaden partial-segment Slack DM globs', () => { + assert.deepEqual(relayfileSdkPathFiltersFor([ + '/slack/channels/C123ABC/**', + '/slack/channels/C123ABC__proj-cloud/**', + '/slack/channels/D*/**', + '/slack/dms/*/**', + '/slack/users/*/messages/**' + ]), [ + '/slack/channels/*/**', + '/slack/channels/C123ABC/**', + '/slack/channels/C123ABC__proj-cloud/**', + '/slack/dms/*/**', + '/slack/users/*/messages/**' + ]) +}) + async function waitForSent(harness: { sent: SentMessage[] }, count: number, timeoutMs = 1_000): Promise { const deadline = Date.now() + timeoutMs while (harness.sent.length < count && Date.now() < deadline) { @@ -257,7 +276,7 @@ test('integration events route only to the targets for the matching integration assert.deepEqual(harness.subscribeCalls[0].globs, ['/github/repos/**', '/linear/issues/**']) assert.deepEqual(harness.subscribeCalls[0].options?.pathScope, ['/github/repos/**', '/linear/issues/**']) - assert.equal(harness.subscribeCalls[0].options?.from, 'now') + assert.equal(harness.subscribeCalls[0].options?.from, 'legacy') await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) await waitForSent(harness, 1) @@ -354,19 +373,22 @@ test('integration events watch selected relayfile mount paths', async () => { }) assert.deepEqual(harness.subscribeCalls[0].globs, [ + '/slack/channels/C123ABC/**', '/slack/channels/C123ABC__proj-cloud/**', '/slack/channels/D*/**', '/slack/dms/*/**', '/slack/users/*/messages/**' ]) assert.deepEqual(harness.subscribeCalls[0].options?.pathScope, [ + '/slack/channels/C123ABC/**', '/slack/channels/C123ABC__proj-cloud/**', '/slack/channels/D*/**', '/slack/dms/*/**', '/slack/users/*/messages/**' ]) - assert.equal(harness.subscribeCalls[0].options?.from, 'now') + assert.equal(harness.subscribeCalls[0].options?.from, 'legacy') assert.deepEqual(integrationSubscriptionSummaries([slackIntegration])[0].watches, [ + '.integrations/slack/channels/C123ABC/**', '.integrations/slack/channels/C123ABC__proj-cloud/**', '.integrations/slack/channels/D*/**', '.integrations/slack/dms/*/**', @@ -378,10 +400,12 @@ test('integration events watch selected relayfile mount paths', async () => { await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.match(harness.sent[0].input.text, /Slack message event/u) + assert.match(harness.sent[0].input.text, /Location: #proj-cloud/u) assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC__proj-cloud\/messages\/1780668000_000000\/meta\.json/u) - assert.match(harness.sent[0].input.text, /Relayfile path: \/slack\/channels\/C123ABC__proj-cloud\/messages\/1780668000_000000\/meta\.json/u) - assert.match(harness.sent[0].input.text, /Inline context preview:/u) - assert.match(harness.sent[0].input.text, /targeted Slack context/u) + assert.match(harness.sent[0].input.text, /Message:\ntargeted Slack context/u) + assert.doesNotMatch(harness.sent[0].input.text, /Relayfile path:/u) + assert.doesNotMatch(harness.sent[0].input.text, /Inline context preview:/u) assert.equal(harness.sent[0].input.data?.provider, 'slack') assert.equal(harness.sent[0].input.data?.resourcePath, selectedPath) assert.equal(harness.sent[0].input.data?.resourceId, 'meta.json') @@ -396,8 +420,17 @@ test('integration events watch selected relayfile mount paths', async () => { harness.sent.splice(0) harness.readFileCalls.splice(0) - await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1780668060_000000/meta.json', 'slack')) - assert.deepEqual(harness.sent, []) + const canonicalPath = '/slack/channels/C123ABC/messages/1780668060_000000/meta.json' + await harness.emit(changeEvent(canonicalPath, 'slack')) + await waitForSent(harness, 1) + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC\/messages\/1780668060_000000\/meta\.json/u) + assert.deepEqual(harness.readFileCalls, [ + { + workspaceId: 'workspace-id', + path: canonicalPath + } + ]) harness.sent.splice(0) await harness.emit(changeEvent('/slack/channels/C999XYZ/messages/1780668120_000000/meta.json', 'slack')) @@ -408,6 +441,70 @@ test('integration events watch selected relayfile mount paths', async () => { assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) }) +test('slack raw-id and slug alias paths with distinct revisions inject once per logical message', async () => { + let slackText = 'original Slack message' + const harness = makeHarness(['alice'], { + readFileResponse: (_workspaceId, path) => ({ + path, + revision: 'rev-context', + contentType: 'application/json', + content: JSON.stringify({ provider: 'slack', text: slackText }), + encoding: 'utf-8' + }) + }) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + // Cloud writes the same Slack message to both the raw channel-id tree and + // the `__` slug tree; each copy is a distinct file with a distinct + // revision, so revision-based fingerprints can never match across the + // copies (probe #1: evt_143356 raw / evt_143358 slug both injected). + await harness.emit(changeEvent( + '/slack/channels/C123ABC/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:raw-copy-1' } + )) + await waitForSent(harness, 1) + + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:slug-copy-1' } + )) + await waitForDropped('project-1', 1) + assert.equal(harness.sent.length, 1) + + // A real edit to the same Slack record must not be swallowed by the + // long-lived retry/alias dedupe window. + slackText = 'edited Slack message' + await harness.emit(changeEvent( + '/slack/channels/C123ABC/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:raw-copy-2' } + )) + await waitForSent(harness, 2) + assert.match(harness.sent[1].input.text, /Message:\nedited Slack message/u) + + // A different logical message via either alias form still injects. + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668060_000000/meta.json', + 'slack', + { digest: 'revision:slug-copy-2' } + )) + await waitForSent(harness, 3) + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice', 'alice']) +}) + test('remote replayed events older than the subscription session are dropped by default', async () => { const harness = makeHarness() @@ -523,6 +620,7 @@ test('slack direct message event scope can be disabled', async () => { await harness.bridge.reconcile('project-1', [slackIntegration]) assert.deepEqual(harness.subscribeCalls[0].globs, [ + '/slack/channels/C123ABC/**', '/slack/channels/C123ABC__proj-cloud/**' ]) @@ -580,9 +678,10 @@ test('slack context resolves with history off through one targeted remote previe await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) - assert.match(harness.sent[0].input.text, /Inline context preview:/u) - assert.match(harness.sent[0].input.text, /targeted Slack context/u) - assert.equal(harness.sent[0].input.text.match(/Inline context preview:/gu)?.length, 1) + assert.match(harness.sent[0].input.text, /Slack message event/u) + assert.match(harness.sent[0].input.text, /Location: #proj-cloud/u) + assert.match(harness.sent[0].input.text, /Message:\ntargeted Slack context/u) + assert.doesNotMatch(harness.sent[0].input.text, /Inline context preview:/u) assert.doesNotMatch(harness.sent[0].input.text, /Slack text:/u) assert.deepEqual(harness.readFileCalls, [ { @@ -594,6 +693,48 @@ test('slack context resolves with history off through one targeted remote previe assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) }) +test('slack context falls back to expanded event data when targeted remote preview is missing', async () => { + const harness = makeHarness(['alice'], { failReadFile: true }) + const messagePath = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit({ + ...changeEvent(messagePath, 'slack'), + expand: async () => ({ + level: 'full', + path: messagePath, + data: { + text: 'expanded Slack context', + userName: 'Khaliq' + } + }) + } as ChangeEvent) + await waitForSent(harness, 1) + + assert.match(harness.sent[0].input.text, /Slack message event/u) + assert.match(harness.sent[0].input.text, /Author: Khaliq/u) + assert.match(harness.sent[0].input.text, /Message:\nexpanded Slack context/u) + assert.deepEqual(harness.readFileCalls, [ + { + workspaceId: 'workspace-id', + path: messagePath + } + ]) + assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'text') + assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) +}) + test('integration event targeted context previews skip binary files', async () => { const harness = makeHarness(['alice'], { readFileResponse: (_workspaceId, path) => ({ @@ -619,7 +760,7 @@ test('integration event targeted context previews skip binary files', async () = await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1780668000_000000/meta.json', 'slack')) await waitForSent(harness, 1) - assert.match(harness.sent[0].input.text, /Context preview skipped: binary content/u) + assert.match(harness.sent[0].input.text, /Message: skipped; context preview is binary/u) assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'binary') }) @@ -648,7 +789,7 @@ test('integration event targeted context previews skip files above the injection await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1780668000_000000/meta.json', 'slack')) await waitForSent(harness, 1) - assert.match(harness.sent[0].input.text, /exceeds the injection preview cap/u) + assert.match(harness.sent[0].input.text, /Message: skipped; context preview is 33792 bytes/u) assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'too-large') }) @@ -961,6 +1102,41 @@ test('resource alias mount paths with the same revision inject one logical chang assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) }) +test('slack channel aliases without revision inject one logical message only once', async () => { + const harness = makeHarness() + const slackIntegration = integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { + notifyAgents: ['alice'] + } + }) + + await withMockedNow('2026-06-04T21:10:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [slackIntegration]) + }) + + await withMockedNow('2026-06-04T21:10:05.000Z', async () => { + await harness.emit(changeEvent( + '/slack/channels/C123ABC/messages/1780607825_485189/meta.json', + 'slack' + )) + await waitForSent(harness, 1) + }) + + await withMockedNow('2026-06-04T21:11:05.000Z', async () => { + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780607825_485189/meta.json', + 'slack' + )) + await waitForDispatcherTick() + }) + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 1) +}) + test('generic provider agent scope keys are not treated as notification targets', async () => { const harness = makeHarness() @@ -1189,6 +1365,104 @@ test('integration event delivery failures use aggregated warn cadence by default }) }) +test('failed deliveries release the dedupe key so duplicate events retry', async () => { + const options = { failSend: true } + const harness = makeHarness(['alice'], options) + const warnCalls: unknown[][] = [] + const originalWarn = console.warn + console.warn = (...args: unknown[]) => { + warnCalls.push(args) + } + + try { + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(path, 'slack')) + await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] event delivery failed')) + assert.equal(harness.sent.length, 0) + + // The same logical change arrives again (remote copy of a local mount + // change). The failed injection must not have pinned the dedupe key. + options.failSend = false + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + } finally { + console.warn = originalWarn + } + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) +}) + +test('no-recipient drops release the dedupe key so duplicates deliver after an agent registers', async () => { + const agents: string[] = [] + const harness = makeHarness(agents) + const warnCalls: unknown[][] = [] + const originalWarn = console.warn + console.warn = (...args: unknown[]) => { + warnCalls.push(args) + } + + try { + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'] + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(path, 'slack')) + await waitForDropped('project-1', 1) + assert.equal(harness.sent.length, 0) + assert.ok(warnCalls.some((call) => call[0] === '[integration-events] skipped no recipients')) + + // The configured recipient registers and a duplicate of the event arrives: + // the earlier no-recipient drop must not suppress delivery. + agents.push('claude-1') + harness.bridge.invalidateProjectAgentCache('project-1') + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + } finally { + console.warn = originalWarn + } + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['claude-1']) +}) + +test('explicit notification agents are used while project roster is still empty', async () => { + const harness = makeHarness([]) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['claude-1'] } + }) + ]) + }) + + await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', 'slack')) + await waitForSent(harness, 1) + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['claude-1']) + assert.deepEqual(harness.listAgentsCalls, ['project-1']) +}) + test('integration event dispatcher compacts large bursts into a bounded summary', async () => { const harness = makeHarness(['alice']) @@ -1265,7 +1539,8 @@ test('integration event dispatcher filters noise before queue admission', async await waitUntil(() => harness.sent.length === 1) - assert.match(harness.sent[0].input.text, /Relayfile path: \/slack\/channels\/C123ABC__proj-cloud\/messages\/1780674600_000000\/meta\.json/u) + assert.match(harness.sent[0].input.text, /Slack message event/u) + assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC__proj-cloud\/messages\/1780674600_000000\/meta\.json/u) assert.doesNotMatch(harness.sent[0].input.text, /Slack messages changed/u) assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { eventsReceived: 1_001, diff --git a/src/main/cloud-agent.ts b/src/main/cloud-agent.ts index d649a694..003b58b2 100644 --- a/src/main/cloud-agent.ts +++ b/src/main/cloud-agent.ts @@ -767,9 +767,7 @@ export class CloudAgentManager { system: true } } as const - return broker.sendMessageAndWaitForDelivery - ? broker.sendMessageAndWaitForDelivery(normalizedProjectId, input) - : broker.sendMessage(normalizedProjectId, input) + return broker.sendMessage(normalizedProjectId, input) }) ) } diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 736378f9..7af58e72 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -1,3 +1,4 @@ +import { createHash } from 'node:crypto' import { existsSync, watch, type FSWatcher } from 'node:fs' import { appendFile, mkdir, stat } from 'node:fs/promises' import { homedir } from 'node:os' @@ -22,7 +23,14 @@ const INTEGRATION_EVENT_SCOPES = ['relayfile:fs:read:/**'] const PROJECT_INTEGRATIONS_LINK_NAME = '.integrations' const RECENT_INJECTION_TTL_MS = 10_000 const RECENT_LOGICAL_CHANGE_TTL_MS = 10 * 60_000 +// Bounded persistent window for Slack record identities: must outlast the +// upstream queue's retry storm (max 5 retries of a batch that can stall for +// ~13 minutes each) so time-separated replays of one logical message are +// still suppressed. Slack dedupe is content-aware when preview data is +// available, so an edit to the same Slack message can still inject. +const SLACK_RECORD_REPLAY_TTL_MS = 60 * 60_000 const REPLAY_SKEW_TOLERANCE_MS = 15 * 60_000 +const REMOTE_SUBSCRIPTION_FROM: 'legacy' = 'legacy' const INTEGRATION_EVENT_LOG_PATH = join(homedir(), '.agentworkforce', 'pear', 'integration-events.log') const AGGREGATED_WARNING_REPEAT_EVERY = 25 const MAX_AGGREGATED_WARNING_KEYS = 256 @@ -331,18 +339,32 @@ function workspaceIdFromJwt(token: string | undefined): string | null { // Tolerates the legacy `/integrations//...` catalog form. function canonicalMountPaths(integration: ConnectedIntegration): string[] { const provider = toRelayfileProvider(integration.provider) - const mountPaths = integration.mountPaths.map((path) => { + const mountPaths = integration.mountPaths.flatMap((path) => { const discovery = path.match(/^\/discovery(?:\/.*)?$/) - if (discovery) return path + if (discovery) return [path] const prefixed = path.match(/^\/integrations\/[^/]+(\/.*)?$/) - if (prefixed) return `/${provider}${prefixed[1] ?? ''}` + if (prefixed) { + const normalized = `/${provider}${prefixed[1] ?? ''}` + return [normalized, ...slackCanonicalChannelAliases(provider, normalized)] + } const rootLevel = path.match(/^\/[^/]+(\/.*)?$/) - if (rootLevel) return `/${provider}${rootLevel[1] ?? ''}` - return path + if (rootLevel) { + const normalized = `/${provider}${rootLevel[1] ?? ''}` + return [normalized, ...slackCanonicalChannelAliases(provider, normalized)] + } + return [path, ...slackCanonicalChannelAliases(provider, path)] }) return dedupeStrings(mountPaths) } +function slackCanonicalChannelAliases(provider: string, path: string): string[] { + if (!isSlackProvider(provider)) return [] + const match = path.match(/^\/slack\/channels\/([^/]+)__(?:[^/]+)(\/.*)?$/u) + const channelId = match?.[1]?.trim() + if (!channelId) return [] + return [`/slack/channels/${channelId}${match?.[2] ?? ''}`] +} + function watchGlobForPath(path: string): string { const root = path.trim().replace(/\/+$/u, '') return root.endsWith('/**') ? root : `${root || '/'}/**` @@ -467,6 +489,19 @@ function globMatchesPath(glob: string, path: string): boolean { pattern.every((segment, index) => globSegmentMatches(segment, target[index])) } +export function relayfileSdkPathFiltersFor(globs: string[]): string[] { + return dedupeStrings(globs.map((glob) => { + const segments = normalizeChangePath(glob) + if (segments.length === 0) return '/' + const sdkSegments = segments.map((segment, index) => { + if (segment === '*') return segment + if (segment === '**') return index === segments.length - 1 ? segment : '*' + return segment.includes('*') ? '*' : segment + }) + return `/${sdkSegments.join('/')}` + })) +} + function shouldPublishFilesystemEvent(event: FilesystemEvent): boolean { return event.type === 'file.created' || event.type === 'file.updated' || event.type === 'file.deleted' } @@ -534,7 +569,8 @@ function filesystemEventToChangeEvent( function createWorkspaceScopedEventClient( client: RelayFileClient, workspaceId: string, - tokenProvider: TokenProvider + tokenProvider: TokenProvider, + baseUrl?: string ): RelayfileEventClient { return { subscribe(globs, onChange, options) { @@ -546,6 +582,9 @@ function createWorkspaceScopedEventClient( const pathScope = options?.pathScope?.length && !sameStringList(options.pathScope, globs) ? options.pathScope : null + const relayfilePathFilters = relayfileSdkPathFiltersFor( + options?.pathScope?.length ? options.pathScope : globs + ) const dispatch = (event: FilesystemEvent): void => { if (!active) return @@ -608,14 +647,17 @@ function createWorkspaceScopedEventClient( workspaceId, globs, pathScope: options?.pathScope, - from: options?.from ?? 'now' + relayfilePathFilters, + from: options?.from ?? 'now', + transport: baseUrl ? 'websocket' : 'polling' }) sync = new RelayFileSync({ client, workspaceId, + baseUrl, token, from: options?.from ?? 'now', - paths: options?.pathScope?.length ? options.pathScope : globs, + paths: relayfilePathFilters, onPollingFallback: (info) => { warnIntegrationEventAggregated( `remote stream polling fallback:${workspaceId}`, @@ -628,6 +670,12 @@ function createWorkspaceScopedEventClient( } }) sync.on('event', handleEvent) + sync.on('state', (state) => { + logIntegrationEvent('remote stream state', { + workspaceId, + state + }) + }) sync.on('error', (error) => { const errorMessage = toErrorMessage(error) warnIntegrationEventAggregated( @@ -1005,6 +1053,14 @@ function eventOrigin(event: ChangeEvent): string | null { } function eventChangeFingerprint(event: ChangeEvent): string | null { + // Slack channel records reach us as raw-id and `__` slug copies, + // and queue retries rewrite the same record with a fresh revision each time + // (probe #1: evt_143356/143358/143393/143401 all carried distinct + // revisions for one logical message). Logical path identity must therefore + // take precedence over per-file revisions, which can never match across + // copies or replays. + const slackFingerprint = slackLogicalChangeFingerprint(event) + if (slackFingerprint) return slackFingerprint const digest = eventRecordValue(event, 'digest') const revision = eventRecordValue(event, 'revision') const contentHash = eventRecordValue(event, 'contentHash') @@ -1024,6 +1080,10 @@ function slackChannelLabel(channelSegment: string): string { return label ? `#${label}` : channelSegment } +function canonicalSlackChannelSegment(channelSegment: string): string { + return channelSegment.split('__', 1)[0] || channelSegment +} + function dispatchCoalescingKey(event: ChangeEvent): string { const provider = eventProvider(event) const segments = pathSegments(event.resource.path) @@ -1033,7 +1093,7 @@ function dispatchCoalescingKey(event: ChangeEvent): string { const threadIndex = segments.indexOf('threads') const replyIndex = segments.indexOf('replies') if (channelIndex >= 0 && segments[channelIndex + 1]) { - const channel = segments[channelIndex + 1] + const channel = canonicalSlackChannelSegment(segments[channelIndex + 1]) if (messageIndex >= 0 && segments[messageIndex + 1]) { return `${provider}:channel:${channel}:message:${segments[messageIndex + 1]}` } @@ -1050,6 +1110,74 @@ function dispatchCoalescingKey(event: ChangeEvent): string { return `${provider}:${normalizeRelayfilePath(event.resource.path)}` } +function slackLogicalChangeFingerprint(event: ChangeEvent): string | null { + if (eventProvider(event) !== 'slack') return null + const segments = pathSegments(event.resource.path) + const channelIndex = segments.indexOf('channels') + const dmIndex = segments.indexOf('dms') + const userIndex = segments.indexOf('users') + const messageIndex = segments.indexOf('messages') + const threadIndex = segments.indexOf('threads') + const replyIndex = segments.indexOf('replies') + + const scopeIndex = channelIndex >= 0 ? channelIndex : dmIndex >= 0 ? dmIndex : userIndex + if (scopeIndex < 0 || !segments[scopeIndex + 1]) return null + + const scopeKind = segments[scopeIndex] + const scopeValue = scopeKind === 'channels' + ? canonicalSlackChannelSegment(segments[scopeIndex + 1]) + : segments[scopeIndex + 1] + + if (messageIndex >= 0 && segments[messageIndex + 1]) { + const suffix = segments.slice(messageIndex + 2).join('/') + return `slack:${scopeKind}:${scopeValue}:message:${segments[messageIndex + 1]}:${suffix}` + } + + if (threadIndex >= 0 && segments[threadIndex + 1]) { + const thread = segments[threadIndex + 1] + if (replyIndex >= 0 && segments[replyIndex + 1]) { + const suffix = segments.slice(replyIndex + 2).join('/') + return `slack:${scopeKind}:${scopeValue}:thread:${thread}:reply:${segments[replyIndex + 1]}:${suffix}` + } + const suffix = segments.slice(threadIndex + 2).join('/') + return `slack:${scopeKind}:${scopeValue}:thread:${thread}:${suffix}` + } + + return null +} + +function stableContentFingerprint(content: string): string { + return createHash('sha256').update(content).digest('hex').slice(0, 16) +} + +function eventDedupeKeyWithFingerprint( + duplicateKey: string, + fingerprint: string | null, + contextPreview?: EventContextPreview +): { key: string; ttlMs: number } { + if (!fingerprint) { + return { + key: duplicateKey, + ttlMs: RECENT_INJECTION_TTL_MS + } + } + + if (fingerprint.startsWith('slack:')) { + const contentAwareFingerprint = contextPreview?.kind === 'text' + ? `${fingerprint}:content:${stableContentFingerprint(contextPreview.content)}` + : fingerprint + return { + key: `${duplicateKey}:change:${contentAwareFingerprint}`, + ttlMs: SLACK_RECORD_REPLAY_TTL_MS + } + } + + return { + key: `${duplicateKey}:change:${fingerprint}`, + ttlMs: RECENT_LOGICAL_CHANGE_TTL_MS + } +} + function dispatchSummaryForEvent(event: ChangeEvent, specs: SubscriptionSpec[]): DispatchSummary { const provider = eventProvider(event) const segments = pathSegments(event.resource.path) @@ -1171,6 +1299,10 @@ function isIntegrationEventDebugEnabled(): boolean { function isTestProcess(): boolean { return process.env.NODE_ENV === 'test' || process.env.VITEST === 'true' || + // node:test children carry NODE_TEST_CONTEXT; `--test` itself is consumed + // by the node CLI and never reaches argv/execArgv in the test process. + process.env.NODE_TEST_CONTEXT !== undefined || + process.execArgv.includes('--test') || process.argv.some((arg) => arg === '--test' || arg.includes('/vitest/')) } @@ -1233,39 +1365,49 @@ function eventContextPreviewFromFile(file: FileReadResponse): EventContextPrevie const buffer = file.encoding === 'base64' ? Buffer.from(rawContent, 'base64') : Buffer.from(rawContent, 'utf8') + return eventContextPreviewFromBuffer(file.path, buffer, file.contentType) +} + +function eventContextPreviewFromBuffer(path: string, buffer: Buffer, contentType?: string): EventContextPreview { const size = buffer.byteLength // The current Relayfile SDK readFile call returns the full file; this cap only // bounds what Pear injects into agent context after the targeted read. if (size > MAX_EVENT_CONTEXT_PREVIEW_BYTES) { return { - path: file.path, + path, kind: 'too-large', content: '', size, - contentType: file.contentType + contentType } } if (buffer.includes(0)) { return { - path: file.path, + path, kind: 'binary', content: '', size, - contentType: file.contentType + contentType } } return { - path: file.path, + path, kind: 'text', content: buffer.toString('utf8'), size, - contentType: file.contentType + contentType } } +function eventContextPreviewFromData(path: string, data: unknown): EventContextPreview | undefined { + if (data === undefined || data === null) return undefined + const content = typeof data === 'string' ? data : JSON.stringify(data, null, 2) + return eventContextPreviewFromBuffer(path, Buffer.from(content, 'utf8'), 'application/json') +} + function eventContextPreviewMetadata(preview: EventContextPreview): EventContextPreviewMetadata { return { path: preview.path, @@ -1275,6 +1417,34 @@ function eventContextPreviewMetadata(preview: EventContextPreview): EventContext } } +function previewRecord(preview: EventContextPreview | undefined): Record | undefined { + if (!preview || preview.kind !== 'text') return undefined + try { + const parsed = JSON.parse(preview.content) + return isRecord(parsed) ? parsed : undefined + } catch { + return undefined + } +} + +function slackPreviewText(preview: EventContextPreview | undefined): string | undefined { + if (!preview || preview.kind !== 'text') return undefined + const record = previewRecord(preview) + const payload = isRecord(record?.payload) ? record.payload : record + const rawEvent = isRecord(payload?.raw_event) ? payload.raw_event : undefined + return eventSummaryValue(payload?.text) || eventSummaryValue(rawEvent?.text) || preview.content +} + +function slackPreviewAuthor(preview: EventContextPreview | undefined): string | undefined { + const record = previewRecord(preview) + const payload = isRecord(record?.payload) ? record.payload : record + const rawEvent = isRecord(payload?.raw_event) ? payload.raw_event : undefined + return eventSummaryValue(payload?.userName) || + eventSummaryValue(payload?.user_name) || + eventSummaryValue(payload?.user) || + eventSummaryValue(rawEvent?.user) +} + function shouldNotifyRelayfilePath(pathValue: string): boolean { const path = pathValue.trim() if (!path || !path.startsWith('/')) return false @@ -1355,10 +1525,62 @@ function slackEventContextPath(path: string): boolean { return /^\/slack\/(?:channels|dms|users)\/[^/]+\/(?:messages|threads)\/.+\.json$/u.test(path) } +function slackScopeLabel(path: string): string | undefined { + const segments = pathSegments(path) + const channelIndex = segments.indexOf('channels') + if (channelIndex >= 0 && segments[channelIndex + 1]) { + return slackChannelLabel(segments[channelIndex + 1]) + } + const dmIndex = segments.indexOf('dms') + if (dmIndex >= 0 && segments[dmIndex + 1]) return `DM ${segments[dmIndex + 1]}` + const userIndex = segments.indexOf('users') + if (userIndex >= 0 && segments[userIndex + 1]) return `User ${segments[userIndex + 1]}` + return undefined +} + +function formatSlackIntegrationEventMessage( + event: ChangeEvent, + contextPreview?: EventContextPreview +): string | null { + const resource = isRecord(event.resource) ? event.resource : {} + const provider = eventSummaryValue(resource.provider) || eventProvider(event) + const relayfilePath = eventSummaryValue(resource.path) + if (provider !== 'slack' || !relayfilePath || !slackEventContextPath(relayfilePath)) return null + + const projectPath = projectIntegrationPathForRelayfilePath(relayfilePath) + const scopeLabel = slackScopeLabel(relayfilePath) + const messageText = slackPreviewText(contextPreview) + const author = slackPreviewAuthor(contextPreview) + const lines = [ + '', + 'Slack message event', + `Event id: ${event.id}`, + `Occurred at: ${event.occurredAt}` + ] + + if (scopeLabel) lines.push(`Location: ${scopeLabel}`) + if (author) lines.push(`Author: ${author}`) + lines.push(`Path: ${projectPath}`) + if (messageText) { + lines.push('Message:', messageText) + } else if (contextPreview?.kind === 'too-large') { + lines.push(`Message: skipped; context preview is ${contextPreview.size} bytes.`) + } else if (contextPreview?.kind === 'binary') { + lines.push(`Message: skipped; context preview is binary (${contextPreview.size} bytes).`) + } else { + lines.push('Message: unavailable; targeted context read did not return content.') + } + lines.push('') + return lines.join('\n') +} + function formatIntegrationEventMessage( event: ChangeEvent, contextPreview?: EventContextPreview ): string { + const slackMessage = formatSlackIntegrationEventMessage(event, contextPreview) + if (slackMessage) return slackMessage + const summary = isRecord(event.summary) ? event.summary : {} const resource = isRecord(event.resource) ? event.resource : {} const provider = eventSummaryValue(resource.provider) || 'integration' @@ -1733,8 +1955,9 @@ export class IntegrationEventBridge { projectId, workspaceId: handle.workspaceId, localMountWorkspaceId: handle.localMountWorkspaceId, - // temporary-pending-SDK-contract: Relayfile WS currently replays - // recent catch-up events without a from=now/path-scope contract. + // Use the SDK catch-up stream and let Pear's replay filter below + // decide what is live enough to inject. This avoids losing a Slack + // webhook that lands while Pear is still attaching the stream. remoteSubscriptionStartedAt: new Date(remoteSubscriptionStartedAtMs).toISOString(), globs: watches.map((watch) => watch.glob), specs: specs.map((spec) => ({ @@ -1778,7 +2001,7 @@ export class IntegrationEventBridge { coalesce: 'fire-once', coalesceMs: Math.max(...watches.map((watch) => watch.coalesceMs), 750), pathScope: watches.map((watch) => watch.glob), - from: 'now', + from: REMOTE_SUBSCRIPTION_FROM, onCoalesced: () => incrementIntegrationEventCounter(projectId, 'eventsCoalesced'), onQueueDepth: (depth) => setIntegrationEventGauge(projectId, 'queueDepth', depth) } @@ -1870,12 +2093,25 @@ export class IntegrationEventBridge { const path = eventSummaryValue(event.resource.path) if (!path) return undefined + let readFileError: unknown try { const handle = await this.getWorkspaceHandle() const client = handle.client() - if (typeof client.readFile !== 'function') return undefined - return eventContextPreviewFromFile(await client.readFile(handle.workspaceId, path)) + if (typeof client.readFile === 'function') { + return eventContextPreviewFromFile(await client.readFile(handle.workspaceId, path)) + } } catch (error) { + readFileError = error + } + + try { + const expanded = await event.expand('full') + const expandedRecord = isRecord(expanded) ? expanded : {} + return eventContextPreviewFromData( + typeof expandedRecord.path === 'string' ? expandedRecord.path : path, + expandedRecord.data + ) + } catch (expandError) { warnIntegrationEventAggregated( `event context read failed:${projectId}`, 'event context read failed', @@ -1883,7 +2119,8 @@ export class IntegrationEventBridge { projectId, eventId: event.id, path, - error: toErrorMessage(error) + error: readFileError ? toErrorMessage(readFileError) : toErrorMessage(expandError), + expandError: readFileError ? toErrorMessage(expandError) : undefined } ) return undefined @@ -1947,32 +2184,60 @@ export class IntegrationEventBridge { ): Promise { const duplicateKey = injectionDeduplicationKey(projectId, event, matchedSpecs) const fingerprint = eventChangeFingerprint(event) - const recentKey = fingerprint ? `${duplicateKey}:change:${fingerprint}` : duplicateKey - if (this.wasRecentlyInjected(recentKey, fingerprint ? RECENT_LOGICAL_CHANGE_TTL_MS : RECENT_INJECTION_TTL_MS)) { - incrementIntegrationEventCounter(projectId, 'eventsDropped') - logIntegrationEvent('skipped duplicate path', { - projectId, - eventId: event.id, - path: event.resource.path, - duplicateKey: recentKey - }) - return + const needsSlackContentAwareDedupe = fingerprint?.startsWith('slack:') === true + let dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint) + let dedupeClaimed = false + + if (!needsSlackContentAwareDedupe) { + if (this.wasRecentlyInjected(dedupe.key, dedupe.ttlMs)) { + incrementIntegrationEventCounter(projectId, 'eventsDropped') + logIntegrationEvent('skipped duplicate path', { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey: dedupe.key + }) + return + } + dedupeClaimed = true } const bridge = await this.bridge() const uniqueRecipients = await this.recipientsForMatchedSpecs(projectId, matchedSpecs, bridge) if (uniqueRecipients.length === 0) { + // Release the dedupe key: a duplicate of this event (remote copy of a + // local change, coalesced update) must be allowed to retry once a + // recipient registers; otherwise the event is suppressed for the TTL. + if (dedupeClaimed) this.recentInjections.delete(dedupe.key) incrementIntegrationEventCounter(projectId, 'eventsDropped') - logIntegrationEvent('skipped no recipients', { - projectId, - eventId: event.id, - path: event.resource.path - }) + warnIntegrationEventAggregated( + `skipped no recipients:${projectId}`, + 'skipped no recipients', + { + projectId, + eventId: event.id, + path: event.resource.path + } + ) return } const eventMetadata = integrationEventMetadata(event) const contextPreview = await this.readEventContextPreview(projectId, event) + if (needsSlackContentAwareDedupe) { + dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint, contextPreview) + if (this.wasRecentlyInjected(dedupe.key, dedupe.ttlMs)) { + incrementIntegrationEventCounter(projectId, 'eventsDropped') + logIntegrationEvent('skipped duplicate path', { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey: dedupe.key + }) + return + } + dedupeClaimed = true + } const contextPreviewData = contextPreview ? eventContextPreviewMetadata(contextPreview) : undefined logIntegrationEvent('injecting', { projectId, @@ -1980,6 +2245,8 @@ export class IntegrationEventBridge { path: event.resource.path, recipients: uniqueRecipients }) + let deliveredCount = 0 + const sendErrors: Array<{ recipient: string; error: unknown }> = [] for (const recipient of uniqueRecipients) { const input = { to: recipient, @@ -1999,7 +2266,32 @@ export class IntegrationEventBridge { ...eventMetadata } } as const - await this.sendBrokerMessage(projectId, input, bridge) + try { + await this.sendBrokerMessage(projectId, input, bridge) + deliveredCount += 1 + } catch (error) { + sendErrors.push({ recipient, error }) + } + } + if (deliveredCount === 0 && sendErrors.length > 0) { + // No recipient got the event. Release the dedupe key so a duplicate of + // this event (remote copy of a local change, coalesced update) retries + // delivery instead of being dropped as a recent injection. + if (dedupeClaimed) this.recentInjections.delete(dedupe.key) + throw sendErrors[0].error + } + if (sendErrors.length > 0) { + warnIntegrationEventAggregated( + `event recipient send failed:${projectId}`, + 'event recipient send failed', + { + projectId, + eventId: event.id, + path: event.resource.path, + recipients: sendErrors.map((entry) => entry.recipient), + error: toErrorMessage(sendErrors[0].error) + } + ) } incrementIntegrationEventCounter(projectId, 'eventsInjected') } @@ -2019,7 +2311,10 @@ export class IntegrationEventBridge { const onlineExplicitAgents = projectAgents ? targets.agents.filter((agent) => projectAgents.includes(agent)) : [] - const explicitTargets = dedupeStrings([...onlineExplicitAgents, ...targets.channels]) + const explicitAgents = projectAgents && projectAgents.length === 0 && targets.agents.length > 0 + ? targets.agents + : onlineExplicitAgents + const explicitTargets = dedupeStrings([...explicitAgents, ...targets.channels]) if (explicitTargets.length === 0) { recipients.push(...(projectAgents ?? await this.listProjectAgentsCached(projectId, bridge))) } else { @@ -2164,7 +2459,7 @@ export class IntegrationEventBridge { const handle: RelayfileWorkspaceHandle = { workspaceId: relayWorkspaceId, localMountWorkspaceId: accountWorkspaceId, - client: () => createWorkspaceScopedEventClient(client, relayWorkspaceId, workspaceTokenProvider) + client: () => createWorkspaceScopedEventClient(client, relayWorkspaceId, workspaceTokenProvider, joined.info.relayfileUrl) } accountIntegrationEventHandle = { apiUrl: auth.apiUrl, diff --git a/src/main/integrations.test.ts b/src/main/integrations.test.ts index 1a996dbb..b379311c 100644 --- a/src/main/integrations.test.ts +++ b/src/main/integrations.test.ts @@ -181,7 +181,8 @@ const mock = vi.hoisted(() => { relayWorkspaceManager, brokerManager: { listAgents: vi.fn(async () => []), - sendMessage: vi.fn(async () => undefined) + sendMessage: vi.fn(async () => undefined), + sendMessageAndWaitForDelivery: vi.fn(async () => undefined) }, ensureProjectIntegrationsLink: vi.fn(async () => undefined), removeProjectIntegrationsLink: vi.fn(async () => undefined), @@ -275,6 +276,7 @@ describe('IntegrationsManager', () => { mock.relayWorkspaceManager.getWorkspaceHandle.mockClear() mock.brokerManager.listAgents.mockClear() mock.brokerManager.sendMessage.mockClear() + mock.brokerManager.sendMessageAndWaitForDelivery.mockClear() mock.ensureProjectIntegrationsLink.mockClear() mock.removeProjectIntegrationsLink.mockClear() mock.resetStore() @@ -442,6 +444,83 @@ describe('IntegrationsManager', () => { ) }) + it('does not wait for delivery confirmation when injecting integration guidance', async () => { + vi.useFakeTimers() + mock.brokerManager.listAgents.mockResolvedValue([{ name: 'claude-1', projectId: 'project-1' }]) + mock.brokerManager.sendMessageAndWaitForDelivery.mockRejectedValue(new Error('delivery timeout')) + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined) + const manager = new IntegrationsManager() + + await manager.notifyAgentState('project-1') + await vi.advanceTimersByTimeAsync(1_000) + + expect(mock.brokerManager.sendMessage).toHaveBeenCalledWith( + 'project-1', + expect.objectContaining({ + to: 'claude-1', + from: 'system', + text: expect.stringContaining('') + }) + ) + expect(mock.brokerManager.sendMessageAndWaitForDelivery).not.toHaveBeenCalled() + expect(warnSpy).not.toHaveBeenCalledWith( + '[integrations] Failed to inject integration system message:', + expect.any(String) + ) + warnSpy.mockRestore() + }) + + it('does not re-send an unchanged integrations update after the dedupe window lapses', async () => { + vi.useFakeTimers() + mock.brokerManager.listAgents.mockResolvedValue([{ name: 'claude-1', projectId: 'project-1' }]) + const manager = new IntegrationsManager() + const integrationsUpdateSends = (): number => + mock.brokerManager.sendMessage.mock.calls.filter( + ([, input]) => (input as { data?: { kind?: string } }).data?.kind === 'integrations-update' + ).length + + await manager.notifyAgentState('project-1') + await vi.advanceTimersByTimeAsync(1_000) + expect(integrationsUpdateSends()).toBe(1) + await vi.waitFor(() => expect(mock.integrationMountManager.ensureMounted).toHaveBeenCalled()) + mock.brokerManager.sendMessage.mockClear() + + // A later reconcile with identical integration state must not re-broadcast + // regardless of elapsed time — duplicate messages + // were observed minutes apart with the old 30s text-TTL dedupe. + await manager.notifyAgentState('project-1') + await vi.advanceTimersByTimeAsync(1_000) + expect(integrationsUpdateSends()).toBe(1) + + await vi.advanceTimersByTimeAsync(60_000) + await manager.notifyAgentState('project-1') + await vi.advanceTimersByTimeAsync(1_000) + expect(integrationsUpdateSends()).toBe(1) + }) + + it('re-broadcasts the integrations update after a failed send', async () => { + vi.useFakeTimers() + mock.brokerManager.listAgents.mockResolvedValue([{ name: 'claude-1', projectId: 'project-1' }]) + mock.brokerManager.sendMessage.mockRejectedValueOnce(new Error('broker unavailable')) + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined) + const manager = new IntegrationsManager() + const integrationsUpdateSends = (): number => + mock.brokerManager.sendMessage.mock.calls.filter( + ([, input]) => (input as { data?: { kind?: string } }).data?.kind === 'integrations-update' + ).length + + await manager.notifyAgentState('project-1') + await vi.advanceTimersByTimeAsync(1_000) + expect(integrationsUpdateSends()).toBe(1) + + // The failed broadcast must not record a last-sent signature; the next + // reconcile retries the identical message. + await manager.notifyAgentState('project-1') + await vi.advanceTimersByTimeAsync(1_000) + expect(integrationsUpdateSends()).toBe(2) + warnSpy.mockRestore() + }) + it('does not dedupe integration guidance when the first agent wait times out empty', async () => { vi.useFakeTimers() mock.brokerManager.listAgents.mockResolvedValue([]) @@ -495,6 +574,27 @@ describe('IntegrationsManager', () => { ) }) + it('retries active integration event subscriptions after startup mount hydration', async () => { + mock.store.projects[0].integrations[0].subscribeAgent = true + const manager = new IntegrationsManager() + + await manager.startLocalMountDaemon() + await vi.waitFor(() => expect(mock.integrationMountManager.ensureMounted).toHaveBeenCalled()) + + expect(mock.integrationEventBridge.closeAllExcept).toHaveBeenCalledTimes(2) + expect(mock.integrationEventBridge.reconcile).toHaveBeenCalledTimes(2) + expect(mock.integrationEventBridge.reconcile).toHaveBeenLastCalledWith( + 'project-1', + expect.arrayContaining([ + expect.objectContaining({ + provider: 'slack', + integrationId: 'slack-integration-1', + subscribeAgent: true + }) + ]) + ) + }) + it('builds initial spawn instructions from project integrations', () => { const manager = new IntegrationsManager() diff --git a/src/main/integrations.ts b/src/main/integrations.ts index 9ccd2671..2f49e8c7 100644 --- a/src/main/integrations.ts +++ b/src/main/integrations.ts @@ -214,7 +214,6 @@ const POLL_INTERVAL_MS = 2_000 const POLL_TIMEOUT_MS = 5 * 60_000 const CATALOG_CACHE_MS = 5 * 60_000 const SYSTEM_MESSAGE_DEBOUNCE_MS = 1_000 -const SYSTEM_MESSAGE_DEDUPE_MS = 30_000 const SYSTEM_MESSAGE_AGENT_WAIT_TIMEOUT_MS = 8_000 const SYSTEM_MESSAGE_AGENT_WAIT_INTERVAL_MS = 500 const LOCAL_MOUNT_CLOUD_HYDRATION_THROTTLE_MS = 30_000 @@ -720,7 +719,10 @@ export class IntegrationsManager { private sessionMetadata = new Map() private pollTimers = new Map() private systemMessageTimers = new Map() - private recentlyInjectedSystemMessages = new Map() + // Last successfully broadcast system message per project. Content-keyed (not + // TTL-keyed): an unchanged integrations update is never re-broadcast, no + // matter how often reconcile runs; any content change re-sends immediately. + private lastBroadcastSystemMessages = new Map() private catalogCache: IntegrationAdapter[] | null = null private catalogFetchedAt = 0 private localMountCloudHydrationStartedAt = 0 @@ -935,9 +937,11 @@ export class IntegrationsManager { async startLocalMountDaemon(): Promise { await this.syncActiveEventSubscriptions() - void this.syncLocalMounts().catch((error) => { - console.warn('[integrations] Failed to start local integration mounts:', toErrorMessage(error)) - }) + void this.syncLocalMounts() + .then(() => this.syncActiveEventSubscriptions()) + .catch((error) => { + console.warn('[integrations] Failed to start local integration mounts:', toErrorMessage(error)) + }) } async notifyAgentState(projectId: string): Promise { @@ -1793,12 +1797,7 @@ export class IntegrationsManager { options: IntegrationSystemMessageOptions = {} ): Promise { try { - const messageKey = `${projectId}\0${message}` - const now = Date.now() - for (const [key, expiresAt] of Array.from(this.recentlyInjectedSystemMessages.entries())) { - if (expiresAt <= now) this.recentlyInjectedSystemMessages.delete(key) - } - if (this.recentlyInjectedSystemMessages.has(messageKey)) return + if (this.lastBroadcastSystemMessages.get(projectId) === message) return const bridge = brokerManager as unknown as IntegrationSystemMessageBridge const agents = await this.listSystemMessageAgents(bridge, projectId, options.waitForAgent === true) @@ -1817,12 +1816,15 @@ export class IntegrationsManager { system: true } } as const - return bridge.sendMessageAndWaitForDelivery - ? bridge.sendMessageAndWaitForDelivery(projectId, input) - : bridge.sendMessage(projectId, input) + // Delivery confirmations can hang behind inactive PTYs. Integration + // updates are setup context, so broker send acceptance is the + // reliable boundary for idempotent notification. + return bridge.sendMessage(projectId, input) }) ) - this.recentlyInjectedSystemMessages.set(messageKey, Date.now() + SYSTEM_MESSAGE_DEDUPE_MS) + // Recorded only after every send was accepted: a failed broadcast leaves + // no signature, so the next reconcile retries the identical message. + this.lastBroadcastSystemMessages.set(projectId, message) } catch (error) { console.warn('[integrations] Failed to inject integration system message:', toErrorMessage(error)) }