-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support distribution type hint to allow broadcast join #14797
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #14797 +/- ##
============================================
+ Coverage 61.75% 63.72% +1.97%
- Complexity 207 1610 +1403
============================================
Files 2436 2708 +272
Lines 133233 151304 +18071
Branches 20636 23364 +2728
============================================
+ Hits 82274 96422 +14148
- Misses 44911 47634 +2723
- Partials 6048 7248 +1200
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
996e7a5
to
77e8914
Compare
I'll take a look, but I think we need to rethink the names of join strategies. A strategy should not be defined uniquely by what it does on one of the sides of the join. This case, for example, is clear: The broadcast strategy will be applied on the right-hand side, but what will happen with the left one? We already have a strategy where the right-hand side is broadcasted, but the left is randomly shuffled. |
@@ -54,10 +54,15 @@ public void onMatch(RelOptRuleCall call) { | |||
JoinInfo joinInfo = join.analyzeCondition(); | |||
RelNode newLeft; | |||
RelNode newRight; | |||
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { | |||
String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join); | |||
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: what about having a PinotHintOptions.JoinHintOptions.getJoinStrategy
that returns an enum we can use in a switch here? We expect to add at least 3 new extra strategies (listed in #14518, including random + broadcast right now cannot be specified), so the switch syntax may be helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN); | ||
} else { | ||
// TODO: Consider renaming this operator type. It handles multiple join strategies. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are using names here in a strange way. This is a hash operator because it implements the join using a hash map. The other is a lookup join operator because it implements it using lookup logic.
In parallel, we have join strategies. One of the strategies creates logical partitions at query time based on the values of the columns being joined. The way these partitions are decided is based on hash code, so it is called hash strategy. In the documentation I used Query time partition join strategy because I didn't want to focus too much on the fact that is being using hashes.
Imagine a scenario where we add sorted joins. The type of the join should be sort and the strategy used for the distribution of its inputs may be hash.
TL;DR: I think we need to distinguish between join algorithm (lookup, hash, sorted, nested look) and distribution strategies (hash/partitioned, local, randon, broadcast, etc). The algorithm will probably change the operator class being used while the distribution strategy will change the exchange of the children of the joins
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we should separate the type of join and the type of distribution/shuffle. The latter is not unique to joins and could also be used in e.g. aggregations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Removed this TODO, and we can revisit this when adding the next join operator
We need to add documentation at https://docs.pinot.apache.org/users/user-guide-query/multi-stage-query/join-strategies. Feel free to use my diagrams at https://app.excalidraw.com/s/6rIIm06x9LN/amPNwZicV0. I don't know how to share excalidraw diagrams with edit permissions without giving the write permission to the whole internet. |
} | ||
PlanNode childPlanNode = children.get(0).getFragmentRoot(); | ||
return childPlanNode instanceof MailboxSendNode | ||
&& ((MailboxSendNode) childPlanNode).getDistributionType() == RelDistribution.Type.SINGLETON; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why not return -1 and avoid allocating object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
77e8914
to
4b7f883
Compare
I feel |
4b7f883
to
01309b4
Compare
I think I had given some feedback on the naming thing long time ago when Multi Stage engine was being developed. I can't seem to find the thread now. Ideally (this is how OLAP engines typically do), there is a clear distinction between an EXCHANGE type and the implementation of physical operator. In this case: Exchange types can be (to name a few)
Exchange itself is an implemented as a pair operator:
Regardless of the exchange, there is some processing done in the receiver stage after the exchange (between 2 stages). In this case, it will be JOIN
Typically, the receiver does a HashJoin after a broadcast exchange but I don't think this is always going to be true With that being said, BROADCAST is NOT A JOIN Strategy. It is an exchange type (e.g an exchange between two Major Fragments in Presto / Trino). So, we should try to build this clear distinction both in code and design. |
May be for this PR it is fine if we are trying to get something going for a large fact to small dimension table JOIN. My recommendation would be to start thinking about revamping / refactoring all of this. It would make future additions more flexible, decouple physical operator implementation from exchange types (which should always be the case) and implement exchange as operators as well which will make it easier to optimize a plan with the desired exchange type. |
@siddharthteotia I like the idea of de-coupling exchange and join algorithm. Currently they are coupled under join strategy, where we support We can address this as a separate effort. cc @ankitsultana |
Exactly. It will be much more flexible to build an optimal plan with the decoupling where we can choose exchange type based on data characteristics and/or physical operator algorithm. |
I'm thinking changing the |
@siddharthteotia I am right now working on refactoring the optimizer where many of the optimizations like coloration, skipping of partial aggregates, etc. will become automatic. Will add you to the Slack channel. |
You are right @siddharthteotia. The distribution is a property of the exchange. I was discussing that with @bziobrowski yesterday, and he rightfully mentioned that exchange types also affect aggregates.
+1 to that. |
Sounds good.
I am ok with this for now. It's fine to provide exchange type as hint as long as the exchange_type is not a property of solely for JoinOptions. It should be independent and JoinOptions or any query type should be able to leverage it especially if user know what they are doing and are trying to dictate the exchange-type via hints. |
01309b4
to
c00a392
Compare
Updated the PR to decouple exchange type from join strategy. Allow customizing exchange type for both left and right side. |
@@ -125,6 +125,14 @@ | |||
"description": "Colocated JOIN with partition column and group by non-partitioned column with stage parallelism", | |||
"sql": "SET stageParallelism=2; SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name" | |||
}, | |||
{ | |||
"description": "Broadcast JOIN without partition hint", | |||
"sql": "SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't be better to create a new hint for exchanges instead of assigning it to the join?
The way I'm suggesting the query would be something like:
SELECT
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val
FROM {tbl1} /*+ exchangeOption(type = 'local') */
JOIN {tbl2} /*+ exchangeOption(type = 'broadcast') */
ON {tbl1}.num = {tbl2}.num;
This could also be used for example in aggregates. For example, we could write something like:
SELECT
{tbl1}.num, count(*)
from {tbl1} /*+ exchangeOption(type = 'local') */
GROUP BY {tbl1}.num;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion. Let me try it and see if I can make it work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I'm not able to make it work because this is not really a TABLE_SCAN
option, or an option applied to any specific RelNode
. The left and right side of a JOIN
could be any RelNode
, and it could be another chained JOIN
. Do you see a way to extract this hint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we not using this?
https://calcite.apache.org/javadocAggregate/org/apache/calcite/rel/hint/package-summary.html
Calcite has good support for hint extraction and propagation.
c00a392
to
c65d1e8
Compare
RelNode newLeft; | ||
RelNode newRight; | ||
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { | ||
// Lookup join - add local exchange on the left side | ||
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); | ||
// Lookup join |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) this function is somewhat less readable. we should consider refactoring
we should also add validation. for example, a query can't have a exchange hint for left side as BROADCAST and right side as HASH for the same JOIN op.
Add support for customizing distribution type with join hint.
Allowed distribution types:
LOCAL
HASH
BROADCAST
RANDOM
Added 2 new join hint:
left_distribution_type
right_distribution_type
To achieve broadcast join without shuffling left side:
Related to #14518