Skip to content

feat: stream BrainBar brain bus events#295

Merged
EtanHey merged 1 commit into
fix/brainbar-hybrid-parityfrom
feat/watch-brain-bus
May 18, 2026
Merged

feat: stream BrainBar brain bus events#295
EtanHey merged 1 commit into
fix/brainbar-hybrid-parityfrom
feat/watch-brain-bus

Conversation

@EtanHey
Copy link
Copy Markdown
Owner

@EtanHey EtanHey commented May 18, 2026

Summary

  • Add a watch-brain-bus IPC method on the existing BrainBar Unix socket, streaming newline-delimited JSON notifications for queue depth, enrichment status, last stored chunk, DB busy state, and health ticks.
  • Replace the dashboard's one-second polling loop with a push-driven BrainBusClient subscription; Darwin DB notifications remain a fallback path.
  • Add a bounded per-subscriber event hub so slow UI clients cannot block producers.

Stacking

This PR is stacked on Phase D IPC foundation PR #293 (fix/brainbar-hybrid-parity) and follows its helper-subprocess IPC conventions: one existing UDS, long-lived raw JSON watch connection, existing Content-Length framing preserved for framed callers.

Before / After

Before: StatsCollector.start() created a pollTask, slept for one second in a loop, and refreshed dashboard state directly.

After: StatsCollector subscribes to BrainBusClient over /tmp/brainbar.sock; dashboard state changes are driven by pushed watch-brain-bus events, with no UI polling loop.

Tailscale Safesocket Pattern

  • Reuses the existing local Unix socket rather than opening a second listener.
  • Nonblocking accept/write path remains in place.
  • Watch subscribers get bounded buffers and are disconnected cleanly on socket close.
  • Slow subscribers drop old events instead of back-pressuring store/enrich paths.

Test Plan

  • TDD RED/GREEN for BrainBusEventHubTests, SocketIntegrationTests/testWatchBrainBusStreamsStoreEventsOverRawUnixSocket, and DashboardTests/testStatsCollectorSubscribesToBrainBusWithoutPollingDelay.
  • Clean staged temp worktree: swift test --package-path brain-bar -> 360 tests passed.
  • Push hook full gate passed on second run: pytest unit suite 2017 passed, 9 skipped, 75 deselected, 1 xfailed; MCP registration 3 passed; isolated eval/hook routing 32 passed; bun stale index 1 pass; FTS5 determinism shell regression PASS.

Notes

The first push hook run hit tests/test_arbitration.py::test_drain_daemon_serializes_three_concurrent_producers; the single test passed immediately on rerun and the full hook passed on the second push attempt.


Note

Medium Risk
Adds a new long-lived socket subscription path and buffered event fanout, which affects IPC framing/write behavior and could impact client stability or server queueing under load.

Overview
Adds a new watch-brain-bus JSON-RPC method on the BrainBar Unix socket that streams notifications/brain-bus events (DB busy, queue depth, enrich status, last stored chunk ID, and periodic health ticks) to subscribed clients, with per-subscriber buffering/backpressure via the new BrainBusEventHub.

Updates the dashboard StatsCollector to consume these pushed events via a new BrainBusClient (replacing the 1s polling loop when available) and wires this client into app startup. Socket write handling is tweaked (higher EAGAIN retry cap + shared write helper), and new unit/integration tests cover hub ordering/backpressure, dashboard subscription, and end-to-end brain bus streaming.

Reviewed by Cursor Bugbot for commit b65df04. Bugbot is set up for automated code reviews on this repo. Configure here.

Note

Stream real-time brain bus events from BrainBar server to dashboard clients

  • Adds a watch-brain-bus JSON-RPC method to BrainBarServer.swift that pushes notifications/brain-bus events (db_busy, queue_depth, enrich_status, last_chunk_id, health_tick) over the existing UNIX socket connection.
  • Introduces BrainBusEvent.swift with BrainBusEventHub for sequenced, backpressure-aware fan-out to multiple subscribers, and BrainBusClient.swift as the client-side consumer that reconnects on disconnect.
  • Replaces the 1-second polling loop in StatsCollector.swift with a push-driven event listener when a BrainBusEventSource is provided.
  • Increases socket write maxWriteRetries from 10 to 50 to reduce disconnects under transient EAGAIN conditions.
  • Behavioral Change: StatsCollector no longer polls on a fixed interval when wired to a BrainBusClient; dashboard updates are now event-driven.
📊 Macroscope summarized b65df04. 5 files reviewed, 9 issues evaluated, 0 issues filtered, 3 comments posted

🗂️ Filtered Issues

Copy link
Copy Markdown

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@EtanHey
Copy link
Copy Markdown
Owner Author

EtanHey commented May 18, 2026

@greptileai review

@EtanHey
Copy link
Copy Markdown
Owner Author

EtanHey commented May 18, 2026

@coderabbitai review

@EtanHey
Copy link
Copy Markdown
Owner Author

EtanHey commented May 18, 2026

@cursor @BugBot review

@EtanHey
Copy link
Copy Markdown
Owner Author

EtanHey commented May 18, 2026

@codex review

@cursor
Copy link
Copy Markdown

cursor Bot commented May 18, 2026

You need to increase your spend limit or enable usage-based billing to run background agents. Go to Cursor

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Warning

Rate limit exceeded

@EtanHey has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 58 minutes and 31 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 72f8f90f-7ccb-4df9-97cb-5143446a48b0

📥 Commits

Reviewing files that changed from the base of the PR and between 942d195 and b65df04.

📒 Files selected for processing (8)
  • brain-bar/Sources/BrainBar/BrainBarApp.swift
  • brain-bar/Sources/BrainBar/BrainBarServer.swift
  • brain-bar/Sources/BrainBar/BrainBusClient.swift
  • brain-bar/Sources/BrainBar/BrainBusEvent.swift
  • brain-bar/Sources/BrainBar/Dashboard/StatsCollector.swift
  • brain-bar/Tests/BrainBarTests/BrainBusEventHubTests.swift
  • brain-bar/Tests/BrainBarTests/DashboardTests.swift
  • brain-bar/Tests/BrainBarTests/SocketIntegrationTests.swift
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/watch-brain-bus

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b65df049fb

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

return true
}

let delivered = self.writeFramedData(fd: fd, data: framed)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Serialize brain-bus writes per client

When a Content-Length client uses watch-brain-bus on the same socket as normal MCP calls, this write runs from the hub's per-subscriber drain queue while ordinary RPC responses still write the same fd on the server queue. Because the stream socket has no message boundaries and there is no per-client write serialization here, a notification can interleave with sendResponse and corrupt both Content-Length frames for that client.

Useful? React with 👍 / 👎.

Comment on lines +466 to +467
if toolCall.name == "brain_enrich", !isToolError(response) {
brainBus.publish(.enrichStatus("running"))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Emit enrichment status around the work

For a tools/call to brain_enrich, router.handle(request) runs handleBrainEnrich synchronously before this branch is reached, so subscribers only see enrich_status: running after the enrichment call has completed, and there is no matching idle event afterward. Watchers using the new brain bus status will therefore miss the actual busy period and can remain stuck showing enrichment as running after a successful call.

Useful? React with 👍 / 👎.

return true
}

let delivered = self.writeFramedData(fd: fd, data: framed)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium BrainBar/BrainBarServer.swift:578

writeFramedData(fd:clientFD:data:) at line 578 is invoked from the brain bus subscriber's drain queue (created by BrainBusEventHub.drainNext), while sendResponse(fd:response:useContentLength:) writes to the same file descriptor from self.queue. Concurrent writes from different queues can interleave bytes, corrupting the JSON-RPC stream. Dispatch the writeFramedData call to self.queue to serialize all socket I/O.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file brain-bar/Sources/BrainBar/BrainBarServer.swift around line 578:

`writeFramedData(fd:clientFD:data:)` at line 578 is invoked from the brain bus subscriber's drain queue (created by `BrainBusEventHub.drainNext`), while `sendResponse(fd:response:useContentLength:)` writes to the same file descriptor from `self.queue`. Concurrent writes from different queues can interleave bytes, corrupting the JSON-RPC stream. Dispatch the `writeFramedData` call to `self.queue` to serialize all socket I/O.

Evidence trail:
BrainBarServer.swift:95 — `self.queue` defined as serial DispatchQueue
BrainBarServer.swift:392 — read source dispatches on `self.queue`
BrainBarServer.swift:436 — `sendResponse` called from read handler (on `self.queue`)
BrainBarServer.swift:484-525 — `sendResponse` performs raw `write()` at line 500
BrainBarServer.swift:528-552 — `writeFramedData` performs raw `write()` at line 533
BrainBarServer.swift:562-585 — subscriber closure calls `writeFramedData` at line 578
BrainBusEvent.swift:125 — per-subscriber `drainQueue` created as separate DispatchQueue
BrainBusEvent.swift:179 — writer callback dispatched on `drainQueue.async`

}

private func publishStoredChunk(stored: StoreResultPayload, content: String, tags: [String], importance: Int) {
brainBusQueueDepthEstimate += 1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium BrainBar/BrainBarServer.swift:849

brainBusQueueDepthEstimate is incremented at line 849 but never decremented, causing the queueDepth value to grow monotonically and report increasingly incorrect values to brain-bus subscribers. Consider decrementing the estimate after chunks are processed or delivered, or switch to tracking actual in-flight count.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file brain-bar/Sources/BrainBar/BrainBarServer.swift around line 849:

`brainBusQueueDepthEstimate` is incremented at line 849 but never decremented, causing the `queueDepth` value to grow monotonically and report increasingly incorrect values to brain-bus subscribers. Consider decrementing the estimate after chunks are processed or delivered, or switch to tracking actual in-flight count.

Evidence trail:
brain-bar/Sources/BrainBar/BrainBarServer.swift:105 (declaration: `private var brainBusQueueDepthEstimate = 0`), line 849 (only increment: `+= 1`), lines 346, 590, 850 (read-only usages publishing the value). git_grep for `brainBusQueueDepthEstimate -=` returned no results, confirming no decrement exists anywhere in the codebase.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High

func start() {
guard !isRunning else { return }
isRunning = true
installDarwinObserver()
if let brainBusEvents {
let eventStream = brainBusEvents.events()
brainBusTask = Task { [weak self] in
for await event in eventStream {
guard !Task.isCancelled else { break }
await MainActor.run {
self?.handleBrainBusEvent(event)
}
}
}
} else {
refresh(force: true)
}

When brainBusEvents is provided, start() skips the initial refresh(force: true) call. If the first events are .healthTick, handleBrainBusEvent only samples daemon/agent activity and never calls refresh(), so stats remains at zeroed initial values until a different event type arrives. Consider calling refresh(force: true) unconditionally before starting the event stream.

         guard !isRunning else { return }
         isRunning = true
         installDarwinObserver()
+        refresh(force: true)
         if let brainBusEvents {
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file brain-bar/Sources/BrainBar/Dashboard/StatsCollector.swift around lines 63-79:

When `brainBusEvents` is provided, `start()` skips the initial `refresh(force: true)` call. If the first events are `.healthTick`, `handleBrainBusEvent` only samples daemon/agent activity and never calls `refresh()`, so `stats` remains at zeroed initial values until a different event type arrives. Consider calling `refresh(force: true)` unconditionally before starting the event stream.

Evidence trail:
brain-bar/Sources/BrainBar/Dashboard/StatsCollector.swift lines 47-58 (zeroed initial stats), lines 63-80 (start() method — brainBusEvents branch skips refresh(force:true)), lines 131-139 (handleBrainBusEvent — .healthTick case never calls refresh()), line 35 (lastDataVersion initialized to nil). Reviewed at commit REVIEWED_COMMIT.

@EtanHey EtanHey merged commit 82d02ae into fix/brainbar-hybrid-parity May 18, 2026
3 checks passed
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 4 potential issues.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit b65df04. Configure here.


private func publishStoredChunk(stored: StoreResultPayload, content: String, tags: [String], importance: Int) {
brainBusQueueDepthEstimate += 1
brainBus.publish(.queueDepth(brainBusQueueDepthEstimate))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue depth estimate only increments, never decrements

Medium Severity

brainBusQueueDepthEstimate is incremented in publishStoredChunk but never decremented anywhere. This means the "queue depth" reported to subscribers grows monotonically with every stored chunk and never decreases, making it a total-chunks-stored counter rather than an actual queue depth metric. Subscribers receive a meaningless ever-growing number.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b65df04. Configure here.

self?.disconnectClient(fd: fd)
}
}
return delivered
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrent writes to same fd from different queues

Medium Severity

The brain bus subscriber writer calls writeFramedData from the subscriber's drainQueue, while publishStoredChunk calls sendResponse on the server's queue for the same fd. A client that uses both watch-brain-bus and brain_subscribe on one connection gets concurrent unsynchronized write() calls from two threads, corrupting data on the wire.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b65df04. Configure here.

}
if toolCall.name == "brain_enrich", !isToolError(response) {
brainBus.publish(.enrichStatus("running"))
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enrich status published as "running" after completion

Low Severity

The .enrichStatus("running") event is published after router.handle(request) returns — meaning the synchronous enrichment work has already completed. The status is never subsequently reset to "idle", so it remains permanently stuck at "running" for existing subscribers after the first successful brain_enrich call.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b65df04. Configure here.

brainBus.publish(.dbBusy(database == nil))
brainBus.publish(.queueDepth(brainBusQueueDepthEstimate))
brainBus.publish(.enrichStatus("idle"))
brainBus.publish(.healthTick(openConnections: clients.count))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial state events broadcast to all subscribers globally

Medium Severity

When a new watch-brain-bus client connects, handleWatchBrainBus publishes four initial-state events (including .enrichStatus("idle")) to the global brainBus, which delivers them to ALL existing subscribers, not just the newly-joined one. This incorrectly resets state for pre-existing subscribers — for example, forcing their enrich status back to "idle" even if enrichment is active — every time any new client connects.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b65df04. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant