HIVE-28341: Iceberg: Change Major QB Full Table Compaction to compact… #5328

Open · wants to merge 1 commit into master

@@ -604,10 +604,6 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
*/
private void commitCompaction(Table table, long startTime, FilesForCommit results,
RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) {
if (results.dataFiles().isEmpty()) {
LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
return;
}
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
// Full table compaction
Transaction transaction = table.newTransaction();

@@ -1923,15 +1923,20 @@ public Optional<ErrorMsg> isEligibleForCompaction(

@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec) throws SemanticException {
return getPartitionNames(table, partitionSpec).stream()
Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
return getPartitionNames(table, partitionSpec, latestSpecOnly).stream()
.map(partName -> {
Map<String, String> partSpecMap = Maps.newLinkedHashMap();
Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
return new DummyPartition(table, partName, partSpecMap);
}).collect(Collectors.toList());
}

public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return IcebergTableUtil.isPartitioned(table);
}

@Override
public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec, Context.RewritePolicy policy) throws SemanticException {
@@ -1958,12 +1963,13 @@ private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table tabl
*/
@Override
public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Map<String, String> partitionSpec) throws SemanticException {
Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());

try {
return IcebergTableUtil
.getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream().map(e -> {
.getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream()
.map(e -> {
PartitionData partitionData = e.getKey();
int specId = e.getValue();
return icebergTable.specs().get(specId).partitionToPath(partitionData);
@@ -2112,4 +2118,10 @@ public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOE
public MergeTaskProperties getMergeTaskProperties(Properties properties) {
return new IcebergMergeTaskProperties(properties);
}

@Override
public boolean isUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return table.specs().size() > 1;
}
}

@@ -393,14 +393,13 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
return data;
}

public static List<DataFile> getDataFiles(Table table, int specId,
String partitionPath) {
public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return file.specId() == specId && table.specs()
return partitionPath == null ? file.specId() != specId : file.specId() == specId && table.specs()
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
@@ -413,7 +412,7 @@ public static List<DeleteFile> getDeleteFiles(Table table, int specId, String pa
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return file.specId() == specId && table.specs()
return partitionPath == null ? file.specId() != specId : file.specId() == specId && table.specs()
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
@@ -465,7 +464,7 @@ public static List<PartitionField> getPartitionFields(Table table) {
}

public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, Map<String, String> partSpecMap,
boolean allowPartialSpec) throws SemanticException, IOException {
boolean allowPartialSpec, boolean latestSpecOnly) throws SemanticException, IOException {
Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
.createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
@@ -484,10 +483,16 @@ public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, M
ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()),
expression, false);
return resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
(entry.getKey().size() == partSpecMap.size() || allowPartialSpec);
(entry.getKey().size() == partSpecMap.size() || allowPartialSpec) &&
(entry.getValue() == icebergTable.spec().specId() || !latestSpecOnly);
}).forEach(entry -> result.put(entry.getKey(), entry.getValue())));
}

return result;
}

public static boolean isPartitioned(Table table) {
return IcebergTableUtil.getPartitionFields(table).size() > 0;
}

}
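For context, the new `isPartitioned` helper and the `isUndergonePartitionEvolution` override added above distinguish unpartitioned tables from partitioned ones, and single-spec tables from tables whose spec has evolved (`specs().size() > 1`). A minimal HiveQL sequence producing such an evolved table, mirroring the qtest added later in this patch, might look like this (spec ids are assumed to be 0 for the initial unpartitioned spec and 1 after the ALTER):

```sql
-- Sketch: a v2 Iceberg table that starts unpartitioned and is then partitioned by dept_id.
-- After the ALTER the table carries two partition specs, so specs().size() > 1 holds.
create table ice_orc (
  first_name string,
  last_name string,
  dept_id bigint
)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

alter table ice_orc set partition spec(dept_id);
```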

@@ -63,20 +63,33 @@ public boolean run(CompactorContext context) throws IOException, HiveException,

HiveConf conf = new HiveConf(context.getConf());
String partSpec = context.getCompactionInfo().partName;
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
context.getTable().getTableName());
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
String compactionQuery;

if (partSpec == null) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
if (!IcebergTableUtil.isPartitioned(icebergTable)) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);
} else if (icebergTable.specs().size() > 1) {
// Compacting partitions of old partition specs on a partitioned table with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
conf.set(IcebergCompactionService.PARTITION_SPEC_ID, String.valueOf(icebergTable.spec().specId()));
// A single filter on a virtual column causes errors during compilation,
// added another filter on file_path as a workaround.
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
"where %2$s != %3$d and %4$s is not null",
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
VirtualColumn.FILE_PATH.getName());
} else {
return true;
}
} else {
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
context.getTable().getTableName());
Map<String, String> partSpecMap = new LinkedHashMap<>();
Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);

Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
Map<PartitionData, Integer> partitionInfo = IcebergTableUtil
.getPartitionInfo(icebergTable, partSpecMap, false);
.getPartitionInfo(icebergTable, partSpecMap, false, false);
Optional<Integer> specId = partitionInfo.values().stream().findFirst();

if (!specId.isPresent()) {
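For illustration, for a table whose current spec id is 1 and whose older files were still written under spec 0, the query template in the partition-evolution branch above would expand to roughly the following, assuming the virtual column names resolve to `PARTITION__SPEC__ID` and `FILE__PATH`:

```sql
-- Illustrative expansion of the compaction query template; the table name, spec id and
-- virtual column names are assumptions for the example, not literals from this patch.
insert overwrite table default.ice_orc
select * from default.ice_orc
where PARTITION__SPEC__ID != 1 and FILE__PATH is not null;
```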

@@ -14,6 +14,8 @@
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;

@@ -0,0 +1,55 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string,
dept_id bigint
)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1', 1);
insert into ice_orc VALUES ('fn2','ln2', 1);
insert into ice_orc VALUES ('fn3','ln3', 1);
insert into ice_orc VALUES ('fn4','ln4', 1);
delete from ice_orc where last_name in ('ln3', 'ln4');

alter table ice_orc set partition spec(dept_id);

insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5');
insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6');
insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7');
insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8');
delete from ice_orc where last_name in ('ln7', 'ln8');

select * from ice_orc;
describe formatted ice_orc;
show compactions;

alter table ice_orc COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;


@@ -14,6 +14,8 @@
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;

@@ -14,6 +14,8 @@
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;

@@ -239,7 +239,7 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"1256\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"}
current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
@@ -340,7 +340,7 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"}
current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"3\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"5\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
@@ -352,7 +352,7 @@ Table Parameters:
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 19
snapshot-count 20
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
@@ -374,4 +374,6 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---