diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 15bf109e..cd5aaa70 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -1,4 +1,6 @@ import assert from 'node:assert/strict' +import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' import { join } from 'node:path' import { beforeEach, test } from 'node:test' @@ -53,13 +55,17 @@ function integration(overrides: Partial & { function changeEvent( path: string, provider = path.split('/')[1] || 'github', - overrides: { digest?: string; origin?: string; revision?: string } = {} + overrides: { digest?: string; occurredAt?: string; origin?: string; revision?: string } = {} ): ChangeEvent { + const slackTs = path.match(/\/(?:messages|replies)\/(\d{10})_(\d+)(?:\/|\.json$)/u) + const occurredAt = overrides.occurredAt ?? (slackTs?.[1] + ? new Date(Number(`${slackTs[1]}.${slackTs[2] || '0'}`) * 1000).toISOString() + : '2026-06-04T00:00:00.000Z') return { id: `evt:${path}`, workspace: 'workspace-id', type: 'relayfile.changed', - occurredAt: '2026-06-04T00:00:00.000Z', + occurredAt, resource: { path, provider, @@ -83,6 +89,29 @@ function changeEvent( } as ChangeEvent } +function changeEventWithFullData( + path: string, + provider: string, + data: Record +): ChangeEvent { + return { + ...changeEvent(path, provider), + expand: async (level = 'summary') => level === 'full' + ? { + level, + path, + data + } + : { + level, + path, + summary: { + title: path + } + } + } as ChangeEvent +} + function makeHarness(agents = ['alice', 'bob'], options: { failSend?: boolean } = {}): { bridge: IntegrationEventBridge subscribeCalls: SubscribeCall[] @@ -135,6 +164,13 @@ beforeEach(() => { delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG }) +async function waitForSent(harness: { sent: SentMessage[] }, count: number): Promise { + const deadline = Date.now() + 1_000 + while (harness.sent.length < count && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 10)) + } +} + test('integration events route only to the targets for the matching integration path', async () => { const harness = makeHarness() @@ -175,6 +211,7 @@ test('channel notification targets do not fall back to all project agents', asyn ]) await harness.emit(changeEvent('/slack/channels/general/messages/123.json', 'slack')) + await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['#triage']) assert.deepEqual(harness.listAgentsCalls, []) @@ -193,6 +230,7 @@ test('offline notification agents fall back to current project agents', async () ]) await harness.emit(changeEvent('/slack/channels/general/messages/123.json', 'slack')) + await waitForSent(harness, 2) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) assert.deepEqual(harness.listAgentsCalls, ['project-1']) @@ -215,14 +253,21 @@ test('integration events watch selected relayfile mount paths', async () => { assert.deepEqual(harness.subscribeCalls[0].globs, [ '/slack/channels/C123ABC/**', - '/slack/channels/C123ABC__proj-cloud/**' + '/slack/channels/C123ABC__proj-cloud/**', + '/slack/channels/D*/**', + '/slack/dms/*/**', + '/slack/users/*/messages/**' ]) assert.deepEqual(integrationSubscriptionSummaries([slackIntegration])[0].watches, [ '.integrations/slack/channels/C123ABC/**', - '.integrations/slack/channels/C123ABC__proj-cloud/**' + '.integrations/slack/channels/C123ABC__proj-cloud/**', + '.integrations/slack/channels/D*/**', + '.integrations/slack/dms/*/**', + '.integrations/slack/users/*/messages/**' ]) await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/messages/1713220123_001100/meta.json', 'slack')) + await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC__proj-cloud\/messages\/1713220123_001100\/meta\.json/u) @@ -230,9 +275,216 @@ test('integration events watch selected relayfile mount paths', async () => { harness.sent.splice(0) await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1713220124_001100/meta.json', 'slack')) + await waitForSent(harness, 1) + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + + harness.sent.splice(0) + await harness.emit(changeEvent('/slack/channels/C999XYZ/messages/1713220125_001100/meta.json', 'slack')) + assert.deepEqual(harness.sent, []) + + await harness.emit(changeEvent('/slack/channels/D123ABC/messages/1713220126_001100/meta.json', 'slack')) + await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) }) +test('slack direct message event scope can be disabled', async () => { + const harness = makeHarness() + const slackIntegration = integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { + channels: ['C123ABC'], + listenDms: false, + notifyAgents: ['alice'] + } + }) + + await harness.bridge.reconcile('project-1', [slackIntegration]) + + assert.deepEqual(harness.subscribeCalls[0].globs, [ + '/slack/channels/C123ABC/**', + '/slack/channels/C123ABC__proj-cloud/**' + ]) + + await harness.emit(changeEvent('/slack/channels/D123ABC/messages/1713220126_001100/meta.json', 'slack')) + assert.deepEqual(harness.sent, []) +}) + +test('slack backfill and malformed nested message paths are not injected', async () => { + const harness = makeHarness(['alice']) + const workspaceId = 'workspace-id' + const originalHome = process.env.HOME + const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-')) + const workspaceRoot = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId) + const stalePath = '/slack/channels/C123ABC__proj-cloud/messages/1780017507_077969/meta.json' + const staleLocalPath = join(workspaceRoot, ...stalePath.split('/').filter(Boolean)) + + try { + process.env.HOME = tempHome + await mkdir(join(staleLocalPath, '..'), { recursive: true }) + await writeFile(staleLocalPath, JSON.stringify({ + provider: 'slack', + objectType: 'message', + payload: { + text: 'old synced message', + channel: 'C123ABC', + ts: '1780017507.077969' + } + })) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + await harness.emit({ + ...changeEvent(stalePath, 'slack'), + occurredAt: '2026-06-05T14:14:57.314Z' + }) + assert.deepEqual(harness.sent, []) + + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/meta.json', + 'slack' + )) + assert.deepEqual(harness.sent, []) + } finally { + await harness.bridge.close('project-1') + if (originalHome === undefined) { + delete process.env.HOME + } else { + process.env.HOME = originalHome + } + await rm(tempHome, { recursive: true, force: true }) + } +}) + +test('slack thread reply events include local message text in the injected system message', async () => { + const harness = makeHarness(['alice']) + const workspaceId = 'workspace-id' + const originalHome = process.env.HOME + const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-')) + + try { + process.env.HOME = tempHome + const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json' + const localPath = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId, ...replyPath.split('/').filter(Boolean)) + await mkdir(join(localPath, '..'), { recursive: true }) + await writeFile(localPath, JSON.stringify({ + provider: 'slack', + objectType: 'thread_reply', + payload: { + text: '<@U123> please handle this thread request', + channel: 'C123ABC', + thread_ts: '1780667635.192799', + ts: '1780668181.544139', + user: 'U456' + } + })) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + await harness.emit(changeEvent(replyPath, 'slack')) + await waitForSent(harness, 1) + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.match(harness.sent[0].input.text, /Slack text: <@U123> please handle this thread request/u) + assert.match(harness.sent[0].input.text, /Slack thread ts: 1780667635\.192799/u) + } finally { + await harness.bridge.close('project-1') + if (originalHome === undefined) { + delete process.env.HOME + } else { + process.env.HOME = originalHome + } + await rm(tempHome, { recursive: true, force: true }) + } +}) + +test('remote slack events include expanded message text before local mount sync catches up', async () => { + const harness = makeHarness(['alice']) + const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json' + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + await harness.emit(changeEventWithFullData(replyPath, 'slack', { + provider: 'slack', + objectType: 'thread_reply', + payload: { + text: '<@U123> remote stream text', + channel: 'C123ABC', + thread_ts: '1780667635.192799', + ts: '1780668181.544139', + user: 'U456' + } + })) + await waitForSent(harness, 1) + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.match(harness.sent[0].input.text, /Slack text: <@U123> remote stream text/u) + assert.match(harness.sent[0].input.text, /Slack thread ts: 1780667635\.192799/u) +}) + +test('slack local event context rejects traversal paths', async () => { + const harness = makeHarness(['alice']) + const workspaceId = 'workspace-id' + const originalHome = process.env.HOME + const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-')) + + try { + process.env.HOME = tempHome + const escapedPath = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', 'leak.json') + await mkdir(join(escapedPath, '..'), { recursive: true }) + await writeFile(escapedPath, JSON.stringify({ + payload: { + text: 'escaped local file should not be injected' + } + })) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/threads/../../../../../leak.json', 'slack')) + await waitForSent(harness, 1) + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.doesNotMatch(harness.sent[0].input.text, /escaped local file should not be injected/u) + } finally { + await harness.bridge.close('project-1') + if (originalHome === undefined) { + delete process.env.HOME + } else { + process.env.HOME = originalHome + } + await rm(tempHome, { recursive: true, force: true }) + } +}) + test('local fallback watchers are disabled when historical download is off', () => { const roots = localWatchRootsFor( 'workspace-id', @@ -248,6 +500,7 @@ test('local fallback watchers are disabled when historical download is off', () }) ], [ + '/slack/channels/D*/**', '/slack/channels/C123ABC/**', '/slack/channels/C123ABC__proj-cloud/**', '/slack/dms/D123ABC/**' @@ -418,10 +671,16 @@ test('integration events preserve discovery mount paths', async () => { await harness.bridge.reconcile('project-1', [slackIntegration]) assert.deepEqual(harness.subscribeCalls[0].globs, [ - '/discovery/slack/**' + '/discovery/slack/**', + '/slack/channels/D*/**', + '/slack/dms/*/**', + '/slack/users/*/messages/**' ]) assert.deepEqual(integrationSubscriptionSummaries([slackIntegration])[0].watches, [ - '.integrations/discovery/slack/**' + '.integrations/discovery/slack/**', + '.integrations/slack/channels/D*/**', + '.integrations/slack/dms/*/**', + '.integrations/slack/users/*/messages/**' ]) await harness.emit(changeEvent('/discovery/slack/actions/create-message/.schema.json', 'slack')) @@ -471,6 +730,7 @@ test('resource alias mount paths with the same revision inject one logical chang 'slack', { revision: 'same-content' } )) + await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) }) @@ -519,6 +779,7 @@ test('integration events ignore index, discovery, tmp, dotfile, and local writeb await harness.emit(changeEvent('/github/repos/create.json', 'github')) await harness.emit(changeEvent('/slack/channels/C123ABC/messages/claude-1-codex-spawned.json', 'slack')) await harness.emit(changeEvent('/slack/channels/C123ABC/threads/1780607825_485189/replies/claude-1-issue82-ack.json', 'slack')) + await harness.emit(changeEvent('/github/repos/.widgets.json.tmp-123', 'github')) assert.deepEqual(harness.sent, []) assert.deepEqual(harness.listAgentsCalls, []) diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index f3ed0173..0a778169 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -1,5 +1,5 @@ import { existsSync, watch, type FSWatcher } from 'node:fs' -import { appendFile, mkdir, stat } from 'node:fs/promises' +import { appendFile, mkdir, readFile, stat } from 'node:fs/promises' import { homedir } from 'node:os' import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path' import { @@ -24,6 +24,13 @@ const RECENT_LOGICAL_CHANGE_TTL_MS = 10 * 60_000 const INTEGRATION_EVENT_LOG_PATH = join(homedir(), '.agentworkforce', 'pear', 'integration-events.log') const AGGREGATED_WARNING_REPEAT_EVERY = 25 const MAX_AGGREGATED_WARNING_KEYS = 256 +const MAX_INLINE_EVENT_TEXT_CHARS = 4_000 +const SLACK_LIVE_EVENT_WINDOW_MS = 30 * 60 * 1_000 +const SLACK_DM_EVENT_GLOBS = [ + '/slack/channels/D*/**', + '/slack/dms/*/**', + '/slack/users/*/messages/**' +] type IntegrationEventCounterName = 'eventsReceived' | 'eventsInjected' | 'eventsCoalesced' | 'eventsDropped' type IntegrationEventGaugeName = 'queueDepth' | 'mountCount' @@ -42,6 +49,7 @@ type SubscriptionSpec = { integrationId: string provider: string mountPaths: string[] + eventPathGlobs: string[] watches: WatchRegistration[] targets: DeliveryTargets } @@ -176,6 +184,11 @@ function toErrorMessage(error: unknown): string { return error instanceof Error ? error.message : String(error) } +function truncateForSystemMessage(text: string, maxChars = MAX_INLINE_EVENT_TEXT_CHARS): string { + const normalized = text.trim() + return normalized.length > maxChars ? `${normalized.slice(0, maxChars)}...` : normalized +} + function isUnauthorizedError(error: unknown): boolean { if (!error || typeof error !== 'object') return false const status = (error as { httpStatus?: unknown; status?: unknown }).httpStatus ?? @@ -203,6 +216,24 @@ function toRelayfileProvider(provider: string): string { return normalized === 'gmail' ? 'google-mail' : normalized } +function isSlackProvider(provider: string): boolean { + const normalized = toRelayfileProvider(provider) + return normalized === 'slack' || normalized.startsWith('slack-') +} + +function scopeBooleanDefault(scope: Record, keys: string[], defaultValue: boolean): boolean { + for (const key of keys) { + const value = scope[key] + if (typeof value === 'boolean') return value + } + return defaultValue +} + +function slackListenDms(integration: ConnectedIntegration): boolean { + if (!isSlackProvider(integration.provider)) return false + return scopeBooleanDefault(integration.scope, ['listenDms', 'listenDirectMessages', 'directMessages'], true) +} + function pathSegments(path: string): string[] { return path.split(/[\\/]+/u).filter(Boolean) } @@ -257,8 +288,15 @@ function watchGlobForPath(path: string): string { return root.endsWith('/**') ? root : `${root || '/'}/**` } +function eventPathGlobsForIntegration(integration: ConnectedIntegration): string[] { + return dedupeStrings([ + ...canonicalMountPaths(integration).map(watchGlobForPath), + ...(slackListenDms(integration) ? SLACK_DM_EVENT_GLOBS : []) + ]) +} + function watchRegistrationsFor(integrations: ConnectedIntegration[]): WatchRegistration[] { - return dedupeStrings(integrations.flatMap((integration) => canonicalMountPaths(integration).map(watchGlobForPath))) + return dedupeStrings(integrations.flatMap(eventPathGlobsForIntegration)) .map((glob) => ({ glob, coalesceMs: 750 @@ -313,11 +351,13 @@ function targetLabels(targets: DeliveryTargets): string[] { function subscriptionSpecsFor(integrations: ConnectedIntegration[]): SubscriptionSpec[] { return integrations.map((integration) => { const mountPaths = canonicalMountPaths(integration) + const eventPathGlobs = eventPathGlobsForIntegration(integration) return { integrationId: integration.integrationId, provider: integration.provider, mountPaths, - watches: mountPaths.map(watchGlobForPath).map((glob) => ({ + eventPathGlobs, + watches: eventPathGlobs.map((glob) => ({ glob, coalesceMs: 750 })), @@ -343,16 +383,27 @@ function normalizeChangePath(path: string): string[] { return trimmed === '' ? [] : trimmed.split('/').filter(Boolean) } +function globSegmentMatches(pattern: string, segment: string | undefined): boolean { + if (segment === undefined) return false + if (pattern === '*') return true + if (!pattern.includes('*')) return pattern === segment + const escaped = pattern + .split('*') + .map((part) => part.replace(/[|\\{}()[\]^$+?.]/g, '\\$&')) + .join('.*') + return new RegExp(`^${escaped}$`, 'u').test(segment) +} + function globMatchesPath(glob: string, path: string): boolean { const pattern = normalizeChangePath(glob) const target = normalizeChangePath(path) if (pattern.at(-1) === '**') { const prefix = pattern.slice(0, -1) return target.length >= prefix.length && - prefix.every((segment, index) => segment === '*' || segment === target[index]) + prefix.every((segment, index) => globSegmentMatches(segment, target[index])) } return pattern.length === target.length && - pattern.every((segment, index) => segment === '*' || segment === target[index]) + pattern.every((segment, index) => globSegmentMatches(segment, target[index])) } function shouldPublishFilesystemEvent(event: FilesystemEvent): boolean { @@ -607,6 +658,13 @@ function parentRemoteRootForDynamicChildren(remoteRoot: string): string | null { return `/${segments.slice(0, -1).join('/')}` } +function staticRemoteRootBeforeWildcard(remoteRoot: string): string | null { + const segments = pathSegments(remoteRoot) + const wildcardIndex = segments.findIndex((segment) => segment.includes('*')) + if (wildcardIndex <= 0) return null + return `/${segments.slice(0, wildcardIndex).join('/')}` +} + function localPathForRemoteRoot(workspaceId: string, remoteRoot: string): string { return join(homedir(), '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId, ...pathSegments(remoteRoot)) } @@ -629,10 +687,12 @@ export function localWatchRootsFor( for (const glob of globs) { const remoteRoot = remoteRootForWatchGlob(glob) if (!remoteRoot) continue - for (const candidate of dedupeStrings([ - remoteRoot, - parentRemoteRootForDynamicChildren(remoteRoot) - ].filter((entry): entry is string => entry !== null))) { + const candidates = [ + ...(remoteRoot.includes('*') ? [] : [remoteRoot]), + parentRemoteRootForDynamicChildren(remoteRoot), + staticRemoteRootBeforeWildcard(remoteRoot) + ].filter((entry): entry is string => entry !== null && !entry.includes('*')) + for (const candidate of dedupeStrings(candidates)) { if (!isBoundedLocalCommandRoot(candidate)) continue if (!hasWatchableLocalIntegrationFor(watchableIntegrations, candidate)) continue const localRoot = resolve(localPathForRemoteRoot(workspaceId, candidate)) @@ -814,7 +874,10 @@ function watchLocalMounts( function specsForEvent(event: ChangeEvent, specs: SubscriptionSpec[]): SubscriptionSpec[] { const path = event.resource.path - return specs.filter((spec) => spec.mountPaths.some((mountPath) => pathIsInsideMount(path, mountPath))) + return specs.filter((spec) => + spec.mountPaths.some((mountPath) => pathIsInsideMount(path, mountPath)) || + spec.eventPathGlobs.some((glob) => globMatchesPath(glob, path)) + ) } function longestMatchingMountPath(path: string, spec: SubscriptionSpec): string | null { @@ -835,9 +898,9 @@ function injectionDeduplicationKey(projectId: string, event: ChangeEvent, matche const scopedKeys = matchedSpecs .map((spec) => { const mountPath = longestMatchingMountPath(path, spec) - return mountPath - ? `${spec.integrationId}:${spec.provider}:${pathTailAfterMount(path, mountPath)}` - : null + if (mountPath) return `${spec.integrationId}:${spec.provider}:${pathTailAfterMount(path, mountPath)}` + const eventGlob = spec.eventPathGlobs.find((glob) => globMatchesPath(glob, path)) + return eventGlob ? `${spec.integrationId}:${spec.provider}:${eventGlob}:${path}` : null }) .filter((entry): entry is string => entry !== null) if (scopedKeys.length > 0) return `${projectId}:${event.type}:${dedupeStrings(scopedKeys).join('|')}` @@ -986,12 +1049,116 @@ function isLikelyLocalWritebackCommandPath(path: string): boolean { return !/^\d+(?:[._-]\d+)*$/u.test(stem) } -function shouldNotifyRelayfileChange(event: ChangeEvent): boolean { +function slackEventTimestampMs(path: string): number | null { + const match = path.match(/\/(?:messages|replies)\/(\d{10})_(\d+)(?:\/|\.json$)/u) + if (!match?.[1]) return null + return Number(`${match[1]}.${match[2] || '0'}`) * 1000 +} + +function repeatedSlackRoot(path: string): boolean { + const matches = path.match(/\/slack\/(?:channels|dms|users)\//gu) + return (matches?.length || 0) > 1 +} + +async function readLocalEventRecord(event: ChangeEvent, localMountWorkspaceId: string): Promise { + const localPath = localPathForRemoteRoot(localMountWorkspaceId, event.resource.path) + const raw = await readFile(localPath, 'utf8').catch(() => null) + if (!raw) return null + try { + return JSON.parse(raw) + } catch { + return null + } +} + +function recordWebhookReceivedAtMs(record: unknown): number | null { + if (!isRecord(record)) return null + const payload = isRecord(record.payload) ? record.payload : record + const webhook = isRecord(payload._webhook) ? payload._webhook : undefined + const receivedAt = webhook?.receivedAt + if (typeof receivedAt !== 'string') return null + const parsed = Date.parse(receivedAt) + return Number.isFinite(parsed) ? parsed : null +} + +async function shouldNotifySlackMessageChange(event: ChangeEvent, localMountWorkspaceId: string): Promise { + const path = event.resource.path + if (repeatedSlackRoot(path)) return false + + const record = await readLocalEventRecord(event, localMountWorkspaceId) + const occurredAtMs = Date.parse(event.occurredAt) + const webhookReceivedAtMs = recordWebhookReceivedAtMs(record) + if (webhookReceivedAtMs !== null && Number.isFinite(occurredAtMs)) { + return Math.abs(occurredAtMs - webhookReceivedAtMs) <= SLACK_LIVE_EVENT_WINDOW_MS + } + + const slackTsMs = slackEventTimestampMs(path) + if (slackTsMs !== null && Number.isFinite(occurredAtMs)) { + return Math.abs(occurredAtMs - slackTsMs) <= SLACK_LIVE_EVENT_WINDOW_MS + } + + return true +} + +async function shouldNotifyRelayfileChange(event: ChangeEvent, localMountWorkspaceId: string): Promise { if (eventOrigin(event) === 'agent_write') return false - return shouldNotifyRelayfilePath(event.resource.path) + if (!shouldNotifyRelayfilePath(event.resource.path)) return false + if (event.resource.provider === 'slack' && slackEventContextPath(event.resource.path)) { + return shouldNotifySlackMessageChange(event, localMountWorkspaceId) + } + return true +} + +function slackEventContextPath(path: string): boolean { + return /^\/slack\/(?:channels|dms|users)\/[^/]+\/(?:messages|threads)\/.+\.json$/u.test(path) +} + +function slackRecordContextLines(record: unknown): string[] { + if (!isRecord(record)) return [] + const payload = isRecord(record.payload) ? record.payload : record + const rawEvent = isRecord(payload.raw_event) ? payload.raw_event : undefined + const text = eventSummaryValue(payload.text) || eventSummaryValue(rawEvent?.text) + const user = eventSummaryValue(payload.user) || eventSummaryValue(rawEvent?.user) + const channel = eventSummaryValue(payload.channel) || eventSummaryValue(payload.channelId) || eventSummaryValue(rawEvent?.channel) + const threadTs = eventSummaryValue(payload.thread_ts) || eventSummaryValue(payload.threadTs) || eventSummaryValue(rawEvent?.thread_ts) + const ts = eventSummaryValue(payload.ts) || eventSummaryValue(payload.replyTs) || eventSummaryValue(rawEvent?.ts) + + const lines: string[] = [] + if (text) lines.push(`Slack text: ${truncateForSystemMessage(text)}`) + if (channel) lines.push(`Slack channel: ${channel}`) + if (threadTs) lines.push(`Slack thread ts: ${threadTs}`) + if (ts) lines.push(`Slack message ts: ${ts}`) + if (user) lines.push(`Slack user: ${user}`) + return lines +} + +async function localEventContextLines(event: ChangeEvent, localMountWorkspaceId: string): Promise { + const path = event.resource.path + if (pathSegments(path).some((segment) => segment === '.' || segment === '..')) return [] + return slackRecordContextLines(await readLocalEventRecord(event, localMountWorkspaceId)) } -function formatIntegrationEventMessage(event: ChangeEvent): string { +async function expandedEventContextLines(event: ChangeEvent): Promise { + try { + const expanded = await event.expand('full') + if (!isRecord(expanded)) return [] + const data = isRecord(expanded.data) ? expanded.data : expanded + return slackRecordContextLines(data) + } catch { + return [] + } +} + +async function eventContextLines(event: ChangeEvent, localMountWorkspaceId: string): Promise { + const provider = eventSummaryValue(event.resource.provider) + const path = event.resource.path + if (provider !== 'slack' || !slackEventContextPath(path)) return [] + + const expandedLines = await expandedEventContextLines(event) + return expandedLines.length > 0 ? expandedLines : localEventContextLines(event, localMountWorkspaceId) +} + +function formatIntegrationEventMessage(event: ChangeEvent, contextLines: string[] = []): string { const summary = isRecord(event.summary) ? event.summary : {} const resource = isRecord(event.resource) ? event.resource : {} const provider = eventSummaryValue(resource.provider) || 'integration' @@ -1024,9 +1191,10 @@ function formatIntegrationEventMessage(event: ChangeEvent): string { if (actor) lines.push(`Actor: ${actor}`) if (fieldsChanged) lines.push(`Fields changed: ${fieldsChanged}`) if (labels) lines.push(`Labels: ${labels}`) + lines.push(...contextLines) lines.push( - 'Handle this like an incoming user-relevant integration update. The Relayfile path above identifies the changed record; use the matching .integrations path for extra context only when historical download is enabled. Use the existing writeback or messaging path when a response is needed.', + 'Handle this like an incoming user-relevant integration update. The Relayfile path above identifies the changed record; use any inline event context first, then read the matching .integrations path for more detail when available. Use the existing writeback or messaging path when a response is needed.', '' ) return lines.join('\n') @@ -1067,6 +1235,7 @@ export class IntegrationEventBridge { integrationId: spec.integrationId, provider: spec.provider, mountPaths: spec.mountPaths, + eventPathGlobs: spec.eventPathGlobs, targets: spec.targets })) }) @@ -1084,6 +1253,7 @@ export class IntegrationEventBridge { integrationId: spec.integrationId, provider: spec.provider, mountPaths: spec.mountPaths, + eventPathGlobs: spec.eventPathGlobs, targets: targetLabels(spec.targets) })) }) @@ -1098,7 +1268,7 @@ export class IntegrationEventBridge { type: event.type, path: event.resource.path }) - void this.injectEvent(projectId, event, specs).catch((error) => { + void this.injectEvent(projectId, event, specs, handle.localMountWorkspaceId).catch((error) => { const errorMessage = toErrorMessage(error) warnIntegrationEventAggregated( `event delivery failed:${projectId}`, @@ -1132,7 +1302,7 @@ export class IntegrationEventBridge { path: event.resource.path, source: 'local-mount' }) - void this.injectEvent(projectId, event, specs).catch((error) => { + void this.injectEvent(projectId, event, specs, handle.localMountWorkspaceId).catch((error) => { const errorMessage = toErrorMessage(error) warnIntegrationEventAggregated( `local event delivery failed:${projectId}`, @@ -1181,8 +1351,13 @@ export class IntegrationEventBridge { await Promise.all(Array.from(this.subscriptions.keys()).map((projectId) => this.close(projectId))) } - private async injectEvent(projectId: string, event: ChangeEvent, specs: SubscriptionSpec[]): Promise { - if (!shouldNotifyRelayfileChange(event)) { + private async injectEvent( + projectId: string, + event: ChangeEvent, + specs: SubscriptionSpec[], + localMountWorkspaceId: string + ): Promise { + if (!await shouldNotifyRelayfileChange(event, localMountWorkspaceId)) { incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped filtered path', { projectId, @@ -1253,6 +1428,7 @@ export class IntegrationEventBridge { }) return } + const contextLines = await eventContextLines(event, localMountWorkspaceId) logIntegrationEvent('injecting', { projectId, eventId: event.id, @@ -1264,7 +1440,7 @@ export class IntegrationEventBridge { const input = { to: recipient, from: 'integration', - text: formatIntegrationEventMessage(event), + text: formatIntegrationEventMessage(event, contextLines), priority: 0, mode: 'steer', data: { diff --git a/src/main/integrations.catalog.ts b/src/main/integrations.catalog.ts index 8a673869..91f42fa0 100644 --- a/src/main/integrations.catalog.ts +++ b/src/main/integrations.catalog.ts @@ -320,7 +320,7 @@ export const INTEGRATIONS_CATALOG: IntegrationAdapter[] = [ version: '0.2.12', capabilities: { webhook: true, poll: true, writeback: true }, authMethod: 'oauth', - defaultMountPaths: ['/slack/channels', '/slack/dms'], + defaultMountPaths: ['/slack/channels'], description: 'Slack adapter scaffolding for Relayfile' }, { diff --git a/src/main/integrations.ts b/src/main/integrations.ts index 70bf7d00..05b91e16 100644 --- a/src/main/integrations.ts +++ b/src/main/integrations.ts @@ -1564,7 +1564,7 @@ export class IntegrationsManager { lines.push('') if (!subscriptionsReady && subscriptions.length > 0) { lines.push('Integration event subscriptions are requested for this project, but Pear could not register them with Relayfile yet.') - lines.push('Do not assume notifications will arrive until a later integrations update confirms active subscriptions. If historical provider records are mounted for an integration, read them when the user asks for current state; otherwise rely on incoming events and explicit user-provided context.') + lines.push('Do not assume notifications will arrive until a later integrations update confirms active subscriptions. If historical provider records are mounted for an integration, read them when the user asks for current state; otherwise rely on incoming event notifications, inline event context, and the event file path when present.') } else if (subscriptions.length === 0) { lines.push('No integration event subscriptions are active for this project.') } else { diff --git a/src/renderer/src/components/settings/AccountSettings.tsx b/src/renderer/src/components/settings/AccountSettings.tsx index a029a156..6cb99e2f 100644 --- a/src/renderer/src/components/settings/AccountSettings.tsx +++ b/src/renderer/src/components/settings/AccountSettings.tsx @@ -79,7 +79,8 @@ function capabilityLabel(adapter: IntegrationAdapter): string { function defaultScope(adapter: IntegrationAdapter): Record { return { provider: adapter.provider, - scopes: adapter.requiredScopes || [] + scopes: adapter.requiredScopes || [], + ...(canonicalProviderKey(adapter.provider) === 'slack' ? { listenDms: true } : {}) } } diff --git a/src/renderer/src/components/settings/ProjectSettings.tsx b/src/renderer/src/components/settings/ProjectSettings.tsx index 047a3e4d..543823c9 100644 --- a/src/renderer/src/components/settings/ProjectSettings.tsx +++ b/src/renderer/src/components/settings/ProjectSettings.tsx @@ -226,6 +226,18 @@ function scopeStringList(scope: Record, key: string): string[] : [] } +function scopeBooleanDefault(scope: Record, keys: string[], defaultValue: boolean): boolean { + for (const key of keys) { + const value = scope[key] + if (typeof value === 'boolean') return value + } + return defaultValue +} + +function slackListenDmsFromScope(scope: Record): boolean { + return scopeBooleanDefault(scope, ['listenDms', 'listenDirectMessages', 'directMessages'], true) +} + function localPathSegments(path: string): string[] { return path.split(/[\\/]+/u).filter(Boolean) } @@ -345,6 +357,7 @@ function IntegrationVisibilitySection({ const [error, setError] = useState(null) const [scopeEditorIntegrationId, setScopeEditorIntegrationId] = useState(null) const [pendingScopeValue, setPendingScopeValue] = useState(null) + const [pendingSlackListenDms, setPendingSlackListenDms] = useState(null) const resourceCacheRef = useRef(new Map()) const load = useCallback(async () => { @@ -560,13 +573,14 @@ function IntegrationVisibilitySection({ }, [cachedResources, projectId]) const saveSlackSourceChannels = useCallback(async (integration: ConnectedIntegration) => { - if (!pendingScopeValue) return + const listenDms = pendingSlackListenDms ?? slackListenDmsFromScope(integration.scope) const nextScope = { ...integration.scope, - provider: pendingScopeValue.scope.provider, - selection: pendingScopeValue.scope.selection, - channels: pendingScopeValue.scope.channels, - resources: pendingScopeValue.scope.resources + provider: pendingScopeValue?.scope.provider ?? integration.scope.provider ?? 'slack', + selection: pendingScopeValue?.scope.selection ?? integration.scope.selection ?? 'selected', + channels: pendingScopeValue?.scope.channels ?? integration.scope.channels ?? [], + resources: pendingScopeValue?.scope.resources ?? integration.scope.resources ?? [], + listenDms } setBusyIntegrationId(integration.integrationId) @@ -576,7 +590,7 @@ function IntegrationVisibilitySection({ projectId, integration.integrationId, nextScope, - pendingScopeValue.mountPaths + pendingScopeValue?.mountPaths ?? integration.mountPaths ) setIntegrations((current) => current.map((entry) => @@ -585,12 +599,13 @@ function IntegrationVisibilitySection({ ) setScopeEditorIntegrationId(null) setPendingScopeValue(null) + setPendingSlackListenDms(null) } catch (err) { setError(err instanceof Error ? err.message : String(err)) } finally { setBusyIntegrationId(null) } - }, [pendingScopeValue, projectId]) + }, [pendingScopeValue, pendingSlackListenDms, projectId]) return (
`agent:${agent}`), @@ -656,6 +676,7 @@ function IntegrationVisibilitySection({ disabled={busy} onClick={() => { setPendingScopeValue(null) + setPendingSlackListenDms(scopeEditorOpen ? null : savedSlackListenDms) setScopeEditorIntegrationId(scopeEditorOpen ? null : integration.integrationId) }} className={`flex h-8 w-8 shrink-0 items-center justify-center rounded-md border transition-colors disabled:opacity-40 ${ @@ -751,6 +772,19 @@ function IntegrationVisibilitySection({ listAccessibleResources={() => listSlackChannelResources(integration)} onChange={setPendingScopeValue} /> +