From 04aace931aa8d3a5a4b82ae0d1b01981be19f913 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Sat, 6 Feb 2021 18:13:47 +0900
Subject: [PATCH] Add adaptive bucketSpec feature

---
 .../hyperspace/index/IndexConstants.scala    |   8 ++
 .../index/rules/FilterIndexRule.scala        |   9 +-
 .../hyperspace/util/HyperspaceConf.scala     |   8 ++
 .../hyperspace/util/LogicalPlanUtils.scala   | 131 +++++++++++++++++-
 .../index/E2EHyperspaceRulesTest.scala       | 129 ++++++++++++++++-
 .../hyperspace/index/HybridScanSuite.scala   |   1 +
 .../index/plananalysis/ExplainTest.scala     |   3 +-
 7 files changed, 281 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index e4e930358..75ce15662 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -52,6 +52,14 @@ object IndexConstants {
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
 
+  // If this config is true, Hyperspace first generates a plan with bucketSpec and checks
+  // the selectivity of the filter query by building the physical plan in advance.
+  // If bucket pruning selects only a subset of the buckets, FilterIndexRule uses the plan
+  // with bucketSpec; otherwise, the newly generated bucketSpec is discarded.
+  val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
+    "spark.hyperspace.index.filterRule.bucketCheck.enabled"
+  val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"
+
   // Identifier injected to HadoopFsRelation as an option if an index is applied.
   // Currently, the identifier is added to options field of HadoopFsRelation.
   // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
index 7bcb4e58a..d8f66443b 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
@@ -29,6 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry
 import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
 import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
+import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector
 
 /**
  * FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,6 +57,11 @@ object FilterIndexRule
           findCoveringIndexes(filter, outputColumns, filterColumns)
         FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
           case Some(index) =>
+            val useBucketSpec = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
+              BucketSelector(plan, index.bucketSpec).isDefined
+            } else {
+              HyperspaceConf.useBucketSpecForFilterRule(spark)
+            }
             // As FilterIndexRule is not intended to support bucketed scan, we set
             // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
             // unnecessary shuffle for appended data to apply BucketUnion for merging data.
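
A minimal usage sketch of the two filter-rule knobs above (illustrative only; `spark` is
assumed to be a Hyperspace-enabled SparkSession). When the adaptive check is enabled, which
is the default, the static `useBucketSpec` flag is ignored for FilterIndexRule and the
decision is made per query by BucketSelector:

    import com.microsoft.hyperspace.index.IndexConstants

    // Adaptive (default): apply the index's bucketSpec only when BucketSelector
    // can prune at least one bucket for the query's filter predicates.
    spark.conf.set(IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED, "true")

    // Static alternative: disable the check and control bucketSpec usage directly.
    spark.conf.set(IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED, "false")
    spark.conf.set(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC, "true")
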
@@ -64,7 +70,7 @@ object FilterIndexRule
               spark,
               index,
               originalPlan,
-              useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
+              useBucketSpec = useBucketSpec,
               useBucketUnionForAppended = false)
             logEvent(
               HyperspaceIndexUsageEvent(
@@ -136,7 +142,6 @@ object FilterIndexRule
    * @param filterColumns List of columns in filter predicate.
    * @param indexedColumns List of indexed columns (e.g. from an index being checked)
    * @param includedColumns List of included columns (e.g. from an index being checked)
-   * @param fileFormat FileFormat for input relation in original logical plan.
    * @return 'true' if
    *         1. Index fully covers output and filter columns, and
    *         2. Filter predicate contains first column in index's 'indexed' columns.
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index f587544b3..b899d0438 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -68,6 +68,14 @@ object HyperspaceConf {
       .toBoolean
   }
 
+  def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
+        IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
+      .toBoolean
+  }
+
   def numBucketsForIndex(spark: SparkSession): Int = {
     getConfStringWithMultipleKeys(
       spark,
diff --git a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
index 0487fd0c5..b07e4cc4e 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
@@ -16,8 +16,18 @@
 
 package com.microsoft.hyperspace.util
 
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{DoubleType, FloatType}
+import org.apache.spark.util.collection.BitSet
+
+import com.microsoft.hyperspace.index.IndexLogEntry
 
 /**
  * Utility functions for logical plan.
@@ -35,4 +45,123 @@ object LogicalPlanUtils {
       case _ => false
     }
   }
+
+  /**
+   * BucketSelector returns the selected buckets if bucket pruning is applicable to the given
+   * query plan. The logic is extracted from [[FileSourceStrategy]] in Spark.
+   */
+  object BucketSelector {
+    // Prune buckets iff the number of buckets is greater than 1 and there is exactly one
+    // bucket column.
+    private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
+      spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
+    }
+
+    private def getExpressionBuckets(
+        expr: Expression,
+        bucketColumnName: String,
+        numBuckets: Int): BitSet = {
+
+      def getBucketNumber(attr: Attribute, v: Any): Int = {
+        BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
+      }
+
+      def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
+        val matchedBuckets = new BitSet(numBuckets)
+        iter
+          .map(v => getBucketNumber(attr, v))
+          .foreach(bucketNum => matchedBuckets.set(bucketNum))
+        matchedBuckets
+      }
+
+      def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
+        val matchedBuckets = new BitSet(numBuckets)
+        matchedBuckets.set(getBucketNumber(attr, v))
+        matchedBuckets
+      }
+
+      expr match {
+        case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
+          getBucketSetFromValue(a, v)
+        case expressions.In(a: Attribute, list)
+            if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
+          getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
+        case expressions.InSet(a: Attribute, hset)
+            if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
+          // NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372
+          // Without the fix, bucket pruning is not applied for InSet: InSet holds
+          // evaluated values rather than Literal expressions, so this guard does not
+          // match and the default case below selects all buckets.
+          getBucketSetFromIterable(a, hset)
+        case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
+          getBucketSetFromValue(a, null)
+        case expressions.IsNaN(a: Attribute)
+            if a.name == bucketColumnName && a.dataType == FloatType =>
+          getBucketSetFromValue(a, Float.NaN)
+        case expressions.IsNaN(a: Attribute)
+            if a.name == bucketColumnName && a.dataType == DoubleType =>
+          getBucketSetFromValue(a, Double.NaN)
+        case expressions.And(left, right) =>
+          getExpressionBuckets(left, bucketColumnName, numBuckets) &
+            getExpressionBuckets(right, bucketColumnName, numBuckets)
+        case expressions.Or(left, right) =>
+          getExpressionBuckets(left, bucketColumnName, numBuckets) |
+            getExpressionBuckets(right, bucketColumnName, numBuckets)
+        case _ =>
+          val matchedBuckets = new BitSet(numBuckets)
+          matchedBuckets.setUntil(numBuckets)
+          matchedBuckets
+      }
+    }
+
+    private def genBucketSet(
+        normalizedFilters: Seq[Expression],
+        bucketSpec: BucketSpec): Option[BitSet] = {
+      if (normalizedFilters.isEmpty) {
+        return None
+      }
+
+      val bucketColumnName = bucketSpec.bucketColumnNames.head
+      val numBuckets = bucketSpec.numBuckets
+
+      val normalizedFiltersAndExpr = normalizedFilters
+        .reduce(expressions.And)
+      val matchedBuckets =
+        getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)
+
+      val numBucketsSelected = matchedBuckets.cardinality()
+
+      // None means all the buckets need to be scanned.
+      if (numBucketsSelected == numBuckets) {
+        None
+      } else {
+        Some(matchedBuckets)
+      }
+    }
+
+    def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
+      case PhysicalOperation(
+            projects,
+            filters,
+            l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
+        // The attribute names in the predicates may differ from the ones in the schema when
+        // the analysis is case-insensitive, so rename them to match the schema; after that
+        // we no longer need to worry about case sensitivity.
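+        // For example, a predicate written as "C3 = 'x'" against schema column "c3" is
+        // renamed to "c3", so the `a.name == bucketColumnName` checks above can match.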
+        val normalizedFilters = filters.map { e =>
+          e transform {
+            case a: AttributeReference =>
+              a.withName(l.output.find(_.semanticEquals(a)).get.name)
+          }
+        }
+        // Subquery expressions are filtered out because they can't be used to prune buckets
+        // or pushed down as data filters, yet they would still be executed.
+        val normalizedFiltersWithoutSubqueries =
+          normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
+
+        if (shouldPruneBuckets(bucketSpec)) {
+          genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
+        } else {
+          None
+        }
+      case _ => None
+    }
+  }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
index 69b1a4e47..02729267a 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
@@ -20,8 +20,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.SortExec
-import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 
 import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
@@ -30,6 +30,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags._
 import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
 import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
 import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
+import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector
 import com.microsoft.hyperspace.util.PathUtils
 
 class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -165,7 +166,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
     def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")
 
     // Verify no Project node is present in the query plan, as a result of using SELECT *
-    assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
+    val queryPlan = query().queryExecution.optimizedPlan
+    assert(queryPlan.collect { case p: Project => p }.isEmpty)
 
     verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
   }
@@ -581,6 +583,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
     }
   }
 
+  test("Verify adaptive bucket spec application for FilterIndexRule.") {
+    withTempPathAsString { testPath =>
+      // Setup. Create data.
+      val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
+      import spark.implicits._
+      SampleData.testData
+        .toDF("c1", "c2", "c3", "c4", "c5")
+        .limit(10)
+        .write
+        .json(testPath)
+      val df = spark.read.json(testPath)
+
+      // Create index.
+      hyperspace.createIndex(df, indexConfig)
+      spark.enableHyperspace()
+
+      def query(): DataFrame =
+        df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
+
+      withIndex("index") {
+        withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
+          // Check bucketSpec is applied.
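+          // With inSetConversionThreshold = 10, the three-value `isin` stays an In
+          // predicate, so BucketSelector can prune and exactly 3 buckets are selected.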
+          val execPlan = query().queryExecution.executedPlan
+          val foundPrunedBuckets = execPlan.collect {
+            case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
+              optionalBucketSet.get.cardinality()
+          }
+          assert(foundPrunedBuckets.length == 1)
+          assert(foundPrunedBuckets.head == 3)
+
+          verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
+        }
+
+        // TODO: Because of SPARK-33372, bucket pruning is not applied for the InSet
+        // operator. As indexes are bucketed, supporting bucket pruning can improve the
+        // performance of queries with high selectivity. Will add a new
+        // FileSourceScanStrategy soon.
+        withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
+          val execPlan = query().queryExecution.executedPlan
+          // Check bucketSpec is not applied.
+          val foundPrunedBuckets = execPlan.collect {
+            case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                if optionalBucketSet.isDefined =>
+              optionalBucketSet.get.cardinality()
+          }
+          assert(foundPrunedBuckets.isEmpty)
+          verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
+        }
+
+        // Append to original data.
+        SampleData.testData
+          .toDF("c1", "c2", "c3", "c4", "c5")
+          .limit(3)
+          .write
+          .mode("append")
+          .json(testPath)
+
+        val df2 = spark.read.json(testPath)
+        val inputFiles = df.inputFiles
+        val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
+        def query2(): DataFrame = {
+          df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
+        }
+
+        withSQLConf(TestConfig.HybridScanEnabled: _*) {
+          withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
+            // Check bucketSpec is applied.
+            val execPlan = query2().queryExecution.executedPlan
+            val foundPrunedBuckets = execPlan.collect {
+              case _ @UnionExec(children) =>
+                val p = children.head.collect {
+                  case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                      if optionalBucketSet.isDefined =>
+                    optionalBucketSet.get.cardinality()
+                }
+                p.head
+            }
+            assert(foundPrunedBuckets.length == 1)
+            assert(foundPrunedBuckets.head == 3)
+            verifyIndexUsage(
+              query2,
+              getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
+          }
+
+          withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
+            val execPlan = query2().queryExecution.executedPlan
+            // Check bucketSpec is not applied.
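+            // At this threshold the `isin` becomes InSet; due to SPARK-33372 no bucket
+            // pruning is possible, so BucketSelector returns None and the index is read
+            // without bucketSpec even under Hybrid Scan.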
+            val checkedUnions = execPlan.collect {
+              case _ @UnionExec(children) =>
+                val prunedBuckets = children.head.collect {
+                  case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
+                      if optionalBucketSet.isDefined =>
+                    optionalBucketSet.get.cardinality()
+                }
+                assert(prunedBuckets.isEmpty)
+                true
+            }
+            assert(checkedUnions.length == 1)
+            verifyIndexUsage(
+              query2,
+              getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
+          }
+        }
+      }
+    }
+  }
+
   test(
     "Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
       "gets deleted and some appended to source data.") {
@@ -733,7 +841,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
     withIndex(indexConfig.indexName) {
       spark.enableHyperspace()
       val df = spark.read.parquet(testPath)
-      def query(df: DataFrame): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c4")
+      def query(df: DataFrame): DataFrame = df.filter("c3 != 'facebook'").select("c3", "c4")
 
       val indexManager = Hyperspace
         .getContext(spark)
@@ -980,6 +1088,19 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
     }.flatten
   }
 
+  private def getIndexFilesPathWithBucketSelector(
+      plan: LogicalPlan,
+      indexName: String,
+      versions: Seq[Int] = Seq(0)): Seq[Path] = {
+    val paths = getIndexFilesPath(indexName, versions)
+    BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match {
+      case Some(buckets) =>
+        paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get))
+      case None =>
+        paths
+    }
+  }
+
   private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
     versions.flatMap { v =>
       Content
diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
index 3e2771882..84ebbe31c 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
 
     withSQLConf(
       TestConfig.HybridScanEnabledAppendOnly :+
+        IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+
         IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
       val filter = filterQuery
       val planWithHybridScan = filter.queryExecution.optimizedPlan
diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
index a62b78428..76e553b06 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
@@ -485,7 +485,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append("Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
       .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
-      .append("ReadSchema: struct" + displayMode.highlightTag.close)
+      .append("ReadSchema: struct, SelectedBucketsCount: 1 out of 200")
+      .append(displayMode.highlightTag.close)
       .append(displayMode.newLine)
       .append(displayMode.newLine)
       .append("=============================================================")
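
For reference, a minimal sketch of driving the new BucketSelector directly, mirroring what
FilterIndexRule now does with an index's bucketSpec. The path, column names, and bucket
count below are illustrative, not part of this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.catalog.BucketSpec
    import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

    val spark = SparkSession.builder.getOrCreate()
    // Hypothetical source with an equality predicate on the would-be bucket column.
    val plan = spark.read
      .parquet("/tmp/hypothetical/index/files")
      .filter("c3 = 'facebook'")
      .queryExecution
      .optimizedPlan
    val spec =
      BucketSpec(numBuckets = 200, bucketColumnNames = Seq("c3"), sortColumnNames = Seq("c3"))

    BucketSelector(plan, spec) match {
      case Some(buckets) =>
        // Pruning helps: FilterIndexRule would keep the bucketed index scan.
        println(s"Selected ${buckets.cardinality()} of ${spec.numBuckets} buckets")
      case None =>
        // No prunable predicate, or all buckets selected: bucketSpec would not be applied.
        println("No bucket pruning")
    }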