-
Notifications
You must be signed in to change notification settings - Fork 115
Check and remove unnecessary shuffle added by Hybrid Scan #331
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,13 +24,17 @@ import org.apache.spark.sql.catalyst.analysis.CleanupAliases | |
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, EqualTo, Expression} | ||
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.execution.SparkPlan | ||
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} | ||
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec | ||
import org.apache.spark.sql.execution.joins.SortMergeJoinExec | ||
|
||
import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} | ||
import com.microsoft.hyperspace.actions.Constants | ||
import com.microsoft.hyperspace.index._ | ||
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker | ||
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent} | ||
import com.microsoft.hyperspace.util.HyperspaceConf | ||
import com.microsoft.hyperspace.util.ResolverUtils._ | ||
|
||
/** | ||
|
@@ -68,18 +72,54 @@ object JoinIndexRule | |
right = | ||
RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true)) | ||
|
||
// Counts the ShuffleExchangeExec nodes collected from the given physical plan. | ||
// The Int length of the collected sequence is widened to the declared Long result. | ||
def getShuffleCnt(sparkPlan: SparkPlan): Long = { | ||
// Each matching node contributes one element; only the element count is used. | ||
sparkPlan.collect { case _: ShuffleExchangeExec => true }.length | ||
} | ||
|
||
val resultPlan = | ||
if (!HyperspaceConf.hybridScanEnabled(spark) || !HyperspaceConf | ||
.hybridScanShuffleCheckEnabled(spark)) { | ||
updatedPlan | ||
} else { | ||
val shuffleCntPair = spark.sessionState | ||
.executePlan(updatedPlan) | ||
.executedPlan | ||
.collect { | ||
case smj: SortMergeJoinExec => | ||
(getShuffleCnt(smj.left), getShuffleCnt(smj.right)) | ||
} | ||
assert(shuffleCntPair.length <= 1) | ||
shuffleCntPair.headOption.flatMap { | ||
// If a child has 2 shuffles, the candidate index pair could not eliminate the | ||
// join shuffle on that side (1 for sort merge join) and Hybrid Scan added | ||
// 1 more on-the-fly shuffle to merge appended files. We don't apply the index | ||
Comment on lines
+93
to
+95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a bit confused with this comment. For example, is the additional shuffle by hybrid scan also counting toward 2? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1 for sort merge join & 1 for hybrid scan on the fly shuffle, in left or right child plan. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea please reword the comments. |
||
// for the child with 2 shuffle nodes using JoinIndexRule. | ||
// However, the child node is still applicable for FilterIndexRule. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you clarify on the reference to |
||
case (leftCnt, _) if leftCnt == 2 => Some(updatedPlan.copy(left = l)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that rightCnt is also 2 for this case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, Spark will shuffle with a higher bucket number. So only 1 shuffle for sort merge join can be required. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, let's add this to the comment. |
||
case (_, rightCnt) if rightCnt == 2 => Some(updatedPlan.copy(right = r)) | ||
case _ => None | ||
}.getOrElse { | ||
updatedPlan | ||
} | ||
} | ||
|
||
val appliedIndexes = resultPlan match { | ||
case j: Join if j.left.equals(l) => Seq(rIndex) | ||
case j: Join if j.right.equals(r) => Seq(lIndex) | ||
case _ => Seq(lIndex, rIndex) | ||
} | ||
|
||
logEvent( | ||
HyperspaceIndexUsageEvent( | ||
AppInfo( | ||
sparkContext.sparkUser, | ||
sparkContext.applicationId, | ||
sparkContext.appName), | ||
Seq(lIndex, rIndex), | ||
appliedIndexes, | ||
join.toString, | ||
updatedPlan.toString, | ||
resultPlan.toString, | ||
"Join index rule applied.")) | ||
|
||
updatedPlan | ||
resultPlan | ||
} | ||
.getOrElse(join) | ||
} catch { | ||
|
@@ -325,11 +365,7 @@ object JoinIndexRule | |
compatibleIndexPairs.map( | ||
indexPairs => | ||
JoinIndexRanker | ||
.rank( | ||
spark, | ||
leftRel, | ||
rightRel, | ||
indexPairs) | ||
.rank(spark, leftRel, rightRel, indexPairs) | ||
.head) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that this will be called recursively, since it is invoked inside a rule? (To be honest, I am not sure whether this is a good pattern.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. How about just using
JoinSelection
and checking the bucket numbers of the candidate index pair if it's a SortMergeJoin? Or we could generate the plan without the Hyperspace rules.