Skip to content

Commit 5fee296

Browse files
committed
chore: adding supporting decode for write logic
refs: #85
1 parent 4159f14 commit 5fee296

File tree

10 files changed

+295
-10
lines changed

10 files changed

+295
-10
lines changed

lib/src/main/java/io/cloudquery/helper/ArrowHelper.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,29 @@
55
import com.google.protobuf.ByteString;
66
import io.cloudquery.schema.Column;
77
import io.cloudquery.schema.Table;
8+
import io.cloudquery.schema.Table.TableBuilder;
89
import java.io.ByteArrayOutputStream;
910
import java.io.IOException;
1011
import java.nio.channels.Channels;
12+
import java.util.ArrayList;
1113
import java.util.HashMap;
1214
import java.util.List;
1315
import java.util.Map;
1416
import org.apache.arrow.memory.BufferAllocator;
1517
import org.apache.arrow.memory.RootAllocator;
1618
import org.apache.arrow.vector.VectorSchemaRoot;
19+
import org.apache.arrow.vector.ipc.ArrowReader;
20+
import org.apache.arrow.vector.ipc.ArrowStreamReader;
1721
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
1822
import org.apache.arrow.vector.types.pojo.Field;
1923
import org.apache.arrow.vector.types.pojo.Schema;
2024

2125
public class ArrowHelper {
26+
public static final String CQ_TABLE_NAME = "cq:table_name";
27+
public static final String CQ_TABLE_TITLE = "cq:table_title";
28+
public static final String CQ_TABLE_DESCRIPTION = "cq:table_description";
29+
public static final String CQ_TABLE_DEPENDS_ON = "cq:table_depends_on";
30+
2231
public static ByteString encode(Table table) throws IOException {
2332
try (BufferAllocator bufferAllocator = new RootAllocator()) {
2433
Schema schema = toArrowSchema(table);
@@ -34,6 +43,15 @@ public static ByteString encode(Table table) throws IOException {
3443
}
3544
}
3645

46+
public static Table decode(ByteString byteString) throws IOException {
47+
try (BufferAllocator bufferAllocator = new RootAllocator()) {
48+
try (ArrowReader reader = new ArrowStreamReader(byteString.newInput(), bufferAllocator)) {
49+
VectorSchemaRoot vectorSchemaRoot = reader.getVectorSchemaRoot();
50+
return fromArrowSchema(vectorSchemaRoot.getSchema());
51+
}
52+
}
53+
}
54+
3755
public static Schema toArrowSchema(Table table) {
3856
List<Column> columns = table.getColumns();
3957
Field[] fields = new Field[columns.size()];
@@ -43,16 +61,42 @@ public static Schema toArrowSchema(Table table) {
4361
fields[i] = field;
4462
}
4563
Map<String, String> metadata = new HashMap<>();
46-
metadata.put("cq:table_name", table.getName());
64+
metadata.put(CQ_TABLE_NAME, table.getName());
4765
if (table.getTitle() != null) {
48-
metadata.put("cq:table_title", table.getTitle());
66+
metadata.put(CQ_TABLE_TITLE, table.getTitle());
4967
}
5068
if (table.getDescription() != null) {
51-
metadata.put("cq:table_description", table.getDescription());
69+
metadata.put(CQ_TABLE_DESCRIPTION, table.getDescription());
5270
}
5371
if (table.getParent() != null) {
54-
metadata.put("cq:table_depends_on", table.getParent().getName());
72+
metadata.put(CQ_TABLE_DEPENDS_ON, table.getParent().getName());
5573
}
5674
return new Schema(asList(fields), metadata);
5775
}
76+
77+
public static Table fromArrowSchema(Schema schema) {
78+
List<Column> columns = new ArrayList<>();
79+
for (Field field : schema.getFields()) {
80+
columns.add(Column.builder().name(field.getName()).type(field.getType()).build());
81+
}
82+
83+
Map<String, String> metaData = schema.getCustomMetadata();
84+
String name = metaData.get(CQ_TABLE_NAME);
85+
String title = metaData.get(CQ_TABLE_TITLE);
86+
String description = metaData.get(CQ_TABLE_DESCRIPTION);
87+
String parent = metaData.get(CQ_TABLE_DEPENDS_ON);
88+
89+
TableBuilder tableBuilder = Table.builder().name(name).columns(columns);
90+
if (title != null) {
91+
tableBuilder.title(title);
92+
}
93+
if (description != null) {
94+
tableBuilder.description(description);
95+
}
96+
if (parent != null) {
97+
tableBuilder.parent(Table.builder().name(parent).build());
98+
}
99+
100+
return tableBuilder.build();
101+
}
58102
}

lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
import com.google.protobuf.ByteString;
44
import io.cloudquery.helper.ArrowHelper;
5+
import io.cloudquery.messages.WriteMigrateTable;
56
import io.cloudquery.plugin.BackendOptions;
67
import io.cloudquery.plugin.NewClientOptions;
78
import io.cloudquery.plugin.Plugin;
89
import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
910
import io.cloudquery.plugin.v3.Write;
11+
import io.cloudquery.plugin.v3.Write.MessageMigrateTable;
1012
import io.cloudquery.schema.Table;
1113
import io.grpc.stub.StreamObserver;
14+
import java.io.IOException;
1215
import java.util.ArrayList;
1316
import java.util.List;
1417

@@ -107,13 +110,28 @@ public void read(
107110

108111
@Override
109112
public StreamObserver<Write.Request> write(StreamObserver<Write.Response> responseObserver) {
110-
plugin.write();
111113
return new StreamObserver<>() {
112114
@Override
113-
public void onNext(Write.Request request) {}
115+
public void onNext(Write.Request request) {
116+
Write.Request.MessageCase messageCase = request.getMessageCase();
117+
if (messageCase == Write.Request.MessageCase.MIGRATE_TABLE) {
118+
MessageMigrateTable migrateTable = request.getMigrateTable();
119+
ByteString byteString = migrateTable.getTable();
120+
Table table = null;
121+
try {
122+
table = ArrowHelper.decode(byteString);
123+
} catch (IOException ex) {
124+
onError(ex);
125+
}
126+
boolean migrateForce = request.getMigrateTable().getMigrateForce();
127+
plugin.write(new WriteMigrateTable(table, migrateForce));
128+
}
129+
}
114130

115131
@Override
116-
public void onError(Throwable t) {}
132+
public void onError(Throwable t) {
133+
responseObserver.onError(t);
134+
}
117135

118136
@Override
119137
public void onCompleted() {

lib/src/main/java/io/cloudquery/memdb/MemDB.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.cloudquery.memdb;
22

3+
import io.cloudquery.messages.WriteMessage;
34
import io.cloudquery.plugin.BackendOptions;
45
import io.cloudquery.plugin.ClientNotInitializedException;
56
import io.cloudquery.plugin.NewClientOptions;
@@ -93,8 +94,8 @@ public void read() {
9394
}
9495

9596
@Override
96-
public void write() {
97-
throw new UnsupportedOperationException("Unimplemented method 'Write'");
97+
public void write(WriteMessage message) {
98+
client.write(message);
9899
}
99100

100101
@Override
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,61 @@
11
package io.cloudquery.memdb;
22

3+
import io.cloudquery.messages.WriteMessage;
4+
import io.cloudquery.messages.WriteMigrateTable;
35
import io.cloudquery.schema.ClientMeta;
6+
import io.cloudquery.schema.Table;
7+
import io.cloudquery.schema.TableColumnChange;
8+
import java.util.ArrayList;
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.concurrent.locks.ReentrantReadWriteLock;
13+
import org.apache.arrow.vector.VectorSchemaRoot;
414

515
public class MemDBClient implements ClientMeta {
616
private static final String id = "memdb";
717

18+
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
19+
private Map<String, Table> tables = new HashMap<>();
20+
private Map<String, List<VectorSchemaRoot>> memDB = new HashMap<>();
21+
822
public MemDBClient() {}
923

1024
@Override
1125
public String getId() {
1226
return id;
1327
}
1428

29+
@Override
30+
public void write(WriteMessage message) {
31+
if (message instanceof WriteMigrateTable migrateTable) {
32+
migrate(migrateTable);
33+
}
34+
}
35+
1536
public void close() {
1637
// do nothing
1738
}
39+
40+
private void migrate(WriteMigrateTable migrateTable) {
41+
lock.writeLock().lock();
42+
try {
43+
Table table = migrateTable.getTable();
44+
String tableName = table.getName();
45+
if (!memDB.containsKey(tableName)) {
46+
memDB.put(tableName, new ArrayList<>());
47+
tables.put(tableName, table);
48+
return;
49+
}
50+
51+
List<TableColumnChange> changes = table.getChanges(tables.get(tableName));
52+
if (changes.isEmpty()) {
53+
return;
54+
}
55+
memDB.put(tableName, new ArrayList<>());
56+
tables.put(tableName, table);
57+
} finally {
58+
lock.writeLock().unlock();
59+
}
60+
}
1861
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package io.cloudquery.messages;
2+
3+
public abstract class WriteMessage {}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.cloudquery.messages;
2+
3+
import io.cloudquery.schema.Table;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Getter;
6+
7+
@AllArgsConstructor
8+
@Getter
9+
public class WriteMigrateTable extends WriteMessage {
10+
private Table table;
11+
private boolean migrateForce;
12+
}

lib/src/main/java/io/cloudquery/plugin/Plugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.cloudquery.plugin;
22

3+
import io.cloudquery.messages.WriteMessage;
34
import io.cloudquery.schema.ClientMeta;
45
import io.cloudquery.schema.SchemaException;
56
import io.cloudquery.schema.Table;
@@ -40,7 +41,7 @@ public abstract void sync(
4041

4142
public abstract void read();
4243

43-
public abstract void write();
44+
public abstract void write(WriteMessage message);
4445

4546
public abstract void close();
4647
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package io.cloudquery.schema;
22

3+
import io.cloudquery.messages.WriteMessage;
4+
35
public interface ClientMeta {
46
String getId();
7+
8+
void write(WriteMessage message);
59
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.cloudquery.helper;
2+
3+
import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_DEPENDS_ON;
4+
import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_DESCRIPTION;
5+
import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_NAME;
6+
import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_TITLE;
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
9+
import com.google.protobuf.ByteString;
10+
import io.cloudquery.schema.Column;
11+
import io.cloudquery.schema.Table;
12+
import java.io.IOException;
13+
import java.util.List;
14+
import java.util.Map;
15+
import org.apache.arrow.vector.types.pojo.ArrowType;
16+
import org.apache.arrow.vector.types.pojo.Field;
17+
import org.apache.arrow.vector.types.pojo.Schema;
18+
import org.junit.jupiter.api.Test;
19+
20+
public class ArrowHelperTest {
21+
22+
public static final Table TEST_TABLE =
23+
Table.builder()
24+
.name("table1")
25+
.description("A simple test table")
26+
.title("Test table title")
27+
.parent(Table.builder().name("parent").build())
28+
.columns(
29+
List.of(
30+
Column.builder().name("column1").type(ArrowType.Utf8.INSTANCE).build(),
31+
Column.builder().name("column2").type(ArrowType.Utf8.INSTANCE).build()))
32+
.build();
33+
34+
@Test
35+
public void testToArrowSchema() {
36+
Schema arrowSchema = ArrowHelper.toArrowSchema(TEST_TABLE);
37+
38+
assertEquals(arrowSchema.getFields().get(0).getName(), "column1");
39+
assertEquals(arrowSchema.getFields().get(1).getName(), "column2");
40+
41+
assertEquals(
42+
arrowSchema.getCustomMetadata(),
43+
Map.of(
44+
CQ_TABLE_NAME, "table1",
45+
CQ_TABLE_DESCRIPTION, "A simple test table",
46+
CQ_TABLE_TITLE, "Test table title",
47+
CQ_TABLE_DEPENDS_ON, "parent"));
48+
}
49+
50+
@Test
51+
public void testFromArrowSchema() {
52+
List<Field> fields =
53+
List.of(
54+
Field.nullable("column1", ArrowType.Utf8.INSTANCE),
55+
Field.nullable("column2", ArrowType.Utf8.INSTANCE));
56+
57+
Schema schema = new Schema(fields, Map.of(CQ_TABLE_NAME, "table1"));
58+
59+
Table table = ArrowHelper.fromArrowSchema(schema);
60+
61+
assertEquals(table.getName(), "table1");
62+
63+
for (int i = 0; i < table.getColumns().size(); i++) {
64+
Column column = table.getColumns().get(i);
65+
assertEquals(column.getName(), fields.get(i).getName());
66+
assertEquals(column.getType(), fields.get(i).getType());
67+
}
68+
}
69+
70+
@Test
71+
public void testRoundTrip() throws IOException {
72+
ByteString byteString = ArrowHelper.encode(TEST_TABLE);
73+
Table table = ArrowHelper.decode(byteString);
74+
75+
assertEquals(table.getName(), TEST_TABLE.getName());
76+
assertEquals(table.getDescription(), TEST_TABLE.getDescription());
77+
assertEquals(table.getTitle(), TEST_TABLE.getTitle());
78+
assertEquals(table.getParent().getName(), TEST_TABLE.getParent().getName());
79+
80+
for (int i = 0; i < TEST_TABLE.getColumns().size(); i++) {
81+
assertEquals(TEST_TABLE.getColumns().get(i).getName(), table.getColumns().get(i).getName());
82+
assertEquals(TEST_TABLE.getColumns().get(i).getType(), table.getColumns().get(i).getType());
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)