Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 189 additions & 9 deletions src/main/__tests__/integration-event-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,21 @@ async function withMockedNow<T>(isoTimestamp: string, fn: () => Promise<T>): Pro
}
}

async function waitForDispatcherTick(): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, 0))
await new Promise((resolve) => setImmediate(resolve))
await new Promise((resolve) => setImmediate(resolve))
}

async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise<void> {
const deadline = Date.now() + timeoutMs
while (Date.now() <= deadline) {
if (predicate()) return
await new Promise((resolve) => setTimeout(resolve, 10))
}
assert.equal(predicate(), true)
}

function makeHarness(
agents = ['alice', 'bob'],
options: {
Expand All @@ -113,6 +128,8 @@ function makeHarness(
content: string
encoding: 'utf-8' | 'base64'
}
sendDelayMs?: number
onSendStart?: (activeSends: number) => void
} = {}
): {
bridge: IntegrationEventBridge
Expand All @@ -127,6 +144,7 @@ function makeHarness(
const sent: SentMessage[] = []
const listAgentsCalls: string[] = []
const subscriptions: Subscription[] = []
let activeSends = 0

const bridge = new IntegrationEventBridge({
getWorkspaceHandle: async () => ({
Expand Down Expand Up @@ -157,17 +175,25 @@ function makeHarness(
return agents.map((name) => ({ name, projectId }))
},
sendMessage: async (projectId, input) => {
if (options.failSend) throw new Error('broker unavailable')
sent.push({ projectId, input })
activeSends += 1
options.onSendStart?.(activeSends)
try {
if (options.sendDelayMs) {
await new Promise((resolve) => setTimeout(resolve, options.sendDelayMs))
}
if (options.failSend) throw new Error('broker unavailable')
sent.push({ projectId, input })
} finally {
activeSends -= 1
}
}
}
})

async function emit(event: ChangeEvent): Promise<void> {
assert.equal(subscribeCalls.length, 1, 'expected a single relayfile subscription')
subscribeCalls[0].onChange(event)
await new Promise((resolve) => setImmediate(resolve))
await new Promise((resolve) => setImmediate(resolve))
await waitForDispatcherTick()
}

return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, emit }
Expand Down Expand Up @@ -473,16 +499,14 @@ test('slack backfill and malformed nested message paths are not injected', async
occurredAt: '2026-06-05T14:14:57.314Z'
})
assert.deepEqual(harness.sent, [])
await waitForDropped('project-1', 1)
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 1)
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 0)

await harness.emit(changeEvent(
'/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/meta.json',
'slack'
))
assert.deepEqual(harness.sent, [])
await waitForDropped('project-1', 2)
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 2)
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 0)
})

test('slack context resolves with history off through one targeted remote preview', async () => {
Expand Down Expand Up @@ -997,7 +1021,7 @@ test('integration event delivery is quiet by default while counters remain avail
eventsReceived: 3,
eventsInjected: 1,
eventsCoalesced: 0,
eventsDropped: 2,
eventsDropped: 1,
queueDepth: 0,
mountCount: 0
})
Expand Down Expand Up @@ -1057,6 +1081,7 @@ test('integration event delivery failures use aggregated warn cadence by default
for (let index = 1; index <= 26; index += 1) {
await harness.emit(changeEvent(`/github/repos/acme/widgets-${index}.json`, 'github'))
}
await waitUntil(() => warnCalls.length === 2)
} finally {
console.debug = originalDebug
console.warn = originalWarn
Expand All @@ -1081,6 +1106,161 @@ test('integration event delivery failures use aggregated warn cadence by default
})
})

test('integration event dispatcher compacts large bursts into a bounded summary', async () => {
const harness = makeHarness(['alice'])

await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['alice'] }
})
])

for (let index = 0; index < 1_000; index += 1) {
harness.subscribeCalls[0].onChange(changeEvent(
`/slack/channels/C123ABC__proj-cloud/messages/1780607${String(index).padStart(4, '0')}/meta.json`,
'slack',
{ revision: `rev-${index}` }
))
}

await waitUntil(
() => harness.sent.some((message) => /950 Slack messages changed in #proj-cloud/u.test(message.input.text)),
5_000
)

assert.equal(harness.sent.length, 51)
assert.ok(harness.sent.length < 1_000)
const summary = harness.sent.find((message) => /Slack messages changed/u.test(message.input.text))
assert.ok(summary)
assert.match(summary.input.text, /950 Slack messages changed in #proj-cloud/u)
assert.equal(summary.input.data?.eventType, 'relayfile.changed.summary')
assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], {
eventsReceived: 1_000,
eventsInjected: 51,
eventsCoalesced: 950,
eventsDropped: 0,
queueDepth: 0,
mountCount: 0
})
})

test('integration event dispatcher filters noise before queue admission', async () => {
const harness = makeHarness(['alice'])

await withMockedNow('2026-06-05T15:50:00.000Z', async () => {
await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['alice'] }
})
])

for (let index = 0; index < 1_000; index += 1) {
harness.subscribeCalls[0].onChange(changeEvent(
`/slack/channels/C123ABC__proj-cloud/messages/1780607${String(index).padStart(4, '0')}/.meta.json.tmp-${index}`,
'slack',
{ revision: `tmp-${index}` }
))
}
harness.subscribeCalls[0].onChange(changeEvent(
'/slack/channels/C123ABC__proj-cloud/messages/1780674600_000000/meta.json',
'slack',
{ revision: 'real-change' }
))
})

await waitUntil(() => harness.sent.length === 1)

assert.match(harness.sent[0].input.text, /Relayfile path: \/slack\/channels\/C123ABC__proj-cloud\/messages\/1780674600_000000\/meta\.json/u)
assert.doesNotMatch(harness.sent[0].input.text, /Slack messages changed/u)
assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], {
eventsReceived: 1_001,
eventsInjected: 1,
eventsCoalesced: 0,
eventsDropped: 0,
queueDepth: 0,
mountCount: 0
})
})

test('integration event dispatcher coalesces rapid distinct revisions for the same path', async () => {
const harness = makeHarness(['alice'])

await harness.bridge.reconcile('project-1', [
integration({
provider: 'github',
integrationId: 'github-1',
mountPaths: ['/github/repos'],
scope: { notifyAgents: ['alice'] }
})
])

for (let index = 0; index < 10; index += 1) {
harness.subscribeCalls[0].onChange(changeEvent(
'/github/repos/acme/widgets.json',
'github',
{ revision: `rev-${index}` }
))
}

await waitUntil(() => harness.sent.length === 1)

assert.equal(harness.sent[0].input.data?.eventId, 'evt:/github/repos/acme/widgets.json')
assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], {
eventsReceived: 10,
eventsInjected: 1,
eventsCoalesced: 9,
eventsDropped: 0,
queueDepth: 0,
mountCount: 0
})
})

test('integration event fanout sends to recipients sequentially', async () => {
let maxActiveSends = 0
const harness = makeHarness(
Array.from({ length: 12 }, (_, index) => `agent-${index}`),
{
sendDelayMs: 2,
onSendStart: (activeSends) => {
maxActiveSends = Math.max(maxActiveSends, activeSends)
}
}
)

await harness.bridge.reconcile('project-1', [
integration({
provider: 'linear',
integrationId: 'linear-1',
mountPaths: ['/linear/issues']
})
])

await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear'))
await waitUntil(() => harness.sent.length === 12)

assert.equal(maxActiveSends, 1)
assert.deepEqual(harness.sent.map((message) => message.input.to), [
'agent-0',
'agent-1',
'agent-10',
'agent-11',
'agent-2',
'agent-3',
'agent-4',
'agent-5',
'agent-6',
'agent-7',
'agent-8',
'agent-9'
])
})

test('integration event telemetry records coalescing and queue depth callbacks', async () => {
const harness = makeHarness(['alice'])

Expand Down
Loading
Loading