diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index cce0d39e8..6184aadb3 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -52,6 +52,14 @@ object IndexConstants {
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
 
+  // If this config is true, Hyperspace generates a plan with bucketSpec first and checks
+  // the selectivity of the filter query by creating the physical plan in advance.
+  // If fewer than 80% of the index buckets are selected, FilterIndexRule uses the plan with
+  // bucketSpec; otherwise, the newly generated bucketSpec is not used.
+  val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
+    "spark.hyperspace.index.filterRule.bucketCheck.enabled"
+  val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"
+
   // Identifier injected to HadoopFsRelation as an option if an index is applied.
   // Currently, the identifier is added to options field of HadoopFsRelation.
   // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
index 89d074afe..9dc5853ef 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
@@ -18,9 +18,11 @@ package com.microsoft.hyperspace.index.rules
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.CleanupAliases
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, UnionExec}
 import org.apache.spark.sql.execution.datasources._
 
 import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
@@ -56,14 +58,46 @@ object FilterIndexRule
       findCoveringIndexes(filter, outputColumns, filterColumns)
     FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
       case Some(index) =>
-        // Do not set BucketSpec to avoid limiting Spark's degree of parallelism.
-        val transformedPlan =
+        val transformedPlan = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
+          // Test bucket spec first.
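+          // Build the plan with bucketSpec, convert it into a physical scan with
+          // FileSourceStrategy, and count how many buckets survive bucket pruning.
+          // The bucketed plan is kept only when fewer than 80% of the buckets are
+          // selected; otherwise the rule falls back to a plan without bucketSpec below.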
+          val transformedPlanWithBucketSpec =
+            RuleUtils.transformPlanToUseIndex(
+              spark,
+              index,
+              originalPlan,
+              useBucketSpec = true,
+              useBucketUnionForAppended = false)
+          val fileSourceScanNodes = if (transformedPlanWithBucketSpec.isInstanceOf[Union]) {
+            FileSourceStrategy(transformedPlanWithBucketSpec.children.head)
+          } else {
+            FileSourceStrategy(transformedPlanWithBucketSpec)
+          }
+          val foundPrunedBuckets = fileSourceScanNodes.head.collect {
+            case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                if optionalBucketSet.isDefined && (optionalBucketSet.get
+                  .cardinality() * 1.0 < index.bucketSpec.numBuckets * 0.8) =>
+              optionalBucketSet.get.cardinality()
+          }
+
+          if (foundPrunedBuckets.nonEmpty) {
+            transformedPlanWithBucketSpec
+          } else {
+            RuleUtils.transformPlanToUseIndex(
+              spark,
+              index,
+              originalPlan,
+              useBucketSpec = false,
+              useBucketUnionForAppended = false)
+          }
+        } else {
           RuleUtils.transformPlanToUseIndex(
             spark,
             index,
             originalPlan,
             useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
             useBucketUnionForAppended = false)
+        }
+
         logEvent(
           HyperspaceIndexUsageEvent(
             AppInfo(
@@ -134,7 +168,6 @@ object FilterIndexRule
    * @param filterColumns List of columns in filter predicate.
    * @param indexedColumns List of indexed columns (e.g. from an index being checked)
    * @param includedColumns List of included columns (e.g. from an index being checked)
-   * @param fileFormat FileFormat for input relation in original logical plan.
    * @return 'true' if
    *         1. Index fully covers output and filter columns, and
    *         2. Filter predicate contains first column in index's 'indexed' columns.
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index f587544b3..b899d0438 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -68,6 +68,14 @@ object HyperspaceConf {
       .toBoolean
   }
 
+  def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
+        IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
+      .toBoolean
+  }
+
   def numBucketsForIndex(spark: SparkSession): Int = {
     getConfStringWithMultipleKeys(
       spark,
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
index 313e3cc50..c1af08cb1 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
@@ -20,7 +20,7 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.SortExec
+import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -579,6 +579,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
     }
   }
 
+  test("Verify adaptive bucket spec application for FilterIndexRule.") {
+    withTempPathAsString { testPath =>
+      // Setup. Create data.
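+      // (Writes 10 rows of SampleData as JSON and builds an index on c3 with c4 included.)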
+      val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
+      import spark.implicits._
+      SampleData.testData
+        .toDF("c1", "c2", "c3", "c4", "c5")
+        .limit(10)
+        .write
+        .json(testPath)
+      val df = spark.read.json(testPath)
+
+      // Create index.
+      hyperspace.createIndex(df, indexConfig)
+      spark.enableHyperspace()
+
+      def query(): DataFrame =
+        df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
+
+      withIndex("index") {
+        withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
+          // Check bucketSpec is applied.
+          val execPlan = query().queryExecution.executedPlan
+          val foundPrunedBuckets = execPlan.collect {
+            case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
+              optionalBucketSet.get.cardinality()
+          }
+          assert(foundPrunedBuckets.length == 1)
+          assert(foundPrunedBuckets.head == 3)
+
+          verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
+        }
+
+        // TODO: Because of SPARK-33372, bucket pruning is not applied for the InSet operator.
+        // As indexes are bucketed, supporting bucket pruning can improve the performance of
+        // queries with high selectivity. Will add a new FileSourceScanStrategy soon.
+        withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
+          val execPlan = query().queryExecution.executedPlan
+          // Check bucketSpec is not applied.
+          val foundPrunedBuckets = execPlan.collect {
+            case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                if optionalBucketSet.isDefined =>
+              optionalBucketSet.get.cardinality()
+          }
+          assert(foundPrunedBuckets.isEmpty)
+          verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
+        }
+
+        // Append to original data.
+        SampleData.testData
+          .toDF("c1", "c2", "c3", "c4", "c5")
+          .limit(3)
+          .write
+          .mode("append")
+          .json(testPath)
+
+        val df2 = spark.read.json(testPath)
+        val inputFiles = df.inputFiles
+        val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
+        def query2(): DataFrame = {
+          df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
+        }
+
+        withSQLConf(TestConfig.HybridScanEnabled: _*) {
+          withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
+            // Check bucketSpec is applied.
+            val execPlan = query2().queryExecution.executedPlan
+            val foundPrunedBuckets = execPlan.collect {
+              case _ @UnionExec(children) =>
+                val p = children.head.collect {
+                  case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                      if optionalBucketSet.isDefined =>
+                    optionalBucketSet.get.cardinality()
+                }
+                p.head
+            }
+            assert(foundPrunedBuckets.length == 1)
+            assert(foundPrunedBuckets.head == 3)
+            verifyIndexUsage(
+              query2,
+              getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
+          }
+
+          withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
+            val execPlan = query2().queryExecution.executedPlan
+            // Check bucketSpec is not applied.
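+            // With an InSet predicate (SPARK-33372) no buckets can be pruned, so the rule
+            // falls back to the plan without bucketSpec and no scan reports a bucket set.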
+            val foundPrunedBuckets = execPlan.collect {
+              case _ @UnionExec(children) =>
+                val b = children.head.collect {
+                  case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                      if optionalBucketSet.isDefined =>
+                    optionalBucketSet.get.cardinality()
+                }
+                assert(b.isEmpty)
+                true
+            }
+            assert(foundPrunedBuckets.length == 1)
+            verifyIndexUsage(
+              query2,
+              getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
+          }
+        }
+      }
+    }
+  }
+
   test(
     "Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
       "gets deleted and some appended to source data.") {
@@ -670,13 +776,13 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
 
       // Refreshed index as quick mode can be applied with Hybrid Scan config.
       withSQLConf(TestConfig.HybridScanEnabled: _*) {
-          spark.disableHyperspace()
-          val dfWithHyperspaceDisabled = query()
-          val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
-          spark.enableHyperspace()
-          val dfWithHyperspaceEnabled = query()
-          assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
-          checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
+        spark.disableHyperspace()
+        val dfWithHyperspaceDisabled = query()
+        val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
+        spark.enableHyperspace()
+        val dfWithHyperspaceEnabled = query()
+        assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
+        checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
       }
     }
   }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
index aa6091a7d..c981edcca 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
 
       withSQLConf(
         TestConfig.HybridScanEnabledAppendOnly :+
+          IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+
           IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
         val filter = filterQuery
         val planWithHybridScan = filter.queryExecution.optimizedPlan
diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
index 4234b0400..f77ad0007 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
@@ -313,7 +313,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(" Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
-      .append("ReadSchema: struct<Col1:string>---->")
+      .append("ReadSchema: struct<Col1:string>, SelectedBucketsCount: 1 out of 200---->")
       .append(displayMode.newLine)
       .append("  +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
@@ -329,7 +329,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
         "Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
-      .append("ReadSchema: struct<Col1:string>---->")
+      .append("ReadSchema: struct<Col1:string>, SelectedBucketsCount: 1 out of 200---->")
       .append(displayMode.newLine)
       .append(displayMode.newLine)
       .append("=============================================================")
@@ -655,7 +655,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
         .append("Batched: true, Format: Parquet, Location: " +
           truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
         .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
-        .append("ReadSchema: struct<Col1:string>" + displayMode.highlightTag.close)
+        .append("ReadSchema: struct<Col1:string>, SelectedBucketsCount: 1 out of 200" + displayMode.highlightTag.close)
         .append(displayMode.newLine)
         .append(displayMode.newLine)
         .append("=============================================================")
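Usage note (not part of the patch): a minimal sketch of how the new flag interacts with the existing one. Both config keys come from IndexConstants above; the snippet itself is illustrative only.

  // The adaptive bucket check is on by default with this change.
  spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "true")

  // To restore the previous behavior, disable the check; bucketSpec usage then follows
  // the existing static flag.
  spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "false")
  spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "false")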