Skip to content

Commit

Permalink
Merge pull request #677 from basho/streaming-api-2
Browse files Browse the repository at this point in the history
Streaming results for List Keys/Buckets, 2i, and MR
  • Loading branch information
alexmoore authored Nov 16, 2016
2 parents 709ce84 + a797ba6 commit e385fc3
Show file tree
Hide file tree
Showing 84 changed files with 3,774 additions and 2,151 deletions.
37 changes: 37 additions & 0 deletions src/main/java/com/basho/riak/client/api/AsIsRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2013-2016 Basho Technologies Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.basho.riak.client.api;

import com.basho.riak.client.core.FutureOperation;
import com.basho.riak.client.core.PBStreamingFutureOperation;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;

/**
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.1.0
*/
public abstract class AsIsRiakCommand<R, I> extends RiakCommand<R, I>
{
protected abstract FutureOperation<R, ?, I> buildCoreOperation();

protected RiakFuture<R,I> executeAsync(RiakCluster cluster)
{
final FutureOperation<R, ?, I> coreOperation = buildCoreOperation();

return cluster.execute(coreOperation);
}
}
70 changes: 70 additions & 0 deletions src/main/java/com/basho/riak/client/api/GenericRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2013-2016 Basho Technologies Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.basho.riak.client.api;

import com.basho.riak.client.api.commands.CoreFutureAdapter;
import com.basho.riak.client.core.FutureOperation;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;

/**
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.1.0
*/
public abstract class GenericRiakCommand<R, I, CoreR, CoreI> extends RiakCommand<R, I>
{
public static abstract class GenericRiakCommandWithSameInfo<R, I, CoreR> extends GenericRiakCommand<R,I, CoreR, I>
{
@Override
protected I convertInfo(I coreInfo) {
return coreInfo;
}
}

protected abstract FutureOperation<CoreR, ?, CoreI> buildCoreOperation();

protected RiakFuture<R,I> executeAsync(RiakCluster cluster)
{
final FutureOperation<CoreR, ?, CoreI> coreOperation = buildCoreOperation();
assert coreOperation != null;

final RiakFuture<CoreR, CoreI> coreFuture = cluster.execute(coreOperation);

assert coreFuture != null;

final CoreFutureAdapter<R, I, CoreR, CoreI> future =
new CoreFutureAdapter<R, I, CoreR, CoreI>(coreFuture)
{
@Override
protected R convertResponse(CoreR coreResponse)
{
return GenericRiakCommand.this.convertResponse(coreOperation, coreResponse);
}

@Override
protected I convertQueryInfo(CoreI coreQueryInfo)
{
return GenericRiakCommand.this.convertInfo(coreQueryInfo);
}
};
coreFuture.addListener(future);
return future;
}

protected abstract R convertResponse(FutureOperation<CoreR, ?, CoreI> request, CoreR coreResponse);

protected abstract I convertInfo(CoreI coreInfo);
}
42 changes: 41 additions & 1 deletion src/main/java/com/basho/riak/client/api/RiakClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
* </ul>
* @author Dave Rusek <drusek at basho dot com>
* @author Brian Roach <roach at basho dot com>
* @author Alex Moore <amoore at basho.com>
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.0
*/
Expand Down Expand Up @@ -277,8 +278,9 @@ public static RiakClient newClient(InetSocketAddress... addresses) throws Unknow
* @return a new RiakClient instance.
* @throws java.net.UnknownHostException if a supplied hostname cannot be resolved.
* @since 2.0.3
* @see com.basho.riak.client.core.RiakCluster.Builder#Builder(RiakNode.Builder, List)
* @see com.basho.riak.client.core.RiakCluster.Builder#RiakCluster.Builder(RiakNode.Builder, List)
*/
// NB: IntelliJ will see the above @see statement as invalid, but it's correct: https://bugs.openjdk.java.net/browse/JDK-8031625
public static RiakClient newClient(RiakNode.Builder nodeBuilder, List<String> addresses) throws UnknownHostException
{
final RiakCluster cluster = new RiakCluster.Builder(nodeBuilder, addresses).build();
Expand Down Expand Up @@ -402,6 +404,44 @@ public <T,S> RiakFuture<T,S> executeAsync(RiakCommand<T,S> command)
return command.executeAsync(cluster);
}

/**
* Execute a StreamableRiakCommand asynchronously, and stream the results back before
* the command {@link RiakFuture#isDone() is done}.
* <p>
* Calling this method causes the client to execute the provided
* StreamableRiakCommand asynchronously.
* It will immediately return a RiakFuture that contains an
* <b>immediately</b> available result (via {@link RiakFuture#get()}) that
* data will be streamed to.
* The RiakFuture will also keep track of the overall operation's progress
* with the {@link RiakFuture#isDone}, etc methods.
* </p>
* <p>
* Because the consumer thread will poll for new results, it is advisable to check the
* consumer thread's interrupted status via
* {@link Thread#isInterrupted() Thread.currentThread().isInterrupted() }, as the result
* iterator will not propagate an InterruptedException, but it will set the Thread's
* interrupted flag.
* </p>
* @param <I> StreamableRiakCommand's immediate return type, available before the command/operation is complete.
* @param <S> The RiakCommand's query info type.
* @param command The RiakCommand to execute.
* @param timeoutMS The polling timeout in milliseconds for each result chunk.
* If the timeout is reached it will try again, instead of blocking indefinitely.
* If the value is too small (less than the average chunk arrival time), the
* result iterator will essentially busy wait.
* If the timeout is too large (much greater than the average chunk arrival time),
* the result iterator can block the consuming thread from seeing the done()
* status until the timeout is reached.
* @return a RiakFuture for the operation
* @since 2.1.0
* @see RiakFuture
*/
public <I extends StreamableRiakCommand.StreamableResponse,S> RiakFuture<I,S> executeAsyncStreaming(StreamableRiakCommand<I, S, ?, ?> command, int timeoutMS)
{
return command.executeAsyncStreaming(cluster, timeoutMS);
}

/**
* Shut down the client and the underlying RiakCluster.
* <p>
Expand Down
133 changes: 133 additions & 0 deletions src/main/java/com/basho/riak/client/api/StreamableRiakCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2016 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.basho.riak.client.api;

import com.basho.riak.client.api.commands.ChunkedResponseIterator;
import com.basho.riak.client.api.commands.ImmediateCoreFutureAdapter;
import com.basho.riak.client.core.*;

import java.util.Iterator;

/*
* The base class for all Streamable Riak Commands.
* Allows the user to either use {@link RiakCommand#executeAsync} and return a "batch-mode" result
* that is only available after the command is complete, or
* use {@link StreamableRiakCommand#executeAsyncStreaming} and return a "immediate" or "stream-mode" result
* that data will flow into.
* @param <S> The response type returned by "streaming mode" {@link executeAsyncStreaming}
* @param <R> The response type returned by the "batch mode" @{link executeAsync}
* @param <I> The query info type
* @author Alex Moore <amoore at basho.com>
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.0
*/
public abstract class StreamableRiakCommand<R extends StreamableRiakCommand.StreamableResponse, I, CoreR, CoreI> extends GenericRiakCommand<R, I, CoreR, CoreI>
{
public static abstract class StreamableRiakCommandWithSameInfo<R extends StreamableResponse, I, CoreR> extends StreamableRiakCommand<R,I, CoreR, I>
{
@Override
protected I convertInfo(I coreInfo) {
return coreInfo;
}
}

public static abstract class StreamableResponse<T, S> implements Iterable<T>
{
protected ChunkedResponseIterator<T, ?, S> chunkedResponseIterator;

/**
* Constructor for streamable response
* @param chunkedResponseIterator
*/
protected StreamableResponse(ChunkedResponseIterator<T, ?, S> chunkedResponseIterator) {
this.chunkedResponseIterator = chunkedResponseIterator;
}

/**
* Constructor for not streamable response.
*/
protected StreamableResponse()
{
}

/**
* Whether the results are to be streamed back.
* If true, results will appear in this class's iterator.
* If false, they will appear in the original result collection.
* @return true if the results are to be streamed.
*/
public boolean isStreaming()
{
return chunkedResponseIterator != null;
}

/**
* Get an iterator over the result data.
*
* If using the streaming API, this method will block
* and wait for more data if none is immediately available.
* It is also advisable to check {@link Thread#isInterrupted()}
* in environments where thread interrupts must be obeyed.
*
* @return an iterator over the result data.
*/
@Override
public Iterator<T> iterator() {
if (isStreaming()) {
assert chunkedResponseIterator != null;
return chunkedResponseIterator;
}

throw new UnsupportedOperationException("Iterating is only supported for streamable response");
}
}

protected abstract R createResponse(int timeout, StreamingRiakFuture<CoreR, CoreI> coreFuture);

protected abstract PBStreamingFutureOperation<CoreR, ?, CoreI> buildCoreOperation(boolean streamResults);

@Override
protected final FutureOperation<CoreR, ?, CoreI> buildCoreOperation() {
return buildCoreOperation(false);
}

protected final RiakFuture<R, I> executeAsyncStreaming(RiakCluster cluster, int timeout)
{
final PBStreamingFutureOperation<CoreR, ?, CoreI> coreOperation = buildCoreOperation(true);
final StreamingRiakFuture<CoreR, CoreI> coreFuture = cluster.execute(coreOperation);

final R r = createResponse(timeout, coreFuture);

final ImmediateCoreFutureAdapter<R,I, CoreR, CoreI> future = new ImmediateCoreFutureAdapter<R,I,CoreR,CoreI>(coreFuture, r)
{
@Override
protected R convertResponse(CoreR response)
{
return StreamableRiakCommand.this.convertResponse(coreOperation, response);
}

@Override
protected I convertQueryInfo(CoreI coreQueryInfo)
{
return StreamableRiakCommand.this.convertInfo(coreQueryInfo);
}
};

coreFuture.addListener(future);
return future;
}
}
Loading

0 comments on commit e385fc3

Please sign in to comment.