diff --git a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj index 9883983b..81bae0c7 100644 --- a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj +++ b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj @@ -29,6 +29,8 @@ + + diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs index 3738469e..660e4d21 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs @@ -37,6 +37,18 @@ let ``TaskSeq-iter should go over all items`` () = task { sum |> should equal 55 // task-dummies started at 1 } + +[] +let ``TaskSeq-iter multiple iterations over same sequence`` () = task { + let tq = createDummyTaskSeq 10 + let mutable sum = 0 + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + sum |> should equal 820 // task-dummies started at 1 +} + [] let ``TaskSeq-iteriAsync should go over all items`` () = task { let tq = createDummyTaskSeq 10 diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs index 08f94b4f..5974dbd9 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs @@ -6,12 +6,25 @@ open FsToolkit.ErrorHandling open FSharpy +/// Asserts that a sequence contains the char values 'A'..'J'. let validateSequence sequence = sequence |> Seq.map string |> String.concat "" |> should equal "ABCDEFGHIJ" +/// Validates for "ABCDEFGHIJ" char sequence, or any amount of char-value higher +let validateSequenceWithOffset offset sequence = + let expected = + [ 'A' .. 'J' ] + |> List.map (int >> (+) offset >> char >> string) + |> String.concat "" + + sequence + |> Seq.map string + |> String.concat "" + |> should equal expected + [] let ``TaskSeq-map maps in correct order`` () = task { let! sq = @@ -72,6 +85,27 @@ let ``TaskSeq-mapAsync maps in correct order`` () = task { validateSequence sq } +[] +let ``TaskSeq-mapAsync can map the same sequence multiple times`` () = task { + let mapAndCache = + TaskSeq.mapAsync (fun item -> task { return char (item + 64) }) + >> TaskSeq.toSeqCachedAsync + + let ts = createDummyDirectTaskSeq 10 + + let! result1 = mapAndCache ts + let! result2 = mapAndCache ts + let! result3 = mapAndCache ts + let! result4 = mapAndCache ts + validateSequence result1 + + // each time we do GetAsyncEnumerator(), and go through the whole sequence, + // the whole sequence gets re-evaluated, causing our +1 side-effect to run again. + validateSequenceWithOffset 10 result2 // the mutable is 10 higher + validateSequenceWithOffset 20 result3 // again + validateSequenceWithOffset 30 result4 // again +} + [] let ``TaskSeq-mapiAsync maps in correct order`` () = task { let! sq = diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs index 963181eb..47ce835d 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs @@ -22,8 +22,14 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = interface IAsyncEnumerable with member reader.GetAsyncEnumerator(ct) = - output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}" - reader.MemberwiseClone() :?> IAsyncEnumerator<_> + { new IAsyncEnumerator<_> with + member this.Current = (reader :> IAsyncEnumerator<_>).Current + member this.MoveNextAsync() = (reader :> IAsyncEnumerator<_>).MoveNextAsync() + interface IAsyncDisposable with + member this.DisposeAsync() = ValueTask() + } + //output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}" + //reader.MemberwiseClone() :?> IAsyncEnumerator<_> interface IAsyncEnumerator with member _.Current = @@ -39,6 +45,8 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = let! bytesRead = buffered.ReadAsync(mem, 0, mem.Length) // offset refers to offset in target buffer, not source lastPos <- buffered.Position + let x: seq = seq { 1 } |> Seq.cast + if bytesRead > 0 then current <- ValueSome mem return true @@ -48,7 +56,6 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = } |> Task.toValueTask - interface IAsyncDisposable with member _.DisposeAsync() = try // this disposes of the mem stream @@ -57,8 +64,44 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = // if the previous block raises, we should still try to get rid of the underlying stream stream.DisposeAsync().AsTask().Wait() + type ``Real world tests``(output: ITestOutputHelper) = [] + let ``Reading a 10MB buffered IAsync through TaskSeq.toArray non-async should succeed`` () = task { + use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256) + // unreadable error with 'use' + //use bla = seq { 1} + let expected = Array.init 256 byte |> Array.replicate 8 + let results = reader |> TaskSeq.toArray + + (results, expected) + ||> Array.iter2 (fun a b -> should equal a b) + } + + [] + let ``Reading a user-code IAsync multiple times with TaskSeq.toArrayAsync should succeed`` () = task { + use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256) + let expected = Array.init 256 byte |> Array.replicate 8 + // read four times + let! results1 = reader |> TaskSeq.toArrayAsync + let! results2 = reader |> TaskSeq.toArrayAsync + let! results3 = reader |> TaskSeq.toArrayAsync + let! results4 = reader |> TaskSeq.toArrayAsync + + (results1, expected) + ||> Array.iter2 (fun a b -> should equal a b) + + (results2, expected) + ||> Array.iter2 (fun a b -> should equal a b) + + (results3, expected) + ||> Array.iter2 (fun a b -> should equal a b) + + (results4, expected) + ||> Array.iter2 (fun a b -> should equal a b) + } + + [] let ``Reading a 10MB buffered IAsync stream from start to finish`` () = task { let mutable count = 0 use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256) @@ -76,6 +119,8 @@ type ``Real world tests``(output: ITestOutputHelper) = // the following is extremely slow, which is why we just use F#'s comparison instead // Using this takes 67s, compared to 0.25s using normal F# comparison. + // reader |> TaskSeq.toArray |> should equal expected // VERY SLOW!! + do! reader |> TaskSeq.iter (should equal expected) do! reader |> TaskSeq.iter ((=) expected >> (should be True)) let! len = reader |> TaskSeq.mapi (fun i _ -> i + 1) |> TaskSeq.last @@ -113,13 +158,17 @@ type ``Real world tests``(output: ITestOutputHelper) = } + // This test used to have the following, which has since been solved through #42 + // please leave this test in, as it tests a case that's quite easily reached if we + // introduce mistakes in the resumable code. + // //System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed. // at .$TaskSeq.Realworld.clo@58-4.MoveNext() in D:\Projects\OpenSource\Abel\TaskSeq\src\FSharpy.TaskSeq.Test\TaskSeq.Realworld.fs:line 77 // at Xunit.Sdk.TestInvoker`1.<>c__DisplayClass48_0.<b__1>d.MoveNext() in /_/src/xunit.execution/Sdk/Frameworks/Runners/TestInvoker.cs:line 264 //--- End of stack trace from previous location --- // at Xunit.Sdk.ExecutionTimer.AggregateAsync(Func`1 asyncAction) in /_/src/xunit.execution/Sdk/Frameworks/ExecutionTimer.cs:line 48 // at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90\ - [] + [] let ``Reading a 1MB buffered IAsync stream from start to finish InvalidOperationException`` () = task { let mutable count = 0 use reader = AsyncBufferedReader(output, Array.init 1_048_576 byte, 256) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs new file mode 100644 index 00000000..08ce0641 --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs @@ -0,0 +1,367 @@ +module FSharpy.Tests.``Bug #42 -- asynchronous`` // see PR #42 + +open System +open System.Threading.Tasks +open System.Diagnostics +open System.Collections.Generic + +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy + +// Module contains same tests as its previous file +// except that each item is delayed randomly to force +// an async Await behavior. + +let getEmptyVariant variant : IAsyncEnumerable = + match variant with + | "do" -> taskSeq { do! delayRandom () } + | "do!" -> taskSeq { do! task { return! delayRandom () } } // TODO: this doesn't work with Task, only Task... + | "yield! (seq)" -> taskSeq { + do! delayRandom () + yield! Seq.empty + } + | "yield! (taskseq)" -> taskSeq { yield! taskSeq { do! delayRandom () } } + | _ -> failwith "Uncovered variant of test" + + +[] +let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { + let tskSeq = taskSeq { do! delayRandom () } + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) +} + +[] +let ``CE empty taskSeq with MoveNextAsync -- typed`` variant = task { + let tskSeq = getEmptyVariant variant + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + () +} + +// Note: this test used to hang (#42), please leave it in, no matter how silly it looks +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator = tskSeq.GetAsyncEnumerator() + use enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator +} + +// Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 + + // getting the enumerator again + use enumerator2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 // original should still work without raising + do! moveNextAndCheck false enumerator2 // new hone should also work without raising +} + +// Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { + let tskSeq = getEmptyVariant variant + + // let's get the enumerator a few times + for i in 0..100 do + use enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator // these are all empty +} + +[] +let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current *before* MoveNextAsync + let current = enumerator.Current + current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator // false for empty seq + + // call Current *after* MoveNextAsync returns false + enumerator.Current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE taskSeq, call Current before MoveNextAsync`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield "foo" + do! delayRandom () + yield "bar" + } + + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current before MoveNextAsync + let current = enumerator.Current + current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq, call Current after MoveNextAsync returns false`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield "foo" + do! delayRandom () + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // call Current *after* MoveNextAsync returns false + enum.Current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq, MoveNext once too far`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield 1 + do! delayRandom () + yield 2 + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + do! moveNextAndCheck false enum // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, MoveNext too far`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield Guid.NewGuid() + do! delayRandom () + yield Guid.NewGuid() + } + + // let's call MoveNext multiple times on an empty sequence + let enum = tskSeq.GetAsyncEnumerator() + + // first get past the post + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // then call it bunch of times to ensure we don't get an InvalidOperationException, see issue #39 and PR #42 + for i in 0..100 do + do! moveNextAndCheck false enum + + // after whatever amount of time MoveNextAsync, we can still safely call Current + enum.Current |> should equal Guid.Empty // we return Unchecked.defaultof, which is Guid.Empty for guids +} + +// Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks +[] +let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield 1 + do! delayRandom () + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +// Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks +[] +let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield 1 + do! delayRandom () + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 & enum2 in lock step + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 1 enum2 // first item + + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent true 2 enum2 // second item + + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +// Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + do! delayRandom () + yield 2 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +// Note: this test used to hang (#42), please leave it in, no matter how silly it looks +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + do! delayRandom () + yield 2 + do! delayRandom () + yield 3 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + + // move #1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + enum1.Current |> should equal 1 // remains the same + enum2.Current |> should equal 0 // should be at default location + + // move #2 + do! moveNextAndCheckCurrent true 1 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 1 + + // move #2 + do! moveNextAndCheckCurrent true 2 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 2 enum1 + enum1.Current |> should equal 2 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 3 enum1 + enum1.Current |> should equal 3 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent false 0 enum1 + enum1.Current |> should equal 0 + enum2.Current |> should equal 2 + + // move #2 + do! moveNextAndCheckCurrent true 3 enum2 + enum1.Current |> should equal 0 + enum2.Current |> should equal 3 + + // move #2 + do! moveNextAndCheckCurrent false 0 enum2 + enum1.Current |> should equal 0 +} + +// Note: this test used to hang (#42), please leave it in, no matter how silly it looks +[] +let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = + let tq = taskSeq { + yield! [ 1..3 ] + do! delayRandom () + yield! [ 4..7 ] + do! delayRandom () + } + + let (results1: _[]) = tq |> TaskSeq.toArray + let (results2: _[]) = tq |> TaskSeq.toArray + let (results3: _[]) = tq |> TaskSeq.toArray + let (results4: _[]) = tq |> TaskSeq.toArray + results1 |> should equal [| 1..7 |] + results2 |> should equal [| 1..7 |] // no mutable state in taskSeq, multi iter remains stable + results3 |> should equal [| 1..7 |] // id + results4 |> should equal [| 1..7 |] // id + +// Note: this test used to hang (#42), please leave it in, no matter how silly it looks +[] +let ``TaskSeq-toArray can be applied multiple times to the same sequence -- mutable state`` () = + let mutable before, middle, after = (0, 0, 0) + + let tq = taskSeq { + before <- before + 1 + yield before + yield! [ 100..120 ] + do! delayRandom () + middle <- middle + 1 + yield middle + yield! [ 100..120 ] + do! delayRandom () + after <- after + 1 + yield after + } + + let (results1: _ list) = tq |> TaskSeq.toList + let (results2: _ list) = tq |> TaskSeq.toList + let (results3: _ list) = tq |> TaskSeq.toList + let (results4: _ list) = tq |> TaskSeq.toList + + let expectMutatedTo a = (a :: [ 100..120 ] @ [ a ] @ [ 100..120 ] @ [ a ]) + results1 |> should equal (expectMutatedTo 1) + results2 |> should equal (expectMutatedTo 2) + results3 |> should equal (expectMutatedTo 3) + results4 |> should equal (expectMutatedTo 4) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs new file mode 100644 index 00000000..c7389115 --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -0,0 +1,553 @@ +module FSharpy.Tests.``Bug #42 -- synchronous`` // see PR #42 + +open System +open System.Threading.Tasks +open System.Diagnostics +open System.Collections.Generic + +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy + +let getEmptyVariant variant : IAsyncEnumerable = + match variant with + | "do" -> taskSeq { do ignore () } + | "do!" -> taskSeq { do! task { return () } } // TODO: this doesn't work with Task, only Task... + | "yield! (seq)" -> taskSeq { yield! Seq.empty } + | "yield! (taskseq)" -> taskSeq { yield! taskSeq { do ignore () } } + | _ -> failwith "Uncovered variant of test" + + +[] +let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { + let tskSeq = taskSeq { do ignore () } + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) +} + +[] +let ``CE empty taskSeq with MoveNextAsync -- typed`` variant = task { + let tskSeq = getEmptyVariant variant + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + () +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator = tskSeq.GetAsyncEnumerator() + use enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 + + // getting the enumerator again + use enumerator2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 // original should still work without raising + do! moveNextAndCheck false enumerator2 // new hone should also work without raising +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { + let tskSeq = getEmptyVariant variant + + // let's get the enumerator a few times + for i in 0..100 do + use enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator // these are all empty +} + +[] +let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current *before* MoveNextAsync + let current = enumerator.Current + current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator // false for empty seq + + // call Current *after* MoveNextAsync returns false + enumerator.Current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE taskSeq, proper two-item task sequence`` () = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + enum.Current |> should equal "foo" + do! moveNextAndCheck true enum // second item + enum.Current |> should equal "bar" + do! moveNextAndCheck false enum // third item: false +} + +[] +let ``CE taskSeq, proper two-item task sequence -- async variant`` () = task { + let tskSeq = taskSeq { + yield "foo" + do! delayRandom () + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + enum.Current |> should equal "foo" + do! moveNextAndCheck true enum // second item + enum.Current |> should equal "bar" + do! moveNextAndCheck false enum // third item: false +} + +[] +let ``CE taskSeq, call Current before MoveNextAsync`` () = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current before MoveNextAsync + let current = enumerator.Current + current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq, call Current after MoveNextAsync returns false`` () = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // call Current *after* MoveNextAsync returns false + enum.Current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq, MoveNext once too far`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + do! moveNextAndCheck false enum // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, MoveNext too far`` () = task { + let tskSeq = taskSeq { + yield Guid.NewGuid() + yield Guid.NewGuid() + } + + // let's call MoveNext multiple times on an empty sequence + let enum = tskSeq.GetAsyncEnumerator() + + // first get past the post + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // then call it bunch of times to ensure we don't get an InvalidOperationException, see issue #39 and PR #42 + for i in 0..100 do + do! moveNextAndCheck false enum + + // after whatever amount of time MoveNextAsync, we can still safely call Current + enum.Current |> should equal Guid.Empty // we return Unchecked.defaultof, which is Guid.Empty for guids +} + +[] +let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE seq -- comparison --, call GetEnumerator twice`` () = task { + // this test is for behavioral comparisoni between the same Async test above with TaskSeq + let sq = seq { + yield 1 + yield 2 + } + + let enum1 = sq.GetEnumerator() + let enum2 = sq.GetEnumerator() + + // enum1 + do seqMoveNextAndCheckCurrent true 1 enum1 // first item + do seqMoveNextAndCheckCurrent true 2 enum1 // second item + do seqMoveNextAndCheckCurrent false 0 enum1 // third item: false + do seqMoveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + do seqMoveNextAndCheckCurrent true 1 enum2 // first item + do seqMoveNextAndCheckCurrent true 2 enum2 // second item + do seqMoveNextAndCheckCurrent false 0 enum2 // third item: false + do seqMoveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + + +[] +let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 & enum2 in lock step + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 1 enum2 // first item + + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent true 2 enum2 // second item + + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + yield 3 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + + // move #1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + enum1.Current |> should equal 1 // remains the same + enum2.Current |> should equal 0 // should be at default location + + // move #2 + do! moveNextAndCheckCurrent true 1 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 1 + + // move #2 + do! moveNextAndCheckCurrent true 2 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 2 enum1 + enum1.Current |> should equal 2 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 3 enum1 + enum1.Current |> should equal 3 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent false 0 enum1 + enum1.Current |> should equal 0 + enum2.Current |> should equal 2 + + // move #2 + do! moveNextAndCheckCurrent true 3 enum2 + enum1.Current |> should equal 0 + enum2.Current |> should equal 3 + + // move #2 + do! moveNextAndCheckCurrent false 0 enum2 + enum1.Current |> should equal 0 +} + +[] +let ``CE taskSeq, call map multiple times over its own result`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // let's map once, and then again on the new sequence + let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1) + let result1 = TaskSeq.toArray ts1 + let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1) + let result2 = TaskSeq.toArray ts2 // NRE here + + tskSeq |> TaskSeq.toArray |> should equal [| 1; 2 |] + result1 |> should equal [| 2; 3 |] + result2 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #1`` () = task { + let tskSeq1 = taskSeq { + yield 1 + yield 2 + } + + // [ 2; 3] + let tskSeq2 = taskSeq { + for i in tskSeq1 do + yield i + 1 + } + + // [ 3; 4] + let tskSeq3 = taskSeq { + for i in tskSeq2 do + yield i + 1 + } + + let result3 = TaskSeq.toArray tskSeq3 + + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #2`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let tskSeq1 = taskSeq { + yield 1 + yield 2 + } + + let result1 = TaskSeq.toArray tskSeq1 + result1 |> should equal [| 1; 2 |] + + // [ 2; 3] + let tskSeq2 = taskSeq { + for i in tskSeq1 do + yield i + 1 + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + for i in tskSeq2 do // NRE here + yield i + 1 + } + + let! result3 = TaskSeq.toArrayAsync tskSeq3 // from here + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #3`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let tskSeq1 = taskSeq { + yield 1 + yield 2 + } + + let result1 = TaskSeq.toArray tskSeq1 + result1 |> should equal [| 1; 2 |] + + // [ 2; 3] + let tskSeq2 = taskSeq { + yield! taskSeq { + for i in tskSeq1 do + yield i + 1 + } + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + yield! taskSeq { // NRE here + for i in tskSeq2 do + yield i + 1 + } + } + + let result3 = TaskSeq.toArray tskSeq3 // from here + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #4`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let sequence = seq { + yield 1 + yield 2 + } + + // [ 2; 3] + let tskSeq2 = taskSeq { + for i in sequence do + yield i + 1 + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + for i in tskSeq2 do + yield i + 1 // NRE here + } + + let result3 = TaskSeq.toArray tskSeq3 // NRE from here + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #5`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let sequence = seq { + yield 1 + yield 2 + } + + // [ 2; 3] + let tskSeq2 = taskSeq { + yield! taskSeq { + for i in sequence do + yield i + 1 + } + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + yield! taskSeq { // NRE here + for i in tskSeq2 do + yield i + 1 + } + } + + let result3 = TaskSeq.toArray tskSeq3 // from here + result3 |> should equal [| 3; 4 |] +} + + +[] +let ``CE taskSeq, call mapAsync multiple times over its own result`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // let's map once, and then again on the new sequence + let ts1 = tskSeq |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) + let result1 = TaskSeq.toArray ts1 + let ts2 = ts1 |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) + let result2 = TaskSeq.toArray ts2 // NRE here + result1 |> should equal [| 2; 3 |] + result2 |> should equal [| 3; 4 |] +} + +[] +let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = + let tq = taskSeq { yield! [ 1..10 ] } + let (results1: _[]) = tq |> TaskSeq.toArray + let (results2: _[]) = tq |> TaskSeq.toArray + let (results3: _[]) = tq |> TaskSeq.toArray + let (results4: _[]) = tq |> TaskSeq.toArray + results1 |> should equal [| 1..10 |] + results2 |> should equal [| 1..10 |] + results3 |> should equal [| 1..10 |] + results4 |> should equal [| 1..10 |] diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs index a8d4a0dc..af74e4bf 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs @@ -1,13 +1,10 @@ -module FSharpy.Tests.``taskSeq Computation Expression`` +module FSharpy.Tests.``taskSeq Computation Expression`` open Xunit open FsUnit.Xunit open FsToolkit.ErrorHandling open FSharpy -open System.Threading.Tasks -open System.Diagnostics - [] let ``CE taskSeq with several yield!`` () = task { diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs index 5d57feca..be02631a 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs @@ -27,6 +27,19 @@ let ``TaskSeq-toArrayAsync should succeed`` () = task { results |> should equal [| 1..10 |] } +[] +let ``TaskSeq-toArrayAsync can be applied multiple times to the same sequence`` () = task { + let tq = createDummyTaskSeq 10 + let! (results1: _[]) = tq |> TaskSeq.toArrayAsync + let! (results2: _[]) = tq |> TaskSeq.toArrayAsync + let! (results3: _[]) = tq |> TaskSeq.toArrayAsync + let! (results4: _[]) = tq |> TaskSeq.toArrayAsync + results1 |> should equal [| 1..10 |] + results2 |> should equal [| 11..20 |] + results3 |> should equal [| 21..30 |] + results4 |> should equal [| 31..40 |] +} + [] let ``TaskSeq-toListAsync should succeed`` () = task { let tq = createDummyTaskSeq 10 diff --git a/src/FSharpy.TaskSeq.Test/TestUtils.fs b/src/FSharpy.TaskSeq.Test/TestUtils.fs index e3e3b85d..370cb209 100644 --- a/src/FSharpy.TaskSeq.Test/TestUtils.fs +++ b/src/FSharpy.TaskSeq.Test/TestUtils.fs @@ -8,6 +8,8 @@ open System.Diagnostics open FsToolkit.ErrorHandling open FSharpy +open System.Collections.Generic +open FsUnit.Xunit /// Milliseconds [] @@ -117,6 +119,41 @@ type DummyTaskFactory(µsecMin: int64<µs>, µsecMax: int64<µs>) = [] module TestUtils = + /// Delays (no spin-wait!) between 20 and 200ms, assuming a 15.6ms resolution clock + let delayRandom () = task { do! Task.Delay(Random().Next(20, 200)) } + + /// Call MoveNextAsync() and check if return value is the expected value + let moveNextAndCheck expected (enumerator: IAsyncEnumerator<_>) = task { + let! (hasNext: bool) = enumerator.MoveNextAsync() + + if expected then + hasNext |> should be True + else + hasNext |> should be False + } + + /// Call MoveNextAsync() and check if Current has the expected value. Uses untyped 'should equal' + let moveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IAsyncEnumerator<_>) = task { + let! (hasNext: bool) = enumerator.MoveNextAsync() + + if successMoveNext then + hasNext |> should be True + else + hasNext |> should be False + + enumerator.Current |> should equal expectedValue + } + + /// Call MoveNext() and check if Current has the expected value. Uses untyped 'should equal' + let seqMoveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IEnumerator<_>) = + let (hasNext: bool) = enumerator.MoveNext() + + if successMoveNext then + hasNext |> should be True + else + hasNext |> should be False + + enumerator.Current |> should equal expectedValue /// Joins two tasks using merely BCL methods. This approach is what you can use to /// properly, sequentially execute a chain of tasks in a non-blocking, non-overlapping way. diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 19191d01..2ab3c239 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -54,6 +54,7 @@ type IPriority2 = [] type TaskSeqStateMachineData<'T>() = + [] val mutable cancellationToken: CancellationToken @@ -72,6 +73,9 @@ type TaskSeqStateMachineData<'T>() = [] val mutable taken: bool + [] + val mutable completed: bool + [] val mutable current: ValueOption<'T> @@ -126,10 +130,13 @@ and [] TaskSeq<'Machine, 'T let initialThreadId = Environment.CurrentManagedThreadId [] - val mutable Machine: 'Machine + val mutable _initialMachine: 'Machine + + [] + val mutable _machine: 'Machine member internal this.hijack() = - let res = this.Machine.Data.tailcallTarget + let res = this._machine.Data.tailcallTarget match res with | Some tg -> @@ -139,7 +146,7 @@ and [] TaskSeq<'Machine, 'T | None -> res | (Some tg2 as res2) -> // Cut out chains of tailcalls - this.Machine.Data.tailcallTarget <- Some tg2 + this._machine.Data.tailcallTarget <- Some tg2 res2 | None -> res @@ -149,25 +156,25 @@ and [] TaskSeq<'Machine, 'T match this.hijack () with | Some tg -> (tg :> IValueTaskSource).GetResult(token) | None -> - this.Machine.Data.promiseOfValueOrEnd.GetResult(token) + this._machine.Data.promiseOfValueOrEnd.GetResult(token) |> ignore member this.GetStatus(token: int16) = match this.hijack () with | Some tg -> (tg :> IValueTaskSource).GetStatus(token) - | None -> this.Machine.Data.promiseOfValueOrEnd.GetStatus(token) + | None -> this._machine.Data.promiseOfValueOrEnd.GetStatus(token) member this.OnCompleted(continuation, state, token, flags) = match this.hijack () with | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) - | None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + | None -> this._machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) // Needed for MoveNextAsync to return a ValueTask interface IValueTaskSource with member this.GetStatus(token: int16) = match this.hijack () with | Some tg -> (tg :> IValueTaskSource).GetStatus(token) - | None -> this.Machine.Data.promiseOfValueOrEnd.GetStatus(token) + | None -> this._machine.Data.promiseOfValueOrEnd.GetStatus(token) member this.GetResult(token: int16) = match this.hijack () with @@ -176,14 +183,16 @@ and [] TaskSeq<'Machine, 'T printfn "Getting result for token on 'Some' branch, status: %A" ((tg :> IValueTaskSource).GetStatus(token)) + (tg :> IValueTaskSource).GetResult(token) | None -> try if verbose then printfn "Getting result for token on 'None' branch, status: %A" - (this.Machine.Data.promiseOfValueOrEnd.GetStatus(token)) - this.Machine.Data.promiseOfValueOrEnd.GetResult(token) + (this._machine.Data.promiseOfValueOrEnd.GetStatus(token)) + + this._machine.Data.promiseOfValueOrEnd.GetResult(token) with e -> // FYI: an exception here is usually caused by the CE statement (user code) throwing an exception // We're just logging here because the following error would also be caught right here: @@ -192,10 +201,11 @@ and [] TaskSeq<'Machine, 'T printfn "Error '%s' for token: %i" e.Message token reraise () + member this.OnCompleted(continuation, state, token, flags) = match this.hijack () with | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) - | None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + | None -> this._machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) interface IAsyncStateMachine with /// The MoveNext method is called by builder.MoveNext() in the resumable code @@ -204,28 +214,29 @@ and [] TaskSeq<'Machine, 'T | Some tg -> // jump to the hijacked method (tg :> IAsyncStateMachine).MoveNext() - | None -> moveNextRef &this.Machine + | None -> moveNextRef &this._machine /// SetStatemachine is (currently) never called member _.SetStateMachine(_state) = if verbose then printfn "Setting state machine -- ignored" + () // not needed for reference type interface IAsyncEnumerable<'T> with member this.GetAsyncEnumerator(ct) = - let data = this.Machine.Data + let data = this._machine.Data if (not data.taken && initialThreadId = Environment.CurrentManagedThreadId) then + let data = this._machine.Data data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() - if verbose then - printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint - (this :> IAsyncEnumerator<_>) + + this :> IAsyncEnumerator<_> else if verbose then printfn "GetAsyncEnumerator, cloning..." @@ -235,9 +246,22 @@ and [] TaskSeq<'Machine, 'T // iteration twice, we are trying to *await* twice, which is not allowed // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> - data.taken <- true - clone.Machine.Data.cancellationToken <- ct - (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) + clone._machine <- clone._initialMachine + clone._machine.Data <- TaskSeqStateMachineData() + clone._machine.Data.cancellationToken <- ct + clone._machine.Data.taken <- true + clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + + //// calling reset causes NRE in IValueTaskSource.GetResult above + //clone.Machine.Data.promiseOfValueOrEnd.Reset() + clone._machine.Data.boxed <- clone + ////clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack + //////clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception + //clone.Machine.Data.awaiter <- null + //clone.Machine.Data.current <- ValueNone + //clone.Machine.Data.completed <- false + + clone :> System.Collections.Generic.IAsyncEnumerator<'T> interface IAsyncDisposable with member this.DisposeAsync() = @@ -248,12 +272,12 @@ and [] TaskSeq<'Machine, 'T printfn "DisposeAsync..." task { - match this.Machine.Data.disposalStack with + match this._machine.Data.disposalStack with | null -> () | _ -> let mutable exn = None - for d in Seq.rev this.Machine.Data.disposalStack do + for d in Seq.rev this._machine.Data.disposalStack do try do! d () with e -> @@ -271,9 +295,14 @@ and [] TaskSeq<'Machine, 'T match this.hijack () with | Some tg -> (tg :> IAsyncEnumerator<'T>).Current | None -> - match this.Machine.Data.current with + match this._machine.Data.current with | ValueSome x -> x - | ValueNone -> failwith "no current value" + | ValueNone -> + // Returning a default value is similar to how F#'s seq<'T> behaves + // According to the docs, behavior is Unspecified in case of a call + // to Current, which means that this is certainly fine, and arguably + // better than raising an exception. + Unchecked.defaultof<'T> member this.MoveNextAsync() = match this.hijack () with @@ -282,20 +311,33 @@ and [] TaskSeq<'Machine, 'T if verbose then printfn "MoveNextAsync..." - if this.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable + if this._machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable if verbose then printfn "at MoveNextAsync: Resumption point = -1" + ValueTask() + + elif this._machine.Data.completed then + if verbose then + printfn "at MoveNextAsync: completed = true" + + // return False when beyond the last item + this._machine.Data.promiseOfValueOrEnd.Reset() + ValueTask() + else if verbose then printfn "at MoveNextAsync: normal resumption scenario" - let data = this.Machine.Data + + let data = this._machine.Data data.promiseOfValueOrEnd.Reset() let mutable ts = this if verbose then printfn "at MoveNextAsync: start calling builder.MoveNext()" + data.builder.MoveNext(&ts) + if verbose then printfn "at MoveNextAsync: done calling builder.MoveNext()" @@ -306,7 +348,7 @@ and [] TaskSeq<'Machine, 'T override this.MoveNextAsyncResult() = - let data = this.Machine.Data + let data = this._machine.Data let version = data.promiseOfValueOrEnd.Version let status = data.promiseOfValueOrEnd.GetStatus(version) @@ -314,7 +356,14 @@ and [] TaskSeq<'Machine, 'T | ValueTaskSourceStatus.Succeeded -> if verbose then printfn "at MoveNextAsyncResult: case succeeded..." + let result = data.promiseOfValueOrEnd.GetResult(version) + + if not result then + // if beyond the end of the stream, ensure we unset + // the Current value + data.current <- ValueNone + ValueTask(result) | ValueTaskSourceStatus.Faulted @@ -352,33 +401,44 @@ type TaskSeqBuilder() = if verbose then printfn "Resuming at resumption point %i" sm.ResumptionPoint + try if verbose then printfn "at Run.MoveNext start" let __stack_code_fin = code.Invoke(&sm) + if verbose then printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" + if __stack_code_fin then if verbose then printfn $"at Run.MoveNext, done" + sm.Data.promiseOfValueOrEnd.SetResult(false) sm.Data.builder.Complete() + sm.Data.completed <- true + elif sm.Data.current.IsSome then if verbose then printfn $"at Run.MoveNext, yield" + sm.Data.promiseOfValueOrEnd.SetResult(true) + else // Goto request match sm.Data.tailcallTarget with | Some tg -> if verbose then printfn $"at Run.MoveNext, hijack" + let mutable tg = tg moveNextRef &tg + | None -> if verbose then printfn $"at Run.MoveNext, await" + let boxed = sm.Data.boxed sm.Data.awaiter.UnsafeOnCompleted( @@ -390,6 +450,7 @@ type TaskSeqBuilder() = with exn -> if verbose then printfn "Setting exception of PromiseOfValueOrEnd to: %s" exn.Message + sm.Data.promiseOfValueOrEnd.SetException(exn) sm.Data.builder.Complete() //-- RESUMABLE CODE END @@ -397,14 +458,17 @@ type TaskSeqBuilder() = (SetStateMachineMethodImpl<_>(fun sm state -> if verbose then printfn "at SetStatemachingMethodImpl, ignored" + ())) (AfterCode<_, _>(fun sm -> if verbose then - printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine" + printfn "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info" + let ts = TaskSeq, 'T>() - ts.Machine <- sm - ts.Machine.Data <- TaskSeqStateMachineData() - ts.Machine.Data.boxed <- ts + ts._initialMachine <- sm + ts._machine <- sm + ts._machine.Data <- TaskSeqStateMachineData() + ts._machine.Data.boxed <- ts ts :> IAsyncEnumerable<'T>)) else failwith "no dynamic implementation as yet" @@ -418,9 +482,16 @@ type TaskSeqBuilder() = //sm.Start() - member inline _.Zero() : TaskSeqCode<'T> = ResumableCode.Zero() + member inline _.Zero() : TaskSeqCode<'T> = + if verbose then + printfn "at Zero()" + + ResumableCode.Zero() member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> = + if verbose then + printfn "at Combine(.., ..)" + ResumableCode.Combine(task1, task2) member inline _.WhileAsync @@ -438,12 +509,14 @@ type TaskSeqBuilder() = if __stack_vtask.IsCompleted then if verbose then - printfn "Returning completed task (in while)" + printfn "at WhileAsync: returning completed task" + __stack_condition_fin <- true condition_res <- __stack_vtask.Result else if verbose then - printfn "Awaiting non-completed task (in while)" + printfn "at WhileAsync: awaiting non-completed task" + let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() // This will yield with __stack_fin = false @@ -451,6 +524,11 @@ type TaskSeqBuilder() = let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) __stack_condition_fin <- __stack_yield_fin + if verbose then + printfn + "at WhileAsync: after Yield().Invoke(sm), __stack_condition_fin=%b" + __stack_condition_fin + if __stack_condition_fin then condition_res <- task.Result else @@ -465,6 +543,9 @@ type TaskSeqBuilder() = ) member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = + if verbose then + printfn "at While(...), calling WhileAsync()" + b.WhileAsync((fun () -> ValueTask(condition ())), body) member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = @@ -558,7 +639,14 @@ type TaskSeqBuilder() = TaskSeqCode<'T>(fun sm -> // This will yield with __stack_fin = false // This will resume with __stack_fin = true + if verbose then + printfn "at Yield, before Yield().Invoke(sm)" + let __stack_fin = ResumableCode.Yield().Invoke(&sm) + + if verbose then + printfn "at Yield, __stack_fin = %b" __stack_fin + sm.Data.current <- ValueSome v sm.Data.awaiter <- null __stack_fin) @@ -573,18 +661,31 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true + if verbose then + printfn "at Bind" + if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false // This will resume with __stack_fin2 = true let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) __stack_fin <- __stack_fin2 + if verbose then + printfn "at Bind: with __stack_fin = %b" __stack_fin + if __stack_fin then + if verbose then + printfn "at Bind: with getting result from awaiter" + let result = awaiter.GetResult() + + if verbose then + printfn "at Bind: calling continuation" + (continuation result).Invoke(&sm) else if verbose then - printfn "calling AwaitUnsafeOnCompleted" + printfn "at Bind: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone @@ -595,6 +696,9 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true + if verbose then + printfn "at BindV" + if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false // This will resume with __stack_fin2 = true @@ -606,7 +710,7 @@ type TaskSeqBuilder() = (continuation result).Invoke(&sm) else if verbose then - printfn "calling AwaitUnsafeOnCompleted" + printfn "at BindV: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone @@ -626,4 +730,5 @@ type TaskSeqBuilder() = sm.Data.current <- ValueNone // For tailcalls we return 'false' and re-run from the entry (trampoline) false + | _ -> b.YieldFrom(other).Invoke(&sm))