Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Aevatar.CQRS.Projection.Core/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Aevatar.CQRS.Projection.Core.Tests")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Aevatar.Foundation.Abstractions.Persistence;

namespace Aevatar.CQRS.Projection.Core.Orchestration;

internal static class ProjectionObservationFailurePolicy
{
public static bool ShouldPropagate(Exception exception)
{
ArgumentNullException.ThrowIfNull(exception);

return exception switch
{
EventStoreOptimisticConcurrencyException => true,
ProjectionDispatchAggregateException aggregate =>
aggregate.Failures.Any(static failure => ShouldPropagate(failure.Exception)),
AggregateException aggregate =>
aggregate.InnerExceptions.Any(ShouldPropagate),
_ when exception.InnerException is not null => ShouldPropagate(exception.InnerException),
_ => false,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ public async Task HandleObservedEnvelopeAsync(EventEnvelope envelope)
}
catch (Exception ex)
{
if (ProjectionObservationFailurePolicy.ShouldPropagate(ex))
{
_logger.LogWarning(
ex,
"Projection scope observation handling hit a retryable failure. actorId={ActorId} projectionKind={ProjectionKind} sessionId={SessionId}",
Id,
State.ProjectionKind,
State.SessionId);
throw;
}

_logger.LogWarning(
ex,
"Projection scope observation handling failed. actorId={ActorId} projectionKind={ProjectionKind} sessionId={SessionId}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Aevatar.Foundation.Abstractions.Persistence;

/// <summary>
/// Raised when an event-store append observes a different stream version than the caller expected.
/// </summary>
public sealed class EventStoreOptimisticConcurrencyException : InvalidOperationException
{
public EventStoreOptimisticConcurrencyException(
string agentId,
long expectedVersion,
long actualVersion)
: base($"Optimistic concurrency conflict: expected {expectedVersion}, actual {actualVersion}")
{
AgentId = agentId ?? string.Empty;
ExpectedVersion = expectedVersion;
ActualVersion = actualVersion;
}

public string AgentId { get; }

public long ExpectedVersion { get; }

public long ActualVersion { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ public async Task<EventStoreCommitResult> AppendAsync(
var actualVersion = (long)result[1];
if (status == 0)
{
throw new InvalidOperationException(
$"Optimistic concurrency conflict: expected {expectedVersion}, actual {actualVersion}");
throw new EventStoreOptimisticConcurrencyException(
agentId,
expectedVersion,
actualVersion);
}

if (status != 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ public async Task<EventStoreCommitResult> AppendAsync(
var currentVersion = stream.CurrentVersion;
if (currentVersion != expectedVersion)
{
throw new InvalidOperationException(
$"Optimistic concurrency conflict: expected {expectedVersion}, actual {currentVersion}");
throw new EventStoreOptimisticConcurrencyException(
agentId,
expectedVersion,
currentVersion);
}

stream.Events.AddRange(pendingEvents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ public Task<EventStoreCommitResult> AppendAsync(string agentId, IEnumerable<Stat
var stream = _store.GetOrAdd(agentId, _ => new EventStreamState());
var current = stream.CurrentVersion;
if (current != expectedVersion)
throw new InvalidOperationException($"Optimistic concurrency conflict: expected {expectedVersion}, actual {current}");
{
throw new EventStoreOptimisticConcurrencyException(
agentId,
expectedVersion,
current);
}
var eventList = events.ToList();
stream.Events.AddRange(eventList.Select(static x => x.Clone()));
if (eventList.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using Aevatar.CQRS.Projection.Core.Orchestration;
using Aevatar.Foundation.Abstractions.Persistence;
using FluentAssertions;

namespace Aevatar.CQRS.Projection.Core.Tests;

public sealed class ProjectionObservationFailurePolicyTests
{
[Fact]
public void ShouldPropagate_ShouldReturnTrue_ForOptimisticConcurrencyException()
{
var exception = new EventStoreOptimisticConcurrencyException("actor-1", 4, 5);

ProjectionObservationFailurePolicy.ShouldPropagate(exception).Should().BeTrue();
}

[Fact]
public void ShouldPropagate_ShouldReturnTrue_ForProjectionDispatchAggregateContainingOptimisticConcurrencyException()
{
var aggregate = new ProjectionDispatchAggregateException(
[
new ProjectionDispatchFailure(
"projector",
1,
new EventStoreOptimisticConcurrencyException("actor-2", 7, 8)),
]);

ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeTrue();
}

[Fact]
public void ShouldPropagate_ShouldReturnFalse_ForDeterministicProjectionFailure()
{
var aggregate = new ProjectionDispatchAggregateException(
[
new ProjectionDispatchFailure(
"projector",
1,
new InvalidOperationException("projection failed")),
]);

ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse();
}

[Fact]
public void ShouldPropagate_ShouldThrow_ForNullException()
{
Action act = () => ProjectionObservationFailurePolicy.ShouldPropagate(null!);

act.Should().Throw<ArgumentNullException>().And.ParamName.Should().Be("exception");
}

[Fact]
public void ShouldPropagate_ShouldReturnTrue_ForAggregateExceptionContainingOptimisticConcurrencyException()
{
var aggregate = new AggregateException(
new InvalidOperationException("unrelated"),
new EventStoreOptimisticConcurrencyException("actor-3", 1, 2));

ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeTrue();
}

[Fact]
public void ShouldPropagate_ShouldReturnFalse_ForAggregateExceptionWithOnlyDeterministicFailures()
{
var aggregate = new AggregateException(new InvalidOperationException("unrelated"));

ProjectionObservationFailurePolicy.ShouldPropagate(aggregate).Should().BeFalse();
}

[Fact]
public void ShouldPropagate_ShouldUnwrap_InnerExceptionChain()
{
var wrapped = new InvalidOperationException(
"outer",
new InvalidOperationException(
"middle",
new EventStoreOptimisticConcurrencyException("actor-4", 3, 4)));

ProjectionObservationFailurePolicy.ShouldPropagate(wrapped).Should().BeTrue();
}

[Fact]
public void ShouldPropagate_ShouldReturnFalse_ForDeterministicExceptionWithoutInner()
{
ProjectionObservationFailurePolicy.ShouldPropagate(new InvalidOperationException("boom"))
.Should().BeFalse();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System.Reflection;
using Aevatar.CQRS.Projection.Core.Abstractions;
using Aevatar.CQRS.Projection.Core.Orchestration;
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Abstractions.Persistence;
using Aevatar.Foundation.Abstractions.Streaming;
using Aevatar.Foundation.Core;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;

namespace Aevatar.CQRS.Projection.Core.Tests;

public sealed class ProjectionScopeGAgentBaseTests
{
[Fact]
public async Task HandleObservedEnvelopeAsync_ShouldPropagate_RetryableOptimisticConcurrencyException()
{
var agent = BuildActivatedAgent(
scopeId: "projection-scope-retry",
onProcess: _ => throw new EventStoreOptimisticConcurrencyException("root-1", 3, 4));

var envelope = BuildForwardedObserverEnvelope(targetStreamId: "projection-scope-retry");

Func<Task> act = () => agent.HandleObservedEnvelopeAsync(envelope);

await act.Should().ThrowAsync<EventStoreOptimisticConcurrencyException>();
}

[Fact]
public async Task HandleObservedEnvelopeAsync_ShouldSwallow_DeterministicProjectionFailure()
{
var agent = BuildActivatedAgent(
scopeId: "projection-scope-swallow",
onProcess: _ => throw new InvalidOperationException("deterministic boom"));

var envelope = BuildForwardedObserverEnvelope(targetStreamId: "projection-scope-swallow");

Func<Task> act = () => agent.HandleObservedEnvelopeAsync(envelope);

await act.Should().NotThrowAsync();
}

private static TestScopeAgent BuildActivatedAgent(
string scopeId,
Func<EventEnvelope, ProjectionScopeDispatchResult> onProcess)
{
var agent = new TestScopeAgent(onProcess);

typeof(GAgentBase)
.GetProperty(nameof(GAgentBase.Id), BindingFlags.Instance | BindingFlags.Public)!
.SetValue(agent, scopeId);

agent.State.RootActorId = "root-actor";
agent.State.ProjectionKind = "test-kind";
agent.State.SessionId = "session-1";
agent.State.Active = true;
agent.State.Released = false;

var services = new ServiceCollection();
services.AddSingleton<Func<ProjectionRuntimeScopeKey, TestContext>>(
static _ => new TestContext("root-actor", "test-kind"));
agent.Services = services.BuildServiceProvider();

return agent;
}

private static EventEnvelope BuildForwardedObserverEnvelope(string targetStreamId)
{
var original = new EventEnvelope
{
Id = Guid.NewGuid().ToString("N"),
Route = EnvelopeRouteSemantics.CreateObserverPublication("publisher-actor"),
};

return StreamForwardingRules.BuildForwardedEnvelope(
original,
sourceStreamId: "publisher-actor",
targetStreamId: targetStreamId,
StreamForwardingMode.HandleThenForward);
}

private sealed class TestScopeAgent : ProjectionScopeGAgentBase<TestContext>
{
private readonly Func<EventEnvelope, ProjectionScopeDispatchResult> _onProcess;

public TestScopeAgent(Func<EventEnvelope, ProjectionScopeDispatchResult> onProcess)
{
_onProcess = onProcess;
}

protected override ProjectionRuntimeMode RuntimeMode =>
ProjectionRuntimeMode.DurableMaterialization;

protected override ValueTask<ProjectionScopeDispatchResult> ProcessObservationCoreAsync(
TestContext context,
EventEnvelope envelope,
CancellationToken ct)
{
return ValueTask.FromResult(_onProcess(envelope));
}
}

private sealed record TestContext(string RootActorId, string ProjectionKind)
: IProjectionMaterializationContext;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Aevatar.Foundation.Abstractions.Persistence;
using Shouldly;

namespace Aevatar.Foundation.Abstractions.Tests;

public class EventStoreOptimisticConcurrencyExceptionTests
{
[Fact]
public void Constructor_ShouldCapturePropertiesAndFormatMessage()
{
var exception = new EventStoreOptimisticConcurrencyException("agent-1", expectedVersion: 4, actualVersion: 7);

exception.AgentId.ShouldBe("agent-1");
exception.ExpectedVersion.ShouldBe(4);
exception.ActualVersion.ShouldBe(7);
exception.Message.ShouldBe("Optimistic concurrency conflict: expected 4, actual 7");
exception.ShouldBeAssignableTo<InvalidOperationException>();
}

[Fact]
public void Constructor_ShouldDefaultAgentIdToEmpty_WhenNullProvided()
{
var exception = new EventStoreOptimisticConcurrencyException(agentId: null!, expectedVersion: 1, actualVersion: 2);

exception.AgentId.ShouldBe(string.Empty);
exception.ExpectedVersion.ShouldBe(1);
exception.ActualVersion.ShouldBe(2);
}
}
Loading