Infinite Streams in SimpleReact

johnmcclean-aol edited this page Nov 21, 2016 · 2 revisions

(Since: SimpleReact v0.2. Deprecated: SimpleReact doesn't support infinite streaming; use LazyReact to build infinite LazyFutureStreams instead.)

By default, SimpleReact processes data flows eagerly. As soon as new SimpleReact().react(()->1,()->2) is executed, data is moving concurrently through the system. As of SimpleReact v0.2, it is possible to create a lazy data flow that only starts when .run() is called, or when an eager (terminal) method from the Streams API is called on the underlying Stream (see e.g. http://www.drdobbs.com/jvm/lambdas-and-streams-in-java-8-libraries/240166818).
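The lazy behaviour described above mirrors how java.util.stream itself works: intermediate operations are only recorded, and nothing flows until a terminal operation is called. The following is a minimal sketch using only the JDK, not SimpleReact; the class name LazyFlowSketch and its members are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyFlowSketch {

    // Records every element that actually flows through the pipeline.
    static final List<Integer> seen = new ArrayList<>();

    // Builds the pipeline only; no element is processed at this point.
    static Stream<Integer> buildPipeline() {
        return Stream.of(1, 2, 3)
                     .peek(seen::add)
                     .map(i -> i * 10);
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline();
        System.out.println("before terminal op: " + seen); // prints "before terminal op: []"

        // The terminal operation pulls the data through the pipeline.
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println("after terminal op: " + seen + " -> " + result);
    }
}
```

Building the pipeline is cheap and has no side effects, which is what makes it safe to describe an infinite flow before deciding when (and on which thread) to run it.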

To take advantage of SimpleReact's support for infinite Streams, use:

private volatile int count=0;
..
SimpleReact.lazy().reactInfinitely( ()-> count++).run();

This will construct a Lazy Stream [0,1,2,3,4,5,6 and so on].
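One caveat with the counter above: count++ on a volatile field is not atomic, so increments can be lost if the supplier is ever invoked from more than one thread. The sketch below shows one possible alternative using AtomicInteger. It uses the JDK's Stream.generate as a stand-in for reactInfinitely, bounds the sequence with limit so that it terminates, and the names CounterSupplierSketch, counter and firstN are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CounterSupplierSketch {

    // Returns a supplier yielding 0, 1, 2, ... that is safe to call from several threads.
    static Supplier<Integer> counter() {
        AtomicInteger count = new AtomicInteger();
        return count::getAndIncrement;
    }

    // Takes the first n values of the otherwise endless sequence.
    static List<Integer> firstN(int n) {
        return Stream.generate(counter())
                     .limit(n)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // prints [0, 1, 2, 3, 4]
    }
}
```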

You can also import infinite Streams from the Java Stream API:

Stream<String> infinite = Stream.generate(()->count++).map(it -> "*"+it);

SimpleReact.lazy()
                    .fromStreamWithoutFutures(infinite)
                    .then(it -> it +"!")
                    .peek(it-> System.out.println(it))
                    .run();

This will write *0! *1! *2! and so on to the console.

To limit the infinite Stream, you can use limit:

Stream<String> infinite = Stream.generate(()->count++).map(it -> "*"+it).limit(2);

SimpleReact.lazy()
                    .fromStreamWithoutFutures(infinite)
                    .then(it -> it +"!")
                    .peek(it-> System.out.println(it))
                    .run(); //run blocks current thread

This will write *0! *1! to the console.

Why is this useful?

You can use one of the data structures provided by SimpleReact to create an infinite Stream of CompletableFutures and use it to seed a SimpleReact data flow. For example:

Queue queue = new Queue();  //SimpleReact Queue
SimpleReact.lazy()
                    .fromStream(queue.streamOfCompletableFutures())
                    .then(data -> transformData(data))
                    .then(entity -> saveToDb(entity))
                    .run(new ForkJoinPool(1));  //run does not block current thread

As long as the data structure is held open, SimpleReact will react to any data added to the Queue. For example:

 queue.add("hello world");

or

 queue.fromStream(Stream.of(1,2,3,4,5,6));

This allows you to join together two (or more!) potentially infinite Streams of data or processing.
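To give a rough idea of how a queue can back a potentially infinite Stream, here is a sketch that uses only the JDK's LinkedBlockingQueue rather than SimpleReact's Queue. It is not how SimpleReact implements this. The POISON sentinel used to close the stream, the class name QueueStreamSketch, and the methods stream and drain are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class QueueStreamSketch {

    // Hypothetical sentinel that marks the queue as closed.
    static final String POISON = "__closed__";

    // Blocks until an item is available; treats interruption as end of stream.
    private static String take(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return POISON;
        }
    }

    // Exposes the queue as a lazy, ordered Stream that ends when the sentinel is read.
    static Stream<String> stream(BlockingQueue<String> queue) {
        Iterator<String> it = new Iterator<String>() {
            private String next;

            @Override
            public boolean hasNext() {
                if (next == null) {
                    next = take(queue);
                }
                return !POISON.equals(next);
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String value = next;
                next = null;
                return value;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    // Runs a small flow over everything added to the queue until it is closed.
    static List<String> drain(BlockingQueue<String> queue) {
        return stream(queue).map(s -> s + "!").collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            queue.add("hello world");
            Stream.of("1", "2", "3").forEach(queue::add); // feed a second stream into the queue
            queue.add(POISON);                            // close the queue
        });
        producer.start();
        System.out.println(drain(queue)); // prints [hello world!, 1!, 2!, 3!]
        producer.join();
    }
}
```

The consumer blocks while the queue is empty and resumes whenever a producer adds data, which is the property that lets several independent sources feed a single downstream flow.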