Skip to content

Commit 3741210

Browse files
committed
+first, +last, +switchIfEmpty
1 parent 47440f3 commit 3741210

File tree

8 files changed

+500
-1
lines changed

8 files changed

+500
-1
lines changed

src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ default <S> AsyncEnumerable<T> retryWhen(Function<? super Throwable, ? extends C
325325
}
326326

327327
default <S> AsyncEnumerable<T> retryWhen(Supplier<S> stateSupplier, BiFunction<? super S, ? super Throwable, ? extends CompletionStage<Boolean>> completer) {
328-
return new AsyncRetryWhen<T, S>(this, stateSupplier, completer);
328+
return new AsyncRetryWhen<>(this, stateSupplier, completer);
329329
}
330330

331331
default <K> AsyncEnumerable<GroupedAsyncEnumerable<T, K>> groupBy(Function<? super T, ? extends K> keySelector) {
@@ -406,6 +406,18 @@ default <R> AsyncEnumerable<R> publish(Function<? super AsyncEnumerable<T>, ? ex
406406
return new AsyncPublish<>(this, handler);
407407
}
408408

409+
default AsyncEnumerable<T> switchIfEmpty(AsyncEnumerable<T> fallback) {
410+
return new AsyncSwitchIfEmpty<>(this, fallback);
411+
}
412+
413+
default AsyncEnumerable<T> first() {
414+
return new AsyncFirst<>(this);
415+
}
416+
417+
default AsyncEnumerable<T> last() {
418+
return new AsyncLast<>(this);
419+
}
420+
409421
// -------------------------------------------------------------------------------------
410422
// Instance consumers
411423

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.CompletionStage;
20+
21+
final class AsyncFirst<T> implements AsyncEnumerable<T> {
22+
23+
final AsyncEnumerable<T> source;
24+
25+
AsyncFirst(AsyncEnumerable<T> source) {
26+
this.source = source;
27+
}
28+
29+
@Override
30+
public AsyncEnumerator<T> enumerator() {
31+
return new FirstEnumerator<>(source.enumerator());
32+
}
33+
34+
static final class FirstEnumerator<T> implements AsyncEnumerator<T> {
35+
36+
final AsyncEnumerator<T> source;
37+
38+
boolean once;
39+
40+
FirstEnumerator(AsyncEnumerator<T> source) {
41+
this.source = source;
42+
}
43+
44+
@Override
45+
public CompletionStage<Boolean> moveNext() {
46+
if (once) {
47+
source.cancel();
48+
return FALSE;
49+
}
50+
once = true;
51+
return source.moveNext();
52+
}
53+
54+
@Override
55+
public T current() {
56+
return source.current();
57+
}
58+
59+
@Override
60+
public void cancel() {
61+
source.cancel();
62+
}
63+
}
64+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.function.BiConsumer;
22+
23+
final class AsyncLast<T> implements AsyncEnumerable<T> {
24+
25+
final AsyncEnumerable<T> source;
26+
27+
AsyncLast(AsyncEnumerable<T> source) {
28+
this.source = source;
29+
}
30+
31+
@Override
32+
public AsyncEnumerator<T> enumerator() {
33+
return new LastEnumerator<>(source.enumerator());
34+
}
35+
36+
static final class LastEnumerator<T>
37+
extends AtomicInteger
38+
implements AsyncEnumerator<T>, BiConsumer<Boolean, Throwable> {
39+
40+
final AsyncEnumerator<T> source;
41+
42+
volatile boolean cancelled;
43+
44+
CompletableFuture<Boolean> completable;
45+
46+
T result;
47+
48+
boolean once;
49+
50+
boolean hasValue;
51+
52+
LastEnumerator(AsyncEnumerator<T> source) {
53+
this.source = source;
54+
}
55+
56+
@Override
57+
public CompletionStage<Boolean> moveNext() {
58+
if (once) {
59+
result = null;
60+
return FALSE;
61+
}
62+
once = true;
63+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
64+
completable = cf;
65+
nextSource();
66+
return cf;
67+
}
68+
69+
void nextSource() {
70+
if (getAndIncrement() == 0) {
71+
do {
72+
source.moveNext().whenComplete(this);
73+
} while (decrementAndGet() != 0);
74+
}
75+
}
76+
77+
@Override
78+
public T current() {
79+
return result;
80+
}
81+
82+
@Override
83+
public void cancel() {
84+
cancelled = true;
85+
source.cancel();
86+
}
87+
88+
@Override
89+
public void accept(Boolean aBoolean, Throwable throwable) {
90+
if (throwable != null) {
91+
completable.completeExceptionally(throwable);
92+
return;
93+
}
94+
95+
if (aBoolean) {
96+
if (!hasValue) {
97+
hasValue = true;
98+
}
99+
result = source.current();
100+
nextSource();
101+
} else {
102+
completable.complete(hasValue);
103+
}
104+
}
105+
}
106+
}

src/main/java/hu/akarnokd/asyncenum/AsyncPublish.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ static final class PublishCoordinator<T, R> implements BiConsumer<Boolean, Throw
6767
volatile boolean sourceDone;
6868
volatile Throwable sourceError;
6969

70+
@SuppressWarnings("unchecked")
7071
PublishCoordinator() {
7172
enumerators = new AtomicReference<>(EMPTY);
7273
enumeratorWip = new AtomicInteger();
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.BiConsumer;
22+
23+
final class AsyncSwitchIfEmpty<T> implements AsyncEnumerable<T> {
24+
25+
final AsyncEnumerable<T> source;
26+
27+
final AsyncEnumerable<T> fallback;
28+
29+
AsyncSwitchIfEmpty(AsyncEnumerable<T> source, AsyncEnumerable<T> fallback) {
30+
this.source = source;
31+
this.fallback = fallback;
32+
}
33+
34+
@Override
35+
public AsyncEnumerator<T> enumerator() {
36+
return new SwitchIfEmptyEnumerator<>(source.enumerator(), fallback);
37+
}
38+
39+
static final class SwitchIfEmptyEnumerator<T>
40+
implements AsyncEnumerator<T>, BiConsumer<Boolean, Throwable> {
41+
42+
final AtomicReference<AsyncEnumerator<T>> source;
43+
44+
AsyncEnumerable<T> fallback;
45+
46+
CompletableFuture<Boolean> completable;
47+
48+
T result;
49+
50+
boolean hasValue;
51+
52+
SwitchIfEmptyEnumerator(AsyncEnumerator<T> source, AsyncEnumerable<T> fallback) {
53+
this.source = new AtomicReference<>(source);
54+
this.fallback = fallback;
55+
}
56+
57+
@Override
58+
public CompletionStage<Boolean> moveNext() {
59+
result = null;
60+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
61+
completable = cf;
62+
source.getPlain().moveNext().whenComplete(this);
63+
return cf;
64+
}
65+
66+
@Override
67+
public T current() {
68+
return result;
69+
}
70+
71+
@Override
72+
public void cancel() {
73+
AsyncEnumeratorHelper.cancel(source);
74+
}
75+
76+
@Override
77+
public void accept(Boolean aBoolean, Throwable throwable) {
78+
if (throwable != null) {
79+
completable.completeExceptionally(throwable);
80+
return;
81+
}
82+
83+
if (aBoolean) {
84+
if (!hasValue) {
85+
hasValue = true;
86+
}
87+
result = source.getPlain().current();
88+
completable.complete(true);
89+
} else {
90+
if (hasValue) {
91+
completable.complete(false);
92+
} else {
93+
hasValue = true;
94+
AsyncEnumerator<T> fb = fallback.enumerator();
95+
fallback = null;
96+
if (AsyncEnumeratorHelper.replace(source, fb)) {
97+
fb.moveNext().whenComplete(this);
98+
}
99+
}
100+
}
101+
}
102+
}
103+
}
104+
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import org.junit.Test;
20+
21+
import java.io.IOException;
22+
23+
public class AsyncFirstTest {
24+
25+
@Test
26+
public void simple() {
27+
TestHelper.assertResult(
28+
AsyncEnumerable.range(1, 5)
29+
.first(),
30+
1
31+
);
32+
}
33+
34+
35+
@Test
36+
public void take() {
37+
TestHelper.assertResult(
38+
AsyncEnumerable.range(1, 5)
39+
.first()
40+
.take(1),
41+
1
42+
);
43+
}
44+
45+
@Test
46+
public void empty() {
47+
TestHelper.assertResult(
48+
AsyncEnumerable.empty()
49+
.first()
50+
);
51+
}
52+
53+
@Test
54+
public void error() {
55+
TestHelper.assertFailure(
56+
AsyncEnumerable.error(new IOException())
57+
.first(),
58+
IOException.class
59+
);
60+
}
61+
}

0 commit comments

Comments
 (0)