feat(notifications): bridge CEF providers into core ingest pipeline with dedup #874
Wire CDP-migrated providers (Slack, WhatsApp, Discord, Telegram) into the core triage pipeline by calling ingestNotification in handleFired after the existing Redux dispatches. Add a 60-second content-hash deduplication check (exists_recent) in the Rust store, invoke it at the top of handle_ingest to drop duplicate fires, and standardise observability prefixes to [notification_intel] throughout rpc.rs.
📝 Walkthrough
Frontend forwards webview-fired notifications into an async ingest pipeline (fire-and-forget); backend store transactionally skips inserts for recent (60 s) duplicates and reports whether the row was inserted.
Sequence Diagram

```mermaid
sequenceDiagram
    actor Webview
    participant Frontend as "WebviewNotifications Service"
    participant Backend as "notification_intel RPC"
    participant DB as "Notifications Store"
    participant Redux as "Redux Slice"
    Webview->>Frontend: webview notification fired
    Frontend->>Backend: ingestNotification (async, fire-and-forget)
    Backend->>DB: insert_if_not_recent(provider, account_id, title, body, received_at)
    DB-->>Backend: bool (inserted)
    alt Insert skipped (duplicate)
        Backend-->>Frontend: { skipped: true, reason: "duplicate" }
        Frontend->>Frontend: log skip reason
    else Inserted
        Backend->>DB: persist notification (completed)
        Backend->>Backend: spawn triage task
        Backend-->>Frontend: { id, status: "unread", received_at }
        Frontend->>Redux: dispatch addNotification
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/src/lib/webviewNotifications/service.ts`:
- Around line 89-90: The debug log call using log('[notification_intel]
forwarding to core ingest provider=%s account=%s', provider, accountId) must not
emit raw PII; update the log in service.ts to keep provider but redact accountId
(e.g., replace accountId with a deterministic hash, a truncated fingerprint, or
a simple boolean like accountPresent=true/false) so the code still conveys
identity presence without logging the full email/identifier; ensure you
reference the same log call and variables (log, provider, accountId) when making
the change.
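One way to realise the suggested redaction, a deterministic truncated hash, can be sketched as follows (Python for brevity; the actual service is TypeScript, and the `fingerprint` helper name is illustrative, not part of the codebase):

```python
import hashlib

def fingerprint(value: str, length: int = 8) -> str:
    # Non-reversible, stable digest: the same accountId always maps to
    # the same short token, so log lines remain correlatable across a
    # session without exposing the raw email/identifier.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:length]

token = fingerprint("user@example.com")
print(f"[notification_intel] forwarding to core ingest provider=slack account={token}")
```

The truncated digest keeps log lines greppable per account while staying useless to anyone reading the logs; a plain `accountPresent=true/false` boolean is the even more conservative option the comment also allows.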
In `@src/openhuman/notifications/rpc.rs`:
- Around line 55-60: The debug log in the is_dup branch currently emits raw user
content via %req.title; change it to avoid logging PII by replacing the raw
title with a non-sensitive identifier (e.g., title length or a stable hash/ID).
Update the tracing::debug call inside the if is_dup block (around the
notification_intel duplicate check) to log provider and either title_len =
req.title.len() or title_hash = sha256(req.title) or an internal
notification_id, and remove %req.title from the message.
- Around line 45-63: The dedup race comes from calling store::exists_recent(...)
and then store::insert(...) in separate DB connections; replace those two calls
with a single transactional store API (e.g. add store::insert_if_not_recent or
store::insert_notification_tx) that opens one SQLite connection/transaction,
checks for a recent duplicate and inserts only if none is found, returning
whether the insert was skipped; update notification_ingest to call that new
transactional method instead of exists_recent and insert so the check+insert is
atomic and prevents concurrent duplicate inserts.
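A transactional check-and-insert of the suggested shape can be sketched as follows (Python's stdlib sqlite3 stands in for the project's rusqlite store; table and column names are illustrative):

```python
import sqlite3

def insert_if_not_recent(conn, provider, account_id, title, body):
    """Insert unless an identical notification exists in the last 60 s.

    Returns True if inserted, False if skipped as a duplicate.
    """
    # BEGIN IMMEDIATE takes SQLite's write lock up front, so the COUNT
    # and the INSERT cannot interleave with a concurrent writer.
    conn.execute("BEGIN IMMEDIATE")
    try:
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM notifications"
            " WHERE provider = ? AND title = ? AND body = ?"
            " AND (account_id = ? OR (account_id IS NULL AND ? IS NULL))"
            " AND strftime('%s', received_at) >= strftime('%s','now','-60 seconds')",
            (provider, title, body, account_id, account_id),
        ).fetchone()
        if count:
            conn.execute("ROLLBACK")
            return False
        conn.execute(
            "INSERT INTO notifications (provider, account_id, title, body, received_at)"
            " VALUES (?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
            (provider, account_id, title, body),
        )
        conn.execute("COMMIT")
        return True
    except Exception:
        conn.execute("ROLLBACK")
        raise

conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transactions
conn.execute(
    "CREATE TABLE notifications"
    " (provider TEXT, account_id TEXT, title TEXT, body TEXT, received_at TEXT)"
)
first = insert_if_not_recent(conn, "slack", None, "Test notification", "Test body")
second = insert_if_not_recent(conn, "slack", None, "Test notification", "Test body")
print(first, second)  # True False
```

Returning a bool from the single transaction lets the RPC layer distinguish the inserted and skipped cases without a second query.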
In `@src/openhuman/notifications/store.rs`:
- Around line 470-480: The test exists_recent_detects_duplicate only checks that
a freshly inserted matching row is found but doesn't verify that an older
(stale) matching row outside the 60-second window is ignored; update the test to
insert a second notification with the same channel/title/body but with
received_at set to older than 60 seconds (e.g., now - Duration::from_secs(61))
using sample_notification or by mutating the Notification before calling insert,
then assert that exists_recent(&config, "slack", None, "Test notification",
"Test body").unwrap() returns false for that stale row; keep the original fresh
insert assertions as well so you cover both fresh and stale behavior.
- Around line 237-264: The query in exists_recent compares the RFC3339 TEXT
column received_at lexicographically to datetime('now','-60 seconds'), causing
incorrect matches; update both SQL branches in exists_recent
(integration_notifications table) to compare timestamps numerically, e.g., use
strftime('%s', received_at) >= strftime('%s','now','-60 seconds') (or an
equivalent julianday/epoch conversion) so the comparison is by epoch seconds
rather than string ordering; keep the same params and error handling but replace
the received_at predicate in both the Some(account_id) and None branches.
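The mismatch is easy to reproduce with fixed, illustrative timestamps (Python's stdlib sqlite3 shown for brevity): an RFC3339 value's 'T' separator sorts after the space that datetime() emits, so any same-day row compares greater than the cutoff string.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE n (received_at TEXT)")
# Stale row, 4 minutes before the fixed, illustrative "now" below.
conn.execute("INSERT INTO n VALUES ('2025-03-10T12:00:00Z')")

cutoff = "datetime('2025-03-10 12:05:00','-60 seconds')"  # -> '2025-03-10 12:04:00'

# Buggy: TEXT comparison. 'T' (0x54) sorts after ' ' (0x20) at index 10,
# so every same-day RFC3339 row compares >= the cutoff string.
buggy = conn.execute(
    f"SELECT COUNT(*) FROM n WHERE received_at >= {cutoff}"
).fetchone()[0]

# Fixed: convert both sides to epoch seconds before comparing.
fixed = conn.execute(
    f"SELECT COUNT(*) FROM n WHERE strftime('%s', received_at) >= strftime('%s', {cutoff})"
).fetchone()[0]

print(buggy, fixed)  # 1 0
```

The stale row matches under string ordering but is correctly rejected once both sides are normalised to epoch seconds.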
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 82a015d2-4515-4f44-a3b8-961e9bedcfaa
📒 Files selected for processing (3)
- app/src/lib/webviewNotifications/service.ts
- src/openhuman/notifications/rpc.rs
- src/openhuman/notifications/store.rs
Make ingest dedup atomic in a single transaction, compare recent-window timestamps by epoch, and redact account identifiers from debug logs while adding regression coverage for stale duplicate detection. Made-with: Cursor
graycyrus left a comment
🔍 Code Review — PR #874
Walkthrough
This PR closes the ingestion gap for CDP-migrated providers (Slack, WhatsApp, Discord, Telegram) by wiring handleFired in the webview notifications service to call ingestNotification(...), bridging into the Rust core triage pipeline. It adds atomic content-hash deduplication via a new insert_if_not_recent store function (single SQLite transaction), standardises log prefixes to [notification_intel], and ships 3 new unit tests.
Changes
| File | Summary |
|---|---|
| app/src/lib/webviewNotifications/service.ts | Wire handleFired → ingestNotification (fire-and-forget), dispatch addNotification to Redux on success |
| src/openhuman/notifications/rpc.rs | Replace store::insert with atomic store::insert_if_not_recent, add duplicate-skip branch, rename all log prefixes |
| src/openhuman/notifications/store.rs | Add insert_if_not_recent (transactional dedup+insert), exists_recent (standalone check), 3 new tests |
Actionable Comments
See inline comments below.
Trigger a fresh PR merge commit against current main so CI re-evaluates checks with the up-to-date base state. Made-with: Cursor
Enrich forwarded raw payload fields for triage context, make optimistic notification shape explicit, add a dedup query index, and remove unused exists_recent paths in favor of transactional insert_if_not_recent coverage. Made-with: Cursor
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/openhuman/notifications/store.rs (1)
421-554: Consider splitting this module to stay under the Rust file-size guideline.
store.rs is now 555 lines. Moving tests (or dedup logic) into a dedicated submodule would keep this easier to maintain. As per coding guidelines: "src/**/*.rs: Source files should be ≤ ~500 lines; split modules when growing to improve maintainability".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/openhuman/notifications/store.rs` around lines 421 - 554, The tests module in store.rs has grown too large (≈555 lines); move the #[cfg(test)] mod tests into a new test-only file or submodule to keep store.rs under ~500 lines. Create a new file (e.g., store_tests.rs or tests/mod.rs) or a submodule (mod tests;) and relocate the entire tests block, ensuring you import the same symbols (insert, list, unread_count, mark_read, update_triage, insert_if_not_recent, get_settings, upsert_settings, sample_notification, test_config) and adjust visibility/imports (use super::* or crate::openhuman::notifications::store::* as needed) so all tests compile unchanged. Update store.rs to reference the new tests module with #[cfg(test)] mod tests;.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/openhuman/notifications/store.rs`:
- Around line 503-525: Add a unit test that covers the Some(account_id)
deduplication branch by creating notifications with account_id set (use
sample_notification and then set n.account_id = Some("acct1".into())), insert
the first, then call insert_if_not_recent to assert that a duplicate with the
same account_id is rejected, and also assert that a notification with a
different account_id (e.g. Some("acct2".into())) is accepted; use
TempDir::new(), test_config, insert, and insert_if_not_recent to mirror the
existing tests (similar to insert_if_not_recent_skips_duplicate and
insert_if_not_recent_rejects_expired_window_only) so coverage exercises the
account_id IS NOT NULL path.
- Around line 119-177: Add development-oriented debug/trace logs inside
insert_if_not_recent: log an entry message at the start of insert_if_not_recent
with a stable grep-friendly prefix (e.g. "[notifications::store]
insert_if_not_recent entry") including non-PII correlation fields such as
n.provider and a boolean account_id.is_some(); log the dedup branch decision
when count > 0 with the same prefix and the dedup result ("duplicate") and then
an exit log before returning false; log a success exit after the INSERT commit
with the prefix and result ("inserted"); and log failures on query/insert/commit
error paths via debug/error with the same prefix but without logging PII (do not
log title, body, raw_payload, or full account identifiers). Ensure logs use
Rust's debug/trace macros and reference insert_if_not_recent and the transaction
(tx) operations to place them near the dedup query, the duplicate return, and
the post-insert commit.
- Around line 121-149: The DEFERRED transaction started by
unchecked_transaction() lets concurrent callers both see count==0 and leads to
duplicate inserts; replace unchecked_transaction() with
transaction_with_behavior(TransactionBehavior::Immediate) so the code acquires a
write lock before running the COUNT query and subsequent insert/commit (i.e.,
change how tx is created in the insert_if_not_recent flow so the COUNT + insert
are serialized), keeping the existing COUNT query, params, and commit/error
handling intact.
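The difference between the two behaviours can be observed with two connections and a zero busy-timeout (a Python sqlite3 sketch; the Rust change uses rusqlite's `TransactionBehavior::Immediate` to the same effect):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")
a = sqlite3.connect(path, isolation_level=None, timeout=0)
b = sqlite3.connect(path, isolation_level=None, timeout=0)
a.execute("CREATE TABLE t (x INTEGER)")

# DEFERRED takes no lock at BEGIN, so two writers can both enter the
# transaction, both read count == 0, and later both insert: the race.
a.execute("BEGIN DEFERRED")
b.execute("BEGIN DEFERRED")  # succeeds; the window exists
a.execute("ROLLBACK")
b.execute("ROLLBACK")

# IMMEDIATE acquires the write lock at BEGIN, so the second writer is
# refused ("database is locked") until the first commits or rolls back.
a.execute("BEGIN IMMEDIATE")
try:
    b.execute("BEGIN IMMEDIATE")
    serialized = False
except sqlite3.OperationalError:
    serialized = True
a.execute("ROLLBACK")
print(serialized)  # True
```

With the immediate behaviour the COUNT-then-INSERT sequence of one caller completes before the other caller's check can even start, which is exactly the serialization the dedup needs.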
---
Nitpick comments:
In `@src/openhuman/notifications/store.rs`:
- Around line 421-554: The tests module in store.rs has grown too large (≈555
lines); move the #[cfg(test)] mod tests into a new test-only file or submodule
to keep store.rs under ~500 lines. Create a new file (e.g., store_tests.rs or
tests/mod.rs) or a submodule (mod tests;) and relocate the entire tests block,
ensuring you import the same symbols (insert, list, unread_count, mark_read,
update_triage, insert_if_not_recent, get_settings, upsert_settings,
sample_notification, test_config) and adjust visibility/imports (use super::* or
crate::openhuman::notifications::store::* as needed) so all tests compile
unchanged. Update store.rs to reference the new tests module with #[cfg(test)]
mod tests;.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c77e1072-cd11-49ae-9005-155b0e9f83da
📒 Files selected for processing (2)
- app/src/lib/webviewNotifications/service.ts
- src/openhuman/notifications/store.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- app/src/lib/webviewNotifications/service.ts
Drop the second base64 key in app/src-tauri/Cargo.toml so cargo metadata, format checks, and tauri build/test jobs can parse the manifest in PR merge context. Made-with: Cursor
Handle non-text websocket frames in the mock bridge loop so request_round_trips_list_labels_through_mock_server does not time out intermittently in CI. Made-with: Cursor
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/webview_apis_bridge.rs`:
- Around line 88-95: The message loop currently silently breaks or continues on
send failures, close frames, non-text messages, and receive errors; update the
loop (around the sink.send(Message::Text(...)).await and the match arms for
Ok(Message::Close(_)), Ok(_), and Err(_)) to log meaningful development-oriented
messages (including context like the payload or error) at each branch before
breaking/continuing — e.g., log send failure with the response text and error,
log receipt of Close with the close reason, log unexpected non-text Message with
its variant, and log receive errors with the error details; use the project’s
test/logging facility (e.g., tracing or test logger) so these paths surface
during flaky test debugging.
- Around line 92-95: Add unit tests in tests/webview_apis_bridge.rs that
exercise the non-text/close/error branches shown in the match
(Message::Close(_), non-text Ok(_), and Err(_)). Specifically, add one test
sending a Close message and assert the handler breaks/terminates as expected
(matching existing behavior used in the text tests), another test sending a
non-text successful message (e.g., a Binary/NonText Message variant) and assert
the handler continues/skips appropriately, and a third test simulating an Err
from the message stream and assert the handler breaks. Locate the
message-handling function used by the current text RPC tests (the same test
helper or function invoked by those tests) and reuse its setup to drive these
three new cases so they cover the newly introduced branches.
- Around line 62-64: The mock WebSocket server currently uses
serde_json::from_str(&text).unwrap() and
req["id"].as_str().unwrap()/req["method"].as_str().unwrap(), which will panic on
malformed frames; change the logic around req, id, and method so you gracefully
handle errors: parse serde_json::from_str(&text) with match/if let and on Err
send a JSON error response (or log and continue) instead of panicking, then
extract id and method with safe checks (e.g., req.get("id").and_then(|v|
v.as_str()) and same for "method") and send a proper error response if missing,
continuing the spawned server task rather than unwrapping. Ensure the error
responses use the same WS response path the test harness expects so the server
task remains alive.
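A defensive version of that parse path, illustrated in Python with `json` standing in for serde_json and with illustrative error strings, keeps the server task alive on malformed frames:

```python
import json

def handle_frame(text: str) -> dict:
    # Malformed JSON: answer with an error instead of panicking.
    try:
        req = json.loads(text)
    except json.JSONDecodeError:
        return {"id": None, "error": "parse error"}
    # Missing or ill-typed fields: safe extraction, no unwrap-style crash.
    req_id = req.get("id") if isinstance(req, dict) else None
    method = req.get("method") if isinstance(req, dict) else None
    if not isinstance(req_id, str) or not isinstance(method, str):
        return {"id": req_id, "error": "missing id or method"}
    return {"id": req_id, "result": f"handled {method}"}

print(handle_frame("not json"))
print(handle_frame('{"id": "1", "method": "list_labels"}'))
# {'id': None, 'error': 'parse error'}
# {'id': '1', 'result': 'handled list_labels'}
```

Every branch returns a response on the same path the test harness reads, so a bad frame degrades to an error reply rather than killing the spawned server task.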
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 79b59d01-b458-4055-a59b-2d285e78af6f
📒 Files selected for processing (1)
tests/webview_apis_bridge.rs
graycyrus left a comment
All review feedback addressed — dead code removed, dedup index added, raw_payload enriched, optional fields explicit. CI green. LGTM.
Summary
Closes part of #718 — Phase 1 of the notification intelligence pipeline.
- `handleFired` in `webviewNotifications/service.ts` now calls `ingestNotification(...)` after dispatching to the in-memory slice, mirroring the existing recipe-based path for Gmail/LinkedIn/Google Meet. On a non-skipped result the new notification is also dispatched to `integrationNotifications` Redux state.
- `store::exists_recent` checks for an identical (provider, account_id, title, body) notification within the last 60 seconds before inserting, preventing duplicates from webview retries or reloads.
- Observability prefixes standardised to `[notification_intel]` throughout `notifications/rpc.rs`.
- Unit test for `exists_recent` added in `store.rs`.

Test plan
- Ingested notifications visible via the `openhuman.notification_list` RPC.
- Duplicate fires within the 60-second window return `{ skipped: true, reason: "duplicate" }`.
- `cargo test -p openhuman notifications::store` — `exists_recent_detects_duplicate` passes.