Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support distribution type hint to allow broadcast join #14797

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Jackie-Jiang
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang commented Jan 11, 2025

Add support for customizing distribution type with join hint.

Allowed distribution types:

  • LOCAL
  • HASH
  • BROADCAST
  • RANDOM

Added 2 new join hint:

  • left_distribution_type
  • right_distribution_type

To achieve broadcast join without shuffling left side:

SELECT /*+ joinOptions(left_distribution_type = 'local', right_distribution_type = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 = 'foo' AND b.col2 = 'bar'

Related to #14518

@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes multi-stage Related to the multi-stage query engine labels Jan 11, 2025
@codecov-commenter
Copy link

codecov-commenter commented Jan 11, 2025

Codecov Report

Attention: Patch coverage is 44.68085% with 52 lines in your changes missing coverage. Please review.

Project coverage is 63.72%. Comparing base (59551e4) to head (c65d1e8).
Report is 1592 commits behind head on master.

Files with missing lines Patch % Lines
...ite/rel/rules/PinotJoinExchangeNodeInsertRule.java 27.41% 39 Missing and 6 partials ⚠️
...pache/pinot/calcite/rel/hint/PinotHintOptions.java 71.42% 4 Missing and 2 partials ⚠️
.../org/apache/pinot/query/routing/WorkerManager.java 90.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14797      +/-   ##
============================================
+ Coverage     61.75%   63.72%   +1.97%     
- Complexity      207     1610    +1403     
============================================
  Files          2436     2708     +272     
  Lines        133233   151304   +18071     
  Branches      20636    23364    +2728     
============================================
+ Hits          82274    96422   +14148     
- Misses        44911    47634    +2723     
- Partials       6048     7248    +1200     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.71% <44.68%> (+2.00%) ⬆️
java-21 63.61% <44.68%> (+1.99%) ⬆️
skip-bytebuffers-false 63.72% <44.68%> (+1.97%) ⬆️
skip-bytebuffers-true 63.59% <44.68%> (+35.86%) ⬆️
temurin 63.72% <44.68%> (+1.97%) ⬆️
unittests 63.72% <44.68%> (+1.97%) ⬆️
unittests1 56.31% <44.68%> (+9.41%) ⬆️
unittests2 34.02% <0.00%> (+6.29%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@gortiz
Copy link
Contributor

gortiz commented Jan 14, 2025

I'll take a look, but I think we need to rethink the names of join strategies. A strategy should not be defined uniquely by what it does on one of the sides of the join. This case, for example, is clear: The broadcast strategy will be applied on the right-hand side, but what will happen with the left one? We already have a strategy where the right-hand side is broadcasted, but the left is randomly shuffled.

@gortiz
Copy link
Contributor

gortiz commented Jan 14, 2025

@@ -54,10 +54,15 @@ public void onMatch(RelOptRuleCall call) {
JoinInfo joinInfo = join.analyzeCondition();
RelNode newLeft;
RelNode newRight;
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join);
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: what about having a PinotHintOptions.JoinHintOptions.getJoinStrategy that returns an enum we can use in a switch here? We expect to add at least 3 new extra strategies (listed in #14518, including random + broadcast right now cannot be specified), so the switch syntax may be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN);
} else {
// TODO: Consider renaming this operator type. It handles multiple join strategies.
Copy link
Contributor

@gortiz gortiz Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are using names here in a strange way. This is a hash operator because it implements the join using a hash map. The other is a lookup join operator because it implements it using lookup logic.

In parallel, we have join strategies. One of the strategies creates logical partitions at query time based on the values of the columns being joined. The way these partitions are decided is based on hash code, so it is called hash strategy. In the documentation I used Query time partition join strategy because I didn't want to focus too much on the fact that is being using hashes.

Imagine a scenario where we add sorted joins. The type of the join should be sort and the strategy used for the distribution of its inputs may be hash.

TL;DR: I think we need to distinguish between join algorithm (lookup, hash, sorted, nested look) and distribution strategies (hash/partitioned, local, randon, broadcast, etc). The algorithm will probably change the operator class being used while the distribution strategy will change the exchange of the children of the joins

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should separate the type of join and the type of distribution/shuffle. The latter is not unique to joins and could also be used in e.g. aggregations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Removed this TODO, and we can revisit this when adding the next join operator

@gortiz
Copy link
Contributor

gortiz commented Jan 14, 2025

We need to add documentation at https://docs.pinot.apache.org/users/user-guide-query/multi-stage-query/join-strategies. Feel free to use my diagrams at https://app.excalidraw.com/s/6rIIm06x9LN/amPNwZicV0. I don't know how to share excalidraw diagrams with edit permissions without giving the write permission to the whole internet.

}
PlanNode childPlanNode = children.get(0).getFragmentRoot();
return childPlanNode instanceof MailboxSendNode
&& ((MailboxSendNode) childPlanNode).getDistributionType() == RelDistribution.Type.SINGLETON;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not return -1 and avoid allocating object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

@Jackie-Jiang
Copy link
Contributor Author

I'll take a look, but I think we need to rethink the names of join strategies. A strategy should not be defined uniquely by what it does on one of the sides of the join. This case, for example, is clear: The broadcast strategy will be applied on the right-hand side, but what will happen with the left one? We already have a strategy where the right-hand side is broadcasted, but the left is randomly shuffled.

I feel BROADCAST usually means fixing one side, and broadcasting the other side. Several query engines only support this strategy. We don't have an explicit join strategy for randomly shuffling left side and broadcasting right side, so if we want to add one, we can think of a new name for this less commonly used one.

@siddharthteotia
Copy link
Contributor

siddharthteotia commented Jan 15, 2025

I think I had given some feedback on the naming thing long time ago when Multi Stage engine was being developed. I can't seem to find the thread now.

Ideally (this is how OLAP engines typically do), there is a clear distinction between an EXCHANGE type and the implementation of physical operator.

In this case:

Exchange types can be (to name a few)

  • BROADCAST
  • HASH_PARTITION (like Spark shuffle)
  • ROUND_ROBIN
  • SEND_TO_SINGLE

Exchange itself is an implemented as a pair operator:

  • Sender operator as the root operator in the sender stage (downstream)
  • Receiver operator as the leaf operator in the receiver stage (upstream)

Regardless of the exchange, there is some processing done in the receiver stage after the exchange (between 2 stages). In this case, it will be JOIN

  • Logical Operation - JOIN
  • Exchange type - BROADCAST
  • Physical operation (depending on planner / optimizer) - HashJoin, Sort-Merge Join, NLJ etc

Typically, the receiver does a HashJoin after a broadcast exchange but I don't think this is always going to be true

With that being said, BROADCAST is NOT A JOIN Strategy. It is an exchange type (e.g an exchange between two Major Fragments in Presto / Trino).

So, we should try to build this clear distinction both in code and design.

@siddharthteotia
Copy link
Contributor

May be for this PR it is fine if we are trying to get something going for a large fact to small dimension table JOIN. My recommendation would be to start thinking about revamping / refactoring all of this. It would make future additions more flexible, decouple physical operator implementation from exchange types (which should always be the case) and implement exchange as operators as well which will make it easier to optimize a plan with the desired exchange type.

@Jackie-Jiang
Copy link
Contributor Author

@siddharthteotia I like the idea of de-coupling exchange and join algorithm. Currently they are coupled under join strategy, where we support hash, lookup, dynamic_broadcast before this PR. We can start thinking how to organize the hint so that we can combine different exchange type with join algorithm. Exchange type is also useful for other operations such as aggregate.

We can address this as a separate effort. cc @ankitsultana

@siddharthteotia
Copy link
Contributor

we can combine different exchange type with join algorithm. Exchange type is also useful for other operations such as aggregate.

Exactly. It will be much more flexible to build an optimal plan with the decoupling where we can choose exchange type based on data characteristics and/or physical operator algorithm.

@Jackie-Jiang
Copy link
Contributor Author

I'm thinking changing the join_strategy hint to exchange_type hint (still under joinOptions). For this particular PR's purpose, I can add local_broadcast to represent left local, right broadcast. @gortiz @siddharthteotia wdyt?

@ankitsultana
Copy link
Contributor

@siddharthteotia I am right now working on refactoring the optimizer where many of the optimizations like coloration, skipping of partial aggregates, etc. will become automatic. Will add you to the Slack channel.

@gortiz
Copy link
Contributor

gortiz commented Jan 15, 2025

You are right @siddharthteotia. The distribution is a property of the exchange. I was discussing that with @bziobrowski yesterday, and he rightfully mentioned that exchange types also affect aggregates.

I'm thinking changing the join_strategy hint to exchange_type hint (still under joinOptions). For this particular PR's purpose, I can add local_broadcast to represent left local, right broadcast. @gortiz @siddharthteotia wdyt?

+1 to that.

@siddharthteotia
Copy link
Contributor

Sounds good.

I'm thinking changing the join_strategy hint to exchange_type hint (still under joinOptions)

I am ok with this for now. It's fine to provide exchange type as hint as long as the exchange_type is not a property of solely for JoinOptions. It should be independent and JoinOptions or any query type should be able to leverage it especially if user know what they are doing and are trying to dictate the exchange-type via hints.

@Jackie-Jiang Jackie-Jiang changed the title Support BROADCAST join strategy Support exchange type hint to allow broadcast join Jan 16, 2025
@Jackie-Jiang
Copy link
Contributor Author

Updated the PR to decouple exchange type from join strategy. Allow customizing exchange type for both left and right side.

@@ -125,6 +125,14 @@
"description": "Colocated JOIN with partition column and group by non-partitioned column with stage parallelism",
"sql": "SET stageParallelism=2; SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name"
},
{
"description": "Broadcast JOIN without partition hint",
"sql": "SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be better to create a new hint for exchanges instead of assigning it to the join?

The way I'm suggesting the query would be something like:

SELECT 
    {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val
FROM {tbl1} /*+ exchangeOption(type = 'local') */
JOIN {tbl2} /*+ exchangeOption(type = 'broadcast') */
ON {tbl1}.num = {tbl2}.num;

This could also be used for example in aggregates. For example, we could write something like:

SELECT
    {tbl1}.num, count(*)
from {tbl1} /*+ exchangeOption(type = 'local') */
GROUP BY {tbl1}.num;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. Let me try it and see if I can make it work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not able to make it work because this is not really a TABLE_SCAN option, or an option applied to any specific RelNode. The left and right side of a JOIN could be any RelNode, and it could be another chained JOIN. Do you see a way to extract this hint?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not using this?
https://calcite.apache.org/javadocAggregate/org/apache/calcite/rel/hint/package-summary.html

Calcite has good support for hint extraction and propagation.

@Jackie-Jiang Jackie-Jiang changed the title Support exchange type hint to allow broadcast join Support distribution type hint to allow broadcast join Jan 17, 2025
RelNode newLeft;
RelNode newRight;
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
// Lookup join - add local exchange on the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
// Lookup join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) this function is somewhat less readable. we should consider refactoring

we should also add validation. for example, a query can't have a exchange hint for left side as BROADCAST and right side as HASH for the same JOIN op.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation feature multi-stage Related to the multi-stage query engine release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants