Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Check and remove unnecessary shuffle added by Hybrid Scan
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby committed Jan 25, 2021
1 parent ec1dfb0 commit 8bec7c5
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ object IndexConstants {
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"

// If this config is enabled, Hybrid Scan is not applied when it would introduce an
// unnecessary shuffle to merge appended data.
val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED =
"spark.hyperspace.index.hybridscan.shuffleCheck.enabled"
val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.ResolverUtils._

/**
Expand Down Expand Up @@ -68,18 +72,54 @@ object JoinIndexRule
right =
RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))

// Counts the ShuffleExchangeExec nodes that `collect` finds in the given physical plan.
def getShuffleCnt(sparkPlan: SparkPlan): Long = {
  val shuffleNodes = sparkPlan.collect { case exchange: ShuffleExchangeExec => exchange }
  shuffleNodes.size.toLong
}

val resultPlan =
if (!HyperspaceConf.hybridScanEnabled(spark) || !HyperspaceConf
.hybridScanShuffleCheckEnabled(spark)) {
updatedPlan
} else {
val shuffleCntPair = spark.sessionState
.executePlan(updatedPlan)
.executedPlan
.collect {
case smj: SortMergeJoinExec =>
(getShuffleCnt(smj.left), getShuffleCnt(smj.right))
}
assert(shuffleCntPair.length <= 1)
shuffleCntPair.headOption.flatMap {
// If a child has 2 shuffle nodes, the candidate index pair cannot remove the
// shuffles on both sides of the join, and Hybrid Scan also adds an extra
// shuffle for merging appended files. JoinIndexRule does not apply the index
// to a child with 2 shuffle nodes.
// However, that child is still eligible for FilterIndexRule.
case (leftCnt, _) if leftCnt == 2 => Some(updatedPlan.copy(left = l))
case (_, rightCnt) if rightCnt == 2 => Some(updatedPlan.copy(right = r))
case _ => None
}.getOrElse {
updatedPlan
}
}

val appliedIndexes = resultPlan match {
case j: Join if j.left.equals(l) => Seq(rIndex)
case j: Join if j.right.equals(r) => Seq(lIndex)
case _ => Seq(lIndex, rIndex)
}

logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
sparkContext.sparkUser,
sparkContext.applicationId,
sparkContext.appName),
Seq(lIndex, rIndex),
appliedIndexes,
join.toString,
updatedPlan.toString,
resultPlan.toString,
"Join index rule applied."))

updatedPlan
resultPlan
}
.getOrElse(join)
} catch {
Expand Down Expand Up @@ -325,11 +365,7 @@ object JoinIndexRule
compatibleIndexPairs.map(
indexPairs =>
JoinIndexRanker
.rank(
spark,
leftRel,
rightRel,
indexPairs)
.rank(spark, leftRel, rightRel, indexPairs)
.head)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ object HyperspaceConf {
hybridScanDeletedRatioThreshold(spark) > 0.0
}

/**
 * Reads the Hybrid Scan shuffle-check flag from the session configuration.
 *
 * Looks up [[IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED]], passing
 * [[IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT]] as the default value,
 * and parses the resulting string as a Boolean.
 *
 * @param spark Spark session whose runtime config is consulted.
 * @return The configured value parsed with `toBoolean`.
 */
def hybridScanShuffleCheckEnabled(spark: SparkSession): Boolean = {
  val rawValue = spark.conf.get(
    IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED,
    IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT)
  rawValue.toBoolean
}

def optimizeFileSizeThreshold(spark: SparkSession): Long = {
spark.conf
.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.{SortExec, SparkPlan => SparkPlanNode}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
Expand Down Expand Up @@ -670,13 +671,95 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

// An index refreshed in quick mode can be applied with the Hybrid Scan config.
withSQLConf(TestConfig.HybridScanEnabled: _*) {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
}
}
}
}

test("Verify Hybrid Scan is not applied when shuffle is not removed.") {
withTempPathAsString { testPath =>
val indexConfig = IndexConfig("indexRight", Seq("c2"), Seq("c4"))
val indexConfig2 = IndexConfig("indexLeft", Seq("c2"), Seq("c3"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.parquet(testPath)
val df = spark.read.load(testPath)

withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "11") {
// Create an index with bucket num 11.
hyperspace.createIndex(df, indexConfig)
}
withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "12") {
// Create an index with bucket num 12.
hyperspace.createIndex(df, indexConfig2)
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.parquet(testPath)

{
// Create a join query.
val df2 = spark.read.parquet(testPath)

// Self-join of df2 on "c2": columns (c2, c3) on the left, (c2, c4) on the right.
def query(): DataFrame = {
  val leftSide = df2.select("c2", "c3")
  val rightSide = df2.select("c2", "c4")
  leftSide.join(rightSide, "c2")
}

val inputFiles = df.inputFiles
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))

spark.enableHyperspace()
withSQLConf(TestConfig.HybridScanEnabled: _*) {
// Counts the ShuffleExchangeExec nodes that `collect` finds in the given physical plan.
def getShuffleCnt(sparkPlan: SparkPlanNode): Long = {
  val shuffleNodes = sparkPlan.collect { case exchange: ShuffleExchangeExec => exchange }
  shuffleNodes.size.toLong
}
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "false") {
val execPlan = query().queryExecution.executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
(getShuffleCnt(smj.left), getShuffleCnt(smj.right))
}.head
assert(shuffleCnt._1 === 1)
// Right child of join has 2 shuffle nodes because of Hybrid Scan for appended files.
assert(shuffleCnt._2 === 2)

// Verify indexes are used, and all index files are picked.
verifyIndexUsage(
query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles)
}
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "true") {
val execPlan = query().queryExecution.executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
(getShuffleCnt(smj.left), getShuffleCnt(smj.right))
}.head
assert(shuffleCnt._1 === 1)
// One shuffle node of right child is removed with shuffle count check.
assert(shuffleCnt._2 === 1)

// For right child, indexRight can be still applied by FilterIndexRule.
verifyIndexUsage(
query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles)
}
}
}
}
Expand Down

0 comments on commit 8bec7c5

Please sign in to comment.