From af877c156dbea56d5fae49155de30417877bd7b9 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 5 Jun 2026 18:12:36 +0200 Subject: [PATCH] Cache integration event recipients and pace broker sends --- .../integration-event-bridge.test.ts | 169 +++++++++- src/main/integration-event-bridge.ts | 288 ++++++++++++++++-- src/main/ipc-handlers.ts | 11 +- src/shared/types/ipc.ts | 3 + 4 files changed, 424 insertions(+), 47 deletions(-) diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 854f2239..6cba3071 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -130,6 +130,7 @@ function makeHarness( } sendDelayMs?: number onSendStart?: (activeSends: number) => void + waitForDeliveryNeverSettles?: boolean } = {} ): { bridge: IntegrationEventBridge @@ -137,12 +138,14 @@ function makeHarness( readFileCalls: Array<{ workspaceId: string; path: string }> sent: SentMessage[] listAgentsCalls: string[] + deliveryConfirmationCalls: SentMessage[] emit(event: ChangeEvent): Promise } { const subscribeCalls: SubscribeCall[] = [] const readFileCalls: Array<{ workspaceId: string; path: string }> = [] const sent: SentMessage[] = [] const listAgentsCalls: string[] = [] + const deliveryConfirmationCalls: SentMessage[] = [] const subscriptions: Subscription[] = [] let activeSends = 0 @@ -186,7 +189,13 @@ function makeHarness( } finally { activeSends -= 1 } - } + }, + sendMessageAndWaitForDelivery: options.waitForDeliveryNeverSettles + ? async (projectId, input) => { + deliveryConfirmationCalls.push({ projectId, input }) + await new Promise(() => undefined) + } + : undefined } }) @@ -196,7 +205,7 @@ function makeHarness( await waitForDispatcherTick() } - return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, emit } + return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, deliveryConfirmationCalls, emit } } beforeEach(() => { @@ -204,8 +213,8 @@ beforeEach(() => { delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG }) -async function waitForSent(harness: { sent: SentMessage[] }, count: number): Promise { - const deadline = Date.now() + 1_000 +async function waitForSent(harness: { sent: SentMessage[] }, count: number, timeoutMs = 1_000): Promise { + const deadline = Date.now() + timeoutMs while (harness.sent.length < count && Date.now() < deadline) { await new Promise((resolve) => setTimeout(resolve, 10)) } @@ -240,10 +249,12 @@ test('integration events route only to the targets for the matching integration assert.equal(harness.subscribeCalls[0].options?.from, 'now') await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) + await waitForSent(harness, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) harness.sent.splice(0) await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) + await waitForSent(harness, 2) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) }) @@ -926,6 +937,7 @@ test('generic provider agent scope keys are not treated as notification targets' ]) await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) + await waitForSent(harness, 2) assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) }) @@ -1033,6 +1045,7 @@ test('integration event delivery is quiet by default while counters remain avail await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) await harness.emit(changeEvent('/github/repos/_index.json', 'github')) await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) + await waitForSent(harness, 1) } finally { console.debug = originalDebug console.info = originalInfo @@ -1045,8 +1058,11 @@ test('integration event delivery is quiet by default while counters remain avail eventsInjected: 1, eventsCoalesced: 0, eventsDropped: 1, + brokerSends: 1, + brokerSendsDeferred: 0, queueDepth: 0, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 }) }) @@ -1070,6 +1086,7 @@ test('integration event debug flag enables verbose delivery logs', async () => { ]) await harness.emit(changeEvent('/github/repos/acme/widgets.json', 'github')) + await waitForSent(harness, 1) } finally { console.debug = originalDebug } @@ -1124,8 +1141,11 @@ test('integration event delivery failures use aggregated warn cadence by default eventsInjected: 0, eventsCoalesced: 0, eventsDropped: 0, + brokerSends: 0, + brokerSendsDeferred: 0, queueDepth: 0, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 }) }) @@ -1160,13 +1180,19 @@ test('integration event dispatcher compacts large bursts into a bounded summary' assert.ok(summary) assert.match(summary.input.text, /950 Slack messages changed in #proj-cloud/u) assert.equal(summary.input.data?.eventType, 'relayfile.changed.summary') - assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { + const telemetry = getIntegrationEventTelemetrySnapshot().projects['project-1'] + assert.ok(telemetry) + assert.equal(telemetry.brokerSendsDeferred >= 0, true) + assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, { eventsReceived: 1_000, eventsInjected: 51, eventsCoalesced: 950, eventsDropped: 0, + brokerSends: 51, + brokerSendsDeferred: 0, queueDepth: 0, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 }) }) @@ -1206,8 +1232,11 @@ test('integration event dispatcher filters noise before queue admission', async eventsInjected: 1, eventsCoalesced: 0, eventsDropped: 0, + brokerSends: 1, + brokerSendsDeferred: 0, queueDepth: 0, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 }) }) @@ -1239,8 +1268,11 @@ test('integration event dispatcher coalesces rapid distinct revisions for the sa eventsInjected: 1, eventsCoalesced: 9, eventsDropped: 0, + brokerSends: 1, + brokerSendsDeferred: 0, queueDepth: 0, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 }) }) @@ -1284,6 +1316,118 @@ test('integration event fanout sends to recipients sequentially', async () => { ]) }) +test('integration event recipient cache avoids listAgents per event during bursts', async () => { + const harness = makeHarness(['alice']) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'github', + integrationId: 'github-1', + mountPaths: ['/github/repos'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + for (let index = 0; index < 10; index += 1) { + harness.subscribeCalls[0].onChange(changeEvent( + `/github/repos/acme/widgets-${index}.json`, + 'github', + { revision: `rev-${index}` } + )) + } + await waitUntil(() => harness.sent.length === 10) + + assert.deepEqual(harness.listAgentsCalls, ['project-1']) + assert.deepEqual(harness.sent.map((message) => message.input.to), Array(10).fill('alice')) +}) + +test('integration event agent cache invalidates for newly spawned agents and expires briefly', async () => { + const agents = ['alice'] + const harness = makeHarness(agents) + let now = Date.parse('2026-06-05T14:00:00.000Z') + const originalDateNow = Date.now + Date.now = () => now + + try { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'linear', + integrationId: 'linear-1', + mountPaths: ['/linear/issues'] + }) + ]) + + await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear', { revision: 'rev-1' })) + await waitForSent(harness, 1) + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice']) + assert.deepEqual(harness.listAgentsCalls, ['project-1']) + + agents.push('bob') + harness.bridge.invalidateProjectAgentCache('project-1') + await harness.emit(changeEvent('/linear/issues/AR-2.json', 'linear', { revision: 'rev-2' })) + await waitForSent(harness, 3) + assert.deepEqual(harness.sent.slice(1).map((message) => message.input.to), ['alice', 'bob']) + assert.deepEqual(harness.listAgentsCalls, ['project-1', 'project-1']) + + agents.push('carol') + now += 2_001 + await harness.emit(changeEvent('/linear/issues/AR-3.json', 'linear', { revision: 'rev-3' })) + await waitForSent(harness, 6) + assert.deepEqual(harness.sent.slice(3).map((message) => message.input.to), ['alice', 'bob', 'carol']) + assert.deepEqual(harness.listAgentsCalls, ['project-1', 'project-1', 'project-1']) + } finally { + Date.now = originalDateNow + } +}) + +test('integration event broker sends are paced per project across many recipients', async () => { + const sendStartedAt: number[] = [] + const harness = makeHarness( + Array.from({ length: 26 }, (_, index) => `agent-${index}`), + { + onSendStart: () => { + sendStartedAt.push(Date.now()) + } + } + ) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'linear', + integrationId: 'linear-1', + mountPaths: ['/linear/issues'] + }) + ]) + + await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) + await waitForSent(harness, 26, 2_500) + + assert.equal(harness.sent.length, 26) + assert.ok(sendStartedAt[25] - sendStartedAt[0] >= 900) + const telemetry = getIntegrationEventTelemetrySnapshot().projects['project-1'] + assert.equal(telemetry?.brokerSends, 26) + assert.equal(telemetry?.brokerSendsDeferred, 1) + assert.equal(telemetry?.brokerSendQueueDepth, 0) +}) + +test('integration event broker pacing does not wait on delivery confirmation path', async () => { + const harness = makeHarness(['alice', 'bob'], { waitForDeliveryNeverSettles: true }) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'linear', + integrationId: 'linear-1', + mountPaths: ['/linear/issues'] + }) + ]) + + await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) + await waitForSent(harness, 2) + + assert.equal(harness.deliveryConfirmationCalls.length, 0) + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'bob']) +}) + test('integration event telemetry records coalescing and queue depth callbacks', async () => { const harness = makeHarness(['alice']) @@ -1304,7 +1448,10 @@ test('integration event telemetry records coalescing and queue depth callbacks', eventsInjected: 0, eventsCoalesced: 1, eventsDropped: 0, + brokerSends: 0, + brokerSendsDeferred: 0, queueDepth: 7, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 }) }) diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 59cd771b..80a4de6d 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -36,9 +36,17 @@ const MAX_EVENT_CONTEXT_PREVIEW_BYTES = 32 * 1024 const MAX_DISPATCH_QUEUE_EVENTS = 50 const MAX_DISPATCH_SUMMARY_GROUPS = 10 const MAX_DISPATCHED_EVENTS_PER_SECOND = 25 - -type IntegrationEventCounterName = 'eventsReceived' | 'eventsInjected' | 'eventsCoalesced' | 'eventsDropped' -type IntegrationEventGaugeName = 'queueDepth' | 'mountCount' +const PROJECT_AGENT_RECIPIENT_CACHE_TTL_MS = 2_000 +const MAX_BROKER_SENDS_PER_SECOND = 25 + +type IntegrationEventCounterName = + | 'eventsReceived' + | 'eventsInjected' + | 'eventsCoalesced' + | 'eventsDropped' + | 'brokerSends' + | 'brokerSendsDeferred' +type IntegrationEventGaugeName = 'queueDepth' | 'mountCount' | 'brokerSendQueueDepth' type WatchRegistration = { glob: string @@ -89,6 +97,24 @@ type DispatchSummary = { latestEvent: ChangeEvent } +type BrokerMessageInput = Parameters[1] + +type ProjectAgentRecipientCacheEntry = { + agents: string[] + expiresAt: number + pending?: Promise +} + +type CachedSpecTargets = { + agents: string[] + channels: string[] +} + +type NotificationTargetCacheEntry = { + specs: CachedSpecTargets[] + needsProjectAgents: boolean +} + type LocalMountSubscription = Subscription & { localRoots: string[] } @@ -181,8 +207,11 @@ function emptyIntegrationEventCounters(): IntegrationEventTelemetryCounters { eventsInjected: 0, eventsCoalesced: 0, eventsDropped: 0, + brokerSends: 0, + brokerSendsDeferred: 0, queueDepth: 0, - mountCount: 0 + mountCount: 0, + brokerSendQueueDepth: 0 } } @@ -1055,6 +1084,15 @@ function dispatchSummaryKey(summary: DispatchSummary): string { return `${summary.provider}:${summary.groupPath}` } +function notificationTargetCacheKey(matchedSpecs: SubscriptionSpec[]): string { + return matchedSpecs.map((spec) => JSON.stringify({ + integrationId: spec.integrationId, + provider: spec.provider, + agents: spec.targets.agents, + channels: spec.targets.channels + })).join('|') +} + function compactedSummaryTitle(summary: DispatchSummary): string { const providerLabel = summary.provider.charAt(0).toUpperCase() + summary.provider.slice(1) if (summary.provider === 'slack') { @@ -1531,16 +1569,128 @@ class ProjectEventDispatcher { } } +class ProjectBrokerSendPacer { + private readonly queue: Array<{ + input: BrokerMessageInput + resolve: () => void + reject: (error: unknown) => void + }> = [] + private readonly projectId: string + private readonly send: (input: BrokerMessageInput) => Promise + private drainTimer: ReturnType | null = null + private draining = false + private active = true + private windowStartedAt = 0 + private sentInWindow = 0 + + constructor(projectId: string, send: (input: BrokerMessageInput) => Promise) { + this.projectId = projectId + this.send = send + } + + enqueue(input: BrokerMessageInput): Promise { + if (!this.active) return Promise.resolve() + const deferred = this.queue.length > 0 || this.nextRateLimitDelayMs() > 0 || this.draining + if (deferred) incrementIntegrationEventCounter(this.projectId, 'brokerSendsDeferred') + return new Promise((resolveSend, rejectSend) => { + this.queue.push({ input, resolve: resolveSend, reject: rejectSend }) + this.updateDepthGauge() + this.scheduleDrain(0) + }) + } + + dispose(): void { + this.active = false + if (this.drainTimer) clearTimeout(this.drainTimer) + this.drainTimer = null + const error = new Error('integration event broker send pacer disposed') + for (const item of this.queue.splice(0)) item.reject(error) + this.updateDepthGauge() + } + + private scheduleDrain(delayMs: number): void { + if (!this.active || this.drainTimer || this.draining) return + this.drainTimer = setTimeout(() => { + this.drainTimer = null + void this.drain() + }, delayMs) + } + + private async drain(): Promise { + if (!this.active || this.draining) return + this.draining = true + try { + while (this.active && this.queue.length > 0) { + const waitMs = this.nextRateLimitDelayMs() + if (waitMs > 0) { + this.scheduleDrain(waitMs) + return + } + + const item = this.queue.shift() + if (!item) continue + this.updateDepthGauge() + this.sentInWindow += 1 + try { + await this.send(item.input) + incrementIntegrationEventCounter(this.projectId, 'brokerSends') + item.resolve() + } catch (error) { + item.reject(error) + } + } + } finally { + this.draining = false + this.updateDepthGauge() + if (this.active && this.queue.length > 0) { + this.scheduleDrain(this.nextRateLimitDelayMs()) + } + } + } + + private nextRateLimitDelayMs(): number { + const now = Date.now() + if (this.windowStartedAt === 0 || now - this.windowStartedAt >= 1_000) { + this.windowStartedAt = now + this.sentInWindow = 0 + } + if (this.sentInWindow < MAX_BROKER_SENDS_PER_SECOND) return 0 + return Math.max(1, 1_000 - (now - this.windowStartedAt)) + } + + private updateDepthGauge(): void { + setIntegrationEventGauge(this.projectId, 'brokerSendQueueDepth', this.queue.length) + } +} + export class IntegrationEventBridge { private subscriptions = new Map() private dispatchers = new Map() private recentInjections = new Map() + private projectAgentRecipientCache = new Map() + private notificationTargetCache = new Map() + private brokerSendPacers = new Map() private readonly deps: IntegrationEventBridgeDeps constructor(deps: IntegrationEventBridgeDeps = {}) { this.deps = deps } + invalidateProjectAgentCache(projectId?: string): void { + if (projectId) { + this.projectAgentRecipientCache.delete(projectId) + return + } + this.projectAgentRecipientCache.clear() + } + + private invalidateNotificationTargetCache(projectId: string): void { + const prefix = `${projectId}:` + for (const key of Array.from(this.notificationTargetCache.keys())) { + if (key.startsWith(prefix)) this.notificationTargetCache.delete(key) + } + } + async reconcile(projectId: string, integrations: ConnectedIntegration[]): Promise { const subscribed = integrations.filter((integration) => integration.subscribeAgent === true) if (subscribed.length === 0) { @@ -1574,6 +1724,7 @@ export class IntegrationEventBridge { }) if (this.subscriptions.get(projectId)?.signature === signature) return + this.invalidateNotificationTargetCache(projectId) await this.close(projectId) const subscriptions: Subscription[] = [] try { @@ -1691,8 +1842,13 @@ export class IntegrationEventBridge { this.subscriptions.delete(projectId) this.dispatchers.get(projectId)?.dispose() this.dispatchers.delete(projectId) + this.brokerSendPacers.get(projectId)?.dispose() + this.brokerSendPacers.delete(projectId) + this.invalidateProjectAgentCache(projectId) + this.invalidateNotificationTargetCache(projectId) setIntegrationEventGauge(projectId, 'queueDepth', 0) setIntegrationEventGauge(projectId, 'mountCount', 0) + setIntegrationEventGauge(projectId, 'brokerSendQueueDepth', 0) if (!subscription) return await Promise.all(subscription.subscriptions.map((entry) => entry.unsubscribe().catch(() => undefined))) } @@ -1796,31 +1952,7 @@ export class IntegrationEventBridge { } const bridge = await this.bridge() - let allProjectAgents: string[] | null = null - const recipients: string[] = [] - const listProjectAgents = async (): Promise => { - allProjectAgents ??= (await bridge.listAgents(projectId)) - .filter((agent) => agent.projectId === undefined || agent.projectId === projectId) - .map((agent) => agent.name) - return allProjectAgents - } - - for (const spec of matchedSpecs) { - const projectAgents = spec.targets.agents.length > 0 - ? await listProjectAgents() - : null - const onlineExplicitAgents = projectAgents - ? spec.targets.agents.filter((agent) => projectAgents.includes(agent)) - : [] - const explicitTargets = dedupeStrings([...onlineExplicitAgents, ...spec.targets.channels]) - if (explicitTargets.length === 0) { - recipients.push(...await listProjectAgents()) - } else { - recipients.push(...explicitTargets) - } - } - - const uniqueRecipients = dedupeStrings(recipients) + const uniqueRecipients = await this.recipientsForMatchedSpecs(projectId, matchedSpecs, bridge) if (uniqueRecipients.length === 0) { incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped no recipients', { @@ -1859,13 +1991,103 @@ export class IntegrationEventBridge { ...eventMetadata } } as const - if (bridge.sendMessageAndWaitForDelivery) { - await bridge.sendMessageAndWaitForDelivery(projectId, input) + await this.sendBrokerMessage(projectId, input, bridge) + } + incrementIntegrationEventCounter(projectId, 'eventsInjected') + } + + private async recipientsForMatchedSpecs( + projectId: string, + matchedSpecs: SubscriptionSpec[], + bridge: BrokerEventBridge + ): Promise { + const targetGroups = this.notificationTargetsFor(projectId, matchedSpecs) + const projectAgents = targetGroups.needsProjectAgents + ? await this.listProjectAgentsCached(projectId, bridge) + : null + const recipients: string[] = [] + + for (const targets of targetGroups.specs) { + const onlineExplicitAgents = projectAgents + ? targets.agents.filter((agent) => projectAgents.includes(agent)) + : [] + const explicitTargets = dedupeStrings([...onlineExplicitAgents, ...targets.channels]) + if (explicitTargets.length === 0) { + recipients.push(...(projectAgents ?? await this.listProjectAgentsCached(projectId, bridge))) } else { - await bridge.sendMessage(projectId, input) + recipients.push(...explicitTargets) } } - incrementIntegrationEventCounter(projectId, 'eventsInjected') + + return dedupeStrings(recipients) + } + + private notificationTargetsFor(projectId: string, matchedSpecs: SubscriptionSpec[]): NotificationTargetCacheEntry { + const key = `${projectId}:${notificationTargetCacheKey(matchedSpecs)}` + const cached = this.notificationTargetCache.get(key) + if (cached) return cached + + const specs = matchedSpecs.map((spec) => ({ + agents: spec.targets.agents, + channels: spec.targets.channels + })) + const needsProjectAgents = specs.some((targets) => + targets.agents.length > 0 || (targets.agents.length === 0 && targets.channels.length === 0) + ) + const entry = { specs, needsProjectAgents } + this.notificationTargetCache.set(key, entry) + return entry + } + + private async listProjectAgentsCached(projectId: string, bridge: BrokerEventBridge): Promise { + const now = Date.now() + const cached = this.projectAgentRecipientCache.get(projectId) + if (cached?.pending) return cached.pending + if (cached && cached.expiresAt > now) return cached.agents + + const pending = bridge.listAgents(projectId) + .then((agents) => dedupeStrings( + agents + .filter((agent) => agent.projectId === undefined || agent.projectId === projectId) + .map((agent) => agent.name) + )) + .then((agents) => { + // If an explicit invalidation races with this in-flight refresh, the + // short TTL bounds how long the older roster can be reused. + this.projectAgentRecipientCache.set(projectId, { + agents, + expiresAt: Date.now() + PROJECT_AGENT_RECIPIENT_CACHE_TTL_MS + }) + return agents + }) + .catch((error) => { + this.projectAgentRecipientCache.delete(projectId) + throw error + }) + this.projectAgentRecipientCache.set(projectId, { + agents: cached?.agents ?? [], + expiresAt: 0, + pending + }) + return pending + } + + private async sendBrokerMessage( + projectId: string, + input: BrokerMessageInput, + bridge: BrokerEventBridge + ): Promise { + let pacer = this.brokerSendPacers.get(projectId) + if (!pacer) { + // Integration-event delivery is paced on broker send acceptance. Waiting + // for delivery confirmation here can head-of-line block every later event + // behind a slow agent receipt path. + pacer = new ProjectBrokerSendPacer(projectId, (message) => + Promise.resolve(bridge.sendMessage(projectId, message)) + ) + this.brokerSendPacers.set(projectId, pacer) + } + await pacer.enqueue(input) } private wasRecentlyInjected(key: string, ttlMs = RECENT_INJECTION_TTL_MS): boolean { diff --git a/src/main/ipc-handlers.ts b/src/main/ipc-handlers.ts index 482c4742..87ef44d7 100644 --- a/src/main/ipc-handlers.ts +++ b/src/main/ipc-handlers.ts @@ -22,7 +22,7 @@ import * as auth from './auth' import { cloudAgentManager } from './cloud-agent' import { proactiveAgentManager } from './proactive-agent' import { integrationsManager } from './integrations' -import { getIntegrationEventTelemetrySnapshot } from './integration-event-bridge' +import { getIntegrationEventTelemetrySnapshot, integrationEventBridge } from './integration-event-bridge' import { aiHistManager } from './ai-hist' import { burnManager, type BurnAgentInput, type BurnProjectInput, type BurnSessionBreakdownInput, type BurnFingerprintInput } from './burn' import { resetRelayWorkspaceManager } from './relay-workspace' @@ -210,7 +210,9 @@ export function registerIpcHandlers(): void { }) ipcMain.handle('broker:spawn-agent', async (_, projectId: string, input: SpawnPtyInput & { broker?: 'local' | 'cloud' }) => { - return brokerManager.spawnAgent(projectId, input) + const result = await brokerManager.spawnAgent(projectId, input) + integrationEventBridge.invalidateProjectAgentCache(projectId) + return result }) ipcMain.handle('broker:list-personas', async (_, projectId: string) => { @@ -218,7 +220,9 @@ export function registerIpcHandlers(): void { }) ipcMain.handle('broker:spawn-persona', async (_, projectId: string, personaId: string) => { - return brokerManager.spawnPersona(projectId, personaId) + const result = await brokerManager.spawnPersona(projectId, personaId) + integrationEventBridge.invalidateProjectAgentCache(projectId) + return result }) ipcMain.handle('broker:attach-terminal', async (_, input: { @@ -286,6 +290,7 @@ export function registerIpcHandlers(): void { ipcMain.handle('broker:release-agent', async (_, projectId: string | undefined, name: string) => { await brokerManager.releaseAgent(projectId, name) + integrationEventBridge.invalidateProjectAgentCache(projectId) }) ipcMain.handle('broker:list-agents', async (_, projectId?: string) => { diff --git a/src/shared/types/ipc.ts b/src/shared/types/ipc.ts index a0f5ff37..f7e3d938 100644 --- a/src/shared/types/ipc.ts +++ b/src/shared/types/ipc.ts @@ -714,8 +714,11 @@ export type IntegrationEventTelemetryCounters = { eventsInjected: number eventsCoalesced: number eventsDropped: number + brokerSends: number + brokerSendsDeferred: number queueDepth: number mountCount: number + brokerSendQueueDepth: number } export type IntegrationEventTelemetrySnapshot = {