Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Release notes:

1.0.0
- perf: use while! in groupBy, countBy, partition, except, exceptOfSeq to eliminate redundant mutable 'go' variables and initial MoveNextAsync calls
- adds taskSeqDynamic computation expression and TaskSeqDynamic/TaskSeqDynamicInfo types for dynamic (FSI-compatible) resumable code, fixing issue where taskSeq would raise NotImplementedException in F# Interactive, #246
- perf: TaskSeq.chunkBy and chunkByAsync reuse the ResizeArray buffer between chunks, reducing allocations on sequences with many chunk boundaries
- fixes: TaskSeq.insertAt, insertManyAt, removeAt, removeManyAt, updateAt now raise ArgumentNullException (not NullReferenceException) when given a null source; insertManyAt also validates the values argument
Expand Down
73 changes: 23 additions & 50 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,35 +1462,29 @@ module internal TaskSeqInternal =

taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None
let mutable go = true
let! step = e.MoveNextAsync()
go <- step
let! hasFirst = e.MoveNextAsync()

if step then
if hasFirst then
// only create hashset by the time we actually start iterating;
// taskSeq enumerates sequentially, so a plain HashSet suffices β€” no locking needed.
let hashSet = HashSet<_>(HashIdentity.Structural)

use excl = itemsToExclude.GetAsyncEnumerator CancellationToken.None
let! exclStep = excl.MoveNextAsync()
let mutable exclGo = exclStep

while exclGo do
while! excl.MoveNextAsync() do
hashSet.Add excl.Current |> ignore
let! exclStep = excl.MoveNextAsync()
exclGo <- exclStep

while go do
// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add e.Current then
yield e.Current

while! e.MoveNextAsync() do
let current = e.Current

// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add current then
yield current

let! step = e.MoveNextAsync()
go <- step

}

let exceptOfSeq itemsToExclude (source: TaskSeq<_>) =
Expand All @@ -1499,26 +1493,24 @@ module internal TaskSeqInternal =

taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None
let mutable go = true
let! step = e.MoveNextAsync()
go <- step
let! hasFirst = e.MoveNextAsync()

if step then
if hasFirst then
// only create hashset by the time we actually start iterating;
// initialize directly from the seq β€” taskSeq is sequential so no locking needed.
let hashSet = HashSet<_>(itemsToExclude, HashIdentity.Structural)

while go do
// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add e.Current then
yield e.Current

while! e.MoveNextAsync() do
let current = e.Current

// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add current then
yield current

let! step = e.MoveNextAsync()
go <- step

}

let distinctUntilChanged (source: TaskSeq<_>) =
Expand Down Expand Up @@ -1601,12 +1593,10 @@ module internal TaskSeqInternal =
use e = source.GetAsyncEnumerator CancellationToken.None
let groups = Dictionary<'Key, ResizeArray<'T>>(HashIdentity.Structural)
let order = ResizeArray<'Key>()
let! step = e.MoveNextAsync()
let mutable go = step

match projector with
| ProjectorAction proj ->
while go do
while! e.MoveNextAsync() do
let key = proj e.Current
let mutable ra = Unchecked.defaultof<_>

Expand All @@ -1616,11 +1606,9 @@ module internal TaskSeqInternal =
order.Add key

ra.Add e.Current
let! step = e.MoveNextAsync()
go <- step

| AsyncProjectorAction proj ->
while go do
while! e.MoveNextAsync() do
let! key = proj e.Current
let mutable ra = Unchecked.defaultof<_>

Expand All @@ -1630,8 +1618,6 @@ module internal TaskSeqInternal =
order.Add key

ra.Add e.Current
let! step = e.MoveNextAsync()
go <- step

return
Array.init order.Count (fun i ->
Expand All @@ -1646,33 +1632,27 @@ module internal TaskSeqInternal =
use e = source.GetAsyncEnumerator CancellationToken.None
let counts = Dictionary<'Key, int>(HashIdentity.Structural)
let order = ResizeArray<'Key>()
let! step = e.MoveNextAsync()
let mutable go = step

match projector with
| ProjectorAction proj ->
while go do
while! e.MoveNextAsync() do
let key = proj e.Current
let mutable count = 0

if not (counts.TryGetValue(key, &count)) then
order.Add key

counts[key] <- count + 1
let! step = e.MoveNextAsync()
go <- step

| AsyncProjectorAction proj ->
while go do
while! e.MoveNextAsync() do
let! key = proj e.Current
let mutable count = 0

if not (counts.TryGetValue(key, &count)) then
order.Add key

counts[key] <- count + 1
let! step = e.MoveNextAsync()
go <- step

return Array.init order.Count (fun i -> let k = order[i] in k, counts[k])
}
Expand All @@ -1684,29 +1664,22 @@ module internal TaskSeqInternal =
use e = source.GetAsyncEnumerator CancellationToken.None
let trueItems = ResizeArray<'T>()
let falseItems = ResizeArray<'T>()
let! step = e.MoveNextAsync()
let mutable go = step

match predicate with
| Predicate pred ->
while go do
while! e.MoveNextAsync() do
let item = e.Current

if pred item then
trueItems.Add item
else
falseItems.Add item

let! step = e.MoveNextAsync()
go <- step

| PredicateAsync pred ->
while go do
while! e.MoveNextAsync() do
let item = e.Current
let! result = pred item
if result then trueItems.Add item else falseItems.Add item
let! step = e.MoveNextAsync()
go <- step

return trueItems.ToArray(), falseItems.ToArray()
}
Expand Down