-
Notifications
You must be signed in to change notification settings - Fork 157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming results for List Keys/Buckets, 2i, and MR #677
Changes from 14 commits
e1480aa
af3d5ba
5c56d9f
78c9076
590bcde
5b87e24
69aed60
c7c40dd
ca0aaf4
ca29f1a
e6de1e0
35e9c4f
82fb339
9ac71b6
d19cd3e
fe24ba1
3edac1f
4e0a90b
875999d
3ac537c
051565d
7a1ccde
5b9f6b4
511f95e
a54aa45
3dd8230
1c5883c
3869107
74f7ac4
def141f
96f1bb6
443cb40
37c6d55
a112bf2
0f62ed9
576eccf
9630a11
63efe16
52795d4
3e5c7b0
d97183a
76a6ee4
7cdf043
45f55c4
12c8944
944fb90
67e56cb
22655d4
3afa145
a07833f
e313956
51bff20
002ba0e
a13798b
c1b575a
ac3d7e2
78c43f1
a797ba6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package com.basho.riak.client.api; | ||
|
||
import com.basho.riak.client.core.RiakCluster; | ||
import com.basho.riak.client.core.RiakFuture; | ||
|
||
/* | ||
* The base class for all Streamable Riak Commands. | ||
* Allows the user to either use {@link RiakCommand#executeAsync} and return a "batch-mode" result | ||
* that is only available after the command is complete, or | ||
* use {@link StreamableRiakCommand#executeAsyncStreaming} and return a "immediate" or "stream-mode" result | ||
* that data will flow into. | ||
* @param <S> The response type returned by "streaming mode" {@link executeAsyncStreaming} | ||
* @param <R> The response type returned by the "batch mode" @{link executeAsync} | ||
* @param <Q> The query info type | ||
* @author Dave Rusek | ||
* @author Brian Roach <roach at basho.com> | ||
* @since 2.0 | ||
*/ | ||
public abstract class StreamableRiakCommand<S, R, Q> extends RiakCommand<R, Q> | ||
{ | ||
protected abstract RiakFuture<S, Q> executeAsyncStreaming(RiakCluster cluster, int timeout); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package com.basho.riak.client.api.commands; | ||
|
||
import com.basho.riak.client.core.StreamingRiakFuture; | ||
import com.basho.riak.client.core.util.BinaryValue; | ||
|
||
import java.util.Iterator; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TransferQueue; | ||
import java.util.function.Function; | ||
|
||
public class ChunkedResponseIterator<FinalT, ChunkT extends Iterable<CoreT>, CoreT> implements Iterator<FinalT> | ||
{ | ||
private final int timeout; | ||
private volatile BinaryValue continuation = null; | ||
private final StreamingRiakFuture<ChunkT, ?> coreFuture; | ||
private final TransferQueue<ChunkT> chunkQueue; | ||
private final Function<CoreT, FinalT> createNext; | ||
private final Function<ChunkT, Iterator<CoreT>> getNextIterator; | ||
private final Function<ChunkT, BinaryValue> getContinuationFn; | ||
|
||
private Iterator<CoreT> currentIterator = null; | ||
|
||
public ChunkedResponseIterator(StreamingRiakFuture<ChunkT, ?> coreFuture, | ||
int pollTimeout, | ||
Function<CoreT, FinalT> createNextFn, | ||
Function<ChunkT, Iterator<CoreT>> getNextIteratorFn) | ||
{ | ||
this(coreFuture, pollTimeout, createNextFn, getNextIteratorFn, (x) -> null); | ||
} | ||
|
||
public ChunkedResponseIterator(StreamingRiakFuture<ChunkT, ?> coreFuture, | ||
int pollTimeout, | ||
Function<CoreT, FinalT> createNextFn, | ||
Function<ChunkT, Iterator<CoreT>> getNextIteratorFn, | ||
Function<ChunkT, BinaryValue> getContinuationFn) | ||
{ | ||
this.timeout = pollTimeout; | ||
this.coreFuture = coreFuture; | ||
this.chunkQueue = coreFuture.getResultsQueue(); | ||
this.createNext = createNextFn; | ||
this.getNextIterator = getNextIteratorFn; | ||
this.getContinuationFn = getContinuationFn; | ||
loadNextChunkIterator(); | ||
} | ||
|
||
@Override | ||
public boolean hasNext() | ||
{ | ||
if(currentIteratorHasNext()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
{ | ||
return true; | ||
} | ||
|
||
loadNextChunkIterator(); | ||
|
||
return currentIteratorHasNext(); | ||
} | ||
|
||
private boolean currentIteratorHasNext() | ||
{ | ||
return currentIterator != null && currentIterator.hasNext(); | ||
} | ||
|
||
private boolean possibleChunksRemaining() | ||
{ | ||
// Chunks may remain if : | ||
// Core Operation Not Done OR items still in chunk Queue | ||
return !coreFuture.isDone() || !chunkQueue.isEmpty(); | ||
} | ||
|
||
public boolean hasContinuation() | ||
{ | ||
return continuation != null || possibleChunksRemaining(); | ||
} | ||
|
||
public BinaryValue getContinuation() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't us get rid of BinaryValue here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Continuation values (if the command/operation uses them) are always returned as BinaryValue through our interface. The continuation also comes as part of one of the chunked responses, so we need to watch out for it. |
||
{ | ||
return continuation; | ||
} | ||
|
||
@Override | ||
public synchronized FinalT next() | ||
{ | ||
return createNext.apply(currentIterator.next()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lack of synchronization on hasNext() whereas next() is synchronized looks looks very suspicious for me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may not need the synchronization here, as iterators aren't typically guaranteed to be reentrant. The resulting iterator is single-use only too, so I'll need to make sure I document that somewhere as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Originally I did all the lower-level iterator checking/loading in |
||
} | ||
|
||
private void loadNextChunkIterator() | ||
{ | ||
this.currentIterator = null; | ||
boolean populatedChunkLoaded = false; | ||
|
||
try | ||
{ | ||
while (!populatedChunkLoaded && possibleChunksRemaining()) | ||
{ | ||
final ChunkT nextChunk = chunkQueue.poll(timeout, TimeUnit.MILLISECONDS); | ||
|
||
if (nextChunk != null) | ||
{ | ||
this.currentIterator = getNextIterator.apply(nextChunk); | ||
populatedChunkLoaded = currentIteratorHasNext(); | ||
|
||
loadContinuation(nextChunk); | ||
} | ||
} | ||
} | ||
catch (InterruptedException e) | ||
{ | ||
Thread.currentThread().interrupt(); | ||
throw new RuntimeException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changing an interruption behavior by re-throwing RuntimeException calling interrupt() in the same time is not a common pattern. I have no clear understanding what we-ve got as a result. Theoretically, the most possible behavior is -- the original InterruptedException will be swallowed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, I didn't grok this either and just assumed it was the right way 🍤 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So what I settled on was to mark the interrupt() flag on the thread and to wrap it in a RuntimeException(), that way the thread should give up control. If we get an interrupt here it means that the user thread really wanted to interrupt for whatever reason, and we can't block indefinitely while we want for another chunk. In the "Riak is being slow", or "the user did a very bad/slow query" case, blocking indefinitely could deadlock the user thread for longer than wanted. Also we can't extend There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @A1 The more I think about the more I feel like there should be enough to throw teh only Runtime exception. We can't throw Interrupted due to the contract, therefore nobody will expect that any of it's methods might be blocked and as result nobody will expect that thread may be marked as interrupted. If you approve, I will introduce corresponding changes as a separate commit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading more opinions about this... I guess we could add a flag to note that it was interrupted, and keep going. If we get to the end ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My initial thought was exactly the same: set the flag and move forward without raising any exception. If this call was made from the whatever execution thread the programmer should take care about the proper thread interuption and at least we do not make it worse. Please excuse me for my hesitation. |
||
} | ||
} | ||
|
||
private void loadContinuation(ChunkT nextChunk) | ||
{ | ||
final BinaryValue fetchedContinuation = getContinuationFn.apply(nextChunk); | ||
if(this.continuation == null && fetchedContinuation != null) | ||
{ | ||
this.continuation = fetchedContinuation; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These headers should either be nuked or updated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
* Copyright 2014 Brian Roach <roach at basho dot com>. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.basho.riak.client.api.commands; | ||
|
||
import com.basho.riak.client.core.RiakFuture; | ||
|
||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
/** | ||
* Used when the converted response is available before the core future is complete. | ||
* | ||
* @param <T> The core response type. | ||
* @param <S> The core query info type. | ||
* @param <T2> The converted response type. | ||
* @param <S2> The converted query info type. | ||
* | ||
* @author Alex Moore <amoore at basho dot com> | ||
* @since 2.1 | ||
*/ | ||
public abstract class ImmediateCoreFutureAdapter<T2,S2,T,S> extends CoreFutureAdapter<T2,S2,T,S> | ||
{ | ||
private final T2 immediateResponse; | ||
|
||
protected ImmediateCoreFutureAdapter(RiakFuture<T, S> coreFuture, T2 immediateResponse) | ||
{ | ||
super(coreFuture); | ||
this.immediateResponse = immediateResponse; | ||
} | ||
|
||
@Override | ||
public T2 get() throws InterruptedException, ExecutionException | ||
{ | ||
return immediateResponse; | ||
} | ||
|
||
@Override | ||
public T2 get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException | ||
{ | ||
return immediateResponse; | ||
} | ||
|
||
@Override | ||
public T2 getNow() | ||
{ | ||
return immediateResponse; | ||
} | ||
|
||
@Override | ||
protected T2 convertResponse(T unused) { return null; } | ||
|
||
@Override | ||
protected abstract S2 convertQueryInfo(S coreQueryInfo); | ||
|
||
public static abstract class SameQueryInfo<T2,S,T> extends ImmediateCoreFutureAdapter<T2,S,T,S> | ||
{ | ||
protected SameQueryInfo(RiakFuture<T, S> coreFuture, T2 immediateResponse) | ||
{ | ||
super(coreFuture, immediateResponse); | ||
} | ||
|
||
@Override | ||
protected S convertQueryInfo(S coreQueryInfo) | ||
{ | ||
return coreQueryInfo; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why it may require to expose BinaryValue as a part of getContinuationFn() signature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above.