Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming results for List Keys/Buckets, 2i, and MR #677

Merged
merged 58 commits into from
Nov 16, 2016
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
e1480aa
Add streaming api to operations/futures
alexmoore Oct 5, 2016
af3d5ba
Add Streaming List Buckets
alexmoore Oct 5, 2016
5c56d9f
Add streaming list keys support
alexmoore Oct 5, 2016
78c9076
Add streaming 2i operations/results
alexmoore Oct 5, 2016
590bcde
Use TransferQueues instead of BlockingQueues
alexmoore Oct 10, 2016
5b87e24
Add Streaming MapReduce Operations
alexmoore Oct 11, 2016
69aed60
Forgot a file
alexmoore Oct 11, 2016
c7c40dd
Command level streaming for ListKeys and ListBuckets
alexmoore Oct 13, 2016
ca0aaf4
Cleanup & name things betterer
alexmoore Oct 15, 2016
ca29f1a
Start into Streaming 2i special commands.
alexmoore Oct 15, 2016
e6de1e0
Stub out rest of streaming 2i commands
alexmoore Oct 17, 2016
35e9c4f
Add Javadocs, remove need to check for nulls from streaming results
alexmoore Oct 18, 2016
82fb339
Streaming 2i Command Integration Tests
alexmoore Oct 19, 2016
9ac71b6
Implement Streaming MapReduce Results + Test
alexmoore Oct 19, 2016
d19cd3e
Fix spacing in control structures.
alexmoore Oct 20, 2016
fe24ba1
Fix headers for newest/recently changed files.
alexmoore Oct 20, 2016
3edac1f
Don't need synchronization on next(), removing unused import
alexmoore Oct 20, 2016
4e0a90b
Expand test coverage
alexmoore Oct 25, 2016
875999d
Add note on code coverage
alexmoore Oct 25, 2016
3ac537c
Move test to another thread, as to not upset other Interruptable test…
alexmoore Oct 25, 2016
051565d
Fix flappy test
alexmoore Oct 25, 2016
7a1ccde
Add javadocs on how to use Streaming interfaces.
alexmoore Oct 25, 2016
5b9f6b4
Fix misc javadocs warnings and errors
alexmoore Oct 25, 2016
511f95e
Fix another minor test bug
alexmoore Oct 25, 2016
a54aa45
Generalize StreamingFutureOperation.
srgg Oct 24, 2016
3dd8230
Introduce PBStreamingFutureOperation.
srgg Oct 26, 2016
1c5883c
Fix test that stubs a final method
alexmoore Oct 26, 2016
3869107
Remove RuntimeException + reset interrupted status after we're done l…
alexmoore Oct 27, 2016
74f7ac4
Merge pull request #682 from basho/streaming-api-2-restore-interrupt
alexmoore Oct 27, 2016
def141f
Avoid useless queue creation in PBStreamingFutureOperation
srgg Oct 27, 2016
96f1bb6
Fix interrupts in Streaming MR, check interrupted status before waiti…
alexmoore Oct 28, 2016
443cb40
Initial simplification of SecondaryIndexQuery command hierarchy
srgg Nov 3, 2016
37c6d55
Fix some unchecked cast warnings
srgg Nov 3, 2016
a112bf2
Coarse fix of unchecked cast warnings in SecondaryIndexQuery
srgg Nov 3, 2016
0f62ed9
Get rid of Response.getLocationFromCoreEntity().
srgg Nov 3, 2016
576eccf
Fix upper case in variable name
srgg Nov 6, 2016
9630a11
Simplification of a bunch of RiakCommand descendants
srgg Nov 7, 2016
63efe16
Simplification for the case when no needs to convert results
srgg Nov 8, 2016
52795d4
Generalize streamable responses
srgg Nov 8, 2016
3e5c7b0
Get rid of unsafe default response and infor conversions in GenericRi…
srgg Nov 8, 2016
d97183a
Get rid of unused GenericRiakCommand constructor
srgg Nov 8, 2016
76a6ee4
Request argument was added to the GenericRiakCommand.convertResponse …
srgg Nov 8, 2016
7cdf043
Generalize StreamableRiakCommand.executeAsyncStreaming()
srgg Nov 8, 2016
45f55c4
Remove if / subtype casting from Command execution
alexmoore Nov 8, 2016
12c8944
Make PBStreamingFutureOperation to be a part of StreamableRiakCommand…
srgg Nov 9, 2016
944fb90
Refactor streaming responses to re-use ConvertibleIterator
srgg Nov 9, 2016
67e56cb
Simplify ChunkedResponseIterator initial loading
srgg Nov 10, 2016
22655d4
Generalize StreamableResponse
srgg Nov 10, 2016
3afa145
fix iteration through chunked response
srgg Nov 10, 2016
a07833f
Merge pull request #684 from basho/simplification-streamable-commands
alexmoore Nov 11, 2016
e313956
Adding assert around chunk queue insertion
alexmoore Nov 11, 2016
51bff20
Adding integration tests for streamable full bucket read.
srgg Nov 15, 2016
002ba0e
Fix bug where calling 2i getEntries() while streaming would throw Nul…
alexmoore Nov 15, 2016
a13798b
Pedantic whitespace formatting.
alexmoore Nov 15, 2016
c1b575a
Merge branch 'develop' into streaming-api-2
alexmoore Nov 15, 2016
ac3d7e2
Switch 2i getEntries() to throw IllegalStateException.
alexmoore Nov 15, 2016
78c43f1
Adding javadocs to new public API pieces
alexmoore Nov 15, 2016
a797ba6
Changing isStreamable to isStreaming, as we cannot treat a streaming …
alexmoore Nov 15, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/main/java/com/basho/riak/client/api/RiakClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,28 @@ public <T,S> RiakFuture<T,S> executeAsync(RiakCommand<T,S> command)
return command.executeAsync(cluster);
}

/**
* Execute a StreamableRiakCommand asynchronously, and stream the results back before the command is complete.
* <p>
* Calling this method causes the client to execute the provided StreamableRiakCommand
* asynchronously. It will immediately return a RiakFuture that contains an immediately available result that data
* will be streamed to. The RiakFuture will also keep track of the overall operation's progress with the
* {@see RiakFuture#isDone}, etc methods.
*
* @param <I> StreamableRiakCommand's immediate return type, available before the command/operation is complete.
* @param <S> The RiakCommand's query info type.
* @param command The RiakCommand to execute.
* @param timeoutMS The loading timeout in milliseconds for each result chunk.
* If the timeout is reached a {@see null} will be returned from the result's iterator,
* instead of blocking indefinitely.
* @return a RiakFuture for the operation
* @see RiakFuture
*/
public <I,S> RiakFuture<I,S> executeAsyncStreaming(StreamableRiakCommand<I, ?, S> command, int timeoutMS)
{
return command.executeAsyncStreaming(cluster, timeoutMS);
}

/**
* Shut down the client and the underlying RiakCluster.
* <p>
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.basho.riak.client.api;

import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;

/*
* The base class for all Streamable Riak Commands.
* Allows the user to either use {@link RiakCommand#executeAsync} and return a "batch-mode" result
* that is only available after the command is complete, or
* use {@link StreamableRiakCommand#executeAsyncStreaming} and return a "immediate" or "stream-mode" result
* that data will flow into.
* @param <S> The response type returned by "streaming mode" {@link executeAsyncStreaming}
* @param <R> The response type returned by the "batch mode" @{link executeAsync}
* @param <Q> The query info type
* @author Dave Rusek
* @author Brian Roach <roach at basho.com>
* @since 2.0
*/
public abstract class StreamableRiakCommand<S, R, Q> extends RiakCommand<R, Q>
{
protected abstract RiakFuture<S, Q> executeAsyncStreaming(RiakCluster cluster, int timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.basho.riak.client.api.commands;

import com.basho.riak.client.core.StreamingRiakFuture;
import com.basho.riak.client.core.util.BinaryValue;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.function.Function;

public class ChunkedResponseIterator<FinalT, ChunkT extends Iterable<CoreT>, CoreT> implements Iterator<FinalT>
{
private final int timeout;
private volatile BinaryValue continuation = null;
private final StreamingRiakFuture<ChunkT, ?> coreFuture;
private final TransferQueue<ChunkT> chunkQueue;
private final Function<CoreT, FinalT> createNext;
private final Function<ChunkT, Iterator<CoreT>> getNextIterator;
private final Function<ChunkT, BinaryValue> getContinuationFn;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why it may require to expose BinaryValue as a part of getContinuationFn() signature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.


private Iterator<CoreT> currentIterator = null;

public ChunkedResponseIterator(StreamingRiakFuture<ChunkT, ?> coreFuture,
int pollTimeout,
Function<CoreT, FinalT> createNextFn,
Function<ChunkT, Iterator<CoreT>> getNextIteratorFn)
{
this(coreFuture, pollTimeout, createNextFn, getNextIteratorFn, (x) -> null);
}

public ChunkedResponseIterator(StreamingRiakFuture<ChunkT, ?> coreFuture,
int pollTimeout,
Function<CoreT, FinalT> createNextFn,
Function<ChunkT, Iterator<CoreT>> getNextIteratorFn,
Function<ChunkT, BinaryValue> getContinuationFn)
{
this.timeout = pollTimeout;
this.coreFuture = coreFuture;
this.chunkQueue = coreFuture.getResultsQueue();
this.createNext = createNextFn;
this.getNextIterator = getNextIteratorFn;
this.getContinuationFn = getContinuationFn;
loadNextChunkIterator();
}

@Override
public boolean hasNext()
{
if(currentIteratorHasNext())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ( :trollface:

{
return true;
}

loadNextChunkIterator();

return currentIteratorHasNext();
}

private boolean currentIteratorHasNext()
{
return currentIterator != null && currentIterator.hasNext();
}

private boolean possibleChunksRemaining()
{
// Chunks may remain if :
// Core Operation Not Done OR items still in chunk Queue
return !coreFuture.isDone() || !chunkQueue.isEmpty();
}

public boolean hasContinuation()
{
return continuation != null || possibleChunksRemaining();
}

public BinaryValue getContinuation()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't us get rid of BinaryValue here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continuation values (if the command/operation uses them) are always returned as BinaryValue through our interface. The continuation also comes as part of one of the chunked responses, so we need to watch out for it.

{
return continuation;
}

@Override
public synchronized FinalT next()
{
return createNext.apply(currentIterator.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lack of synchronization on hasNext() whereas next() is synchronized looks looks very suspicious for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may not need the synchronization here, as iterators aren't typically guaranteed to be reentrant. The resulting iterator is single-use only too, so I'll need to make sure I document that somewhere as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I did all the lower-level iterator checking/loading in next() instead of hasNext() like it is now. I'll just remove it based on my previous comment.

}

private void loadNextChunkIterator()
{
this.currentIterator = null;
boolean populatedChunkLoaded = false;

try
{
while (!populatedChunkLoaded && possibleChunksRemaining())
{
final ChunkT nextChunk = chunkQueue.poll(timeout, TimeUnit.MILLISECONDS);

if (nextChunk != null)
{
this.currentIterator = getNextIterator.apply(nextChunk);
populatedChunkLoaded = currentIteratorHasNext();

loadContinuation(nextChunk);
}
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Copy link
Contributor

@srgg srgg Oct 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing an interruption behavior by re-throwing RuntimeException calling interrupt() in the same time is not a common pattern. I have no clear understanding what we-ve got as a result. Theoretically, the most possible behavior is -- the original InterruptedException will be swallowed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I didn't grok this either and just assumed it was the right way 🍤

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what I settled on was to mark the interrupt() flag on the thread and to wrap it in a RuntimeException(), that way the thread should give up control.

If we get an interrupt here it means that the user thread really wanted to interrupt for whatever reason, and we can't block indefinitely while we want for another chunk. In the "Riak is being slow", or "the user did a very bad/slow query" case, blocking indefinitely could deadlock the user thread for longer than wanted.

Also we can't extend hasNext() signature to pass along the InterruptedException, hence the wrap. I need to add a test to see if the user calls cancel() if the future/iterator will pick it up and stop iteration.

Copy link
Contributor

@srgg srgg Oct 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@A1 The more I think about the more I feel like there should be enough to throw teh only Runtime exception. We can't throw Interrupted due to the contract, therefore nobody will expect that any of it's methods might be blocked and as result nobody will expect that thread may be marked as interrupted. If you approve, I will introduce corresponding changes as a separate commit.

Copy link
Contributor Author

@alexmoore alexmoore Oct 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading more opinions about this... I guess we could add a flag to note that it was interrupted, and keep going. If we get to the end (hasNext() returns false) we could do something at that point, but not sure what since the iterator interface won't allow us to interrupt.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial thought was exactly the same: set the flag and move forward without raising any exception. If this call was made from the whatever execution thread the programmer should take care about the proper thread interuption and at least we do not make it worse.

Please excuse me for my hesitation.

}
}

private void loadContinuation(ChunkT nextChunk)
{
final BinaryValue fetchedContinuation = getContinuationFn.apply(nextChunk);
if(this.continuation == null && fetchedContinuation != null)
{
this.continuation = fetchedContinuation;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These headers should either be nuked or updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* Copyright 2014 Brian Roach <roach at basho dot com>.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.basho.riak.client.api.commands;

import com.basho.riak.client.core.RiakFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Used when the converted response is available before the core future is complete.
*
* @param <T> The core response type.
* @param <S> The core query info type.
* @param <T2> The converted response type.
* @param <S2> The converted query info type.
*
* @author Alex Moore <amoore at basho dot com>
* @since 2.1
*/
public abstract class ImmediateCoreFutureAdapter<T2,S2,T,S> extends CoreFutureAdapter<T2,S2,T,S>
{
private final T2 immediateResponse;

protected ImmediateCoreFutureAdapter(RiakFuture<T, S> coreFuture, T2 immediateResponse)
{
super(coreFuture);
this.immediateResponse = immediateResponse;
}

@Override
public T2 get() throws InterruptedException, ExecutionException
{
return immediateResponse;
}

@Override
public T2 get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return immediateResponse;
}

@Override
public T2 getNow()
{
return immediateResponse;
}

@Override
protected T2 convertResponse(T unused) { return null; }

@Override
protected abstract S2 convertQueryInfo(S coreQueryInfo);

public static abstract class SameQueryInfo<T2,S,T> extends ImmediateCoreFutureAdapter<T2,S,T,S>
{
protected SameQueryInfo(RiakFuture<T, S> coreFuture, T2 immediateResponse)
{
super(coreFuture, immediateResponse);
}

@Override
protected S convertQueryInfo(S coreQueryInfo)
{
return coreQueryInfo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package com.basho.riak.client.api.commands.buckets;

import com.basho.riak.client.api.RiakCommand;
import com.basho.riak.client.api.commands.ChunkedResponseIterator;
import com.basho.riak.client.api.commands.CoreFutureAdapter;
import com.basho.riak.client.api.StreamableRiakCommand;
import com.basho.riak.client.api.commands.ImmediateCoreFutureAdapter;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.StreamingRiakFuture;
import com.basho.riak.client.core.operations.ListBucketsOperation;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.util.BinaryValue;
Expand All @@ -42,7 +45,7 @@
* @author Dave Rusek <drusek at basho dot com>
* @since 2.0
*/
public final class ListBuckets extends RiakCommand<ListBuckets.Response, BinaryValue>
public final class ListBuckets extends StreamableRiakCommand<ListBuckets.StreamingResponse, ListBuckets.Response, BinaryValue>
{
private final int timeout;
private final BinaryValue type;
Expand All @@ -57,7 +60,7 @@ public final class ListBuckets extends RiakCommand<ListBuckets.Response, BinaryV
protected RiakFuture<Response, BinaryValue> executeAsync(RiakCluster cluster)
{
RiakFuture<ListBucketsOperation.Response, BinaryValue> coreFuture =
cluster.execute(buildCoreOperation());
cluster.execute(buildCoreOperation(false));

CoreFutureAdapter<ListBuckets.Response, BinaryValue, ListBucketsOperation.Response, BinaryValue> future =
new CoreFutureAdapter<ListBuckets.Response, BinaryValue, ListBucketsOperation.Response, BinaryValue>(coreFuture)
Expand All @@ -78,7 +81,22 @@ protected BinaryValue convertQueryInfo(BinaryValue coreQueryInfo)
return future;
}

private ListBucketsOperation buildCoreOperation()
protected RiakFuture<StreamingResponse, BinaryValue> executeAsyncStreaming(RiakCluster cluster, int timeout)
{
StreamingRiakFuture<ListBucketsOperation.Response, BinaryValue> coreFuture =
cluster.execute(buildCoreOperation(true));

final StreamingResponse streamingResponse = new StreamingResponse(type, timeout, coreFuture);

ImmediateCoreFutureAdapter.SameQueryInfo<StreamingResponse, BinaryValue, ListBucketsOperation.Response> future =
new ImmediateCoreFutureAdapter.SameQueryInfo<StreamingResponse, BinaryValue, ListBucketsOperation.Response>
(coreFuture, streamingResponse) {};

coreFuture.addListener(future);
return future;
}

private ListBucketsOperation buildCoreOperation(boolean streamResults)
{
ListBucketsOperation.Builder builder = new ListBucketsOperation.Builder();
if (timeout > 0)
Expand All @@ -91,6 +109,8 @@ private ListBucketsOperation buildCoreOperation()
builder.withBucketType(type);
}

builder.streamResults(streamResults);

return builder.build();
}

Expand Down Expand Up @@ -126,6 +146,29 @@ public Iterator<Namespace> iterator()
}
}

public static class StreamingResponse extends Response
{
private final ChunkedResponseIterator<Namespace, ListBucketsOperation.Response, BinaryValue>
chunkedResponseIterator;

StreamingResponse(BinaryValue type,
int pollTimeout,
StreamingRiakFuture<ListBucketsOperation.Response, BinaryValue> coreFuture)
{
super(type, null);
chunkedResponseIterator = new ChunkedResponseIterator<>(coreFuture,
pollTimeout,
(bucketName) -> new Namespace(type, bucketName),
(response) -> response.getBuckets().iterator());
}

@Override
public Iterator<Namespace> iterator()
{
return chunkedResponseIterator;
}
}

private static class Itr implements Iterator<Namespace>
{
private final Iterator<BinaryValue> iterator;
Expand Down Expand Up @@ -207,4 +250,5 @@ public ListBuckets build()
return new ListBuckets(this);
}
}

}
Loading