Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming results for List Keys/Buckets, 2i, and MR #677

Merged
merged 58 commits into from
Nov 16, 2016
Merged
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
e1480aa
Add streaming api to operations/futures
alexmoore Oct 5, 2016
af3d5ba
Add Streaming List Buckets
alexmoore Oct 5, 2016
5c56d9f
Add streaming list keys support
alexmoore Oct 5, 2016
78c9076
Add streaming 2i operations/results
alexmoore Oct 5, 2016
590bcde
Use TransferQueues instead of BlockingQueues
alexmoore Oct 10, 2016
5b87e24
Add Streaming MapReduce Operations
alexmoore Oct 11, 2016
69aed60
Forgot a file
alexmoore Oct 11, 2016
c7c40dd
Command level streaming for ListKeys and ListBuckets
alexmoore Oct 13, 2016
ca0aaf4
Cleanup & name things betterer
alexmoore Oct 15, 2016
ca29f1a
Start into Streaming 2i special commands.
alexmoore Oct 15, 2016
e6de1e0
Stub out rest of streaming 2i commands
alexmoore Oct 17, 2016
35e9c4f
Add Javadocs, remove need to check for nulls from streaming results
alexmoore Oct 18, 2016
82fb339
Streaming 2i Command Integration Tests
alexmoore Oct 19, 2016
9ac71b6
Implement Streaming MapReduce Results + Test
alexmoore Oct 19, 2016
d19cd3e
Fix spacing in control structures.
alexmoore Oct 20, 2016
fe24ba1
Fix headers for newest/recently changed files.
alexmoore Oct 20, 2016
3edac1f
Don't need synchronization on next(), removing unused import
alexmoore Oct 20, 2016
4e0a90b
Expand test coverage
alexmoore Oct 25, 2016
875999d
Add note on code coverage
alexmoore Oct 25, 2016
3ac537c
Move test to another thread, as to not upset other Interruptable test…
alexmoore Oct 25, 2016
051565d
Fix flappy test
alexmoore Oct 25, 2016
7a1ccde
Add javadocs on how to use Streaming interfaces.
alexmoore Oct 25, 2016
5b9f6b4
Fix misc javadocs warnings and errors
alexmoore Oct 25, 2016
511f95e
Fix another minor test bug
alexmoore Oct 25, 2016
a54aa45
Generalize StreamingFutureOperation.
srgg Oct 24, 2016
3dd8230
Introduce PBStreamingFutureOperation.
srgg Oct 26, 2016
1c5883c
Fix test that stubs a final method
alexmoore Oct 26, 2016
3869107
Remove RuntimeException + reset interrupted status after we're done l…
alexmoore Oct 27, 2016
74f7ac4
Merge pull request #682 from basho/streaming-api-2-restore-interrupt
alexmoore Oct 27, 2016
def141f
Avoid useless queue creation in PBStreamingFutureOperation
srgg Oct 27, 2016
96f1bb6
Fix interrupts in Streaming MR, check interrupted status before waiti…
alexmoore Oct 28, 2016
443cb40
Initial simplification of SecondaryIndexQuery command hierarchy
srgg Nov 3, 2016
37c6d55
Fix some unchecked cast warnings
srgg Nov 3, 2016
a112bf2
Coarse fix of unchecked cast warnings in SecondaryIndexQuery
srgg Nov 3, 2016
0f62ed9
Get rid of Response.getLocationFromCoreEntity().
srgg Nov 3, 2016
576eccf
Fix upper case in variable name
srgg Nov 6, 2016
9630a11
Simplification of a bunch of RiakCommand descendants
srgg Nov 7, 2016
63efe16
Simplification for the case when no needs to convert results
srgg Nov 8, 2016
52795d4
Generalize streamable responses
srgg Nov 8, 2016
3e5c7b0
Get rid of unsafe default response and infor conversions in GenericRi…
srgg Nov 8, 2016
d97183a
Get rid of unused GenericRiakCommand constructor
srgg Nov 8, 2016
76a6ee4
Request argument was added to the GenericRiakCommand.convertResponse …
srgg Nov 8, 2016
7cdf043
Generalize StreamableRiakCommand.executeAsyncStreaming()
srgg Nov 8, 2016
45f55c4
Remove if / subtype casting from Command execution
alexmoore Nov 8, 2016
12c8944
Make PBStreamingFutureOperation to be a part of StreamableRiakCommand…
srgg Nov 9, 2016
944fb90
Refactor streaming responses to re-use ConvertibleIterator
srgg Nov 9, 2016
67e56cb
Simplify ChunkedResponseIterator initial loading
srgg Nov 10, 2016
22655d4
Generalize StreamableResponse
srgg Nov 10, 2016
3afa145
fix iteration through chunked response
srgg Nov 10, 2016
a07833f
Merge pull request #684 from basho/simplification-streamable-commands
alexmoore Nov 11, 2016
e313956
Adding assert around chunk queue insertion
alexmoore Nov 11, 2016
51bff20
Adding integration tests for streamable full bucket read.
srgg Nov 15, 2016
002ba0e
Fix bug where calling 2i getEntries() while streaming would throw Nul…
alexmoore Nov 15, 2016
a13798b
Pedantic whitespace formatting.
alexmoore Nov 15, 2016
c1b575a
Merge branch 'develop' into streaming-api-2
alexmoore Nov 15, 2016
ac3d7e2
Switch 2i getEntries() to throw IllegalStateException.
alexmoore Nov 15, 2016
78c43f1
Adding javadocs to new public API pieces
alexmoore Nov 15, 2016
a797ba6
Changing isStreamable to isStreaming, as we cannot treat a streaming …
alexmoore Nov 15, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/main/java/com/basho/riak/client/api/AsIsRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2013-2016 Basho Technologies Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.basho.riak.client.api;

import com.basho.riak.client.core.FutureOperation;
import com.basho.riak.client.core.PBStreamingFutureOperation;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;

/**
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.1.0
*/
public abstract class AsIsRiakCommand<R, I> extends RiakCommand<R, I>
{
protected abstract FutureOperation<R, ?, I> buildCoreOperation();

protected RiakFuture<R,I> executeAsync(RiakCluster cluster)
{
final FutureOperation<R, ?, I> coreOperation = buildCoreOperation();

return cluster.execute(coreOperation);
}

}
72 changes: 72 additions & 0 deletions src/main/java/com/basho/riak/client/api/GenericRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2013-2016 Basho Technologies Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.basho.riak.client.api;

import com.basho.riak.client.api.commands.CoreFutureAdapter;
import com.basho.riak.client.core.FutureOperation;
import com.basho.riak.client.core.PBStreamingFutureOperation;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;


/**
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.1.0
*/
public abstract class GenericRiakCommand<R, I, CoreR, CoreI> extends RiakCommand<R, I>
{
public static abstract class GenericRiakCommandWithSameInfo<R, I, CoreR> extends GenericRiakCommand<R,I, CoreR, I>
{
@Override
protected I convertInfo(I coreInfo) {
return coreInfo;
}
}

protected abstract FutureOperation<CoreR, ?, CoreI> buildCoreOperation();

protected RiakFuture<R,I> executeAsync(RiakCluster cluster)
{
final FutureOperation<CoreR, ?, CoreI> coreOperation = buildCoreOperation();
assert coreOperation != null;

final RiakFuture<CoreR, CoreI> coreFuture = cluster.execute(coreOperation);

assert coreFuture != null;

final CoreFutureAdapter<R, I, CoreR, CoreI> future =
new CoreFutureAdapter<R, I, CoreR, CoreI>(coreFuture)
{
@Override
protected R convertResponse(CoreR coreResponse)
{
return GenericRiakCommand.this.convertResponse(coreOperation, coreResponse);
}

@Override
protected I convertQueryInfo(CoreI coreQueryInfo)
{
return GenericRiakCommand.this.convertInfo(coreQueryInfo);
}
};
coreFuture.addListener(future);
return future;
}

protected abstract R convertResponse(FutureOperation<CoreR, ?, CoreI> request, CoreR coreResponse);

protected abstract I convertInfo(CoreI coreInfo);
}
40 changes: 39 additions & 1 deletion src/main/java/com/basho/riak/client/api/RiakClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,9 @@ public static RiakClient newClient(InetSocketAddress... addresses) throws Unknow
* @return a new RiakClient instance.
* @throws java.net.UnknownHostException if a supplied hostname cannot be resolved.
* @since 2.0.3
* @see com.basho.riak.client.core.RiakCluster.Builder#Builder(RiakNode.Builder, List)
* @see com.basho.riak.client.core.RiakCluster.Builder#RiakCluster.Builder(RiakNode.Builder, List)
*/
// NB: IntelliJ will see the above @see statement as invalid, but it's correct: https://bugs.openjdk.java.net/browse/JDK-8031625
public static RiakClient newClient(RiakNode.Builder nodeBuilder, List<String> addresses) throws UnknownHostException
{
final RiakCluster cluster = new RiakCluster.Builder(nodeBuilder, addresses).build();
Expand Down Expand Up @@ -402,6 +403,43 @@ public <T,S> RiakFuture<T,S> executeAsync(RiakCommand<T,S> command)
return command.executeAsync(cluster);
}

/**
* Execute a StreamableRiakCommand asynchronously, and stream the results back before
* the command {@link RiakFuture#isDone() is done}.
* <p>
* Calling this method causes the client to execute the provided
* StreamableRiakCommand asynchronously.
* It will immediately return a RiakFuture that contains an
* <b>immediately</b> available result (via {@link RiakFuture#get()}) that
* data will be streamed to.
* The RiakFuture will also keep track of the overall operation's progress
* with the {@link RiakFuture#isDone}, etc methods.
* </p>
* <p>
* Because the consumer thread will poll for new results, it is advisable to check the
* consumer thread's interrupted status via
* {@link Thread#isInterrupted() Thread.currentThread().isInterrupted() }, as the result
* iterator will not propagate an InterruptedException, but it will set the Thread's
* interrupted flag.
* </p>
* @param <I> StreamableRiakCommand's immediate return type, available before the command/operation is complete.
* @param <S> The RiakCommand's query info type.
* @param command The RiakCommand to execute.
* @param timeoutMS The polling timeout in milliseconds for each result chunk.
* If the timeout is reached it will try again, instead of blocking indefinitely.
* If the value is too small (less than the average chunk arrival time), the
* result iterator will essentially busy wait.
* If the timeout is too large (much greater than the average chunk arrival time),
* the result iterator can block the consuming thread from seeing the done()
* status until the timeout is reached.
* @return a RiakFuture for the operation
* @see RiakFuture
*/
public <I extends StreamableRiakCommand.StreamableResponse,S> RiakFuture<I,S> executeAsyncStreaming(StreamableRiakCommand<I, S, ?, ?> command, int timeoutMS)
{
return command.executeAsyncStreaming(cluster, timeoutMS);
}

/**
* Shut down the client and the underlying RiakCluster.
* <p>
Expand Down
118 changes: 118 additions & 0 deletions src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2016 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.basho.riak.client.api;

import com.basho.riak.client.api.commands.ChunkedResponseIterator;
import com.basho.riak.client.api.commands.ImmediateCoreFutureAdapter;
import com.basho.riak.client.core.*;

import java.util.Iterator;

/*
* The base class for all Streamable Riak Commands.
* Allows the user to either use {@link RiakCommand#executeAsync} and return a "batch-mode" result
* that is only available after the command is complete, or
* use {@link StreamableRiakCommand#executeAsyncStreaming} and return a "immediate" or "stream-mode" result
* that data will flow into.
* @param <S> The response type returned by "streaming mode" {@link executeAsyncStreaming}
* @param <R> The response type returned by the "batch mode" @{link executeAsync}
* @param <I> The query info type
* @author Dave Rusek
* @author Brian Roach <roach at basho.com>
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.0
*/
public abstract class StreamableRiakCommand<R extends StreamableRiakCommand.StreamableResponse, I, CoreR, CoreI> extends GenericRiakCommand<R, I, CoreR, CoreI>
{
public static abstract class StreamableRiakCommandWithSameInfo<R extends StreamableResponse, I, CoreR> extends StreamableRiakCommand<R,I, CoreR, I>
{
@Override
protected I convertInfo(I coreInfo) {
return coreInfo;
}
}

public static abstract class StreamableResponse<T, S> implements Iterable<T>
{
protected ChunkedResponseIterator<T, ?, S> chunkedResponseIterator;

/**
* Constructor for streamable response
* @param chunkedResponseIterator
*/
protected StreamableResponse(ChunkedResponseIterator<T, ?, S> chunkedResponseIterator) {
this.chunkedResponseIterator = chunkedResponseIterator;
}

/**
* Constructor for not streamable response.
*/
protected StreamableResponse()
{
}

public boolean isStreamable()
{
return chunkedResponseIterator != null;
}

@Override
public Iterator<T> iterator() {
if (isStreamable()) {
assert chunkedResponseIterator != null;
return chunkedResponseIterator;
}

throw new UnsupportedOperationException("Iterating is only supported for streamable response");
}
}

protected abstract R createResponse(int timeout, StreamingRiakFuture<CoreR, CoreI> coreFuture);

protected abstract PBStreamingFutureOperation<CoreR, ?, CoreI> buildCoreOperation(boolean streamResults);

@Override
protected final FutureOperation<CoreR, ?, CoreI> buildCoreOperation() {
return buildCoreOperation(false);
}

protected final RiakFuture<R, I> executeAsyncStreaming(RiakCluster cluster, int timeout)
{
final PBStreamingFutureOperation<CoreR, ?, CoreI> coreOperation = buildCoreOperation(true);
final StreamingRiakFuture<CoreR, CoreI> coreFuture = cluster.execute(coreOperation);

final R r = createResponse(timeout, coreFuture);

final ImmediateCoreFutureAdapter<R,I, CoreR, CoreI> future = new ImmediateCoreFutureAdapter<R,I,CoreR,CoreI>(coreFuture, r)
{
@Override
protected R convertResponse(CoreR response)
{
return StreamableRiakCommand.this.convertResponse(coreOperation, response);
}

@Override
protected I convertQueryInfo(CoreI coreQueryInfo)
{
return StreamableRiakCommand.this.convertInfo(coreQueryInfo);
}
};

coreFuture.addListener(future);
return future;
}
}
Loading