Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming results for List Keys/Buckets, 2i, and MR #677

Merged
merged 58 commits into from
Nov 16, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
e1480aa
Add streaming api to operations/futures
alexmoore Oct 5, 2016
af3d5ba
Add Streaming List Buckets
alexmoore Oct 5, 2016
5c56d9f
Add streaming list keys support
alexmoore Oct 5, 2016
78c9076
Add streaming 2i operations/results
alexmoore Oct 5, 2016
590bcde
Use TransferQueues instead of BlockingQueues
alexmoore Oct 10, 2016
5b87e24
Add Streaming MapReduce Operations
alexmoore Oct 11, 2016
69aed60
Forgot a file
alexmoore Oct 11, 2016
c7c40dd
Command level streaming for ListKeys and ListBuckets
alexmoore Oct 13, 2016
ca0aaf4
Cleanup & name things betterer
alexmoore Oct 15, 2016
ca29f1a
Start into Streaming 2i special commands.
alexmoore Oct 15, 2016
e6de1e0
Stub out rest of streaming 2i commands
alexmoore Oct 17, 2016
35e9c4f
Add Javadocs, remove need to check for nulls from streaming results
alexmoore Oct 18, 2016
82fb339
Streaming 2i Command Integration Tests
alexmoore Oct 19, 2016
9ac71b6
Implement Streaming MapReduce Results + Test
alexmoore Oct 19, 2016
d19cd3e
Fix spacing in control structures.
alexmoore Oct 20, 2016
fe24ba1
Fix headers for newest/recently changed files.
alexmoore Oct 20, 2016
3edac1f
Don't need synchronization on next(), removing unused import
alexmoore Oct 20, 2016
4e0a90b
Expand test coverage
alexmoore Oct 25, 2016
875999d
Add note on code coverage
alexmoore Oct 25, 2016
3ac537c
Move test to another thread, as to not upset other Interruptable test…
alexmoore Oct 25, 2016
051565d
Fix flappy test
alexmoore Oct 25, 2016
7a1ccde
Add javadocs on how to use Streaming interfaces.
alexmoore Oct 25, 2016
5b9f6b4
Fix misc javadocs warnings and errors
alexmoore Oct 25, 2016
511f95e
Fix another minor test bug
alexmoore Oct 25, 2016
a54aa45
Generalize StreamingFutureOperation.
srgg Oct 24, 2016
3dd8230
Introduce PBStreamingFutureOperation.
srgg Oct 26, 2016
1c5883c
Fix test that stubs a final method
alexmoore Oct 26, 2016
3869107
Remove RuntimeException + reset interrupted status after we're done l…
alexmoore Oct 27, 2016
74f7ac4
Merge pull request #682 from basho/streaming-api-2-restore-interrupt
alexmoore Oct 27, 2016
def141f
Avoid useless queue creation in PBStreamingFutureOperation
srgg Oct 27, 2016
96f1bb6
Fix interrupts in Streaming MR, check interrupted status before waiti…
alexmoore Oct 28, 2016
443cb40
Initial simplification of SecondaryIndexQuery command hierarchy
srgg Nov 3, 2016
37c6d55
Fix some unchecked cast warnings
srgg Nov 3, 2016
a112bf2
Coarse fix of unchecked cast warnings in SecondaryIndexQuery
srgg Nov 3, 2016
0f62ed9
Get rid of Response.getLocationFromCoreEntity().
srgg Nov 3, 2016
576eccf
Fix upper case in variable name
srgg Nov 6, 2016
9630a11
Simplification of a bunch of RiakCommand descendants
srgg Nov 7, 2016
63efe16
Simplification for the case when no needs to convert results
srgg Nov 8, 2016
52795d4
Generalize streamable responses
srgg Nov 8, 2016
3e5c7b0
Get rid of unsafe default response and infor conversions in GenericRi…
srgg Nov 8, 2016
d97183a
Get rid of unused GenericRiakCommand constructor
srgg Nov 8, 2016
76a6ee4
Request argument was added to the GenericRiakCommand.convertResponse …
srgg Nov 8, 2016
7cdf043
Generalize StreamableRiakCommand.executeAsyncStreaming()
srgg Nov 8, 2016
45f55c4
Remove if / subtype casting from Command execution
alexmoore Nov 8, 2016
12c8944
Make PBStreamingFutureOperation to be a part of StreamableRiakCommand…
srgg Nov 9, 2016
944fb90
Refactor streaming responses to re-use ConvertibleIterator
srgg Nov 9, 2016
67e56cb
Simplify ChunkedResponseIterator initial loading
srgg Nov 10, 2016
22655d4
Generalize StreamableResponse
srgg Nov 10, 2016
3afa145
fix iteration through chunked response
srgg Nov 10, 2016
a07833f
Merge pull request #684 from basho/simplification-streamable-commands
alexmoore Nov 11, 2016
e313956
Adding assert around chunk queue insertion
alexmoore Nov 11, 2016
51bff20
Adding integration tests for streamable full bucket read.
srgg Nov 15, 2016
002ba0e
Fix bug where calling 2i getEntries() while streaming would throw Nul…
alexmoore Nov 15, 2016
a13798b
Pedantic whitespace formatting.
alexmoore Nov 15, 2016
c1b575a
Merge branch 'develop' into streaming-api-2
alexmoore Nov 15, 2016
ac3d7e2
Switch 2i getEntries() to throw IllegalStateException.
alexmoore Nov 15, 2016
78c43f1
Adding javadocs to new public API pieces
alexmoore Nov 15, 2016
a797ba6
Changing isStreamable to isStreaming, as we cannot treat a streaming …
alexmoore Nov 15, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,16 @@ protected Response(Namespace queryLocation,
this.coreResponse = coreResponse;
}

/**
* Get an iterator over the result data.
*
* If using the streaming API, this method will block
* and wait for more data if none is immediately available.
* It is also advisable to check {@link Thread#isInterrupted()}
* in environments where thread interrupts must be obeyed.
*
* @return an iterator over the result data.
*/
public Iterator<E> iterator()
{
if (isStreamable()) {
Expand All @@ -644,6 +654,10 @@ protected E convert(SecondaryIndexQueryOperation.Response.Entry e)
/**
* Check if this response has a continuation.
*
* If using the streaming API, this property's value
* may change while data is being received, therefore
* it is best to call it after the operation is complete.
*
* @return true if the response contains a continuation.
*/
public boolean hasContinuation()
Expand All @@ -659,6 +673,10 @@ public boolean hasContinuation()
/**
* Get the continuation from this response.
*
* If using the streaming API, this property's value
* may change while data is being received, therefore
* it is best to call it after the operation is complete.
*
* @return the continuation, or null if none is present.
*/
public BinaryValue getContinuation()
Expand All @@ -674,6 +692,11 @@ public BinaryValue getContinuation()
/**
* Check is this response contains any entries.
*
* If using the streaming API, this method will block
* and wait for more data if none is immediately available.
* It is also advisable to check {@link Thread#isInterrupted()}
* in environments where thread interrupts must be obeyed.
*
* @return true if entries are present, false otherwise.
*/
public boolean hasEntries()
Expand All @@ -686,8 +709,19 @@ public boolean hasEntries()
return !coreResponse.getEntryList().isEmpty();
}

/**
* Get a list of the result entries for this response.
* If using the streaming API this method will return an empty list.
*
* @return A list of result entries.
*/
public final List<E> getEntries()
{
if(isStreamable())
{
return new ArrayList<>(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about throwing either IllegalState or UnsupportedOperation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not, it is better to return Collections.EMPTY_LIST

}

final List<SecondaryIndexQueryOperation.Response.Entry> coreEntries = coreResponse.getEntryList();
final List<E> convertedList = new ArrayList<>(coreEntries.size());

Expand All @@ -699,13 +733,6 @@ public final List<E> getEntries()
return convertedList;
}

/**
* Factory method.
* @param location
* @param coreEntry
* @param converter
* @return
*/
@SuppressWarnings("unchecked")
protected E createEntry(Location location, SecondaryIndexQueryOperation.Response.Entry coreEntry, IndexConverter<T> converter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void testStreamingRangeQuery() throws ExecutionException, InterruptedExce
final BigIntIndexQuery.Response streamingResponse = streamingFuture.get();

assertTrue(streamingResponse.hasEntries());
assertTrue(streamingResponse.getEntries().isEmpty());

final String expectedObjectKey = objectKey(1);
boolean found = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.basho.riak.client.api.annotations.RiakIndex;
import com.basho.riak.client.api.annotations.RiakKey;
import com.basho.riak.client.api.commands.indexes.BinIndexQuery;
import com.basho.riak.client.api.commands.indexes.SecondaryIndexQuery;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.query.Location;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -105,6 +107,7 @@ public void testStreamingRangeQuery() throws ExecutionException, InterruptedExce
final BinIndexQuery.Response streamingResponse = streamingFuture.get();

assertTrue(streamingResponse.hasEntries());
assertTrue(streamingResponse.getEntries().isEmpty());

final String expectedObjectKey = objectKey(1);
final String expectedIndexKey = indexKey(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void testStreamingRangeQuery() throws ExecutionException, InterruptedExce
final IntIndexQuery.Response streamingResponse = streamingFuture.get();

assertTrue(streamingResponse.hasEntries());
assertTrue(streamingResponse.getEntries().isEmpty());

final String expectedObjectKey = objectKey(1);
boolean found = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public void testBucketIndexQueryStreaming() throws InterruptedException, Executi
final BinIndexQuery.Response streamingResponse = indexResult.get();

assertTrue(streamingResponse.hasEntries());
assertTrue(streamingResponse.getEntries().isEmpty());
assertEquals(100, StreamSupport.stream(streamingResponse.spliterator(), false).count());

// Assert everything was consumed
Expand Down