Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,4 @@ MigrationBackup/

# Ionide (cross platform F# VS Code tools) working folder
.ionide/
*.ncrunchproject
217 changes: 144 additions & 73 deletions src/FSharpy.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ open FSharp.Core.CompilerServices.StateMachineHelpers
module Internal = // cannot be marked with 'internal' scope
let verbose = false

let inline MoveNext (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext()
/// Call MoveNext on an IAsyncStateMachine by reference
let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext()

// F# requires that we implement interfaces even on an abstract class
let inline raiseNotImpl () =
Expand Down Expand Up @@ -63,17 +64,15 @@ type TaskSeqStateMachineData<'T>() =
[<DefaultValue(false)>]
val mutable tailcallTarget: TaskSeq<'T> option

member data.PushDispose(f: unit -> Task) =
match data.disposalStack with
| null -> data.disposalStack <- ResizeArray()
| _ -> ()
member data.PushDispose(disposer: unit -> Task) =
if isNull data.disposalStack then
data.disposalStack <- ResizeArray()

data.disposalStack.Add(f)
data.disposalStack.Add disposer

member data.PopDispose() =
match data.disposalStack with
| null -> ()
| _ -> data.disposalStack.RemoveAt(data.disposalStack.Count - 1)
if not (isNull data.disposalStack) then
data.disposalStack.RemoveAt(data.disposalStack.Count - 1)

and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =

Expand Down Expand Up @@ -112,66 +111,93 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
[<DefaultValue(false)>]
val mutable Machine: 'Machine

member internal ts.hijack() =
let res = ts.Machine.Data.tailcallTarget
member internal this.hijack() =
let res = this.Machine.Data.tailcallTarget

match res with
| Some tg ->
// we get here only when there are multiple returns (it seems)
// hence the tailcall logic
match tg.TailcallTarget with
| None -> res
| (Some tg2 as res2) ->
// Cut out chains of tailcalls
ts.Machine.Data.tailcallTarget <- Some tg2
this.Machine.Data.tailcallTarget <- Some tg2
res2
| None -> res

// Note: Not entirely clear if this is needed, everything still compiles without it
interface IValueTaskSource with
member ts.GetResult(token: int16) =
match ts.hijack () with
member this.GetResult(token: int16) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource).GetResult(token)
| None ->
ts.Machine.Data.promiseOfValueOrEnd.GetResult(token)
this.Machine.Data.promiseOfValueOrEnd.GetResult(token)
|> ignore

member ts.GetStatus(token: int16) =
match ts.hijack () with
member this.GetStatus(token: int16) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource<bool>).GetStatus(token)
| None -> ts.Machine.Data.promiseOfValueOrEnd.GetStatus(token)
| None -> this.Machine.Data.promiseOfValueOrEnd.GetStatus(token)

member ts.OnCompleted(continuation, state, token, flags) =
match ts.hijack () with
member this.OnCompleted(continuation, state, token, flags) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags)
| None -> ts.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags)
| None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags)

// Needed for MoveNextAsync to return a ValueTask
interface IValueTaskSource<bool> with
member ts.GetStatus(token: int16) =
match ts.hijack () with
member this.GetStatus(token: int16) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource<bool>).GetStatus(token)
| None -> ts.Machine.Data.promiseOfValueOrEnd.GetStatus(token)
| None -> this.Machine.Data.promiseOfValueOrEnd.GetStatus(token)

member ts.GetResult(token: int16) =
match ts.hijack () with
| Some tg -> (tg :> IValueTaskSource<bool>).GetResult(token)
| None -> ts.Machine.Data.promiseOfValueOrEnd.GetResult(token)

member ts.OnCompleted(continuation, state, token, flags) =
match ts.hijack () with
member this.GetResult(token: int16) =
match this.hijack () with
| Some tg ->
if verbose then
printfn
"Getting result for token on 'Some' branch, status: %A"
((tg :> IValueTaskSource<bool>).GetStatus(token))
(tg :> IValueTaskSource<bool>).GetResult(token)
| None ->
try
if verbose then
printfn
"Getting result for token on 'None' branch, status: %A"
(this.Machine.Data.promiseOfValueOrEnd.GetStatus(token))
this.Machine.Data.promiseOfValueOrEnd.GetResult(token)
with e ->
// FYI: an exception here is usually caused by the CE statement (user code) throwing an exception
// We're just logging here because the following error would also be caught right here:
// "An attempt was made to transition a task to a final state when it had already completed."
if verbose then
printfn "Error '%s' for token: %i" e.Message token

reraise ()
member this.OnCompleted(continuation, state, token, flags) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource<bool>).OnCompleted(continuation, state, token, flags)
| None -> ts.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags)
| None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags)

interface IAsyncStateMachine with
member ts.MoveNext() =
match ts.hijack () with
| Some tg -> (tg :> IAsyncStateMachine).MoveNext()
| None -> MoveNext(&ts.Machine)

member _.SetStateMachine(_state) = () // not needed for reference type
/// The MoveNext method is called by builder.MoveNext() in the resumable code
member this.MoveNext() =
match this.hijack () with
| Some tg ->
// jump to the hijacked method
(tg :> IAsyncStateMachine).MoveNext()
| None -> moveNextRef &this.Machine

/// SetStatemachine is (currently) never called
member _.SetStateMachine(_state) =
if verbose then
printfn "Setting state machine -- ignored"
() // not needed for reference type

interface IAsyncEnumerable<'T> with
member ts.GetAsyncEnumerator(ct) =
let data = ts.Machine.Data
member this.GetAsyncEnumerator(ct) =
let data = this.Machine.Data

if
(not data.taken
Expand All @@ -180,31 +206,37 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
data.taken <- true
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()
(ts :> IAsyncEnumerator<_>)
if verbose then
printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint
(this :> IAsyncEnumerator<_>)
else
if verbose then
printfn "GetAsyncEnumerator, cloning..."

let clone = ts.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
// it appears that the issue is possibly caused by the problem
// of having ValueTask all over the place, and by going over the
// iteration twice, we are trying to *await* twice, which is not allowed
// see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
data.taken <- true
clone.Machine.Data.cancellationToken <- ct
(clone :> System.Collections.Generic.IAsyncEnumerator<'T>)

interface IAsyncDisposable with
member ts.DisposeAsync() =
match ts.hijack () with
member this.DisposeAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncDisposable).DisposeAsync()
| None ->
if verbose then
printfn "DisposeAsync..."

task {
match ts.Machine.Data.disposalStack with
match this.Machine.Data.disposalStack with
| null -> ()
| _ ->
let mutable exn = None

for d in Seq.rev ts.Machine.Data.disposalStack do
for d in Seq.rev this.Machine.Data.disposalStack do
try
do! d ()
with e ->
Expand All @@ -218,47 +250,68 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
|> ValueTask

interface System.Collections.Generic.IAsyncEnumerator<'T> with
member ts.Current =
match ts.hijack () with
member this.Current =
match this.hijack () with
| Some tg -> (tg :> IAsyncEnumerator<'T>).Current
| None ->
match ts.Machine.Data.current with
match this.Machine.Data.current with
| ValueSome x -> x
| ValueNone -> failwith "no current value"

member ts.MoveNextAsync() =
match ts.hijack () with
member this.MoveNextAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncEnumerator<'T>).MoveNextAsync()
| None ->
if verbose then
printfn "MoveNextAsync..."

if ts.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable
if this.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable
if verbose then
printfn "at MoveNextAsync: Resumption point = -1"
ValueTask<bool>()
else
let data = ts.Machine.Data
if verbose then
printfn "at MoveNextAsync: normal resumption scenario"
let data = this.Machine.Data
data.promiseOfValueOrEnd.Reset()
let mutable ts = ts
let mutable ts = this

if verbose then
printfn "at MoveNextAsync: start calling builder.MoveNext()"
data.builder.MoveNext(&ts)
if verbose then
printfn "at MoveNextAsync: done calling builder.MoveNext()"

// If the move did a hijack then get the result from the final one
match ts.hijack () with
match this.hijack () with
| Some tg -> tg.MoveNextAsyncResult()
| None -> ts.MoveNextAsyncResult()
| None -> this.MoveNextAsyncResult()

override ts.MoveNextAsyncResult() =
let data = ts.Machine.Data

override this.MoveNextAsyncResult() =
let data = this.Machine.Data
let version = data.promiseOfValueOrEnd.Version
let status = data.promiseOfValueOrEnd.GetStatus(version)

if status = ValueTaskSourceStatus.Succeeded then
match status with
| ValueTaskSourceStatus.Succeeded ->
if verbose then
printfn "at MoveNextAsyncResult: case succeeded..."
let result = data.promiseOfValueOrEnd.GetResult(version)
ValueTask<bool>(result)
else

| ValueTaskSourceStatus.Faulted
| ValueTaskSourceStatus.Canceled
| ValueTaskSourceStatus.Pending ->
if verbose then
printfn "MoveNextAsync pending/faulted/cancelled..."
printfn "at MoveNextAsyncResult: case pending/faulted/cancelled..."

ValueTask<bool>(ts, version) // uses IValueTaskSource<'T>
ValueTask<bool>(this, version) // uses IValueTaskSource<'T>
| _ ->
if verbose then
printfn "at MoveNextAsyncResult: Unexpected state"
// assume it's a possibly new, not yet supported case, treat as default
ValueTask<bool>(this, version) // uses IValueTaskSource<'T>

override cr.TailcallTarget = cr.hijack ()

Expand All @@ -280,43 +333,57 @@ type TaskSeqBuilder() =
//-- RESUMABLE CODE START
__resumeAt sm.ResumptionPoint

if verbose then
printfn "Resuming at resumption point %i" sm.ResumptionPoint
try
//printfn "at Run.MoveNext start"
//Console.WriteLine("[{0}] resuming by invoking {1}....", sm.MethodBuilder.Task.Id, hashq sm.ResumptionFunc )
if verbose then
printfn "at Run.MoveNext start"

let __stack_code_fin = code.Invoke(&sm)
//printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}"
if verbose then
printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}"
if __stack_code_fin then
//printfn $"at Run.MoveNext, done"
if verbose then
printfn $"at Run.MoveNext, done"
sm.Data.promiseOfValueOrEnd.SetResult(false)
sm.Data.builder.Complete()
elif sm.Data.current.IsSome then
//printfn $"at Run.MoveNext, yield"
if verbose then
printfn $"at Run.MoveNext, yield"
sm.Data.promiseOfValueOrEnd.SetResult(true)
else
// Goto request
match sm.Data.tailcallTarget with
| Some tg ->
//printfn $"at Run.MoveNext, hijack"
if verbose then
printfn $"at Run.MoveNext, hijack"
let mutable tg = tg
MoveNext(&tg)
moveNextRef &tg
| None ->
//printfn $"at Run.MoveNext, await"
if verbose then
printfn $"at Run.MoveNext, await"
let boxed = sm.Data.boxed

sm.Data.awaiter.UnsafeOnCompleted(
Action(fun () ->
let mutable boxed = boxed
MoveNext(&boxed))
moveNextRef &boxed)
)

with exn ->
//Console.WriteLine("[{0}] SetException {1}", sm.MethodBuilder.Task.Id, exn)
if verbose then
printfn "Setting exception of PromiseOfValueOrEnd to: %s" exn.Message
sm.Data.promiseOfValueOrEnd.SetException(exn)
sm.Data.builder.Complete()
//-- RESUMABLE CODE END
))
(SetStateMachineMethodImpl<_>(fun sm state -> ()))
(SetStateMachineMethodImpl<_>(fun sm state ->
if verbose then
printfn "at SetStatemachingMethodImpl, ignored"
()))
(AfterCode<_, _>(fun sm ->
if verbose then
printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine"
let ts = TaskSeq<TaskSeqStateMachine<'T>, 'T>()
ts.Machine <- sm
ts.Machine.Data <- TaskSeqStateMachineData()
Expand Down Expand Up @@ -353,9 +420,13 @@ type TaskSeqBuilder() =
let __stack_vtask = condition ()

if __stack_vtask.IsCompleted then
if verbose then
printfn "Returning completed task (in while)"
__stack_condition_fin <- true
condition_res <- __stack_vtask.Result
else
if verbose then
printfn "Awaiting non-completed task (in while)"
let task = __stack_vtask.AsTask()
let mutable awaiter = task.GetAwaiter()
// This will yield with __stack_fin = false
Expand Down