HIVE-28077: Iceberg: Major QB Compaction on partition level #5123

Merged · 3 commits · Apr 25, 2024
Changes from 1 commit
2 changes: 2 additions & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -480,6 +480,8 @@ public enum ErrorMsg {
INVALID_METADATA_TABLE_NAME(10430, "Invalid metadata table name {0}.", true),
TABLE_META_REF_NOT_SUPPORTED(10431, "Table Meta Ref extension is not supported for table {0}.", true),
COMPACTION_REFUSED(10432, "Compaction request for {0}.{1}{2} is refused, details: {3}.", true),
COMPACTION_PARTITION_EVOLUTION(10438, "Compaction for {0}.{1} on partition level is not allowed on a table that has undergone partition evolution", true),
COMPACTION_NON_IDENTITY_PARTITION_SPEC(10439, "Compaction for {0}.{1} is not supported on the table with non-identity partition spec", true),
CBO_IS_REQUIRED(10433,
"The following functionality requires CBO (" + HiveConf.ConfVars.HIVE_CBO_ENABLED.varname + "): {0}", true),
CTLF_UNSUPPORTED_FORMAT(10434, "CREATE TABLE LIKE FILE is not supported by the ''{0}'' file format", true),
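For context, a minimal hedged sketch of how these two new codes are meant to surface; the caller shape is hypothetical (only the enum entries are from this diff) and leans on the isEligibleForCompaction hook added in HiveIcebergStorageHandler below.

// Hypothetical caller sketch; the ErrorMsg entries are real, the surrounding code is not.
org.apache.commons.lang3.tuple.Pair<Boolean, ErrorMsg> eligibility =
    storageHandler.isEligibleForCompaction(table, partitionSpec);
if (!eligibility.getLeft()) {
  // e.g. 10438 renders as: "Compaction for db.tbl on partition level is not
  // allowed on a table that has undergone partition evolution"
  throw new HiveException(eligibility.getRight().getMsg());
}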
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -57,12 +57,14 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -1920,6 +1922,45 @@ private boolean hasUndergonePartitionEvolution(Table table) {
.anyMatch(id -> id < table.spec().specId());
}

private boolean isIdentityPartitionTable(org.apache.hadoop.hive.ql.metadata.Table table) {
return getPartitionTransformSpec(table).stream().map(TransformSpec::getTransformType)
.allMatch(type -> type == TransformSpec.TransformType.IDENTITY);
}
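// Note: identity transforms expose column values directly as partition values;
// transforms such as days(ts) or bucket(16, id) do not, so a static
// PARTITION (...) clause cannot address their partitions.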

@Override
public org.apache.commons.lang3.tuple.Pair<Boolean, ErrorMsg> isEligibleForCompaction(
org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec) {
if (partitionSpec != null) {
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
if (hasUndergonePartitionEvolution(icebergTable)) {
return org.apache.commons.lang3.tuple.Pair.of(false, ErrorMsg.COMPACTION_PARTITION_EVOLUTION);
}
if (!isIdentityPartitionTable(table)) {
return org.apache.commons.lang3.tuple.Pair.of(false, ErrorMsg.COMPACTION_NON_IDENTITY_PARTITION_SPEC);
}
}
return org.apache.commons.lang3.tuple.Pair.of(true, null);
}

@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec) throws SemanticException {
return getPartitionNames(table, partitionSpec).stream()
.map(partName -> new DummyPartition(table, partName, partitionSpec)).collect(Collectors.toList());
}

@Override
public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec) throws SemanticException {
validatePartSpec(table, partitionSpec);
try {
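// Warehouse.makePartName renders the spec map as a path-style partition name,
// e.g. {dept_id=1, city=London} -> "dept_id=1/city=London"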
String partName = Warehouse.makePartName(partitionSpec, false);
return new DummyPartition(table, partName, partitionSpec);
} catch (MetaException e) {
throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
}
}

/**
* Returns a list of partitions which are corresponding to the table based on the partition spec provided.
* @param hmsTable A Hive table instance.
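Since Iceberg partitions are not registered in the metastore, both new methods hand back lightweight DummyPartition objects carrying only the table, the rendered partition name, and the spec map, rather than real HMS partitions.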
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -85,7 +85,7 @@ static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable,
return getTable(configuration, properties, skipCache);
}

static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
public static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
return getTable(configuration, hmsTable, false);
}

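The getTable(Configuration, api.Table) overload is widened from package-private to public, presumably so code outside org.apache.iceberg.mr.hive, such as the new compaction classes, can resolve an Iceberg Table from an HMS table object.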
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
@@ -20,15 +20,26 @@
package org.apache.iceberg.mr.hive.compaction;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,22 +55,51 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
     Map<String, String> tblProperties = context.getTable().getParameters();
     LOG.debug("Initiating compaction for the {} table", compactTableName);

-    String compactionQuery = String.format("insert overwrite table %s select * from %<s",
-        compactTableName);
+    HiveConf conf = new HiveConf(context.getConf());
+    String partSpec = context.getCompactionInfo().partName;
+    String compactionQuery;
+
+    if (partSpec == null) {
+      HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
+      compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
+    } else {
+      Hive db = Hive.get(conf);
+      org.apache.hadoop.hive.ql.metadata.Table table = db.getTable(context.getTable().getDbName(),
+          context.getTable().getTableName());
+      Map<String, String> partSpecMap = new LinkedHashMap<>();
+      Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
+
+      List<FieldSchema> partitionKeys = table.getStorageHandler().getPartitionKeys(table);
+      List<String> partValues = partitionKeys.stream().map(
+          fs -> String.join("=", HiveUtils.unparseIdentifier(fs.getName()),
+              TypeInfoUtils.convertStringToLiteralForSQL(partSpecMap.get(fs.getName()),
+                  ((PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())).getPrimitiveCategory())
+          )
+      ).collect(Collectors.toList());
+
+      String queryFields = table.getCols().stream()
+          .map(FieldSchema::getName)
+          .filter(col -> !partSpecMap.containsKey(col))
+          .collect(Collectors.joining(","));
+
+      compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) select %4$s from %1$s where %3$s",
+          compactTableName,
+          StringUtils.join(partValues, ","),
+          StringUtils.join(partValues, " and "),
+          queryFields);
+    }
+
+    SessionState sessionState = setupQueryCompactionSession(conf, context.getCompactionInfo(), tblProperties);
-    SessionState sessionState = setupQueryCompactionSession(context.getConf(),
-        context.getCompactionInfo(), tblProperties);
-    HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
     try {
-      DriverUtils.runOnDriver(context.getConf(), sessionState, compactionQuery);
+      DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
       LOG.info("Completed compaction for table {}", compactTableName);
-      return true;
     } catch (HiveException e) {
-      LOG.error("Error doing query based {} compaction", RewritePolicy.ALL_PARTITIONS.name(), e);
-      throw new RuntimeException(e);
+      LOG.error("Failed compacting table {}", compactTableName, e);
+      throw e;
+    } finally {
+      sessionState.setCompaction(false);
+    }
+
+    return true;
   }
 }
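To make the string templating above concrete, here is a hedged sketch of the query the partition branch would render for the test table used below; the default database, backticked identifiers, and quoted literals are assumptions about unparseIdentifier and convertStringToLiteralForSQL output, not something this diff asserts.

// Illustrative only: assumes db "default" and the quoting behavior noted above.
String compactionQuery = String.format(
    "insert overwrite table %1$s partition(%2$s) select %4$s from %1$s where %3$s",
    "default.ice_orc_wo_evo",                                                // %1$s: compactTableName
    "`dept_id`=1,`city`='London',`registration_date`='2024-03-11'",          // %2$s: partValues joined by ","
    "`dept_id`=1 and `city`='London' and `registration_date`='2024-03-11'",  // %3$s: partValues joined by " and "
    "first_name,last_name");                                                 // %4$s: queryFields (non-partition columns)
// Renders to:
//   insert overwrite table default.ice_orc_wo_evo
//   partition(`dept_id`=1,`city`='London',`registration_date`='2024-03-11')
//   select first_name,last_name from default.ice_orc_wo_evo
//   where `dept_id`=1 and `city`='London' and `registration_date`='2024-03-11'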
New query test file:
@@ -0,0 +1,69 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc_wo_evo (
first_name string,
last_name string
)
partitioned by (dept_id bigint,
city string,
registration_date date)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc_wo_evo VALUES ('fn1','ln1',1,'London','2024-03-11');
insert into ice_orc_wo_evo VALUES ('fn2','ln2',1,'London','2024-03-11');
insert into ice_orc_wo_evo VALUES ('fn3','ln3',1,'London','2024-03-11');
insert into ice_orc_wo_evo VALUES ('fn4','ln4',1,'London','2024-03-11');
insert into ice_orc_wo_evo VALUES ('fn5','ln5',2,'Paris','2024-02-16');
insert into ice_orc_wo_evo VALUES ('fn6','ln6',2,'Paris','2024-02-16');
insert into ice_orc_wo_evo VALUES ('fn7','ln7',2,'Paris','2024-02-16');

update ice_orc_wo_evo set last_name = 'ln1a' where first_name='fn1';
update ice_orc_wo_evo set last_name = 'ln2a' where first_name='fn2';
update ice_orc_wo_evo set last_name = 'ln3a' where first_name='fn3';
update ice_orc_wo_evo set last_name = 'ln4a' where first_name='fn4';
update ice_orc_wo_evo set last_name = 'ln5a' where first_name='fn5';
update ice_orc_wo_evo set last_name = 'ln6a' where first_name='fn6';
update ice_orc_wo_evo set last_name = 'ln7a' where first_name='fn7';

delete from ice_orc_wo_evo where last_name in ('ln1a', 'ln2a', 'ln7a');

select * from ice_orc_wo_evo;
describe formatted ice_orc_wo_evo;

explain alter table ice_orc_wo_evo PARTITION (dept_id=1, city='London', registration_date='2024-03-11') COMPACT 'major' and wait;
alter table ice_orc_wo_evo PARTITION (dept_id=1, city='London', registration_date='2024-03-11') COMPACT 'major' and wait;

select * from ice_orc_wo_evo;
describe formatted ice_orc_wo_evo;
show compactions;

explain alter table ice_orc_wo_evo PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait;
alter table ice_orc_wo_evo PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait;

select * from ice_orc_wo_evo;
describe formatted ice_orc_wo_evo;
show compactions;
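Each of the two partition-level compactions above should appear as its own MAJOR entry in the show compactions output; the qt:replace masks at the top of the file keep compaction ids, snapshot ids, timestamps, and file sizes stable so the expected output stays deterministic across runs.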