
Check and remove unnecessary shuffle added by Hybrid Scan
sezruby committed Jan 25, 2021
1 parent ec1dfb0 commit e082581
Showing 4 changed files with 166 additions and 24 deletions.
IndexConstants.scala
@@ -47,6 +47,12 @@ object IndexConstants {
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"

// If this config is enabled, Hybrid Scan won't be applied when it causes an unnecessary
// shuffle to merge appended data.
val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED =
"spark.hyperspace.index.hybridscan.shuffleCheck.enabled"
val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
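The new shuffleCheck flag above defaults to true. As a minimal sketch (assuming a live SparkSession named spark), the check could be disabled for a session like this:

  // Turn off the shuffle check; Hybrid Scan is then applied even when it
  // introduces an extra shuffle to merge appended files.
  spark.conf.set("spark.hyperspace.index.hybridscan.shuffleCheck.enabled", "false")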
JoinIndexRule.scala
@@ -25,12 +25,15 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Attribu
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.ResolverUtils._

/**
@@ -68,18 +71,51 @@ object JoinIndexRule
right =
RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))

val resultPlan =
  if (HyperspaceConf.hybridScanEnabled(spark) &&
    HyperspaceConf.hybridScanShuffleCheckEnabled(spark)) {
val execPlan = spark.sessionState.executePlan(updatedPlan).executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
val leftShuffleCnt = smj.left.collect {
case _: ShuffleExchangeExec => true
}.length
val rightShuffleCnt = smj.right.collect {
case _: ShuffleExchangeExec => true
}.length
(leftShuffleCnt, rightShuffleCnt)
}.head

// If one side of the join has 2 shuffle nodes, the candidate index pair cannot
// remove the shuffle for the join on that side, and Hybrid Scan also adds an
// extra shuffle to merge the appended files. In that case, we don't apply the
// index to that child in this rule; the child is still eligible for the
// filter index rule.
if (shuffleCnt._1 == 2) {
updatedPlan.copy(left = l)
} else if (shuffleCnt._2 == 2) {
updatedPlan.copy(right = r)
} else {
updatedPlan
}
} else {
updatedPlan
}

if (!resultPlan.equals(join)) {
logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
sparkContext.sparkUser,
sparkContext.applicationId,
sparkContext.appName),
Seq(lIndex, rIndex),
join.toString,
resultPlan.toString,
"Join index rule applied."))
}

resultPlan
}
.getOrElse(join)
} catch {
@@ -325,11 +361,7 @@ object JoinIndexRule
compatibleIndexPairs.map(
indexPairs =>
JoinIndexRanker
  .rank(spark, leftRel, rightRel, indexPairs)
.head)
}

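For reference, the shuffle counting in the rule above can be read as a small standalone helper. This is a sketch only; the name countShufflesPerJoinSide is hypothetical and not part of this commit:

  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec

  // Hypothetical helper: returns the number of ShuffleExchangeExec nodes under
  // each side of the first sort-merge join, or None when the physical plan
  // contains no sort-merge join.
  def countShufflesPerJoinSide(plan: SparkPlan): Option[(Int, Int)] =
    plan.collectFirst {
      case smj: SortMergeJoinExec =>
        (smj.left.collect { case s: ShuffleExchangeExec => s }.length,
          smj.right.collect { case s: ShuffleExchangeExec => s }.length)
    }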
HyperspaceConf.scala
@@ -36,6 +36,14 @@ object HyperspaceConf {
hybridScanDeletedRatioThreshold(spark) > 0.0
}

def hybridScanShuffleCheckEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED,
IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT)
.toBoolean
}

def optimizeFileSizeThreshold(spark: SparkSession): Long = {
spark.conf
.get(
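As a usage sketch (assuming spark: SparkSession and a candidate updatedPlan, with the same names as in JoinIndexRule above), the new helper gates the extra plan compilation:

  // Run the physical-plan inspection only when both flags are on, since
  // executePlan is not free.
  if (HyperspaceConf.hybridScanEnabled(spark) &&
      HyperspaceConf.hybridScanShuffleCheckEnabled(spark)) {
    val execPlan = spark.sessionState.executePlan(updatedPlan).executedPlan
    // ... count ShuffleExchangeExec nodes under each join side, as in JoinIndexRule.
  }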
E2EHyperspaceRulesTest.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
@@ -670,13 +671,108 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

// Refreshed index as quick mode can be applied with Hybrid Scan config.
withSQLConf(TestConfig.HybridScanEnabled: _*) {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
}
}
}
}

test("Verify Hybrid Scan is not applied with Shuffle checker when shuffle is not removed.") {
withTempPathAsString { testPath =>
val indexConfig = IndexConfig("indexRight", Seq("c2"), Seq("c4"))
val indexConfig2 = IndexConfig("indexLeft", Seq("c2"), Seq("c3"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.parquet(testPath)
val df = spark.read.load(testPath)
val inputFiles = df.inputFiles

withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "11") {
// Create index.
hyperspace.createIndex(df, indexConfig)
}
withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "12") {
// Create index.
hyperspace.createIndex(df, indexConfig2)
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.parquet(testPath)

{
// Create a join query.
val leftDf = spark.read.parquet(testPath)
val rightDf = spark.read.parquet(testPath)

def query(): DataFrame = {
val filter1 = leftDf.select("c2", "c3")
val filter2 = rightDf.select("c2", "c4")
filter1
.join(filter2, "c2")
}

val appendedFiles = leftDf.inputFiles.diff(inputFiles).map(new Path(_))

spark.enableHyperspace()
withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "false") {
val execPlan = query().queryExecution.executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
val leftShuffleCnt = smj.left.collect {
case _: ShuffleExchangeExec => true
}.length
val rightShuffleCnt = smj.right.collect {
case _: ShuffleExchangeExec => true
}.length
(leftShuffleCnt, rightShuffleCnt)
}.head
assert(shuffleCnt._1 === 1)
// The right child of the join has 2 shuffle nodes because of Hybrid Scan.
assert(shuffleCnt._2 === 2)

// Verify indexes are used, and all index files are picked.
verifyIndexUsage(
query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++ // for right
  getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles) // for left
}
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "true") {
val execPlan = query().queryExecution.executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
val leftShuffleCnt = smj.left.collect {
case _: ShuffleExchangeExec => true
}.length
val rightShuffleCnt = smj.right.collect {
case _: ShuffleExchangeExec => true
}.length
(leftShuffleCnt, rightShuffleCnt)
}.head
assert(shuffleCnt._1 === 1)
// One shuffle node of the right child is removed by the shuffle checker.
assert(shuffleCnt._2 === 1)

// For the right child, indexRight can still be applied by the filter index rule.
verifyIndexUsage(
query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++ // for right
  getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles) // for left
}
}
}
}
