Skip to content

chore: add support for arrow JSON extension #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions lib/src/main/java/io/cloudquery/types/JSONType.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.cloudquery.types;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.vector.ExtensionTypeVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType;
import org.apache.arrow.vector.types.pojo.FieldType;
Expand All @@ -12,7 +15,7 @@ public class JSONType extends ExtensionType {

@Override
public ArrowType storageType() {
return ArrowType.Binary.INSTANCE;
return Binary.INSTANCE;
}

@Override
Expand All @@ -22,22 +25,28 @@ public String extensionName() {

@Override
public boolean extensionEquals(ExtensionType other) {
return false;
return other instanceof JSONType;
}

@Override
public String serialize() {
return null;
return "json-serialized";
}

@Override
public ArrowType deserialize(ArrowType storageType, String serializedData) {
return null;
if (!serializedData.equals("json-serialized")) {
throw new IllegalArgumentException("Type identifier did not match: " + serializedData);
}
if (!storageType.equals(storageType())) {
throw new IllegalArgumentException("invalid storage type for JSONType: " + storageType.getTypeID());
}
return new JSONType();
}

@Override
public FieldVector getNewVector(String name, FieldType fieldType, BufferAllocator allocator) {
return null;
return new JSONVector(name, allocator, new VarBinaryVector(name, allocator));
}

@Override
Expand All @@ -49,4 +58,33 @@ public int hashCode() {
public boolean equals(Object obj) {
return obj instanceof JSONType;
}

public static class JSONVector extends ExtensionTypeVector<VarBinaryVector> {
public JSONVector(String name, BufferAllocator allocator, VarBinaryVector underlyingVector) {
super(name, allocator, underlyingVector);
}

@Override
public Object getObject(int index) {
return getUnderlyingVector().getObject(index);
}

@Override
public int hashCode(int index) {
return hashCode(index, null);
}

@Override
public int hashCode(int index, ArrowBufHasher hasher) {
return getUnderlyingVector().hashCode(index, hasher);
}

public String get(int index) {
return new String((byte[]) getObject(index));
}

public void set(int index, String value) {
getUnderlyingVector().setSafe(index, value.getBytes(), 0, value.getBytes().length);
}
}
}
146 changes: 146 additions & 0 deletions lib/src/test/java/io/cloudquery/types/JSONTypeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.cloudquery.types;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudquery.types.JSONType.JSONVector;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ExtensionTypeRegistry;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class JSONTypeTest {
private static final String FIELD_NAME = "json";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Data
@AllArgsConstructor
public static class Person {
private String name;
private Integer age;
private List<String> hobbies;
}

private File file;
private List<String> jsonData;

@BeforeAll
public static void setUpTest() {
ExtensionTypeRegistry.register(new JSONType());
}

@AfterAll
public static void tearDown() {
ExtensionTypeRegistry.unregister(new JSONType());
}

@BeforeEach
void setUp() throws IOException {
file = File.createTempFile("json_test", ".arrow");
jsonData = List.of(
toJSON(new Person("John", 30, List.of("hiking", "swimming"))),
toJSON(new Person("Jane", 25, List.of("reading", "cooking")))
);
}

@Test
public void shouldSetJSONOnJSONVector() throws IOException {

try (BufferAllocator allocator = new RootAllocator()) {
ArrowType.ExtensionType jsonType = ExtensionTypeRegistry.lookup("json");
try (JSONVector vector = (JSONVector) jsonType.getNewVector("vector", null, allocator)) {
vector.set(0, jsonData.get(0));
vector.setNull(1);
vector.set(2, jsonData.get(1));
vector.setNull(3);
vector.setValueCount(4);

// Assert that the values were set correctly
assertEquals(jsonData.get(0), vector.get(0), "JSON should match");
assertTrue(vector.isNull(1), "Should be null");
assertEquals(jsonData.get(1), vector.get(2), "JSON should match");
assertTrue(vector.isNull(3), "Should be null");

// Assert that the value count and null count are correct
assertEquals(4, vector.getValueCount(), "Value count should match");
assertEquals(2, vector.getNullCount(), "Null count should match");
}
}
}

@Test
public void roundTripJSON() throws IOException {
// Generate some data and write it to a file
try (BufferAllocator allocator = new RootAllocator(); VectorSchemaRoot root = createVectorSchemaRoot(allocator)) {
generateDataAndWriteToFile(root);
}

// Read the data back from the file and assert that it matches what we wrote
try (BufferAllocator allocator = new RootAllocator();
ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(file.getAbsolutePath())), allocator)) {

reader.loadNextBatch();

JSONVector fieldVector = (JSONVector) reader.getVectorSchemaRoot().getVector(FIELD_NAME);
assertEquals(jsonData.size(), fieldVector.getValueCount(), "Value count should match");
for (int i = 0; i < jsonData.size(); i++) {
assertEquals(jsonData.get(i), fieldVector.get(i), "JSON should match");
}
}
}

private static VectorSchemaRoot createVectorSchemaRoot(BufferAllocator allocator) {
return VectorSchemaRoot.create(new Schema(Collections.singletonList(Field.nullable(FIELD_NAME, new JSONType()))), allocator);
}

private void generateDataAndWriteToFile(VectorSchemaRoot root) throws IOException {
// Get the vector representing the column
JSONVector vector = (JSONVector) root.getVector(FIELD_NAME);

// Generate some JSON data
vector.setValueCount(jsonData.size());
for (int i = 0; i < jsonData.size(); i++) {
vector.set(i, jsonData.get(i));
}
root.setRowCount(jsonData.size());

// Write the data to a file
try (WritableByteChannel channel = FileChannel.open(Paths.get(file.getAbsolutePath()), StandardOpenOption.WRITE);
ArrowFileWriter writer = new ArrowFileWriter(root, null, channel)) {
writer.start();
writer.writeBatch();
writer.end();
}
}

private static String toJSON(Object object) throws IOException {
try (OutputStream outputStream = new ByteArrayOutputStream()) {
OBJECT_MAPPER.writeValue(outputStream, object);
return outputStream.toString();
}
}
}