Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 267 additions & 6 deletions src/main/__tests__/integration-event-bridge.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import assert from 'node:assert/strict'
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { beforeEach, test } from 'node:test'

Expand Down Expand Up @@ -53,13 +55,17 @@ function integration(overrides: Partial<ConnectedIntegration> & {
function changeEvent(
path: string,
provider = path.split('/')[1] || 'github',
overrides: { digest?: string; origin?: string; revision?: string } = {}
overrides: { digest?: string; occurredAt?: string; origin?: string; revision?: string } = {}
): ChangeEvent {
const slackTs = path.match(/\/(?:messages|replies)\/(\d{10})_(\d+)(?:\/|\.json$)/u)
const occurredAt = overrides.occurredAt ?? (slackTs?.[1]
? new Date(Number(`${slackTs[1]}.${slackTs[2] || '0'}`) * 1000).toISOString()
: '2026-06-04T00:00:00.000Z')
return {
id: `evt:${path}`,
workspace: 'workspace-id',
type: 'relayfile.changed',
occurredAt: '2026-06-04T00:00:00.000Z',
occurredAt,
resource: {
path,
provider,
Expand All @@ -83,6 +89,29 @@ function changeEvent(
} as ChangeEvent
}

function changeEventWithFullData(
path: string,
provider: string,
data: Record<string, unknown>
): ChangeEvent {
return {
...changeEvent(path, provider),
expand: async (level = 'summary') => level === 'full'
? {
level,
path,
data
}
: {
level,
path,
summary: {
title: path
}
}
} as ChangeEvent
}

function makeHarness(agents = ['alice', 'bob'], options: { failSend?: boolean } = {}): {
bridge: IntegrationEventBridge
subscribeCalls: SubscribeCall[]
Expand Down Expand Up @@ -135,6 +164,13 @@ beforeEach(() => {
delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG
})

async function waitForSent(harness: { sent: SentMessage[] }, count: number): Promise<void> {
const deadline = Date.now() + 1_000
while (harness.sent.length < count && Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, 10))
}
}

test('integration events route only to the targets for the matching integration path', async () => {
const harness = makeHarness()

Expand Down Expand Up @@ -175,6 +211,7 @@ test('channel notification targets do not fall back to all project agents', asyn
])

await harness.emit(changeEvent('/slack/channels/general/messages/123.json', 'slack'))
await waitForSent(harness, 1)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['#triage'])
assert.deepEqual(harness.listAgentsCalls, [])
Expand All @@ -193,6 +230,7 @@ test('offline notification agents fall back to current project agents', async ()
])

await harness.emit(changeEvent('/slack/channels/general/messages/123.json', 'slack'))
await waitForSent(harness, 2)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob'])
assert.deepEqual(harness.listAgentsCalls, ['project-1'])
Expand All @@ -215,24 +253,238 @@ test('integration events watch selected relayfile mount paths', async () => {

assert.deepEqual(harness.subscribeCalls[0].globs, [
'/slack/channels/C123ABC/**',
'/slack/channels/C123ABC__proj-cloud/**'
'/slack/channels/C123ABC__proj-cloud/**',
'/slack/channels/D*/**',
'/slack/dms/*/**',
'/slack/users/*/messages/**'
])
assert.deepEqual(integrationSubscriptionSummaries([slackIntegration])[0].watches, [
'.integrations/slack/channels/C123ABC/**',
'.integrations/slack/channels/C123ABC__proj-cloud/**'
'.integrations/slack/channels/C123ABC__proj-cloud/**',
'.integrations/slack/channels/D*/**',
'.integrations/slack/dms/*/**',
'.integrations/slack/users/*/messages/**'
])

await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/messages/1713220123_001100/meta.json', 'slack'))
await waitForSent(harness, 1)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])
assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC__proj-cloud\/messages\/1713220123_001100\/meta\.json/u)
assert.match(harness.sent[0].input.text, /Relayfile path: \/slack\/channels\/C123ABC__proj-cloud\/messages\/1713220123_001100\/meta\.json/u)

harness.sent.splice(0)
await harness.emit(changeEvent('/slack/channels/C123ABC/messages/1713220124_001100/meta.json', 'slack'))
await waitForSent(harness, 1)
assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])

harness.sent.splice(0)
await harness.emit(changeEvent('/slack/channels/C999XYZ/messages/1713220125_001100/meta.json', 'slack'))
assert.deepEqual(harness.sent, [])

await harness.emit(changeEvent('/slack/channels/D123ABC/messages/1713220126_001100/meta.json', 'slack'))
await waitForSent(harness, 1)
assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])
})

test('slack direct message event scope can be disabled', async () => {
const harness = makeHarness()
const slackIntegration = integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: {
channels: ['C123ABC'],
listenDms: false,
notifyAgents: ['alice']
}
})

await harness.bridge.reconcile('project-1', [slackIntegration])

assert.deepEqual(harness.subscribeCalls[0].globs, [
'/slack/channels/C123ABC/**',
'/slack/channels/C123ABC__proj-cloud/**'
])

await harness.emit(changeEvent('/slack/channels/D123ABC/messages/1713220126_001100/meta.json', 'slack'))
assert.deepEqual(harness.sent, [])
})

test('slack backfill and malformed nested message paths are not injected', async () => {
const harness = makeHarness(['alice'])
const workspaceId = 'workspace-id'
const originalHome = process.env.HOME
const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-'))
const workspaceRoot = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId)
const stalePath = '/slack/channels/C123ABC__proj-cloud/messages/1780017507_077969/meta.json'
const staleLocalPath = join(workspaceRoot, ...stalePath.split('/').filter(Boolean))

try {
process.env.HOME = tempHome
await mkdir(join(staleLocalPath, '..'), { recursive: true })
await writeFile(staleLocalPath, JSON.stringify({
provider: 'slack',
objectType: 'message',
payload: {
text: 'old synced message',
channel: 'C123ABC',
ts: '1780017507.077969'
}
}))

await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['alice'] }
})
])

await harness.emit({
...changeEvent(stalePath, 'slack'),
occurredAt: '2026-06-05T14:14:57.314Z'
})
assert.deepEqual(harness.sent, [])

await harness.emit(changeEvent(
'/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/meta.json',
'slack'
))
assert.deepEqual(harness.sent, [])
} finally {
await harness.bridge.close('project-1')
if (originalHome === undefined) {
delete process.env.HOME
} else {
process.env.HOME = originalHome
}
await rm(tempHome, { recursive: true, force: true })
}
})

test('slack thread reply events include local message text in the injected system message', async () => {
const harness = makeHarness(['alice'])
const workspaceId = 'workspace-id'
const originalHome = process.env.HOME
const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-'))

try {
process.env.HOME = tempHome
const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json'
const localPath = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', workspaceId, ...replyPath.split('/').filter(Boolean))
await mkdir(join(localPath, '..'), { recursive: true })
await writeFile(localPath, JSON.stringify({
provider: 'slack',
objectType: 'thread_reply',
payload: {
text: '<@U123> please handle this thread request',
channel: 'C123ABC',
thread_ts: '1780667635.192799',
ts: '1780668181.544139',
user: 'U456'
}
}))

await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['alice'] }
})
])

await harness.emit(changeEvent(replyPath, 'slack'))
await waitForSent(harness, 1)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])
assert.match(harness.sent[0].input.text, /Slack text: <@U123> please handle this thread request/u)
assert.match(harness.sent[0].input.text, /Slack thread ts: 1780667635\.192799/u)
} finally {
await harness.bridge.close('project-1')
if (originalHome === undefined) {
delete process.env.HOME
} else {
process.env.HOME = originalHome
}
await rm(tempHome, { recursive: true, force: true })
}
})

test('remote slack events include expanded message text before local mount sync catches up', async () => {
const harness = makeHarness(['alice'])
const replyPath = '/slack/channels/C123ABC__proj-cloud/threads/1780667635_192799/replies/1780668181_544139.json'

await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['alice'] }
})
])

await harness.emit(changeEventWithFullData(replyPath, 'slack', {
provider: 'slack',
objectType: 'thread_reply',
payload: {
text: '<@U123> remote stream text',
channel: 'C123ABC',
thread_ts: '1780667635.192799',
ts: '1780668181.544139',
user: 'U456'
}
}))
await waitForSent(harness, 1)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])
assert.match(harness.sent[0].input.text, /Slack text: <@U123> remote stream text/u)
assert.match(harness.sent[0].input.text, /Slack thread ts: 1780667635\.192799/u)
})

test('slack local event context rejects traversal paths', async () => {
const harness = makeHarness(['alice'])
const workspaceId = 'workspace-id'
const originalHome = process.env.HOME
const tempHome = await mkdtemp(join(tmpdir(), 'pear-integration-event-'))

try {
process.env.HOME = tempHome
const escapedPath = join(tempHome, '.agentworkforce', 'pear', 'relayfile', 'workspaces', 'leak.json')
await mkdir(join(escapedPath, '..'), { recursive: true })
await writeFile(escapedPath, JSON.stringify({
payload: {
text: 'escaped local file should not be injected'
}
}))

await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['alice'] }
})
])

await harness.emit(changeEvent('/slack/channels/C123ABC__proj-cloud/threads/../../../../../leak.json', 'slack'))
await waitForSent(harness, 1)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])
assert.doesNotMatch(harness.sent[0].input.text, /escaped local file should not be injected/u)
} finally {
await harness.bridge.close('project-1')
if (originalHome === undefined) {
delete process.env.HOME
} else {
process.env.HOME = originalHome
}
await rm(tempHome, { recursive: true, force: true })
}
})

test('local fallback watchers are disabled when historical download is off', () => {
const roots = localWatchRootsFor(
'workspace-id',
Expand All @@ -248,6 +500,7 @@ test('local fallback watchers are disabled when historical download is off', ()
})
],
[
'/slack/channels/D*/**',
'/slack/channels/C123ABC/**',
'/slack/channels/C123ABC__proj-cloud/**',
'/slack/dms/D123ABC/**'
Expand Down Expand Up @@ -418,10 +671,16 @@ test('integration events preserve discovery mount paths', async () => {
await harness.bridge.reconcile('project-1', [slackIntegration])

assert.deepEqual(harness.subscribeCalls[0].globs, [
'/discovery/slack/**'
'/discovery/slack/**',
'/slack/channels/D*/**',
'/slack/dms/*/**',
'/slack/users/*/messages/**'
])
assert.deepEqual(integrationSubscriptionSummaries([slackIntegration])[0].watches, [
'.integrations/discovery/slack/**'
'.integrations/discovery/slack/**',
'.integrations/slack/channels/D*/**',
'.integrations/slack/dms/*/**',
'.integrations/slack/users/*/messages/**'
])

await harness.emit(changeEvent('/discovery/slack/actions/create-message/.schema.json', 'slack'))
Expand Down Expand Up @@ -471,6 +730,7 @@ test('resource alias mount paths with the same revision inject one logical chang
'slack',
{ revision: 'same-content' }
))
await waitForSent(harness, 1)

assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice'])
})
Expand Down Expand Up @@ -519,6 +779,7 @@ test('integration events ignore index, discovery, tmp, dotfile, and local writeb
await harness.emit(changeEvent('/github/repos/create.json', 'github'))
await harness.emit(changeEvent('/slack/channels/C123ABC/messages/claude-1-codex-spawned.json', 'slack'))
await harness.emit(changeEvent('/slack/channels/C123ABC/threads/1780607825_485189/replies/claude-1-issue82-ack.json', 'slack'))
await harness.emit(changeEvent('/github/repos/.widgets.json.tmp-123', 'github'))

assert.deepEqual(harness.sent, [])
assert.deepEqual(harness.listAgentsCalls, [])
Expand Down
Loading
Loading