HIVE-28077: Iceberg: Major QB Compaction on partition level
Dmitriy Fingerman committed Mar 12, 2024
1 parent 8900565 commit 2bf323f
Showing 22 changed files with 1,638 additions and 22 deletions.
7 changes: 5 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2232,7 +2232,9 @@ public static enum ConfVars {
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),

HIVE_ICEBERG_COMPACTION_PARTITION("hive.iceberg.compaction.partition", "",
"Partition specification for Iceberg compaction"),

HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
@@ -5597,7 +5599,8 @@ public static enum ConfVars {
"hive.zookeeper.ssl.truststore.password," +
"hive.zookeeper.ssl.truststore.type," +
"hive.iceberg.allow.datafiles.in.table.location.only," +
"hive.rewrite.data.policy",
"hive.rewrite.data.policy," +
"hive.iceberg.compaction.partition",
"Comma separated list of configuration options which are immutable at runtime"),
HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
METASTORE_PWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
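The new entry behaves like any other HiveConf option: the compactor further down in this commit stores the partition spec under hive.iceberg.compaction.partition before launching its rewrite query, and the commit path reads it back out of the job conf. A minimal, hypothetical sketch of setting and reading the property through the ConfVars handle added above (the class name and sample value are illustrative, not part of the commit):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class CompactionPartitionConfSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Store a single-partition spec, as the Iceberg query compactor does before running its IOW query.
    HiveConf.setVar(conf, ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION, "registration_date=2024-03-11");
    // Read it back by the same handle; this is how downstream code can pick the spec up from the conf.
    String partSpec = HiveConf.getVar(conf, ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION);
    System.out.println(partSpec);  // prints: registration_date=2024-03-11
  }
}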
@@ -640,7 +640,13 @@ public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, E
throws MetaException {
this.catalogProperties = getCatalogProperties(table);
this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
Map<String, PartitionField> partitionFieldMap = icebergTable.spec().fields().stream()
preTruncateTable(icebergTable, partNames, icebergTable.newDelete());
context.putToProperties("truncateSkipDataDeletion", "true");
}

public static void preTruncateTable(Table table, List<String> partNames, DeleteFiles delete)
throws MetaException {
Map<String, PartitionField> partitionFieldMap = table.spec().fields().stream()
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
Expression finalExp = CollectionUtils.isEmpty(partNames) ? Expressions.alwaysTrue() : Expressions.alwaysFalse();
if (partNames != null) {
@@ -657,8 +663,8 @@ public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, E
}
if (partitionFieldMap.containsKey(entry.getKey())) {
PartitionField partitionField = partitionFieldMap.get(entry.getKey());
Type resultType = partitionField.transform().getResultType(icebergTable.schema()
.findField(partitionField.sourceId()).type());
Type resultType = partitionField.transform().getResultType(table.schema()
.findField(partitionField.sourceId()).type());
TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
Object value = Conversions.fromPartitionString(resultType, partColValue);
Iterable iterable = () -> Collections.singletonList(value).iterator();
@@ -678,10 +684,8 @@ public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, E
}
}

DeleteFiles delete = icebergTable.newDelete();
delete.deleteFromRowFilter(finalExp);
delete.commit();
context.putToProperties("truncateSkipDataDeletion", "true");
}
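Extracting the filter-building logic into a static overload that takes the Iceberg Table and the DeleteFiles handle explicitly means the same code can serve both the metastore hook (table.newDelete(), as before) and a compaction commit that wants the delete staged inside a transaction. A minimal sketch of the transactional usage, assuming a hypothetical caller that already holds an org.apache.iceberg.Table:

import java.util.Collections;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;

// Hypothetical helper class, not part of this commit.
class PartitionTruncateSketch {
  static void deletePartitionInTransaction(Table table, String partName) throws MetaException {
    Transaction txn = table.newTransaction();
    // preTruncateTable builds a row filter from the "name=value" partition name
    // and commits the DeleteFiles it was given.
    HiveIcebergMetaHook.preTruncateTable(table, Collections.singletonList(partName), txn.newDelete());
    // The staged delete becomes visible only when the surrounding transaction commits.
    txn.commitTransaction();
  }
}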

@Override public boolean createHMSTableInHook() {
@@ -462,7 +462,12 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
.map(x -> x.getJobConf().get(ConfVars.REWRITE_POLICY.varname))
.orElse(RewritePolicy.DEFAULT.name()));

commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy);
String partitionSpec = outputTable.jobContexts.stream()
.findAny()
.map(x -> x.getJobConf().get(ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION.varname))
.orElse(StringUtils.EMPTY);

commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy, partitionSpec);
}
}

@@ -544,14 +549,20 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
* @param rewritePolicy The rewrite policy to use for the insert overwrite commit
*/
private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results,
RewritePolicy rewritePolicy) {
RewritePolicy rewritePolicy, String partitionSpec) {
Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
if (!results.dataFiles().isEmpty()) {
Transaction transaction = table.newTransaction();
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
DeleteFiles delete = transaction.newDelete();
delete.deleteFromRowFilter(Expressions.alwaysTrue());
delete.commit();
} else if (rewritePolicy == RewritePolicy.SINGLE_PARTITION) {
try {
HiveIcebergMetaHook.preTruncateTable(table, Arrays.asList(partitionSpec), transaction.newDelete());
} catch (Exception e) {
throw new RuntimeException("Failed truncating partitions", e);
}
}
ReplacePartitions overwrite = transaction.newReplacePartitions();
results.dataFiles().forEach(overwrite::addFile);
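Taken together with the ReplacePartitions lines above, the SINGLE_PARTITION branch amounts to: inside one transaction, drop the files of the partition named by hive.iceberg.compaction.partition, then add the freshly written data files back. A condensed, hypothetical sketch of that flow (the full method also deals with branches and snapshot summary properties, which are omitted here, and 'table', 'partitionSpec' and 'results' are assumed to be in scope):

// Condensed sketch of the single-partition overwrite commit.
Transaction transaction = table.newTransaction();
// Stage a delete limited to the compacted partition (reuses the metahook's filter-building helper).
HiveIcebergMetaHook.preTruncateTable(table, Arrays.asList(partitionSpec), transaction.newDelete());
// Stage the rewritten data files for the same partition.
ReplacePartitions overwrite = transaction.newReplacePartitions();
results.dataFiles().forEach(overwrite::addFile);
overwrite.commit();
// Both operations become visible atomically.
transaction.commitTransaction();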
@@ -1110,8 +1110,9 @@ public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
// If the table is empty we don't have any danger that some data can get lost.
return;
}
if (RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname, RewritePolicy.DEFAULT.name())) ==
RewritePolicy.ALL_PARTITIONS) {
RewritePolicy rewritePolicy = RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname,
RewritePolicy.DEFAULT.name()));
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS || rewritePolicy == RewritePolicy.SINGLE_PARTITION) {
// Table rewriting has special logic as part of IOW that handles the case when table had a partition evolution
return;
}
@@ -2021,4 +2022,34 @@ public List<Partition> getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
}

@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<String> partSpecs) {

return partSpecs
.stream()
.map(spec -> {
Map<String, String> partSpecsMap = Arrays.stream(spec.split("/"))
.collect(Collectors.toMap(
x -> x.split("=")[0],
x -> x.split("=")[1]
));
validatePartitionName(hmsTable, partSpecsMap);
return spec;
})
.map(partName -> new DummyPartition(hmsTable, partName))
.collect(Collectors.toList());
}

private void validatePartitionName(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partSpec) {
try {
List<String> partNames = getPartitionNames(hmsTable, partSpec);

if (partNames.isEmpty()) {
throw new RuntimeException("Failed getting partitions from partition spec");
}
} catch (Exception e) {
throw new RuntimeException("Error getting partition names", e);
}
}
}
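getPartitions expects every entry of partSpecs in Hive's slash-separated name=value form (for example dept_id=1, or a multi-column spec such as dept_id=1/region=emea); each spec is parsed into a column-to-value map, validated against the table's partitions, and wrapped in a DummyPartition carrying the original name. A hypothetical, standalone illustration of just the parsing step (the column names here are made up; the q-tests in this commit use single-column specs):

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class PartSpecParseSketch {
  public static void main(String[] args) {
    // Hypothetical two-column partition spec in the expected "name=value/name=value" form.
    String spec = "dept_id=1/region=emea";
    Map<String, String> partSpecsMap = Arrays.stream(spec.split("/"))
        .collect(Collectors.toMap(x -> x.split("=")[0], x -> x.split("=")[1]));
    // Prints both entries (map iteration order is unspecified), e.g. {dept_id=1, region=emea}
    System.out.println(partSpecsMap);
  }
}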
@@ -85,7 +85,7 @@ static Table getTable(Configuration configuration, org.apache.hadoop.hive.metast
return getTable(configuration, properties, skipCache);
}

static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
public static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
return getTable(configuration, hmsTable, false);
}

@@ -20,7 +20,10 @@
package org.apache.iceberg.mr.hive.compaction;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
@@ -30,6 +33,11 @@
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,22 +52,65 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
Map<String, String> tblProperties = context.getTable().getParameters();
LOG.debug("Initiating compaction for the {} table", compactTableName);

String compactionQuery = String.format("insert overwrite table %s select * from %<s",
compactTableName);
String partSpec = context.getCompactionInfo().partName;
String compactionQuery;
RewritePolicy rewritePolicy;

if (partSpec == null) {
compactionQuery = String.format("insert overwrite table %s select * from %<s",
compactTableName);
rewritePolicy = RewritePolicy.ALL_PARTITIONS;
} else {
Table table = IcebergTableUtil.getTable(context.getConf(), context.getTable());
PartitionData partitionData = DataFiles.data(table.spec(), partSpec);
HiveConf.setVar(context.getConf(), ConfVars.HIVE_ICEBERG_COMPACTION_PARTITION, partSpec);
compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) select * from %1$s where %3$s",
compactTableName, partDataToSQL(partitionData, partSpec, ","),
partDataToSQL(partitionData, partSpec, " and "));
rewritePolicy = RewritePolicy.SINGLE_PARTITION;
}

SessionState sessionState = setupQueryCompactionSession(context.getConf(),
context.getCompactionInfo(), tblProperties);
HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, rewritePolicy.name());
try {
DriverUtils.runOnDriver(context.getConf(), sessionState, compactionQuery);
LOG.info("Completed compaction for table {}", compactTableName);
} catch (HiveException e) {
LOG.error("Error doing query based {} compaction", RewritePolicy.ALL_PARTITIONS.name(), e);
LOG.error("Error doing query based {} compaction", rewritePolicy.name(), e);
throw new RuntimeException(e);
} finally {
sessionState.setCompaction(false);
}

return true;
}

public String partDataToSQL(PartitionData partitionData, String partSpec, String delimiter) {
StringBuilder sb = new StringBuilder();
List<String> values = Arrays
.stream(partSpec.split("/"))
.map(x -> x.split("=")[1])
.collect(Collectors.toList());

for (int i = 0; i < partitionData.size(); ++i) {
if (i > 0) {
sb.append(delimiter);
}

String quoteOpt = "";
if (partitionData.getType(i).typeId() == Type.TypeID.STRING ||
partitionData.getType(i).typeId() == Type.TypeID.DATE) {
quoteOpt = "'";
}

sb.append(partitionData.getSchema().getFields().get(i).name())
.append("=")
.append(quoteOpt)
.append(values.get(i))
.append(quoteOpt);
}

return sb.toString();
}
}
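For the date-partitioned q-test that follows, assuming the table resolves to default.ice_orc (the qualified name is an assumption, the values come from the test), the pieces generated by run() and partDataToSQL would look roughly like this:

// partSpec from CompactionInfo.partName:          registration_date=2024-03-11
// partDataToSQL(partitionData, partSpec, ",")     -> registration_date='2024-03-11'   (STRING and DATE values get quoted)
// partDataToSQL(partitionData, partSpec, " and ") -> registration_date='2024-03-11'
// compactionQuery:
//   insert overwrite table default.ice_orc partition(registration_date='2024-03-11')
//   select * from default.ice_orc where registration_date='2024-03-11'
// With several partition columns, the partition(...) clause joins name=value pairs with ","
// and the WHERE clause joins them with " and ".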
@@ -0,0 +1,65 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string
)
partitioned by (registration_date date)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1','2024-03-11');
insert into ice_orc VALUES ('fn2','ln2','2024-03-11');
insert into ice_orc VALUES ('fn3','ln3','2024-03-11');
insert into ice_orc VALUES ('fn4','ln4','2024-03-11');
insert into ice_orc VALUES ('fn5','ln5','2024-03-12');
insert into ice_orc VALUES ('fn6','ln6','2024-03-12');
insert into ice_orc VALUES ('fn7','ln7','2024-03-12');

update ice_orc set last_name = 'ln1a' where first_name='fn1';
update ice_orc set last_name = 'ln2a' where first_name='fn2';
update ice_orc set last_name = 'ln3a' where first_name='fn3';
update ice_orc set last_name = 'ln4a' where first_name='fn4';
update ice_orc set last_name = 'ln5a' where first_name='fn5';
update ice_orc set last_name = 'ln6a' where first_name='fn6';
update ice_orc set last_name = 'ln7a' where first_name='fn7';

delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');

select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc PARTITION (registration_date='2024-03-11') COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc PARTITION (registration_date='2024-03-12') COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;
@@ -0,0 +1,65 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string
)
partitioned by (dept_id bigint)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1', 1);
insert into ice_orc VALUES ('fn2','ln2', 1);
insert into ice_orc VALUES ('fn3','ln3', 1);
insert into ice_orc VALUES ('fn4','ln4', 1);
insert into ice_orc VALUES ('fn5','ln5', 2);
insert into ice_orc VALUES ('fn6','ln6', 2);
insert into ice_orc VALUES ('fn7','ln7', 2);

update ice_orc set last_name = 'ln1a' where first_name='fn1';
update ice_orc set last_name = 'ln2a' where first_name='fn2';
update ice_orc set last_name = 'ln3a' where first_name='fn3';
update ice_orc set last_name = 'ln4a' where first_name='fn4';
update ice_orc set last_name = 'ln5a' where first_name='fn5';
update ice_orc set last_name = 'ln6a' where first_name='fn6';
update ice_orc set last_name = 'ln7a' where first_name='fn7';

delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');

select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc PARTITION (dept_id=1) COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc PARTITION (dept_id=2) COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;