-
Notifications
You must be signed in to change notification settings - Fork 51
cyclops streams : Asynchronous Terminal Operations
johnmcclean-aol edited this page Nov 21, 2016
·
5 revisions
Cyclops has merged with simple-react. Please update your bookmarks (stars :) ) to https://github.com/aol/cyclops-react
All new develpoment on cyclops occurs in cyclops-react. Older modules are still available in maven central.
These methods are available via ReactiveSeq or to plain JDK 8 Streams via com.aol.cyclops.streams.StreamUtils, for Javaslang Streams use com.aol.cyclops.javaslang.streams.StreamUtils.
Terminal operations can now all be called asynchronously e.g.
CompletableFuture<Integer> size = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.count();
Available operations
- public CompletableFuture<List<T>> toList()
Asynchronously perform a mutable reduction to a JDK List
CompletableFuture<List<Data>> myList = ReactiveSeq.of(1,2,3,4)
.map(this::loadFromDb)
.futureOperations(getExecutor())
.toList();
- public CompletableFuture<Set<T>> toSet()
Asynchronously perform a mutable reduction to a JDK Set
CompletableFuture<Set<Data>> myList = ReactiveSeq.of(1,2,3,4)
.map(this::loadFromDb)
.futureOperations(getExecutor())
.toSet();
- public <U extends Comparable<U>> CompletableFuture<Optional<T>> minBy(Function<T, U> function) Asynchronously capture the minimum value in this stream using the provided function
CompletableFuture<Optional<Integer>> min = ReactiveSeq.of(1, 2, 3, 4, 5, 6)
.futureOperations(exec)
.minBy(t -> Math.abs(t - 5));
//min CompletableFuture[Optional[5]] //5-5 =0
- public <U extends Comparable<U>> CompletableFuture<Optional<T>> maxBy(Function<T, U> function) Asynchronously capture the maximum value in this stream using the provided function
CompletableFuture<Optional<Integer>> max = ReactiveSeq.of(1, 2, 3, 4, 5, 6)
.futureOperations(exec)
.maxBy(t -> Math.abs(t - 5));
//min CompletableFuture[Optional[1]] //Math.abs(1-5) =4
- public <R, A> CompletableFuture<R> collect(Collector<? super T, A, R> collector) Asynchronously perform a Stream collection
CompletableFuture<List<Integer>> list = ReactiveSeq.of(1,2,3,4,5)
.futureOperations(exec)
.collect(Collectors.toList());
//CompletableFuture[1,2,3,4,5]
- public CompletableFuture<Optional<T>> reduce(BinaryOperator<T> accumulator)
CompletableFuture<Optional<Integer>> sum = ReactiveSeq.of(1,2,3,4,5)
.map(it -> it*100).futureOperations(getExecutor())
.reduce( (acc,next) -> acc+next)
- public <A> CompletableFuture<A[]> toArray(IntFunction<A[]> generator)
CompletableFuture<Integer[]> array = ReactiveSeq.of(1,5,3,4,2).futureOperations()
.toArray(it->new Integer[it]);
- public CompletableFuture<Object[]> toArray()
CompletableFuture<Integer[]> array = ReactiveSeq.of(1,5,3,4,2).futureOperations(getExecutor())
.toArray()
- public <K> CompletableFuture<Map<K, List<T>>> groupBy(Function<? super T, ? extends K> classifier)
Map<Integer, List<Integer>> map1 = ReactiveSeq.of(1, 2, 3, 4)
.futureOperations(getExecutor())
.groupBy(i -> i % 2)
.join()
- public <K, A, D> CompletableFuture<Map<K, D>> groupBy(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream)
Map<Integer, Long> map3 =
ReactiveSeq.of(tuple(1, 1), tuple(1, 2), tuple(1, 3), tuple(2, 1), tuple(2, 2))
.futureOperations(exec)
.groupBy(t -> t.v1, counting()).join();
- public <K, D, A, M extends Map<K, D>> CompletableFuture<M> groupBy(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream)
Map<Integer, Tuple2<Long, String>> map4 =
of(tuple(1, 1), tuple(1, 2), tuple(1, 3), tuple(2, 4), tuple(2, 5))
.futureOperations(exec)
.groupBy(t -> t.v1, collectors(counting(),
mapping(t -> ((Tuple2<Integer,Integer>)t).map2(Object::toString).v2,
joining(", ")))).join();
- public <U> CompletableFuture<U> foldLeft(U seed, BiFunction<U, ? super T, U> function)
CompletableFuture<String> concat = ReactiveSeq.of("a", "b", "c")
.futureOperations(exec)
.foldLeft("", String::concat).join()
- public <U> CompletableFuture<U> foldRight(U seed, BiFunction<? super T, U, U> function)
CompletableFuture<String> reveresed = ReactiveSeq.of("a", "b", "c")
.futureOperations(exec)
.foldRight("", String::concat);
//cba
- public CompletableFuture<Optional<T>> min(Comparator<? super T> comparator)
CompletableFuture<Optional<Integer>> min = ReactiveSeq.of(1,2,3,4,5)
.futureOperations(exec)
.min((t1,t2) -> t1-t2);
//1
- public CompletableFuture<Optional<T>> max(Comparator<? super T> comparator)
CompletableFuture<Optional<Integer>> min = ReactiveSeq.of(1,2,3,4,5)
.futureOperations(exec)
.max((t1,t2) -> t1-t2);
//5
public <C extends Collection<T>> CompletableFuture<C> toCollection(Supplier<C> collectionFactory)
CompletableFuture<Collection<Integer>> col = ReactiveSeq.of(1,5,3,4,2).futureOperations()
.toCollection(()->new ArrayList())
- public <R> CompletableFuture<R> collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)
- public <U> CompletableFuture<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
CompletableFuture<Integer> sum = ReactiveSeq.of(1,2,3,4,5).map(it -> it*100)
.futureOperations(getExecutor()).reduce( 0,
(acc, next) -> acc+next,
Integer::sum);
//1550
- public CompletableFuture<T> reduce(T identity, BinaryOperator<T> accumulator)
CompletableFuture<Integer> sum = ReactiveSeq.of(1,2,3,4,5).map(it -> it*100)
.futureOperations(getExecutor())
.reduce( 50,(acc,next) -> acc+next);
//1550
- public CompletableFuture<Long> count()
CompletableFuture<Integer> sum = ReactiveSeq.of(1,2,3,4,5).map(it -> it*100)
.futureOperations(getExecutor())
.count();
//5
- public CompletableFuture<String> join(CharSequence sep)
CompletableFuture<String> concat = ReactiveSeq.of(1, 2, 3)
.futureOperations(exec)
.join(", ");
//"1,2,3"
- public CompletableFuture<String> join()
CompletableFuture<String> concat = ReactiveSeq.of(1, 2, 3)
.futureOperations(exec)
.join();
//"123"
- public CompletableFuture<String> join(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
CompletableFuture<String> concat = ReactiveSeq.of(1, 2, 3)
.futureOperations(exec)
.join(", ","!","?");
//"!1,2,3?"
- public CompletableFuture<Optional<T>> findAny()
CompletableFuture<Optional<Integer>> any = ReactiveSeq.of(1,2,3,4,5)
.filter(it -> it <3)
.futureOperations(exec)
.findAny()
//Optional[1]
- public CompletableFuture<Optional<T>> findFirst()
CompletableFuture<Optional<Integer>> first = ReactiveSeq.of(1,2,3,4,5)
.filter(it -> it <3)
.futureOperations(exec)
.findFirst()
//Optional[1]
- public CompletableFuture<T> firstValue()
CompletableFuture<Integer> first = ReactiveSeq.of(1,2,3,4,5)
.filter(it -> it <3)
.futureOperations(exec)
.firstValue()
//1
- public CompletableFuture<Boolean> allMatch(Predicate<? super T> predicate)
CompletableFuture<Boolean> match = ReactiveSeq.of(1,2,3,4,5)
.futureOperations(exec)
.allMatch(it-> it>0 && it <6);
//true
- public CompletableFuture<Boolean> anyMatch(Predicate<? super T> predicate)
CompletableFuture<Boolean> match = ReactiveSeq.of(1,2,3,4,5)
.futureOperations(exec)
.anyMatch(it-> it.equals(3));
//true
- public CompletableFuture<Boolean> noneMatch(Predicate<? super T> predicate)
CompletableFuture<Boolean> match = ReactiveSeq.of(1,2,3,4,5)
.futureOperations(exec)
.noneMatch(it-> it==5000);
//true
- CompletableFuture<Integer> sumInt(ToIntFunction<T> fn)
CompletableFuture<Integer> sum = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.sumInt(i->i);
//10
- CompletableFuture<OptionalInt> maxInt(ToIntFunction<T> fn)
CompletableFuture<OptionalInt> max = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.maxInt(i->i);
//4
- CompletableFuture<OptionalInt> minInt(ToIntFunction<T> fn)
CompletableFuture<OptionalInt> min = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.minInt(i->i);
//1
- CompletableFuture<OptionalDouble> averageInt(ToIntFunction<T> fn)
CompletableFuture<OptionalDouble> avg = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.averageInt(i->i);
//2.5
- CompletableFuture<IntSummaryStatistics> summaryStatisticsInt(ToIntFunction<T> fn)
CompletableFuture<IntSummaryStatistics> avg = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.summaryStatisticsInt(i->i);
- CompletableFuture<Long> sumLong(ToLongFunction<T> fn)
CompletableFuture<Long> sum = ReactiveSeq.of(1l,2l,3l,4l)
.futureOperations(exec)
.sumLong(i->i);
//10l
- CompletableFuture<OptionalLong> maxLong(ToLongFunction<T> fn)
CompletableFuture<OptionalLong> max = ReactiveSeq.of(1l,2l,3l,4l)
.futureOperations(exec)
.maxLong(i->i);
//4l
- CompletableFuture<OptionalLong> minLong(ToLongFunction<T> fn)
CompletableFuture<OptionalLong> min = ReactiveSeq.of(1l,2l,3l,4l)
.futureOperations(exec)
.minLong(i->i);
//1l
- CompletableFuture<OptionalDouble> averageLong(ToLongFunction<T> fn)
CompletableFuture<OptionalDouble> avg = ReactiveSeq.of(1l,2l,3l,4l)
.futureOperations(exec)
.averageLong(i->i);
//2.5
- CompletableFuture<LongSummaryStatistics> summaryStatisticsLong(ToLongFunction<T> fn)
CompletableFuture<LongSummaryStatistics> avg = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.summaryStatisticsLong(i->i);
- CompletableFuture<Double> sumDouble(ToDoubleFunction<T> fn)
CompletableFuture<Double> sum = ReactiveSeq.of(1d,2d,3d,4d)
.futureOperations(exec)
.sumDouble(i->i);
//10d
- CompletableFuture<OptionalDouble> maxDouble(ToDoubleFunction<T> fn)
CompletableFuture<OptionalDouble> max = ReactiveSeq.of(1d,2d,3d,4d)
.futureOperations(exec)
.maxInt(i->i);
//4d
- CompletableFuture<OptionalDouble> minDouble(ToDoubleFunction<T> fn)
CompletableFuture<OptionalDouble> min = ReactiveSeq.of(1d,2d,3d,4d)
.futureOperations(exec)
.minDouble(i->i);
//1d
- CompletableFuture<OptionalDouble> averageDouble(ToDoubleFunction<T> fn)
CompletableFuture<OptionalDouble> avg = ReactiveSeq.of(1d,2d,3d,4d)
.futureOperations(exec)
.averageDouble(i->i);
//2.5
- CompletableFuture<DoubleSummaryStatistics> summaryStatisticsDouble(ToDoubleFunction<T> fn)
CompletableFuture<DoubleSummaryStatistics> avg = ReactiveSeq.of(1,2,3,4)
.futureOperations(exec)
.summaryStatisticsDouble(i->i);