From 30de4ae0529f23efdb1c35b5fee32ce27cd8856b Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 5 Jun 2026 16:40:26 +0200 Subject: [PATCH] Add targeted integration context reads --- .../integration-event-bridge.test.ts | 318 ++++++++---------- src/main/integration-event-bridge.ts | 220 +++++++----- src/main/integrations.test.ts | 73 +++- src/main/integrations.ts | 46 ++- src/main/ipc-handlers.ts | 4 + src/preload/index.ts | 2 + src/shared/types/ipc.ts | 1 + 7 files changed, 400 insertions(+), 264 deletions(-) diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 89e005bd..f10654d6 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -1,6 +1,4 @@ import assert from 'node:assert/strict' -import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises' -import { tmpdir } from 'node:os' import { join } from 'node:path' import { beforeEach, test } from 'node:test' @@ -94,29 +92,6 @@ function changeEvent( } as ChangeEvent } -function changeEventWithFullData( - path: string, - provider: string, - data: Record -): ChangeEvent { - return { - ...changeEvent(path, provider), - expand: async (level = 'summary') => level === 'full' - ? { - level, - path, - data - } - : { - level, - path, - summary: { - title: path - } - } - } as ChangeEvent -} - async function withMockedNow(isoTimestamp: string, fn: () => Promise): Promise { const originalDateNow = Date.now Date.now = () => Date.parse(isoTimestamp) @@ -127,14 +102,28 @@ async function withMockedNow(isoTimestamp: string, fn: () => Promise): Pro } } -function makeHarness(agents = ['alice', 'bob'], options: { failSend?: boolean } = {}): { +function makeHarness( + agents = ['alice', 'bob'], + options: { + failSend?: boolean + readFileResponse?: (workspaceId: string, path: string) => { + path: string + revision: string + contentType: string + content: string + encoding: 'utf-8' | 'base64' + } + } = {} +): { bridge: IntegrationEventBridge subscribeCalls: SubscribeCall[] + readFileCalls: Array<{ workspaceId: string; path: string }> sent: SentMessage[] listAgentsCalls: string[] emit(event: ChangeEvent): Promise } { const subscribeCalls: SubscribeCall[] = [] + const readFileCalls: Array<{ workspaceId: string; path: string }> = [] const sent: SentMessage[] = [] const listAgentsCalls: string[] = [] const subscriptions: Subscription[] = [] @@ -149,6 +138,16 @@ function makeHarness(agents = ['alice', 'bob'], options: { failSend?: boolean } const subscription = { unsubscribe: async () => undefined } subscriptions.push(subscription) return subscription + }, + async readFile(workspaceId, path) { + readFileCalls.push({ workspaceId, path }) + return options.readFileResponse?.(workspaceId, path) ?? { + path, + revision: 'rev-1', + contentType: 'application/json', + content: JSON.stringify({ provider: 'slack', text: 'targeted Slack context' }), + encoding: 'utf-8' + } } }) }), @@ -171,7 +170,7 @@ function makeHarness(agents = ['alice', 'bob'], options: { failSend?: boolean } await new Promise((resolve) => setImmediate(resolve)) } - return { bridge, subscribeCalls, sent, listAgentsCalls, emit } + return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, emit } } beforeEach(() => { @@ -297,14 +296,29 @@ test('integration events watch selected relayfile mount paths', async () => { '.integrations/slack/users/*/messages/**' ]) - await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', 'slack')) + const selectedPath = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(selectedPath, 'slack')) await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC__proj-cloud\/messages\/1780668000_000000\/meta\.json/u) assert.match(harness.sent[0].input.text, /Relayfile path: \/slack\/channels\/C123ABC__proj-cloud\/messages\/1780668000_000000\/meta\.json/u) + assert.match(harness.sent[0].input.text, /Inline context preview:/u) + assert.match(harness.sent[0].input.text, /targeted Slack context/u) + assert.equal(harness.sent[0].input.data?.provider, 'slack') + assert.equal(harness.sent[0].input.data?.resourcePath, selectedPath) + assert.equal(harness.sent[0].input.data?.resourceId, 'meta.json') + assert.deepEqual(harness.readFileCalls, [ + { + workspaceId: 'workspace-id', + path: selectedPath + } + ]) + assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'text') + assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) harness.sent.splice(0) + harness.readFileCalls.splice(0) await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1780668060_000000/meta.json', 'slack')) assert.deepEqual(harness.sent, []) @@ -441,186 +455,150 @@ test('slack direct message event scope can be disabled', async () => { test('slack backfill and malformed nested message paths are not injected', async () => { const harness = makeHarness(['alice']) - const workspaceId = 'workspace-id' - const originalHome = process.env.HOME - const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-')) - const workspaceRoot = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId) const stalePath = '/slack/channels/C123ABC__proj-cloud/messages/1780017507_077969/meta.json' - const staleLocalPath = join(workspaceRoot, ...stalePath.split('/').filter(Boolean)) - try { - process.env.HOME = tempHome - await mkdir(join(staleLocalPath, '..'), { recursive: true }) - await writeFile(staleLocalPath, JSON.stringify({ - provider: 'slack', - objectType: 'message', - payload: { - text: 'old synced message', - channel: 'C123ABC', - ts: '1780017507.077969' - } - })) - - await withMockedNow('2026-06-05T14:00:00.000Z', async () => { - await harness.bridge.reconcile('project-1', [ - integration({ - provider: 'slack', - integrationId: 'slack-1', - mountPaths: ['/slack/channels/C123ABC__proj-cloud'], - scope: { notifyAgents: ['alice'] } - }) - ]) - }) + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + }) - await harness.emit({ - ...changeEvent(stalePath, 'slack'), - occurredAt: '2026-06-05T14:14:57.314Z' - }) - assert.deepEqual(harness.sent, []) - await waitForDropped('project-1', 1) - assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 1) - - await harness.emit(changeEvent( - '/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/meta.json', - 'slack' - )) - assert.deepEqual(harness.sent, []) - await waitForDropped('project-1', 2) - assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 2) - } finally { - await harness.bridge.close('project-1') - if (originalHome === undefined) { - delete process.env.HOME - } else { - process.env.HOME = originalHome - } - await rm(tempHome, { recursive: true, force: true }) - } + await harness.emit({ + ...changeEvent(stalePath, 'slack'), + occurredAt: '2026-06-05T14:14:57.314Z' + }) + assert.deepEqual(harness.sent, []) + await waitForDropped('project-1', 1) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 1) + + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/meta.json', + 'slack' + )) + assert.deepEqual(harness.sent, []) + await waitForDropped('project-1', 2) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 2) }) -test('slack thread reply events include local message text in the injected system message', async () => { +test('slack context resolves with history off through one targeted remote preview', async () => { const harness = makeHarness(['alice']) - const workspaceId = 'workspace-id' - const originalHome = process.env.HOME - const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-')) + const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json' - try { - process.env.HOME = tempHome - const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json' - const localPath = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId, ...replyPath.split('/').filter(Boolean)) - await mkdir(join(localPath, '..'), { recursive: true }) - await writeFile(localPath, JSON.stringify({ - provider: 'slack', - objectType: 'thread_reply', - payload: { - text: '<@U123> please handle this thread request', - channel: 'C123ABC', - thread_ts: '1780667635.192799', - ts: '1780668181.544139', - user: 'U456' - } - })) - - await withMockedNow('2026-06-05T14:00:00.000Z', async () => { - await harness.bridge.reconcile('project-1', [ - integration({ - provider: 'slack', - integrationId: 'slack-1', - mountPaths: ['/slack/channels/C123ABC__proj-cloud'], - scope: { notifyAgents: ['alice'] } - }) - ]) - }) + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) - await harness.emit(changeEvent(replyPath, 'slack')) - await waitForSent(harness, 1) + await harness.emit(changeEvent(replyPath, 'slack')) + await waitForSent(harness, 1) - assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) - assert.match(harness.sent[0].input.text, /Slack text: <@U123> please handle this thread request/u) - assert.match(harness.sent[0].input.text, /Slack thread ts: 1780667635\.192799/u) - } finally { - await harness.bridge.close('project-1') - if (originalHome === undefined) { - delete process.env.HOME - } else { - process.env.HOME = originalHome + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.match(harness.sent[0].input.text, /Inline context preview:/u) + assert.match(harness.sent[0].input.text, /targeted Slack context/u) + assert.equal(harness.sent[0].input.text.match(/Inline context preview:/gu)?.length, 1) + assert.doesNotMatch(harness.sent[0].input.text, /Slack text:/u) + assert.deepEqual(harness.readFileCalls, [ + { + workspaceId: 'workspace-id', + path: replyPath } - await rm(tempHome, { recursive: true, force: true }) - } + ]) + assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'text') + assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) }) -test('remote slack events include expanded message text before local mount sync catches up', async () => { - const harness = makeHarness(['alice']) - const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json' +test('integration event targeted context previews skip binary files', async () => { + const harness = makeHarness(['alice'], { + readFileResponse: (_workspaceId, path) => ({ + path, + revision: 'rev-binary', + contentType: 'application/octet-stream', + content: Buffer.from([0, 1, 2, 3]).toString('base64'), + encoding: 'base64' + }) + }) await withMockedNow('2026-06-05T14:00:00.000Z', async () => { await harness.bridge.reconcile('project-1', [ integration({ provider: 'slack', integrationId: 'slack-1', - mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + mountPaths: ['/slack/channels/C123ABC'], scope: { notifyAgents: ['alice'] } }) ]) }) - await harness.emit(changeEventWithFullData(replyPath, 'slack', { - provider: 'slack', - objectType: 'thread_reply', - payload: { - text: '<@U123> remote stream text', - channel: 'C123ABC', - thread_ts: '1780667635.192799', - ts: '1780668181.544139', - user: 'U456' - } - })) + await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1780668000_000000/meta.json', 'slack')) await waitForSent(harness, 1) - assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) - assert.match(harness.sent[0].input.text, /Slack text: <@U123> remote stream text/u) - assert.match(harness.sent[0].input.text, /Slack thread ts: 1780667635\.192799/u) + assert.match(harness.sent[0].input.text, /Context preview skipped: binary content/u) + assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'binary') }) -test('slack local event context rejects traversal paths', async () => { - const harness = makeHarness(['alice']) - const workspaceId = 'workspace-id' - const originalHome = process.env.HOME - const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-')) +test('integration event targeted context previews skip files above the injection cap', async () => { + const harness = makeHarness(['alice'], { + readFileResponse: (_workspaceId, path) => ({ + path, + revision: 'rev-large', + contentType: 'application/json', + content: 'x'.repeat(33 * 1024), + encoding: 'utf-8' + }) + }) - try { - process.env.HOME = tempHome - const escapedPath = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', 'leak.json') - await mkdir(join(escapedPath, '..'), { recursive: true }) - await writeFile(escapedPath, JSON.stringify({ - payload: { - text: 'escaped local file should not be injected' - } - })) + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC'], + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1780668000_000000/meta.json', 'slack')) + await waitForSent(harness, 1) + assert.match(harness.sent[0].input.text, /exceeds the injection preview cap/u) + assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'too-large') +}) + +test('integration event targeted context read is skipped for deleted files', async () => { + const harness = makeHarness(['alice']) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { await harness.bridge.reconcile('project-1', [ integration({ provider: 'slack', integrationId: 'slack-1', - mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + mountPaths: ['/slack/channels/C123ABC'], scope: { notifyAgents: ['alice'] } }) ]) + }) - await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/threads/../../../../../leak.json', 'slack')) - await waitForSent(harness, 1) + await harness.emit({ + ...changeEvent('/slack/channels/C123ABC/messages/1780668000_000000/meta.json', 'slack'), + type: 'file.deleted' + }) + await waitForSent(harness, 1) - assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) - assert.doesNotMatch(harness.sent[0].input.text, /escaped local file should not be injected/u) - } finally { - await harness.bridge.close('project-1') - if (originalHome === undefined) { - delete process.env.HOME - } else { - process.env.HOME = originalHome - } - await rm(tempHome, { recursive: true, force: true }) - } + assert.deepEqual(harness.readFileCalls, []) + assert.equal(harness.sent[0].input.data?.contextPreview, undefined) }) test('historical replay allowance is scoped to the matching integration', async () => { diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index ba8693e9..d20da2be 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -1,5 +1,5 @@ import { existsSync, watch, type FSWatcher } from 'node:fs' -import { appendFile, mkdir, readFile, stat } from 'node:fs/promises' +import { appendFile, mkdir, stat } from 'node:fs/promises' import { homedir } from 'node:os' import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path' import { @@ -7,6 +7,7 @@ import { RelayFileSync, RelayfileSetup, type ChangeEvent, + type FileReadResponse, type FilesystemEvent, type Subscription } from '@relayfile/sdk' @@ -25,13 +26,13 @@ const REPLAY_SKEW_TOLERANCE_MS = 15 * 60_000 const INTEGRATION_EVENT_LOG_PATH = join(homedir(), '.agentworkforce', 'pear', 'integration-events.log') const AGGREGATED_WARNING_REPEAT_EVERY = 25 const MAX_AGGREGATED_WARNING_KEYS = 256 -const MAX_INLINE_EVENT_TEXT_CHARS = 4_000 const SLACK_LIVE_EVENT_WINDOW_MS = 30 * 60 * 1_000 const SLACK_DM_EVENT_GLOBS = [ '/slack/channels/D*/**', '/slack/dms/*/**', '/slack/users/*/messages/**' ] +const MAX_EVENT_CONTEXT_PREVIEW_BYTES = 32 * 1024 type IntegrationEventCounterName = 'eventsReceived' | 'eventsInjected' | 'eventsCoalesced' | 'eventsDropped' type IntegrationEventGaugeName = 'queueDepth' | 'mountCount' @@ -61,6 +62,16 @@ type ProjectSubscription = { signature: string } +type EventContextPreview = { + path: string + kind: 'text' | 'binary' | 'too-large' + content: string + size: number + contentType?: string +} + +type EventContextPreviewMetadata = Omit + type LocalMountSubscription = Subscription & { localRoots: string[] } @@ -105,6 +116,7 @@ type RelayfileEventClient = { onQueueDepth?: (depth: number) => void } ): Subscription + readFile?(workspaceId: string, path: string): Promise } type RelayfileWorkspaceHandle = { @@ -196,11 +208,6 @@ function toErrorMessage(error: unknown): string { return error instanceof Error ? error.message : String(error) } -function truncateForSystemMessage(text: string, maxChars = MAX_INLINE_EVENT_TEXT_CHARS): string { - const normalized = text.trim() - return normalized.length > maxChars ? `${normalized.slice(0, maxChars)}...` : normalized -} - function isUnauthorizedError(error: unknown): boolean { if (!error || typeof error !== 'object') return false const status = (error as { httpStatus?: unknown; status?: unknown }).httpStatus ?? @@ -1040,6 +1047,69 @@ function eventSummaryValue(value: unknown): string | undefined { return undefined } +function integrationEventMetadata(event: ChangeEvent): Record { + const summary = isRecord(event.summary) ? event.summary : {} + const resource = isRecord(event.resource) ? event.resource : {} + const actor = isRecord(summary.actor) + ? eventSummaryValue(summary.actor.displayName) || eventSummaryValue(summary.actor.id) + : undefined + return { + provider: eventSummaryValue(resource.provider), + resourcePath: eventSummaryValue(resource.path), + resourceId: eventSummaryValue(resource.id), + title: eventSummaryValue(summary.title), + status: eventSummaryValue(summary.status), + actor + } +} + +function eventContextPreviewFromFile(file: FileReadResponse): EventContextPreview { + const rawContent = file.content || '' + const buffer = file.encoding === 'base64' + ? Buffer.from(rawContent, 'base64') + : Buffer.from(rawContent, 'utf8') + const size = buffer.byteLength + + // The current Relayfile SDK readFile call returns the full file; this cap only + // bounds what Pear injects into agent context after the targeted read. + if (size > MAX_EVENT_CONTEXT_PREVIEW_BYTES) { + return { + path: file.path, + kind: 'too-large', + content: '', + size, + contentType: file.contentType + } + } + + if (buffer.includes(0)) { + return { + path: file.path, + kind: 'binary', + content: '', + size, + contentType: file.contentType + } + } + + return { + path: file.path, + kind: 'text', + content: buffer.toString('utf8'), + size, + contentType: file.contentType + } +} + +function eventContextPreviewMetadata(preview: EventContextPreview): EventContextPreviewMetadata { + return { + path: preview.path, + kind: preview.kind, + size: preview.size, + contentType: preview.contentType + } +} + function shouldNotifyRelayfilePath(pathValue: string): boolean { const path = pathValue.trim() if (!path || !path.startsWith('/')) return false @@ -1093,38 +1163,11 @@ function repeatedSlackRoot(path: string): boolean { return (matches?.length || 0) > 1 } -async function readLocalEventRecord(event: ChangeEvent, localMountWorkspaceId: string): Promise { - const localPath = localPathForRemoteRoot(localMountWorkspaceId, event.resource.path) - const raw = await readFile(localPath, 'utf8').catch(() => null) - if (!raw) return null - try { - return JSON.parse(raw) - } catch { - return null - } -} - -function recordWebhookReceivedAtMs(record: unknown): number | null { - if (!isRecord(record)) return null - const payload = isRecord(record.payload) ? record.payload : record - const webhook = isRecord(payload._webhook) ? payload._webhook : undefined - const receivedAt = webhook?.receivedAt - if (typeof receivedAt !== 'string') return null - const parsed = Date.parse(receivedAt) - return Number.isFinite(parsed) ? parsed : null -} - -async function shouldNotifySlackMessageChange(event: ChangeEvent, localMountWorkspaceId: string): Promise { +function shouldNotifySlackMessageChange(event: ChangeEvent): boolean { const path = event.resource.path if (repeatedSlackRoot(path)) return false - const record = await readLocalEventRecord(event, localMountWorkspaceId) const occurredAtMs = Date.parse(event.occurredAt) - const webhookReceivedAtMs = recordWebhookReceivedAtMs(record) - if (webhookReceivedAtMs !== null && Number.isFinite(occurredAtMs)) { - return Math.abs(occurredAtMs - webhookReceivedAtMs) <= SLACK_LIVE_EVENT_WINDOW_MS - } - const slackTsMs = slackEventTimestampMs(path) if (slackTsMs !== null && Number.isFinite(occurredAtMs)) { return Math.abs(occurredAtMs - slackTsMs) <= SLACK_LIVE_EVENT_WINDOW_MS @@ -1133,11 +1176,11 @@ async function shouldNotifySlackMessageChange(event: ChangeEvent, localMountWork return true } -async function shouldNotifyRelayfileChange(event: ChangeEvent, localMountWorkspaceId: string): Promise { +function shouldNotifyRelayfileChange(event: ChangeEvent): boolean { if (eventOrigin(event) === 'agent_write') return false if (!shouldNotifyRelayfilePath(event.resource.path)) return false if (event.resource.provider === 'slack' && slackEventContextPath(event.resource.path)) { - return shouldNotifySlackMessageChange(event, localMountWorkspaceId) + return shouldNotifySlackMessageChange(event) } return true } @@ -1146,52 +1189,10 @@ function slackEventContextPath(path: string): boolean { return /^\/slack\/(?:channels|dms|users)\/[^/]+\/(?:messages|threads)\/.+\.json$/u.test(path) } -function slackRecordContextLines(record: unknown): string[] { - if (!isRecord(record)) return [] - const payload = isRecord(record.payload) ? record.payload : record - const rawEvent = isRecord(payload.raw_event) ? payload.raw_event : undefined - const text = eventSummaryValue(payload.text) || eventSummaryValue(rawEvent?.text) - const user = eventSummaryValue(payload.user) || eventSummaryValue(rawEvent?.user) - const channel = eventSummaryValue(payload.channel) || eventSummaryValue(payload.channelId) || eventSummaryValue(rawEvent?.channel) - const threadTs = eventSummaryValue(payload.thread_ts) || eventSummaryValue(payload.threadTs) || eventSummaryValue(rawEvent?.thread_ts) - const ts = eventSummaryValue(payload.ts) || eventSummaryValue(payload.replyTs) || eventSummaryValue(rawEvent?.ts) - - const lines: string[] = [] - if (text) lines.push(`Slack text: ${truncateForSystemMessage(text)}`) - if (channel) lines.push(`Slack channel: ${channel}`) - if (threadTs) lines.push(`Slack thread ts: ${threadTs}`) - if (ts) lines.push(`Slack message ts: ${ts}`) - if (user) lines.push(`Slack user: ${user}`) - return lines -} - -async function localEventContextLines(event: ChangeEvent, localMountWorkspaceId: string): Promise { - const path = event.resource.path - if (pathSegments(path).some((segment) => segment === '.' || segment === '..')) return [] - return slackRecordContextLines(await readLocalEventRecord(event, localMountWorkspaceId)) -} - -async function expandedEventContextLines(event: ChangeEvent): Promise { - try { - const expanded = await event.expand('full') - if (!isRecord(expanded)) return [] - const data = isRecord(expanded.data) ? expanded.data : expanded - return slackRecordContextLines(data) - } catch { - return [] - } -} - -async function eventContextLines(event: ChangeEvent, localMountWorkspaceId: string): Promise { - const provider = eventSummaryValue(event.resource.provider) - const path = event.resource.path - if (provider !== 'slack' || !slackEventContextPath(path)) return [] - - const expandedLines = await expandedEventContextLines(event) - return expandedLines.length > 0 ? expandedLines : localEventContextLines(event, localMountWorkspaceId) -} - -function formatIntegrationEventMessage(event: ChangeEvent, contextLines: string[] = []): string { +function formatIntegrationEventMessage( + event: ChangeEvent, + contextPreview?: EventContextPreview +): string { const summary = isRecord(event.summary) ? event.summary : {} const resource = isRecord(event.resource) ? event.resource : {} const provider = eventSummaryValue(resource.provider) || 'integration' @@ -1224,10 +1225,21 @@ function formatIntegrationEventMessage(event: ChangeEvent, contextLines: string[ if (actor) lines.push(`Actor: ${actor}`) if (fieldsChanged) lines.push(`Fields changed: ${fieldsChanged}`) if (labels) lines.push(`Labels: ${labels}`) - lines.push(...contextLines) + if (relayfilePath) { + lines.push(`Targeted context path: ${relayfilePath}`) + } + if (contextPreview) { + if (contextPreview.kind === 'text') { + lines.push('Inline context preview:', contextPreview.content) + } else if (contextPreview.kind === 'too-large') { + lines.push(`Context preview skipped: ${contextPreview.size} bytes exceeds the injection preview cap.`) + } else { + lines.push(`Context preview skipped: binary content (${contextPreview.size} bytes).`) + } + } lines.push( - 'Handle this like an incoming user-relevant integration update. The Relayfile path above identifies the changed record; use any inline event context first, then read the matching .integrations path for more detail when available. Use the existing writeback or messaging path when a response is needed.', + 'Handle this like an incoming user-relevant integration update. The inline context preview, Relayfile path, and structured event data identify the changed record. Use the matching .integrations path only when historical download is enabled. Use the existing writeback or messaging path when a response is needed.', '' ) return lines.join('\n') @@ -1400,13 +1412,38 @@ export class IntegrationEventBridge { await Promise.all(Array.from(this.subscriptions.keys()).map((projectId) => this.close(projectId))) } + private async readEventContextPreview(projectId: string, event: ChangeEvent): Promise { + if (event.type === 'file.deleted') return undefined + const path = eventSummaryValue(event.resource.path) + if (!path) return undefined + + try { + const handle = await this.getWorkspaceHandle() + const client = handle.client() + if (typeof client.readFile !== 'function') return undefined + return eventContextPreviewFromFile(await client.readFile(handle.workspaceId, path)) + } catch (error) { + warnIntegrationEventAggregated( + `event context read failed:${projectId}`, + 'event context read failed', + { + projectId, + eventId: event.id, + path, + error: toErrorMessage(error) + } + ) + return undefined + } + } + private async injectEvent( projectId: string, event: ChangeEvent, specs: SubscriptionSpec[], options: EventInjectionOptions ): Promise { - if (!await shouldNotifyRelayfileChange(event, options.localMountWorkspaceId)) { + if (!shouldNotifyRelayfileChange(event)) { incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped filtered path', { projectId, @@ -1490,7 +1527,10 @@ export class IntegrationEventBridge { }) return } - const contextLines = await eventContextLines(event, options.localMountWorkspaceId) + + const eventMetadata = integrationEventMetadata(event) + const contextPreview = await this.readEventContextPreview(projectId, event) + const contextPreviewData = contextPreview ? eventContextPreviewMetadata(contextPreview) : undefined logIntegrationEvent('injecting', { projectId, eventId: event.id, @@ -1502,7 +1542,7 @@ export class IntegrationEventBridge { const input = { to: recipient, from: 'integration', - text: formatIntegrationEventMessage(event, contextLines), + text: formatIntegrationEventMessage(event, contextPreview), priority: 0, mode: 'steer', data: { @@ -1512,7 +1552,9 @@ export class IntegrationEventBridge { eventType: event.type, occurredAt: event.occurredAt, resource: isRecord(event.resource) ? { ...event.resource } : undefined, - path: event.resource.path + path: event.resource.path, + contextPreview: contextPreviewData, + ...eventMetadata } } as const return bridge.sendMessageAndWaitForDelivery diff --git a/src/main/integrations.test.ts b/src/main/integrations.test.ts index 8e8ecca2..111f1d85 100644 --- a/src/main/integrations.test.ts +++ b/src/main/integrations.test.ts @@ -36,6 +36,36 @@ const mock = vi.hoisted(() => { activeProjectId: 'project-1' } let mountReconcilePromise: Promise = Promise.resolve() + const readFileCalls: Array<{ workspaceId: string; path: string }> = [] + const relayClient = { + readFile: vi.fn(async (workspaceId: string, path: string) => { + readFileCalls.push({ workspaceId, path }) + return { + path, + revision: 'rev-1', + contentType: 'application/json', + content: JSON.stringify({ + provider: 'slack', + objectType: 'message', + payload: { + text: 'hello from the remote Slack record', + channel: 'C123', + ts: '1713220123.001100', + user: 'U456' + } + }), + encoding: 'utf-8' + } + }) + } + const workspaceHandle = { + workspaceId: 'account-workspace-id', + client: vi.fn(() => relayClient) + } + const relayWorkspaceManager = { + withHandle: vi.fn(async (fn: (handle: typeof workspaceHandle) => Promise) => fn(workspaceHandle)), + getWorkspaceHandle: vi.fn(async () => workspaceHandle) + } const jsonResponse = (payload: unknown, status = 200, statusText = 'OK') => ({ ok: status >= 200 && status < 300, @@ -130,6 +160,9 @@ const mock = vi.hoisted(() => { cloudAgentManager: { updateMountPaths: vi.fn(async () => undefined) }, + readFileCalls, + relayClient, + relayWorkspaceManager, brokerManager: { listAgents: vi.fn(async () => []), sendMessage: vi.fn(async () => undefined) @@ -190,10 +223,7 @@ vi.mock('./integration-symlinks', () => ({ })) vi.mock('./relay-workspace', () => ({ - getRelayWorkspaceManager: vi.fn(() => ({ - withHandle: vi.fn(), - getWorkspaceHandle: vi.fn() - })) + getRelayWorkspaceManager: vi.fn(() => mock.relayWorkspaceManager) })) import { IntegrationsManager, localSyncMountPathsForIntegration } from './integrations' @@ -221,6 +251,10 @@ describe('IntegrationsManager', () => { mock.integrationMountManager.stop.mockClear() mock.integrationEventBridge.reconcile.mockClear() mock.cloudAgentManager.updateMountPaths.mockClear() + mock.readFileCalls.splice(0) + mock.relayClient.readFile.mockClear() + mock.relayWorkspaceManager.withHandle.mockClear() + mock.relayWorkspaceManager.getWorkspaceHandle.mockClear() mock.brokerManager.listAgents.mockClear() mock.brokerManager.sendMessage.mockClear() mock.ensureProjectIntegrationsLink.mockClear() @@ -356,4 +390,35 @@ describe('IntegrationsManager', () => { finishMountReconcile() }) + + it('reads a targeted remote Slack event record without reconciling local mounts', async () => { + const manager = new IntegrationsManager() + + const preview = await manager.readRemoteFile( + 'project-1', + '/slack/channels/C123/messages/1713220123_001100.json' + ) + + expect(preview).toMatchObject({ + kind: 'text', + content: expect.stringContaining('hello from the remote Slack record') + }) + expect(mock.readFileCalls).toEqual([ + { + workspaceId: 'account-workspace-id', + path: '/slack/channels/C123/messages/1713220123_001100.json' + } + ]) + expect(mock.integrationMountManager.ensureMounted).not.toHaveBeenCalled() + }) + + it('rejects targeted remote file reads outside the project integration scope', async () => { + const manager = new IntegrationsManager() + + await expect( + manager.readRemoteFile('project-1', '/github/repos/acme/app/issues/1.json') + ).rejects.toThrow('outside this project integration scope') + + expect(mock.readFileCalls).toEqual([]) + }) }) diff --git a/src/main/integrations.ts b/src/main/integrations.ts index e5057cf6..5187c029 100644 --- a/src/main/integrations.ts +++ b/src/main/integrations.ts @@ -1,7 +1,7 @@ import { randomUUID } from 'node:crypto' import { isAbsolute, relative, resolve } from 'node:path' import { BrowserWindow, shell } from 'electron' -import type { TreeResponse, WorkspaceHandle } from '@relayfile/sdk' +import type { FileReadResponse, TreeResponse, WorkspaceHandle } from '@relayfile/sdk' import { accountWorkspaceReadyRetryOptions, getAccountWorkspaceId, getApiUrl, resolveCloudAuth } from './auth' import { brokerManager } from './broker' import { cloudAgentManager } from './cloud-agent' @@ -15,6 +15,7 @@ import { import { canListRemoteDirectoryForMountPaths, canShowRemoteDirectoryEntryForMountPaths, + isRelayfilePathWithinRoot, normalizeRemoteDirectoryPath, remotePathName } from './integration-remote-paths' @@ -212,6 +213,7 @@ const SYSTEM_MESSAGE_DEBOUNCE_MS = 1_000 const LOCAL_MOUNT_CLOUD_HYDRATION_THROTTLE_MS = 30_000 const CATALOG_PATH = '/api/v1/integrations/catalog' const MAX_REMOTE_DIRECTORY_ENTRIES = 5_000 +const MAX_REMOTE_FILE_PREVIEW_BYTES = 1024 * 1024 // Only providers currently active in ../cloud are surfaced. This mirrors the // non-deprecated relayfile providers in @@ -658,6 +660,33 @@ function getPayloadMessage(payload: unknown, fallback: string): string { return fallback } +function remoteFileReadToPreview(file: FileReadResponse): filesystem.FilePreview { + const rawContent = file.content || '' + const buffer = file.encoding === 'base64' + ? Buffer.from(rawContent, 'base64') + : Buffer.from(rawContent, 'utf8') + const size = buffer.byteLength + if (size > MAX_REMOTE_FILE_PREVIEW_BYTES) { + return { + kind: 'too-large', + content: '', + size + } + } + if (buffer.includes(0)) { + return { + kind: 'binary', + content: '', + size + } + } + return { + kind: 'text', + content: buffer.toString('utf8'), + size + } +} + function collectScopeLabels(scope: Record): string[] { const labels: string[] = [] const visit = (value: unknown): void => { @@ -770,6 +799,21 @@ export class IntegrationsManager { return filesystem.readTextPreview(resolvedPath) } + async readRemoteFile(projectId: string, remotePath: string): Promise { + if (!this.findProject(projectId)) throw new Error(`Project not found: ${projectId}`) + const path = normalizeRemoteDirectoryPath(remotePath) + if (!path || path === '/') throw new Error('Integration remote file path is required') + const mountPaths = this.listableRemoteMountPaths(projectId) + if (!mountPaths.some((mountPath) => isRelayfilePathWithinRoot(mountPath, path))) { + throw new Error('Integration remote file is outside this project integration scope') + } + + return this.withWorkspaceHandle(async (handle) => { + const file = await handle.client().readFile(handle.workspaceId, path) + return remoteFileReadToPreview(file) + }) + } + async listRemoteDirectory(projectId: string, remotePath: string): Promise { if (!this.findProject(projectId)) throw new Error(`Project not found: ${projectId}`) const path = normalizeRemoteDirectoryPath(remotePath) diff --git a/src/main/ipc-handlers.ts b/src/main/ipc-handlers.ts index 5df7fbef..482c4742 100644 --- a/src/main/ipc-handlers.ts +++ b/src/main/ipc-handlers.ts @@ -590,6 +590,10 @@ export function registerIpcHandlers(): void { return integrationsManager.readMountPreview(projectId, integrationId, filePath) }) + ipcMain.handle('integrations:read-remote-file', async (_, projectId: string, remotePath: string) => { + return integrationsManager.readRemoteFile(projectId, remotePath) + }) + ipcMain.handle('integrations:list-options', async (_, projectId: string, provider: string, resource: string) => { return integrationsManager.listOptions(projectId, provider, resource) }) diff --git a/src/preload/index.ts b/src/preload/index.ts index 7404c181..c7a6b640 100644 --- a/src/preload/index.ts +++ b/src/preload/index.ts @@ -380,6 +380,8 @@ const api = { invoke('integrations:list-mount-dir', projectId, integrationId, dirPath), listRemoteDir: (projectId: string, remotePath: string) => invoke('integrations:list-remote-dir', projectId, remotePath), + readRemoteFile: (projectId: string, remotePath: string) => + invoke('integrations:read-remote-file', projectId, remotePath), readMountPreview: (projectId: string, integrationId: string, filePath: string) => invoke('integrations:read-mount-preview', projectId, integrationId, filePath), listOptions: (projectId: string, provider: string, resource: string) => diff --git a/src/shared/types/ipc.ts b/src/shared/types/ipc.ts index 091a2c3e..a0f5ff37 100644 --- a/src/shared/types/ipc.ts +++ b/src/shared/types/ipc.ts @@ -916,6 +916,7 @@ export interface PearAPI { telemetry: () => Promise listMountDir: (projectId: string, integrationId: string, dirPath: string) => Promise listRemoteDir: (projectId: string, remotePath: string) => Promise + readRemoteFile: (projectId: string, remotePath: string) => Promise readMountPreview: (projectId: string, integrationId: string, filePath: string) => Promise listOptions: (projectId: string, provider: string, resource: string) => Promise startConnect: (projectId: string, provider: string) => Promise