-
Couldn't load subscription status.
- Fork 45
Streams in cyclops overview
cyclops-streams provides a sequential reactive-streams implmentation ( ReactiveSeq) that forms the basis of the multi-threaded reactive-streams implementations in simple-react, as such it provides all the advanced streaming functionality you would expect for an FRP library - windowing / batching (by size, time, state), failure handing, zipping, stream manipulation (insertAt, deleteBetween), advanced skip and limit operations (time based, conditional), asynchronous streaming operations (Future based operations and hotstreams).
Features include
-
Scheduling (cron, fixed rate, fixed delay)
-
Failure handling (recover / retry)
-
windowing / batching (by time, size, state, predicate)
-
zipping
-
HotStreams
-
reactive-streams : subscriber / publisher/ forEachX, forEachWithErrors, forEachEvent and more
-
Asynchronous execution
-
Stream manipulation - insert/At, deleteAt
-
Frequency management (xPer, onePer, jitter, debounce)
-
Efficient reversal
-
StreamUtils - static methods for java.util.stream.Streams
-
Streamables - efficient / lazy replayable Streams as java.util.stream.Stream or ReactiveSeq
-
See Streaming Examples wiki page for more details & examples
- Fast sequential Streams that can run asyncrhonously
- Reactive Stream Support
- Efficient Stream reversal and right based operations (foldRight / scanRight etc)
- static Stream Utilities
- ReactiveSeq implementation
- Terminal operations that return a Future to be populated asynchronously
- HotStream support
Large number of Stream operators available on ReactiveSeq or as static methods for java.util.Stream (ReactiveSeq extends java.util.Stream)
ReactiveSeq streams can be created via creational Operators such as
- of
- fromStream
- fromIterable
- fromList
- fromIntStream
- fromLongStream
- fromDoubleStream
range, of(List), of(..values) all result in Sequences that can be efficiently reversed (and used in scanRight, foldRight etc)
ReactiveSeq.range(0,Integer.MAX_VALUE);
List<Intger> list;
ReactiveSeq.fromList(list);
ReactiveSeq.of(1,2,3)
.reverse()
.forEach(System.out::println);Recover and retry operators allow different strategies for error recovery.
Recover allows a default value to be provided when an exception occurs
ReactiveSeq.of(1,2,3,4)
.map(u->{throw new RuntimeException();})
.recover(e->"hello")
.firstValue()
//helloRecovery can be linked to specific exception types.
ReactiveSeq.of(1,2,3,4)
.map(i->i+2)
.map(u->{throw ExceptionSoftener.throwSoftenedException( new IOException());})
.recover(IOException.class,e->"hello")
.firstValue()
//helloWith retry, a function will be called with an increasing back-off up to 7 times.
ReactiveSeq.of( 1, 2, 3)
.retry(this::loadData)
.firstValue()FutureOperations allow a Stream to be executed Asynchronously with the result of a terminal operation captured in a Future.
Executor exec = Executors.newfixedThreadPool(1);
CompletableFuture<List<Integer>> list = ReactiveSeq.of(1,2,3,4,5)
.map(this:expensiveOperation)
.futureOperations(exec)
.collect(Collectors.toList());
// list populates Asynchronously
list.join()
.forEach(System.out::println);HotStreams are executed immediately and asynchronously (on the provided Executor - they will make use of a single thread only). They are also connectable (by potentially multiple Streams), the connected Stream receive those elements emitted after they connect.
HotStream<Integer> range = ReactiveSeq.range(0,Integer.MAX_VALUE)
.peek(System.out::println)
.hotStream(Executors.fixedThreadPool(1));
// will start printing out each value in range
range.connect()
.limit(100)
.futureOperations(ForkJoinPool.commonPool())
.forEach(System.out::println);
//will print out the first 100 values it recieves (after joining) on a separate thread In conjunction with simple-react v0.99.3 and above ReactiveSeq can be turned into a reactive-stream publisher or subscriber.
CyclopsSubscriber<Integer> sub = ReactiveSeq.subscriber();
ReactiveSeq.of(1,2,3).subscribe(sub);
sub.ReactiveSeq().toList();
//[1,2,3]- Advanced & powerful Streaming api
- Reactive Streams support
- Asynchronous single threaded Streaming
- Terminal operations that return a Future to be populated asynchronously
- Reversable Spliterators for efficient Stream reversal and right based operations
- Retry / onFail
- HotStream support
This primarily defines the interfaces to be used for cyclops Streaming, for an implementation see cyclops-stream.