diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1d8ebdcd0bc4..15174dcc5009 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -373,7 +373,7 @@ public enum ErrorMsg {
       "metastore."),
   INVALID_COMPACTION_TYPE(10282, "Invalid compaction type, supported values are 'major' and " +
       "'minor'"),
-  NO_COMPACTION_PARTITION(10283, "You must specify a partition to compact for partitioned tables"),
+  COMPACTION_NO_PARTITION(10283, "You must specify a partition to compact for partitioned tables"),
   TOO_MANY_COMPACTION_PARTITIONS(10284, "Compaction can only be requested on one partition at a " +
       "time."),
   DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 66c34361fbad..275681199e37 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -604,11 +605,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
    */
   private void commitCompaction(Table table, long startTime, FilesForCommit results,
       RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) {
-    if (results.dataFiles().isEmpty()) {
-      LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
-      return;
-    }
-    if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
+    if (rewritePolicy == RewritePolicy.FULL_TABLE) {
       // Full table compaction
       Transaction transaction = table.newTransaction();
       DeleteFiles delete = transaction.newDelete();
@@ -621,8 +618,12 @@ private void commitCompaction(Table table, long startTime, FilesForCommit result
       LOG.debug("Compacted full table with files {}", results);
     } else {
       // Single partition compaction
-      List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath);
-      List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath);
+      List<DataFile> existingDataFiles =
+          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
+              partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId));
+      List<DeleteFile> existingDeleteFiles =
+          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath,
+              partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId));
 
       RewriteFiles rewriteFiles = table.newRewrite();
       rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
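
Editorial note on the predicate arguments above (illustrative sketch, not part of the patch): a null partitionPath means the commit targets files written under any spec other than the current one, while a concrete path targets exactly the files of the given spec. A minimal, runnable demonstration of the same selection logic using only java.util.function:

import java.util.function.Predicate;

class SpecPredicateSketch {
  // Mirrors the ternary used in commitCompaction: a null partitionPath means
  // "match everything that does NOT belong to the given spec", a concrete path
  // means "match only files that DO belong to it".
  static Predicate<Object> specFilter(Integer partitionSpecId, String partitionPath) {
    return partitionPath == null
        ? Predicate.isEqual(partitionSpecId).negate()
        : Predicate.isEqual(partitionSpecId);
  }

  public static void main(String[] args) {
    Predicate<Object> oldSpecsOnly = specFilter(2, null);
    System.out.println(oldSpecsOnly.test(1)); // true  - file from an old spec
    System.out.println(oldSpecsOnly.test(2)); // false - file from the current spec
  }
}
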
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 071130f0977c..ded1d1943582 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -1912,11 +1912,12 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   }
 
   private boolean hasUndergonePartitionEvolution(Table table) {
-    // If it is a table which has undergone partition evolution, return true.
+    // The current spec is not necessarily the latest, which can happen when the partition spec was changed back to
+    // one of the table's past specs.
     return table.currentSnapshot() != null &&
        table.currentSnapshot().allManifests(table.io()).parallelStream()
        .map(ManifestFile::partitionSpecId)
-       .anyMatch(id -> id < table.spec().specId());
+       .anyMatch(id -> id != table.spec().specId());
   }
 
   private boolean isIdentityPartitionTable(org.apache.hadoop.hive.ql.metadata.Table table) {
@@ -1935,8 +1936,9 @@ public Optional<ErrorMsg> isEligibleForCompaction(
 
   @Override
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
-      Map<String, String> partitionSpec) throws SemanticException {
-    return getPartitionNames(table, partitionSpec).stream()
+      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
+    Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
+    return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, latestSpecOnly).stream()
         .map(partName -> {
           Map<String, String> partSpecMap = Maps.newLinkedHashMap();
           Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
@@ -1944,6 +1946,11 @@ public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table ta
         }).collect(Collectors.toList());
   }
 
+  public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    return table.spec().isPartitioned();
+  }
+
   @Override
   public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec, Context.RewritePolicy policy) throws SemanticException {
@@ -1968,21 +1975,10 @@ private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table tabl
    * @return A list of partition values which satisfies the partition spec provided corresponding to the table.
    * @throws SemanticException Exception raised when there is an issue performing a scan on the partitions table.
    */
-  @Override
   public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
     Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-
-    try {
-      return IcebergTableUtil
-          .getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream().map(e -> {
-            PartitionData partitionData = e.getKey();
-            int specId = e.getValue();
-            return icebergTable.specs().get(specId).partitionToPath(partitionData);
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
-    }
+    return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, true);
   }
 
   /**
@@ -2132,4 +2128,10 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) {
     tableDesc.setProperty(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableDesc.getTableName(),
         Operation.DELETE.name());
   }
+
+  @Override
+  public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    return hasUndergonePartitionEvolution(table);
+  }
 }
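
Editorial note on the `id < specId` to `id != specId` change above (runnable demonstration, not part of the patch): the old comparison misses the case where the spec was switched back to an earlier spec in the table's history, because existing manifests then carry a *larger* spec id than the current one.

import java.util.List;

class PartitionEvolutionCheckSketch {
  public static void main(String[] args) {
    // Suppose the table's spec history is [0, 1] and the user ran
    // ALTER TABLE ... SET PARTITION SPEC back to spec 0: the current spec id is 0,
    // while existing manifests still reference spec id 1.
    List<Integer> manifestSpecIds = List.of(0, 1);
    int currentSpecId = 0;

    // Old check: reports no evolution, since no manifest id is < 0.
    boolean oldCheck = manifestSpecIds.stream().anyMatch(id -> id < currentSpecId);
    // New check: correctly reports evolution, because some files belong to spec 1.
    boolean newCheck = manifestSpecIds.stream().anyMatch(id -> id != currentSpecId);
    System.out.println(oldCheck + " " + newCheck); // prints: false true
  }
}
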
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 2016b68905a0..9a661bdaa738 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -393,28 +394,47 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
     return data;
   }
 
-  public static List<DataFile> getDataFiles(Table table, int specId,
-      String partitionPath) {
+  /**
+   * Returns a list of data files filtered by specId and partitionPath as follows:
+   *   1. A file is kept only if its spec id satisfies the matchBySpecId predicate, e.g. Predicate.isEqual(specId)
+   *      keeps files written under that spec, while its negation keeps files of every other spec
+   *   2. If partitionPath is not null, files are additionally filtered by partitionPath == file's partition path
+   * @param table the iceberg table
+   * @param specId partition spec id
+   * @param partitionPath partition path
+   * @param matchBySpecId filter that's applied on data files' spec ids
+   */
+  public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath,
+      Predicate<Object> matchBySpecId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
     CloseableIterable<FileScanTask> filteredFileScanTasks =
         CloseableIterable.filter(fileScanTasks, t -> {
           DataFile file = t.asFileScanTask().file();
-          return file.specId() == specId && table.specs()
-              .get(specId).partitionToPath(file.partition()).equals(partitionPath);
+          return matchBySpecId.test(file.specId()) && (partitionPath == null ||
+              table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath));
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
   }
 
-  public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath) {
+  /**
+   * Returns a list of delete files filtered by specId and partitionPath as follows:
+   *   1. A file is kept only if its spec id satisfies the matchBySpecId predicate, e.g. Predicate.isEqual(specId)
+   *      keeps files written under that spec, while its negation keeps files of every other spec
+   *   2. If partitionPath is not null, files are additionally filtered by partitionPath == file's partition path
+   * @param table the iceberg table
+   * @param specId partition spec id
+   * @param partitionPath partition path
+   * @param matchBySpecId filter that's applied on delete files' spec ids
+   */
+  public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath,
+      Predicate<Object> matchBySpecId) {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
     CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
     CloseableIterable<ScanTask> filteredDeletesScanTasks =
         CloseableIterable.filter(deletesScanTasks, t -> {
           DeleteFile file = ((PositionDeletesScanTask) t).file();
-          return file.specId() == specId && table.specs()
-              .get(specId).partitionToPath(file.partition()).equals(partitionPath);
+          return matchBySpecId.test(file.specId()) && (partitionPath == null ||
+              table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath));
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
         t -> ((PositionDeletesScanTask) t).file()));
@@ -465,7 +485,7 @@ public static List<PartitionField> getPartitionFields(Table table) {
   }
 
   public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, Map<String, String> partSpecMap,
-      boolean allowPartialSpec) throws SemanticException, IOException {
+      boolean allowPartialSpec, boolean latestSpecOnly) throws SemanticException, IOException {
     Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
     PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
         .createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
@@ -484,10 +504,26 @@ public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, M
           ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()),
               expression, false);
           return resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
-              (entry.getKey().size() == partSpecMap.size() || allowPartialSpec);
+              (entry.getKey().size() == partSpecMap.size() || allowPartialSpec) &&
+              (entry.getValue() == icebergTable.spec().specId() || !latestSpecOnly);
         }).forEach(entry -> result.put(entry.getKey(), entry.getValue())));
     }
     return result;
   }
+
+  public static List<String> getPartitionNames(Table icebergTable, Map<String, String> partitionSpec,
+      boolean latestSpecOnly) throws SemanticException {
+    try {
+      return IcebergTableUtil
+          .getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream()
+          .map(e -> {
+            PartitionData partitionData = e.getKey();
+            int specId = e.getValue();
+            return icebergTable.specs().get(specId).partitionToPath(partitionData);
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
+    }
+  }
 }
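
Usage sketch of the new filtering entry point (illustrative only; assumes the patched IcebergTableUtil on the classpath and an already-loaded Iceberg Table instance, and mirrors how HiveIcebergOutputCommitter builds its predicates):

import java.util.List;
import java.util.function.Predicate;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;

class CompactionFileSelectionSketch {
  // Collects the data files a compaction commit would rewrite. Passing
  // Predicate.isEqual(specId) keeps only files written under that spec; the
  // negated predicate (used when partitionPath is null) selects files from
  // every *other* spec, i.e. the old-spec leftovers of partition evolution.
  static List<DataFile> filesToRewrite(Table table, int specId, String partitionPath) {
    Predicate<Object> bySpec = partitionPath == null
        ? Predicate.isEqual(specId).negate()
        : Predicate.isEqual(specId);
    return IcebergTableUtil.getDataFiles(table, specId, partitionPath, bySpec);
  }
}
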
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
index 6173c804b4b0..411031b50c8e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
@@ -63,20 +63,35 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
     HiveConf conf = new HiveConf(context.getConf());
     String partSpec = context.getCompactionInfo().partName;
+    org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
+        context.getTable().getTableName());
+    Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
     String compactionQuery;
 
     if (partSpec == null) {
-      HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
-      compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
+      if (!icebergTable.spec().isPartitioned()) {
+        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
+        compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
+      } else if (icebergTable.specs().size() > 1) {
+        // Compacting partitions of old partition specs on a partitioned table with partition evolution
+        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
+        conf.set(IcebergCompactionService.PARTITION_SPEC_ID, String.valueOf(icebergTable.spec().specId()));
+        // A single filter on a virtual column causes errors during compilation,
+        // added another filter on file_path as a workaround.
+ compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " + + "where %2$s != %3$d and %4$s is not null", + compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(), + VirtualColumn.FILE_PATH.getName()); + } else { + // Partitioned table without partition evolution with partition spec as null in the compaction request - this + // code branch is not supposed to be reachable + throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION); + } } else { - org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(), - context.getTable().getTableName()); Map partSpecMap = new LinkedHashMap<>(); Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null); - - Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable()); Map partitionInfo = IcebergTableUtil - .getPartitionInfo(icebergTable, partSpecMap, false); + .getPartitionInfo(icebergTable, partSpecMap, false, false); Optional specId = partitionInfo.values().stream().findFirst(); if (!specId.isPresent()) { diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q index aef5ae0ef239..7d046aea4e99 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q new file mode 100644 index 000000000000..3fa6f91f90b7 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q @@ -0,0 +1,55 @@ +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ +-- Mask random uuid +--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/ +-- Mask a random snapshot id +--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/ +-- Mask added file size +--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask total file size +--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask current-snapshot-timestamp-ms +--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +-- Mask compaction id as they will be allocated in parallel threads +--! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! 
qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ + +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; +set hive.optimize.shared.work.merge.ts.schema=true; + +create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2'); + +insert into ice_orc VALUES ('fn1','ln1', 1); +insert into ice_orc VALUES ('fn2','ln2', 1); +insert into ice_orc VALUES ('fn3','ln3', 1); +insert into ice_orc VALUES ('fn4','ln4', 1); +delete from ice_orc where last_name in ('ln3', 'ln4'); + +alter table ice_orc set partition spec(dept_id); + +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8'); +delete from ice_orc where last_name in ('ln7', 'ln8'); + +select * from ice_orc; +describe formatted ice_orc; +show compactions; + +alter table ice_orc COMPACT 'major' and wait; + +select * from ice_orc; +describe formatted ice_orc; +show compactions; + diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q index 9d766c6b8333..cc0d2fa23b99 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q index 73dbe19a94b5..8501e694de02 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! 
qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out index 3f8d8914c5e9..d9dee1d9902d 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out @@ -239,7 +239,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"1256\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -340,7 +340,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"3\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"5\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec 
{\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -352,7 +352,7 @@ Table Parameters: #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 19 + snapshot-count 20 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -374,4 +374,6 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- #Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out new file mode 100644 index 000000000000..06611f2f27a7 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc +POSTHOOK: query: create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4') +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: 
query: delete from ice_orc where last_name in ('ln3', 'ln4') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +PREHOOK: query: alter table ice_orc set partition spec(dept_id) +PREHOOK: type: ALTERTABLE_SETPARTSPEC +PREHOOK: Input: default@ice_orc +POSTHOOK: query: alter table ice_orc set partition spec(dept_id) +POSTHOOK: type: ALTERTABLE_SETPARTSPEC +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8') +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +PREHOOK: query: select * from ice_orc +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +fn1 ln1 1 +fn2 ln2 1 +fn5 ln5 2 +fn6 ln6 2 +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +first_name string +last_name string +dept_id bigint + +# Partition Transform Information +# col_name transform_type +dept_id IDENTITY + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-timestamp-ms #Masked# + default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 4 + numRows 4 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 10 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +PREHOOK: query: alter table ice_orc COMPACT 'major' and wait +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: default@ice_orc +PREHOOK: Output: default@ice_orc +POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: default@ice_orc +PREHOOK: query: select * from ice_orc +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +fn1 ln1 1 +fn2 ln2 1 +fn5 ln5 2 +fn6 ln6 2 +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +first_name string +last_name string +dept_id bigint + +# Partition Transform Information +# col_name transform_type +dept_id IDENTITY + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"added-records\":\"2\",\"deleted-records\":\"2\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-timestamp-ms #Masked# + default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 2 + numRows 4 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 12 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out index c9314bc4d03c..781bb41dd14c 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out @@ -191,7 +191,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"3\",\"deleted-records\":\"3\",\"removed-files-size\":\"1440\",\"changed-partition-count\":\"2\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"11\",\"total-delete-files\":\"7\",\"total-position-deletes\":\"7\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"deleted-data-files\":\"3\",\"deleted-records\":\"3\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"11\",\"total-delete-files\":\"7\",\"total-position-deletes\":\"7\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -287,7 +287,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"4\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"5\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"2\",\"deleted-records\":\"5\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -321,7 +321,8 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- PREHOOK: query: insert into ice_orc VALUES ('fn11','ln11', 1) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table @@ -517,7 +518,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"4\",\"deleted-records\":\"4\",\"removed-files-size\":\"1948\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"deleted-data-files\":\"4\",\"deleted-records\":\"4\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -617,7 +618,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"8\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"7\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"4\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -651,5 +652,7 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out index cfe8f3d3d46f..03deb181cf61 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out @@ -227,7 +227,7 @@ Table Parameters: bucketing_version 2 current-schema 
{\"type\":\"struct\",\"schema-id\":2,\"fields\":[{\"id\":1,\"name\":\"fname\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"address\",\"required\":false,\"type\":\"string\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"6\",\"deleted-records\":\"6\",\"removed-files-size\":\"3167\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"10\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"6\",\"deleted-records\":\"6\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"10\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -325,7 +325,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":2,\"fields\":[{\"id\":1,\"name\":\"fname\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"address\",\"required\":false,\"type\":\"string\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"5\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"5\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"6\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"3\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"5\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -359,4 +359,5 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 9e7b79fd3197..cadf4d150062 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ 
b/itests/src/test/resources/testconfiguration.properties
@@ -422,6 +422,7 @@ iceberg.llap.query.files=\
 
 iceberg.llap.query.compactor.files=\
   iceberg_major_compaction_partition_evolution.q,\
+  iceberg_major_compaction_partition_evolution2.q,\
   iceberg_major_compaction_partitioned.q,\
   iceberg_major_compaction_query_metadata.q,\
   iceberg_major_compaction_schema_evolution.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 8634a55cc3ef..2e6df97c1521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -255,8 +255,8 @@ public String toString() {
 
   public enum RewritePolicy {
     DEFAULT,
-    ALL_PARTITIONS,
-    PARTITION;
+    PARTITION,
+    FULL_TABLE;
 
     public static RewritePolicy fromString(String rewritePolicy) {
       if (rewritePolicy == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index ee67cc9a4e42..45204d394c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
 import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.ddl.DDLUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -89,7 +90,8 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
     }
 
     //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
-    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+    if (desc.getPartitionSpec() != null || !(table.isPartitioned() ||
+        (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table)))) {
       if (desc.getPartitionSpec() != null) {
         Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
         partitionName.ifPresent(compactionRequest::setPartitionname);
@@ -104,6 +106,13 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
           compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf());
       parseCompactionResponse(compactionResponse, table, partitionMapEntry.getKey());
     }
+    // If an Iceberg table has undergone partition evolution, create an additional compaction request without a
+    // partition specification; it compacts all files belonging to old partition specs, besides the current-spec
+    // partitions that are compacted in parallel.
+    if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().hasUndergonePartitionEvolution(table)) {
+      compactionRequest.setPartitionname(null);
+      CompactionResponse compactionResponse = txnHandler.compact(compactionRequest);
+      parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
+    }
     }
     return 0;
   }
@@ -135,7 +144,7 @@ private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, D
     List<Partition> partitions = new ArrayList<>();
     if (desc.getPartitionSpec() == null) {
-      if (table.isPartitioned()) {
+      if (table.isPartitioned() || (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table))) {
         // Compaction will get initiated for all the potential partitions that meets the criteria
         partitions = context.getDb().getPartitions(table);
       }
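
To make the request flow above concrete (simplified sketch of the patched AlterTableCompactOperation; error handling and the worker-pool details are omitted): for a partitioned Iceberg table one request is enqueued per current-spec partition, plus a single table-level request when old-spec files may exist.

// Per-partition requests for the current spec ...
for (Map.Entry<String, Partition> partitionMapEntry : partitionMap.entrySet()) {
  compactionRequest.setPartitionname(partitionMapEntry.getKey());
  parseCompactionResponse(txnHandler.compact(compactionRequest), table, partitionMapEntry.getKey());
}
// ... and one extra request (partition name null) that the Iceberg compactor maps to
// RewritePolicy.PARTITION with a "spec id != current" filter, covering old-spec files.
if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().hasUndergonePartitionEvolution(table)) {
  compactionRequest.setPartitionname(null);
  parseCompactionResponse(txnHandler.compact(compactionRequest), table, null);
}

In the iceberg_major_compaction_partition_evolution2.q output shown earlier, this yields exactly two successful MAJOR compactions: one for dept_id=2 (the only partition with current-spec files) and one table-level run for the files written before the spec change.
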
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e688e3f5fdf5..4afdc9a7d2d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4298,7 +4298,7 @@ private List<Partition> getPartitionsWithAuth(Table tbl, Map<String, String> par
   public List<Partition> getPartitions(Table tbl, Map<String, String> partialPartSpec)
       throws HiveException {
     if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
-      return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec);
+      return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec, false);
     } else {
       return getPartitions(tbl, partialPartSpec, (short)-1);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index ae948d6e85da..168a91f3c3de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -778,10 +778,15 @@ default Optional<ErrorMsg> isEligibleForCompaction(org.apache.hadoop.hive.ql.met
     throw new UnsupportedOperationException("Storage handler does not support validating eligibility for compaction");
   }
 
+  /**
+   * Returns partition names for the current table spec that correspond to the provided partition spec.
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
+   * @return List of partition names
+   */
   default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
-    throw new UnsupportedOperationException("Storage handler does not support getting partitions " +
-        "by a partition specification.");
+    throw new UnsupportedOperationException("Storage handler does not support getting partition names");
   }
 
   default ColumnInfo getColumnInfo(org.apache.hadoop.hive.ql.metadata.Table hmsTable, String colName)
@@ -839,9 +844,31 @@ default Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table, M
    */
   default List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec) throws SemanticException {
+    return getPartitions(table, partitionSpec, true);
+  }
+
+  /**
+   * Returns a list of partitions based on table and partial partition specification.
+   * @param table {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition specification
+   * @param latestSpecOnly Whether to return only partitions belonging to the table's latest partition spec
+   * @return List of Partitions {@link org.apache.hadoop.hive.ql.metadata.Partition}
+   * @throws SemanticException {@link org.apache.hadoop.hive.ql.parse.SemanticException}
+   */
+  default List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
+      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     throw new UnsupportedOperationException("Storage handler does not support getting partitions for a table.");
   }
 
+  default boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table table) {
+    throw new UnsupportedOperationException("Storage handler does not support checking if table is partitioned.");
+  }
+
+  default boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table table) {
+    throw new UnsupportedOperationException("Storage handler does not support checking if table " +
+        "has undergone partition evolution.");
+  }
+
   default boolean supportsMergeFiles() {
     return false;
   }
 }
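
The new default methods keep backward compatibility: a storage handler that does not override them keeps the throwing behavior. A minimal opt-in sketch (hypothetical handler class, for illustration only):

import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;

public class MyStorageHandler extends DefaultStorageHandler {
  // Hypothetical handler managing an unpartitioned layout: reporting "not
  // partitioned" makes AlterTableCompactOperation issue a single table-level request.
  @Override
  public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table table) {
    return false;
  }

  // No spec history, so no old-spec files ever need a separate compaction request.
  @Override
  public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table table) {
    return false;
  }
}
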
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 70c550375a29..7bcd4ffd558c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -424,6 +424,10 @@ private static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) thr
   private static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
       final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs,
       TxnStore txnHandler, HiveConf conf) throws IOException, InterruptedException {
+    if (MetaStoreUtils.isIcebergTable(tblProperties)) {
+      return ci.type;
+    }
+
     // If it's marked as too many aborted, we already know we need to compact
     if (ci.tooManyAborts) {
       LOG.debug("Found too many aborted transactions for "