diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index c5b37c62b01f..1339f37a0076 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hive.ql.plan.MergeTaskProperties; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.hadoop.hive.ql.stats.Partish; @@ -218,7 +219,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION"; private static final List ACID_VIRTUAL_COLS = ImmutableList.of(VirtualColumn.PARTITION_SPEC_ID, - VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION); + VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION, + VirtualColumn.PARTITION_PROJECTION); private static final List ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = ACID_VIRTUAL_COLS.stream() .map(v -> new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), "")) .collect(Collectors.toList()); @@ -317,6 +319,11 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { jobConf.set(opKey, tableDesc.getProperties().getProperty(opKey)); Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR), "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'"); + if (tableDesc.getProperties() + .get(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName) != null) { + jobConf.set(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName, tableDesc.getProperties() + .getProperty(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName)); + } String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); tables = tables == null ? 
tableName : tables + TABLE_NAME_SEPARATOR + tableName; jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); @@ -2024,4 +2031,11 @@ public List getMergeTaskInputFiles(Properties properties) throws IOE public MergeTaskProperties getMergeTaskProperties(Properties properties) { return new IcebergMergeTaskProperties(properties); } + + public void setCustomDeleteProperties(TableDesc tableDesc) { + tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), + Operation.DELETE.name()); + tableDesc.getProperties().put(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + + tableDesc.getTableName(), Operation.DELETE.name()); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java index e225c2b50781..0a981b6c96a3 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java @@ -28,17 +28,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.PositionDeleteInfo; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializationUtil; import org.apache.iceberg.util.StructProjection; public class IcebergAcidUtil { @@ -114,7 +119,7 @@ public static PositionDelete getPositionDelete(Record rec, Record rowDat String filePath = rec.get(SERDE_META_COLS.get(MetadataColumns.FILE_PATH), String.class); Long filePosition = rec.get(SERDE_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class); - int dataOffset = SERDE_META_COLS.size(); // position in the rec where the actual row data begins + int dataOffset = SERDE_META_COLS.size() + 1; // position in the rec where the actual row data begins for (int i = dataOffset; i < rec.size(); ++i) { rowData.set(i - dataOffset, rec.get(i)); } @@ -196,6 +201,26 @@ public static long computePartitionHash(Record rec) { return computeHash(part); } + public static PartitionKey parsePartitionKey(Record rec) { + String serializedStr = rec.get(4, String.class); + return SerializationUtil.deserializeFromBase64(serializedStr); + } + + public static String getSerializedPartitionKey(Record rec, PartitionSpec partitionSpec) { + StructLike structLike = new InternalRecordWrapper(partitionSpec.schema().asStruct()).wrap(rec); + PartitionKey partitionKey = new PartitionKey(partitionSpec, partitionSpec.schema()); + partitionKey.partition(structLike); + return SerializationUtil.serializeToBase64(partitionKey); + } + + public static String getSerializedPartitionKey(StructLike structLike, PartitionSpec partitionSpec) { + PartitionKey partitionKey = new PartitionKey(partitionSpec, partitionSpec.schema()); + for (int idx = 0; idx < structLike.size(); 
idx++) { + partitionKey.set(idx, structLike.get(idx, Object.class)); + } + return SerializationUtil.serializeToBase64(partitionKey); + } + public static String parseFilePath(Record rec) { return rec.get(FILE_READ_META_COLS.get(MetadataColumns.FILE_PATH), String.class); } @@ -229,13 +254,16 @@ public static class VirtualColumnAwareIterator implements CloseableIterator currentIterator, Schema expectedSchema, Configuration conf) { + public VirtualColumnAwareIterator(CloseableIterator currentIterator, Schema expectedSchema, Configuration conf, + Table table) { this.currentIterator = currentIterator; this.expectedSchema = expectedSchema; this.conf = conf; current = GenericRecord.create( - new Schema(expectedSchema.columns().subList(4, expectedSchema.columns().size()))); + new Schema(expectedSchema.columns().subList(4, expectedSchema.columns().size()))); + this.table = table; } @Override @@ -252,13 +280,63 @@ public boolean hasNext() { public T next() { T next = currentIterator.next(); GenericRecord rec = (GenericRecord) next; + IcebergAcidUtil.copyFields(rec, FILE_READ_META_COLS.size(), current.size(), current); + int specId = IcebergAcidUtil.parseSpecId(rec); PositionDeleteInfo.setIntoConf(conf, - IcebergAcidUtil.parseSpecId(rec), + specId, IcebergAcidUtil.computePartitionHash(rec), IcebergAcidUtil.parseFilePath(rec), - IcebergAcidUtil.parseFilePosition(rec)); - IcebergAcidUtil.copyFields(rec, FILE_READ_META_COLS.size(), current.size(), current); + IcebergAcidUtil.parseFilePosition(rec), + IcebergAcidUtil.getSerializedPartitionKey(current, table.specs().get(specId))); return (T) current; } } + + public static class MergeVirtualColumnAwareIterator implements CloseableIterator { + + private final CloseableIterator currentIterator; + private GenericRecord current; + private final Schema expectedSchema; + private final Configuration conf; + private final int specId; + private final PartitionSpec partitionSpec; + private final StructLike partition; + + public MergeVirtualColumnAwareIterator(CloseableIterator currentIterator, + Schema expectedSchema, Configuration conf, ContentFile contentFile, + Table table) { + this.currentIterator = currentIterator; + this.expectedSchema = expectedSchema; + this.conf = conf; + this.partition = contentFile.partition(); + current = GenericRecord.create( + new Schema(expectedSchema.columns().subList(0, expectedSchema.columns().size()))); + this.specId = contentFile.specId(); + + this.partitionSpec = table.specs().get(specId); + } + + @Override + public void close() throws IOException { + currentIterator.close(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + T next = currentIterator.next(); + GenericRecord rec = (GenericRecord) next; + current.set(0, specId); + current.set(1, computeHash(partition)); + current.set(2, rec.get(0, String.class)); + current.set(3, rec.get(1, Long.class)); + current.set(4, getSerializedPartitionKey(partition, partitionSpec)); + return (T) current; + } + } + } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java index c12a0d039c7e..5daefcc8e1cc 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapred.RecordReader; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.mr.hive.IcebergAcidUtil; import org.apache.iceberg.util.StructProjection; @@ -51,9 +52,11 @@ public final class HiveBatchIterator implements CloseableIterator idToConstant; + private PartitionSpec partitionSpec; HiveBatchIterator(RecordReader recordReader, JobConf job, - int[] partitionColIndices, Object[] partitionValues, Map idToConstant) { + int[] partitionColIndices, Object[] partitionValues, Map idToConstant, + PartitionSpec partitionSpec) { this.recordReader = recordReader; this.key = recordReader.createKey(); this.batch = recordReader.createValue(); @@ -61,6 +64,7 @@ public final class HiveBatchIterator implements CloseableIterator reader(Table table, Path path, } CloseableIterable vrbIterable = - createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues, idToConstant); + createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues, idToConstant, + partitionSpec); return deleteFilter != null ? deleteFilter.filterBatch(vrbIterable) : vrbIterable; @@ -253,10 +254,10 @@ private static RecordReader parquetRecordReade private static CloseableIterable createVectorizedRowBatchIterable( RecordReader hiveRecordReader, JobConf job, int[] partitionColIndices, - Object[] partitionValues, Map idToConstant) { + Object[] partitionValues, Map idToConstant, PartitionSpec partitionSpec) { HiveBatchIterator iterator = - new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues, idToConstant); + new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues, idToConstant, partitionSpec); return new CloseableIterable() { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java index 6753ffa46c25..cfdeb4d52482 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.io.Writable; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; @@ -58,12 +59,12 @@ public void write(Writable row) throws IOException { Record rec = ((Container) row).get(); PositionDelete positionDelete = IcebergAcidUtil.getPositionDelete(rec, rowDataTemplate); int specId = IcebergAcidUtil.parseSpecId(rec); - Record rowData = positionDelete.row(); + PartitionKey partitionKey = IcebergAcidUtil.parsePartitionKey(rec); if (skipRowData) { // Set null as the row data as we intend to avoid writing the actual row data in the delete file. 
positionDelete.set(positionDelete.path(), positionDelete.pos(), null); } - writer.write(positionDelete, specs.get(specId), partition(rowData, specId)); + writer.write(positionDelete, specs.get(specId), partitionKey); } @Override diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index f84ee3261f64..76398656e523 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -52,6 +52,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataTableScan; import org.apache.iceberg.DataTask; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IncrementalAppendScan; @@ -343,13 +344,20 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) { private CloseableIterator nextTask() { if (isMerge) { - return open(mergeSplit.getContentFile(), table.schema()).iterator(); + CloseableIterator closeableIterator = open(mergeSplit.getContentFile(), table.schema()).iterator(); + if (mergeSplit.getContentFile() instanceof DeleteFile) { + return new IcebergAcidUtil.MergeVirtualColumnAwareIterator<>(closeableIterator, + IcebergAcidUtil.createSerdeSchemaForDelete(table.schema().columns()), conf, + mergeSplit.getContentFile(), table); + } else { + return closeableIterator; + } } else { CloseableIterator closeableIterator = open(tasks.next(), expectedSchema).iterator(); if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) { return closeableIterator; } - return new IcebergAcidUtil.VirtualColumnAwareIterator(closeableIterator, expectedSchema, conf); + return new IcebergAcidUtil.VirtualColumnAwareIterator(closeableIterator, expectedSchema, conf, table); } } @@ -455,6 +463,12 @@ private CloseableIterable openGeneric(ContentFile contentFile, Schema readSch table.io().newInputFile(dataFile.path().toString()), dataFile.keyMetadata())); schema = readSchema; + } else if (contentFile instanceof DeleteFile) { + DeleteFile deleteFile = (DeleteFile) contentFile; + inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput( + table.io().newInputFile(deleteFile.path().toString()), + deleteFile.keyMetadata())); + schema = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); } CloseableIterable iterable; switch (contentFile.format()) { diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java index dfad82947172..5bb4e8d308c3 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java @@ -112,7 +112,7 @@ public void testRowIterator() throws Exception { inputFormat.getRecordReader(new FileSplit(dataFilePath, 0L, Long.MAX_VALUE, new String[]{}), jobConf, new MockReporter()); HiveBatchIterator hiveBatchIterator = new HiveBatchIterator( - internalVectorizedRecordReader, jobConf, null, null, null); + internalVectorizedRecordReader, jobConf, null, null, null, table.spec()); // Expected to be one batch exactly HiveBatchContext hiveBatchContext = 
hiveBatchIterator.next(); diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_delete_files.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_delete_files.q new file mode 100644 index 000000000000..41f996f03174 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_delete_files.q @@ -0,0 +1,76 @@ +--! qt:dataset:src +set hive.merge.mapredfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.tezfiles=true; +set hive.optimize.sort.dynamic.partition.threshold=-1; +set mapred.reduce.tasks=5; +set hive.blobstore.supported.schemes=hdfs,file; + +create table orc_source(key string) stored by iceberg stored as orc tblproperties('format-version'='2'); +insert overwrite table orc_source select key from src; +insert into table orc_source select key from src; +insert into table orc_source select key from src; +insert into table orc_source select key from src; + +select count(*) from orc_source; + +select count(distinct(file_path)) from default.orc_source.files; + +delete from orc_source where key in (select key from src); + +select count(*) from orc_source; + +-- Only 1 file corresponding to deletes must be generated. +select count(distinct(file_path)) from default.orc_source.files; + +-- Parquet file format +create table parquet_source(key string) stored by iceberg stored as parquet tblproperties('format-version'='2'); +insert overwrite table parquet_source select key from src; +insert into table parquet_source select key from src; +insert into table parquet_source select key from src; +insert into table parquet_source select key from src; + +select count(*) from parquet_source; + +select count(distinct(file_path)) from default.parquet_source.files; + +delete from parquet_source where key in (select key from src); + +-- Only 1 file corresponding to deletes must be generated. +select count(distinct(file_path)) from default.parquet_source.files; + +select count(*) from parquet_source; + +-- Avro file format +create table avro_source(key string) stored by iceberg stored as avro tblproperties('format-version'='2'); +insert overwrite table avro_source select key from src; +insert into table avro_source select key from src; +insert into table avro_source select key from src; +insert into table avro_source select key from src; + +select count(*) from avro_source; + +select count(distinct(file_path)) from default.avro_source.files; + +delete from avro_source where key in (select key from src); + +-- Only 1 file corresponding to deletes must be generated. 
+select count(distinct(file_path)) from default.avro_source.files; + +select count(*) from avro_source; + +create table orc_part_source(key string) partitioned by spec(key) stored by iceberg stored as orc tblproperties('format-version'='2'); +insert overwrite table orc_part_source select key from src; +insert into table orc_part_source select key from src; +insert into table orc_part_source select key from src; +insert into table orc_part_source select key from src; + +select count(*) from orc_part_source; + +select count(distinct(file_path)) from default.orc_part_source.files; + +delete from orc_part_source where key in (select key from src); + +select count(*) from orc_part_source; + +select count(distinct(file_path)) from default.orc_part_source.files; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_delete_files.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_delete_files.q.out new file mode 100644 index 000000000000..395a923d4c54 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_delete_files.q.out @@ -0,0 +1,344 @@ +PREHOOK: query: create table orc_source(key string) stored by iceberg stored as orc tblproperties('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_source +POSTHOOK: query: create table orc_source(key string) stored by iceberg stored as orc tblproperties('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert overwrite table orc_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert overwrite table orc_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert into table orc_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert into table orc_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert into table orc_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert into table orc_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert into table orc_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert into table orc_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: select count(*) from orc_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +4 +PREHOOK: query: delete from orc_source where key in 
(select key from src) +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: delete from orc_source where key in (select key from src) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: select count(*) from orc_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +0 +PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +5 +PREHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet tblproperties('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_source +POSTHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet tblproperties('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert overwrite table parquet_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert overwrite table parquet_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert into table parquet_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert into table parquet_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert into table parquet_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert into table parquet_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert into table parquet_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert into table parquet_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: select count(*) from parquet_source +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from parquet_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +4 +PREHOOK: query: delete from parquet_source where key in (select key from 
src) +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: delete from parquet_source where key in (select key from src) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +5 +PREHOOK: query: select count(*) from parquet_source +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from parquet_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +0 +PREHOOK: query: create table avro_source(key string) stored by iceberg stored as avro tblproperties('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@avro_source +POSTHOOK: query: create table avro_source(key string) stored by iceberg stored as avro tblproperties('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert overwrite table avro_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert overwrite table avro_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert into table avro_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert into table avro_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert into table avro_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert into table avro_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert into table avro_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert into table avro_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: select count(*) from avro_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from avro_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +4 +PREHOOK: query: delete from avro_source where key in (select key from src) +PREHOOK: type: QUERY +PREHOOK: Input: 
default@avro_source +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: delete from avro_source where key in (select key from src) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +5 +PREHOOK: query: select count(*) from avro_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from avro_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +0 +PREHOOK: query: create table orc_part_source(key string) partitioned by spec(key) stored by iceberg stored as orc tblproperties('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: create table orc_part_source(key string) partitioned by spec(key) stored by iceberg stored as orc tblproperties('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: insert overwrite table orc_part_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: insert overwrite table orc_part_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: insert into table orc_part_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: insert into table orc_part_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: insert into table orc_part_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: insert into table orc_part_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: insert into table orc_part_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: insert into table orc_part_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: select count(*) from orc_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +1236 +PREHOOK: query: delete from orc_part_source 
where key in (select key from src) +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: delete from orc_part_source where key in (select key from src) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: select count(*) from orc_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +0 +PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +1545 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 1fdc03557e68..89a45b8c0a3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -689,6 +689,12 @@ public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, vcValues[i] = new LongWritable(ctx.getIoCxt().getPositionDeleteInfo().getPartitionHash()); } break; + case PARTITION_PROJECTION: + vcValues[i] = null; + if (ctx.getIoCxt().getPositionDeleteInfo() != null) { + vcValues[i] = new Text(ctx.getIoCxt().getPositionDeleteInfo().getPartitionProjection()); + } + break; case FILE_PATH: vcValues[i] = null; if (ctx.getIoCxt().getPositionDeleteInfo() != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java index a77099d0fb9e..784289e00455 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/PositionDeleteInfo.java @@ -27,32 +27,38 @@ public class PositionDeleteInfo { private static final String CONF_KEY_PART_HASH = "hive.io.context.position.delete.partition.hash"; private static final String CONF_KEY_FILE_PATH = "hive.io.context.position.delete.file.path"; private static final String CONF_KEY_ROW_POSITION = "hive.io.context.position.delete.row.position"; + private static final String CONF_KEY_PARTITION_PROJECTION = "hive.io.context.position.delete.partition.projection"; public static PositionDeleteInfo parseFromConf(Configuration conf) { int specId = conf.getInt(CONF_KEY_SPEC_ID, -1); long partHash = conf.getLong(CONF_KEY_PART_HASH, -1); String filePath = conf.get(CONF_KEY_FILE_PATH); long rowPos = conf.getLong(CONF_KEY_ROW_POSITION, -1); - return new PositionDeleteInfo(specId, partHash, filePath, rowPos); + String partitionProjection = conf.get(CONF_KEY_PARTITION_PROJECTION); + return new PositionDeleteInfo(specId, partHash, filePath, rowPos, partitionProjection); } - public static void setIntoConf(Configuration conf, int specId, long partHash, String filePath, long filePos) { + public static void setIntoConf(Configuration conf, int specId, long partHash, String filePath, + long filePos, String partitionProjection) { conf.setInt(CONF_KEY_SPEC_ID, specId); conf.setLong(CONF_KEY_PART_HASH, partHash); conf.set(CONF_KEY_FILE_PATH, 
  filePath);
     conf.setLong(CONF_KEY_ROW_POSITION, filePos);
+    conf.set(CONF_KEY_PARTITION_PROJECTION, partitionProjection);
   }
 
   private final int specId;
   private final long partitionHash;
   private final String filePath;
   private final long filePos;
+  private final String partitionProjection;
 
-  public PositionDeleteInfo(int specId, long partitionHash, String filePath, long filePos) {
+  public PositionDeleteInfo(int specId, long partitionHash, String filePath, long filePos, String partitionProjection) {
     this.specId = specId;
     this.partitionHash = partitionHash;
     this.filePath = filePath;
     this.filePos = filePos;
+    this.partitionProjection = partitionProjection;
   }
 
   public int getSpecId() {
@@ -70,4 +76,8 @@ public String getFilePath() {
   public long getFilePos() {
     return filePos;
   }
+
+  public String getPartitionProjection() {
+    return partitionProjection;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index a2c476c9ad54..b064de77cfc2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -764,4 +764,8 @@ default MergeTaskProperties getMergeTaskProperties(Properties properties) {
     throw new UnsupportedOperationException("Storage handler does not support getting merge input files " +
         "for a table.");
   }
+
+  default void setCustomDeleteProperties(TableDesc tableDesc) {
+    throw new UnsupportedOperationException("Storage handler does not support setting custom delete properties.");
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index e5109b34ef7e..a53f0913cfb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -59,6 +59,7 @@ public enum VirtualColumn {
   FILE_PATH("FILE__PATH", TypeInfoFactory.stringTypeInfo),
   ROW_POSITION("ROW__POSITION", TypeInfoFactory.longTypeInfo),
   SNAPSHOT_ID("SNAPSHOT__ID", TypeInfoFactory.longTypeInfo),
+  PARTITION_PROJECTION("PARTITION_PROJECTION", TypeInfoFactory.stringTypeInfo),
 
   /**
    * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE.
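
Note (editorial sketch, not part of the patch): the new PARTITION_PROJECTION virtual column carries the row's partition data as a base64 string from the reader to the delete writer. The round trip uses only Iceberg's PartitionKey and SerializationUtil, exactly as IcebergAcidUtil does above; the standalone class, the sample schema and the sample value below are illustrative assumptions.

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;

public class PartitionProjectionRoundTrip {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "key", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("key").build();

    // Reader side: project the partition value of the current row and serialize it,
    // mirroring IcebergAcidUtil.getSerializedPartitionKey before PositionDeleteInfo.setIntoConf.
    PartitionKey key = new PartitionKey(spec, schema);
    key.set(0, "238");
    String projection = SerializationUtil.serializeToBase64(key);

    // Writer side: HiveIcebergDeleteWriter turns the projection back into a PartitionKey
    // (IcebergAcidUtil.parsePartitionKey) and hands it to the clustered writer with the spec id.
    PartitionKey restored = SerializationUtil.deserializeFromBase64(projection);
    System.out.println(restored.get(0, String.class)); // prints: 238
  }
}
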
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index be66ddb3791f..c3ead01d100f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1261,14 +1261,30 @@ public static void createMRWorkForMergingFiles(FileSinkOperator fsInput,
     FileSinkDesc fsOutputDesc = null;
     TableScanOperator tsMerge = null;
     if (!isBlockMerge) {
+      TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+      String storageHandlerClass = ts.getProperties().getProperty(
+          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+      HiveStorageHandler storageHandler = null;
+      try {
+        storageHandler = HiveUtils.getStorageHandler(conf, storageHandlerClass);
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+      boolean isCustomDelete = false;
+      if (Context.Operation.DELETE.equals(fsInputDesc.getWriteOperation()) && storageHandler != null && storageHandler.supportsMergeFiles()) {
+        storageHandler.setCustomDeleteProperties(ts);
+        isCustomDelete = true;
+      }
       // Create a TableScan operator
       tsMerge = GenMapRedUtils.createTemporaryTableScanOperator(
-          fsInput.getCompilationOpContext(), inputRS);
+              fsInput.getCompilationOpContext(), inputRS);
 
       // Create a FileSink operator
-      TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
       Path mergeDest = srcMmWriteId == null ? finalName : finalName.getParent();
       fsOutputDesc = new FileSinkDesc(mergeDest, ts, conf.getBoolVar(ConfVars.COMPRESS_RESULT));
+      if (isCustomDelete) {
+        fsOutputDesc.setWriteOperation(Context.Operation.DELETE);
+      }
       fsOutputDesc.setMmWriteId(srcMmWriteId);
       fsOutputDesc.setIsMerge(true);
       // Create and attach the filesink for the merge.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7d7c5f4fe626..1b4dd349f9af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8167,8 +8167,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       canBeMerged &= destinationTable.getStorageHandler().supportsMergeFiles();
       // TODO: Support for merge task for update, delete and merge queries
       // when storage handler supports it.
+      if (Context.Operation.DELETE.equals(ctx.getOperation()) && !deleting(dest)) {
+        canBeMerged = true;
+      }
       if (Context.Operation.UPDATE.equals(ctx.getOperation())
-          || Context.Operation.DELETE.equals(ctx.getOperation())
           || Context.Operation.MERGE.equals(ctx.getOperation())) {
         canBeMerged = false;
       }
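
Note (editorial sketch, not part of the patch): GenMapRedUtils only routes the file-merge task through the DELETE path when the storage handler reports supportsMergeFiles() and the originating FileSinkDesc carries Context.Operation.DELETE; setCustomDeleteProperties then lets the handler tag the cloned TableDesc so the merge task's writers are opened in delete mode (the Iceberg handler stores the operation under InputFormatConfig.OPERATION_TYPE_PREFIX and HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX, as shown earlier in this diff). A minimal handler-side implementation of the new hook could look like the following; the class name and the property key are hypothetical, and the class is left abstract so only the two relevant methods are shown.

import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public abstract class ExampleMergeAwareStorageHandler implements HiveStorageHandler {

  @Override
  public boolean supportsMergeFiles() {
    // Opt in to file-merge tasks; without this, createMRWorkForMergingFiles never calls
    // setCustomDeleteProperties for tables backed by this handler.
    return true;
  }

  @Override
  public void setCustomDeleteProperties(TableDesc tableDesc) {
    // Tag the cloned TableDesc so the merge task's record writers are created in DELETE mode.
    tableDesc.getProperties().put("example.write.operation." + tableDesc.getTableName(), "DELETE");
  }
}
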