diff --git a/src/main/java/com/basho/riak/client/api/AsIsRiakCommand.java b/src/main/java/com/basho/riak/client/api/AsIsRiakCommand.java
new file mode 100644
index 000000000..b21dd82c5
--- /dev/null
+++ b/src/main/java/com/basho/riak/client/api/AsIsRiakCommand.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2013-2016 Basho Technologies Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.basho.riak.client.api;
+
+import com.basho.riak.client.core.FutureOperation;
+import com.basho.riak.client.core.PBStreamingFutureOperation;
+import com.basho.riak.client.core.RiakCluster;
+import com.basho.riak.client.core.RiakFuture;
+
+/**
+ * @author Sergey Galkin
+ * @since 2.1.0
+ */
+public abstract class AsIsRiakCommand extends RiakCommand
+{
+ protected abstract FutureOperation buildCoreOperation();
+
+ protected RiakFuture executeAsync(RiakCluster cluster)
+ {
+ final FutureOperation coreOperation = buildCoreOperation();
+
+ return cluster.execute(coreOperation);
+ }
+}
diff --git a/src/main/java/com/basho/riak/client/api/GenericRiakCommand.java b/src/main/java/com/basho/riak/client/api/GenericRiakCommand.java
new file mode 100644
index 000000000..18cc53cbd
--- /dev/null
+++ b/src/main/java/com/basho/riak/client/api/GenericRiakCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2013-2016 Basho Technologies Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.basho.riak.client.api;
+
+import com.basho.riak.client.api.commands.CoreFutureAdapter;
+import com.basho.riak.client.core.FutureOperation;
+import com.basho.riak.client.core.RiakCluster;
+import com.basho.riak.client.core.RiakFuture;
+
+/**
+ * @author Sergey Galkin
+ * @since 2.1.0
+ */
+public abstract class GenericRiakCommand extends RiakCommand
+{
+ public static abstract class GenericRiakCommandWithSameInfo extends GenericRiakCommand
+ {
+ @Override
+ protected I convertInfo(I coreInfo) {
+ return coreInfo;
+ }
+ }
+
+ protected abstract FutureOperation buildCoreOperation();
+
+ protected RiakFuture executeAsync(RiakCluster cluster)
+ {
+ final FutureOperation coreOperation = buildCoreOperation();
+ assert coreOperation != null;
+
+ final RiakFuture coreFuture = cluster.execute(coreOperation);
+
+ assert coreFuture != null;
+
+ final CoreFutureAdapter future =
+ new CoreFutureAdapter(coreFuture)
+ {
+ @Override
+ protected R convertResponse(CoreR coreResponse)
+ {
+ return GenericRiakCommand.this.convertResponse(coreOperation, coreResponse);
+ }
+
+ @Override
+ protected I convertQueryInfo(CoreI coreQueryInfo)
+ {
+ return GenericRiakCommand.this.convertInfo(coreQueryInfo);
+ }
+ };
+ coreFuture.addListener(future);
+ return future;
+ }
+
+ protected abstract R convertResponse(FutureOperation request, CoreR coreResponse);
+
+ protected abstract I convertInfo(CoreI coreInfo);
+}
diff --git a/src/main/java/com/basho/riak/client/api/RiakClient.java b/src/main/java/com/basho/riak/client/api/RiakClient.java
index 114c31b38..2146beef8 100644
--- a/src/main/java/com/basho/riak/client/api/RiakClient.java
+++ b/src/main/java/com/basho/riak/client/api/RiakClient.java
@@ -155,6 +155,7 @@
*
* @author Dave Rusek
* @author Brian Roach
+ * @author Alex Moore
* @author Sergey Galkin
* @since 2.0
*/
@@ -277,8 +278,9 @@ public static RiakClient newClient(InetSocketAddress... addresses) throws Unknow
* @return a new RiakClient instance.
* @throws java.net.UnknownHostException if a supplied hostname cannot be resolved.
* @since 2.0.3
- * @see com.basho.riak.client.core.RiakCluster.Builder#Builder(RiakNode.Builder, List)
+ * @see com.basho.riak.client.core.RiakCluster.Builder#RiakCluster.Builder(RiakNode.Builder, List)
*/
+ // NB: IntelliJ will see the above @see statement as invalid, but it's correct: https://bugs.openjdk.java.net/browse/JDK-8031625
public static RiakClient newClient(RiakNode.Builder nodeBuilder, List addresses) throws UnknownHostException
{
final RiakCluster cluster = new RiakCluster.Builder(nodeBuilder, addresses).build();
@@ -402,6 +404,44 @@ public RiakFuture executeAsync(RiakCommand command)
return command.executeAsync(cluster);
}
+ /**
+ * Execute a StreamableRiakCommand asynchronously, and stream the results back before
+ * the command {@link RiakFuture#isDone() is done}.
+ *
+ * Calling this method causes the client to execute the provided
+ * StreamableRiakCommand asynchronously.
+ * It will immediately return a RiakFuture that contains an
+ * immediately available result (via {@link RiakFuture#get()}) that
+ * data will be streamed to.
+ * The RiakFuture will also keep track of the overall operation's progress
+ * with the {@link RiakFuture#isDone}, etc methods.
+ *
+ *
+ * Because the consumer thread will poll for new results, it is advisable to check the
+ * consumer thread's interrupted status via
+ * {@link Thread#isInterrupted() Thread.currentThread().isInterrupted() }, as the result
+ * iterator will not propagate an InterruptedException, but it will set the Thread's
+ * interrupted flag.
+ *
+ * @param StreamableRiakCommand's immediate return type, available before the command/operation is complete.
+ * @param The RiakCommand's query info type.
+ * @param command The RiakCommand to execute.
+ * @param timeoutMS The polling timeout in milliseconds for each result chunk.
+ * If the timeout is reached it will try again, instead of blocking indefinitely.
+ * If the value is too small (less than the average chunk arrival time), the
+ * result iterator will essentially busy wait.
+ * If the timeout is too large (much greater than the average chunk arrival time),
+ * the result iterator can block the consuming thread from seeing the done()
+ * status until the timeout is reached.
+ * @return a RiakFuture for the operation
+ * @since 2.1.0
+ * @see RiakFuture
+ */
+ public RiakFuture executeAsyncStreaming(StreamableRiakCommand command, int timeoutMS)
+ {
+ return command.executeAsyncStreaming(cluster, timeoutMS);
+ }
+
/**
* Shut down the client and the underlying RiakCluster.
*
diff --git a/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java b/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java
new file mode 100644
index 000000000..ac983b63b
--- /dev/null
+++ b/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2016 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.basho.riak.client.api;
+
+import com.basho.riak.client.api.commands.ChunkedResponseIterator;
+import com.basho.riak.client.api.commands.ImmediateCoreFutureAdapter;
+import com.basho.riak.client.core.*;
+
+import java.util.Iterator;
+
+/*
+ * The base class for all Streamable Riak Commands.
+ * Allows the user to either use {@link RiakCommand#executeAsync} and return a "batch-mode" result
+ * that is only available after the command is complete, or
+ * use {@link StreamableRiakCommand#executeAsyncStreaming} and return a "immediate" or "stream-mode" result
+ * that data will flow into.
+ * @param The response type returned by "streaming mode" {@link executeAsyncStreaming}
+ * @param The response type returned by the "batch mode" @{link executeAsync}
+ * @param The query info type
+ * @author Alex Moore
+ * @author Sergey Galkin
+ * @since 2.0
+ */
+public abstract class StreamableRiakCommand extends GenericRiakCommand
+{
+ public static abstract class StreamableRiakCommandWithSameInfo extends StreamableRiakCommand
+ {
+ @Override
+ protected I convertInfo(I coreInfo) {
+ return coreInfo;
+ }
+ }
+
+ public static abstract class StreamableResponse implements Iterable
+ {
+ protected ChunkedResponseIterator chunkedResponseIterator;
+
+ /**
+ * Constructor for streamable response
+ * @param chunkedResponseIterator
+ */
+ protected StreamableResponse(ChunkedResponseIterator chunkedResponseIterator) {
+ this.chunkedResponseIterator = chunkedResponseIterator;
+ }
+
+ /**
+ * Constructor for not streamable response.
+ */
+ protected StreamableResponse()
+ {
+ }
+
+ /**
+ * Whether the results are to be streamed back.
+ * If true, results will appear in this class's iterator.
+ * If false, they will appear in the original result collection.
+ * @return true if the results are to be streamed.
+ */
+ public boolean isStreaming()
+ {
+ return chunkedResponseIterator != null;
+ }
+
+ /**
+ * Get an iterator over the result data.
+ *
+ * If using the streaming API, this method will block
+ * and wait for more data if none is immediately available.
+ * It is also advisable to check {@link Thread#isInterrupted()}
+ * in environments where thread interrupts must be obeyed.
+ *
+ * @return an iterator over the result data.
+ */
+ @Override
+ public Iterator iterator() {
+ if (isStreaming()) {
+ assert chunkedResponseIterator != null;
+ return chunkedResponseIterator;
+ }
+
+ throw new UnsupportedOperationException("Iterating is only supported for streamable response");
+ }
+ }
+
+ protected abstract R createResponse(int timeout, StreamingRiakFuture coreFuture);
+
+ protected abstract PBStreamingFutureOperation buildCoreOperation(boolean streamResults);
+
+ @Override
+ protected final FutureOperation buildCoreOperation() {
+ return buildCoreOperation(false);
+ }
+
+ protected final RiakFuture executeAsyncStreaming(RiakCluster cluster, int timeout)
+ {
+ final PBStreamingFutureOperation coreOperation = buildCoreOperation(true);
+ final StreamingRiakFuture coreFuture = cluster.execute(coreOperation);
+
+ final R r = createResponse(timeout, coreFuture);
+
+ final ImmediateCoreFutureAdapter future = new ImmediateCoreFutureAdapter(coreFuture, r)
+ {
+ @Override
+ protected R convertResponse(CoreR response)
+ {
+ return StreamableRiakCommand.this.convertResponse(coreOperation, response);
+ }
+
+ @Override
+ protected I convertQueryInfo(CoreI coreQueryInfo)
+ {
+ return StreamableRiakCommand.this.convertInfo(coreQueryInfo);
+ }
+ };
+
+ coreFuture.addListener(future);
+ return future;
+ }
+}
diff --git a/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java b/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java
new file mode 100644
index 000000000..59f20a4fc
--- /dev/null
+++ b/src/main/java/com/basho/riak/client/api/commands/ChunkedResponseIterator.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2016 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.basho.riak.client.api.commands;
+
+import com.basho.riak.client.core.StreamingRiakFuture;
+import com.basho.riak.client.core.util.BinaryValue;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TransferQueue;
+import java.util.function.Function;
+
+/**
+ * Transforms a stream of response chunks to a Iterable of response items.
+ *
+ * When iterating over this class's {@link Iterator} this class will lazily walk
+ * through the response chunks's iterators and convert the items.
+ * It will also wait for more response chunks if none are available.
+ *
+ * Since this class polls for new "streaming" data, it is advisable
+ * to check {@link Thread#isInterrupted()} while using this class's
+ * {@link Iterator} in environments where thread interrupts must be obeyed.
+ *
+ * @param The final converted type that this class exposes as part of its iterator.
+ * @param The type of the response chunks, contains an Iterable<{@link CoreT}>
+ * @param The raw response type, will get converted to {@link FinalT}.
+ * @author Alex Moore
+ * @author Sergey Galkin
+ * @since 2.1.0
+ */
+public class ChunkedResponseIterator, CoreT> implements Iterator
+{
+ private final int timeout;
+ private volatile BinaryValue continuation = null;
+ private final StreamingRiakFuture coreFuture;
+ private final TransferQueue chunkQueue;
+ private final Function createNext;
+ private final Function> getNextIterator;
+ private final Function getContinuationFn;
+
+ protected Iterator currentIterator = null;
+
+ public ChunkedResponseIterator(StreamingRiakFuture coreFuture,
+ int pollTimeout,
+ Function createNextFn,
+ Function> getNextIteratorFn)
+ {
+ this(coreFuture, pollTimeout, createNextFn, getNextIteratorFn, (x) -> null);
+ }
+
+ public ChunkedResponseIterator(StreamingRiakFuture coreFuture,
+ int pollTimeout,
+ Function createNextFn,
+ Function> getNextIteratorFn,
+ Function getContinuationFn)
+ {
+ this.timeout = pollTimeout;
+ this.coreFuture = coreFuture;
+ this.chunkQueue = coreFuture.getResultsQueue();
+ this.createNext = createNextFn;
+ this.getNextIterator = getNextIteratorFn;
+ this.getContinuationFn = getContinuationFn;
+
+ // to kick of initial loading
+ hasNext();
+ }
+
+ /**
+ * Returns {@code true} if the iteration has more elements.
+ * (In other words, returns {@code true} if {@link #next} would
+ * return an element rather than throwing an exception.)
+ *
+ * This method will block and wait for more data if none is immediately available.
+ *
+ * Riak Java Client Note: Since this class polls for
+ * new "streaming" data, it is advisable to check {@link Thread#isInterrupted()}
+ * in environments where thread interrupts must be obeyed.
+ *
+ * @return {@code true} if the iteration has more elements
+ */
+ @Override
+ public boolean hasNext()
+ {
+ // Check & clear interrupted flag so we don't get an
+ // InterruptedException every time if the user
+ // doesn't clear it / deal with it.
+ boolean interrupted = Thread.interrupted();
+ Boolean dataLoaded = null;
+
+ try
+ {
+ while (!currentIteratorHasNext() && dataLoaded == null)
+ {
+ try
+ {
+ dataLoaded = tryLoadNextChunkIterator();
+ }
+ catch (InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ return currentIteratorHasNext();
+ }
+ finally
+ {
+ if (interrupted)
+ {
+ // Reset interrupted flag if we came in with it
+ // or we were interrupted while waiting.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private boolean currentIteratorHasNext()
+ {
+ return currentIterator != null && currentIterator.hasNext();
+ }
+
+ private boolean possibleChunksRemaining()
+ {
+ // Chunks may remain if :
+ // Core Operation Not Done OR items still in chunk Queue
+ return !coreFuture.isDone() || !chunkQueue.isEmpty();
+ }
+
+ /**
+ * Returns whether this response contains a continuation.
+ * Only run this once the operation is complete, otherwise it will return true as it's
+ * @return Whether this response has a continuation.
+ */
+ public boolean hasContinuation()
+ {
+ return continuation != null || possibleChunksRemaining();
+ }
+
+ /**
+ * Returns the current value of the continuation.
+ * Only run this once the operation is complete, or else you will get a null value.
+ * @return The continuation value (if any).
+ */
+ public BinaryValue getContinuation()
+ {
+ return continuation;
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ * This method will block and wait for more data if none is immediately available.
+ *
+ * Riak Java Client Note: Since this class polls for
+ * new "streaming" data, it is advisable to check {@link Thread#isInterrupted()}
+ * in environments where thread interrupts must be obeyed.
+ *
+ * @return the next element in the iteration
+ * @throws NoSuchElementException if the iteration has no more elements
+ */
+ @Override
+ public FinalT next()
+ {
+ if (hasNext())
+ {
+ return createNext.apply(currentIterator.next());
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ private boolean tryLoadNextChunkIterator() throws InterruptedException
+ {
+ this.currentIterator = null;
+ boolean populatedChunkLoaded = false;
+
+ while (!populatedChunkLoaded && possibleChunksRemaining())
+ {
+ final ChunkT nextChunk = chunkQueue.poll(timeout, TimeUnit.MILLISECONDS);
+
+ if (nextChunk != null)
+ {
+ this.currentIterator = getNextIterator.apply(nextChunk);
+ populatedChunkLoaded = currentIteratorHasNext();
+
+ loadContinuation(nextChunk);
+ }
+ }
+
+ return populatedChunkLoaded;
+ }
+
+ private void loadContinuation(ChunkT nextChunk)
+ {
+ final BinaryValue fetchedContinuation = getContinuationFn.apply(nextChunk);
+ if (this.continuation == null && fetchedContinuation != null)
+ {
+ this.continuation = fetchedContinuation;
+ }
+ }
+}
diff --git a/src/main/java/com/basho/riak/client/api/commands/ImmediateCoreFutureAdapter.java b/src/main/java/com/basho/riak/client/api/commands/ImmediateCoreFutureAdapter.java
new file mode 100644
index 000000000..b98c5b769
--- /dev/null
+++ b/src/main/java/com/basho/riak/client/api/commands/ImmediateCoreFutureAdapter.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2016 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.basho.riak.client.api.commands;
+
+import com.basho.riak.client.core.RiakFuture;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Used when the converted response is available before the core future is complete.
+ *
+ * @param The core response type.
+ * @param The core query info type.
+ * @param The converted response type.
+ * @param The converted query info type.
+ *
+ * @author Alex Moore
+ * @since 2.1.0
+ */
+public abstract class ImmediateCoreFutureAdapter extends CoreFutureAdapter
+{
+ private final T2 immediateResponse;
+
+ protected ImmediateCoreFutureAdapter(RiakFuture coreFuture, T2 immediateResponse)
+ {
+ super(coreFuture);
+ this.immediateResponse = immediateResponse;
+ }
+
+ @Override
+ public T2 get() throws InterruptedException, ExecutionException
+ {
+ return immediateResponse;
+ }
+
+ @Override
+ public T2 get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return immediateResponse;
+ }
+
+ @Override
+ public T2 getNow()
+ {
+ return immediateResponse;
+ }
+
+ @Override
+ protected T2 convertResponse(T unused) { return null; }
+
+ @Override
+ protected abstract S2 convertQueryInfo(S coreQueryInfo);
+
+ public static abstract class SameQueryInfo extends ImmediateCoreFutureAdapter
+ {
+ protected SameQueryInfo(RiakFuture coreFuture, T2 immediateResponse)
+ {
+ super(coreFuture, immediateResponse);
+ }
+
+ @Override
+ protected S convertQueryInfo(S coreQueryInfo)
+ {
+ return coreQueryInfo;
+ }
+ }
+}
diff --git a/src/main/java/com/basho/riak/client/api/commands/buckets/FetchBucketProperties.java b/src/main/java/com/basho/riak/client/api/commands/buckets/FetchBucketProperties.java
index a4c393975..4722d14ac 100644
--- a/src/main/java/com/basho/riak/client/api/commands/buckets/FetchBucketProperties.java
+++ b/src/main/java/com/basho/riak/client/api/commands/buckets/FetchBucketProperties.java
@@ -16,10 +16,7 @@
package com.basho.riak.client.api.commands.buckets;
-import com.basho.riak.client.api.RiakCommand;
-import com.basho.riak.client.api.commands.CoreFutureAdapter;
-import com.basho.riak.client.core.RiakCluster;
-import com.basho.riak.client.core.RiakFuture;
+import com.basho.riak.client.api.AsIsRiakCommand;
import com.basho.riak.client.core.operations.FetchBucketPropsOperation;
import com.basho.riak.client.core.query.Namespace;
@@ -39,7 +36,7 @@
* @author Dave Rusek
* @since 2.0
*/
-public final class FetchBucketProperties extends RiakCommand
+public final class FetchBucketProperties extends AsIsRiakCommand
{
private final Namespace namespace;
@@ -49,31 +46,7 @@ public FetchBucketProperties(Builder builder)
}
@Override
- protected final RiakFuture executeAsync(RiakCluster cluster)
- {
- RiakFuture coreFuture =
- cluster.execute(buildCoreOperation());
-
- CoreFutureAdapter future =
- new CoreFutureAdapter(coreFuture)
- {
- @Override
- protected FetchBucketPropsOperation.Response convertResponse(FetchBucketPropsOperation.Response coreResponse)
- {
- return coreResponse;
- }
-
- @Override
- protected Namespace convertQueryInfo(Namespace coreQueryInfo)
- {
- return coreQueryInfo;
- }
- };
- coreFuture.addListener(future);
- return future;
- }
-
- private FetchBucketPropsOperation buildCoreOperation()
+ protected FetchBucketPropsOperation buildCoreOperation()
{
return new FetchBucketPropsOperation.Builder(namespace).build();
}
diff --git a/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java b/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java
index a83bf4c07..80a33da31 100644
--- a/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java
+++ b/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java
@@ -15,11 +15,12 @@
*/
package com.basho.riak.client.api.commands.buckets;
-import com.basho.riak.client.api.RiakCommand;
-import com.basho.riak.client.api.commands.CoreFutureAdapter;
-import com.basho.riak.client.core.RiakCluster;
-import com.basho.riak.client.core.RiakFuture;
+import com.basho.riak.client.api.commands.ChunkedResponseIterator;
+import com.basho.riak.client.api.StreamableRiakCommand;
+import com.basho.riak.client.core.FutureOperation;
+import com.basho.riak.client.core.StreamingRiakFuture;
import com.basho.riak.client.core.operations.ListBucketsOperation;
+import com.basho.riak.client.core.query.ConvertibleIterator;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.util.BinaryValue;
@@ -39,10 +40,30 @@
* System.out.println(ns.getBucketName());
* }}
*
+ *
+ * You can also stream the results back before the operation is fully complete.
+ * This reduces the time between executing the operation and seeing a result,
+ * and reduces overall memory usage if the iterator is consumed quickly enough.
+ * The result iterable can only be iterated once though.
+ * If the thread is interrupted while the iterator is polling for more results,
+ * a {@link RuntimeException} will be thrown.
+ *