From c603878c61055e911e5577f062362afa5dd3179e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 01:25:09 +0000 Subject: [PATCH] feat: implement dynamic resumable code for taskSeq/taskSeqDynamic, fixes #246 Adds TaskSeqDynamicInfo<'T> and TaskSeqDynamic<'T> types that implement the dynamic (ResumptionDynamicInfo-based) execution path for taskSeq computation expressions. This fixes the NotImplementedException raised when taskSeq is used in contexts where the F# compiler cannot emit static resumable code, such as F# Interactive (FSI) or top-level functions. Key changes: - TaskSeqDynamicInfo<'T>: concrete ResumptionDynamicInfo subclass that handles MoveNext transitions (same logic as the static MoveNextMethodImpl) - TaskSeqDynamic<'T>: reference-type IAsyncEnumerable implementation using the dynamic path, with proper cloning support for re-enumeration - TaskSeqBuilder.Run: else-branch now creates TaskSeqDynamic instead of raising - taskSeqDynamic: new CE builder (inherits TaskSeqBuilder) and module value; identical to taskSeq in compiled code, uses dynamic path in FSI - 24 new tests covering: empty, single/multi yield, for/while loops, async bind, Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- release-notes.txt | 1 + .../FSharp.Control.TaskSeq.Test.fsproj | 1 + .../TaskSeq.Dynamic.Tests.CE.fs | 327 ++++++++++++++++++ src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs | 204 ++++++++++- src/FSharp.Control.TaskSeq/TaskSeqBuilder.fsi | 55 +++ 5 files changed, 578 insertions(+), 10 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.Dynamic.Tests.CE.fs diff --git a/release-notes.txt b/release-notes.txt index 9fb09615..d07e4fc8 100644 --- a/release-notes.txt +++ b/release-notes.txt @@ -2,6 +2,7 @@ Release notes: 1.0.0 + - adds taskSeqDynamic computation expression and TaskSeqDynamic/TaskSeqDynamicInfo types for dynamic (FSI-compatible) resumable code, fixing issue where taskSeq would raise NotImplementedException in F# Interactive, #246 - perf: TaskSeq.chunkBy and chunkByAsync reuse the ResizeArray buffer between chunks, reducing allocations on sequences with many chunk boundaries - fixes: TaskSeq.insertAt, insertManyAt, removeAt, removeManyAt, updateAt now raise ArgumentNullException (not NullReferenceException) when given a null source; insertManyAt also validates the values argument - refactor: simplify lengthBy and lengthBeforeMax to use while! and remove the redundant mutable 'go' and initial MoveNextAsync diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index 58856f1c..029fc910 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -68,6 +68,7 @@ + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Dynamic.Tests.CE.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Dynamic.Tests.CE.fs new file mode 100644 index 00000000..ee31b361 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Dynamic.Tests.CE.fs @@ -0,0 +1,327 @@ +module TaskSeq.Tests.``taskSeqDynamic Computation Expression`` + +open System +open System.Threading + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// ------------------------------------------------------- +// Basic sanity tests for the taskSeqDynamic CE builder. +// taskSeqDynamic uses the same static path as taskSeq when +// compiled normally, but falls back to the dynamic +// (ResumptionDynamicInfo-based) path in FSI / when the +// F# compiler cannot emit static resumable code. +// ------------------------------------------------------- + +[] +let ``CE taskSeqDynamic empty sequence`` () = task { + let ts = taskSeqDynamic { () } + let! data = ts |> TaskSeq.toListAsync + data |> should be Empty +} + +[] +let ``CE taskSeqDynamic single yield`` () = task { + let ts = taskSeqDynamic { yield 42 } + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 42 ] +} + +[] +let ``CE taskSeqDynamic multiple yields`` () = task { + let ts = taskSeqDynamic { + yield 1 + yield 2 + yield 3 + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2; 3 ] +} + +[] +let ``CE taskSeqDynamic yield with for loop`` () = task { + let ts = taskSeqDynamic { + for i in 1..5 do + yield i + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2; 3; 4; 5 ] +} + +[] +let ``CE taskSeqDynamic yield with async task bind`` () = task { + let ts = taskSeqDynamic { + let! x = task { return 10 } + yield x + let! y = task { return 20 } + yield y + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 10; 20 ] +} + +[] +let ``CE taskSeqDynamic yield from seq`` () = task { + let ts = taskSeqDynamic { yield! [ 1; 2; 3 ] } + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2; 3 ] +} + +[] +let ``CE taskSeqDynamic yield from another taskSeqDynamic`` () = task { + let inner = taskSeqDynamic { + yield 1 + yield 2 + } + + let ts = taskSeqDynamic { + yield! inner + yield 3 + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2; 3 ] +} + +[] +let ``CE taskSeqDynamic yield from taskSeq`` () = task { + let inner = taskSeq { + yield 1 + yield 2 + } + + let ts = taskSeqDynamic { yield! inner } + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2 ] +} + +[] +let ``CE taskSeqDynamic with tryWith`` () = task { + let ts = taskSeqDynamic { + try + yield 1 + yield 2 + with _ -> + yield -1 + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2 ] +} + +[] +let ``CE taskSeqDynamic with tryWith catching exception`` () = task { + let ts = taskSeqDynamic { + try + yield 1 + raise (InvalidOperationException "test") + yield 2 + with :? InvalidOperationException -> + yield 99 + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 99 ] +} + +[] +let ``CE taskSeqDynamic with tryFinally`` () = task { + let mutable finallyCalled = false + + let ts = taskSeqDynamic { + try + yield 1 + yield 2 + finally + finallyCalled <- true + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1; 2 ] + finallyCalled |> should equal true +} + +[] +let ``CE taskSeqDynamic with use`` () = task { + let mutable disposed = false + + let mkDisposable () = + { new IDisposable with + member _.Dispose() = disposed <- true + } + + let ts = taskSeqDynamic { + use _d = mkDisposable () + yield 42 + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 42 ] + disposed |> should equal true +} + +[] +let ``CE taskSeqDynamic supports re-enumeration`` () = task { + let ts = taskSeqDynamic { + yield 1 + yield 2 + yield 3 + } + + let! data1 = ts |> TaskSeq.toListAsync + let! data2 = ts |> TaskSeq.toListAsync + data1 |> should equal [ 1; 2; 3 ] + data2 |> should equal [ 1; 2; 3 ] +} + +[] +let ``CE taskSeqDynamic multiple re-enumerations produce same result`` () = task { + let ts = taskSeqDynamic { + for i in 1..10 do + yield i + } + + for _ in 1..5 do + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1..10 ] +} + +[] +let ``CE taskSeqDynamic with cancellation`` () = task { + use cts = new CancellationTokenSource() + cts.Cancel() + + let ts = taskSeqDynamic { + yield 1 + yield 2 + yield 3 + } + + let enumerator = ts.GetAsyncEnumerator(cts.Token) + + let mutable threw = false + + try + let! _ = enumerator.MoveNextAsync() + () + with :? OperationCanceledException -> + threw <- true + + threw |> should equal true + do! enumerator.DisposeAsync() +} + +[] +let ``CE taskSeqDynamic with large for loop`` () = task { + let ts = taskSeqDynamic { + for i in 1..1000 do + yield i + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1..1000 ] + data |> should haveLength 1000 +} + +[] +let ``CE taskSeqDynamic with nested for loops`` () = task { + let ts = taskSeqDynamic { + for i in 1..3 do + for j in 1..3 do + yield i * 10 + j + } + + let expected = [ + for i in 1..3 do + for j in 1..3 do + yield i * 10 + j + ] + + let! data = ts |> TaskSeq.toListAsync + data |> should equal expected +} + +[] +let ``CE taskSeqDynamic with async value task bind`` () = task { + let ts = taskSeqDynamic { + let! x = System.Threading.Tasks.ValueTask.FromResult(7) + yield x + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 7 ] +} + +[] +let ``CE taskSeqDynamic with Async bind`` () = task { + let ts = taskSeqDynamic { + let! x = async { return 99 } + yield x + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 99 ] +} + +[] +let ``CE taskSeqDynamic is IAsyncEnumerable`` () = task { + // In compiled mode this uses the static path (returns TaskSeq<_,_>), + // in FSI it returns TaskSeqDynamic<_>. Both implement IAsyncEnumerable. + let ts = taskSeqDynamic { yield 1 } + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 1 ] +} + +[] +let ``CE taskSeqDynamic empty produces no values`` () = task { + let mutable count = 0 + + let ts = taskSeqDynamic { () } + + let e = ts.GetAsyncEnumerator(CancellationToken.None) + + try + while! e.MoveNextAsync() do + count <- count + 1 + finally + () + + do! e.DisposeAsync() + count |> should equal 0 +} + +[] +let ``CE taskSeqDynamic with while loop`` () = task { + let ts = taskSeqDynamic { + let mutable i = 0 + + while i < 5 do + yield i + i <- i + 1 + } + + let! data = ts |> TaskSeq.toListAsync + data |> should equal [ 0; 1; 2; 3; 4 ] +} + +[] +let ``CE taskSeqDynamic is same type as TaskSeq`` () = + let ts: TaskSeq = taskSeqDynamic { yield 1 } + ts |> should not' (be Null) + +[] +let ``CE taskSeqDynamic with several yield!`` () = task { + let tskSeq = taskSeqDynamic { + yield! Gen.sideEffectTaskSeq 10 + yield! Gen.sideEffectTaskSeq 5 + } + + let! data = tskSeq |> TaskSeq.toListAsync + data |> should equal (List.concat [ [ 1..10 ]; [ 1..5 ] ]) +} diff --git a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs index 46c0705f..9a8fd826 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs @@ -4,6 +4,7 @@ open System.Diagnostics // note: this is *not* an experimental feature, but they forgot to switch off the flag #nowarn "57" // Experimental library feature, requires '--langversion:preview'. +#nowarn "3513" // Resumable code invocation: intentionally calling ResumableCode as a regular delegate in the dynamic (FSI) fallback path. open System open System.Collections.Generic @@ -316,6 +317,182 @@ and TaskSeqStateMachine<'T> = ResumableStateMachine> and TaskSeqResumptionFunc<'T> = ResumptionFunc> and TaskSeqResumptionDynamicInfo<'T> = ResumptionDynamicInfo> +/// Implements the dynamic (FSI) path for ResumptionDynamicInfo, used by TaskSeqDynamic. +/// Handles the state-machine transitions that the compiler-generated MoveNextMethodImpl handles in the static path. +and [] TaskSeqDynamicInfo<'T>(initialResumptionFunc: TaskSeqResumptionFunc<'T>) = + inherit TaskSeqResumptionDynamicInfo<'T>(initialResumptionFunc) + + override this.MoveNext(sm: byref>) = + try + Debug.logInfo "at TaskSeqDynamicInfo.MoveNext start" + + let __stack_code_fin = this.ResumptionFunc.Invoke(&sm) + + if __stack_code_fin then + Debug.logInfo "at TaskSeqDynamicInfo.MoveNext, done" + + sm.Data.promiseOfValueOrEnd.SetResult(false) + sm.Data.builder.Complete() + sm.Data.completed <- true + + elif sm.Data.current.IsSome then + Debug.logInfo "at TaskSeqDynamicInfo.MoveNext, still more items" + + sm.Data.promiseOfValueOrEnd.SetResult(true) + + else + Debug.logInfo "at TaskSeqDynamicInfo.MoveNext, await" + + let boxed = sm.Data.boxedSelf + + sm.Data.awaiter.UnsafeOnCompleted(fun () -> + let mutable boxed = boxed + moveNextRef &boxed) + + with exn -> + Debug.logInfo ("Setting exception of PromiseOfValueOrEnd to: ", exn.Message) + sm.Data.promiseOfValueOrEnd.SetException(exn) + sm.Data.builder.Complete() + + override _.SetStateMachine(_machine: byref>, _state: IAsyncStateMachine) = () + +/// Dynamic (FSI) implementation of IAsyncEnumerable for taskSeq computation expressions. +/// Used when the F# compiler cannot emit static resumable code (e.g., in F# Interactive). +and [] TaskSeqDynamic<'T>() = + inherit TaskSeqBase<'T>() + + let initialThreadId = Environment.CurrentManagedThreadId + + [] + val mutable _machine: TaskSeqStateMachine<'T> + + [] + val mutable _initialResumptionFunc: TaskSeqResumptionFunc<'T> + + member this.InitDynamicMachineData(ct: CancellationToken) = + let data = TaskSeqStateMachineData() + data.boxedSelf <- this + data.cancellationToken <- ct + data.builder <- AsyncIteratorMethodBuilder.Create() + this._machine.Data <- data + this._machine.ResumptionDynamicInfo <- TaskSeqDynamicInfo(this._initialResumptionFunc) + + interface IValueTaskSource with + member this.GetResult token = + let canMoveNext = this._machine.Data.promiseOfValueOrEnd.GetResult token + + if not canMoveNext then + this._machine.Data.completed <- true + + member this.GetStatus token = this._machine.Data.promiseOfValueOrEnd.GetStatus token + + member this.OnCompleted(continuation, state, token, flags) = + this._machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + + interface IValueTaskSource with + member this.GetStatus token = this._machine.Data.promiseOfValueOrEnd.GetStatus token + + member this.GetResult token = + let canMoveNext = this._machine.Data.promiseOfValueOrEnd.GetResult token + + if not canMoveNext then + this._machine.Data.completed <- true + + canMoveNext + + member this.OnCompleted(continuation, state, token, flags) = + this._machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + + interface IAsyncStateMachine with + member this.MoveNext() = moveNextRef &this._machine + member _.SetStateMachine(_state) = () + + interface IAsyncEnumerable<'T> with + member this.GetAsyncEnumerator(ct) = + match this._machine.Data :> obj with + | null when initialThreadId = Environment.CurrentManagedThreadId -> + this.InitDynamicMachineData(ct) + this + | _ -> + Debug.logInfo "TaskSeqDynamic.GetAsyncEnumerator, cloning..." + let clone = TaskSeqDynamic<'T>() + clone._initialResumptionFunc <- this._initialResumptionFunc + clone.InitDynamicMachineData(ct) + clone + + interface IAsyncEnumerator<'T> with + member this.Current = + match this._machine.Data.current with + | ValueSome x -> x + | ValueNone -> Unchecked.defaultof<'T> + + member this.MoveNextAsync() = + Debug.logInfo "TaskSeqDynamic.MoveNextAsync..." + + if this._machine.ResumptionPoint = -1 then + Debug.logInfo "at TaskSeqDynamic.MoveNextAsync: Resumption point = -1" + ValueTask.False + + elif this._machine.Data.completed then + Debug.logInfo "at TaskSeqDynamic.MoveNextAsync: completed = true" + this._machine.Data.promiseOfValueOrEnd.Reset() + ValueTask.False + + else + Debug.logInfo "at TaskSeqDynamic.MoveNextAsync: normal resumption" + let data = this._machine.Data + data.cancellationToken.ThrowIfCancellationRequested() + data.promiseOfValueOrEnd.Reset() + let mutable ts = this + data.builder.MoveNext(&ts) + this.MoveNextAsyncResult() + + member this.DisposeAsync() = + task { + match this._machine.Data.disposalStack with + | null -> () + | _ -> + let mutable exn = None + + for d in Seq.rev this._machine.Data.disposalStack do + try + do! d () + with e -> + if exn.IsNone then + exn <- Some e + + match exn with + | None -> () + | Some e -> raise e + } + |> ValueTask + + override this.MoveNextAsyncResult() = + let data = this._machine.Data + let version = data.promiseOfValueOrEnd.Version + let status = data.promiseOfValueOrEnd.GetStatus(version) + + match status with + | ValueTaskSourceStatus.Succeeded -> + Debug.logInfo "at TaskSeqDynamic MoveNextAsyncResult: case succeeded..." + + let result = data.promiseOfValueOrEnd.GetResult(version) + + if not result then + data.current <- ValueNone + + ValueTask.fromResult result + + | ValueTaskSourceStatus.Faulted + | ValueTaskSourceStatus.Canceled + | ValueTaskSourceStatus.Pending as state -> + Debug.logInfo ("at TaskSeqDynamic MoveNextAsyncResult: case ", state) + + ValueTask.ofSource this version + | _ -> + Debug.logInfo "at TaskSeqDynamic MoveNextAsyncResult: Unexpected state" + ValueTask.ofSource this version + type TaskSeqBuilder() = member inline _.Delay(f: unit -> ResumableTSC<'T>) = ResumableTSC<'T>(fun sm -> f().Invoke(&sm)) @@ -379,16 +556,11 @@ type TaskSeqBuilder() = ts._machine <- sm ts :> IAsyncEnumerable<'T>)) else - // let initialResumptionFunc = TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm)) - // let resumptionFuncExecutor = TaskSeqResumptionExecutor<'T>(fun sm f -> - // // TODO: add exception handling? - // if f.Invoke(&sm) then - // sm.ResumptionPoint <- -2) - // let setStateMachine = SetStateMachineMethodImpl<_>(fun sm f -> ()) - // sm.Machine.ResumptionFuncInfo <- (initialResumptionFunc, resumptionFuncExecutor, setStateMachine) - //sm.Start() - NotImplementedException "No dynamic implementation for TaskSeq yet." - |> raise + // Dynamic path, used when __useResumableCode = false (e.g., in F# Interactive / FSI). + // Uses TaskSeqDynamic which drives the resumable code via ResumptionDynamicInfo. + let ts = TaskSeqDynamic<'T>() + ts._initialResumptionFunc <- TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm)) + ts :> IAsyncEnumerable<'T> member inline _.Zero() : ResumableTSC<'T> = @@ -685,3 +857,15 @@ module HighPriority = module TaskSeqBuilder = /// Builds an asynchronous task sequence based on IAsyncEnumerable<'T> using computation expression syntax. let taskSeq = TaskSeqBuilder() + +/// Builder for computation expressions. Inherits all members from +/// , using the dynamic (ResumptionDynamicInfo-based) path when the +/// F# compiler cannot emit static resumable code (e.g., in F# Interactive). +type TaskSeqDynamicBuilder() = + inherit TaskSeqBuilder() + +[] +module TaskSeqDynamicBuilder = + /// Builds an asynchronous task sequence, with a dynamic resumable code fallback for scenarios + /// where the F# compiler cannot generate static resumable code (e.g., in F# Interactive / FSI). + let taskSeqDynamic = TaskSeqDynamicBuilder() diff --git a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fsi b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fsi index e0607046..9caa7091 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fsi @@ -48,6 +48,8 @@ and ResumableTSC<'T> = ResumableCode, unit> /// For use in this library only. Required by the method. /// and TaskSeqStateMachine<'T> = ResumableStateMachine> +and TaskSeqResumptionFunc<'T> = ResumptionFunc> +and TaskSeqResumptionDynamicInfo<'T> = ResumptionDynamicInfo> /// /// Contains the state data for the computation expression builder. @@ -133,6 +135,40 @@ and [] TaskSeq<'Machine, 'T member InitMachineData: ct: CancellationToken * machine: byref<'Machine> -> unit override MoveNextAsyncResult: unit -> ValueTask +/// +/// Concrete implementation of for taskSeq computation +/// expressions, used in the dynamic (FSI) path. Handles state-machine transitions when the F# compiler +/// cannot generate static resumable code. +/// For use by this library only. +/// +and [] TaskSeqDynamicInfo<'T> = + inherit TaskSeqResumptionDynamicInfo<'T> + new: initialResumptionFunc: TaskSeqResumptionFunc<'T> -> TaskSeqDynamicInfo<'T> + +/// +/// Dynamic (FSI-compatible) implementation of for taskSeq +/// computation expressions. Used when the F# compiler cannot generate static resumable code (e.g., in FSI). +/// For use by this library only. +/// +and [] TaskSeqDynamic<'T> = + inherit TaskSeqBase<'T> + interface IAsyncEnumerator<'T> + interface IAsyncEnumerable<'T> + interface IAsyncStateMachine + interface IValueTaskSource + interface IValueTaskSource + + new: unit -> TaskSeqDynamic<'T> + + [] + val mutable _machine: TaskSeqStateMachine<'T> + + [] + val mutable _initialResumptionFunc: TaskSeqResumptionFunc<'T> + + member InitDynamicMachineData: ct: CancellationToken -> unit + override MoveNextAsyncResult: unit -> ValueTask + /// /// Main builder class for the computation expression. /// @@ -209,3 +245,22 @@ module HighPriority = member inline Bind: task: Task<'T> * continuation: ('T -> ResumableTSC<'U>) -> ResumableTSC<'U> member inline Bind: computation: Async<'T> * continuation: ('T -> ResumableTSC<'U>) -> ResumableTSC<'U> + +/// +/// Builder class for the computation expression. Inherits all members +/// from , using the dynamic resumable code path as fallback when the +/// F# compiler cannot generate static resumable code (e.g., in F# Interactive / FSI). +/// +[] +type TaskSeqDynamicBuilder = + inherit TaskSeqBuilder + new: unit -> TaskSeqDynamicBuilder + +[] +module TaskSeqDynamicBuilder = + + /// + /// Builds an asynchronous task sequence, with a dynamic resumable code fallback for scenarios + /// where the F# compiler cannot generate static resumable code (e.g., in F# Interactive / FSI). + /// + val taskSeqDynamic: TaskSeqDynamicBuilder