From 63dfc539ce88974518d48c3917be85f18e45fe05 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Fri, 5 Feb 2021 08:42:30 +0900
Subject: [PATCH] Filter index file paths if bucket pruning is applicable

---
 .../hyperspace/index/rules/RuleUtils.scala    |  13 +-
 .../hyperspace/util/LogicalPlanUtils.scala    | 139 +++++++++++++++++-
 .../index/E2EHyperspaceRulesTest.scala        | 100 +++++++++++--
 3 files changed, 236 insertions(+), 16 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
index 65dcbb398..d3aa2c8f7 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -32,7 +32,7 @@ import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE}
 import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
-import com.microsoft.hyperspace.util.HyperspaceConf
+import com.microsoft.hyperspace.util.{HyperspaceConf, LogicalPlanUtils}
 
 object RuleUtils {
 
@@ -273,8 +273,9 @@ object RuleUtils {
     // Project(A,B) -> Filter(C = 10) -> Index Scan (A,B,C)
     plan transformDown {
       case baseRelation @ LogicalRelation(_: HadoopFsRelation, baseOutput, _, _) =>
+        val indexFiles = LogicalPlanUtils.indexFilesSelectedBucketsOnly(plan, index)
         val location = index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY) {
-          new InMemoryFileIndex(spark, index.content.files, Map(), None)
+          new InMemoryFileIndex(spark, indexFiles, Map(), None)
         }
         val relation = new IndexHadoopFsRelation(
@@ -353,6 +354,8 @@
       }
     }
 
+    val indexFiles = LogicalPlanUtils.indexFilesSelectedBucketsOnly(plan, index)
+
     val filesToRead = {
       if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty ||
           location.partitionSchema.nonEmpty) {
@@ -367,12 +370,12 @@
         // If the source relation is partitioned, we cannot read the appended files with the
         // index data as the schema of partitioned files are not equivalent to the index data.
         unhandledAppendedFiles = filesAppended
-        index.content.files
+        indexFiles
       } else {
         // If BucketSpec of index data isn't used (e.g., in the case of FilterIndex currently)
         // and the source format is parquet, we could read the appended files along
         // with the index data.
-        index.content.files ++ filesAppended
+        indexFiles ++ filesAppended
       }
     }
 
@@ -386,7 +389,7 @@
     def fileIndex: InMemoryFileIndex =
       new InMemoryFileIndex(spark, filesToRead, Map(), None)
 
-    val newLocation = if (filesToRead.length == index.content.files.size) {
+    val newLocation = if (filesToRead.length == indexFiles.size) {
       index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY)(fileIndex)
     } else {
       index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN)(fileIndex)
diff --git a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
index 0487fd0c5..3977038fa 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
@@ -16,8 +16,17 @@
 package com.microsoft.hyperspace.util
 
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{DoubleType, FloatType}
+import org.apache.spark.util.collection.BitSet
+
+import com.microsoft.hyperspace.index.IndexLogEntry
 
 /**
  * Utility functions for logical plan.
@@ -35,4 +44,132 @@ object LogicalPlanUtils {
       case _ => false
     }
   }
+
+  /**
+   * BucketSelector returns the selected buckets if bucket pruning is applicable to the given
+   * query plan. The logic is extracted from [[FileSourceStrategy]] in Spark.
+   */
+  object BucketSelector {
+    // Should prune buckets iff num buckets is greater than 1 and there is only one bucket column.
+    private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
+      spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
+    }
+
+    private def getExpressionBuckets(
+        expr: Expression,
+        bucketColumnName: String,
+        numBuckets: Int): BitSet = {
+
+      def getBucketNumber(attr: Attribute, v: Any): Int = {
+        BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
+      }
+
+      def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
+        val matchedBuckets = new BitSet(numBuckets)
+        iter
+          .map(v => getBucketNumber(attr, v))
+          .foreach(bucketNum => matchedBuckets.set(bucketNum))
+        matchedBuckets
+      }
+
+      def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
+        val matchedBuckets = new BitSet(numBuckets)
+        matchedBuckets.set(getBucketNumber(attr, v))
+        matchedBuckets
+      }
+
+      expr match {
+        case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
+          getBucketSetFromValue(a, v)
+        case expressions.In(a: Attribute, list)
+            if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
+          getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
+        case expressions.InSet(a: Attribute, hset) if a.name == bucketColumnName =>
+          getBucketSetFromIterable(a, hset)
+        case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
+          getBucketSetFromValue(a, null)
+        case expressions.IsNaN(a: Attribute)
+            if a.name == bucketColumnName && a.dataType == FloatType =>
+          getBucketSetFromValue(a, Float.NaN)
+        case expressions.IsNaN(a: Attribute)
+            if a.name == bucketColumnName && a.dataType == DoubleType =>
+          getBucketSetFromValue(a, Double.NaN)
+        case expressions.And(left, right) =>
+          getExpressionBuckets(left, bucketColumnName, numBuckets) &
+            getExpressionBuckets(right, bucketColumnName, numBuckets)
+        case expressions.Or(left, right) =>
+          getExpressionBuckets(left, bucketColumnName, numBuckets) |
+            getExpressionBuckets(right, bucketColumnName, numBuckets)
+        case _ =>
+          val matchedBuckets = new BitSet(numBuckets)
+          matchedBuckets.setUntil(numBuckets)
+          matchedBuckets
+      }
+    }
+
+    private def genBucketSet(
+        normalizedFilters: Seq[Expression],
+        bucketSpec: BucketSpec): Option[BitSet] = {
+      if (normalizedFilters.isEmpty) {
+        return None
+      }
+
+      val bucketColumnName = bucketSpec.bucketColumnNames.head
+      val numBuckets = bucketSpec.numBuckets
+
+      val normalizedFiltersAndExpr = normalizedFilters
+        .reduce(expressions.And)
+      val matchedBuckets =
+        getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)
+
+      val numBucketsSelected = matchedBuckets.cardinality()
+
+      // None means all the buckets need to be scanned.
+      if (numBucketsSelected == numBuckets) {
+        None
+      } else {
+        Some(matchedBuckets)
+      }
+    }
+
+    def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
+      case PhysicalOperation(
+          projects,
+          filters,
+          l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
+        // The attribute names in the predicates may differ from those in the schema when the
+        // analysis is case-insensitive, so normalize them to the names in the schema; after
+        // that, case sensitivity no longer needs to be taken into account.
+        val normalizedFilters = filters.map { e =>
+          e transform {
+            case a: AttributeReference =>
+              a.withName(l.output.find(_.semanticEquals(a)).get.name)
+          }
+        }
+        // Subquery expressions are filtered out because they can't be used to prune buckets or
+        // pushed down as data filters, yet they would be executed.
+        val normalizedFiltersWithoutSubqueries =
+          normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
+
+        val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
+          genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
+        } else {
+          None
+        }
+        bucketSet
+      case _ => None
+    }
+  }
+
+  private[hyperspace] def indexFilesSelectedBucketsOnly(
+      plan: LogicalPlan,
+      index: IndexLogEntry): Seq[Path] = {
+    BucketSelector(plan, index.bucketSpec) match {
+      case Some(selectedBuckets) =>
+        index.content.files.filter(f =>
+          selectedBuckets.get(BucketingUtils.getBucketId(f.getName).get))
+      case None =>
+        index.content.files
+    }
+  }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
index 69b1a4e47..62f65ba0c 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.SortExec
-import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 
 import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
@@ -30,6 +30,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags._
 import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
 import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
 import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
+import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector
 import com.microsoft.hyperspace.util.PathUtils
 
 class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -112,7 +113,11 @@
 
       def query(): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c1")
 
-      verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
+      verifyIndexUsage(
+        query,
+        getIndexFilesPathWithBucketSelector(
+          query().queryExecution.optimizedPlan,
+          indexConfig.indexName))
       }
     }
   }
@@ -128,7 +133,11 @@
     def query(): DataFrame = df.filter("C3 == 'facebook'").select("C3", "c1")
 
     // Verify if case-insensitive index works with case-insensitive query.
-    verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
+    verifyIndexUsage(
+      query,
+      getIndexFilesPathWithBucketSelector(
+        query().queryExecution.optimizedPlan,
+        indexConfig.indexName))
   }
 
   test("E2E test for case sensitive filter query where changing conf changes behavior.") {
@@ -145,7 +154,11 @@
     }
 
     withSQLConf("spark.sql.caseSensitive" -> "false") {
-      verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
+      verifyIndexUsage(
+        query,
+        getIndexFilesPathWithBucketSelector(
+          query().queryExecution.optimizedPlan,
+          indexConfig.indexName))
     }
   }
 
@@ -165,9 +178,12 @@
       def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")
 
       // Verify no Project node is present in the query plan, as a result of using SELECT *
-      assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
+      val queryPlan = query().queryExecution.optimizedPlan
+      assert(queryPlan.collect { case p: Project => p }.isEmpty)
 
-      verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
+      verifyIndexUsage(
+        query,
+        getIndexFilesPathWithBucketSelector(queryPlan, indexConfig.indexName))
       }
     }
   }
@@ -388,10 +404,11 @@
     spark.enableHyperspace()
 
     val dfWithHyperspaceEnabled = query(df)
+    val planWithHyperspaceEnabled = dfWithHyperspaceEnabled.queryExecution.optimizedPlan
 
     verifyQueryPlanHasExpectedRootPaths(
-      dfWithHyperspaceEnabled.queryExecution.optimizedPlan,
-      getIndexFilesPath(indexConfig.indexName))
+      planWithHyperspaceEnabled,
+      getIndexFilesPathWithBucketSelector(planWithHyperspaceEnabled, indexConfig.indexName))
 
     assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema))
     assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled)))
@@ -503,7 +520,11 @@
       def query(): DataFrame =
        spark.read.parquet(testPath).filter("c3 == 'facebook'").select("c3", "c1")
 
-      verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
+      verifyIndexUsage(
+        query,
+        getIndexFilesPathWithBucketSelector(
+          query().queryExecution.optimizedPlan,
+          indexConfig.indexName))
 
       // Delete some source data file.
       TestUtils.deleteFiles(testPath, "*parquet", 1)
@@ -518,7 +539,12 @@
       hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)
 
       // Verify index usage on latest version of index (v=1) after refresh.
-      verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(1)))
+      verifyIndexUsage(
+        query,
+        getIndexFilesPathWithBucketSelector(
+          query().queryExecution.optimizedPlan,
+          indexConfig.indexName,
+          Seq(1)))
       }
     }
   }
@@ -951,6 +977,47 @@
     }
   }
 
+  test("Verify excluding index data file path using bucket pruning.") {
+    withTempPathAsString { testPath =>
+      // Setup. Create data.
+      val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
+      import spark.implicits._
+      SampleData.testData
+        .toDF("c1", "c2", "c3", "c4", "c5")
+        .limit(10)
+        .write
+        .json(testPath)
+      val df = spark.read.json(testPath)
+
+      // Create index.
+      hyperspace.createIndex(df, indexConfig)
+      spark.enableHyperspace()
+
+      def query(): DataFrame =
+        df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
+
+      withIndex("index") {
+        val index = TestUtils.latestIndexLogEntry(systemPath, indexConfig.indexName)
+        val plan = query().queryExecution.optimizedPlan
+        val buckets = BucketSelector(plan, index.bucketSpec)
+        assert(buckets.isDefined)
+
+        val locs = getFsLocation(plan)
+        assert(locs.size == 1)
+        assert(buckets.get.cardinality() == 3)
+
+        val indexFiles = locs.head.inputFiles
+        assert(indexFiles.length == buckets.get.cardinality())
+        assert(indexFiles.length < index.content.files.length)
+
+        // Every read file must belong to a selected bucket, and the selected buckets must
+        // match the bucket ids of the read files exactly (bucket ids are 0-based).
+        val indexFilesBitIdSet = indexFiles.map(BucketingUtils.getBucketId(_).get).toSet
+        assert(indexFilesBitIdSet.forall(buckets.get.get(_)))
+        assert((0 until index.bucketSpec.numBuckets).forall(n =>
+          !(buckets.get.get(n) ^ indexFilesBitIdSet.contains(n))))
+      }
+    }
+  }
+
   /**
    * Verify that the query plan has the expected rootPaths.
    *
@@ -980,6 +1047,19 @@
     }.flatten
   }
 
+  private def getIndexFilesPathWithBucketSelector(
+      plan: LogicalPlan,
+      indexName: String,
+      versions: Seq[Int] = Seq(0)): Seq[Path] = {
+    val paths = getIndexFilesPath(indexName, versions)
+    BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match {
+      case Some(buckets) =>
+        paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get))
+      case None =>
+        paths
+    }
+  }
+
   private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
     versions.flatMap { v =>
       Content
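
For illustration only, and not part of the patch: the file-level effect of the change is to keep just the index data files whose bucket id appears in the BitSet returned by BucketSelector. The standalone Scala sketch below shows that filtering step under stated assumptions; the object and method names are hypothetical, and the file-name regex is an assumption that mirrors the "_00003"-style bucket suffix Spark writes into bucketed file names (the same suffix BucketingUtils.getBucketId parses), so it does not depend on Spark's internal helper.

import org.apache.hadoop.fs.Path
import org.apache.spark.util.collection.BitSet

// Hypothetical helper, used only to illustrate the pruning step.
object BucketPruningSketch {
  // Bucketed data files carry the bucket id as a suffix in the file name,
  // e.g. "part-00000-<uuid>_00003.c000.snappy.parquet" belongs to bucket 3.
  // Assumed file-name layout; adjust the regex if the layout differs.
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  private def bucketIdFromFileName(name: String): Option[Int] = name match {
    case bucketedFileName(id) => Some(id.toInt)
    case _ => None
  }

  /** Keep only the files whose bucket id is set in `selectedBuckets`. */
  def prune(indexFiles: Seq[Path], selectedBuckets: BitSet): Seq[Path] =
    indexFiles.filter(f => bucketIdFromFileName(f.getName).exists(selectedBuckets.get(_)))
}

With this kind of filtering, an index scan touches roughly numBucketsSelected out of numBuckets data files, which is what the new test checks by comparing the number of input files against BitSet.cardinality().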