diff --git a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj index 489fd13c..f510b5b7 100644 --- a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj +++ b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj @@ -30,6 +30,7 @@ + diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs new file mode 100644 index 00000000..963181eb --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs @@ -0,0 +1,143 @@ +namespace FSharpy.Tests + +open System +open System.IO +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy +open System.Threading.Tasks +open System.Diagnostics +open System.Collections.Generic +open Xunit.Abstractions + +/// Just a naive, simple in-memory reader that acts as an IAsyncEnumerable to use with tests +/// IMPORTANT: currently this is not thread-safe!!! +type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = + let stream = new MemoryStream(data: byte[]) + let buffered = new BufferedStream(stream, blockSize) + let mutable current = ValueNone + let mutable lastPos = 0L + + interface IAsyncEnumerable with + member reader.GetAsyncEnumerator(ct) = + output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}" + reader.MemberwiseClone() :?> IAsyncEnumerator<_> + + interface IAsyncEnumerator with + member _.Current = + match current with + | ValueSome x -> x + | ValueNone -> failwith "Not a current item!" + + member _.MoveNextAsync() = + task { + let mem = Array.zeroCreate blockSize + buffered.Position <- lastPos + // this advances the "current" position automatically. However, this is clearly NOT threadsafe!!! + let! bytesRead = buffered.ReadAsync(mem, 0, mem.Length) // offset refers to offset in target buffer, not source + lastPos <- buffered.Position + + if bytesRead > 0 then + current <- ValueSome mem + return true + else + current <- ValueNone + return false + } + |> Task.toValueTask + + interface IAsyncDisposable with + member _.DisposeAsync() = + try + // this disposes of the mem stream + buffered.DisposeAsync() + finally + // if the previous block raises, we should still try to get rid of the underlying stream + stream.DisposeAsync().AsTask().Wait() + +type ``Real world tests``(output: ITestOutputHelper) = + [] + let ``Reading a 10MB buffered IAsync stream from start to finish`` () = task { + let mutable count = 0 + use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256) + let expected = Array.init 256 byte + + let ts = taskSeq { + for data in reader do + do count <- count + 1 + + if count > 40960 then + failwith "Too far!!!!!!" // ensuring we don't end up in an endless loop + + yield data + } + + // the following is extremely slow, which is why we just use F#'s comparison instead + // Using this takes 67s, compared to 0.25s using normal F# comparison. + do! reader |> TaskSeq.iter (should equal expected) + do! reader |> TaskSeq.iter ((=) expected >> (should be True)) + let! len = reader |> TaskSeq.mapi (fun i _ -> i + 1) |> TaskSeq.last + len |> should equal 8 + //do! task { do count |> should equal 4096 } + } + + [] + let ``Reading a 10MB buffered IAsync stream from start to finish comparison`` () = task { + // NOTE: + // this test is meant to compare the test above for performance reasons + // and for soundness checks + + let expected = Array.init 256 byte + let stream = new MemoryStream(Array.init 10_485_760 byte) + let buffered = new BufferedStream(stream, 256) + let mutable current = true + let mutable count = 0 + + while current do + let mem = Array.zeroCreate 256 + + // this advances the "current" position automatically. However, this is clearly NOT threadsafe!!! + let! bytesRead = buffered.ReadAsync(mem, 0, mem.Length) + + if bytesRead > 0 then + count <- count + 1 + mem = expected |> should be True + // the following is extremely slow + //mem |> should equal (Array.init 256 byte) + + current <- bytesRead > 0 + + count |> should equal 40960 + } + + + //System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed. + // at .$TaskSeq.Realworld.clo@58-4.MoveNext() in D:\Projects\OpenSource\Abel\TaskSeq\src\FSharpy.TaskSeq.Test\TaskSeq.Realworld.fs:line 77 + // at Xunit.Sdk.TestInvoker`1.<>c__DisplayClass48_0.<b__1>d.MoveNext() in /_/src/xunit.execution/Sdk/Frameworks/Runners/TestInvoker.cs:line 264 + //--- End of stack trace from previous location --- + // at Xunit.Sdk.ExecutionTimer.AggregateAsync(Func`1 asyncAction) in /_/src/xunit.execution/Sdk/Frameworks/ExecutionTimer.cs:line 48 + // at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90\ + [] + let ``Reading a 1MB buffered IAsync stream from start to finish InvalidOperationException`` () = task { + let mutable count = 0 + use reader = AsyncBufferedReader(output, Array.init 1_048_576 byte, 256) + let expected = Array.init 256 byte + + let ts = taskSeq { + for data in reader do + do count <- count + 1 + + if count > 40960 then + failwith "Too far!!!!!!" // ensuring we don't end up in an endless loop + + yield data + } + + // the following is extremely slow, which is why we just use F#'s comparison instead + // Using this takes 67s, compared to 0.25s using normal F# comparison. + do! ts |> TaskSeq.iter (should equal expected) + do! ts |> TaskSeq.iter ((=) expected >> (should be True)) + do! task { do count |> should equal 4096 } + } diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index ec3384ce..8f6f6574 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -15,7 +15,7 @@ open FSharp.Core.CompilerServices.StateMachineHelpers [] module Internal = // cannot be marked with 'internal' scope - let verbose = false + let verbose = true let inline MoveNext (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() diff --git a/src/FSharpy.TaskSeq/Utils.fs b/src/FSharpy.TaskSeq/Utils.fs index 8aa3207b..4499bd1f 100644 --- a/src/FSharpy.TaskSeq/Utils.fs +++ b/src/FSharpy.TaskSeq/Utils.fs @@ -15,6 +15,18 @@ module Task = /// Convert a Task into a Task let inline toTask (task: Task) = task :> Task + /// Convert a Task<'T> into a ValueTask<'T> + let inline toValueTask (task: Task<'T>) = ValueTask<'T> task + + /// Convert a Task into a non-generic ValueTask + let inline toIgnoreValueTask (task: Task) = ValueTask(task :> Task) + + /// + /// Convert a ValueTask<'T> to a Task<'T>. To use a non-generic ValueTask, + /// consider using: . + /// + let inline ofValueTask (valueTask: ValueTask<'T>) = task { return! valueTask } + /// Convert a Task<'T> into a Task, ignoring the result let inline ignore (task: Task<'T>) = TaskBuilder.task { @@ -23,7 +35,7 @@ module Task = } :> Task - /// Map a Tas<'T> + /// Map a Task<'T> let inline map mapper (task: Task<'T>) : Task<'U> = TaskBuilder.task { let! result = task