diff --git a/src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs b/src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs new file mode 100644 index 000000000..b18cf9646 --- /dev/null +++ b/src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Aevatar.CQRS.Projection.Core.Tests")] diff --git a/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs new file mode 100644 index 000000000..2a1c71dca --- /dev/null +++ b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs @@ -0,0 +1,22 @@ +using Aevatar.Foundation.Abstractions.Persistence; + +namespace Aevatar.CQRS.Projection.Core.Orchestration; + +internal static class ProjectionObservationFailurePolicy +{ + public static bool ShouldPropagate(Exception exception) + { + ArgumentNullException.ThrowIfNull(exception); + + return exception switch + { + EventStoreOptimisticConcurrencyException => true, + ProjectionDispatchAggregateException aggregate => + aggregate.Failures.Any(static failure => ShouldPropagate(failure.Exception)), + AggregateException aggregate => + aggregate.InnerExceptions.Any(ShouldPropagate), + _ when exception.InnerException is not null => ShouldPropagate(exception.InnerException), + _ => false, + }; + } +} diff --git a/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs index 704042f1b..aee045b93 100644 --- a/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs +++ b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs @@ -124,6 +124,17 @@ public async Task HandleObservedEnvelopeAsync(EventEnvelope envelope) } catch (Exception ex) { + if (ProjectionObservationFailurePolicy.ShouldPropagate(ex)) + { + _logger.LogWarning( + ex, + "Projection scope observation handling hit a retryable failure. actorId={ActorId} projectionKind={ProjectionKind} sessionId={SessionId}", + Id, + State.ProjectionKind, + State.SessionId); + throw; + } + _logger.LogWarning( ex, "Projection scope observation handling failed. actorId={ActorId} projectionKind={ProjectionKind} sessionId={SessionId}", diff --git a/src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs b/src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs new file mode 100644 index 000000000..5282b0220 --- /dev/null +++ b/src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs @@ -0,0 +1,24 @@ +namespace Aevatar.Foundation.Abstractions.Persistence; + +/// +/// Raised when an event-store append observes a different stream version than the caller expected. +/// +public sealed class EventStoreOptimisticConcurrencyException : InvalidOperationException +{ + public EventStoreOptimisticConcurrencyException( + string agentId, + long expectedVersion, + long actualVersion) + : base($"Optimistic concurrency conflict: expected {expectedVersion}, actual {actualVersion}") + { + AgentId = agentId ?? string.Empty; + ExpectedVersion = expectedVersion; + ActualVersion = actualVersion; + } + + public string AgentId { get; } + + public long ExpectedVersion { get; } + + public long ActualVersion { get; } +} diff --git a/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs b/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs index 0588bb17a..7549ea1aa 100644 --- a/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs +++ b/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs @@ -125,8 +125,10 @@ public async Task AppendAsync( var actualVersion = (long)result[1]; if (status == 0) { - throw new InvalidOperationException( - $"Optimistic concurrency conflict: expected {expectedVersion}, actual {actualVersion}"); + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + actualVersion); } if (status != 1) diff --git a/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs b/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs index f839f8735..b978e30bd 100644 --- a/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs +++ b/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs @@ -73,8 +73,10 @@ public async Task AppendAsync( var currentVersion = stream.CurrentVersion; if (currentVersion != expectedVersion) { - throw new InvalidOperationException( - $"Optimistic concurrency conflict: expected {expectedVersion}, actual {currentVersion}"); + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + currentVersion); } stream.Events.AddRange(pendingEvents); diff --git a/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs b/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs index 9be47da81..035bb39bb 100644 --- a/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs +++ b/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs @@ -36,7 +36,12 @@ public Task AppendAsync(string agentId, IEnumerable new EventStreamState()); var current = stream.CurrentVersion; if (current != expectedVersion) - throw new InvalidOperationException($"Optimistic concurrency conflict: expected {expectedVersion}, actual {current}"); + { + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + current); + } var eventList = events.ToList(); stream.Events.AddRange(eventList.Select(static x => x.Clone())); if (eventList.Count > 0) diff --git a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs new file mode 100644 index 000000000..dbccb6514 --- /dev/null +++ b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs @@ -0,0 +1,89 @@ +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.Foundation.Abstractions.Persistence; +using FluentAssertions; + +namespace Aevatar.CQRS.Projection.Core.Tests; + +public sealed class ProjectionObservationFailurePolicyTests +{ + [Fact] + public void ShouldPropagate_ShouldReturnTrue_ForOptimisticConcurrencyException() + { + var exception = new EventStoreOptimisticConcurrencyException("actor-1", 4, 5); + + ProjectionObservationFailurePolicy.ShouldPropagate(exception).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnTrue_ForProjectionDispatchAggregateContainingOptimisticConcurrencyException() + { + var aggregate = new ProjectionDispatchAggregateException( + [ + new ProjectionDispatchFailure( + "projector", + 1, + new EventStoreOptimisticConcurrencyException("actor-2", 7, 8)), + ]); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnFalse_ForDeterministicProjectionFailure() + { + var aggregate = new ProjectionDispatchAggregateException( + [ + new ProjectionDispatchFailure( + "projector", + 1, + new InvalidOperationException("projection failed")), + ]); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse(); + } + + [Fact] + public void ShouldPropagate_ShouldThrow_ForNullException() + { + Action act = () => ProjectionObservationFailurePolicy.ShouldPropagate(null!); + + act.Should().Throw().And.ParamName.Should().Be("exception"); + } + + [Fact] + public void ShouldPropagate_ShouldReturnTrue_ForAggregateExceptionContainingOptimisticConcurrencyException() + { + var aggregate = new AggregateException( + new InvalidOperationException("unrelated"), + new EventStoreOptimisticConcurrencyException("actor-3", 1, 2)); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnFalse_ForAggregateExceptionWithOnlyDeterministicFailures() + { + var aggregate = new AggregateException(new InvalidOperationException("unrelated")); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse(); + } + + [Fact] + public void ShouldPropagate_ShouldUnwrap_InnerExceptionChain() + { + var wrapped = new InvalidOperationException( + "outer", + new InvalidOperationException( + "middle", + new EventStoreOptimisticConcurrencyException("actor-4", 3, 4))); + + ProjectionObservationFailurePolicy.ShouldPropagate(wrapped).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnFalse_ForDeterministicExceptionWithoutInner() + { + ProjectionObservationFailurePolicy.ShouldPropagate(new InvalidOperationException("boom")) + .Should().BeFalse(); + } +} diff --git a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs new file mode 100644 index 000000000..0f2809d71 --- /dev/null +++ b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs @@ -0,0 +1,105 @@ +using System.Reflection; +using Aevatar.CQRS.Projection.Core.Abstractions; +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.Foundation.Abstractions; +using Aevatar.Foundation.Abstractions.Persistence; +using Aevatar.Foundation.Abstractions.Streaming; +using Aevatar.Foundation.Core; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; + +namespace Aevatar.CQRS.Projection.Core.Tests; + +public sealed class ProjectionScopeGAgentBaseTests +{ + [Fact] + public async Task HandleObservedEnvelopeAsync_ShouldPropagate_RetryableOptimisticConcurrencyException() + { + var agent = BuildActivatedAgent( + scopeId: "projection-scope-retry", + onProcess: _ => throw new EventStoreOptimisticConcurrencyException("root-1", 3, 4)); + + var envelope = BuildForwardedObserverEnvelope(targetStreamId: "projection-scope-retry"); + + Func act = () => agent.HandleObservedEnvelopeAsync(envelope); + + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task HandleObservedEnvelopeAsync_ShouldSwallow_DeterministicProjectionFailure() + { + var agent = BuildActivatedAgent( + scopeId: "projection-scope-swallow", + onProcess: _ => throw new InvalidOperationException("deterministic boom")); + + var envelope = BuildForwardedObserverEnvelope(targetStreamId: "projection-scope-swallow"); + + Func act = () => agent.HandleObservedEnvelopeAsync(envelope); + + await act.Should().NotThrowAsync(); + } + + private static TestScopeAgent BuildActivatedAgent( + string scopeId, + Func onProcess) + { + var agent = new TestScopeAgent(onProcess); + + typeof(GAgentBase) + .GetProperty(nameof(GAgentBase.Id), BindingFlags.Instance | BindingFlags.Public)! + .SetValue(agent, scopeId); + + agent.State.RootActorId = "root-actor"; + agent.State.ProjectionKind = "test-kind"; + agent.State.SessionId = "session-1"; + agent.State.Active = true; + agent.State.Released = false; + + var services = new ServiceCollection(); + services.AddSingleton>( + static _ => new TestContext("root-actor", "test-kind")); + agent.Services = services.BuildServiceProvider(); + + return agent; + } + + private static EventEnvelope BuildForwardedObserverEnvelope(string targetStreamId) + { + var original = new EventEnvelope + { + Id = Guid.NewGuid().ToString("N"), + Route = EnvelopeRouteSemantics.CreateObserverPublication("publisher-actor"), + }; + + return StreamForwardingRules.BuildForwardedEnvelope( + original, + sourceStreamId: "publisher-actor", + targetStreamId: targetStreamId, + StreamForwardingMode.HandleThenForward); + } + + private sealed class TestScopeAgent : ProjectionScopeGAgentBase + { + private readonly Func _onProcess; + + public TestScopeAgent(Func onProcess) + { + _onProcess = onProcess; + } + + protected override ProjectionRuntimeMode RuntimeMode => + ProjectionRuntimeMode.DurableMaterialization; + + protected override ValueTask ProcessObservationCoreAsync( + TestContext context, + EventEnvelope envelope, + CancellationToken ct) + { + return ValueTask.FromResult(_onProcess(envelope)); + } + } + + private sealed record TestContext(string RootActorId, string ProjectionKind) + : IProjectionMaterializationContext; +} diff --git a/test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs b/test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs new file mode 100644 index 000000000..b3074df5f --- /dev/null +++ b/test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs @@ -0,0 +1,29 @@ +using Aevatar.Foundation.Abstractions.Persistence; +using Shouldly; + +namespace Aevatar.Foundation.Abstractions.Tests; + +public class EventStoreOptimisticConcurrencyExceptionTests +{ + [Fact] + public void Constructor_ShouldCapturePropertiesAndFormatMessage() + { + var exception = new EventStoreOptimisticConcurrencyException("agent-1", expectedVersion: 4, actualVersion: 7); + + exception.AgentId.ShouldBe("agent-1"); + exception.ExpectedVersion.ShouldBe(4); + exception.ActualVersion.ShouldBe(7); + exception.Message.ShouldBe("Optimistic concurrency conflict: expected 4, actual 7"); + exception.ShouldBeAssignableTo(); + } + + [Fact] + public void Constructor_ShouldDefaultAgentIdToEmpty_WhenNullProvided() + { + var exception = new EventStoreOptimisticConcurrencyException(agentId: null!, expectedVersion: 1, actualVersion: 2); + + exception.AgentId.ShouldBe(string.Empty); + exception.ExpectedVersion.ShouldBe(1); + exception.ActualVersion.ShouldBe(2); + } +}