Skip to content

Commit 461df0e

Browse files
committed
refactor: moving arrow conversion methods into a helper class
refs: #85
1 parent 6abae6d commit 461df0e

File tree

5 files changed

+64
-52
lines changed

5 files changed

+64
-52
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.cloudquery.helper;
2+
3+
import static java.util.Arrays.asList;
4+
5+
import com.google.protobuf.ByteString;
6+
import io.cloudquery.schema.Column;
7+
import io.cloudquery.schema.Table;
8+
import java.io.ByteArrayOutputStream;
9+
import java.io.IOException;
10+
import java.nio.channels.Channels;
11+
import java.util.HashMap;
12+
import java.util.List;
13+
import java.util.Map;
14+
import org.apache.arrow.memory.BufferAllocator;
15+
import org.apache.arrow.memory.RootAllocator;
16+
import org.apache.arrow.vector.VectorSchemaRoot;
17+
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
18+
import org.apache.arrow.vector.types.pojo.Field;
19+
import org.apache.arrow.vector.types.pojo.Schema;
20+
21+
public class ArrowHelper {
22+
public static ByteString encode(Table table) throws IOException {
23+
try (BufferAllocator bufferAllocator = new RootAllocator()) {
24+
Schema schema = toArrowSchema(table);
25+
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
26+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
27+
try (ArrowStreamWriter writer =
28+
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
29+
writer.start();
30+
writer.end();
31+
return ByteString.copyFrom(out.toByteArray());
32+
}
33+
}
34+
}
35+
}
36+
37+
public static Schema toArrowSchema(Table table) {
38+
List<Column> columns = table.getColumns();
39+
Field[] fields = new Field[columns.size()];
40+
for (int i = 0; i < columns.size(); i++) {
41+
Column column = columns.get(i);
42+
Field field = Field.nullable(column.getName(), column.getType());
43+
fields[i] = field;
44+
}
45+
Map<String, String> metadata = new HashMap<>();
46+
metadata.put("cq:table_name", table.getName());
47+
if (table.getTitle() != null) {
48+
metadata.put("cq:table_title", table.getTitle());
49+
}
50+
if (table.getDescription() != null) {
51+
metadata.put("cq:table_description", table.getDescription());
52+
}
53+
if (table.getParent() != null) {
54+
metadata.put("cq:table_depends_on", table.getParent().getName());
55+
}
56+
return new Schema(asList(fields), metadata);
57+
}
58+
}

lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.cloudquery.internal.servers.plugin.v3;
22

33
import com.google.protobuf.ByteString;
4+
import io.cloudquery.helper.ArrowHelper;
45
import io.cloudquery.plugin.BackendOptions;
56
import io.cloudquery.plugin.NewClientOptions;
67
import io.cloudquery.plugin.Plugin;
@@ -65,7 +66,7 @@ public void getTables(
6566
request.getSkipDependentTables());
6667
List<ByteString> byteStrings = new ArrayList<>();
6768
for (Table table : Table.flattenTables(tables)) {
68-
byteStrings.add(table.encode());
69+
byteStrings.add(ArrowHelper.encode(table));
6970
}
7071
responseObserver.onNext(
7172
io.cloudquery.plugin.v3.GetTables.Response.newBuilder()

lib/src/main/java/io/cloudquery/scheduler/Scheduler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.cloudquery.scheduler;
22

3+
import io.cloudquery.helper.ArrowHelper;
34
import io.cloudquery.plugin.v3.Sync;
45
import io.cloudquery.schema.ClientMeta;
56
import io.cloudquery.schema.Table;
@@ -25,7 +26,7 @@ public void sync() {
2526
try {
2627
logger.info("sending migrate message for table: {}", table.getName());
2728
Sync.MessageMigrateTable migrateTable =
28-
Sync.MessageMigrateTable.newBuilder().setTable(table.encode()).build();
29+
Sync.MessageMigrateTable.newBuilder().setTable(ArrowHelper.encode(table)).build();
2930
Sync.Response response = Sync.Response.newBuilder().setMigrateTable(migrateTable).build();
3031
syncStream.onNext(response);
3132
} catch (IOException e) {

lib/src/main/java/io/cloudquery/schema/Resource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.cloudquery.schema;
22

33
import com.google.protobuf.ByteString;
4+
import io.cloudquery.helper.ArrowHelper;
45
import io.cloudquery.scalar.Scalar;
56
import io.cloudquery.scalar.ValidationException;
67
import java.io.IOException;
@@ -42,6 +43,6 @@ public Scalar<?> get(String columnName) {
4243

4344
public ByteString encode() throws IOException {
4445
// TODO: Encode data and not only schema
45-
return table.encode();
46+
return ArrowHelper.encode(table);
4647
}
4748
}

lib/src/main/java/io/cloudquery/schema/Table.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
11
package io.cloudquery.schema;
22

3-
import static java.util.Arrays.asList;
4-
5-
import com.google.protobuf.ByteString;
63
import io.cloudquery.glob.Glob;
74
import io.cloudquery.schema.Column.ColumnBuilder;
85
import io.cloudquery.transformers.TransformerException;
9-
import java.io.ByteArrayOutputStream;
10-
import java.io.IOException;
11-
import java.nio.channels.Channels;
126
import java.util.ArrayList;
137
import java.util.Collections;
148
import java.util.HashMap;
@@ -20,12 +14,6 @@
2014
import lombok.Getter;
2115
import lombok.NonNull;
2216
import lombok.Setter;
23-
import org.apache.arrow.memory.BufferAllocator;
24-
import org.apache.arrow.memory.RootAllocator;
25-
import org.apache.arrow.vector.VectorSchemaRoot;
26-
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
27-
import org.apache.arrow.vector.types.pojo.Field;
28-
import org.apache.arrow.vector.types.pojo.Schema;
2917

3018
@Builder(toBuilder = true)
3119
@Getter
@@ -215,41 +203,4 @@ public Optional<Column> getColumn(String name) {
215203
}
216204
return Optional.empty();
217205
}
218-
219-
public Schema toArrowSchema() {
220-
Field[] fields = new Field[columns.size()];
221-
for (int i = 0; i < columns.size(); i++) {
222-
Column column = columns.get(i);
223-
Field field = Field.nullable(column.getName(), column.getType());
224-
fields[i] = field;
225-
}
226-
Map<String, String> metadata = new HashMap<>();
227-
metadata.put("cq:table_name", name);
228-
if (title != null) {
229-
metadata.put("cq:table_title", title);
230-
}
231-
if (description != null) {
232-
metadata.put("cq:table_description", description);
233-
}
234-
if (parent != null) {
235-
metadata.put("cq:table_depends_on", parent.getName());
236-
}
237-
Schema schema = new Schema(asList(fields), metadata);
238-
return schema;
239-
}
240-
241-
public ByteString encode() throws IOException {
242-
try (BufferAllocator bufferAllocator = new RootAllocator()) {
243-
Schema schema = toArrowSchema();
244-
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
245-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
246-
try (ArrowStreamWriter writer =
247-
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
248-
writer.start();
249-
writer.end();
250-
return ByteString.copyFrom(out.toByteArray());
251-
}
252-
}
253-
}
254-
}
255206
}

0 commit comments

Comments
 (0)