Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Check and remove unnecessary shuffle added by Hybrid Scan #331

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ object IndexConstants {
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"

// If this config is enabled, Hybrid Scan won't be applied when it causes unnecessary shuffle
// to merge appended data.
val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED =
"spark.hyperspace.index.hybridscan.shuffleCheck.enabled"
val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT = "true"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.ResolverUtils._

/**
Expand Down Expand Up @@ -68,18 +72,54 @@ object JoinIndexRule
right =
RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))

def getShuffleCnt(sparkPlan: SparkPlan): Long = {
sparkPlan.collect { case _: ShuffleExchangeExec => true }.length
}

val resultPlan =
if (!HyperspaceConf.hybridScanEnabled(spark) || !HyperspaceConf
.hybridScanShuffleCheckEnabled(spark)) {
updatedPlan
} else {
val shuffleCntPair = spark.sessionState
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that this will be recursively called since this is called inside a rule? (I am not sure if this is a good pattern or not to be honest).

Copy link
Collaborator Author

@sezruby sezruby Jan 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. how about just using JoinSelection & and just check the bucket number of candidate index pair if it's SortMergeJoin?
Or we could generate the plan w/o hyperspace rules

.executePlan(updatedPlan)
.executedPlan
.collect {
case smj: SortMergeJoinExec =>
(getShuffleCnt(smj.left), getShuffleCnt(smj.right))
}
assert(shuffleCntPair.length <= 1)
shuffleCntPair.headOption.flatMap {
// If the number of shuffle is 2, the candidate index pair cannot remove
// the shuffles in both left and right for join and also Hybrid Scan causes
// an additional shuffle for merging appended files. We don't apply the index
Comment on lines +93 to +95
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused with this comment. For example, is the additional shuffle by hybrid scan also counting toward 2?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 for sort merge join & 1 for hybrid scan on the fly shuffle, in left or right child plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea please reword the comments.

// for the child with 2 shuffle nodes using JoinIndexRule.
// However, the child node is still applicable for FilterIndexRule.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify on the reference to FilterIndexRule?

case (leftCnt, _) if leftCnt == 2 => Some(updatedPlan.copy(left = l))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that rightCnt is also 2 for this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, Spark will shuffle with a higher bucket number. So only 1 shuffle for sort merge join can be required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, let's add this to the comment.

case (_, rightCnt) if rightCnt == 2 => Some(updatedPlan.copy(right = r))
case _ => None
}.getOrElse {
updatedPlan
}
}

val appliedIndexes = resultPlan match {
case j: Join if j.left.equals(l) => Seq(rIndex)
case j: Join if j.right.equals(r) => Seq(lIndex)
case _ => Seq(lIndex, rIndex)
}

logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
sparkContext.sparkUser,
sparkContext.applicationId,
sparkContext.appName),
Seq(lIndex, rIndex),
appliedIndexes,
join.toString,
updatedPlan.toString,
resultPlan.toString,
"Join index rule applied."))

updatedPlan
resultPlan
}
.getOrElse(join)
} catch {
Expand Down Expand Up @@ -325,11 +365,7 @@ object JoinIndexRule
compatibleIndexPairs.map(
indexPairs =>
JoinIndexRanker
.rank(
spark,
leftRel,
rightRel,
indexPairs)
.rank(spark, leftRel, rightRel, indexPairs)
.head)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ object HyperspaceConf {
hybridScanDeletedRatioThreshold(spark) > 0.0
}

def hybridScanShuffleCheckEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED,
IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT)
.toBoolean
}

def optimizeFileSizeThreshold(spark: SparkSession): Long = {
spark.conf
.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.{SortExec, SparkPlan => SparkPlanNode}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
Expand Down Expand Up @@ -670,13 +671,95 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

// Refreshed index as quick mode can be applied with Hybrid Scan config.
withSQLConf(TestConfig.HybridScanEnabled: _*) {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
}
}
}
}

test("Verify Hybrid Scan is not applied when shuffle is not removed.") {
withTempPathAsString { testPath =>
val indexConfig = IndexConfig("indexRight", Seq("c2"), Seq("c4"))
val indexConfig2 = IndexConfig("indexLeft", Seq("c2"), Seq("c3"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.parquet(testPath)
val df = spark.read.load(testPath)

withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "11") {
// Create an index with bucket num 11.
hyperspace.createIndex(df, indexConfig)
}
withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "12") {
// Create an index with bucket num 12.
hyperspace.createIndex(df, indexConfig2)
}

// Append to original data.
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(3)
.write
.mode("append")
.parquet(testPath)

{
// Create a join query.
val df2 = spark.read.parquet(testPath)

def query(): DataFrame = {
df2.select("c2", "c3").join(df2.select("c2", "c4"), "c2")
}

val inputFiles = df.inputFiles
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))

spark.enableHyperspace()
withSQLConf(TestConfig.HybridScanEnabled: _*) {
def getShuffleCnt(sparkPlan: SparkPlanNode): Long = {
sparkPlan.collect { case _: ShuffleExchangeExec => true }.length
}
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "false") {
val execPlan = query().queryExecution.executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
(getShuffleCnt(smj.left), getShuffleCnt(smj.right))
}.head
assert(shuffleCnt._1 === 1)
// Right child of join has 2 shuffle nodes because of Hybrid Scan for appended files.
assert(shuffleCnt._2 === 2)

// Verify indexes are used, and all index files are picked.
verifyIndexUsage(
query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles)
}
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "true") {
val execPlan = query().queryExecution.executedPlan
val shuffleCnt = execPlan.collect {
case smj: SortMergeJoinExec =>
(getShuffleCnt(smj.left), getShuffleCnt(smj.right))
}.head
assert(shuffleCnt._1 === 1)
// One shuffle node of right child is removed with shuffle count check.
assert(shuffleCnt._2 === 1)

// For right child, indexRight can be still applied by FilterIndexRule.
verifyIndexUsage(
query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles)
}
}
}
}
Expand Down