diff --git a/.gitignore b/.gitignore index dfcfd56f..f5910e87 100644 --- a/.gitignore +++ b/.gitignore @@ -348,3 +348,4 @@ MigrationBackup/ # Ionide (cross platform F# VS Code tools) working folder .ionide/ +*.ncrunchproject diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index ec3384ce..7b835749 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -17,7 +17,8 @@ open FSharp.Core.CompilerServices.StateMachineHelpers module Internal = // cannot be marked with 'internal' scope let verbose = false - let inline MoveNext (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() + /// Call MoveNext on an IAsyncStateMachine by reference + let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() // F# requires that we implement interfaces even on an abstract class let inline raiseNotImpl () = @@ -63,17 +64,15 @@ type TaskSeqStateMachineData<'T>() = [] val mutable tailcallTarget: TaskSeq<'T> option - member data.PushDispose(f: unit -> Task) = - match data.disposalStack with - | null -> data.disposalStack <- ResizeArray() - | _ -> () + member data.PushDispose(disposer: unit -> Task) = + if isNull data.disposalStack then + data.disposalStack <- ResizeArray() - data.disposalStack.Add(f) + data.disposalStack.Add disposer member data.PopDispose() = - match data.disposalStack with - | null -> () - | _ -> data.disposalStack.RemoveAt(data.disposalStack.Count - 1) + if not (isNull data.disposalStack) then + data.disposalStack.RemoveAt(data.disposalStack.Count - 1) and [] TaskSeq<'T>() = @@ -112,66 +111,93 @@ and [] TaskSeq<'Machine, 'T [] val mutable Machine: 'Machine - member internal ts.hijack() = - let res = ts.Machine.Data.tailcallTarget + member internal this.hijack() = + let res = this.Machine.Data.tailcallTarget match res with | Some tg -> + // we get here only when there are multiple returns (it seems) + // hence the tailcall logic match tg.TailcallTarget with | None -> res | (Some tg2 as res2) -> // Cut out chains of tailcalls - ts.Machine.Data.tailcallTarget <- Some tg2 + this.Machine.Data.tailcallTarget <- Some tg2 res2 | None -> res // Note: Not entirely clear if this is needed, everything still compiles without it interface IValueTaskSource with - member ts.GetResult(token: int16) = - match ts.hijack () with + member this.GetResult(token: int16) = + match this.hijack () with | Some tg -> (tg :> IValueTaskSource).GetResult(token) | None -> - ts.Machine.Data.promiseOfValueOrEnd.GetResult(token) + this.Machine.Data.promiseOfValueOrEnd.GetResult(token) |> ignore - member ts.GetStatus(token: int16) = - match ts.hijack () with + member this.GetStatus(token: int16) = + match this.hijack () with | Some tg -> (tg :> IValueTaskSource).GetStatus(token) - | None -> ts.Machine.Data.promiseOfValueOrEnd.GetStatus(token) + | None -> this.Machine.Data.promiseOfValueOrEnd.GetStatus(token) - member ts.OnCompleted(continuation, state, token, flags) = - match ts.hijack () with + member this.OnCompleted(continuation, state, token, flags) = + match this.hijack () with | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) - | None -> ts.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + | None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) // Needed for MoveNextAsync to return a ValueTask interface IValueTaskSource with - member ts.GetStatus(token: int16) = - match ts.hijack () with + member this.GetStatus(token: int16) = + match this.hijack () with | Some tg -> (tg :> IValueTaskSource).GetStatus(token) - | None -> ts.Machine.Data.promiseOfValueOrEnd.GetStatus(token) + | None -> this.Machine.Data.promiseOfValueOrEnd.GetStatus(token) - member ts.GetResult(token: int16) = - match ts.hijack () with - | Some tg -> (tg :> IValueTaskSource).GetResult(token) - | None -> ts.Machine.Data.promiseOfValueOrEnd.GetResult(token) - - member ts.OnCompleted(continuation, state, token, flags) = - match ts.hijack () with + member this.GetResult(token: int16) = + match this.hijack () with + | Some tg -> + if verbose then + printfn + "Getting result for token on 'Some' branch, status: %A" + ((tg :> IValueTaskSource).GetStatus(token)) + (tg :> IValueTaskSource).GetResult(token) + | None -> + try + if verbose then + printfn + "Getting result for token on 'None' branch, status: %A" + (this.Machine.Data.promiseOfValueOrEnd.GetStatus(token)) + this.Machine.Data.promiseOfValueOrEnd.GetResult(token) + with e -> + // FYI: an exception here is usually caused by the CE statement (user code) throwing an exception + // We're just logging here because the following error would also be caught right here: + // "An attempt was made to transition a task to a final state when it had already completed." + if verbose then + printfn "Error '%s' for token: %i" e.Message token + + reraise () + member this.OnCompleted(continuation, state, token, flags) = + match this.hijack () with | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) - | None -> ts.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + | None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) interface IAsyncStateMachine with - member ts.MoveNext() = - match ts.hijack () with - | Some tg -> (tg :> IAsyncStateMachine).MoveNext() - | None -> MoveNext(&ts.Machine) - - member _.SetStateMachine(_state) = () // not needed for reference type + /// The MoveNext method is called by builder.MoveNext() in the resumable code + member this.MoveNext() = + match this.hijack () with + | Some tg -> + // jump to the hijacked method + (tg :> IAsyncStateMachine).MoveNext() + | None -> moveNextRef &this.Machine + + /// SetStatemachine is (currently) never called + member _.SetStateMachine(_state) = + if verbose then + printfn "Setting state machine -- ignored" + () // not needed for reference type interface IAsyncEnumerable<'T> with - member ts.GetAsyncEnumerator(ct) = - let data = ts.Machine.Data + member this.GetAsyncEnumerator(ct) = + let data = this.Machine.Data if (not data.taken @@ -180,31 +206,37 @@ and [] TaskSeq<'Machine, 'T data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() - (ts :> IAsyncEnumerator<_>) + if verbose then + printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint + (this :> IAsyncEnumerator<_>) else if verbose then printfn "GetAsyncEnumerator, cloning..." - let clone = ts.MemberwiseClone() :?> TaskSeq<'Machine, 'T> + // it appears that the issue is possibly caused by the problem + // of having ValueTask all over the place, and by going over the + // iteration twice, we are trying to *await* twice, which is not allowed + // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 + let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> data.taken <- true clone.Machine.Data.cancellationToken <- ct (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) interface IAsyncDisposable with - member ts.DisposeAsync() = - match ts.hijack () with + member this.DisposeAsync() = + match this.hijack () with | Some tg -> (tg :> IAsyncDisposable).DisposeAsync() | None -> if verbose then printfn "DisposeAsync..." task { - match ts.Machine.Data.disposalStack with + match this.Machine.Data.disposalStack with | null -> () | _ -> let mutable exn = None - for d in Seq.rev ts.Machine.Data.disposalStack do + for d in Seq.rev this.Machine.Data.disposalStack do try do! d () with e -> @@ -218,47 +250,68 @@ and [] TaskSeq<'Machine, 'T |> ValueTask interface System.Collections.Generic.IAsyncEnumerator<'T> with - member ts.Current = - match ts.hijack () with + member this.Current = + match this.hijack () with | Some tg -> (tg :> IAsyncEnumerator<'T>).Current | None -> - match ts.Machine.Data.current with + match this.Machine.Data.current with | ValueSome x -> x | ValueNone -> failwith "no current value" - member ts.MoveNextAsync() = - match ts.hijack () with + member this.MoveNextAsync() = + match this.hijack () with | Some tg -> (tg :> IAsyncEnumerator<'T>).MoveNextAsync() | None -> if verbose then printfn "MoveNextAsync..." - if ts.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable + if this.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable + if verbose then + printfn "at MoveNextAsync: Resumption point = -1" ValueTask() else - let data = ts.Machine.Data + if verbose then + printfn "at MoveNextAsync: normal resumption scenario" + let data = this.Machine.Data data.promiseOfValueOrEnd.Reset() - let mutable ts = ts + let mutable ts = this + + if verbose then + printfn "at MoveNextAsync: start calling builder.MoveNext()" data.builder.MoveNext(&ts) + if verbose then + printfn "at MoveNextAsync: done calling builder.MoveNext()" // If the move did a hijack then get the result from the final one - match ts.hijack () with + match this.hijack () with | Some tg -> tg.MoveNextAsyncResult() - | None -> ts.MoveNextAsyncResult() + | None -> this.MoveNextAsyncResult() - override ts.MoveNextAsyncResult() = - let data = ts.Machine.Data + + override this.MoveNextAsyncResult() = + let data = this.Machine.Data let version = data.promiseOfValueOrEnd.Version let status = data.promiseOfValueOrEnd.GetStatus(version) - if status = ValueTaskSourceStatus.Succeeded then + match status with + | ValueTaskSourceStatus.Succeeded -> + if verbose then + printfn "at MoveNextAsyncResult: case succeeded..." let result = data.promiseOfValueOrEnd.GetResult(version) ValueTask(result) - else + + | ValueTaskSourceStatus.Faulted + | ValueTaskSourceStatus.Canceled + | ValueTaskSourceStatus.Pending -> if verbose then - printfn "MoveNextAsync pending/faulted/cancelled..." + printfn "at MoveNextAsyncResult: case pending/faulted/cancelled..." - ValueTask(ts, version) // uses IValueTaskSource<'T> + ValueTask(this, version) // uses IValueTaskSource<'T> + | _ -> + if verbose then + printfn "at MoveNextAsyncResult: Unexpected state" + // assume it's a possibly new, not yet supported case, treat as default + ValueTask(this, version) // uses IValueTaskSource<'T> override cr.TailcallTarget = cr.hijack () @@ -280,43 +333,57 @@ type TaskSeqBuilder() = //-- RESUMABLE CODE START __resumeAt sm.ResumptionPoint + if verbose then + printfn "Resuming at resumption point %i" sm.ResumptionPoint try - //printfn "at Run.MoveNext start" - //Console.WriteLine("[{0}] resuming by invoking {1}....", sm.MethodBuilder.Task.Id, hashq sm.ResumptionFunc ) + if verbose then + printfn "at Run.MoveNext start" + let __stack_code_fin = code.Invoke(&sm) - //printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" + if verbose then + printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" if __stack_code_fin then - //printfn $"at Run.MoveNext, done" + if verbose then + printfn $"at Run.MoveNext, done" sm.Data.promiseOfValueOrEnd.SetResult(false) sm.Data.builder.Complete() elif sm.Data.current.IsSome then - //printfn $"at Run.MoveNext, yield" + if verbose then + printfn $"at Run.MoveNext, yield" sm.Data.promiseOfValueOrEnd.SetResult(true) else // Goto request match sm.Data.tailcallTarget with | Some tg -> - //printfn $"at Run.MoveNext, hijack" + if verbose then + printfn $"at Run.MoveNext, hijack" let mutable tg = tg - MoveNext(&tg) + moveNextRef &tg | None -> - //printfn $"at Run.MoveNext, await" + if verbose then + printfn $"at Run.MoveNext, await" let boxed = sm.Data.boxed sm.Data.awaiter.UnsafeOnCompleted( Action(fun () -> let mutable boxed = boxed - MoveNext(&boxed)) + moveNextRef &boxed) ) with exn -> - //Console.WriteLine("[{0}] SetException {1}", sm.MethodBuilder.Task.Id, exn) + if verbose then + printfn "Setting exception of PromiseOfValueOrEnd to: %s" exn.Message sm.Data.promiseOfValueOrEnd.SetException(exn) sm.Data.builder.Complete() //-- RESUMABLE CODE END )) - (SetStateMachineMethodImpl<_>(fun sm state -> ())) + (SetStateMachineMethodImpl<_>(fun sm state -> + if verbose then + printfn "at SetStatemachingMethodImpl, ignored" + ())) (AfterCode<_, _>(fun sm -> + if verbose then + printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine" let ts = TaskSeq, 'T>() ts.Machine <- sm ts.Machine.Data <- TaskSeqStateMachineData() @@ -353,9 +420,13 @@ type TaskSeqBuilder() = let __stack_vtask = condition () if __stack_vtask.IsCompleted then + if verbose then + printfn "Returning completed task (in while)" __stack_condition_fin <- true condition_res <- __stack_vtask.Result else + if verbose then + printfn "Awaiting non-completed task (in while)" let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() // This will yield with __stack_fin = false