Skip to content

Commit

Permalink
[HUDI-7262] Validate checksum only if it exists (#10417)
Browse files Browse the repository at this point in the history
  • Loading branch information
beyond1920 authored and yihua committed Feb 27, 2024
1 parent 0b0990d commit d07274d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ private static TypedProperties fetchConfigs(FileSystem fs, String metaPath) thro
props.clear();
props.load(is);
found = true;
ValidationUtils.checkArgument(validateChecksum(props));
if (props.containsKey(TABLE_CHECKSUM.key())) {
ValidationUtils.checkArgument(HoodieTableConfig.validateChecksum(props));
}
return props;
} catch (IOException e) {
LOG.warn(String.format("Could not read properties from %s: %s", path, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -160,14 +161,15 @@ public void testReadRetry() throws IOException {

// Should return backup config if hoodie.properties is corrupted
Properties props = new Properties();
props.put(TABLE_CHECKSUM.key(), "0");
try (OutputStream out = fs.create(cfgPath)) {
props.store(out, "No checksum in file so is invalid");
props.store(out, "Wrong checksum in file so is invalid");
}
new HoodieTableConfig(fs, metaPath.toString(), null, null);

// Should throw exception if both hoodie.properties and backup are corrupted
try (OutputStream out = fs.create(backupCfgPath)) {
props.store(out, "No checksum in file so is invalid");
props.store(out, "Wrong checksum in file so is invalid");
}
assertThrows(IllegalArgumentException.class, () -> new HoodieTableConfig(fs, metaPath.toString(), null, null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package org.apache.spark.sql.hudi.procedure
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils}
import org.apache.spark.api.java.JavaSparkContext

import java.io.IOException
import java.time.Instant

class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {

Expand Down Expand Up @@ -104,8 +106,38 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {

// downgrade table to THREE
checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'THREE')""")(Seq(true))
// upgrade table to FOUR
checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'FOUR')""")(Seq(true))
var metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
// verify hoodie.table.version of the table is THREE
assertResult(HoodieTableVersion.THREE.versionCode) {
metaClient.getTableConfig.getTableVersion.versionCode()
}
val metaPathDir = new Path(metaClient.getBasePath, HoodieTableMetaClient.METAFOLDER_NAME)
// delete checksum from hoodie.properties
val props = ConfigUtils.fetchConfigs(metaClient.getFs, metaPathDir.toString, HoodieTableConfig.HOODIE_PROPERTIES_FILE, HoodieTableConfig.HOODIE_PROPERTIES_FILE_BACKUP, 1, 1000)
props.remove(HoodieTableConfig.TABLE_CHECKSUM.key)
try {
val outputStream = metaClient.getFs.create(new Path(metaPathDir, HoodieTableConfig.HOODIE_PROPERTIES_FILE))
props.store(outputStream, "Updated at " + Instant.now)
outputStream.close()
} catch {
case e: Exception => fail(e)
}
// verify hoodie.table.checksum is deleted from hoodie.properties
metaClient = HoodieTableMetaClient.reload(metaClient)
assertResult(false) {metaClient.getTableConfig.contains(HoodieTableConfig.TABLE_CHECKSUM)}
// upgrade table to SIX
checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'SIX')""")(Seq(true))
metaClient = HoodieTableMetaClient.reload(metaClient)
assertResult(HoodieTableVersion.SIX.versionCode) {
metaClient.getTableConfig.getTableVersion.versionCode()
}
val expectedCheckSum = BinaryUtil.generateChecksum(StringUtils.getUTF8Bytes(tableName))
assertResult(expectedCheckSum) {
metaClient.getTableConfig.getLong(HoodieTableConfig.TABLE_CHECKSUM)
}
}
}

Expand Down

0 comments on commit d07274d

Please sign in to comment.