Add bounded integration event dispatcher#106
Conversation
|
CodeAnt AI is reviewing your PR. |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Free Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR introduces a per-project event dispatcher that coalesces and rate-limits integration events before delivery to agents and channels. It adds deterministic test infrastructure, implements the dispatcher with queue coalescing and rate limiting, refactors delivery from parallel to sequential, and verifies the new behavior with comprehensive tests. ChangesEvent Dispatcher with Sequential Delivery
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Note 🎁 Summarized by CodeRabbit FreeYour organization is on the Free plan. CodeRabbit will generate a high-level summary and a walkthrough for each pull request. For a comprehensive line-by-line review, please upgrade your subscription to CodeRabbit Pro by visiting https://app.coderabbit.ai/login. Comment |
kjgbot
left a comment
There was a problem hiding this comment.
Review: Track B (#82) — bounded per-project event dispatcher
Scope conformance: ✅ Every change maps to Track B (queue cap, per-second drain, coalescing, compaction summaries, fanout sequencing) plus its Track F counter wiring. No creep.
Acceptance criteria verified at head 390f3f7:
- 1,000-event burst ≠ 1,000 broker messages: ✅ Queue cap 50 + overflow compaction into ≤10 summary groups. The burst test proves it: 1,000 events → 51 injections (50 individual + 1 summary "950 Slack messages changed in #proj-cloud"), telemetry
eventsCoalesced: 950, eventsDropped: 0, queueDepth: 0. Drain rate-limited at 25/s (fixed window). - CPU bounded: drain is a single sequential loop with timer-based rate deferral; coalescing/compaction are O(1) map ops per event. No unbounded recursion or fanout.
- Useful summaries:
relayfile.changed.summaryevents with count + group label (Slack channel alias →#proj-cloud),latestEventId/latestEventPathfor context, routed through the normal spec-matching path. Promise.allfanout removed: ✅ sequential per-recipient sends, proven by themaxActiveSends === 1test with 12 recipients.- Slack coalescing granularity: channel/thread/message/reply path keys per the issue's open question — rapid distinct revisions of one path collapse to the latest (test: 10 revisions → 1 injection, 9 coalesced). This also resolves the dedup-loosening concern I raised on #101 — the dispatcher is now the burst-bounding layer, as intended.
Lifecycle: dispatcher disposed on close()/signature change; stale-workspace capture avoided since reconcile closes first. Drain/schedule interplay is correct (the in-loop scheduleDrain(waitMs) while draining=true is a no-op, but the finally block reschedules — redundant call, no bug).
Tests: 55/55 pass locally at head; CI checks green (smoke pending at review time).
Findings (non-blocking; recommend folding #1 during the mandatory post-FIFO rebase)
- Filtering happens after queue admission —
enqueueEventqueues every received event;shouldNotifyRelayfileChange+ spec matching run only at deliver time insideinjectEvent. Consequences during a noisy flood (the exact #99 live scenario): (a) tmp/dotfile/agent_write/unmatched events consume queue slots and the 25/s rate budget (each filtered event still incrementsdispatchedInWindowviadeliverItem), delaying real events; (b) worse, noise can fill the 50 individual slots and push real events into summary compaction; (c) summarycountincludes events that would have been filtered, so "N Slack messages changed" can materially overcount. Fix is small: runshouldNotifyRelayfileChange(event)(and ideally the spec match) inenqueueEventbeforedispatcher.enqueue. Since #106 must rebase + revalidate after #103/#104/#105 anyway, fold this then. - Per-second budget counts events, not broker messages — with R recipients, broker sends can reach 25×R/s. That's Track E's explicitly-scoped territory (broker
sendMessagerate-limiting), so fine here; noting so Track E picks it up deliberately. - Summary starvation under sustained saturation — the drain loop always prefers queued individual events over summaries; a sustained stream of fresh keys can defer summary delivery indefinitely. Bursts (the design target) are fine since the queue drains between waves. Consider interleaving (e.g. one summary per K events) if sustained floods show up in telemetry.
- Counter nit:
dispose()discards pending queue/summaries without incrementingeventsDropped— harmless at project close, but the counter understates drops if close happens mid-flood. - Head-of-line blocking observation: sequential
sendMessageAndWaitForDelivery(15s default timeout) means one stuck recipient stalls the drain; overflow then degrades to summaries, which is the designed backpressure — acceptable, just noting the failure mode for Track E's caching/rate work.
Verdict
APPROVE — Track B acceptance criteria met with direct test evidence. Fold finding #1 (pre-enqueue filtering) into the post-FIFO rebase before merge; findings 2/5 are Track E inputs; 3/4 are watch-items.
| if (this.queue.length >= MAX_DISPATCH_QUEUE_EVENTS) { | ||
| this.compact(event, specs) | ||
| this.updateDepthGauge() |
There was a problem hiding this comment.
🟠 Architect Review — HIGH
Queue overflow is compacted into summary items before per-event eligibility checks run, so bursts of events that would normally be filtered (for example stale Slack backfill/message-context events) can still produce and inject a summary notification via a relayfile.changed.summary event that bypasses the Slack-specific shouldNotifySlackMessageChange gating.
Suggestion: Apply shouldNotifyRelayfileChange (or equivalent path/provider eligibility) before adding overflow events into summary buckets, or track per-item eligibility and only compact events that would have been injectable, so that stale/backfill suppression semantics are preserved even under burst compaction.
Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖
This is an **Architect / Logical Review** comment left during a code review. These reviews are first-class, important findings — not optional suggestions. Do NOT dismiss this as a 'big architectural change' just because the title says architect review; most of these can be resolved with a small, localized fix once the intent is understood.
**Path:** src/main/integration-event-bridge.ts
**Line:** 1363:1365
**Comment:**
*HIGH: Queue overflow is compacted into summary items before per-event eligibility checks run, so bursts of events that would normally be filtered (for example stale Slack backfill/message-context events) can still produce and inject a summary notification via a `relayfile.changed.summary` event that bypasses the Slack-specific `shouldNotifySlackMessageChange` gating.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
If a suggested approach is provided above, use it as the authoritative instruction. If no explicit code suggestion is given, you MUST still draft and apply your own minimal, localized fix — do not punt back with 'no suggestion provided, review manually'. Keep the change as small as possible: add a guard clause, gate on a loading state, reorder an await, wrap in a conditional, etc. Do not refactor surrounding code or expand scope beyond the finding.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix| for (const recipient of uniqueRecipients) { | ||
| const input = { | ||
| to: recipient, | ||
| from: 'integration', | ||
| text: formatIntegrationEventMessage(event, contextLines), | ||
| priority: 0, | ||
| mode: 'steer', | ||
| data: { | ||
| kind: 'integration-event', | ||
| system: true, | ||
| eventId: event.id, | ||
| eventType: event.type, | ||
| occurredAt: event.occurredAt, | ||
| resource: isRecord(event.resource) ? { ...event.resource } : undefined, | ||
| path: event.resource.path | ||
| } | ||
| } as const | ||
| if (bridge.sendMessageAndWaitForDelivery) { | ||
| await bridge.sendMessageAndWaitForDelivery(projectId, input) | ||
| } else { | ||
| await bridge.sendMessage(projectId, input) | ||
| } |
There was a problem hiding this comment.
Suggestion: The new sequential fanout stops at the first recipient send failure, so later recipients never receive the event. This is a behavioral regression from fanout semantics because one transient broker error now suppresses delivery to the remaining recipients for that event. Keep sequential sending, but handle failures per recipient (log/aggregate each failure) and continue sending to the rest. [logic error]
Severity Level: Critical 🚨
- ❌ Multi-recipient integration events can notify only the first target.
- ⚠️ Transient broker errors silently skip remaining agents/channels.
- ⚠️ Telemetry counts one failed event, hides per-recipient loss.Steps of Reproduction ✅
1. In tests, construct an IntegrationEventBridge via `makeHarness` in
`src/main/__tests__/integration-event-bridge.test.ts:31-71`, but pass many agents and `{
failSend: true }` (same harness wiring as the existing tests `integration event delivery
failures use aggregated warn cadence by default without verbose logs` and `integration
event fanout sends to recipients sequentially`).
2. Call `harness.bridge.reconcile('project-1', [integration({...})])` as in `integration
event fanout sends to recipients sequentially` at
`src/main/__tests__/integration-event-bridge.test.ts:192-200`, using a provider/mount path
that results in multiple recipients (e.g. many agents online so `uniqueRecipients` in
`injectEvent` at `src/main/integration-event-bridge.ts:1688-1703` contains more than one
name).
3. Emit a matching change event via `harness.emit(changeEvent('/linear/issues/AR-1.json',
'linear'))` (pattern from the same test at lines 200-201); the event flows through
`enqueueEvent` at `src/main/integration-event-bridge.ts:1620-1634`, into
`ProjectEventDispatcher.drain()` at lines 1422-1453, and then into `injectEvent` at lines
1636-1744.
4. Inside `injectEvent`, the loop at `src/main/integration-event-bridge.ts:1720-1741`
starts sending to `uniqueRecipients`; when the broker's `sendMessage` (wired by
`makeHarness` at `src/main/__tests__/integration-event-bridge.test.ts:69-77`) throws
`"broker unavailable"` for the first recipient due to `failSend: true`, the awaited call
rejects, aborts the `for` loop, propagates back to `ProjectEventDispatcher.deliverItem` at
lines 1465-1480 (where the error is logged), and no subsequent recipients in
`uniqueRecipients` are ever attempted or notified for that event.Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** src/main/integration-event-bridge.ts
**Line:** 1720:1741
**Comment:**
*Logic Error: The new sequential fanout stops at the first recipient send failure, so later recipients never receive the event. This is a behavioral regression from fanout semantics because one transient broker error now suppresses delivery to the remaining recipients for that event. Keep sequential sending, but handle failures per recipient (log/aggregate each failure) and continue sending to the rest.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix| async close(projectId: string): Promise<void> { | ||
| const subscription = this.subscriptions.get(projectId) | ||
| this.subscriptions.delete(projectId) | ||
| this.dispatchers.get(projectId)?.dispose() |
There was a problem hiding this comment.
Suggestion: close() disposes the dispatcher (which clears queued events) before subscriptions are unsubscribed, so queued-but-not-yet-delivered events are dropped during reconcile/close without being delivered or counted as dropped. This can silently lose integration updates when configuration changes while the dispatcher still has backlog. Unsubscribe first (or drain/flush the dispatcher), then dispose it. [logic error]
Severity Level: Critical 🚨
- ❌ Integration updates can be lost during config changes.
- ❌ Telemetry undercounts dropped events versus eventsReceived.
- ⚠️ Agents may miss Slack/github updates after reconfiguration.Steps of Reproduction ✅
1. Start event subscriptions for a project by calling
`integrationEventBridge.reconcile(projectId, integrations)` via `syncEventSubscriptions`
in `src/main/integrations.ts:11-17` (invoked from `hydrateProject` at lines 12-15 or
`syncAgentState` at 16-26). This creates a `ProjectEventDispatcher` and registers
relayfile/local-mount subscriptions whose `onChange` callbacks call `enqueueEvent` at
`src/main/integration-event-bridge.ts:1620-1634`.
2. Generate a large burst of matching events (mirroring `integration event dispatcher
compacts large bursts into a bounded summary` at
`src/main/__tests__/integration-event-bridge.test.ts:106-145`), so that
`ProjectEventDispatcher.enqueue` at lines 1350-1375 fills `queue` and/or `summariesByKey`,
and `drain()` at lines 1422-1453 is rate-limited by `nextRateLimitDelayMs` at 1455-1463,
leaving a non-empty backlog while events are slowly delivered.
3. While telemetry `queueDepth` (maintained by `updateDepthGauge` at
`src/main/integration-event-bridge.ts:1483-1485`) is still > 0, trigger a configuration
change that re-syncs subscriptions for the same project (e.g. connect/disconnect an
integration), causing `syncAgentState` in `src/main/integrations.ts:16-26` to call
`syncEventSubscriptions`, which calls `integrationEventBridge.reconcile` at 11-17.
4. Inside `IntegrationEventBridge.reconcile` at
`src/main/integration-event-bridge.ts:1498-1533`, the first action for this project is
`await this.close(projectId)` at line 1530; `close` then calls
`this.dispatchers.get(projectId)?.dispose()` and `this.dispatchers.delete(projectId)` at
1608-1609, which clears `queue` and `summariesByKey` without delivering those queued
events or incrementing `eventsDropped`, before unsubscribing subscriptions at 1613—so all
pending integration updates in the dispatcher are silently lost and never reflected in
`eventsInjected`/`eventsDropped` telemetry.Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** src/main/integration-event-bridge.ts
**Line:** 1608:1613
**Comment:**
*Logic Error: `close()` disposes the dispatcher (which clears queued events) before subscriptions are unsubscribed, so queued-but-not-yet-delivered events are dropped during reconcile/close without being delivered or counted as dropped. This can silently lose integration updates when configuration changes while the dispatcher still has backlog. Unsubscribe first (or drain/flush the dispatcher), then dispose it.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix|
CodeAnt AI finished reviewing your PR. |
390f3f7 to
ead8dfd
Compare
kjgbot
left a comment
There was a problem hiding this comment.
Delta re-check: 390f3f7 → ead8dfd
Delta verified as the agreed fold of review finding #1, exactly scoped:
shouldNotifyRelayfileChange+specsForEventnow run inenqueueEventbefore queue admission — tmp/dotfile/agent_write/unmatched events can no longer consume queue slots, the 25/s rate budget, or inflate summary counts.- Pre-enqueue rejections are logged as skips (
skipped filtered path/skipped unmatched path) and do not incrementeventsDropped— drops stay reserved for genuine budget-pressure discards, keeping the Track F counters diagnostic. The defensive spec-match ininjectEventis retained for dispatcher-emitted summary events. - The dispatcher now stores
matchedSpecs(not the full spec list), tightening summary targeting. - New regression matches the agreed shape: 1,000
.tmpSlack events + 1 real event → exactly one individual injection of the real event, no compaction summary,eventsDropped: 0,eventsReceived: 1001. - Async enqueue failures get aggregated warns at both call sites (no unhandled rejections).
Independently ran the full suite at ead8dfd: 56/56 pass.
Verdict
APPROVE (refreshed at head ead8dfd) — FIFO hold behind #103/#104/#105 remains; if the eventual rebase is range-diff-equal, merge under the clean-rebase convention, otherwise send the SHA pair.
kjgbot
left a comment
There was a problem hiding this comment.
Delta adjudication: ead8dfd → 00501cd (bot-pushed flushAndDispose)
Provenance: pushed by agent-relay-code[bot] onto a FIFO-held branch after the refreshed approval — correctly flagged by the author as superseding approval.
What the burst risk actually is: the feared "1,000 queued → 1,000 sends on quit" cannot happen — queue admission is already bounded (≤50 individual + ≤10 summary groups), so a close-flush emits at most ~61 sequential sends, and the bot's own test shows 1,000 events → 51 sends through close. The compaction invariant survives. That part of the semantics is sound.
Why this still fails review:
- Unbounded close latency.
flushAndDisposedrains viadeliverItem, which usessendMessageAndWaitForDelivery(15s default timeout) per recipient. With a degraded/stuck broker, a full flush is ~61 × 15s ≈ 15 minutes, andclose()is awaited byreconcile()on every integration-settings signature change — a stuck broker turns a settings tweak into a quarter-hour reconcile hang. The old dispose-and-drop path was O(1). The test covers only the healthy-broker happy path. - No drop accounting fallback. If flush can't complete (app quitting, broker gone), there's no budget after which remaining items become
eventsDropped— the design has no bounded exit. - Process question: a semantics-changing commit landed on a held branch without an owner. Whatever the verdict, branch hygiene matters more under FIFO: the approved head must be the branch head.
Verdict
REQUEST CHANGES on this delta. Two acceptable resolutions, author's choice:
- (A) Revert
00501cd, restoring the approved headead8dfd(back to dispose-and-drop). Track "flush-on-close" as a deliberate follow-up with proper bounds. Simplest; preserves the FIFO-held approved state. - (B) Keep flush-on-close but bound it: during teardown flush, use fire-and-forget
sendMessage(skip delivery-confirmation waits) or a total flush time budget (e.g. 2s); anything still queued after the budget incrementseventsDropped; add a slow-broker test provingclose()returns within the budget. This is the better long-term design (no lost notifications on settings changes) but should be done deliberately, not adopted from an unowned bot push.
If (A), my ead8dfd approval stands as-is. If (B), send the SHA pair for a delta re-check.
48ab8e5 to
ead8dfd
Compare
|
#106 is restored to exact reviewer-approved head Filed follow-up: #108 — carries reviewer-1's bounded flush-on-close acceptance bar verbatim, including ~2s close wall-time budget, leftover -> |
|
✅ pr-reviewer applied fixes — committed and pushed Implemented fixes for the validated PR issue:
Validation:
|
0f857e7 to
b0ebfcb
Compare
kjgbot
left a comment
There was a problem hiding this comment.
Delta re-check: ead8dfd → b0ebfcb (post-FIFO semantic composition over 69ad7c0)
Correctly declared as NOT a clean rebase — all five composition claims verified in the diff:
- Pre-enqueue filtering, skip-only:
shouldNotifyRelayfileChangerejects noise with a logged skip and noeventsDropped; the drop counter now fires only for genuine discards. Bonus: the unmatched-path case also stopped incrementing drops (increment moved inside theeventMatchedSpecs.length > 0branch) — the skip-vs-drop discipline is now fully consistent. - #103 composition:
specsForEvent(mount OR eventPathGlobs) runs pre-enqueue, so DM-glob-matched specs admit events to the dispatcher — preserves #97 DM delivery.historicalRemoteReplayAllowedSpecsruns before admission; replay-filtered events incrementeventsDropped+skipped historical remote replay, matching Track C/F's merged counter semantics. Partial filtering admits the allowed subset only. - matchedSpecs carried through: dispatcher queues/coalesces/summarizes the pre-matched specs;
injectEventno longer recomputes — summary spec inheritance is now explicit rather than re-derived (cleaner than the pre-rebase shape). - #105 composition:
readEventContextPreviewskipsrelayfile.changed.summary(and deleted) events — synthetic summaries trigger zero remote reads; one bounded preview per real event, sequential fanout preserved with preview in text + data. - Dispatcher semantics unchanged from approved
ead8dfd: queue 50 / summaries 10 / 25 events-per-sec / dispose-and-drop on close — the (A) resolution holds; flush-on-close remains #108.
Unadopted bot commit 0f857e7 dropped via force-with-lease per the standing rule. ✅
Independently verified at b0ebfcb: full suite 62/62 pass locally; all four Track B regressions (burst compaction, noise-before-admission with in-window Slack timestamps, revision coalescing, sequential fanout) carried through.
Verdict
APPROVE (refreshed at head b0ebfcb) — merge when checks + packaged-mcp-smoke are green at this head. Track B is the last #82 implementation track; after merge, the lane opens for codex-2's #99 follow-up and Track E.
User description
Track B for #82. Queued behind FIFO PRs #103/#104/#105; do not merge until those land and this PR is rebased/revalidated.
What changed
950 Slack messages changed in #proj-cloudinstead of flooding agents.Promise.allfanout with sequential broker sends per event.4ec4a71.Acceptance coverage
Validation
node --experimental-strip-types --no-warnings --test src/main/__tests__/integration-event-bridge.test.ts(29/29)npm test(55/55)npm run buildCoordination
This branch is based on current
origin/mainat4ec4a71(#97). #103 and #105 currently show conflicts against that main, so this PR will need another rebase after FIFO PRs #103/#104/#105 land.CodeAnt-AI Description
Bound integration event delivery to reduce bursts and duplicate notifications
What Changed
Impact
✅ Fewer notification floods during large syncs✅ Lower duplicate event delivery✅ Reduced broker send spikes🔄 Retrigger CodeAnt AI Review
💡 Usage Guide
Checking Your Pull Request
Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.
Talking to CodeAnt AI
Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:
This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.
Example
Preserve Org Learnings with CodeAnt
You can record team preferences so CodeAnt AI applies them in future reviews. Reply directly to the specific CodeAnt AI suggestion (in the same thread) and replace "Your feedback here" with your input:
This helps CodeAnt AI learn and adapt to your team's coding style and standards.
Example
Retrigger review
Ask CodeAnt AI to review the PR again, by typing:
Check Your Repository Health
To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.