Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public class HoodieCleanConfig extends HoodieConfig {
.withDocumentation("When " + KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ "the minimum number of file slices to retain in each file group, during cleaning.");

public static final ConfigProperty<String> MAX_COMMITS_TO_CLEAN = ConfigProperty
.key("hoodie.cleaner.max.commits.clean")
.defaultValue("200")
.withDocumentation("Max no. of instants to clean in a single clean call. This config is useful when clean is disabled or "
+ "blocked from running for a while and enabled later. During that time, it can accumulate many commits to clean "
+ "and run into memory issues. This config puts a cap on the no. of commits that can be cleaned. This is mainly applicable "
+ "for incremental clean policy. To handle corner cases while reading we skip 1 commit before earliestCommitToRetain, "
+ "so +1 will be added to maxInstantsToClean value. Set to -1 to disable.");

public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
.key("hoodie.clean.trigger.strategy")
.defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name())
Expand Down Expand Up @@ -379,6 +388,11 @@ public HoodieCleanConfig.Builder retainCommits(int commitsRetained) {
return this;
}

public HoodieCleanConfig.Builder maxCommitsToClean(int maxCommitsToClean) {
cleanConfig.setValue(MAX_COMMITS_TO_CLEAN, String.valueOf(maxCommitsToClean));
return this;
}

public HoodieCleanConfig.Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
cleanConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1749,6 +1749,10 @@ public int getCleanerHoursRetained() {
return getInt(HoodieCleanConfig.CLEANER_HOURS_RETAINED);
}

public int getMaxCommitsToClean() {
return getInt(HoodieCleanConfig.MAX_COMMITS_TO_CLEAN);
}

public boolean isCleanOptimizationWithLocalEngineEnabled() {
return getBoolean(HoodieCleanConfig.CLEAN_OPTIMIZE_USING_LOCAL_ENGINE_CONTEXT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;

/**
Expand Down Expand Up @@ -597,10 +598,66 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
Instant.now(),
config.getCleanerHoursRetained(),
hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
log.info("EarliestCommitToRetain is {} after CleanerUtils.getEarliestCommitToRetain",
earliestCommitToRetain.map(HoodieInstant::requestedTime).orElse("null"));
earliestCommitToRetain = getEarliestCommitToRetainConsideringSafety(earliestCommitToRetain);
log.info("EarliestCommitToRetain is {} after getEarliestCommitToRetainConsideringSafety",
earliestCommitToRetain.map(HoodieInstant::requestedTime).orElse("null"));
}
return earliestCommitToRetain;
}

/**
* Caps the number of instants to clean in a single clean call and ensures ordering of
* earliest commit to retain across cleans (ECTR must be monotonically increasing).
*/
private Option<HoodieInstant> getEarliestCommitToRetainConsideringSafety(Option<HoodieInstant> earliestCommitToRetain) {
if (!earliestCommitToRetain.isPresent()) {
return earliestCommitToRetain;
}
try {
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
String previousEarliestCommitToRetain;
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = hoodieTable.getActiveTimeline().readCleanMetadata(lastClean.get());
if (cleanMetadata == null || cleanMetadata.getEarliestCommitToRetain() == null) {
log.warn("Empty clean metadata found for {}", lastClean.get());
previousEarliestCommitToRetain = HoodieTimeline.INIT_INSTANT_TS;
} else {
previousEarliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain();
if (!StringUtils.isNullOrEmpty(previousEarliestCommitToRetain)) {
if (compareTimestamps(earliestCommitToRetain.get().requestedTime(),
LESSER_THAN_OR_EQUALS,
previousEarliestCommitToRetain)) {
log.info("earliestCommitToRetain {} is less than or equal to previousEarliestCommitToRetain {}.",
earliestCommitToRetain.get().requestedTime(), previousEarliestCommitToRetain);
return Option.empty();
}
}
}
} else {
previousEarliestCommitToRetain = HoodieTimeline.INIT_INSTANT_TS;
}
if (config.getMaxCommitsToClean() == -1) {
return earliestCommitToRetain;
}
int maxInstantsToClean = config.getMaxCommitsToClean() + 1;

Option<HoodieInstant> maxInstant = getCommitTimeline()
.findInstantsAfter(previousEarliestCommitToRetain, maxInstantsToClean + 1)
.lastInstant();
if (!maxInstant.isPresent()) {
return Option.empty();
}
return compareTimestamps(
earliestCommitToRetain.get().requestedTime(),
LESSER_THAN,
maxInstant.get().requestedTime()) ? earliestCommitToRetain : maxInstant;
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}

/**
* Returns the last completed commit timestamp before clean.
*/
Expand Down
Loading