-
Notifications
You must be signed in to change notification settings - Fork 136
Infinite Streams in SimpleReact
(Since :- SimpleReact v0.2 / deprecated SimpleReact doesn't support infinite Streaming, use LazyReact to build infinite LazyFutureStreams instead)
By default SimpleReact processes data flows eagerly. As soon as new SimpleReact().react(()->1,()->2) is executed, data is moving concurrently through the system. As of SimpleReact v0.2, it is possible to create a lazy data flow, that will only start when .run() is called or an eager method from the Streams API on the underlying Stream. (e.g. http://www.drdobbs.com/jvm/lambdas-and-streams-in-java-8-libraries/240166818)
To take advantage of SimpleReact’s support for infinite Streams, use :-
private volatile int count=0;
..
SimpleReact.lazy().reactInfinitely( ()-> count++).run();
This will construct a Lazy Stream [0,1,2,3,4,5,6 and so on].
You can also import Infinite Streams from the Stream API
Stream<String> infinite = Stream.generate(()->count++).map(it -> “*”+it);
SimpleReact.lazy()
.fromStreamWithoutFutures(infinite)
.then(it -> it +”!”)
.peek(it-> System.out.println(it))
.run();
This will write *0! *1! *2! .. etc ..
to the console
To limit the infinite Stream you can use limit
Stream<String> infinite = Stream.generate(()->count++).map(it -> “*”+it).limit(2);
SimpleReact.lazy()
.fromStreamWithoutFutures(infinite)
.then(it -> it +”!”)
.peek(it-> System.out.println(it))
.run(); //run blocks current thread
This will write *0! *1!
to the console.
You can use one of the Datastructures provided by SimpleReact to create an Infinite Stream of CompletableFutures and use it to seed a SimpleReact data flow. E.g.
Queue queue = new Queue(); //SimpleReact Queue
SimpleReact.lazy()
.fromStream(queue.streamOfCompletableFutures())
.then(data -> transformData(data))
.then(entity -> saveToDb(entity))
.run(new ForkJoinPool(1)); //run does not block current thread
As long as the Data structure is held open, SimpleReact will react to any data added to the Queue. E.g.
queue.add(“hello world”);
or
queue.fromStream(Stream.of(1,2,3,4,5,6));
This will allow you to join together 2 (or more!) potentially infinite Streams of data / processing.
oops - my bad