[HUDI-7582] Fix functional index lookup (apache#11021)
The read path for the functional index leads to an NPE, which causes
file pruning to be skipped entirely. This PR fixes the issue as follows:
1. Parse the query filter to extract the function name and target column.
2. Check the functional index definitions and, if a matching index exists, use it to skip files.
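
The lookup added by this PR boils down to those two steps: derive a (function name, target column) pair from the query filter, then match that pair against the registered functional-index definitions. Below is a minimal, self-contained Scala sketch of that flow; `knownFunctions`, `IndexDefinition`, and `findIndexPartition` are illustrative stand-ins for `SPARK_FUNCTION_MAP` and Hudi's index-definition metadata, not the actual classes touched in this diff.

```scala
object FunctionalIndexLookupSketch {

  // Stand-in for the keys of SPARK_FUNCTION_MAP (hypothetical subset).
  val knownFunctions: Set[String] = Set("from_unixtime", "date_format", "lower", "upper")

  // Stand-in for a functional-index definition: the function it indexes and its source fields.
  final case class IndexDefinition(indexFunction: String, sourceFields: Set[String])

  // Step 1: given a filter rendered as a string and the columns it references,
  // extract the (function name, target column) pair. Only single-column filters qualify.
  def extractFunctionAndColumn(filterString: String,
                               referencedColumns: Seq[String]): Option[(String, String)] =
    referencedColumns match {
      case Seq(column) => knownFunctions.find(f => filterString.contains(f)).map(f => (f, column))
      case _           => None
    }

  // Step 2: match the extracted pair against the index definitions and return the
  // partition name of the first matching index, if any.
  def findIndexPartition(filterString: String,
                         referencedColumns: Seq[String],
                         indexDefinitions: Map[String, IndexDefinition]): Option[String] =
    extractFunctionAndColumn(filterString, referencedColumns).flatMap { case (func, column) =>
      indexDefinitions.collectFirst {
        case (partition, d) if d.indexFunction == func && d.sourceFields.contains(column) => partition
      }
    }

  def main(args: Array[String]): Unit = {
    val defs = Map("func_index_idx_datestr" -> IndexDefinition("from_unixtime", Set("ts")))
    // A filter shaped like the one in the new test: from_unixtime(ts, 'yyyy-MM-dd') < '1970-01-03'
    println(findIndexPartition("from_unixtime(ts, yyyy-MM-dd) < 1970-01-03", Seq("ts"), defs))
    // prints: Some(func_index_idx_datestr)
  }
}
```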

---------

Co-authored-by: Vinaykumar Bhat <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
3 people authored Apr 16, 2024
1 parent c54be84 commit b4451c0
Showing 7 changed files with 172 additions and 29 deletions.
@@ -17,10 +17,11 @@
* under the License.
*/

package org.apache.hudi.index.functional;
package org.apache.hudi;

import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.functional.HoodieFunctionalIndex;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
@@ -44,143 +45,143 @@ interface SparkFunction extends Serializable {
* NOTE: This is not an exhaustive list of spark-sql functions. Only the common date/timestamp and string functions have been added.
* Add more functions as needed. However, the key should match the exact spark-sql function name in lowercase.
*/
private static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
public static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
// Date/Timestamp functions
Pair.of("date_format", (columns, options) -> {
Pair.of(SPARK_DATE_FORMAT, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DATE_FORMAT requires 1 column");
}
return functions.date_format(columns.get(0), options.get("format"));
}),
Pair.of("day", (columns, options) -> {
Pair.of(SPARK_DAY, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DAY requires 1 column");
}
return functions.dayofmonth(columns.get(0));
}),
Pair.of("year", (columns, options) -> {
Pair.of(SPARK_YEAR, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("YEAR requires 1 column");
}
return functions.year(columns.get(0));
}),
Pair.of("month", (columns, options) -> {
Pair.of(SPARK_MONTH, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("MONTH requires 1 column");
}
return functions.month(columns.get(0));
}),
Pair.of("hour", (columns, options) -> {
Pair.of(SPARK_HOUR, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("HOUR requires 1 column");
}
return functions.hour(columns.get(0));
}),
Pair.of("from_unixtime", (columns, options) -> {
Pair.of(SPARK_FROM_UNIXTIME, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("FROM_UNIXTIME requires 1 column");
}
return functions.from_unixtime(columns.get(0), options.get("format"));
}),
Pair.of("unix_timestamp", (columns, options) -> {
Pair.of(SPARK_UNIX_TIMESTAMP, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("UNIX_TIMESTAMP requires 1 column");
}
return functions.unix_timestamp(columns.get(0), options.get("format"));
}),
Pair.of("to_date", (columns, options) -> {
Pair.of(SPARK_TO_DATE, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("TO_DATE requires 1 column");
}
return functions.to_date(columns.get(0));
}),
Pair.of("to_timestamp", (columns, options) -> {
Pair.of(SPARK_TO_TIMESTAMP, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("TO_TIMESTAMP requires 1 column");
}
return functions.to_timestamp(columns.get(0));
}),
Pair.of("date_add", (columns, options) -> {
Pair.of(SPARK_DATE_ADD, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DATE_ADD requires 1 column");
}
return functions.date_add(columns.get(0), Integer.parseInt(options.get("days")));
}),
Pair.of("date_sub", (columns, options) -> {
Pair.of(SPARK_DATE_SUB, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DATE_SUB requires 1 column");
}
return functions.date_sub(columns.get(0), Integer.parseInt(options.get("days")));
}),

// String functions
Pair.of("concat", (columns, options) -> {
Pair.of(SPARK_CONCAT, (columns, options) -> {
if (columns.size() < 2) {
throw new IllegalArgumentException("CONCAT requires at least 2 columns");
}
return functions.concat(columns.toArray(new Column[0]));
}),
Pair.of("substring", (columns, options) -> {
Pair.of(SPARK_SUBSTRING, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("SUBSTRING requires 1 column");
}
return functions.substring(columns.get(0), Integer.parseInt(options.get("pos")), Integer.parseInt(options.get("len")));
}),
Pair.of("lower", (columns, options) -> {
Pair.of(SPARK_LOWER, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("LOWER requires 1 column");
}
return functions.lower(columns.get(0));
}),
Pair.of("upper", (columns, options) -> {
Pair.of(SPARK_UPPER, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("UPPER requires 1 column");
}
return functions.upper(columns.get(0));
}),
Pair.of("trim", (columns, options) -> {
Pair.of(SPARK_TRIM, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("TRIM requires 1 column");
}
return functions.trim(columns.get(0));
}),
Pair.of("ltrim", (columns, options) -> {
Pair.of(SPARK_LTRIM, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("LTRIM requires 1 column");
}
return functions.ltrim(columns.get(0));
}),
Pair.of("rtrim", (columns, options) -> {
Pair.of(SPARK_RTRIM, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("RTRIM requires 1 column");
}
return functions.rtrim(columns.get(0));
}),
Pair.of("length", (columns, options) -> {
Pair.of(SPARK_LENGTH, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("LENGTH requires 1 column");
}
return functions.length(columns.get(0));
}),
Pair.of("regexp_replace", (columns, options) -> {
Pair.of(SPARK_REGEXP_REPLACE, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("REGEXP_REPLACE requires 1 column");
}
return functions.regexp_replace(columns.get(0), options.get("pattern"), options.get("replacement"));
}),
Pair.of("regexp_extract", (columns, options) -> {
Pair.of(SPARK_REGEXP_EXTRACT, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("REGEXP_EXTRACT requires 1 column");
}
return functions.regexp_extract(columns.get(0), options.get("pattern"), Integer.parseInt(options.get("idx")));
}),
Pair.of("split", (columns, options) -> {
Pair.of(SPARK_SPLIT, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("SPLIT requires 1 column");
}
return functions.split(columns.get(0), options.get("pattern"));
}),
Pair.of("identity", (columns, options) -> {
Pair.of(SPARK_IDENTITY, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("IDENTITY requires 1 column");
}
@@ -38,7 +38,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.functional.HoodieFunctionalIndex;
import org.apache.hudi.index.functional.HoodieSparkFunctionalIndex;
import org.apache.hudi.HoodieSparkFunctionalIndex;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;

@@ -19,6 +19,7 @@

package org.apache.hudi.index.functional;

import org.apache.hudi.HoodieSparkFunctionalIndex;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;

import org.apache.spark.sql.Column;
@@ -30,6 +30,31 @@
* @param <T> The target type after applying the transformation. Represents the type of the indexed value.
*/
public interface HoodieFunctionalIndex<S, T> extends Serializable {

public static final String SPARK_DATE_FORMAT = "date_format";
public static final String SPARK_DAY = "day";
public static final String SPARK_MONTH = "month";
public static final String SPARK_YEAR = "year";
public static final String SPARK_HOUR = "hour";
public static final String SPARK_FROM_UNIXTIME = "from_unixtime";
public static final String SPARK_UNIX_TIMESTAMP = "unix_timestamp";
public static final String SPARK_TO_DATE = "to_date";
public static final String SPARK_TO_TIMESTAMP = "to_timestamp";
public static final String SPARK_DATE_ADD = "date_add";
public static final String SPARK_DATE_SUB = "date_sub";
public static final String SPARK_CONCAT = "concat";
public static final String SPARK_SUBSTRING = "substring";
public static final String SPARK_UPPER = "upper";
public static final String SPARK_LOWER = "lower";
public static final String SPARK_TRIM = "trim";
public static final String SPARK_LTRIM = "ltrim";
public static final String SPARK_RTRIM = "rtrim";
public static final String SPARK_LENGTH = "length";
public static final String SPARK_REGEXP_REPLACE = "regexp_replace";
public static final String SPARK_REGEXP_EXTRACT = "regexp_extract";
public static final String SPARK_SPLIT = "split";
public static final String SPARK_IDENTITY = "identity";

/**
* Get the name of the index.
*
@@ -22,6 +22,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.FileStatus
import org.apache.hudi.FunctionalIndexSupport._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -113,6 +114,63 @@ class FunctionalIndexSupport(spark: SparkSession,
.select(requiredIndexColumns: _*)
}

/**
* Searches for an index partition based on the specified index function and target column name.
*
* This method looks up the index definitions available in the metadata of a `metaClient` instance
* and attempts to find an index partition where the index function and the source fields match
* the provided arguments. If a matching index definition is found, the partition identifier for
* that index is returned.
*
* @param queryFilters A sequence of `Expression` objects to analyze. Each expression should involve a single column
* for the method to consider it (expressions involving multiple columns are skipped).
* @return An `Option` containing the index partition identifier if a matching index definition is found.
* Returns `None` if no matching index definition is found.
*/
def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
val functionToColumnNames = extractSparkFunctionNames(queryFilters)
if (functionToColumnNames.nonEmpty) {
// Currently, only one functional index in the query is supported. See HUDI-7620 for supporting multiple functions.
checkState(functionToColumnNames.size == 1, "Currently, only one function with functional index in the query is supported")
val (indexFunction, targetColumnName) = functionToColumnNames.head
val indexDefinitions = metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions
indexDefinitions.asScala
  .find { case (_, indexDefinition) =>
    indexDefinition.getIndexFunction.equals(indexFunction) && indexDefinition.getSourceFields.contains(targetColumnName)
  }
  .map { case (indexPartition, _) => indexPartition }
} else {
Option.empty
}
}

/**
* Extracts mappings from function names to column names from a sequence of expressions.
*
* This method iterates over a given sequence of Spark SQL expressions and identifies expressions
* that contain function calls corresponding to keys in the `SPARK_FUNCTION_MAP`. It supports only
* expressions that are simple binary expressions involving a single column. If an expression contains
* one of the functions and operates on a single column, this method maps the function name to the
* column name.
*/
private def extractSparkFunctionNames(queryFilters: Seq[Expression]): Map[String, String] = {
queryFilters.flatMap { expr =>
// Support only simple binary expression on single column
if (expr.references.size == 1) {
val targetColumnName = expr.references.head.name
// Check if the expression string contains any of the function names
val exprString = expr.toString
SPARK_FUNCTION_MAP.asScala.keys
.find(exprString.contains)
.map(functionName => functionName -> targetColumnName)
} else {
None // Skip expressions that do not match the criteria
}
}.toMap
}

def loadFunctionalIndexDataFrame(indexPartition: String,
shouldReadInMemory: Boolean): DataFrame = {
val colStatsDF = {
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.HoodieMetadataPayload
@@ -344,15 +345,16 @@ case class HoodieFileIndex(spark: SparkSession,

lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
lazy val functionalIndexPartitionOpt = functionalIndex.getFunctionalIndexPartition(queryFilters)
if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
validateConfig()
Option.empty
} else if (recordKeys.nonEmpty) {
Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
} else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
} else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
val indexDf = functionalIndex.loadFunctionalIndexDataFrame("", shouldReadInMemory)
val indexDf = functionalIndex.loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, shouldReadInMemory)
val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
} else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
validateConfig()
@@ -321,6 +321,62 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}

test("Test Create Functional Index With Data Skipping") {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
Seq("cow").foreach { tableType =>
val databaseName = "default"
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql("set hoodie.metadata.enable=true")
spark.sql("set hoodie.enable.data.skipping=true")
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| ts long,
| price int
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(price)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName (id, name, ts, price) values(1, 'a1', 1000, 10)")
spark.sql(s"insert into $tableName (id, name, ts, price) values(2, 'a2', 200000, 100)")
spark.sql(s"insert into $tableName (id, name, ts, price) values(3, 'a3', 2000000000, 1000)")

var metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf())
.build()

var createIndexSql = s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"

spark.sql(createIndexSql)
spark.sql(s"select key, type, ColumnStatsMetadata from hudi_metadata('$tableName') where type = 3").show(false)

metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf())
.build()
assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
var functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get()
assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)

checkAnswer(s"select id, name, price, ts, from_unixtime(ts, 'yyyy-MM-dd') from $tableName where from_unixtime(ts, 'yyyy-MM-dd') < '1970-01-03'")(
Seq(1, "a1", 10, 1000, "1970-01-01")
)
}
}
}
}

private def assertTableIdentifier(catalogTable: CatalogTable,
expectedDatabaseName: String,
expectedTableName: String): Unit = {
