This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support adaptive bucketed scan for FilterIndexRule #332

Open: wants to merge 3 commits into master
@@ -52,6 +52,13 @@ object IndexConstants {
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// If the INDEX_FILTER_RULE_USE_BUCKET_SPEC config is false, Hyperspace still applies the
// index's bucket spec if the given filter query is applicable for bucket pruning AND the
// number of selected buckets is at most (total number of buckets * this threshold).
val INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD =
"spark.hyperspace.index.filterRule.autoBucketing.threshold"
val INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD_DEFAULT = "0.8"

// Identifier injected into HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to the options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
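
A minimal usage sketch of the new setting (illustrative, not part of this diff; assumes a SparkSession named spark):

spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "false")
spark.conf.set("spark.hyperspace.index.filterRule.autoBucketing.threshold", "0.8")
// With these (default) values, FilterIndexRule switches to a bucketed index scan only when
// bucket pruning is applicable and at most 80% of the index's buckets are selected.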
@@ -29,6 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
import com.microsoft.hyperspace.util.BucketSelector

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,6 +57,15 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
val useBucketSpec = if (HyperspaceConf.useBucketSpecForFilterRule(spark)) {
true
} else {
// Apply the bucket spec only if bucket pruning is applicable and the number of selected
// buckets is within the threshold.
val selectedBuckets = BucketSelector(plan, index.bucketSpec).map(_.cardinality())
selectedBuckets.exists(_ <=
index.bucketSpec.numBuckets * HyperspaceConf.prunedBucketRatioToAutoEnableBucketRead(spark))
}
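// Illustrative example (numbers are hypothetical): with the default threshold of 0.8 and an
// index bucketed into 200 buckets, the bucketed scan is kept only when the filter selects at
// most 200 * 0.8 = 160 buckets; otherwise the index is read without its bucket spec.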
// As FilterIndexRule does not require a bucketed scan for correctness, we set
// useBucketUnionForAppended to false. If it were true, Hybrid Scan could cause an
// unnecessary shuffle of appended data in order to apply BucketUnion for merging.
@@ -64,7 +74,7 @@ object FilterIndexRule
spark,
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketSpec = useBucketSpec,
useBucketUnionForAppended = false)
logEvent(
HyperspaceIndexUsageEvent(
@@ -136,7 +146,6 @@ object FilterIndexRule
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
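
For illustration, using the index and query shape from the tests below (the names are the test's own, not a prescription), both conditions hold in a case like:

val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))  // indexed: c3, included: c4
df.filter("c3 = 'facebook'").select("c4", "c3")  // output and filter columns are covered; the filter uses the first indexed column c3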
145 changes: 145 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/util/BucketSelector.scala
@@ -0,0 +1,145 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.util

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.util.collection.BitSet

/**
* BucketSelector returns the selected buckets if bucket pruning is applicable to the given
* query plan. The logic is extracted from [[FileSourceStrategy]] in Spark.
*/
object BucketSelector {
// should prune buckets iff num buckets is greater than 1 and there is only one bucket column
private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
}

private def getExpressionBuckets(
expr: Expression,
bucketColumnName: String,
numBuckets: Int): BitSet = {

def getBucketNumber(attr: Attribute, v: Any): Int = {
BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
}

def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
iter
.map(v => getBucketNumber(attr, v))
.foreach(bucketNum => matchedBuckets.set(bucketNum))
matchedBuckets
}

def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.set(getBucketNumber(attr, v))
matchedBuckets
}

expr match {
case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
getBucketSetFromValue(a, v)
case expressions.In(a: Attribute, list)
if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
case expressions.InSet(a: Attribute, hset)
if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
// NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372
// Bucket pruning is not applied for InSet without the fix.
getBucketSetFromIterable(a, hset)
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
getBucketSetFromValue(a, null)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == FloatType =>
getBucketSetFromValue(a, Float.NaN)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == DoubleType =>
getBucketSetFromValue(a, Double.NaN)
case expressions.And(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) &
getExpressionBuckets(right, bucketColumnName, numBuckets)
case expressions.Or(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) |
getExpressionBuckets(right, bucketColumnName, numBuckets)
case _ =>
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.setUntil(numBuckets)
matchedBuckets
}
}
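
// Illustrative example: with bucketColumnName = "c3" and a predicate such as
// c3 IN ('facebook', 'donde', 'miperro'), each literal is mapped to a bucket id through
// BucketingUtils.getBucketIdFromValue, so at most three bits are set in the returned BitSet.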

private def genBucketSet(
normalizedFilters: Seq[Expression],
bucketSpec: BucketSpec): Option[BitSet] = {
if (normalizedFilters.isEmpty) {
return None
}

val bucketColumnName = bucketSpec.bucketColumnNames.head
val numBuckets = bucketSpec.numBuckets

val normalizedFiltersAndExpr = normalizedFilters
.reduce(expressions.And)
val matchedBuckets =
getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)

val numBucketsSelected = matchedBuckets.cardinality()

// None means all the buckets need to be scanned
if (numBucketsSelected == numBuckets) {
None
} else {
Some(matchedBuckets)
}
}

def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
case PhysicalOperation(
projects,
filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// The attribute names in the predicates may differ from those in the schema when the
// analysis is case-insensitive. Normalize them to the names in the schema so that case
// sensitivity does not need to be handled below.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
// Subquery expressions are filtered out because they cannot be used to prune buckets or be
// pushed down as data filters, yet they would still be executed.
val normalizedFiltersWithoutSubqueries =
normalizedFilters.filterNot(SubqueryExpression.hasSubquery)

if (shouldPruneBuckets(bucketSpec)) {
genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
} else {
None
}
case _ => None
}
}
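
A brief usage sketch (hypothetical; df stands for a DataFrame whose plan reads a bucketed relation, and index for an applied index's log entry, as in FilterIndexRule above):

val selectedBuckets = BucketSelector(df.queryExecution.optimizedPlan, index.bucketSpec)
// Some(bitSet) when pruning applies and not all buckets are needed; None otherwise.
val numSelected = selectedBuckets.map(_.cardinality())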
@@ -68,6 +68,14 @@ object HyperspaceConf {
.toBoolean
}

def prunedBucketRatioToAutoEnableBucketRead(spark: SparkSession): Double = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD,
IndexConstants.INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD_DEFAULT)
.toDouble
}

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
@@ -20,16 +20,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
import com.microsoft.hyperspace.index.IndexLogEntryTags._
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.util.BucketSelector
import com.microsoft.hyperspace.util.PathUtils

class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -165,7 +165,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

// Verify no Project node is present in the query plan, as a result of using SELECT *
assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
val queryPlan = query().queryExecution.optimizedPlan
assert(queryPlan.collect { case p: Project => p }.isEmpty)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
}
@@ -603,6 +604,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}
}

test("Verify adaptive bucket spec application for FilterIndexRule.") {
withTempPathAsString { testPath =>
// Setup. Create data.
val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.json(testPath)
val df = spark.read.json(testPath)

// Create index.
hyperspace.createIndex(df, indexConfig)
spark.enableHyperspace()

def query(): DataFrame =
df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")

withIndex("index") {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// TODO: Because of SPARK-33372, bucket pruning is not applied to the InSet operator.
// As indexes are bucketed, supporting bucket pruning can improve the performance of
// highly selective queries. A new FileSourceScanStrategy will be added to address this.
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if (optionalBucketSet.isDefined) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.isEmpty)
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.json(testPath)

val df2 = spark.read.json(testPath)
val inputFiles = df.inputFiles
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
def query2(): DataFrame = {
df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
}

withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query2().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @UnionExec(children) =>
val p = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
p.head
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}

withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query2().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @ UnionExec(children) =>
val b = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
assert(b.isEmpty)
true
}
assert(foundPrunedBuckets.length == 1)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}
}
}
}
}

test(
"Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
"gets deleted and some appended to source data.") {
@@ -755,7 +862,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
withIndex(indexConfig.indexName) {
spark.enableHyperspace()
val df = spark.read.parquet(testPath)
def query(df: DataFrame): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c4")
def query(df: DataFrame): DataFrame = df.filter("c3 != 'facebook'").select("c3", "c4")

val indexManager = Hyperspace
.getContext(spark)
@@ -1002,6 +1109,19 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}.flatten
}

private def getIndexFilesPathWithBucketSelector(
plan: LogicalPlan,
indexName: String,
versions: Seq[Int] = Seq(0)): Seq[Path] = {
val paths = getIndexFilesPath(indexName, versions)
BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match {
case Some(buckets) =>
paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get))
case None =>
paths
}
}

private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
versions.flatMap { v =>
Content
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

withSQLConf(
TestConfig.HybridScanEnabledAppendOnly :+
IndexConstants.INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD -> "0.0" :+
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan