|
| 1 | +# Event Bus |
| 2 | + |
| 3 | +In-process pub/sub plus typed request/response. Owns the global `EventBus` singleton (built on `tokio::sync::broadcast`), the `DomainEvent` enum that names every cross-module event, the `NativeRegistry` (one-to-one typed dispatch keyed by method string with zero serialization), the `EventHandler` trait + `SubscriptionHandle` RAII guard, and the bundled `TracingSubscriber` debug logger. ~33 internal call sites — every domain that emits or consumes cross-module events lives here. |
| 4 | + |
| 5 | +## Public surface |
| 6 | + |
| 7 | +- `pub struct EventBus` — `bus.rs` — broadcast singleton over `tokio::sync::broadcast`. |
| 8 | +- `pub const DEFAULT_CAPACITY: usize = 256` — `bus.rs` — default channel capacity. |
| 9 | +- `pub fn init_global(capacity: usize) -> &'static EventBus` — `bus.rs` — initialize once at startup; safe to call repeatedly. |
| 10 | +- `pub fn global() -> Option<&'static EventBus>` — `bus.rs` — accessor; returns `None` before `init_global`. |
| 11 | +- `pub fn publish_global(event: DomainEvent)` — `bus.rs` — fire-and-forget broadcast. |
| 12 | +- `pub fn subscribe_global(handler: Arc<dyn EventHandler>) -> Option<SubscriptionHandle>` — `bus.rs` — register a subscriber. |
| 13 | +- `pub enum DomainEvent` — `events.rs` — `#[non_exhaustive]` catalog of events; current variants cover Agent (`AgentTurnStarted/Completed`, `AgentError`), Memory (`MemoryStored`, `MemoryRecalled`), Channels (`ChannelInboundMessage`, `ChannelMessageReceived/Processed`, `ChannelReactionReceived/Sent`, `ChannelConnected/Disconnected`), Cron (`CronJobTriggered/Completed`, `CronDeliveryRequested`), Skills, Tools, Webhooks, and System. |
| 14 | +- `pub trait EventHandler` — `subscriber.rs:12-24` — `name()` + optional `domains()` filter + async `handle()`. |
| 15 | +- `pub struct SubscriptionHandle` — `subscriber.rs:29` — RAII; drop aborts the subscriber task. |
| 16 | +- `pub struct TracingSubscriber` — `tracing.rs` — built-in handler that logs every event at `debug` level. |
| 17 | +- `pub struct NativeRegistry` — `native_request.rs` — typed in-process request/response dispatcher keyed by method string. |
| 18 | +- `pub enum NativeRequestError` — `native_request.rs` — `MethodNotFound`, `TypeMismatch`, etc. |
| 19 | +- `pub fn init_native_registry() -> &'static NativeRegistry` / `pub fn native_registry() -> Option<&'static NativeRegistry>` / `pub fn register_native_global` / `pub fn request_native_global` — `native_request.rs`. |
| 20 | +- `pub mod testing` — `testing.rs` — helpers to build isolated bus / registry instances per test. |
| 21 | + |
| 22 | +## Calls into |
| 23 | + |
| 24 | +- `tokio::sync::broadcast` for the broadcast channel. |
| 25 | +- `async_trait` and `tokio::task::JoinHandle` for handler plumbing. |
| 26 | +- No openhuman-domain dependencies — this module sits below every domain. |
| 27 | + |
| 28 | +## Called by |
| 29 | + |
| 30 | +- ~33 sites across the workspace. Hot consumers: |
| 31 | +- `src/openhuman/agent/bus.rs`, `agent/triage/{events,evaluator,escalation}.rs`, `tools/impl/agent/{dispatch,spawn_subagent}.rs` — agent + sub-agent events. |
| 32 | +- `src/openhuman/memory/conversations/bus.rs` — conversation persistence subscriber. |
| 33 | +- `src/openhuman/channels/bus.rs` — `ChannelInboundSubscriber`. |
| 34 | +- `src/openhuman/cron/{bus,scheduler}.rs` — `CronDeliverySubscriber` + `CronJobTriggered` emission. |
| 35 | +- `src/openhuman/webhooks/bus.rs` — `WebhookRequestSubscriber`. |
| 36 | +- `src/openhuman/health/bus.rs` — health-event subscriber. |
| 37 | +- `src/openhuman/update/scheduler.rs` — update-cycle events. |
| 38 | +- `src/openhuman/tree_summarizer/{engine,bus}.rs` — async summarisation triggers. |
| 39 | +- `src/openhuman/composio/bus.rs`, `notifications/`, `learning/` — analytics fan-out. |
| 40 | + |
| 41 | +## Tests |
| 42 | + |
| 43 | +- Unit: `bus_tests.rs`, `events_tests.rs`, `native_request_tests.rs`. |
| 44 | +- Test infrastructure: `testing.rs` exposes helpers; many domain tests construct a fresh `NativeRegistry::new()` for isolation, or override an existing method by re-registering it. |
0 commit comments