Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length);
List<MetadataPartitionType> metadataPartitionsToInit = new ArrayList<>(MetadataPartitionType.values().length);

try {
boolean exists = metadataTableExists(dataMetaClient);
if (!exists) {
// FILES partition is always required
partitionsToInit.add(MetadataPartitionType.FILES);
metadataPartitionsToInit.add(MetadataPartitionType.FILES);
}

// check if any of the enabled partition types needs to be initialized
Expand All @@ -253,10 +253,10 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions);
this.enabledPartitionTypes.stream()
.filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
.forEach(partitionsToInit::add);
.forEach(metadataPartitionsToInit::add);
}

if (partitionsToInit.isEmpty()) {
if (metadataPartitionsToInit.isEmpty()) {
// No partitions left to initialize, since all the metadata enabled partitions are either initialized before
// or current in the process of initialization.
initMetadataReader();
Expand All @@ -266,9 +266,7 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);

// Initialize partitions for the first time using data from the files on the file system
if (!initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) {
if (!initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp)) {
LOG.error("Failed to initialize MDT from filesystem");
return false;
}
Expand Down Expand Up @@ -351,6 +349,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
return false;
}

Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
// FILES partition is always required and is initialized first
boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
if (!filesPartitionAvailable) {
Expand All @@ -374,11 +373,11 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
// Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
List<DirectoryInfo> partitionInfoList;
if (filesPartitionAvailable) {
partitionInfoList = listAllPartitionsFromMDT(initializationTime);
partitionInfoList = listAllPartitionsFromMDT(initializationTime, pendingDataInstants);
} else {
// if auto initialization is enabled, then we need to list all partitions from the file system
if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
partitionInfoList = listAllPartitionsFromFilesystem(initializationTime);
partitionInfoList = listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants);
} else {
// if auto initialization is disabled, we can return an empty list
partitionInfoList = Collections.emptyList();
Expand Down Expand Up @@ -424,8 +423,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
+ " bootstrap failed for " + metadataMetaClient.getBasePath(), e);
}

LOG.info(String.format("Initializing %s index with %d mappings and %d file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(),
fileGroupCountAndRecordsPair.getValue().count()));
LOG.info("Initializing {} index with {} mappings", partitionType.name(), fileGroupCountAndRecordsPair.getKey());
HoodieTimer partitionInitTimer = HoodieTimer.start();

// Generate the file groups
Expand Down Expand Up @@ -473,6 +471,7 @@ private String generateUniqueCommitInstantTime(String initializationTime) {
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());

Expand Down Expand Up @@ -553,6 +552,16 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<Di
return Pair.of(fileGroupCount, allPartitionsRecord.union(fileListRecords));
}

private Set<String> getPendingDataInstants(HoodieTableMetaClient dataMetaClient) {
  // Collect the timestamps of all incomplete instants on the data timeline so files written by
  // them can be excluded during initialization. Pending indexing actions are deliberately
  // ignored here: regular writers should not be blocked by an in-flight indexing operation.
  return dataMetaClient.getActiveTimeline().getInstantsAsStream()
      .filter(instant -> !instant.isCompleted() && !HoodieTimeline.INDEXING_ACTION.equals(instant.getAction()))
      .map(HoodieInstant::getTimestamp)
      .collect(Collectors.toSet());
}

private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
// We can only initialize if there are no pending operations on the dataset
List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
Expand Down Expand Up @@ -590,7 +599,7 @@ private HoodieTableMetaClient initializeMetaClient() throws IOException {
* @param initializationTime Files which have a timestamp after this are neglected
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime) {
private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime, Set<String> pendingDataInstants) {
List<SerializablePath> pathsToList = new LinkedList<>();
pathsToList.add(new SerializablePath(new CachingPath(dataWriteConfig.getBasePath())));

Expand All @@ -609,7 +618,7 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.get().getFileSystem(conf.get());
String relativeDirPath = FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get());
return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime);
return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime, pendingDataInstants);
}, numDirsToList);

pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
Expand Down Expand Up @@ -646,13 +655,14 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
* @param initializationTime Files which have a timestamp after this are neglected
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime) throws IOException {
private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
List<DirectoryInfo> dirinfoList = new LinkedList<>();
List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
List<String> allAbsolutePartitionPaths = metadata.getAllPartitionPaths().stream()
.map(partitionPath -> dataWriteConfig.getBasePath() + "/" + partitionPath).collect(Collectors.toList());
Map<String, FileStatus[]> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths);
Map<String, FileStatus[]> partitionFileMap = metadata.getAllFilesInPartitions(allAbsolutePartitionPaths);
for (Map.Entry<String, FileStatus[]> entry : partitionFileMap.entrySet()) {
dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime));
String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(dataWriteConfig.getBasePath()), new Path(entry.getKey()));
dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(), initializationTime, pendingDataInstants, false));
}
return dirinfoList;
}
Expand Down Expand Up @@ -783,7 +793,8 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getColumnsEnabledForBloomFilterIndex());
dataWriteConfig.getColumnsEnabledForBloomFilterIndex(),
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
}

/**
Expand Down Expand Up @@ -936,7 +947,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {

// Restore requires the existing pipelines to be shutdown. So we can safely scan the dataset to find the current
// list of files in the filesystem.
List<DirectoryInfo> dirInfoList = listAllPartitionsFromFilesystem(instantTime);
List<DirectoryInfo> dirInfoList = listAllPartitionsFromFilesystem(instantTime, Collections.emptySet());
Map<String, DirectoryInfo> dirInfoMap = dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, Function.identity()));
dirInfoList.clear();

Expand Down Expand Up @@ -1494,29 +1505,39 @@ static class DirectoryInfo implements Serializable {
// Is this a hoodie partition
private boolean isHoodiePartition = false;

public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime) {
/**
 * Builds a {@code DirectoryInfo} from a file-system listing. Delegates to the five-argument
 * constructor with {@code validateHoodiePartitions = true}, i.e. the directory is treated as a
 * Hudi partition only if a partition metafile is present among {@code fileStatuses}.
 */
public DirectoryInfo(String relativePath, FileStatus[] fileStatuses, String maxInstantTime, Set<String> pendingDataInstants) {
this(relativePath, fileStatuses, maxInstantTime, pendingDataInstants, true);
}

/**
 * Builds a {@code DirectoryInfo} from a listing of a single directory.
 * When files are directly fetched from the metadata table we do not need to validate Hoodie partitions.
 *
 * @param relativePath path of this directory relative to the dataset base path
 * @param fileStatuses statuses of all entries directly under this directory
 * @param maxInstantTime data files created after this instant time are ignored
 * @param pendingDataInstants instant times of incomplete commits; files written by them are ignored
 * @param validateHoodiePartitions when false (listing fetched from MDT), skip the partition-metafile
 *                                 check and treat the directory as a Hudi partition
 */
public DirectoryInfo(String relativePath, FileStatus[] fileStatuses, String maxInstantTime, Set<String> pendingDataInstants,
                     boolean validateHoodiePartitions) {
  this.relativePath = relativePath;

  // Pre-allocate with the maximum length possible
  filenameToSizeMap = new HashMap<>(fileStatuses.length);

  // Presence of the partition meta file implies this is a HUDI partition. If the input files were
  // fetched directly from the MDT they may not contain the HoodiePartitionMetadata file, so the
  // validation is skipped in that case (validateHoodiePartitions == false).
  isHoodiePartition = !validateHoodiePartitions
      || Arrays.stream(fileStatuses).anyMatch(status -> status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
  for (FileStatus status : fileStatuses) {
    // Do not attempt to search for more subdirectories inside directories that are partitions
    if (!isHoodiePartition && status.isDirectory()) {
      // Ignore .hoodie directory as there cannot be any partitions inside it
      if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
        this.subDirectories.add(status.getPath());
      }
    } else if (isHoodiePartition && FSUtils.isDataFile(status.getPath())) {
      // Regular HUDI data file (base file or log file)
      String dataFileCommitTime = FSUtils.getCommitTime(status.getPath().getName());
      // Limit the file listings to files which were created by successful commits before the maxInstant time.
      if (!pendingDataInstants.contains(dataFileCommitTime)
          && HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) {
        filenameToSizeMap.put(status.getPath().getName(), status.getLen());
      }
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.testutils;

import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;

/**
 * Util methods used in tests to fetch col stats records for a log file.
 */
public class LogFileColStatsTestUtil {

  private LogFileColStatsTestUtil() {
    // utility class; no instances
  }

  /**
   * Computes column-range (col stats) metadata for a single log file by scanning all of its records.
   *
   * @param filePath          path of the log file to scan
   * @param datasetMetaClient meta client of the dataset the log file belongs to
   * @param latestCommitTime  latest instant time to use when scanning the log file
   * @param columnsToIndex    names of the columns for which stats should be collected
   * @param writerSchemaOpt   writer schema used to deserialize log records; must be present
   * @param maxBufferSize     max buffer size for the log record scanner
   * @return a Row of the form [fileName, totalValueCount, (valueCount, min, max) per column],
   *         or empty when the log file has no records or no stats could be collected
   * @throws IOException     if the log file cannot be read
   * @throws HoodieException if the writer schema is absent
   */
  public static Option<Row> getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient datasetMetaClient, String latestCommitTime,
                                                          List<String> columnsToIndex, Option<Schema> writerSchemaOpt,
                                                          int maxBufferSize) throws IOException {
    // Guard clause up front instead of wrapping the whole body in an if/else.
    if (!writerSchemaOpt.isPresent()) {
      throw new HoodieException("Writer schema needs to be set");
    }
    List<Schema.Field> fieldsToIndex = writerSchemaOpt.get().getFields().stream()
        .filter(field -> columnsToIndex.contains(field.name()))
        .collect(Collectors.toList());
    List<HoodieRecord> records = new ArrayList<>();
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withFileSystem(datasetMetaClient.getFs())
        .withBasePath(datasetMetaClient.getBasePath())
        .withLogFilePaths(Collections.singletonList(filePath))
        .withBufferSize(maxBufferSize)
        .withLatestInstantTime(latestCommitTime)
        .withReaderSchema(writerSchemaOpt.get())
        .withLogRecordScannerCallback(records::add)
        .build();
    scanner.scan();
    if (records.isEmpty()) {
      return Option.empty();
    }
    // Materialize the avro payloads so that column ranges can be collected from them.
    List<IndexedRecord> indexedRecords = new ArrayList<>();
    for (HoodieRecord hoodieRecord : records) {
      Option<IndexedRecord> insertValue = ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(writerSchemaOpt.get());
      if (insertValue.isPresent()) {
        indexedRecords.add(insertValue.get());
      }
    }
    Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataMap =
        collectColumnRangeMetadata(indexedRecords, fieldsToIndex, filePath);
    List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMetadataMap.values());
    if (columnRangeMetadataList.isEmpty()) {
      // None of the requested columns produced stats; avoid indexing into an empty list below.
      return Option.empty();
    }
    return Option.of(getColStatsEntry(filePath, columnRangeMetadataList));
  }

  /**
   * Builds a Row of the form [fileName, totalValueCount, (valueCount, min, max) per column],
   * with columns sorted by name so test comparisons are deterministic.
   */
  private static Row getColStatsEntry(String logFilePath, List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList) {
    columnRangeMetadataList.sort(Comparator.comparing(HoodieColumnRangeMetadata::getColumnName));
    Object[] values = new Object[(columnRangeMetadataList.size() * 3) + 2];
    values[0] = logFilePath.substring(logFilePath.lastIndexOf("/") + 1);
    values[1] = columnRangeMetadataList.get(0).getValueCount();
    int counter = 2;
    for (HoodieColumnRangeMetadata<Comparable> columnRangeMetadata : columnRangeMetadataList) {
      values[counter++] = columnRangeMetadata.getValueCount();
      values[counter++] = columnRangeMetadata.getMinValue();
      values[counter++] = columnRangeMetadata.getMaxValue();
    }
    return new GenericRow(values);
  }

  /**
   * Resolves the avro table schema for the given table.
   *
   * @param metaClient meta client of the table
   * @return the table's avro schema wrapped in an Option
   */
  public static Option<Schema> getSchemaForTable(HoodieTableMetaClient metaClient) throws Exception {
    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
    return Option.of(schemaResolver.getTableAvroSchema());
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ public static String getFileId(String fullFileName) {
return fullFileName.split("_", 2)[0];
}

/**
* @param filePath
* @returns the filename from the given path. Path could be the absolute path or just partition path and file name.
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: Javadoc tag should be @return (not @returns) to match standard conventions.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

public static String getFileNameFromPath(String filePath) {
return filePath.substring(filePath.lastIndexOf("/") + 1);
}

/**
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Exception

@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the comment // no op is too vague — consider explaining why delete records should be ignored in this scan mode, e.g., // Ignore delete records; unmerged scan only reads inserts/updates.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config");
// no op
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Changing this from throwing to no-op changes the contract for all callers of HoodieUnMergedLogRecordScanner, not just the column stats path. Previously, this exception served as a safety net to detect unexpected delete records. Could you instead handle delete records specifically in the column stats scanner callback, or add a builder option like withIgnoreDeleteRecords(true) to make this opt-in?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

}

/**
Expand Down
Loading
Loading