Add adaptive bucketSpec feature
sezruby committed Feb 6, 2021
1 parent 88f1b43 commit e548325
Showing 7 changed files with 300 additions and 9 deletions.
src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -52,6 +52,14 @@ object IndexConstants {
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// If this config is true, Hyperspace generates a plan with bucketSpec first and checks
// the selectivity of the filter query by creating the physical plan in advance.
// If fewer than half of the buckets are selected, Filter Rule uses the plan with bucketSpec.
// Otherwise, the newly generated bucketSpec is not used for Filter Rule.
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
"spark.hyperspace.index.filterRule.bucketCheck.enabled"
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
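
Stepping outside the diff for a moment, a minimal usage sketch of the two flags involved, assuming a SparkSession named spark with Hyperspace set up (the keys are the constants defined above; nothing else is taken from this commit):

// Adaptive check (added in this commit): build the bucketed plan first and keep
// the bucketSpec only if the filter is selective enough on the bucketed column.
spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "true")

// Static fallback used by FilterIndexRule when the adaptive check is disabled.
spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "false")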
src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
@@ -29,6 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,6 +57,11 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
val useBucketSpec = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
BucketSelector(plan, index.bucketSpec).isDefined
} else {
HyperspaceConf.useBucketSpecForFilterRule(spark)
}
// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
// unnecessary shuffle for appended data to apply BucketUnion for merging data.
@@ -64,7 +70,7 @@ object FilterIndexRule
spark,
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketSpec = useBucketSpec,
useBucketUnionForAppended = false)
logEvent(
HyperspaceIndexUsageEvent(
@@ -136,7 +142,6 @@ object FilterIndexRule
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
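
As an aside, a hedged illustration of these two coverage conditions (the index name, column names, and data path are hypothetical, and the snippet assumes the standard Hyperspace IndexConfig/createIndex API):

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConfig

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/table")

// An index with indexed column "colA" and included column "colB".
hs.createIndex(df, IndexConfig("myFilterIndex", Seq("colA"), Seq("colB")))

// Qualifies: colA and colB are fully covered, and the predicate references
// colA, the first (and only) indexed column.
df.filter("colA = 1").select("colB")

// Does not qualify: the predicate does not reference the leading indexed column.
df.filter("colB = 'x'").select("colB")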
src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -68,6 +68,14 @@ object HyperspaceConf {
.toBoolean
}

def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
.toBoolean
}

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
151 changes: 149 additions & 2 deletions src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
@@ -16,13 +16,23 @@

package com.microsoft.hyperspace.util

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.util.collection.BitSet

import com.microsoft.hyperspace.index.IndexLogEntry

/**
* Utility functions for logical plan.
*/
object LogicalPlanUtils {
object LogicalPlanUtils extends Logging {

/**
* Check if a logical plan is a LogicalRelation.
@@ -35,4 +45,141 @@ object LogicalPlanUtils {
case _ => false
}
}

/**
* BucketSelector returns the selected buckets if bucket pruning is applicable for the given
* query plan. The logic is extracted from [[FileSourceStrategy]] in Spark.
*/
object BucketSelector {
// should prune buckets iff num buckets is greater than 1 and there is only one bucket column
private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
}

private def getExpressionBuckets(
expr: Expression,
bucketColumnName: String,
numBuckets: Int): BitSet = {

def getBucketNumber(attr: Attribute, v: Any): Int = {
BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
}

def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
iter
.map(v => getBucketNumber(attr, v))
.foreach(bucketNum => matchedBuckets.set(bucketNum))
matchedBuckets
}

def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.set(getBucketNumber(attr, v))
matchedBuckets
}

expr match {
case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
getBucketSetFromValue(a, v)
case expressions.In(a: Attribute, list)
if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
case expressions.InSet(a: Attribute, hset)
if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
// NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372
// Bucket pruning is not applied for InSet without the fix.
getBucketSetFromIterable(a, hset)
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
getBucketSetFromValue(a, null)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == FloatType =>
getBucketSetFromValue(a, Float.NaN)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == DoubleType =>
getBucketSetFromValue(a, Double.NaN)
case expressions.And(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) &
getExpressionBuckets(right, bucketColumnName, numBuckets)
case expressions.Or(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) |
getExpressionBuckets(right, bucketColumnName, numBuckets)
case _ =>
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.setUntil(numBuckets)
matchedBuckets
}
}

private def genBucketSet(
normalizedFilters: Seq[Expression],
bucketSpec: BucketSpec): Option[BitSet] = {
if (normalizedFilters.isEmpty) {
return None
}

val bucketColumnName = bucketSpec.bucketColumnNames.head
val numBuckets = bucketSpec.numBuckets

val normalizedFiltersAndExpr = normalizedFilters
.reduce(expressions.And)
val matchedBuckets =
getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)

val numBucketsSelected = matchedBuckets.cardinality()

// None means all the buckets need to be scanned
if (numBucketsSelected == numBuckets) {
None
} else {
Some(matchedBuckets)
}
}

def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
case PhysicalOperation(
projects,
filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// The attribute name in a predicate may differ from the one in the schema when the
// analysis is case-insensitive, so we rewrite it to match the schema and no longer
// need to worry about case sensitivity.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
// Subquery expressions are filtered out because they cannot be used to prune buckets
// or be pushed down as data filters, yet they would still be executed.
val normalizedFiltersWithoutSubqueries =
normalizedFilters.filterNot(SubqueryExpression.hasSubquery)

val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
} else {
None
}
bucketSet
case _ => None
}
}

private[hyperspace] def indexFilesSelectedBucketOnly(
plan: LogicalPlan,
index: IndexLogEntry): Seq[Path] = {
BucketSelector(plan, index.bucketSpec) match {
case Some(selectedBuckets) =>
val numBuckets = index.bucketSpec.numBuckets
val paths = index.content.files.filter(f =>
selectedBuckets.get(BucketingUtils.getBucketId(f.getName).get))
logInfo {
s"Pruned ${numBuckets - selectedBuckets.cardinality()} out of $numBuckets buckets." +
s"${index.content.files.size - paths.size} of index data files are excluded."
}
paths
case None =>
index.content.files
}
}
}
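
Finally, a hedged end-to-end sketch of the new BucketSelector (the index path, column name, and bucket count are assumptions; BucketSelector only returns a result when the plan is a PhysicalOperation over a HadoopFsRelation, such as a scan of index data files):

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector

// A bucketSpec like the one an index would carry: 200 buckets on column "id".
val bucketSpec = BucketSpec(200, Seq("id"), Seq("id"))

// Optimized plan of a filter query over the (hypothetical) index data files.
val plan = spark.read.parquet("/path/to/index/data")
  .filter("id = 42")
  .queryExecution.optimizedPlan

BucketSelector(plan, bucketSpec) match {
  case Some(buckets) =>
    // An equality predicate on the bucket column selects a single bucket, so
    // FilterIndexRule keeps the bucketSpec and only that bucket's files are scanned.
    println(s"Scan ${buckets.cardinality()} of ${bucketSpec.numBuckets} buckets")
  case None =>
    // No pruning is possible; the rule falls back to a plan without bucketSpec.
    println("All buckets would be scanned")
}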
(Diffs for the remaining changed files are not shown.)
