HIVE-28341: Iceberg: Change Major QB Full Table Compaction to compact… #5328

Merged · 4 commits · Aug 6, 2024
2 changes: 1 addition & 1 deletion common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -373,7 +373,7 @@ public enum ErrorMsg {
"metastore."),
INVALID_COMPACTION_TYPE(10282, "Invalid compaction type, supported values are 'major' and " +
"'minor'"),
NO_COMPACTION_PARTITION(10283, "You must specify a partition to compact for partitioned tables"),
COMPACTION_NO_PARTITION(10283, "You must specify a partition to compact for partitioned tables"),
TOO_MANY_COMPACTION_PARTITIONS(10284, "Compaction can only be requested on one partition at a " +
"time."),
DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
@@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -604,11 +605,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
*/
private void commitCompaction(Table table, long startTime, FilesForCommit results,
RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) {
if (results.dataFiles().isEmpty()) {
LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
return;
}
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
if (rewritePolicy == RewritePolicy.FULL_TABLE) {
// Full table compaction
Transaction transaction = table.newTransaction();
DeleteFiles delete = transaction.newDelete();
@@ -621,8 +618,12 @@ private void commitCompaction(Table table, long startTime, FilesForCommit result
LOG.debug("Compacted full table with files {}", results);
} else {
// Single partition compaction
List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath);
List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath);
List<DataFile> existingDataFiles =
IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId));
List<DeleteFile> existingDeleteFiles =
IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath,
partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId));

RewriteFiles rewriteFiles = table.newRewrite();
rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
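The ternary above selects the spec-id filter: with a null partitionPath the compactor rewrites files that were written under a different partition spec than the requested one, otherwise it rewrites only files written with exactly that spec. Below is a minimal, self-contained sketch of that selection; the spec ids and values are illustrative and not taken from the patch.

// Illustrative sketch only: shows how the predicate chosen above filters spec ids.
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SpecIdPredicateSketch {
  public static void main(String[] args) {
    Integer partitionSpecId = 1;      // spec id passed with the compaction request (made up)
    String partitionPath = null;      // null means "rewrite files belonging to other specs"

    Predicate<Object> matchBySpecId = partitionPath == null
        ? Predicate.isEqual(partitionSpecId).negate()   // keep files whose spec id differs
        : Predicate.isEqual(partitionSpecId);           // keep files written with this spec id

    List<Integer> fileSpecIds = List.of(0, 0, 1, 1, 2); // made-up spec ids of existing files
    List<Integer> selected = fileSpecIds.stream()
        .filter(matchBySpecId)
        .collect(Collectors.toList());

    System.out.println(selected);     // prints [0, 0, 2] for the null partitionPath case
  }
}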
@@ -1912,11 +1912,12 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
}

private boolean hasUndergonePartitionEvolution(Table table) {
// Returns true if the table has undergone partition evolution.
// The current spec is not necessarily the latest, which can happen when the partition spec was changed
// back to one of the table's past specs.
return table.currentSnapshot() != null &&
table.currentSnapshot().allManifests(table.io()).parallelStream()
.map(ManifestFile::partitionSpecId)
.anyMatch(id -> id < table.spec().specId());
.anyMatch(id -> id != table.spec().specId());
}
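The comparison was loosened from id < specId to id != specId because, as the comment notes, the current spec is not necessarily the one with the highest id. A standalone illustration with made-up spec ids and no Iceberg APIs:

// Made-up spec ids; shows why '<' can miss evolution once the spec is switched back to a past spec.
import java.util.List;

public class PartitionEvolutionCheckSketch {
  public static void main(String[] args) {
    List<Integer> manifestSpecIds = List.of(0, 1); // specs the existing manifests were written with
    int currentSpecId = 0;                         // the table was switched back to its original spec

    boolean oldCheck = manifestSpecIds.stream().anyMatch(id -> id < currentSpecId);   // false: misses spec 1
    boolean newCheck = manifestSpecIds.stream().anyMatch(id -> id != currentSpecId);  // true: evolution detected

    System.out.println(oldCheck + " " + newCheck);  // prints "false true"
  }
}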

private boolean isIdentityPartitionTable(org.apache.hadoop.hive.ql.metadata.Table table) {
@@ -1935,15 +1936,21 @@ public Optional<ErrorMsg> isEligibleForCompaction(

@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec) throws SemanticException {
return getPartitionNames(table, partitionSpec).stream()
Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, latestSpecOnly).stream()
.map(partName -> {
Map<String, String> partSpecMap = Maps.newLinkedHashMap();
Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
return new DummyPartition(table, partName, partSpecMap);
}).collect(Collectors.toList());
}

public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return table.spec().isPartitioned();
}

@Override
public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec, Context.RewritePolicy policy) throws SemanticException {
@@ -1968,21 +1975,10 @@ private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table tabl
* @return A list of partition values that satisfy the provided partition spec for the table.
* @throws SemanticException Exception raised when there is an issue performing a scan on the partitions table.
*/
@Override
public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Map<String, String> partitionSpec) throws SemanticException {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());

try {
return IcebergTableUtil
.getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream().map(e -> {
PartitionData partitionData = e.getKey();
int specId = e.getValue();
return icebergTable.specs().get(specId).partitionToPath(partitionData);
}).collect(Collectors.toList());
} catch (IOException e) {
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, true);
}

/**
@@ -2132,4 +2128,10 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) {
tableDesc.setProperty(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX +
tableDesc.getTableName(), Operation.DELETE.name());
}

@Override
public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return hasUndergonePartitionEvolution(table);
}
}
@@ -27,6 +27,7 @@
import java.util.Properties;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -393,28 +394,47 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
return data;
}

public static List<DataFile> getDataFiles(Table table, int specId,
String partitionPath) {
/**
* Returns the list of data files filtered by specId and partitionPath as follows:
* 1. Keeps only the files whose spec id satisfies the matchBySpecId predicate
* 2. If partitionPath is not null, also keeps only the files whose partition path equals partitionPath
* @param table the iceberg table
* @param specId partition spec id
* @param partitionPath partition path
* @param matchBySpecId predicate applied to the data files' spec ids
*/
public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath,
Predicate<Object> matchBySpecId) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return file.specId() == specId && table.specs()
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
return matchBySpecId.test(file.specId()) && (partitionPath == null ||
table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath));
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
}

public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath) {
/**
* Returns the list of delete files filtered by specId and partitionPath as follows:
* 1. Keeps only the files whose spec id satisfies the matchBySpecId predicate
* 2. If partitionPath is not null, also keeps only the files whose partition path equals partitionPath
* @param table the iceberg table
* @param specId partition spec id
* @param partitionPath partition path
* @param matchBySpecId predicate applied to the delete files' spec ids
*/
public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath,
Predicate<Object> matchBySpecId) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return file.specId() == specId && table.specs()
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
return matchBySpecId.test(file.specId()) && (partitionPath == null ||
table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath));
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
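For reference, a compile-only sketch of the two call shapes these helpers now take, mirroring how commitCompaction uses them; the spec id and partition path are placeholders, and the class is assumed to sit in the same package as IcebergTableUtil.

// Compile-only sketch; placeholder arguments, assumed to live next to IcebergTableUtil.
import java.util.List;
import java.util.function.Predicate;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

class CompactionFileLookupSketch {
  static void collectFiles(Table table, int partitionSpecId, String partitionPath) {
    // Single-partition compaction: files written with exactly this spec id under this partition path.
    List<DataFile> partitionDataFiles =
        IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
            Predicate.isEqual(partitionSpecId));

    // Old-spec compaction (null partition path): files written with any other spec id.
    List<DeleteFile> oldSpecDeleteFiles =
        IcebergTableUtil.getDeleteFiles(table, partitionSpecId, null,
            Predicate.isEqual(partitionSpecId).negate());
  }
}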
@@ -465,7 +485,7 @@ public static List<PartitionField> getPartitionFields(Table table) {
}

public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, Map<String, String> partSpecMap,
boolean allowPartialSpec) throws SemanticException, IOException {
boolean allowPartialSpec, boolean latestSpecOnly) throws SemanticException, IOException {
Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
.createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
@@ -484,10 +504,26 @@ public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, M
ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()),
expression, false);
return resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
(entry.getKey().size() == partSpecMap.size() || allowPartialSpec);
(entry.getKey().size() == partSpecMap.size() || allowPartialSpec) &&
(entry.getValue() == icebergTable.spec().specId() || !latestSpecOnly);
}).forEach(entry -> result.put(entry.getKey(), entry.getValue())));
}

return result;
}

public static List<String> getPartitionNames(Table icebergTable, Map<String, String> partitionSpec,
boolean latestSpecOnly) throws SemanticException {
try {
return IcebergTableUtil
.getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream()
.map(e -> {
PartitionData partitionData = e.getKey();
int specId = e.getValue();
return icebergTable.specs().get(specId).partitionToPath(partitionData);
}).collect(Collectors.toList());
} catch (IOException e) {
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
}
}
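A compile-only usage sketch of the new latestSpecOnly switch; it assumes the class sits in the same package as IcebergTableUtil and that an empty spec map simply means no partition filter.

// Compile-only sketch; placeholder arguments, assumptions noted above.
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.iceberg.Table;

class PartitionNamesSketch {
  static void listPartitions(Table icebergTable) throws SemanticException {
    // Only partitions written under the table's current (latest) partition spec.
    List<String> currentSpecOnly =
        IcebergTableUtil.getPartitionNames(icebergTable, Collections.emptyMap(), true);

    // Partitions from every spec the table has had, e.g. when targeting old-spec partitions.
    List<String> allSpecs =
        IcebergTableUtil.getPartitionNames(icebergTable, Collections.emptyMap(), false);
  }
}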
@@ -63,20 +63,35 @@ public boolean run(CompactorContext context) throws IOException, HiveException,

HiveConf conf = new HiveConf(context.getConf());
String partSpec = context.getCompactionInfo().partName;
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
context.getTable().getTableName());
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
String compactionQuery;

if (partSpec == null) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
if (!icebergTable.spec().isPartitioned()) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
} else if (icebergTable.specs().size() > 1) {
// Compacting partitions of old partition specs on a partitioned table with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
conf.set(IcebergCompactionService.PARTITION_SPEC_ID, String.valueOf(icebergTable.spec().specId()));
// A single filter on a virtual column causes errors during compilation, so an additional
// filter on file_path is added as a workaround.
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
"where %2$s != %3$d and %4$s is not null",
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
VirtualColumn.FILE_PATH.getName());
} else {
// Partitioned table without partition evolution and a null partition spec in the compaction request -
// this code branch is not supposed to be reachable
throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
}
} else {
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
context.getTable().getTableName());
Map<String, String> partSpecMap = new LinkedHashMap<>();
Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);

Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
Map<PartitionData, Integer> partitionInfo = IcebergTableUtil
.getPartitionInfo(icebergTable, partSpecMap, false);
.getPartitionInfo(icebergTable, partSpecMap, false, false);
Optional<Integer> specId = partitionInfo.values().stream().findFirst();

if (!specId.isPresent()) {
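For context, here is a standalone sketch of the query string the partition-evolution branch above produces; the table name, spec id and virtual-column names are placeholders for illustration (the compactor resolves the real names via VirtualColumn and the table).

// Standalone sketch; table name, spec id and virtual-column names are assumptions.
public class CompactionQuerySketch {
  public static void main(String[] args) {
    String compactTableName = "default.ice_orc";  // placeholder fully-qualified table name
    String specIdColumn = "PARTITION__SPEC__ID";  // assumed name of the spec-id virtual column
    String filePathColumn = "FILE__PATH";         // assumed name of the file-path virtual column
    int currentSpecId = 1;                        // placeholder current spec id

    String compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
        "where %2$s != %3$d and %4$s is not null",
        compactTableName, specIdColumn, currentSpecId, filePathColumn);

    // prints: insert overwrite table default.ice_orc select * from default.ice_orc
    //         where PARTITION__SPEC__ID != 1 and FILE__PATH is not null
    System.out.println(compactionQuery);
  }
}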
@@ -14,6 +14,8 @@
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
@@ -0,0 +1,55 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string,
dept_id bigint
)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1', 1);
insert into ice_orc VALUES ('fn2','ln2', 1);
insert into ice_orc VALUES ('fn3','ln3', 1);
insert into ice_orc VALUES ('fn4','ln4', 1);
delete from ice_orc where last_name in ('ln3', 'ln4');

alter table ice_orc set partition spec(dept_id);

insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5');
insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6');
insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7');
insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8');
delete from ice_orc where last_name in ('ln7', 'ln8');

select * from ice_orc;
describe formatted ice_orc;
show compactions;

alter table ice_orc COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;

@@ -14,6 +14,8 @@
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
@@ -14,6 +14,8 @@
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;