
Add adaptive bucketSpec feature
sezruby committed Feb 6, 2021
1 parent 88f1b43 commit 04aace9
Showing 7 changed files with 281 additions and 8 deletions.
IndexConstants.scala
@@ -52,6 +52,14 @@ object IndexConstants {
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// If this config is true, Hyperspace first generates a plan with bucketSpec and checks
// the selectivity of the filter query by creating the physical plan in advance.
// If the filter selects only a subset of the buckets, the Filter Rule uses the plan with
// bucketSpec; otherwise the newly generated bucketSpec is not used for the Filter Rule.
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
"spark.hyperspace.index.filterRule.bucketCheck.enabled"
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
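Usage note (not part of the diff): the new setting above is a plain Spark session conf, so toggling the behavior from user code is a one-liner. A minimal sketch, assuming an active SparkSession named spark:

// Sketch only, assuming an active SparkSession `spark`; not part of this commit.
// Turn the adaptive bucket check off and fall back to the static flag:
spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "false")
// With the adaptive check off, this flag alone decides whether FilterIndexRule
// uses the index's bucketSpec:
spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")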
FilterIndexRule.scala
@@ -29,6 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,6 +57,11 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
val useBucketSpec = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
BucketSelector(plan, index.bucketSpec).isDefined
} else {
HyperspaceConf.useBucketSpecForFilterRule(spark)
}
// As FilterIndexRule is not intended to support a bucketed scan, we set
// useBucketUnionForAppended to false. If it were true, Hybrid Scan could cause an
// unnecessary shuffle of the appended data in order to apply BucketUnion when merging.
@@ -64,7 +70,7 @@ object FilterIndexRule
spark,
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketSpec = useBucketSpec,
useBucketUnionForAppended = false)
logEvent(
HyperspaceIndexUsageEvent(
@@ -136,7 +142,6 @@
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
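For readability (not part of the diff), the decision added to FilterIndexRule above can be read as a small standalone function: with the adaptive check enabled, the index's bucketSpec is applied only when BucketSelector can narrow the scan to a subset of buckets; otherwise the pre-existing static useBucketSpec flag decides. A hedged sketch, with decideUseBucketSpec as a hypothetical helper name:

// Hypothetical helper illustrating the logic above; not part of Hyperspace.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.util.{HyperspaceConf, LogicalPlanUtils}

def decideUseBucketSpec(spark: SparkSession, plan: LogicalPlan, index: IndexLogEntry): Boolean = {
  if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
    // Adaptive path: use the bucketed plan only if the filter actually prunes buckets.
    LogicalPlanUtils.BucketSelector(plan, index.bucketSpec).isDefined
  } else {
    // Static path: honor spark.hyperspace.index.filterRule.useBucketSpec as before.
    HyperspaceConf.useBucketSpecForFilterRule(spark)
  }
}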
HyperspaceConf.scala
@@ -68,6 +68,14 @@ object HyperspaceConf {
.toBoolean
}

def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
.toBoolean
}

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
131 changes: 130 additions & 1 deletion src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
@@ -16,8 +16,18 @@

package com.microsoft.hyperspace.util

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.util.collection.BitSet

import com.microsoft.hyperspace.index.IndexLogEntry

/**
* Utility functions for logical plan.
@@ -35,4 +45,123 @@ object LogicalPlanUtils {
case _ => false
}
}

/**
* BucketSelector returns the selected buckets if bucket pruning is applicable to the given
* query plan. The logic is extracted from Spark's [[FileSourceStrategy]].
*/
object BucketSelector {
// should prune buckets iff num buckets is greater than 1 and there is only one bucket column
private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
}

private def getExpressionBuckets(
expr: Expression,
bucketColumnName: String,
numBuckets: Int): BitSet = {

def getBucketNumber(attr: Attribute, v: Any): Int = {
BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
}

def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
iter
.map(v => getBucketNumber(attr, v))
.foreach(bucketNum => matchedBuckets.set(bucketNum))
matchedBuckets
}

def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.set(getBucketNumber(attr, v))
matchedBuckets
}

expr match {
case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
getBucketSetFromValue(a, v)
case expressions.In(a: Attribute, list)
if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
case expressions.InSet(a: Attribute, hset)
if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
// NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372
// Bucket pruning is not applied for InSet without the fix.
getBucketSetFromIterable(a, hset)
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
getBucketSetFromValue(a, null)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == FloatType =>
getBucketSetFromValue(a, Float.NaN)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == DoubleType =>
getBucketSetFromValue(a, Double.NaN)
case expressions.And(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) &
getExpressionBuckets(right, bucketColumnName, numBuckets)
case expressions.Or(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) |
getExpressionBuckets(right, bucketColumnName, numBuckets)
case _ =>
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.setUntil(numBuckets)
matchedBuckets
}
}

private def genBucketSet(
normalizedFilters: Seq[Expression],
bucketSpec: BucketSpec): Option[BitSet] = {
if (normalizedFilters.isEmpty) {
return None
}

val bucketColumnName = bucketSpec.bucketColumnNames.head
val numBuckets = bucketSpec.numBuckets

val normalizedFiltersAndExpr = normalizedFilters
.reduce(expressions.And)
val matchedBuckets =
getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)

val numBucketsSelected = matchedBuckets.cardinality()

// None means all the buckets need to be scanned
if (numBucketsSelected == numBuckets) {
None
} else {
Some(matchedBuckets)
}
}

def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
case PhysicalOperation(
projects,
filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// The attribute name in a predicate can differ from the one in the schema under
// case-insensitive resolution; rewrite it to match the schema's name so we no longer
// need to worry about case sensitivity.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
// Subquery expressions are filtered out because they cannot be used to prune buckets
// or be pushed down as data filters, yet they would still be executed.
val normalizedFiltersWithoutSubqueries =
normalizedFilters.filterNot(SubqueryExpression.hasSubquery)

val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
} else {
None
}
bucketSet
case _ => None
}
}
}
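Usage note (not part of the diff): BucketSelector takes a logical plan and a BucketSpec and returns Some bit set of the buckets the plan's filters can be narrowed to, or None when every bucket would still be scanned (or pruning does not apply). A minimal sketch, under the assumption that df is a file-source (HadoopFsRelation) read whose layout really matches a 200-bucket spec on column c3:

// Sketch only; assumes `df` is a file-source read laid out according to `bucketSpec`.
// Not part of this commit.
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

val bucketSpec = BucketSpec(numBuckets = 200, bucketColumnNames = Seq("c3"), sortColumnNames = Nil)
val plan = df.filter(df("c3") === "facebook").queryExecution.optimizedPlan

BucketSelector(plan, bucketSpec) match {
  case Some(buckets) =>
    // Only a subset of buckets is needed; cardinality() counts the selected ones.
    println(s"Selected ${buckets.cardinality()} of ${bucketSpec.numBuckets} buckets")
  case None =>
    println("No bucket pruning possible; all buckets would be scanned")
}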
E2EHyperspaceRulesTest.scala
@@ -20,8 +20,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
@@ -30,6 +30,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags._
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector
import com.microsoft.hyperspace.util.PathUtils

class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -165,7 +166,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

// Verify no Project node is present in the query plan, as a result of using SELECT *
assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
val queryPlan = query().queryExecution.optimizedPlan
assert(queryPlan.collect { case p: Project => p }.isEmpty)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
}
@@ -581,6 +583,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}
}

test("Verify adaptive bucket spec application for FilterIndexRule.") {
withTempPathAsString { testPath =>
// Setup. Create data.
val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.json(testPath)
val df = spark.read.json(testPath)

// Create index.
hyperspace.createIndex(df, indexConfig)
spark.enableHyperspace()

def query(): DataFrame =
df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")

withIndex("index") {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// TODO: Because of SPARK-33372, bucket pruning is not applied for the InSet operator.
// As indexes are bucketed, supporting bucket pruning can improve the performance of
// queries with high selectivity. A new FileSourceScanStrategy will be added soon.
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.isEmpty)
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.json(testPath)

val df2 = spark.read.json(testPath)
val inputFiles = df.inputFiles
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
def query2(): DataFrame = {
df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
}

withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query2().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @UnionExec(children) =>
val p = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
p.head
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}

withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query2().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @UnionExec(children) =>
val b = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
assert(b.isEmpty)
true
}
assert(foundPrunedBuckets.length == 1)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}
}
}
}
}

test(
"Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
"gets deleted and some appended to source data.") {
@@ -733,7 +841,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
withIndex(indexConfig.indexName) {
spark.enableHyperspace()
val df = spark.read.parquet(testPath)
def query(df: DataFrame): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c4")
def query(df: DataFrame): DataFrame = df.filter("c3 != 'facebook'").select("c3", "c4")

val indexManager = Hyperspace
.getContext(spark)
@@ -980,6 +1088,19 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}.flatten
}

private def getIndexFilesPathWithBucketSelector(
plan: LogicalPlan,
indexName: String,
versions: Seq[Int] = Seq(0)): Seq[Path] = {
val paths = getIndexFilesPath(indexName, versions)
BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match {
case Some(buckets) =>
paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get))
case None =>
paths
}
}

private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
versions.flatMap { v =>
Content
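A note on the thresholds in the new E2E test above (not part of the diff): Spark's optimizer (the OptimizeIn rule) rewrites an In predicate into InSet once the literal list exceeds spark.sql.optimizer.inSetConversionThreshold, and, per the SPARK-33372 comment in BucketSelector, the InSet form is not pruned. With a three-value isin filter, the test therefore pins the threshold on either side of 3 to exercise both branches. A hedged sketch of the same toggle:

// Sketch only; not part of this commit.
// Threshold above the list size (3): the predicate stays as In and buckets are pruned.
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "10")
// Threshold below the list size: the predicate becomes InSet and, until SPARK-33372
// is addressed, no bucket pruning happens.
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "2")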
HybridScanSuite.scala
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

withSQLConf(
TestConfig.HybridScanEnabledAppendOnly :+
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan
ExplainTest.scala
@@ -485,7 +485,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
.append("Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
.append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>" + displayMode.highlightTag.close)
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200")
.append(displayMode.highlightTag.close)
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
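Usage note (not part of the diff): the updated ExplainTest expectation above shows how the feature surfaces to users. When the adaptive check keeps the bucketSpec, the FileSourceScanExec entry in the physical plan reports the pruned buckets, e.g. "SelectedBucketsCount: 1 out of 200". A minimal way to observe this, assuming a Hyperspace-enabled session and an index on c3 covering c4:

// Sketch only; assumes Hyperspace is enabled and a suitable index on c3 exists.
// Look for "SelectedBucketsCount: m out of n" on the scan node when pruning applies.
df.filter(df("c3") === "facebook").select("c4", "c3").explain(true)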
