-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathGatherers.java
More file actions
709 lines (639 loc) · 29.9 KB
/
Gatherers.java
File metadata and controls
709 lines (639 loc) · 29.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
/*
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;
import jdk.internal.access.SharedSecrets;
import jdk.internal.vm.annotation.ForceInline;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer.Integrator;
import java.util.stream.Gatherer.Downstream;
/**
* Implementations of {@link Gatherer} that provide useful intermediate
* operations, such as windowing functions, folding functions,
* transforming elements concurrently, etc.
*
* @since 24
*/
@NullMarked
public final class Gatherers {
private Gatherers() { } // This class is not intended to be instantiated
// Public built-in Gatherers and factory methods for them
/**
 * Returns a Gatherer that groups elements, in encounter order, into
 * windows of a fixed size.
 * An empty stream produces no window at all.
 * The final window may hold fewer elements than the requested window size.
 *
 * <p>Example:
 * {@snippet lang = java:
 * // will contain: [[1, 2, 3], [4, 5, 6], [7, 8]]
 * List<List<Integer>> windows =
 *     Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
 * }
 *
 * @implSpec Every window produced is an unmodifiable List; invoking any
 * mutator method always results in {@code UnsupportedOperationException}
 * being thrown. No guarantees are made about the implementation type or
 * the serializability of the produced Lists.
 *
 * @apiNote For efficiency reasons, windows may be allocated contiguously
 * and eagerly. Consequently, a large window size combined with a small
 * stream may use excessive memory for as long as this operation is being
 * evaluated.
 *
 * @param windowSize the size of the windows
 * @param <TR> the type of elements the returned gatherer consumes
 *             and the contents of the windows it produces
 * @return a new gatherer which groups elements into fixed-size windows
 * @throws IllegalArgumentException when {@code windowSize} is less than 1
 */
public static <TR extends @Nullable Object> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {
    if (windowSize < 1) {
        throw new IllegalArgumentException("'windowSize' must be greater than zero");
    }

    // Mutable per-evaluation state: the window being filled and its fill level
    class FixedWindow {
        Object[] window = new Object[windowSize];
        int at = 0;

        boolean integrate(TR element, Downstream<? super List<TR>> downstream) {
            window[at++] = element;
            if (at < windowSize) {
                return true; // window not yet full, keep collecting
            }

            // Window is full: hand it off and start a fresh one
            final Object[] full = window;
            window = new Object[windowSize];
            at = 0;
            return downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(full)
            );
        }

        void finish(Downstream<? super List<TR>> downstream) {
            if (at <= 0 || downstream.isRejecting()) {
                return; // nothing buffered, or nobody wants it
            }

            // Emit the trailing partial window, trimmed to its actual size
            final Object[] tail = new Object[at];
            System.arraycopy(window, 0, tail, 0, at);
            window = null;
            at = 0;
            downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(tail)
            );
        }
    }

    return Gatherer.<TR, FixedWindow, List<TR>>ofSequential(
        FixedWindow::new,                                                       // initializer
        Integrator.<FixedWindow, TR, List<TR>>ofGreedy(FixedWindow::integrate), // integrator
        FixedWindow::finish                                                     // finisher
    );
}
/**
 * Returns a Gatherer that groups elements, in encounter order, into
 * windows of a given size, where each subsequent window contains all
 * elements of the previous window except for the least recent one, and
 * additionally the next element of the stream.
 * An empty stream produces no window at all. If the stream holds fewer
 * elements than the window size then exactly one window is produced,
 * containing all elements of the stream.
 *
 * <p>Example:
 * {@snippet lang = java:
 * // will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
 * List<List<Integer>> windows2 =
 *     Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();
 *
 * // will contain: [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
 * List<List<Integer>> windows6 =
 *     Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();
 * }
 *
 * @implSpec Every window produced is an unmodifiable List; invoking any
 * mutator method always results in {@code UnsupportedOperationException}
 * being thrown. No guarantees are made about the implementation type or
 * the serializability of the produced Lists.
 *
 * @apiNote For efficiency reasons, windows may be allocated contiguously
 * and eagerly. Consequently, a large window size combined with a small
 * stream may use excessive memory for as long as this operation is being
 * evaluated.
 *
 * @param windowSize the size of the windows
 * @param <TR> the type of elements the returned gatherer consumes
 *             and the contents of the windows it produces
 * @return a new gatherer which groups elements into sliding windows
 * @throws IllegalArgumentException when {@code windowSize} is less than 1
 */
public static <TR extends @Nullable Object> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {
    if (windowSize < 1) {
        throw new IllegalArgumentException("'windowSize' must be greater than zero");
    }

    // Mutable per-evaluation state: the window being filled, its fill level,
    // and whether a complete window has been emitted yet
    class SlidingWindow {
        Object[] window = new Object[windowSize];
        int at = 0;
        boolean firstWindow = true;

        boolean integrate(TR element, Downstream<? super List<TR>> downstream) {
            window[at++] = element;
            if (at < windowSize) {
                return true; // still filling up the very first window
            }

            // Window is full: emit it, and seed the next window with
            // everything but the oldest element
            final Object[] full = window;
            final Object[] next = new Object[windowSize];
            System.arraycopy(full, 1, next, 0, windowSize - 1);
            window = next;
            at--;
            firstWindow = false;
            return downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(full)
            );
        }

        void finish(Downstream<? super List<TR>> downstream) {
            // Only a stream shorter than the window size leaves something to emit
            if (!firstWindow || at <= 0 || downstream.isRejecting()) {
                return;
            }

            final Object[] tail = new Object[at];
            System.arraycopy(window, 0, tail, 0, at);
            window = null;
            at = 0;
            downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(tail)
            );
        }
    }

    return Gatherer.<TR, SlidingWindow, List<TR>>ofSequential(
        SlidingWindow::new,                                                         // initializer
        Integrator.<SlidingWindow, TR, List<TR>>ofGreedy(SlidingWindow::integrate), // integrator
        SlidingWindow::finish                                                       // finisher
    );
}
/**
 * Returns a Gatherer that performs an ordered, <i>reduction-like</i>,
 * transformation, intended for cases where no combiner-function can be
 * implemented, or for reductions which are intrinsically
 * order-dependent.
 *
 * @implSpec If no exceptions are thrown during processing, then this
 * operation only ever produces a single element.
 *
 * <p>Example:
 * {@snippet lang = java:
 * // will contain: Optional["123456789"]
 * Optional<String> numberString =
 *     Stream.of(1,2,3,4,5,6,7,8,9)
 *           .gather(
 *               Gatherers.fold(() -> "", (string, number) -> string + number)
 *            )
 *           .findFirst();
 * }
 *
 * @see java.util.stream.Stream#reduce(Object, BinaryOperator)
 *
 * @param initial the supplier of the initial (identity) value for the
 *                fold operation
 * @param folder the folding function
 * @param <T> the type of elements the returned gatherer consumes
 * @param <R> the type of elements the returned gatherer produces
 * @return a new Gatherer
 * @throws NullPointerException if any of the parameters are {@code null}
 */
public static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, ?, R> fold(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> folder) {
    Objects.requireNonNull(initial, "'initial' must not be null");
    Objects.requireNonNull(folder, "'folder' must not be null");

    // Holds the running result; the initial value is obtained once per evaluation
    class State {
        R value = initial.get();

        // Folds the next element into the running result; never short-circuits
        boolean integrate(T element, Downstream<? super R> downstream) {
            value = folder.apply(value, element);
            return true;
        }

        // Emits the single, final result
        void finish(Downstream<? super R> downstream) {
            downstream.push(value);
        }
    }

    return Gatherer.<T, State, R>ofSequential(
        State::new,
        Integrator.<State, T, R>ofGreedy(State::integrate),
        State::finish
    );
}
/**
 * Returns a Gatherer that performs a Prefix Scan -- an incremental
 * accumulation -- using the provided functions. Beginning with the
 * initial value obtained from the {@code Supplier}, every subsequent
 * value is computed by applying the {@code BiFunction} to the current
 * value and the next input element; the resulting value is then
 * produced downstream.
 *
 * <p>Example:
 * {@snippet lang = java:
 * // will contain: ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
 * List<String> numberStrings =
 *     Stream.of(1,2,3,4,5,6,7,8,9)
 *           .gather(
 *               Gatherers.scan(() -> "", (string, number) -> string + number)
 *            )
 *           .toList();
 * }
 *
 * @param initial the supplier of the initial value for the scanner
 * @param scanner the function to apply for each element
 * @param <T> the type of element which this gatherer consumes
 * @param <R> the type of element which this gatherer produces
 * @return a new Gatherer which performs a prefix scan
 * @throws NullPointerException if any of the parameters are {@code null}
 */
public static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, ?, R> scan(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> scanner) {
    Objects.requireNonNull(initial, "'initial' must not be null");
    Objects.requireNonNull(scanner, "'scanner' must not be null");

    // Holds the most recently produced value; seeded once per evaluation
    class State {
        R current = initial.get();
    }

    return Gatherer.<T, State, R>ofSequential(
        State::new,
        Integrator.<State, T, R>ofGreedy((state, element, downstream) -> {
            // Advance the accumulation, then publish the new value
            state.current = scanner.apply(state.current, element);
            return downstream.push(state.current);
        })
    );
}
/**
 * An operation which executes a function concurrently
 * with a configured level of max concurrency, using
 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>.
 * This operation preserves the ordering of the stream.
 *
 * @apiNote In progress tasks will be attempted to be cancelled,
 * on a best-effort basis, in situations where the downstream no longer
 * wants to receive any more elements.
 *
 * @implSpec If a result of the function is to be pushed downstream but
 * instead the function completed exceptionally then the corresponding
 * exception will instead be rethrown by this method as an instance of
 * {@link RuntimeException}, after which any remaining tasks are canceled.
 *
 * @implNote At most {@code maxConcurrency} tasks are outstanding (started
 * but not yet pushed downstream) at any time, so a slow task at the head
 * of the window bounds how far ahead the operation runs. If the thread
 * driving the stream is interrupted while waiting for a result, waiting
 * continues and the interrupt status is restored before control is
 * returned; no elements are dropped because of the interrupt. When the
 * operation ends early (downstream rejection or an exception), the
 * outstanding tasks are cancelled and their threads are awaited.
 *
 * @param maxConcurrency the maximum concurrency desired
 * @param mapper a function to be executed concurrently
 * @param <T> the type of input
 * @param <R> the type of output
 * @return a new Gatherer
 * @throws IllegalArgumentException if {@code maxConcurrency} is less than 1
 * @throws NullPointerException if {@code mapper} is {@code null}
 */
public static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T,?,R> mapConcurrent(
        final int maxConcurrency,
        final Function<? super T, ? extends R> mapper) {
    if (maxConcurrency < 1)
        throw new IllegalArgumentException(
                "'maxConcurrency' must be greater than 0");

    Objects.requireNonNull(mapper, "'mapper' must not be null");

    // Pairs the future of one mapping with the virtual thread that runs it,
    // so that cleanup can both cancel the work and wait for the thread to exit
    final class Task {
        final FutureTask<R> future;
        final Thread thread;

        Task(T element) {
            this.future = new FutureTask<R>(() -> mapper.apply(element));
            this.thread = Thread.ofVirtual().unstarted(this.future);
        }
    }

    final class State {
        // Work in progress, in encounter order; never holds more than
        // maxConcurrency tasks. ArrayDeque default initial size is 16
        final ArrayDeque<Task> window =
                new ArrayDeque<>(Math.min(maxConcurrency, 16));

        boolean integrate(T element, Downstream<? super R> downstream) {
            // Do not start new work for a downstream that no longer wants
            // elements; flush() then cancels whatever is still pending
            if (downstream.isRejecting())
                return flush(0, downstream);

            final var task = new Task(element);
            window.addLast(task);
            assert window.size() <= maxConcurrency;
            task.thread.start();

            // When at capacity, block until the oldest task has been pushed.
            // This bounds the window even if the head task is slow.
            return flush(window.size() < maxConcurrency ? 0 : 1, downstream);
        }

        /*
         * Pushes completed results downstream in encounter order.
         * Blocks for up to `atLeastN` results that are not yet done.
         * Returns whether more elements should be integrated; on any other
         * outcome (false or an exception) all pending tasks are cancelled
         * and their threads are awaited.
         */
        boolean flush(long atLeastN, Downstream<? super R> downstream) {
            boolean success = false;
            boolean interrupted = false;
            try {
                boolean proceed = !downstream.isRejecting();
                Task current;
                while (proceed
                        && (current = window.peekFirst()) != null
                        && (current.future.isDone() || atLeastN > 0)) {
                    R result;

                    // An interrupt must not truncate the stream: keep
                    // waiting for the result and restore the flag later
                    for (;;) {
                        try {
                            result = current.future.get();
                            break;
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }

                    proceed &= downstream.push(result);
                    atLeastN -= 1;

                    final var correctRemoval = window.pollFirst() == current;
                    assert correctRemoval;
                }
                return (success = proceed); // Ensure cleanup if we stop here
            } catch (ExecutionException e) {
                final var cause = e.getCause();
                throw (cause instanceof RuntimeException re)
                        ? re
                        : new RuntimeException(cause == null ? e : cause);
            } finally {
                // Clean up on every non-successful exit, including
                // exceptions thrown by the downstream itself
                if (!success && !window.isEmpty()) {
                    // First signal cancellation to everything in progress...
                    for (var task : window)
                        task.future.cancel(true);

                    // ...then wait for the task threads to actually exit
                    Task next;
                    while ((next = window.pollFirst()) != null) {
                        while (next.thread.isAlive()) {
                            try {
                                next.thread.join();
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                    }
                }

                // Restore the interrupt status on the calling thread
                if (interrupted)
                    Thread.currentThread().interrupt();
            }
        }
    }

    return Gatherer.ofSequential(
            State::new,
            Integrator.<State, T, R>ofGreedy(State::integrate),
            (state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
    );
}
// Implementation details
/*
 * Singleton enum providing the default functions used by the factory
 * methods and by the default methods of the Gatherer interface.
 *
 * Having dedicated default instances serves the following purposes:
 * 1. `null` is not needed to signal the absence of a specified value,
 *    which would otherwise hide user bugs
 * 2. callers can check against these default values and thereby avoid
 *    calling methods needlessly
 * 3. composition and evaluation can be made more efficient
 */
@SuppressWarnings("rawtypes")
enum Value implements Supplier, BinaryOperator, BiConsumer {
    DEFAULT;

    // A distinct combiner instance whose apply always returns null
    final BinaryOperator<Void> statelessCombiner = new BinaryOperator<Void>() {
        @Override
        public Void apply(Void left, Void right) {
            return null;
        }
    };

    // Supplier: the default initializer supplies null
    @Override
    public Object get() {
        return null;
    }

    // BinaryOperator: the default combiner always throws when applied
    @Override
    public Object apply(Object left, Object right) {
        throw new UnsupportedOperationException("This combiner cannot be used!");
    }

    // BiConsumer: the default finisher does nothing
    @Override
    public void accept(Object state, Object downstream) {
    }

    // Typed views of this singleton; the unchecked casts only change the
    // static type under which the same instance is returned

    @ForceInline
    @SuppressWarnings("unchecked")
    <A> Supplier<A> initializer() {
        final Supplier<A> self = (Supplier<A>) this;
        return self;
    }

    @ForceInline
    @SuppressWarnings("unchecked")
    <T> BinaryOperator<T> combiner() {
        final BinaryOperator<T> self = (BinaryOperator<T>) this;
        return self;
    }

    @ForceInline
    @SuppressWarnings("unchecked")
    <T, R> BiConsumer<T, Gatherer.Downstream<? super R>> finisher() {
        final BiConsumer<T, Downstream<? super R>> self =
                (BiConsumer<T, Downstream<? super R>>) this;
        return self;
    }
}
/*
 * Plain record-based Gatherer: each record component doubles as the
 * implementation of the correspondingly named Gatherer method.
 */
record GathererImpl<T, A, R>(
        @Override Supplier<A> initializer,
        @Override Integrator<A, T, R> integrator,
        @Override BinaryOperator<A> combiner,
        @Override BiConsumer<A, Downstream<? super R>> finisher) implements Gatherer<T, A, R> {

    /*
     * Null-checking factory. Arguments are validated in declaration order
     * and a NullPointerException names the first offending argument.
     */
    static <T, A, R> GathererImpl<T, A, R> of(
            Supplier<A> initializer,
            Integrator<A, T, R> integrator,
            BinaryOperator<A> combiner,
            BiConsumer<A, Downstream<? super R>> finisher) {
        Objects.requireNonNull(initializer, "initializer");
        Objects.requireNonNull(integrator, "integrator");
        Objects.requireNonNull(combiner, "combiner");
        Objects.requireNonNull(finisher, "finisher");
        return new GathererImpl<>(initializer, integrator, combiner, finisher);
    }
}
static final class Composite<T, A, R, AA, RR> implements Gatherer<T, Object, RR> {
private final Gatherer<T, A, ? extends R> left;
private final Gatherer<? super R, AA, ? extends RR> right;
// FIXME change `impl` to a computed constant when available
private GathererImpl<T, Object, RR> impl;
// Factory for the composition of `left` followed by `right`
static <T, A, R, AA, RR> Composite<T, A, R, AA, RR> of(
        Gatherer<T, A, ? extends R> left,
        Gatherer<? super R, AA, ? extends RR> right) {
    final Composite<T, A, R, AA, RR> composite = new Composite<>(left, right);
    return composite;
}
// Only stores the two parts; the fused implementation is built lazily by impl()
private Composite(Gatherer<T, A, ? extends R> left,
                  Gatherer<? super R, AA, ? extends RR> right) {
    this.right = right;
    this.left = left;
}
@SuppressWarnings("unchecked")
private GathererImpl<T, Object, RR> impl() {
    // ATTENTION: this method currently relies on a "benign" data-race:
    // the field is read once into a local, and if it is still unset the
    // fused implementation is computed and published. Concurrent
    // initialization on different threads should deterministically
    // produce the same result.
    var fused = impl;
    if (fused == null) {
        fused = (GathererImpl<T, Object, RR>) impl(left, right);
        impl = fused;
    }
    return fused;
}
// Delegates to the lazily built fused implementation
@Override
public Supplier<Object> initializer() {
    final var fused = impl();
    return fused.initializer();
}
// Delegates to the lazily built fused implementation
@Override
public Integrator<Object, T, RR> integrator() {
    final var fused = impl();
    return fused.integrator();
}
// Delegates to the lazily built fused implementation
@Override
public BinaryOperator<Object> combiner() {
    final var fused = impl();
    return fused.combiner();
}
// Delegates to the lazily built fused implementation
@Override
public BiConsumer<Object, Downstream<? super RR>> finisher() {
    final var fused = impl();
    return fused.finisher();
}
/*
 * Composes this gatherer with `that`, grouping the composition to the
 * right: the result is always `left.andThen(<everything else>)`. When
 * `that` is itself a Composite, its two parts are appended one by one.
 */
@Override
public <RRR> Gatherer<T, ?, RRR> andThen(
        Gatherer<? super RR, ?, ? extends RRR> that) {
    if (that.getClass() != Composite.class) {
        return left.andThen(right.andThen(that));
    }

    @SuppressWarnings("unchecked")
    final var other =
        (Composite<? super RR, ?, Object, ?, ? extends RRR>) that;
    return left.andThen(right.andThen(other.left).andThen(other.right));
}
/*
 * Builds the fused GathererImpl which evaluates `left` followed by `right`:
 * every element that `left` pushes is fed straight into the integrator of
 * `right`, whose output goes to the real downstream.
 *
 * Two strategies are used:
 *  - both sides stateless and greedy: no composite state is allocated and
 *    both integrators are invoked with a `null` state
 *  - otherwise: a local State class carries both states together with
 *    the short-circuit flags of each side
 */
static final <T, A, R, AA, RR> GathererImpl<T, ?, RR> impl(
Gatherer<T, A, R> left, Gatherer<? super R, AA, RR> right) {
// Read every component function of both gatherers exactly once, up front
final var leftInitializer = left.initializer();
final var leftIntegrator = left.integrator();
final var leftCombiner = left.combiner();
final var leftFinisher = left.finisher();
final var rightInitializer = right.initializer();
final var rightIntegrator = right.integrator();
final var rightCombiner = right.combiner();
final var rightFinisher = right.finisher();
// "Stateless" is detected by identity with the default initializer,
// "greedy" by the integrator being an Integrator.Greedy
final var leftStateless = leftInitializer == Gatherer.defaultInitializer();
final var rightStateless = rightInitializer == Gatherer.defaultInitializer();
final var leftGreedy = leftIntegrator instanceof Integrator.Greedy;
final var rightGreedy = rightIntegrator instanceof Integrator.Greedy;
/*
* For pairs of stateless and greedy Gatherers, we can optimize
* evaluation as we do not need to track any state nor any
* short-circuit signals. This can provide significant
* performance improvements.
*/
if (leftStateless && rightStateless && leftGreedy && rightGreedy) {
return new GathererImpl<>(
Gatherer.defaultInitializer(),
// Integrator: left's downstream is a lambda forwarding into right
Gatherer.Integrator.ofGreedy((unused, element, downstream) ->
leftIntegrator.integrate(
null,
element,
r -> rightIntegrator.integrate(null, r, downstream))
),
// Combiner: if either side uses the default combiner then so does the
// composition; otherwise a combiner which always returns null is used
(leftCombiner == Gatherer.defaultCombiner()
|| rightCombiner == Gatherer.defaultCombiner())
? Gatherer.defaultCombiner()
: Value.DEFAULT.statelessCombiner
,
// Finisher: default only when both sides use the default finisher;
// otherwise run left's finisher (feeding right) and then right's
(leftFinisher == Gatherer.<A,R>defaultFinisher()
&& rightFinisher == Gatherer.<AA,RR>defaultFinisher())
? Gatherer.defaultFinisher()
: (unused, downstream) -> {
if (leftFinisher != Gatherer.<A,R>defaultFinisher())
leftFinisher.accept(
null,
r -> rightIntegrator.integrate(null, r, downstream));
if (rightFinisher != Gatherer.<AA,RR>defaultFinisher())
rightFinisher.accept(null, downstream);
}
);
} else {
// Composite state: the state of each side plus one flag per side,
// cleared once that side's (non-greedy) integrator has returned false
class State {
final A leftState;
final AA rightState;
boolean leftProceed;
boolean rightProceed;
private State(A leftState, AA rightState,
boolean leftProceed, boolean rightProceed) {
this.leftState = leftState;
this.rightState = rightState;
this.leftProceed = leftProceed;
this.rightProceed = rightProceed;
}
// Fresh state: a stateless side stores null instead of calling its
// initializer; both flags start out as true
State() {
this(leftStateless ? null : leftInitializer.get(),
rightStateless ? null : rightInitializer.get(),
true, true);
}
// Combiner: merges the per-side states with the per-side combiners.
// The new leftProceed is derived from both flags of `this` and the new
// rightProceed from both flags of `right`.
State joinLeft(State right) {
return new State(
leftStateless ? null : leftCombiner.apply(this.leftState, right.leftState),
rightStateless ? null : rightCombiner.apply(this.rightState, right.rightState),
this.leftProceed && this.rightProceed,
right.leftProceed && right.rightProceed);
}
boolean integrate(T t, Downstream<? super RR> c) {
/*
* rightProceed must be checked after integration of
* left since that can cause right to short-circuit
* We always want to conditionally write leftProceed
* here, which means that we only do so if we are
* known to be not-greedy.
*/
return (leftIntegrator.integrate(leftState, t, r -> rightIntegrate(r, c))
|| leftGreedy
|| (leftProceed = false))
&& (rightGreedy || rightProceed);
}
// Finisher: left's finisher runs first and its output is integrated
// into right; right's finisher then emits to the real downstream.
// Default finishers are skipped.
void finish(Downstream<? super RR> c) {
if (leftFinisher != Gatherer.<A, R>defaultFinisher())
leftFinisher.accept(leftState, r -> rightIntegrate(r, c));
if (rightFinisher != Gatherer.<AA, RR>defaultFinisher())
rightFinisher.accept(rightState, c);
}
/*
* Currently we use the following to ferry elements from
* the left Gatherer to the right Gatherer, but we create
* the Gatherer.Downstream as a lambda which means that
* the default implementation of `isRejecting()` is used.
*
* If it is determined that we want to be able to support
* the full interface of Gatherer.Downstream then we have
* the following options:
* 1. Have State implement Downstream<? super R>
* and store the passed in Downstream<? super RR>
* downstream as an instance field in integrate()
* and read it in push(R r).
* 2. Allocate a new Gatherer.Downstream<? super R> for
* each invocation of integrate() which might prove
* costly.
*/
public boolean rightIntegrate(R r, Downstream<? super RR> downstream) {
// The following logic is highly performance sensitive
// A non-greedy right side which has already returned false is not
// invoked again; rightProceed is only ever written when right is
// not greedy and its integrator returns false
return (rightGreedy || rightProceed)
&& (rightIntegrator.integrate(rightState, r, downstream)
|| rightGreedy
|| (rightProceed = false));
}
}
// The fused integrator is greedy only if both sides are greedy; the
// combiner and finisher fall back to the defaults under the same
// conditions as in the stateless branch above
return new GathererImpl<T, State, RR>(
State::new,
(leftGreedy && rightGreedy)
? Integrator.<State, T, RR>ofGreedy(State::integrate)
: Integrator.<State, T, RR>of(State::integrate),
(leftCombiner == Gatherer.defaultCombiner()
|| rightCombiner == Gatherer.defaultCombiner())
? Gatherer.defaultCombiner()
: State::joinLeft,
(leftFinisher == Gatherer.<A, R>defaultFinisher()
&& rightFinisher == Gatherer.<AA, RR>defaultFinisher())
? Gatherer.defaultFinisher()
: State::finish
);
}
}
}
}