diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index a4e39998d1dc3..dd0f76b91cfa4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -27,6 +27,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -112,6 +113,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromBaseFiles; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices; @@ -548,10 +550,6 @@ protected abstract HoodieData getExpressionIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, - Map recordKeySecondaryKeyMap, - HoodieIndexDefinition indexDefinition); - private Pair> initializeExpressionIndexPartition(String indexName, String instantTime) throws Exception { HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName); ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); @@ -1051,6 +1049,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List Map> partitionToRecordMap = HoodieTableMetadataUtil.convertMetadataToRecords( engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, dataWriteConfig.getMetadataConfig(), - enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding()); + enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType()); HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata); partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates)); updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); @@ -1160,17 +1159,19 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception { List>>> partitionFilePairs = getPartitionFilePairs(commitMetadata); + if (partitionFilePairs.isEmpty()) { + return engineContext.emptyHoodieData(); + } // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of // the secondary index partition for each of these keys. 
- List keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), - dataMetaClient, instantTime); + HoodieData keysToRemove = getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime); HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition); // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to - // This is obtained by scanning the entire secondary index partition in the metadata table - // This could be an expensive operation for a large commit (updating/deleting millions of rows) - Map recordKeySecondaryKeyMap = metadata.getSecondaryKeys(keysToRemove, indexDefinition.getIndexName()); - HoodieData deletedRecords = getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap, indexDefinition); + HoodiePairData recordKeySecondaryKeyMap = + metadata.getSecondaryKeys(keysToRemove, indexDefinition.getIndexName(), dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism()); + HoodieData deleteRecords = recordKeySecondaryKeyMap.map( + (recordKeyAndSecondaryKey) -> HoodieMetadataPayload.createSecondaryIndexRecord(recordKeyAndSecondaryKey.getKey(), recordKeyAndSecondaryKey.getValue(), indexDefinition.getIndexName(), true)); // first deduce parallelism to avoid too few tasks for large number of records. long totalWriteBytesForSecondaryIndex = commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(Collection::stream) @@ -1181,16 +1182,13 @@ private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata c long targetPartitionSize = 100 * 1024 * 1024; int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex + targetPartitionSize - 1) / targetPartitionSize); - return readSecondaryKeysFromBaseFiles( - engineContext, - partitionFilePairs, - parallelism, - this.getClass().getSimpleName(), - dataMetaClient, - getEngineType(), - indexDefinition) - .union(deletedRecords) - .distinctWithKey(HoodieRecord::getKey, parallelism); + // Load file system view for only the affected partitions on the driver. + // By loading on the driver one time, we avoid loading the same metadata multiple times on the executors. 
+ HoodieMetadataFileSystemView fsView = getMetadataView(); + fsView.loadPartitions(partitionFilePairs.stream().map(Pair::getKey).collect(Collectors.toList())); + HoodieData insertRecords = + readSecondaryKeysFromBaseFiles(engineContext, partitionFilePairs, parallelism, this.getClass().getSimpleName(), dataMetaClient, getEngineType(), indexDefinition, fsView); + return insertRecords.union(deleteRecords).distinctWithKey(HoodieRecord::getKey, parallelism); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index be73658e0e24e..9b3308695de92 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -93,6 +95,11 @@ public HoodieData emptyHoodieData() { return HoodieListData.eager(Collections.emptyList()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieListPairData.eager(Collections.emptyList()); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieListData.eager(data); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 5d63075c68cee..cab79a36809bf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -203,9 +203,4 @@ protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaCli protected EngineType getEngineType() { return EngineType.FLINK; } - - @Override - public HoodieData getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) { - throw new HoodieNotSupportedException("Flink metadata table does not support secondary index yet."); - } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index f542d5a0f7f9c..a127089958b10 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineProperty; import 
org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -76,6 +78,11 @@ public HoodieData emptyHoodieData() { return HoodieListData.eager(Collections.emptyList()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieListPairData.eager(Collections.emptyList()); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieListData.eager(data); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index 52bf184d2dd1d..c6d5c96bff6ca 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -133,9 +133,4 @@ protected HoodieData getExpressionIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) { - throw new HoodieNotSupportedException("Java metadata table writer does not support secondary index yet."); - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index 6485e8e6ad411..723490d31c682 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; @@ -33,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.data.HoodieSparkLongAccumulator; import org.apache.hudi.exception.HoodieException; @@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; @@ -108,6 +111,11 @@ public HoodieData emptyHoodieData() { return HoodieJavaRDD.of(javaSparkContext.emptyRDD()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieJavaPairRDD.of(JavaPairRDD.fromJavaRDD(javaSparkContext.emptyRDD())); + } + public boolean supportsFileGroupReader() { return true; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index c3fa8269bb75c..0625d55183c76 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -19,7 +19,6 @@ package org.apache.hudi.metadata; import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.index.expression.HoodieSparkExpressionIndex; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -42,6 +41,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.index.expression.HoodieExpressionIndex; +import org.apache.hudi.index.expression.HoodieSparkExpressionIndex; import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.storage.StorageConfiguration; @@ -62,7 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -236,20 +235,4 @@ protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaCli protected EngineType getEngineType() { return EngineType.SPARK; } - - @Override - public HoodieData getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) { - HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; - if (recordKeySecondaryKeyMap.isEmpty()) { - return sparkEngineContext.emptyHoodieData(); - } - - List deletedRecords = new ArrayList<>(); - recordKeySecondaryKeyMap.forEach((key, value) -> { - HoodieRecord siRecord = HoodieMetadataPayload.createSecondaryIndexRecord(key, value, indexDefinition.getIndexName(), true); - deletedRecords.add(siRecord); - }); - - return HoodieJavaRDD.of(deletedRecords, sparkEngineContext, 1); - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java index 8f5e7ebaa2229..4de3a3b1d8995 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; @@ -67,6 +68,8 @@ public TaskContextSupplier getTaskContextSupplier() { public abstract HoodieData emptyHoodieData(); + public abstract HoodiePairData emptyHoodiePairData(); + public boolean supportsFileGroupReader() { return false; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java index d605349b30d32..979a65413e5ea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.data.HoodieData; import 
org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; @@ -73,6 +75,11 @@ public HoodieData emptyHoodieData() { return HoodieListData.eager(Collections.emptyList()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieListPairData.eager(Collections.emptyList()); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieListData.eager(data); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java index ac2739237f2b6..b96930a312016 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,13 +60,12 @@ public class BaseFileRecordParsingUtils { * @param instantTime instant time of interest. * @param storage instance of {@link HoodieStorage}. * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition. - * @throws IOException */ public static Iterator generateRLIMetadataHoodieRecordsForBaseFile(String basePath, HoodieWriteStat writeStat, Integer writesFileIdEncoding, String instantTime, - HoodieStorage storage) throws IOException { + HoodieStorage storage) { String partition = writeStat.getPartitionPath(); String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); String fileId = FSUtils.getFileId(latestFileName); @@ -100,12 +98,11 @@ public static Iterator generateRLIMetadataHoodieRecordsForBaseFile * @param writeStat {@link HoodieWriteStat} instance of interest. * @param storage {@link HoodieStorage} instance of interest. * @return list of record keys deleted or updated. - * @throws IOException */ @VisibleForTesting public static List getRecordKeysDeletedOrUpdated(String basePath, HoodieWriteStat writeStat, - HoodieStorage storage) throws IOException { + HoodieStorage storage) { String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); Set recordStatuses = new HashSet<>(); recordStatuses.add(RecordStatus.UPDATE); @@ -121,15 +118,14 @@ public static List getRecordKeysDeletedOrUpdated(String basePath, * @param basePath base path of the table. * @param storage {@link HoodieStorage} instance of interest. * @return list of record keys deleted or updated. 
- * @throws IOException */ @VisibleForTesting public static Map> getRecordKeyStatuses(String basePath, - String partition, - String latestFileName, - String prevFileName, - HoodieStorage storage, - Set recordStatusesOfInterest) throws IOException { + String partition, + String latestFileName, + String prevFileName, + HoodieStorage storage, + Set recordStatusesOfInterest) { Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); if (prevFileName == null) { if (recordStatusesOfInterest.contains(RecordStatus.INSERT)) { @@ -171,7 +167,7 @@ public static Map> getRecordKeyStatuses(String basePa } } - private static Set getRecordKeysFromBaseFile(HoodieStorage storage, String basePath, String partition, String fileName) throws IOException { + private static Set getRecordKeysFromBaseFile(HoodieStorage storage, String basePath, String partition, String fileName) { StoragePath dataFilePath = new StoragePath(basePath, StringUtils.isNullOrEmpty(partition) ? fileName : (partition + Path.SEPARATOR) + fileName); FileFormatUtils fileFormatUtils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET); return fileFormatUtils.readRowKeys(storage, dataFilePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 4aea9eeb3566a..959e0b65866f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -338,12 +340,12 @@ public Map> readSecondaryIndex(List secondary-key) for the provided record keys. */ - public Map getSecondaryKeys(List recordKeys, String secondaryIndexName) { + public HoodiePairData getSecondaryKeys(HoodieData recordKeys, String secondaryIndexName, int parallelism) { ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), "Record index is not initialized in MDT"); ValidationUtils.checkState(dataMetaClient.getTableConfig().getMetadataPartitions().contains(secondaryIndexName), "Secondary index is not initialized in MDT"); - return getSecondaryKeysForRecordKeys(recordKeys, secondaryIndexName); + return getSecondaryKeysForRecordKeys(recordKeys, secondaryIndexName, parallelism); } /** @@ -459,7 +461,7 @@ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri protected abstract Map> getRecordsByKeys(List keys, String partitionName); - protected abstract Map getSecondaryKeysForRecordKeys(List recordKeys, String partitionName); + protected abstract HoodiePairData getSecondaryKeysForRecordKeys(HoodieData recordKeys, String partitionName, int batchSize); /** * Returns a map of (secondary-key -> set-of-record-keys) for the provided secondary keys. 
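
For orientation before the HoodieBackedTableMetadata changes below, a minimal sketch of how the new pair-shaped API can be consumed. It is illustrative only and uses calls that appear in this patch (HoodieListPairData.eager, HoodiePairData#map, HoodieData#collectAsList); the sample record-key/secondary-key pairs and the string markers are made up, and it assumes HoodieListPairData.eager also accepts a non-empty list of Pairs, as its use with Collections.emptyList() above suggests.

import java.util.Arrays;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.util.collection.Pair;

public class SecondaryKeyPairDataSketch {
  public static void main(String[] args) {
    // Hypothetical (record key -> secondary key) pairs, standing in for the result of getSecondaryKeys(...).
    HoodiePairData<String, String> recordKeyToSecondaryKey = HoodieListPairData.eager(Arrays.asList(
        Pair.of("rk1", "sk-a"),
        Pair.of("rk2", "sk-b")));
    // The production code maps each pair to a secondary-index delete record via
    // HoodieMetadataPayload.createSecondaryIndexRecord(recordKey, secondaryKey, indexName, true);
    // a plain string is used here so the sketch stays self-contained.
    HoodieData<String> deleteMarkers = recordKeyToSecondaryKey.map(
        pair -> "delete:" + pair.getValue() + "$" + pair.getKey());
    deleteMarkers.collectAsList().forEach(System.out::println);
  }
}
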
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 522b004112786..d8b074c33ad20 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.model.FileSlice; @@ -75,7 +76,6 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES; import static org.apache.hudi.common.util.CollectionUtils.toStream; @@ -801,28 +801,47 @@ public int getNumFileGroupsForPartition(MetadataPartitionType partition) { } @Override - protected Map getSecondaryKeysForRecordKeys(List recordKeys, String partitionName) { + protected HoodiePairData getSecondaryKeysForRecordKeys(HoodieData recordKeys, String partitionName, int batchSize) { if (recordKeys.isEmpty()) { - return Collections.emptyMap(); + return getEngineContext().emptyHoodiePairData(); } // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys. List partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName, k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName)); if (partitionFileSlices.isEmpty()) { - return Collections.emptyMap(); + return getEngineContext().emptyHoodiePairData(); } - // Parallel lookup keys from each file slice - Map reverseSecondaryKeyMap = new HashMap<>(recordKeys.size()); - getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Lookup secondary keys from metadata table partition " + partitionName); - List> secondaryToRecordKeyPairList = getEngineContext().flatMap(partitionFileSlices, - (SerializableFunction>>) v1 -> reverseLookupSecondaryKeys(partitionName, recordKeys, v1) - .entrySet().stream() - .map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()).stream(), partitionFileSlices.size()); + // Step 1: Batch record keys + HoodieData> batchedRecordKeys = recordKeys.mapPartitions(iter -> { + List> batches = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + + while (iter.hasNext()) { + currentBatch.add(iter.next()); + if (currentBatch.size() == batchSize) { + batches.add(new ArrayList<>(currentBatch)); + currentBatch.clear(); + } + } + + // Add any remaining items as the last batch + if (!currentBatch.isEmpty()) { + batches.add(currentBatch); + } + + return batches.iterator(); + }, true); - secondaryToRecordKeyPairList.forEach(pair -> reverseSecondaryKeyMap.put(pair.getKey(), pair.getValue())); - return reverseSecondaryKeyMap; + // Step 2: Process each batch of record keys against all file slices + return batchedRecordKeys.flatMapToPair(recordKeysBatch -> { + Map reverseSecondaryKeyMap = new HashMap<>(); + for (FileSlice fileSlice : partitionFileSlices) { + reverseSecondaryKeyMap.putAll(reverseLookupSecondaryKeys(partitionName, recordKeysBatch, 
fileSlice)); + } + return reverseSecondaryKeyMap.entrySet().stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).iterator(); + }); } private Map reverseLookupSecondaryKeys(String partitionName, List recordKeys, FileSlice fileSlice) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 27ea15bc2509f..0ffe83a64eb6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -377,7 +377,8 @@ public static Map> convertMetadataToRecords(Hoo HoodieMetadataConfig metadataConfig, List enabledPartitionTypes, String bloomFilterType, - int bloomIndexParallelism, Integer writesFileIdEncoding) { + int bloomIndexParallelism, Integer writesFileIdEncoding, + EngineType engineType) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = context.parallelize( convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1); @@ -402,7 +403,7 @@ public static Map> convertMetadataToRecords(Hoo } if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) { partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig, - dataMetaClient, writesFileIdEncoding, instantTime)); + dataMetaClient, writesFileIdEncoding, instantTime, engineType)); } return partitionToRecordsMap; } @@ -774,65 +775,93 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi @VisibleForTesting public static HoodieData convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext, - HoodieCommitMetadata commitMetadata, - HoodieMetadataConfig metadataConfig, - HoodieTableMetaClient dataTableMetaClient, - int writesFileIdEncoding, - String instantTime) { - + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + int writesFileIdEncoding, + String instantTime, + EngineType engineType) { List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(Collection::stream).collect(Collectors.toList()); - + // Return early if there are no write stats, or if the operation is a compaction. if (allWriteStats.isEmpty() || commitMetadata.getOperationType() == WriteOperationType.COMPACT) { return engineContext.emptyHoodieData(); } + // RLI cannot support logs having inserts with current offering. So, lets validate that. + if (allWriteStats.stream().anyMatch(writeStat -> { + String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); + return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0; + })) { + throw new HoodieIOException("RLI cannot support logs having inserts with current offering. 
Would recommend disabling Record Level Index"); + } try { - int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); + Map> writeStatsByFileId = allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId)); + int parallelism = Math.max(Math.min(writeStatsByFileId.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); String basePath = dataTableMetaClient.getBasePath().toString(); HoodieFileFormat baseFileFormat = dataTableMetaClient.getTableConfig().getBaseFileFormat(); - // RLI cannot support logs having inserts with current offering. So, lets validate that. - if (allWriteStats.stream().anyMatch(writeStat -> { - String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); - return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0; - })) { - throw new HoodieIOException("RLI cannot support logs having inserts with current offering. Would recommend disabling Record Level Index"); - } - - // we might need to set some additional variables if we need to process log files. - // for RLI and MOR table, we only care about log files if they contain any deletes. If not, all entries in logs are considered as updates, for which - // we do not need to generate new RLI record. - boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> { - String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); - return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0; - }); - - Option writerSchemaOpt = Option.empty(); - if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes. - writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); - } - int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); + Option writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); Option finalWriterSchemaOpt = writerSchemaOpt; - HoodieData recordIndexRecords = engineContext.parallelize(allWriteStats, parallelism) - .flatMap(writeStat -> { - HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); - StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()); - // handle base files - if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) { - return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); - } else if (FSUtils.isLogFile(fullFilePath)) { - // for logs, we only need to process log files containing deletes - if (writeStat.getNumDeletes() > 0) { - Set deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient, - finalWriterSchemaOpt, maxBufferSize, instantTime, false, true); - return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator(); - } - // ignore log file data blocks. 
- return new ArrayList().iterator(); - } else { - throw new HoodieIOException("Unsupported file type " + fullFilePath.toString() + " while generating MDT records"); + HoodieData recordIndexRecords = engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism) + .flatMap(writeStatsByFileIdEntry -> { + String fileId = writeStatsByFileIdEntry.getKey(); + List writeStats = writeStatsByFileIdEntry.getValue(); + // Partition the write stats into base file and log file write stats + List baseFileWriteStats = writeStats.stream() + .filter(writeStat -> writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) + .collect(Collectors.toList()); + List logFileWriteStats = writeStats.stream() + .filter(writeStat -> FSUtils.isLogFile(writeStat.getPath())) + .collect(Collectors.toList()); + // Ensure that only one of base file or log file write stats exists + checkState(baseFileWriteStats.isEmpty() || logFileWriteStats.isEmpty(), + "A single fileId cannot have both base file and log file write stats in the same commit. FileId: " + fileId); + // Process base file write stats + if (!baseFileWriteStats.isEmpty()) { + return baseFileWriteStats.stream() + .flatMap(writeStat -> { + HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); + return CollectionUtils.toStream(BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage)); + }) + .iterator(); + } + // Process log file write stats + if (!logFileWriteStats.isEmpty()) { + String partitionPath = logFileWriteStats.get(0).getPartitionPath(); + List currentLogFilePaths = logFileWriteStats.stream() + .map(writeStat -> new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()).toString()) + .collect(Collectors.toList()); + List allLogFilePaths = logFileWriteStats.stream() + .flatMap(writeStat -> { + checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat"); + List currentLogFiles = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream() + .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString()) + .collect(Collectors.toList()); + return currentLogFiles.stream(); + }) + .collect(Collectors.toList()); + // Extract revived and deleted keys + Pair, Set> revivedAndDeletedKeys = + getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, engineType, allLogFilePaths, finalWriterSchemaOpt, currentLogFilePaths); + Set revivedKeys = revivedAndDeletedKeys.getLeft(); + Set deletedKeys = revivedAndDeletedKeys.getRight(); + // Process revived keys to create updates + List revivedRecords = revivedKeys.stream() + .map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId, instantTime, writesFileIdEncoding)) + .collect(Collectors.toList()); + // Process deleted keys to create deletes + List deletedRecords = deletedKeys.stream() + .map(HoodieMetadataPayload::createRecordIndexDelete) + .collect(Collectors.toList()); + // Combine all records into one list + List allRecords = new ArrayList<>(); + allRecords.addAll(revivedRecords); + allRecords.addAll(deletedRecords); + return allRecords.iterator(); } + // No base file or log file write stats found + return Collections.emptyIterator(); }); // there are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) 
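
The per-file-id handling above is easier to follow in isolation. The sketch below is a plain-JDK illustration of the same grouping-and-validation idea, not Hudi code: bare file paths stand in for HoodieWriteStat, fileIdOf is a simplified stand-in for FSUtils.getFileId, and the check mirrors the checkState that forbids a single file id from carrying both base-file and log-file write stats in one commit.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByFileIdSketch {
  public static void main(String[] args) {
    // Hypothetical write paths; the real code groups HoodieWriteStat objects by HoodieWriteStat#getFileId.
    List<String> paths = Arrays.asList(
        "p1/f1_0-0-0_001.parquet",
        "p1/.f2_001.log.1_0-0-0",
        "p1/.f2_001.log.2_0-0-0");

    // Bucket the paths per file id, as the new flatMap does with write stats.
    Map<String, List<String>> byFileId = paths.stream()
        .collect(Collectors.groupingBy(GroupByFileIdSketch::fileIdOf));

    byFileId.forEach((fileId, group) -> {
      boolean hasBase = group.stream().anyMatch(p -> p.endsWith(".parquet"));
      boolean hasLog = group.stream().anyMatch(p -> p.contains(".log."));
      // Mirrors the checkState in this diff: one file id should not mix base-file and log-file stats in a commit.
      if (hasBase && hasLog) {
        throw new IllegalStateException("FileId " + fileId + " has both base and log write stats");
      }
      System.out.println(fileId + " -> " + group);
    });
  }

  // Simplified file-id extraction: base files look like <fileId>_<token>_<instant>.parquet,
  // log files like .<fileId>_<instant>.log.<version>_<token>.
  private static String fileIdOf(String path) {
    String name = path.substring(path.lastIndexOf('/') + 1);
    return name.startsWith(".") ? name.substring(1, name.indexOf('_')) : name.substring(0, name.indexOf('_'));
  }
}
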
@@ -855,6 +884,129 @@ public static HoodieData convertMetadataToRecordIndexRecords(Hoodi } } + /** + * Get the deleted keys from the merged log files. The logic is as below. Suppose: + *
<li> A = Set of keys that are valid (not deleted) in the previous log files merged </li>
+ * <li> B = Set of keys that are valid in all log files including current log file merged </li>
+ * <li> C = Set of keys that are deleted in the current log file </li>
+ * <li> Then, D = Set of deleted keys = C - (B - A) </li>
+ *
+ * @param dataTableMetaClient data table meta client + * @param instantTime timestamp of the commit + * @param engineType engine type (SPARK, FLINK, JAVA) + * @param logFilePaths list of log file paths including current and previous file slices + * @param finalWriterSchemaOpt records schema + * @param currentLogFilePaths list of log file paths for the current instant + * @return pair of revived and deleted keys + */
+ @VisibleForTesting + public static Pair, Set> getRevivedAndDeletedKeysFromMergedLogs(HoodieTableMetaClient dataTableMetaClient, + String instantTime, + EngineType engineType, + List logFilePaths, + Option finalWriterSchemaOpt, + List currentLogFilePaths) { + // Separate out the current log files + List logFilePathsWithoutCurrentLogFiles = logFilePaths.stream() + .filter(logFilePath -> !currentLogFilePaths.contains(logFilePath)) + .collect(toList()); + if (logFilePathsWithoutCurrentLogFiles.isEmpty()) { + // Only current log file is present, so we can directly get the deleted record keys from it and return the RLI records. + Map currentLogRecords = + getLogRecords(currentLogFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType); + Set deletedKeys = currentLogRecords.entrySet().stream() + .filter(entry -> isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + return Pair.of(Collections.emptySet(), deletedKeys); + }
+ // Fetch log records for all log files + Map allLogRecords = + getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType); + + // Fetch log records for previous log files (excluding the current log files) + Map previousLogRecords = + getLogRecords(logFilePathsWithoutCurrentLogFiles, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType); +
+ // Partition valid (non-deleted) and deleted keys from previous log files in a single pass + Map> partitionedKeysForPreviousLogs = previousLogRecords.entrySet().stream() + .collect(Collectors.partitioningBy( + entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()), + Collectors.mapping(Map.Entry::getKey, Collectors.toSet()) + )); + Set validKeysForPreviousLogs = partitionedKeysForPreviousLogs.get(true); + Set deletedKeysForPreviousLogs = partitionedKeysForPreviousLogs.get(false); +
+ // Partition valid (non-deleted) and deleted keys from all log files, including current, in a single pass + Map> partitionedKeysForAllLogs = allLogRecords.entrySet().stream() + .collect(Collectors.partitioningBy( + entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()), + Collectors.mapping(Map.Entry::getKey, Collectors.toSet()) + )); + Set validKeysForAllLogs = partitionedKeysForAllLogs.get(true); + Set deletedKeysForAllLogs = partitionedKeysForAllLogs.get(false); +
+ return computeRevivedAndDeletedKeys(validKeysForPreviousLogs, deletedKeysForPreviousLogs, validKeysForAllLogs, deletedKeysForAllLogs); + } +
+ private static boolean isDeleteRecord(HoodieTableMetaClient dataTableMetaClient, Option finalWriterSchemaOpt, HoodieRecord record) { + try { + return record.isDelete(finalWriterSchemaOpt.get(), dataTableMetaClient.getTableConfig().getProps()); + } catch (IOException e) { + throw new HoodieException("Failed to check if record is delete", e); + } + } +
+ private static Map getLogRecords(List logFilePaths, + HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, + String latestCommitTimestamp, + EngineType
engineType) { + if (writerSchemaOpt.isPresent()) { + final StorageConfiguration storageConf = datasetMetaClient.getStorageConf(); + HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger( + datasetMetaClient.getBasePath().toString(), + engineType, + Collections.emptyList(), + datasetMetaClient.getTableConfig().getRecordMergeStrategyId()); + + HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withReaderSchema(writerSchemaOpt.get()) + .withLatestInstantTime(latestCommitTimestamp) + .withReverseReader(false) + .withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) + .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue()) + .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) + .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false)) + .withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue())) + .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) + .withRecordMerger(recordMerger) + .withTableMetaClient(datasetMetaClient) + .build(); + return mergedLogRecordScanner.getRecords(); + } + return Collections.emptyMap(); + } + + @VisibleForTesting + public static Pair, Set> computeRevivedAndDeletedKeys(Set validKeysForPreviousLogs, + Set deletedKeysForPreviousLogs, + Set validKeysForAllLogs, + Set deletedKeysForAllLogs) { + // Compute revived keys: previously deleted but now valid + Set revivedKeys = new HashSet<>(deletedKeysForPreviousLogs); + revivedKeys.retainAll(validKeysForAllLogs); // Intersection of previously deleted and now valid + + // Compute deleted keys: previously valid but now deleted + Set deletedKeys = new HashSet<>(validKeysForPreviousLogs); + deletedKeys.retainAll(deletedKeysForAllLogs); // Intersection of previously valid and now deleted + + return Pair.of(revivedKeys, deletedKeys); + } + /** * There are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) * So, this method performs reduce by key to ignore the deleted entry. 
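
To make the set algebra in computeRevivedAndDeletedKeys concrete, here is a tiny standalone example with made-up keys (plain java.util, no Hudi types): revived keys are the intersection of previously-deleted and now-valid keys, and deleted keys are the intersection of previously-valid and now-deleted keys.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RevivedAndDeletedKeysSketch {
  public static void main(String[] args) {
    Set<String> validBefore = new HashSet<>(Arrays.asList("K1", "K2", "K3"));      // valid in previous log files
    Set<String> deletedBefore = new HashSet<>(Arrays.asList("K4", "K5"));          // deleted in previous log files
    Set<String> validAfter = new HashSet<>(Arrays.asList("K2", "K3", "K4", "K6")); // valid once the new log file is merged in
    Set<String> deletedAfter = new HashSet<>(Arrays.asList("K1", "K5"));           // deleted once the new log file is merged in

    // Revived: deleted before, valid after (K4 here).
    Set<String> revived = new HashSet<>(deletedBefore);
    revived.retainAll(validAfter);

    // Deleted: valid before, deleted after (K1 here).
    Set<String> deleted = new HashSet<>(validBefore);
    deleted.retainAll(deletedAfter);

    System.out.println("revived=" + revived + ", deleted=" + deleted);
  }
}
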
@@ -884,17 +1036,16 @@ public static HoodieData reduceByKeys(HoodieData rec } @VisibleForTesting - public static List getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext, - HoodieCommitMetadata commitMetadata, - HoodieMetadataConfig metadataConfig, - HoodieTableMetaClient dataTableMetaClient, - String instantTime) { - + public static HoodieData getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext, + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + String instantTime) { List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(Collection::stream).collect(Collectors.toList()); if (allWriteStats.isEmpty()) { - return Collections.emptyList(); + return engineContext.emptyHoodieData(); } try { @@ -930,17 +1081,45 @@ public static List getRecordKeysDeletedOrUpdated(HoodieEngineContext eng return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator(); } else if (FSUtils.isLogFile(fullFilePath)) { // for logs, every entry is either an update or a delete - return getRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime, true, true) - .iterator(); + return getRecordKeys(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime, true, true).iterator(); } else { - throw new HoodieIOException("Found unsupported file type " + fullFilePath.toString() + ", while generating MDT records"); + throw new HoodieIOException("Found unsupported file type " + fullFilePath + ", while generating MDT records"); } - }).collectAsList(); + }); } catch (Exception e) { throw new HoodieException("Failed to fetch deleted record keys while preparing MDT records", e); } } + @VisibleForTesting + public static Set getRecordKeys(List logFilePaths, HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, int maxBufferSize, + String latestCommitTimestamp, boolean includeValidKeys, + boolean includeDeletedKeys) throws IOException { + if (writerSchemaOpt.isPresent()) { + // read log file records without merging + Set allRecordKeys = new HashSet<>(); + HoodieUnMergedLogRecordScanner.Builder builder = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTimestamp) + .withReaderSchema(writerSchemaOpt.get()) + .withTableMetaClient(datasetMetaClient); + if (includeValidKeys) { + builder.withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey())); + } + if (includeDeletedKeys) { + builder.withRecordDeletionCallback(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey())); + } + HoodieUnMergedLogRecordScanner scanner = builder.build(); + scanner.scan(); + return allRecordKeys; + } + return Collections.emptySet(); + } + private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, Map> partitionToFilesMap) { InstantGenerator factory = dataTableMetaClient.getInstantGenerator(); @@ -1540,35 +1719,6 @@ public static List> getLogFileColumnRangeM return Collections.emptyList(); } - @VisibleForTesting - public static Set getRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, - Option writerSchemaOpt, int maxBufferSize, - String latestCommitTimestamp, boolean includeValidKeys, - 
boolean includeDeletedKeys) throws IOException { - if (writerSchemaOpt.isPresent()) { - // read log file records without merging - Set allRecordKeys = new HashSet<>(); - HoodieUnMergedLogRecordScanner.Builder builder = HoodieUnMergedLogRecordScanner.newBuilder() - .withStorage(datasetMetaClient.getStorage()) - .withBasePath(datasetMetaClient.getBasePath()) - .withLogFilePaths(Collections.singletonList(filePath)) - .withBufferSize(maxBufferSize) - .withLatestInstantTime(latestCommitTimestamp) - .withReaderSchema(writerSchemaOpt.get()) - .withTableMetaClient(datasetMetaClient); - if (includeValidKeys) { - builder.withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey())); - } - if (includeDeletedKeys) { - builder.withRecordDeletionCallback(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey())); - } - HoodieUnMergedLogRecordScanner scanner = builder.build(); - scanner.scan(); - return allRecordKeys; - } - return Collections.emptySet(); - } - /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by * the {@link LogicalTypes.Decimal} Avro logical type @@ -2212,7 +2362,8 @@ public static HoodieData readSecondaryKeysFromBaseFiles(HoodieEngi List>>> partitionFiles, int secondaryIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, EngineType engineType, - HoodieIndexDefinition indexDefinition) { + HoodieIndexDefinition indexDefinition, + HoodieMetadataFileSystemView fsView) { if (partitionFiles.isEmpty()) { return engineContext.emptyHoodieData(); } @@ -2228,9 +2379,17 @@ public static HoodieData readSecondaryKeysFromBaseFiles(HoodieEngi engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions"); return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionWithBaseAndLogFiles -> { final String partition = partitionWithBaseAndLogFiles.getKey(); + // get the log files for the partition and group them by fileId + Map> logFilesByFileId = getPartitionLatestFileSlicesIncludingInflight(metaClient, Option.of(fsView), partition).stream() + .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().collect(toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); final Pair> baseAndLogFiles = partitionWithBaseAndLogFiles.getValue(); - List logFilePaths = new ArrayList<>(); - baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile)); + Set logFilePaths = new HashSet<>(); + baseAndLogFiles.getValue().forEach(logFile -> { + // add all log files for the fileId + logFilesByFileId.get(FSUtils.getFileIdFromFileName(logFile)).stream() + .map(HoodieLogFile::getPath).map(StoragePath::toString).forEach(logFilePaths::add); + logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile); + }); String baseFilePath = baseAndLogFiles.getKey(); Option dataFilePath = baseFilePath.isEmpty() ? 
Option.empty() : Option.of(FSUtils.constructAbsolutePath(basePath, baseFilePath)); Schema readerSchema; @@ -2241,7 +2400,7 @@ public static HoodieData readSecondaryKeysFromBaseFiles(HoodieEngi } else { readerSchema = tableSchema; } - return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition, + return createSecondaryIndexGenerator(metaClient, engineType, new ArrayList<>(logFilePaths), readerSchema, partition, dataFilePath, indexDefinition, metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse("")); }); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java index a39465577c0ae..5aadd3634d4d4 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java @@ -64,6 +64,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_AVRO_TYPES; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.computeRevivedAndDeletedKeys; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForPartitionStats; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex; @@ -589,4 +590,44 @@ public void testValidateDataTypeForPartitionStats() { .endRecord(); assertFalse(validateDataTypeForPartitionStats("dateField", schema)); } + + @Test + public void testComputeRevivedAndDeletedKeys() { + // Test Input Sets + Set validKeysForPreviousLogs = new HashSet<>(Arrays.asList("K1", "K2", "K3")); + Set deletedKeysForPreviousLogs = new HashSet<>(Arrays.asList("K4", "K5")); + Set validKeysForAllLogs = new HashSet<>(Arrays.asList("K2", "K3", "K6")); + Set deletedKeysForAllLogs = new HashSet<>(Arrays.asList("K1", "K5", "K7")); + + // Expected Results + Set expectedRevivedKeys = new HashSet<>(Collections.singletonList("K4")); // Revived: Deleted in previous but now valid + Set expectedDeletedKeys = new HashSet<>(Collections.singletonList("K1")); // Deleted: Valid in previous but now deleted + + // Compute Revived and Deleted Keys + Pair, Set> result = computeRevivedAndDeletedKeys(validKeysForPreviousLogs, deletedKeysForPreviousLogs, validKeysForAllLogs, deletedKeysForAllLogs); + assertEquals(expectedRevivedKeys, result.getKey()); + assertEquals(expectedDeletedKeys, result.getValue()); + + // Case 1: All keys remain valid + Set allValidKeys = new HashSet<>(Arrays.asList("K1", "K2", "K3")); + Set allEmpty = Collections.emptySet(); + result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, allValidKeys, allEmpty); + assertEquals(Collections.emptySet(), result.getKey()); + assertEquals(Collections.emptySet(), result.getValue()); + + // Case 2: All keys are deleted + result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, allEmpty, allValidKeys); + assertEquals(Collections.emptySet(), result.getKey()); + assertEquals(allValidKeys, result.getValue()); + + // Case 3: New keys added in the current log file - K9 + result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, new HashSet<>(Arrays.asList("K1", "K2", "K3", "K8")), new HashSet<>(Arrays.asList("K4", "K9"))); + assertEquals(Collections.emptySet(), 
result.getKey()); + assertEquals(new HashSet<>(Arrays.asList("K4", "K9")), result.getValue()); + + // Case 4: Empty input sets + result = computeRevivedAndDeletedKeys(Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet()); + assertEquals(Collections.emptySet(), result.getKey()); + assertEquals(Collections.emptySet(), result.getValue()); + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala index aa7f45dac260f..a7210fb20c468 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkIndexClient -import org.apache.hudi.common.config.RecordMergeMode -import org.apache.hudi.common.model.{HoodieIndexDefinition, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.model.HoodieIndexDefinition import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.{StringUtils, ValidationUtils} import org.apache.hudi.exception.HoodieIndexException @@ -74,12 +73,6 @@ case class CreateIndexCommand(table: CatalogTable, } else { HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX } - if (derivedIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) { - if ((metaClient.getTableConfig.getPayloadClass != null && !(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName))) - || (metaClient.getTableConfig.getRecordMergeMode ne RecordMergeMode.COMMIT_TIME_ORDERING)) { - throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST") - } - } new HoodieSparkIndexClient(sparkSession).create(metaClient, indexName, derivedIndexType, columnsMap, options.asJava, table.properties.asJava) } else { throw new HoodieIndexException(String.format("%s is not supported", indexType)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java index 03560c8597dc4..1e0a7915f0979 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -41,7 +42,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.BaseFileRecordParsingUtils; import org.apache.hudi.metadata.HoodieMetadataPayload; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.storage.StoragePath; import 
org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.testutils.HoodieClientTestBase; @@ -61,6 +62,11 @@ import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeys; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRevivedAndDeletedKeysFromMergedLogs; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -102,23 +108,19 @@ public void testRecordGenerationAPIsForCOW() throws IOException { writeStatuses1.forEach(writeStatus -> { assertEquals(writeStatus.getStat().getNumDeletes(), 0); // Fetch record keys for all - try { - String writeStatFileId = writeStatus.getFileId(); - if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) { - fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); - } + String writeStatFileId = writeStatus.getFileId(); + if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) { + fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); + } - // poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes. - Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), - writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); - while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); - String key = rliRecord.getRecordKey(); - String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath(); - recordKeyToPartitionMapping1.put(key, partition); - } - } catch (IOException e) { - throw new HoodieException("Should not have failed ", e); + // poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes. 
+      Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
+          writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage());
+      while (rliRecordsItr.hasNext()) {
+        HoodieRecord rliRecord = rliRecordsItr.next();
+        String key = rliRecord.getRecordKey();
+        String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
+        recordKeyToPartitionMapping1.put(key, partition);
       }
     });
@@ -210,23 +212,19 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
     writeStatuses1.forEach(writeStatus -> {
       assertEquals(writeStatus.getStat().getNumDeletes(), 0);
       // Fetch record keys for all
-      try {
-        String writeStatFileId = writeStatus.getFileId();
-        if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) {
-          fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1));
-        }
+      String writeStatFileId = writeStatus.getFileId();
+      if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) {
+        fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1));
+      }
-        // poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
-        Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
-            writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage());
-        while (rliRecordsItr.hasNext()) {
-          HoodieRecord rliRecord = rliRecordsItr.next();
-          String key = rliRecord.getRecordKey();
-          String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
-          recordKeyToPartitionMapping1.put(key, partition);
-        }
-      } catch (IOException e) {
-        throw new HoodieException("Should not have failed ", e);
+      // poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
+      Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
+          writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage());
+      while (rliRecordsItr.hasNext()) {
+        HoodieRecord rliRecord = rliRecordsItr.next();
+        String key = rliRecord.getRecordKey();
+        String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
+        recordKeyToPartitionMapping1.put(key, partition);
       }
     });
@@ -273,7 +271,7 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
         Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");
     try {
-      HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
+      getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
       fail("Should not have reached here");
     } catch (Exception e) {
       // no op
@@ -285,8 +283,8 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
     HoodieWriteMetadata compactionWriteMetadata = client.compact(compactionInstantOpt.get());
     HoodieCommitMetadata compactionCommitMetadata = (HoodieCommitMetadata) compactionWriteMetadata.getCommitMetadata().get();
     // no RLI records should be generated for compaction operation.
-    assertTrue(HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords(context, compactionCommitMetadata, writeConfig.getMetadataConfig(),
-        metaClient, writeConfig.getWritesFileIdEncoding(), compactionInstantOpt.get()).isEmpty());
+    assertTrue(convertMetadataToRecordIndexRecords(context, compactionCommitMetadata, writeConfig.getMetadataConfig(),
+        metaClient, writeConfig.getWritesFileIdEncoding(), compactionInstantOpt.get(), EngineType.SPARK).isEmpty());
     }
   }
@@ -314,12 +312,13 @@ private void assertRLIandSIRecordGenerationAPIs(List inserts3, Lis
     writeStatuses3.stream().filter(writeStatus -> FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath())))
         .forEach(writeStatus -> {
           try {
+            StoragePath fullFilePath = new StoragePath(basePath, writeStatus.getStat().getPath());
             // used for RLI
-            finalActualDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
-                writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp, false, true));
+            finalActualDeletes.addAll(getRevivedAndDeletedKeysFromMergedLogs(metaClient, latestCommitTimestamp, EngineType.SPARK, Collections.singletonList(fullFilePath.toString()), writerSchemaOpt,
+                Collections.singletonList(fullFilePath.toString())).getValue());
             // used in SI flow
-            actualUpdatesAndDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
+            actualUpdatesAndDeletes.addAll(getRecordKeys(Collections.singletonList(fullFilePath.toString()), metaClient, writerSchemaOpt,
                 writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp, true, true));
           } catch (IOException e) {
             throw new HoodieIOException("Failed w/ IOException ", e);
@@ -334,7 +333,7 @@ private void assertRLIandSIRecordGenerationAPIs(List inserts3, Lis
     // validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated for entire CommitMetadata which is used in SI code path.
     List updatedOrDeletedKeys =
-        new ArrayList<>(HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3));
+        new ArrayList<>(getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3).collectAsList());
     List expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
     expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
     assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);
@@ -368,14 +367,14 @@ public void testReducedByKeysForRLIRecords() throws IOException {
     recordsToTest.addAll(adjustedInserts);
     recordsToTest.addAll(deleteRecords);
     // happy paths. no dups. in and out are same.
-    List actualRecords = HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
+    List actualRecords = reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
     assertHoodieRecordListEquality(actualRecords, recordsToTest);
     // few records has both inserts and deletes.
     recordsToTest = new ArrayList<>();
     recordsToTest.addAll(insertRecords);
     recordsToTest.addAll(deleteRecords);
-    actualRecords = HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
+    actualRecords = reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
     List expectedList = new ArrayList<>();
     expectedList.addAll(insertRecords);
     assertHoodieRecordListEquality(actualRecords, expectedList);
@@ -385,7 +384,7 @@ public void testReducedByKeysForRLIRecords() throws IOException {
     recordsToTest.addAll(adjustedInserts);
     recordsToTest.addAll(deleteRecords);
     recordsToTest.addAll(deleteRecords.subList(0, 10));
-    actualRecords = HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
+    actualRecords = reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
     expectedList = new ArrayList<>();
     expectedList.addAll(adjustedInserts);
     expectedList.addAll(deleteRecords);
@@ -396,7 +395,7 @@ public void testReducedByKeysForRLIRecords() throws IOException {
     recordsToTest.addAll(adjustedInserts);
     recordsToTest.addAll(adjustedInserts.subList(0, 5));
     try {
-      HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
+      reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
       fail("Should not have reached here");
     } catch (Exception e) {
       // expected. no-op
@@ -446,25 +445,21 @@ private void generateRliRecordsAndAssert(List writeStatuses, Map {
       if (!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) {
         // Fetch record keys for all
-        try {
-          String writeStatFileId = writeStatus.getFileId();
-          if (!fileIdToFileNameMapping.isEmpty()) {
-            assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId));
-          }
+        String writeStatFileId = writeStatus.getFileId();
+        if (!fileIdToFileNameMapping.isEmpty()) {
+          assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId));
+        }
-          Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(),
-              writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage());
-          while (rliRecordsItr.hasNext()) {
-            HoodieRecord rliRecord = rliRecordsItr.next();
-            String key = rliRecord.getRecordKey();
-            if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) {
-              actualDeletes.add(key);
-            } else {
-              actualInserts.add(key);
-            }
+        Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(),
+            writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage());
+        while (rliRecordsItr.hasNext()) {
+          HoodieRecord rliRecord = rliRecordsItr.next();
+          String key = rliRecord.getRecordKey();
+          if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) {
+            actualDeletes.add(key);
+          } else {
+            actualInserts.add(key);
           }
-        } catch (IOException e) {
-          throw new HoodieException("Should not have failed ", e);
         }
       }
     });
@@ -476,33 +471,29 @@ private void parseRecordKeysFromBaseFiles(List writeStatuses, Map {
       if (!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) {
         // Fetch record keys for all
-        try {
-          String writeStatFileId = writeStatus.getFileId();
-          if (!fileIdToFileNameMapping.isEmpty()) {
-            assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId));
-          }
+        String writeStatFileId = writeStatus.getFileId();
+        if (!fileIdToFileNameMapping.isEmpty()) {
+          assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId));
+        }
-          String partition = writeStatus.getStat().getPartitionPath();
-          String latestFileName = FSUtils.getFileNameFromPath(writeStatus.getStat().getPath());
+        String partition = writeStatus.getStat().getPartitionPath();
+        String latestFileName = FSUtils.getFileNameFromPath(writeStatus.getStat().getPath());
-          Set recordStatusSet = new HashSet<>();
-          recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.INSERT);
-          recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.UPDATE);
-          recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.DELETE);
+        Set recordStatusSet = new HashSet<>();
+        recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.INSERT);
+        recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.UPDATE);
+        recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.DELETE);
-          Map> recordKeyMappings = BaseFileRecordParsingUtils.getRecordKeyStatuses(metaClient.getBasePath().toString(), partition, latestFileName,
-              writeStatus.getStat().getPrevBaseFile(), storage, recordStatusSet);
-          if (recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.INSERT)) {
-            actualInserts.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.INSERT));
-          }
-          if (recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.UPDATE)) {
-            actualUpdates.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.UPDATE));
-          }
-          if (recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.DELETE)) {
-            actualDeletes.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.DELETE));
-          }
-        } catch (IOException e) {
-          throw new HoodieException("Should not have failed ", e);
+        Map> recordKeyMappings = BaseFileRecordParsingUtils.getRecordKeyStatuses(metaClient.getBasePath().toString(), partition, latestFileName,
+            writeStatus.getStat().getPrevBaseFile(), storage, recordStatusSet);
+        if (recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.INSERT)) {
+          actualInserts.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.INSERT));
+        }
+        if (recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.UPDATE)) {
+          actualUpdates.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.UPDATE));
+        }
+        if (recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.DELETE)) {
+          actualDeletes.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.DELETE));
        }
      }
    });
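The testReducedByKeysForRLIRecords cases earlier in this file pin down how RLI records produced for one commit are de-duplicated by record key: duplicate deletes collapse to one record, an insert wins when the same key carries both an insert and a delete, and two valid records for one key fails. The sketch below is a minimal, in-memory illustration of that rule, assuming deletes are represented by EmptyHoodieRecordPayload as in the tests; it is not the engine-distributed HoodieTableMetadataUtil.reduceByKeys implementation.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;

// Illustrative, single-JVM version of the dedup rule; not the distributed implementation.
public class ReduceByKeysSketch {

  static List<HoodieRecord> reduceByKeysLocally(List<HoodieRecord> records) {
    Map<String, HoodieRecord> byKey = new HashMap<>();
    for (HoodieRecord record : records) {
      byKey.merge(record.getRecordKey(), record, (existing, incoming) -> {
        boolean existingIsDelete = existing.getData() instanceof EmptyHoodieRecordPayload;
        boolean incomingIsDelete = incoming.getData() instanceof EmptyHoodieRecordPayload;
        if (existingIsDelete && incomingIsDelete) {
          return existing;                               // duplicate deletes collapse to one record
        }
        if (existingIsDelete != incomingIsDelete) {
          return existingIsDelete ? incoming : existing; // the insert wins when a key has both
        }
        // two valid (non-delete) records for one key mirrors the failing test case above
        throw new IllegalStateException("Duplicate valid records for key " + incoming.getRecordKey());
      });
    }
    return new ArrayList<>(byKey.values());
  }
}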
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 5693f1804e1ba..2e63457094db0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -243,6 +243,27 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     validateDataAndRecordIndices(hudiOpts, deleteDf)
   }
+  @Test
+  def testRLIWithEmptyPayload(): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
+      HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key -> "0")
+    val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+
+    val deleteDf = insertDf.limit(2)
+    deleteDf.cache()
+    deleteDf.write.format("hudi")
+      .options(hudiOpts)
+      .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val prevDf = mergedDfList.last
+    mergedDfList = mergedDfList :+ prevDf.except(deleteDf)
+    validateDataAndRecordIndices(hudiOpts, deleteDf)
+  }
+
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 5f633034e974c..adc28b606928e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -52,6 +52,7 @@ import org.scalatest.Assertions.{assertResult, assertThrows}
 import java.util.concurrent.Executors
 import scala.collection.JavaConverters
+import scala.collection.immutable.Seq
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -706,6 +707,156 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
     )
   }
+  /**
+   * Test case to write with updates and validate secondary index with EVENT_TIME_ORDERING merge mode.
+   */
+  @Test
+  def testSecondaryIndexWithEventTimeOrderingMerge(): Unit = {
+    val tableName = "test_secondary_index_with_event_time_ordering_merge"
+
+    // Create the Hudi table
+    spark.sql(
+      s"""
+         |CREATE TABLE $tableName (
+         |  ts BIGINT,
+         |  record_key_col STRING,
+         |  not_record_key_col STRING,
+         |  partition_key_col STRING
+         |) USING hudi
+         | OPTIONS (
+         |  primaryKey = 'record_key_col',
+         |  type = 'mor',
+         |  preCombineField = 'ts',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+         |  hoodie.enable.data.skipping = 'true'
+         | )
+         | PARTITIONED BY (partition_key_col)
+         | LOCATION '$basePath'
+       """.stripMargin)
+    // by setting small file limit to 0, each insert will create a new file
+    // need to generate more file for non-partitioned table to test data skipping
+    // as the partitioned table will have only one file per partition
+    spark.sql("set hoodie.parquet.small.file.limit=0")
+    // Insert some data
+    spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+    spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+    // create secondary index
+    spark.sql(s"create index idx_not_record_key_col on $tableName (not_record_key_col)")
+
+    // Update the secondary key column with higher ts value
+    spark.sql(s"update $tableName set not_record_key_col = 'xyz', ts = 3 where record_key_col = 'row1'")
+    // validate data and SI
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(3, "row1", "xyz", "p1")
+    )
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+
+    // update the secondary key column with higher ts value and to original secondary key value 'abc'
+    spark.sql(s"update $tableName set not_record_key_col = 'abc', ts = 4 where record_key_col = 'row1'")
+    // validate data and SI
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(4, "row1", "abc", "p1")
+    )
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+
+    // update the secondary key column with lower ts value, this should be ignored
+    spark.sql(s"update $tableName set not_record_key_col = 'xyz', ts = 0 where record_key_col = 'row1'")
+    // validate data and SI
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(4, "row1", "abc", "p1")
+    )
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+  }
+
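The three updates in testSecondaryIndexWithEventTimeOrderingMerge above rely on event-time ordering: an incoming row wins only when its precombine value (ts) is at least as high as the current one, which is why the final update with ts = 0 changes neither the row nor its secondary index entry. Below is a tiny illustrative sketch of that decision over (ts, secondaryKey) pairs; the tie-breaking choice and the method are assumptions for illustration, not the API of Hudi's record mergers.

import org.apache.hudi.common.util.collection.Pair;

// Illustrative only: event-time ordering over (ts, secondaryKey) pairs.
public class EventTimeOrderingSketch {

  static Pair<Long, String> pickWinner(Pair<Long, String> existing, Pair<Long, String> incoming) {
    return incoming.getKey() >= existing.getKey() ? incoming : existing;
  }

  public static void main(String[] args) {
    Pair<Long, String> current = Pair.of(1L, "abc");    // row1 starts with secondary key 'abc'
    current = pickWinner(current, Pair.of(3L, "xyz"));  // higher ts wins: index entry moves to 'xyz'
    current = pickWinner(current, Pair.of(4L, "abc"));  // higher ts wins: back to 'abc'
    current = pickWinner(current, Pair.of(0L, "xyz"));  // lower ts is ignored: stays 'abc'
    System.out.println(current.getKey() + " -> " + current.getValue());  // prints "4 -> abc"
  }
}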
+  /**
+   * Test case to write with updates and validate secondary index with CUSTOM merge mode using CDC payload.
+   */
+  @Test
+  def testSecondaryIndexWithCustomMergeMode(): Unit = {
+    val tableName = "test_secondary_index_with_custom_merge"
+
+    // Create the Hudi table
+    spark.sql(
+      s"""
+         |CREATE TABLE $tableName (
+         |  record_key_col BIGINT,
+         |  Op STRING,
+         |  replicadmstimestamp STRING,
+         |  not_record_key_col STRING
+         |) USING hudi
+         | OPTIONS (
+         |  primaryKey = 'record_key_col',
+         |  type = 'mor',
+         |  preCombineField = 'replicadmstimestamp',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+         |  hoodie.enable.data.skipping = 'true',
+         |  hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.AWSDmsAvroPayload',
+         |  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
+         |  hoodie.write.record.merge.mode = 'CUSTOM',
+         |  hoodie.table.cdc.enabled = 'true',
+         |  hoodie.table.cdc.supplemental.logging.mode = 'data_before_after'
+         | )
+         | LOCATION '$basePath'
+       """.stripMargin)
+    if (HoodieSparkUtils.gteqSpark3_4) {
+      spark.sql("set spark.sql.defaultColumn.enabled=false")
+    }
+    // by setting small file limit to 0, each insert will create a new file
+    // need to generate more file for non-partitioned table to test data skipping
+    // as the partitioned table will have only one file per partition
+    spark.sql("set hoodie.parquet.small.file.limit=0")
+    // Insert some data
+    spark.sql(
+      s"""|INSERT INTO $tableName(record_key_col, Op, replicadmstimestamp, not_record_key_col) VALUES
+          | (1, 'I', '2023-06-14 15:46:06.953746', 'A'),
+          | (2, 'I', '2023-06-14 15:46:07.953746', 'B'),
+          | (3, 'I', '2023-06-14 15:46:08.953746', 'C');
+          | """.stripMargin)
+    // create secondary index
+    spark.sql(s"create index idx_not_record_key_col on $tableName (not_record_key_col)")
+
+    // validate data and SI
+    checkAnswer(s"select record_key_col, Op, replicadmstimestamp, not_record_key_col from $tableName")(
+      Seq(1, "I", "2023-06-14 15:46:06.953746", "A"),
+      Seq(2, "I", "2023-06-14 15:46:07.953746", "B"),
+      Seq(3, "I", "2023-06-14 15:46:08.953746", "C")
+    )
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+      Seq(s"A${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}1", false),
+      Seq(s"B${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}2", false),
+      Seq(s"C${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}3", false)
+    )
+
+    // Update the delete Op value for record_key_col = 3
+    spark.sql(s"update $tableName set Op = 'D' where record_key_col = 3")
+
+    // validate data and SI
+    checkAnswer(s"select record_key_col, Op, replicadmstimestamp, not_record_key_col from $tableName")(
+      Seq(1, "I", "2023-06-14 15:46:06.953746", "A"),
+      Seq(2, "I", "2023-06-14 15:46:07.953746", "B")
+    )
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+      Seq(s"A${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}1", false),
+      Seq(s"B${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}2", false)
+    )
+  }
+
   @Test
   def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {
     var hudiOpts = commonOpts
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 43b03806261fe..fbc33f7299e0c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -231,62 +231,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
     }
   }
-  test("Test Secondary Index Creation Failure For Unsupported payloads") {
-    withTempDir {
-      tmp => {
-        val tableName = generateTableName
-        val basePath = s"${tmp.getCanonicalPath}/$tableName"
-
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  ts bigint,
-             |  id string,
-             |  rider string,
-             |  driver string,
-             |  fare int,
-             |  city string,
-             |  state string
-             |) using hudi
-             | options (
-             |  primaryKey ='id',
-             |  type = 'mor',
-             |  preCombineField = 'ts',
-             |  hoodie.metadata.enable = 'true',
-             |  hoodie.metadata.record.index.enable = 'true',
-             |  hoodie.metadata.index.secondary.enable = 'true',
-             |  hoodie.datasource.write.recordkey.field = 'id',
-             |  hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
-             | )
-             | partitioned by(state)
-             | location '$basePath'
-       """.stripMargin)
-        spark.sql(
-          s"""
-             | insert into $tableName
-             | values
-             | (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19, 'san_francisco', 'california'),
-             | (1695091554787, 'e96c4396-3fad-413a-a942-4cb36106d720', 'rider-B', 'driver-M', 27, 'austin', 'texas')
-             | """.stripMargin
-        )
-
-        // validate record_index created successfully
-        val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') where type=5")
-        assert(metadataDF.count() == 2)
-
-        val metaClient = HoodieTableMetaClient.builder()
-          .setBasePath(basePath)
-          .setConf(HoodieTestUtils.getDefaultStorageConf)
-          .build()
-        assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
-        // create secondary index throws error when trying to create on multiple fields at a time
-        checkException(sql = s"create index idx_city on $tableName (city)")(
-          "Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or Merge mode set to OVERWRITE_WITH_LATEST"
-        )
-      }
-    }
-  }
-
   test("Test Secondary Index With Updates Compaction Clustering Deletes") {
     withTempDir { tmp =>
       val tableName = generateTableName