Skip to content

Commit

Permalink
[HUDI-7507] Adding timestamp ordering validation before creating requ…
Browse files Browse the repository at this point in the history
…ested instant (#11580)

- Adding timestamp ordering validation before creating requested timeline file
  • Loading branch information
nsivabalan authored Oct 7, 2024
1 parent 0fb3e94 commit 506f106
Show file tree
Hide file tree
Showing 34 changed files with 605 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.timeline.TimestampUtils;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
Expand Down Expand Up @@ -331,6 +332,21 @@ protected final void saveInternalSchema(HoodieTable table, String instantTime, H

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient);

/**
* Validate timestamp for new commits. Here we validate that the instant time being validated is the latest among all entries in the timeline.
* This will ensure timestamps are monotonically increasing. With multi=writers, out of order commits completion are still possible, just that
* when a new commit starts, it will always get the highest commit time compared to other instants in the timeline.
* @param metaClient instance of{@link HoodieTableMetaClient} to be used.
* @param instantTime instant time of the current commit thats in progress.
*/
protected abstract void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime);

protected void validateTimestampInternal(HoodieTableMetaClient metaClient, String instantTime) {
if (config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
}
}

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
Expand Down Expand Up @@ -932,6 +948,15 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
+ "table could be in an inconsistent state. Pending restores: " + Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
.map(instant -> instant.getTimestamp()).collect(Collectors.toList()).toArray()));

HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, instantTime, actionType);
this.txnManager.beginTransaction(Option.of(requestedInstant),
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
validateTimestamp(metaClient, instantTime);
} finally {
txnManager.endTransaction(Option.of(requestedInstant));
}

// if there are pending compactions, their instantTime must not be greater than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
ValidationUtils.checkArgument(
Expand All @@ -941,7 +966,6 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
if (config.getFailedWritesCleanPolicy().isLazy()) {
this.heartbeatClient.start(instantTime);
}

if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
metaClient.getActiveTimeline().createRequestedReplaceCommit(instantTime, actionType);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.timeline;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;

public class TimestampUtils {

public static void validateForLatestTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
// validate that the instant for which requested is about to be created is the latest in the timeline.
if (!metaClient.isMetadataTable()) { // lets validate data table that timestamps are generated in monotically increasing order.
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry -> {
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
"Found later commit time " + entry + ", compared to the current instant " + instantTime + ", hence failing to create requested commit meta file");
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,14 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of `org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");

public static final ConfigProperty<Boolean> ENABLE_TIMESTAMP_ORDERING_VALIDATION = ConfigProperty
.key("hoodie.timestamp.ordering.validate.enable")
.defaultValue(false)
.markAdvanced()
.sinceVersion("0.15.1")
.withDocumentation("Enable validation for commit time generation to ensure new commit time generated is always the latest among other entries. "
+ "This is for additional safety to always generate a monotonically increasing commit times (for ingestion writer, table services etc).");

/**
* Config key with boolean value that indicates whether record being written during MERGE INTO Spark SQL
* operation are already prepped.
Expand Down Expand Up @@ -2621,6 +2629,10 @@ public Integer getWritesFileIdEncoding() {
return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}

public Boolean shouldEnableTimestampOrderinValidation() {
return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
Expand Down Expand Up @@ -3135,6 +3147,11 @@ public Builder withWritesFileIdEncoding(Integer fileIdEncoding) {
return this;
}

public Builder withEnableTimestampOrderingValidation(boolean enableTimestampOrderingValidation) {
writeConfig.setValue(ENABLE_TIMESTAMP_ORDERING_VALIDATION, Boolean.toString(enableTimestampOrderingValidation));
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.timeline.TimestampUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand Down Expand Up @@ -896,6 +897,18 @@ public void validateInsertSchema() throws HoodieInsertException {
}
}

/**
* Validates that the instantTime is latest in the write timeline.
* @param instantTime instant time of interest.
*/
public abstract void validateForLatestTimestamp(String instantTime);

protected void validateForLatestTimestampInternal(String instantTime) {
if (this.config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
}
}

public HoodieFileFormat getBaseFileFormat() {
return metaClient.getTableConfig().getBaseFileFormat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
table.validateForLatestTimestamp(cleanInstant.getTimestamp());
table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
LOG.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public Option<HoodieClusteringPlan> execute() {
.setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
.setClusteringPlan(planOption.get())
.build();
table.validateForLatestTimestamp(clusteringInstant.getTimestamp());
table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Option<HoodieCompactionPlan> execute() {
Option<HoodieCompactionPlan> option = Option.empty();
if (plan != null && nonEmpty(plan.getOperations())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
table.validateForLatestTimestamp(instantTime);
try {
if (operationType.equals(WriteOperationType.COMPACT)) {
HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected Option<HoodieRollbackPlan> requestRollback(String startRollbackTime) {
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new HoodieInstantInfo(instantToRollback.getTimestamp(),
instantToRollback.getAction()), rollbackRequests, LATEST_ROLLBACK_PLAN_VERSION);
if (!skipTimelinePublish) {
table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
if (table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp())) {
LOG.warn("Request Rollback found with instant time " + rollbackInstant + ", hence skipping scheduling rollback");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
}

@Override
protected void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
// no op
}

@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
super(config, context, metaClient);
}

@Override
public void validateForLatestTimestamp(String instantTime) {
// no-op
}

/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieJavaTable.create(config, context, metaClient);
}

@Override
protected void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
validateTimestampInternal(metaClient, instantTime);
}

@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
super(config, context, metaClient);
}

@Override
public void validateForLatestTimestamp(String instantTime) {
validateForLatestTimestampInternal(instantTime);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,15 +1786,15 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
HoodieJavaWriteClient client = getHoodieWriteClient(config);

// Write 1 (Bulk insert)
String newCommitTime = "0000001";
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(records, newCommitTime);
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// Write 2 (inserts)
newCommitTime = "0000002";
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(records, newCommitTime);
Expand Down Expand Up @@ -1827,7 +1827,7 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));

// trigger new write to mimic other writes succeeding before re-attempt.
newCommitTime = "0000003";
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(records, newCommitTime);
Expand Down
Loading

0 comments on commit 506f106

Please sign in to comment.