diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/functional/HoodieSparkFunctionalIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndex.java
similarity index 84%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/functional/HoodieSparkFunctionalIndex.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndex.java
index dcc4b8f3926ac..5196f67baf3c2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/functional/HoodieSparkFunctionalIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndex.java
@@ -17,10 +17,11 @@
  * under the License.
  */
 
-package org.apache.hudi.index.functional;
+package org.apache.hudi;
 
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.index.functional.HoodieFunctionalIndex;
 
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.functions;
@@ -44,69 +45,69 @@ interface SparkFunction extends Serializable {
    * NOTE: This is not an exhaustive list of spark-sql functions. Only the common date/timestamp and string functions have been added.
    * Add more functions as needed. However, the key should match the exact spark-sql function name in lowercase.
    */
-  private static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
+  public static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
       // Date/Timestamp functions
-      Pair.of("date_format", (columns, options) -> {
+      Pair.of(SPARK_DATE_FORMAT, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("DATE_FORMAT requires 1 column");
         }
         return functions.date_format(columns.get(0), options.get("format"));
       }),
-      Pair.of("day", (columns, options) -> {
+      Pair.of(SPARK_DAY, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("DAY requires 1 column");
         }
         return functions.dayofmonth(columns.get(0));
       }),
-      Pair.of("year", (columns, options) -> {
+      Pair.of(SPARK_YEAR, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("YEAR requires 1 column");
         }
         return functions.year(columns.get(0));
       }),
-      Pair.of("month", (columns, options) -> {
+      Pair.of(SPARK_MONTH, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("MONTH requires 1 column");
         }
         return functions.month(columns.get(0));
       }),
-      Pair.of("hour", (columns, options) -> {
+      Pair.of(SPARK_HOUR, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("HOUR requires 1 column");
         }
         return functions.hour(columns.get(0));
       }),
-      Pair.of("from_unixtime", (columns, options) -> {
+      Pair.of(SPARK_FROM_UNIXTIME, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("FROM_UNIXTIME requires 1 column");
         }
         return functions.from_unixtime(columns.get(0), options.get("format"));
       }),
-      Pair.of("unix_timestamp", (columns, options) -> {
+      Pair.of(SPARK_UNIX_TIMESTAMP, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("UNIX_TIMESTAMP requires 1 column");
         }
         return functions.unix_timestamp(columns.get(0), options.get("format"));
       }),
-      Pair.of("to_date", (columns, options) -> {
+      Pair.of(SPARK_TO_DATE, (columns, options) -> {
         if (columns.size() != 1) {
           throw new IllegalArgumentException("TO_DATE requires 1 column");
         }
         return functions.to_date(columns.get(0));
       }),
Pair.of("to_timestamp", (columns, options) -> { + Pair.of(SPARK_TO_TIMESTAMP, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("TO_TIMESTAMP requires 1 column"); } return functions.to_timestamp(columns.get(0)); }), - Pair.of("date_add", (columns, options) -> { + Pair.of(SPARK_DATE_ADD, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("DATE_ADD requires 1 column"); } return functions.date_add(columns.get(0), Integer.parseInt(options.get("days"))); }), - Pair.of("date_sub", (columns, options) -> { + Pair.of(SPARK_DATE_SUB, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("DATE_SUB requires 1 column"); } @@ -114,73 +115,73 @@ interface SparkFunction extends Serializable { }), // String functions - Pair.of("concat", (columns, options) -> { + Pair.of(SPARK_CONCAT, (columns, options) -> { if (columns.size() < 2) { throw new IllegalArgumentException("CONCAT requires at least 2 columns"); } return functions.concat(columns.toArray(new Column[0])); }), - Pair.of("substring", (columns, options) -> { + Pair.of(SPARK_SUBSTRING, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("SUBSTRING requires 1 column"); } return functions.substring(columns.get(0), Integer.parseInt(options.get("pos")), Integer.parseInt(options.get("len"))); }), - Pair.of("lower", (columns, options) -> { + Pair.of(SPARK_LOWER, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("LOWER requires 1 column"); } return functions.lower(columns.get(0)); }), - Pair.of("upper", (columns, options) -> { + Pair.of(SPARK_UPPER, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("UPPER requires 1 column"); } return functions.upper(columns.get(0)); }), - Pair.of("trim", (columns, options) -> { + Pair.of(SPARK_TRIM, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("TRIM requires 1 column"); } return functions.trim(columns.get(0)); }), - Pair.of("ltrim", (columns, options) -> { + Pair.of(SPARK_LTRIM, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("LTRIM requires 1 column"); } return functions.ltrim(columns.get(0)); }), - Pair.of("rtrim", (columns, options) -> { + Pair.of(SPARK_RTRIM, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("RTRIM requires 1 column"); } return functions.rtrim(columns.get(0)); }), - Pair.of("length", (columns, options) -> { + Pair.of(SPARK_LENGTH, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("LENGTH requires 1 column"); } return functions.length(columns.get(0)); }), - Pair.of("regexp_replace", (columns, options) -> { + Pair.of(SPARK_REGEXP_REPLACE, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("REGEXP_REPLACE requires 1 column"); } return functions.regexp_replace(columns.get(0), options.get("pattern"), options.get("replacement")); }), - Pair.of("regexp_extract", (columns, options) -> { + Pair.of(SPARK_REGEXP_EXTRACT, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("REGEXP_EXTRACT requires 1 column"); } return functions.regexp_extract(columns.get(0), options.get("pattern"), Integer.parseInt(options.get("idx"))); }), - Pair.of("split", (columns, options) -> { + Pair.of(SPARK_SPLIT, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("SPLIT requires 1 
column"); } return functions.split(columns.get(0), options.get("pattern")); }), - Pair.of("identity", (columns, options) -> { + Pair.of(SPARK_IDENTITY, (columns, options) -> { if (columns.size() != 1) { throw new IllegalArgumentException("IDENTITY requires 1 column"); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index e02f703a8e366..4feefdc5b5af7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.index.functional.HoodieFunctionalIndex; -import org.apache.hudi.index.functional.HoodieSparkFunctionalIndex; +import org.apache.hudi.HoodieSparkFunctionalIndex; import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.metrics.MetricsReporterType; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java index 85754fe8762fc..afb1419a73b94 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java @@ -19,6 +19,7 @@ package org.apache.hudi.index.functional; +import org.apache.hudi.HoodieSparkFunctionalIndex; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; import org.apache.spark.sql.Column; diff --git a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java index 2465731cc8d24..0ca71cf3aac84 100644 --- a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java @@ -30,6 +30,31 @@ * @param The target type after applying the transformation. Represents the type of the indexed value. 
  */
 public interface HoodieFunctionalIndex<S, T> extends Serializable {
+
+  public static final String SPARK_DATE_FORMAT = "date_format";
+  public static final String SPARK_DAY = "day";
+  public static final String SPARK_MONTH = "month";
+  public static final String SPARK_YEAR = "year";
+  public static final String SPARK_HOUR = "hour";
+  public static final String SPARK_FROM_UNIXTIME = "from_unixtime";
+  public static final String SPARK_UNIX_TIMESTAMP = "unix_timestamp";
+  public static final String SPARK_TO_DATE = "to_date";
+  public static final String SPARK_TO_TIMESTAMP = "to_timestamp";
+  public static final String SPARK_DATE_ADD = "date_add";
+  public static final String SPARK_DATE_SUB = "date_sub";
+  public static final String SPARK_CONCAT = "concat";
+  public static final String SPARK_SUBSTRING = "substring";
+  public static final String SPARK_UPPER = "upper";
+  public static final String SPARK_LOWER = "lower";
+  public static final String SPARK_TRIM = "trim";
+  public static final String SPARK_LTRIM = "ltrim";
+  public static final String SPARK_RTRIM = "rtrim";
+  public static final String SPARK_LENGTH = "length";
+  public static final String SPARK_REGEXP_REPLACE = "regexp_replace";
+  public static final String SPARK_REGEXP_EXTRACT = "regexp_extract";
+  public static final String SPARK_SPLIT = "split";
+  public static final String SPARK_IDENTITY = "identity";
+
   /**
    * Get the name of the index.
    *
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 0dbfc1193fd45..035eeabb4d593 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -22,6 +22,7 @@ package org.apache.hudi
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hudi.FunctionalIndexSupport._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -113,6 +114,63 @@ class FunctionalIndexSupport(spark: SparkSession,
       .select(requiredIndexColumns: _*)
   }
 
+  /**
+   * Searches for an index partition based on the specified index function and target column name.
+   *
+   * This method looks up the index definitions available in the metadata of a `metaClient` instance
+   * and attempts to find an index partition where the index function and the source fields match
+   * the provided arguments. If a matching index definition is found, the partition identifier for
+   * that index is returned.
+   *
+   * @param queryFilters A sequence of `Expression` objects to analyze. Each expression should involve a single column
+   *                     for the method to consider it (expressions involving multiple columns are skipped).
+   * @return An `Option` containing the index partition identifier if a matching index definition is found.
+   *         Returns `None` if no matching index definition is found.
+   */
+  def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
+    val functionToColumnNames = extractSparkFunctionNames(queryFilters)
+    if (functionToColumnNames.nonEmpty) {
+      // Currently, only one functional index in the query is supported. See HUDI-7620 for supporting multiple functions.
+      checkState(functionToColumnNames.size == 1, "Currently, only one function with functional index in the query is supported")
+      val (indexFunction, targetColumnName) = functionToColumnNames.head
+      val indexDefinitions = metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions
+      indexDefinitions.asScala
+        .find { case (_, indexDefinition) =>
+          indexDefinition.getIndexFunction.equals(indexFunction) && indexDefinition.getSourceFields.contains(targetColumnName)
+        }
+        .map { case (indexPartition, _) => indexPartition }
+    } else {
+      Option.empty
+    }
+  }
+
+  /**
+   * Extracts mappings from function names to column names from a sequence of expressions.
+   *
+   * This method iterates over a given sequence of Spark SQL expressions and identifies expressions
+   * that contain function calls corresponding to keys in the `SPARK_FUNCTION_MAP`. It supports only
+   * expressions that are simple binary expressions involving a single column. If an expression contains
+   * one of the functions and operates on a single column, this method maps the function name to the
+   * column name.
+   */
+  private def extractSparkFunctionNames(queryFilters: Seq[Expression]): Map[String, String] = {
+    queryFilters.flatMap { expr =>
+      // Support only simple binary expression on single column
+      if (expr.references.size == 1) {
+        val targetColumnName = expr.references.head.name
+        // Check if the expression string contains any of the function names
+        val exprString = expr.toString
+        SPARK_FUNCTION_MAP.asScala.keys
+          .find(exprString.contains)
+          .map(functionName => functionName -> targetColumnName)
+      } else {
+        None // Skip expressions that do not match the criteria
+      }
+    }.toMap
+  }
+
   def loadFunctionalIndexDataFrame(indexPartition: String,
                                    shouldReadInMemory: Boolean): DataFrame = {
     val colStatsDF = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index affed871cad90..ee18abec5040c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metadata.HoodieMetadataPayload
@@ -344,15 +345,16 @@ case class HoodieFileIndex(spark: SparkSession,
     lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
 
     lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
+    lazy val functionalIndexPartitionOpt = functionalIndex.getFunctionalIndexPartition(queryFilters)
     if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
       validateConfig()
       Option.empty
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
-    } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+    } else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
       val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val indexDf = functionalIndex.loadFunctionalIndexDataFrame("", shouldReadInMemory)
+      val indexDf = functionalIndex.loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, shouldReadInMemory)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
     } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
       validateConfig()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 2a313f704611a..e2a97c9adb023 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -321,6 +321,62 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create Functional Index With Data Skipping") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        Seq("cow").foreach { tableType =>
+          val databaseName = "default"
+          val tableName = generateTableName
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql("set hoodie.metadata.enable=true")
+          spark.sql("set hoodie.enable.data.skipping=true")
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  ts long,
+               |  price int
+               |) using hudi
+               | options (
+               |  primaryKey ='id',
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
+               | partitioned by(price)
+               | location '$basePath'
+       """.stripMargin)
+          spark.sql(s"insert into $tableName (id, name, ts, price) values(1, 'a1', 1000, 10)")
+          spark.sql(s"insert into $tableName (id, name, ts, price) values(2, 'a2', 200000, 100)")
+          spark.sql(s"insert into $tableName (id, name, ts, price) values(3, 'a3', 2000000000, 1000)")
+
+          var metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(basePath)
+            .setConf(spark.sessionState.newHadoopConf())
+            .build()
+
+          var createIndexSql = s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
+
+          spark.sql(createIndexSql)
+          spark.sql(s"select key, type, ColumnStatsMetadata from hudi_metadata('$tableName') where type = 3").show(false)
+
+          metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(basePath)
+            .setConf(spark.sessionState.newHadoopConf())
+            .build()
+          assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
+          var functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get()
+          assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+          assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+          checkAnswer(s"select id, name, price, ts, from_unixtime(ts, 'yyyy-MM-dd') from $tableName where from_unixtime(ts, 'yyyy-MM-dd') < '1970-01-03'")(
+            Seq(1, "a1", 10, 1000, "1970-01-01")
+          )
+        }
+      }
+    }
+  }
+
   private def assertTableIdentifier(catalogTable: CatalogTable,
                                     expectedDatabaseName: String,
                                     expectedTableName: String): Unit = {
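Reviewer note: the pruning path added above boils down to two steps in FunctionalIndexSupport — render each single-column filter to a string, look for a SPARK_FUNCTION_MAP key inside it, and match the resulting (function, column) pair against the index definitions stored in the functional index metadata to pick the metadata partition to load. The snippet below is a minimal, self-contained Scala sketch of that lookup, not the PR's code: IndexDef, sparkFunctionNames, and the sample expression string are illustrative stand-ins rather than Hudi types or exact Expression.toString output.

```scala
object FunctionalIndexLookupSketch extends App {
  // Illustrative stand-in for a functional index definition: only the fields the lookup consults.
  case class IndexDef(indexFunction: String, sourceFields: Seq[String])

  // Hypothetical index definitions keyed by metadata partition name, mirroring what
  // getFunctionalIndexPartition reads from metaClient.getFunctionalIndexMetadata.
  val indexDefinitions = Map("func_index_idx_datestr" -> IndexDef("from_unixtime", Seq("ts")))

  // A subset of SPARK_FUNCTION_MAP keys, plus an example rendering of the filter
  // from_unixtime(ts, 'yyyy-MM-dd') < '1970-01-03', which references the single column "ts".
  val sparkFunctionNames = Seq("from_unixtime", "date_format", "substring")
  val exprString = "(from_unixtime(ts, yyyy-MM-dd) < 1970-01-03)"
  val referencedColumn = "ts"

  // Step 1: map the filter to (functionName -> columnName), as extractSparkFunctionNames does.
  val functionToColumn: Option[(String, String)] =
    sparkFunctionNames.find(exprString.contains).map(fn => fn -> referencedColumn)

  // Step 2: pick the index partition whose function and source fields match, as getFunctionalIndexPartition does.
  val indexPartition: Option[String] = functionToColumn.flatMap { case (fn, col) =>
    indexDefinitions.collectFirst { case (partition, d) if d.indexFunction == fn && d.sourceFields.contains(col) => partition }
  }

  println(indexPartition) // Some(func_index_idx_datestr)
}
```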
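On the build side, each SPARK_FUNCTION_MAP entry is a small adapter from a list of input columns plus string options to a Spark Column, which is how `create index ... options(func='from_unixtime', format='yyyy-MM-dd')` ends up applying `from_unixtime(ts, 'yyyy-MM-dd')` to the source column. Below is a rough Scala equivalent of the `from_unixtime` entry written against the public Spark SQL API; the SparkFn alias is a stand-in for the PR's SparkFunction interface, not its actual signature.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions

object SparkFunctionMapSketch {
  // Stand-in for the shape of the SparkFunction lambdas registered in SPARK_FUNCTION_MAP.
  type SparkFn = (Seq[Column], Map[String, String]) => Column

  // Rough equivalent of the "from_unixtime" entry: exactly one input column plus a "format" option.
  val fromUnixtime: SparkFn = (columns, options) => {
    require(columns.size == 1, "FROM_UNIXTIME requires 1 column")
    functions.from_unixtime(columns.head, options("format"))
  }

  def main(args: Array[String]): Unit = {
    // Builds the unresolved expression from_unixtime(ts, yyyy-MM-dd); no SparkSession is needed for this step.
    val indexedColumn: Column = fromUnixtime(Seq(functions.col("ts")), Map("format" -> "yyyy-MM-dd"))
    println(indexedColumn)
  }
}
```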