From 4e020675763f17dd1b3020f4b931b2bb32156141 Mon Sep 17 00:00:00 2001
From: Dmitriy Fingerman
Date: Thu, 27 Jun 2024 18:54:55 -0400
Subject: [PATCH 1/4] HIVE-28341: Iceberg: Change Major QB Full Table
 Compaction to compact partitions in parallel.

---
 .../mr/hive/HiveIcebergOutputCommitter.java   |  13 +-
 .../mr/hive/HiveIcebergStorageHandler.java    |  20 +-
 .../iceberg/mr/hive/IcebergTableUtil.java     |  37 ++-
 .../IcebergMajorQueryCompactor.java           |  27 +-
 ...erg_major_compaction_partition_evolution.q |   2 +
 ...rg_major_compaction_partition_evolution2.q |  55 ++++
 .../iceberg_major_compaction_partitioned.q    |   2 +
 ...ceberg_major_compaction_schema_evolution.q |   2 +
 ...major_compaction_partition_evolution.q.out |   8 +-
 ...ajor_compaction_partition_evolution2.q.out | 263 ++++++++++++++++++
 ...iceberg_major_compaction_partitioned.q.out |  17 +-
 ...rg_major_compaction_schema_evolution.q.out |   7 +-
 .../resources/testconfiguration.properties    |   1 +
 .../compact/AlterTableCompactOperation.java   |  14 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java  |   4 +-
 .../hive/ql/metadata/HiveStorageHandler.java  |  42 ++-
 .../hive/ql/txn/compactor/CompactorUtil.java  |   3 +-
 17 files changed, 477 insertions(+), 40 deletions(-)
 create mode 100644 iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q
 create mode 100644 iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 66c34361fbad..c25a0ea6ee78 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -604,10 +604,6 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
    */
   private void commitCompaction(Table table, long startTime, FilesForCommit results, RewritePolicy rewritePolicy,
       Integer partitionSpecId, String partitionPath) {
-    if (results.dataFiles().isEmpty()) {
-      LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
-      return;
-    }
     if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
       // Full table compaction
       Transaction transaction = table.newTransaction();
       DeleteFiles delete = transaction.newDelete();
@@ -621,8 +617,13 @@ private void commitCompaction(Table table, long startTime, FilesForCommit result
       LOG.debug("Compacted full table with files {}", results);
     } else {
       // Single partition compaction
-      List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath);
-      List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath);
+      List<DataFile> existingDataFiles = partitionPath != null ?
+          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath) :
+          IcebergTableUtil.getDataFilesNotInSpec(table, partitionSpecId);
+
+      List<DeleteFile> existingDeleteFiles = partitionPath != null ?
+          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath) :
+          IcebergTableUtil.getDeleteFilesNotInSpec(table, partitionSpecId);
 
       RewriteFiles rewriteFiles = table.newRewrite();
       rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 071130f0977c..ef7f5dab98f6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -1935,8 +1935,8 @@ public Optional<ErrorMsg> isEligibleForCompaction(
 
   @Override
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
-      Map<String, String> partitionSpec) throws SemanticException {
-    return getPartitionNames(table, partitionSpec).stream()
+      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
+    return getPartitionNames(table, partitionSpec, latestSpecOnly).stream()
         .map(partName -> {
           Map<String, String> partSpecMap = Maps.newLinkedHashMap();
           Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
@@ -1944,6 +1944,11 @@ public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table ta
         }).collect(Collectors.toList());
   }
 
+  public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    return IcebergTableUtil.isPartitioned(table);
+  }
+
   @Override
   public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec, Context.RewritePolicy policy) throws SemanticException {
@@ -1970,12 +1975,13 @@ private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table tabl
    */
   @Override
   public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
-      Map<String, String> partitionSpec) throws SemanticException {
+      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
 
     try {
       return IcebergTableUtil
-          .getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream().map(e -> {
+          .getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream()
+          .map(e -> {
             PartitionData partitionData = e.getKey();
             int specId = e.getValue();
             return icebergTable.specs().get(specId).partitionToPath(partitionData);
@@ -2132,4 +2138,10 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) {
     tableDesc.setProperty(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableDesc.getTableName(),
         Operation.DELETE.name());
   }
+
+  @Override
+  public boolean isUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    return table.specs().size() > 1;
+  }
 }

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 2016b68905a0..de83f3aab262 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -393,8 +393,7 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
     return data;
   }
 
-  public static List<DataFile> getDataFiles(Table table, int specId, String
-      partitionPath) {
+  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
     CloseableIterable<FileScanTask> filteredFileScanTasks =
@@ -406,6 +405,17 @@ public static List<DataFile> getDataFiles(Table table, int specId,
     return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
   }
 
+  public static List<DataFile> getDataFilesNotInSpec(Table table, int specId) {
+    CloseableIterable<FileScanTask> fileScanTasks =
+        table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
+    CloseableIterable<FileScanTask> filteredFileScanTasks =
+        CloseableIterable.filter(fileScanTasks, t -> {
+          DataFile file = t.asFileScanTask().file();
+          return file.specId() != specId;
+        });
+    return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
+  }
+
   public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath) {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
@@ -420,6 +430,19 @@ public static List<DeleteFile> getDeleteFiles(Table table, int specId, String pa
         t -> ((PositionDeletesScanTask) t).file()));
   }
 
+  public static List<DeleteFile> getDeleteFilesNotInSpec(Table table, int specId) {
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+    CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
+    CloseableIterable<ScanTask> filteredDeletesScanTasks =
+        CloseableIterable.filter(deletesScanTasks, t -> {
+          DeleteFile file = ((PositionDeletesScanTask) t).file();
+          return file.specId() != specId;
+        });
+    return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
+        t -> ((PositionDeletesScanTask) t).file()));
+  }
+
   public static Expression generateExpressionFromPartitionSpec(Table table,
       Map<String, String> partitionSpec) throws SemanticException {
     Map<String, PartitionField> partitionFieldMap = getPartitionFields(table).stream()
@@ -465,7 +488,7 @@ public static List<PartitionField> getPartitionFields(Table table) {
   }
 
   public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, Map<String, String> partSpecMap,
-      boolean allowPartialSpec) throws SemanticException, IOException {
+      boolean allowPartialSpec, boolean latestSpecOnly) throws SemanticException, IOException {
     Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
     PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
         .createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
@@ -484,10 +507,16 @@ public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, M
             ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()),
                 expression, false);
             return resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
-                (entry.getKey().size() == partSpecMap.size() || allowPartialSpec);
+                (entry.getKey().size() == partSpecMap.size() || allowPartialSpec) &&
+                (entry.getValue() == icebergTable.spec().specId() || !latestSpecOnly);
           }).forEach(entry -> result.put(entry.getKey(), entry.getValue())));
     }
 
     return result;
   }
+
+  public static boolean isPartitioned(Table table) {
+    return IcebergTableUtil.getPartitionFields(table).size() > 0;
+  }
+
 }

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
index 6173c804b4b0..6c08d5846a8b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
@@ -63,20 +63,33 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
 
     HiveConf conf = new HiveConf(context.getConf());
     String partSpec = context.getCompactionInfo().partName;
+    org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
+        context.getTable().getTableName());
+    Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
     String compactionQuery;
 
     if (partSpec == null) {
-      HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
-      compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
+      if (!IcebergTableUtil.isPartitioned(icebergTable)) {
+        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
+        compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
+      } else if (icebergTable.specs().size() > 1) {
+        // Compacting partitions of old partition specs on a partitioned table with partition evolution
+        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
+        conf.set(IcebergCompactionService.PARTITION_SPEC_ID, String.valueOf(icebergTable.spec().specId()));
+        // A single filter on a virtual column causes errors during compilation,
+        // so another filter on file_path is added as a workaround.
+        compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
+            "where %2$s != %3$d and %4$s is not null",
+            compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
+            VirtualColumn.FILE_PATH.getName());
+      } else {
+        return true;
+      }
     } else {
-      org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
-          context.getTable().getTableName());
       Map<String, String> partSpecMap = new LinkedHashMap<>();
       Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
-
-      Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
       Map<PartitionData, Integer> partitionInfo = IcebergTableUtil
-          .getPartitionInfo(icebergTable, partSpecMap, false);
+          .getPartitionInfo(icebergTable, partSpecMap, false, false);
       Optional<Integer> specId = partitionInfo.values().stream().findFirst();
 
       if (!specId.isPresent()) {

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
index aef5ae0ef239..7d046aea4e99 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
@@ -14,6 +14,8 @@
 --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 -- Mask compaction id as they will be allocated in parallel threads
 --! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
 
 set hive.llap.io.enabled=true;
 set hive.vectorized.execution.enabled=true;

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q
new file mode 100644
index 000000000000..3fa6f91f90b7
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q
@@ -0,0 +1,55 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--!
qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ +-- Mask random uuid +--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/ +-- Mask a random snapshot id +--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/ +-- Mask added file size +--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask total file size +--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask current-snapshot-timestamp-ms +--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +-- Mask compaction id as they will be allocated in parallel threads +--! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ + +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; +set hive.optimize.shared.work.merge.ts.schema=true; + +create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2'); + +insert into ice_orc VALUES ('fn1','ln1', 1); +insert into ice_orc VALUES ('fn2','ln2', 1); +insert into ice_orc VALUES ('fn3','ln3', 1); +insert into ice_orc VALUES ('fn4','ln4', 1); +delete from ice_orc where last_name in ('ln3', 'ln4'); + +alter table ice_orc set partition spec(dept_id); + +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8'); +delete from ice_orc where last_name in ('ln7', 'ln8'); + +select * from ice_orc; +describe formatted ice_orc; +show compactions; + +alter table ice_orc COMPACT 'major' and wait; + +select * from ice_orc; +describe formatted ice_orc; +show compactions; + diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q index 9d766c6b8333..cc0d2fa23b99 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q index 73dbe19a94b5..8501e694de02 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! 
qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out index 3f8d8914c5e9..d9dee1d9902d 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out @@ -239,7 +239,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"1256\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -340,7 +340,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"3\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"5\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec 
{\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -352,7 +352,7 @@ Table Parameters: #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 19 + snapshot-count 20 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -374,4 +374,6 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- #Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out new file mode 100644 index 000000000000..06611f2f27a7 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc +POSTHOOK: query: create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4') +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: 
query: delete from ice_orc where last_name in ('ln3', 'ln4') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +PREHOOK: query: alter table ice_orc set partition spec(dept_id) +PREHOOK: type: ALTERTABLE_SETPARTSPEC +PREHOOK: Input: default@ice_orc +POSTHOOK: query: alter table ice_orc set partition spec(dept_id) +POSTHOOK: type: ALTERTABLE_SETPARTSPEC +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8') +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +PREHOOK: query: select * from ice_orc +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +fn1 ln1 1 +fn2 ln2 1 +fn5 ln5 2 +fn6 ln6 2 +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +first_name string +last_name string +dept_id bigint + +# Partition Transform Information +# col_name transform_type +dept_id IDENTITY + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-timestamp-ms #Masked# + default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 4 + numRows 4 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 10 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +PREHOOK: query: alter table ice_orc COMPACT 'major' and wait +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: default@ice_orc +PREHOOK: Output: default@ice_orc +POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: default@ice_orc +PREHOOK: query: select * from ice_orc +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +fn1 ln1 1 +fn2 ln2 1 +fn5 ln5 2 +fn6 ln6 2 +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +first_name string +last_name string +dept_id bigint + +# Partition Transform Information +# col_name transform_type +dept_id IDENTITY + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"added-records\":\"2\",\"deleted-records\":\"2\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-timestamp-ms #Masked# + default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 2 + numRows 4 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 12 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out index c9314bc4d03c..781bb41dd14c 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out @@ -191,7 +191,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"3\",\"deleted-records\":\"3\",\"removed-files-size\":\"1440\",\"changed-partition-count\":\"2\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"11\",\"total-delete-files\":\"7\",\"total-position-deletes\":\"7\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"deleted-data-files\":\"3\",\"deleted-records\":\"3\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"11\",\"total-delete-files\":\"7\",\"total-position-deletes\":\"7\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -287,7 +287,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"4\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"5\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"2\",\"deleted-records\":\"5\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -321,7 +321,8 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- PREHOOK: query: insert into ice_orc VALUES ('fn11','ln11', 1) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table @@ -517,7 +518,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"4\",\"deleted-records\":\"4\",\"removed-files-size\":\"1948\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"deleted-data-files\":\"4\",\"deleted-records\":\"4\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -617,7 +618,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"8\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"7\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"4\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -651,5 +652,7 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out index cfe8f3d3d46f..03deb181cf61 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out @@ -227,7 +227,7 @@ Table Parameters: bucketing_version 2 current-schema 
{\"type\":\"struct\",\"schema-id\":2,\"fields\":[{\"id\":1,\"name\":\"fname\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"address\",\"required\":false,\"type\":\"string\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"6\",\"deleted-records\":\"6\",\"removed-files-size\":\"3167\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"10\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"6\",\"deleted-records\":\"6\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"10\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -325,7 +325,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":2,\"fields\":[{\"id\":1,\"name\":\"fname\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"address\",\"required\":false,\"type\":\"string\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"5\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"5\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"6\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"3\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"5\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -359,4 +359,5 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 9e7b79fd3197..cadf4d150062 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ 
b/itests/src/test/resources/testconfiguration.properties
@@ -422,6 +422,7 @@ iceberg.llap.query.files=\
 
 iceberg.llap.query.compactor.files=\
   iceberg_major_compaction_partition_evolution.q,\
+  iceberg_major_compaction_partition_evolution2.q,\
   iceberg_major_compaction_partitioned.q,\
   iceberg_major_compaction_query_metadata.q,\
   iceberg_major_compaction_schema_evolution.q,\

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index ee67cc9a4e42..d2d87ab488da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
 import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.ddl.DDLUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -89,7 +90,8 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
     }
 
     //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
-    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+    if (desc.getPartitionSpec() != null || (!table.isPartitioned() && !DDLUtils.isIcebergTable(table)) ||
+        (DDLUtils.isIcebergTable(table) && !table.getStorageHandler().isPartitioned(table))) {
       if (desc.getPartitionSpec() != null) {
         Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
         partitionName.ifPresent(compactionRequest::setPartitionname);
@@ -104,6 +106,14 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
           compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf());
       parseCompactionResponse(compactionResponse, table, partitionMapEntry.getKey());
     }
+    // If an Iceberg table has undergone partition evolution, additionally create a compaction request without a
+    // partition specification; it compacts all files from old partition specs, while partitions of the current
+    // spec are compacted in parallel above.
+    if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table) &&
+        table.getStorageHandler().isUndergonePartitionEvolution(table)) {
+      compactionRequest.setPartitionname(null);
+      CompactionResponse compactionResponse = txnHandler.compact(compactionRequest);
+      parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
+    }
     }
     return 0;
   }
@@ -135,7 +145,7 @@ private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, D
     List<Partition> partitions = new ArrayList<>();
     if (desc.getPartitionSpec() == null) {
-      if (table.isPartitioned()) {
+      if (table.isPartitioned() || (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table))) {
         // Compaction will get initiated for all the potential partitions that meets the criteria
         partitions = context.getDb().getPartitions(table);
       }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e688e3f5fdf5..fd1b875eb7c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4055,7 +4055,7 @@ public List<String> getPartitionNames(String dbName, String tblName,
     List<String> names = null;
     Table t = getTable(dbName, tblName);
     if (t.getStorageHandler() != null && t.getStorageHandler().alwaysUnpartitioned()) {
-      return t.getStorageHandler().getPartitionNames(t, partSpec);
+      return t.getStorageHandler().getPartitionNames(t, partSpec, false);
     }
 
     List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
@@ -4298,7 +4298,7 @@ private List<Partition> getPartitionsWithAuth(Table tbl, Map<String, String> par
   public List<Partition> getPartitions(Table tbl, Map<String, String> partialPartSpec)
       throws HiveException {
     if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
-      return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec);
+      return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec, false);
     } else {
       return getPartitions(tbl, partialPartSpec, (short)-1);
     }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index ae948d6e85da..0ddbad523876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -778,10 +778,28 @@ default Optional<ErrorMsg> isEligibleForCompaction(org.apache.hadoop.hive.ql.met
     throw new UnsupportedOperationException("Storage handler does not support validating eligibility for compaction");
   }
 
+  /**
+   * Returns partition names for the current table spec that correspond to the provided partition spec.
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
+   * @return List of partition names
+   */
   default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
+    return getPartitionNames(hmsTable, partitionSpec, true);
+  }
+
+  /**
+   * Returns partition names that correspond to the provided partition spec.
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
+   * @param latestSpecOnly Whether to return partition names for the latest spec only, or for past specs too
+   * @return List of partition names
+   */
+  default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     throw new UnsupportedOperationException("Storage handler does not support getting partitions " +
-      "by a partition specification.");
+        "by a partition specification.");
   }
 
   default ColumnInfo getColumnInfo(org.apache.hadoop.hive.ql.metadata.Table hmsTable, String colName)
@@ -839,9 +857,31 @@ default Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table, M
    */
   default List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec) throws SemanticException {
+    return getPartitions(table, partitionSpec, true);
+  }
+
+  /**
+   * Returns a list of partitions based on table and partial partition specification.
+   * @param table {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
+   * @param latestSpecOnly Specifies whether to return only partitions of the latest partition spec
+   * @return List of Partitions {@link org.apache.hadoop.hive.ql.metadata.Partition}
+   * @throws SemanticException {@link org.apache.hadoop.hive.ql.parse.SemanticException}
+   */
+  default List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
+      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     throw new UnsupportedOperationException("Storage handler does not support getting partitions for a table.");
   }
 
+  default boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table table) {
+    throw new UnsupportedOperationException("Storage handler does not support checking if table is partitioned.");
+  }
+
+  default boolean isUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table table) {
+    throw new UnsupportedOperationException("Storage handler does not support checking if table has " +
+        "undergone partition evolution.");
+  }
+
   default boolean supportsMergeFiles() {
     return false;
   }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 70c550375a29..ef2307326f16 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -493,7 +493,8 @@ public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci,
 
     checkInterrupt(Initiator.class.getName());
 
-    CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
+    CompactionType type = MetaStoreUtils.isIcebergTable(t.getParameters()) ? ci.type :
+        checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
     if (type != null) {
       ci.type = type;
       return requestCompaction(ci, runAs, hostName, txnHandler);

From 7f7d9edbebdf8e12b416cbc07c7c14545d7cb3c8 Mon Sep 17 00:00:00 2001
From: Dmitriy Fingerman
Date: Tue, 23 Jul 2024 13:19:39 -0400
Subject: [PATCH 2/4] review comments July 23

---
 .../mr/hive/HiveIcebergOutputCommitter.java   | 13 ++--
 .../mr/hive/HiveIcebergStorageHandler.java    | 10 +--
 .../iceberg/mr/hive/IcebergTableUtil.java     | 61 ++++++++-----------
 .../IcebergMajorQueryCompactor.java           |  8 ++-
 .../org/apache/hadoop/hive/ql/Context.java    |  2 +-
 .../compact/AlterTableCompactOperation.java   |  2 +-
 .../hive/ql/metadata/HiveStorageHandler.java  |  6 +-
 .../hive/ql/txn/compactor/CompactorUtil.java  |  7 ++-
 8 files changed, 52 insertions(+), 57 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index c25a0ea6ee78..fd16cb3ef5d8 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -604,7 +604,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
    */
   private void commitCompaction(Table table, long startTime, FilesForCommit results, RewritePolicy rewritePolicy,
       Integer partitionSpecId, String partitionPath) {
-    if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
+    if (rewritePolicy == RewritePolicy.FULL_TABLE) {
       // Full table compaction
       Transaction transaction = table.newTransaction();
       DeleteFiles delete = transaction.newDelete();
@@ -617,13 +617,10 @@ private void commitCompaction(Table table, long startTime, FilesForCommit result
       LOG.debug("Compacted full table with files {}", results);
     } else {
       // Single partition compaction
-      List<DataFile> existingDataFiles = partitionPath != null ?
-          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath) :
-          IcebergTableUtil.getDataFilesNotInSpec(table, partitionSpecId);
-
-      List<DeleteFile> existingDeleteFiles = partitionPath != null ?
-          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath) :
-          IcebergTableUtil.getDeleteFilesNotInSpec(table, partitionSpecId);
+      List<DataFile> existingDataFiles =
+          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath, partitionPath != null);
+      List<DeleteFile> existingDeleteFiles =
+          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath, partitionPath != null);
 
       RewriteFiles rewriteFiles = table.newRewrite();
       rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index ef7f5dab98f6..8e9125df62f7 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -1913,10 +1913,12 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
 
   private boolean hasUndergonePartitionEvolution(Table table) {
     // If it is a table which has undergone partition evolution, return true.
+    // If a table has undergone partition evolution, the current spec is not necessarily the latest one, which can
+    // happen if the partition spec was changed back to one of the table's past specs.
     return table.currentSnapshot() != null &&
         table.currentSnapshot().allManifests(table.io()).parallelStream()
         .map(ManifestFile::partitionSpecId)
-        .anyMatch(id -> id < table.spec().specId());
+        .anyMatch(id -> id != table.spec().specId());
   }
 
   private boolean isIdentityPartitionTable(org.apache.hadoop.hive.ql.metadata.Table table) {
@@ -1946,7 +1948,7 @@ public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table ta
   public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return IcebergTableUtil.isPartitioned(table);
+    return table.spec().isPartitioned();
   }
 
   @Override
@@ -2140,8 +2142,8 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) {
   }
 
   @Override
-  public boolean isUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return table.specs().size() > 1;
+    return hasUndergonePartitionEvolution(table);
   }
 }

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index de83f3aab262..1ceac31c8de6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -393,51 +393,47 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
     return data;
   }
 
-  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath) {
-    CloseableIterable<FileScanTask> fileScanTasks =
-        table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
-    CloseableIterable<FileScanTask> filteredFileScanTasks =
-        CloseableIterable.filter(fileScanTasks, t -> {
-          DataFile file = t.asFileScanTask().file();
-          return file.specId() == specId && table.specs()
-              .get(specId).partitionToPath(file.partition()).equals(partitionPath);
-        });
-    return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
-  }
-
-  public static List<DataFile> getDataFilesNotInSpec(Table table, int specId) {
+  /**
+   * Returns a list of data files filtered by specId and partitionPath as follows:
+   * 1. If matchBySpecId is true, then filters files by specId == file's specId, else by specId != file's specId
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index de83f3aab262..1ceac31c8de6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -393,51 +393,47 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
     return data;
   }
 
-  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath) {
-    CloseableIterable<FileScanTask> fileScanTasks =
-        table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
-    CloseableIterable<FileScanTask> filteredFileScanTasks =
-        CloseableIterable.filter(fileScanTasks, t -> {
-          DataFile file = t.asFileScanTask().file();
-          return file.specId() == specId && table.specs()
-              .get(specId).partitionToPath(file.partition()).equals(partitionPath);
-        });
-    return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
-  }
-
-  public static List<DataFile> getDataFilesNotInSpec(Table table, int specId) {
+  /**
+   * Returns a list of data files filtered by specId and partitionPath as follows:
+   *   1. If matchBySpecId is true, then filters files by specId == file's specId, else by specId != file's specId
+   *   2. If partitionPath is not null, then also filters files where partitionPath == file's partition path
+   * @param table the iceberg table
+   * @param specId partition spec id
+   * @param partitionPath partition path
+   * @param matchBySpecId If true, searches for files matching the specId, else searches for ones that don't match
+   */
+  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath, boolean matchBySpecId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
     CloseableIterable<FileScanTask> filteredFileScanTasks =
         CloseableIterable.filter(fileScanTasks, t -> {
           DataFile file = t.asFileScanTask().file();
-          return file.specId() != specId;
+          return ((matchBySpecId && file.specId() == specId) || (!matchBySpecId && file.specId() != specId)) &&
+              (partitionPath == null || (partitionPath != null &&
+              table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath)));
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
   }
-  public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath) {
-    Table deletesTable =
-        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
-    CloseableIterable<ScanTask> filteredDeletesScanTasks =
-        CloseableIterable.filter(deletesScanTasks, t -> {
-          DeleteFile file = ((PositionDeletesScanTask) t).file();
-          return file.specId() == specId && table.specs()
-              .get(specId).partitionToPath(file.partition()).equals(partitionPath);
-        });
-    return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
-        t -> ((PositionDeletesScanTask) t).file()));
-  }
-
-  public static List<DeleteFile> getDeleteFilesNotInSpec(Table table, int specId) {
+  /**
+   * Returns a list of delete files filtered by specId and partitionPath as follows:
+   *   1. If matchBySpecId is true, then filters files by specId == file's specId, else by specId != file's specId
+   *   2. If partitionPath is not null, then also filters files where partitionPath == file's partition path
+   * @param table the iceberg table
+   * @param specId partition spec id
+   * @param partitionPath partition path
+   * @param matchBySpecId If true, searches for files matching the specId, else searches for ones that don't match
+   */
+  public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath, boolean matchBySpecId) {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
     CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
     CloseableIterable<ScanTask> filteredDeletesScanTasks =
         CloseableIterable.filter(deletesScanTasks, t -> {
           DeleteFile file = ((PositionDeletesScanTask) t).file();
-          return file.specId() != specId;
+          return ((matchBySpecId && file.specId() == specId) || (!matchBySpecId && file.specId() != specId)) &&
+              (partitionPath == null || (partitionPath != null &&
+              table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath)));
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
         t -> ((PositionDeletesScanTask) t).file()));
@@ -514,9 +510,4 @@ public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, M
 
     return result;
   }
-
-  public static boolean isPartitioned(Table table) {
-    return IcebergTableUtil.getPartitionFields(table).size() > 0;
-  }
-
 }
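[Editor's note: the combined filter introduced above can be hard to read. A quick, plain-Java check — no Hive/Iceberg types — that it is simply the XNOR of matchBySpecId and (file.specId() == specId), i.e. it could be written as matchBySpecId == (fileSpecId == specId); note also that inside (partitionPath == null || (partitionPath != null && ...)) the second null test is redundant and is kept verbatim from the patch:]

public class FilterEquivalenceCheck {
  public static void main(String[] args) {
    int specId = 1;
    for (boolean matchBySpecId : new boolean[] {false, true}) {
      for (int fileSpecId : new int[] {0, 1}) {
        boolean verbose = (matchBySpecId && fileSpecId == specId)
            || (!matchBySpecId && fileSpecId != specId);
        boolean compact = matchBySpecId == (fileSpecId == specId);
        System.out.printf("match=%b fileSpec=%d -> %b (same: %b)%n",
            matchBySpecId, fileSpecId, verbose, verbose == compact);
      }
    }
  }
}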
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
index 6c08d5846a8b..776163309d3d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
@@ -69,8 +69,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
     String compactionQuery;
 
     if (partSpec == null) {
-      if (!IcebergTableUtil.isPartitioned(icebergTable)) {
-        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
+      if (!icebergTable.spec().isPartitioned()) {
+        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
         compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
       } else if (icebergTable.specs().size() > 1) {
         // Compacting partitions of old partition specs on a partitioned table with partition evolution
@@ -83,7 +83,9 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
           compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
           VirtualColumn.FILE_PATH.getName());
       } else {
-        return true;
+        // Partitioned table without partition evolution with partition spec as null in the compaction request - this
+        // code branch is not supposed to be reachable
+        throw new HiveException(ErrorMsg.NO_COMPACTION_PARTITION);
       }
     } else {
       Map<String, String> partSpecMap = new LinkedHashMap<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 8634a55cc3ef..e8ab3f9eeda7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -255,7 +255,7 @@ public String toString() {
 
   public enum RewritePolicy {
     DEFAULT,
-    ALL_PARTITIONS,
+    FULL_TABLE,
     PARTITION;
 
     public static RewritePolicy fromString(String rewritePolicy) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index d2d87ab488da..b664894ef606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -109,7 +109,7 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
     // If Iceberg table had partition evolution, it will create compaction request without partition specification,
     // and it will compact all files from old partition specs, besides compacting partitions of current spec in parallel.
     if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table) &&
-        table.getStorageHandler().isUndergonePartitionEvolution(table)) {
+        table.getStorageHandler().hasUndergonePartitionEvolution(table)) {
       compactionRequest.setPartitionname(null);
       CompactionResponse compactionResponse = txnHandler.compact(compactionRequest);
       parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
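[Editor's note: the full-table compaction query in the IcebergMajorQueryCompactor hunk above uses the '%<s' specifier (restored here from a garbled span in the source). In java.util.Formatter, '<' is the relative-index flag that reuses the previous specifier's argument, so the table name is passed only once. A runnable demonstration with a made-up table name:]

public class RelativeIndexFormat {
  public static void main(String[] args) {
    String compactTableName = "db.tbl";
    String q = String.format("insert overwrite table %s select * from %<s", compactTableName);
    System.out.println(q);  // insert overwrite table db.tbl select * from db.tbl
  }
}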
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 0ddbad523876..e00ec7bc9a61 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -780,7 +780,7 @@ default Optional<ErrorMsg> isEligibleForCompaction(org.apache.hadoop.hive.ql.met
   /**
    * Returns partitions names for the current table spec that correspond to the provided partition spec.
-   * @param table {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
    * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
    * @return Optional of ErrorMsg {@link org.apache.hadoop.hive.ql.ErrorMsg}
    */
@@ -791,7 +791,7 @@ default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table
   /**
    * Returns partitions names that correspond to the provided partition spec.
-   * @param table {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
    * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
    * @param latestSpecOnly Tells whether to return partition names for the latest spec only or for past specs too
    * @return Optional of ErrorMsg {@link org.apache.hadoop.hive.ql.ErrorMsg}
@@ -877,7 +877,7 @@ default boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table table) {
     throw new UnsupportedOperationException("Storage handler does not support checking if table is partitioned.");
   }
 
-  default boolean isUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table table) {
+  default boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table table) {
     throw new UnsupportedOperationException("Storage handler does not support checking if table " +
         "undergone partition evolution.");
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index ef2307326f16..7bcd4ffd558c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -424,6 +424,10 @@ private static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) thr
   private static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
       final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs, TxnStore txnHandler,
       HiveConf conf) throws IOException, InterruptedException {
+    if (MetaStoreUtils.isIcebergTable(tblProperties)) {
+      return ci.type;
+    }
+
     // If it's marked as too many aborted, we already know we need to compact
     if (ci.tooManyAborts) {
       LOG.debug("Found too many aborted transactions for "
@@ -493,8 +497,7 @@ public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci,
 
     checkInterrupt(Initiator.class.getName());
 
-    CompactionType type = MetaStoreUtils.isIcebergTable(t.getParameters()) ? ci.type :
-        checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
+    CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
     if (type != null) {
       ci.type = type;
       return requestCompaction(ci, runAs, hostName, txnHandler);
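[Editor's note: the CompactorUtil change above moves the Iceberg special case from the call site into checkForCompaction itself. A simplified sketch of the resulting control flow — not the real Hive types — on the reading that the delta/base directory heuristics below the guard only apply to native ACID layouts, so for Iceberg the requested type is passed through unchanged:]

public class InitiatorGuardSketch {
  enum CompactionType { MAJOR, MINOR }

  static CompactionType checkForCompaction(boolean isIcebergTable, CompactionType requestedType) {
    if (isIcebergTable) {
      return requestedType;            // no ACID directory analysis for Iceberg
    }
    return analyzeAcidDirectories();   // placeholder for the delta-file heuristics
  }

  static CompactionType analyzeAcidDirectories() {
    return CompactionType.MINOR;       // stand-in result for the sketch
  }

  public static void main(String[] args) {
    System.out.println(checkForCompaction(true, CompactionType.MAJOR));  // MAJOR
  }
}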
From 93198d0df2245ca3ff717a9153929e8343ebeb57 Mon Sep 17 00:00:00 2001
From: Dmitriy Fingerman
Date: Thu, 1 Aug 2024 17:50:16 -0400
Subject: [PATCH 3/4] review comments Aug 1

---
 .../org/apache/hadoop/hive/ql/ErrorMsg.java   |  2 +-
 .../mr/hive/HiveIcebergOutputCommitter.java   |  7 +++++--
 .../mr/hive/HiveIcebergStorageHandler.java    |  5 ++---
 .../iceberg/mr/hive/IcebergTableUtil.java     | 19 ++++++++++---------
 .../IcebergMajorQueryCompactor.java           |  2 +-
 .../org/apache/hadoop/hive/ql/Context.java    |  4 ++--
 .../apache/hadoop/hive/ql/metadata/Hive.java  |  2 +-
 .../hive/ql/metadata/HiveStorageHandler.java  |  4 ++--
 8 files changed, 24 insertions(+), 21 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1d8ebdcd0bc4..15174dcc5009 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -373,7 +373,7 @@ public enum ErrorMsg {
       "metastore."),
   INVALID_COMPACTION_TYPE(10282, "Invalid compaction type, supported values are 'major' and " +
       "'minor'"),
-  NO_COMPACTION_PARTITION(10283, "You must specify a partition to compact for partitioned tables"),
+  COMPACTION_NO_PARTITION(10283, "You must specify a partition to compact for partitioned tables"),
   TOO_MANY_COMPACTION_PARTITIONS(10284, "Compaction can only be requested on one partition at a " +
       "time."),
   DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index fd16cb3ef5d8..275681199e37 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -618,9 +619,11 @@ private void commitCompaction(Table table, long startTime, FilesForCommit result
     } else {
       // Single partition compaction
       List<DataFile> existingDataFiles =
-          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath, partitionPath != null);
+          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
+              partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId));
       List<DeleteFile> existingDeleteFiles =
-          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath, partitionPath != null);
+          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath,
+              partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId));
 
       RewriteFiles rewriteFiles = table.newRewrite();
       rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
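[Editor's note: the hunk above replaces the boolean flag with java.util.function.Predicate building blocks. Predicate.isEqual(x) tests Objects.equals(x, input) and negate() inverts it, so with a null partitionPath the commit keeps files whose spec id is NOT the requested one. A runnable illustration:]

import java.util.function.Predicate;

public class SpecIdPredicateDemo {
  public static void main(String[] args) {
    Integer partitionSpecId = 1;
    Predicate<Integer> match = Predicate.isEqual(partitionSpecId);
    Predicate<Integer> notMatch = match.negate();
    System.out.println(match.test(1));     // true
    System.out.println(notMatch.test(0));  // true
    System.out.println(notMatch.test(1));  // false
  }
}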
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 8e9125df62f7..41b8fd9b9cab 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -1912,9 +1912,8 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   }
 
   private boolean hasUndergonePartitionEvolution(Table table) {
-    // If it is a table which has undergone partition evolution, return true.
-    // If a table has undergone partition evolution, the current spec is not necessarily the latest, which can happen
-    // if the partition spec was changed to one of the table's past specs.
+    // The current spec is not necessarily the latest, which can happen when the partition spec was changed to one of
+    // the table's past specs.
     return table.currentSnapshot() != null &&
         table.currentSnapshot().allManifests(table.io()).parallelStream()
         .map(ManifestFile::partitionSpecId)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 1ceac31c8de6..552bf9c5bc6a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -400,16 +401,16 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
    * @param table the iceberg table
    * @param specId partition spec id
    * @param partitionPath partition path
-   * @param matchBySpecId If true, searches for files matching the specId, else searches for ones that don't match
+   * @param matchBySpecId filter that's applied on data files' spec ids
    */
-  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath, boolean matchBySpecId) {
+  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath,
+      Predicate<Integer> matchBySpecId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
     CloseableIterable<FileScanTask> filteredFileScanTasks =
         CloseableIterable.filter(fileScanTasks, t -> {
           DataFile file = t.asFileScanTask().file();
-          return ((matchBySpecId && file.specId() == specId) || (!matchBySpecId && file.specId() != specId)) &&
-              (partitionPath == null || (partitionPath != null &&
+          return matchBySpecId.test(file.specId()) && (partitionPath == null || (partitionPath != null &&
               table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath)));
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
@@ -422,18 +423,18 @@ public static List<DataFile> getDataFiles(Table table, int specId, String partit
    * @param table the iceberg table
    * @param specId partition spec id
    * @param partitionPath partition path
-   * @param matchBySpecId If true, searches for files matching the specId, else searches for ones that don't match
+   * @param matchBySpecId filter that's applied on delete files' spec ids
    */
-  public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath, boolean matchBySpecId) {
+  public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath,
+      Predicate<Integer> matchBySpecId) {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
     CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
     CloseableIterable<ScanTask> filteredDeletesScanTasks =
         CloseableIterable.filter(deletesScanTasks, t -> {
           DeleteFile file = ((PositionDeletesScanTask) t).file();
-          return ((matchBySpecId && file.specId() == specId) || (!matchBySpecId && file.specId() != specId)) &&
-              (partitionPath == null || (partitionPath != null &&
-              table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath)));
+          return matchBySpecId.test(file.specId()) && (partitionPath == null || (partitionPath != null &&
+              table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath)));
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
         t -> ((PositionDeletesScanTask) t).file()));
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
index 776163309d3d..411031b50c8e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
@@ -85,7 +85,7 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
       } else {
         // Partitioned table without partition evolution with partition spec as null in the compaction request - this
         // code branch is not supposed to be reachable
-        throw new HiveException(ErrorMsg.NO_COMPACTION_PARTITION);
+        throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
       }
     } else {
       Map<String, String> partSpecMap = new LinkedHashMap<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index e8ab3f9eeda7..2e6df97c1521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -255,8 +255,8 @@ public String toString() {
 
   public enum RewritePolicy {
     DEFAULT,
-    FULL_TABLE,
-    PARTITION;
+    PARTITION,
+    FULL_TABLE;
 
     public static RewritePolicy fromString(String rewritePolicy) {
       if (rewritePolicy == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fd1b875eb7c5..4afdc9a7d2d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4055,7 +4055,7 @@ public List<String> getPartitionNames(String dbName, String tblName,
     List<String> names = null;
     Table t = getTable(dbName, tblName);
     if (t.getStorageHandler() != null && t.getStorageHandler().alwaysUnpartitioned()) {
-      return t.getStorageHandler().getPartitionNames(t, partSpec, false);
+      return t.getStorageHandler().getPartitionNames(t, partSpec);
     }
 
     List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
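[Editor's note: the Context.java hunk above reorders the enum constants. Reordering is source- and name-compatible but changes ordinal() and values() order, so it is only safe because RewritePolicy is resolved by name (fromString/valueOf), never by ordinal. A small illustration with a local copy of the enum:]

public class EnumOrderDemo {
  enum RewritePolicy { DEFAULT, PARTITION, FULL_TABLE }

  public static void main(String[] args) {
    System.out.println(RewritePolicy.FULL_TABLE.ordinal());   // 2 after the reorder
    System.out.println(RewritePolicy.valueOf("FULL_TABLE"));  // name lookup is unaffected
  }
}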
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index e00ec7bc9a61..42e8db4509e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -782,7 +782,7 @@ default Optional<ErrorMsg> isEligibleForCompaction(org.apache.hadoop.hive.ql.met
    * Returns partitions names for the current table spec that correspond to the provided partition spec.
    * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
    * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
-   * @return Optional of ErrorMsg {@link org.apache.hadoop.hive.ql.ErrorMsg}
+   * @return List of partition names
    */
   default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
@@ -794,7 +794,7 @@ default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table
    * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
    * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
    * @param latestSpecOnly Tells whether to return partition names for the latest spec only or for past specs too
-   * @return Optional of ErrorMsg {@link org.apache.hadoop.hive.ql.ErrorMsg}
+   * @return List of partition names
    */
   default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
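[Editor's note: the storage-handler API pattern used throughout this series — the interface supplies a throwing default so only handlers that can answer (e.g. the Iceberg handler) override it. A simplified, self-contained sketch; the real signatures live in HiveStorageHandler:]

import java.util.List;
import java.util.Map;

interface StorageHandlerSketch {
  default List<String> getPartitionNames(String table, Map<String, String> partitionSpec) {
    throw new UnsupportedOperationException("Storage handler does not support getting partition names");
  }
}

class IcebergHandlerSketch implements StorageHandlerSketch {
  @Override
  public List<String> getPartitionNames(String table, Map<String, String> partitionSpec) {
    return List.of("dept=hr", "dept=eng");  // stand-in for the metadata-table scan
  }
}

public class DefaultMethodDemo {
  public static void main(String[] args) {
    System.out.println(new IcebergHandlerSketch().getPartitionNames("t", Map.of()));
  }
}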
   */
-  @Override
   public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
-      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
+      Map<String, String> partitionSpec) throws SemanticException {
     Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-
-    try {
-      return IcebergTableUtil
-          .getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream()
-          .map(e -> {
-            PartitionData partitionData = e.getKey();
-            int specId = e.getValue();
-            return icebergTable.specs().get(specId).partitionToPath(partitionData);
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
-    }
+    return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, true);
   }
 
   /**
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 552bf9c5bc6a..9a661bdaa738 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -511,4 +511,19 @@ public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, M
 
     return result;
   }
+
+  public static List<String> getPartitionNames(Table icebergTable, Map<String, String> partitionSpec,
+      boolean latestSpecOnly) throws SemanticException {
+    try {
+      return IcebergTableUtil
+          .getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream()
+          .map(e -> {
+            PartitionData partitionData = e.getKey();
+            int specId = e.getValue();
+            return icebergTable.specs().get(specId).partitionToPath(partitionData);
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
+    }
+  }
 }
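[Editor's note: the helper moved into IcebergTableUtil above maps (partition data -> spec id) entries to path-like names via the spec each partition was written under. A plain-Java simulation of that shape — strings stand in for Iceberg's PartitionData/PartitionSpec, and the rendering lambda stands in for partitionToPath:]

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionNamesSketch {
  public static void main(String[] args) {
    Map<String, Integer> partitionInfo = new LinkedHashMap<>();
    partitionInfo.put("hr", 0);        // written under spec 0: dept only
    partitionInfo.put("eng/nyc", 1);   // written under spec 1: dept/city
    List<String> names = partitionInfo.entrySet().stream()
        .map(e -> "spec-" + e.getValue() + "/" + e.getKey())  // stand-in for partitionToPath
        .collect(Collectors.toList());
    System.out.println(names);  // [spec-0/hr, spec-1/eng/nyc]
  }
}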
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index b664894ef606..45204d394c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -90,8 +90,8 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
     }
 
     //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
-    if (desc.getPartitionSpec() != null || (!table.isPartitioned() && !DDLUtils.isIcebergTable(table) ) ||
-        (DDLUtils.isIcebergTable(table) && !table.getStorageHandler().isPartitioned(table))) {
+    if (desc.getPartitionSpec() != null || !(table.isPartitioned() ||
+        (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table)))) {
       if (desc.getPartitionSpec() != null) {
         Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
         partitionName.ifPresent(compactionRequest::setPartitionname);
@@ -108,8 +108,7 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
     }
     // If Iceberg table had partition evolution, it will create compaction request without partition specification,
     // and it will compact all files from old partition specs, besides compacting partitions of current spec in parallel.
-    if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table) &&
-        table.getStorageHandler().hasUndergonePartitionEvolution(table)) {
+    if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().hasUndergonePartitionEvolution(table)) {
       compactionRequest.setPartitionname(null);
       CompactionResponse compactionResponse = txnHandler.compact(compactionRequest);
       parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 42e8db4509e3..168a91f3c3de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -786,20 +786,7 @@ default Optional<ErrorMsg> isEligibleForCompaction(org.apache.hadoop.hive.ql.met
    */
   default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
-    return getPartitionNames(hmsTable, partitionSpec, true);
-  }
-
-  /**
-   * Returns partitions names that correspond to the provided partition spec.
-   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
-   * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
-   * @param latestSpecOnly Tells whether to return partition names for the latest spec only or for past specs too
-   * @return List of partition names
-   */
-  default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
-      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
-    throw new UnsupportedOperationException("Storage handler does not support getting partitions " +
-        "by a partition specification.");
+    throw new UnsupportedOperationException("Storage handler does not support getting partition names");
   }
 
   default ColumnInfo getColumnInfo(org.apache.hadoop.hive.ql.metadata.Table hmsTable, String colName)
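[Editor's note: reading the series as a whole, the dispatch implemented by AlterTableCompactOperation reduces to a small decision tree. A simplified sketch with plain booleans — a reading of the patches, not the real Hive control flow verbatim: one request handles the whole job when a partition is named, the table is unpartitioned, or the table has undergone partition evolution (partition name set to null so old-spec files and current-spec partitions are all covered); otherwise a request is issued per partition of the current spec so they can compact in parallel:]

public class CompactionDispatchSketch {
  static String dispatch(boolean partSpecGiven, boolean partitioned, boolean hasEvolution) {
    if (partSpecGiven || !partitioned) {
      return "single compaction request";
    }
    if (hasEvolution) {
      return "one request without partition name: old specs + current-spec partitions in parallel";
    }
    return "one request per partition of the current spec";
  }

  public static void main(String[] args) {
    System.out.println(dispatch(false, true, true));
    System.out.println(dispatch(false, true, false));
    System.out.println(dispatch(true, true, false));
  }
}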