-
Notifications
You must be signed in to change notification settings - Fork 136
Reactive 'plugin' for easier use
johnmcclean-aol edited this page Aug 29, 2015
·
1 revision
- LazyReactive : implement LazyReactive for useful functionality for managing LazyFutureStreams E.g. create a Stream optimised for IO operations
this.ioStream().react(()->load1(),load2())
.thenSync(String::toUpperCase)
.peekSync(str->val=str)
.block();
- EagerReactive : implement EagerReactive for useful functionality for managing EagerFutureStreams
this.cpuStream().of("hello")
.map(String::toUpperCase)
.peek(str->val=str)
.block();
- Pipes : register Adapters (Queue, Topics, Signals) and get Streams back and Reactive Streams support (see below)
Pipes.register("test", QueueFactories.<String>boundedNonBlockingQueue(100)
.build());
LazyFutureStream<String> stream = PipesToLazyStreams.cpuBoundStream("test");
stream.filter(it->it!=null).peek(System.out::println).run();
- PipesToLazyStreams : convert registered Pipes to LazyFutureStreams
LazyFutureStream<String> stream = PipesToLazyStreams.registerForCPU("test", QueueFactories.
<String>boundedNonBlockingQueue(100)
.build());
stream.filter(it->it!=null)
.async()
.peek(this::process)
.sync()
.forEach(System.out::println);
- PipesToEagerStreams : convert unregistered Pipes to EagerFutureStreams
EagerFutureStream<String> stream = PipesToEagerStreams.registerForCPU("test", QueueFactories.
<String>boundedNonBlockingQueue(100)
.build());
stream.filter(it->it!=null)
.async()
.peek(this::process)
.sync()
.forEach(System.out::println);
- Ability to publish and subscribe to Pipes (Queues, Topics and Signals) via the Pipes class
Have a JDK 8 Stream subscribe to a Queue using the Reactive Streams support (a simpler alternative in practice for JDK Streams is to simply call the stream() method on the Queue).
JDKReactiveStreamsSubscriber subscriber = new JDKReactiveStreamsSubscriber ();
Queue queue = new Queue();
Pipes.register("hello", queue);
Pipes.subscribeTo("hello",subscriber);
queue.offer("world");
queue.close();
assertThat(subscriber.getStream().findAny().get(),equalTo("world"));
JDKReactiveStreamsSubscriber subscriber = new JDKReactiveStreamsSubscriber ();
Queue queue = new Queue();
Pipes.register("hello", queue);
Pipes.publisher("hello").get().subscribe(subscriber);
queue.offer("world");
queue.close();
assertThat(subscriber.getStream().findAny().get(),equalTo("world"));
oops - my bad