Skip to content

Commit

Permalink
Add ListException to BucketMapReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Bakken committed Feb 17, 2017
1 parent f513522 commit b881172
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.basho.riak.client.api.commands.mapreduce;

import com.basho.riak.client.api.ListException;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.api.commands.mapreduce.filters.KeyFilter;

Expand All @@ -13,9 +14,14 @@
*/
public class BucketMapReduce extends MapReduce
{
protected BucketMapReduce(BucketInput input, Builder builder)
protected BucketMapReduce(BucketInput input, Builder builder) throws ListException
{
super(input, builder);

if (builder.allowListing == false)
{
throw new ListException();
}
}

/**
Expand All @@ -25,13 +31,29 @@ public static class Builder extends MapReduce.Builder<Builder>
{
private Namespace namespace;
private final List<KeyFilter> filters = new ArrayList<>();
private boolean allowListing;

@Override
protected Builder self()
{
return this;
}

/**
* Allow this listing command
* <p>
* Bucket and key list operations are expensive and should not
* be used in production, however using this method will allow
* the command to be built.
* </p>
* @return a reference to this object.
*/
public Builder withAllowListing()
{
this.allowListing = true;
return this;
}

public Builder withNamespace(Namespace namespace)
{
this.namespace = namespace;
Expand All @@ -44,7 +66,7 @@ public Builder withKeyFilter(KeyFilter filter)
return this;
}

public BucketMapReduce build()
public BucketMapReduce build() throws ListException
{
if (namespace == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.basho.riak.client.api.commands.itest;

import com.basho.riak.client.api.ListException;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
Expand All @@ -42,6 +43,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
*
Expand Down Expand Up @@ -134,13 +136,21 @@ public void erlangBucketMRTestTypeStreaming() throws InterruptedException, Execu
private void erlangBucketMR(String bucketType) throws InterruptedException, ExecutionException
{
Namespace ns = new Namespace(bucketType, mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withAllowListing()
.withNamespace(ns)
.withMapPhase(Function.newErlangFunction("riak_kv_mapreduce", "map_object_value"), false)
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_string_to_integer"), false)
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_sort"), true)
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

MapReduce.Response response = client.execute(bmr);

Expand All @@ -157,13 +167,21 @@ private void erlangBucketMR(String bucketType) throws InterruptedException, Exec
private void streamingErlangBucketMR(String bucketType) throws InterruptedException, ExecutionException
{
Namespace ns = new Namespace(bucketType, mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
.withNamespace(ns)
.withMapPhase(Function.newErlangFunction("riak_kv_mapreduce", "map_object_value"), false)
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_string_to_integer"), false)
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_sort"), true)
.build();
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withAllowListing()
.withNamespace(ns)
.withMapPhase(Function.newErlangFunction("riak_kv_mapreduce", "map_object_value"), false)
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_string_to_integer"), false)
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_sort"), true)
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

final RiakFuture<MapReduce.Response, BinaryValue> streamingFuture =
client.executeAsyncStreaming(bmr, 10);
Expand Down Expand Up @@ -231,12 +249,20 @@ public void JsBucketMRTestType() throws InterruptedException, ExecutionException
private void JsBucketMR(String bucketType) throws InterruptedException, ExecutionException
{
Namespace ns = new Namespace(bucketType, mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withAllowListing()
.withNamespace(ns)
.withMapPhase(Function.newNamedJsFunction("Riak.mapValuesJson"), false)
.withReducePhase(Function.newNamedJsFunction("Riak.reduceNumericSort"), true)
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

RiakFuture<MapReduce.Response, BinaryValue> future = client.executeAsync(bmr);

Expand All @@ -259,12 +285,20 @@ private void JsBucketMR(String bucketType) throws InterruptedException, Executio
public void multiPhaseResult() throws InterruptedException, ExecutionException
{
Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withAllowListing()
.withNamespace(ns)
.withMapPhase(Function.newNamedJsFunction("Riak.mapValuesJson"), true)
.withReducePhase(Function.newNamedJsFunction("Riak.reduceNumericSort"), true)
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

RiakFuture<MapReduce.Response, BinaryValue> future = client.executeAsync(bmr);

Expand All @@ -288,15 +322,23 @@ public void multiPhaseResult() throws InterruptedException, ExecutionException
public void keyFilter() throws InterruptedException, ExecutionException
{
Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withAllowListing()
.withNamespace(ns)
.withMapPhase(Function.newNamedJsFunction("Riak.mapValuesJson"))
.withReducePhase(Function.newErlangFunction("riak_kv_mapreduce", "reduce_sort"),true)
.withKeyFilter(new TokenizeFilter("_",3))
.withKeyFilter(new StringToIntFilter())
.withKeyFilter(new LogicalAndFilter(new LessThanFilter<>(50), new GreaterThanFilter<>(45)))
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

RiakFuture<MapReduce.Response, BinaryValue> future = client.executeAsync(bmr);

Expand All @@ -315,8 +357,10 @@ public void differentBucketType() throws InterruptedException, ExecutionExceptio
Assume.assumeTrue(testBucketType);

Namespace ns = new Namespace(mapReduceBucketType.toString(), mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withNamespace(ns)
.withMapPhase(Function.newAnonymousJsFunction(
"function(value, keydata, arg) {" +
Expand All @@ -327,6 +371,11 @@ public void differentBucketType() throws InterruptedException, ExecutionExceptio
" return[];" +
"}"), true)
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

MapReduce.Response response = client.execute(bmr);

Expand All @@ -339,8 +388,10 @@ public void differentBucketTypeWithFilter() throws InterruptedException, Executi
Assume.assumeTrue(testBucketType);

Namespace ns = new Namespace(mapReduceBucketType.toString(), mrBucketName);
BucketMapReduce bmr =
new BucketMapReduce.Builder()
BucketMapReduce bmr = null;
try
{
bmr = new BucketMapReduce.Builder()
.withNamespace(ns)
.withKeyFilter(new TokenizeFilter("_",3))
.withKeyFilter(new StringToIntFilter())
Expand All @@ -351,6 +402,11 @@ public void differentBucketTypeWithFilter() throws InterruptedException, Executi
" return [data];" +
"}"), true)
.build();
}
catch (ListException ex)
{
fail(ex.getMessage());
}

RiakFuture<MapReduce.Response, BinaryValue> future = client.executeAsync(bmr);

Expand Down

0 comments on commit b881172

Please sign in to comment.