Example : Reacting to Asynchronous Events with a Stream of CompletableFutures
You can use JDK 8's CompletableFuture class as an Observer for data that arrives asynchronously: create the future up front, then complete it later when the data is received.
e.g.
CompletableFuture<Integer> future1 = new CompletableFuture<>();
...
future1.complete(100);
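To make the Observer idea concrete, here is a minimal sketch using only the JDK (no SimpleReact). The class name ObserverSketch and the method observe are hypothetical names chosen for illustration. It registers a callback on a future that has no value yet, then completes the future "externally" and reads the transformed result.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not part of SimpleReact.
class ObserverSketch {

    // Attaches a callback to an incomplete future, then completes it externally.
    static String observe(int value) {
        CompletableFuture<Integer> future = new CompletableFuture<>();

        // The dependent stage is registered now but cannot run until a value arrives.
        CompletableFuture<String> decorated = future.thenApply(it -> it + "*");
        if (decorated.isDone()) {
            throw new IllegalStateException("stage completed before any data arrived");
        }

        // The asynchronous "event" arrives: completing the future triggers the callback.
        future.complete(value);

        // The stage is already complete, so join() returns without waiting.
        return decorated.join();
    }

    public static void main(String[] args) {
        System.out.println(observe(100)); // prints 100*
    }
}
```

The callback is attached before any data exists, which is the same shape SimpleReact relies on when it is handed a Stream of incomplete futures.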
This means we can construct a Stream of CompletableFutures externally and pass it to SimpleReact, e.g.
Queue<CompletableFuture<Integer>> queue = buildQueueOfAsyncEvents();
Stage<String> convertedToStrings = new SimpleReact()
.fromStream(queue.stream())
.<String>then(it -> it + "*");
With helper method buildQueueOfAsyncEvents
private Queue<CompletableFuture<Integer>> buildQueueOfAsyncEvents() {
CompletableFuture<Integer> future1 = new CompletableFuture<>();
CompletableFuture<Integer> future2 = new CompletableFuture<>();
CompletableFuture<Integer> future3 = new CompletableFuture<>();
Queue<CompletableFuture<Integer>> queue = new ConcurrentLinkedQueue<>(Arrays.asList(future1,future2,future3));
return queue;
}
N.B. Because nothing has supplied the CompletableFutures with data yet, calling block at this point will pause the current thread indefinitely.
Data flows from the initial CompletableFutures to the String conversion stage as the complete method is called on each CompletableFuture object.
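The blocking behaviour can be observed safely with plain JDK futures by waiting with a timeout instead of waiting forever. The sketch below is an illustration under that assumption, not SimpleReact's block method. BlockingSketch and timesOut are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not part of SimpleReact.
class BlockingSketch {

    // Returns true if no value arrived within the given time.
    static boolean timesOut(CompletableFuture<?> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<String> converted = source.thenApply(it -> it + "*");

        // Nothing has completed the source, so an untimed wait would never return.
        System.out.println(timesOut(converted, 50)); // prints true

        // Once the source is completed, data flows through the conversion stage.
        source.complete(100);
        System.out.println(timesOut(converted, 50)); // prints false
        System.out.println(converted.join());        // prints 100*
    }
}
```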
Example :
CompletableFuture<Integer> future1 = new CompletableFuture<>();
...
future1.complete(100);
...
new SimpleReact().fromStream(Arrays.asList(future1).stream())
.<String>then(it -> it + "*"); //it=100 will result in 100*
e.g. Given the following setup
Queue<CompletableFuture<Integer>> queue = buildQueueOfAsyncEvents();
Stage<String> convertedToStrings = new SimpleReact()
.fromStream(queue.stream())
.<String>then(it -> it + "*");
convertedToStrings.stream().forEach(f -> assertFalse(f.isDone()));
We can generate another reactive flow to complete each future
new SimpleReact(new ForkJoinPool(3)).react( ()-> 100, ()->200, ()->400)
.then( it-> sleep(it))
.then(it -> queue.poll().complete(it));
With helper method sleep
private Integer sleep(Integer it) {
try {
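Thread.sleep(it); // sleep is static, so call it on Thread directly
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it
}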
return it;
}
Because the initial futures are now populated asynchronously, the reactive dataflow will complete, so calling block is a safe operation.
Queue<CompletableFuture<Integer>> queue = buildQueueOfAsyncEvents();
Stage<String> convertedToStrings = new SimpleReact()
.fromStream(queue.stream())
.<String>then(it -> it + "*");
convertedToStrings.stream().forEach(f -> assertFalse(f.isDone()));
new SimpleReact(new ForkJoinPool(3)).react( ()-> 100, ()->200, ()->400)
.then( it-> sleep(it))
.then(it -> queue.poll().complete(it));
List<String> result = convertedToStrings.block();
assertThat(result.size(),is(3));
assertThat(result,hasItem("400*"));
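For readers who want to see the moving parts without the library, the following is a rough JDK-only approximation of the flow above. It is a sketch for illustration and does not claim to show how SimpleReact is implemented. The names AsyncEventsSketch, run and pause are hypothetical, and a fixed thread pool stands in for the second reactive flow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical illustration class, not part of SimpleReact.
class AsyncEventsSketch {

    static List<String> run(List<Integer> events) {
        // One incomplete future per expected event.
        Queue<CompletableFuture<Integer>> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < events.size(); i++) {
            queue.add(new CompletableFuture<>());
        }

        // Attach the String conversion before any data exists.
        List<CompletableFuture<String>> converted = queue.stream()
                .map(f -> f.thenApply(it -> it + "*"))
                .collect(Collectors.toList());

        // A separate pool delivers each event by completing the next queued future.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            for (Integer event : events) {
                pool.execute(() -> {
                    pause(event);
                    queue.poll().complete(event);
                });
            }
            // Blocking is safe here because every future will be completed.
            return converted.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> result = run(Arrays.asList(100, 200, 400));
        System.out.println(result.size()); // prints 3
    }
}
```

Which future receives which value depends on thread timing, so the sketch (like the assertions above) checks only the size and membership of the result, not its order.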