Skip to content

Commit

Permalink
[HUDI-8742] Add expression index test with auto keygen (#12517)
Browse files Browse the repository at this point in the history
  • Loading branch information
lokeshj1703 authored Dec 23, 2024
1 parent b71f34d commit ca058f3
Showing 1 changed file with 81 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, Hoodie
import org.apache.hudi.hive.testutils.HiveTestUtil
import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.index.expression.{HoodieExpressionIndex, HoodieSparkExpressionIndex}
import org.apache.hudi.index.expression.HoodieExpressionIndex
import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, MetadataPartitionType}
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
Expand Down Expand Up @@ -743,11 +743,87 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
}
}

/**
 * Verifies that an expression index (column_stats on `from_unixtime(ts)`) can be
 * created and used for file pruning on a table whose record keys are auto-generated
 * (no primary key declared). Exercised for both COW and MOR table types.
 */
test("Test Expression Index With AutoGen") {
  withTempDir { dir =>
    Seq("cow", "mor").foreach { fmt =>
      val tableName = generateTableName + s"_stats_pruning_binary_$fmt"
      val basePath = s"${dir.getCanonicalPath}/$tableName"

      // Fail the query (rather than silently scanning all files) if data skipping
      // cannot be applied — makes pruning regressions visible.
      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")

      // NOTE: no primary key / record key is configured, so Hudi auto-generates keys.
      spark.sql(
        s"""
           CREATE TABLE $tableName (
           |  ts LONG,
           |  id STRING,
           |  rider STRING,
           |  driver STRING,
           |  fare DOUBLE,
           |  dateDefault STRING,
           |  date STRING,
           |  city STRING,
           |  state STRING
           |) USING HUDI
           |options(
           |  type = '$fmt',
           |  hoodie.metadata.enable = 'true',
           |  hoodie.enable.data.skipping = 'true'
           |)
           |PARTITIONED BY (state)
           |location '$basePath'
           |""".stripMargin)

      // Force every insert into its own file so pruning has multiple files to skip.
      spark.sql("set hoodie.parquet.small.file.limit=0")
      if (HoodieSparkUtils.gteqSpark3_4) {
        spark.sql("set spark.sql.defaultColumn.enabled=false")
      }

      spark.sql(
        s"""
           |insert into $tableName(ts, id, rider, driver, fare, dateDefault, date, city, state) VALUES
           |  (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30 01:30:40', '2024-11-30', 'sunnyvale','california'),
           |  (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30 01:30:40', '2019-11-30', 'san_diego','texas')
           |""".stripMargin)

      // Logical schema of the table, used below to resolve the index expression.
      val tableSchema: StructType = StructType(
        Seq(
          "ts" -> LongType,
          "id" -> StringType,
          "rider" -> StringType,
          "driver" -> StringType,
          "fare" -> DoubleType,
          "dateDefault" -> StringType,
          "date" -> StringType,
          "city" -> StringType,
          "state" -> StringType
        ).map { case (name, dataType) => StructField(name, dataType) }
      )

      // Create the expression index and confirm query results are unaffected.
      spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(expr='from_unixtime')")
      checkAnswer(s"select id, rider, from_unixtime(ts) from $tableName where from_unixtime(ts) > '1970-01-03'")(
        Seq("trip2", "rider-C", "2023-09-22 20:28:40"),
        Seq("trip5", "rider-A", "2023-11-07 09:34:09")
      )

      // Validate that the index actually prunes files for an equality predicate
      // on the indexed expression.
      val opts = Map(
        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
        HoodieMetadataConfig.ENABLE.key -> "true"
      )
      val metaClient = createMetaClient(spark, basePath)
      val fromUnixTime = resolveExpr(spark, unapply(functions.from_unixtime(functions.col("ts"))).get, tableSchema)
      val dataFilter = EqualTo(fromUnixTime, Literal.create("2023-09-22 20:28:40"))
      verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
      spark.sql(s"drop index idx_datestr on $tableName")
    }
  }
}

/**
* Test expression index with invalid options
*/
@Test
def testInvalidOptions(): Unit = {
test("Test Expression Index Creation With Invalid Options") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName + s"_stats_pruning_binary_$tableType"
Expand Down Expand Up @@ -802,8 +878,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
/**
* Test expression index with data skipping for date and timestamp based expressions.
*/
@Test
def testColumnStatsPruningWithDateTimestampExpressions(): Unit = {
test("Test Expression Index Column Stat Pruning With Timestamp Expressions") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName + s"_stats_pruning_date_expr_$tableType"
Expand Down Expand Up @@ -952,8 +1027,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
/**
* Test expression index with data skipping for string expressions.
*/
@Test
def testColumnStatsPruningWithStringExpressions(): Unit = {
test("Test Expression Index Column Stat Pruning With String Expression") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName + s"_stats_pruning_string_expr_$tableType"
Expand Down

0 comments on commit ca058f3

Please sign in to comment.