Streaming results for List Keys/Buckets, 2i, and MR #677
@Override
public boolean hasNext()
{
    if(currentIteratorHasNext())
Style nit: use "if (" with a space after "if".
@@ -0,0 +1,83 @@
/*
These headers should either be nuked or updated.
Looks like a good first step. IMO it would be good to have as an overall goal to make more use of lambdas, not just as callbacks for asynchronous functions but as a tool in the quiver to reduce garbage. If you follow the code path through a request, you see a lot of objects being created to support a single operation. Some of that can't be avoided, but there are places where I think a lambda-centric approach (versus creating a concrete implementation of some abstract class and instantiating it) would be more efficient. That said, we're probably not at a level where heap usage, GC, and those other considerations are very high priority.
Indeed, I was lucky to get away with only 4 new class files, and I think I …
On Thu, Oct 20, 2016 at 9:18 AM, Jon Brisbin wrote:
@Override
public synchronized FinalT next()
{
    return createNext.apply(currentIterator.next());
The lack of synchronization on hasNext() while next() is synchronized looks very suspicious to me.
We may not need the synchronization here, as iterators aren't typically guaranteed to be reentrant. The resulting iterator is single-use only too, so I'll need to make sure I document that somewhere as well.
Originally I did all the lower-level iterator checking/loading in next() instead of hasNext() like it is now. I'll just remove the synchronization, based on my previous comment.
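To make the hasNext()/next() split above concrete, here is a minimal sketch of an iterator that does all chunk loading in hasNext(). It is not the PR's actual class: the name `ChunkedIteratorSketch`, the `producerDone` supplier, and the 10 ms poll interval are assumptions made for illustration. The field names `chunkQueue`, `createNext`, and `getNextIterator` mirror the ones in the diff. The sketch is single-use and assumes a single consumer thread, so it carries no synchronization.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

// Hypothetical sketch: single-use, single-consumer, not thread-safe.
final class ChunkedIteratorSketch<ChunkT, CoreT, FinalT> implements Iterator<FinalT>
{
    private final TransferQueue<ChunkT> chunkQueue;
    // Assumption: some way to ask whether the producer has finished.
    private final BooleanSupplier producerDone;
    private final Function<ChunkT, Iterator<CoreT>> getNextIterator;
    private final Function<CoreT, FinalT> createNext;
    private Iterator<CoreT> currentIterator = Collections.emptyIterator();

    ChunkedIteratorSketch(TransferQueue<ChunkT> chunkQueue,
                          BooleanSupplier producerDone,
                          Function<ChunkT, Iterator<CoreT>> getNextIterator,
                          Function<CoreT, FinalT> createNext)
    {
        this.chunkQueue = chunkQueue;
        this.producerDone = producerDone;
        this.getNextIterator = getNextIterator;
        this.createNext = createNext;
    }

    @Override
    public boolean hasNext()
    {
        // All chunk loading happens here, so next() stays trivial.
        while (!currentIterator.hasNext())
        {
            ChunkT chunk = pollChunk();
            if (chunk == null)
            {
                return false; // producer finished and queue drained
            }
            currentIterator = getNextIterator.apply(chunk);
        }
        return true;
    }

    @Override
    public FinalT next()
    {
        if (!hasNext())
        {
            throw new NoSuchElementException();
        }
        return createNext.apply(currentIterator.next());
    }

    // Returns the next chunk, or null once no more chunks can arrive.
    private ChunkT pollChunk()
    {
        try
        {
            while (true)
            {
                ChunkT chunk = chunkQueue.poll(10, TimeUnit.MILLISECONDS);
                if (chunk != null)
                {
                    return chunk;
                }
                // Check "done" first: once it is true, every chunk is already queued.
                if (producerDone.getAsBoolean() && chunkQueue.isEmpty())
                {
                    return null;
                }
            }
        }
        catch (InterruptedException e)
        {
            // Mirrors the pattern in the diff under review.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

Because next() only delegates to the current inner iterator, a caller that follows the usual hasNext()/next() protocol from one thread never needs a lock.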
    return continuation != null || possibleChunksRemaining();
}

public BinaryValue getContinuation()
Why don't we get rid of BinaryValue here?
Continuation values (if the command/operation uses them) are always returned as BinaryValue through our interface. The continuation also comes as part of one of the chunked responses, so we need to watch out for it.
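As a rough illustration of "watching out" for a continuation that arrives inside one of several chunks, the sketch below applies an extractor function to every chunk and remembers the last non-null result. The class `ContinuationTracker` and its methods are hypothetical, and the continuation type is left generic here; in the client it would be BinaryValue, as described above.

```java
import java.util.function.Function;

// Hypothetical helper: remembers the continuation seen in any chunk.
final class ContinuationTracker<ChunkT, ContT>
{
    private final Function<ChunkT, ContT> getContinuationFn;
    private volatile ContT continuation;

    ContinuationTracker(Function<ChunkT, ContT> getContinuationFn)
    {
        this.getContinuationFn = getContinuationFn;
    }

    // Call once for every chunk as it is consumed.
    void observe(ChunkT chunk)
    {
        ContT found = getContinuationFn.apply(chunk);
        if (found != null)
        {
            // Only one chunk is expected to carry it; later chunks
            // without a continuation must not clear the stored value.
            continuation = found;
        }
    }

    boolean hasContinuation()
    {
        return continuation != null;
    }

    ContT getContinuation()
    {
        return continuation;
    }
}
```

Keeping the extractor as a function parameter means the tracker does not need to know the chunk's concrete response type.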
private final TransferQueue<ChunkT> chunkQueue;
private final Function<CoreT, FinalT> createNext;
private final Function<ChunkT, Iterator<CoreT>> getNextIterator;
private final Function<ChunkT, BinaryValue> getContinuationFn;
Is there a reason we need to expose BinaryValue as part of the getContinuationFn() signature?
See above.
catch (InterruptedException e)
{
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
Changing the interruption behavior by rethrowing a RuntimeException while also calling interrupt() is not a common pattern. I don't have a clear understanding of what we get as a result. In theory, the most likely outcome is that the original InterruptedException gets swallowed.
FWIW, I didn't grok this either and just assumed it was the right way 🍤
So what I settled on was to set the interrupt flag on the thread and wrap the exception in a RuntimeException, so that the thread gives up control.
If we get an interrupt here, it means the user thread really wanted to interrupt for whatever reason, and we can't block indefinitely while we wait for another chunk. In the "Riak is being slow" or "the user did a very bad/slow query" case, blocking indefinitely could stall the user thread for longer than wanted.
Also, we can't extend the hasNext() signature to pass along the InterruptedException, hence the wrap. I need to add a test to see whether, if the user calls cancel(), the future/iterator picks it up and stops iteration.
@A1 The more I think about it, the more I feel that throwing only the RuntimeException should be enough. We can't throw InterruptedException because of the Iterator contract, so nobody will expect any of its methods to block, and as a result nobody will expect the thread to be marked as interrupted. If you approve, I will introduce the corresponding changes as a separate commit.
Reading more opinions about this... I guess we could add a flag to note that it was interrupted, and keep going. If we get to the end (hasNext() returns false) we could do something at that point, but I'm not sure what, since the Iterator interface won't allow us to interrupt.
My initial thought was exactly the same: set the flag and move forward without raising any exception. If this call is made from whatever thread of execution, the programmer should take care of proper thread interruption, and at least we do not make things worse.
Please excuse me for my hesitation.
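One way to read the "set the flag and move forward" option is sketched below: the wait loop remembers that an interrupt happened, keeps polling, and restores the thread's interrupt status before returning, so the caller can still observe it. This is only a sketch of that idea, not the change that was committed; `InterruptTolerantPoller`, `pollUntilDone`, the `producerDone` supplier, and the 10 ms poll interval are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: waits for the next item without throwing on interrupt.
final class InterruptTolerantPoller
{
    private InterruptTolerantPoller() {}

    // Returns the next item, or null once the producer is done and the queue is empty.
    static <T> T pollUntilDone(BlockingQueue<T> queue, BooleanSupplier producerDone)
    {
        boolean interrupted = false;
        try
        {
            while (true)
            {
                try
                {
                    T item = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (item != null)
                    {
                        return item;
                    }
                    if (producerDone.getAsBoolean() && queue.isEmpty())
                    {
                        return null;
                    }
                }
                catch (InterruptedException e)
                {
                    // Remember the interrupt and keep going; poll() cleared the flag.
                    interrupted = true;
                }
            }
        }
        finally
        {
            if (interrupted)
            {
                // Restore the status so the calling code can react to it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The trade-off is the one raised in the thread: nothing is thrown from hasNext(), but a slow producer can still hold the caller until `producerDone` reports completion.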
This all seems fine to a non-Java dev. While I know why the queue and …
import java.util.concurrent.ExecutionException;

public abstract class ITestIndexBase extends ITestBase
We already have ITestAutoCleanupBase, which does mostly the same thing.
Mostly, but ITestAutoCleanupBase cleans up after every test, while this one cleans up after each suite/class of tests. With some of the tests doing 1K/10K entries it speeds things up a bit to use common data across all a class's tests.
I also wanted to put an abstract setupData() method in here, but I couldn't get the RiakCluster/RiakClient dependency and its initialization correct with all the static context going on.
Simplification streamable commands
public final List<E> getEntries()
{
    if(isStreamable())
    {
        return new ArrayList<>(0);
What about throwing either IllegalState or UnsupportedOperation?
If not, it is better to return Collections.EMPTY_LIST.
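The two suggestions above can be compared side by side. The sketch below uses a hypothetical `ResponseSketch` class with made-up method names; it is not the client's response type. It also uses `Collections.emptyList()`, the type-safe counterpart of the raw `Collections.EMPTY_LIST` constant, which avoids an unchecked warning and does not allocate a new list per call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical response type showing both options from the review.
final class ResponseSketch<E>
{
    private final boolean streamable;
    private final List<E> entries;

    ResponseSketch(boolean streamable, List<E> entries)
    {
        this.streamable = streamable;
        this.entries = new ArrayList<>(entries);
    }

    boolean isStreamable()
    {
        return streamable;
    }

    // Option 1: fail loudly when the caller should be using the stream instead.
    List<E> getEntriesStrict()
    {
        if (streamable)
        {
            throw new IllegalStateException("Streaming response: iterate the stream instead");
        }
        return Collections.unmodifiableList(entries);
    }

    // Option 2: return a shared immutable empty list for streaming responses.
    List<E> getEntriesLenient()
    {
        if (streamable)
        {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(entries);
    }
}
```

The strict variant surfaces misuse immediately, while the lenient one keeps existing callers working but can hide the fact that results were delivered through the stream.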
+1 to any changes I didn't do. Also added some misc public api javadocs and did some whitespace cleanup.
Looks like more code was deleted than written, always a good sign. Tests pass with Riak 2.2.0, 2.1.4, and 2.0.7 (except one YZ test there). 👍
This PR adds support for streaming results in List Keys/Buckets, 2i, and MR.
This was a very interesting exercise in creating a new results return method from top-to-bottom, while not breaking/changing existing APIs.