HIVE-28077: Iceberg: Major QB Compaction on partition level
Dmitriy Fingerman committed Mar 8, 2024
1 parent 8900565 commit afe0bd0
Showing 15 changed files with 591 additions and 20 deletions.
4 changes: 3 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2232,7 +2232,9 @@ public static enum ConfVars {
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),

HIVE_ICEBERG_COMPACTION_PARTITION("hive.iceberg.compaction.partition", "",
"Partition specification for Iceberg compaction"),

HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
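The new hive.iceberg.compaction.partition property is how the compactor passes the target partition down to the Iceberg commit path (see the compactor and committer hunks below). A minimal sketch of that hand-off, with a hypothetical class name and the JobConf wiring assumed:

// Sketch only: shows how the new ConfVar could be set on the compactor side
// and read back on the committer side via its property name.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.mapred.JobConf;

public class CompactionPartitionConfSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Compactor side: record the partition being compacted, e.g. "dept_id=1".
    HiveConf.setVar(conf, ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION, "dept_id=1");

    // Committer side: the value travels with the job configuration and is read
    // back by property name, as commitTable() does in the committer hunk below.
    JobConf jobConf = new JobConf(conf);
    String partition = jobConf.get(ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION.varname);
    System.out.println("Compacting partition: " + partition);
  }
}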
@@ -640,7 +640,13 @@ public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, E
throws MetaException {
this.catalogProperties = getCatalogProperties(table);
this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
Map<String, PartitionField> partitionFieldMap = icebergTable.spec().fields().stream()
preTruncateTable(icebergTable, partNames, icebergTable.newDelete());
context.putToProperties("truncateSkipDataDeletion", "true");
}

public static void preTruncateTable(Table table, List<String> partNames, DeleteFiles delete)
throws MetaException {
Map<String, PartitionField> partitionFieldMap = table.spec().fields().stream()
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
Expression finalExp = CollectionUtils.isEmpty(partNames) ? Expressions.alwaysTrue() : Expressions.alwaysFalse();
if (partNames != null) {
@@ -657,8 +663,8 @@ public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, E
}
if (partitionFieldMap.containsKey(entry.getKey())) {
PartitionField partitionField = partitionFieldMap.get(entry.getKey());
Type resultType = partitionField.transform().getResultType(icebergTable.schema()
.findField(partitionField.sourceId()).type());
Type resultType = partitionField.transform().getResultType(table.schema()
.findField(partitionField.sourceId()).type());
TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
Object value = Conversions.fromPartitionString(resultType, partColValue);
Iterable iterable = () -> Collections.singletonList(value).iterator();
@@ -678,10 +684,8 @@ public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, E
}
}

DeleteFiles delete = icebergTable.newDelete();
delete.deleteFromRowFilter(finalExp);
delete.commit();
context.putToProperties("truncateSkipDataDeletion", "true");
}

@Override public boolean createHMSTableInHook() {
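The refactored preTruncateTable now receives the DeleteFiles operation as a parameter, so the same partition-name-to-row-filter translation can back either a standalone truncate or the committer's compaction transaction. A simplified sketch of the core idea, using a hypothetical helper and assuming an identity partition on a long column; the actual hook resolves the value type through the partition field's transform:

// Simplified sketch of the row-filter delete used by the hook above.
// Assumes partName looks like "dept_id=1" on an identity-partitioned long column.
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class PartitionDeleteSketch {
  static void deletePartition(Table table, String partName, DeleteFiles delete) {
    String[] kv = partName.split("=");
    // Match only rows (and therefore data files) belonging to this partition.
    Expression filter = Expressions.equal(kv[0], Long.parseLong(kv[1]));
    delete.deleteFromRowFilter(filter);
    // The caller decides whether this commit is standalone (truncate) or part
    // of a larger transaction (compaction overwrite).
    delete.commit();
  }
}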
@@ -462,7 +462,12 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
.map(x -> x.getJobConf().get(ConfVars.REWRITE_POLICY.varname))
.orElse(RewritePolicy.DEFAULT.name()));

commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy);
String partitionSpec = outputTable.jobContexts.stream()
.findAny()
.map(x -> x.getJobConf().get(ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION.varname))
.orElse(StringUtils.EMPTY);

commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy, partitionSpec);
}
}

@@ -544,14 +549,20 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
* @param rewritePolicy The rewrite policy to use for the insert overwrite commit
*/
private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results,
RewritePolicy rewritePolicy) {
RewritePolicy rewritePolicy, String partitionSpec) {
Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
if (!results.dataFiles().isEmpty()) {
Transaction transaction = table.newTransaction();
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
DeleteFiles delete = transaction.newDelete();
delete.deleteFromRowFilter(Expressions.alwaysTrue());
delete.commit();
} else if (rewritePolicy == RewritePolicy.SINGLE_PARTITION) {
try {
HiveIcebergMetaHook.preTruncateTable(table, Arrays.asList(partitionSpec), transaction.newDelete());
} catch (Exception e) {
throw new RuntimeException("Failed truncating partitions", e);
}
}
ReplacePartitions overwrite = transaction.newReplacePartitions();
results.dataFiles().forEach(overwrite::addFile);
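With ALL_PARTITIONS the delete filter is simply Expressions.alwaysTrue(); with SINGLE_PARTITION the filter building is delegated to preTruncateTable, and both the delete and the ReplacePartitions commit run inside one Iceberg transaction, so readers never observe the partition as empty. A condensed sketch of that flow, with the filter and the compacted data files assumed to be prepared elsewhere:

// Condensed sketch of the transactional overwrite used for SINGLE_PARTITION.
// 'filter' and 'newFiles' are assumed inputs (see the hook sketch above).
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expression;

public class SinglePartitionOverwriteSketch {
  static void overwritePartition(Table table, Expression filter, List<DataFile> newFiles) {
    Transaction transaction = table.newTransaction();

    // 1) Drop the existing files of the target partition only.
    DeleteFiles delete = transaction.newDelete();
    delete.deleteFromRowFilter(filter);
    delete.commit();

    // 2) Register the compacted files for the same partition(s).
    ReplacePartitions overwrite = transaction.newReplacePartitions();
    newFiles.forEach(overwrite::addFile);
    overwrite.commit();

    // Both steps become visible atomically.
    transaction.commitTransaction();
  }
}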
@@ -1110,8 +1110,9 @@ public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
// If the table is empty we don't have any danger that some data can get lost.
return;
}
if (RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname, RewritePolicy.DEFAULT.name())) ==
RewritePolicy.ALL_PARTITIONS) {
RewritePolicy rewritePolicy = RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname,
RewritePolicy.DEFAULT.name()));
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS || rewritePolicy == RewritePolicy.SINGLE_PARTITION) {
// Table rewriting has special logic as part of IOW that handles the case when table had a partition evolution
return;
}
@@ -2021,4 +2022,22 @@ public List<Partition> getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
}

@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<String> partSpecs)
throws SemanticException {

Map<String, String> partMap = partSpecs
.stream()
.collect(Collectors.toMap(
spec -> spec.split("=")[0],
spec -> spec.split("=")[1]
));

List<String> partNames = getPartitionNames(hmsTable, partMap);

return partNames.stream()
.map(partName -> new DummyPartition(hmsTable, partName))
.collect(Collectors.toList());
}
}
@@ -44,17 +44,29 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
Map<String, String> tblProperties = context.getTable().getParameters();
LOG.debug("Initiating compaction for the {} table", compactTableName);

String compactionQuery = String.format("insert overwrite table %s select * from %<s",
compactTableName);
String partition = context.getCompactionInfo().partName;
String compactionQuery;
RewritePolicy rewritePolicy;

if (partition == null) {
compactionQuery = String.format("insert overwrite table %s select * from %<s",
compactTableName);
rewritePolicy = RewritePolicy.ALL_PARTITIONS;
} else {
HiveConf.setVar(context.getConf(), ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION, partition);
compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) select * from %1$s where %2$s",
compactTableName, partition);
rewritePolicy = RewritePolicy.SINGLE_PARTITION;
}

SessionState sessionState = setupQueryCompactionSession(context.getConf(),
context.getCompactionInfo(), tblProperties);
HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, rewritePolicy.name());
try {
DriverUtils.runOnDriver(context.getConf(), sessionState, compactionQuery);
LOG.info("Completed compaction for table {}", compactTableName);
} catch (HiveException e) {
LOG.error("Error doing query based {} compaction", RewritePolicy.ALL_PARTITIONS.name(), e);
LOG.error("Error doing query based {} compaction", rewritePolicy.name(), e);
throw new RuntimeException(e);
} finally {
sessionState.setCompaction(false);
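Note that the partition name doubles as both the PARTITION clause and the WHERE predicate in the generated statement. A small check of how the second format string expands, using a hypothetical class and assuming a table named default.ice_orc (the name used by the qtest below):

// Illustrates the query produced by the partition branch above,
// assuming compactTableName = "default.ice_orc" and partition = "dept_id=1".
public class CompactionQuerySketch {
  public static void main(String[] args) {
    String compactTableName = "default.ice_orc";
    String partition = "dept_id=1";
    String compactionQuery = String.format(
        "insert overwrite table %1$s partition(%2$s) select * from %1$s where %2$s",
        compactTableName, partition);
    // -> insert overwrite table default.ice_orc partition(dept_id=1)
    //    select * from default.ice_orc where dept_id=1
    System.out.println(compactionQuery);
  }
}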
@@ -0,0 +1,65 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string
)
partitioned by (dept_id bigint)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1', 1);
insert into ice_orc VALUES ('fn2','ln2', 1);
insert into ice_orc VALUES ('fn3','ln3', 1);
insert into ice_orc VALUES ('fn4','ln4', 1);
insert into ice_orc VALUES ('fn5','ln5', 2);
insert into ice_orc VALUES ('fn6','ln6', 2);
insert into ice_orc VALUES ('fn7','ln7', 2);

update ice_orc set last_name = 'ln1a' where first_name='fn1';
update ice_orc set last_name = 'ln2a' where first_name='fn2';
update ice_orc set last_name = 'ln3a' where first_name='fn3';
update ice_orc set last_name = 'ln4a' where first_name='fn4';
update ice_orc set last_name = 'ln5a' where first_name='fn5';
update ice_orc set last_name = 'ln6a' where first_name='fn6';
update ice_orc set last_name = 'ln7a' where first_name='fn7';

delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');

select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc PARTITION (dept_id=1) COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc PARTITION (dept_id=2) COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;
