Skip to content

Commit b6cf679

Browse files
committed
+SwitchMap, +Retry(Task), +Repeat(Task), +Concat(IA<IA>), +Merge(IA<IA>)
1 parent 731b7b7 commit b6cf679

File tree

13 files changed

+741
-27
lines changed

13 files changed

+741
-27
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ finally
5151
- `Merge` - run multiple sources at once and merge their items into a single async sequence
5252
- `Never` - the async sequence never produces any items and never terminates
5353
- `Range` - emit a range of numbers
54+
- `Switch` - switch between inner async sources produced by an outer async sequence
5455
- `Timer` - emit zero after some time delay
5556
- `Using` - use a resource for the duration of a generated actual `IAsyncEnumerable`
5657
- `Zip` - combine the next values of multiple sources via a function and emit its results
@@ -89,6 +90,7 @@ finally
8990
- `SkipUntil` - skip until another async sequence signals an item or completes
9091
- `SkipWhile` - skip items while the predicate returns true, start emitting when it turns false
9192
- `SwitchIfEmpty` - switch to an alternate async sequence if the main sequence turns out to be empty
93+
- `SwitchMap` - switch to a newer mapped-in async sequence, disposing the old one, whenever the source produces an item
9294
- `Take` - take at most a given number of items and stop the async sequence after that
9395
- `TakeLast` - take the last given number of items of the source async sequence and emit those
9496
- `TakeUntil` - take items from the main source until a secondary async sequence signals an item or completes

async-enumerable-dotnet-benchmark/Program.cs

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,43 +23,25 @@ static void Main(string[] args)
2323
{
2424
for (var i = 0; i < 100000; i++)
2525
{
26-
if (i % 100 == 0)
26+
if (i % 10 == 0)
2727
{
2828
Console.WriteLine(i);
2929
}
30-
var list = AsyncEnumerable.Range(1, 5)
31-
.Publish(a =>
32-
a.Take(3).MergeWith(a.Skip(3))
33-
)
34-
.ToList().GetAsyncEnumerator();
30+
var list = AsyncEnumerable.Range(1, 100_000)
31+
.SwitchMap(v => AsyncEnumerable.Range(v, 2))
32+
.Last()
33+
.GetAsyncEnumerator();
34+
3535
try
3636
{
37-
/*
38-
var t = 0;
39-
while (!list.IsCompleted && !list.IsFaulted && !list.IsCanceled && t < 5000)
40-
{
41-
t++;
42-
}
43-
44-
while (!list.IsCompleted && !list.IsFaulted && !list.IsCanceled)
45-
{
46-
await Task.Delay(1);
47-
}
48-
*/
49-
5037
if (!list.MoveNextAsync().Result)
5138
{
5239
Console.WriteLine("Empty?");
5340
}
5441

55-
if (list.Current.Count != 5)
42+
if (list.Current != 100_001)
5643
{
57-
foreach (var e in list.Current)
58-
{
59-
Console.Write(e);
60-
Console.Write(", ");
61-
}
62-
Console.WriteLine();
44+
Console.WriteLine(list.Current);
6345
Console.ReadLine();
6446
break;
6547
}

async-enumerable-dotnet-test/AsyncEnumerableTest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ static AsyncEnumerableTest()
210210
Defaults.Add(typeof(Func<ValueTask>), (Func<ValueTask>)(() => new ValueTask()));
211211
Defaults.Add(typeof(Func<long, bool>), (Func<long, bool>)(v => false));
212212
Defaults.Add(typeof(Func<long, Exception, bool>), (Func<long, Exception, bool>)((v, w) => false));
213+
Defaults.Add(typeof(Func<long, Exception, Task<bool>>), (Func<long, Exception, Task<bool>>)((v, w) => Task.FromResult(false)));
214+
Defaults.Add(typeof(Func<long, Task<bool>>), (Func<long, Task<bool>>)(v => Task.FromResult(false)));
213215

214216
Defaults.Add(typeof(Func<IAsyncEnumerable<int>, IAsyncEnumerable<int>>), (Func<IAsyncEnumerable<int>, IAsyncEnumerable<int>>)(w => w));
215217

async-enumerable-dotnet-test/ConcatTest.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,22 @@ await AsyncEnumerable.Range(1, 5)
5757
.ConcatWith(AsyncEnumerable.Range(4, 2))
5858
.AssertResult(1, 2, 3, 4, 5);
5959
}
60+
61+
[Fact]
62+
public async void Async_Normal()
63+
{
64+
await
65+
AsyncEnumerable.FromArray(
66+
AsyncEnumerable.Range(1, 3),
67+
AsyncEnumerable.Empty<int>(),
68+
AsyncEnumerable.FromArray(4, 5, 6, 7),
69+
AsyncEnumerable.Empty<int>(),
70+
AsyncEnumerable.Just(8),
71+
AsyncEnumerable.FromEnumerable(new[] { 9, 10 }),
72+
AsyncEnumerable.Empty<int>()
73+
)
74+
.Concat()
75+
.AssertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
76+
}
6077
}
6178
}

async-enumerable-dotnet-test/MergeTest.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,5 +112,23 @@ await AsyncEnumerable.Range(1, 5)
112112
}
113113
}
114114

115+
116+
[Fact]
117+
public async void Async_Normal()
118+
{
119+
await
120+
AsyncEnumerable.FromArray(
121+
AsyncEnumerable.Range(1, 3),
122+
AsyncEnumerable.Empty<int>(),
123+
AsyncEnumerable.FromArray(4, 5, 6, 7),
124+
AsyncEnumerable.Empty<int>(),
125+
AsyncEnumerable.Just(8),
126+
AsyncEnumerable.FromEnumerable(new[] { 9, 10 }),
127+
AsyncEnumerable.Empty<int>()
128+
)
129+
.Merge()
130+
.AssertResultSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
131+
}
132+
115133
}
116134
}

async-enumerable-dotnet-test/RepeatTest.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using Xunit;
66
using async_enumerable_dotnet;
7+
using System.Threading.Tasks;
78

89
namespace async_enumerable_dotnet_test
910
{
@@ -34,5 +35,15 @@ await AsyncEnumerable.Range(1, 2)
3435
.AssertResult(1, 2, 1, 2, 1, 2);
3536
}
3637

38+
[Fact]
39+
public async void Limited_Condition_Task()
40+
{
41+
await AsyncEnumerable.Range(1, 2)
42+
.Repeat(async n => {
43+
await Task.Delay(100);
44+
return n < 2;
45+
})
46+
.AssertResult(1, 2, 1, 2, 1, 2);
47+
}
3748
}
3849
}

async-enumerable-dotnet-test/RetryTest.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System;
66
using Xunit;
77
using async_enumerable_dotnet;
8+
using System.Threading.Tasks;
89

910
namespace async_enumerable_dotnet_test
1011
{
@@ -37,5 +38,17 @@ await AsyncEnumerable.Range(1, 2)
3738
.Retry((idx, ex) => idx < 2)
3839
.AssertFailure(typeof(InvalidOperationException), 1, 2, 1, 2, 1, 2);
3940
}
41+
42+
[Fact]
43+
public async void Retry_Condition_Task()
44+
{
45+
await AsyncEnumerable.Range(1, 2)
46+
.ConcatWith(AsyncEnumerable.Error<int>(new InvalidOperationException()))
47+
.Retry(async (idx, ex) => {
48+
await Task.Delay(100);
49+
return idx < 2;
50+
})
51+
.AssertFailure(typeof(InvalidOperationException), 1, 2, 1, 2, 1, 2);
52+
}
4053
}
4154
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) David Karnok & Contributors.
2+
// Licensed under the Apache 2.0 License.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using Xunit;
6+
using async_enumerable_dotnet;
7+
using System;
8+
9+
namespace async_enumerable_dotnet_test
10+
{
11+
public class SwitchMapTest
12+
{
13+
[Fact]
14+
public async void Empty()
15+
{
16+
await AsyncEnumerable.Empty<int>()
17+
.SwitchMap(v => AsyncEnumerable.Range(1, 5))
18+
.AssertResult();
19+
}
20+
21+
[Fact]
22+
public async void Single()
23+
{
24+
await AsyncEnumerable.Just(2)
25+
.SwitchMap(v => AsyncEnumerable.Range(v, 5))
26+
.AssertResult(2, 3, 4, 5, 6);
27+
}
28+
29+
[Fact]
30+
public async void Many_Switched()
31+
{
32+
await AsyncEnumerable.Range(1, 5)
33+
.SwitchMap(v => AsyncEnumerable.Range(v * 10, 5))
34+
.Last()
35+
.AssertResult(54);
36+
}
37+
38+
39+
[Fact]
40+
public async void Many_Switched_Lots()
41+
{
42+
await AsyncEnumerable.Range(1, 1000)
43+
.SwitchMap(v => AsyncEnumerable.Range(v * 1000, 100))
44+
.Last()
45+
.AssertResult(1_000_099);
46+
}
47+
48+
[Fact]
49+
public async void Many_Switched_Lots_2()
50+
{
51+
await AsyncEnumerable.Range(1, 100_000)
52+
.SwitchMap(v => AsyncEnumerable.Range(v, 2))
53+
.Last()
54+
.AssertResult(100_001);
55+
}
56+
57+
[Fact]
58+
public async void Error_Outer()
59+
{
60+
await AsyncEnumerable.Just(2).ConcatWith(AsyncEnumerable.Error<int>(new InvalidOperationException()))
61+
.SwitchMap(v => AsyncEnumerable.Range(v, 5))
62+
.AssertFailure(typeof(InvalidOperationException), 2, 3, 4, 5, 6);
63+
}
64+
65+
[Fact]
66+
public async void Error_Inner()
67+
{
68+
await AsyncEnumerable.Just(2)
69+
.SwitchMap(v => AsyncEnumerable.Range(v, 5)
70+
.ConcatWith(AsyncEnumerable.Error<int>(new InvalidOperationException()))
71+
)
72+
.AssertFailure(typeof(InvalidOperationException), 2, 3, 4, 5, 6);
73+
}
74+
75+
[Fact]
76+
public async void Mapper_Crash()
77+
{
78+
await AsyncEnumerable.Just(2)
79+
.SwitchMap<int, int>(v => throw new InvalidOperationException())
80+
.AssertFailure(typeof(InvalidOperationException));
81+
}
82+
}
83+
}

async-enumerable-dotnet/AsyncEnumerable.cs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ public static IAsyncEnumerable<T> Repeat<T>(this IAsyncEnumerable<T> source, lon
921921
/// <typeparam name="T">The element type.</typeparam>
922922
/// <param name="source">The source async sequence to repeatedly relay items of.</param>
923923
/// <param name="condition">The function called when the current run completes with
924-
/// the current run index (zero-based) and should return to repeat the sequence once more, false to end it.</param>
924+
/// the current run index (zero-based) and should return true to repeat the sequence once more, false to end it.</param>
925925
/// <returns>The new IAsyncEnumerable instance.</returns>
926926
public static IAsyncEnumerable<T> Repeat<T>(this IAsyncEnumerable<T> source, Func<long, bool> condition)
927927
{
@@ -930,6 +930,21 @@ public static IAsyncEnumerable<T> Repeat<T>(this IAsyncEnumerable<T> source, Fun
930930
return new Repeat<T>(source, long.MaxValue, condition);
931931
}
932932

933+
/// <summary>
934+
/// Repeatedly relay the source async sequence, once it completes the previous time, when the function returns true, ending the sequence otherwise.
935+
/// </summary>
936+
/// <typeparam name="T">The element type.</typeparam>
937+
/// <param name="source">The source async sequence to repeatedly relay items of.</param>
938+
/// <param name="condition">The function called when the current run completes with
939+
/// the current run index (zero-based) and should return a task with true to repeat the sequence once more, false to end it.</param>
940+
/// <returns>The new IAsyncEnumerable instance.</returns>
941+
public static IAsyncEnumerable<T> Repeat<T>(this IAsyncEnumerable<T> source, Func<long, Task<bool>> condition)
942+
{
943+
RequireNonNull(source, nameof(source));
944+
RequireNonNull(condition, nameof(condition));
945+
return new RepeatTask<T>(source, condition);
946+
}
947+
933948
/// <summary>
934949
/// Retry a possibly failing async sequence, optionally a limited number of times before giving up.
935950
/// </summary>
@@ -957,6 +972,20 @@ public static IAsyncEnumerable<T> Retry<T>(this IAsyncEnumerable<T> source, Func
957972
return new Retry<T>(source, long.MaxValue, condition);
958973
}
959974

975+
/// <summary>
976+
/// Retry a possibly failing async sequence if the condition returns true for the Exception and/or retry index.
977+
/// </summary>
978+
/// <typeparam name="T">The element type of the source async sequence.</typeparam>
979+
/// <param name="source">The source async sequence that could fail and should be repeated.</param>
980+
/// <param name="condition">Called when the sequence fails with the retry index (zero-based) and last failure Exception and should return a task with true if the sequence should be retried.</param>
981+
/// <returns>The new IAsyncEnumerable instance.</returns>
982+
public static IAsyncEnumerable<T> Retry<T>(this IAsyncEnumerable<T> source, Func<long, Exception, Task<bool>> condition)
983+
{
984+
RequireNonNull(source, nameof(source));
985+
RequireNonNull(condition, nameof(condition));
986+
return new RetryTask<T>(source, condition);
987+
}
988+
960989
/// <summary>
961990
/// Skips the last given number of items from the source
962991
/// async sequence.
@@ -1442,5 +1471,64 @@ public static IAsyncEnumerable<TResult> Replay<TSource, TResult>(this IAsyncEnum
14421471
RequireNonNull(func, nameof(func));
14431472
return new Replay<TSource, TResult>(source, func);
14441473
}
1474+
1475+
/// <summary>
1476+
/// Switches to a newer source async sequence, disposing the old one,
1477+
/// when the source async sequence produces
1478+
/// a value and is mapped to an async sequence via a function.
1479+
/// </summary>
1480+
/// <typeparam name="TSource">The source value type.</typeparam>
1481+
/// <typeparam name="TResult">The result value type.</typeparam>
1482+
/// <param name="source">The source to map into async sequences and switch between them.</param>
1483+
/// <param name="mapper">The function that receives the source item and should
1484+
/// return an async sequence to be run.</param>
1485+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1486+
public static IAsyncEnumerable<TResult> SwitchMap<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> mapper)
1487+
{
1488+
RequireNonNull(source, nameof(source));
1489+
RequireNonNull(mapper, nameof(mapper));
1490+
return new SwitchMap<TSource, TResult>(source, mapper);
1491+
}
1492+
1493+
/// <summary>
1494+
/// Switches to a newer source async sequence, disposing the old one,
1495+
/// when the source async sequence produces a new inner async sequence.
1496+
/// </summary>
1497+
/// <typeparam name="TSource">The source value type.</typeparam>
1498+
/// <param name="sources">The async sequence of async sequences to switch between.</param>
1499+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1500+
public static IAsyncEnumerable<TSource> Switch<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
1501+
{
1502+
RequireNonNull(sources, nameof(sources));
1503+
return sources.SwitchMap(v => v);
1504+
}
1505+
1506+
/// <summary>
1507+
/// Runs the inner async sequences, produced by an outer async sequence,
1508+
/// one after the other and relays their items.
1509+
/// </summary>
1510+
/// <typeparam name="TSource">The value type.</typeparam>
1511+
/// <param name="sources">The async sequence of async sequences to concatenate.</param>
1512+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1513+
public static IAsyncEnumerable<TSource> Concat<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
1514+
{
1515+
RequireNonNull(sources, nameof(sources));
1516+
return sources.ConcatMap(v => v);
1517+
}
1518+
1519+
/// <summary>
1520+
/// Runs the inner async sequences, produced by an outer async sequence,
1521+
/// some or all concurrently and relays their items single a serialized async sequence.
1522+
/// </summary>
1523+
/// <typeparam name="TSource">The value type.</typeparam>
1524+
/// <param name="sources">The async sequence of async sequences to merge at once.</param>
1525+
/// <param name="maxConcurrency">The maximum number of inner sequences to run at once.</param>
1526+
/// <param name="prefetch">The number of items to prefetch from each inner async sequence.</param>
1527+
/// <returns>The new IAsyncEnumerable instance.</returns>
1528+
public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources, int maxConcurrency = int.MaxValue, int prefetch = 32)
1529+
{
1530+
RequireNonNull(sources, nameof(sources));
1531+
return sources.FlatMap(v => v, maxConcurrency, prefetch);
1532+
}
14451533
}
14461534
}

0 commit comments

Comments
 (0)