
Check and remove unnecessary shuffle added by Hybrid Scan #331

Draft: wants to merge 1 commit into master

Conversation

@sezruby sezruby (Collaborator) commented Jan 25, 2021

What is the context for this pull request?

  • Tracking Issue: n/a
  • Parent Issue: n/a
  • Dependencies: n/a

What changes were proposed in this pull request?

This PR removes an additional shuffle node by generating the executedPlan for the transformed join query plan in the optimizer:

  • Get the number of shuffle nodes for the left and right plan.
  • Having 2 shuffle nodes in a child means:
      1. the candidate index pair cannot remove the shuffles for both left and right, and
      2. Hybrid Scan causes an unnecessary shuffle in order to use BucketUnion for appended files.

(FYI, there can be at most 3 shuffle nodes across the left and right children: if the two children of the join are bucketed with different bucket numbers, Spark will reshuffle the child with the smaller bucket number using the larger one. For example:

  • number of buckets for the left candidate index = 11
  • number of buckets for the right candidate index = 12
  • Spark will reshuffle the left child using 12 buckets.)

In this case, we don't need to shuffle the appended files, because they will be shuffled for the SortMergeJoin anyway. So Hyperspace doesn't use the transformed plan for that child, but indexes can still be applied to the child by FilterIndexRule. A rough sketch of the shuffle-counting check follows.
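As a minimal illustration of that check (a sketch only; countShuffles is a hypothetical helper name, not Hyperspace's actual code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Plan the transformed logical plan and count the shuffle (Exchange) nodes
// in the resulting physical plan; a child with 2 shuffles is one for which
// JoinIndexRule should not apply the index.
def countShuffles(spark: SparkSession, plan: LogicalPlan): Int = {
  val executedPlan = spark.sessionState.executePlan(plan).executedPlan
  executedPlan.collect { case s: ShuffleExchangeExec => s }.size
}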

Does this PR introduce any user-facing change?

Yes

With this change:

*(6) Project [c2#139, c3#140, c4#151]
+- *(6) SortMergeJoin [c2#139], [c2#149], Inner
   :- *(3) Sort [c2#139 ASC NULLS FIRST], false, 0
   :  +- BucketUnion 12 buckets, bucket columns: [c2]
   :     :- *(1) Project [c2#139, c3#140]
   :     :  +- *(1) Filter isnotnull(c2#139)
   :     :     +- *(1) FileScan parquet [c2#139,c3#140] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/src/test/resources/indexLocation/index2..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c3:string>, SelectedBucketsCount: 12 out of 12
   :     +- Exchange hashpartitioning(c2#139, 12)
   :        +- *(2) Project [c2#139, c3#140]
   :           +- *(2) Filter isnotnull(c2#139)
   :              +- *(2) FileScan parquet [c2#139,c3#140] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/Local/Temp/spark-e607f7ad-557a-4980-9eb9-e7284ed..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c3:string>
   +- *(5) Sort [c2#149 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(c2#149, 12)
         +- *(4) Project [c2#149, c4#151]
            +- *(4) Filter isnotnull(c2#149)
               +- *(4) FileScan parquet [c2#149,c4#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/src/test/resources/indexLocation/index/..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c4:int>

Without this change:

*(7) Project [c2#139, c3#140, c4#151]
+- *(7) SortMergeJoin [c2#139], [c2#149], Inner
   :- *(3) Sort [c2#139 ASC NULLS FIRST], false, 0
   :  +- BucketUnion 12 buckets, bucket columns: [c2]
   :     :- *(1) Project [c2#139, c3#140]
   :     :  +- *(1) Filter isnotnull(c2#139)
   :     :     +- *(1) FileScan parquet [c2#139,c3#140] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/repo/hyperspace2/src/test/resources/indexLocation/index2..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c3:string>, SelectedBucketsCount: 12 out of 12
   :     +- Exchange hashpartitioning(c2#139, 12)
   :        +- *(2) Project [c2#139, c3#140]
   :           +- *(2) Filter isnotnull(c2#139)
   :              +- *(2) FileScan parquet [c2#139,c3#140] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/AppData/Local/Temp/spark-e3245dda-a52a-4c4f-859f-f43f0c9..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c3:string>
   +- *(6) Sort [c2#149 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(c2#149, 12)
         +- BucketUnion 11 buckets, bucket columns: [c2]
            :- *(4) Project [c2#149, c4#151]
            :  +- *(4) Filter isnotnull(c2#149)
            :     +- *(4) FileScan parquet [c2#149,c4#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/repo/hyperspace2/src/test/resources/indexLocation/index/..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c4:int>, SelectedBucketsCount: 11 out of 11
            +- Exchange hashpartitioning(c2#149, 11)
               +- *(5) Project [c2#149, c4#151]
                  +- *(5) Filter isnotnull(c2#149)
                     +- *(5) FileScan parquet [c2#149,c4#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/AppData/Local/Temp/spark-e3245dda-a52a-4c4f-859f-f43f0c9..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c2:string,c4:int>

How was this patch tested?

Unit test

@rapoth rapoth added the advanced issue label Jan 27, 2021
@sezruby sezruby (Collaborator, Author) commented Jan 28, 2021

Measured executePlan performance using the TPC-H dataset:

val filter1 = linetable.filter(linetable("l_orderkey") isin (1234, 12341234, 123456)).select("l_orderkey")
val filter2 = linetable.filter(linetable("l_orderkey") isin (1234, 12341234)).select("l_orderkey")

// First run.
val join = filter1.join(filter2, "l_orderkey")
val plan = join.queryExecution.optimizedPlan
measure(spark.sessionState.executePlan(plan).executedPlan)

// Second run.
val join = filter1.join(filter2, "l_orderkey")
val plan = join.queryExecution.optimizedPlan
measure(spark.sessionState.executePlan(plan).executedPlan)

// Third run.
val join = filter1.join(filter2, "l_orderkey")
val plan = join.queryExecution.optimizedPlan
measure(spark.sessionState.executePlan(plan).executedPlan)

// result (unit: ms)
duration: 233
duration: 97
duration: 97
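The measure helper isn't defined in the snippet above; presumably it is a simple wall-clock timer along these lines (a hypothetical definition, matching the duration output format):

// Hypothetical timing helper: evaluates the expression once and prints the
// elapsed wall-clock time in milliseconds.
def measure[T](f: => T): T = {
  val start = System.nanoTime()
  val result = f
  println(s"duration: ${(System.nanoTime() - start) / 1000000}")
  result
}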

@imback82 (Contributor) commented:

> Measured executePlan performance using the TPC-H dataset:

Could you explain the importance of this benchmark? (I wasn't sure what to get out of this benchmark.)

@imback82 (Contributor) commented:

I just checked the code; did you mean to measure re-executing the updated plan?

Comment on lines +93 to +95
// If the number of shuffle is 2, the candidate index pair cannot remove
// the shuffles in both left and right for join and also Hybrid Scan causes
// an additional shuffle for merging appended files. We don't apply the index
Contributor:
I am a bit confused by this comment. For example, does the additional shuffle from Hybrid Scan also count toward the 2?

Collaborator Author:

One shuffle for the sort merge join and one for Hybrid Scan's on-the-fly shuffle, in the left or right child plan.

Contributor:

Yea please reword the comments.
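One possible rewording that separates the two shuffle sources (a suggestion in the spirit of this thread, not the wording that eventually landed in the PR):

// A count of 2 shuffle nodes in a child plan means both of the following:
//   (1) one shuffle is needed to prepare that child for the sort merge join,
//       i.e. the candidate index pair cannot remove the join-side shuffle, and
//   (2) Hybrid Scan adds one more shuffle to merge appended files via
//       BucketUnion.
// In that case, JoinIndexRule does not apply the index for that child;
// FilterIndexRule may still apply an index to it.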

// the shuffles in both left and right for join and also Hybrid Scan causes
// an additional shuffle for merging appended files. We don't apply the index
// for the child with 2 shuffle nodes using JoinIndexRule.
// However, the child node is still applicable for FilterIndexRule.
Contributor:

Can you clarify the reference to FilterIndexRule?

// an additional shuffle for merging appended files. We don't apply the index
// for the child with 2 shuffle nodes using JoinIndexRule.
// However, the child node is still applicable for FilterIndexRule.
case (leftCnt, _) if leftCnt == 2 => Some(updatedPlan.copy(left = l))
Contributor:

Is it possible that rightCnt is also 2 for this case?

Collaborator Author:

No, Spark will shuffle using the higher bucket number, so at most one shuffle for the sort merge join can be required.

Contributor:

Yea, let's add this to the comment.
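For example, the case arms could carry that explanation inline (a sketch based on the quoted diff; the right-side arm and the surrounding match are assumed):

// Spark reshuffles the side with the smaller bucket number using the larger
// one, so at most one side can need a join-side shuffle: leftCnt and
// rightCnt cannot both be 2.
(leftCnt, rightCnt) match {
  case (2, _) => Some(updatedPlan.copy(left = l))   // keep original left child
  case (_, 2) => Some(updatedPlan.copy(right = r))  // keep original right child
  case _      => Some(updatedPlan)                  // keep both transformed children
}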

@imback82 (Contributor) commented:

Here is what I suggest for the benchmark paragraph (my thoughts in parentheses):

This PR introduces triggering SparkSession.executePlan inside JoinIndexRule to check whether an unnecessary shuffle is introduced in the physical plan. Since SparkSession.executePlan may not be a cheap operation (I guess this is why you were measuring?), I ran a benchmark on the change.

.. result here ...

I see the results to be 233 ms, 97 ms, 97 ms (explain the reasoning behind the high latency on the first iteration; probably caching), and this doesn't look like it introduces much overhead (but I would also compare against a run with SparkSession.executePlan turned off).

.hybridScanShuffleCheckEnabled(spark)) {
updatedPlan
} else {
val shuffleCntPair = spark.sessionState
Contributor:

Is it possible that this will be recursively called, since this is called inside a rule? (I am not sure if this is a good pattern or not, to be honest.)

@sezruby sezruby (Collaborator, Author) commented Jan 29, 2021

Good point. How about just using JoinSelection and checking the bucket numbers of the candidate index pair if it's a SortMergeJoin?
Or we could generate the plan w/o Hyperspace rules. A minimal sketch of the bucket-number idea follows.
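A sketch of that alternative, comparing the candidate indexes' bucket numbers directly instead of re-running the planner (function and parameter names are assumptions, not Hyperspace's API):

// If SortMergeJoin is selected and the bucket numbers of the candidate index
// pair differ, Spark will reshuffle the side with the smaller count anyway,
// so Hybrid Scan's extra shuffle on that side would be wasted and the index
// should not be applied there.
def sideToSkip(leftBuckets: Int, rightBuckets: Int): Option[String] =
  if (leftBuckets < rightBuckets) Some("left")
  else if (leftBuckets > rightBuckets) Some("right")
  else None // equal bucket numbers: no forced join-side reshuffle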

@sezruby sezruby marked this pull request as draft February 5, 2021 00:37
@clee704 clee704 added the stale label Jun 15, 2021