Commit 1d0c300

HIVE-29287: Variant Shredding support
1 parent 2055879 commit 1d0c300

File tree

8 files changed: +1639 −6 lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java

Lines changed: 9 additions & 4 deletions
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.List;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
 
     switch (field.getFieldID()) {
       case 0: // "metadata" field (binary)
-        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.metadata().writeTo(metadata, 0);
         return metadata.array();
       case 1: // "value" field (binary)
-        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.value().writeTo(value, 0);
         return value.array();
       default:
@@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
       return null;
     }
     Variant variant = (Variant) data;
-    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.metadata().writeTo(metadata, 0);
 
-    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.value().writeTo(value, 0);
 
     // Return the data for our fields in the correct order: metadata, value
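The change above forces each scratch buffer into little-endian byte order before the variant metadata and value are serialized into it; a freshly allocated ByteBuffer defaults to big-endian in Java. A minimal, self-contained sketch of why that single call matters (the class name and sample value are illustrative, not part of the commit):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class ByteOrderDemo {
  public static void main(String[] args) {
    // A fresh heap buffer uses big-endian order unless told otherwise.
    ByteBuffer bigEndian = ByteBuffer.allocate(4);
    ByteBuffer littleEndian = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);

    bigEndian.putInt(1);
    littleEndian.putInt(1);

    // Multi-byte values land in opposite byte orders in the backing arrays.
    System.out.println(Arrays.toString(bigEndian.array()));    // [0, 0, 0, 1]
    System.out.println(Arrays.toString(littleEndian.array())); // [1, 0, 0, 0]
  }
}

Presumably the Variant writeTo calls expect a little-endian buffer, so omitting the order call would leave the serialized metadata and value bytes in the wrong order.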

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java

Lines changed: 37 additions & 0 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -31,9 +32,13 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
 
 class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
 
+  private final Map<String, String> properties;
+  private Record sampleRecord = null;
+
   HiveFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -54,6 +59,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
         positionDeleteRowSchema);
+    properties = table.properties();
   }
 
   static Builder builderFor(Table table) {
@@ -78,6 +84,11 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
+    // Configure variant shredding function if conditions are met:
+    if (hasVariantColumns(dataSchema()) && isVariantShreddingEnabled(properties)) {
+      builder.variantShreddingFunc(
+          Parquet.constructVariantShreddingFunction(sampleRecord, dataSchema()));
+    }
   }
 
   @Override
@@ -149,4 +160,30 @@ HiveFileWriterFactory build() {
           positionDeleteRowSchema);
     }
   }
+
+  /**
+   * Check if the schema contains any variant columns.
+   */
+  private static boolean hasVariantColumns(Schema schema) {
+    return schema.columns().stream()
+        .anyMatch(field -> field.type() instanceof Types.VariantType);
+  }
+
+  /**
+   * Check if variant shredding is enabled via table properties.
+   */
+  private static boolean isVariantShreddingEnabled(Map<String, String> properties) {
+    String shreddingEnabled = properties.get("variant.shredding.enabled");
+    return "true".equalsIgnoreCase(shreddingEnabled);
+  }
+
+  /**
+   * Set a sample record to use for data-driven variant shredding schema generation.
+   * Should be called before the Parquet writer is created.
+   */
+  public void initialize(Record record) {
+    if (this.sampleRecord == null) {
+      this.sampleRecord = record;
+    }
+  }
 }
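The two new helpers gate shredding on the write schema containing at least one variant column and on the table carrying the variant.shredding.enabled property. A standalone sketch of that property check (the class name and example maps are illustrative, not part of the commit):

import java.util.Map;

public class VariantShreddingToggle {
  // Mirrors the commit's check: only the literal string "true"
  // (case-insensitive) turns shredding on; anything else leaves it off.
  static boolean isVariantShreddingEnabled(Map<String, String> properties) {
    return "true".equalsIgnoreCase(properties.get("variant.shredding.enabled"));
  }

  public static void main(String[] args) {
    System.out.println(isVariantShreddingEnabled(Map.of("variant.shredding.enabled", "true")));  // true
    System.out.println(isVariantShreddingEnabled(Map.of("variant.shredding.enabled", "TRUE")));  // true
    System.out.println(isVariantShreddingEnabled(Map.of()));                                     // false, off by default
  }
}

Shredding is therefore opt-in per table: a table that never sets the property keeps writing unshredded variant binaries.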

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java

Lines changed: 4 additions & 1 deletion
@@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
   private final Set<String> missingColumns;
   private final List<Types.NestedField> missingOrStructFields;
 
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
     super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
@@ -48,17 +50,18 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     this.missingColumns = context.missingColumns();
     this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
         .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
+    this.fileWriterFactory = fileWriterFactory;
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
     HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
+    fileWriterFactory.initialize(record);
 
     writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
   }
 
-
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
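With this change the writer offers every incoming record to the factory, and the factory keeps only the first one as the sample that later drives the shredded Parquet schema. A stripped-down sketch of that first-record capture pattern (the generic class is illustrative, not part of the commit):

public class FirstRecordCapture<R> {
  private R sampleRecord = null;

  // Keep only the first record offered; later calls are no-ops.
  public void initialize(R record) {
    if (this.sampleRecord == null) {
      this.sampleRecord = record;
    }
  }

  public R sample() {
    return sampleRecord;
  }

  public static void main(String[] args) {
    FirstRecordCapture<String> capture = new FirstRecordCapture<>();
    capture.initialize("first row");
    capture.initialize("second row");
    System.out.println(capture.sample()); // prints "first row"
  }
}

This matches the Javadoc's note that the sample should be set before the Parquet writer is created, since the shredding function is built from it in configureDataWrite.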
