Skip to content

Feat/rabbitmq masstransit async events implementation#46

Merged
Rashed99Azm merged 7 commits into
develop-backendfrom
feat/rabbitmq-masstransit-async-events-implementation
Jun 9, 2026
Merged

Feat/rabbitmq masstransit async events implementation#46
Rashed99Azm merged 7 commits into
develop-backendfrom
feat/rabbitmq-masstransit-async-events-implementation

Conversation

@Rashed99Azm

Copy link
Copy Markdown
Collaborator

Event-Driven Infrastructure — MassTransit, RabbitMQ, Redis Outbox & SignalR

Overview

This PR introduces a complete event-driven messaging infrastructure built on MassTransit, RabbitMQ, Redis, and SignalR. The implementation provides reliable asynchronous communication, transactional message delivery through the EF Core Outbox pattern, distributed real-time notifications, and Redis-backed feed/presence management.

The architecture supports both production and local development environments through configurable transport providers:

  • RabbitMQ for production deployments

  • MassTransit InMemory Transport for local development and testing

  • EF Core Transactional Outbox for reliable event delivery

  • Redis SignalR Backplane for multi-instance scale-out

  • SignalR for real-time client communication


Architecture

Domain Aggregate
      ↓
Domain Event (MediatR INotification)
      ↓
SaveChangesInterceptor (pre-commit capture)
      ↓
Domain Event Handler
      ↓
Integration Event
      ↓
IIntegrationEventPublisher
      ↓
MassTransit Bus Outbox
      ↓
Transport (RabbitMQ / InMemory)
      ↓
Consumer
      ↓
Redis + SignalR + Notifications

Event Flow

  1. Domain aggregates raise domain events.

  2. Events are captured during EF Core save operations.

  3. Application handlers translate domain events into integration events.

  4. Integration events are stored in the EF Core Outbox within the same transaction.

  5. After successful commit, MassTransit delivers messages to the configured transport.

  6. Consumers process events and update feeds, leaderboards, notifications, and real-time clients.


Added Components

MassTransit + EF Core Outbox

Namespace: CCE.Infrastructure.Notifications.Messaging

Features

  • Centralized MassTransit registration via MessagingServiceExtensions

  • EF Core transactional outbox support

  • Atomic message persistence and delivery

  • Inbox/Outbox state tracking

  • Automatic relay through BusOutboxDeliveryService

  • Configurable transport selection

  • RabbitMQ availability probe with optional fallback

Database Objects

Migration:

20260608082540_AddMassTransitOutbox

Tables:

inbox_state
outbox_state
outbox_message

Integration Events

Namespace:

CCE.Application.Common.Messaging.IntegrationEvents

Added events:

  • PostCreatedIntegrationEvent

  • VoteCreatedIntegrationEvent

  • ReplyCreatedIntegrationEvent

  • CommunityJoinRequestedIntegrationEvent

Bridge handlers convert domain events into integration events for asynchronous processing.


Consumers

Hosted exclusively in CCE.Worker.

APIs publish events only and do not execute consumer workloads.

Consumer | Event(s) | Purpose | Concurrency -- | -- | -- | -- FeedConsumer | PostCreated | Populate Redis feeds and timelines | 20 VoteConsumer | VoteCreated | Update Redis vote counters | 50 RankingConsumer | PostCreated | Rebuild community leaderboards | 1 NotificationConsumer | PostCreated, ReplyCreated, CommunityJoinRequested | Generate persisted notifications | 10 SignalRConsumer | PostCreated | Publish real-time community/topic updates | 30 NotificationMessageConsumer | NotificationMessage | Dispatch notification channels | 10

SignalR Realtime Infrastructure

NotificationsHub

Supported groups:

user:{id}
post:{id}
community:{id}
topic:{id}
moderation

Supported events:

  • ReceiveNotification

  • NewReply

  • VoteChanged

  • PollResultsChanged

  • NewPost

  • PostModerated

  • ContentModerated

  • PresenceChanged

  • TypingChanged

Scale-Out Support

  • Redis SignalR Backplane

  • Cross-instance group messaging

  • Distributed real-time notifications

Presence Tracking

RedisRealtimePresenceTracker

Features:

  • Per-post active viewer tracking

  • Connection lifecycle management

  • Graceful degradation when Redis is unavailable

Publisher Wrapper

CommunityRealtimePublisher

All Redis and SignalR operations are wrapped in exception handling to prevent user-facing failures.


Redis Infrastructure

Services

RedisFeedStore

Provides:

  • Feed storage

  • Sorted-set timelines

  • Community leaderboards

  • Feed metadata

RedisRealtimePresenceTracker

Provides:

  • User presence tracking

  • Active connection tracking

  • Redis-backed presence storage

Configuration

{
  "Infrastructure": {
    "RedisConnectionString": "localhost:6379"
  }
}

Resilience

All RedisException instances are caught and logged as warnings.

Redis outages will not result in HTTP 500 responses.


Domain Events Supported

Current domain events include:

Community

  • PostCreatedEvent

  • PostVotedEvent

  • ReplyCreatedEvent

  • CommunityJoinRequestedEvent

  • CommentCountChangedEvent

Content

  • NewsPublishedEvent

  • ResourcePublishedEvent

  • EventScheduledEvent

Country Content Requests

  • CountryContentRequestApprovedEvent

  • CountryContentRequestRejectedEvent

Expert Registration

  • ExpertRegistrationApprovedEvent

  • ExpertRegistrationRejectedEvent


Notification Pipeline

NotificationGateway

Flow:

Template Resolution
        ↓
Channel Selection
        ↓
Email / SMS / In-App Dispatch
        ↓
Persistence
        ↓
SignalR Delivery

Asynchronous Notification Dispatch

Introduced:

NotificationMessage

and

MassTransitNotificationMessageDispatcher

This replaces the previous in-process dispatcher and enables reliable asynchronous notification delivery.


Configuration

Development

{
  "Messaging": {
    "Transport": "InMemory",
    "RabbitMqHost": "localhost",
    "UseAsyncDispatcher": true,
    "FallbackToInMemoryIfUnavailable": true
  },
  "Infrastructure": {
    "RedisConnectionString": "localhost:6379"
  },
  "SignalR": {
    "ChannelPrefix": "cce"
  }
}

Production

Set:

"Transport": "RabbitMQ"

to enable broker-based messaging.


DevOps

Docker Services

Added support for:

  • Redis (redis:7-alpine)

  • RabbitMQ (rabbitmq:3-management-alpine)

via docker-compose.yml.

Health Checks

Added:

RabbitMqTcpHealthCheck

Features:

  • Host/port connectivity validation

  • 3-second timeout

  • Optional transport fallback support


Benefits

  • Reliable event delivery through transactional outbox

  • Decoupled event-driven architecture

  • Production-ready RabbitMQ integration

  • Infrastructure-free local development using InMemory transport

  • Distributed real-time notifications with SignalR

  • Redis-backed feeds, leaderboards, and presence tracking

  • Graceful degradation when Redis is unavailable

  • Cross-process scalability via Redis SignalR backplane

  • Improved fault tolerance and observability

…lly work

  The RabbitMQ/MassTransit branch had a sound topology (APIs publish-only →
  EF transactional outbox → CCE.Worker consumes) but the runtime path was
  broken and inconsistent. This wires it end-to-end and unifies event
  emission on a single Clean-Architecture pattern.

  Blocker
  - DomainEventDispatcher and AuditingInterceptor were registered only as
    concrete types, so `AddInterceptors(sp.GetServices<IInterceptor>())`
    never attached them — domain events never dispatched and audit columns
    stopped writing. Register both as IInterceptor so they attach (alongside
    MassTransit's outbox interceptor).

  Unify on aggregate → domain event → bridge → outbox → consumer
  - Add domain events PostVotedEvent, ReplyCreatedEvent,
    CommunityJoinRequestedEvent and the aggregate methods that raise them
    (Post.RegisterVote, Post.RegisterReply, Community.RegisterJoinRequest).
  - Add bridge handlers (PostVoted/ReplyCreated/CommunityJoinRequested
    BusPublisher) that translate domain events to integration events
    pre-commit, so the publish is captured atomically by the EF outbox.
  - Command handlers (VotePost, CreateReply, JoinCommunity, FollowUser,
    UnfollowUser) no longer inject IIntegrationEventPublisher. This removes
    the lost-message bug where Join/Follow/Unfollow published AFTER
    SaveChanges (outbox row never persisted) and the random-GUID mismatch in
    the join-request event.
  - Fix MassTransitIntegrationEventPublisher to use IPublishEndpoint (the
    non-existent IScopedBusContextProvider<> meant the branch didn't compile).

  Realtime (hybrid, deduped)
  - VoteConsumer no longer pushes VoteChanged (the API handler owns the
    instant push); it keeps only the Redis hot-counter update.
  - Remove the dead ICommunityRealtimePublisher injection from CreatePost.

  Move work off the API thread + cleanup
  - PostCreated notification fan-out now runs in NotificationConsumer
    (Worker); delete PostCreatedNotificationHandler.
  - Carry the real join-request id and the post Locale on the integration
    events.
  - Remove unconsumed dead contracts (UserFollowed, UserUnfollowed,
    ResourcePublished) and their inline publishing.

  Tests + docs
  - Add CommunityIntegrationEventConsumerHarnessTests (broker-free MassTransit
    harness): VoteCreated→VoteConsumer, PostCreated→NotificationConsumer
    (fan-out + Locale), PostCreated→SignalRConsumer.
  - Add docs/community-async-events-guide.md and document the canonical
    pattern in docs/masstransit-messaging-guide.md.

  Verified: solution builds clean; Domain 320/320; new harness tests 3/3;
  Application_does_not_depend_on_Infrastructure passes.
@Rashed99Azm Rashed99Azm merged commit 2df1ccd into develop-backend Jun 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants