diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index f10654d6..fcf93981 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -102,6 +102,21 @@ async function withMockedNow(isoTimestamp: string, fn: () => Promise): Pro } } +async function waitForDispatcherTick(): Promise { + await new Promise((resolve) => setTimeout(resolve, 0)) + await new Promise((resolve) => setImmediate(resolve)) + await new Promise((resolve) => setImmediate(resolve)) +} + +async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() <= deadline) { + if (predicate()) return + await new Promise((resolve) => setTimeout(resolve, 10)) + } + assert.equal(predicate(), true) +} + function makeHarness( agents = ['alice', 'bob'], options: { @@ -113,6 +128,8 @@ function makeHarness( content: string encoding: 'utf-8' | 'base64' } + sendDelayMs?: number + onSendStart?: (activeSends: number) => void } = {} ): { bridge: IntegrationEventBridge @@ -127,6 +144,7 @@ function makeHarness( const sent: SentMessage[] = [] const listAgentsCalls: string[] = [] const subscriptions: Subscription[] = [] + let activeSends = 0 const bridge = new IntegrationEventBridge({ getWorkspaceHandle: async () => ({ @@ -157,8 +175,17 @@ function makeHarness( return agents.map((name) => ({ name, projectId })) }, sendMessage: async (projectId, input) => { - if (options.failSend) throw new Error('broker unavailable') - sent.push({ projectId, input }) + activeSends += 1 + options.onSendStart?.(activeSends) + try { + if (options.sendDelayMs) { + await new Promise((resolve) => setTimeout(resolve, options.sendDelayMs)) + } + if (options.failSend) throw new Error('broker unavailable') + sent.push({ projectId, input }) + } finally { + activeSends -= 1 + } } } }) @@ -166,8 +193,7 @@ function makeHarness( async function emit(event: ChangeEvent): Promise { assert.equal(subscribeCalls.length, 1, 'expected a single relayfile subscription') subscribeCalls[0].onChange(event) - await new Promise((resolve) => setImmediate(resolve)) - await new Promise((resolve) => setImmediate(resolve)) + await waitForDispatcherTick() } return { bridge, subscribeCalls, readFileCalls, sent, listAgentsCalls, emit } @@ -473,16 +499,14 @@ test('slack backfill and malformed nested message paths are not injected', async occurredAt: '2026-06-05T14:14:57.314Z' }) assert.deepEqual(harness.sent, []) - await waitForDropped('project-1', 1) - assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 1) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 0) await harness.emit(changeEvent( '/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/slack/channels/C123ABC__proj-cloud/messages/1780668181_544139/meta.json', 'slack' )) assert.deepEqual(harness.sent, []) - await waitForDropped('project-1', 2) - assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 2) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped, 0) }) test('slack context resolves with history off through one targeted remote preview', async () => { @@ -997,7 +1021,7 @@ test('integration event delivery is quiet by default while counters remain avail eventsReceived: 3, eventsInjected: 1, eventsCoalesced: 0, - eventsDropped: 2, + eventsDropped: 1, queueDepth: 0, mountCount: 0 }) @@ -1057,6 +1081,7 @@ test('integration event delivery failures use aggregated warn cadence by default for (let index = 1; index <= 26; index += 1) { await harness.emit(changeEvent(`/github/repos/acme/widgets-${index}.json`, 'github')) } + await waitUntil(() => warnCalls.length === 2) } finally { console.debug = originalDebug console.warn = originalWarn @@ -1081,6 +1106,161 @@ test('integration event delivery failures use aggregated warn cadence by default }) }) +test('integration event dispatcher compacts large bursts into a bounded summary', async () => { + const harness = makeHarness(['alice']) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + for (let index = 0; index < 1_000; index += 1) { + harness.subscribeCalls[0].onChange(changeEvent( + `/slack/channels/C123ABC__proj-cloud/messages/1780607${String(index).padStart(4, '0')}/meta.json`, + 'slack', + { revision: `rev-${index}` } + )) + } + + await waitUntil( + () => harness.sent.some((message) => /950 Slack messages changed in #proj-cloud/u.test(message.input.text)), + 5_000 + ) + + assert.equal(harness.sent.length, 51) + assert.ok(harness.sent.length < 1_000) + const summary = harness.sent.find((message) => /Slack messages changed/u.test(message.input.text)) + assert.ok(summary) + assert.match(summary.input.text, /950 Slack messages changed in #proj-cloud/u) + assert.equal(summary.input.data?.eventType, 'relayfile.changed.summary') + assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { + eventsReceived: 1_000, + eventsInjected: 51, + eventsCoalesced: 950, + eventsDropped: 0, + queueDepth: 0, + mountCount: 0 + }) +}) + +test('integration event dispatcher filters noise before queue admission', async () => { + const harness = makeHarness(['alice']) + + await withMockedNow('2026-06-05T15:50:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + for (let index = 0; index < 1_000; index += 1) { + harness.subscribeCalls[0].onChange(changeEvent( + `/slack/channels/C123ABC__proj-cloud/messages/1780607${String(index).padStart(4, '0')}/.meta.json.tmp-${index}`, + 'slack', + { revision: `tmp-${index}` } + )) + } + harness.subscribeCalls[0].onChange(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780674600_000000/meta.json', + 'slack', + { revision: 'real-change' } + )) + }) + + await waitUntil(() => harness.sent.length === 1) + + assert.match(harness.sent[0].input.text, /Relayfile path: \/slack\/channels\/C123ABC__proj-cloud\/messages\/1780674600_000000\/meta\.json/u) + assert.doesNotMatch(harness.sent[0].input.text, /Slack messages changed/u) + assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { + eventsReceived: 1_001, + eventsInjected: 1, + eventsCoalesced: 0, + eventsDropped: 0, + queueDepth: 0, + mountCount: 0 + }) +}) + +test('integration event dispatcher coalesces rapid distinct revisions for the same path', async () => { + const harness = makeHarness(['alice']) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'github', + integrationId: 'github-1', + mountPaths: ['/github/repos'], + scope: { notifyAgents: ['alice'] } + }) + ]) + + for (let index = 0; index < 10; index += 1) { + harness.subscribeCalls[0].onChange(changeEvent( + '/github/repos/acme/widgets.json', + 'github', + { revision: `rev-${index}` } + )) + } + + await waitUntil(() => harness.sent.length === 1) + + assert.equal(harness.sent[0].input.data?.eventId, 'evt:/github/repos/acme/widgets.json') + assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { + eventsReceived: 10, + eventsInjected: 1, + eventsCoalesced: 9, + eventsDropped: 0, + queueDepth: 0, + mountCount: 0 + }) +}) + +test('integration event fanout sends to recipients sequentially', async () => { + let maxActiveSends = 0 + const harness = makeHarness( + Array.from({ length: 12 }, (_, index) => `agent-${index}`), + { + sendDelayMs: 2, + onSendStart: (activeSends) => { + maxActiveSends = Math.max(maxActiveSends, activeSends) + } + } + ) + + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'linear', + integrationId: 'linear-1', + mountPaths: ['/linear/issues'] + }) + ]) + + await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) + await waitUntil(() => harness.sent.length === 12) + + assert.equal(maxActiveSends, 1) + assert.deepEqual(harness.sent.map((message) => message.input.to), [ + 'agent-0', + 'agent-1', + 'agent-10', + 'agent-11', + 'agent-2', + 'agent-3', + 'agent-4', + 'agent-5', + 'agent-6', + 'agent-7', + 'agent-8', + 'agent-9' + ]) +}) + test('integration event telemetry records coalescing and queue depth callbacks', async () => { const harness = makeHarness(['alice']) diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index d20da2be..892c3958 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -33,6 +33,9 @@ const SLACK_DM_EVENT_GLOBS = [ '/slack/users/*/messages/**' ] const MAX_EVENT_CONTEXT_PREVIEW_BYTES = 32 * 1024 +const MAX_DISPATCH_QUEUE_EVENTS = 50 +const MAX_DISPATCH_SUMMARY_GROUPS = 10 +const MAX_DISPATCHED_EVENTS_PER_SECOND = 25 type IntegrationEventCounterName = 'eventsReceived' | 'eventsInjected' | 'eventsCoalesced' | 'eventsDropped' type IntegrationEventGaugeName = 'queueDepth' | 'mountCount' @@ -72,6 +75,20 @@ type EventContextPreview = { type EventContextPreviewMetadata = Omit +type DispatchItem = { + event: ChangeEvent + specs: SubscriptionSpec[] +} + +type DispatchSummary = { + count: number + provider: string + groupPath: string + label: string + specs: SubscriptionSpec[] + latestEvent: ChangeEvent +} + type LocalMountSubscription = Subscription & { localRoots: string[] } @@ -967,6 +984,116 @@ function eventChangeFingerprint(event: ChangeEvent): string | null { return typeof fingerprint === 'string' ? fingerprint.trim() : null } +function eventProvider(event: ChangeEvent): string { + const provider = eventRecordValue(event, 'provider') + if (typeof provider === 'string' && provider.trim()) return provider.trim().toLowerCase() + return pathSegments(event.resource.path)[0]?.toLowerCase() || 'integration' +} + +function slackChannelLabel(channelSegment: string): string { + const [, label] = channelSegment.split(/__(.+)/u) + return label ? `#${label}` : channelSegment +} + +function dispatchCoalescingKey(event: ChangeEvent): string { + const provider = eventProvider(event) + const segments = pathSegments(event.resource.path) + if (provider === 'slack') { + const channelIndex = segments.indexOf('channels') + const messageIndex = segments.indexOf('messages') + const threadIndex = segments.indexOf('threads') + const replyIndex = segments.indexOf('replies') + if (channelIndex >= 0 && segments[channelIndex + 1]) { + const channel = segments[channelIndex + 1] + if (messageIndex >= 0 && segments[messageIndex + 1]) { + return `${provider}:channel:${channel}:message:${segments[messageIndex + 1]}` + } + if (threadIndex >= 0 && segments[threadIndex + 1]) { + const thread = segments[threadIndex + 1] + if (replyIndex >= 0 && segments[replyIndex + 1]) { + return `${provider}:channel:${channel}:thread:${thread}:reply:${segments[replyIndex + 1]}` + } + return `${provider}:channel:${channel}:thread:${thread}` + } + return `${provider}:channel:${channel}:${segments.slice(channelIndex + 2).join('/')}` + } + } + return `${provider}:${normalizeRelayfilePath(event.resource.path)}` +} + +function dispatchSummaryForEvent(event: ChangeEvent, specs: SubscriptionSpec[]): DispatchSummary { + const provider = eventProvider(event) + const segments = pathSegments(event.resource.path) + if (provider === 'slack') { + const channelIndex = segments.indexOf('channels') + if (channelIndex >= 0 && segments[channelIndex + 1]) { + const channel = segments[channelIndex + 1] + return { + count: 1, + provider, + groupPath: `/${segments.slice(0, channelIndex + 2).join('/')}`, + label: slackChannelLabel(channel), + specs, + latestEvent: event + } + } + } + + const groupSegments = segments.length > 1 ? segments.slice(0, -1) : segments + const groupPath = groupSegments.length > 0 ? `/${groupSegments.join('/')}` : normalizeRelayfilePath(event.resource.path) + return { + count: 1, + provider, + groupPath, + label: groupPath, + specs, + latestEvent: event + } +} + +function dispatchSummaryKey(summary: DispatchSummary): string { + return `${summary.provider}:${summary.groupPath}` +} + +function compactedSummaryTitle(summary: DispatchSummary): string { + const providerLabel = summary.provider.charAt(0).toUpperCase() + summary.provider.slice(1) + if (summary.provider === 'slack') { + return `${summary.count} Slack messages changed in ${summary.label}` + } + return `${summary.count} ${providerLabel} records changed under ${summary.label}` +} + +function dispatchSummaryEvent(projectId: string, summary: DispatchSummary): ChangeEvent { + const occurredAt = new Date().toISOString() + const title = compactedSummaryTitle(summary) + return { + id: `summary:${projectId}:${summary.provider}:${summary.groupPath}:${Date.now()}`, + workspace: summary.latestEvent.workspace, + type: 'relayfile.changed.summary', + occurredAt, + resource: { + path: summary.groupPath, + provider: summary.provider, + kind: 'summary', + id: summary.groupPath + }, + summary: { + title, + compactedIntegrationEvents: summary.count, + latestEventId: summary.latestEvent.id, + latestEventPath: summary.latestEvent.resource.path + }, + digest: `summary:${summary.count}:${summary.latestEvent.id}`, + expand: async () => ({ + level: 'summary', + path: summary.groupPath, + summary: { + title + } + }) + } as ChangeEvent +} + function logIntegrationEvent(message: string, metadata: Record): void { if (!isIntegrationEventDebugEnabled()) return console.debug(`[integration-events] ${message}`, metadata) @@ -1245,8 +1372,167 @@ function formatIntegrationEventMessage( return lines.join('\n') } +class ProjectEventDispatcher { + private readonly queue: DispatchItem[] = [] + private readonly pendingByKey = new Map() + private readonly summariesByKey = new Map() + private readonly projectId: string + private readonly deliver: (event: ChangeEvent, specs: SubscriptionSpec[]) => Promise + private drainTimer: ReturnType | null = null + private draining = false + private windowStartedAt = 0 + private dispatchedInWindow = 0 + private active = true + + constructor( + projectId: string, + deliver: (event: ChangeEvent, specs: SubscriptionSpec[]) => Promise + ) { + this.projectId = projectId + this.deliver = deliver + } + + enqueue(event: ChangeEvent, specs: SubscriptionSpec[]): void { + if (!this.active) return + + const key = dispatchCoalescingKey(event) + const pending = this.pendingByKey.get(key) + if (pending) { + pending.event = event + pending.specs = specs + incrementIntegrationEventCounter(this.projectId, 'eventsCoalesced') + this.scheduleDrain(0) + return + } + + if (this.queue.length >= MAX_DISPATCH_QUEUE_EVENTS) { + this.compact(event, specs) + this.updateDepthGauge() + this.scheduleDrain(0) + return + } + + const item = { event, specs } + this.queue.push(item) + this.pendingByKey.set(key, item) + this.updateDepthGauge() + this.scheduleDrain(0) + } + + dispose(): void { + this.active = false + if (this.drainTimer) clearTimeout(this.drainTimer) + this.drainTimer = null + this.queue.length = 0 + this.pendingByKey.clear() + this.summariesByKey.clear() + this.updateDepthGauge() + } + + private compact(event: ChangeEvent, specs: SubscriptionSpec[]): void { + const summary = dispatchSummaryForEvent(event, specs) + const key = dispatchSummaryKey(summary) + const existing = this.summariesByKey.get(key) + if (existing) { + existing.count += 1 + existing.specs = specs + existing.latestEvent = event + incrementIntegrationEventCounter(this.projectId, 'eventsCoalesced') + return + } + + if (this.summariesByKey.size >= MAX_DISPATCH_SUMMARY_GROUPS) { + incrementIntegrationEventCounter(this.projectId, 'eventsDropped') + logIntegrationEvent('dropped event because dispatcher summary budget is full', { + projectId: this.projectId, + eventId: event.id, + path: event.resource.path, + maxSummaryGroups: MAX_DISPATCH_SUMMARY_GROUPS + }) + return + } + + this.summariesByKey.set(key, summary) + incrementIntegrationEventCounter(this.projectId, 'eventsCoalesced') + } + + private scheduleDrain(delayMs: number): void { + if (!this.active || this.drainTimer || this.draining) return + this.drainTimer = setTimeout(() => { + this.drainTimer = null + void this.drain() + }, delayMs) + } + + private async drain(): Promise { + if (!this.active || this.draining) return + this.draining = true + try { + while (this.active && (this.queue.length > 0 || this.summariesByKey.size > 0)) { + const waitMs = this.nextRateLimitDelayMs() + if (waitMs > 0) { + this.scheduleDrain(waitMs) + return + } + + const item = this.queue.shift() + if (item) { + this.pendingByKey.delete(dispatchCoalescingKey(item.event)) + this.updateDepthGauge() + await this.deliverItem(item.event, item.specs) + continue + } + + const [key, summary] = this.summariesByKey.entries().next().value as [string, DispatchSummary] + this.summariesByKey.delete(key) + this.updateDepthGauge() + await this.deliverItem(dispatchSummaryEvent(this.projectId, summary), summary.specs) + } + } finally { + this.draining = false + this.updateDepthGauge() + if (this.active && (this.queue.length > 0 || this.summariesByKey.size > 0)) { + this.scheduleDrain(this.nextRateLimitDelayMs()) + } + } + } + + private nextRateLimitDelayMs(): number { + const now = Date.now() + if (this.windowStartedAt === 0 || now - this.windowStartedAt >= 1_000) { + this.windowStartedAt = now + this.dispatchedInWindow = 0 + } + if (this.dispatchedInWindow < MAX_DISPATCHED_EVENTS_PER_SECOND) return 0 + return Math.max(1, 1_000 - (now - this.windowStartedAt)) + } + + private async deliverItem(event: ChangeEvent, specs: SubscriptionSpec[]): Promise { + this.dispatchedInWindow += 1 + try { + await this.deliver(event, specs) + } catch (error) { + const errorMessage = toErrorMessage(error) + warnIntegrationEventAggregated( + `event delivery failed:${this.projectId}`, + 'event delivery failed', + { + projectId: this.projectId, + eventId: event.id, + error: errorMessage + } + ) + } + } + + private updateDepthGauge(): void { + setIntegrationEventGauge(this.projectId, 'queueDepth', this.queue.length + this.summariesByKey.size) + } +} + export class IntegrationEventBridge { private subscriptions = new Map() + private dispatchers = new Map() private recentInjections = new Map() private readonly deps: IntegrationEventBridgeDeps @@ -1319,15 +1605,15 @@ export class IntegrationEventBridge { type: event.type, path: event.resource.path }) - void this.injectEvent(projectId, event, specs, { + void this.enqueueEvent(projectId, event, specs, { source: 'remote', subscriptionStartedAtMs: remoteSubscriptionStartedAtMs, localMountWorkspaceId: handle.localMountWorkspaceId }).catch((error) => { const errorMessage = toErrorMessage(error) warnIntegrationEventAggregated( - `event delivery failed:${projectId}`, - 'event delivery failed', + `event enqueue failed:${projectId}`, + 'event enqueue failed', { projectId, eventId: event.id, @@ -1359,15 +1645,15 @@ export class IntegrationEventBridge { path: event.resource.path, source: 'local-mount' }) - void this.injectEvent(projectId, event, specs, { + void this.enqueueEvent(projectId, event, specs, { source: 'local-mount', subscriptionStartedAtMs: remoteSubscriptionStartedAtMs, localMountWorkspaceId: handle.localMountWorkspaceId }).catch((error) => { const errorMessage = toErrorMessage(error) warnIntegrationEventAggregated( - `local event delivery failed:${projectId}`, - 'local event delivery failed', + `local event enqueue failed:${projectId}`, + 'local event enqueue failed', { projectId, eventId: event.id, @@ -1402,6 +1688,8 @@ export class IntegrationEventBridge { async close(projectId: string): Promise { const subscription = this.subscriptions.get(projectId) this.subscriptions.delete(projectId) + this.dispatchers.get(projectId)?.dispose() + this.dispatchers.delete(projectId) setIntegrationEventGauge(projectId, 'queueDepth', 0) setIntegrationEventGauge(projectId, 'mountCount', 0) if (!subscription) return @@ -1413,7 +1701,7 @@ export class IntegrationEventBridge { } private async readEventContextPreview(projectId: string, event: ChangeEvent): Promise { - if (event.type === 'file.deleted') return undefined + if (event.type === 'file.deleted' || event.type === 'relayfile.changed.summary') return undefined const path = eventSummaryValue(event.resource.path) if (!path) return undefined @@ -1437,14 +1725,13 @@ export class IntegrationEventBridge { } } - private async injectEvent( + private async enqueueEvent( projectId: string, event: ChangeEvent, specs: SubscriptionSpec[], options: EventInjectionOptions ): Promise { if (!shouldNotifyRelayfileChange(event)) { - incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped filtered path', { projectId, eventId: event.id, @@ -1456,8 +1743,8 @@ export class IntegrationEventBridge { const eventMatchedSpecs = specsForEvent(event, specs) const matchedSpecs = historicalRemoteReplayAllowedSpecs(event, eventMatchedSpecs, options) if (matchedSpecs.length === 0) { - incrementIntegrationEventCounter(projectId, 'eventsDropped') if (eventMatchedSpecs.length > 0) { + incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped historical remote replay', { projectId, eventId: event.id, @@ -1478,6 +1765,21 @@ export class IntegrationEventBridge { return } + let dispatcher = this.dispatchers.get(projectId) + if (!dispatcher) { + dispatcher = new ProjectEventDispatcher(projectId, (queuedEvent, queuedSpecs) => + this.injectEvent(projectId, queuedEvent, queuedSpecs) + ) + this.dispatchers.set(projectId, dispatcher) + } + dispatcher.enqueue(event, matchedSpecs) + } + + private async injectEvent( + projectId: string, + event: ChangeEvent, + matchedSpecs: SubscriptionSpec[] + ): Promise { const duplicateKey = injectionDeduplicationKey(projectId, event, matchedSpecs) const fingerprint = eventChangeFingerprint(event) const recentKey = fingerprint ? `${duplicateKey}:change:${fingerprint}` : duplicateKey @@ -1537,31 +1839,31 @@ export class IntegrationEventBridge { path: event.resource.path, recipients: uniqueRecipients }) - await Promise.all( - uniqueRecipients.map((recipient) => { - const input = { - to: recipient, - from: 'integration', - text: formatIntegrationEventMessage(event, contextPreview), - priority: 0, - mode: 'steer', - data: { - kind: 'integration-event', - system: true, - eventId: event.id, - eventType: event.type, - occurredAt: event.occurredAt, - resource: isRecord(event.resource) ? { ...event.resource } : undefined, - path: event.resource.path, - contextPreview: contextPreviewData, - ...eventMetadata - } - } as const - return bridge.sendMessageAndWaitForDelivery - ? bridge.sendMessageAndWaitForDelivery(projectId, input) - : bridge.sendMessage(projectId, input) - }) - ) + for (const recipient of uniqueRecipients) { + const input = { + to: recipient, + from: 'integration', + text: formatIntegrationEventMessage(event, contextPreview), + priority: 0, + mode: 'steer', + data: { + kind: 'integration-event', + system: true, + eventId: event.id, + eventType: event.type, + occurredAt: event.occurredAt, + resource: isRecord(event.resource) ? { ...event.resource } : undefined, + path: event.resource.path, + contextPreview: contextPreviewData, + ...eventMetadata + } + } as const + if (bridge.sendMessageAndWaitForDelivery) { + await bridge.sendMessageAndWaitForDelivery(projectId, input) + } else { + await bridge.sendMessage(projectId, input) + } + } incrementIntegrationEventCounter(projectId, 'eventsInjected') }