HIVE-27653: Iceberg: Add conflictDetectionFilter to validate concurrently added data and delete files (Simhadri Govindappa, reviewed by Ayush Saxena, Denys Kuzmenko)

Closes apache#4761
simhadri-g authored and dengzhhu653 committed Jun 12, 2024
1 parent c7ebc71 commit 36cefb2
Showing 10 changed files with 443 additions and 11 deletions.
HiveTableOperations.java
@@ -85,6 +85,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value";
private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;

private static final String HIVE_ICEBERG_STORAGE_HANDLER = "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";

private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names
GC_ENABLED, "external.table.purge",
@@ -388,8 +390,11 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata

// If needed set the 'storage_handler' property to enable query from Hive
if (hiveEngineEnabled) {
- parameters.put(hive_metastoreConstants.META_TABLE_STORAGE,
-     "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+ String storageHandler = parameters.get(hive_metastoreConstants.META_TABLE_STORAGE);
+ // Check if META_TABLE_STORAGE is not present or is not an instance of ICEBERG_STORAGE_HANDLER
+ if (storageHandler == null || !isHiveIcebergStorageHandler(storageHandler)) {
+   parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, HIVE_ICEBERG_STORAGE_HANDLER);
+ }
} else {
parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE);
}
@@ -591,4 +596,19 @@ HiveLock lockObject(TableMetadata metadata) {
return new NoLock();
}
}

/**
 * Checks whether the storage_handler table property is already set to HiveIcebergStorageHandler
 * or to a subclass of it.
 * @param storageHandler storage handler class name from the table properties
 * @return true if the configured class is HiveIcebergStorageHandler or a subclass of it
 */
private static boolean isHiveIcebergStorageHandler(String storageHandler) {
try {
Class<?> storageHandlerClass = Class.forName(storageHandler);
Class<?> icebergStorageHandlerClass = Class.forName(HIVE_ICEBERG_STORAGE_HANDLER);
return icebergStorageHandlerClass.isAssignableFrom(storageHandlerClass);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Error checking storage handler class", e);
}
}
}
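Note: the assignability check above keeps a table's storage_handler untouched when it is already set to HiveIcebergStorageHandler or one of its subclasses, instead of overwriting it with the base class name on every commit. A minimal, JDK-only sketch of the same Class.forName/isAssignableFrom pattern (ArrayList and List stand in for the handler classes here):

public class AssignabilityDemo {
  public static void main(String[] args) throws ClassNotFoundException {
    // Stand-ins for the declared handler class and the expected base class
    Class<?> declared = Class.forName("java.util.ArrayList");
    Class<?> expectedBase = Class.forName("java.util.List");

    // true: ArrayList is a List, so a subclass "handler" would be left as-is
    System.out.println(expectedBase.isAssignableFrom(declared));

    // false: an unrelated class would be replaced with the default handler
    System.out.println(expectedBase.isAssignableFrom(Class.forName("java.lang.String")));
  }
}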
InputFormatConfig.java
@@ -85,6 +85,7 @@ private InputFormatConfig() {
public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public static final String QUERY_FILTERS = "iceberg.query.filters";

public enum InMemoryDataModel {
PIG,
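Note: the new key carries the pushed-down Iceberg filter from query planning to commit time; both call sites appear later in this commit. A hedged sketch of the round trip, assuming conf and filterExpr are in scope:

// Producer side (decomposePredicate): stash the filter under the new key
SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr);

// Consumer side (commitTable): recover it, defaulting to always-true when nothing was pushed
Expression restored = (Expression) SessionStateUtil
    .getResource(conf, InputFormatConfig.QUERY_FILTERS)
    .orElse(Expressions.alwaysTrue());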
HiveIcebergInputFormat.java
@@ -102,6 +102,19 @@ static Expression icebergDataFilterFromHiveConf(Configuration conf) {
if (hiveFilter != null) {
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
return getFilterExpr(conf, exprNodeDesc);
}
return null;
}

/**
 * Extracts a search argument from the given ExprNodeGenericFuncDesc and converts it into an
 * Iceberg filter Expression.
 * @param conf the job configuration
 * @param exprNodeDesc describes a GenericFunc node
 * @return the corresponding Iceberg filter Expression
 */
static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
if (exprNodeDesc != null) {
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
try {
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
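Note: for a Hive predicate such as id = 5 AND region = 'EU', the SearchArgument conversion above produces the equivalent of the following Iceberg expression (a hand-written illustration with made-up column names, not captured output):

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

Expression expr = Expressions.and(
    Expressions.equal("id", 5),
    Expressions.equal("region", "EU"));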
HiveIcebergOutputCommitter.java
@@ -66,6 +66,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
@@ -421,11 +422,19 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable
Table table = null;
String branchName = null;

Expression filterExpr = Expressions.alwaysTrue();

for (JobContext jobContext : outputTable.jobContexts) {
JobConf conf = jobContext.getJobConf();
table = Optional.ofNullable(table).orElse(Catalogs.loadTable(conf, catalogProperties));
branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);

Expression jobContextFilterExpr = (Expression) SessionStateUtil.getResource(conf, InputFormatConfig.QUERY_FILTERS)
.orElse(Expressions.alwaysTrue());
if (!filterExpr.equals(jobContextFilterExpr)) {
filterExpr = Expressions.and(filterExpr, jobContextFilterExpr);
}
LOG.debug("Filter Expression :{}", filterExpr);
LOG.info("Committing job has started for table: {}, using location: {}",
table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));

@@ -458,7 +467,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable
.map(String::valueOf).collect(Collectors.joining(",")));
} else {
Long snapshotId = getSnapshotId(outputTable.table, branchName);
- commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation);
+ commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation, filterExpr);
}
} else {

@@ -483,12 +492,13 @@ private Long getSnapshotId(Table table, String branchName) {
/**
 * Creates and commits an Iceberg change with the provided data and delete files.
 * If there are no delete files then an Iceberg 'append' is created, otherwise an Iceberg 'overwrite' is created.
- * @param table The table we are changing
- * @param startTime The start time of the commit - used only for logging
- * @param results The object containing the new files we would like to add to the table
+ * @param table      The table we are changing
+ * @param startTime  The start time of the commit - used only for logging
+ * @param results    The object containing the new files we would like to add to the table
+ * @param filterExpr Filter expression used as the conflict detection filter
 */
private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
-     FilesForCommit results, Operation operation) {
+     FilesForCommit results, Operation operation, Expression filterExpr) {

if (!results.replacedDataFiles().isEmpty()) {
OverwriteFiles write = table.newOverwrite();
@@ -501,6 +511,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
if (snapshotId != null) {
write.validateFromSnapshot(snapshotId);
}
write.conflictDetectionFilter(filterExpr);
write.validateNoConflictingData();
write.validateNoConflictingDeletes();
write.commit();
@@ -525,6 +536,8 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
if (snapshotId != null) {
write.validateFromSnapshot(snapshotId);
}
write.conflictDetectionFilter(filterExpr);

if (!results.dataFiles().isEmpty()) {
write.validateDeletedFiles();
write.validateNoConflictingDeleteFiles();
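Note: narrowing conflict detection to the rows the query actually touched is what lets two concurrent writers on disjoint partitions commit without failing each other's validation. A condensed sketch of the overwrite path wired up above (the snapshot id and filter value are placeholders):

import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.expressions.Expressions;

OverwriteFiles write = table.newOverwrite();
write.validateFromSnapshot(snapshotId);                            // only validate changes since our read snapshot
write.conflictDetectionFilter(Expressions.equal("region", "EU")); // restrict checks to rows this query touched
write.validateNoConflictingData();                                 // fail on concurrent inserts matching the filter
write.validateNoConflictingDeletes();                              // fail on concurrent deletes matching the filter
write.commit();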
HiveIcebergStorageHandler.java
@@ -384,9 +384,14 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
}
}
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
Expression filterExpr = (Expression) HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
if (filterExpr != null) {
SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr);
}
return predicate;
}


@Override
public boolean canProvideBasicStatistics() {
return true;
@@ -748,8 +753,7 @@ public void storageHandlerCommit(Properties commitProperties, Operation operation)
if (jobContextList.isEmpty()) {
return;
}

- HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
+ HiveIcebergOutputCommitter committer = getOutputCommitter();
try {
committer.commitJobs(jobContextList, operation);
} catch (Throwable e) {
@@ -769,6 +773,10 @@ public void storageHandlerCommit(Properties commitProperties, Operation operation)
}
}

public HiveIcebergOutputCommitter getOutputCommitter() {
return new HiveIcebergOutputCommitter();
}

@Override
public boolean isAllowedAlterOperation(AlterTableType opType) {
return HiveIcebergMetaHook.SUPPORTED_ALTER_OPS.contains(opType);
HiveIcebergStorageHandlerStub.java (new file)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.concurrent.Phaser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* HiveIcebergStorageHandlerStub is used only for unit tests.
* Currently, we use it to achieve a specific thread interleaving to simulate conflicts in concurrent writes
* deterministically.
*/
public class HiveIcebergStorageHandlerStub extends HiveIcebergStorageHandler {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandlerStub.class);

@Override
public HiveIcebergOutputCommitter getOutputCommitter() {

try {
LOG.debug(" Using HiveIcebergStorageHandlerStub for unit tests");
if (TestUtilPhaser.isInstantiated()) {
Phaser testUtilPhaser = TestUtilPhaser.getInstance().getPhaser();
LOG.debug("Activating the Phaser Barrier for thread: {} ", Thread.currentThread().getName());
testUtilPhaser.arriveAndAwaitAdvance();
LOG.debug("Breaking the Phaser Barrier and deregistering the phaser for thread: {} ",
Thread.currentThread().getName());
}
} catch (Exception e) {
throw new RuntimeException("Phaser failed: ", e);
}

return new HiveIcebergOutputCommitter();
}

}
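Note: the stub's barrier works because a Phaser parks each arriving party until every registered party has arrived, so both committers reach the commit point before either proceeds. A self-contained, JDK-only sketch of that interleaving (names are illustrative):

import java.util.concurrent.Phaser;

public class PhaserInterleavingDemo {
  public static void main(String[] args) throws InterruptedException {
    Phaser barrier = new Phaser(2); // two concurrent "commits" registered up front

    Runnable commit = () -> {
      // Each thread blocks here until both have arrived, then they race to commit
      barrier.arriveAndAwaitAdvance();
      System.out.println(Thread.currentThread().getName() + " committing now");
    };

    Thread w1 = new Thread(commit, "writer-1");
    Thread w2 = new Thread(commit, "writer-2");
    w1.start();
    w2.start();
    w1.join();
    w2.join();
  }
}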
@@ -165,7 +165,7 @@ public static Collection<Object[]> parameters() {
public TemporaryFolder temp = new TemporaryFolder();

@Rule
- public Timeout timeout = new Timeout(400_000, TimeUnit.MILLISECONDS);
+ public Timeout timeout = new Timeout(500_000, TimeUnit.MILLISECONDS);

@BeforeClass
public static void beforeClass() {