Skip to content

Commit

Permalink
Merge pull request #665 from basho/jbrisbin-tsput-column-names
Browse files Browse the repository at this point in the history
Add Column Names input for TS Store operation
  • Loading branch information
alexmoore authored Aug 25, 2016
2 parents 17b4096 + 632ccae commit 3023b84
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,37 @@ public static OtpOutputStream encodeTsQueryRequest(String queryText, byte[] cove
}

public static OtpOutputStream encodeTsPutRequest(String tableName, Collection<Row> rows)
{
return encodeTsPutRequest(tableName, Collections.<String>emptyList(), rows);
}

public static OtpOutputStream encodeTsPutRequest(String tableName, Collection<String> columns, Collection<Row> rows)
{
final OtpOutputStream os = new OtpOutputStream();
os.write(OtpExternal.versionTag); // NB: this is the reqired 0x83 (131) value

// TsPutReq is a 4-tuple: {'tsputreq', tableName, [], [rows]}
// columns is empte
// TsPutReq is a 4-tuple: {'tsputreq', tableName, [columns], [rows]}
os.write_tuple_head(4);

// tsputreq Atom
os.write_atom(TS_PUT_REQ);

// Table Name Binary
os.write_binary(tableName.getBytes(StandardCharsets.UTF_8));
// columns is an empty list

// Columns List
if(columns != null && !columns.isEmpty())
{
os.write_list_head(columns.size());

for (String column : columns)
{
os.write_binary(column.getBytes(StandardCharsets.UTF_8));
}
}
os.write_nil();

// write a list of rows
// Rows List
// each row is a tuple of cells
os.write_list_head(rows.size());
for (Row row : rows)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.basho.riak.client.core.operations.ts;

import com.basho.riak.client.core.operations.TTBFutureOperation;
import com.basho.riak.client.core.query.timeseries.CollectionConverters;
import com.basho.riak.client.core.query.timeseries.ColumnDescription;
import com.basho.riak.client.core.query.timeseries.ConvertibleIterable;
import com.basho.riak.client.core.query.timeseries.Row;
import com.basho.riak.protobuf.RiakTsPB;
import com.google.protobuf.ByteString;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

Expand Down Expand Up @@ -53,6 +58,7 @@ public static class Builder
{
private final String tableName;
private Collection<Row> rows;
private Collection<String> columns;

public Builder(String tableName)
{
Expand All @@ -64,9 +70,38 @@ public Builder(String tableName)
this.tableName = tableName;
}

public Builder withColumns(Collection<ColumnDescription> columns)
/**
* Add the names & order of the columns to be inserted.
* Order is implied by the order of the names in the Collection.
* <b>NOTE:</b>: As of Riak TS 1.4, this functionality is not implemented server-side,
* and any stored data is expected to be in the order of the table.
* @param columnNames The names of the columns to insert, and an implicit order.
* @return a reference to this object
*/
public Builder withColumns(Collection<String> columnNames)
{
throw new UnsupportedOperationException();
columns = columnNames;
return this;
}

/**
* Add the names & order of the columns to be inserted.
* Order is implied by the order of the ColumnDescriptions in the Collection.
* <b>NOTE:</b>: As of Riak TS 1.4, this functionality is not implemented server-side,
* and any stored data is expected to be in the order of the table.
* @param columns The ColumnDescriptions that contain a column name and an implicit order.
* @return a reference to this object
*/
public Builder withColumnDescriptions(Collection<? extends ColumnDescription> columns)
{
columns = new ArrayList<>(columns.size());

for (ColumnDescription column : columns)
{
this.columns.add(column.getName());
}

return this;
}

public Builder withRows(Collection<Row> rows)
Expand All @@ -85,6 +120,11 @@ public Collection<Row> getRows()
return rows;
}

public Collection<String> getColumns()
{
return columns;
}

public StoreOperation build()
{
return new StoreOperation(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static class StoreEncoder extends BuilderTTBEncoder<StoreOperation.Builder>
@Override
OtpOutputStream buildMessage()
{
return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getRows());
return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getColumns(), builder.getRows());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.0.3
*/
final class CollectionConverters
public final class CollectionConverters
{
private CollectionConverters() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void encodesPutRequestCorrectly_2()
// NB: this is what Erlang generates, an old-style float
// 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31
// NB: this is what JInterface generates, a new-style float
70, 64, 65, 38, 102, 102, 102, 102, 102,
70,64,65,38,102,102,102,102,102,
106, // null cell empty list
106}; // list arity 1 end

Expand All @@ -112,6 +112,60 @@ public void encodesPutRequestCorrectly_2()
}
}

@Test
public void encodesPutWithColumnsCorrectly()
{
/*
{tsputreq,<<"test_table">>,
[<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>,<<"f">>,<<"g">>],
[{<<"series">>,<<"family">>,12345678,1,true,34.3,[]}]}
*/
final byte[] exp = {
(byte)131,104,4,
100,0,8,116,115,112,117,116,114,101,113,
109,0,0,0,10,116,101,115,116,95,116,97,98,108,101,
108,0,0,0,7,
109,0,0,0,1,97,
109,0,0,0,1,98,
109,0,0,0,1,99,
109,0,0,0,1,100,
109,0,0,0,1,101,
109,0,0,0,1,102,
109,0,0,0,1,103,
106,
108,0,0,0,1,
104,7,
109,0,0,0,6,115,101,114,105,101,115,
109,0,0,0,6,102,97,109,105,108,121,
98,0,(byte)188,97,78,
97,1,
100,0,4,116,114,117,101,
// NB: this is what Erlang generates, an old-style float
// 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31
// NB: this is what JInterface generates, a new-style float
70,64,65,38,102,102,102,102,102,
106,
106};


final List<String> columns = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
final ArrayList<Row> rows = new ArrayList<>(1);
rows.add(new Row(new Cell("series"), new Cell("family"), Cell.newTimestamp(12345678),
new Cell(1L), new Cell(true), new Cell(34.3), null));

try
{
OtpOutputStream os = TermToBinaryCodec.encodeTsPutRequest(TABLE_NAME, columns, rows);
os.flush();
byte[] msg = os.toByteArray();
Assert.assertArrayEquals(exp, msg);
}
catch (IOException ex)
{
Assert.fail(ex.getMessage());
}
}

@Test
public void encodesGetRequestCorrectly()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.operations.ts.StoreOperation;
import com.basho.riak.client.core.query.timeseries.ColumnDescription;
import org.junit.Test;

import java.util.Collection;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertTrue;
Expand All @@ -26,4 +28,17 @@ public void writesDataWithoutError() throws ExecutionException, InterruptedExcep
assertTrue(future.isSuccess());
}

@Test
public void writesDataWithColumnsOptionWithoutError() throws ExecutionException, InterruptedException
{
StoreOperation storeOp = new StoreOperation.Builder(tableName)
.withColumnDescriptions(GeoCheckinWideTableDefinition.getFullColumnDescriptions())
.withRows(rows).build();

RiakFuture<Void, String> future = cluster.execute(storeOp);

future.get();
assertTrue(future.isSuccess());
}

}

0 comments on commit 3023b84

Please sign in to comment.