fix: implement two-phase dispose for MessageBus subscription lifecycle#492
Conversation
- Add IAsyncDisposable to IMessageBus interface - Implement ShutdownAsync/CleanupAsync two-phase dispose in MessageBusBase - Add _isDisposed guard in SendMessageToSubscribersAsync - Simplify CancellationToken.Register callbacks to dict-remove-only - Add NullMessageBus.DisposeAsync implementation - Add 6 shared lifecycle tests to MessageBusTestBase - Add InMemoryMessageBus lifecycle test overrides - Update messaging docs with disposal lifecycle and message durability table
- Remove ad-hoc Dispose/DisposeAsync overrides - Add CleanupAsync override for connection/channel teardown - Handle OperationCanceledException in OnMessageAsync for message requeue - Depends on FoundatioFx/Foundatio#492
- Move cleanup into CleanupAsync override - Fix ProcessMessageAsync to return bool for conditional message deletion - Remove ad-hoc Dispose/DisposeAsync/BeginDispose - Depends on FoundatioFx/Foundatio#492
- Add ShutdownAsync override for StopProcessingAsync - Add CleanupAsync override for processor/sender/client disposal - Handle OperationCanceledException in OnMessageAsync - Remove ad-hoc Dispose/DisposeAsync - Depends on FoundatioFx/Foundatio#492
- Add CleanupAsync override for listener task, producer flush/dispose - Handle OperationCanceledException in OnMessageAsync to skip offset commit - Guard offset commit with IsDisposed check - Remove ad-hoc Dispose override - Depends on FoundatioFx/Foundatio#492
- Move unsubscribe logic to CleanupAsync override - Handle OperationCanceledException in OnMessage - Remove ad-hoc Dispose cleanup - Depends on FoundatioFx/Foundatio#492
There was a problem hiding this comment.
Pull request overview
Implements an async-friendly, two-phase disposal lifecycle for message buses to better control shutdown/drain behavior and standardize cleanup across providers and tests.
Changes:
- Extend
IMessageBusto includeIAsyncDisposable, and implementDisposeAsyncacross core message bus types. - Add two-phase disposal hooks (
ShutdownAsyncthenCleanupAsync) inMessageBusBase, plus a disposal guard during subscriber dispatch. - Add shared lifecycle tests in
MessageBusTestBaseand wire them intoInMemoryMessageBusTests; update messaging docs with the new lifecycle and provider behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/Foundatio.Tests/Messaging/InMemoryMessageBusTests.cs | Exposes new lifecycle tests for the in-memory provider via [Fact] overrides. |
| src/Foundatio/Messaging/NullMessageBus.cs | Adds DisposeAsync to satisfy the updated interface contract. |
| src/Foundatio/Messaging/MessageBusBase.cs | Adds IAsyncDisposable, two-phase disposal hooks, and disposal-time subscriber dispatch guard. |
| src/Foundatio/Messaging/InMemoryMessageBus.cs | Overrides DisposeAsync to clear message counts before delegating to base disposal. |
| src/Foundatio/Messaging/IMessageBus.cs | Updates the message bus interface to require IAsyncDisposable. |
| src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs | Adds shared async-disposal lifecycle tests for all message bus implementations. |
| docs/guide/messaging.md | Documents async disposal, two-phase lifecycle, and shutdown durability expectations per provider. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…append CleanupMessageBusAsync
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
src/Foundatio/Messaging/MessageBusBase.cs:263
- In
SubscribeImplAsync, the cancellation callback is registered before the subscriber is added to_subscribers. If the token is canceled concurrently (afterThrowIfCancellationRequestedinSubscribeAsyncbut before/while registering), the callback can run immediately, fail to remove (not yet added), and then the subsequentTryAddleaves a canceled subscriber in the dictionary until the next message dispatch. Consider adding the subscriber to_subscribersbefore registering, or re-checkingcancellationToken.IsCancellationRequestedafterTryAddand removing immediately if it’s already canceled.
protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken) where T : class
{
var subscriber = new Subscriber
{
CancellationToken = cancellationToken,
Type = typeof(T),
Action = (message, token) =>
{
if (message is T typedMessage)
return handler(typedMessage, token);
if (message is null)
{
_logger.LogWarning("Subscriber action skipped: message body is null (likely a deserialization failure) for subscriber type {SubscriberType}", typeof(T));
return Task.CompletedTask;
}
_logger.LogTrace("Unable to call subscriber action: {MessageType} cannot be safely casted to {SubscriberType}", message.GetType(), typeof(T));
return Task.CompletedTask;
}
};
if (cancellationToken != CancellationToken.None)
{
// CancellationToken.Register only accepts synchronous callbacks, so we cannot safely
// call RemoveTopicSubscriptionAsync here. The async-capable CancelAsync was added in
// .NET 8 but still does not support async callbacks — see:
// https://github.com/dotnet/runtime/issues/31315
// Topic subscription teardown is handled during DisposeAsync via ShutdownAsync/CleanupAsync.
cancellationToken.Register(() =>
{
_subscribers.TryRemove(subscriber.Id, out _);
});
}
if (subscriber.Type.Name == "IMessage`1" && subscriber.Type.GenericTypeArguments.Length == 1)
{
var modelType = subscriber.Type.GenericTypeArguments.Single();
subscriber.GenericType = typeof(Message<>).MakeGenericType(modelType);
}
if (!_subscribers.TryAdd(subscriber.Id, subscriber))
_logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id);
return Task.CompletedTask;
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| var countdown = new AsyncCountdownEvent(1); | ||
|
|
||
| await messageBus.SubscribeAsync<SimpleMessageA>(msg => | ||
| { | ||
| Assert.Equal("Hello", msg.Data); | ||
| countdown.Signal(); | ||
| }, cts.Token); | ||
|
|
||
| await cts.CancelAsync(); | ||
| await Task.Delay(100, TestCancellationToken); | ||
|
|
||
| var countdown2 = new AsyncCountdownEvent(1); | ||
| await messageBus.SubscribeAsync<SimpleMessageA>(msg => | ||
| { | ||
| Assert.Equal("Hello", msg.Data); | ||
| countdown2.Signal(); | ||
| }, TestCancellationToken); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }, cancellationToken: TestCancellationToken); | ||
| await countdown2.WaitAsync(TimeSpan.FromSeconds(5)); | ||
| Assert.Equal(0, countdown2.CurrentCount); |
There was a problem hiding this comment.
SubscribeAsync_CancelledToken_DoesNotTearDownInfrastructureAsync doesn’t actually assert that infrastructure teardown didn’t occur; it only verifies that subscribing/publishing still works afterwards (which would also pass if infrastructure was torn down and recreated). Also, the Task.Delay(100) introduces timing-based flakiness. Consider asserting directly that the canceled subscriber is not invoked after cancellation (e.g., publish after cancel and assert the first countdown is unchanged), and replace the fixed delay with a deterministic wait (signal from the handler, polling with timeout, etc.).
| var countdown = new AsyncCountdownEvent(1); | |
| await messageBus.SubscribeAsync<SimpleMessageA>(msg => | |
| { | |
| Assert.Equal("Hello", msg.Data); | |
| countdown.Signal(); | |
| }, cts.Token); | |
| await cts.CancelAsync(); | |
| await Task.Delay(100, TestCancellationToken); | |
| var countdown2 = new AsyncCountdownEvent(1); | |
| await messageBus.SubscribeAsync<SimpleMessageA>(msg => | |
| { | |
| Assert.Equal("Hello", msg.Data); | |
| countdown2.Signal(); | |
| }, TestCancellationToken); | |
| await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }, cancellationToken: TestCancellationToken); | |
| await countdown2.WaitAsync(TimeSpan.FromSeconds(5)); | |
| Assert.Equal(0, countdown2.CurrentCount); | |
| long cancelledSubscriberInvocations = 0; | |
| await messageBus.SubscribeAsync<SimpleMessageA>(msg => | |
| { | |
| Assert.Equal("Hello", msg.Data); | |
| Interlocked.Increment(ref cancelledSubscriberInvocations); | |
| }, cts.Token); | |
| await cts.CancelAsync(); | |
| var activeSubscriberCountdown = new AsyncCountdownEvent(1); | |
| await messageBus.SubscribeAsync<SimpleMessageA>(msg => | |
| { | |
| Assert.Equal("Hello", msg.Data); | |
| activeSubscriberCountdown.Signal(); | |
| }, TestCancellationToken); | |
| await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }, cancellationToken: TestCancellationToken); | |
| await activeSubscriberCountdown.WaitAsync(TimeSpan.FromSeconds(5)); | |
| Assert.Equal(0, activeSubscriberCountdown.CurrentCount); | |
| Assert.Equal(0, Interlocked.Read(ref cancelledSubscriberInvocations)); |
There was a problem hiding this comment.
Acknowledged. The test verifies that a cancelled subscription token does not tear down the bus infrastructure by confirming that re-subscribing and publishing still work. This is a reasonable integration-level assertion for the behavior being tested. The Task.Delay is used to give the cancellation time to propagate — we can replace with a more deterministic approach in a follow-up if flakiness is observed.
| catch (Exception ex) | ||
| { | ||
| _logger.LogError(ex, "Error during shutdown for {MessageBusId}: {Message}", MessageBusId, ex.Message); | ||
| } |
| catch (Exception ex) | ||
| { | ||
| _logger.LogError(ex, "Error during cleanup for {MessageBusId}: {Message}", MessageBusId, ex.Message); | ||
| } |
| catch (Exception ex) | ||
| { | ||
| _logger.LogError(ex, "Error during shutdown for {MessageBusId}: {Message}", MessageBusId, ex.Message); | ||
| } |
| catch (Exception ex) | ||
| { | ||
| _logger.LogError(ex, "Error during cleanup for {MessageBusId}: {Message}", MessageBusId, ex.Message); | ||
| } |
* fix: adapt RabbitMQ MessageBus to two-phase dispose lifecycle - Remove ad-hoc Dispose/DisposeAsync overrides - Add CleanupAsync override for connection/channel teardown - Handle OperationCanceledException in OnMessageAsync for message requeue - Depends on FoundatioFx/Foundatio#492 * Update CanDisposeWithNoSubscribersOrPublishers to async * Order MessageBus lifecycle test overrides alphabetically * Address PR feedback: remove dead sync ClosePublisher/SubscriberConnection methods * Update Foundatio dependencies to 13.0.0-beta5.2 * Address PR feedback: restore RemoveTopicSubscriptionAsync override for subscriber connection close
* fix: adapt SQS MessageBus to two-phase dispose lifecycle - Move cleanup into CleanupAsync override - Fix ProcessMessageAsync to return bool for conditional message deletion - Remove ad-hoc Dispose/DisposeAsync/BeginDispose - Depends on FoundatioFx/Foundatio#492 * Update CanDisposeWithNoSubscribersOrPublishers to async * Order MessageBus lifecycle test overrides alphabetically * Address PR feedback: add trace logging, widen exception catch, fix ProcessMessageAsync return * Update Foundatio dependencies to 13.0.0-beta5.2 * Fix code formatting: expand single-line try/catch blocks to multi-line * Restore RemoveTopicSubscriptionAsync for two-phase dispose lifecycle
#77) * fix: adapt Azure Service Bus MessageBus to two-phase dispose lifecycle - Add ShutdownAsync override for StopProcessingAsync - Add CleanupAsync override for processor/sender/client disposal - Handle OperationCanceledException in OnMessageAsync - Remove ad-hoc Dispose/DisposeAsync - Depends on FoundatioFx/Foundatio#492 * Update CanDisposeWithNoSubscribersOrPublishers to async * Order MessageBus lifecycle test overrides alphabetically * Address PR feedback: rethrow OCE to prevent message loss, fix constant-condition warnings * Update Foundatio dependencies to 13.0.0-beta5.2 * Address PR feedback: restore comments, move to RemoveTopicSubscriptionAsync, add pre-lock null check * Add pre-lock null checks in CleanupAsync to avoid unnecessary lock acquisition
* fix: adapt Kafka MessageBus to two-phase dispose lifecycle - Add CleanupAsync override for listener task, producer flush/dispose - Handle OperationCanceledException in OnMessageAsync to skip offset commit - Guard offset commit with IsDisposed check - Remove ad-hoc Dispose override - Depends on FoundatioFx/Foundatio#492 * Update CanDisposeWithNoSubscribersOrPublishers to async * Order MessageBus lifecycle test overrides alphabetically * Address PR feedback: fix offset commit after OCE, add disposal filter, harden CleanupAsync * Update Foundatio dependencies to 13.0.0-beta5.2 * Fix DeleteTopicAsync disposal guard and cancellation support * Address PR feedback: restore removed comment blocks, fix single-line catch formatting in test * Commit offset when no subscribers and add comment to OperationCanceledException catch * Add comments explaining Kafka offset acknowledgment logic * Fix: do not acknowledge messages when no subscribers to preserve Kafka redelivery
* fix: adapt Redis MessageBus to two-phase dispose lifecycle - Move unsubscribe logic to CleanupAsync override - Handle OperationCanceledException in OnMessage - Remove ad-hoc Dispose cleanup - Depends on FoundatioFx/Foundatio#492 * Update CanDisposeWithNoSubscribersOrPublishers to async * Order MessageBus lifecycle test overrides alphabetically * Address PR feedback: add IsDisposed filter to OCE catch * Fix CleanupAsync race by checking _channelMessageQueue instead of _isSubscribed * Update Foundatio dependencies to 13.0.0-beta5.2 * Fix Foundatio package version: 3.0.0-beta5.2 -> 13.0.0-beta5.2 * Address PR feedback: restore removed comment, add _isSubscribed pre-lock check in CleanupAsync * Fix CleanupAsync to always reset _isSubscribed inside lock
Summary
Test plan
Related Provider PRs