-
Notifications
You must be signed in to change notification settings - Fork 1
Cache integration event recipients and pace broker sends #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -130,19 +130,22 @@ function makeHarness( | |||||||||
| } | ||||||||||
| sendDelayMs?: number | ||||||||||
| onSendStart?: (activeSends: number) => void | ||||||||||
| waitForDeliveryNeverSettles?: boolean | ||||||||||
| } = {} | ||||||||||
| ): { | ||||||||||
| bridge: IntegrationEventBridge | ||||||||||
| subscribeCalls: SubscribeCall[] | ||||||||||
| readFileCalls: Array<{ workspaceId: string; path: string }> | ||||||||||
| sent: SentMessage[] | ||||||||||
| listAgentsCalls: string[] | ||||||||||
| deliveryConfirmationCalls: SentMessage[] | ||||||||||
| emit(event: ChangeEvent): Promise<void> | ||||||||||
| } { | ||||||||||
| const subscribeCalls: SubscribeCall[] = [] | ||||||||||
| const readFileCalls: Array<{ workspaceId: string; path: string }> = [] | ||||||||||
| const sent: SentMessage[] = [] | ||||||||||
| const listAgentsCalls: string[] = [] | ||||||||||
| const deliveryConfirmationCalls: SentMessage[] = [] | ||||||||||
| const subscriptions: Subscription[] = [] | ||||||||||
| let activeSends = 0 | ||||||||||
|
|
||||||||||
|
|
@@ -186,7 +189,13 @@ function makeHarness( | |||||||||
| } finally { | ||||||||||
| activeSends -= 1 | ||||||||||
| } | ||||||||||
| } | ||||||||||
| }, | ||||||||||
| sendMessageAndWaitForDelivery: options.waitForDeliveryNeverSettles | ||||||||||
| ? async (projectId, input) => { | ||||||||||
| deliveryConfirmationCalls.push({ projectId, input }) | ||||||||||
| await new Promise(() => undefined) | ||||||||||
| } | ||||||||||
| : undefined | ||||||||||
| } | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -196,16 +205,16 @@ function makeHarness( | |||||||||
| await waitForDispatcherTick() | ||||||||||
| } | ||||||||||
|
|
||||||||||
| return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, emit } | ||||||||||
| return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, deliveryConfirmationCalls, emit } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| beforeEach(() => { | ||||||||||
| resetIntegrationEventTelemetryForTests() | ||||||||||
| delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| async function waitForSent(harness: { sent: SentMessage[] }, count: number): Promise<void> { | ||||||||||
| const deadline = Date.now() + 1_000 | ||||||||||
| async function waitForSent(harness: { sent: SentMessage[] }, count: number, timeoutMs = 1_000): Promise<void> { | ||||||||||
| const deadline = Date.now() + timeoutMs | ||||||||||
| while (harness.sent.length < count && Date.now() < deadline) { | ||||||||||
|
Comment on lines
+217
to
218
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Prompt for AI agents
Suggested change
|
||||||||||
| await new Promise((resolve) => setTimeout(resolve, 10)) | ||||||||||
| } | ||||||||||
|
|
@@ -240,10 +249,12 @@ test('integration events route only to the targets for the matching integration | |||||||||
| assert.equal(harness.subscribeCalls[0].options?.from, 'now') | ||||||||||
|
|
||||||||||
| await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) | ||||||||||
| await waitForSent(harness, 1) | ||||||||||
| assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) | ||||||||||
|
|
||||||||||
| harness.sent.splice(0) | ||||||||||
| await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) | ||||||||||
| await waitForSent(harness, 2) | ||||||||||
| assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -926,6 +937,7 @@ test('generic provider agent scope keys are not treated as notification targets' | |||||||||
| ]) | ||||||||||
|
|
||||||||||
| await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) | ||||||||||
| await waitForSent(harness, 2) | ||||||||||
|
|
||||||||||
| assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) | ||||||||||
| }) | ||||||||||
|
|
@@ -1033,6 +1045,7 @@ test('integration event delivery is quiet by default while counters remain avail | |||||||||
| await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) | ||||||||||
| await harness.emit(changeEvent('/github/repos/_index.json', 'github')) | ||||||||||
| await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) | ||||||||||
| await waitForSent(harness, 1) | ||||||||||
| } finally { | ||||||||||
| console.debug = originalDebug | ||||||||||
| console.info = originalInfo | ||||||||||
|
|
@@ -1045,8 +1058,11 @@ test('integration event delivery is quiet by default while counters remain avail | |||||||||
| eventsInjected: 1, | ||||||||||
| eventsCoalesced: 0, | ||||||||||
| eventsDropped: 1, | ||||||||||
| brokerSends: 1, | ||||||||||
| brokerSendsDeferred: 0, | ||||||||||
| queueDepth: 0, | ||||||||||
| mountCount: 0 | ||||||||||
| mountCount: 0, | ||||||||||
| brokerSendQueueDepth: 0 | ||||||||||
| }) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -1070,6 +1086,7 @@ test('integration event debug flag enables verbose delivery logs', async () => { | |||||||||
| ]) | ||||||||||
|
|
||||||||||
| await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) | ||||||||||
| await waitForSent(harness, 1) | ||||||||||
| } finally { | ||||||||||
| console.debug = originalDebug | ||||||||||
| } | ||||||||||
|
|
@@ -1124,8 +1141,11 @@ test('integration event delivery failures use aggregated warn cadence by default | |||||||||
| eventsInjected: 0, | ||||||||||
| eventsCoalesced: 0, | ||||||||||
| eventsDropped: 0, | ||||||||||
| brokerSends: 0, | ||||||||||
| brokerSendsDeferred: 0, | ||||||||||
| queueDepth: 0, | ||||||||||
| mountCount: 0 | ||||||||||
| mountCount: 0, | ||||||||||
| brokerSendQueueDepth: 0 | ||||||||||
| }) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -1160,13 +1180,19 @@ test('integration event dispatcher compacts large bursts into a bounded summary' | |||||||||
| assert.ok(summary) | ||||||||||
| assert.match(summary.input.text, /950 Slack messages changed in #proj-cloud/u) | ||||||||||
| assert.equal(summary.input.data?.eventType, 'relayfile.changed.summary') | ||||||||||
| assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { | ||||||||||
| const telemetry = getIntegrationEventTelemetrySnapshot().projects['project-1'] | ||||||||||
| assert.ok(telemetry) | ||||||||||
| assert.equal(telemetry.brokerSendsDeferred >= 0, true) | ||||||||||
| assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, { | ||||||||||
| eventsReceived: 1_000, | ||||||||||
| eventsInjected: 51, | ||||||||||
| eventsCoalesced: 950, | ||||||||||
| eventsDropped: 0, | ||||||||||
| brokerSends: 51, | ||||||||||
| brokerSendsDeferred: 0, | ||||||||||
| queueDepth: 0, | ||||||||||
| mountCount: 0 | ||||||||||
| mountCount: 0, | ||||||||||
| brokerSendQueueDepth: 0 | ||||||||||
| }) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -1206,8 +1232,11 @@ test('integration event dispatcher filters noise before queue admission', async | |||||||||
| eventsInjected: 1, | ||||||||||
| eventsCoalesced: 0, | ||||||||||
| eventsDropped: 0, | ||||||||||
| brokerSends: 1, | ||||||||||
| brokerSendsDeferred: 0, | ||||||||||
| queueDepth: 0, | ||||||||||
| mountCount: 0 | ||||||||||
| mountCount: 0, | ||||||||||
| brokerSendQueueDepth: 0 | ||||||||||
| }) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -1239,8 +1268,11 @@ test('integration event dispatcher coalesces rapid distinct revisions for the sa | |||||||||
| eventsInjected: 1, | ||||||||||
| eventsCoalesced: 9, | ||||||||||
| eventsDropped: 0, | ||||||||||
| brokerSends: 1, | ||||||||||
| brokerSendsDeferred: 0, | ||||||||||
| queueDepth: 0, | ||||||||||
| mountCount: 0 | ||||||||||
| mountCount: 0, | ||||||||||
| brokerSendQueueDepth: 0 | ||||||||||
| }) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
|
|
@@ -1284,6 +1316,118 @@ test('integration event fanout sends to recipients sequentially', async () => { | |||||||||
| ]) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| test('integration event recipient cache avoids listAgents per event during bursts', async () => { | ||||||||||
| const harness = makeHarness(['alice']) | ||||||||||
|
|
||||||||||
| await harness.bridge.reconcile('project-1', [ | ||||||||||
| integration({ | ||||||||||
| provider: 'github', | ||||||||||
| integrationId: 'github-1', | ||||||||||
| mountPaths: ['/github/repos'], | ||||||||||
| scope: { notifyAgents: ['alice'] } | ||||||||||
| }) | ||||||||||
| ]) | ||||||||||
|
|
||||||||||
| for (let index = 0; index < 10; index += 1) { | ||||||||||
| harness.subscribeCalls[0].onChange(changeEvent( | ||||||||||
| `/github/repos/acme/widgets-${index}.json`, | ||||||||||
| 'github', | ||||||||||
| { revision: `rev-${index}` } | ||||||||||
| )) | ||||||||||
| } | ||||||||||
| await waitUntil(() => harness.sent.length === 10) | ||||||||||
|
|
||||||||||
| assert.deepEqual(harness.listAgentsCalls, ['project-1']) | ||||||||||
| assert.deepEqual(harness.sent.map((message) => message.input.to), Array(10).fill('alice')) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| test('integration event agent cache invalidates for newly spawned agents and expires briefly', async () => { | ||||||||||
| const agents = ['alice'] | ||||||||||
| const harness = makeHarness(agents) | ||||||||||
| let now = Date.parse('2026-06-05T14:00:00.000Z') | ||||||||||
| const originalDateNow = Date.now | ||||||||||
| Date.now = () => now | ||||||||||
|
|
||||||||||
| try { | ||||||||||
| await harness.bridge.reconcile('project-1', [ | ||||||||||
| integration({ | ||||||||||
| provider: 'linear', | ||||||||||
| integrationId: 'linear-1', | ||||||||||
| mountPaths: ['/linear/issues'] | ||||||||||
| }) | ||||||||||
| ]) | ||||||||||
|
|
||||||||||
| await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear', { revision: 'rev-1' })) | ||||||||||
| await waitForSent(harness, 1) | ||||||||||
| assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) | ||||||||||
| assert.deepEqual(harness.listAgentsCalls, ['project-1']) | ||||||||||
|
|
||||||||||
| agents.push('bob') | ||||||||||
| harness.bridge.invalidateProjectAgentCache('project-1') | ||||||||||
| await harness.emit(changeEvent('/linear/issues/AR-2.json', 'linear', { revision: 'rev-2' })) | ||||||||||
| await waitForSent(harness, 3) | ||||||||||
| assert.deepEqual(harness.sent.slice(1).map((message) => message.input.to), ['alice', 'bob']) | ||||||||||
| assert.deepEqual(harness.listAgentsCalls, ['project-1', 'project-1']) | ||||||||||
|
|
||||||||||
| agents.push('carol') | ||||||||||
| now += 2_001 | ||||||||||
| await harness.emit(changeEvent('/linear/issues/AR-3.json', 'linear', { revision: 'rev-3' })) | ||||||||||
| await waitForSent(harness, 6) | ||||||||||
| assert.deepEqual(harness.sent.slice(3).map((message) => message.input.to), ['alice', 'bob', 'carol']) | ||||||||||
| assert.deepEqual(harness.listAgentsCalls, ['project-1', 'project-1', 'project-1']) | ||||||||||
| } finally { | ||||||||||
| Date.now = originalDateNow | ||||||||||
| } | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| test('integration event broker sends are paced per project across many recipients', async () => { | ||||||||||
| const sendStartedAt: number[] = [] | ||||||||||
| const harness = makeHarness( | ||||||||||
| Array.from({ length: 26 }, (_, index) => `agent-${index}`), | ||||||||||
| { | ||||||||||
| onSendStart: () => { | ||||||||||
| sendStartedAt.push(Date.now()) | ||||||||||
| } | ||||||||||
| } | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| await harness.bridge.reconcile('project-1', [ | ||||||||||
| integration({ | ||||||||||
| provider: 'linear', | ||||||||||
| integrationId: 'linear-1', | ||||||||||
| mountPaths: ['/linear/issues'] | ||||||||||
| }) | ||||||||||
| ]) | ||||||||||
|
|
||||||||||
| await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) | ||||||||||
| await waitForSent(harness, 26, 2_500) | ||||||||||
|
|
||||||||||
| assert.equal(harness.sent.length, 26) | ||||||||||
| assert.ok(sendStartedAt[25] - sendStartedAt[0] >= 900) | ||||||||||
| const telemetry = getIntegrationEventTelemetrySnapshot().projects['project-1'] | ||||||||||
| assert.equal(telemetry?.brokerSends, 26) | ||||||||||
| assert.equal(telemetry?.brokerSendsDeferred, 1) | ||||||||||
| assert.equal(telemetry?.brokerSendQueueDepth, 0) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| test('integration event broker pacing does not wait on delivery confirmation path', async () => { | ||||||||||
| const harness = makeHarness(['alice', 'bob'], { waitForDeliveryNeverSettles: true }) | ||||||||||
|
|
||||||||||
| await harness.bridge.reconcile('project-1', [ | ||||||||||
| integration({ | ||||||||||
| provider: 'linear', | ||||||||||
| integrationId: 'linear-1', | ||||||||||
| mountPaths: ['/linear/issues'] | ||||||||||
| }) | ||||||||||
| ]) | ||||||||||
|
|
||||||||||
| await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) | ||||||||||
| await waitForSent(harness, 2) | ||||||||||
|
|
||||||||||
| assert.equal(harness.deliveryConfirmationCalls.length, 0) | ||||||||||
| assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| test('integration event telemetry records coalescing and queue depth callbacks', async () => { | ||||||||||
| const harness = makeHarness(['alice']) | ||||||||||
|
|
||||||||||
|
|
@@ -1304,7 +1448,10 @@ test('integration event telemetry records coalescing and queue depth callbacks', | |||||||||
| eventsInjected: 0, | ||||||||||
| eventsCoalesced: 1, | ||||||||||
| eventsDropped: 0, | ||||||||||
| brokerSends: 0, | ||||||||||
| brokerSendsDeferred: 0, | ||||||||||
| queueDepth: 7, | ||||||||||
| mountCount: 0 | ||||||||||
| mountCount: 0, | ||||||||||
| brokerSendQueueDepth: 0 | ||||||||||
| }) | ||||||||||
| }) | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: This timeout helper relies on
Date.now(), but several tests monkeypatchDate.nowto a fixed value; when that happens, the deadline never advances and this loop can hang forever if messages are not sent. Use a monotonic real-time source for timeout checks (or assert with a bounded attempt count) so the wait always terminates. [logic error]Severity Level: Major⚠️
Steps of Reproduction ✅
Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖