[HUDI-8570] Use secondary index only for snapshot queries (#12322)
Use the secondary index only for snapshot queries. Skip the secondary index and fall back to the regular query path for other query types such as time travel and incremental queries.
codope authored Nov 24, 2024
1 parent f4e810b commit 7b773fc
Showing 2 changed files with 128 additions and 28 deletions.
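
For context, here is a minimal sketch of the two query shapes the new test exercises: a plain snapshot read, where the secondary index on the rider column may be used for data skipping, and a time travel read, which with this change skips the secondary index and falls back to the regular query path. The table path, filter value, and instant time below are placeholders for illustration, not part of this commit.

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.spark.sql.SparkSession

object SecondaryIndexQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("secondary-index-sketch")
      .master("local[*]")
      .getOrCreate()
    // Placeholder path to an existing Hudi table with a secondary index on 'rider'.
    val basePath = "/tmp/hudi/trips_table"

    val readOpts = Map(
      HoodieMetadataConfig.ENABLE.key -> "true",
      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
    )

    // Snapshot query: eligible to prune files using the secondary index on 'rider'.
    val snapshotCount = spark.read.format("hudi")
      .options(readOpts)
      .load(basePath)
      .where("rider = 'rider-A'")
      .count()

    // Time travel query: the secondary index is skipped and the regular query path
    // is used, even though data skipping is enabled. The instant is a placeholder.
    val timeTravelCount = spark.read.format("hudi")
      .options(readOpts)
      .option("as.of.instant", "20241124000000000")
      .load(basePath)
      .where("rider = 'rider-A'")
      .count()

    println(s"snapshot rows: $snapshotCount, time-travel rows: $timeTravelCount")
    spark.stop()
  }
}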
@@ -20,18 +20,20 @@
package org.apache.spark.sql.hudi.command.index

import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE, UPSERT_OPERATION_OPT_VAL}
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
import org.apache.hudi.metadata.SecondaryIndexKeyUtils
import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
@@ -224,18 +226,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// Step 1: Initial Insertion of Records
val dataGen = new HoodieTestDataGenerator()
val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
initialDf.write.format("hudi")
.options(hudiOpts)
.option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

// Step 2: Create table and secondary index on 'rider' column
spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
spark.sql(s"create index idx_rider on $tableName using secondary_index(rider)")
val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)

// Verify initial state of secondary index
val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
@@ -287,7 +278,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
validateSecondaryIndex(basePath, tableName, updateKeys)

// Step 6: Perform Deletes on Records and Validate Secondary Index
val deleteKeys = initialKeys.take(3) // pick a subset of keys to delete
val deleteKeys = initialKeys.take(1) // pick a subset of keys to delete
val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${deleteKeys.mkString("','")}')")
deleteDf.write.format("hudi")
.options(hudiOpts)
@@ -329,18 +320,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// Step 1: Initial Insertion of Records
val dataGen = new HoodieTestDataGenerator()
val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
initialDf.write.format("hudi")
.options(hudiOpts)
.option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

// Step 2: Create table and secondary index on 'rider' column
spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
spark.sql(s"create index idx_rider on $tableName using secondary_index(rider)")
val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)

// Verify initial state of secondary index
val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
@@ -354,7 +334,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.options(hudiOpts)
.option(OPERATION.key, operationType)
.mode(SaveMode.Append)
.save(basePath)) (
.save(basePath))(
"Can not perform operation " + WriteOperationType.fromValue(operationType) + " on secondary index")
// disable secondary index and retry
df.write.format("hudi")
@@ -363,11 +343,117 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.option(OPERATION.key, operationType)
.mode(SaveMode.Append)
.save(basePath)
dataGen.close()
}
}
}
}

test("Test Secondary Index With Time Travel Query") {
if (HoodieSparkUtils.gteqSpark3_3) {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// Step 1: Initial Insertion of Records
val dataGen = new HoodieTestDataGenerator()
val numInserts = 5
val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen, numInserts)

// Verify initial state of secondary index
val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
validateSecondaryIndex(basePath, tableName, initialKeys)

// Step 3: Perform Update Operations on Subset of Records
val updateRecords = recordsToStrings(dataGen.generateUniqueUpdates(getInstantTime, 1, HoodieTestDataGenerator.TRIP_FLATTENED_SCHEMA)).asScala
val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecords.toSeq, 1))
val updateKeys = updateDf.select("_row_key").collect().map(_.getString(0))
val recordKeyToUpdate = updateKeys.head
val initialSecondaryKey = spark.sql(
s"SELECT key FROM hudi_metadata('$basePath') WHERE type=7 AND key LIKE '%$SECONDARY_INDEX_RECORD_KEY_SEPARATOR$recordKeyToUpdate'"
).collect().map(indexKey => SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(indexKey.getString(0))).head
// update the record
updateDf.write.format("hudi")
.options(hudiOpts)
.option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
// Verify secondary index after updates
validateSecondaryIndex(basePath, tableName, updateKeys)

// Step 4: Perform Time Travel Query
// get the first instant on the timeline
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
val firstInstant = metaClient.reloadActiveTimeline().filterCompletedInstants().firstInstant().get()
// do a time travel query with data skipping enabled
val readOpts = hudiOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> "true",
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
)
val timeTravelDF = spark.read.format("hudi")
.options(readOpts)
.option("as.of.instant", firstInstant.requestedTime)
.load(basePath)
assertEquals(numInserts, timeTravelDF.count())
// updated record should still show in time travel view
assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKeyToUpdate'").count())
// rider field (secondary key) should point to previous value
val secondaryKey = timeTravelDF.where(s"_row_key = '$recordKeyToUpdate'").select("rider").collect().head.getString(0)
assertEquals(initialSecondaryKey, secondaryKey)

// Perform Deletes on Records and Validate Secondary Index
val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${updateKeys.mkString("','")}')")
// Get fileId for the delete record
val deleteFileId = deleteDf.select("_hoodie_file_name").collect().head.getString(0)
deleteDf.write.format("hudi")
.options(hudiOpts)
.option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
// Verify secondary index for deletes
validateSecondaryIndex(basePath, tableName, updateKeys, hasDeleteKeys = true)
// Corrupt the data file that was written for the delete key in the first instant
val firstCommitMetadata = deserializeCommitMetadata(metaClient.reloadActiveTimeline().getInstantDetails(firstInstant).get())
val partitionToWriteStats = firstCommitMetadata.getPartitionToWriteStats.asScala.mapValues(_.asScala.toList)
// Find the path for the given fileId
val matchingPath: Option[String] = partitionToWriteStats.values.flatten
.find(_.getFileId == deleteFileId)
.map(_.getPath)
assertTrue(matchingPath.isDefined)
// Corrupt the data file
val dataFile = new StoragePath(basePath, matchingPath.get)
val storage = metaClient.getStorage
storage.deleteFile(dataFile)
storage.createNewFile(dataFile)
// Time travel query should now throw an exception
checkExceptionContain(() => spark.read.format("hudi")
.options(readOpts)
.option("as.of.instant", firstInstant.requestedTime)
.load(basePath).count())(s"${dataFile.toString} is not a Parquet file")

dataGen.close()
}
}
}

private def loadInitialBatchAndCreateSecondaryIndex(tableName: String, basePath: String, dataGen: HoodieTestDataGenerator, numInserts: Integer = 50) = {
val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, numInserts, true)).asScala
val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
initialDf.write.format("hudi")
.options(hudiOpts)
.option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

// Step 2: Create table and secondary index on 'rider' column
spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
spark.sql(s"create index idx_rider on $tableName using secondary_index(rider)")
hudiOpts
}

private def validateSecondaryIndex(basePath: String, tableName: String, recordKeys: Array[String], hasDeleteKeys: Boolean = false): Unit = {
// Check secondary index metadata for the selected keys
recordKeys.foreach { key =>
@@ -166,6 +166,20 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
assertResult(true)(hasException)
}

protected def checkExceptionContain(runnable: Runnable)(errorMsg: String): Unit = {
var hasException = false
try {
runnable.run()
} catch {
case e: Throwable if checkMessageContains(e, errorMsg) || checkMessageContains(getRootCause(e), errorMsg) =>
hasException = true

case f: Throwable =>
fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f)
}
assertResult(true)(hasException)
}

protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = {
var hasException = false
try {
