From d128533af65640bad68c06f402f3f416225a8b38 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Wed, 24 Aug 2016 17:09:39 -0500 Subject: [PATCH 1/2] Partially implement withColumns method in the StoreOperation.Builder class to prevent the Spark connector from failing due to calling this method. Understand that this functionality isn't implemented all the way back to Riak. There will need to be some additional work to fully support passing the column names to TS. --- .../client/core/operations/ts/StoreOperation.java | 11 ++++++++++- .../core/query/timeseries/CollectionConverters.java | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java b/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java index d5e9c410e..0db5f6a9d 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java @@ -1,8 +1,12 @@ package com.basho.riak.client.core.operations.ts; import com.basho.riak.client.core.operations.TTBFutureOperation; +import com.basho.riak.client.core.query.timeseries.CollectionConverters; import com.basho.riak.client.core.query.timeseries.ColumnDescription; +import com.basho.riak.client.core.query.timeseries.ConvertibleIterable; import com.basho.riak.client.core.query.timeseries.Row; +import com.basho.riak.protobuf.RiakTsPB; +import com.google.protobuf.ByteString; import java.util.Collection; import java.util.List; @@ -51,6 +55,7 @@ private String createQueryInfoMessage() public static class Builder { + private final RiakTsPB.TsPutReq.Builder reqBuilder; private final String tableName; private Collection rows; @@ -62,16 +67,20 @@ public Builder(String tableName) } this.tableName = tableName; + this.reqBuilder = RiakTsPB.TsPutReq.newBuilder(); + this.reqBuilder.setTable(ByteString.copyFromUtf8(tableName)); } public Builder withColumns(Collection columns) { - throw new UnsupportedOperationException(); + this.reqBuilder.addAllColumns(CollectionConverters.convertColumnDescriptionsToPb(columns)); + return this; } public Builder withRows(Collection rows) { this.rows = rows; + this.reqBuilder.addAllRows(ConvertibleIterable.asIterablePbRow(rows)); return this; } diff --git a/src/main/java/com/basho/riak/client/core/query/timeseries/CollectionConverters.java b/src/main/java/com/basho/riak/client/core/query/timeseries/CollectionConverters.java index 1114fa769..4ec0893d5 100644 --- a/src/main/java/com/basho/riak/client/core/query/timeseries/CollectionConverters.java +++ b/src/main/java/com/basho/riak/client/core/query/timeseries/CollectionConverters.java @@ -11,7 +11,7 @@ * @author Sergey Galkin * @since 2.0.3 */ -final class CollectionConverters +public final class CollectionConverters { private CollectionConverters() {} From 2dd723e1e023fdf67f8cba2ff169a06930cba72b Mon Sep 17 00:00:00 2001 From: Alex Moore Date: Thu, 25 Aug 2016 14:49:46 -0400 Subject: [PATCH 2/2] Add column support to TS Store operation --- .../client/core/codec/TermToBinaryCodec.java | 26 +++++++-- .../core/operations/ts/StoreOperation.java | 43 ++++++++++++-- .../core/operations/ts/TTBConverters.java | 2 +- .../core/codec/TermToBinaryCodecTest.java | 56 ++++++++++++++++++- .../itest/ts/ITestStoreOperation.java | 15 +++++ 5 files changed, 130 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java b/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java index bb02c3d7a..bab1e3bef 100644 --- a/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java +++ b/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java @@ -86,19 +86,37 @@ public static OtpOutputStream encodeTsQueryRequest(String queryText, byte[] cove } public static OtpOutputStream encodeTsPutRequest(String tableName, Collection rows) + { + return encodeTsPutRequest(tableName, Collections.emptyList(), rows); + } + + public static OtpOutputStream encodeTsPutRequest(String tableName, Collection columns, Collection rows) { final OtpOutputStream os = new OtpOutputStream(); os.write(OtpExternal.versionTag); // NB: this is the reqired 0x83 (131) value - // TsPutReq is a 4-tuple: {'tsputreq', tableName, [], [rows]} - // columns is empte + // TsPutReq is a 4-tuple: {'tsputreq', tableName, [columns], [rows]} os.write_tuple_head(4); + + // tsputreq Atom os.write_atom(TS_PUT_REQ); + + // Table Name Binary os.write_binary(tableName.getBytes(StandardCharsets.UTF_8)); - // columns is an empty list + + // Columns List + if(columns != null && !columns.isEmpty()) + { + os.write_list_head(columns.size()); + + for (String column : columns) + { + os.write_binary(column.getBytes(StandardCharsets.UTF_8)); + } + } os.write_nil(); - // write a list of rows + // Rows List // each row is a tuple of cells os.write_list_head(rows.size()); for (Row row : rows) diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java b/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java index 0db5f6a9d..058e79931 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java @@ -8,6 +8,7 @@ import com.basho.riak.protobuf.RiakTsPB; import com.google.protobuf.ByteString; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -55,9 +56,9 @@ private String createQueryInfoMessage() public static class Builder { - private final RiakTsPB.TsPutReq.Builder reqBuilder; private final String tableName; private Collection rows; + private Collection columns; public Builder(String tableName) { @@ -67,20 +68,45 @@ public Builder(String tableName) } this.tableName = tableName; - this.reqBuilder = RiakTsPB.TsPutReq.newBuilder(); - this.reqBuilder.setTable(ByteString.copyFromUtf8(tableName)); } - public Builder withColumns(Collection columns) + /** + * Add the names & order of the columns to be inserted. + * Order is implied by the order of the names in the Collection. + * NOTE:: As of Riak TS 1.4, this functionality is not implemented server-side, + * and any stored data is expected to be in the order of the table. + * @param columnNames The names of the columns to insert, and an implicit order. + * @return a reference to this object + */ + public Builder withColumns(Collection columnNames) { - this.reqBuilder.addAllColumns(CollectionConverters.convertColumnDescriptionsToPb(columns)); + columns = columnNames; + return this; + } + + /** + * Add the names & order of the columns to be inserted. + * Order is implied by the order of the ColumnDescriptions in the Collection. + * NOTE:: As of Riak TS 1.4, this functionality is not implemented server-side, + * and any stored data is expected to be in the order of the table. + * @param columns The ColumnDescriptions that contain a column name and an implicit order. + * @return a reference to this object + */ + public Builder withColumnDescriptions(Collection columns) + { + columns = new ArrayList<>(columns.size()); + + for (ColumnDescription column : columns) + { + this.columns.add(column.getName()); + } + return this; } public Builder withRows(Collection rows) { this.rows = rows; - this.reqBuilder.addAllRows(ConvertibleIterable.asIterablePbRow(rows)); return this; } @@ -94,6 +120,11 @@ public Collection getRows() return rows; } + public Collection getColumns() + { + return columns; + } + public StoreOperation build() { return new StoreOperation(this); diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java b/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java index f6a5f94f7..290f1f6f5 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java +++ b/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java @@ -44,7 +44,7 @@ static class StoreEncoder extends BuilderTTBEncoder @Override OtpOutputStream buildMessage() { - return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getRows()); + return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getColumns(), builder.getRows()); } } diff --git a/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java b/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java index 6eaf0ab0e..9bd9be2c7 100644 --- a/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java +++ b/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java @@ -91,7 +91,7 @@ public void encodesPutRequestCorrectly_2() // NB: this is what Erlang generates, an old-style float // 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31 // NB: this is what JInterface generates, a new-style float - 70, 64, 65, 38, 102, 102, 102, 102, 102, + 70,64,65,38,102,102,102,102,102, 106, // null cell empty list 106}; // list arity 1 end @@ -112,6 +112,60 @@ public void encodesPutRequestCorrectly_2() } } + @Test + public void encodesPutWithColumnsCorrectly() + { + /* + {tsputreq,<<"test_table">>, + [<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>,<<"f">>,<<"g">>], + [{<<"series">>,<<"family">>,12345678,1,true,34.3,[]}]} + */ + final byte[] exp = { + (byte)131,104,4, + 100,0,8,116,115,112,117,116,114,101,113, + 109,0,0,0,10,116,101,115,116,95,116,97,98,108,101, + 108,0,0,0,7, + 109,0,0,0,1,97, + 109,0,0,0,1,98, + 109,0,0,0,1,99, + 109,0,0,0,1,100, + 109,0,0,0,1,101, + 109,0,0,0,1,102, + 109,0,0,0,1,103, + 106, + 108,0,0,0,1, + 104,7, + 109,0,0,0,6,115,101,114,105,101,115, + 109,0,0,0,6,102,97,109,105,108,121, + 98,0,(byte)188,97,78, + 97,1, + 100,0,4,116,114,117,101, + // NB: this is what Erlang generates, an old-style float + // 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31 + // NB: this is what JInterface generates, a new-style float + 70,64,65,38,102,102,102,102,102, + 106, + 106}; + + + final List columns = Arrays.asList("a", "b", "c", "d", "e", "f", "g"); + final ArrayList rows = new ArrayList<>(1); + rows.add(new Row(new Cell("series"), new Cell("family"), Cell.newTimestamp(12345678), + new Cell(1L), new Cell(true), new Cell(34.3), null)); + + try + { + OtpOutputStream os = TermToBinaryCodec.encodeTsPutRequest(TABLE_NAME, columns, rows); + os.flush(); + byte[] msg = os.toByteArray(); + Assert.assertArrayEquals(exp, msg); + } + catch (IOException ex) + { + Assert.fail(ex.getMessage()); + } + } + @Test public void encodesGetRequestCorrectly() { diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestStoreOperation.java b/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestStoreOperation.java index c46e3eca5..d1a7456b4 100644 --- a/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestStoreOperation.java +++ b/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestStoreOperation.java @@ -2,8 +2,10 @@ import com.basho.riak.client.core.RiakFuture; import com.basho.riak.client.core.operations.ts.StoreOperation; +import com.basho.riak.client.core.query.timeseries.ColumnDescription; import org.junit.Test; +import java.util.Collection; import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertTrue; @@ -26,4 +28,17 @@ public void writesDataWithoutError() throws ExecutionException, InterruptedExcep assertTrue(future.isSuccess()); } + @Test + public void writesDataWithColumnsOptionWithoutError() throws ExecutionException, InterruptedException + { + StoreOperation storeOp = new StoreOperation.Builder(tableName) + .withColumnDescriptions(GeoCheckinWideTableDefinition.getFullColumnDescriptions()) + .withRows(rows).build(); + + RiakFuture future = cluster.execute(storeOp); + + future.get(); + assertTrue(future.isSuccess()); + } + }