diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 90c6817..184a300 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,6 @@ ### 4.11.0 +* Performance: `mapiAsync` — replaced `asyncSeq`-builder + `collect` implementation with a direct optimised enumerator (`OptimizedMapiAsyncEnumerator`), eliminating `collect` overhead and bringing per-element cost in line with `mapAsync`. Benchmarks added in `AsyncSeqMapiBenchmarks`. * Design parity with FSharp.Control.TaskSeq (#277, batch 2): * Added `AsyncSeq.tryTail` — returns `None` if the sequence is empty; otherwise returns `Some` of the tail. Safe counterpart to `tail`. Mirrors `TaskSeq.tryTail`. * Added `AsyncSeq.where` / `AsyncSeq.whereAsync` — aliases for `filter` / `filterAsync`, mirroring the naming convention in `TaskSeq` and F# 8 collection expressions. diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index a1bfb1e..07d339f 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -933,6 +933,29 @@ module AsyncSeq = disposed <- true source.Dispose() + // Optimized mapiAsync enumerator: avoids asyncSeq builder + collect overhead by + // maintaining the index in a mutable field and iterating the source directly. + type private OptimizedMapiAsyncEnumerator<'T, 'TResult>(source: IAsyncSeqEnumerator<'T>, f: int64 -> 'T -> Async<'TResult>) = + let mutable disposed = false + let mutable index = 0L + + interface IAsyncSeqEnumerator<'TResult> with + member _.MoveNext() = async { + let! moveResult = source.MoveNext() + match moveResult with + | None -> return None + | Some value -> + let i = index + index <- index + 1L + let! mapped = f i value + return Some mapped + } + + member _.Dispose() = + if not disposed then + disposed <- true + source.Dispose() + // Optimized filterAsync enumerator that avoids computation builder overhead type private OptimizedFilterAsyncEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async) = let mutable disposed = false @@ -1039,12 +1062,8 @@ module AsyncSeq = | _ -> AsyncSeqImpl(fun () -> new OptimizedMapAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult> - let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq { - let i = ref 0L - for itm in source do - let! v = f i.Value itm - i := i.Value + 1L - yield v } + let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = + AsyncSeqImpl(fun () -> new OptimizedMapiAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult> #if !FABLE_COMPILER let mapAsyncParallel (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq { diff --git a/tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs b/tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs index 0f692e6..dae0a6b 100644 --- a/tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs +++ b/tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs @@ -183,10 +183,6 @@ type AsyncSeqPipelineBenchmarks() = [] [] type AsyncSeqSliceBenchmarks() = - - [] - member val ElementCount = 0 with get, set - /// Benchmark take: stops after N elements [] member this.Take() = @@ -214,6 +210,38 @@ type AsyncSeqSliceBenchmarks() = |> AsyncSeq.iterAsync (fun _ -> async.Return()) |> Async.RunSynchronously + [] + member val ElementCount = 0 with get, set + +/// Benchmarks for map and mapi variants — ensures the direct-enumerator optimisation +/// for mapiAsync is visible and comparable against mapAsync. +[] +[] +type AsyncSeqMapiBenchmarks() = + /// Baseline: mapAsync (already uses direct enumerator) + [] + member this.MapAsync() = + AsyncSeq.replicate this.ElementCount 1 + |> AsyncSeq.mapAsync (fun x -> async.Return (x * 2)) + |> AsyncSeq.iterAsync (fun _ -> async.Return()) + |> Async.RunSynchronously + + /// mapiAsync — now uses direct enumerator; should be close to mapAsync cost + [] + member this.MapiAsync() = + AsyncSeq.replicate this.ElementCount 1 + |> AsyncSeq.mapiAsync (fun i x -> async.Return (i, x * 2)) + |> AsyncSeq.iterAsync (fun _ -> async.Return()) + |> Async.RunSynchronously + + /// mapi — synchronous projection variant; dispatches through mapiAsync + [] + member this.Mapi() = + AsyncSeq.replicate this.ElementCount 1 + |> AsyncSeq.mapi (fun i x -> (i, x * 2)) + |> AsyncSeq.iterAsync (fun _ -> async.Return()) + |> Async.RunSynchronously + /// Entry point for running benchmarks. /// Delegates directly to BenchmarkSwitcher so all BenchmarkDotNet CLI options /// (--filter, --job short, --exporters, etc.) work out of the box.