HIVE-28077: Iceberg: Major QB Compaction on partition level
Dmitriy Fingerman committed Apr 9, 2024
1 parent 5e78ce0 commit 15b94f2
Showing 15 changed files with 1,177 additions and 44 deletions.
HiveIcebergOutputCommitter.java

@@ -485,7 +485,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
    }
  }

-  private Long getSnapshotId(Table table, String branchName) {
+  public static Long getSnapshotId(Table table, String branchName) {
    Optional<Long> snapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId);
    if (StringUtils.isNotEmpty(branchName)) {
      String ref = HiveUtils.getTableSnapshotRef(branchName);
IcebergTableUtil.java

@@ -85,7 +85,7 @@ static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable)
    return getTable(configuration, properties, skipCache);
  }

-  static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+  public static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
    return getTable(configuration, hmsTable, false);
  }

IcebergMajorQueryCompactor.java

@@ -23,19 +23,28 @@
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergMajorQueryCompactor extends QueryCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(IcebergMajorQueryCompactor.class.getName());
  private static final String branchName = "compaction";

  @Override
  public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException {
@@ -44,22 +44,117 @@ public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException
    Map<String, String> tblProperties = context.getTable().getParameters();
    LOG.debug("Initiating compaction for the {} table", compactTableName);

-    String compactionQuery = String.format("insert overwrite table %s select * from %<s",
-        compactTableName);
    String partSpec = context.getCompactionInfo().partName;
    String compactionQuery;

    SessionState sessionState = setupQueryCompactionSession(context.getConf(),
        context.getCompactionInfo(), tblProperties);
-    HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());

    try {
-      DriverUtils.runOnDriver(context.getConf(), sessionState, compactionQuery);
-      LOG.info("Completed compaction for table {}", compactTableName);
-    } catch (HiveException e) {
-      LOG.error("Error doing query based {} compaction", RewritePolicy.ALL_PARTITIONS.name(), e);
-      throw new RuntimeException(e);
-    }
-    return true;
      // No partition spec: compact the whole table with a single insert overwrite.
      if (partSpec == null) {
        HiveConf.setVar(context.getConf(), ConfVars.REWRITE_POLICY, RewritePolicy.ALL_PARTITIONS.name());

        compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName);

        executeHiveQuery(context, compactionQuery, sessionState,
            String.format("compacting table %s", compactTableName));
      } else {
        // Partition-level compaction: stage the rewrite on a dedicated "compaction"
        // branch, then fast-forward main to it.
        dropBranch(context, sessionState);

        String createBranchQuery = String.format("alter table %s create branch %s", compactTableName,
            branchName);

        executeHiveQuery(context, createBranchQuery, sessionState,
            String.format("creating compaction branch for table %s", compactTableName));

        Table table = IcebergTableUtil.getTable(context.getConf(), context.getTable());
        Long branchSnapshotId = HiveIcebergOutputCommitter.getSnapshotId(table, "branch_" + branchName);
        PartitionData partitionData = DataFiles.data(table.spec(), partSpec);

        // Clear the target partition on the branch before rewriting it.
        String truncateQuery = String.format("truncate table %s.branch_%s partition(%s)",
            compactTableName, branchName, partDataToSQL(partitionData, partSpec, ","));

        executeHiveQuery(context, truncateQuery, sessionState,
            String.format("truncating partition %s of table %s.branch_%s", partSpec, compactTableName,
                branchName));

        // Rewrite the partition's rows from the branch snapshot into the branch.
        compactionQuery = String.format("insert overwrite table %1$s.branch_%4$s partition(%2$s) " +
            "select * from %1$s for system_version as of %5$s where %3$s",
            compactTableName, partDataToSQL(partitionData, partSpec, ","),
            partDataToSQL(partitionData, partSpec, " and "), branchName, branchSnapshotId);

        executeHiveQuery(context, compactionQuery, sessionState,
            String.format("compacting table %s.branch_%s", compactTableName, branchName));

        IcebergTableUtil.fastForwardBranch(table, "main", branchName);

        dropBranch(context, sessionState);
      }

      return true;
    } finally {
      sessionState.setCompaction(false);
    }
  }

  private void dropBranch(CompactorContext context, SessionState sessionState) throws HiveException {

    Table table = IcebergTableUtil.getTable(context.getConf(), context.getTable());
    String compactTableName = TableName.getDbTable(context.getTable().getDbName(), context.getTable().getTableName());

    // Only drop the branch if it actually exists (i.e. it has a snapshot).
    if (HiveIcebergOutputCommitter.getSnapshotId(table, "branch_" + branchName) != null) {

      String dropBranchQuery = String.format("alter table %s drop branch %s",
          compactTableName, branchName);

      executeHiveQuery(context, dropBranchQuery, sessionState, "dropping compaction branch");
    }
  }

  private void executeHiveQuery(CompactorContext context, String query, SessionState sessionState,
      String message) throws HiveException {

    try {
      DriverUtils.runOnDriver(context.getConf(), sessionState, query);
      LOG.info("Completed " + message);
    } catch (HiveException e) {
      LOG.error("Error " + message, e);
      throw e;
    }
  }

  private String partDataToSQL(PartitionData partitionData, String partSpec, String delimiter)
      throws HiveException {

    try {
      Map<String, String> partSpecMap = Warehouse.makeSpecFromName(partSpec);
      StringBuilder sb = new StringBuilder();

      // Render the partition spec as "col=value" pairs joined by the delimiter,
      // e.g. "," for a PARTITION clause or " and " for a WHERE filter.
      for (int i = 0; i < partitionData.size(); ++i) {
        if (i > 0) {
          sb.append(delimiter);
        }

        // Value types that must appear as SQL string literals get single quotes.
        String quoteOpt = "";
        if (partitionData.getType(i).typeId() == Type.TypeID.STRING ||
            partitionData.getType(i).typeId() == Type.TypeID.DATE ||
            partitionData.getType(i).typeId() == Type.TypeID.TIME ||
            partitionData.getType(i).typeId() == Type.TypeID.TIMESTAMP ||
            partitionData.getType(i).typeId() == Type.TypeID.BINARY) {
          quoteOpt = "'";
        }

        sb.append(partitionData.getSchema().getFields().get(i).name())
            .append("=")
            .append(quoteOpt)
            .append(partSpecMap.get(partitionData.getPartitionType().fields().get(i).name()))
            .append(quoteOpt);
      }

      return sb.toString();
    } catch (MetaException e) {
      throw new HiveException("Failed constructing compaction queries with partition spec " + partSpec, e);
    }
  }
}
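To make the branch-based flow concrete, here is a sketch of the statement sequence run() generates for a hypothetical table default.ice_orc and partition spec dept_id=1/city=London/registration_date=2024-03-11; the snapshot id 1234567890 is a placeholder for the value resolved via HiveIcebergOutputCommitter.getSnapshotId.

-- create the staging branch for the compaction
alter table default.ice_orc create branch compaction;

-- clear the target partition on the branch ("," delimiter, values quoted per partDataToSQL)
truncate table default.ice_orc.branch_compaction partition(dept_id=1,city='London',registration_date='2024-03-11');

-- rewrite the partition from the branch snapshot (" and " delimiter in the filter)
insert overwrite table default.ice_orc.branch_compaction partition(dept_id=1,city='London',registration_date='2024-03-11')
select * from default.ice_orc for system_version as of 1234567890
where dept_id=1 and city='London' and registration_date='2024-03-11';

-- main is then fast-forwarded to the branch in Java (IcebergTableUtil.fastForwardBranch)
-- and the staging branch is dropped
alter table default.ice_orc drop branch compaction;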
@@ -0,0 +1,67 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string
)
partitioned by (dept_id bigint,
city string,
registration_date date)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1',1,'London','2024-03-11');
insert into ice_orc VALUES ('fn2','ln2',1,'London','2024-03-11');
insert into ice_orc VALUES ('fn3','ln3',1,'London','2024-03-11');
insert into ice_orc VALUES ('fn4','ln4',1,'London','2024-03-11');
insert into ice_orc VALUES ('fn5','ln5',2,'Paris','2024-02-16');
insert into ice_orc VALUES ('fn6','ln6',2,'Paris','2024-02-16');
insert into ice_orc VALUES ('fn7','ln7',2,'Paris','2024-02-16');

update ice_orc set last_name = 'ln1a' where first_name='fn1';
update ice_orc set last_name = 'ln2a' where first_name='fn2';
update ice_orc set last_name = 'ln3a' where first_name='fn3';
update ice_orc set last_name = 'ln4a' where first_name='fn4';
update ice_orc set last_name = 'ln5a' where first_name='fn5';
update ice_orc set last_name = 'ln6a' where first_name='fn6';
update ice_orc set last_name = 'ln7a' where first_name='fn7';

delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');

select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc PARTITION (dept_id=1, city='London', registration_date='2024-03-11') COMPACT 'major' and wait;
alter table ice_orc PARTITION (dept_id=1, city='London', registration_date='2024-03-11') COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;

explain alter table ice_orc PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait;
alter table ice_orc PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;
@@ -0,0 +1,70 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string,
dept_id bigint,
city string
)
partitioned by (registration_date date)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc values
('fn1','ln1',1,'London','2024-03-11'),
('fn2','ln2',1,'London','2024-03-11'),
('fn3','ln3',1,'London','2024-03-11'),
('fn4','ln4',1,'London','2024-03-11'),
('fn5','ln5',2,'Paris','2024-03-11'),
('fn6','ln6',2,'Paris','2024-03-11'),
('fn7','ln7',2,'Paris','2024-03-11');

update ice_orc set last_name = 'ln1a' where first_name='fn1';
update ice_orc set last_name = 'ln2a' where first_name='fn2';
update ice_orc set last_name = 'ln3a' where first_name='fn3';
update ice_orc set last_name = 'ln4a' where first_name='fn4';
update ice_orc set last_name = 'ln5a' where first_name='fn5';
update ice_orc set last_name = 'ln6a' where first_name='fn6';
update ice_orc set last_name = 'ln7a' where first_name='fn7';

delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');

alter table ice_orc set partition spec (dept_id, city, registration_date);

select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc partition (dept_id=1, city='London', registration_date='2024-03-11') compact 'major' and wait;
alter table ice_orc partition (registration_date='2024-03-11', dept_id=1, city='London') compact 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;

explain alter table ice_orc partition (dept_id=2, city='Paris', registration_date='2024-03-11') compact 'major' and wait;
alter table ice_orc partition (registration_date='2024-03-11', dept_id=2, city='Paris') compact 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions;
