From 3c3cce9f0d12c1388956dc68b6b814ab6d950a6e Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Wed, 9 Mar 2022 18:18:40 -0800 Subject: [PATCH 1/4] Failure details support amongst other things: - New activity registration overloads - Static helper for running inline orchestrator functions - Misc. logging improvements - Updated DT.Core dependency to v2.8.0 - Minor test refactoring --- .../DurableTask.Generators.csproj | 7 +- .../DurableTaskSourceGenerator.cs | 2 +- src/DurableTask/DurableTask.csproj | 6 +- src/DurableTask/DurableTaskClient.cs | 29 +++- src/DurableTask/Grpc/DurableTaskGrpcClient.cs | 6 + src/DurableTask/Grpc/DurableTaskGrpcWorker.cs | 142 +++++++++++++----- src/DurableTask/Grpc/ProtoUtils.cs | 74 ++++++++- src/DurableTask/IDurableTaskRegistry.cs | 22 ++- src/DurableTask/Logs.cs | 14 +- .../OrchestrationFailureDetails.cs | 11 -- src/DurableTask/OrchestrationMetadata.cs | 59 +++++++- src/DurableTask/OrchestrationRunner.cs | 105 +++++++++++++ src/DurableTask/PurgeResult.cs | 21 +++ src/DurableTask/SdkUtils.cs | 16 -- src/DurableTask/TaskFailedException.cs | 31 ++++ src/DurableTask/TaskFailureDetails.cs | 12 ++ src/DurableTask/TaskOrchestrationContext.cs | 10 +- src/DurableTask/TaskOrchestrationShim.cs | 19 ++- .../AzureFunctionsTests.cs | 4 +- .../DurableTask.Sdk.Tests.csproj | 2 +- .../IntegrationTestBase.cs | 65 ++++++++ .../OrchestrationErrorHandling.cs | 133 ++++++++++++++++ .../OrchestrationPatterns.cs | 53 +------ 23 files changed, 690 insertions(+), 153 deletions(-) delete mode 100644 src/DurableTask/OrchestrationFailureDetails.cs create mode 100644 src/DurableTask/OrchestrationRunner.cs create mode 100644 src/DurableTask/PurgeResult.cs create mode 100644 src/DurableTask/TaskFailedException.cs create mode 100644 src/DurableTask/TaskFailureDetails.cs create mode 100644 test/DurableTask.Sdk.Tests/IntegrationTestBase.cs create mode 100644 test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs diff --git 
a/src/DurableTask.Generators/DurableTask.Generators.csproj b/src/DurableTask.Generators/DurableTask.Generators.csproj index 3fe959de8..8836b34e7 100644 --- a/src/DurableTask.Generators/DurableTask.Generators.csproj +++ b/src/DurableTask.Generators/DurableTask.Generators.csproj @@ -5,7 +5,6 @@ enable true 10.0 - 0.2.0 true ../../key.snk @@ -23,6 +22,12 @@ true + + + 0.2.0 + alpha + + diff --git a/src/DurableTask.Generators/DurableTaskSourceGenerator.cs b/src/DurableTask.Generators/DurableTaskSourceGenerator.cs index bc69fdd7f..571fac8c9 100644 --- a/src/DurableTask.Generators/DurableTaskSourceGenerator.cs +++ b/src/DurableTask.Generators/DurableTaskSourceGenerator.cs @@ -156,7 +156,7 @@ static void AddOrchestratorFunctionDeclaration(StringBuilder sourceBuilder, Dura [Function(nameof({orchestrator.TaskName}))] public static string {orchestrator.TaskName}([OrchestrationTrigger] string orchestratorState) {{ - return DurableOrchestrator.LoadAndRun(orchestratorState, singleton{orchestrator.TaskName}); + return OrchestrationRunner.LoadAndRun(orchestratorState, singleton{orchestrator.TaskName}); }}"); } diff --git a/src/DurableTask/DurableTask.csproj b/src/DurableTask/DurableTask.csproj index c177c3db1..7dd237a4a 100644 --- a/src/DurableTask/DurableTask.csproj +++ b/src/DurableTask/DurableTask.csproj @@ -32,13 +32,13 @@ + + + - - - diff --git a/src/DurableTask/DurableTaskClient.cs b/src/DurableTask/DurableTaskClient.cs index 377daa02e..0dcb91436 100644 --- a/src/DurableTask/DurableTaskClient.cs +++ b/src/DurableTask/DurableTaskClient.cs @@ -16,7 +16,7 @@ public abstract class DurableTaskClient : IAsyncDisposable /// /// The name of the orchestrator to schedule. /// The ID of the orchestration instance to schedule. If not specified, a random GUID value is used. - /// The optional input to pass to the scheduled orchestration instance. This must be a serializeable value. + /// The optional input to pass to the scheduled orchestration instance. 
This must be a serializable value. /// The time when the orchestration instance should start executing. If not specified, the orchestration instance will be scheduled immediately. /// Returns the instance ID of the scheduled orchestration instance. /// Thrown if is empty. @@ -49,7 +49,7 @@ public abstract Task ScheduleNewOrchestrationInstanceAsync( /// /// The ID of the orchestration instance that will handle the event. /// The name of the event. Event names are case-insensitive. - /// The serializeable data payload to include with the event. + /// The serializable data payload to include with the event. /// A task that completes when the event notification message has been enqueued. /// Thrown if or is null or empty. public abstract Task RaiseEventAsync(string instanceId, string eventName, object? eventPayload); @@ -151,6 +151,31 @@ public abstract Task WaitForInstanceCompletionAsync( string instanceId, bool getInputsAndOutputs = false); + /// + /// Purges orchestration instance metadata from the durable store. + /// + /// + /// + /// This method can be used to permanently delete orchestration metadata from the underlying storage provider, + /// including any stored inputs, outputs, and orchestration history records. This is often useful for implementing + /// data retention policies and for keeping storage costs minimal. Only orchestration instances in the + /// , , or + /// state can be purged. + /// + /// If is not found in the data store, or if the instance is found but not in a terminal + /// state, then the returned object will have a + /// value of 0. Otherwise, the existing data will be purged and will be 1. + /// + /// + /// The unique ID of the orchestration instance to purge. + /// A that can be used to cancel the purge operation. + /// + /// This method returns a object after the operation has completed with a + /// value of 1 or 0, depending on whether the target instance + /// was successfully purged. 
+ /// + public abstract Task PurgeInstanceMetadataAsync(string instanceId, CancellationToken cancellation = default); + /// /// Disposes any unmanaged resources associated with this . /// diff --git a/src/DurableTask/Grpc/DurableTaskGrpcClient.cs b/src/DurableTask/Grpc/DurableTaskGrpcClient.cs index b47d77a9a..5cde567fa 100644 --- a/src/DurableTask/Grpc/DurableTaskGrpcClient.cs +++ b/src/DurableTask/Grpc/DurableTaskGrpcClient.cs @@ -227,6 +227,12 @@ public override async Task WaitForInstanceCompletionAsync return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs); } + /// + public override Task PurgeInstanceMetadataAsync(string instanceId, CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } + public sealed class Builder { internal IServiceProvider? services; diff --git a/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs b/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs index 91d6b22d1..75eb7c0f8 100644 --- a/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs +++ b/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs @@ -270,10 +270,14 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request) OrchestrationRuntimeState runtimeState = BuildRuntimeState(request); TaskName name = new(runtimeState.Name, runtimeState.Version); - this.logger.ReceivedOrchestratorRequest(name, request.InstanceId); - - OrchestratorExecutionResult result; + this.logger.ReceivedOrchestratorRequest( + name, + request.InstanceId, + runtimeState.PastEvents.Count, + runtimeState.NewEvents.Count); + OrchestratorExecutionResult? result = null; + P.TaskFailureDetails? failureDetails = null; try { TaskOrchestration orchestrator; @@ -281,36 +285,61 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request) { // Both the factory invocation and the ExecuteAsync could involve user code and need to be handled as part of try/catch. 
orchestrator = factory.Invoke(this.workerContext); - TaskOrchestrationExecutor executor = new(runtimeState, orchestrator, BehaviorOnContinueAsNew.Carryover); + TaskOrchestrationExecutor executor = new( + runtimeState, + orchestrator, + BehaviorOnContinueAsNew.Carryover, + ErrorPropagationMode.UseFailureDetails); result = executor.Execute(); } else { - result = this.CreateOrchestrationFailedActionResult($"No task orchestration named '{name}' was found."); + failureDetails = new P.TaskFailureDetails + { + ErrorName = "OrchestratorTaskNotFound", + ErrorMessage = $"No orchestrator task named '{name}' was found.", + }; } } - catch (Exception applicationException) + catch (Exception unexpected) { - this.logger.OrchestratorFailed(name, request.InstanceId, applicationException.ToString()); - result = this.CreateOrchestrationFailedActionResult(applicationException); + // This is not expected: Normally TaskOrchestrationExecutor handles exceptions in user code. + this.logger.OrchestratorFailed(name, request.InstanceId, unexpected.ToString()); + failureDetails = new P.TaskFailureDetails + { + ErrorName = unexpected.GetType().FullName, + ErrorMessage = $"An internal error occurred in the orchestrator execution pipeline: {unexpected.Message}", + ErrorDetails = unexpected.ToString(), + }; } - // TODO: This is a workaround that allows us to change how the exception is presented to the user. - // Need to move this workaround into DurableTask.Core as a breaking change. 
- if (result.Actions.FirstOrDefault(a => a.OrchestratorActionType == OrchestratorActionType.OrchestrationComplete) is OrchestrationCompleteOrchestratorAction completedAction && - completedAction.OrchestrationStatus == OrchestrationStatus.Failed && - !string.IsNullOrEmpty(completedAction.Details)) + P.OrchestratorResponse response; + if (result != null) { - completedAction.Result = SdkUtils.GetSerializedErrorPayload( - this.workerContext.DataConverter, - "The orchestrator failed with an unhandled exception.", - completedAction.Details); + response = ProtoUtils.ConstructOrchestratorResponse( + request.InstanceId, + result.CustomStatus, + result.Actions); + } + else + { + // This is the case for failures that happened *outside* the orchestrator executor + response = new P.OrchestratorResponse + { + InstanceId = request.InstanceId, + Actions = + { + new P.OrchestratorAction + { + CompleteOrchestration = new P.CompleteOrchestrationAction + { + OrchestrationStatus = P.OrchestrationStatus.Failed, + FailureDetails = failureDetails, + }, + }, + }, + }; } - - P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( - request.InstanceId, - result.CustomStatus, - result.Actions); this.logger.SendingOrchestratorResponse( name, @@ -352,14 +381,6 @@ OrchestratorExecutionResult CreateOrchestrationFailedActionResult(string message return OrchestratorExecutionResult.ForFailure(message, fullText); } - string CreateActivityFailedOutput(Exception? e = null, string? message = null) - { - return SdkUtils.GetSerializedErrorPayload( - this.dataConverter, - message ?? "The activity failed with an unhandled exception.", - e); - } - async Task OnRunActivityAsync(P.ActivityRequest request) { OrchestrationInstance instance = ProtoUtils.ConvertOrchestrationInstance(request.OrchestrationInstance); @@ -372,7 +393,8 @@ async Task OnRunActivityAsync(P.ActivityRequest request) TaskName name = new(request.Name, request.Version); - string output; + string? 
output = null; + P.TaskFailureDetails? failureDetails = null; try { if (this.activities.TryGetValue(name, out Func? factory) && factory != null) @@ -388,23 +410,47 @@ async Task OnRunActivityAsync(P.ActivityRequest request) } else { - output = this.CreateActivityFailedOutput(message: $"No task activity named '{name}' was found."); + failureDetails = new P.TaskFailureDetails + { + ErrorName = "ActivityTaskNotFound", + ErrorMessage = $"No activity task named '{name}' was found.", + }; } } catch (Exception applicationException) { - output = this.CreateActivityFailedOutput( - applicationException, - $"The activity '{name}#{request.TaskId}' failed with an unhandled exception."); + failureDetails = new P.TaskFailureDetails + { + ErrorName = applicationException.GetType().FullName, + ErrorMessage = applicationException.Message, + ErrorDetails = applicationException.ToString(), + }; } - int outputSize = output != null ? Encoding.UTF8.GetByteCount(output) : 0; - this.logger.SendingActivityResponse(name, request.TaskId, instance.InstanceId, outputSize); + int outputSizeInBytes = 0; + if (failureDetails != null) + { + outputSizeInBytes = + Encoding.UTF8.GetByteCount(failureDetails.ErrorName ?? "") + + Encoding.UTF8.GetByteCount(failureDetails.ErrorMessage ?? "") + + Encoding.UTF8.GetByteCount(failureDetails.ErrorDetails ?? ""); + } + else if (output != null) + { + outputSizeInBytes = Encoding.UTF8.GetByteCount(output); + } + + string successOrFailure = failureDetails != null ? 
"failure" : "success"; + this.logger.SendingActivityResponse(successOrFailure, name, request.TaskId, instance.InstanceId, outputSizeInBytes); + + P.ActivityResponse response = new() + { + InstanceId = instance.InstanceId, + TaskId = request.TaskId, + Result = output, + FailureDetails = failureDetails, + }; - P.ActivityResponse response = ProtoUtils.ConstructActivityResponse( - instance.InstanceId, - request.TaskId, - output); await this.sidecarClient.CompleteActivityTaskAsync(response); } @@ -569,6 +615,20 @@ public IDurableTaskRegistry AddOrchestrator() where TOrchestrator return this; } + public IDurableTaskRegistry AddActivity(TaskName name, Action implementation) + { + return this.AddActivity(name, (context, _) => + { + implementation(context); + return null!; + }); + } + + public IDurableTaskRegistry AddActivity(TaskName name, Func implementation) + { + return this.AddActivity(name, (context, input) => implementation(context)); + } + public IDurableTaskRegistry AddActivity( TaskName name, Func implementation) diff --git a/src/DurableTask/Grpc/ProtoUtils.cs b/src/DurableTask/Grpc/ProtoUtils.cs index 376f71b19..4da834905 100644 --- a/src/DurableTask/Grpc/ProtoUtils.cs +++ b/src/DurableTask/Grpc/ProtoUtils.cs @@ -2,11 +2,15 @@ // Licensed under the MIT License. 
using System; +using System.Buffers; +using System.Buffers.Text; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; +using System.Text; using DurableTask.Core; using DurableTask.Core.Command; using DurableTask.Core.History; +using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using P = DurableTask.Protobuf; @@ -65,8 +69,9 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) historyEvent = new TaskFailedEvent( proto.EventId, proto.TaskFailed.TaskScheduledId, - proto.TaskFailed.Reason, - proto.TaskFailed.Details); + reason: null, /* not supported */ + details: null, /* not supported */ + ConvertFailureDetails(proto.TaskFailed.FailureDetails)); break; case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated: historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId) @@ -87,8 +92,8 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) historyEvent = new SubOrchestrationInstanceFailedEvent( proto.EventId, proto.SubOrchestrationInstanceFailed.TaskScheduledId, - proto.SubOrchestrationInstanceFailed.Reason, - proto.SubOrchestrationInstanceFailed.Details); + reason: null /* not supported */, + details: null /* not supported */); break; case P.HistoryEvent.EventTypeOneofCase.TimerCreated: historyEvent = new TimerCreatedEvent( @@ -215,6 +220,11 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( break; case OrchestratorActionType.SendEvent: var sendEventAction = (SendEventOrchestratorAction)action; + if (sendEventAction.Instance == null) + { + throw new ArgumentException($"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!"); + } + protoAction.SendEvent = new P.SendEventAction { Instance = ConvertOrchestrationInstance(sendEventAction.Instance), @@ -227,14 +237,24 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction { CarryoverEvents = - { - // 
TODO - }, + { + // TODO + }, Details = completeAction.Details, NewVersion = completeAction.NewVersion, OrchestrationStatus = ConvertOrchestrationRuntimeStatus(completeAction.OrchestrationStatus), Result = completeAction.Result, }; + + if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed) + { + protoAction.CompleteOrchestration.FailureDetails = new P.TaskFailureDetails + { + ErrorName = completeAction.FailureDetails?.ErrorName, + ErrorMessage = completeAction.FailureDetails?.ErrorMessage, + ErrorDetails = completeAction.FailureDetails?.ErrorDetails, + }; + } break; default: throw new NotImplementedException($"Unknown orchestrator action: {action.OrchestratorActionType}"); @@ -292,4 +312,44 @@ static P.OrchestrationInstance ConvertOrchestrationInstance(OrchestrationInstanc ExecutionId = instance.ExecutionId, }; } + + static FailureDetails? ConvertFailureDetails(P.TaskFailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new FailureDetails( + failureDetails.ErrorName, + failureDetails.ErrorMessage, + failureDetails.ErrorDetails); + } + + internal static T Base64Decode(string encodedMessage, MessageParser parser) where T : IMessage + { + // Decode the base64 in a way that doesn't allocate a byte[] on each request + int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage); + byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount); + try + { + // The Base64 APIs require first converting the string into UTF-8 bytes. We then + // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that + // we can finally decode the protobuf request. 
+ Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0); + OperationStatus status = Base64.DecodeFromUtf8InPlace( + buffer.AsSpan(0, encodedByteCount), + out int bytesWritten); + if (status != OperationStatus.Done) + { + throw new ArgumentException($"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage)); + } + + return (T)parser.ParseFrom(buffer, 0, bytesWritten); + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } } diff --git a/src/DurableTask/IDurableTaskRegistry.cs b/src/DurableTask/IDurableTaskRegistry.cs index 3f79996d3..c64383948 100644 --- a/src/DurableTask/IDurableTaskRegistry.cs +++ b/src/DurableTask/IDurableTaskRegistry.cs @@ -47,13 +47,29 @@ public IDurableTaskRegistry AddOrchestrator( public IDurableTaskRegistry AddOrchestrator() where T : ITaskOrchestrator; /// - /// Registers an activity as an synchronous (blocking) lambda function. + /// Registers an activity as a synchronous (blocking) lambda function that takes no input and returns no output. /// - /// The input type of the activity. - /// The output type of the activity. /// The name of the activity. /// The lambda function to invoke when the activity is called. /// Returns this instance. + public IDurableTaskRegistry AddActivity( + TaskName name, + Action implementation); + + /// + /// Registers an activity as an asynchronous (non-blocking) lambda function. + /// + /// + public IDurableTaskRegistry AddActivity( + TaskName name, + Func implementation); + + /// + /// Registers an activity as a synchronous (blocking) lambda function with an input and an output. + /// + /// The input type of the activity. + /// The output type of the activity. 
+ /// public IDurableTaskRegistry AddActivity( TaskName name, Func implementation); diff --git a/src/DurableTask/Logs.cs b/src/DurableTask/Logs.cs index 0d3c646ca..fa1fc94e8 100644 --- a/src/DurableTask/Logs.cs +++ b/src/DurableTask/Logs.cs @@ -9,10 +9,10 @@ namespace DurableTask // NOTE: Trying to make logs consistent with https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Logging/LogEvents.cs. static partial class Logs { - [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Task hub worker is connecting to sidecar at {address}.")] + [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Durable Task worker is connecting to sidecar at {address}.")] public static partial void StartingTaskHubWorker(this ILogger logger, string address); - [LoggerMessage(EventId = 2, Level = LogLevel.Information, Message = "Task hub worker has disconnected from {address}.")] + [LoggerMessage(EventId = 2, Level = LogLevel.Information, Message = "Durable Task worker has disconnected from {address}.")] public static partial void SidecarDisconnected(this ILogger logger, string address); [LoggerMessage(EventId = 3, Level = LogLevel.Information, Message = "The sidecar at address {address} is unavailable. 
Will continue retrying.")] @@ -21,8 +21,8 @@ static partial class Logs [LoggerMessage(EventId = 4, Level = LogLevel.Information, Message = "Sidecar work-item streaming connection established.")] public static partial void EstablishedWorkItemConnection(this ILogger logger); - [LoggerMessage(EventId = 10, Level = LogLevel.Debug, Message = "{instanceId}: Received request for '{name}' orchestrator.")] - public static partial void ReceivedOrchestratorRequest(this ILogger logger, string name, string instanceId); + [LoggerMessage(EventId = 10, Level = LogLevel.Debug, Message = "{instanceId}: Received request to run orchestrator '{name}' with {oldEventCount} replay and {newEventCount} new history events.")] + public static partial void ReceivedOrchestratorRequest(this ILogger logger, string name, string instanceId, int oldEventCount, int newEventCount); [LoggerMessage(EventId = 11, Level = LogLevel.Debug, Message = "{instanceId}: Sending {count} action(s) [{actionsList}] for '{name}' orchestrator.")] public static partial void SendingOrchestratorResponse(this ILogger logger, string name, string instanceId, int count, string actionsList); @@ -30,11 +30,11 @@ static partial class Logs [LoggerMessage(EventId = 12, Level = LogLevel.Warning, Message = "{instanceId}: '{name}' orchestrator failed with an unhandled exception: {details}.")] public static partial void OrchestratorFailed(this ILogger logger, string name, string instanceId, string details); - [LoggerMessage(EventId = 13, Level = LogLevel.Debug, Message = "{instanceId}: Received request for '{name}#{taskId}' activity with {sizeInBytes} bytes of input data.")] + [LoggerMessage(EventId = 13, Level = LogLevel.Debug, Message = "{instanceId}: Received request to run activity '{name}#{taskId}' with {sizeInBytes} bytes of input data.")] public static partial void ReceivedActivityRequest(this ILogger logger, string name, int taskId, string instanceId, int sizeInBytes); - [LoggerMessage(EventId = 14, Level = LogLevel.Debug, 
Message = "{instanceId}: Sending response for '{name}#{taskId}' activity with {sizeInBytes} bytes of output data.")] - public static partial void SendingActivityResponse(this ILogger logger, string name, int taskId, string instanceId, int sizeInBytes); + [LoggerMessage(EventId = 14, Level = LogLevel.Debug, Message = "{instanceId}: Sending {successOrFailure} response for '{name}#{taskId}' activity with {sizeInBytes} bytes of output data.")] + public static partial void SendingActivityResponse(this ILogger logger, string successOrFailure, string name, int taskId, string instanceId, int sizeInBytes); [LoggerMessage(EventId = 20, Level = LogLevel.Error, Message = "Unexpected error in handling of instance ID '{instanceId}'. Details: {details}")] public static partial void UnexpectedError(this ILogger logger, string instanceId, string details); diff --git a/src/DurableTask/OrchestrationFailureDetails.cs b/src/DurableTask/OrchestrationFailureDetails.cs deleted file mode 100644 index a45a06b6e..000000000 --- a/src/DurableTask/OrchestrationFailureDetails.cs +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask; - -/// -/// Record that represents the details of an orchestration instance failure. -/// -/// A summary description of the failure. -/// The full details of the failure, which is often an exception call-stack. -public record OrchestrationFailureDetails(string Message, string? Details); diff --git a/src/DurableTask/OrchestrationMetadata.cs b/src/DurableTask/OrchestrationMetadata.cs index f259d890e..47dc63609 100644 --- a/src/DurableTask/OrchestrationMetadata.cs +++ b/src/DurableTask/OrchestrationMetadata.cs @@ -7,6 +7,9 @@ namespace DurableTask; +/// +/// Represents a snapshot of an orchestration instance's current state. 
+/// public sealed class OrchestrationMetadata { readonly IDataConverter dataConverter; @@ -17,6 +20,7 @@ internal OrchestrationMetadata( IDataConverter dataConverter, bool requestedInputsAndOutputs) { + this.Name = response.OrchestrationState.Name; this.InstanceId = response.OrchestrationState.InstanceId; this.RuntimeStatus = (OrchestrationRuntimeStatus)response.OrchestrationState.OrchestrationStatus; this.CreatedAt = response.OrchestrationState.CreatedTimestamp.ToDateTimeOffset(); @@ -26,25 +30,76 @@ internal OrchestrationMetadata( this.SerializedCustomStatus = response.OrchestrationState.CustomStatus; this.dataConverter = dataConverter; this.requestedInputsAndOutputs = requestedInputsAndOutputs; + + this.FailureDetails = new OrchestrationFailureDetails( + response.OrchestrationState.FailureDetails.ErrorName, + response.OrchestrationState.FailureDetails.ErrorMessage, + response.OrchestrationState.FailureDetails.ErrorDetails); } + /// + /// Gets the name of the orchestration. + /// + public string Name { get; } + + /// + /// Gets the unique ID of the orchestration instance. + /// public string InstanceId { get; } + /// + /// Gets the current runtime status of the orchestration instance at the time this object was fetched. + /// public OrchestrationRuntimeStatus RuntimeStatus { get; } + /// + /// Gets the orchestration instance's creation time in UTC. + /// public DateTimeOffset CreatedAt { get; } + /// + /// Gets the orchestration instance's last updated time in UTC. + /// public DateTimeOffset LastUpdatedAt { get; } + /// + /// Gets the orchestration instance's serialized input, if any, as a string value. + /// public string? SerializedInput { get; } + /// + /// Gets the orchestration instance's serialized output, if any, as a string value. + /// public string? SerializedOutput { get; } - + + /// + /// Gets the orchestration instance's serialized custom status, if any, as a string value. + /// public string? 
SerializedCustomStatus { get; } + /// + /// Gets the failure details, if any, for the orchestration instance. + /// + /// + /// This property contains data only if the orchestration is in the state, + /// and only if this instance metadata was fetched with the option to include output data. + /// + public OrchestrationFailureDetails? FailureDetails { get; } + + /// + /// Gets a value indicating whether the orchestration instance was running at the time this object was fetched. + /// public bool IsRunning => this.RuntimeStatus == OrchestrationRuntimeStatus.Running; + /// + /// Gets a value indicating whether the orchestration instance was completed at the time this object was fetched. + /// + /// + /// An orchestration instance is considered completed when its value is + /// , , + /// or . + /// public bool IsCompleted => this.RuntimeStatus == OrchestrationRuntimeStatus.Completed || this.RuntimeStatus == OrchestrationRuntimeStatus.Failed || @@ -88,7 +143,7 @@ internal OrchestrationMetadata( public override string ToString() { - StringBuilder sb = new($"[ID: '{this.InstanceId}', RuntimeStatus: {this.RuntimeStatus}, CreatedAt: {this.CreatedAt:s}, LastUpdatedAt: {this.LastUpdatedAt:s}"); + StringBuilder sb = new($"[Name: '{this.Name}', ID: '{this.InstanceId}', RuntimeStatus: {this.RuntimeStatus}, CreatedAt: {this.CreatedAt:s}, LastUpdatedAt: {this.LastUpdatedAt:s}"); if (this.SerializedInput != null) { sb.Append(", Input: '").Append(GetTrimmedPayload(this.SerializedInput)).Append('\''); diff --git a/src/DurableTask/OrchestrationRunner.cs b/src/DurableTask/OrchestrationRunner.cs new file mode 100644 index 000000000..2e5347c83 --- /dev/null +++ b/src/DurableTask/OrchestrationRunner.cs @@ -0,0 +1,105 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +using System; +using System.Buffers; +using System.Buffers.Text; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using DurableTask.Core; +using DurableTask.Core.History; +using DurableTask.Grpc; +using Google.Protobuf; +using P = DurableTask.Protobuf; + +namespace DurableTask; + + +public static class OrchestrationRunner +{ + + /// + /// Deserializes orchestration history from and uses it to execute the orchestrator function + /// code pointed to by . + /// + /// The type of the orchestrator function input. This type must be deserializable from JSON. + /// The type of the orchestrator function output. This type must be serializable to JSON. + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// A function that implements the orchestrator logic. + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// Thrown if or is null. + public static string LoadAndRun(string encodedOrchestratorRequest, Func> orchestratorFunc) + { + if (orchestratorFunc == null) + { + throw new ArgumentNullException(nameof(orchestratorFunc)); + } + + FuncTaskOrchestrator orchestrator = new(orchestratorFunc); + return LoadAndRun(encodedOrchestratorRequest, orchestrator); + } + + /// + /// Deserializes orchestration history from and uses it to resume the orchestrator + /// implemented by . + /// + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// An implementation that defines the orchestrator logic. + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// Thrown if or is null. + /// Thrown if contains invalid data. 
+ public static string LoadAndRun(string encodedOrchestratorRequest, ITaskOrchestrator implementation) + { + if (string.IsNullOrEmpty(encodedOrchestratorRequest)) + { + throw new ArgumentNullException(nameof(encodedOrchestratorRequest)); + } + + if (implementation == null) + { + throw new ArgumentNullException(nameof(implementation)); + } + + P.OrchestratorRequest request = ProtoUtils.Base64Decode( + encodedOrchestratorRequest, + P.OrchestratorRequest.Parser); + + List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); + IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); + + SimpleWorkerContext workerContext = new(JsonDataConverter.Default); + + // Re-construct the orchestration state from the history. + // New events must be added using the AddEvent method. + OrchestrationRuntimeState runtimeState = new(pastEvents); + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + TaskName orchestratorName = new(runtimeState.Name, runtimeState.Version); + + TaskOrchestrationShim orchestrator = new(workerContext, orchestratorName, implementation); + TaskOrchestrationExecutor executor = new(runtimeState, orchestrator, BehaviorOnContinueAsNew.Carryover); + OrchestratorExecutionResult result = executor.Execute(); + + P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( + request.InstanceId, + result.CustomStatus, + result.Actions); + byte[] responseBytes = response.ToByteArray(); + return Convert.ToBase64String(responseBytes); + } + + sealed class SimpleWorkerContext : IWorkerContext + { + public SimpleWorkerContext(IDataConverter dataConverter) + { + this.DataConverter = dataConverter; + } + + public IDataConverter DataConverter { get; } + } +} diff --git a/src/DurableTask/PurgeResult.cs b/src/DurableTask/PurgeResult.cs new file mode 100644 index 000000000..412e6aebc --- /dev/null +++ b/src/DurableTask/PurgeResult.cs @@ -0,0 +1,21 @@ +// Copyright (c) 
Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask; + +/// +/// Results of a purge operation. +/// +public class PurgeResult +{ + internal PurgeResult(int count) + { + this.PurgedInstanceCount = count; + } + + /// + /// Gets the number of purged instances. + /// + /// The number of purged instances. + public int PurgedInstanceCount { get; } +} diff --git a/src/DurableTask/SdkUtils.cs b/src/DurableTask/SdkUtils.cs index a541ecda6..adb55363e 100644 --- a/src/DurableTask/SdkUtils.cs +++ b/src/DurableTask/SdkUtils.cs @@ -39,22 +39,6 @@ internal static string ValidateAddress(string address) internal static ILogger GetLogger(ILoggerFactory loggerFactory) => loggerFactory.CreateLogger("DurableTask.Sdk"); - /// - /// Gets a serialized representation of an error payload. The format of this error is considered the standard that all SDKs should adhere to. - /// - internal static string GetSerializedErrorPayload(IDataConverter dataConverter, string message, Exception? e = null) - { - return GetSerializedErrorPayload(dataConverter, message, e?.ToString()); - } - - /// - /// Gets a serialized representation of an error payload. The format of this error is considered the standard that all SDKs should adhere to. - /// - internal static string GetSerializedErrorPayload(IDataConverter dataConverter, string message, string? fullText) - { - return dataConverter.Serialize(new OrchestrationFailureDetails(message, fullText)); - } - /// /// Gets the address of the Durable Task sidecar, which is responsible for managing and scheduling durable tasks. /// diff --git a/src/DurableTask/TaskFailedException.cs b/src/DurableTask/TaskFailedException.cs new file mode 100644 index 000000000..4f7d881ec --- /dev/null +++ b/src/DurableTask/TaskFailedException.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +using System; +using System.Text.Json; + +namespace DurableTask; + +// TODO: Documentation +public sealed class TaskFailedException : Exception +{ + internal TaskFailedException(string taskName, int taskId, string errorName, string errorMessage, string? errorDetails) + : base($"Activity task '{taskName}' (#{taskId}) failed with an unhandled exception: {errorMessage}") + { + this.TaskName = taskName; + this.TaskId = taskId; + this.ErrorName = errorName; + this.ErrorMessage = errorMessage; + this.ErrorDetails = errorDetails; + } + + public string TaskName { get; } + + public int TaskId { get; } + + public string ErrorName { get; } + + public string ErrorMessage { get; } + + public string? ErrorDetails { get; } +} diff --git a/src/DurableTask/TaskFailureDetails.cs b/src/DurableTask/TaskFailureDetails.cs new file mode 100644 index 000000000..285f9b1b6 --- /dev/null +++ b/src/DurableTask/TaskFailureDetails.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask; + +/// +/// Record that represents the details of an orchestration instance failure. +/// +/// The name of the error type. For .NET, this is the namespace-qualified exception type name. +/// A summary description of the failure. +/// The full details of the failure, which often includes an exception call-stack. +public record OrchestrationFailureDetails(string ErrorName, string ErrorMessage, string? ErrorDetails); diff --git a/src/DurableTask/TaskOrchestrationContext.cs b/src/DurableTask/TaskOrchestrationContext.cs index 94b8478fd..99bd9e136 100644 --- a/src/DurableTask/TaskOrchestrationContext.cs +++ b/src/DurableTask/TaskOrchestrationContext.cs @@ -54,7 +54,7 @@ public abstract class TaskOrchestrationContext // TODO: Summary and detailed remarks /// The name of the activity to call. - /// The serializeable input to pass to the activity. + /// The serializable input to pass to the activity. 
/// Additional options that control the execution and processing of the activity. /// A task that completes when the activity completes or fails. /// The specified orchestrator does not exist. @@ -106,7 +106,7 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo /// /// The name of the event to wait for. Event names are case-insensitive. External event names can be reused any number of times; they are not required to be unique. /// A CancellationToken to use to abort waiting for the event. - /// Any serializeable type that represents the event payload. + /// Any serializable type that represents the event payload. /// A task that completes when the external event is received. The value of the task is the deserialized event payload. public abstract Task WaitForExternalEvent(string eventName, CancellationToken cancellationToken = default); @@ -145,7 +145,7 @@ public async Task WaitForExternalEvent(string eventName, TimeSpan timeout) /// The serialized value must not exceed 16 KB of UTF-16 encoded text. /// /// - /// A serializeable value to assign as the custom status value or null to clear the custom status. + /// A serializable value to assign as the custom status value or null to clear the custom status. /// public abstract void SetCustomStatus(object? customStatus); @@ -190,7 +190,7 @@ public abstract Task CallSubOrchestratorAsync( /// /// A unique ID to use for the sub-orchestration instance. If not specified, a random instance ID will be generated. /// - /// The serializeable input to pass to the sub-orchestrator. + /// The serializable input to pass to the sub-orchestrator. /// Additional options that control the execution and processing of the sub-orchestrator. /// A task that completes when the sub-orchestrator completes or fails. /// The specified orchestrator does not exist. @@ -228,7 +228,7 @@ public Task CallSubOrchestratorAsync( /// Orchestrator functions should return immediately after calling the method. 
/// /// - /// The JSON-serializeable input data to re-initialize the instance with. + /// The JSON-serializable input data to re-initialize the instance with. /// /// If set to true, re-adds any unprocessed external events into the new execution /// history when the orchestration instance restarts. If false, any unprocessed diff --git a/src/DurableTask/TaskOrchestrationShim.cs b/src/DurableTask/TaskOrchestrationShim.cs index 6519fe496..5e0b8aa8e 100644 --- a/src/DurableTask/TaskOrchestrationShim.cs +++ b/src/DurableTask/TaskOrchestrationShim.cs @@ -125,13 +125,26 @@ public TaskOrchestrationContextWrapper( public override DateTime CurrentDateTimeUtc => this.innerContext.CurrentUtcDateTime; - public override Task CallActivityAsync( + public override async Task CallActivityAsync( TaskName name, object? input = null, TaskOptions? options = null) { - // TODO: Retry options - return this.innerContext.ScheduleTask(name.Name, name.Version, input); + try + { + // TODO: Retry options + return await this.innerContext.ScheduleTask(name.Name, name.Version, input); + } + catch (DurableTask.Core.Exceptions.TaskFailedException coreTfe) + { + // Hide the core DTFx types and instead use our own + throw new TaskFailedException( + taskName: name, + taskId: coreTfe.ScheduleId, + errorName: coreTfe.FailureDetails?.ErrorName ?? "(unknown)", + errorMessage: coreTfe.FailureDetails?.ErrorMessage ?? 
"(unknown)", + errorDetails: coreTfe.FailureDetails?.ErrorDetails); + } } [Obsolete("This method is not yet fully implemented")] diff --git a/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs b/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs index 6cee6ff4e..667484000 100644 --- a/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs +++ b/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs @@ -133,7 +133,7 @@ public class MyOrchestrator : TaskOrchestratorBase<{inputType}, {outputType}> [Function(nameof(MyOrchestrator))] public static string MyOrchestrator([OrchestrationTrigger] string orchestratorState) {{ - return DurableOrchestrator.LoadAndRun(orchestratorState, singletonMyOrchestrator); + return OrchestrationRunner.LoadAndRun(orchestratorState, singletonMyOrchestrator); }} /// @@ -217,7 +217,7 @@ public abstract class MyOrchestratorBase : TaskOrchestratorBase<{inputType}, {ou [Function(nameof(MyOrchestrator))] public static string MyOrchestrator([OrchestrationTrigger] string orchestratorState) {{ - return DurableOrchestrator.LoadAndRun(orchestratorState, singletonMyOrchestrator); + return OrchestrationRunner.LoadAndRun(orchestratorState, singletonMyOrchestrator); }} /// diff --git a/test/DurableTask.Sdk.Tests/DurableTask.Sdk.Tests.csproj b/test/DurableTask.Sdk.Tests/DurableTask.Sdk.Tests.csproj index 0b732b166..19f4ae4f5 100644 --- a/test/DurableTask.Sdk.Tests/DurableTask.Sdk.Tests.csproj +++ b/test/DurableTask.Sdk.Tests/DurableTask.Sdk.Tests.csproj @@ -21,7 +21,7 @@ - + diff --git a/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs b/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs new file mode 100644 index 000000000..a5bb9565b --- /dev/null +++ b/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +using System; +using System.Diagnostics; +using System.Threading; +using DurableTask.Grpc; +using DurableTask.Sdk.Tests.Logging; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace DurableTask.Sdk.Tests; + +/// +/// Base class for integration tests that use an in-process sidecar for executing orchestrations. +/// +public class IntegrationTestBase : IClassFixture, IDisposable +{ + readonly CancellationTokenSource testTimeoutSource = new(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(10)); + readonly ILoggerFactory loggerFactory; + + // Documentation on xunit test fixtures: https://xunit.net/docs/shared-context + readonly GrpcSidecarFixture sidecarFixture; + + public IntegrationTestBase(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + { + TestLogProvider logProvider = new(output); + this.loggerFactory = LoggerFactory.Create(builder => + { + builder.AddProvider(logProvider); + builder.SetMinimumLevel(LogLevel.Debug); + }); + this.sidecarFixture = sidecarFixture; + } + + + /// + /// Gets a <see cref="CancellationToken"/> that triggers after a default test timeout period. + /// The actual timeout value is increased if a debugger is attached to the test process. + /// + public CancellationToken TimeoutToken => this.testTimeoutSource.Token; + + void IDisposable.Dispose() + { + this.testTimeoutSource.Dispose(); + GC.SuppressFinalize(this); + } + + /// + /// Creates a <see cref="DurableTaskGrpcWorker.Builder"/> configured to output logs to xunit logging infrastructure. + /// + protected DurableTaskGrpcWorker.Builder CreateWorkerBuilder() + { + return this.sidecarFixture.GetWorkerBuilder().UseLoggerFactory(this.loggerFactory); + } + + /// + /// Creates a <see cref="DurableTaskClient"/> configured to output logs to xunit logging infrastructure. 
+ /// + protected DurableTaskClient CreateDurableTaskClient() + { + return this.sidecarFixture.GetClientBuilder().UseLoggerFactory(this.loggerFactory).Build(); + } +} diff --git a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs new file mode 100644 index 000000000..bc4fca078 --- /dev/null +++ b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs @@ -0,0 +1,133 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Threading.Tasks; +using DurableTask.Grpc; +using Xunit; +using Xunit.Abstractions; + +namespace DurableTask.Sdk.Tests; + +/// +/// Integration tests that are designed to exercise the error handling and retry functionality +/// of the Durable Task SDK. +/// +public class OrchestrationErrorHandling : IntegrationTestBase +{ + public OrchestrationErrorHandling(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + : base(output, sidecarFixture) + { } + + /// + /// Tests the behavior and output of an unhandled exception that originates from an activity. 
+ /// + [Fact] + public async Task UnhandledActivityException() + { + string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging + + TaskName orchestratorName = "FaultyOrchestration"; + TaskName activityName = "FaultyActivity"; + + // Use local function definitions to simplify the validation of the callstacks + async Task MyOrchestrationImpl(TaskOrchestrationContext ctx) => await ctx.CallActivityAsync(activityName); + void MyActivityImpl(TaskActivityContext ctx) => throw new Exception(errorMessage); + + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => tasks + .AddOrchestrator(orchestratorName, MyOrchestrationImpl) + .AddActivity(activityName, MyActivityImpl)) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + Assert.NotNull(metadata.FailureDetails); + OrchestrationFailureDetails failureDetails = metadata.FailureDetails!; + Assert.Equal(typeof(TaskFailedException).FullName, failureDetails.ErrorName); + + // Expecting something like: + // "The activity 'FaultyActivity' (#0) failed with an unhandled exception: Kah-BOOOOOM!!!" 
+ int failingTaskId = 0; // This is the first task to be scheduled by the orchestrator, thus taskID = 0 + Assert.Contains($"#{failingTaskId}", failureDetails.ErrorMessage); + Assert.Contains(activityName, failureDetails.ErrorMessage); + Assert.Contains(errorMessage, failureDetails.ErrorMessage); + + // A callstack for the orchestration is expected in the error details (not the activity callstack). + Assert.NotNull(failureDetails.ErrorDetails); + Assert.Contains(nameof(MyOrchestrationImpl), failureDetails.ErrorDetails); + Assert.DoesNotContain(nameof(MyActivityImpl), failureDetails.ErrorDetails); + } + + /// + /// Tests the behavior and output of an unhandled exception that occurs in orchestrator code. + /// + /// + /// This is different from <see cref="UnhandledActivityException"/> in that the source of the + /// exception is in the orchestrator code directly, and not from an unhandled activity task. + /// + [Fact] + public async Task UnhandledOrchestratorException() + { + string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging + string? 
expectedCallstack = null; + + TaskName orchestratorName = "FaultyOrchestration"; + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => + tasks.AddOrchestrator(orchestratorName, ctx => + { + expectedCallstack = Environment.StackTrace; + throw new Exception(errorMessage); + })) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + Assert.NotNull(metadata.FailureDetails); + OrchestrationFailureDetails failureDetails = metadata.FailureDetails!; + Assert.Equal(typeof(Exception).FullName, failureDetails.ErrorName); + Assert.Equal(errorMessage, failureDetails.ErrorMessage); + Assert.NotNull(failureDetails.ErrorDetails); + Assert.NotNull(expectedCallstack); + Assert.Contains(expectedCallstack![..300], failureDetails.ErrorDetails); + } + + /////// + /////// Tests retry policies for activity calls. + /////// + ////[Fact] + ////public async Task RetryActivityFailures() + ////{ + //// throw new NotImplementedException(); + ////} + + /////// + /////// Tests retry policies for sub-orchestrations. 
+ /////// + ////[Fact] + ////public async Task RetrySubOrchestrationFailures() + ////{ + //// throw new NotImplementedException(); + ////} +} diff --git a/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs b/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs index 8922b6d4b..067e69ccc 100644 --- a/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs +++ b/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs @@ -3,66 +3,22 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using DurableTask.Grpc; using DurableTask.Sdk.Tests; -using DurableTask.Sdk.Tests.Logging; -using Microsoft.Extensions.Logging; using Xunit; using Xunit.Abstractions; namespace DurableTask.Tests; -public class OrchestrationPatterns : IClassFixture, IDisposable +public class OrchestrationPatterns : IntegrationTestBase { - readonly CancellationTokenSource testTimeoutSource = new(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(10)); - readonly ILoggerFactory loggerFactory; - - // Documentation on xunit test fixtures: https://xunit.net/docs/shared-context - readonly GrpcSidecarFixture sidecarFixture; - public OrchestrationPatterns(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) - { - TestLogProvider logProvider = new(output); - this.loggerFactory = LoggerFactory.Create(builder => - { - builder.AddProvider(logProvider); - builder.SetMinimumLevel(LogLevel.Debug); - }); - this.sidecarFixture = sidecarFixture; - } - - /// - /// Gets a that triggers after a default test timeout period. - /// The actual timeout value is increased if a debugger is attached to the test process. - /// - public CancellationToken TimeoutToken => this.testTimeoutSource.Token; - - void IDisposable.Dispose() - { - this.testTimeoutSource.Dispose(); - GC.SuppressFinalize(this); - } - - /// - /// Creates a configured to output logs to xunit logging infrastructure. 
- /// - DurableTaskGrpcWorker.Builder CreateWorkerBuilder() - { - return this.sidecarFixture.GetWorkerBuilder().UseLoggerFactory(this.loggerFactory); - } - - /// - /// Creates a configured to output logs to xunit logging infrastructure. - /// - DurableTaskClient CreateDurableTaskClient() - { - return this.sidecarFixture.GetClientBuilder().UseLoggerFactory(this.loggerFactory).Build(); - } + : base(output, sidecarFixture) + { } [Fact] public async Task EmptyOrchestration() @@ -284,7 +240,8 @@ public async Task OrchestratorException() OrchestrationFailureDetails? failureDetails = metadata.ReadOutputAs(); Assert.NotNull(failureDetails); - Assert.Contains(errorMessage, failureDetails!.Details); + Assert.Equal(typeof(Exception).FullName, failureDetails!.ErrorName); + Assert.Contains(errorMessage, failureDetails!.ErrorDetails); } [Fact] From f9ab8f97a51934b0c78f2a1bad510ca8d4d5b6e3 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Sat, 26 Mar 2022 14:32:51 -0700 Subject: [PATCH 2/4] More changes: - Retry policies and custom handlers - non-retriable errors - various refactoring and cleanup - ErrorName --> ErrorType - ErrorDetails --> CallStack - Better support for nested failure details - Support dependency injected services in OrchestrationRunner - More documentation comments - Replay-safe logger --- .../DurableTaskSourceGenerator.cs | 4 +- .../{ => Converters}/JsonDataConverter.cs | 2 +- src/DurableTask/FuncTaskOrchestrator.cs | 33 ++ src/DurableTask/Grpc/DurableTaskGrpcWorker.cs | 36 +- src/DurableTask/Grpc/ProtoUtils.cs | 92 ++++- src/DurableTask/IWorkerContext.cs | 11 - src/DurableTask/Logs.cs | 4 +- src/DurableTask/OrchestrationMetadata.cs | 18 +- src/DurableTask/OrchestrationRunner.cs | 34 +- src/DurableTask/RetryHandler.cs | 33 ++ src/DurableTask/RetryPolicy.cs | 143 ++++++++ src/DurableTask/SdkUtils.cs | 1 + src/DurableTask/TaskFailedException.cs | 48 ++- src/DurableTask/TaskFailureDetails.cs | 47 ++- src/DurableTask/TaskOptions.cs | 144 +++++++- 
src/DurableTask/TaskOrchestrationContext.cs | 42 ++- src/DurableTask/TaskOrchestrationShim.cs | 198 +++++++---- src/DurableTask/WorkerContext.cs | 12 + .../AzureFunctionsTests.cs | 8 +- .../OrchestrationErrorHandling.cs | 319 ++++++++++++++++-- .../OrchestrationPatterns.cs | 28 -- 21 files changed, 1039 insertions(+), 218 deletions(-) rename src/DurableTask/{ => Converters}/JsonDataConverter.cs (97%) create mode 100644 src/DurableTask/FuncTaskOrchestrator.cs delete mode 100644 src/DurableTask/IWorkerContext.cs create mode 100644 src/DurableTask/RetryHandler.cs create mode 100644 src/DurableTask/RetryPolicy.cs create mode 100644 src/DurableTask/WorkerContext.cs diff --git a/src/DurableTask.Generators/DurableTaskSourceGenerator.cs b/src/DurableTask.Generators/DurableTaskSourceGenerator.cs index 571fac8c9..b4157e9fa 100644 --- a/src/DurableTask.Generators/DurableTaskSourceGenerator.cs +++ b/src/DurableTask.Generators/DurableTaskSourceGenerator.cs @@ -154,9 +154,9 @@ static void AddOrchestratorFunctionDeclaration(StringBuilder sourceBuilder, Dura { sourceBuilder.AppendLine($@" [Function(nameof({orchestrator.TaskName}))] - public static string {orchestrator.TaskName}([OrchestrationTrigger] string orchestratorState) + public static string {orchestrator.TaskName}([OrchestrationTrigger] string orchestratorState, FunctionContext executionContext) {{ - return OrchestrationRunner.LoadAndRun(orchestratorState, singleton{orchestrator.TaskName}); + return OrchestrationRunner.LoadAndRun(orchestratorState, singleton{orchestrator.TaskName}, executionContext.InstanceServices); }}"); } diff --git a/src/DurableTask/JsonDataConverter.cs b/src/DurableTask/Converters/JsonDataConverter.cs similarity index 97% rename from src/DurableTask/JsonDataConverter.cs rename to src/DurableTask/Converters/JsonDataConverter.cs index 3051fc9d4..0dd3fec9c 100644 --- a/src/DurableTask/JsonDataConverter.cs +++ b/src/DurableTask/Converters/JsonDataConverter.cs @@ -4,7 +4,7 @@ using System; using 
System.Text.Json; -namespace DurableTask; +namespace DurableTask.Converters; /// /// An implementation of that uses System.Text.Json APIs for data serialization. diff --git a/src/DurableTask/FuncTaskOrchestrator.cs b/src/DurableTask/FuncTaskOrchestrator.cs new file mode 100644 index 000000000..0b2375004 --- /dev/null +++ b/src/DurableTask/FuncTaskOrchestrator.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Threading.Tasks; + +namespace DurableTask; + +/// +/// Implementation of that uses +/// a delegate as its implementation. +/// +/// The orchestrator input type. +/// The orchestrator output type. +public class FuncTaskOrchestrator : TaskOrchestratorBase +{ + readonly Func> implementation; + + /// + /// Initializes a new instance of the class. + /// + /// The orchestrator function. + public FuncTaskOrchestrator(Func> implementation) + { + this.implementation = implementation; + } + + /// + protected override Task OnRunAsync(TaskOrchestrationContext context, TInput? 
input) + { + return this.implementation(context, input); + } +} diff --git a/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs b/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs index 75eb7c0f8..c2029d252 100644 --- a/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs +++ b/src/DurableTask/Grpc/DurableTaskGrpcWorker.cs @@ -57,8 +57,7 @@ public class DurableTaskGrpcWorker : IHostedService, IAsyncDisposable this.workerContext = new WorkerContext( this.dataConverter, this.logger, - this.services, - new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase)); + this.services); this.orchestrators = builder.taskProvider.orchestratorsBuilder.ToImmutable(); this.activities = builder.taskProvider.activitiesBuilder.ToImmutable(); @@ -296,8 +295,9 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request) { failureDetails = new P.TaskFailureDetails { - ErrorName = "OrchestratorTaskNotFound", + ErrorType = "OrchestratorTaskNotFound", ErrorMessage = $"No orchestrator task named '{name}' was found.", + IsNonRetriable = true, }; } } @@ -305,12 +305,7 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request) { // This is not expected: Normally TaskOrchestrationExecutor handles exceptions in user code. this.logger.OrchestratorFailed(name, request.InstanceId, unexpected.ToString()); - failureDetails = new P.TaskFailureDetails - { - ErrorName = unexpected.GetType().FullName, - ErrorMessage = $"An internal error occurred in the orchestrator execution pipeline: {unexpected.Message}", - ErrorDetails = unexpected.ToString(), - }; + failureDetails = ProtoUtils.ToTaskFailureDetails(unexpected); } P.OrchestratorResponse response; @@ -403,17 +398,13 @@ async Task OnRunActivityAsync(P.ActivityRequest request) TaskActivity activity = factory.Invoke(this.workerContext); output = await activity.RunAsync(innerContext, request.Input); } - else if (this.workerContext.DynamicActivities.TryGetValue(name, out TaskActivity? 
activity)) - { - // TODO: Need to do work in the worker that ensures - output = await activity.RunAsync(innerContext, request.Input); - } else { failureDetails = new P.TaskFailureDetails { - ErrorName = "ActivityTaskNotFound", + ErrorType = "ActivityTaskNotFound", ErrorMessage = $"No activity task named '{name}' was found.", + IsNonRetriable = true, }; } } @@ -421,19 +412,16 @@ async Task OnRunActivityAsync(P.ActivityRequest request) { failureDetails = new P.TaskFailureDetails { - ErrorName = applicationException.GetType().FullName, + ErrorType = applicationException.GetType().FullName, ErrorMessage = applicationException.Message, - ErrorDetails = applicationException.ToString(), + StackTrace = applicationException.StackTrace, }; } int outputSizeInBytes = 0; if (failureDetails != null) { - outputSizeInBytes = - Encoding.UTF8.GetByteCount(failureDetails.ErrorName ?? "") + - Encoding.UTF8.GetByteCount(failureDetails.ErrorMessage ?? "") + - Encoding.UTF8.GetByteCount(failureDetails.ErrorDetails ?? 
""); + outputSizeInBytes = ProtoUtils.GetApproximateByteCount(failureDetails); } else if (output != null) { @@ -689,10 +677,4 @@ static TaskName GetTaskName(Type taskDeclarationType) } } } - - internal record WorkerContext( - IDataConverter DataConverter, - ILogger Logger, - IServiceProvider Services, - ConcurrentDictionary DynamicActivities) : IWorkerContext; } diff --git a/src/DurableTask/Grpc/ProtoUtils.cs b/src/DurableTask/Grpc/ProtoUtils.cs index 4da834905..b17075e8d 100644 --- a/src/DurableTask/Grpc/ProtoUtils.cs +++ b/src/DurableTask/Grpc/ProtoUtils.cs @@ -93,7 +93,8 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) proto.EventId, proto.SubOrchestrationInstanceFailed.TaskScheduledId, reason: null /* not supported */, - details: null /* not supported */); + details: null /* not supported */, + ConvertFailureDetails(proto.SubOrchestrationInstanceFailed.FailureDetails)); break; case P.HistoryEvent.EventTypeOneofCase.TimerCreated: historyEvent = new TimerCreatedEvent( @@ -248,12 +249,7 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed) { - protoAction.CompleteOrchestration.FailureDetails = new P.TaskFailureDetails - { - ErrorName = completeAction.FailureDetails?.ErrorName, - ErrorMessage = completeAction.FailureDetails?.ErrorMessage, - ErrorDetails = completeAction.FailureDetails?.ErrorDetails, - }; + protoAction.CompleteOrchestration.FailureDetails = ConvertFailureDetails(completeAction.FailureDetails); } break; default: @@ -321,9 +317,87 @@ static P.OrchestrationInstance ConvertOrchestrationInstance(OrchestrationInstanc } return new FailureDetails( - failureDetails.ErrorName, + failureDetails.ErrorType, + failureDetails.ErrorMessage, + failureDetails.StackTrace, + ConvertFailureDetails(failureDetails.InnerFailure), + failureDetails.IsNonRetriable); + } + + internal static TaskFailureDetails? ConvertTaskFailureDetails(P.TaskFailureDetails? 
failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new TaskFailureDetails( + failureDetails.ErrorType, failureDetails.ErrorMessage, - failureDetails.ErrorDetails); + failureDetails.StackTrace, + ConvertTaskFailureDetails(failureDetails.InnerFailure)); + } + + static P.TaskFailureDetails? ConvertFailureDetails(FailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new P.TaskFailureDetails + { + ErrorType = failureDetails.ErrorType ?? "(unknown)", + ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)", + StackTrace = failureDetails.StackTrace, + IsNonRetriable = failureDetails.IsNonRetriable, + InnerFailure = ConvertFailureDetails(failureDetails.InnerFailure), + }; + } + + internal static P.TaskFailureDetails? ToTaskFailureDetails(Exception? e) + { + if (e == null) + { + return null; + } + + return new P.TaskFailureDetails + { + ErrorType = e.GetType().FullName, + ErrorMessage = e.Message, + StackTrace = e.StackTrace, + InnerFailure = ToTaskFailureDetails(e.InnerException), + }; + } + + internal static int GetApproximateByteCount(P.TaskFailureDetails failureDetails) + { + // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar + Encoding encoding = Encoding.UTF8; + + int byteCount = 0; + if (failureDetails.ErrorType != null) + { + byteCount += encoding.GetByteCount(failureDetails.ErrorType); + } + + if (failureDetails.ErrorMessage != null) + { + byteCount += encoding.GetByteCount(failureDetails.ErrorMessage); + } + + if (failureDetails.StackTrace != null) + { + byteCount += encoding.GetByteCount(failureDetails.StackTrace); + } + + if (failureDetails.InnerFailure != null) + { + byteCount += GetApproximateByteCount(failureDetails.InnerFailure); + } + + return byteCount; } internal static T Base64Decode(string encodedMessage, MessageParser parser) where T : IMessage diff --git a/src/DurableTask/IWorkerContext.cs 
b/src/DurableTask/IWorkerContext.cs deleted file mode 100644 index 92d281243..000000000 --- a/src/DurableTask/IWorkerContext.cs +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask; - -// TODO: This is public only because it's needed for Functions. Consider -// a design that doesn't require this type to be public. -public interface IWorkerContext -{ - public IDataConverter DataConverter { get; } -} diff --git a/src/DurableTask/Logs.cs b/src/DurableTask/Logs.cs index fa1fc94e8..997771279 100644 --- a/src/DurableTask/Logs.cs +++ b/src/DurableTask/Logs.cs @@ -48,7 +48,6 @@ static partial class Logs [LoggerMessage(EventId = 23, Level = LogLevel.Warning, Message = "The worker is busy servicing other clients. Waiting for the worker to become available for a new connection.")] public static partial void WorkerBusy(this ILogger logger); - // Management APIs [LoggerMessage(EventId = 40, Level = LogLevel.Information, Message = "Scheduling new {name} orchestration with instance ID '{instanceId}' and {sizeInBytes} bytes of input data.")] public static partial void SchedulingOrchestration(this ILogger logger, string instanceId, string name, int sizeInBytes, DateTimeOffset startTime); @@ -62,5 +61,8 @@ static partial class Logs [LoggerMessage(EventId = 44, Level = LogLevel.Information, Message = "Terminating instance '{instanceId}'.")] public static partial void TerminatingInstance(this ILogger logger, string instanceId); + + [LoggerMessage(EventId = 55, Level = LogLevel.Information, Message = "{instanceId}: Evaluating custom retry handler for failed '{name}' task. 
Attempt = {attempt}.")] + public static partial void RetryingTask(this ILogger logger, string instanceId, string name, int attempt); } } diff --git a/src/DurableTask/OrchestrationMetadata.cs b/src/DurableTask/OrchestrationMetadata.cs index 47dc63609..1cbd4fca9 100644 --- a/src/DurableTask/OrchestrationMetadata.cs +++ b/src/DurableTask/OrchestrationMetadata.cs @@ -3,6 +3,7 @@ using System; using System.Text; +using DurableTask.Grpc; using P = DurableTask.Protobuf; namespace DurableTask; @@ -28,13 +29,9 @@ internal OrchestrationMetadata( this.SerializedInput = response.OrchestrationState.Input; this.SerializedOutput = response.OrchestrationState.Output; this.SerializedCustomStatus = response.OrchestrationState.CustomStatus; + this.FailureDetails = ProtoUtils.ConvertTaskFailureDetails(response.OrchestrationState?.FailureDetails); this.dataConverter = dataConverter; this.requestedInputsAndOutputs = requestedInputsAndOutputs; - - this.FailureDetails = new OrchestrationFailureDetails( - response.OrchestrationState.FailureDetails.ErrorName, - response.OrchestrationState.FailureDetails.ErrorMessage, - response.OrchestrationState.FailureDetails.ErrorDetails); } /// @@ -84,7 +81,7 @@ internal OrchestrationMetadata( /// This property contains data only if the orchestration is in the state, /// and only if this instance metadata was fetched with the option to include output data. /// - public OrchestrationFailureDetails? FailureDetails { get; } + public TaskFailureDetails? FailureDetails { get; } /// /// Gets a value indicating whether the orchestration instance was running at the time this object was fetched. 
@@ -154,6 +151,15 @@ public override string ToString() sb.Append(", Output: '").Append(GetTrimmedPayload(this.SerializedOutput)).Append('\''); } + if (this.FailureDetails != null) + { + sb.Append(", FailureDetails: '") + .Append(this.FailureDetails.ErrorType) + .Append(" - ") + .Append(GetTrimmedPayload(this.FailureDetails.ErrorMessage)) + .Append('\''); + } + return sb.Append(']').ToString(); } diff --git a/src/DurableTask/OrchestrationRunner.cs b/src/DurableTask/OrchestrationRunner.cs index 2e5347c83..b5b5220a1 100644 --- a/src/DurableTask/OrchestrationRunner.cs +++ b/src/DurableTask/OrchestrationRunner.cs @@ -2,16 +2,16 @@ // Licensed under the MIT License. using System; -using System.Buffers; -using System.Buffers.Text; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Core.History; using DurableTask.Grpc; using Google.Protobuf; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using P = DurableTask.Protobuf; namespace DurableTask; @@ -19,7 +19,6 @@ namespace DurableTask; public static class OrchestrationRunner { - /// /// Deserializes orchestration history from and uses it to execute the orchestrator function /// code pointed to by . @@ -28,9 +27,13 @@ public static class OrchestrationRunner /// The type of the orchestrator function output. This type must be serializeable to JSON. /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. /// A function that implements the orchestrator logic. + /// Optional from which injected dependencies can be retrieved. /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. /// Thrown if or is null. 
- public static string LoadAndRun(string encodedOrchestratorRequest, Func> orchestratorFunc) + public static string LoadAndRun( + string encodedOrchestratorRequest, + Func> orchestratorFunc, + IServiceProvider? services = null) { if (orchestratorFunc == null) { @@ -38,7 +41,7 @@ public static string LoadAndRun(string encodedOrchestratorReque } FuncTaskOrchestrator orchestrator = new(orchestratorFunc); - return LoadAndRun(encodedOrchestratorRequest, orchestrator); + return LoadAndRun(encodedOrchestratorRequest, orchestrator, services); } /// @@ -47,10 +50,11 @@ public static string LoadAndRun(string encodedOrchestratorReque /// /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. /// An implementation that defines the orchestrator logic. + /// Optional from which injected dependencies can be retrieved. /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. /// Thrown if or is null. /// Thrown if contains invalid data. - public static string LoadAndRun(string encodedOrchestratorRequest, ITaskOrchestrator implementation) + public static string LoadAndRun(string encodedOrchestratorRequest, ITaskOrchestrator implementation, IServiceProvider? services = null) { if (string.IsNullOrEmpty(encodedOrchestratorRequest)) { @@ -69,7 +73,11 @@ public static string LoadAndRun(string encodedOrchestratorRequest, ITaskOrchestr List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); - SimpleWorkerContext workerContext = new(JsonDataConverter.Default); + IDataConverter dataConverter = services?.GetService() ?? SdkUtils.DefaultDataConverter; + ILoggerFactory loggerFactory = services?.GetService() ?? 
NullLoggerFactory.Instance; + ILogger logger = SdkUtils.GetLogger(loggerFactory); + + WorkerContext workerContext = new(dataConverter, logger, services ?? SdkUtils.EmptyServiceProvider); // Re-construct the orchestration state from the history. // New events must be added using the AddEvent method. @@ -92,14 +100,4 @@ public static string LoadAndRun(string encodedOrchestratorRequest, ITaskOrchestr byte[] responseBytes = response.ToByteArray(); return Convert.ToBase64String(responseBytes); } - - sealed class SimpleWorkerContext : IWorkerContext - { - public SimpleWorkerContext(IDataConverter dataConverter) - { - this.DataConverter = dataConverter; - } - - public IDataConverter DataConverter { get; } - } } diff --git a/src/DurableTask/RetryHandler.cs b/src/DurableTask/RetryHandler.cs new file mode 100644 index 000000000..3d0cd3820 --- /dev/null +++ b/src/DurableTask/RetryHandler.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace DurableTask; + +/// +/// Delegate for manually handling task retries. +/// +/// Retry context that's updated between each retry attempt. +/// Returns true to continue retrying or false to stop retrying. +public delegate bool RetryHandler(RetryContext retryContext); + +/// +public delegate Task AsyncRetryHandler(RetryContext retryContext); + +/// +/// Retry context data that's provided to task retry handler implementations. +/// +/// The context of the parent orchestrator. +/// The current attempt number experiencing a failure. +/// The details of the previous task failure. +/// The total amount of time spent in a retry loop for the current task. +/// A cancellation token that can be used to cancel the retries. 
+public record RetryContext( + TaskOrchestrationContext OrchestrationContext, + int LastAttemptNumber, + TaskFailureDetails LastFailure, + TimeSpan TotalRetryTime, + CancellationToken CancellationToken); diff --git a/src/DurableTask/RetryPolicy.cs b/src/DurableTask/RetryPolicy.cs new file mode 100644 index 000000000..42903e9b3 --- /dev/null +++ b/src/DurableTask/RetryPolicy.cs @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace DurableTask; + +/// +/// A declarative retry policy that can be configured for activity or sub-orchestration calls. +/// +public class RetryPolicy +{ + /// + /// Initializes a new instance of the class. + /// + /// The maximum number of attempts. Must be 1 or greater. + /// The amount of time to delay between the first and second attempt. + /// The exponential backoff coefficient used to determine the delay between subsequent retries. Must be 1.0 or greater. + /// The maximum time to delay between attempts, regardless of . + /// Tthe overall timeout for retries. + /// + /// The value can be used to specify an unlimited timeout for or . + /// + /// + /// Thrown if any of the following are true: + /// + /// The value for is less than or equal to zero. + /// The value for is less than or equal to . + /// The value for is less than 1.0. + /// The value for is less than . + /// The value for is less than . + /// + /// + public RetryPolicy( + int maxNumberOfAttempts, + TimeSpan firstRetryInterval, + double backoffCoefficient = 1.0, + TimeSpan? maxRetryInterval = null, + TimeSpan? 
retryTimeout = null) + { + if (maxNumberOfAttempts <= 0) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(maxNumberOfAttempts), + actualValue: maxNumberOfAttempts, + message: $"The value for {nameof(maxNumberOfAttempts)} must be greater than zero."); + } + + if (firstRetryInterval <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(firstRetryInterval), + actualValue: firstRetryInterval, + message: $"The value for {nameof(firstRetryInterval)} must be greater than zero."); + } + + if (backoffCoefficient < 1.0) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(backoffCoefficient), + actualValue: backoffCoefficient, + message: $"The value for {nameof(backoffCoefficient)} must be greater or equal to 1.0."); + } + + if (maxRetryInterval < firstRetryInterval && maxRetryInterval != Timeout.InfiniteTimeSpan) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(maxRetryInterval), + actualValue: maxRetryInterval, + message: $"The value for {nameof(maxRetryInterval)} must be greater or equal to the value for {nameof(firstRetryInterval)}."); + } + + if (retryTimeout < firstRetryInterval && retryTimeout != Timeout.InfiniteTimeSpan) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(retryTimeout), + actualValue: retryTimeout, + message: $"The value for {nameof(retryTimeout)} must be greater or equal to the value for {nameof(firstRetryInterval)}."); + } + + this.MaxNumberOfAttempts = maxNumberOfAttempts; + this.FirstRetryInterval = firstRetryInterval; + this.BackoffCoefficient = backoffCoefficient; + this.MaxRetryInterval = maxRetryInterval ?? TimeSpan.FromHours(1); + this.RetryTimeout = retryTimeout ?? Timeout.InfiniteTimeSpan; + } + + /// + /// Gets the max number of attempts for executing a given task. + /// + public int MaxNumberOfAttempts { get; } + + /// + /// Gets the amount of time to delay between the first and second attempt. 
+ /// + public TimeSpan FirstRetryInterval { get; } + + /// + /// Gets the exponential backoff coefficient used to determine the delay between subsequent retries. + /// + /// + /// Defaults to 1.0 for no backoff. + /// + public double BackoffCoefficient { get; } + + /// + /// Gets the maximum time to delay between attempts. + /// + /// + /// Defaults to 1 hour. + /// + public TimeSpan MaxRetryInterval { get; } + + /// + /// Gets the overall timeout for retries. No further attempts will be made at executing a task after this retry timeout expires. + /// + /// + /// Defaults to . + /// + public TimeSpan RetryTimeout { get; } + + /// + /// Gets or sets a Func to call on exception to determine if retries should proceed + /// + public Func>? HandleAsync { get; set; } + + internal DurableTask.Core.RetryOptions ToDurableTaskCoreRetryOptions() + { + // The legacy framework doesn't support Timeout.InfiniteTimeSpan so we have to convert that + // to TimeSpan.MaxValue when encountered. + static TimeSpan ConvertInfiniteTimeSpans(TimeSpan timeout) => + timeout == Timeout.InfiniteTimeSpan ? TimeSpan.MaxValue : timeout; + + return new DurableTask.Core.RetryOptions(this.FirstRetryInterval, this.MaxNumberOfAttempts) + { + BackoffCoefficient = this.BackoffCoefficient, + MaxRetryInterval = ConvertInfiniteTimeSpans(this.MaxRetryInterval), + RetryTimeout = ConvertInfiniteTimeSpans(this.RetryTimeout), + }; + } +} diff --git a/src/DurableTask/SdkUtils.cs b/src/DurableTask/SdkUtils.cs index adb55363e..993c8b0cb 100644 --- a/src/DurableTask/SdkUtils.cs +++ b/src/DurableTask/SdkUtils.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. 
using System; +using DurableTask.Converters; using DurableTask.Grpc; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; diff --git a/src/DurableTask/TaskFailedException.cs b/src/DurableTask/TaskFailedException.cs index 4f7d881ec..60af5b107 100644 --- a/src/DurableTask/TaskFailedException.cs +++ b/src/DurableTask/TaskFailedException.cs @@ -2,30 +2,54 @@ // Licensed under the MIT License. using System; -using System.Text.Json; +using DurableTask.Core.Exceptions; namespace DurableTask; -// TODO: Documentation +/// +/// Exception that gets thrown when when a durable task, such as an activity or a sub-orchestration, fails with an unhandled exception. +/// public sealed class TaskFailedException : Exception { - internal TaskFailedException(string taskName, int taskId, string errorName, string errorMessage, string? errorDetails) - : base($"Activity task '{taskName}' (#{taskId}) failed with an unhandled exception: {errorMessage}") + internal TaskFailedException(string taskName, int taskId, OrchestrationException cause) + : base(GetExceptionMessage(taskName, taskId, cause)) { this.TaskName = taskName; this.TaskId = taskId; - this.ErrorName = errorName; - this.ErrorMessage = errorMessage; - this.ErrorDetails = errorDetails; + this.FailureDetails = TaskFailureDetails.FromCoreException(cause); } + /// + /// Gets the name of the failed task. + /// public string TaskName { get; } + /// + /// Gets the ID of the failed task. + /// public int TaskId { get; } - public string ErrorName { get; } - - public string ErrorMessage { get; } - - public string? ErrorDetails { get; } + /// + /// Gets the details of the task failure. + /// + public TaskFailureDetails FailureDetails { get; } + + /// + /// Returns true if the task failure was provided by the specified exception type. + /// + /// + /// This method allows checking if a task failed due to an exception of a specific type. 
+ /// The comparison relies on a string comparison of the full type name (e.g., "System.InvalidOperationException") + /// and therefore doesn't support base types. + /// + /// The type of exception to test against. + /// Returns true if the value matches ; false otherwise. + public bool IsCausedByException() where T : Exception => this.FailureDetails.ErrorType == typeof(T).FullName; + + static string GetExceptionMessage(string taskName, int taskId, OrchestrationException cause) + { + // NOTE: Some integration tests depend on the format of this exception message. + string subMessage = cause.FailureDetails?.ErrorMessage ?? cause.Message; + return $"Activity task '{taskName}' (#{taskId}) failed with an unhandled exception: {subMessage}"; + } } diff --git a/src/DurableTask/TaskFailureDetails.cs b/src/DurableTask/TaskFailureDetails.cs index 285f9b1b6..2a9a638ad 100644 --- a/src/DurableTask/TaskFailureDetails.cs +++ b/src/DurableTask/TaskFailureDetails.cs @@ -1,12 +1,51 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System; + namespace DurableTask; /// -/// Record that represents the details of an orchestration instance failure. +/// Record that represents the details of a task failure. /// -/// The name of the error type. For .NET, this is the namespace-qualified exception type name. +/// The error type. For .NET, this is the namespace-qualified exception type name. /// A summary description of the failure. -/// The full details of the failure, which often includes an exception call-stack. -public record OrchestrationFailureDetails(string ErrorName, string ErrorMessage, string? ErrorDetails); +/// The stack trace of the failure. +/// The inner cause of the task failure. +public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure) +{ + Type? exceptionType; + + /// + /// Gets a debug-friendly description of the failure information. 
+ /// + public override string ToString() + { + return $"{this.ErrorType}: {this.ErrorMessage}"; + } + + /// + /// Returns true if the task failure was provided by the specified exception type. + /// + /// + /// This method allows checking if a task failed due to an exception of a specific type by attempting + /// to load the type specified in . If the exception type cannot be loaded + /// for any reason, this method will return false. Base types are supported. + /// + /// The type of exception to test against. + /// Returns true if the value matches ; false otherwise. + public bool IsCausedBy() where T : Exception + { + this.exceptionType ??= Type.GetType(this.ErrorType, throwOnError: false); + return this.exceptionType != null && typeof(T).IsAssignableFrom(this.exceptionType); + } + + internal static TaskFailureDetails FromCoreException(DurableTask.Core.Exceptions.OrchestrationException e) + { + return new TaskFailureDetails( + e.FailureDetails?.ErrorType ?? "(unknown)", + e.FailureDetails?.ErrorMessage ?? "(unknown)", + e.FailureDetails?.StackTrace, + null /* InnerFailure */); + } +} diff --git a/src/DurableTask/TaskOptions.cs b/src/DurableTask/TaskOptions.cs index a646be389..652e4cba5 100644 --- a/src/DurableTask/TaskOptions.cs +++ b/src/DurableTask/TaskOptions.cs @@ -1,18 +1,150 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System; using System.Threading; -using DurableTask.Core; -using DurableTask.Core.Serializing; +using System.Threading.Tasks; namespace DurableTask; +/// +/// Options that can be used to control the behavior of orchestrator task execution. +/// public class TaskOptions { - // TODO: Don't expose DurableTask.Core types! - public RetryOptions? RetryOptions { get; set; } + internal TaskOptions(Builder builder) + { + this.RetryPolicy = builder.RetryPolicy; + this.RetryHandler = builder.RetryHandler; + } - public DataConverter? 
DataConverter { get; set; } + /// + /// Gets the retry policy that was configured for this instance. + /// + public RetryPolicy? RetryPolicy { get; } - public CancellationToken CancellationToken { get; set; } + /// + /// Gets the cancellation token that was configured for this instance. + /// + public CancellationToken CancellationToken { get; } + + internal AsyncRetryHandler? RetryHandler { get; } + + /// + /// Convenience method from creating a object from a . + /// + /// The task retry policy to configure. + /// Optional cancellation token for canceling the task. + /// Returns a newly created object. + public static TaskOptions FromRetryPolicy(RetryPolicy policy, CancellationToken cancellationToken = default) + { + return CreateBuilder().UseRetryPolicy(policy).UseCancellationToken(cancellationToken).Build(); + } + + /// + /// Convenience method from creating a object from a . + /// + /// The task retry handler to configure. + /// Optional cancellation token for canceling the task. + /// Returns a newly created object. + public static TaskOptions FromRetryHandler(RetryHandler retryHandler, CancellationToken cancellationToken = default) + { + return CreateBuilder().UseRetryHandler(retryHandler).UseCancellationToken(cancellationToken).Build(); + } + + /// + /// Convenience method from creating a object from a . + /// + /// + public static TaskOptions FromRetryHandler(AsyncRetryHandler retryHandler, CancellationToken cancellationToken = default) + { + return CreateBuilder().UseRetryHandler(retryHandler).UseCancellationToken(cancellationToken).Build(); + } + + /// + /// Creates a new object that can be used to construct a new object. + /// + /// Returns a new object that can be used to construct a new object. + public static Builder CreateBuilder() => new(); + + /// + /// Builder for creating instances. + /// + public sealed class Builder + { + internal RetryPolicy? RetryPolicy { get; private set; } + + internal AsyncRetryHandler? 
RetryHandler { get; private set; } + + internal CancellationToken CancellationToken { get; private set; } + + /// + /// Configures a task retry policy. + /// + /// The task retry policy to configure. + /// Returns the current object. + /// Thrown if a was already configured for this . + public Builder UseRetryPolicy(RetryPolicy policy) + { + if (this.RetryHandler != null) + { + throw new InvalidOperationException("You can configure a retry policy or a retry handler, but not both."); + } + + this.RetryPolicy = policy; + return this; + } + + /// + public Builder UseRetryHandler(RetryHandler handler) + { + // Synchronous handlers are wrapped in an async handler so that we only have + // to keep track of a single handler assignment. + return this.UseRetryHandler(retryContext => Task.FromResult(handler(retryContext))); + } + + /// + /// Configures a retry handler. + /// + /// The handler to invoke when deciding whether to retry a failed orchestrator task. + /// Returns the current object. + /// Thrown if a was already configured for this . + public Builder UseRetryHandler(AsyncRetryHandler handler) + { + if (this.RetryPolicy != null) + { + throw new InvalidOperationException("You can configure a retry policy or a retry handler, but not both."); + } + + this.RetryHandler = handler; + return this; + } + + /// + /// Configures a that can be used to cancel the task execution. + /// + /// + /// Cancellation tokens can be used to stop the current orchestrator from awaiting a pending activity or + /// sub-orchestration completion. However, this cancellation won't necessarily stop the activity or + /// sub-orchestration from running in the background. + /// + /// The cancellation token to use for cancelling task execution. + /// Returns the current object. + public Builder UseCancellationToken(CancellationToken cancellationToken) + { + if (cancellationToken != default) + { + throw new NotSupportedException("Durable task cancellation is not yet supported. 
See https://github.com/microsoft/durabletask-dotnet/issues/7 for more information."); + } + + this.CancellationToken = cancellationToken; + return this; + } + + /// + /// Creates a new object from this builder. + /// + /// The created object. + public TaskOptions Build() => new(this); + } } diff --git a/src/DurableTask/TaskOrchestrationContext.cs b/src/DurableTask/TaskOrchestrationContext.cs index 99bd9e136..e4c123e59 100644 --- a/src/DurableTask/TaskOrchestrationContext.cs +++ b/src/DurableTask/TaskOrchestrationContext.cs @@ -4,6 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace DurableTask; @@ -236,5 +237,44 @@ public Task CallSubOrchestratorAsync( /// public abstract void ContinueAsNew(object newInput, bool preserveUnprocessedEvents = true); - // TODO: More + /// + /// Returns an instance of that is replay-safe, meaning that the logger only + /// writes logs when the orchestrator is not replaying previous history. + /// + /// + /// This method wraps the provider instance with a new + /// implementation that only writes log messages when is false. + /// The resulting logger can be used normally in orchestrator code without needing to worry about duplicate + /// log messages caused by orchestrator replays. + /// + /// The to be wrapped for use by the orchestration. + /// An instance of that wraps the specified . + public ILogger CreateReplaySafeLogger(ILogger logger) + { + return new ReplaySafeLogger(this, logger); + } + + class ReplaySafeLogger : ILogger + { + readonly TaskOrchestrationContext context; + readonly ILogger logger; + + internal ReplaySafeLogger(TaskOrchestrationContext context, ILogger logger) + { + this.context = context ?? throw new ArgumentNullException(nameof(context)); + this.logger = logger ?? 
throw new ArgumentNullException(nameof(logger)); + } + + public IDisposable BeginScope(TState state) => this.logger.BeginScope(state); + + public bool IsEnabled(LogLevel logLevel) => this.logger.IsEnabled(logLevel); + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + if (!this.context.IsReplaying) + { + this.logger.Log(logLevel, eventId, state, exception, formatter); + } + } + } } diff --git a/src/DurableTask/TaskOrchestrationShim.cs b/src/DurableTask/TaskOrchestrationShim.cs index 5e0b8aa8e..4869ab26f 100644 --- a/src/DurableTask/TaskOrchestrationShim.cs +++ b/src/DurableTask/TaskOrchestrationShim.cs @@ -7,15 +7,14 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using Microsoft.Extensions.Logging; namespace DurableTask; -// TODO: This is public only because it's needed for Functions. Consider -// a design that doesn't require this type to be public. -public class TaskOrchestrationShim : TaskOrchestrationShim +class TaskOrchestrationShim : TaskOrchestrationShim { public TaskOrchestrationShim( - IWorkerContext workerContext, + WorkerContext workerContext, TaskName name, Func> implementation) : base(workerContext, name, new FuncTaskOrchestrator(implementation)) @@ -23,45 +22,16 @@ public TaskOrchestrationShim( } } -// TODO: Move to its own file -/// -/// Implementation of that uses -/// a delegate as its implementation. -/// -/// The orchestrator input type. -/// The orchestrator output type. -public class FuncTaskOrchestrator : TaskOrchestratorBase -{ - readonly Func> implementation; - - /// - /// Initializes a new instance of the class. - /// - /// The orchestrator function. - public FuncTaskOrchestrator(Func> implementation) - { - this.implementation = implementation; - } - - /// - protected override Task OnRunAsync(TaskOrchestrationContext context, TInput? 
input) - { - return this.implementation(context, input); - } -} - -// TODO: This is public only because it's needed for Functions. Consider -// a design that doesn't require this type to be public. -public class TaskOrchestrationShim : TaskOrchestration +class TaskOrchestrationShim : TaskOrchestration { readonly TaskName name; readonly ITaskOrchestrator implementation; - readonly IWorkerContext workerContext; + readonly WorkerContext workerContext; TaskOrchestrationContextWrapper? wrapperContext; public TaskOrchestrationShim( - IWorkerContext workerContext, + WorkerContext workerContext, TaskName name, ITaskOrchestrator implementation) { @@ -103,18 +73,20 @@ sealed class TaskOrchestrationContextWrapper : TaskOrchestrationContext readonly OrchestrationContext innerContext; readonly TaskName name; - readonly IWorkerContext workerContext; + readonly WorkerContext workerContext; + readonly ILogger orchestratorLogger; object? customStatus; public TaskOrchestrationContextWrapper( OrchestrationContext innerContext, TaskName name, - IWorkerContext workerContext) + WorkerContext workerContext) { this.innerContext = innerContext; this.name = name; this.workerContext = workerContext; + this.orchestratorLogger = this.CreateReplaySafeLogger(workerContext.Logger); } public override TaskName Name => this.name; @@ -130,20 +102,47 @@ public override async Task CallActivityAsync( object? input = null, TaskOptions? options = null) { + // Since the input parameter takes any object, it's possible that callers may accidentally provide a TaskOptions parameter here + // when the actually meant to provide TaskOptions for the optional options parameter. + if (input is TaskOptions && options == null) + { + throw new ArgumentException( + $"A {nameof(TaskOptions)} value was provided for the activity input but no value was provided for {nameof(options)}. 
" + + $"Did you actually mean to provide a {nameof(TaskOptions)} value for the {nameof(options)} parameter?", + nameof(input)); + } + try { - // TODO: Retry options - return await this.innerContext.ScheduleTask(name.Name, name.Version, input); + // TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7) + // TODO: DataConverter? + + if (options?.RetryPolicy != null) + { + return await this.innerContext.ScheduleWithRetry( + name.Name, + name.Version, + options.RetryPolicy.ToDurableTaskCoreRetryOptions(), + input); + } + else if (options?.RetryHandler != null) + { + return await this.InvokeWithCustomRetryHandler( + () => this.innerContext.ScheduleTask(name.Name, name.Version, input), + name.Name, + options.RetryHandler, + options.CancellationToken); + } + else + { + return await this.innerContext.ScheduleTask(name.Name, name.Version, input); + } + } - catch (DurableTask.Core.Exceptions.TaskFailedException coreTfe) + catch (DurableTask.Core.Exceptions.TaskFailedException e) { // Hide the core DTFx types and instead use our own - throw new TaskFailedException( - taskName: name, - taskId: coreTfe.ScheduleId, - errorName: coreTfe.FailureDetails?.ErrorName ?? "(unknown)", - errorMessage: coreTfe.FailureDetails?.ErrorMessage ?? "(unknown)", - errorDetails: coreTfe.FailureDetails?.ErrorDetails); + throw new TaskFailedException(name, e.ScheduleId, e); } } @@ -183,25 +182,54 @@ internal void ExecuteLocalActivityCalls() } } - public override Task CallSubOrchestratorAsync( + public override async Task CallSubOrchestratorAsync( TaskName orchestratorName, string? instanceId = null, object? input = null, TaskOptions? 
options = null) { - if (options != null) - { - throw new NotImplementedException($"{nameof(TaskOptions)} are not yet supported."); - } - // TODO: Check to see if this orchestrator is defined - // TODO: Support for retry options and custom deserialization via TaskOptions - return this.innerContext.CreateSubOrchestrationInstance( - orchestratorName.Name, - orchestratorName.Version, - instanceId ?? Guid.NewGuid().ToString("N"), - input); + // TODO: IDataConverter + + instanceId ??= Guid.NewGuid().ToString("N"); + + try + { + if (options?.RetryPolicy != null) + { + return await this.innerContext.CreateSubOrchestrationInstanceWithRetry( + orchestratorName.Name, + orchestratorName.Version, + options.RetryPolicy.ToDurableTaskCoreRetryOptions(), + input); + } + else if (options?.RetryHandler != null) + { + return await this.InvokeWithCustomRetryHandler( + () => this.innerContext.CreateSubOrchestrationInstance( + orchestratorName.Name, + orchestratorName.Version, + instanceId, + input), + orchestratorName.Name, + options.RetryHandler, + options.CancellationToken); + } + else + { + return await this.innerContext.CreateSubOrchestrationInstance( + orchestratorName.Name, + orchestratorName.Version, + instanceId, + input); + } + } + catch (DurableTask.Core.Exceptions.SubOrchestrationFailedException e) + { + // Hide the core DTFx types and instead use our own + throw new TaskFailedException(orchestratorName, e.ScheduleId, e); + } } public override Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken) @@ -289,6 +317,58 @@ public override void ContinueAsNew(object newInput, bool preserveUnprocessedEven return this.workerContext.DataConverter.Serialize(this.customStatus); } + async Task InvokeWithCustomRetryHandler( + Func> action, + string taskName, + AsyncRetryHandler retryHandler, + CancellationToken cancellationToken) + { + DateTime startTime = this.CurrentDateTimeUtc; + int failureCount = 0; + + while (true) + { + try + { + return await action(); + } + catch 
(DurableTask.Core.Exceptions.OrchestrationException e) + { + // Some failures are not retriable, like failures for missing activities or sub-orchestrations + if (e.FailureDetails?.IsNonRetriable == true) + { + throw; + } + + failureCount++; + + this.orchestratorLogger.RetryingTask( + this.InstanceId, + taskName, + attempt: failureCount); + + RetryContext retryContext = new( + this, + failureCount, + TaskFailureDetails.FromCoreException(e), + this.CurrentDateTimeUtc.Subtract(startTime), + cancellationToken); + + bool keepRetrying = await retryHandler(retryContext); + if (!keepRetrying) + { + throw; + } + + if (failureCount == int.MaxValue) + { + // Integer overflow safety check + throw; + } + } + } + } + class EventTaskCompletionSource : TaskCompletionSource, IEventSource { /// diff --git a/src/DurableTask/WorkerContext.cs b/src/DurableTask/WorkerContext.cs new file mode 100644 index 000000000..67c91c9a7 --- /dev/null +++ b/src/DurableTask/WorkerContext.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +using System; +using Microsoft.Extensions.Logging; + +namespace DurableTask; + +record WorkerContext( + IDataConverter DataConverter, + ILogger Logger, + IServiceProvider Services); \ No newline at end of file diff --git a/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs b/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs index 667484000..01e345112 100644 --- a/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs +++ b/test/DurableTask.Generators.Tests/AzureFunctionsTests.cs @@ -131,9 +131,9 @@ public class MyOrchestrator : TaskOrchestratorBase<{inputType}, {outputType}> static readonly MyNS.MyOrchestrator singletonMyOrchestrator = new MyNS.MyOrchestrator(); [Function(nameof(MyOrchestrator))] -public static string MyOrchestrator([OrchestrationTrigger] string orchestratorState) +public static string MyOrchestrator([OrchestrationTrigger] string orchestratorState, FunctionContext executionContext) {{ - return OrchestrationRunner.LoadAndRun(orchestratorState, singletonMyOrchestrator); + return OrchestrationRunner.LoadAndRun(orchestratorState, singletonMyOrchestrator, executionContext.InstanceServices); }} /// @@ -215,9 +215,9 @@ public abstract class MyOrchestratorBase : TaskOrchestratorBase<{inputType}, {ou static readonly MyNS.MyOrchestrator singletonMyOrchestrator = new MyNS.MyOrchestrator(); [Function(nameof(MyOrchestrator))] -public static string MyOrchestrator([OrchestrationTrigger] string orchestratorState) +public static string MyOrchestrator([OrchestrationTrigger] string orchestratorState, FunctionContext executionContext) {{ - return OrchestrationRunner.LoadAndRun(orchestratorState, singletonMyOrchestrator); + return OrchestrationRunner.LoadAndRun(orchestratorState, singletonMyOrchestrator, executionContext.InstanceServices); }} /// diff --git a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs index bc4fca078..309215c0b 100644 --- 
a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs +++ b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs @@ -53,8 +53,8 @@ public async Task UnhandledActivityException() Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); Assert.NotNull(metadata.FailureDetails); - OrchestrationFailureDetails failureDetails = metadata.FailureDetails!; - Assert.Equal(typeof(TaskFailedException).FullName, failureDetails.ErrorName); + TaskFailureDetails failureDetails = metadata.FailureDetails!; + Assert.Equal(typeof(TaskFailedException).FullName, failureDetails.ErrorType); // Expecting something like: // "The activity 'FaultyActivity' (#0) failed with an unhandled exception: Kah-BOOOOOM!!!" @@ -63,10 +63,10 @@ public async Task UnhandledActivityException() Assert.Contains(activityName, failureDetails.ErrorMessage); Assert.Contains(errorMessage, failureDetails.ErrorMessage); - // A callstack for the orchestration is expected in the error details (not the activity callstack). - Assert.NotNull(failureDetails.ErrorDetails); - Assert.Contains(nameof(MyOrchestrationImpl), failureDetails.ErrorDetails); - Assert.DoesNotContain(nameof(MyActivityImpl), failureDetails.ErrorDetails); + // A callstack for the orchestration is expected (but not the activity callstack). + Assert.NotNull(failureDetails.StackTrace); + Assert.Contains(nameof(MyOrchestrationImpl), failureDetails.StackTrace); + Assert.DoesNotContain(nameof(MyActivityImpl), failureDetails.StackTrace); } /// @@ -87,8 +87,10 @@ public async Task UnhandledOrchestratorException() .AddTasks(tasks => tasks.AddOrchestrator(orchestratorName, ctx => { - expectedCallstack = Environment.StackTrace; - throw new Exception(errorMessage); + // The Environment.StackTrace and throw statements need to be on the same line + // to keep line numbers consistent between the expected stack trace and the actual stack trace. + // Also need to remove the top frame from Environment.StackTrace. 
+ expectedCallstack = Environment.StackTrace.Replace("at System.Environment.get_StackTrace()", string.Empty).TrimStart(); throw new Exception(errorMessage); })) .Build(); await server.StartAsync(this.TimeoutToken); @@ -105,29 +107,288 @@ public async Task UnhandledOrchestratorException() Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); Assert.NotNull(metadata.FailureDetails); - OrchestrationFailureDetails failureDetails = metadata.FailureDetails!; - Assert.Equal(typeof(Exception).FullName, failureDetails.ErrorName); + TaskFailureDetails failureDetails = metadata.FailureDetails!; + Assert.Equal(typeof(Exception).FullName, failureDetails.ErrorType); Assert.Equal(errorMessage, failureDetails.ErrorMessage); - Assert.NotNull(failureDetails.ErrorDetails); + Assert.NotNull(failureDetails.StackTrace); Assert.NotNull(expectedCallstack); - Assert.Contains(expectedCallstack![..300], failureDetails.ErrorDetails); + Assert.Contains(expectedCallstack![..300], failureDetails.StackTrace); + } + + /// + /// Tests retry policies for activity calls. 
+ /// + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(10)] + public async Task RetryActivityFailures(int expectedNumberOfAttempts) + { + string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging + + TaskOptions retryOptions = TaskOptions.FromRetryPolicy(new RetryPolicy( + expectedNumberOfAttempts, + firstRetryInterval: TimeSpan.FromMilliseconds(1))); + + int actualNumberOfAttempts = 0; + + TaskName orchestratorName = "BustedOrchestration"; + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => + tasks.AddOrchestrator(orchestratorName, async ctx => + { + await ctx.CallActivityAsync("Foo", options: retryOptions); + }) + .AddActivity("Foo", context => + { + actualNumberOfAttempts++; + throw new Exception(errorMessage); + })) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts); + } + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(10)] + public async Task RetryActivityFailuresCustomLogic(int expectedNumberOfAttempts) + { + string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging + + int retryHandlerCalls = 0; + TaskOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + // This is technically orchestrator code that gets replayed, like everything else + if (!retryContext.OrchestrationContext.IsReplaying) + { + retryHandlerCalls++; + } + + // IsCausedBy is supposed 
to handle exception inheritance; fail if it doesn't + if (!retryContext.LastFailure.IsCausedBy()) + { + return false; + } + + // This handler only works with ApplicationException + if (!retryContext.LastFailure.IsCausedBy()) + { + return false; + } + + // Quit after N attempts + return retryContext.LastAttemptNumber < expectedNumberOfAttempts; + }); + + int actualNumberOfAttempts = 0; + + TaskName orchestratorName = "BustedOrchestration"; + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => + tasks.AddOrchestrator(orchestratorName, async ctx => + { + await ctx.CallActivityAsync("Foo", options: retryOptions); + }) + .AddActivity("Foo", context => + { + actualNumberOfAttempts++; + throw new ApplicationException(errorMessage); + })) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.Equal(expectedNumberOfAttempts, retryHandlerCalls); + Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts); + } + + /// + /// Tests retry policies for sub-orchestration calls. 
+ /// + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(10)] + public async Task RetrySubOrchestrationFailures(int expectedNumberOfAttempts) + { + string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging + + TaskOptions retryOptions = TaskOptions.FromRetryPolicy(new RetryPolicy( + expectedNumberOfAttempts, + firstRetryInterval: TimeSpan.FromMilliseconds(1))); + + int actualNumberOfAttempts = 0; + + TaskName orchestratorName = "OrchestrationWithBustedSubOrchestrator"; + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => + tasks.AddOrchestrator(orchestratorName, async ctx => + { + await ctx.CallSubOrchestratorAsync("BustedSubOrchestrator", options: retryOptions); + }) + .AddOrchestrator("BustedSubOrchestrator", context => + { + actualNumberOfAttempts++; + throw new ApplicationException(errorMessage); + })) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts); } - /////// - /////// Tests retry policies for activity calls. - /////// - ////[Fact] - ////public async Task RetryActivityFailures() - ////{ - //// throw new NotImplementedException(); - ////} - - /////// - /////// Tests retry policies for sub-orchestrations. 
- /////// - ////[Fact] - ////public async Task RetrySubOrchestrationFailures() - ////{ - //// throw new NotImplementedException(); - ////} + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(10)] + public async Task RetrySubOrchestratorFailuresCustomLogic(int expectedNumberOfAttempts) + { + string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging + + int retryHandlerCalls = 0; + TaskOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + // This is technically orchestrator code that gets replayed, like everything else + if (!retryContext.OrchestrationContext.IsReplaying) + { + retryHandlerCalls++; + } + + // IsCausedBy is supposed to handle exception inheritance; fail if it doesn't + if (!retryContext.LastFailure.IsCausedBy()) + { + return false; + } + + // This handler only works with ApplicationException + if (!retryContext.LastFailure.IsCausedBy()) + { + return false; + } + + // Quit after N attempts + return retryContext.LastAttemptNumber < expectedNumberOfAttempts; + }); + + int actualNumberOfAttempts = 0; + + TaskName orchestratorName = "OrchestrationWithBustedSubOrchestrator"; + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => + tasks.AddOrchestrator(orchestratorName, async ctx => + { + await ctx.CallSubOrchestratorAsync("BustedSubOrchestrator", options: retryOptions); + }) + .AddOrchestrator("BustedSubOrchestrator", context => + { + actualNumberOfAttempts++; + throw new ApplicationException(errorMessage); + })) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, 
metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.Equal(expectedNumberOfAttempts, retryHandlerCalls); + Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TaskNotFoundErrorsAreNotRetried(bool activity) + { + int retryHandlerCalls = 0; + TaskOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + retryHandlerCalls++; + return false; + }); + + TaskName orchestratorName = "OrchestrationWithMissingTask"; + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => + tasks.AddOrchestrator(orchestratorName, async ctx => + { + if (activity) + { + await ctx.CallActivityAsync("Bogus", options: retryOptions); + } + else + { + await ctx.CallSubOrchestratorAsync("Bogus", options: retryOptions); + } + })) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + // The retry handler should never get called for a missing activity or sub-orchestrator exception + Assert.Equal(0, retryHandlerCalls); + } + + [Fact] + public void ThrowsIfRetryPolicyAndThenHandler() + { + Assert.Throws(() => TaskOptions.CreateBuilder() + .UseRetryPolicy(new RetryPolicy(3, TimeSpan.FromHours(1))) + .UseRetryHandler(_ => true)); + } + + [Fact] + public void ThrowsIfRetryHandlerAndThenPolicy() + { + Assert.Throws(() => TaskOptions.CreateBuilder() + .UseRetryHandler(_ => true) + .UseRetryPolicy(new RetryPolicy(3, TimeSpan.FromHours(1)))); + } } diff --git 
a/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs b/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs index 067e69ccc..d213c1fcf 100644 --- a/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs +++ b/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs @@ -216,34 +216,6 @@ public async Task ActivityChain() Assert.Equal(10, metadata.ReadOutputAs()); } - [Fact] - public async Task OrchestratorException() - { - string errorMessage = "Kah-BOOOOOM!!!"; - - TaskName orchestratorName = nameof(OrchestratorException); - await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() - .AddTasks(tasks => tasks.AddOrchestrator(orchestratorName, ctx => throw new Exception(errorMessage))) - .Build(); - await server.StartAsync(this.TimeoutToken); - - DurableTaskClient client = this.CreateDurableTaskClient(); - string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( - instanceId, - this.TimeoutToken, - getInputsAndOutputs: true); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); - - OrchestrationFailureDetails? 
failureDetails = metadata.ReadOutputAs(); - Assert.NotNull(failureDetails); - Assert.Equal(typeof(Exception).FullName, failureDetails!.ErrorName); - Assert.Contains(errorMessage, failureDetails!.ErrorDetails); - } - [Fact] public async Task ActivityFanOut() { From 6466c2e2a93ce50ff02a429e4f412f51e57e3d54 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Sat, 26 Mar 2022 15:12:21 -0700 Subject: [PATCH 3/4] Enable running multiple test classes in parallel --- test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs | 10 +++++----- test/DurableTask.Sdk.Tests/IntegrationTestBase.cs | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs b/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs index 961b7be47..2081f4b00 100644 --- a/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs +++ b/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs @@ -15,9 +15,10 @@ namespace DurableTask.Sdk.Tests; -public class GrpcSidecarFixture : IDisposable +public sealed class GrpcSidecarFixture : IDisposable { - const string ListenAddress = "http://127.0.0.1:4002"; + // Use a random port number to allow multiple instances to run in parallel + readonly string listenAddress = $"http://127.0.0.1:{Random.Shared.Next(30000, 40000)}"; readonly IWebHost host; readonly GrpcChannel sidecarChannel; @@ -33,7 +34,7 @@ public GrpcSidecarFixture() // https://docs.microsoft.com/en-us/aspnet/core/grpc/troubleshoot?view=aspnetcore-3.0 options.ConfigureEndpointDefaults(listenOptions => listenOptions.Protocols = HttpProtocols.Http2); }) - .UseUrls(ListenAddress) + .UseUrls(this.listenAddress) .ConfigureServices(services => { services.AddGrpc(); @@ -53,14 +54,13 @@ public GrpcSidecarFixture() this.host.Start(); - this.sidecarChannel = GrpcChannel.ForAddress(ListenAddress); + this.sidecarChannel = GrpcChannel.ForAddress(this.listenAddress); } public DurableTaskGrpcWorker.Builder GetWorkerBuilder() { // The gRPC channel is reused across tests to avoid the overhead of 
creating new connections (which is very slow) return DurableTaskGrpcWorker.CreateBuilder().UseGrpcChannel(this.sidecarChannel); - } public DurableTaskGrpcClient.Builder GetClientBuilder() diff --git a/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs b/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs index a5bb9565b..0b77156e2 100644 --- a/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs +++ b/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs @@ -34,7 +34,6 @@ public IntegrationTestBase(ITestOutputHelper output, GrpcSidecarFixture sidecarF this.sidecarFixture = sidecarFixture; } - /// /// Gets a that triggers after a default test timeout period. /// The actual timeout value is increased if a debugger is attached to the test process. From 10c35db98163c7378d26cf4168c46d5951d3b441 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Thu, 7 Apr 2022 13:58:54 -0700 Subject: [PATCH 4/4] Minor retry policy API updates --- samples/ConsoleApp/Program.cs | 4 +-- src/DurableTask/TaskFailedException.cs | 2 +- src/DurableTask/TaskOptions.cs | 32 +++++++------------ src/DurableTask/TaskOrchestrationShim.cs | 4 --- .../OrchestrationErrorHandling.cs | 16 ---------- 5 files changed, 14 insertions(+), 44 deletions(-) diff --git a/samples/ConsoleApp/Program.cs b/samples/ConsoleApp/Program.cs index fd8c4a904..5400d3243 100644 --- a/samples/ConsoleApp/Program.cs +++ b/samples/ConsoleApp/Program.cs @@ -22,7 +22,7 @@ }); }); -DurableTaskGrpcWorker server = DurableTaskGrpcWorker.CreateBuilder() +DurableTaskGrpcWorker worker = DurableTaskGrpcWorker.CreateBuilder() .AddTasks(tasks => { tasks.AddOrchestrator("HelloSequence", async context => @@ -41,7 +41,7 @@ await context.CallActivityAsync("SayHello", "Seattle"), .UseLoggerFactory(loggerFactory) .Build(); -await server.StartAsync(timeout: TimeSpan.FromSeconds(30)); +await worker.StartAsync(timeout: TimeSpan.FromSeconds(30)); await using DurableTaskClient client = DurableTaskGrpcClient.Create(); string instanceId = await 
client.ScheduleNewOrchestrationInstanceAsync("HelloSequence"); diff --git a/src/DurableTask/TaskFailedException.cs b/src/DurableTask/TaskFailedException.cs index 60af5b107..2cd48d7d3 100644 --- a/src/DurableTask/TaskFailedException.cs +++ b/src/DurableTask/TaskFailedException.cs @@ -50,6 +50,6 @@ static string GetExceptionMessage(string taskName, int taskId, OrchestrationExce { // NOTE: Some integration tests depend on the format of this exception message. string subMessage = cause.FailureDetails?.ErrorMessage ?? cause.Message; - return $"Activity task '{taskName}' (#{taskId}) failed with an unhandled exception: {subMessage}"; + return $"Task '{taskName}' (#{taskId}) failed with an unhandled exception: {subMessage}"; } } diff --git a/src/DurableTask/TaskOptions.cs b/src/DurableTask/TaskOptions.cs index 652e4cba5..70fb9e466 100644 --- a/src/DurableTask/TaskOptions.cs +++ b/src/DurableTask/TaskOptions.cs @@ -38,7 +38,7 @@ internal TaskOptions(Builder builder) /// Returns a newly created object. public static TaskOptions FromRetryPolicy(RetryPolicy policy, CancellationToken cancellationToken = default) { - return CreateBuilder().UseRetryPolicy(policy).UseCancellationToken(cancellationToken).Build(); + return CreateBuilder().WithRetryStrategy(policy).WithCancellationToken(cancellationToken).Build(); } /// @@ -49,7 +49,7 @@ public static TaskOptions FromRetryPolicy(RetryPolicy policy, CancellationToken /// Returns a newly created object. 
public static TaskOptions FromRetryHandler(RetryHandler retryHandler, CancellationToken cancellationToken = default) { - return CreateBuilder().UseRetryHandler(retryHandler).UseCancellationToken(cancellationToken).Build(); + return CreateBuilder().WithRetryStrategy(retryHandler).WithCancellationToken(cancellationToken).Build(); } /// @@ -58,7 +58,7 @@ public static TaskOptions FromRetryHandler(RetryHandler retryHandler, Cancellati /// public static TaskOptions FromRetryHandler(AsyncRetryHandler retryHandler, CancellationToken cancellationToken = default) { - return CreateBuilder().UseRetryHandler(retryHandler).UseCancellationToken(cancellationToken).Build(); + return CreateBuilder().WithRetryStrategy(retryHandler).WithCancellationToken(cancellationToken).Build(); } /// @@ -83,24 +83,19 @@ public sealed class Builder /// /// The task retry policy to configure. /// Returns the current object. - /// Thrown if a was already configured for this . - public Builder UseRetryPolicy(RetryPolicy policy) + public Builder WithRetryStrategy(RetryPolicy policy) { - if (this.RetryHandler != null) - { - throw new InvalidOperationException("You can configure a retry policy or a retry handler, but not both."); - } - this.RetryPolicy = policy; + this.RetryHandler = null; return this; } - /// - public Builder UseRetryHandler(RetryHandler handler) + /// + public Builder WithRetryStrategy(RetryHandler handler) { // Synchronous handlers are wrapped in an async handler so that we only have // to keep track of a single handler assignment. - return this.UseRetryHandler(retryContext => Task.FromResult(handler(retryContext))); + return this.WithRetryStrategy(retryContext => Task.FromResult(handler(retryContext))); } /// @@ -108,15 +103,10 @@ public Builder UseRetryHandler(RetryHandler handler) /// /// The handler to invoke when deciding whether to retry a failed orchestrator task. /// Returns the current object. - /// Thrown if a was already configured for this . 
- public Builder UseRetryHandler(AsyncRetryHandler handler) + public Builder WithRetryStrategy(AsyncRetryHandler handler) { - if (this.RetryPolicy != null) - { - throw new InvalidOperationException("You can configure a retry policy or a retry handler, but not both."); - } - this.RetryHandler = handler; + this.RetryPolicy = null; return this; } @@ -130,7 +120,7 @@ public Builder UseRetryHandler(AsyncRetryHandler handler) /// /// The cancellation token to use for cancelling task execution. /// Returns the current object. - public Builder UseCancellationToken(CancellationToken cancellationToken) + public Builder WithCancellationToken(CancellationToken cancellationToken) { if (cancellationToken != default) { diff --git a/src/DurableTask/TaskOrchestrationShim.cs b/src/DurableTask/TaskOrchestrationShim.cs index 4869ab26f..ff2273bba 100644 --- a/src/DurableTask/TaskOrchestrationShim.cs +++ b/src/DurableTask/TaskOrchestrationShim.cs @@ -45,10 +45,6 @@ public TaskOrchestrationShim( this.wrapperContext = new(innerContext, this.name, this.workerContext); object? input = this.workerContext.DataConverter.Deserialize(rawInput, this.implementation.InputType); - - // NOTE: If this throws, the error response will be generated by DurableTask.Core. However, - // it won't be consistent with our expected format. We currently work around this - // in the gRPC handling code, but ideally we wouldn't need this workaround. object? output = await this.implementation.RunAsync(this.wrapperContext, input); // Return the output (if any) as a serialized string. 
diff --git a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs index 309215c0b..bfab16624 100644 --- a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs +++ b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs @@ -375,20 +375,4 @@ public async Task TaskNotFoundErrorsAreNotRetried(bool activity) // The retry handler should never get called for a missing activity or sub-orchestrator exception Assert.Equal(0, retryHandlerCalls); } - - [Fact] - public void ThrowsIfRetryPolicyAndThenHandler() - { - Assert.Throws(() => TaskOptions.CreateBuilder() - .UseRetryPolicy(new RetryPolicy(3, TimeSpan.FromHours(1))) - .UseRetryHandler(_ => true)); - } - - [Fact] - public void ThrowsIfRetryHandlerAndThenPolicy() - { - Assert.Throws(() => TaskOptions.CreateBuilder() - .UseRetryHandler(_ => true) - .UseRetryPolicy(new RetryPolicy(3, TimeSpan.FromHours(1)))); - } }