Skip to content

Commit

Permalink
HIVE-27731: Iceberg: Perform metadata delete for queries with static …
Browse files Browse the repository at this point in the history
…filters (#4748) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
  • Loading branch information
SourabhBadhya authored Oct 19, 2023
1 parent 162da6d commit b02cef4
Show file tree
Hide file tree
Showing 24 changed files with 1,004 additions and 368 deletions.
2 changes: 2 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2672,6 +2672,8 @@ public static enum ConfVars {

HIVE_OPTIMIZE_REPLACE_DELETE_WITH_TRUNCATE("hive.optimize.delete.all", false,
"Optimize delete the entire data from table, use truncate instead"),
HIVE_OPTIMIZE_METADATA_DELETE("hive.optimize.delete.metadata.only", true,
"Optimize delete the entire data from table, use truncate instead"),
HIVE_OPTIMIZE_LIMIT("hive.optimize.limit", true,
"Optimize limit by pushing through Left Outer Joins and Selects"),
HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,15 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
Expand All @@ -175,6 +180,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Throwables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand All @@ -185,6 +191,7 @@
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -832,6 +839,12 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
(AlterTableExecuteSpec.CherryPickSpec) executeSpec.getOperationParams();
IcebergTableUtil.cherryPick(icebergTable, cherryPickSpec.getSnapshotId());
break;
case DELETE_METADATA:
AlterTableExecuteSpec.DeleteMetadataSpec deleteMetadataSpec =
(AlterTableExecuteSpec.DeleteMetadataSpec) executeSpec.getOperationParams();
IcebergTableUtil.performMetadataDelete(icebergTable, deleteMetadataSpec.getBranchName(),
deleteMetadataSpec.getSarg());
break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
Expand Down Expand Up @@ -1878,4 +1891,45 @@ public ColumnInfo getColumnInfo(org.apache.hadoop.hive.ql.metadata.Table hmsTabl
throw new SemanticException(String.format("Unable to find a column with the name: %s", colName));
}
}

@Override
public boolean canPerformMetadataDelete(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
String branchName, SearchArgument sarg) {
Expression exp;
try {
exp = HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create Iceberg filter," +
" continuing without metadata delete: ", e);
return false;
}
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());

// The following code is inspired & copied from Iceberg's SparkTable.java#canDeleteUsingMetadata
if (ExpressionUtil.selectsPartitions(exp, table, false)) {
return true;
}

TableScan scan = table.newScan().filter(exp).caseSensitive(false).includeColumnStats().ignoreResiduals();
if (branchName != null) {
scan.useRef(HiveUtils.getTableSnapshotRef(branchName));
}

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
Map<Integer, Evaluator> evaluators = Maps.newHashMap();
StrictMetricsEvaluator metricsEvaluator =
new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table, branchName), exp);

return Iterables.all(tasks, task -> {
DataFile file = task.file();
PartitionSpec spec = task.spec();
Evaluator evaluator = evaluators.computeIfAbsent(spec.specId(), specId ->
new Evaluator(spec.partitionType(), Projections.strict(spec).project(exp)));
return evaluator.eval(file.partition()) || metricsEvaluator.eval(file);
});
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -39,6 +43,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
Expand Down Expand Up @@ -278,4 +283,13 @@ public static boolean isV2Table(Map<String, String> props) {
return props != null &&
"2".equals(props.get(TableProperties.FORMAT_VERSION));
}

public static void performMetadataDelete(Table icebergTable, String branchName, SearchArgument sarg) {
Expression exp = HiveIcebergFilterFactory.generateFilterExpression(sarg);
DeleteFiles deleteFiles = icebergTable.newDelete();
if (StringUtils.isNotEmpty(branchName)) {
deleteFiles = deleteFiles.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
deleteFiles.deleteFromRowFilter(exp).commit();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--! qt:replace:/DeleteMetadataSpec(\S*)/#Masked#/
set hive.explain.user=false;

drop table if exists tbl_ice;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
-- SORT_QUERY_RESULTS
--! qt:replace:/DeleteMetadataSpec(\S*)/#Masked#/
set hive.explain.user=false;

create table ice_date (a int, b date) stored by iceberg stored as orc tblproperties ('format-version'='2');

insert into table ice_date values (1, '2021-01-01');
insert into table ice_date values (2, '2022-02-02'), (3, '2022-03-03');

delete from ice_date where b = '2022-02-02';
delete from ice_date where a = 1 and b = '2021-01-01';

select * from ice_date;

create table ice_date_year (a int, b date) stored by iceberg stored as orc tblproperties ('format-version'='2');
insert into table ice_date_year values (1, '2021-01-01');
insert into table ice_date_year values (2, '2022-02-02'), (3, '2022-03-03');

delete from ice_date_year where year(b) = 2022;

select * from ice_date_year;

-- Metadata delete should not be done here and fallback to normal delete.
create table ice_str_name (first_name string, last_name string) stored by iceberg stored as orc tblproperties ('format-version'='2');
insert into table ice_str_name values ('Alex', 'Clark');
insert into table ice_str_name values ('Bob', 'Bob');

delete from ice_str_name where first_name = last_name;

select * from ice_str_name;

-- Metadata delete should not be done here and fallback to normal delete.
create table ice_int_id (first_id int, last_id int) stored by iceberg stored as orc tblproperties ('format-version'='2');
insert into table ice_int_id values (7, 9);
insert into table ice_int_id values (8, 8);

delete from ice_int_id where first_id = last_id;

select * from ice_int_id;

-- Check if delete on a branch also uses the metadata delete whenever possible.
create table ice_branch_metadata_delete (a int, b string) stored by iceberg stored as orc tblproperties ('format-version'='2');
insert into table ice_branch_metadata_delete values (1, 'ABC');
insert into table ice_branch_metadata_delete values (2, 'DEF');
insert into table ice_branch_metadata_delete values (3, 'GHI');
insert into table ice_branch_metadata_delete values (4, 'JKL');

alter table ice_branch_metadata_delete create branch test01;
delete from default.ice_branch_metadata_delete.branch_test01 where a = 1;

select * from default.ice_branch_metadata_delete.branch_test01;

alter table ice_branch_metadata_delete drop branch test01;

-- Metadata delete must not be applied for multi-table scans which have subquery and fallback to normal delete logic.
create table ice_delete_multiple_table1 (a int, b string) stored by iceberg stored as orc tblproperties ('format-version' = '2');
create table ice_delete_multiple_table2 (a int, b string) stored by iceberg stored as orc tblproperties ('format-version' = '2');
insert into table ice_delete_multiple_table1 values (1, 'ABC'), (2, 'DEF'), (3, 'GHI');
insert into table ice_delete_multiple_table1 values (4, 'GHI'), (5, 'JKL'), (6, 'PQR');
insert into table ice_delete_multiple_table2 values (1, 'ABC'), (2, 'DEF'), (3, 'GHI');
insert into table ice_delete_multiple_table2 values (4, 'GHI'), (5, 'JKL'), (6, 'PQR');

delete from ice_delete_multiple_table2 where ice_delete_multiple_table2.a in (select ice_delete_multiple_table1.a from ice_delete_multiple_table1 where ice_delete_multiple_table1.b = 'GHI');

select * from ice_delete_multiple_table2;

create table test_delete_config (a int, b int) stored by iceberg stored as orc tblproperties ('format-version'='2');
insert into table test_delete_config values (1,2), (3,4);

explain delete from test_delete_config where b < 5;

set hive.optimize.delete.metadata.only=false;
explain delete from test_delete_config where b < 5;

drop table ice_date;
drop table ice_date_year;
drop table ice_str_name;
drop table ice_int_id;
drop table ice_branch_metadata_delete;
drop table ice_delete_multiple_table1;
drop table ice_delete_multiple_table2;
drop table test_delete_config;
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask width
--! qt:replace:/(width=17)\d+/$1####/
--! qt:replace:/(width=58)\d+/$1###/
-- Mask total data size
--! qt:replace:/(Data size: 35)\d+/$1####/
--! qt:replace:/(Data size: 11)\d+/$1####/

set hive.vectorized.execution.enabled=true;
set hive.llap.io.enabled=false;
Expand Down
Loading

0 comments on commit b02cef4

Please sign in to comment.