This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support adaptive bucketed scan for FilterIndexRule #332

Open

wants to merge 3 commits into base: master

Changes from 1 commit
@@ -52,6 +52,14 @@ object IndexConstants {
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// If this config is true, Hyperspace first generates a plan with bucketSpec and checks
// the selectivity of the filter query by creating the physical plan in advance.
// If fewer than half of the buckets are selected, FilterIndexRule uses the plan with bucketSpec.
// Otherwise, the newly generated bucketSpec is not used for FilterIndexRule.
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
"spark.hyperspace.index.filterRule.bucketCheck.enabled"
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
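For reference, the two filterRule keys above can be toggled at runtime on the Spark session. A minimal sketch, assuming an existing SparkSession named spark; the values shown are examples, not recommendations:

// Illustrative only: the config keys come from IndexConstants above; `spark` is assumed.
spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")
spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "false")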
@@ -29,6 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,6 +57,11 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
val useBucketSpec = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
BucketSelector(plan, index.bucketSpec).isDefined
} else {
HyperspaceConf.useBucketSpecForFilterRule(spark)
}
// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
// unnecessary shuffle for appended data to apply BucketUnion for merging data.
@@ -64,7 +70,7 @@
spark,
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketSpec = useBucketSpec,
useBucketUnionForAppended = false)
logEvent(
HyperspaceIndexUsageEvent(
@@ -136,7 +142,6 @@ object FilterIndexRule
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
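To make the FilterIndexRule change above easier to follow, here is a hedged sketch of the adaptive decision in isolation, extracted into a standalone helper. The HyperspaceConf and BucketSelector calls are the ones added in this diff; the method name and parameter list are assumptions for illustration only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

// Sketch only: mirrors the control flow added to FilterIndexRule.
def decideUseBucketSpec(spark: SparkSession, plan: LogicalPlan, index: IndexLogEntry): Boolean =
  if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
    // Adaptive mode: use the index's bucketSpec only if BucketSelector can prune
    // at least one bucket for the plan's filter predicates.
    BucketSelector(plan, index.bucketSpec).isDefined
  } else {
    // Bucket check disabled: fall back to the static useBucketSpec config.
    HyperspaceConf.useBucketSpecForFilterRule(spark)
  }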
@@ -68,6 +68,14 @@ object HyperspaceConf {
.toBoolean
}

def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
.toBoolean
}

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala (131 changes: 130 additions & 1 deletion)
@@ -16,8 +16,18 @@

package com.microsoft.hyperspace.util

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.util.collection.BitSet

import com.microsoft.hyperspace.index.IndexLogEntry

/**
* Utility functions for logical plan.
@@ -35,4 +45,123 @@ object LogicalPlanUtils {
case _ => false
}
}

/**
* BucketSelector returns the selected buckets if bucket pruning is applicable to the given
* query plan. The logic is extracted from Spark's [[FileSourceStrategy]].
*/
object BucketSelector {
// should prune buckets iff num buckets is greater than 1 and there is only one bucket column
private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
}

private def getExpressionBuckets(
expr: Expression,
bucketColumnName: String,
numBuckets: Int): BitSet = {

def getBucketNumber(attr: Attribute, v: Any): Int = {
BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
}

def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
iter
.map(v => getBucketNumber(attr, v))
.foreach(bucketNum => matchedBuckets.set(bucketNum))
matchedBuckets
}

def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.set(getBucketNumber(attr, v))
matchedBuckets
}

expr match {
case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
getBucketSetFromValue(a, v)
case expressions.In(a: Attribute, list)
if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
case expressions.InSet(a: Attribute, hset)
if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
// NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372
// Bucket pruning is not applied for InSet without the fix.
getBucketSetFromIterable(a, hset)
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
getBucketSetFromValue(a, null)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == FloatType =>
getBucketSetFromValue(a, Float.NaN)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == DoubleType =>
getBucketSetFromValue(a, Double.NaN)
case expressions.And(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) &
getExpressionBuckets(right, bucketColumnName, numBuckets)
case expressions.Or(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) |
getExpressionBuckets(right, bucketColumnName, numBuckets)
case _ =>
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.setUntil(numBuckets)
matchedBuckets
}
}

private def genBucketSet(
normalizedFilters: Seq[Expression],
bucketSpec: BucketSpec): Option[BitSet] = {
if (normalizedFilters.isEmpty) {
return None
}

val bucketColumnName = bucketSpec.bucketColumnNames.head
val numBuckets = bucketSpec.numBuckets

val normalizedFiltersAndExpr = normalizedFilters
.reduce(expressions.And)
val matchedBuckets =
getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)

val numBucketsSelected = matchedBuckets.cardinality()

// None means all the buckets need to be scanned
if (numBucketsSelected == numBuckets) {
None
} else {
Some(matchedBuckets)
}
}

def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
case PhysicalOperation(
projects,
filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// The attribute name in a predicate may differ from the one in the schema under
// case-insensitive resolution; normalize it to the schema's name so we no longer need to
// worry about case sensitivity.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
// Subquery expressions are filtered out because they can't be used to prune buckets or
// be pushed down as data filters, yet they would still be executed.
val normalizedFiltersWithoutSubqueries =
normalizedFilters.filterNot(SubqueryExpression.hasSubquery)

val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
} else {
None
}
bucketSet
case _ => None
}
}
}
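A hedged usage sketch of the new helper follows. It assumes a DataFrame df whose underlying relation is a HadoopFsRelation and whose schema has a string column c3, and it builds a stand-in BucketSpec rather than reading one from an index log entry; only the BucketSelector call itself comes from the code above.

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

// Stand-in for the bucketSpec stored with an index (numBuckets/columns are examples).
val bucketSpec = BucketSpec(numBuckets = 200, bucketColumnNames = Seq("c3"), sortColumnNames = Seq("c3"))
val plan = df.filter(df("c3") === "facebook").queryExecution.optimizedPlan

BucketSelector(plan, bucketSpec) match {
  case Some(buckets) =>
    // An equality predicate on the single bucket column selects exactly one bucket.
    println(s"Would scan ${buckets.cardinality()} of ${bucketSpec.numBuckets} buckets")
  case None =>
    // Pruning is not applicable, or every bucket would still be scanned.
    println("No bucket pruning possible")
}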
@@ -20,8 +20,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
@@ -30,6 +30,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags._
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector
import com.microsoft.hyperspace.util.PathUtils

class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -165,7 +166,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

// Verify no Project node is present in the query plan, as a result of using SELECT *
assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
val queryPlan = query().queryExecution.optimizedPlan
assert(queryPlan.collect { case p: Project => p }.isEmpty)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
}
@@ -581,6 +583,112 @@
}
}

test("Verify adaptive bucket spec application for FilterIndexRule.") {
withTempPathAsString { testPath =>
// Setup. Create data.
val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.json(testPath)
val df = spark.read.json(testPath)

// Create index.
hyperspace.createIndex(df, indexConfig)
spark.enableHyperspace()

def query(): DataFrame =
df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")

withIndex("index") {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// TODO: Because of SPARK-33372, bucket pruning is not applied for the InSet operator.
// As indexes are bucketed, supporting bucket pruning can improve the performance of
// highly selective queries. A new FileSourceScanStrategy will be added in a follow-up.
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if (optionalBucketSet.isDefined) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.isEmpty)
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.json(testPath)

val df2 = spark.read.json(testPath)
val inputFiles = df.inputFiles
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
def query2(): DataFrame = {
df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
}

withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query2().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @UnionExec(children) =>
val p = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
p.head
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}

withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query2().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @ UnionExec(children) =>
val b = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
assert(b.isEmpty)
true
}
assert(foundPrunedBuckets.length == 1)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}
}
}
}
}
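The test above hinges on spark.sql.optimizer.inSetConversionThreshold: once the IN list grows past the threshold, Catalyst's OptimizeIn rule rewrites the In predicate to InSet, which BucketSelector does not prune (see the SPARK-33372 note in the code above). A minimal sketch, assuming a SparkSession spark and a DataFrame df with a string column c3:

// Illustrative only: the same three-value predicate under two threshold settings.
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "10")
df.filter(df("c3").isin("facebook", "donde", "miperro")).explain(true) // optimized plan keeps In

spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "2")
df.filter(df("c3").isin("facebook", "donde", "miperro")).explain(true) // optimized plan shows InSet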

test(
"Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
"gets deleted and some appended to source data.") {
@@ -733,7 +841,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
withIndex(indexConfig.indexName) {
spark.enableHyperspace()
val df = spark.read.parquet(testPath)
def query(df: DataFrame): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c4")
def query(df: DataFrame): DataFrame = df.filter("c3 != 'facebook'").select("c3", "c4")

val indexManager = Hyperspace
.getContext(spark)
@@ -980,6 +1088,19 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}.flatten
}

private def getIndexFilesPathWithBucketSelector(
plan: LogicalPlan,
indexName: String,
versions: Seq[Int] = Seq(0)): Seq[Path] = {
val paths = getIndexFilesPath(indexName, versions)
BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match {
case Some(buckets) =>
paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get))
case None =>
paths
}
}

private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
versions.flatMap { v =>
Content
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

withSQLConf(
TestConfig.HybridScanEnabledAppendOnly :+
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan
@@ -485,7 +485,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
.append("Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
.append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>" + displayMode.highlightTag.close)
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200")
.append(displayMode.highlightTag.close)
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")