-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-8371][CHERRYPICK] Fix column stats index with MDT for a few scenarios #18314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c97664f
8d17a04
a519ca1
03a878b
a7d9668
5419e5d
099f023
74b57ec
d80a4b6
82913a2
b27240d
e4306a7
6abac32
07602e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -237,13 +237,13 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() { | |
| protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, | ||
| Option<String> inflightInstantTimestamp) throws IOException { | ||
| HoodieTimer timer = HoodieTimer.start(); | ||
| List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); | ||
| List<MetadataPartitionType> metadataPartitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); | ||
|
|
||
| try { | ||
| boolean exists = metadataTableExists(dataMetaClient); | ||
| if (!exists) { | ||
| // FILES partition is always required | ||
| partitionsToInit.add(MetadataPartitionType.FILES); | ||
| metadataPartitionsToInit.add(MetadataPartitionType.FILES); | ||
| } | ||
|
|
||
| // check if any of the enabled partition types needs to be initialized | ||
|
|
@@ -253,10 +253,10 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, | |
| LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions); | ||
| this.enabledPartitionTypes.stream() | ||
| .filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) | ||
| .forEach(partitionsToInit::add); | ||
| .forEach(metadataPartitionsToInit::add); | ||
| } | ||
|
|
||
| if (partitionsToInit.isEmpty()) { | ||
| if (metadataPartitionsToInit.isEmpty()) { | ||
| // No partitions left to initialize, since all the metadata enabled partitions are either initialized before | ||
| // or currently in the process of initialization. | ||
| initMetadataReader(); | ||
|
|
@@ -266,9 +266,7 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, | |
| // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit | ||
| // Otherwise, we use the timestamp of the latest completed action. | ||
| String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); | ||
|
|
||
| // Initialize partitions for the first time using data from the files on the file system | ||
| if (!initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) { | ||
| if (!initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp)) { | ||
| LOG.error("Failed to initialize MDT from filesystem"); | ||
| return false; | ||
| } | ||
|
|
@@ -351,6 +349,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat | |
| return false; | ||
| } | ||
|
|
||
| Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient); | ||
| // FILES partition is always required and is initialized first | ||
| boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES); | ||
| if (!filesPartitionAvailable) { | ||
|
|
@@ -374,11 +373,11 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat | |
| // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT | ||
| List<DirectoryInfo> partitionInfoList; | ||
| if (filesPartitionAvailable) { | ||
| partitionInfoList = listAllPartitionsFromMDT(initializationTime); | ||
| partitionInfoList = listAllPartitionsFromMDT(initializationTime, pendingDataInstants); | ||
| } else { | ||
| // if auto initialization is enabled, then we need to list all partitions from the file system | ||
| if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) { | ||
| partitionInfoList = listAllPartitionsFromFilesystem(initializationTime); | ||
| partitionInfoList = listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants); | ||
| } else { | ||
| // if auto initialization is disabled, we can return an empty list | ||
| partitionInfoList = Collections.emptyList(); | ||
|
|
@@ -424,8 +423,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat | |
| + " bootstrap failed for " + metadataMetaClient.getBasePath(), e); | ||
| } | ||
|
|
||
| LOG.info(String.format("Initializing %s index with %d mappings and %d file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(), | ||
| fileGroupCountAndRecordsPair.getValue().count())); | ||
| LOG.info("Initializing {} index with {} mappings", partitionType.name(), fileGroupCountAndRecordsPair.getKey()); | ||
| HoodieTimer partitionInitTimer = HoodieTimer.start(); | ||
|
|
||
| // Generate the file groups | ||
|
|
@@ -473,6 +471,7 @@ private String generateUniqueCommitInstantTime(String initializationTime) { | |
| } | ||
|
|
||
| private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) { | ||
| // during initialization, we need stats for base and log files. | ||
| HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( | ||
| engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); | ||
|
|
||
|
|
@@ -553,6 +552,16 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<Di | |
| return Pair.of(fileGroupCount, allPartitionsRecord.union(fileListRecords)); | ||
| } | ||
|
|
||
| private Set<String> getPendingDataInstants(HoodieTableMetaClient dataMetaClient) { | ||
| // Initialize excluding the pending operations on the dataset | ||
| return dataMetaClient.getActiveTimeline() | ||
| .getInstantsAsStream().filter(i -> !i.isCompleted()) | ||
| // regular writers should not be blocked due to pending indexing action | ||
| .filter(i -> !HoodieTimeline.INDEXING_ACTION.equals(i.getAction())) | ||
| .map(HoodieInstant::getTimestamp) | ||
| .collect(Collectors.toSet()); | ||
| } | ||
|
|
||
| private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) { | ||
| // We can only initialize if there are no pending operations on the dataset | ||
| List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline() | ||
|
|
@@ -590,7 +599,7 @@ private HoodieTableMetaClient initializeMetaClient() throws IOException { | |
| * @param initializationTime Files which have a timestamp after this are neglected | ||
| * @return List consisting of {@code DirectoryInfo} for each partition found. | ||
| */ | ||
| private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime) { | ||
| private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime, Set<String> pendingDataInstants) { | ||
| List<SerializablePath> pathsToList = new LinkedList<>(); | ||
| pathsToList.add(new SerializablePath(new CachingPath(dataWriteConfig.getBasePath()))); | ||
|
|
||
|
|
@@ -609,7 +618,7 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio | |
| List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> { | ||
| FileSystem fs = path.get().getFileSystem(conf.get()); | ||
| String relativeDirPath = FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get()); | ||
| return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime); | ||
| return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime, pendingDataInstants); | ||
| }, numDirsToList); | ||
|
|
||
| pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size())); | ||
|
|
@@ -646,13 +655,14 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio | |
| * @param initializationTime Files which have a timestamp after this are neglected | ||
| * @return List consisting of {@code DirectoryInfo} for each partition found. | ||
| */ | ||
| private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime) throws IOException { | ||
| private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException { | ||
| List<DirectoryInfo> dirinfoList = new LinkedList<>(); | ||
| List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream() | ||
| List<String> allAbsolutePartitionPaths = metadata.getAllPartitionPaths().stream() | ||
| .map(partitionPath -> dataWriteConfig.getBasePath() + "/" + partitionPath).collect(Collectors.toList()); | ||
| Map<String, FileStatus[]> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths); | ||
| Map<String, FileStatus[]> partitionFileMap = metadata.getAllFilesInPartitions(allAbsolutePartitionPaths); | ||
| for (Map.Entry<String, FileStatus[]> entry : partitionFileMap.entrySet()) { | ||
| dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime)); | ||
| String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(dataWriteConfig.getBasePath()), new Path(entry.getKey())); | ||
| dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(), initializationTime, pendingDataInstants, false)); | ||
| } | ||
| return dirinfoList; | ||
| } | ||
|
|
@@ -783,7 +793,8 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() { | |
| dataWriteConfig.isMetadataColumnStatsIndexEnabled(), | ||
| dataWriteConfig.getColumnStatsIndexParallelism(), | ||
| dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), | ||
| dataWriteConfig.getColumnsEnabledForBloomFilterIndex()); | ||
| dataWriteConfig.getColumnsEnabledForBloomFilterIndex(), | ||
| dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -936,7 +947,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { | |
|
|
||
| // Restore requires the existing pipelines to be shutdown. So we can safely scan the dataset to find the current | ||
| // list of files in the filesystem. | ||
| List<DirectoryInfo> dirInfoList = listAllPartitionsFromFilesystem(instantTime); | ||
| List<DirectoryInfo> dirInfoList = listAllPartitionsFromFilesystem(instantTime, Collections.emptySet()); | ||
| Map<String, DirectoryInfo> dirInfoMap = dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, Function.identity())); | ||
| dirInfoList.clear(); | ||
|
|
||
|
|
@@ -1494,29 +1505,39 @@ static class DirectoryInfo implements Serializable { | |
| // Is this a hoodie partition | ||
| private boolean isHoodiePartition = false; | ||
|
|
||
| public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime) { | ||
| public DirectoryInfo(String relativePath, FileStatus[] fileStatuses, String maxInstantTime, Set<String> pendingDataInstants) { | ||
| this(relativePath, fileStatuses, maxInstantTime, pendingDataInstants, true); | ||
| } | ||
|
|
||
| /** | ||
| * When files are directly fetched from Metadata table we do not need to validate HoodiePartitions. | ||
| */ | ||
| public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime, Set<String> pendingDataInstants, | ||
| boolean validateHoodiePartitions) { | ||
| this.relativePath = relativePath; | ||
|
|
||
| // Pre-allocate with the maximum length possible | ||
| filenameToSizeMap = new HashMap<>(fileStatus.length); | ||
|
|
||
| // Presence of partition meta file implies this is a HUDI partition | ||
| // if input files are directly fetched from MDT, it may not contain the HoodiePartitionMetadata file. So, we can ignore the validation for isHoodiePartition. | ||
| isHoodiePartition = !validateHoodiePartitions || Arrays.stream(fileStatus).anyMatch(status -> status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)); | ||
| for (FileStatus status : fileStatus) { | ||
| if (status.isDirectory()) { | ||
| // Do not attempt to search for more subdirectories inside directories that are partitions | ||
| if (!isHoodiePartition && status.isDirectory()) { | ||
| // Ignore .hoodie directory as there cannot be any partitions inside it | ||
| if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { | ||
| this.subDirectories.add(status.getPath()); | ||
| } | ||
| } else if (status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { | ||
| // Presence of partition meta file implies this is a HUDI partition | ||
| this.isHoodiePartition = true; | ||
| } else if (FSUtils.isDataFile(status.getPath())) { | ||
| } else if (isHoodiePartition && FSUtils.isDataFile(status.getPath())) { | ||
| // Regular HUDI data file (base file or log file) | ||
| String dataFileCommitTime = FSUtils.getCommitTime(status.getPath().getName()); | ||
| // Limit the file listings to files which were created before the maxInstant time. | ||
| if (HoodieTimeline.compareTimestamps(dataFileCommitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)) { | ||
| // Limit the file listings to files which were created by successful commits before the maxInstant time. | ||
| if (!pendingDataInstants.contains(dataFileCommitTime) && HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) { | ||
| filenameToSizeMap.put(status.getPath().getName(), status.getLen()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 When - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying. |
||
| } | ||
| } | ||
|
|
||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hudi.testutils; | ||
|
|
||
| import org.apache.hudi.common.model.HoodieColumnRangeMetadata; | ||
| import org.apache.hudi.common.model.HoodieRecord; | ||
| import org.apache.hudi.common.model.HoodieRecordPayload; | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient; | ||
| import org.apache.hudi.common.table.TableSchemaResolver; | ||
| import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.exception.HoodieException; | ||
|
|
||
| import org.apache.avro.Schema; | ||
| import org.apache.avro.generic.IndexedRecord; | ||
| import org.apache.spark.sql.Row; | ||
| import org.apache.spark.sql.catalyst.expressions.GenericRow; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata; | ||
|
|
||
| /** | ||
| * Util methods used in tests to fetch col stats records for a log file. | ||
| */ | ||
| public class LogFileColStatsTestUtil { | ||
|
|
||
| public static Option<Row> getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient datasetMetaClient, String latestCommitTime, | ||
| List<String> columnsToIndex, Option<Schema> writerSchemaOpt, | ||
| int maxBufferSize) throws IOException { | ||
| if (writerSchemaOpt.isPresent()) { | ||
| List<Schema.Field> fieldsToIndex = writerSchemaOpt.get().getFields().stream() | ||
| .filter(field -> columnsToIndex.contains(field.name())) | ||
| .collect(Collectors.toList()); | ||
| List<HoodieRecord> records = new ArrayList<>(); | ||
| HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() | ||
| .withFileSystem(datasetMetaClient.getFs()) | ||
| .withBasePath(datasetMetaClient.getBasePath()) | ||
| .withLogFilePaths(Collections.singletonList(filePath)) | ||
| .withBufferSize(maxBufferSize) | ||
| .withLatestInstantTime(latestCommitTime) | ||
| .withReaderSchema(writerSchemaOpt.get()) | ||
| .withLogRecordScannerCallback(records::add) | ||
| .build(); | ||
| scanner.scan(); | ||
| if (records.isEmpty()) { | ||
| return Option.empty(); | ||
| } | ||
| List<IndexedRecord> indexedRecords = new ArrayList<>(); | ||
| for (HoodieRecord hoodieRecord : records) { | ||
| Option<IndexedRecord> insertValue = ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(writerSchemaOpt.get()); | ||
| if (insertValue.isPresent()) { | ||
| indexedRecords.add(insertValue.get()); | ||
| } | ||
| } | ||
| Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataMap = | ||
| collectColumnRangeMetadata(indexedRecords, fieldsToIndex, filePath); | ||
| List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMetadataMap.values()); | ||
| return Option.of(getColStatsEntry(filePath, columnRangeMetadataList)); | ||
| } else { | ||
| throw new HoodieException("Writer schema needs to be set"); | ||
| } | ||
| } | ||
|
|
||
| private static Row getColStatsEntry(String logFilePath, List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList) { | ||
| Collections.sort(columnRangeMetadataList, (o1, o2) -> o1.getColumnName().compareTo(o2.getColumnName())); | ||
| Object[] values = new Object[(columnRangeMetadataList.size() * 3) + 2]; | ||
| values[0] = logFilePath.substring(logFilePath.lastIndexOf("/") + 1); | ||
| values[1] = columnRangeMetadataList.get(0).getValueCount(); | ||
| int counter = 2; | ||
| for (HoodieColumnRangeMetadata columnRangeMetadata: columnRangeMetadataList) { | ||
| values[counter++] = columnRangeMetadata.getValueCount(); | ||
| values[counter++] = columnRangeMetadata.getMinValue(); | ||
| values[counter++] = columnRangeMetadata.getMaxValue(); | ||
| } | ||
| return new GenericRow(values); | ||
| } | ||
|
|
||
| public static Option<Schema> getSchemaForTable(HoodieTableMetaClient metaClient) throws Exception { | ||
| TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); | ||
| return Option.of(schemaResolver.getTableAvroSchema()); | ||
| } | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -211,6 +211,14 @@ public static String getFileId(String fullFileName) { | |
| return fullFileName.split("_", 2)[0]; | ||
| } | ||
|
|
||
| /** | ||
| * @param filePath | ||
| * @return the filename from the given path. Path could be the absolute path or just partition path and file name. | ||
| */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🤖 nit: Javadoc tag should be `@return` rather than `@returns` - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying. |
||
| public static String getFileNameFromPath(String filePath) { | ||
| return filePath.substring(filePath.lastIndexOf("/") + 1); | ||
| } | ||
|
|
||
| /** | ||
| * Gets all partition paths assuming date partitioning (year, month, day) three levels down. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,7 +79,7 @@ public <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Exception | |
|
|
||
| @Override | ||
| protected void processNextDeletedRecord(DeleteRecord deleteRecord) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: the comment - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying. |
||
| throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config"); | ||
| // no op | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 Changing this from throwing to no-op changes the contract for all callers of - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying. |
||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 nit: the parameter name
fileStatus(singular) differs from the overload above which usesfileStatuses(plural) — could you make these consistent?- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.