-
Notifications
You must be signed in to change notification settings - Fork 135
Reactive Tasks : reactive streams based operators
The forEachWithErrors operator allows users to iterate over a Stream providing a consumer for the elements for the Stream a lá Stream.forEach, and a consumer for the errors produced while processing the Stream.
List<Integer> list = new ArrayList<>();
Throwable error = null;
public String load(int i){
if(i==2)
throw new RuntimeException();
}
LazyFutureStream.of(1,2,3,4)
.map(this::load)
.forEachWithError( i->list.add(i), e->error=e);
//list =List[1,3,4]
//error = RuntimeException
```java
### forEachEvent
The forEachEvent operator is similar to forEachWithErrors but also accepts a Runnable that is run when the Stream has been completely consumed.
#### forEachEvent with a LazyFutureStream
```java
Closeable resource;
List<Integer> list = new ArrayList<>();
Throwable error = null;
public String load(int i){
if(i==2)
throw new RuntimeException();
}
LazyFutureStream.of(1,2,3,4)
.map(this::load)
.forEachEvent( i->list.add(i),
logger::error,
()->resource.close());
//list =List[1,3,4]
//runtime exception logged
//resource is closed
forEachX allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed.
List<Integer> list = new ArrayList<>();
Subscription s = StreamUtils.forEachX(Stream.of(1,2,3), 2, i->list.add(i));
assertThat(list,hasItems(1,2));
assertThat(list.size(),equalTo(2));
s.request(1); //request an additional iterm from the Stream be processed.
assertThat(list,hasItems(1,2,3));
assertThat(list.size(),equalTo(3));
forEachXWithErrors allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed. The forEachXWithErrors operator allows users to iterate over a Stream providing a consumer for the elements for the Stream a lá Stream.forEach, and a consumer for the errors produced while processing the Stream.
List<Integer> list = new ArrayList<>();
Throwable error = null;
public String load(int i){
if(i==2)
throw new RuntimeException();
}
Subscription s = LazyFutureStream.of(1,2,3,4)
.map(this::load)
.forEachXWithError( 2, i->list.add(i), e->error=e);
//list =List[1]
//error = RuntimeException
s.request(1);
//list =List[1,3]
s.request(1);
//list =List[1,3,4]
forEachXEvents allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed. The forEachXEvents operator is similar to forEachXWithErrors but also accepts a Runnable that is run when the Stream has been completely consumed.
List<Integer> list = new ArrayList<>();
Throwable error = null;
Closeable resource;
public String load(int i){
if(i==2)
throw new RuntimeException();
}
Subscription s = LazyFutureStream.of(1,2,3,4)
.map(this::load)
.forEachXEvents( 2, i->list.add(i), logger::error,
()->resource.close());;
//list =List[1]
//error = RuntimeException
// resource open
s.request(1);
//list =List[1,3]
s.request(1);
//list =List[1,3,4]
s.request(1); //no new elements end of Stream
//list =List[1,3,4]
// resource closed
The reactive-streams based terminal operations can also be launched asynchronously, first by using the futureOperations operator to provide an Executor that will process the Stream.
The futureOperations operator opens up a world of asynchronously executed terminal operations. A large range of terminal operations are provided and for each one a CompletbableFuture is returned.
Executor exec = Executors.newFixedThreadPool(1);
FutureOperations<Integer> terminalOps = LazyFutureStream.of(1,2,3).futureOperations(exec);
//execute the collection & Stream evaluation on the provided executor
CompletableFuture<List<Integer>> futureList = terminalOps.collect(Collectors.toList());
List<Integer> result = list.join();
Each of the async Future Operations for reactive-streams (forEachX, forEachEvent etc), return a ReactiveTask object. This allows users to check the status of Stream processing, to cancel it, to request more elements to be processed from the Stream either synchronously or asynchronously.
List<Integer> list = new ArrayList<>();
ReactiveTask s = LazyFutureStream.of(1,2,3)
.futureOperations(exec)
.forEachX( 2, i->list.add(i));
//wait until first 2 elements are processed
s.block();
//list = List[1,2]
//trigger the remainder of the Stream processing asynchronously
ReactiveTask nextElements = s.requestAllAsync();
//if we wait until it completes
//nextElements.block();
//list = List[1,2,3]
oops - my bad