Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ public class HoodieCleanConfig extends HoodieConfig {
+ "This can be useful for very large tables to avoid OOM issues during cleaning. "
+ "If both this config and " + CLEAN_PARTITION_FILTER_REGEX_KEY + " are set, the selected partitions take precedence.");

// Max wall-clock duration (ms) since the last clean after which an empty clean commit is
// created even when there is nothing to delete. Default -1 disables the feature (the
// planner only acts on it when the value is > 0).
public static final ConfigProperty<Long> MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty
    .key("hoodie.write.empty.clean.create.duration.ms")
    .defaultValue(-1L)
    .markAdvanced()
    .withDocumentation("In some cases an empty clean commit needs to be created to ensure the clean planner "
        + "does not look through the entire dataset if there are no clean plans. This is possible for an append-only "
        + "dataset. Also, for these datasets we cannot ignore clean completely since in the future there could "
        + "be upsert or replace operations. By creating an empty clean commit, the earliest_commit_to_retain value "
        + "will be updated so that the clean planner only needs to check partitions that are modified after the "
        + "last empty clean's earliest_commit_to_retain value, thereby optimizing the clean planning.");

/** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
@Deprecated
public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
Expand Down Expand Up @@ -414,6 +425,11 @@ public HoodieCleanConfig.Builder withPreWriteCleanerPolicy(HoodiePreWriteCleaner
return this;
}

/**
 * Sets the maximum duration, in milliseconds, since the last clean after which an
 * empty clean commit may be created when there is nothing to clean.
 *
 * @param duration max duration in milliseconds; a non-positive value disables the behavior
 * @return this builder, for call chaining
 */
public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long duration) {
  final String durationMs = Long.toString(duration);
  cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, durationMs);
  return this;
}

public HoodieCleanConfig build() {
cleanConfig.setDefaults(HoodieCleanConfig.class.getName());
HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,10 @@ public boolean isAutoClean() {
return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
}

/**
 * Returns the configured max duration (ms) used to decide whether an empty clean
 * commit should be created; see {@code HoodieCleanConfig#MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS}.
 */
public long maxDurationToCreateEmptyCleanMs() {
  final long maxDurationMs = getLong(HoodieCleanConfig.MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS);
  return maxDurationMs;
}

public boolean shouldArchiveBeyondSavepoint() {
return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

@Slf4j
Expand Down Expand Up @@ -134,9 +136,9 @@ private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
int cleanerParallelism = Math.max(1, Math.min(
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
config.getCleanerParallelism());
config.getCleanerParallelism()));
log.info("Using cleanerParallelism: {}", cleanerParallelism);

context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());
Expand All @@ -155,7 +157,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean

List<String> partitionsToBeDeleted = table.getMetaClient().getTableConfig().isTablePartitioned() && cleanerPlan.getPartitionsToBeDeleted() != null
? cleanerPlan.getPartitionsToBeDeleted()
: new ArrayList<>();
: Collections.emptyList();
partitionsToBeDeleted.forEach(entry -> {
if (!isNullOrEmpty(entry)) {
deleteFileAndGetResult(table.getStorage(), table.getMetaClient().getBasePath() + "/" + entry);
Expand Down Expand Up @@ -213,17 +215,18 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
}

List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata;
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
metadata = createEmptyCleanMetadata(cleanerPlan, inflightInstant, timer.endTimer());
} else {
metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.requestedTime(),
Option.of(timer.endTimer()),
cleanStats,
cleanerPlan.getExtraMetadata()
);
}

table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.requestedTime(),
Option.of(timer.endTimer()),
cleanStats,
cleanerPlan.getExtraMetadata()
);
this.txnManager.beginStateChange(Option.of(inflightInstant), Option.empty());
writeTableMetadata(metadata, inflightInstant.requestedTime());
table.getActiveTimeline().transitionCleanInflightToComplete(
Expand All @@ -238,6 +241,20 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
}
}

/**
 * Builds a {@code HoodieCleanMetadata} payload for a clean that deleted nothing
 * (zero files deleted, empty partition metadata), carrying forward the plan's
 * earliest-commit-to-retain and last-completed-commit timestamps so future
 * incremental clean planning can start from this instant.
 *
 * @param cleanerPlan     the (empty) cleaner plan whose timestamps and extra metadata are copied over
 * @param inflightInstant the inflight clean instant; its requested time becomes the start-clean time
 * @param timeTakenMillis elapsed time of the clean attempt, in milliseconds
 * @return a populated, empty clean metadata record
 */
// NOTE(review): assumes cleanerPlan.getEarliestInstantToRetain() is non-null — callers gate
// on that before invoking this path; confirm if this helper is ever reached otherwise.
private static HoodieCleanMetadata createEmptyCleanMetadata(HoodieCleanerPlan cleanerPlan, HoodieInstant inflightInstant, long timeTakenMillis) {
return HoodieCleanMetadata.newBuilder()
.setStartCleanTime(inflightInstant.requestedTime())
.setTimeTakenInMillis(timeTakenMillis)
.setTotalFilesDeleted(0)
.setLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
.setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp())
.setVersion(CLEAN_METADATA_VERSION_2)
.setPartitionMetadata(Collections.emptyMap())
.setExtraMetadata(cleanerPlan.getExtraMetadata())
.setBootstrapPartitionMetadata(Collections.emptyMap())
.build();
}

@Override
public HoodieCleanMetadata execute() {
List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
Expand All @@ -42,6 +43,8 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.text.ParseException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -94,6 +97,22 @@ private boolean needsCleaning(CleaningTriggerStrategy strategy) {
}
}

/**
 * Builds a cleaner plan with no files to delete, used when partition scanning found
 * nothing to clean but an empty clean commit may still be desired (to advance
 * earliest-commit-to-retain for incremental clean planning).
 *
 * @param earliestInstant earliest instant to retain, if one exists on the timeline
 * @param planner         planner supplying savepointed timestamps and the last completed commit timestamp
 * @return an empty cleaner plan; fully populated when {@code earliestInstant} is present,
 *         otherwise a minimal plan carrying only the default policy
 * @throws IOException if preparing the extra metadata fails
 */
private HoodieCleanerPlan getEmptyCleanerPlan(Option<HoodieInstant> earliestInstant, CleanPlanner<T, I, K, O> planner) throws IOException {
HoodieCleanerPlan.Builder cleanBuilder = HoodieCleanerPlan.newBuilder()
.setFilePathsToBeDeletedPerPartition(Collections.emptyMap())
.setExtraMetadata(prepareExtraMetadata(planner.getSavepointedTimestamps()));
if (earliestInstant.isPresent()) {
HoodieInstant hoodieInstant = earliestInstant.get();
// Record which instant must be retained so downstream logic can decide whether an
// empty clean commit is worth creating (it requires earliestInstantToRetain).
cleanBuilder.setPolicy(config.getCleanerPolicy().name())
.setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION)
.setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name()))
.setLastCompletedCommitTimestamp(planner.getLastCompletedCommitTimestamp());
} else {
// NOTE(review): this fallback sets only the policy — no version, earliestInstantToRetain,
// or lastCompletedCommitTimestamp. Confirm all consumers of the plan tolerate those
// fields being absent/defaulted in this branch.
cleanBuilder.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name());
}
return cleanBuilder.build();
}

/**
* Generates List of files to be cleaned.
*
Expand All @@ -109,8 +128,10 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.clearJobStatus();

if (partitionsToClean.isEmpty()) {
log.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
//log.info("Nothing to clean here. It is already clean");
//return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
log.info("Partitions to clean returned empty. Checking to see if empty clean needs to be created.");
return getEmptyCleanerPlan(earliestInstant, planner);
}
log.info(
"Earliest commit to retain for clean : {}",
Expand Down Expand Up @@ -214,15 +235,53 @@ protected Option<HoodieCleanerPlan> requestClean() {
}
final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
Option<HoodieCleanerPlan> option = Option.empty();
if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
|| (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
// Only create cleaner plan which does some work
option = Option.of(cleanerPlan);
}
// If cleaner plan returned an empty list, incremental clean is enabled and there was no
// completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
// create a dummy clean to avoid full scan in the future.
// Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
// with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset.
// To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
if (config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) {
// Only create an empty clean commit if earliestInstantToRetain is present in the plan
boolean eligibleForEmptyCleanCommit = true;

// if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit
Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().lastInstant();
if (lastCleanInstant.isPresent()) {
try {
ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs();
} catch (ParseException e) {
log.error("Unable to parse last clean commit time", e);
throw new HoodieException("Unable to parse last clean commit time", e);
}
}
if (eligibleForEmptyCleanCommit) {
log.warn("Creating an empty clean instant with earliestCommitToRetain of {}", cleanerPlan.getEarliestInstantToRetain().getTimestamp());
return Option.of(cleanerPlan);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ensuring that the ECTR of the new empty clean is always >= than ECTR of latest clean?
For context, in our org's internal impl we initially had a bug where

  1. T1.clean is completed
  2. cleaner commits retained is increased
  3. empty clean T2.clean is completed, but with an ECTR that is before T1's ECTR

so wanted to make sure this wasn't an issue here as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that's related to the empty clean feature.
We can take it up as a separate follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only saw this issue arise after introducing empty clean, since without this empty clean logic (at least on 0.14):

  • incremental clean won't find any instants to scan, since there will be 0 instants between the interval [Prev ECTR, Proposed ECTR) since Proposed ECTR < Prev ECTR
  • The partition-level method for getting data files to clean (given proposed ECTR & current timeline) will anyway return 0 files to clean
    So there will end up being 0 files to clean -> no clean plan created

}
}
return option;
}

/**
 * Create the clean.requested instant.
 *
 * @param startCleanTime - instant time of the clean.requested
 * @param cleanerPlan - content to be written into the clean.requested.
 */
// NOTE(review): this method body is empty — it does not create anything, contradicting the
// Javadoc above, and it appears to be unused dead code in this change. Either implement it
// (write cleanerPlan to the clean.requested instant) or remove it before merging.
private void createCleanRequested(String startCleanTime, HoodieCleanerPlan cleanerPlan) {

}

@Override
public Option<HoodieCleanerPlan> execute() {
if (!needsCleaning(config.getCleaningTriggerStrategy())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,14 @@ static Stream<Arguments> keepLatestByHoursOrCommitsArgsIncrCleanPartitions() {
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), Option.empty(),
activeInstantsPartitionsMap2, Collections.emptyList(), threePartitionsInActiveTimeline, true, Collections.emptyMap()));

// Empty cleaner plan case
arguments.add(Arguments.of(true, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(),
activeInstantsPartitionsMap2, Collections.emptyList(), twoPartitionsInActiveTimeline, false, Collections.emptyMap()));
arguments.add(Arguments.of(false, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(),
activeInstantsUnPartitionsMap, Collections.emptyList(), unPartitionsInActiveTimeline, false, Collections.emptyMap()));

return arguments.stream();
}

Expand Down Expand Up @@ -598,8 +606,8 @@ private static HoodieCleanMetadata getCleanCommitMetadata(List<String> partition
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false)));
Map<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(SAVEPOINTED_TIMESTAMPS, savepointsToTrack.stream().collect(Collectors.joining(",")));
return new HoodieCleanMetadata(instantTime, 100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata);
return new HoodieCleanMetadata(instantTime, 100L, partitionMetadata.isEmpty() ? 0 : 10, earliestCommitToRetain, lastCompletedTime,
partitionMetadata, CLEAN_METADATA_VERSION_2, Collections.emptyMap(), extraMetadata.isEmpty() ? null : extraMetadata);
}

private static HoodieSavepointMetadata getSavepointMetadata(List<String> partitions) {
Expand Down
Loading
Loading