Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### 4.11.0

* Performance: `mapiAsync` — replaced `asyncSeq`-builder + `collect` implementation with a direct optimised enumerator (`OptimizedMapiAsyncEnumerator`), eliminating `collect` overhead and bringing per-element cost in line with `mapAsync`. Benchmarks added in `AsyncSeqMapiBenchmarks`.
* Design parity with FSharp.Control.TaskSeq (#277, batch 2):
* Added `AsyncSeq.tryTail` — returns `None` if the sequence is empty; otherwise returns `Some` of the tail. Safe counterpart to `tail`. Mirrors `TaskSeq.tryTail`.
* Added `AsyncSeq.where` / `AsyncSeq.whereAsync` — aliases for `filter` / `filterAsync`, mirroring the naming convention in `TaskSeq` and F# 8 collection expressions.
Expand Down
31 changes: 25 additions & 6 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,29 @@
disposed <- true
source.Dispose()

// Direct-iteration enumerator backing mapiAsync. Rather than going through the
// asyncSeq builder plus a `collect`, it pulls from the source enumerator
// itself and tracks the element index in a mutable counter, so each element
// costs one MoveNext, one index bump, and one invocation of the projection.
type private OptimizedMapiAsyncEnumerator<'T, 'TResult>(source: IAsyncSeqEnumerator<'T>, f: int64 -> 'T -> Async<'TResult>) =
    // Guards against disposing the underlying enumerator more than once.
    let mutable isDisposed = false
    // Index handed to the projection for the *next* element (starts at 0).
    let mutable nextIndex = 0L

    interface IAsyncSeqEnumerator<'TResult> with
        member _.MoveNext() = async {
            let! step = source.MoveNext()
            match step with
            | Some item ->
                // Capture the current index, then advance it before awaiting
                // the projection.
                let current = nextIndex
                nextIndex <- current + 1L
                let! projected = f current item
                return Some projected
            | None ->
                // Source exhausted; propagate end-of-sequence.
                return None
        }

        member _.Dispose() =
            // Idempotent: only the first call reaches the source's Dispose.
            if not isDisposed then
                isDisposed <- true
                source.Dispose()

// Optimized filterAsync enumerator that avoids computation builder overhead
type private OptimizedFilterAsyncEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async<bool>) =
let mutable disposed = false
Expand Down Expand Up @@ -1039,12 +1062,8 @@
| _ ->
AsyncSeqImpl(fun () -> new OptimizedMapAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult>

let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq {
let i = ref 0L
for itm in source do
let! v = f i.Value itm
i := i.Value + 1L
yield v }
/// Maps each element of the sequence through an asynchronous projection that
/// also receives the element's zero-based index (as int64). Implemented via a
/// dedicated enumerator rather than the asyncSeq builder, keeping per-element
/// cost in line with mapAsync.
let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> =
    let makeEnumerator () =
        new OptimizedMapiAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>
    AsyncSeqImpl makeEnumerator :> AsyncSeq<'TResult>

#if !FABLE_COMPILER
let mapAsyncParallel (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
Expand Down Expand Up @@ -2600,7 +2619,7 @@

/// Groups elements by the synchronous key projection [p]; delegates to
/// groupByAsync by lifting [p] into Async with async.Return.
/// NOTE(review): this call intentionally triggers groupByAsync's own
/// CompilerMessage warning during the library build (visible in CI); consider
/// suppressing it locally with #nowarn if the build is to be warning-clean —
/// TODO confirm intended policy.
[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2622 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2622 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
36 changes: 32 additions & 4 deletions tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ type AsyncSeqPipelineBenchmarks() =
[<MemoryDiagnoser>]
[<SimpleJob(RuntimeMoniker.Net80)>]
type AsyncSeqSliceBenchmarks() =

[<Params(1000, 10000)>]
member val ElementCount = 0 with get, set

/// Benchmark take: stops after N elements
[<Benchmark(Baseline = true)>]
member this.Take() =
Expand Down Expand Up @@ -214,6 +210,38 @@ type AsyncSeqSliceBenchmarks() =
|> AsyncSeq.iterAsync (fun _ -> async.Return())
|> Async.RunSynchronously

[<Params(1000, 10000)>]
member val ElementCount = 0 with get, set

/// Benchmarks for map and mapi variants — ensures the direct-enumerator optimisation
/// for mapiAsync is visible and comparable against mapAsync.
[<MemoryDiagnoser>]
[<SimpleJob(RuntimeMoniker.Net80)>]
type AsyncSeqMapiBenchmarks() =

    // Number of elements pushed through each pipeline. This member was missing:
    // every benchmark below reads this.ElementCount, so without a [<Params>]
    // property declared on THIS type the class does not compile (the
    // ElementCount member on AsyncSeqSliceBenchmarks does not carry over).
    [<Params(1000, 10000)>]
    member val ElementCount = 0 with get, set

    /// Baseline: mapAsync (already uses direct enumerator)
    [<Benchmark(Baseline = true)>]
    member this.MapAsync() =
        AsyncSeq.replicate this.ElementCount 1
        |> AsyncSeq.mapAsync (fun x -> async.Return (x * 2))
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// mapiAsync — now uses direct enumerator; should be close to mapAsync cost
    [<Benchmark>]
    member this.MapiAsync() =
        AsyncSeq.replicate this.ElementCount 1
        |> AsyncSeq.mapiAsync (fun i x -> async.Return (i, x * 2))
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// mapi — synchronous projection variant; dispatches through mapiAsync
    [<Benchmark>]
    member this.Mapi() =
        AsyncSeq.replicate this.ElementCount 1
        |> AsyncSeq.mapi (fun i x -> (i, x * 2))
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

/// Entry point for running benchmarks.
/// Delegates directly to BenchmarkSwitcher so all BenchmarkDotNet CLI options
/// (--filter, --job short, --exporters, etc.) work out of the box.
Expand Down
Loading