From a797ba638cadcb363bae371c9158c29ed3aeba10 Mon Sep 17 00:00:00 2001 From: Alex Moore Date: Tue, 15 Nov 2016 15:10:02 -0500 Subject: [PATCH] Changing isStreamable to isStreaming, as we cannot treat a streaming result as a non-streaming one --- .../basho/riak/client/api/StreamableRiakCommand.java | 4 ++-- .../riak/client/api/commands/buckets/ListBuckets.java | 2 +- .../api/commands/indexes/SecondaryIndexQuery.java | 11 +++++------ .../basho/riak/client/api/commands/kv/ListKeys.java | 2 +- .../riak/client/api/commands/mapreduce/MapReduce.java | 4 ++-- .../commands/indexes/itest/ITestFullBucketRead.java | 4 ++-- .../api/commands/itest/ITestBucketMapReduce.java | 7 +------ 7 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java b/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java index b24c6b2f4..ac983b63b 100644 --- a/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java +++ b/src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java @@ -70,7 +70,7 @@ protected StreamableResponse() * If false, they will appear in the original result collection. * @return true if the results are to be streamed. */ - public boolean isStreamable() + public boolean isStreaming() { return chunkedResponseIterator != null; } @@ -87,7 +87,7 @@ public boolean isStreamable() */ @Override public Iterator iterator() { - if (isStreamable()) { + if (isStreaming()) { assert chunkedResponseIterator != null; return chunkedResponseIterator; } diff --git a/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java b/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java index 3e9e7ffd3..80a33da31 100644 --- a/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java +++ b/src/main/java/com/basho/riak/client/api/commands/buckets/ListBuckets.java @@ -147,7 +147,7 @@ public Response(BinaryValue type, List buckets) @Override public Iterator iterator() { - if (isStreamable()) { + if (isStreaming()) { return super.iterator(); } diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java index f6a8075d6..52f6dba5b 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java @@ -27,7 +27,6 @@ import com.basho.riak.client.core.util.BinaryValue; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -638,7 +637,7 @@ protected Response(Namespace queryLocation, */ public Iterator iterator() { - if (isStreamable()) { + if (isStreaming()) { return super.iterator(); } @@ -663,7 +662,7 @@ protected E convert(SecondaryIndexQueryOperation.Response.Entry e) */ public boolean hasContinuation() { - if (isStreamable()) + if (isStreaming()) { return chunkedResponseIterator.hasContinuation(); } @@ -682,7 +681,7 @@ public boolean hasContinuation() */ public BinaryValue getContinuation() { - if (isStreamable()) + if (isStreaming()) { return chunkedResponseIterator.getContinuation(); } @@ -702,7 +701,7 @@ public BinaryValue getContinuation() */ public boolean hasEntries() { - if (isStreamable()) + if (isStreaming()) { return chunkedResponseIterator.hasNext(); } @@ -718,7 +717,7 @@ public boolean hasEntries() */ public final List getEntries() { - if(isStreamable()) + if(isStreaming()) { throw new IllegalStateException("Use the iterator() while using the streaming API"); } diff --git a/src/main/java/com/basho/riak/client/api/commands/kv/ListKeys.java b/src/main/java/com/basho/riak/client/api/commands/kv/ListKeys.java index 3e86929b6..beaf3bc71 100644 --- a/src/main/java/com/basho/riak/client/api/commands/kv/ListKeys.java +++ b/src/main/java/com/basho/riak/client/api/commands/kv/ListKeys.java @@ -139,7 +139,7 @@ public Response(Namespace namespace, List keys) @Override public Iterator iterator() { - if (isStreamable()) + if (isStreaming()) { return super.iterator(); } diff --git a/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java b/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java index 83868872e..ca13a51b2 100644 --- a/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java +++ b/src/main/java/com/basho/riak/client/api/commands/mapreduce/MapReduce.java @@ -355,7 +355,7 @@ public Response(Map results) } @Override - public boolean isStreamable() + public boolean isStreaming() { return responseIterator != null; } @@ -404,7 +404,7 @@ private ArrayNode flattenResults() @Override public Iterator iterator() { - if (isStreamable()) { + if (isStreaming()) { return responseIterator; } diff --git a/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java b/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java index 84adfc4ad..5da5123d4 100644 --- a/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java +++ b/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java @@ -335,7 +335,7 @@ private Map> retrieveChunkedKeysForCoverageEntry(Riak readResponse = rc.executeAsyncStreaming(cmd2, pollTimeout).get(); } - assertEquals(useStreaming, readResponse.isStreamable()); + assertEquals(useStreaming, readResponse.isStreaming()); final List list = new LinkedList<>(); readResponse.forEach( e -> list.add(e)); @@ -366,7 +366,7 @@ private Map> retrieveChunkedKeysForCoverageEntry(Riak r = rc.executeAsyncStreaming(cmd2, pollTimeout).get(); } - assertEquals(useStreaming, r.isStreamable()); + assertEquals(useStreaming, r.isStreaming()); final List entries = new LinkedList<>(); diff --git a/src/test/java/com/basho/riak/client/api/commands/itest/ITestBucketMapReduce.java b/src/test/java/com/basho/riak/client/api/commands/itest/ITestBucketMapReduce.java index 1ad19e568..dbd056547 100644 --- a/src/test/java/com/basho/riak/client/api/commands/itest/ITestBucketMapReduce.java +++ b/src/test/java/com/basho/riak/client/api/commands/itest/ITestBucketMapReduce.java @@ -18,7 +18,6 @@ import com.basho.riak.client.api.RiakClient; import com.basho.riak.client.core.RiakFuture; -import com.basho.riak.client.core.operations.itest.ITestAutoCleanupBase; import com.basho.riak.client.api.commands.buckets.StoreBucketProperties; import com.basho.riak.client.api.commands.kv.StoreValue; import com.basho.riak.client.api.commands.mapreduce.BucketMapReduce; @@ -34,13 +33,9 @@ import com.basho.riak.client.api.commands.mapreduce.filters.TokenizeFilter; import com.basho.riak.client.core.query.functions.Function; import com.basho.riak.client.core.util.BinaryValue; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; -import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.junit.*; @@ -178,7 +173,7 @@ private void streamingErlangBucketMR(String bucketType) throws InterruptedExcept int count = 0; final MapReduce.Response streamingResponse = streamingFuture.get(); - assertTrue(streamingResponse.isStreamable()); + assertTrue(streamingResponse.isStreaming()); // The streaming query should return many results which are JSON arrays, each // containing a piece of the array [0-199]. // Streaming result would look like: [[0], [1,2,3], ... [..., 199]], with the outer