diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index 0b05c418..9a00fb4b 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -127,6 +127,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ActivityVersioningSample", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EntityWithVersionedOrchestrationSample", "samples\EntityWithVersionedOrchestrationSample\EntityWithVersionedOrchestrationSample.csproj", "{8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "UnversionedFallbackSample", "samples\UnversionedFallbackSample\UnversionedFallbackSample.csproj", "{1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -761,6 +763,18 @@ Global {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}.Release|x64.Build.0 = Release|Any CPU {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}.Release|x86.ActiveCfg = Release|Any CPU {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}.Release|x86.Build.0 = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x64.ActiveCfg = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x64.Build.0 = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x86.ActiveCfg = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x86.Build.0 = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|Any CPU.Build.0 = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x64.ActiveCfg = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x64.Build.0 = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x86.ActiveCfg = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -824,6 +838,7 @@ Global {1E30F09F-1ADA-4375-81CC-F0FBC74D5621} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {3FBCFDBA-F547-4FD5-B8C6-0B645EF73E3A} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/README.md b/README.md index 84686d37..8090c181 100644 --- a/README.md +++ b/README.md @@ -196,7 +196,7 @@ Durable Task Scheduler provides durable execution in Azure. Durable execution is This SDK can also be used with the Durable Task Scheduler directly, without any Durable Functions dependency. For getting started, you can find documentation and samples [here](https://learn.microsoft.com/en-us/azure/azure-functions/durable/what-is-durable-task). -For runnable DTS emulator examples that demonstrate versioning, see the [WorkerVersioningSample](samples/WorkerVersioningSample/README.md) (deployment-based versioning), the [EternalOrchestrationVersionMigrationSample](samples/EternalOrchestrationVersionMigrationSample/README.md) (multi-version routing with `[DurableTask(Version = "...")]`), the [ActivityVersioningSample](samples/ActivityVersioningSample/README.md) (activity versioning with inherited defaults and explicit override support), and the [EntityWithVersionedOrchestrationSample](samples/EntityWithVersionedOrchestrationSample/README.md) (a single instance migrating v1→v2 via `ContinueAsNew(NewVersion)` while preserving entity-held state). +For runnable DTS emulator examples that demonstrate versioning, see the [WorkerVersioningSample](samples/WorkerVersioningSample/README.md) (deployment-based versioning), the [EternalOrchestrationVersionMigrationSample](samples/EternalOrchestrationVersionMigrationSample/README.md) (multi-version routing with `[DurableTask(Version = "...")]`), the [ActivityVersioningSample](samples/ActivityVersioningSample/README.md) (activity versioning with inherited defaults and explicit override support), the [EntityWithVersionedOrchestrationSample](samples/EntityWithVersionedOrchestrationSample/README.md) (a single instance migrating v1→v2 via `ContinueAsNew(NewVersion)` while preserving entity-held state), and the [UnversionedFallbackSample](samples/UnversionedFallbackSample/README.md) (an unversioned catch-all for unmatched explicit versions). ## Obtaining the Protobuf definitions diff --git a/samples/UnversionedFallbackSample/Program.cs b/samples/UnversionedFallbackSample/Program.cs new file mode 100644 index 00000000..0e7bb598 --- /dev/null +++ b/samples/UnversionedFallbackSample/Program.cs @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// This sample demonstrates opt-in unversioned fallback for per-task versioning. +// A worker can register one explicit legacy implementation for a known version +// and an unversioned implementation as the catch-all for versions that do not +// have an explicit [DurableTask(Version = "...")] registration. + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); + +string connectionString = builder.Configuration.GetValue("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? throw new InvalidOperationException( + "Set DURABLE_TASK_SCHEDULER_CONNECTION_STRING. " + + "For the local emulator: Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"); + +builder.Services.AddDurableTaskWorker(wb => +{ + wb.AddTasks(tasks => tasks.AddAllGeneratedTasks()); + wb.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions + { + // Activity fallback is the safer place to start: activities are stateless and do not replay + // history. Enable orchestrator fallback (commented below) only when the unversioned + // orchestrator is replay-compatible with every version it may receive. + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }); + wb.UseWorkItemFilters(); + wb.UseDurableTaskScheduler(connectionString); +}); + +builder.Services.AddDurableTaskClient(cb => cb.UseDurableTaskScheduler(connectionString)); + +IHost host = builder.Build(); +await host.StartAsync(); + +await using DurableTaskClient client = host.Services.GetRequiredService(); + +Console.WriteLine("=== Unversioned fallback for versioned task dispatch ==="); +Console.WriteLine(); + +SupportRequest request = new("Contoso", "BGP session down"); + +Console.WriteLine("Scheduling SupportWorkflow version 1.4.0 ..."); +string legacyId = await client.ScheduleNewOrchestrationInstanceAsync( + nameof(SupportWorkflow), + request, + new StartOrchestrationOptions + { + Version = new TaskVersion("1.4.0"), + }); +OrchestrationMetadata legacy = await client.WaitForInstanceCompletionAsync(legacyId, getInputsAndOutputs: true); +Console.WriteLine($" Result: {legacy.ReadOutputAs()}"); +Console.WriteLine(); + +Console.WriteLine("Scheduling SupportWorkflow version 1.0 ..."); +string fallbackId = await client.ScheduleNewOrchestrationInstanceAsync( + nameof(SupportWorkflow), + request, + new StartOrchestrationOptions + { + Version = new TaskVersion("1.0"), + }); +OrchestrationMetadata fallback = await client.WaitForInstanceCompletionAsync(fallbackId, getInputsAndOutputs: true); +Console.WriteLine($" Result: {fallback.ReadOutputAs()}"); +Console.WriteLine(); + +Console.WriteLine("Done! Version 1.4.0 used the explicit legacy class; version 1.0 used the unversioned fallback."); + +await host.StopAsync(); + +/// +/// The current implementation. With OrchestratorUnversionedFallback enabled, this unversioned registration +/// handles every requested SupportWorkflow version that does not have an exact explicit registration. +/// +[DurableTask(nameof(SupportWorkflow))] +public sealed class SupportWorkflow : TaskOrchestrator +{ + /// + public override Task RunAsync(TaskOrchestrationContext context, SupportRequest input) + { + return Task.FromResult( + $"Current SupportWorkflow handled version '{context.Version}' for {input.Customer}: {input.Issue}"); + } +} + +/// +/// A pinned legacy implementation for version 1.4.0. +/// +[DurableTask(nameof(SupportWorkflow), Version = "1.4.0")] +public sealed class SupportWorkflowLegacyV140 : TaskOrchestrator +{ + /// + public override Task RunAsync(TaskOrchestrationContext context, SupportRequest input) + { + return Task.FromResult( + $"Legacy SupportWorkflow 1.4.0 handled version '{context.Version}' for {input.Customer}: {input.Issue}"); + } +} + +/// +/// Request input for the support workflow. +/// +public sealed record SupportRequest(string Customer, string Issue); diff --git a/samples/UnversionedFallbackSample/README.md b/samples/UnversionedFallbackSample/README.md new file mode 100644 index 00000000..9eb5b92f --- /dev/null +++ b/samples/UnversionedFallbackSample/README.md @@ -0,0 +1,82 @@ +# Unversioned Fallback Sample + +This sample demonstrates opt-in unversioned fallback for per-task versioning. It shows how one explicit versioned class can coexist with an unversioned catch-all implementation for versions that do not have their own `[DurableTask(Version = "...")]` registration. + +## What it shows + +- `SupportWorkflowLegacyV140` is registered as `[DurableTask(nameof(SupportWorkflow), Version = "1.4.0")]`. +- `SupportWorkflow` is registered without a version and acts as the current catch-all implementation. +- The worker enables both `OrchestratorUnversionedFallback = CatchAll` and `ActivityUnversionedFallback = CatchAll`. The orchestrator flag is what the demo exercises; the activity flag is set to illustrate that the two sides are configured independently. +- `UseWorkItemFilters()` is enabled, so the generated filter must allow unmatched versions to reach the worker. +- A version `1.4.0` request dispatches to the explicit legacy class. +- A version `1.0` request has no exact registration, so it dispatches to the unversioned fallback class. + +## Prerequisites + +- .NET 10.0 SDK +- [Docker](https://www.docker.com/get-started) + +## Running the Sample + +### 1. Start the DTS emulator + +```bash +docker run --name durabletask-emulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest +``` + +The emulator exposes the gRPC sidecar on port 8080 and the local dashboard on port 8082. After running the sample below, you can open the dashboard at to inspect the orchestrations and their versions. + +### 2. Set the connection string + +```bash +export DURABLE_TASK_SCHEDULER_CONNECTION_STRING="Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" +``` + +PowerShell: + +```powershell +$env:DURABLE_TASK_SCHEDULER_CONNECTION_STRING = "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" +``` + +### 3. Run the sample + +```bash +dotnet run +``` + +Expected output: + +```text +=== Unversioned fallback for versioned task dispatch === + +Scheduling SupportWorkflow version 1.4.0 ... + Result: Legacy SupportWorkflow 1.4.0 handled version '1.4.0' for Contoso: BGP session down + +Scheduling SupportWorkflow version 1.0 ... + Result: Current SupportWorkflow handled version '1.0' for Contoso: BGP session down + +Done! Version 1.4.0 used the explicit legacy class; version 1.0 used the unversioned fallback. +``` + +### 4. Clean up + +```bash +docker rm -f durabletask-emulator +``` + +## Key takeaways + +- Exact version matches always win. A `1.4.0` request dispatches to the `1.4.0` class, not the unversioned class. +- Three modes are available per side via `UnversionedFallbackMode`: + - `Implicit` (default) — the unversioned registration serves versioned requests only when the name has no versioned siblings. Matches behavior before per-task versioning shipped. + - `CatchAll` — opt-in catch-all for unmatched versioned requests on mixed names. This sample uses it. + - `StrictExactOnly` — every versioned request requires an exact `(name, version)` registration. Use when bogus versions from upstream clients should fail loudly. +- Orchestrator and activity fallback are configured independently. `OrchestratorUnversionedFallback` carries replay risk (orchestrators rehydrate state from history on every replay); `ActivityUnversionedFallback` is safer because activities are stateless. Start with activity-only `CatchAll` if you are unsure. +- Use orchestrator `CatchAll` only when the unversioned implementation is replay-compatible with the versions it may receive. Replaying existing histories against a different implementation can cause non-determinism or deserialization failures. +- `UseWorkItemFilters()` composes with these modes: it widens to a wildcard when the worker can actually serve unmatched versioned requests (under `Implicit` for unversioned-only names, under `CatchAll` whenever an unversioned registration exists). Under `StrictExactOnly` the filter emits the concrete version set so the backend does not deliver work items the worker would reject. + +## See also + +- [EternalOrchestrationVersionMigrationSample](../EternalOrchestrationVersionMigrationSample/README.md) — multi-version orchestration dispatch and `ContinueAsNew(NewVersion = "...")` migration. +- [ActivityVersioningSample](../ActivityVersioningSample/README.md) — activity versioning with inherited defaults and explicit overrides. +- [WorkerVersioningSample](../WorkerVersioningSample/README.md) — worker-level deployment versioning via `UseVersioning()`. diff --git a/samples/UnversionedFallbackSample/UnversionedFallbackSample.csproj b/samples/UnversionedFallbackSample/UnversionedFallbackSample.csproj new file mode 100644 index 00000000..2f40b471 --- /dev/null +++ b/samples/UnversionedFallbackSample/UnversionedFallbackSample.csproj @@ -0,0 +1,25 @@ + + + + Exe + net10.0 + enable + + + + + + + + + + + + + + + + + diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index a2f400f2..3452e3d7 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -62,9 +62,16 @@ public TaskOptions(TaskOptions options) /// /// /// When non-null (including ), the task is scheduled with the - /// specified version explicitly. The worker dispatches to the registered (name, version) exactly; - /// when no exact match exists, it falls back to an unversioned registration only when the name has no - /// versioned registrations at all. + /// specified version explicitly, overriding any inherited version. + /// + /// + /// The receiving worker is responsible for resolving the scheduled version to a registered implementation + /// according to its own versioning configuration. That resolution is not visible to the scheduling client. + /// + /// + /// When deploying versioned workloads, treat each worker's versioning configuration as a deployment-time + /// policy: the same version value may dispatch to different registrations across deployments. Use the + /// worker's startup and per-dispatch diagnostic logs to verify behavior across environments. /// /// public TaskVersion? Version { get; init; } diff --git a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs index 5e966992..bd81e8e7 100644 --- a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs +++ b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs @@ -4,6 +4,7 @@ using Microsoft.DurableTask.Worker.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; namespace Microsoft.DurableTask.Worker; @@ -54,10 +55,25 @@ public IHostedService Build(IServiceProvider serviceProvider) Verify.NotNull(this.buildTarget, error); DurableTaskRegistry registry = serviceProvider.GetOptions(this.Name); + DurableTaskWorkerOptions workerOptions = serviceProvider.GetOptions(this.Name); + ILoggerFactory? loggerFactory = serviceProvider.GetService(); + if (loggerFactory is not null && workerOptions.Versioning is { } versioning) + { + ILogger workerLogger = Logs.CreateWorkerLogger(loggerFactory); + if (versioning.OrchestratorUnversionedFallback == DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll) + { + workerLogger.OrchestratorUnversionedFallbackEnabled(this.Name); + } + + if (versioning.ActivityUnversionedFallback == DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll) + { + workerLogger.ActivityUnversionedFallbackEnabled(this.Name); + } + } // Note: Modifying any logic in this section could introduce breaking changes. // Do not alter the input parameter. return (IHostedService)ActivatorUtilities.CreateInstance( - serviceProvider, this.buildTarget, this.Name, registry.BuildFactory()); + serviceProvider, this.buildTarget, this.Name, registry.BuildFactory(workerOptions, loggerFactory)); } } diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs index a9274078..ac1b058f 100644 --- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs +++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -106,6 +106,8 @@ public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBui DefaultVersion = versionOptions.DefaultVersion, MatchStrategy = versionOptions.MatchStrategy, FailureStrategy = versionOptions.FailureStrategy, + OrchestratorUnversionedFallback = versionOptions.OrchestratorUnversionedFallback, + ActivityUnversionedFallback = versionOptions.ActivityUnversionedFallback, }; }); return builder; diff --git a/src/Worker/Core/DurableTaskFactory.cs b/src/Worker/Core/DurableTaskFactory.cs index fcc97800..923053d7 100644 --- a/src/Worker/Core/DurableTaskFactory.cs +++ b/src/Worker/Core/DurableTaskFactory.cs @@ -3,6 +3,7 @@ using System.Diagnostics.CodeAnalysis; using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; namespace Microsoft.DurableTask.Worker; @@ -16,6 +17,9 @@ sealed class DurableTaskFactory : IDurableTaskFactory2, IVersionedTaskFactory readonly IDictionary> entities; readonly HashSet versionedOrchestratorNames; readonly HashSet versionedActivityNames; + readonly DurableTaskWorkerOptions.UnversionedFallbackMode orchestratorFallbackMode; + readonly DurableTaskWorkerOptions.UnversionedFallbackMode activityFallbackMode; + readonly ILogger? logger; /// /// Initializes a new instance of the class. @@ -23,19 +27,28 @@ sealed class DurableTaskFactory : IDurableTaskFactory2, IVersionedTaskFactory /// The activity factories. /// The orchestrator factories. /// The entity factories. + /// The unversioned fallback mode for orchestrators. + /// The unversioned fallback mode for activities. + /// Optional logger factory used to emit per-dispatch fallback diagnostics. internal DurableTaskFactory( IDictionary> activities, IDictionary> orchestrators, - IDictionary> entities) + IDictionary> entities, + DurableTaskWorkerOptions.UnversionedFallbackMode orchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit, + DurableTaskWorkerOptions.UnversionedFallbackMode activityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit, + ILoggerFactory? loggerFactory = null) { this.activities = Check.NotNull(activities); this.orchestrators = Check.NotNull(orchestrators); this.entities = Check.NotNull(entities); + this.orchestratorFallbackMode = orchestratorUnversionedFallback; + this.activityFallbackMode = activityUnversionedFallback; + this.logger = loggerFactory is not null ? Logs.CreateWorkerLogger(loggerFactory) : null; - // Snapshot the set of logical names that have at least one versioned registration. Used to gate the - // unversioned-fallback path: when a logical name has any versioned registration, we refuse to fall - // back to its unversioned registration for an unmatched versioned request — that would silently - // route the call to a different implementation than the caller asked for. + // Snapshot the set of logical names that have at least one versioned registration. Used by the + // Implicit fallback mode to recognize "unversioned-only" names, where a versioned request is allowed + // to resolve through the unversioned registration. CatchAll widens this for mixed names; StrictExactOnly + // disables fallback entirely. this.versionedOrchestratorNames = new HashSet( this.orchestrators.Keys .Where(k => !string.IsNullOrWhiteSpace(k.Version)) @@ -63,14 +76,13 @@ public bool TryCreateActivity( return true; } - // Unversioned registrations remain the compatibility fallback for a versioned request, but ONLY when - // no versioned registration exists for the same logical name. This mirrors the orchestrator rule: - // once a name has any versioned registration, an unmatched versioned request returns "not found" - // rather than silently routing to a catch-all the caller did not ask for. + // Resolve a versioned request through the unversioned registration when the mode allows it. + // See UnversionedFallbackMode for the dispatch matrix. if (!string.IsNullOrWhiteSpace(version.Version) - && !this.versionedActivityNames.Contains(name.Name) + && ShouldUseUnversionedFallback(this.activityFallbackMode, this.versionedActivityNames, name.Name) && this.activities.TryGetValue(new TaskVersionKey(name, default(TaskVersion)), out factory)) { + this.logger?.ActivityDispatchedToUnversionedFallback(name.Name, version.Version); activity = factory.Invoke(serviceProvider); return true; } @@ -99,14 +111,13 @@ public bool TryCreateOrchestrator( return true; } - // Unversioned registrations remain the compatibility fallback for a versioned request, but ONLY when - // no versioned registration exists for the same logical name. If any versioned registration is present - // (e.g., v1 and v2 are registered, request asks for v3), we refuse to silently route the call to a - // catch-all registration the caller did not ask for. + // Resolve a versioned request through the unversioned registration when the mode allows it. + // See UnversionedFallbackMode for the dispatch matrix. if (!string.IsNullOrWhiteSpace(version.Version) - && !this.versionedOrchestratorNames.Contains(name.Name) + && ShouldUseUnversionedFallback(this.orchestratorFallbackMode, this.versionedOrchestratorNames, name.Name) && this.orchestrators.TryGetValue(new TaskVersionKey(name, default(TaskVersion)), out factory)) { + this.logger?.OrchestratorDispatchedToUnversionedFallback(name.Name, version.Version); orchestrator = factory.Invoke(serviceProvider); return true; } @@ -133,4 +144,18 @@ public bool TryCreateEntity( entity = null; return false; } + + static bool ShouldUseUnversionedFallback( + DurableTaskWorkerOptions.UnversionedFallbackMode mode, + HashSet versionedNames, + string requestedName) + { + return mode switch + { + DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly => false, + DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll => true, + DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit => !versionedNames.Contains(requestedName), + _ => !versionedNames.Contains(requestedName), + }; + } } diff --git a/src/Worker/Core/DurableTaskRegistryExtensions.cs b/src/Worker/Core/DurableTaskRegistryExtensions.cs index 5d185394..4e5b5b4e 100644 --- a/src/Worker/Core/DurableTaskRegistryExtensions.cs +++ b/src/Worker/Core/DurableTaskRegistryExtensions.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using Microsoft.Extensions.Logging; + namespace Microsoft.DurableTask.Worker; /// @@ -14,8 +16,33 @@ static class DurableTaskRegistryExtensions /// The registry to build. /// The built factory. public static IDurableTaskFactory BuildFactory(this DurableTaskRegistry registry) + => registry.BuildFactory(workerOptions: null, loggerFactory: null); + + /// + /// Builds a into a . + /// + /// The registry to build. + /// The worker options to use when building the factory. + /// Optional logger factory used to emit per-dispatch fallback diagnostics. + /// The built factory. + public static IDurableTaskFactory BuildFactory( + this DurableTaskRegistry registry, + DurableTaskWorkerOptions? workerOptions, + ILoggerFactory? loggerFactory = null) { Check.NotNull(registry); - return new DurableTaskFactory(registry.ActivitiesByVersion, registry.OrchestratorsByVersion, registry.Entities); + DurableTaskWorkerOptions.UnversionedFallbackMode orchestratorFallback = + workerOptions?.Versioning?.OrchestratorUnversionedFallback + ?? DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit; + DurableTaskWorkerOptions.UnversionedFallbackMode activityFallback = + workerOptions?.Versioning?.ActivityUnversionedFallback + ?? DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit; + return new DurableTaskFactory( + registry.ActivitiesByVersion, + registry.OrchestratorsByVersion, + registry.Entities, + orchestratorFallback, + activityFallback, + loggerFactory); } } diff --git a/src/Worker/Core/DurableTaskWorkerOptions.cs b/src/Worker/Core/DurableTaskWorkerOptions.cs index 3aa4eea2..9d17767e 100644 --- a/src/Worker/Core/DurableTaskWorkerOptions.cs +++ b/src/Worker/Core/DurableTaskWorkerOptions.cs @@ -49,6 +49,65 @@ public enum VersionFailureStrategy Fail = 1, } + /// + /// Controls how an unversioned task registration is used to serve versioned task requests. Only affects + /// dispatch decisions; orchestration instance acceptance is controlled by . + /// + /// + /// The matrix below summarizes dispatch for a versioned request under each mode: + /// + /// + /// Registration shape for the task name + /// Result with / / + /// + /// + /// Only unversioned registration + /// Implicit: unversioned. CatchAll: unversioned. StrictExactOnly: not found. + /// + /// + /// Mixed (versioned + unversioned), exact version match + /// All three modes dispatch to the exact-matching versioned registration. + /// + /// + /// Mixed (versioned + unversioned), no exact version match + /// Implicit: not found. CatchAll: unversioned. StrictExactOnly: not found. + /// + /// + /// Only versioned registrations, no exact version match + /// All three modes return "not found" (no unversioned implementation exists). + /// + /// + /// + /// Unversioned requests (no version specified on the schedule call) always dispatch to the unversioned + /// registration when one exists, regardless of this setting. + /// + /// + public enum UnversionedFallbackMode + { + /// + /// Preserve the long-standing implicit fallback: the unversioned registration serves versioned requests + /// only when the task name has no versioned siblings. Once a name has at least one versioned + /// registration, an unmatched versioned request returns "not found" rather than dispatching to the + /// unversioned registration. This is the default and matches behavior prior to per-task versioning. + /// + Implicit = 0, + + /// + /// Use the unversioned registration as a catch-all when no exact versioned match exists, even when + /// the task name has versioned siblings. An exact versioned match still wins. Use only when the + /// unversioned implementation is replay-compatible with every version it may receive. + /// + CatchAll = 1, + + /// + /// Require an exact (name, version) registration for every versioned request. Versioned + /// requests for names without an exact registration return "not found" even when an unversioned + /// registration for the same name exists. Use this mode when stale or bogus version values from + /// upstream clients should fail loudly instead of landing on the unversioned registration. + /// + StrictExactOnly = 2, + } + /// /// Gets or sets the data converter. Default value is . /// @@ -176,7 +235,6 @@ public DataConverter DataConverter /// internal bool DataConverterExplicitlySet { get; private set; } - /// /// Applies these option values to another. /// @@ -190,7 +248,9 @@ internal void ApplyTo(DurableTaskWorkerOptions other) other.MaximumTimerInterval = this.MaximumTimerInterval; other.EnableEntitySupport = this.EnableEntitySupport; other.Versioning = this.Versioning; +#pragma warning disable CS0618 // Internal forwarding of the experimental OrchestrationFilter property. other.OrchestrationFilter = this.OrchestrationFilter; +#pragma warning restore CS0618 other.Logging.UseLegacyCategories = this.Logging.UseLegacyCategories; } } @@ -243,6 +303,75 @@ public class VersioningOptions /// If the version matching strategy is set to , this value has no effect. /// public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject; + + /// + /// Gets or sets how the unversioned orchestrator registration participates in dispatch for + /// versioned orchestrator requests. + /// + /// + /// + /// Defaults to . See + /// for the dispatch matrix across the three modes. + /// + /// + /// Replay risk is highest on the orchestrator side: orchestrators are deterministic and rehydrate + /// state from history on every replay. Enable only + /// when the unversioned orchestrator implementation is replay-compatible with every version it may + /// receive. Replaying existing histories against an incompatible implementation can cause + /// non-determinism faults or deserialization failures. + /// eliminates fallback entirely for this side; pair with explicit per-version registrations for every + /// version your callers may schedule. + /// + /// + /// Interaction with other versioning options: + /// + /// + /// Unlike , this setting applies regardless of + /// . The factory-level fallback decision runs whether or not the + /// pre-dispatch versioning gate is active. + /// When is , + /// the pre-dispatch versioning gate rejects instance versions that don't equal the worker's + /// configured . This setting does not bypass the gate, but governs how the + /// factory resolves instances that pass it. Note that with , + /// the worker's configured version must also have an exact registration for the receiving task name; + /// otherwise the factory will reject work items that the strict-mode work-item filter still requests + /// from the backend. + /// When is + /// , the pre-dispatch versioning gate rejects + /// orchestration versions newer than . This setting governs only how versions + /// accepted by the gate are resolved; newer-than-worker versions are still subject to + /// . + /// + /// + public UnversionedFallbackMode OrchestratorUnversionedFallback { get; set; } = UnversionedFallbackMode.Implicit; + + /// + /// Gets or sets how the unversioned activity registration participates in dispatch for versioned + /// activity requests. + /// + /// + /// + /// Defaults to . See + /// for the dispatch matrix across the three modes. + /// + /// + /// Activities are stateless and do not replay history, so + /// carries less risk than the orchestrator equivalent. The main concern is input contract + /// compatibility: ensure the unversioned activity implementation accepts the input shapes produced by + /// every version of the calling orchestrators that may schedule it. + /// eliminates fallback entirely for activities. + /// + /// + /// Interaction with other versioning options: + /// + /// + /// Unlike , this setting applies regardless of + /// . + /// applies to orchestration instance versions, not + /// activity scheduling versions, so it does not gate activity dispatch. + /// + /// + public UnversionedFallbackMode ActivityUnversionedFallback { get; set; } = UnversionedFallbackMode.Implicit; } /// diff --git a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs index ccedc3b1..94cbe0aa 100644 --- a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs +++ b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs @@ -7,8 +7,10 @@ namespace Microsoft.DurableTask.Worker; /// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend /// and only work items matching the filters will be processed by the worker. If no filters are provided, /// the worker will process all work items. To opt-in to work item filtering, call -/// on the worker builder with either -/// explicit filters or auto-generated filters from the . +/// for the +/// auto-generated filters from the worker's , or +/// +/// to supply explicit filters. /// public class DurableTaskWorkerWorkItemFilters { @@ -43,21 +45,32 @@ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(Durable workerOptions?.Versioning?.MatchStrategy == DurableTaskWorkerOptions.VersionMatchStrategy.Strict ? [workerOptions.Versioning.Version ?? string.Empty] : null; + DurableTaskWorkerOptions.UnversionedFallbackMode orchestratorFallbackMode = + workerOptions?.Versioning?.OrchestratorUnversionedFallback + ?? DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit; + DurableTaskWorkerOptions.UnversionedFallbackMode activityFallbackMode = + workerOptions?.Versioning?.ActivityUnversionedFallback + ?? DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit; // Orchestration filters group registrations by logical name and emit the concrete distinct // version set actually registered (treating null/unversioned as ""). Strict mode overrides - // this with the single configured worker version. For unversioned-only names (no versioned - // registration exists for the name), we emit an empty version list — the filter wildcard — - // so the backend can deliver versioned work items that the factory will then resolve via - // the documented unversioned fallback in DurableTaskFactory.TryCreateOrchestrator. When a - // name has at least one versioned registration, the factory refuses unversioned-fallback, - // so emitting the concrete version set prevents the backend from streaming work items the - // worker would then reject after the fact. + // this with the single configured worker version. When the factory can resolve unmatched + // versions via the unversioned registration (unversioned-only names under Implicit, or any + // name with an unversioned registration under CatchAll), we emit an empty version list — the + // filter wildcard — so the backend delivers versioned work items the factory can handle. + // Under StrictExactOnly the factory rejects unmatched versioned requests for every name, + // including unversioned-only names, so the filter must emit the concrete version set instead + // of widening. + // + // Orchestrator and activity fallback are configured independently, so each filter set + // consults its own mode. List orchestrationFilters = registry.OrchestratorsByVersion .GroupBy(orchestration => orchestration.Key.Name, StringComparer.OrdinalIgnoreCase) .Select(group => { - IReadOnlyList versions = strictWorkerVersions ?? GetFilterVersions(group.Select(entry => entry.Key.Version)); + IReadOnlyList versions = + strictWorkerVersions + ?? GetFilterVersions(group.Select(entry => entry.Key.Version), orchestratorFallbackMode); return new OrchestrationFilter { @@ -71,7 +84,9 @@ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(Durable .GroupBy(activity => activity.Key.Name, StringComparer.OrdinalIgnoreCase) .Select(group => { - IReadOnlyList versions = strictWorkerVersions ?? GetFilterVersions(group.Select(entry => entry.Key.Version)); + IReadOnlyList versions = + strictWorkerVersions + ?? GetFilterVersions(group.Select(entry => entry.Key.Version), activityFallbackMode); return new ActivityFilter { @@ -92,7 +107,9 @@ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(Durable }).ToList(), }; - static IReadOnlyList GetFilterVersions(IEnumerable versions) + static IReadOnlyList GetFilterVersions( + IEnumerable versions, + DurableTaskWorkerOptions.UnversionedFallbackMode mode) { // Normalize null to "" so an unversioned registration appears consistently. string[] normalized = versions @@ -101,11 +118,29 @@ static IReadOnlyList GetFilterVersions(IEnumerable versions) .OrderBy(version => version, StringComparer.OrdinalIgnoreCase) .ToArray(); - // Unversioned-only: emit the wildcard match-all (empty list) so the backend can deliver - // versioned work items that the factory will resolve via unversioned fallback. Without - // this, callers asking for a specific version would be filtered out at the backend even - // though the worker can handle them. - if (normalized.Length == 1 && normalized[0].Length == 0) + // StrictExactOnly disables every fallback path, including the long-standing implicit + // unversioned-only fallback. Emit the concrete registered version set so the backend + // does not deliver versioned work items the factory will reject after the fact. + if (mode == DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly) + { + return normalized; + } + + // Otherwise, widen to a wildcard when the factory can actually resolve unmatched versions: + // - Implicit: only when the registry has no versioned siblings for this name (i.e. + // normalized is exactly [""]). + // - CatchAll: whenever the registry has an unversioned registration for this name. + bool hasUnversionedRegistration = + normalized.Contains(string.Empty, StringComparer.OrdinalIgnoreCase); + bool implicitWildcard = + mode == DurableTaskWorkerOptions.UnversionedFallbackMode.Implicit + && normalized.Length == 1 + && normalized[0].Length == 0; + bool catchAllWildcard = + mode == DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll + && hasUnversionedRegistration; + + if (implicitWildcard || catchAllWildcard) { return []; } diff --git a/src/Worker/Core/Logs.cs b/src/Worker/Core/Logs.cs index 81a65e25..eef4e1b0 100644 --- a/src/Worker/Core/Logs.cs +++ b/src/Worker/Core/Logs.cs @@ -35,6 +35,30 @@ static partial class Logs [LoggerMessage(EventId = 605, Level = LogLevel.Information, Message = "'{Name}' activity of orchestration ID '{InstanceId}' failed.")] public static partial void ActivityFailed(this ILogger logger, Exception ex, string instanceId, string name); + [LoggerMessage( + EventId = 606, + Level = LogLevel.Warning, + Message = "Orchestrator unversioned fallback is enabled for Durable Task worker '{workerName}'. Unmatched versioned orchestrators may run on the unversioned registration; ensure that implementation is replay-compatible with every version it may receive. Replaying existing histories against a different implementation can cause non-determinism or deserialization failures.")] + public static partial void OrchestratorUnversionedFallbackEnabled(this ILogger logger, string workerName); + + [LoggerMessage( + EventId = 607, + Level = LogLevel.Warning, + Message = "Activity unversioned fallback is enabled for Durable Task worker '{workerName}'. Unmatched versioned activities may run on the unversioned registration; ensure that implementation accepts the input shapes produced by every version of the calling orchestrators.")] + public static partial void ActivityUnversionedFallbackEnabled(this ILogger logger, string workerName); + + [LoggerMessage( + EventId = 608, + Level = LogLevel.Debug, + Message = "Orchestrator '{Name}' version '{RequestedVersion}' had no exact match; dispatching to the unversioned registration.")] + public static partial void OrchestratorDispatchedToUnversionedFallback(this ILogger logger, string name, string requestedVersion); + + [LoggerMessage( + EventId = 609, + Level = LogLevel.Debug, + Message = "Activity '{Name}' version '{RequestedVersion}' had no exact match; dispatching to the unversioned registration.")] + public static partial void ActivityDispatchedToUnversionedFallback(this ILogger logger, string name, string requestedVersion); + /// /// Creates a logger named "Microsoft.DurableTask.Worker" with an optional subcategory. /// diff --git a/test/Worker/Core.Tests/CapturingLoggerFactory.cs b/test/Worker/Core.Tests/CapturingLoggerFactory.cs new file mode 100644 index 00000000..46f9afa0 --- /dev/null +++ b/test/Worker/Core.Tests/CapturingLoggerFactory.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Worker.Tests; + +/// +/// In-memory that captures every log call so tests can assert on level + message. +/// +sealed class CapturingLoggerFactory : ILoggerFactory +{ + public List<(LogLevel Level, string Message)> Logs { get; } = []; + + public void AddProvider(ILoggerProvider provider) + { + } + + public ILogger CreateLogger(string categoryName) => new CapturingLogger(this.Logs); + + public void Dispose() + { + } + + sealed class CapturingLogger(List<(LogLevel Level, string Message)> logs) : ILogger + { + readonly List<(LogLevel Level, string Message)> logs = logs; + + public IDisposable? BeginScope(TState state) + where TState : notnull + => null; + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + this.logs.Add((logLevel, formatter(state, exception))); + } + } +} diff --git a/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs b/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs index e510da34..ff8247e2 100644 --- a/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs +++ b/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs @@ -5,6 +5,7 @@ using Microsoft.DurableTask.Worker.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Microsoft.DurableTask.Worker.Tests; @@ -85,6 +86,98 @@ public void Build_Target_Built() target.Options.DataConverter.Should().BeSameAs(converter); } + [Fact] + public void Build_WithOrchestratorUnversionedFallback_LogsOrchestratorWarning() + { + // Arrange + CapturingLoggerFactory loggerFactory = new(); + ServiceCollection services = new(); + services.AddOptions(); + services.AddSingleton(loggerFactory); + DefaultDurableTaskWorkerBuilder builder = new("test", services) + { + BuildTarget = typeof(GoodBuildTarget), + }; + builder.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }); + + // Act + builder.Build(services.BuildServiceProvider()); + + // Assert + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("Orchestrator unversioned fallback", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("replay", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("non-determinism", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("deserialization", StringComparison.OrdinalIgnoreCase)); + loggerFactory.Logs.Should().NotContain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("Activity unversioned fallback", StringComparison.OrdinalIgnoreCase)); + } + + [Fact] + public void Build_WithActivityUnversionedFallback_LogsActivityWarning() + { + // Arrange + CapturingLoggerFactory loggerFactory = new(); + ServiceCollection services = new(); + services.AddOptions(); + services.AddSingleton(loggerFactory); + DefaultDurableTaskWorkerBuilder builder = new("test", services) + { + BuildTarget = typeof(GoodBuildTarget), + }; + builder.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }); + + // Act + builder.Build(services.BuildServiceProvider()); + + // Assert + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("Activity unversioned fallback", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("input shapes", StringComparison.OrdinalIgnoreCase)); + loggerFactory.Logs.Should().NotContain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("Orchestrator unversioned fallback", StringComparison.OrdinalIgnoreCase)); + } + + [Fact] + public void Build_WithBothFallbacksEnabled_LogsBothWarnings() + { + // Arrange + CapturingLoggerFactory loggerFactory = new(); + ServiceCollection services = new(); + services.AddOptions(); + services.AddSingleton(loggerFactory); + DefaultDurableTaskWorkerBuilder builder = new("test", services) + { + BuildTarget = typeof(GoodBuildTarget), + }; + builder.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }); + + // Act + builder.Build(services.BuildServiceProvider()); + + // Assert + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("Orchestrator unversioned fallback", StringComparison.OrdinalIgnoreCase)); + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("Activity unversioned fallback", StringComparison.OrdinalIgnoreCase)); + } + class BadBuildTarget : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) diff --git a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs index 6ad2eac8..ae95d40e 100644 --- a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs +++ b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs @@ -365,6 +365,79 @@ public void WorkItemFilters_UnversionedAndVersionedOrchestrators_EmitConcreteVer actual.Orchestrations[0].Versions.Should().BeEquivalentTo([string.Empty, "v2"]); } + [Fact] + public void WorkItemFilters_UnversionedFallbackWithMixedOrchestrators_EmitsWildcardVersionList() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddOrchestrator(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Name.Should().Be("FilterWorkflow"); + actual.Orchestrations[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_OrchestratorFallbackOnly_DoesNotWidenActivityFilter() + { + // Arrange — orchestrator fallback ON, activity fallback OFF. Mixed orchestrator name widens to + // wildcard; mixed activity name must still emit the concrete version list because the worker + // refuses activity unversioned-fallback for that name. + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddOrchestrator(); + registry.AddActivity(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Versions.Should().BeEmpty(); + actual.Activities.Should().ContainSingle(); + actual.Activities[0].Versions.Should().BeEquivalentTo([string.Empty, "v2"]); + } + [Fact] public void WorkItemFilters_VersionedActivities_GroupVersionsByLogicalName() { @@ -421,6 +494,181 @@ public void WorkItemFilters_UnversionedAndVersionedActivities_EmitConcreteVersio actual.Activities[0].Versions.Should().BeEquivalentTo([string.Empty, "v2"]); } + [Fact] + public void WorkItemFilters_UnversionedFallbackWithMixedActivities_EmitsWildcardVersionList() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddActivity(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Activities.Should().ContainSingle(); + actual.Activities[0].Name.Should().Be("FilterActivity"); + actual.Activities[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_UnversionedFallbackWithVersioningStrict_UsesConfiguredWorkerVersion() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddOrchestrator(); + registry.AddActivity(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0"]); + actual.Activities.Should().ContainSingle(); + actual.Activities[0].Versions.Should().BeEquivalentTo(["1.0"]); + } + + [Fact] + public void WorkItemFilters_StrictExactOnlyForOrchestrators_DoesNotWildcardUnversionedOnly() + { + // Arrange — only the unversioned orchestrator is registered. Under Implicit (default), the + // filter would widen to wildcard [] because the factory resolves unmatched versions via the + // implicit fallback. Under StrictExactOnly the factory rejects those requests, so the filter + // MUST emit the concrete [""] version list to prevent the backend from delivering versioned + // work items the worker will reject after the fact. + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => registry.AddOrchestrator()); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Name.Should().Be("FilterWorkflow"); + actual.Orchestrations[0].Versions.Should().BeEquivalentTo([string.Empty]); + } + + [Fact] + public void WorkItemFilters_StrictExactOnlyForActivities_DoesNotWildcardUnversionedOnly() + { + // Arrange — symmetric activity-side coverage for the StrictExactOnly filter behavior. + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => registry.AddActivity()); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Activities.Should().ContainSingle(); + actual.Activities[0].Name.Should().Be("FilterActivity"); + actual.Activities[0].Versions.Should().BeEquivalentTo([string.Empty]); + } + + [Fact] + public void WorkItemFilters_StrictMatchOverridesStrictExactOnly_KnownLimitation() + { + // Arrange — pathological config: MatchStrategy=Strict with a worker Version, combined with + // StrictExactOnly, against an unversioned-only registration. The pre-existing strict override + // emits the worker's Version (best-effort assumption that the user has registered it). Under + // StrictExactOnly with no exact match, the factory will reject those work items. The filter + // still emits the worker version — captured here as a known limitation so the behavior is + // tracked, not silently changed. The per-property remarks document this gap; a proper fix + // would require per-name dispatch-capability analysis and is out of scope for this PR. + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => registry.AddOrchestrator()); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0"]); + } + [Fact] public void WorkItemFilters_DefaultEmptyRegistry_ProducesEmptyFilters() { diff --git a/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs b/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs index dbef0947..007d31c4 100644 --- a/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs +++ b/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using Microsoft.Extensions.Logging; + namespace Microsoft.DurableTask.Worker.Tests; public class DurableTaskFactoryActivityVersioningTests @@ -130,6 +132,182 @@ public void TryCreateActivity_WithMixedRegistrations_DoesNotFallBackToUnversione activity.Should().BeNull(); } + [Fact] + public void TryCreateActivity_WithMixedRegistrationsAndUnversionedFallback_UsesUnversionedRegistrationForUnknownVersion() + { + // Arrange + DurableTaskRegistry registry = new(); + registry.AddActivity(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateActivity( + new TaskName("InvoiceActivity"), + new TaskVersion("v2"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeTrue(); + activity.Should().BeOfType(); + } + + [Fact] + public void TryCreateActivity_WithOrchestratorFallbackOnly_DoesNotEnableActivityFallback() + { + // Arrange — only the orchestrator-side flag is enabled. Activity dispatch must still be closed-set + // for mixed names; the split into two properties must isolate the two sides independently. + DurableTaskRegistry registry = new(); + registry.AddActivity(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateActivity( + new TaskName("InvoiceActivity"), + new TaskVersion("v9"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeFalse(); + activity.Should().BeNull(); + } + + [Fact] + public void TryCreateActivity_WithActivityFallback_LogsDispatchAtDebug() + { + // Arrange + DurableTaskRegistry registry = new(); + registry.AddActivity(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + CapturingLoggerFactory loggerFactory = new(); + IDurableTaskFactory factory = registry.BuildFactory(workerOptions, loggerFactory); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateActivity( + new TaskName("InvoiceActivity"), + new TaskVersion("v9"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeTrue(); + activity.Should().BeOfType(); + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Debug + && log.Message.Contains("InvoiceActivity", StringComparison.Ordinal) + && log.Message.Contains("v9", StringComparison.Ordinal) + && log.Message.Contains("unversioned", StringComparison.OrdinalIgnoreCase)); + } + + [Fact] + public void TryCreateActivity_StrictExactOnlyWithOnlyUnversionedRegistration_RejectsVersionedRequest() + { + // Arrange — only the unversioned registration exists. Under Implicit, a versioned request would + // resolve to it. StrictExactOnly disables that path: the request must fail. + DurableTaskRegistry registry = new(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateActivity( + new TaskName("InvoiceActivity"), + new TaskVersion("v1"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeFalse(); + activity.Should().BeNull(); + } + + [Fact] + public void TryCreateActivity_StrictExactOnlyWithOnlyUnversionedRegistration_AcceptsUnversionedRequest() + { + // Arrange — exact-match path for unversioned requests is preserved. + DurableTaskRegistry registry = new(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = factory.TryCreateActivity( + new TaskName("InvoiceActivity"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeTrue(); + activity.Should().BeOfType(); + } + + [Fact] + public void TryCreateActivity_OrchestratorStrictExactOnly_DoesNotAffectActivityFallback() + { + // Arrange — asymmetric explicit modes: orchestrator side StrictExactOnly, activity side CatchAll. + // Activity registry is mixed ([X] + [X v=1]). Versioned request for v9 must fall back to the + // unversioned activity per CatchAll on the activity side; the orchestrator-side flag must NOT leak. + DurableTaskRegistry registry = new(); + registry.AddActivity(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateActivity( + new TaskName("InvoiceActivity"), + new TaskVersion("v9"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeTrue(); + activity.Should().BeOfType(); + } + [DurableTask("InvoiceActivity", Version = "v1")] sealed class InvoiceActivityV1 : TaskActivity { diff --git a/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs b/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs index 5244607e..8ad0e15a 100644 --- a/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs +++ b/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using Microsoft.Extensions.Logging; + namespace Microsoft.DurableTask.Worker.Tests; public class DurableTaskFactoryVersioningTests @@ -112,6 +114,98 @@ public void TryCreateOrchestrator_WithMixedRegistrations_DoesNotFallBackForUnkno orchestrator.Should().BeNull(); } + [Fact] + public void TryCreateOrchestrator_WithMixedRegistrationsAndUnversionedFallback_UsesUnversionedRegistrationForUnknownVersion() + { + // Arrange + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + new TaskVersion("v3"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeTrue(); + orchestrator.Should().BeOfType(); + } + + [Fact] + public void TryCreateOrchestrator_WithActivityFallbackOnly_DoesNotEnableOrchestratorFallback() + { + // Arrange — only the activity-side flag is enabled. Orchestrator dispatch must still be closed-set + // for mixed names; otherwise the split into two properties does not actually isolate the two sides. + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + ActivityUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + new TaskVersion("v9"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeFalse(); + orchestrator.Should().BeNull(); + } + + [Fact] + public void TryCreateOrchestrator_WithOrchestratorFallback_LogsDispatchAtDebug() + { + // Arrange + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + }; + CapturingLoggerFactory loggerFactory = new(); + IDurableTaskFactory factory = registry.BuildFactory(workerOptions, loggerFactory); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + new TaskVersion("v9"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeTrue(); + orchestrator.Should().BeOfType(); + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Debug + && log.Message.Contains("InvoiceWorkflow", StringComparison.Ordinal) + && log.Message.Contains("v9", StringComparison.Ordinal) + && log.Message.Contains("unversioned", StringComparison.OrdinalIgnoreCase)); + } + [Fact] public void TryCreateOrchestrator_WithOnlyUnversionedRegistration_FallsBackForVersionedRequest() { @@ -134,6 +228,92 @@ public void TryCreateOrchestrator_WithOnlyUnversionedRegistration_FallsBackForVe orchestrator.Should().BeOfType(); } + [Fact] + public void TryCreateOrchestrator_StrictExactOnlyWithOnlyUnversionedRegistration_RejectsVersionedRequest() + { + // Arrange — only the unversioned registration exists. Under Implicit, a versioned request would + // resolve to it. StrictExactOnly disables that path: the request must fail. + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + new TaskVersion("v1"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeFalse(); + orchestrator.Should().BeNull(); + } + + [Fact] + public void TryCreateOrchestrator_StrictExactOnlyWithOnlyUnversionedRegistration_AcceptsUnversionedRequest() + { + // Arrange — StrictExactOnly disables FALLBACK for versioned requests, not the exact-match path + // for unversioned requests. An unversioned request to an unversioned registration must still + // dispatch (it is an exact match in the empty-version key). + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = factory.TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeTrue(); + orchestrator.Should().BeOfType(); + } + + [Fact] + public void TryCreateOrchestrator_StrictExactOnlyWithMixedRegistrations_RejectsUnmatchedVersion() + { + // Arrange — same registry shape as the "no fallback for unknown version" test, but explicitly + // under StrictExactOnly. Behavior should match Implicit for this shape (both reject), confirming + // StrictExactOnly is a strict generalization of Implicit on mixed names. + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.StrictExactOnly, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + new TaskVersion("v9"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeFalse(); + orchestrator.Should().BeNull(); + } + [Fact] public void PublicTryCreateOrchestrator_UsesUnversionedRegistrationOnly() { diff --git a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs index 4c78aac0..e46baff3 100644 --- a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs +++ b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs @@ -28,6 +28,9 @@ public class GrpcDurableTaskWorkerTests static readonly MethodInfo ProcessorConnectAsyncMethod = typeof(GrpcDurableTaskWorker) .GetNestedType("Processor", BindingFlags.NonPublic)! .GetMethod("ConnectAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; + static readonly MethodInfo ProcessorRunOrchestratorAsyncMethod = typeof(GrpcDurableTaskWorker) + .GetNestedType("Processor", BindingFlags.NonPublic)! + .GetMethod("OnRunOrchestratorAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; static readonly MethodInfo TryRecreateChannelAsyncMethod = typeof(GrpcDurableTaskWorker) .GetMethod("TryRecreateChannelAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; @@ -514,6 +517,71 @@ public void Constructor_StrictWorkerVersioningWithoutRegistryContents_DoesNotThr act.Should().NotThrow(); } + [Fact] + public async Task OnRunOrchestratorAsync_StrictVersioningWithUnversionedFallback_RejectsMismatchBeforeFactoryDispatch() + { + // Arrange + const string orchestrationName = "StrictFallbackWorkflow"; + string completionToken = Guid.NewGuid().ToString("N"); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Reject, + OrchestratorUnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.CatchAll, + }, + Logging = { UseLegacyCategories = false }, + }; + Mock factoryMock = new(MockBehavior.Strict); + GrpcDurableTaskWorker worker = CreateWorker( + new GrpcDurableTaskWorkerOptions(), + workerOptions, + NullLoggerFactory.Instance, + factoryMock.Object); + Mock clientMock = new( + MockBehavior.Strict, + Mock.Of()); + TaskCompletionSource abandonRequest = new( + TaskCreationOptions.RunContinuationsAsynchronously); + clientMock + .Setup(c => c.AbandonTaskOrchestratorWorkItemAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns((P.AbandonOrchestrationTaskRequest request, Metadata _, DateTime? _, CancellationToken _) => + { + abandonRequest.SetResult(request); + return CreateUnaryCall(Task.FromResult(new P.AbandonOrchestrationTaskResponse())); + }); + object processor = CreateProcessor(worker, clientMock.Object); + P.OrchestratorRequest request = CreateOrchestratorRequest( + orchestrationName, + instanceVersion: "2.0"); + + // Act + await InvokeProcessorRunOrchestratorAsync(processor, request, completionToken); + + // Assert + P.AbandonOrchestrationTaskRequest actualRequest = await abandonRequest.Task.WaitAsync(TimeSpan.FromSeconds(5)); + actualRequest.CompletionToken.Should().Be(completionToken); + factoryMock.Verify( + f => f.TryCreateOrchestrator( + It.IsAny(), + It.IsAny(), + out It.Ref.IsAny), + Times.Never); + clientMock.Verify( + c => c.CompleteOrchestratorTaskAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Never); + } + [Fact] public void Constructor_NoWorkerVersioningWithoutRegistryContents_DoesNotThrow() { @@ -606,6 +674,17 @@ static object CreateProcessor(GrpcDurableTaskWorker worker, P.TaskHubSidecarServ return (AsyncServerStreamingCall)task.GetType().GetProperty("Result")!.GetValue(task)!; } + static async Task InvokeProcessorRunOrchestratorAsync( + object processor, + P.OrchestratorRequest request, + string completionToken) + { + Task task = (Task)ProcessorRunOrchestratorAsyncMethod.Invoke( + processor, + new object?[] { request, completionToken, CancellationToken.None })!; + await task; + } + static async Task InvokeProcessorExecuteAsync(object processor, CancellationToken cancellationToken) { Task task = (Task)ProcessorExecuteAsyncMethod.Invoke(processor, new object?[] { cancellationToken })!; @@ -613,6 +692,36 @@ static async Task InvokeProcessorExecuteAsync(object proces return (ProcessorExitReason)task.GetType().GetProperty("Result")!.GetValue(task)!; } + static P.OrchestratorRequest CreateOrchestratorRequest(string orchestrationName, string instanceVersion) + { + string instanceId = Guid.NewGuid().ToString("N"); + string executionId = Guid.NewGuid().ToString("N"); + return new P.OrchestratorRequest + { + InstanceId = instanceId, + ExecutionId = executionId, + NewEvents = + { + new P.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new P.ExecutionStartedEvent + { + Name = orchestrationName, + Version = instanceVersion, + Input = "\"input\"", + OrchestrationInstance = new P.OrchestrationInstance + { + InstanceId = instanceId, + ExecutionId = executionId, + }, + }, + }, + }, + }; + } + static void InvokeApplySuccessfulRecreate( GrpcDurableTaskWorker worker, object result, diff --git a/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs b/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs index 2464c0c7..6e4ba572 100644 --- a/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs +++ b/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs @@ -142,9 +142,22 @@ public async Task PerItem_HeartbeatReset_KeepsTimerAlive() // Feed one item, wait long enough that the original timer would have expired, then complete. // Synchronize on the first item actually being processed so the second delay is measured from // the consumer's timer reset instead of from the test thread's write timing. + // + // Timing budget (with 500ms slack on both sides of the assertion to survive CI scheduling jitter): + // - The consumer arms a 2000ms silent-disconnect timer. + // - Test thread delays 1000ms, then writes the 1st item. + // - Test thread awaits firstItemProcessed; the consumer dequeues the 1st item and re-arms the + // timer at T = 1000ms + (small, variable processing delay). + // - Test thread delays another 1500ms, then writes the 2nd item. + // - Total elapsed before the 2nd write is at least 2500ms (1000 + 1500) plus the variable + // time to process the 1st item — well past the original 2000ms timer, which proves the test + // exercises the per-item reset path. + // - After the reset, the new timer fires ~2000ms after the 1st item was dequeued, leaving + // ~500ms margin between the 2nd item write and the new timer expiry. Channel channel = Channel.CreateUnbounded(); - TimeSpan timeout = TimeSpan.FromMilliseconds(500); + TimeSpan timeout = TimeSpan.FromMilliseconds(2000); TaskCompletionSource firstItemProcessed = new(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource secondItemProcessed = new(TaskCreationOptions.RunContinuationsAsynchronously); int itemCount = 0; Task consumeTask = WorkItemStreamConsumer.ConsumeAsync( @@ -152,21 +165,31 @@ public async Task PerItem_HeartbeatReset_KeepsTimerAlive() silentDisconnectTimeout: timeout, onItem: _ => { - if (Interlocked.Increment(ref itemCount) == 1) + int seen = Interlocked.Increment(ref itemCount); + if (seen == 1) { firstItemProcessed.TrySetResult(); } + else if (seen == 2) + { + secondItemProcessed.TrySetResult(); + } }, onFirstMessage: null, cancellation: CancellationToken.None); - await Task.Delay(TimeSpan.FromMilliseconds(150)); + await Task.Delay(TimeSpan.FromMilliseconds(1000)); await channel.Writer.WriteAsync(new P.WorkItem { HealthPing = new P.HealthPing() }); - await firstItemProcessed.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await firstItemProcessed.Task.WaitAsync(TimeSpan.FromSeconds(10)); // Without the per-item reset, the original timer would fire before this second item arrives. - await Task.Delay(TimeSpan.FromMilliseconds(400)); + await Task.Delay(TimeSpan.FromMilliseconds(1500)); await channel.Writer.WriteAsync(new P.WorkItem { HealthPing = new P.HealthPing() }); + + // Wait for the consumer to actually dequeue and process the 2nd item (which re-arms the timer) + // before completing the channel. Without this barrier, the test could observe a SilentDisconnect + // if the timer fires after the test writes the 2nd item but before the consumer dequeues it. + await secondItemProcessed.Task.WaitAsync(TimeSpan.FromSeconds(10)); channel.Writer.Complete(); WorkItemStreamResult result = await consumeTask;