diff --git a/pom.xml b/pom.xml
index 67b16f469..1f95b0506 100755
--- a/pom.xml
+++ b/pom.xml
@@ -472,6 +472,18 @@
+ net.javacrumbs.json-unit
+ json-unit
+ 1.19.0
+ test
+ org.apache.commons
+ commons-lang3
+ 3.5
+ test
diff --git a/src/test/java/com/basho/riak/client/RiakTestFunctions.java b/src/test/java/com/basho/riak/client/RiakTestFunctions.java
new file mode 100644
index 000000000..4fa8cebc1
--- /dev/null
+++ b/src/test/java/com/basho/riak/client/RiakTestFunctions.java
@@ -0,0 +1,328 @@
+package com.basho.riak.client;
+import com.basho.riak.client.api.RiakClient;
+import com.basho.riak.client.api.cap.Quorum;
+import com.basho.riak.client.api.cap.VClock;
+import com.basho.riak.client.api.commands.indexes.BinIndexQuery;
+import com.basho.riak.client.api.commands.indexes.IntIndexQuery;
+import com.basho.riak.client.api.commands.indexes.SecondaryIndexQuery;
+import com.basho.riak.client.api.commands.kv.FetchValue;
+import com.basho.riak.client.api.commands.kv.StoreValue;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+import com.basho.riak.client.core.query.RiakObject;
+import com.basho.riak.client.core.query.indexes.LongIntIndex;
+import com.basho.riak.client.core.query.indexes.RiakIndex;
+import com.basho.riak.client.core.query.indexes.RiakIndexes;
+import com.basho.riak.client.core.query.indexes.StringBinIndex;
+import com.basho.riak.client.core.util.BinaryValue;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.*;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.javacrumbs.jsonunit.core.internal.JsonUtils;
+import net.javacrumbs.jsonunit.core.internal.NodeFactory;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+public class RiakTestFunctions
+ public static class RiakObjectData
+ {
+ public String key;
+ public Object value;
+ public Map indices;
+ }
+ protected static Logger logger = LoggerFactory.getLogger(RiakTestFunctions.class);
+ /**
+ * Tolerant mapper that doesn't require quotation for field names
+ * and allows to use single quote for string values
+ */
+ protected final static ObjectMapper tolerantMapper = initializeJsonUnitMapper();
+ /**
+ * Making JsonAssert to be more tolerant to JSON format.
+ * And add some useful serializers
+ */
+ private static ObjectMapper initializeJsonUnitMapper()
+ {
+ final Object converter;
+ try
+ {
+ converter = FieldUtils.readStaticField(JsonUtils.class, "converter", true);
+ @SuppressWarnings("unchecked")
+ final List factories = (List) FieldUtils.readField(converter, "factories", true);
+ ObjectMapper mapper;
+ for (NodeFactory nf: factories)
+ {
+ if (nf.getClass().getSimpleName().equals("Jackson2NodeFactory"))
+ {
+ mapper = (ObjectMapper) FieldUtils.readField(nf, "mapper", true);
+ mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, true)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
+ .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
+ .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+ .registerModule( new SimpleModule()
+ .addSerializer(VClock.class, new VClockSerializer())
+ );
+ return mapper;
+ }
+ }
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new IllegalStateException("Can't initialize Jackson2 ObjectMapper because of UE", e);
+ }
+ throw new IllegalStateException("Can't initialize Jackson2 ObjectMapper, Jackson2NodeFactory is not found");
+ }
+ protected static List> parseRiakObjectsFromJsonData(String json) throws IOException
+ {
+ assert json != null && !json.isEmpty();
+ String actualJson = json;
+ // Add a list semantic if needed
+ if (!json.trim().startsWith("["))
+ {
+ actualJson = "[\n" + json + "\n]";
+ }
+ final List data = tolerantMapper.readValue(actualJson, new TypeReference>(){});
+ final List> r = new ArrayList<>(data.size());
+ for (RiakObjectData rod: data)
+ {
+ final RiakObject ro = new RiakObject();
+ final Map.Entry e = new AbstractMap.SimpleEntry<>(rod.key, ro);
+ r.add(e);
+ // populate value, if any
+ if( rod.value != null)
+ {
+ if ( rod.value instanceof Map || rod instanceof Collection)
+ {
+ final String v = tolerantMapper.writerWithDefaultPrettyPrinter()
+ .writeValueAsString(rod.value);
+ ro.setContentType("application/json")
+ .setValue(BinaryValue.create(v));
+ }
+ else
+ {
+ ro.setContentType("text/plain")
+ .setValue(BinaryValue.create(rod.value.toString()));
+ }
+ }
+ // populate 2i, if any
+ if (rod.indices == null || rod.indices.isEmpty())
+ {
+ continue;
+ }
+ final RiakIndexes riakIndexes = ro.getIndexes();
+ for (Map.Entry ie: rod.indices.entrySet())
+ {
+ assert ie.getValue() != null;
+ if (ie.getValue() instanceof Long)
+ {
+ riakIndexes.getIndex(LongIntIndex.named(ie.getKey()))
+ .add((Long)ie.getValue());
+ }
+ else if (ie.getValue() instanceof Integer)
+ {
+ riakIndexes.getIndex(LongIntIndex.named(ie.getKey()))
+ .add(((Integer)ie.getValue()).longValue());
+ }
+ else if (ie.getValue() instanceof String)
+ {
+ riakIndexes.getIndex(StringBinIndex.named(ie.getKey()))
+ .add((String)ie.getValue());
+ }
+ else throw new IllegalStateException("Unsupported 2i value type '" +
+ ie.getValue().getClass().getName() + "'");
+ }
+ }
+ return r;
+ }
+ public static void createKVData(RiakClient client, Namespace ns, String jsonData) throws IOException, ExecutionException, InterruptedException
+ {
+ final List> parsedData = parseRiakObjectsFromJsonData(jsonData);
+ for (Map.Entry pd: parsedData)
+ {
+ final String key = createKValue(client, ns, pd.getKey(), pd.getValue(), true);
+ }
+ }
+ protected static String createKValue(RiakClient client, Location location,
+ Object value, Boolean checkCreation ) throws ExecutionException, InterruptedException
+ {
+ return createKValue(client, location.getNamespace(), location.getKeyAsString(), value, checkCreation);
+ }
+ protected static String createKValue(RiakClient client, Namespace ns, String key,
+ Object value, Boolean checkCreation ) throws ExecutionException, InterruptedException
+ {
+ final StoreValue.Builder builder = new StoreValue.Builder(value)
+ .withOption(StoreValue.Option.PW, Quorum.allQuorum());
+ // Use provided key, if any
+ if (key != null && !key.isEmpty())
+ {
+ builder.withLocation(new Location(ns, key));
+ }
+ else
+ {
+ builder.withNamespace(ns);
+ }
+ final StoreValue cmd = builder
+ .withOption(StoreValue.Option.W, new Quorum(1))
+ .build();
+ final StoreValue.Response r = client.execute(cmd);
+ final String realKey = r.hasGeneratedKey() ? r.getGeneratedKey().toStringUtf8() : key;
+ if (checkCreation)
+ {
+ // -- check creation to be 100% sure that everything was created properly
+ final Location location = new Location(ns, BinaryValue.create(realKey));
+ FetchValue.Response fetchResponse = null;
+ for (int retryCount=6; retryCount>=0; --retryCount)
+ {
+ try
+ {
+ fetchResponse = fetchByLocation(client, location);
+ }
+ catch (IllegalStateException ex)
+ {
+ if (ex.getMessage().startsWith("Nothing was found") && retryCount > 1)
+ {
+ logger.trace("Value for '{}' hasn't been created yet, attempt {}", location, retryCount+1);
+ Thread.sleep(200);
+ continue;
+ }
+ throw ex;
+ }
+ }
+ // As soon as value is reachable by a key, it is expected that it also will be reachable by 2i
+ final RiakObject etalonRObj = value instanceof RiakObject ?
+ (RiakObject) value : fetchResponse.getValue(RiakObject.class);
+ for (RiakIndex> ri : etalonRObj.getIndexes())
+ {
+ assert(ri.values().size() == 1);
+ ri.values().forEach( v-> {
+ try {
+ final List locations = query2i(client, ns, ri.getName(), v);
+ throwIllegalStateIf( !locations.contains(location),
+ "Location '%s' is not reachable by 2i '%s'",
+ location, ri.getName());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+ return realKey;
+ }
+ protected static void throwIllegalStateIf(Boolean flag, String format, Object... args) throws IllegalStateException
+ {
+ if (flag)
+ {
+ throw new IllegalStateException(String.format(format, args));
+ }
+ }
+ protected static List query2i(RiakClient client, Namespace ns,
+ String indexName, T value) throws ExecutionException, InterruptedException
+ {
+ SecondaryIndexQuery,?, ?> cmd = null;
+ if (value instanceof String)
+ {
+ cmd = new BinIndexQuery.Builder(ns, indexName, (String)value).build();
+ }
+ else if (value instanceof Integer)
+ {
+ cmd = new IntIndexQuery.Builder(ns, indexName, ((Integer)value).longValue()).build();
+ }
+ else if (value instanceof Long)
+ {
+ cmd = new IntIndexQuery.Builder(ns, indexName, (Long)value).build();
+ }
+ else throwIllegalStateIf(true, "Type '%s' is not suitable for 2i", value.getClass().getName());
+ return client.execute(cmd)
+ .getEntries().stream()
+ .map(e->e.getRiakObjectLocation())
+ .collect(Collectors.toList());
+ }
+ protected static V fetchByLocationAs(RiakClient client, Location location, Class valueClazz)
+ throws ExecutionException, InterruptedException
+ {
+ final FetchValue.Response r = fetchByLocation(client, location);
+ throwIllegalStateIf(r.isNotFound(), "Nothing was found for location '%s'", location);
+ throwIllegalStateIf(r.getNumberOfValues() > 1,
+ "Fetch by Location '$location' returns more than one result: %d were actually returned",
+ r.getNumberOfValues());
+ final V v = r.getValue(valueClazz);
+ return v;
+ }
+ protected static FetchValue.Response fetchByLocation(RiakClient client, Location location)
+ throws ExecutionException, InterruptedException
+ {
+ final FetchValue cmd = new FetchValue.Builder(location).build();
+ final FetchValue.Response r = client.execute(cmd);
+ return r;
+ }
+ private static class VClockSerializer extends JsonSerializer
+ {
+ @Override
+ public void serialize(VClock value, JsonGenerator gen, SerializerProvider serializers) throws IOException, JsonProcessingException {
+ // Due to lack of support binary values in JsonUnit it is required to perform manual conversion to Base64
+ //gen.writeBinary(value.getBytes());
+ gen.writeString(Base64.getEncoder().encodeToString(value.getBytes()));
+ }
+ }
\ No newline at end of file
diff --git a/src/test/java/com/basho/riak/client/api/commands/itest/ITestFetchValue.java b/src/test/java/com/basho/riak/client/api/commands/itest/ITestFetchValue.java
index 7411d2a20..15a528ffe 100644
--- a/src/test/java/com/basho/riak/client/api/commands/itest/ITestFetchValue.java
+++ b/src/test/java/com/basho/riak/client/api/commands/itest/ITestFetchValue.java
@@ -16,15 +16,14 @@
package com.basho.riak.client.api.commands.itest;
+import com.basho.riak.client.RiakTestFunctions;
import com.basho.riak.client.api.cap.ConflictResolver;
import com.basho.riak.client.api.cap.ConflictResolverFactory;
import com.basho.riak.client.api.cap.UnresolvedConflictException;
import com.basho.riak.client.core.operations.itest.ITestAutoCleanupBase;
-import com.basho.riak.client.api.commands.kv.FetchValue.Option;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.annotations.RiakVClock;
-import com.basho.riak.client.api.cap.DefaultResolver;
import com.basho.riak.client.api.cap.VClock;
import com.basho.riak.client.core.operations.StoreBucketPropsOperation;
import com.basho.riak.client.api.commands.kv.StoreValue;
@@ -32,19 +31,17 @@
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
-import com.basho.riak.client.core.query.indexes.LongIntIndex;
-import com.basho.riak.client.core.query.indexes.StringBinIndex;
-import com.basho.riak.client.core.util.BinaryValue;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import static org.junit.Assert.*;
import com.fasterxml.jackson.core.type.TypeReference;
+import static net.javacrumbs.jsonunit.JsonAssert.*;
+import net.javacrumbs.jsonunit.core.Option;
import org.junit.Assume;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -57,49 +54,27 @@ public class ITestFetchValue extends ITestAutoCleanupBase
private RiakClient client = new RiakClient(cluster);
- public void simpleTestDefaultType()
- {
+ public void simpleTestDefaultType() throws ExecutionException, InterruptedException {
- public void simpleTestTestType()
- {
+ public void simpleTestTestType() throws ExecutionException, InterruptedException {
- private void simpleTest(String bucketType)
- {
- try
- {
- Namespace ns = new Namespace(bucketType, bucketName.toString());
- Location loc = new Location(ns, "test_fetch_key1");
- Pojo pojo = new Pojo();
- pojo.value = "test value";
- StoreValue sv =
- new StoreValue.Builder(pojo).withLocation(loc).build();
- StoreValue.Response resp = client.execute(sv);
+ private void simpleTest(String bucketType) throws ExecutionException, InterruptedException {
+ Namespace ns = new Namespace(bucketType, bucketName.toString());
+ Location loc = new Location(ns, "test_fetch_key1");
- FetchValue fv = new FetchValue.Builder(loc).build();
- FetchValue.Response fResp = client.execute(fv);
+ Pojo pojo = new Pojo();
+ pojo.value = "test value";
- assertEquals(pojo.value, fResp.getValue(Pojo.class).value);
+ createKValue(client, loc, pojo, false);
- RiakObject ro = fResp.getValue(RiakObject.class);
- assertNotNull(ro.getValue());
- assertEquals("{\"value\":\"test value\"}", ro.getValue().toString());
- }
- catch (ExecutionException ex)
- {
- System.out.println(ex.getCause().getCause());
- }
- catch (InterruptedException ex)
- {
- Logger.getLogger(ITestFetchValue.class.getName()).log(Level.SEVERE, null, ex);
- }
+ final Pojo theFetchedPojo = fetchByLocationAs(client, loc, Pojo.class);
+ assertJsonEquals("{value: 'test value', vclock: '${json-unit.regex}[a-zA-Z0-9+/=]+'}", theFetchedPojo);
@@ -117,10 +92,8 @@ public void notFoundTestType() throws ExecutionException, InterruptedException
private void notFound(String bucketType) throws ExecutionException, InterruptedException
- Namespace ns = new Namespace(bucketType, bucketName.toString());
- Location loc = new Location(ns, "test_fetch_key2");
- FetchValue fv = new FetchValue.Builder(loc).build();
- FetchValue.Response fResp = client.execute(fv);
+ final FetchValue.Response fResp = fetchByLocation(client,
+ new Location(new Namespace(bucketType, bucketName.toString()), "test_fetch_key2"));
@@ -157,7 +130,7 @@ public void ReproRiakTombstoneBehavior() throws ExecutionException, InterruptedE
FetchValue fv = new FetchValue.Builder(loc)
- .withOption(Option.DELETED_VCLOCK, false)
+ .withOption(FetchValue.Option.DELETED_VCLOCK, false)
FetchValue.Response fResp = client.execute(fv);
@@ -211,9 +184,7 @@ private void resolveSiblings(Namespace ns, ConflictResolver resolver) thro
.registerConflictResolver(pojoTypeRef, resolver);
- FetchValue fv = new FetchValue.Builder(loc).build();
- FetchValue.Response fResp = client.execute(fv);
+ FetchValue.Response fResp = fetchByLocation(client, loc);
assertEquals(2, fResp.getNumberOfValues());
@@ -233,103 +204,98 @@ private Pojo storeSiblings(Location loc) throws ExecutionException, InterruptedE
Pojo pojo = new Pojo();
pojo.value = "test value";
- StoreValue sv =
- new StoreValue.Builder(pojo).withLocation(loc).build();
- client.execute(sv);
+ createKValue(client, loc, pojo, false);
pojo.value = "Pick me!";
- sv = new StoreValue.Builder(pojo).withLocation(loc).build();
- client.execute(sv);
+ createKValue(client, loc, pojo, false);
return pojo;
- public void fetchAnnotatedPojoDefaultType() throws ExecutionException, InterruptedException
+ public void fetchAnnotatedPojoDefaultType() throws ExecutionException, InterruptedException, IOException
- public void fetchAnnotatedPojoTestType() throws ExecutionException, InterruptedException
+ public void fetchAnnotatedPojoTestType() throws ExecutionException, InterruptedException, IOException
- private void fetchAnnotatedPojo(String bucketType) throws ExecutionException, InterruptedException
+ private void fetchAnnotatedPojo(String bucketType) throws ExecutionException, InterruptedException, IOException
- Namespace ns = new Namespace(bucketType, bucketName.toString());
- Location loc = new Location(ns, "test_fetch_key5");
- String jsonValue = "{\"value\":\"my value\"}";
+ final Namespace ns = new Namespace(bucketType, bucketName.toString());
- RiakObject ro = new RiakObject()
- .setValue(BinaryValue.create(jsonValue))
- .setContentType("application/json");
+ createKVData(client, ns, "{" +
+ " key: 'test_fetch_key5', " +
+ " value: {" +
+ " value: 'my value'" +
+ " }" +
+ "}");
- StoreValue sv = new StoreValue.Builder(ro).withLocation(loc).build();
- client.execute(sv);
+ Location loc = new Location(ns, "test_fetch_key5");
- FetchValue fv = new FetchValue.Builder(loc).build();
- FetchValue.Response resp = client.execute(fv);
- RiakAnnotatedPojo rap = resp.getValue(RiakAnnotatedPojo.class);
- assertNotNull(rap.bucketName);
- assertEquals(ns.getBucketNameAsString(), rap.bucketName);
- assertNotNull(rap.key);
- assertEquals(loc.getKeyAsString(), rap.key);
- assertNotNull(rap.bucketType);
- assertEquals(ns.getBucketTypeAsString(), rap.bucketType);
- assertNotNull(rap.contentType);
- assertEquals(ro.getContentType(), rap.contentType);
- assertNotNull(rap.vclock);
- assertNotNull(rap.vtag);
- assertNotNull(rap.lastModified);
- assertNotNull(rap.value);
- assertFalse(rap.deleted);
- assertNotNull(rap.value);
- assertEquals("my value", rap.value);
+ final RiakAnnotatedPojo rap = fetchByLocationAs(client,
+ new Location(ns, "test_fetch_key5"),
+ RiakAnnotatedPojo.class);
+ assertJsonEquals("{" +
+ "key: 'test_fetch_key5', " +
+ "bucketName: 'ITestBase'," +
+ "bucketType: '" + ns.getBucketTypeAsString() +"'," +
+ "contentType: 'application/json'," +
+ "deleted: false," +
+ "value: 'my value'," +
+ "vclock: '${json-unit.any-string}'," +
+ "vtag: '${json-unit.any-string}'," +
+ "lastModified: '${json-unit.any-number}'" +
+ "}",
+ rap,
- public void fetchAnnotatedPojoWIthIndexes() throws ExecutionException, InterruptedException
+ public void fetchAnnotatedPojoWIthIndexes() throws ExecutionException, InterruptedException, IOException
- Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, bucketName.toString());
- Location loc = new Location(ns,"test_fetch_key6");
- String jsonValue = "{\"value\":\"my value\"}";
- RiakObject ro = new RiakObject()
- .setValue(BinaryValue.create(jsonValue))
- .setContentType("application/json");
- ro.getIndexes().getIndex(StringBinIndex.named("email")).add("roach@basho.com");
- ro.getIndexes().getIndex(LongIntIndex.named("user_id")).add(1L);
- StoreValue sv = new StoreValue.Builder(ro).withLocation(loc).build();
- client.execute(sv);
- FetchValue fv = new FetchValue.Builder(loc).build();
- FetchValue.Response resp = client.execute(fv);
- RiakAnnotatedPojo rap = resp.getValue(RiakAnnotatedPojo.class);
- assertNotNull(rap.emailIndx);
- assertTrue(rap.emailIndx.contains("roach@basho.com"));
- assertEquals(rap.userId.longValue(), 1L);
+ final Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, bucketName.toString());
+ createKVData(client, ns, "{" +
+ " key: 'test_fetch_key6', " +
+ " value: {" +
+ " value: 'my value'" +
+ " }," +
+ " indices: {" +
+ " email: 'roach@basho.com'," +
+ " user_id: 1" +
+ " }" +
+ "}");
+ final RiakAnnotatedPojo rap = RiakTestFunctions.fetchByLocationAs(client,
+ new Location(ns,"test_fetch_key6"),
+ RiakAnnotatedPojo.class);
+ assertJsonEquals("{" +
+ " key: 'test_fetch_key6'," +
+ " emailIndx: [" +
+ " 'roach@basho.com'" +
+ " ]," +
+ " userId: 1" +
+ "}",
+ rap,
public static class Pojo
- String value;
+ public String value;
- VClock vclock;
+ public VClock vclock;
public static class MyResolver implements ConflictResolver
diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java b/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java
index 7667a22d4..376d24fe8 100644
--- a/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java
+++ b/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java
@@ -15,6 +15,7 @@
package com.basho.riak.client.core.operations.itest;
+import com.basho.riak.client.RiakTestFunctions;
import com.basho.riak.client.api.ListException;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.ListKeys;
@@ -54,7 +55,7 @@
* @author Brian Roach
* @since 2.0
-public abstract class ITestBase
+public abstract class ITestBase extends RiakTestFunctions
protected static RiakCluster cluster;
protected static boolean testYokozuna;