diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java index 3242a886a3f07..85af6a2d3e5be 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java @@ -127,6 +127,15 @@ public class HoodieCleanConfig extends HoodieConfig { .withDocumentation("When " + KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, " + "the minimum number of file slices to retain in each file group, during cleaning."); + public static final ConfigProperty MAX_COMMITS_TO_CLEAN = ConfigProperty + .key("hoodie.cleaner.max.commits.clean") + .defaultValue("200") + .withDocumentation("Max no. of instants to clean in a single clean call. This config is useful when clean is disabled or " + + "blocked from running for a while and enabled later. During that time, it can accumulate many commits to clean " + + "and run into memory issues. This config puts a cap on the no. of commits that can be cleaned. This is mainly applicable " + + "for incremental clean policy. To handle corner cases while reading we skip 1 commit before earliestCommitToRetain, " + + "so +1 will be added to maxInstantsToClean value. Set to -1 to disable."); + public static final ConfigProperty CLEAN_TRIGGER_STRATEGY = ConfigProperty .key("hoodie.clean.trigger.strategy") .defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name()) @@ -379,6 +388,11 @@ public HoodieCleanConfig.Builder retainCommits(int commitsRetained) { return this; } + public HoodieCleanConfig.Builder maxCommitsToClean(int maxCommitsToClean) { + cleanConfig.setValue(MAX_COMMITS_TO_CLEAN, String.valueOf(maxCommitsToClean)); + return this; + } + public HoodieCleanConfig.Builder cleanerNumHoursRetained(int cleanerHoursRetained) { cleanConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d68e3e6a94a72..89fb0c9b2488d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1749,6 +1749,10 @@ public int getCleanerHoursRetained() { return getInt(HoodieCleanConfig.CLEANER_HOURS_RETAINED); } + public int getMaxCommitsToClean() { + return getInt(HoodieCleanConfig.MAX_COMMITS_TO_CLEAN); + } + public boolean isCleanOptimizationWithLocalEngineEnabled() { return getBoolean(HoodieCleanConfig.CLEAN_OPTIMIZE_USING_LOCAL_ENGINE_CONTEXT); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 772f0a236219a..8009be1b98ce4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -69,6 +69,7 @@ import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; /** @@ -597,10 +598,66 @@ public Option getEarliestCommitToRetain() { Instant.now(), config.getCleanerHoursRetained(), hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone()); + log.info("EarliestCommitToRetain is {} after CleanerUtils.getEarliestCommitToRetain", + earliestCommitToRetain.map(HoodieInstant::requestedTime).orElse("null")); + earliestCommitToRetain = getEarliestCommitToRetainConsideringSafety(earliestCommitToRetain); + log.info("EarliestCommitToRetain is {} after getEarliestCommitToRetainConsideringSafety", + earliestCommitToRetain.map(HoodieInstant::requestedTime).orElse("null")); } return earliestCommitToRetain; } + /** + * Caps the number of instants to clean in a single clean call and ensures ordering of + * earliest commit to retain across cleans (ECTR must be monotonically increasing). + */ + private Option getEarliestCommitToRetainConsideringSafety(Option earliestCommitToRetain) { + if (!earliestCommitToRetain.isPresent()) { + return earliestCommitToRetain; + } + try { + Option lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant(); + String previousEarliestCommitToRetain; + if (lastClean.isPresent()) { + HoodieCleanMetadata cleanMetadata = hoodieTable.getActiveTimeline().readCleanMetadata(lastClean.get()); + if (cleanMetadata == null || cleanMetadata.getEarliestCommitToRetain() == null) { + log.warn("Empty clean metadata found for {}", lastClean.get()); + previousEarliestCommitToRetain = HoodieTimeline.INIT_INSTANT_TS; + } else { + previousEarliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain(); + if (!StringUtils.isNullOrEmpty(previousEarliestCommitToRetain)) { + if (compareTimestamps(earliestCommitToRetain.get().requestedTime(), + LESSER_THAN_OR_EQUALS, + previousEarliestCommitToRetain)) { + log.info("earliestCommitToRetain {} is less than or equal to previousEarliestCommitToRetain {}.", + earliestCommitToRetain.get().requestedTime(), previousEarliestCommitToRetain); + return Option.empty(); + } + } + } + } else { + previousEarliestCommitToRetain = HoodieTimeline.INIT_INSTANT_TS; + } + if (config.getMaxCommitsToClean() == -1) { + return earliestCommitToRetain; + } + int maxInstantsToClean = config.getMaxCommitsToClean() + 1; + + Option maxInstant = getCommitTimeline() + .findInstantsAfter(previousEarliestCommitToRetain, maxInstantsToClean + 1) + .lastInstant(); + if (!maxInstant.isPresent()) { + return Option.empty(); + } + return compareTimestamps( + earliestCommitToRetain.get().requestedTime(), + LESSER_THAN, + maxInstant.get().requestedTime()) ? earliestCommitToRetain : maxInstant; + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + /** * Returns the last completed commit timestamp before clean. */