diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java index 3242a886a3f07..0ef67c360adb2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java @@ -244,6 +244,17 @@ public class HoodieCleanConfig extends HoodieConfig { + "This can be useful for very large tables to avoid OOM issues during cleaning. " + "If both this config and " + CLEAN_PARTITION_FILTER_REGEX_KEY + " are set, the selected partitions take precedence."); + public static final ConfigProperty MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty + .key("hoodie.write.empty.clean.create.duration.ms") + .defaultValue(-1L) + .markAdvanced() + .withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner " + + "does not look through entire dataset if there are no clean plans. This is possible for append-only " + + "dataset. Also, for these datasets we cannot ignore clean completely since in the future there could " + + "be upsert or replace operations. 
By creating empty clean commit, earliest_commit_to_retain value " + + "will be updated so that now clean planner can only check for partitions that are modified after the " + + "last empty clean's earliest_commit_toRetain value there by optimizing the clean planning"); + /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */ @Deprecated public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key(); @@ -414,6 +425,11 @@ public HoodieCleanConfig.Builder withPreWriteCleanerPolicy(HoodiePreWriteCleaner return this; } + public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long duration) { + cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(duration)); + return this; + } + public HoodieCleanConfig build() { cleanConfig.setDefaults(HoodieCleanConfig.class.getName()); HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9d32ec8b7d22d..cb00bafb6df41 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1842,6 +1842,10 @@ public boolean isAutoClean() { return getBoolean(HoodieCleanConfig.AUTO_CLEAN); } + public long maxDurationToCreateEmptyCleanMs() { + return getLong(HoodieCleanConfig.MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); + } + public boolean shouldArchiveBeyondSavepoint() { return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 64262ab673a29..f09459af24781 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -45,6 +45,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -52,6 +53,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; @Slf4j @@ -134,9 +136,9 @@ private static Stream> deleteFilesFunc(Iterator * @throws IllegalArgumentException if unknown cleaning policy is provided */ List clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { - int cleanerParallelism = Math.min( + int cleanerParallelism = Math.max(1, Math.min( cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(), - config.getCleanerParallelism()); + config.getCleanerParallelism())); log.info("Using cleanerParallelism: {}", cleanerParallelism); context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName()); @@ -155,7 +157,7 @@ List clean(HoodieEngineContext context, HoodieCleanerPlan clean List partitionsToBeDeleted = table.getMetaClient().getTableConfig().isTablePartitioned() && cleanerPlan.getPartitionsToBeDeleted() != null ? 
cleanerPlan.getPartitionsToBeDeleted() - : new ArrayList<>(); + : Collections.emptyList(); partitionsToBeDeleted.forEach(entry -> { if (!isNullOrEmpty(entry)) { deleteFileAndGetResult(table.getStorage(), table.getMetaClient().getBasePath() + "/" + entry); @@ -213,17 +215,18 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan } List cleanStats = clean(context, cleanerPlan); + table.getMetaClient().reloadActiveTimeline(); + HoodieCleanMetadata metadata; if (cleanStats.isEmpty()) { - return HoodieCleanMetadata.newBuilder().build(); + metadata = createEmptyCleanMetadata(cleanerPlan, inflightInstant, timer.endTimer()); + } else { + metadata = CleanerUtils.convertCleanMetadata( + inflightInstant.requestedTime(), + Option.of(timer.endTimer()), + cleanStats, + cleanerPlan.getExtraMetadata() + ); } - - table.getMetaClient().reloadActiveTimeline(); - HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata( - inflightInstant.requestedTime(), - Option.of(timer.endTimer()), - cleanStats, - cleanerPlan.getExtraMetadata() - ); this.txnManager.beginStateChange(Option.of(inflightInstant), Option.empty()); writeTableMetadata(metadata, inflightInstant.requestedTime()); table.getActiveTimeline().transitionCleanInflightToComplete( @@ -238,6 +241,20 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan } } + private static HoodieCleanMetadata createEmptyCleanMetadata(HoodieCleanerPlan cleanerPlan, HoodieInstant inflightInstant, long timeTakenMillis) { + return HoodieCleanMetadata.newBuilder() + .setStartCleanTime(inflightInstant.requestedTime()) + .setTimeTakenInMillis(timeTakenMillis) + .setTotalFilesDeleted(0) + .setLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp()) + .setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp()) + .setVersion(CLEAN_METADATA_VERSION_2) + .setPartitionMetadata(Collections.emptyMap()) + .setExtraMetadata(cleanerPlan.getExtraMetadata()) + 
.setBootstrapPartitionMetadata(Collections.emptyMap()) + .build(); + } + @Override public HoodieCleanMetadata execute() { List cleanMetadataList = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 24e1d04ed0a29..f196bd644e6ab 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; @@ -42,6 +43,8 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.text.ParseException; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -49,6 +52,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS; import static org.apache.hudi.common.util.MapUtils.nonEmpty; @@ -94,6 +99,22 @@ private boolean needsCleaning(CleaningTriggerStrategy strategy) { } } + private HoodieCleanerPlan getEmptyCleanerPlan(Option earliestInstant, CleanPlanner planner) throws IOException { + HoodieCleanerPlan.Builder cleanBuilder = HoodieCleanerPlan.newBuilder() 
+ .setFilePathsToBeDeletedPerPartition(Collections.emptyMap()) + .setExtraMetadata(prepareExtraMetadata(planner.getSavepointedTimestamps())); + if (earliestInstant.isPresent()) { + HoodieInstant hoodieInstant = earliestInstant.get(); + cleanBuilder.setPolicy(config.getCleanerPolicy().name()) + .setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION) + .setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name())) + .setLastCompletedCommitTimestamp(planner.getLastCompletedCommitTimestamp()); + } else { + cleanBuilder.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()); + } + return cleanBuilder.build(); + } + /** * Generates List of files to be cleaned. * @@ -109,8 +130,8 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { context.clearJobStatus(); if (partitionsToClean.isEmpty()) { - log.info("Nothing to clean here. It is already clean"); - return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); + log.info("Partitions to clean returned empty. 
Checking to see if empty clean needs to be created."); + return getEmptyCleanerPlan(earliestInstant, planner); } log.info( "Earliest commit to retain for clean : {}", @@ -214,12 +235,58 @@ protected Option requestClean() { } final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext); Option option = Option.empty(); - if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition()) - && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) { + if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty()) + || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition()) + && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) { // Only create cleaner plan which does some work option = Option.of(cleanerPlan); } + // If cleaner plan returned an empty list, incremental clean is enabled and there was no + // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN, + // create a dummy clean to avoid full scan in the future. + // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes + // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset. + // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here. 
+ if (config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) { + // Only create an empty clean commit if earliestInstantToRetain is present in the plan + boolean eligibleForEmptyCleanCommit = true; + + // if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit + Option lastCleanInstant = table.getCleanTimeline().lastInstant(); + if (lastCleanInstant.isPresent()) { + try { + ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId()); + long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli(); + long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli(); + eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs(); + } catch (ParseException e) { + log.error("Unable to parse last clean commit time", e); + throw new HoodieException("Unable to parse last clean commit time", e); + } + } + if (eligibleForEmptyCleanCommit) { + // Ensure earliestCommitToRetain doesn't go backwards when user changes cleaner configuration + if (lastCleanInstant.isPresent()) { + try { + HoodieCleanMetadata lastCleanMetadata = table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); + String previousEarliestCommitToRetain = lastCleanMetadata.getEarliestCommitToRetain(); + String currentEarliestCommitToRetain = cleanerPlan.getEarliestInstantToRetain().getTimestamp(); + if (compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarliestCommitToRetain)) { + log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. " + + "Previous: {}, Current: {}. 
This can happen when cleaner configuration is changed.", + previousEarliestCommitToRetain, currentEarliestCommitToRetain); + return option; + } + } catch (IOException e) { + log.error("Unable to read last clean metadata", e); + throw new HoodieException("Unable to read last clean metadata", e); + } + } + log.info("Creating an empty clean instant with earliestCommitToRetain of {}", cleanerPlan.getEarliestInstantToRetain().getTimestamp()); + return Option.of(cleanerPlan); + } + } return option; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java index cc53e67b74872..0b42837b35869 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java @@ -513,6 +513,14 @@ static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() { Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), Option.empty(), activeInstantsPartitionsMap2, Collections.emptyList(), threePartitionsInActiveTimeline, true, Collections.emptyMap())); + // Empty cleaner plan case + arguments.add(Arguments.of(true, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant, + earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(), + activeInstantsPartitionsMap2, Collections.emptyList(), twoPartitionsInActiveTimeline, false, Collections.emptyMap())); + arguments.add(Arguments.of(false, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant, + earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(), + activeInstantsUnPartitionsMap, Collections.emptyList(), unPartitionsInActiveTimeline, false, Collections.emptyMap())); + return arguments.stream(); } @@ -598,8 
+606,8 @@ private static HoodieCleanMetadata getCleanCommitMetadata(List partition Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false))); Map extraMetadata = new HashMap<>(); extraMetadata.put(SAVEPOINTED_TIMESTAMPS, savepointsToTrack.stream().collect(Collectors.joining(","))); - return new HoodieCleanMetadata(instantTime, 100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata, - CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata); + return new HoodieCleanMetadata(instantTime, 100L, partitionMetadata.isEmpty() ? 0 : 10, earliestCommitToRetain, lastCompletedTime, + partitionMetadata, CLEAN_METADATA_VERSION_2, Collections.emptyMap(), extraMetadata.isEmpty() ? null : extraMetadata); } private static HoodieSavepointMetadata getSavepointMetadata(List partitions) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 6b4f4f7160196..fdac4a963a1b0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -64,7 +64,6 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; @@ -114,9 +113,11 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NO_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; import static org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY; +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -374,7 +375,7 @@ public void testCleanNonPartitionedTable() throws IOException { .build()) .withEmbeddedTimelineServerEnabled(false).build(); // datagen for non-partitioned table - initTestDataGenerator(new String[] {HoodieTestDataGenerator.NO_PARTITION_PATH}); + initTestDataGenerator(new String[] {NO_PARTITION_PATH}); // init non-partitioned table HoodieTestUtils.init(storageConf, basePath, HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", true); @@ -392,17 +393,17 @@ public void testCleanNonPartitionedTable() throws IOException { instantTime = cleanPlanPair.getLeft(); HoodieCleanerPlan cleanPlan = cleanPlanPair.getRight(); assertEquals(cleanPlan.getPartitionsToBeDeleted().size(), 0); - assertEquals(cleanPlan.getFilePathsToBeDeletedPerPartition().get(HoodieTestDataGenerator.NO_PARTITION_PATH).size(), 1); - String filePathToClean = cleanPlan.getFilePathsToBeDeletedPerPartition().get(HoodieTestDataGenerator.NO_PARTITION_PATH).get(0).getFilePath(); + assertEquals(cleanPlan.getFilePathsToBeDeletedPerPartition().get(NO_PARTITION_PATH).size(), 1); + String filePathToClean = cleanPlan.getFilePathsToBeDeletedPerPartition().get(NO_PARTITION_PATH).get(0).getFilePath(); // clean HoodieTable table = 
HoodieSparkTable.create(writeConfig, context); HoodieCleanMetadata cleanMetadata = table.clean(context, instantTime); // check the cleaned file - assertEquals(cleanMetadata.getPartitionMetadata().get(HoodieTestDataGenerator.NO_PARTITION_PATH).getSuccessDeleteFiles().size(), 1); - assertTrue(filePathToClean.contains(cleanMetadata.getPartitionMetadata().get(HoodieTestDataGenerator.NO_PARTITION_PATH).getSuccessDeleteFiles().get(0))); + assertEquals(cleanMetadata.getPartitionMetadata().get(NO_PARTITION_PATH).getSuccessDeleteFiles().size(), 1); + assertTrue(filePathToClean.contains(cleanMetadata.getPartitionMetadata().get(NO_PARTITION_PATH).getSuccessDeleteFiles().get(0))); // ensure table is not fully cleaned and has a file group assertTrue(FSUtils.isTableExists(basePath, storage)); - assertTrue(table.getFileSystemView().getAllFileGroups(HoodieTestDataGenerator.NO_PARTITION_PATH).findAny().isPresent()); + assertTrue(table.getFileSystemView().getAllFileGroups(NO_PARTITION_PATH).findAny().isPresent()); } } @@ -1695,4 +1696,52 @@ public void testPreWriteCleanPolicyDisabledWhenTableServicesDisabled(boolean com assertEquals(7, metaClient.reloadActiveTimeline().getWriteTimeline().countInstants()); assertEquals(0, metaClient.getActiveTimeline().getCleanerTimeline().countInstants()); } + + @Test + void testEmptyClean() throws IOException { + // validate that an empty cleaner plan does not throw any errors at execution time + HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath) + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) + .retainCommits(1) + .build()) + .withEmbeddedTimelineServerEnabled(false).build(); + // datagen for non-partitioned table + initTestDataGenerator(new String[] {NO_PARTITION_PATH}); + // init non-partitioned table + 
HoodieTableMetaClient metaClient = HoodieTestUtils.init(getDefaultStorageConf(), basePath, HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET, + true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", true); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) { + String instantTime = client.startCommit(); + List records = dataGen.generateInserts(instantTime, 1); + client.commit(instantTime, client.insert(jsc.parallelize(records, 1), instantTime)); + + instantTime = metaClient.createNewInstantTime(false); + HoodieTable table = HoodieSparkTable.create(writeConfig, context); + + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + HoodieInstant hoodieInstant = timeline.firstInstant().get(); + HoodieCleanerPlan cleanerPlan = HoodieCleanerPlan.newBuilder() + .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name()) + .setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION) + .setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name())) + .setLastCompletedCommitTimestamp(timeline.lastInstant().get().requestedTime()) + .setFilePathsToBeDeletedPerPartition(Collections.emptyMap()) + .build(); + final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, instantTime, + metaClient.getTimelineLayout().getInstantComparator().completionTimeOrderedComparator()); + table.getActiveTimeline().saveToCleanRequested(cleanInstant, Option.of(cleanerPlan)); + + table.getMetaClient().reloadActiveTimeline(); + // clean + HoodieCleanMetadata cleanMetadata = table.clean(context, instantTime); + // check the cleaned files are empty + assertTrue(cleanMetadata.getPartitionMetadata().isEmpty()); + } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index 66385e0952e96..cfc4ef3f5cd9a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ -18,8 +18,10 @@ package org.apache.hudi.table.functional; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.BootstrapFileMapping; @@ -30,11 +32,13 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -48,12 +52,14 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import 
java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -61,8 +67,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_COMPARATOR; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -813,4 +821,202 @@ public void testKeepXHoursWithCleaning( testTable.close(); } } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) throws Exception { + boolean enableIncrementalClean = true; + boolean enableBootstrapSourceClean = false; + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withIncrementalCleaningMode(enableIncrementalClean) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2) + .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + try { + String p0 = "2020/01/01"; + + String file1P0C0 = UUID.randomUUID().toString(); + Instant instant = Instant.now(); + ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, metaClient.getTableConfig().getTimelineTimezone().getZoneId()); + int minutesForFirstCommit = 180; + String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFirstCommit).toInstant())); + + commitToTestTable(testTable, firstCommitTs, p0, file1P0C0); + 
testTable = tearDownTestTableAndReinit(testTable, config); + + // make next commit, with 1 insert & 1 update per partition + String file2P0C1 = UUID.randomUUID().toString(); + int minutesForSecondCommit = 150; + String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForSecondCommit).toInstant())); + testTable = tearDownTestTableAndReinit(testTable, config); + + commitToTestTable(testTable, secondCommitTs, p0, file2P0C1); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // make next commit, with 1 insert per partition + int minutesForThirdCommit = 90; + String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForThirdCommit).toInstant())); + String file3P0C2 = UUID.randomUUID().toString(); + + testTable = tearDownTestTableAndReinit(testTable, config); + + commitToTestTable(testTable, thirdCommitTs, p0, file3P0C2); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // first empty clean can be generated since earliest instant to retain will be the first commit (always keep last two instants at a minimum) + String firstCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minus(secondCommitAfterThreshold ? 
70 : 30, ChronoUnit.MINUTES).toInstant())); + + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + List hoodieCleanStatsThree = runCleaner(config, false, false, writeClient, firstCleanInstant); + assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files"); + assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants()); + String actualFirst = metaClient.getActiveTimeline().getCleanerTimeline().lastInstant().get().requestedTime(); + writeClient.close(); + + String file4P0C1 = UUID.randomUUID().toString(); + int minutesForFourthCommit = 10; + String fourthCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFourthCommit).toInstant())); + testTable = tearDownTestTableAndReinit(testTable, config); + + commitToTestTable(testTable, fourthCommitTs, p0, file4P0C1); + testTable = tearDownTestTableAndReinit(testTable, config); + + // add a savepoint + getHoodieWriteClient(config).savepoint(fourthCommitTs, "user", "comment"); + + Date firstCleanDate = HoodieInstantTimeGenerator.parseDateFromInstantTime(firstCleanInstant); + int minutesBetweenCleans = secondCommitAfterThreshold ? 
70 : 30; + String secondCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(firstCleanDate.toInstant().plus(minutesBetweenCleans, ChronoUnit.MINUTES))); + + writeClient = getHoodieWriteClient(config); + List hoodieCleanStatsFour = runCleaner(config, false, false, writeClient, secondCleanInstant); + HoodieTimeline finalCompletedCleanInstants = metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants(); + if (secondCommitAfterThreshold) { + // second empty clean is added + assertEquals(0, hoodieCleanStatsFour.size(), "Must not scan any partitions and clean any files"); + assertEquals(2, finalCompletedCleanInstants.countInstants()); + // Ensure that extra metadata is properly set for empty clean commits + HoodieCleanMetadata secondCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), finalCompletedCleanInstants.lastInstant().get()); + // new clean should have the savepoint created + assertEquals(fourthCommitTs, secondCleanMetadata.getExtraMetadata().get(CleanerUtils.SAVEPOINTED_TIMESTAMPS)); + // assertEquals(thirdCommitTs, secondCleanMetadata.getExtraMetadata().get(CleanPlanner.EARLIEST_COMMIT_TO_NOT_ARCHIVE)); + } else { + // no cleaner commit should be added because the time since last clean threshold has not been met + assertEquals(1, finalCompletedCleanInstants.countInstants()); + // Ensure that extra metadata is properly set for empty clean commits + HoodieCleanMetadata firstCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), finalCompletedCleanInstants.lastInstant().get()); + //assertEquals(thirdCommitTs, firstCleanMetadata.getExtraMetadata().get(CleanPlanner.EARLIEST_COMMIT_TO_NOT_ARCHIVE)); + // first clean commit happened before the savepoint so this field is expected to not be present in the map + assertFalse(firstCleanMetadata.getExtraMetadata().containsKey(CleanerUtils.SAVEPOINTED_TIMESTAMPS)); + } + writeClient.close(); + } finally { + 
testTable.close(); + } + } + + @Test + void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { + // Test that earliestCommitToRetain never goes backwards when user changes cleaner config + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withIncrementalCleaningMode(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(false) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(24) + .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + try { + String p0 = "2020/01/01"; + Instant instant = Instant.now(); + ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, metaClient.getTableConfig().getTimelineTimezone().getZoneId()); + + // Create first commit 48 hours ago + String file1P0C0 = UUID.randomUUID().toString(); + String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(48).toInstant())); + commitToTestTable(testTable, firstCommitTs, p0, file1P0C0); + testTable = tearDownTestTableAndReinit(testTable, config); + + // Create second commit 36 hours ago + String file2P0C1 = UUID.randomUUID().toString(); + String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(36).toInstant())); + commitToTestTable(testTable, secondCommitTs, p0, file2P0C1); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Create third commit 12 hours ago + String file3P0C2 = UUID.randomUUID().toString(); + String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(12).toInstant())); + commitToTestTable(testTable, thirdCommitTs, p0, 
file3P0C2); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Run first empty clean 2 hours ago - should retain commits from 26 hours ago (24h retention + 2h safety) + String firstCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(2).toInstant())); + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, false, false, writeClient, firstCleanInstant); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); + assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants()); + + // Get the earliestCommitToRetain from first clean + HoodieInstant firstCleanCompleted = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant().get(); + HoodieCleanMetadata firstCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, firstCleanCompleted); + String firstEarliestCommitToRetain = firstCleanMetadata.getEarliestCommitToRetain(); + writeClient.close(); + + // Now change config to retain only 12 hours (which would normally make earliestCommitToRetain go backwards) + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withIncrementalCleaningMode(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(false) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12) + .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .build()) + .build(); + + // Try to create another empty clean with the new config 61 minutes after first clean + String secondCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from( + 
HoodieInstantTimeGenerator.parseDateFromInstantTime(firstCleanInstant).toInstant().plus(61, ChronoUnit.MINUTES))); + + writeClient = getHoodieWriteClient(newConfig); + List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(newConfig, false, false, writeClient, secondCleanInstant); + + // The clean should be skipped because earliestCommitToRetain would go backwards + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTimeline cleanTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); + assertEquals(1, cleanTimeline.countInstants(), "Second clean should be skipped to prevent earliestCommitToRetain from going backwards"); + + // Verify earliestCommitToRetain did not change + HoodieCleanMetadata latestCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanTimeline.lastInstant().get()); + assertEquals(firstEarliestCommitToRetain, latestCleanMetadata.getEarliestCommitToRetain(), + "earliestCommitToRetain should not go backwards"); + writeClient.close(); + } finally { + testTable.close(); + } + } + + private void commitToTestTable(HoodieTestTable testTable, String commitTimeTs, String partition, String fileId) throws Exception { + testTable.addInflightCommit(commitTimeTs); + testTable.withBaseFilesInPartition(partition, fileId); + HoodieCommitMetadata commitMeta = generateCommitMetadata(commitTimeTs, Collections.singletonMap(partition, Collections.singletonList(fileId))); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTimeTs, INSTANT_COMPARATOR.completionTimeOrderedComparator()), + Option.of(commitMeta)); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java index aa1a5ee6a9697..d698e470b98d7 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java @@ -100,6 +100,14 @@ protected List<HoodieCleanStat> runCleaner( return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 1, false); } + protected List<HoodieCleanStat> runCleaner( + HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure, + Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); + return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, writeClient, cleanInstantTs); + } + /** * Helper to run cleaner and collect Clean Stats. * @@ -107,11 +115,8 @@ protected List<HoodieCleanStat> runCleaner( HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure, - Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { - SparkRDDWriteClient writeClient = getHoodieWriteClient(config); - String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); + SparkRDDWriteClient writeClient, String cleanInstantTs) throws IOException { HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); - if (null == cleanMetadata1) { return new ArrayList<>(); }