HIVE-28267: Support merge task functionality for Iceberg delete files
SourabhBadhya committed May 17, 2024
1 parent ba11932 commit 1cff8aa
Showing 15 changed files with 643 additions and 56 deletions.
@@ -111,6 +111,7 @@
import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.stats.Partish;
@@ -218,7 +219,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION";

private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(VirtualColumn.PARTITION_SPEC_ID,
VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION);
VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION,
VirtualColumn.PARTITION_PROJECTION);
private static final List<FieldSchema> ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = ACID_VIRTUAL_COLS.stream()
.map(v -> new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), ""))
.collect(Collectors.toList());
@@ -317,6 +319,11 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
jobConf.set(opKey, tableDesc.getProperties().getProperty(opKey));
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR),
"Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
if (tableDesc.getProperties()
.get(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName) != null) {
jobConf.set(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName, tableDesc.getProperties()
.getProperty(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName));
}
String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName;
jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
@@ -2024,4 +2031,11 @@ public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOE
public MergeTaskProperties getMergeTaskProperties(Properties properties) {
return new IcebergMergeTaskProperties(properties);
}

public void setCustomDeleteProperties(TableDesc tableDesc) {
tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(),
Operation.DELETE.name());
tableDesc.getProperties().put(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX +
tableDesc.getTableName(), Operation.DELETE.name());
}
}
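
The hunks above record the intended write operation for each table under the key HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName, both when the custom delete properties are set and when the TableDesc properties are copied into the JobConf. A minimal sketch of how a task-side consumer could read that entry back; the helper class and method below are hypothetical, only the property-key construction comes from the diff:

    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical helper: only the key construction mirrors the change above.
    final class WriteOperationLookup {
      private WriteOperationLookup() {
      }

      /** Returns the recorded write operation name ("DELETE", ...) for a table, or null if none was set. */
      static String writeOperationFor(JobConf jobConf, String tableName) {
        return jobConf.get(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName);
      }
    }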
@@ -28,17 +28,22 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.StructProjection;

public class IcebergAcidUtil {
@@ -114,7 +119,7 @@ public static PositionDelete<Record> getPositionDelete(Record rec, Record rowDat
String filePath = rec.get(SERDE_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
Long filePosition = rec.get(SERDE_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);

int dataOffset = SERDE_META_COLS.size(); // position in the rec where the actual row data begins
int dataOffset = SERDE_META_COLS.size() + 1; // position in the rec where the actual row data begins
for (int i = dataOffset; i < rec.size(); ++i) {
rowData.set(i - dataOffset, rec.get(i));
}
@@ -196,6 +201,26 @@ public static long computePartitionHash(Record rec) {
return computeHash(part);
}

public static PartitionKey parsePartitionKey(Record rec) {
String serializedStr = rec.get(4, String.class);
return SerializationUtil.deserializeFromBase64(serializedStr);
}

public static String getSerializedPartitionKey(Record rec, PartitionSpec partitionSpec) {
StructLike structLike = new InternalRecordWrapper(partitionSpec.schema().asStruct()).wrap(rec);
PartitionKey partitionKey = new PartitionKey(partitionSpec, partitionSpec.schema());
partitionKey.partition(structLike);
return SerializationUtil.serializeToBase64(partitionKey);
}

public static String getSerializedPartitionKey(StructLike structLike, PartitionSpec partitionSpec) {
PartitionKey partitionKey = new PartitionKey(partitionSpec, partitionSpec.schema());
for (int idx = 0; idx < structLike.size(); idx++) {
partitionKey.set(idx, structLike.get(idx, Object.class));
}
return SerializationUtil.serializeToBase64(partitionKey);
}

public static String parseFilePath(Record rec) {
return rec.get(FILE_READ_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
}
@@ -229,13 +254,16 @@ public static class VirtualColumnAwareIterator<T> implements CloseableIterator<T
private GenericRecord current;
private final Schema expectedSchema;
private final Configuration conf;
private final Table table;

public VirtualColumnAwareIterator(CloseableIterator<T> currentIterator, Schema expectedSchema, Configuration conf) {
public VirtualColumnAwareIterator(CloseableIterator<T> currentIterator, Schema expectedSchema, Configuration conf,
Table table) {
this.currentIterator = currentIterator;
this.expectedSchema = expectedSchema;
this.conf = conf;
current = GenericRecord.create(
new Schema(expectedSchema.columns().subList(4, expectedSchema.columns().size())));
new Schema(expectedSchema.columns().subList(4, expectedSchema.columns().size())));
this.table = table;
}

@Override
@@ -252,13 +280,63 @@ public boolean hasNext() {
public T next() {
T next = currentIterator.next();
GenericRecord rec = (GenericRecord) next;
IcebergAcidUtil.copyFields(rec, FILE_READ_META_COLS.size(), current.size(), current);
int specId = IcebergAcidUtil.parseSpecId(rec);
PositionDeleteInfo.setIntoConf(conf,
IcebergAcidUtil.parseSpecId(rec),
specId,
IcebergAcidUtil.computePartitionHash(rec),
IcebergAcidUtil.parseFilePath(rec),
IcebergAcidUtil.parseFilePosition(rec));
IcebergAcidUtil.copyFields(rec, FILE_READ_META_COLS.size(), current.size(), current);
IcebergAcidUtil.parseFilePosition(rec),
IcebergAcidUtil.getSerializedPartitionKey(current, table.specs().get(specId)));
return (T) current;
}
}

public static class MergeVirtualColumnAwareIterator<T> implements CloseableIterator<T> {

private final CloseableIterator<T> currentIterator;
private GenericRecord current;
private final Schema expectedSchema;
private final Configuration conf;
private final int specId;
private final PartitionSpec partitionSpec;
private final StructLike partition;

public MergeVirtualColumnAwareIterator(CloseableIterator<T> currentIterator,
Schema expectedSchema, Configuration conf, ContentFile contentFile,
Table table) {
this.currentIterator = currentIterator;
this.expectedSchema = expectedSchema;
this.conf = conf;
this.partition = contentFile.partition();
current = GenericRecord.create(
new Schema(expectedSchema.columns().subList(0, expectedSchema.columns().size())));
this.specId = contentFile.specId();

this.partitionSpec = table.specs().get(specId);
}

@Override
public void close() throws IOException {
currentIterator.close();
}

@Override
public boolean hasNext() {
return currentIterator.hasNext();
}

@Override
public T next() {
T next = currentIterator.next();
GenericRecord rec = (GenericRecord) next;
current.set(0, specId);
current.set(1, computeHash(partition));
current.set(2, rec.get(0, String.class));
current.set(3, rec.get(1, Long.class));
current.set(4, getSerializedPartitionKey(partition, partitionSpec));
return (T) current;
}
}

}
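
The new helpers above encode a record's PartitionKey as a base64 string (carried in the PARTITION_PROJECTION meta column at index 4 and read back by parsePartitionKey) using Iceberg's SerializationUtil. A self-contained sketch of that round trip, assuming an illustrative table partitioned by a single identity column; the schema and values are made up:

    import org.apache.iceberg.PartitionKey;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.InternalRecordWrapper;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.SerializationUtil;

    public class PartitionKeyRoundTrip {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "dept", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("dept").build();

        Record row = GenericRecord.create(schema);
        row.setField("id", 42L);
        row.setField("dept", "hr");

        // Same steps as IcebergAcidUtil.getSerializedPartitionKey(Record, PartitionSpec).
        PartitionKey key = new PartitionKey(spec, schema);
        key.partition(new InternalRecordWrapper(schema.asStruct()).wrap(row));
        String encoded = SerializationUtil.serializeToBase64(key);

        // Same step as IcebergAcidUtil.parsePartitionKey(Record) on the read side.
        PartitionKey decoded = SerializationUtil.deserializeFromBase64(encoded);
        System.out.println(decoded.get(0, String.class));  // prints "hr"
      }
    }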
@@ -33,6 +33,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.util.StructProjection;
@@ -51,16 +52,19 @@ public final class HiveBatchIterator implements CloseableIterator<HiveBatchConte
private boolean advanced = false;
private long rowOffset = Long.MIN_VALUE;
private Map<Integer, ?> idToConstant;
private PartitionSpec partitionSpec;

HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
int[] partitionColIndices, Object[] partitionValues, Map<Integer, ?> idToConstant) {
int[] partitionColIndices, Object[] partitionValues, Map<Integer, ?> idToConstant,
PartitionSpec partitionSpec) {
this.recordReader = recordReader;
this.key = recordReader.createKey();
this.batch = recordReader.createValue();
this.vrbCtx = LlapHiveUtils.findMapWork(job).getVectorizedRowBatchCtx();
this.partitionColIndices = partitionColIndices;
this.partitionValues = partitionValues;
this.idToConstant = idToConstant;
this.partitionSpec = partitionSpec;
}

@Override
@@ -91,40 +95,7 @@ private void advance() {
}
}
// Fill virtual columns
for (VirtualColumn vc : vrbCtx.getNeededVirtualColumns()) {
Object value;
int idx = vrbCtx.findVirtualColumnNum(vc);
switch (vc) {
case PARTITION_SPEC_ID:
value = idToConstant.get(MetadataColumns.SPEC_ID.fieldId());
vrbCtx.addPartitionColsToBatch(batch.cols[idx], value, idx);
break;
case PARTITION_HASH:
value = IcebergAcidUtil.computeHash(
(StructProjection) idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
vrbCtx.addPartitionColsToBatch(batch.cols[idx], value, idx);
break;
case FILE_PATH:
value = idToConstant.get(MetadataColumns.FILE_PATH.fieldId());
BytesColumnVector bcv = (BytesColumnVector) batch.cols[idx];
if (value == null) {
bcv.noNulls = false;
bcv.isNull[0] = true;
bcv.isRepeating = true;
} else {
bcv.fill(((String) value).getBytes());
}
break;
case ROW_POSITION:
value = LongStream.range(rowOffset, rowOffset + batch.size).toArray();
LongColumnVector lcv = (LongColumnVector) batch.cols[idx];
lcv.noNulls = true;
Arrays.fill(lcv.isNull, false);
lcv.isRepeating = false;
System.arraycopy(value, 0, lcv.vector, 0, batch.size);
break;
}
}
fillVirtualColumns();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
@@ -144,4 +115,53 @@ public HiveBatchContext next() {
advanced = false;
return new HiveBatchContext(batch, vrbCtx, rowOffset);
}

public void fillVirtualColumns() {
for (VirtualColumn vc : vrbCtx.getNeededVirtualColumns()) {
Object value;
int idx = vrbCtx.findVirtualColumnNum(vc);
switch (vc) {
case PARTITION_SPEC_ID:
value = idToConstant.get(MetadataColumns.SPEC_ID.fieldId());
vrbCtx.addPartitionColsToBatch(batch.cols[idx], value, idx);
break;
case PARTITION_HASH:
value = IcebergAcidUtil.computeHash(
(StructProjection) idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
vrbCtx.addPartitionColsToBatch(batch.cols[idx], value, idx);
break;
case FILE_PATH:
value = idToConstant.get(MetadataColumns.FILE_PATH.fieldId());
BytesColumnVector bcv = (BytesColumnVector) batch.cols[idx];
if (value == null) {
bcv.noNulls = false;
bcv.isNull[0] = true;
bcv.isRepeating = true;
} else {
bcv.fill(((String) value).getBytes());
}
break;
case ROW_POSITION:
value = LongStream.range(rowOffset, rowOffset + batch.size).toArray();
LongColumnVector lcv = (LongColumnVector) batch.cols[idx];
lcv.noNulls = true;
Arrays.fill(lcv.isNull, false);
lcv.isRepeating = false;
System.arraycopy(value, 0, lcv.vector, 0, batch.size);
break;
case PARTITION_PROJECTION:
value = IcebergAcidUtil.getSerializedPartitionKey(
(StructProjection) idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID), partitionSpec);
BytesColumnVector bv = (BytesColumnVector) batch.cols[idx];
if (value == null) {
bv.noNulls = false;
bv.isNull[0] = true;
bv.isRepeating = true;
} else {
bv.fill(((String) value).getBytes());
}
break;
}
}
}
}
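
The new PARTITION_PROJECTION branch above publishes a single base64 string for the whole batch through a BytesColumnVector, using the same repeating-value and repeating-null conventions as the existing FILE_PATH branch. A standalone illustration of those two vector states, under the assumption that the sample bytes are arbitrary placeholders:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

    public class ConstantBytesColumn {
      public static void main(String[] args) {
        // A present constant: fill() sets one repeating value for every row in the batch.
        BytesColumnVector withValue = new BytesColumnVector();
        withValue.fill("c3BlYy1rZXk=".getBytes(StandardCharsets.UTF_8));

        // A missing constant: mark the vector as a repeating null instead.
        BytesColumnVector withoutValue = new BytesColumnVector();
        withoutValue.noNulls = false;
        withoutValue.isNull[0] = true;
        withoutValue.isRepeating = true;

        System.out.println(withValue.isRepeating + " / " + withoutValue.isNull[0]);
      }
    }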
@@ -167,7 +167,8 @@ public static CloseableIterable<HiveBatchContext> reader(Table table, Path path,
}

CloseableIterable<HiveBatchContext> vrbIterable =
createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues, idToConstant);
createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues, idToConstant,
partitionSpec);

return deleteFilter != null ? deleteFilter.filterBatch(vrbIterable) : vrbIterable;

@@ -253,10 +254,10 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReade

private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
Object[] partitionValues, Map<Integer, ?> idToConstant) {
Object[] partitionValues, Map<Integer, ?> idToConstant, PartitionSpec partitionSpec) {

HiveBatchIterator iterator =
new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues, idToConstant);
new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues, idToConstant, partitionSpec);

return new CloseableIterable<HiveBatchContext>() {

@@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
@@ -58,12 +59,12 @@ public void write(Writable row) throws IOException {
Record rec = ((Container<Record>) row).get();
PositionDelete<Record> positionDelete = IcebergAcidUtil.getPositionDelete(rec, rowDataTemplate);
int specId = IcebergAcidUtil.parseSpecId(rec);
Record rowData = positionDelete.row();
PartitionKey partitionKey = IcebergAcidUtil.parsePartitionKey(rec);
if (skipRowData) {
// Set null as the row data as we intend to avoid writing the actual row data in the delete file.
positionDelete.set(positionDelete.path(), positionDelete.pos(), null);
}
writer.write(positionDelete, specs.get(specId), partition(rowData, specId));
writer.write(positionDelete, specs.get(specId), partitionKey);
}

@Override
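
The writer change above takes the partition from the PartitionKey parsed out of the record (IcebergAcidUtil.parsePartitionKey) instead of re-deriving it from the row payload, presumably so the partition survives when the record carries only the meta columns (as produced by the new MergeVirtualColumnAwareIterator) or when skipRowData drops the payload. A rough sketch of the pieces involved; the schema, file path, and positions are made up:

    import org.apache.iceberg.PartitionKey;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.deletes.PositionDelete;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.SerializationUtil;

    public class PositionDeleteSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(Types.NestedField.required(1, "dept", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("dept").build();

        // Partition key as it would arrive base64-encoded in the record's fifth meta column.
        PartitionKey key = new PartitionKey(spec, schema);
        key.set(0, "hr");
        String serialized = SerializationUtil.serializeToBase64(key);

        // Position delete with the row payload dropped, as when skipRowData is enabled.
        PositionDelete<Record> delete = PositionDelete.create();
        delete.set("s3://bucket/tbl/data/00000.parquet", 17L, null);

        // On the write side the delete and the decoded key are handed to the partition-aware writer.
        PartitionKey parsed = SerializationUtil.deserializeFromBase64(serialized);
        System.out.println(delete.pos() + " -> " + parsed.get(0, String.class));
      }
    }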