Skip to content

Commit 1f434dc

Browse files
feat: ClassicJoin for PWMJ (#17482)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - part of #17427 ## Rationale for this change Adds regular joins (left, right, full, inner) for PWMJ as they behave differently in the code path. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? Adds classic join + physical planner <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes SLT tests + unit tests ## Follow up work to this pull request - Handling partitioned queries and multiple record batches (fuzz testing will be handled with this) - Simplify physical planning - Add more unit tests for different types (another pr as the LOC in this pr is getting a little daunting) next would be to implement the existence joins --------- Co-authored-by: Yongting You <[email protected]>
1 parent 37aad28 commit 1f434dc

File tree

14 files changed

+3058
-109
lines changed

14 files changed

+3058
-109
lines changed

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,11 @@ config_namespace! {
938938
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
939939
pub prefer_hash_join: bool, default = true
940940

941+
/// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
942+
/// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
943+
/// one range filter.
944+
pub enable_piecewise_merge_join: bool, default = false
945+
941946
/// The maximum estimated size in bytes for one input side of a HashJoin
942947
/// will be collected into a single partition
943948
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

datafusion/core/src/physical_planner.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,11 @@ use datafusion_expr::expr::{
7878
};
7979
use datafusion_expr::expr_rewriter::unnormalize_cols;
8080
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
81+
use datafusion_expr::utils::split_conjunction;
8182
use datafusion_expr::{
82-
Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType,
83-
Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame,
84-
WindowFrameBound, WriteOp,
83+
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
84+
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
85+
WindowFrame, WindowFrameBound, WriteOp,
8586
};
8687
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8788
use datafusion_physical_expr::expressions::Literal;
@@ -91,6 +92,7 @@ use datafusion_physical_expr::{
9192
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9293
use datafusion_physical_plan::empty::EmptyExec;
9394
use datafusion_physical_plan::execution_plan::InvariantLevel;
95+
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
9496
use datafusion_physical_plan::metrics::MetricType;
9597
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9698
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
@@ -1133,8 +1135,42 @@ impl DefaultPhysicalPlanner {
11331135
})
11341136
.collect::<Result<join_utils::JoinOn>>()?;
11351137

1138+
// TODO: `num_range_filters` can be used later on for ASOF joins (`num_range_filters > 1`)
1139+
let mut num_range_filters = 0;
1140+
let mut range_filters: Vec<Expr> = Vec::new();
1141+
let mut total_filters = 0;
1142+
11361143
let join_filter = match filter {
11371144
Some(expr) => {
1145+
let split_expr = split_conjunction(expr);
1146+
for expr in split_expr.iter() {
1147+
match *expr {
1148+
Expr::BinaryExpr(BinaryExpr {
1149+
left: _,
1150+
right: _,
1151+
op,
1152+
}) => {
1153+
if matches!(
1154+
op,
1155+
Operator::Lt
1156+
| Operator::LtEq
1157+
| Operator::Gt
1158+
| Operator::GtEq
1159+
) {
1160+
range_filters.push((**expr).clone());
1161+
num_range_filters += 1;
1162+
}
1163+
total_filters += 1;
1164+
}
1165+
// TODO: Want to deal with `Expr::Between` for IEJoins, it counts as two range predicates
1166+
// which is why it is not dealt with in PWMJ
1167+
// Expr::Between(_) => {},
1168+
_ => {
1169+
total_filters += 1;
1170+
}
1171+
}
1172+
}
1173+
11381174
// Extract columns from filter expression and saved in a HashSet
11391175
let cols = expr.column_refs();
11401176

@@ -1190,6 +1226,7 @@ impl DefaultPhysicalPlanner {
11901226
)?;
11911227
let filter_schema =
11921228
Schema::new_with_metadata(filter_fields, metadata);
1229+
11931230
let filter_expr = create_physical_expr(
11941231
expr,
11951232
&filter_df_schema,
@@ -1212,10 +1249,125 @@ impl DefaultPhysicalPlanner {
12121249
let prefer_hash_join =
12131250
session_state.config_options().optimizer.prefer_hash_join;
12141251

1252+
// TODO: Allow PWMJ to deal with residual equijoin conditions
12151253
let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
12161254
if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
12171255
// cross join if there is no join conditions and no join filter set
12181256
Arc::new(CrossJoinExec::new(physical_left, physical_right))
1257+
} else if num_range_filters == 1
1258+
&& total_filters == 1
1259+
&& !matches!(
1260+
join_type,
1261+
JoinType::LeftSemi
1262+
| JoinType::RightSemi
1263+
| JoinType::LeftAnti
1264+
| JoinType::RightAnti
1265+
| JoinType::LeftMark
1266+
| JoinType::RightMark
1267+
)
1268+
&& session_state
1269+
.config_options()
1270+
.optimizer
1271+
.enable_piecewise_merge_join
1272+
{
1273+
let Expr::BinaryExpr(be) = &range_filters[0] else {
1274+
return plan_err!(
1275+
"Unsupported expression for PWMJ: Expected `Expr::BinaryExpr`"
1276+
);
1277+
};
1278+
1279+
let mut op = be.op;
1280+
if !matches!(
1281+
op,
1282+
Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq
1283+
) {
1284+
return plan_err!(
1285+
"Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
1286+
op
1287+
);
1288+
}
1289+
1290+
fn reverse_ineq(op: Operator) -> Operator {
1291+
match op {
1292+
Operator::Lt => Operator::Gt,
1293+
Operator::LtEq => Operator::GtEq,
1294+
Operator::Gt => Operator::Lt,
1295+
Operator::GtEq => Operator::LtEq,
1296+
_ => op,
1297+
}
1298+
}
1299+
1300+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1301+
enum Side {
1302+
Left,
1303+
Right,
1304+
Both,
1305+
}
1306+
1307+
let side_of = |e: &Expr| -> Result<Side> {
1308+
let cols = e.column_refs();
1309+
let any_left = cols
1310+
.iter()
1311+
.any(|c| left_df_schema.index_of_column(c).is_ok());
1312+
let any_right = cols
1313+
.iter()
1314+
.any(|c| right_df_schema.index_of_column(c).is_ok());
1315+
1316+
Ok(match (any_left, any_right) {
1317+
(true, false) => Side::Left,
1318+
(false, true) => Side::Right,
1319+
(true, true) => Side::Both,
1320+
_ => unreachable!(),
1321+
})
1322+
};
1323+
1324+
let mut lhs_logical = &be.left;
1325+
let mut rhs_logical = &be.right;
1326+
1327+
let left_side = side_of(lhs_logical)?;
1328+
let right_side = side_of(rhs_logical)?;
1329+
if matches!(left_side, Side::Both)
1330+
|| matches!(right_side, Side::Both)
1331+
{
1332+
return Ok(Arc::new(NestedLoopJoinExec::try_new(
1333+
physical_left,
1334+
physical_right,
1335+
join_filter,
1336+
join_type,
1337+
None,
1338+
)?));
1339+
}
1340+
1341+
if left_side == Side::Right && right_side == Side::Left {
1342+
std::mem::swap(&mut lhs_logical, &mut rhs_logical);
1343+
op = reverse_ineq(op);
1344+
} else if !(left_side == Side::Left && right_side == Side::Right)
1345+
{
1346+
return plan_err!(
1347+
"Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
1348+
op
1349+
);
1350+
}
1351+
1352+
let on_left = create_physical_expr(
1353+
lhs_logical,
1354+
left_df_schema,
1355+
session_state.execution_props(),
1356+
)?;
1357+
let on_right = create_physical_expr(
1358+
rhs_logical,
1359+
right_df_schema,
1360+
session_state.execution_props(),
1361+
)?;
1362+
1363+
Arc::new(PiecewiseMergeJoinExec::try_new(
1364+
physical_left,
1365+
physical_right,
1366+
(on_left, on_right),
1367+
op,
1368+
*join_type,
1369+
session_state.config().target_partitions(),
1370+
)?)
12191371
} else {
12201372
// there is no equal join condition, use the nested loop join
12211373
Arc::new(NestedLoopJoinExec::try_new(

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ impl HashJoinStream {
637637
let (left_side, right_side) = get_final_indices_from_shared_bitmap(
638638
build_side.left_data.visited_indices_bitmap(),
639639
self.join_type,
640+
true,
640641
);
641642
let empty_right_batch = RecordBatch::new_empty(self.right.schema());
642643
// use the left and right indices to produce the batch result

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ pub use hash_join::HashJoinExec;
2424
pub use nested_loop_join::NestedLoopJoinExec;
2525
use parking_lot::Mutex;
2626
// Note: SortMergeJoin is not used in plans yet
27+
pub use piecewise_merge_join::PiecewiseMergeJoinExec;
2728
pub use sort_merge_join::SortMergeJoinExec;
2829
pub use symmetric_hash_join::SymmetricHashJoinExec;
2930
mod cross_join;
3031
mod hash_join;
3132
mod nested_loop_join;
33+
mod piecewise_merge_join;
3234
mod sort_merge_join;
3335
mod stream_join_utils;
3436
mod symmetric_hash_join;

0 commit comments

Comments
 (0)