diff --git a/src/main/java/com/basho/riak/client/api/RiakClient.java b/src/main/java/com/basho/riak/client/api/RiakClient.java index 84fe79e11..2146beef8 100644 --- a/src/main/java/com/basho/riak/client/api/RiakClient.java +++ b/src/main/java/com/basho/riak/client/api/RiakClient.java @@ -155,6 +155,7 @@ * * @author Dave Rusek * @author Brian Roach + * @author Alex Moore * @author Sergey Galkin * @since 2.0 */ @@ -433,6 +434,7 @@ public RiakFuture executeAsync(RiakCommand command) * the result iterator can block the consuming thread from seeing the done() * status until the timeout is reached. * @return a RiakFuture for the operation + * @since 2.1.0 * @see RiakFuture */ public RiakFuture executeAsyncStreaming(StreamableRiakCommand command, int timeoutMS) diff --git a/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java b/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java index ba6bf058f..b24c6b2f4 100644 --- a/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java +++ b/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java @@ -31,8 +31,7 @@ * @param The response type returned by "streaming mode" {@link executeAsyncStreaming} * @param The response type returned by the "batch mode" @{link executeAsync} * @param The query info type - * @author Dave Rusek - * @author Brian Roach + * @author Alex Moore * @author Sergey Galkin * @since 2.0 */ @@ -65,11 +64,27 @@ protected StreamableResponse() { } + /** + * Whether the results are to be streamed back. + * If true, results will appear in this class's iterator. + * If false, they will appear in the original result collection. + * @return true if the results are to be streamed. + */ public boolean isStreamable() { return chunkedResponseIterator != null; } + /** + * Get an iterator over the result data. + * + * If using the streaming API, this method will block + * and wait for more data if none is immediately available. + * It is also advisable to check {@link Thread#isInterrupted()} + * in environments where thread interrupts must be obeyed. + * + * @return an iterator over the result data. + */ @Override public Iterator iterator() { if (isStreamable()) { diff --git a/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java b/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java index 182bf7b3f..59f20a4fc 100644 --- a/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java +++ b/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java @@ -25,6 +25,24 @@ import java.util.concurrent.TransferQueue; import java.util.function.Function; +/** + * Transforms a stream of response chunks to a Iterable of response items. + * + * When iterating over this class's {@link Iterator} this class will lazily walk + * through the response chunks's iterators and convert the items. + * It will also wait for more response chunks if none are available. + * + * Since this class polls for new "streaming" data, it is advisable + * to check {@link Thread#isInterrupted()} while using this class's + * {@link Iterator} in environments where thread interrupts must be obeyed. + * + * @param The final converted type that this class exposes as part of its iterator. + * @param The type of the response chunks, contains an Iterable<{@link CoreT}> + * @param The raw response type, will get converted to {@link FinalT}. + * @author Alex Moore + * @author Sergey Galkin + * @since 2.1.0 + */ public class ChunkedResponseIterator, CoreT> implements Iterator { private final int timeout; @@ -62,6 +80,19 @@ public ChunkedResponseIterator(StreamingRiakFuture coreFuture, hasNext(); } + /** + * Returns {@code true} if the iteration has more elements. + * (In other words, returns {@code true} if {@link #next} would + * return an element rather than throwing an exception.) + * + * This method will block and wait for more data if none is immediately available. + * + * Riak Java Client Note: Since this class polls for + * new "streaming" data, it is advisable to check {@link Thread#isInterrupted()} + * in environments where thread interrupts must be obeyed. + * + * @return {@code true} if the iteration has more elements + */ @Override public boolean hasNext() { @@ -109,16 +140,37 @@ private boolean possibleChunksRemaining() return !coreFuture.isDone() || !chunkQueue.isEmpty(); } + /** + * Returns whether this response contains a continuation. + * Only run this once the operation is complete, otherwise it will return true as it's + * @return Whether this response has a continuation. + */ public boolean hasContinuation() { return continuation != null || possibleChunksRemaining(); } + /** + * Returns the current value of the continuation. + * Only run this once the operation is complete, or else you will get a null value. + * @return The continuation value (if any). + */ public BinaryValue getContinuation() { return continuation; } + /** + * Returns the next element in the iteration. + * This method will block and wait for more data if none is immediately available. + * + * Riak Java Client Note: Since this class polls for + * new "streaming" data, it is advisable to check {@link Thread#isInterrupted()} + * in environments where thread interrupts must be obeyed. + * + * @return the next element in the iteration + * @throws NoSuchElementException if the iteration has no more elements + */ @Override public FinalT next() { diff --git a/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java b/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java index a61708d85..83868872e 100644 --- a/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java +++ b/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java @@ -426,6 +426,19 @@ private class MapReduceResponseIterator implements Iterator this.pollTimeout = pollTimeout; } + /** + * Returns {@code true} if the iteration has more elements. + * (In other words, returns {@code true} if {@link #next} would + * return an element rather than throwing an exception.) + * + * This method will block and wait for more data if none is immediately available. + * + * Riak Java Client Note: Since this class polls for + * new "streaming" data, it is advisable to check {@link Thread#isInterrupted()} + * in environments where thread interrupts must be obeyed. + * + * @return {@code true} if the iteration has more elements + */ @Override public boolean hasNext() { @@ -478,6 +491,17 @@ private boolean peekWaitForNextQueueEntry() throws InterruptedException return !resultsQueue.isEmpty(); } + /** + * Returns the next element in the iteration. + * This method will block and wait for more data if none is immediately available. + * + * Riak Java Client Note: Since this class polls for + * new "streaming" data, it is advisable to check {@link Thread#isInterrupted()} + * in environments where thread interrupts must be obeyed. + * + * @return the next element in the iteration + * @throws NoSuchElementException if the iteration has no more elements + */ @Override public Response next() {