HIVE-28341: Iceberg: Change Major QB Full Table Compaction to compact partition by partition
Dmitriy Fingerman committed Jun 28, 2024
1 parent 74b9c88 commit 1d80307
Showing 15 changed files with 392 additions and 31 deletions.
@@ -604,10 +604,6 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
    */
   private void commitCompaction(Table table, long startTime, FilesForCommit results,
       RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) {
-    if (results.dataFiles().isEmpty()) {
-      LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
-      return;
-    }
     if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
       // Full table compaction
       Transaction transaction = table.newTransaction();
@@ -1927,11 +1927,18 @@ public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table ta
     return getPartitionNames(table, partitionSpec).stream()
         .map(partName -> {
           Map<String, String> partSpecMap = Maps.newLinkedHashMap();
-          Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
+          if (partName != null && !partName.isEmpty()) {
+            Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
+          }
           return new DummyPartition(table, partName, partSpecMap);
         }).collect(Collectors.toList());
   }

+  public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    return IcebergTableUtil.getPartitionFields(table).size() > 0;
+  }
+
   @Override
   public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec, Context.RewritePolicy policy) throws SemanticException {
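
Why the empty-name guard above matters: Hadoop's Path constructor rejects empty strings ("Can not create a Path from an empty string"), and the originally unpartitioned part of a table that later evolved a partition spec surfaces as an empty partition name. A minimal sketch of the parsing step being guarded, using a hypothetical partition path:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.Warehouse;

    public class MakeSpecSketch {
      public static void main(String[] args) throws Exception {
        Map<String, String> spec = new LinkedHashMap<>();
        // Parses "key=value" path segments into an ordered map;
        // spec becomes {dept_id=2, team_id=21}.
        Warehouse.makeSpecFromName(spec, new Path("dept_id=2/team_id=21"), null);
        System.out.println(spec);
        // new Path("") would throw IllegalArgumentException, which is why
        // both call sites in this commit skip empty partition names.
      }
    }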
@@ -1951,6 +1958,7 @@ private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table tabl
 
   /**
    * Returns a list of partitions that correspond to the table, based on the partition spec provided.
+   * Partitions in the currently active partition spec are returned at the end of the list.
    * @param hmsTable A Hive table instance.
    * @param partitionSpec Map containing partition specification.
    * @return A list of partition values that satisfy the partition spec provided for the table.
@@ -1963,7 +1971,9 @@ public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table h
 
     try {
       return IcebergTableUtil
-          .getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream().map(e -> {
+          .getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream()
+          .sorted((o1, o2) -> o1.getValue() == icebergTable.spec().specId() ? 1 : -1)
+          .map(e -> {
             PartitionData partitionData = e.getKey();
             int specId = e.getValue();
             return icebergTable.specs().get(specId).partitionToPath(partitionData);
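
The new sorted(...) step implements the ordering contract added to the javadoc above: partition names from the currently active spec are moved to the end of the list. A self-contained sketch of the same two-way split, with hypothetical paths and spec ids (the empty path stands for data written before partition evolution):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SpecOrderingSketch {
      public static void main(String[] args) {
        int currentSpecId = 1; // assume spec 1 is the table's active spec
        List<Map.Entry<String, Integer>> partitions = List.of(
            Map.entry("dept_id=1", 1),
            Map.entry("", 0),
            Map.entry("dept_id=2", 1));

        // Entries of the active spec compare as "greater", so they sink to the end.
        List<String> names = partitions.stream()
            .sorted((o1, o2) -> o1.getValue() == currentSpecId ? 1 : -1)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        System.out.println(names); // [, dept_id=1, dept_id=2]
      }
    }

Note that the ternary comparator is a two-group split rather than a total order; only the "active spec last" property is relied on here.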
@@ -72,7 +72,11 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
     org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
         context.getTable().getTableName());
     Map<String, String> partSpecMap = new LinkedHashMap<>();
-    Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
+    if (!partSpec.isEmpty()) {
+      // Can be empty for a table that was unpartitioned and then after partition evolution became partitioned.
+      // In this case partSpec will be null for its unpartitioned part.
+      Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
+    }
 
     Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
     Map<PartitionData, Integer> partitionInfo = IcebergTableUtil
@@ -89,7 +93,7 @@
 
       HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
       conf.set(IcebergCompactionService.PARTITION_SPEC_ID, String.valueOf(specId.get()));
-      conf.set(IcebergCompactionService.PARTITION_PATH, new Path(partSpec).toString());
+      conf.set(IcebergCompactionService.PARTITION_PATH, partSpec.isEmpty() ? "" : new Path(partSpec).toString());
 
       List<FieldSchema> partitionKeys = IcebergTableUtil.getPartitionKeys(icebergTable, specId.get());
       List<String> partValues = partitionKeys.stream().map(
@@ -104,12 +108,22 @@
           .filter(col -> !partSpecMap.containsKey(col))
           .collect(Collectors.joining(","));
 
-      compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) " +
-          "select %4$s from %1$s where %3$s and %6$s = %5$d",
-          compactTableName,
-          StringUtils.join(partValues, ","),
-          StringUtils.join(partValues, " and "),
-          queryFields, specId.get(), VirtualColumn.PARTITION_SPEC_ID.getName());
+      if (!partSpec.isEmpty()) {
+        compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) " +
+            "select %4$s from %1$s where %3$s and %6$s = %5$d",
+            compactTableName,
+            StringUtils.join(partValues, ","),
+            StringUtils.join(partValues, " and "),
+            queryFields, specId.get(), VirtualColumn.PARTITION_SPEC_ID.getName());
+      } else {
+        // A single filter on a virtual column causes errors during compilation,
+        // added another filter on file_path as a workaround.
+        compactionQuery = String.format("insert overwrite table %1$s " +
+            "select %2$s from %1$s where %4$s = %3$d and %5$s is not null",
+            compactTableName,
+            queryFields, specId.get(), VirtualColumn.PARTITION_SPEC_ID.getName(),
+            VirtualColumn.FILE_PATH.getName());
+      }
     }
 
     SessionState sessionState = setupQueryCompactionSession(conf, context.getCompactionInfo(), tblProperties);
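
For concreteness, a sketch of the two query shapes the compactor builds. The table name, column lists, and spec ids are hypothetical, and the PARTITION__SPEC__ID / FILE__PATH virtual-column names are assumptions standing in for VirtualColumn.PARTITION_SPEC_ID.getName() and VirtualColumn.FILE_PATH.getName():

    public class CompactionQuerySketch {
      public static void main(String[] args) {
        String compactTableName = "default.ice_orc";

        // Partitioned branch: partition columns are excluded from the select
        // list and repeated as both the partition() clause and a WHERE filter.
        String partitioned = String.format("insert overwrite table %1$s partition(%2$s) " +
                "select %4$s from %1$s where %3$s and %6$s = %5$d",
            compactTableName, "dept_id=2", "dept_id=2",
            "first_name,last_name", 1, "PARTITION__SPEC__ID");
        System.out.println(partitioned);
        // insert overwrite table default.ice_orc partition(dept_id=2)
        //   select first_name,last_name from default.ice_orc
        //   where dept_id=2 and PARTITION__SPEC__ID = 1

        // Unpartitioned branch: no partition() clause; the extra IS NOT NULL
        // filter works around the compile error on a lone virtual-column predicate.
        String unpartitioned = String.format("insert overwrite table %1$s " +
                "select %2$s from %1$s where %4$s = %3$d and %5$s is not null",
            compactTableName, "first_name,last_name,dept_id", 0,
            "PARTITION__SPEC__ID", "FILE__PATH");
        System.out.println(unpartitioned);
      }
    }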
@@ -14,6 +14,8 @@
 --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 -- Mask compaction id as they will be allocated in parallel threads
 --! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
 
 set hive.llap.io.enabled=true;
 set hive.vectorized.execution.enabled=true;
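
The new mask is needed because partition-by-partition compaction removes a different byte total on each run, so removed-files-size in the snapshot summary is no longer deterministic. A simplified sketch of the masking idea (simplified pattern and input, not the exact qt regex, which also has to match the escaped quotes of the .q.out files):

    public class MaskSketch {
      public static void main(String[] args) {
        String line = "\"removed-files-size\":\"1256\"";
        // Keep the key and quoting (groups 1 and 3), mask the byte count (group 2).
        String masked = line.replaceAll("(\"removed-files-size\":\")(\\d+)(\")", "$1#Masked#$3");
        System.out.println(masked); // "removed-files-size":"#Masked#"
      }
    }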
@@ -0,0 +1,55 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+create table ice_orc (
+first_name string,
+last_name string,
+dept_id bigint
+)
+stored by iceberg stored as orc
+tblproperties ('format-version'='2');
+
+insert into ice_orc VALUES ('fn1','ln1', 1);
+insert into ice_orc VALUES ('fn2','ln2', 1);
+insert into ice_orc VALUES ('fn3','ln3', 1);
+insert into ice_orc VALUES ('fn4','ln4', 1);
+delete from ice_orc where last_name in ('ln3', 'ln4');
+
+alter table ice_orc set partition spec(dept_id);
+
+insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5');
+insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6');
+insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7');
+insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8');
+delete from ice_orc where last_name in ('ln7', 'ln8');
+
+select * from ice_orc;
+describe formatted ice_orc;
+show compactions;
+
+alter table ice_orc COMPACT 'major' and wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
+show compactions;
+
@@ -14,6 +14,8 @@
 --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 -- Mask compaction id as they will be allocated in parallel threads
 --! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
 
 set hive.llap.io.enabled=true;
 set hive.vectorized.execution.enabled=true;
@@ -14,6 +14,8 @@
 --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 -- Mask compaction id as they will be allocated in parallel threads
 --! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
 
 set hive.llap.io.enabled=true;
 set hive.vectorized.execution.enabled=true;
@@ -239,7 +239,7 @@ Table Parameters:
 	bucketing_version	2
 	current-schema	{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
 	current-snapshot-id	#Masked#
-	current-snapshot-summary	{\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"1256\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"}
+	current-snapshot-summary	{\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"}
 	current-snapshot-timestamp-ms	#Masked#
 	default-partition-spec	{\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
 	format-version	2
@@ -340,7 +340,7 @@ Table Parameters:
 	bucketing_version	2
 	current-schema	{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
 	current-snapshot-id	#Masked#
-	current-snapshot-summary	{\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"}
+	current-snapshot-summary	{\"added-data-files\":\"1\",\"deleted-data-files\":\"5\",\"removed-position-delete-files\":\"2\",\"removed-delete-files\":\"2\",\"added-records\":\"3\",\"deleted-records\":\"5\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"1\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"}
 	current-snapshot-timestamp-ms	#Masked#
 	default-partition-spec	{\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
 	format-version	2
@@ -352,7 +352,7 @@ Table Parameters:
 #### A masked pattern was here ####
 	rawDataSize	0
 	serialization.format	1
-	snapshot-count	19
+	snapshot-count	23
 	storage_handler	org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
 	table_type	ICEBERG
 	totalSize	#Masked#
@@ -374,4 +374,9 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked#	default	ice_orc	 ---	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	ice_orc	company_id=100/dept_id=1/team_id=10	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	ice_orc	company_id=100	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	ice_orc	company_id=100/dept_id=2/team_id=21	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	ice_orc	company_id=100/dept_id=1/team_id=11	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	ice_orc	company_id=100/dept_id=1	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	ice_orc	company_id=100/dept_id=2	MAJOR	succeeded	#Masked#	manual	default	0	0	0	 ---