From da69364cfadbb4062b034951efc8ae5e63635a93 Mon Sep 17 00:00:00 2001 From: eanzhao Date: Fri, 24 Apr 2026 11:25:54 +0800 Subject: [PATCH 1/2] Propagate event-store OCC so projection scope can retry ProjectionScopeGAgentBase previously caught every dispatch failure and logged a warning, so transient optimistic-concurrency conflicts on the scope's own event stream were silently swallowed. The watermark-advanced event never landed, the scope sat on a stale version, and downstream consumers observed gaps with no retry. Replace the generic InvalidOperationException raised by the three event-store implementations (Garnet, File, InMemory) with a typed EventStoreOptimisticConcurrencyException carrying agentId / expected / actual versions, then let the projection scope propagate that class (directly or unwrapped from ProjectionDispatchAggregateException / AggregateException) so the actor runtime reloads state and retries with the correct version. Deterministic failures keep the prior swallow semantics to avoid retry loops on projector bugs. The new exception subclasses InvalidOperationException and preserves the original message format, so existing consumers (EventSourcingBehavior's catch-all, GarnetEventStoreIntegrationTests' message-pattern assertion) keep working without changes. Validation: - dotnet test test/Aevatar.CQRS.Projection.Core.Tests (99 pass, 1 skip) - dotnet test test/Aevatar.Foundation.Core.Tests (159 pass) - dotnet test test/Aevatar.Foundation.Runtime.Hosting.Tests --filter GarnetEventStore (1 pass, 2 skip - require Garnet server) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../AssemblyInfo.cs | 3 ++ .../ProjectionObservationFailurePolicy.cs | 22 ++++++++++ .../ProjectionScopeGAgentBase.cs | 11 +++++ ...ventStoreOptimisticConcurrencyException.cs | 24 ++++++++++ .../GarnetEventStore.cs | 6 ++- .../Persistence/FileEventStore.cs | 6 ++- .../Persistence/InMemoryEventStore.cs | 7 ++- ...ProjectionObservationFailurePolicyTests.cs | 44 +++++++++++++++++++ 8 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs create mode 100644 src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs create mode 100644 src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs create mode 100644 test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs diff --git a/src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs b/src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs new file mode 100644 index 000000000..b18cf9646 --- /dev/null +++ b/src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Aevatar.CQRS.Projection.Core.Tests")] diff --git a/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs new file mode 100644 index 000000000..2a1c71dca --- /dev/null +++ b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionObservationFailurePolicy.cs @@ -0,0 +1,22 @@ +using Aevatar.Foundation.Abstractions.Persistence; + +namespace Aevatar.CQRS.Projection.Core.Orchestration; + +internal static class ProjectionObservationFailurePolicy +{ + public static bool ShouldPropagate(Exception exception) + { + ArgumentNullException.ThrowIfNull(exception); + + return exception switch + { + EventStoreOptimisticConcurrencyException => true, + ProjectionDispatchAggregateException aggregate => + aggregate.Failures.Any(static failure => ShouldPropagate(failure.Exception)), + AggregateException aggregate => + aggregate.InnerExceptions.Any(ShouldPropagate), + _ when exception.InnerException is not null => ShouldPropagate(exception.InnerException), + _ => false, + }; + } +} diff --git a/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs index 704042f1b..aee045b93 100644 --- a/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs +++ b/src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeGAgentBase.cs @@ -124,6 +124,17 @@ public async Task HandleObservedEnvelopeAsync(EventEnvelope envelope) } catch (Exception ex) { + if (ProjectionObservationFailurePolicy.ShouldPropagate(ex)) + { + _logger.LogWarning( + ex, + "Projection scope observation handling hit a retryable failure. actorId={ActorId} projectionKind={ProjectionKind} sessionId={SessionId}", + Id, + State.ProjectionKind, + State.SessionId); + throw; + } + _logger.LogWarning( ex, "Projection scope observation handling failed. actorId={ActorId} projectionKind={ProjectionKind} sessionId={SessionId}", diff --git a/src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs b/src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs new file mode 100644 index 000000000..5282b0220 --- /dev/null +++ b/src/Aevatar.Foundation.Abstractions/Persistence/EventStoreOptimisticConcurrencyException.cs @@ -0,0 +1,24 @@ +namespace Aevatar.Foundation.Abstractions.Persistence; + +/// +/// Raised when an event-store append observes a different stream version than the caller expected. +/// +public sealed class EventStoreOptimisticConcurrencyException : InvalidOperationException +{ + public EventStoreOptimisticConcurrencyException( + string agentId, + long expectedVersion, + long actualVersion) + : base($"Optimistic concurrency conflict: expected {expectedVersion}, actual {actualVersion}") + { + AgentId = agentId ?? string.Empty; + ExpectedVersion = expectedVersion; + ActualVersion = actualVersion; + } + + public string AgentId { get; } + + public long ExpectedVersion { get; } + + public long ActualVersion { get; } +} diff --git a/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs b/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs index 0588bb17a..7549ea1aa 100644 --- a/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs +++ b/src/Aevatar.Foundation.Runtime.Persistence.Implementations.Garnet/GarnetEventStore.cs @@ -125,8 +125,10 @@ public async Task AppendAsync( var actualVersion = (long)result[1]; if (status == 0) { - throw new InvalidOperationException( - $"Optimistic concurrency conflict: expected {expectedVersion}, actual {actualVersion}"); + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + actualVersion); } if (status != 1) diff --git a/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs b/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs index f839f8735..b978e30bd 100644 --- a/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs +++ b/src/Aevatar.Foundation.Runtime/Persistence/FileEventStore.cs @@ -73,8 +73,10 @@ public async Task AppendAsync( var currentVersion = stream.CurrentVersion; if (currentVersion != expectedVersion) { - throw new InvalidOperationException( - $"Optimistic concurrency conflict: expected {expectedVersion}, actual {currentVersion}"); + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + currentVersion); } stream.Events.AddRange(pendingEvents); diff --git a/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs b/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs index 9be47da81..035bb39bb 100644 --- a/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs +++ b/src/Aevatar.Foundation.Runtime/Persistence/InMemoryEventStore.cs @@ -36,7 +36,12 @@ public Task AppendAsync(string agentId, IEnumerable new EventStreamState()); var current = stream.CurrentVersion; if (current != expectedVersion) - throw new InvalidOperationException($"Optimistic concurrency conflict: expected {expectedVersion}, actual {current}"); + { + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + current); + } var eventList = events.ToList(); stream.Events.AddRange(eventList.Select(static x => x.Clone())); if (eventList.Count > 0) diff --git a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs new file mode 100644 index 000000000..06060438e --- /dev/null +++ b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs @@ -0,0 +1,44 @@ +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.Foundation.Abstractions.Persistence; +using FluentAssertions; + +namespace Aevatar.CQRS.Projection.Core.Tests; + +public sealed class ProjectionObservationFailurePolicyTests +{ + [Fact] + public void ShouldPropagate_ShouldReturnTrue_ForOptimisticConcurrencyException() + { + var exception = new EventStoreOptimisticConcurrencyException("actor-1", 4, 5); + + ProjectionObservationFailurePolicy.ShouldPropagate(exception).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnTrue_ForProjectionDispatchAggregateContainingOptimisticConcurrencyException() + { + var aggregate = new ProjectionDispatchAggregateException( + [ + new ProjectionDispatchFailure( + "projector", + 1, + new EventStoreOptimisticConcurrencyException("actor-2", 7, 8)), + ]); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnFalse_ForDeterministicProjectionFailure() + { + var aggregate = new ProjectionDispatchAggregateException( + [ + new ProjectionDispatchFailure( + "projector", + 1, + new InvalidOperationException("projection failed")), + ]); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse(); + } +} From cf0dc2bd020ef5fc410c3412835625e0080d7983 Mon Sep 17 00:00:00 2001 From: eanzhao Date: Fri, 24 Apr 2026 11:56:50 +0800 Subject: [PATCH 2/2] test: cover OCC propagation paths for projection scope Patch coverage on #365 was 54.54% because the three new control flow additions were only exercised by a single happy-path test. Add focused unit tests for the retry-propagation branches: - EventStoreOptimisticConcurrencyException property capture and null agentId coalescing. - ProjectionObservationFailurePolicy null guard, generic AggregateException branch, inner-exception unwrap, and deterministic false path. - ProjectionScopeGAgentBase observation handler rethrows OCC via the retryable policy and swallows deterministic projection failures. --- ...ProjectionObservationFailurePolicyTests.cs | 45 ++++++++ .../ProjectionScopeGAgentBaseTests.cs | 105 ++++++++++++++++++ ...toreOptimisticConcurrencyExceptionTests.cs | 29 +++++ 3 files changed, 179 insertions(+) create mode 100644 test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs create mode 100644 test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs diff --git a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs index 06060438e..dbccb6514 100644 --- a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs +++ b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionObservationFailurePolicyTests.cs @@ -41,4 +41,49 @@ public void ShouldPropagate_ShouldReturnFalse_ForDeterministicProjectionFailure( ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse(); } + + [Fact] + public void ShouldPropagate_ShouldThrow_ForNullException() + { + Action act = () => ProjectionObservationFailurePolicy.ShouldPropagate(null!); + + act.Should().Throw().And.ParamName.Should().Be("exception"); + } + + [Fact] + public void ShouldPropagate_ShouldReturnTrue_ForAggregateExceptionContainingOptimisticConcurrencyException() + { + var aggregate = new AggregateException( + new InvalidOperationException("unrelated"), + new EventStoreOptimisticConcurrencyException("actor-3", 1, 2)); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnFalse_ForAggregateExceptionWithOnlyDeterministicFailures() + { + var aggregate = new AggregateException(new InvalidOperationException("unrelated")); + + ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse(); + } + + [Fact] + public void ShouldPropagate_ShouldUnwrap_InnerExceptionChain() + { + var wrapped = new InvalidOperationException( + "outer", + new InvalidOperationException( + "middle", + new EventStoreOptimisticConcurrencyException("actor-4", 3, 4))); + + ProjectionObservationFailurePolicy.ShouldPropagate(wrapped).Should().BeTrue(); + } + + [Fact] + public void ShouldPropagate_ShouldReturnFalse_ForDeterministicExceptionWithoutInner() + { + ProjectionObservationFailurePolicy.ShouldPropagate(new InvalidOperationException("boom")) + .Should().BeFalse(); + } } diff --git a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs new file mode 100644 index 000000000..0f2809d71 --- /dev/null +++ b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionScopeGAgentBaseTests.cs @@ -0,0 +1,105 @@ +using System.Reflection; +using Aevatar.CQRS.Projection.Core.Abstractions; +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.Foundation.Abstractions; +using Aevatar.Foundation.Abstractions.Persistence; +using Aevatar.Foundation.Abstractions.Streaming; +using Aevatar.Foundation.Core; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; + +namespace Aevatar.CQRS.Projection.Core.Tests; + +public sealed class ProjectionScopeGAgentBaseTests +{ + [Fact] + public async Task HandleObservedEnvelopeAsync_ShouldPropagate_RetryableOptimisticConcurrencyException() + { + var agent = BuildActivatedAgent( + scopeId: "projection-scope-retry", + onProcess: _ => throw new EventStoreOptimisticConcurrencyException("root-1", 3, 4)); + + var envelope = BuildForwardedObserverEnvelope(targetStreamId: "projection-scope-retry"); + + Func act = () => agent.HandleObservedEnvelopeAsync(envelope); + + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task HandleObservedEnvelopeAsync_ShouldSwallow_DeterministicProjectionFailure() + { + var agent = BuildActivatedAgent( + scopeId: "projection-scope-swallow", + onProcess: _ => throw new InvalidOperationException("deterministic boom")); + + var envelope = BuildForwardedObserverEnvelope(targetStreamId: "projection-scope-swallow"); + + Func act = () => agent.HandleObservedEnvelopeAsync(envelope); + + await act.Should().NotThrowAsync(); + } + + private static TestScopeAgent BuildActivatedAgent( + string scopeId, + Func onProcess) + { + var agent = new TestScopeAgent(onProcess); + + typeof(GAgentBase) + .GetProperty(nameof(GAgentBase.Id), BindingFlags.Instance | BindingFlags.Public)! + .SetValue(agent, scopeId); + + agent.State.RootActorId = "root-actor"; + agent.State.ProjectionKind = "test-kind"; + agent.State.SessionId = "session-1"; + agent.State.Active = true; + agent.State.Released = false; + + var services = new ServiceCollection(); + services.AddSingleton>( + static _ => new TestContext("root-actor", "test-kind")); + agent.Services = services.BuildServiceProvider(); + + return agent; + } + + private static EventEnvelope BuildForwardedObserverEnvelope(string targetStreamId) + { + var original = new EventEnvelope + { + Id = Guid.NewGuid().ToString("N"), + Route = EnvelopeRouteSemantics.CreateObserverPublication("publisher-actor"), + }; + + return StreamForwardingRules.BuildForwardedEnvelope( + original, + sourceStreamId: "publisher-actor", + targetStreamId: targetStreamId, + StreamForwardingMode.HandleThenForward); + } + + private sealed class TestScopeAgent : ProjectionScopeGAgentBase + { + private readonly Func _onProcess; + + public TestScopeAgent(Func onProcess) + { + _onProcess = onProcess; + } + + protected override ProjectionRuntimeMode RuntimeMode => + ProjectionRuntimeMode.DurableMaterialization; + + protected override ValueTask ProcessObservationCoreAsync( + TestContext context, + EventEnvelope envelope, + CancellationToken ct) + { + return ValueTask.FromResult(_onProcess(envelope)); + } + } + + private sealed record TestContext(string RootActorId, string ProjectionKind) + : IProjectionMaterializationContext; +} diff --git a/test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs b/test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs new file mode 100644 index 000000000..b3074df5f --- /dev/null +++ b/test/Aevatar.Foundation.Abstractions.Tests/EventStoreOptimisticConcurrencyExceptionTests.cs @@ -0,0 +1,29 @@ +using Aevatar.Foundation.Abstractions.Persistence; +using Shouldly; + +namespace Aevatar.Foundation.Abstractions.Tests; + +public class EventStoreOptimisticConcurrencyExceptionTests +{ + [Fact] + public void Constructor_ShouldCapturePropertiesAndFormatMessage() + { + var exception = new EventStoreOptimisticConcurrencyException("agent-1", expectedVersion: 4, actualVersion: 7); + + exception.AgentId.ShouldBe("agent-1"); + exception.ExpectedVersion.ShouldBe(4); + exception.ActualVersion.ShouldBe(7); + exception.Message.ShouldBe("Optimistic concurrency conflict: expected 4, actual 7"); + exception.ShouldBeAssignableTo(); + } + + [Fact] + public void Constructor_ShouldDefaultAgentIdToEmpty_WhenNullProvided() + { + var exception = new EventStoreOptimisticConcurrencyException(agentId: null!, expectedVersion: 1, actualVersion: 2); + + exception.AgentId.ShouldBe(string.Empty); + exception.ExpectedVersion.ShouldBe(1); + exception.ActualVersion.ShouldBe(2); + } +}