Skip to content

Commit a210865

Browse files
authored
chore: add support for arrow JSON extension (#75)
fixes: #73
1 parent 73cd3bc commit a210865

File tree

2 files changed

+189
-5
lines changed

2 files changed

+189
-5
lines changed

lib/src/main/java/io/cloudquery/types/JSONType.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package io.cloudquery.types;
22

33
import org.apache.arrow.memory.BufferAllocator;
4+
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
5+
import org.apache.arrow.vector.ExtensionTypeVector;
46
import org.apache.arrow.vector.FieldVector;
7+
import org.apache.arrow.vector.VarBinaryVector;
58
import org.apache.arrow.vector.types.pojo.ArrowType;
69
import org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType;
710
import org.apache.arrow.vector.types.pojo.FieldType;
@@ -12,7 +15,7 @@ public class JSONType extends ExtensionType {
1215

1316
@Override
1417
public ArrowType storageType() {
15-
return ArrowType.Binary.INSTANCE;
18+
return Binary.INSTANCE;
1619
}
1720

1821
@Override
@@ -22,22 +25,28 @@ public String extensionName() {
2225

2326
@Override
2427
public boolean extensionEquals(ExtensionType other) {
25-
return false;
28+
return other instanceof JSONType;
2629
}
2730

2831
@Override
2932
public String serialize() {
30-
return null;
33+
return "json-serialized";
3134
}
3235

3336
@Override
3437
public ArrowType deserialize(ArrowType storageType, String serializedData) {
35-
return null;
38+
if (!serializedData.equals("json-serialized")) {
39+
throw new IllegalArgumentException("Type identifier did not match: " + serializedData);
40+
}
41+
if (!storageType.equals(storageType())) {
42+
throw new IllegalArgumentException("invalid storage type for JSONType: " + storageType.getTypeID());
43+
}
44+
return new JSONType();
3645
}
3746

3847
@Override
3948
public FieldVector getNewVector(String name, FieldType fieldType, BufferAllocator allocator) {
40-
return null;
49+
return new JSONVector(name, allocator, new VarBinaryVector(name, allocator));
4150
}
4251

4352
@Override
@@ -49,4 +58,33 @@ public int hashCode() {
4958
public boolean equals(Object obj) {
5059
return obj instanceof JSONType;
5160
}
61+
62+
public static class JSONVector extends ExtensionTypeVector<VarBinaryVector> {
63+
public JSONVector(String name, BufferAllocator allocator, VarBinaryVector underlyingVector) {
64+
super(name, allocator, underlyingVector);
65+
}
66+
67+
@Override
68+
public Object getObject(int index) {
69+
return getUnderlyingVector().getObject(index);
70+
}
71+
72+
@Override
73+
public int hashCode(int index) {
74+
return hashCode(index, null);
75+
}
76+
77+
@Override
78+
public int hashCode(int index, ArrowBufHasher hasher) {
79+
return getUnderlyingVector().hashCode(index, hasher);
80+
}
81+
82+
public String get(int index) {
83+
return new String((byte[]) getObject(index));
84+
}
85+
86+
public void set(int index, String value) {
87+
getUnderlyingVector().setSafe(index, value.getBytes(), 0, value.getBytes().length);
88+
}
89+
}
5290
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package io.cloudquery.types;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.cloudquery.types.JSONType.JSONVector;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Data;
7+
import org.apache.arrow.memory.BufferAllocator;
8+
import org.apache.arrow.memory.RootAllocator;
9+
import org.apache.arrow.vector.VectorSchemaRoot;
10+
import org.apache.arrow.vector.ipc.ArrowFileReader;
11+
import org.apache.arrow.vector.ipc.ArrowFileWriter;
12+
import org.apache.arrow.vector.types.pojo.ArrowType;
13+
import org.apache.arrow.vector.types.pojo.ExtensionTypeRegistry;
14+
import org.apache.arrow.vector.types.pojo.Field;
15+
import org.apache.arrow.vector.types.pojo.Schema;
16+
import org.junit.jupiter.api.AfterAll;
17+
import org.junit.jupiter.api.BeforeAll;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.io.OutputStream;
25+
import java.nio.channels.FileChannel;
26+
import java.nio.channels.WritableByteChannel;
27+
import java.nio.file.Files;
28+
import java.nio.file.Paths;
29+
import java.nio.file.StandardOpenOption;
30+
import java.util.Collections;
31+
import java.util.List;
32+
33+
import static org.junit.jupiter.api.Assertions.assertEquals;
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
36+
public class JSONTypeTest {
37+
private static final String FIELD_NAME = "json";
38+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
39+
40+
@Data
41+
@AllArgsConstructor
42+
public static class Person {
43+
private String name;
44+
private Integer age;
45+
private List<String> hobbies;
46+
}
47+
48+
private File file;
49+
private List<String> jsonData;
50+
51+
@BeforeAll
52+
public static void setUpTest() {
53+
ExtensionTypeRegistry.register(new JSONType());
54+
}
55+
56+
@AfterAll
57+
public static void tearDown() {
58+
ExtensionTypeRegistry.unregister(new JSONType());
59+
}
60+
61+
@BeforeEach
62+
void setUp() throws IOException {
63+
file = File.createTempFile("json_test", ".arrow");
64+
jsonData = List.of(
65+
toJSON(new Person("John", 30, List.of("hiking", "swimming"))),
66+
toJSON(new Person("Jane", 25, List.of("reading", "cooking")))
67+
);
68+
}
69+
70+
@Test
71+
public void shouldSetJSONOnJSONVector() throws IOException {
72+
73+
try (BufferAllocator allocator = new RootAllocator()) {
74+
ArrowType.ExtensionType jsonType = ExtensionTypeRegistry.lookup("json");
75+
try (JSONVector vector = (JSONVector) jsonType.getNewVector("vector", null, allocator)) {
76+
vector.set(0, jsonData.get(0));
77+
vector.setNull(1);
78+
vector.set(2, jsonData.get(1));
79+
vector.setNull(3);
80+
vector.setValueCount(4);
81+
82+
// Assert that the values were set correctly
83+
assertEquals(jsonData.get(0), vector.get(0), "JSON should match");
84+
assertTrue(vector.isNull(1), "Should be null");
85+
assertEquals(jsonData.get(1), vector.get(2), "JSON should match");
86+
assertTrue(vector.isNull(3), "Should be null");
87+
88+
// Assert that the value count and null count are correct
89+
assertEquals(4, vector.getValueCount(), "Value count should match");
90+
assertEquals(2, vector.getNullCount(), "Null count should match");
91+
}
92+
}
93+
}
94+
95+
@Test
96+
public void roundTripJSON() throws IOException {
97+
// Generate some data and write it to a file
98+
try (BufferAllocator allocator = new RootAllocator(); VectorSchemaRoot root = createVectorSchemaRoot(allocator)) {
99+
generateDataAndWriteToFile(root);
100+
}
101+
102+
// Read the data back from the file and assert that it matches what we wrote
103+
try (BufferAllocator allocator = new RootAllocator();
104+
ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(file.getAbsolutePath())), allocator)) {
105+
106+
reader.loadNextBatch();
107+
108+
JSONVector fieldVector = (JSONVector) reader.getVectorSchemaRoot().getVector(FIELD_NAME);
109+
assertEquals(jsonData.size(), fieldVector.getValueCount(), "Value count should match");
110+
for (int i = 0; i < jsonData.size(); i++) {
111+
assertEquals(jsonData.get(i), fieldVector.get(i), "JSON should match");
112+
}
113+
}
114+
}
115+
116+
private static VectorSchemaRoot createVectorSchemaRoot(BufferAllocator allocator) {
117+
return VectorSchemaRoot.create(new Schema(Collections.singletonList(Field.nullable(FIELD_NAME, new JSONType()))), allocator);
118+
}
119+
120+
private void generateDataAndWriteToFile(VectorSchemaRoot root) throws IOException {
121+
// Get the vector representing the column
122+
JSONVector vector = (JSONVector) root.getVector(FIELD_NAME);
123+
124+
// Generate some JSON data
125+
vector.setValueCount(jsonData.size());
126+
for (int i = 0; i < jsonData.size(); i++) {
127+
vector.set(i, jsonData.get(i));
128+
}
129+
root.setRowCount(jsonData.size());
130+
131+
// Write the data to a file
132+
try (WritableByteChannel channel = FileChannel.open(Paths.get(file.getAbsolutePath()), StandardOpenOption.WRITE);
133+
ArrowFileWriter writer = new ArrowFileWriter(root, null, channel)) {
134+
writer.start();
135+
writer.writeBatch();
136+
writer.end();
137+
}
138+
}
139+
140+
private static String toJSON(Object object) throws IOException {
141+
try (OutputStream outputStream = new ByteArrayOutputStream()) {
142+
OBJECT_MAPPER.writeValue(outputStream, object);
143+
return outputStream.toString();
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)