[HUDI-9030] Compatibility fixes for table version 6 with 1.0 (#12888)
lokeshj1703 authored Mar 5, 2025
1 parent e0af47f commit 5ea9d4c
Showing 24 changed files with 364 additions and 80 deletions.
@@ -146,7 +146,7 @@ public HoodieData<WriteStatus> compact(

boolean useFileGroupReaderBasedCompaction = context.supportsFileGroupReader() // the engine needs to support fg reader first
&& !metaClient.isMetadataTable()
&& config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
&& HoodieReaderConfig.isFileGroupReaderEnabled(metaClient.getTableConfig().getTableVersion(), config)
&& operationType == WriteOperationType.COMPACT
&& !hasBootstrapFile(operations) // bootstrap file read for fg reader is not ready
&& config.populateMetaFields(); // Virtual key support by fg reader is not ready
@@ -47,12 +47,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -105,14 +102,6 @@ public Option<HoodieCompactionPlan> execute() {
return Option.empty();
}
}
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstantsAsStream()
.filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
}

HoodieCompactionPlan plan = scheduleCompaction();
@@ -21,23 +21,29 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +52,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;

/**
@@ -164,7 +174,9 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
// We do not know fileIds for inserts (first inserts are either log files or base files),
// delete all files for the corresponding failed commit, if present (same as COW)
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, filesToDelete.get()));

if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
hoodieRollbackRequests.addAll(getRollbackRequestToAppendForVersionSix(partitionPath, instantToRollback, commitMetadataOptional.get(), table));
}
break;
default:
throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
@@ -181,6 +193,62 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSix(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
checkArgument(table.version().lesserThan(HoodieTableVersion.EIGHT));
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

// wStat.getPrevCommit() might not give the right commit time in the following
// scenario: if a compaction was scheduled, the new commit time associated with the requested compaction will be
// used to write the new log files. In that case, the commit time for the log file is the compaction's requested time.
// But the (global) index might store the baseCommit of the base file rather than the requested time, so always
// obtain the baseCommit by listing the file slice.
// With multi-writers, rollbacks could be lazy, so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
Map<String, FileSlice> latestFileSlices = table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.requestedTime(), true)
.collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));

List<HoodieWriteStat> hoodieWriteStats = Option.ofNullable(commitMetadata.getPartitionToWriteStats().get(partitionPath)).orElse(Collections.emptyList());
hoodieWriteStats = hoodieWriteStats.stream()
.filter(writeStat -> {
// Filter out stats without a prevCommit, since those are all inserts
boolean validForRollback = (writeStat != null) && (writeStat.getPrevCommit() != null)
&& !writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT) && latestFileSlices.containsKey(writeStat.getFileId());

if (!validForRollback) {
return false;
}

FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());

// For sanity, a log file's base-instant time can never be greater than the commit being rolled back
checkArgument(
compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, rollbackInstant.requestedTime()),
"Log-file base-instant cannot be greater than the instant being rolled back");

// A command block "rolling back" the preceding block ({@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK})
// within the latest file slice is appended iff the base instant of the log file is _strictly_ less
// than the instant of the delta commit being rolled back. Otherwise, the log file will be cleaned up
// in a different branch of the flow.
return compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN, rollbackInstant.requestedTime());
})
.collect(Collectors.toList());

for (HoodieWriteStat writeStat : hoodieWriteStats.stream().filter(
hoodieWriteStat -> !StringUtils.isNullOrEmpty(hoodieWriteStat.getFileId())).collect(Collectors.toList())) {
FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
String fileId = writeStat.getFileId();
String latestBaseInstant = latestFileSlice.getBaseInstantTime();
Path fullLogFilePath = HadoopFSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), writeStat.getPath());
Map<String, Long> logFilesWithBlocksToRollback = Collections.singletonMap(
fullLogFilePath.toString(), writeStat.getTotalWriteBytes() > 0 ? writeStat.getTotalWriteBytes() : 1L);
hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
Collections.emptyList(), logFilesWithBlocksToRollback));
}
return hoodieRollbackRequests;
}

private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
String baseFileExtension,
String partitionPath,
@@ -90,7 +90,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
switch (type) {
case MERGE:
case CREATE:
return createRollbackRequestForCreateAndMerge(fileId, partitionPath, filePath, instantToRollback);
return createRollbackRequestForCreateAndMerge(fileId, partitionPath, filePath, instantToRollback, filePathStr);
case APPEND:
return createRollbackRequestForAppend(fileId, partitionPath, filePath, instantToRollback, filePathStr);
default:
@@ -106,10 +106,8 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fileId,
String partitionPath,
StoragePath filePath,
HoodieInstant instantToRollback) {
protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fileId, String partitionPath, StoragePath filePath,
HoodieInstant instantToRollback, String filePathToRollback) {
if (table.version().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
return new HoodieRollbackRequest(partitionPath, fileId, instantToRollback.requestedTime(),
Collections.singletonList(filePath.toString()), Collections.emptyMap());
Expand All @@ -120,7 +118,7 @@ protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fi
fileId = baseFileToDelete.getFileId();
baseInstantTime = baseFileToDelete.getCommitTime();
} else if (FSUtils.isLogFile(filePath)) {
throw new HoodieRollbackException("Log files should have only APPEND as IOTypes " + filePath);
return createRollbackRequestForAppend(fileId, partitionPath, filePath, instantToRollback, filePathToRollback);
}
Objects.requireNonNull(fileId, "Cannot find valid fileId from path: " + filePath);
Objects.requireNonNull(baseInstantTime, "Cannot find valid base instant from path: " + filePath);
@@ -371,7 +371,7 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
Schema tableSchemaWithMetaFields) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
.map(ClusteringOperation::create).collect(Collectors.toList());
boolean canUseFileGroupReaderBasedClustering = getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
boolean canUseFileGroupReaderBasedClustering = HoodieReaderConfig.isFileGroupReaderEnabled(getHoodieTable().getMetaClient().getTableConfig().getTableVersion(), getWriteConfig())
&& getWriteConfig().getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
&& clusteringOps.stream().allMatch(slice -> StringUtils.isNullOrEmpty(slice.getBootstrapFilePath()));

@@ -19,8 +19,14 @@

package org.apache.hudi.common.config;

import org.apache.hudi.common.table.HoodieTableVersion;

import org.apache.hadoop.conf.Configuration;

import javax.annotation.concurrent.Immutable;

import java.util.Map;

/**
* Configurations for reading a file group
*/
@@ -89,4 +95,18 @@ public class HoodieReaderConfig extends HoodieConfig {
"hoodie.write.record.merge.custom.implementation.classes";
public static final String RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY =
"hoodie.datasource.write.record.merger.impls";

public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, HoodieConfig config) {
return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED);
}

public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, Map<String, String> parameters) {
return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
&& Boolean.parseBoolean(parameters.getOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString()));
}

public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, Configuration conf) {
return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
&& conf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue());
}
}
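For reference, a minimal usage sketch of the new helper follows. The variable names (metaClient, writeConfig, optionsMap, hadoopConf) are illustrative placeholders rather than part of this change; the point is that the file group reader is only honored once the table is on version 8 or above, whichever way the flag is supplied.

// Illustrative sketch only: resolve the table version once, then gate the flag through the helper.
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();

// From a HoodieConfig (e.g. a HoodieWriteConfig):
boolean enabledViaConfig = HoodieReaderConfig.isFileGroupReaderEnabled(tableVersion, writeConfig);

// From a plain options map (e.g. datasource parameters):
boolean enabledViaOptions = HoodieReaderConfig.isFileGroupReaderEnabled(tableVersion, optionsMap);

// From a Hadoop Configuration:
boolean enabledViaHadoopConf = HoodieReaderConfig.isFileGroupReaderEnabled(tableVersion, hadoopConf);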
@@ -318,6 +318,11 @@ public class HoodieTableConfig extends HoodieConfig {
DATE_TIME_PARSER
);

private static final Set<String> CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES = new HashSet<>(Arrays.asList(
KEY_GENERATOR_CLASS_NAME.key(),
KEY_GENERATOR_TYPE.key()
));

public static final ConfigProperty<String> TABLE_CHECKSUM = ConfigProperty
.key("hoodie.table.checksum")
.noDefaultValue()
@@ -632,6 +637,7 @@ static boolean validateConfigVersion(ConfigProperty<?> configProperty, HoodieTab
// validate that the table version is greater than or equal to the config version
HoodieTableVersion firstVersion = HoodieTableVersion.fromReleaseVersion(configProperty.getSinceVersion().get());
boolean valid = tableVersion.greaterThan(firstVersion) || tableVersion.equals(firstVersion);
valid = valid || CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES.contains(configProperty.key());
if (!valid) {
LOG.warn("Table version {} is lower than or equal to config's first version {}. Config {} will be ignored.",
tableVersion, firstVersion, configProperty.key());
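To illustrate the exemption added above, here is a hypothetical same-package sketch (validateConfigVersion is package-private), assuming KEY_GENERATOR_CLASS_NAME declares a sinceVersion newer than the table-version-six release:

// Hypothetical check, illustrative values only.
boolean keyGenKept = HoodieTableConfig.validateConfigVersion(
HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, HoodieTableVersion.SIX);
// keyGenKept == true: the key is listed in CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES,
// so the "table version older than the config's first version" rule no longer discards it.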
@@ -149,6 +149,8 @@ public abstract class AbstractHoodieLogRecordScanner {
private final boolean enableOptimizedLogBlocksScan;
// table version for compatibility
private final HoodieTableVersion tableVersion;
// Allows inflight instants to be considered while merging log records
protected boolean allowInflightInstants = false;
// for pending log block check with table version before 8
private HoodieTimeline commitsTimeline = null;
private HoodieTimeline completedInstantsTimeline = null;
@@ -281,7 +283,7 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
if (logBlock.isDataOrDeleteBlock()) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && !allowInflightInstants) {
if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
|| getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -479,7 +481,7 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
continue;
}
if (logBlock.getBlockType() != COMMAND_BLOCK) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && !allowInflightInstants) {
if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
|| getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -103,7 +103,8 @@ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, L
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
Option<HoodieTableMetaClient> hoodieTableMetaClientOption,
boolean allowInflightInstants) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
hoodieTableMetaClientOption);
@@ -113,6 +114,7 @@ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, L
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, new DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, getClass().getSimpleName());
this.scannedPrefixes = new HashSet<>();
this.allowInflightInstants = allowInflightInstants;
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
@@ -342,6 +344,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
// By default, we're doing a full-scan
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
protected boolean allowInflightInstants = false;
private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
protected HoodieTableMetaClient hoodieTableMetaClient;

@@ -466,6 +469,11 @@ public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient)
return this;
}

public Builder withAllowInflightInstants(boolean allowInflightInstants) {
this.allowInflightInstants = allowInflightInstants;
return this;
}

@Override
public HoodieMergedLogRecordScanner build() {
if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
@@ -479,7 +487,7 @@ public HoodieMergedLogRecordScanner build() {
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
Option.ofNullable(hoodieTableMetaClient));
Option.ofNullable(hoodieTableMetaClient), allowInflightInstants);
}
}
}
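A minimal builder sketch of the new flag follows. Apart from withAllowInflightInstants and withTableMetaClient, which appear in this diff, the remaining builder calls and all variable values are assumed from typical scanner setup rather than taken from this change.

// Illustrative sketch only: build a merged log record scanner that also merges blocks
// written by inflight instants (relevant to pre-version-8 compatibility paths).
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(storage) // assumed setter for the HoodieStorage handle
.withBasePath(metaClient.getBasePath()) // assumed
.withLogFilePaths(logFilePaths)
.withReaderSchema(readerSchema)
.withLatestInstantTime(latestInstantTime)
.withMaxMemorySizeInBytes(1024 * 1024 * 1024L) // assumed spillable-map sizing
.withSpillableMapBasePath("/tmp/hudi-spill") // assumed local spill path
.withTableMetaClient(metaClient)
.withAllowInflightInstants(true) // new in this change; defaults to false
.build();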
