Streams in cyclops overview

johnmcclean-aol edited this page Nov 23, 2016 · 5 revisions

cyclops-streams

USER GUIDE

cyclops-streams provides a sequential reactive-streams implementation (ReactiveSeq) that forms the basis of the multi-threaded reactive-streams implementations in simple-react. As such, it provides the advanced streaming functionality you would expect from an FRP library: windowing / batching (by size, time, state), failure handling, zipping, stream manipulation (insertAt, deleteBetween), advanced skip and limit operations (time based, conditional), and asynchronous streaming operations (Future-based operations and HotStreams).

Features include

  • Scheduling (cron, fixed rate, fixed delay)

  • Failure handling (recover / retry)

  • windowing / batching (by time, size, state, predicate)

  • zipping

  • HotStreams

  • reactive-streams: subscriber / publisher / forEachX, forEachWithErrors, forEachEvent and more

  • Asynchronous execution

  • Stream manipulation - insertAt, deleteBetween

  • Frequency management (xPer, onePer, jitter, debounce)

  • Efficient reversal

  • StreamUtils - static methods for java.util.stream.Streams

  • Streamables - efficient / lazy replayable Streams as java.util.stream.Stream or ReactiveSeq

  • See Streaming Examples wiki page for more details & examples

  • Asynchronous operations

Overview

  • Fast sequential Streams that can run asynchronously
  • Reactive Stream Support
  • Efficient Stream reversal and right based operations (foldRight / scanRight etc)
  • static Stream Utilities
  • ReactiveSeq implementation
  • Terminal operations that return a Future to be populated asynchronously
  • HotStream support

Operators

A large number of Stream operators are available on ReactiveSeq or as static methods for java.util.stream.Stream (ReactiveSeq extends java.util.stream.Stream).

Examples

Creating Streams

ReactiveSeq streams can be created via creational Operators such as

  • of
  • fromStream
  • fromIterable
  • fromList
  • fromIntStream
  • fromLongStream
  • fromDoubleStream

With efficient reversibility

range, of(List), of(..values) all result in Sequences that can be efficiently reversed (and used in scanRight, foldRight etc)

ReactiveSeq.range(0,Integer.MAX_VALUE);

List<Integer> list = Arrays.asList(1,2,3);
ReactiveSeq.fromList(list);

ReactiveSeq.of(1,2,3)
        .reverse()
        .forEach(System.out::println);
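To illustrate why index-addressable sources such as ranges and lists can be reversed cheaply, here is a minimal plain-JDK sketch. It is not the cyclops implementation; `ReverseSketch`, `reversed` and `foldRight` are hypothetical names chosen for this example. The idea is that a random-access source can be walked back to front by index, so nothing has to be buffered first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper, not part of cyclops.
final class ReverseSketch {

    // Streams a random-access list back to front by index; nothing is copied or buffered.
    static <T> Stream<T> reversed(List<T> list) {
        int size = list.size();
        return IntStream.range(0, size).mapToObj(i -> list.get(size - 1 - i));
    }

    // Right fold: combines elements starting from the last one.
    static <T, R> R foldRight(List<T> list, R identity, BiFunction<T, R, R> fn) {
        R acc = identity;
        for (int i = list.size() - 1; i >= 0; i--) {
            acc = fn.apply(list.get(i), acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        System.out.println(reversed(list).collect(Collectors.toList())); // [3, 2, 1]
        String folded = foldRight(list, "", (i, acc) -> acc + i);
        System.out.println(folded); // 321
    }
}
```

A source that is only an Iterator or a generic Stream has no such index, which is presumably why the guide singles out range, list and varargs sources.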

recover / Retry

Recover and retry operators allow different strategies for error recovery.

Recover allows a default value to be provided when an exception occurs

ReactiveSeq.of(1,2,3,4)
           .map(u->{throw new RuntimeException();})
           .recover(e->"hello")
           .firstValue();
//hello

Recovery can be linked to specific exception types.

ReactiveSeq.of(1,2,3,4)
           .map(i->i+2)
           .map(u->{throw ExceptionSoftener.throwSoftenedException(new IOException());})
           .recover(IOException.class,e->"hello")
           .firstValue();
//hello
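The recover behaviour described above can be approximated with plain java.util.stream code. The sketch below is an illustration under stated assumptions, not the cyclops implementation: `RecoverSketch` and `recovering` are hypothetical names, and the helper handles only unchecked exceptions. It wraps a mapping function so that an exception of a given type is replaced by a fallback value, while other exception types still propagate.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of cyclops.
final class RecoverSketch {

    // Wraps fn so that an exception of the given type is replaced by a fallback value.
    // Exceptions of any other type propagate unchanged.
    static <T, R, X extends RuntimeException> Function<T, R> recovering(
            Function<T, R> fn, Class<X> type, Function<? super X, ? extends R> fallback) {
        return input -> {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                if (type.isInstance(e)) {
                    return fallback.apply(type.cast(e));
                }
                throw e;
            }
        };
    }

    public static void main(String[] args) {
        Function<Integer, String> describe = i -> {
            if (i % 2 == 0) throw new IllegalStateException("even: " + i);
            return "value " + i;
        };
        List<String> out = Stream.of(1, 2, 3)
                .map(recovering(describe, IllegalStateException.class, e -> "hello"))
                .collect(Collectors.toList());
        System.out.println(out); // [value 1, hello, value 3]
    }
}
```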

With retry, a function will be called with an increasing back-off up to 7 times.

ReactiveSeq.of(1,2,3)
           .retry(this::loadData)
           .firstValue();
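As a rough picture of what retrying with an increasing back-off involves, here is a plain-JDK sketch. The delay schedule (doubling from a caller-supplied initial delay) and the names `RetrySketch` and `retry` are assumptions made for this example; the source states only that the back-off increases and that up to 7 attempts are made.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper, not part of cyclops.
final class RetrySketch {

    // Calls fn until it succeeds or maxAttempts is reached, doubling the delay after each failure.
    static <T, R> R retry(Function<T, R> fn, T input, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                last = e; // remember the failure in case every attempt fails
            }
            if (attempt < maxAttempts) {
                sleep(delay);
                delay *= 2; // increasing back-off (assumed doubling)
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first two calls, succeeds on the third.
        Function<Integer, String> flaky = i -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException("not yet");
            return "loaded " + i;
        };
        System.out.println(retry(flaky, 1, 7, 1L)); // loaded 1
    }
}
```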

FutureOperations

FutureOperations allow a Stream to be executed asynchronously, with the result of a terminal operation captured in a Future.

Executor exec = Executors.newFixedThreadPool(1);
CompletableFuture<List<Integer>> list = ReactiveSeq.of(1,2,3,4,5)
                                                   .map(this::expensiveOperation)
                                                   .futureOperations(exec)
                                                   .collect(Collectors.toList());

// list populates asynchronously

list.join()
    .forEach(System.out::println);
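For comparison, the same idea (run the whole pipeline, including its terminal operation, on an Executor and hand back a Future) can be sketched with the JDK alone. This is only an analogy to futureOperations, not how cyclops implements it; `FutureCollectSketch` and `collectAsync` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of cyclops.
final class FutureCollectSketch {

    // Maps and collects the source on the given executor; the caller gets a future immediately.
    static <T, R> CompletableFuture<List<R>> collectAsync(
            List<T> source, Function<T, R> fn, Executor exec) {
        return CompletableFuture.supplyAsync(
                () -> source.stream().map(fn).collect(Collectors.toList()), exec);
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(1);
        try {
            CompletableFuture<List<Integer>> future =
                    collectAsync(Arrays.asList(1, 2, 3, 4, 5), i -> i * 2, exec);
            // join() blocks until the pipeline has finished on the executor thread.
            System.out.println(future.join()); // [2, 4, 6, 8, 10]
        } finally {
            exec.shutdown();
        }
    }
}
```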

HotStream

HotStreams are executed immediately and asynchronously on the provided Executor, using a single thread only. They are also connectable, potentially by multiple Streams; each connected Stream receives only those elements emitted after it connects.

HotStream<Integer> range = ReactiveSeq.range(0,Integer.MAX_VALUE)
                                      .peek(System.out::println)
                                      .hotStream(Executors.newFixedThreadPool(1));

// will start printing out each value in range

range.connect()
     .limit(100)
     .futureOperations(ForkJoinPool.commonPool())
     .forEach(System.out::println);

// will print out the first 100 values it receives (after connecting) on a separate thread
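The "late joiners miss earlier elements" property of a hot source can be shown with a small plain-JDK sketch. It is a deliberately simplified model, not the cyclops HotStream: `HotStreamSketch`, `connect`, `emit` and `lateSubscriberSees` are hypothetical names, and emission is driven in two explicit steps so the outcome is deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical model of a hot source, not part of cyclops.
final class HotStreamSketch<T> {

    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener; it will see only elements emitted from now on.
    void connect(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Pushes one element to whoever is connected right now; nothing is buffered for late joiners.
    void emit(T value) {
        for (Consumer<T> listener : listeners) {
            listener.accept(value);
        }
    }

    // Emits 0 and 1 before anyone connects, then 2 and 3 after a listener has connected.
    static List<Integer> lateSubscriberSees() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        HotStreamSketch<Integer> hot = new HotStreamSketch<>();
        List<Integer> received = new CopyOnWriteArrayList<>();
        try {
            // Emission starts on the executor thread before any listener exists.
            exec.submit(() -> { hot.emit(0); hot.emit(1); }).get();
            hot.connect(received::add);
            exec.submit(() -> { hot.emit(2); hot.emit(3); }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberSees()); // [2, 3]
    }
}
```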

Reactive Streams

In conjunction with simple-react v0.99.3 and above, ReactiveSeq can be turned into a reactive-streams publisher or subscriber.

CyclopsSubscriber<Integer> sub = ReactiveSeq.subscriber();
ReactiveSeq.of(1,2,3).subscribe(sub);
sub.ReactiveSeq().toList();

//[1,2,3]
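The publisher / subscriber handshake behind this can be sketched with the JDK's own reactive-streams interfaces in java.util.concurrent.Flow (this assumes Java 9 or later). The example uses the JDK's SubmissionPublisher rather than cyclops, and `FlowSketch`, `CollectingSubscriber` and `publishAndCollect` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical example built on the JDK Flow API, not on cyclops.
final class FlowSketch {

    // Collects every element it is sent and completes a future when the publisher is done.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new ArrayList<>();
        final CompletableFuture<List<T>> result = new CompletableFuture<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure: ask for everything
        }

        @Override
        public void onNext(T item) {
            items.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            result.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            result.complete(items);
        }
    }

    static List<Integer> publishAndCollect(List<Integer> values) {
        CollectingSubscriber<Integer> sub = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            values.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items have been delivered
        return sub.result.join();
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(Arrays.asList(1, 2, 3))); // [1, 2, 3]
    }
}
```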

Overview

  • Advanced & powerful Streaming api
  • Reactive Streams support
  • Asynchronous single threaded Streaming
  • Terminal operations that return a Future to be populated asynchronously
  • Reversible Spliterators for efficient Stream reversal and right based operations
  • Retry / onFail
  • HotStream support

This primarily defines the interfaces to be used for cyclops Streaming; for an implementation, see cyclops-stream.
