diff --git a/eng/proto b/eng/proto index 024446136..4207e1dbd 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit 024446136eecc38c975b95f283138e7a0df149c2 +Subproject commit 4207e1dbd14cedc268f69c3befee60fcaad19367 diff --git a/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs b/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs index 860901f4e..81b747b18 100644 --- a/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs +++ b/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs @@ -15,24 +15,68 @@ public static class DurableTaskClientExtensions /// Filter purging to orchestrations after this date. /// Filter purging to orchestrations before this date. /// Filter purging to orchestrations with these statuses. + /// The optional options for purging the orchestration. /// The cancellation token. /// /// This method returns a object after the operation has completed with a - /// value of 1 or 0, depending on whether the target - /// instance was successfully purged. + /// indicating the number of orchestration instances that were purged, + /// including the count of sub-orchestrations purged if any. /// public static Task PurgeInstancesAsync( this DurableTaskClient client, DateTimeOffset? createdFrom, DateTimeOffset? createdTo, IEnumerable? statuses, + PurgeInstanceOptions? options, CancellationToken cancellation = default) { Check.NotNull(client); PurgeInstancesFilter filter = new(createdFrom, createdTo, statuses); - return client.PurgeAllInstancesAsync(filter, cancellation); + return client.PurgeAllInstancesAsync(filter, options, cancellation); } + /// + /// Purges orchestration instances metadata from the durable store. + /// + /// The DurableTask client. + /// Filter purging to orchestrations after this date. + /// Filter purging to orchestrations before this date. + /// Filter purging to orchestrations with these statuses. + /// The cancellation token. + /// + /// This method returns a object after the operation has completed with a + /// indicating the number of orchestration instances that were purged, + /// including the count of sub-orchestrations purged if any. + /// + public static Task PurgeInstancesAsync( + this DurableTaskClient client, + DateTimeOffset? createdFrom, + DateTimeOffset? createdTo, + IEnumerable? statuses, + CancellationToken cancellation = default) + => PurgeInstancesAsync(client, createdFrom, createdTo, statuses, null, cancellation); + + /// + /// Purges orchestration instances metadata from the durable store. + /// + /// The DurableTask client. + /// Filter purging to orchestrations after this date. + /// Filter purging to orchestrations before this date. + /// The optional options for purging the orchestration. + /// The cancellation token. + /// + /// This method returns a object after the operation has completed with a + /// indicating the number of orchestration instances that were purged, + /// including the count of sub-orchestrations purged if any. + /// + public static Task PurgeInstancesAsync( + this DurableTaskClient client, + DateTimeOffset? createdFrom, + DateTimeOffset? createdTo, + PurgeInstanceOptions? options, + CancellationToken cancellation = default) + => PurgeInstancesAsync(client, createdFrom, createdTo, null, options, cancellation); + /// /// Purges orchestration instances metadata from the durable store. /// @@ -42,13 +86,13 @@ public static Task PurgeInstancesAsync( /// The cancellation token. /// /// This method returns a object after the operation has completed with a - /// value of 1 or 0, depending on whether the target - /// instance was successfully purged. + /// indicating the number of orchestration instances that were purged, + /// including the count of sub-orchestrations purged if any. /// public static Task PurgeInstancesAsync( this DurableTaskClient client, DateTimeOffset? createdFrom, DateTimeOffset? createdTo, CancellationToken cancellation = default) - => PurgeInstancesAsync(client, createdFrom, createdTo, null, cancellation); + => PurgeInstancesAsync(client, createdFrom, createdTo, null, null, cancellation); } diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 510230b09..9fc8f89cc 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -209,12 +209,19 @@ public virtual Task WaitForInstanceCompletionAsync( public abstract Task WaitForInstanceCompletionAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default); - /// + /// public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken cancellation) => this.TerminateInstanceAsync(instanceId, null, cancellation); + /// + public virtual Task TerminateInstanceAsync(string instanceId, object? output, CancellationToken cancellation = default) + { + TerminateInstanceOptions? options = output is null ? null : new() { Output = output }; + return this.TerminateInstanceAsync(instanceId, options, cancellation); + } + /// - /// Terminates a running orchestration instance and updates its runtime status to + /// Terminates an orchestration instance and updates its runtime status to /// . /// /// @@ -226,10 +233,12 @@ public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken /// the terminated state. /// /// + /// Terminating an orchestration by default will not terminate any of the child sub-orchestrations that were started by + /// the orchetration instance. If you want to terminate sub-orchestration instances as well, you can set + /// flag to true which will enable termination of child sub-orchestration instances. It is set to false by default. /// Terminating an orchestration instance has no effect on any in-flight activity function executions - /// or sub-orchestrations that were started by the terminated instance. Those actions will continue to run - /// without interruption. However, their results will be discarded. If you want to terminate sub-orchestrations, - /// you must issue separate terminate commands for each sub-orchestration instance. + /// that were started by the terminated instance. Those actions will continue to run + /// without interruption. However, their results will be discarded. /// /// At the time of writing, there is no way to terminate an in-flight activity execution. /// @@ -237,14 +246,14 @@ public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken /// /// /// The ID of the orchestration instance to terminate. - /// The optional output to set for the terminated orchestration instance. + /// The optional options for terminating the orchestration. /// /// The cancellation token. This only cancels enqueueing the termination request to the backend. Does not abort /// termination of the orchestration once enqueued. /// /// A task that completes when the terminate message is enqueued. - public abstract Task TerminateInstanceAsync( - string instanceId, object? output = null, CancellationToken cancellation = default); + public virtual Task TerminateInstanceAsync(string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default) + => throw new NotSupportedException($"{this.GetType()} does not support orchestration termination."); /// public virtual Task SuspendInstanceAsync(string instanceId, CancellationToken cancellation) @@ -327,6 +336,10 @@ public abstract Task ResumeInstanceAsync( /// An async pageable of the query results. public abstract AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null); + /// + public virtual Task PurgeInstancesAsync(string instanceId, CancellationToken cancellation) + => this.PurgeInstanceAsync(instanceId, null, cancellation); + /// /// Purges orchestration instance metadata from the durable store. /// @@ -338,37 +351,53 @@ public abstract Task ResumeInstanceAsync( /// , , or /// state can be purged. /// + /// Purging an orchestration will by default not purge any of the child sub-orchestrations that were started by the + /// orchetration instance. If you want to purge sub-orchestration instances, you can set flag to + /// true which will enable purging of child sub-orchestration instances. It is set to false by default. /// If is not found in the data store, or if the instance is found but not in a /// terminal state, then the returned object will have a /// value of 0. Otherwise, the existing data will be purged and - /// will be 1. + /// will be the count of purged instances. /// /// /// The unique ID of the orchestration instance to purge. + /// The optional options for purging the orchestration. /// /// A that can be used to cancel the purge operation. /// /// /// This method returns a object after the operation has completed with a - /// value of 1 or 0, depending on whether the target - /// instance was successfully purged. + /// indicating the number of orchestration instances that were purged, + /// including the count of sub-orchestrations purged if any. /// - public abstract Task PurgeInstanceAsync( - string instanceId, CancellationToken cancellation = default); + public virtual Task PurgeInstanceAsync( + string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) + { + throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances."); + } + + /// + public virtual Task PurgeAllInstancesAsync(PurgeInstancesFilter filter, CancellationToken cancellation) + => this.PurgeAllInstancesAsync(new PurgeInstancesFilter(), null, cancellation); /// /// Purges orchestration instances metadata from the durable store. /// /// The filter for which orchestrations to purge. + /// The optional options for purging the orchestration. /// /// A that can be used to cancel the purge operation. /// /// /// This method returns a object after the operation has completed with a - /// indicating the number of orchestration instances that were purged. + /// indicating the number of orchestration instances that were purged, + /// including the count of sub-orchestrations purged if any. /// - public abstract Task PurgeAllInstancesAsync( - PurgeInstancesFilter filter, CancellationToken cancellation = default); + public virtual Task PurgeAllInstancesAsync( + PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) + { + throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances."); + } // TODO: Create task hub diff --git a/src/Client/Core/PurgeInstanceOptions.cs b/src/Client/Core/PurgeInstanceOptions.cs new file mode 100644 index 000000000..1c7a7956a --- /dev/null +++ b/src/Client/Core/PurgeInstanceOptions.cs @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Client; + +/// +/// Options to purge an orchestration. +/// +/// The optional boolean value indicating whether to purge sub-orchestrations as well. +public record PurgeInstanceOptions(bool Recursive = false); diff --git a/src/Client/Core/TerminateInstanceOptions.cs b/src/Client/Core/TerminateInstanceOptions.cs new file mode 100644 index 000000000..ca49cb680 --- /dev/null +++ b/src/Client/Core/TerminateInstanceOptions.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Client; + +/// +/// Options to terminate an orchestration. +/// +/// The optional output to set for the terminated orchestration instance. +/// The optional boolean value indicating whether to terminate sub-orchestrations as well. +public record TerminateInstanceOptions(object? Output = null, bool Recursive = false); diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index b0efbb933..f0d6e6e8b 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -124,8 +124,11 @@ public override async Task RaiseEventAsync( /// public override async Task TerminateInstanceAsync( - string instanceId, object? output = null, CancellationToken cancellation = default) + string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default) { + object? output = options?.Output; + bool recursive = options?.Recursive ?? false; + Check.NotNullOrEmpty(instanceId); Check.NotEntity(this.options.EnableEntitySupport, instanceId); @@ -137,6 +140,7 @@ await this.sidecarClient.TerminateInstanceAsync( { InstanceId = instanceId, Output = serializedOutput, + Recursive = recursive, }, cancellationToken: cancellation); } @@ -321,18 +325,20 @@ public override async Task WaitForInstanceCompletionAsync /// public override Task PurgeInstanceAsync( - string instanceId, CancellationToken cancellation = default) + string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { + bool recursive = options?.Recursive ?? false; this.logger.PurgingInstanceMetadata(instanceId); - P.PurgeInstancesRequest request = new() { InstanceId = instanceId }; + P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive }; return this.PurgeInstancesCoreAsync(request, cancellation); } /// public override Task PurgeAllInstancesAsync( - PurgeInstancesFilter filter, CancellationToken cancellation = default) + PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { + bool recursive = options?.Recursive ?? false; this.logger.PurgingInstances(filter); P.PurgeInstancesRequest request = new() { @@ -341,6 +347,7 @@ public override Task PurgeAllInstancesAsync( CreatedTimeFrom = filter?.CreatedFrom.ToTimestamp(), CreatedTimeTo = filter?.CreatedTo.ToTimestamp(), }, + Recursive = recursive, }; if (filter?.Statuses is not null) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index b948516c6..3f3c52116 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -95,20 +95,24 @@ public override AsyncPageable GetAllInstancesAsync(Orches /// public override async Task PurgeInstanceAsync( - string instanceId, CancellationToken cancellation = default) + string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { Check.NotNullOrEmpty(instanceId); cancellation.ThrowIfCancellationRequested(); + + // TODO: Support recursive purge of sub-orchestrations Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(instanceId); return result.ConvertFromCore(); } /// public override async Task PurgeAllInstancesAsync( - PurgeInstancesFilter filter, CancellationToken cancellation = default) + PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { Check.NotNull(filter); cancellation.ThrowIfCancellationRequested(); + + // TODO: Support recursive purge of sub-orchestrations Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(filter.ConvertToCore()); return result.ConvertFromCore(); } @@ -169,11 +173,14 @@ public override Task ResumeInstanceAsync( /// public override Task TerminateInstanceAsync( - string instanceId, object? output = null, CancellationToken cancellation = default) + string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default) { + object? output = options?.Output; Check.NotNullOrEmpty(instanceId); cancellation.ThrowIfCancellationRequested(); string? reason = this.DataConverter.Serialize(output); + + // TODO: Support recursive termination of sub-orchestrations return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason); } diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 74dad9dd5..8c6bf674c 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -49,7 +49,6 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(), TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId, }, - Correlation = proto.ExecutionStarted.CorrelationData, ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(), }; break; diff --git a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs index 0732c1a5e..b2b184cad 100644 --- a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs @@ -89,13 +89,13 @@ public override AsyncPageable GetAllInstancesAsync(Orches } public override Task PurgeInstanceAsync( - string instanceId, CancellationToken cancellation = default) + string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { throw new NotImplementedException(); } public override Task PurgeAllInstancesAsync( - PurgeInstancesFilter filter, CancellationToken cancellation = default) + PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { throw new NotImplementedException(); } @@ -128,7 +128,7 @@ public override Task SuspendInstanceAsync( } public override Task TerminateInstanceAsync( - string instanceId, object? output = null, CancellationToken cancellation = default) + string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default) { throw new NotImplementedException(); } diff --git a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs index d92250c0a..9af6469ec 100644 --- a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs @@ -119,13 +119,13 @@ public override AsyncPageable GetAllInstancesAsync(Orches } public override Task PurgeInstanceAsync( - string instanceId, CancellationToken cancellation = default) + string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { throw new NotImplementedException(); } public override Task PurgeAllInstancesAsync( - PurgeInstancesFilter filter, CancellationToken cancellation = default) + PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { throw new NotImplementedException(); } @@ -158,7 +158,7 @@ public override Task SuspendInstanceAsync( } public override Task TerminateInstanceAsync( - string instanceId, object? output = null, CancellationToken cancellation = default) + string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default) { throw new NotImplementedException(); }