-
Notifications
You must be signed in to change notification settings - Fork 2
chore: implment write insert logic #93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,3 +31,4 @@ build | |
|
||
# Intellij | ||
.idea | ||
.cq |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
import static java.util.Arrays.asList; | ||
|
||
import com.google.protobuf.ByteString; | ||
import io.cloudquery.scalar.ValidationException; | ||
import io.cloudquery.schema.Column; | ||
import io.cloudquery.schema.Resource; | ||
import io.cloudquery.schema.Table; | ||
|
@@ -221,14 +222,12 @@ public static Table fromArrowSchema(Schema schema) { | |
String constraintName = metaData.get(CQ_EXTENSION_CONSTRAINT_NAME); | ||
|
||
TableBuilder tableBuilder = | ||
Table.builder().name(name).constraintName(constraintName).columns(columns); | ||
|
||
if (title != null) { | ||
tableBuilder.title(title); | ||
} | ||
if (description != null) { | ||
tableBuilder.description(description); | ||
} | ||
Table.builder() | ||
.name(name) | ||
.constraintName(constraintName) | ||
.columns(columns) | ||
.title(title) | ||
.description(description); | ||
if (parent != null) { | ||
tableBuilder.parent(Table.builder().name(parent).build()); | ||
} | ||
|
@@ -260,4 +259,23 @@ public static ByteString encode(Resource resource) throws IOException { | |
} | ||
} | ||
} | ||
|
||
public static Resource decodeResource(ByteString byteString) | ||
throws IOException, ValidationException { | ||
try (BufferAllocator bufferAllocator = new RootAllocator()) { | ||
try (ArrowStreamReader reader = | ||
new ArrowStreamReader(byteString.newInput(), bufferAllocator)) { | ||
VectorSchemaRoot vectorSchemaRoot = reader.getVectorSchemaRoot(); | ||
reader.loadNextBatch(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we would need to loop on Though maybe not an issue at the moment since we always send a single record batch? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently the method signature implies only a single item There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's keep it as is for now and change it if we run into issues |
||
Resource resource = | ||
Resource.builder().table(fromArrowSchema(vectorSchemaRoot.getSchema())).build(); | ||
for (int i = 0; i < vectorSchemaRoot.getSchema().getFields().size(); i++) { | ||
FieldVector vector = vectorSchemaRoot.getVector(i); | ||
// TODO: We currently only support a single row | ||
resource.set(vector.getName(), vector.getObject(0)); | ||
} | ||
return resource; | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package io.cloudquery.messages; | ||
|
||
import io.cloudquery.schema.Resource; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
|
||
@AllArgsConstructor | ||
@Getter | ||
public class WriteInsert extends WriteMessage { | ||
private Resource resource; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,6 +105,9 @@ public static Scalar<?> fromArrowType(ArrowType arrowType) { | |
case Duration -> { | ||
return new Duration(); | ||
} | ||
case List -> { | ||
return new JSON(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now we will map arrow types of |
||
} | ||
} | ||
|
||
if (arrowType instanceof ArrowType.ExtensionType extensionType) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package io.cloudquery.types; | ||
|
||
import org.apache.arrow.vector.types.pojo.ExtensionTypeRegistry; | ||
|
||
public class Extensions { | ||
public static void registerExtensions() { | ||
ExtensionTypeRegistry.register(new UUIDType()); | ||
ExtensionTypeRegistry.register(new JSONType()); | ||
} | ||
|
||
private Extensions() {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,5 @@ | ||
package io.cloudquery.types; | ||
|
||
import static org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.UUID; | ||
import org.apache.arrow.memory.BufferAllocator; | ||
|
@@ -61,6 +59,9 @@ public UUIDVector(String name, BufferAllocator allocator, FixedSizeBinaryVector | |
|
||
@Override | ||
public Object getObject(int index) { | ||
if (getUnderlyingVector().isSet(index) == 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice |
||
return null; | ||
} | ||
final ByteBuffer bb = ByteBuffer.wrap(getUnderlyingVector().getObject(index)); | ||
return new UUID(bb.getLong(), bb.getLong()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice 👍