Skip to content

feat(event_bus): wire webhooks, channels & skills through the event bus#379

Merged
senamakel merged 9 commits intotinyhumansai:mainfrom
senamakel:fix/event_bus_impl
Apr 6, 2026
Merged

feat(event_bus): wire webhooks, channels & skills through the event bus#379
senamakel merged 9 commits intotinyhumansai:mainfrom
senamakel:fix/event_bus_impl

Conversation

@senamakel
Copy link
Copy Markdown
Member

@senamakel senamakel commented Apr 6, 2026

Summary

  • Enriched DomainEvent variants with full payloads (content, arguments, responses, timing, errors) so subscribers have everything they need without secondary lookups
  • Added event publishers across three modules: webhooks (register/unregister/received/processed), channels (connected/disconnected/message received/processed), skills (loaded/stopped/failed/executed)
  • Decoupled socket from domain logic — moved webhook routing into WebhookRequestSubscriber and channel inbound handling into ChannelInboundSubscriber, making socket a thin event publisher
  • Created domain bus.rs files for webhooks, channels, and skills following the existing cron/bus.rs pattern

Architecture change

Before: socket/event_handlers.rs directly called into skills registry for webhook routing and into the web channel provider for inbound messages (~500 lines of cross-domain logic).

After: Socket parses events and publishes to the bus. Domain subscribers handle their own logic:

  • webhooks/bus.rs::WebhookRequestSubscriber — routes requests to skills, emits responses via global_socket_manager().emit()
  • channels/bus.rs::ChannelInboundSubscriber — runs agent loop, replies via REST API
  • socket/event_handlers.rs is now ~180 lines of pure transport

Test plan

  • cargo check — compiles clean
  • cargo test event_bus — all 8 bus tests pass (including new domain() coverage test)
  • cargo test cron::bus — existing cron subscriber tests still pass
  • Manual: webhook requests route correctly via event bus
  • Manual: channel inbound messages trigger agent loop via event bus

🤖 Generated with Claude Code

Summary by CodeRabbit

Release Notes

  • New Features

    • Implemented event-driven architecture for channels and webhooks with comprehensive lifecycle tracking.
    • Added global event bus initialization with cross-module subscriber registration.
    • Enhanced domain events with richer payloads including request details, response data, and processing metrics.
    • Skill lifecycle events now tracked and published (startup, shutdown, execution).
  • Documentation

    • Expanded event bus architecture documentation with domain ownership patterns and subscriber implementation guidelines.

senamakel and others added 4 commits April 6, 2026 14:58
- Added new `DomainEvent` variants for channel and skill events, including `ChannelMessageReceived`, `ChannelMessageProcessed`, `ChannelConnected`, `ChannelDisconnected`, `SkillLoaded`, `SkillStopped`, and `SkillStartFailed`.
- Implemented event publishing in the channels and skills modules to track message processing and skill lifecycle events.
- Created dedicated event bus handler files for the skills and webhooks domains, preparing for future subscriber implementations.
- Updated documentation in `CLAUDE.md` to reflect the new domain events and their usage.

These changes improve the observability and modularity of the system by leveraging an event-driven architecture for cross-module communication.
- Introduced `ChannelInboundSubscriber` to handle inbound channel messages, triggering the agent inference loop and sending replies via the backend REST API.
- Added `WebhookRequestSubscriber` to manage incoming webhook requests, routing them to the appropriate skill and handling responses.
- Updated the global event bus initialization in `bootstrap_skill_runtime` to register both channel and webhook subscribers.
- Enhanced `DomainEvent` with new variants for channel inbound messages and webhook requests, improving event-driven communication across modules.

These changes enhance the modularity and responsiveness of the system by leveraging an event-driven architecture for channel and webhook interactions.
… initialization

- Revised the documentation in `CLAUDE.md` to provide a concise overview of domain events and their associated subscriber files, enhancing clarity for future development.
- Updated the `start_channels` function to initialize `WebhookRequestSubscriber` and `ChannelInboundSubscriber`, ensuring proper event handling for webhooks and channel messages.
- Streamlined the event bus subscriber registration process, reinforcing the modular architecture of the system.

These changes improve the maintainability and usability of the event bus framework, facilitating better cross-module communication.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 6, 2026

Warning

Rate limit exceeded

@senamakel has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 7 minutes and 3 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 7 minutes and 3 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2dbcc0b9-8e50-469b-a7f6-d29f339bdf27

📥 Commits

Reviewing files that changed from the base of the PR and between 7ab4c48 and 25baec3.

📒 Files selected for processing (3)
  • src/core/jsonrpc.rs
  • src/openhuman/socket/event_handlers.rs
  • src/openhuman/webhooks/bus.rs
📝 Walkthrough

Walkthrough

This PR introduces an event-driven architecture for domain-specific event handlers. It extends the event bus with new domain events, registers subscriber implementations for channels and webhooks, publishes events from various subsystems (socket handling, channel supervision, skill engine), and replaces direct event handling with publish-subscribe patterns.

Changes

Cohort / File(s) Summary
Event Bus Core
src/openhuman/event_bus/events.rs
Extended DomainEvent enum with 8 new variants (channel inbound/processed, webhook lifecycle, skill lifecycle) and enriched 5 existing variants with additional payload fields. Updated domain() routing logic to cover all new variants. Added unit test enumerating all variants.
Channel Domain Handler
src/openhuman/channels/bus.rs, src/openhuman/channels/mod.rs
New ChannelInboundSubscriber subscribes to inbound channel messages, starts agent chat, awaits completion/error events with 180s timeout, and sends replies via REST API. Helper function send_channel_reply constructs client and sends responses. Exported via new public bus module.
Webhook Domain Handler
src/openhuman/webhooks/bus.rs, src/openhuman/webhooks/mod.rs
New WebhookRequestSubscriber subscribes to incoming webhook requests, performs tunnel lookup, routes based on target kind (echo/skill/error), publishes processed event, and emits socket response. Exported via new public bus module.
Skills Domain Stubs
src/openhuman/skills/bus.rs, src/openhuman/skills/mod.rs
Added placeholder module documentation for future skill event bus handlers. Exported via new public bus module.
Bootstrap & Initialization
src/core/jsonrpc.rs, src/openhuman/channels/runtime/startup.rs
Initialize global event bus in bootstrap_skill_runtime() and register WebhookRequestSubscriber and ChannelInboundSubscriber. Added clarifying comment in start_channels documenting subscriber registration location.
Event Publishing: Channels
src/openhuman/channels/runtime/dispatch.rs, src/openhuman/channels/runtime/supervision.rs
Publish DomainEvent::ChannelMessageReceived on inbound, ChannelMessageProcessed on completion with elapsed time and success flag. Supervision now emits ChannelConnected and ChannelDisconnected events with reason strings.
Event Publishing: Webhooks
src/openhuman/webhooks/router.rs
Emit WebhookRegistered and WebhookUnregistered events on tunnel registration changes; unregister_skill now publishes one event per removed tunnel.
Event Publishing: Skills
src/openhuman/skills/qjs_engine.rs
Publish SkillLoaded, SkillStartFailed on lifecycle transitions, and SkillExecuted on tool invocation with arguments, result, elapsed time.
Socket Event Routing
src/openhuman/socket/event_handlers.rs
Removed inline handle_webhook_request and handle_channel_inbound_message logic; now publishes WebhookIncomingRequest and ChannelInboundMessage events to event bus and delegates processing to subscribers. Simplified error handling via direct socket emission.
Documentation
CLAUDE.md
Enumerated supported domains and added section describing per-domain bus.rs module pattern with concrete examples for cross-module subscriber implementations.

Sequence Diagram(s)

sequenceDiagram
    participant Source as Event Sources<br/>(Socket, Channel, Webhook, Skill)
    participant Bus as Global Event Bus
    participant Router as Domain Router
    participant Sub as Subscribers<br/>(Channel, Webhook, Skills)
    participant Handler as Event Handlers<br/>(Chat, REST API, etc.)

    Source->>Bus: publish_global(DomainEvent)
    activate Bus
    Bus->>Router: route by domain
    Router->>Sub: dispatch to matching subscribers
    deactivate Bus
    
    activate Sub
    alt Channel Domain
        Sub->>Handler: start_chat / send_reply
    else Webhook Domain
        Sub->>Handler: lookup tunnel / route request
    else Skills Domain
        Sub->>Handler: record lifecycle event
    end
    Handler-->>Sub: result
    deactivate Sub
    
    Sub-->>Source: async complete
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • YellowSnnowmann
  • graycyrus

🐰 Hop hop! Events now flow through the gardens,
Channels and webhooks dance with grace,
Subscribers listen, handlers respond,
A symphony of messages in async space! ✨🌳

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(event_bus): wire webhooks, channels & skills through the event bus' directly and comprehensively describes the main change: integrating webhooks, channels, and skills into the event bus architecture.
Docstring Coverage ✅ Passed Docstring coverage is 96.30% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

…nnels

WebhookRequestSubscriber and ChannelInboundSubscriber were registered in
both bootstrap_skill_runtime() and start_channels(), causing events to
be handled twice when both paths run in the same process.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/openhuman/webhooks/router.rs (1)

170-199: ⚠️ Potential issue | 🟡 Minor

Only emit WebhookUnregistered when something was actually removed.

The else branch treats an unknown tunnel as a no-op, but the new publish at the end still emits DomainEvent::WebhookUnregistered. Retries or idempotent cleanup will therefore create false state transitions for bus consumers.

🧩 Minimal fix
-        if let Some(existing) = routes.get(tunnel_uuid) {
+        let removed = if let Some(existing) = routes.get(tunnel_uuid) {
             if existing.skill_id != skill_id {
                 return Err(format!(
                     "Tunnel {} is owned by skill '{}'; skill '{}' cannot unregister it",
                     tunnel_uuid, existing.skill_id, skill_id
                 ));
             }
             debug!(
                 "[webhooks] Unregistering tunnel {} (skill '{}')",
                 tunnel_uuid, skill_id
             );
-            routes.remove(tunnel_uuid);
+            routes.remove(tunnel_uuid).is_some()
         } else {
             debug!(
                 "[webhooks] Tunnel {} not found for unregister (skill '{}')",
                 tunnel_uuid, skill_id
             );
-        }
+            false
+        };
 
         drop(routes);
-        self.publish_event("registration_changed", None, Some(tunnel_uuid.to_string()));
-        self.persist();
-
-        publish_global(DomainEvent::WebhookUnregistered {
-            tunnel_id: tunnel_uuid.to_string(),
-            skill_id: skill_id.to_string(),
-        });
+        if removed {
+            self.publish_event("registration_changed", None, Some(tunnel_uuid.to_string()));
+            self.persist();
+            publish_global(DomainEvent::WebhookUnregistered {
+                tunnel_id: tunnel_uuid.to_string(),
+                skill_id: skill_id.to_string(),
+            });
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/webhooks/router.rs` around lines 170 - 199, The unregister
function currently always emits DomainEvent::WebhookUnregistered via
publish_global even when no route was removed; change unregister (the method on
the router) so that publish_global(DomainEvent::WebhookUnregistered { ... }) is
only called when an existing route was found and removed (i.e., inside the if
let Some(existing) { ... } branch after routes.remove(tunnel_uuid)); likewise
only call self.publish_event("registration_changed", ...) and self.persist()
when a removal actually occurred to avoid emitting false state transitions.
src/openhuman/channels/runtime/dispatch.rs (1)

91-107: ⚠️ Potential issue | 🟠 Major

Publish ChannelMessageProcessed on provider-init failures.

This branch returns before the later terminal-event emit, so consumers see ChannelMessageReceived with no matching completion whenever provider creation fails. Move started_at above this branch and publish a failure ChannelMessageProcessed before returning.

💡 Suggested fix
+    let started_at = Instant::now();
     let target_channel = ctx.channels_by_name.get(&msg.channel).cloned();
     let active_provider = match get_or_create_provider(ctx.as_ref(), &route.provider).await {
         Ok(provider) => provider,
         Err(err) => {
             let safe_err = providers::sanitize_api_error(&err.to_string());
             let message = format!(
                 "⚠️ Failed to initialize provider `{}`. Please run `/models` to choose another provider.\nDetails: {safe_err}",
                 route.provider
             );
             if let Some(channel) = target_channel.as_ref() {
                 let _ = channel
                     .send(
-                        &SendMessage::new(message, &msg.reply_target)
+                        &SendMessage::new(&message, &msg.reply_target)
                             .in_thread(msg.thread_ts.clone()),
                     )
                     .await;
             }
+            publish_global(DomainEvent::ChannelMessageProcessed {
+                channel: msg.channel.clone(),
+                sender: msg.sender.clone(),
+                content: msg.content.clone(),
+                response: message,
+                elapsed_ms: started_at.elapsed().as_millis() as u64,
+                success: false,
+            });
             return;
         }
     };
-
-    let started_at = Instant::now();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/channels/runtime/dispatch.rs` around lines 91 - 107, The
provider-init error branch returns before emitting the terminal event; move
creation of started_at (the timestamp used for the terminal event) before
calling get_or_create_provider(ctx.as_ref(), &route.provider).await, and before
returning publish a ChannelMessageProcessed failure event with the same
identifiers (use the same message/thread context and reply_target as used for
SendMessage::new) so consumers see a matching completion; ensure you still send
the human-facing SendMessage (using
providers::sanitize_api_error(&err.to_string())) and then publish
ChannelMessageProcessed (failed status and started_at) prior to the early
return.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/core/jsonrpc.rs`:
- Around line 697-702: The two local subscription handles
(_webhook_request_handle and _channel_inbound_handle) created in
bootstrap_skill_runtime() are dropped at function exit which cancels their
background tasks; instead, move subscription registration into long-lived
runtime state by registering the subscribers via the global helper (use
event_bus::subscribe_global) or store the returned SubscriptionHandle in your
runtime-owned state (e.g., the runtime singleton used for startup) so they are
not dropped when bootstrap_skill_runtime() returns; locate the subscriber
constructors crate::openhuman::webhooks::bus::WebhookRequestSubscriber::new()
and crate::openhuman::channels::bus::ChannelInboundSubscriber::new(), replace
local bus.subscribe(...) assignments with event_bus::subscribe_global(...) (or
assign the handles into the runtime state structure) and ensure registration
happens during startup (e.g., channels/runtime/startup.rs) so the background
tasks remain alive.

In `@src/openhuman/channels/bus.rs`:
- Around line 86-91: The handler currently returns early when ev.event ==
"chat_done" (or "chat:done") and ev.full_response is empty, leaving the user
with no reply; instead, replace the early return with sending a generic fallback
message (e.g. "Sorry, I couldn't generate a response right now. Please try
again.") via the same outbound/reply flow you use for non-empty replies so the
client always receives a response; keep the tracing::warn! for visibility and
use ev.full_response.unwrap_or_default() (or the existing reply variable) to
decide between the real reply and the fallback.

In `@src/openhuman/channels/runtime/supervision.rs`:
- Around line 51-54: The code currently publishes the raw provider error via
publish_global(DomainEvent::ChannelDisconnected { channel:
ch.name().to_string(), reason: e.to_string() }) which may leak credentials/PII;
instead map or convert the error into a redacted reason code or enum (e.g.,
"PROVIDER_ERROR", "AUTH_FAILED", "TIMEOUT", or ChannelDisconnectReason) before
publishing. Change the call to compute a sanitized reason (for example via a
helper like map_error_to_reason(&e) or by matching on the error type and
returning a fixed string/enum) and pass that redacted value as reason; do not
include e.to_string() or raw error text in DomainEvent::ChannelDisconnected.
Ensure any helper used is referenced (map_error_to_reason,
ChannelDisconnectReason, publish_global, DomainEvent::ChannelDisconnected,
ch.name()) so reviewers can locate and verify the change.

In `@src/openhuman/event_bus/events.rs`:
- Around line 3-5: DomainEvent and its variants currently clone full
ChannelBody/webhook payloads and tool args/results into the broadcasted enum;
change those variants to carry only identifiers and small redacted
previews/lengths instead of raw payloads (e.g., replace payload:
FullWebhookPayload with payload_id: Uuid, preview: String, size_bytes: usize)
and remove Debug/Display of raw fields to avoid accidental logging; update
constructors/emitters (the DomainEvent creation sites and any functions that
build events) to store full payloads in a secure store/cache and publish only
the ID+redacted preview, and ensure types like
WebhookPayload/ToolArgs/ToolResult are not cloned into the bus (use references
or opaque IDs) and that sensitive fields are redacted per guidelines before
including any preview.

In `@src/openhuman/socket/event_handlers.rs`:
- Around line 26-27: handle_sio_event currently prefers global_socket_manager()
to send a 400 response and can drop the response if the global manager isn't
initialized; change the logic to use the per-connection emitter (_emit_tx) as
the fallback (or primary when global_socket_manager() returns None) so the
active socket always receives the response. Locate the send path where
global_socket_manager() is used and modify it to check for Some(manager) and use
manager.emit/whatever when present, otherwise serialize the 400 response and
send it via the provided _emit_tx UnboundedSender<String> (the variables to
touch are _emit_tx, handle_sio_event, and any call sites referencing
global_socket_manager()). Ensure the same fallback fix is applied to the other
occurrence range around lines 95-113 so both code paths use _emit_tx when the
global manager is unavailable.

In `@src/openhuman/webhooks/bus.rs`:
- Around line 83-90: The code builds JSON error bodies by hand-escaping strings
(e.g., in the WebhookResponseData.body using base64_encode(&format!(...)) with
reg.skill_id/target_kind), which fails for backslashes, newlines and other
control chars; fix by constructing a proper serde_json value/object (e.g.,
create a struct or use serde_json::json!({ "error": format!(...) }) or
serde_json::to_string(&obj)) and then base64_encode the serialized string;
update all occurrences that currently use base64_encode(&format!(...)) (the
branches creating WebhookResponseData for unimplemented targets/errors) to
serialize with serde_json::to_string/json! instead of manual escaping.
- Around line 51-53: The webhook lifecycle events WebhookReceived and
WebhookProcessed are using request.tunnel_uuid for tunnel_id, which prevents
joining with WebhookRegistered/WebhookUnregistered that use the real tunnel ID;
update the event construction to use the real tunnel identifier
(request.tunnel_id) instead of request.tunnel_uuid (e.g., set tunnel_id =
request.tunnel_id.clone()). Apply the same change to the other occurrences noted
(the later WebhookReceived/WebhookProcessed creation around the 195-210 area) so
all lifecycle events consistently publish the true tunnel_id.

---

Outside diff comments:
In `@src/openhuman/channels/runtime/dispatch.rs`:
- Around line 91-107: The provider-init error branch returns before emitting the
terminal event; move creation of started_at (the timestamp used for the terminal
event) before calling get_or_create_provider(ctx.as_ref(),
&route.provider).await, and before returning publish a ChannelMessageProcessed
failure event with the same identifiers (use the same message/thread context and
reply_target as used for SendMessage::new) so consumers see a matching
completion; ensure you still send the human-facing SendMessage (using
providers::sanitize_api_error(&err.to_string())) and then publish
ChannelMessageProcessed (failed status and started_at) prior to the early
return.

In `@src/openhuman/webhooks/router.rs`:
- Around line 170-199: The unregister function currently always emits
DomainEvent::WebhookUnregistered via publish_global even when no route was
removed; change unregister (the method on the router) so that
publish_global(DomainEvent::WebhookUnregistered { ... }) is only called when an
existing route was found and removed (i.e., inside the if let Some(existing) {
... } branch after routes.remove(tunnel_uuid)); likewise only call
self.publish_event("registration_changed", ...) and self.persist() when a
removal actually occurred to avoid emitting false state transitions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2fddef45-9166-4b5c-b367-51827a486f3c

📥 Commits

Reviewing files that changed from the base of the PR and between 5c7c104 and 7ab4c48.

📒 Files selected for processing (15)
  • CLAUDE.md
  • src/core/jsonrpc.rs
  • src/openhuman/channels/bus.rs
  • src/openhuman/channels/mod.rs
  • src/openhuman/channels/runtime/dispatch.rs
  • src/openhuman/channels/runtime/startup.rs
  • src/openhuman/channels/runtime/supervision.rs
  • src/openhuman/event_bus/events.rs
  • src/openhuman/skills/bus.rs
  • src/openhuman/skills/mod.rs
  • src/openhuman/skills/qjs_engine.rs
  • src/openhuman/socket/event_handlers.rs
  • src/openhuman/webhooks/bus.rs
  • src/openhuman/webhooks/mod.rs
  • src/openhuman/webhooks/router.rs

Comment thread src/core/jsonrpc.rs Outdated
Comment on lines +697 to +702
let _webhook_request_handle = bus.subscribe(Arc::new(
crate::openhuman::webhooks::bus::WebhookRequestSubscriber::new(),
));
let _channel_inbound_handle = bus.subscribe(Arc::new(
crate::openhuman::channels::bus::ChannelInboundSubscriber::new(),
));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Keep these subscription handles alive after bootstrap.

SubscriptionHandle::drop aborts the background task, so both subscribers are cancelled as soon as bootstrap_skill_runtime() returns. After that, socket-published webhook/channel events have no long-lived domain consumers. Store the handles in runtime-owned state instead of locals, and register them once through the global helper. As per coding guidelines, "Register event subscribers in startup (e.g., channels/runtime/startup.rs) via the singleton using event_bus::subscribe_global(handler)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/core/jsonrpc.rs` around lines 697 - 702, The two local subscription
handles (_webhook_request_handle and _channel_inbound_handle) created in
bootstrap_skill_runtime() are dropped at function exit which cancels their
background tasks; instead, move subscription registration into long-lived
runtime state by registering the subscribers via the global helper (use
event_bus::subscribe_global) or store the returned SubscriptionHandle in your
runtime-owned state (e.g., the runtime singleton used for startup) so they are
not dropped when bootstrap_skill_runtime() returns; locate the subscriber
constructors crate::openhuman::webhooks::bus::WebhookRequestSubscriber::new()
and crate::openhuman::channels::bus::ChannelInboundSubscriber::new(), replace
local bus.subscribe(...) assignments with event_bus::subscribe_global(...) (or
assign the handles into the runtime state structure) and ensure registration
happens during startup (e.g., channels/runtime/startup.rs) so the background
tasks remain alive.

Comment on lines +86 to +91
if ev.event == "chat_done" || ev.event == "chat:done" {
let reply = ev.full_response.unwrap_or_default();
if reply.trim().is_empty() {
tracing::warn!("[channel-inbound] agent returned empty response");
return;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't silently drop empty chat_done responses.

If full_response is empty, this handler just returns and the inbound user gets no reply at all. Treat that as an error path and send a generic fallback message instead.

💡 Suggested fix
                                 let reply = ev.full_response.unwrap_or_default();
                                 if reply.trim().is_empty() {
                                     tracing::warn!("[channel-inbound] agent returned empty response");
+                                    send_channel_reply(
+                                        channel,
+                                        "Sorry, I couldn't generate a response. Please try again.",
+                                    )
+                                    .await;
                                     return;
                                 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ev.event == "chat_done" || ev.event == "chat:done" {
let reply = ev.full_response.unwrap_or_default();
if reply.trim().is_empty() {
tracing::warn!("[channel-inbound] agent returned empty response");
return;
}
if ev.event == "chat_done" || ev.event == "chat:done" {
let reply = ev.full_response.unwrap_or_default();
if reply.trim().is_empty() {
tracing::warn!("[channel-inbound] agent returned empty response");
send_channel_reply(
channel,
"Sorry, I couldn't generate a response. Please try again.",
)
.await;
return;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/channels/bus.rs` around lines 86 - 91, The handler currently
returns early when ev.event == "chat_done" (or "chat:done") and ev.full_response
is empty, leaving the user with no reply; instead, replace the early return with
sending a generic fallback message (e.g. "Sorry, I couldn't generate a response
right now. Please try again.") via the same outbound/reply flow you use for
non-empty replies so the client always receives a response; keep the
tracing::warn! for visibility and use ev.full_response.unwrap_or_default() (or
the existing reply variable) to decide between the real reply and the fallback.

Comment on lines +51 to +54
publish_global(DomainEvent::ChannelDisconnected {
channel: ch.name().to_string(),
reason: e.to_string(),
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't publish raw provider errors onto the event bus.

e.to_string() becomes part of DomainEvent::ChannelDisconnected, and this flow is traced centrally for observability. That turns provider-specific error text into a loggable event payload, including any credentials or PII embedded in the original message. Publish a redacted reason code here instead of the raw error string. As per coding guidelines, "Never log secrets, API keys, JWTs, credentials, or full PII in Rust debug logs; redact or omit sensitive fields".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/channels/runtime/supervision.rs` around lines 51 - 54, The code
currently publishes the raw provider error via
publish_global(DomainEvent::ChannelDisconnected { channel:
ch.name().to_string(), reason: e.to_string() }) which may leak credentials/PII;
instead map or convert the error into a redacted reason code or enum (e.g.,
"PROVIDER_ERROR", "AUTH_FAILED", "TIMEOUT", or ChannelDisconnectReason) before
publishing. Change the call to compute a sanitized reason (for example via a
helper like map_error_to_reason(&e) or by matching on the error type and
returning a fixed string/enum) and pass that redacted value as reason; do not
include e.to_string() or raw error text in DomainEvent::ChannelDisconnected.
Ensure any helper used is referenced (map_error_to_reason,
ChannelDisconnectReason, publish_global, DomainEvent::ChannelDisconnected,
ch.name()) so reviewers can locate and verify the change.

Comment on lines +3 to +5
//! Events carry full payloads so subscribers have everything they need without
//! secondary lookups. The broadcast channel clones each event per subscriber,
//! which is fine — richness beats round-trips.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

These broadcast events are carrying too much raw user/tool data.

Full channel bodies, webhook payloads, tool arguments, and tool results are now deep-cloned into every broadcast send. That is a memory multiplier, and any subscriber that logs DomainEvent will dump raw user/tool data as well. Prefer IDs plus redacted previews/lengths, and keep raw payloads out of the general bus.

As per coding guidelines, "Never log secrets, API keys, JWTs, credentials, or full PII in Rust debug logs; redact or omit sensitive fields".

Also applies to: 39-61, 88-95, 112-143

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/event_bus/events.rs` around lines 3 - 5, DomainEvent and its
variants currently clone full ChannelBody/webhook payloads and tool args/results
into the broadcasted enum; change those variants to carry only identifiers and
small redacted previews/lengths instead of raw payloads (e.g., replace payload:
FullWebhookPayload with payload_id: Uuid, preview: String, size_bytes: usize)
and remove Debug/Display of raw fields to avoid accidental logging; update
constructors/emitters (the DomainEvent creation sites and any functions that
build events) to store full payloads in a secure store/cache and publish only
the ID+redacted preview, and ensure types like
WebhookPayload/ToolArgs/ToolResult are not cloned into the bus (use references
or opaque IDs) and that sensitive fields are redacted per guidelines before
including any preview.

Comment on lines +26 to 27
_emit_tx: &mpsc::UnboundedSender<String>,
shared: &Arc<SharedState>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep the per-connection emit channel as the fallback response path.

This malformed-webhook branch now depends on global_socket_manager(), but handle_sio_event already has the socket emitter for the active connection. If the global manager is not set yet, the 400 response is silently dropped even though the socket is alive.

💡 Suggested fix
 pub(super) fn handle_sio_event(
     event_name: &str,
     data: serde_json::Value,
-    _emit_tx: &mpsc::UnboundedSender<String>,
+    emit_tx: &mpsc::UnboundedSender<String>,
     shared: &Arc<SharedState>,
 ) {
@@
-                    if let Some(mgr) = crate::openhuman::socket::global_socket_manager() {
-                        let body = base64_encode(&format!(
-                            "{{\"error\":\"Bad request: {}\"}}",
-                            e.to_string().replace('"', "\\\"")
-                        ));
-                        let response_data = json!({
-                            "correlationId": cid,
-                            "statusCode": 400,
-                            "headers": {},
-                            "body": body,
-                        });
-                        let mgr = mgr.clone();
-                        tokio::spawn(async move {
-                            if let Err(e) = mgr.emit("webhook:response", response_data).await {
-                                log::error!("[socket] Failed to emit webhook error response: {e}");
-                            }
-                        });
-                    }
+                    let response_data = json!({
+                        "correlationId": cid,
+                        "statusCode": 400,
+                        "headers": {},
+                        "body": base64_encode(&json!({
+                            "error": format!("Bad request: {e}")
+                        }).to_string()),
+                    });
+                    emit_via_channel(emit_tx, "webhook:response", response_data);
                 }
             }
         }

Also applies to: 95-113

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/socket/event_handlers.rs` around lines 26 - 27,
handle_sio_event currently prefers global_socket_manager() to send a 400
response and can drop the response if the global manager isn't initialized;
change the logic to use the per-connection emitter (_emit_tx) as the fallback
(or primary when global_socket_manager() returns None) so the active socket
always receives the response. Locate the send path where global_socket_manager()
is used and modify it to check for Some(manager) and use manager.emit/whatever
when present, otherwise serialize the 400 response and send it via the provided
_emit_tx UnboundedSender<String> (the variables to touch are _emit_tx,
handle_sio_event, and any call sites referencing global_socket_manager()).
Ensure the same fallback fix is applied to the other occurrence range around
lines 95-113 so both code paths use _emit_tx when the global manager is
unavailable.

Comment on lines +51 to +53
let correlation_id = request.correlation_id.clone();
let tunnel_uuid = request.tunnel_uuid.clone();
let tunnel_name = request.tunnel_name.clone();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Publish the actual tunnel_id in webhook lifecycle events.

WebhookReceived and WebhookProcessed both populate tunnel_id from request.tunnel_uuid. That makes these events impossible to join reliably with WebhookRegistered / WebhookUnregistered, which use the real tunnel ID.

💡 Suggested fix
+        let tunnel_id = request.tunnel_id.clone();
         let tunnel_uuid = request.tunnel_uuid.clone();
         let tunnel_name = request.tunnel_name.clone();
@@
         if let Some(ref sid) = resolved_skill_id {
             publish_global(DomainEvent::WebhookReceived {
-                tunnel_id: tunnel_uuid.clone(),
+                tunnel_id: tunnel_id.clone(),
                 skill_id: sid.clone(),
                 method: method.clone(),
                 path: path.clone(),
                 correlation_id: correlation_id.clone(),
             });
         }
         publish_global(DomainEvent::WebhookProcessed {
-            tunnel_id: tunnel_uuid.clone(),
+            tunnel_id: tunnel_id.clone(),
             skill_id: resolved_skill_id.clone().unwrap_or_default(),
             method: method.clone(),
             path: path.clone(),
             correlation_id: correlation_id.clone(),
             status_code: response.status_code,

Also applies to: 195-210

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/webhooks/bus.rs` around lines 51 - 53, The webhook lifecycle
events WebhookReceived and WebhookProcessed are using request.tunnel_uuid for
tunnel_id, which prevents joining with WebhookRegistered/WebhookUnregistered
that use the real tunnel ID; update the event construction to use the real
tunnel identifier (request.tunnel_id) instead of request.tunnel_uuid (e.g., set
tunnel_id = request.tunnel_id.clone()). Apply the same change to the other
occurrences noted (the later WebhookReceived/WebhookProcessed creation around
the 195-210 area) so all lifecycle events consistently publish the true
tunnel_id.

Comment thread src/openhuman/webhooks/bus.rs
senamakel and others added 4 commits April 6, 2026 15:39
…tion exit

SubscriptionHandle::drop aborts the background task. Since
bootstrap_skill_runtime() returns immediately after setup, the local
handles were dropped, cancelling both subscribers. Use std::mem::forget
to leak the handles so the tasks live for the entire process.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Modified the handling of subscriber registration to prevent premature dropping of handles in `bootstrap_skill_runtime()`. This change ensures that the background tasks for subscribers remain active for the entire process lifecycle, enhancing event handling reliability.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Hand-escaped JSON strings only handled double quotes, not backslashes,
newlines, or other control chars. Replaced with serde_json serialization
via an error_body() helper.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Hand-escaped JSON strings only handled double quotes, not backslashes,
newlines, or other control chars. Replaced with serde_json serialization
via an error_body() helper.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant