diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cf4bf2cd163fd..3d05c65365953 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -529,35 +529,48 @@ fn estimate_join_cardinality( }) } - // For SemiJoins estimation result is either zero, in cases when inputs - // are non-overlapping according to statistics, or equal to number of rows - // for outer input - JoinType::LeftSemi | JoinType::RightSemi => { - let (outer_stats, inner_stats) = match join_type { - JoinType::LeftSemi => (left_stats, right_stats), - _ => (right_stats, left_stats), - }; - let cardinality = match estimate_disjoint_inputs(&outer_stats, &inner_stats) { - Some(estimation) => *estimation.get_value()?, - None => *outer_stats.num_rows.get_value()?, - }; + JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::LeftAnti + | JoinType::RightAnti => { + let is_left = matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti); + let is_anti = matches!(join_type, JoinType::LeftAnti | JoinType::RightAnti); + + let ((outer_stats, inner_stats), (outer_col_stats, inner_col_stats)) = + if is_left { + ( + (&left_stats, &right_stats), + (&left_col_stats, &right_col_stats), + ) + } else { + ( + (&right_stats, &left_stats), + (&right_col_stats, &left_col_stats), + ) + }; - Some(PartialJoinStatistics { - num_rows: cardinality, - column_statistics: outer_stats.column_statistics, - }) - } + let outer_rows = *outer_stats.num_rows.get_value()?; - // For AntiJoins estimation always equals to outer statistics, as - // non-overlapping inputs won't affect estimation - JoinType::LeftAnti | JoinType::RightAnti => { - let outer_stats = match join_type { - JoinType::LeftAnti => left_stats, - _ => right_stats, - }; + let cardinality = + if estimate_disjoint_inputs(outer_stats, inner_stats).is_some() { + // Disjoint inputs: semi produces 0, anti keeps all rows. + if is_anti { outer_rows } else { 0 } + } else { + match estimate_semi_join_cardinality( + &outer_stats.num_rows, + &inner_stats.num_rows, + outer_col_stats, + inner_col_stats, + ) { + Some(semi) if is_anti => outer_rows.saturating_sub(semi), + Some(semi) => semi, + None => outer_rows, + } + }; + let outer_stats = if is_left { left_stats } else { right_stats }; Some(PartialJoinStatistics { - num_rows: *outer_stats.num_rows.get_value()?, + num_rows: cardinality, column_statistics: outer_stats.column_statistics, }) } @@ -697,6 +710,95 @@ fn estimate_disjoint_inputs( None } +/// Estimates the number of outer rows that have at least one matching +/// key on the inner side (i.e. semi join cardinality) using NDV +/// (Number of Distinct Values) statistics. +/// +/// Assuming the smaller domain is contained in the larger, the number +/// of overlapping distinct values is `min(outer_ndv, inner_ndv)`. +/// Under the uniformity assumption (each distinct value contributes +/// equally to row counts), the surviving fraction of outer rows is: +/// +/// Null rows cannot match, so each column's selectivity is further +/// reduced by the outer null fraction: +/// +/// ```text +/// null_frac_i = outer_null_count_i / outer_rows +/// selectivity_i = min(outer_ndv_i, inner_ndv_i) / outer_ndv_i * (1 - null_frac_i) +/// ``` +/// +/// For multi-column join keys the overall selectivity is the product +/// of per-column factors: +/// +/// ```text +/// semi_cardinality = outer_rows * product_i(selectivity_i) +/// ``` +/// +/// Anti join cardinality is derived as the complement: +/// `outer_rows - semi_cardinality`. +/// +/// Boundary cases: +/// * `inner_ndv >= outer_ndv` → selectivity = `1.0 - null_frac` +/// * `null_frac = 1.0` → selectivity = 0.0 (no non-null rows can match) +/// * Missing NDV statistics → returns `None` (fallback to `outer_rows`) +/// +/// PostgreSQL uses a similar approach in `eqjoinsel_semi` +/// (`src/backend/utils/adt/selfuncs.c`). When NDV statistics are +/// available on both sides it computes selectivity as `nd2 / nd1`, +/// which is equivalent to `min(outer_ndv, inner_ndv) / outer_ndv`. +/// If either side lacks statistics it falls back to a default. +fn estimate_semi_join_cardinality( + outer_num_rows: &Precision, + inner_num_rows: &Precision, + outer_col_stats: &[ColumnStatistics], + inner_col_stats: &[ColumnStatistics], +) -> Option { + let outer_rows = *outer_num_rows.get_value()?; + if outer_rows == 0 { + return Some(0); + } + let inner_rows = *inner_num_rows.get_value()?; + if inner_rows == 0 { + return Some(0); + } + + let mut selectivity = 1.0_f64; + let mut has_selectivity_estimate = false; + + for (outer_stat, inner_stat) in outer_col_stats.iter().zip(inner_col_stats.iter()) { + let outer_has_stats = outer_stat.distinct_count.get_value().is_some() + || (outer_stat.min_value.get_value().is_some() + && outer_stat.max_value.get_value().is_some()); + let inner_has_stats = inner_stat.distinct_count.get_value().is_some() + || (inner_stat.min_value.get_value().is_some() + && inner_stat.max_value.get_value().is_some()); + if !outer_has_stats || !inner_has_stats { + continue; + } + + let outer_ndv = max_distinct_count(outer_num_rows, outer_stat); + let inner_ndv = max_distinct_count(inner_num_rows, inner_stat); + + if let (Some(&o), Some(&i)) = (outer_ndv.get_value(), inner_ndv.get_value()) + && o > 0 + { + let null_frac = outer_stat + .null_count + .get_value() + .map(|&nc| nc as f64 / outer_rows as f64) + .unwrap_or(0.0); + selectivity *= (o.min(i) as f64) / (o as f64) * (1.0 - null_frac); + has_selectivity_estimate = true; + } + } + + if has_selectivity_estimate { + Some((outer_rows as f64 * selectivity).ceil() as usize) + } else { + None + } +} + /// Estimate the number of maximum distinct values that can be present in the /// given column from its statistics. If distinct_count is available, uses it /// directly. Otherwise, if the column is numeric and has min/max values, it @@ -2566,7 +2668,7 @@ mod tests { JoinType::LeftSemi, (50, Inexact(10), Inexact(20), Absent, Absent), (10, Inexact(15), Inexact(25), Absent, Absent), - Some(50), + Some(46), ), ( JoinType::RightSemi, @@ -2602,13 +2704,13 @@ mod tests { JoinType::LeftAnti, (50, Inexact(10), Inexact(20), Absent, Absent), (10, Inexact(15), Inexact(25), Absent, Absent), - Some(50), + Some(4), ), ( JoinType::RightAnti, (50, Inexact(10), Inexact(20), Absent, Absent), (10, Inexact(15), Inexact(25), Absent, Absent), - Some(10), + Some(0), ), ( JoinType::LeftAnti, @@ -2634,6 +2736,89 @@ mod tests { (10, Inexact(30), Absent, Absent, Absent), Some(50), ), + // NDV-based semi join: outer_ndv=20, inner_ndv=10 + // selectivity = 10/20 = 0.5, cardinality = ceil(50 * 0.5) = 25 + ( + JoinType::LeftSemi, + (50, Inexact(1), Inexact(100), Inexact(20), Absent), + (10, Inexact(1), Inexact(100), Inexact(10), Absent), + Some(25), + ), + // inner_ndv(30) >= outer_ndv(20) -> selectivity 1.0, no reduction + ( + JoinType::LeftSemi, + (50, Inexact(1), Inexact(100), Inexact(20), Absent), + (100, Inexact(1), Inexact(100), Inexact(30), Absent), + Some(50), + ), + // NDV-based anti join: semi=25, anti = 50 - 25 = 25 + ( + JoinType::LeftAnti, + (50, Inexact(1), Inexact(100), Inexact(20), Absent), + (10, Inexact(1), Inexact(100), Inexact(10), Absent), + Some(25), + ), + // inner covers all outer: semi=50, anti = 0 + ( + JoinType::LeftAnti, + (50, Inexact(1), Inexact(100), Inexact(20), Absent), + (100, Inexact(1), Inexact(100), Inexact(30), Absent), + Some(0), + ), + // RightSemi with explicit NDV + ( + JoinType::RightSemi, + (50, Inexact(1), Inexact(100), Inexact(10), Absent), + (10, Inexact(1), Inexact(100), Inexact(20), Absent), + Some(5), + ), + // RightAnti with explicit NDV + ( + JoinType::RightAnti, + (50, Inexact(1), Inexact(100), Inexact(10), Absent), + (10, Inexact(1), Inexact(100), Inexact(20), Absent), + Some(5), + ), + // Empty inner table: no match possible, semi → 0 + ( + JoinType::LeftSemi, + (100, Absent, Absent, Absent, Absent), + (0, Absent, Absent, Absent, Absent), + Some(0), + ), + // NDV-based semi with nulls on outer side: + // outer_ndv=20, inner_ndv=10, null_frac=10/100=0.1 + // selectivity = 10/20 * (1-0.1) = 0.5 * 0.9 = 0.45 + // semi = ceil(100 * 0.45) = 45 + ( + JoinType::LeftSemi, + (100, Absent, Absent, Inexact(20), Inexact(10)), + (200, Absent, Absent, Inexact(10), Absent), + Some(45), + ), + // Anti-join with nulls on outer side: + // semi=45, anti = 100 - 45 = 55 + ( + JoinType::LeftAnti, + (100, Absent, Absent, Inexact(20), Inexact(10)), + (200, Absent, Absent, Inexact(10), Absent), + Some(55), + ), + // All outer rows are null: null_frac=1.0 + // selectivity = 10/20 * (1-1.0) = 0.0, semi = 0 + ( + JoinType::LeftSemi, + (100, Absent, Absent, Inexact(20), Inexact(100)), + (200, Absent, Absent, Inexact(10), Absent), + Some(0), + ), + // All outer rows are null (anti): anti = 100 - 0 = 100 + ( + JoinType::LeftAnti, + (100, Absent, Absent, Inexact(20), Inexact(100)), + (200, Absent, Absent, Inexact(10), Absent), + Some(100), + ), ]; let join_on = vec![( @@ -2753,6 +2938,157 @@ mod tests { Ok(()) } + #[test] + fn test_semi_join_multi_column_and_mixed_stats() -> Result<()> { + let join_on = vec![ + ( + Arc::new(Column::new("l_col0", 0)) as _, + Arc::new(Column::new("r_col0", 0)) as _, + ), + ( + Arc::new(Column::new("l_col1", 1)) as _, + Arc::new(Column::new("r_col1", 1)) as _, + ), + ]; + + // Multi-column: both columns have NDV on both sides. + // col0: outer_ndv=20, inner_ndv=10 → selectivity = 10/20 = 0.5 + // col1: outer_ndv=40, inner_ndv=10 → selectivity = 10/40 = 0.25 + // total selectivity = 0.5 * 0.25 = 0.125 + // semi = ceil(100 * 0.125) = 13 + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(20), Absent), + create_column_stats(Absent, Absent, Inexact(40), Absent), + ], + }, + Statistics { + num_rows: Inexact(200), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(10), Absent), + create_column_stats(Absent, Absent, Inexact(10), Absent), + ], + }, + &join_on, + ) + .map(|c| c.num_rows); + assert_eq!(result, Some(13), "multi-column semi join"); + + // Multi-column anti: anti = 100 - 13 = 87 + let result = estimate_join_cardinality( + &JoinType::LeftAnti, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(20), Absent), + create_column_stats(Absent, Absent, Inexact(40), Absent), + ], + }, + Statistics { + num_rows: Inexact(200), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(10), Absent), + create_column_stats(Absent, Absent, Inexact(10), Absent), + ], + }, + &join_on, + ) + .map(|c| c.num_rows); + assert_eq!(result, Some(87), "multi-column anti join"); + + // Mixed stats: col0 has NDV on both sides, col1 has NDV only on outer. + // col1 is skipped (either side missing), so selectivity comes from col0 only. + // col0: outer_ndv=20, inner_ndv=10 → selectivity = 0.5 + // semi = ceil(100 * 0.5) = 50 + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(20), Absent), + create_column_stats(Absent, Absent, Inexact(40), Absent), + ], + }, + Statistics { + num_rows: Inexact(200), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(10), Absent), + create_column_stats(Absent, Absent, Absent, Absent), + ], + }, + &join_on, + ) + .map(|c| c.num_rows); + assert_eq!(result, Some(50), "mixed stats: col1 skipped"); + + // Mixed stats: neither column has stats on both sides → fallback to outer_rows + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(20), Absent), + create_column_stats(Absent, Absent, Absent, Absent), + ], + }, + Statistics { + num_rows: Inexact(200), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Absent, Absent), + create_column_stats(Absent, Absent, Inexact(10), Absent), + ], + }, + &join_on, + ) + .map(|c| c.num_rows); + assert_eq!(result, Some(100), "no column has stats on both sides"); + + // Multi-column with nulls on one column: + // col0: outer_ndv=20, inner_ndv=10, null_frac=0.0 → 10/20 * 1.0 = 0.5 + // col1: outer_ndv=40, inner_ndv=10, null_frac=20/100=0.2 → 10/40 * 0.8 = 0.2 + // total selectivity = 0.5 * 0.2 = 0.1 + // semi = ceil(100 * 0.1) = 10 + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(20), Absent), + create_column_stats(Absent, Absent, Inexact(40), Inexact(20)), + ], + }, + Statistics { + num_rows: Inexact(200), + total_byte_size: Absent, + column_statistics: vec![ + create_column_stats(Absent, Absent, Inexact(10), Absent), + create_column_stats(Absent, Absent, Inexact(10), Absent), + ], + }, + &join_on, + ) + .map(|c| c.num_rows); + assert_eq!( + result, + Some(10), + "multi-column semi join with nulls on one column" + ); + + Ok(()) + } + #[test] fn test_calculate_join_output_ordering() -> Result<()> { let left_ordering = LexOrdering::new(vec![