Add adaptive bucketSpec application for FilterIndexRule
sezruby committed Jan 25, 2021
1 parent 09a2909 commit 55f2b62
Showing 6 changed files with 172 additions and 16 deletions.
@@ -52,6 +52,14 @@ object IndexConstants {
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// If this config is true, Hyperspace first generates a plan with bucketSpec and checks
// the selectivity of the filter query by creating the physical plan in advance.
// If fewer than 80% of the index's buckets are selected, FilterIndexRule uses the plan
// with bucketSpec. Otherwise, the newly generated bucketSpec is not used for FilterIndexRule.
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
"spark.hyperspace.index.filterRule.bucketCheck.enabled"
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
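Usage note (not part of the diff): the new flag is an ordinary Spark session configuration, so it can be toggled at runtime alongside the existing static flag. A minimal sketch, assuming an active SparkSession named spark:

// Adaptive bucket check is on by default; turning it off falls back to the static
// spark.hyperspace.index.filterRule.useBucketSpec setting.
spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "false")
spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")

The same combination appears in the HybridScanSuite change further down to exercise the non-adaptive path.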
@@ -18,9 +18,11 @@ package com.microsoft.hyperspace.index.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Literal}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, UnionExec}
import org.apache.spark.sql.execution.datasources._

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
@@ -56,14 +58,46 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
// Do not set BucketSpec to avoid limiting Spark's degree of parallelism.
val transformedPlan =
val transformedPlan = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
// Test bucket spec first.
val transformedPlanWithBucketSpec =
RuleUtils.transformPlanToUseIndex(
spark,
index,
originalPlan,
useBucketSpec = true,
useBucketUnionForAppended = false)
val fileSourceScanNodes = if (transformedPlanWithBucketSpec.isInstanceOf[Union]) {
FileSourceStrategy(transformedPlanWithBucketSpec.children.head)
} else {
FileSourceStrategy(transformedPlanWithBucketSpec)
}
val foundPrunedBuckets = fileSourceScanNodes.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined && (optionalBucketSet.get
.cardinality() * 1.0 < index.bucketSpec.numBuckets * 0.8) =>
optionalBucketSet.get.cardinality()
}

if (foundPrunedBuckets.nonEmpty) {
transformedPlanWithBucketSpec
} else {
RuleUtils.transformPlanToUseIndex(
spark,
index,
originalPlan,
useBucketSpec = false,
useBucketUnionForAppended = false)
}
} else {
RuleUtils.transformPlanToUseIndex(
spark,
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketUnionForAppended = false)
}

logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
@@ -134,7 +168,6 @@ object FilterIndexRule
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
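For reference, the adaptive check above boils down to: plan the scan with the index's bucketSpec, count how many buckets survive pruning, and keep the bucketSpec only when fewer than 80% of them are selected. A minimal standalone sketch of that test, assuming a physical plan and the index's bucket count as inputs (the helper name is illustrative, not the rule's API):

import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Returns true when the planned scan prunes to fewer than 80% of the index's buckets,
// i.e. the filter is selective enough to justify keeping the index's bucketSpec.
def keepsBucketSpec(plan: SparkPlan, numBuckets: Int): Boolean =
  plan.collect {
    case scan: FileSourceScanExec if scan.optionalBucketSet.isDefined =>
      scan.optionalBucketSet.get.cardinality()
  }.exists(_ < numBuckets * 0.8)

In the rule itself the candidate plan comes from FileSourceStrategy applied to the transformed logical plan (its first child when the Hybrid Scan path produces a Union), as shown in the hunk above.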
@@ -68,6 +68,14 @@ object HyperspaceConf {
.toBoolean
}

def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
.toBoolean
}

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
@@ -20,7 +20,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

@@ -579,6 +579,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}
}

test("Verify adaptive bucket spec application for FilterIndexRule.") {
withTempPathAsString { testPath =>
// Setup. Create data.
val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.json(testPath)
val df = spark.read.json(testPath)

// Create index.
hyperspace.createIndex(df, indexConfig)
spark.enableHyperspace()

def query(): DataFrame =
df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")

withIndex("index") {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// TODO: because of SPARK-33372, bucket pruning is not applied for InSet operator.
// As indexes are bucketed, supporting bucket pruning can improve the performance of
// queries with high selectivity. Will add a new FileSourceScanStrategy soon.
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if (optionalBucketSet.isDefined) =>
optionalBucketSet.get.cardinality()
}
assert(foundPrunedBuckets.isEmpty)
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.json(testPath)

val df2 = spark.read.json(testPath)
val inputFiles = df.inputFiles
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
def query2(): DataFrame = {
df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
}

withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
// Check bucketSpec is applied.
val execPlan = query2().queryExecution.executedPlan
val foundPrunedBuckets = execPlan.collect {
case _ @UnionExec(children) =>
val p = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
p.head
}
assert(foundPrunedBuckets.length == 1)
assert(foundPrunedBuckets.head == 3)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}

withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
val execPlan = query2().queryExecution.executedPlan
// Check bucketSpec is not applied.
val foundPrunedBuckets = execPlan.collect {
case _ @ UnionExec(children) =>
val b = children.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined =>
optionalBucketSet.get.cardinality()
}
assert(b.isEmpty)
true
}
assert(foundPrunedBuckets.length == 1)
verifyIndexUsage(
query2,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
}
}
}
}
}

test(
"Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
"gets deleted and some appended to source data.") {
@@ -670,13 +776,13 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

// Refreshed index as quick mode can be applied with Hybrid Scan config.
withSQLConf(TestConfig.HybridScanEnabled: _*) {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
}
}
}
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

withSQLConf(
TestConfig.HybridScanEnabledAppendOnly :+
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan
@@ -313,7 +313,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
.append(" Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>---->")
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200---->")
.append(displayMode.newLine)
.append(" +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
@@ -329,7 +329,7 @@
"Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>---->")
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200---->")
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
@@ -655,7 +655,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
.append("Batched: true, Format: Parquet, Location: " +
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
.append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
.append("ReadSchema: struct<Col2:int,Col1:string>" + displayMode.highlightTag.close)
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200" + displayMode.highlightTag.close)
.append(displayMode.newLine)
.append(displayMode.newLine)
.append("=============================================================")
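The expected-string updates above reflect that, when the index's bucketSpec is kept, Spark's FileSourceScanExec reports bucket pruning directly in its explain output. A quick illustrative check, assuming the same "filterIndex" fixture as this test (sampleParquetDataFullPath is the test's own variable; the query is only a sketch):

// Assumes spark.enableHyperspace() has been called and the "filterIndex" index
// covers Col2/Col1 of this data, as in the fixtures above. The FileScan line of
// the physical plan should then end with something like
// "SelectedBucketsCount: 1 out of 200" when the pruned, bucketed index scan is used.
val filtered = spark.read.parquet(sampleParquetDataFullPath)
  .filter("Col2 = 1")
  .select("Col2", "Col1")
filtered.explain()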
