Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
62bfa8b
lazy_eval_coalesce
coderfender Aug 31, 2025
e78bc99
lazy_coalesce_fallback_case_statement
coderfender Aug 31, 2025
2f8701e
lazy_coalesce_fallback_case_statement
coderfender Aug 31, 2025
bcda6c3
lazy_coalesce_fallback_case_statement
coderfender Aug 31, 2025
f9ec308
lazy_coalesce_fallback_case_statement
coderfender Sep 2, 2025
85acae5
init_ansi_mode_enabled
coderfender Aug 13, 2025
d4d125a
init_ansi_mode_enabled_add_tests
coderfender Aug 14, 2025
0d36d60
init_ansi_mode_enabled_add_tests
coderfender Aug 14, 2025
46790d3
init_ansi_mode_enabled_add_tests
coderfender Aug 14, 2025
0e95fa1
init_ansi_mode_enabled_add_tests
coderfender Aug 14, 2025
7e0603e
init_ansi_mode_enabled_add_tests
coderfender Aug 15, 2025
88f3745
init_ansi_mode_enabled_add_tests_exceptions
coderfender Aug 17, 2025
b8f6f64
disable_coalesce_ansi_mode
coderfender Aug 20, 2025
4fce770
disable_coalesce_ansi_mode
coderfender Aug 20, 2025
2b762fc
disable_coalesce_ansi_mode
coderfender Aug 23, 2025
332b402
disable_coalesce_ansi_mode
coderfender Aug 24, 2025
6897c53
undo_golden_file_generation_changes
coderfender Aug 31, 2025
7a0e9f8
lazy_coalesce_fallback_case_statement
coderfender Aug 31, 2025
219648e
lazy_coalesce_fallback_case_statement_rebase
coderfender Sep 3, 2025
82b654c
lazy_coalesce_fallback_case_statement_rebase
coderfender Sep 3, 2025
fc2a217
rebase_main
coderfender Sep 4, 2025
5255a75
rebase_main
coderfender Sep 4, 2025
44d1155
rebase_main
coderfender Sep 4, 2025
d8e30c6
rebase_main
coderfender Sep 5, 2025
7d5b742
rebase_main
coderfender Sep 6, 2025
31440b0
rebase_main
coderfender Sep 6, 2025
daaac80
fix_diff_file_generation
coderfender Sep 9, 2025
0cad323
fix_diff_file_generation
coderfender Sep 9, 2025
72a639c
rebase_main
coderfender Sep 9, 2025
f79f34d
address_review_comments
coderfender Sep 18, 2025
d08955c
address_review_comments
coderfender Sep 18, 2025
6835db4
impl_ansi
coderfender Sep 18, 2025
b70ac3b
impl_ansi_fix_edge_case
coderfender Sep 18, 2025
1eca257
impl_ansi_rebase_main
coderfender Sep 19, 2025
0ef832b
impl_ansi_rebase_main
coderfender Sep 19, 2025
182e245
impl_ansi_rebase_main
coderfender Sep 19, 2025
d8b8b27
rebase
coderfender Sep 25, 2025
0991f13
rebase
coderfender Sep 25, 2025
9603223
rebase
coderfender Sep 25, 2025
d9fbd0c
Merge branch 'main' into support_ansi_mode_arithmetic_functions
coderfender Sep 25, 2025
2398fd8
rebase
coderfender Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ index 41fd4de2a09..44cd244d3b0 100644
-- Test aggregate operator with codegen on and off.
--CONFIG_DIM1 spark.sql.codegen.wholeStage=true
--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
index 3a409eea348..38fed024c98 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
@@ -69,6 +69,8 @@ SELECT '' AS one, i.* FROM INT4_TBL i WHERE (i.f1 % smallint('2')) = smallint('1
-- any evens
SELECT '' AS three, i.* FROM INT4_TBL i WHERE (i.f1 % int('2')) = smallint('0');

+-- https://github.com/apache/datafusion-comet/issues/2215
+--SET spark.comet.exec.enabled=false
-- [SPARK-28024] Incorrect value when out of range
SELECT '' AS five, i.f1, i.f1 * smallint('2') AS x FROM INT4_TBL i;

diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
index fac23b4a26f..2b73732c33f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
Expand Down Expand Up @@ -881,7 +894,7 @@ index b5b34922694..a72403780c4 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 525d97e4998..5e04319dd97 100644
index 525d97e4998..843f0472c23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
Expand All @@ -894,7 +907,27 @@ index 525d97e4998..5e04319dd97 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4467,7 +4468,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -4429,7 +4430,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2215")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4450,7 +4452,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39175: Query context of Cast should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2215")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4467,14 +4470,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
Expand All @@ -907,6 +940,15 @@ index 525d97e4998..5e04319dd97 100644
}
}
}
}

test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
- "be serialized to executors when WSCG is off") {
+ "be serialized to executors when WSCG is off",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2215")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 48ad10992c5..51d1ee65422 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
Expand Down
46 changes: 44 additions & 2 deletions dev/diffs/3.5.6.diff
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@ index 41fd4de2a09..44cd244d3b0 100644
-- Test aggregate operator with codegen on and off.
--CONFIG_DIM1 spark.sql.codegen.wholeStage=true
--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
index 3a409eea348..38fed024c98 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
@@ -69,6 +69,8 @@ SELECT '' AS one, i.* FROM INT4_TBL i WHERE (i.f1 % smallint('2')) = smallint('1
-- any evens
SELECT '' AS three, i.* FROM INT4_TBL i WHERE (i.f1 % int('2')) = smallint('0');

+-- https://github.com/apache/datafusion-comet/issues/2215
+--SET spark.comet.exec.enabled=false
-- [SPARK-28024] Incorrect value when out of range
SELECT '' AS five, i.f1, i.f1 * smallint('2') AS x FROM INT4_TBL i;

diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
index fac23b4a26f..2b73732c33f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
Expand Down Expand Up @@ -866,7 +879,7 @@ index c26757c9cff..d55775f09d7 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 793a0da6a86..e48e74091cb 100644
index 793a0da6a86..181bfc16e4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
Expand All @@ -879,7 +892,27 @@ index 793a0da6a86..e48e74091cb 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4497,7 +4498,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -4459,7 +4460,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2215")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4480,7 +4482,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39175: Query context of Cast should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2215")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4497,14 +4500,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
Expand All @@ -892,6 +925,15 @@ index 793a0da6a86..e48e74091cb 100644
}
}
}
}

test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
- "be serialized to executors when WSCG is off") {
+ "be serialized to executors when WSCG is off",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2215")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index fa1a64460fc..1d2e215d6a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
Expand Down
23 changes: 10 additions & 13 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_modulo_expr, create_negate_expr, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, create_modulo_expr,
create_negate_expr, BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode,
SparkHour, SparkMinute, SparkSecond,
};

use crate::execution::operators::ExecutionError::GeneralError;
Expand Down Expand Up @@ -242,8 +243,6 @@ impl PhysicalPlanner {
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
match spark_expr.expr_struct.as_ref().unwrap() {
ExprStruct::Add(expr) => {
// TODO respect ANSI eval mode
// https://github.com/apache/datafusion-comet/issues/536
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
self.create_binary_expr(
expr.left.as_ref().unwrap(),
Expand All @@ -255,8 +254,6 @@ impl PhysicalPlanner {
)
}
ExprStruct::Subtract(expr) => {
// TODO respect ANSI eval mode
// https://github.com/apache/datafusion-comet/issues/535
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
self.create_binary_expr(
expr.left.as_ref().unwrap(),
Expand All @@ -268,8 +265,6 @@ impl PhysicalPlanner {
)
}
ExprStruct::Multiply(expr) => {
// TODO respect ANSI eval mode
// https://github.com/apache/datafusion-comet/issues/534
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
self.create_binary_expr(
expr.left.as_ref().unwrap(),
Expand All @@ -281,8 +276,6 @@ impl PhysicalPlanner {
)
}
ExprStruct::Divide(expr) => {
// TODO respect ANSI eval mode
// https://github.com/apache/datafusion-comet/issues/533
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
self.create_binary_expr(
expr.left.as_ref().unwrap(),
Expand Down Expand Up @@ -1010,21 +1003,25 @@ impl PhysicalPlanner {
}
_ => {
let data_type = return_type.map(to_arrow_datatype).unwrap();
if eval_mode == EvalMode::Try && data_type.is_integer() {
if [EvalMode::Try, EvalMode::Ansi].contains(&eval_mode)
&& (data_type.is_integer()
|| (data_type.is_floating() && op == DataFusionOperator::Divide))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the float/divide case covered by existing tests, or should a new test be added for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comment. All division in Comet is float or decimal division. I had to add this check to make sure that float operands are supported only when the operation is division, and the existing tests should cover this. I also have a draft PR ready for supporting integral division (#2421), which should be ready for review once this is merged :)

{
let op_str = match op {
DataFusionOperator::Plus => "checked_add",
DataFusionOperator::Minus => "checked_sub",
DataFusionOperator::Multiply => "checked_mul",
DataFusionOperator::Divide => "checked_div",
_ => {
todo!("Operator yet to be implemented!");
todo!("ANSI mode for Operator yet to be implemented!");
}
};
let fun_expr = create_comet_physical_fun(
let fun_expr = create_comet_physical_fun_with_eval_mode(
op_str,
data_type.clone(),
&self.session_ctx.state(),
None,
eval_mode,
)?;
Ok(Arc::new(ScalarFunctionExpr::new(
op_str,
Expand Down
36 changes: 31 additions & 5 deletions native/spark-expr/src/comet_scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
spark_array_repeat, spark_ceil, spark_date_add, spark_date_sub, spark_decimal_div,
spark_decimal_integral_div, spark_floor, spark_hex, spark_isnan, spark_make_decimal,
spark_read_side_padding, spark_round, spark_rpad, spark_unhex, spark_unscaled_value,
spark_read_side_padding, spark_round, spark_rpad, spark_unhex, spark_unscaled_value, EvalMode,
SparkBitwiseCount, SparkBitwiseGet, SparkBitwiseNot, SparkDateTrunc, SparkStringSpace,
};
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -64,6 +64,15 @@ macro_rules! make_comet_scalar_udf {
);
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
($name:expr, $func:ident, $data_type:ident, $eval_mode:ident) => {{
let scalar_func = CometScalarFunction::new(
$name.to_string(),
Signature::variadic_any(Volatility::Immutable),
$data_type.clone(),
Arc::new(move |args| $func(args, &$data_type, $eval_mode)),
);
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
}

/// Create a physical scalar function.
Expand All @@ -72,6 +81,23 @@ pub fn create_comet_physical_fun(
data_type: DataType,
registry: &dyn FunctionRegistry,
fail_on_error: Option<bool>,
) -> Result<Arc<ScalarUDF>, DataFusionError> {
create_comet_physical_fun_with_eval_mode(
fun_name,
data_type,
registry,
fail_on_error,
EvalMode::Legacy,
)
}

/// Create a physical scalar function with an eval mode. The goal is to deprecate `create_comet_physical_fun` (above) once all the operators have ANSI support.
pub fn create_comet_physical_fun_with_eval_mode(
fun_name: &str,
data_type: DataType,
registry: &dyn FunctionRegistry,
fail_on_error: Option<bool>,
eval_mode: EvalMode,
) -> Result<Arc<ScalarUDF>, DataFusionError> {
match fun_name {
"ceil" => {
Expand Down Expand Up @@ -117,16 +143,16 @@ pub fn create_comet_physical_fun(
)
}
"checked_add" => {
make_comet_scalar_udf!("checked_add", checked_add, data_type)
make_comet_scalar_udf!("checked_add", checked_add, data_type, eval_mode)
}
"checked_sub" => {
make_comet_scalar_udf!("checked_sub", checked_sub, data_type)
make_comet_scalar_udf!("checked_sub", checked_sub, data_type, eval_mode)
}
"checked_mul" => {
make_comet_scalar_udf!("checked_mul", checked_mul, data_type)
make_comet_scalar_udf!("checked_mul", checked_mul, data_type, eval_mode)
}
"checked_div" => {
make_comet_scalar_udf!("checked_div", checked_div, data_type)
make_comet_scalar_udf!("checked_div", checked_div, data_type, eval_mode)
}
"murmur3_hash" => {
let func = Arc::new(spark_murmur3_hash);
Expand Down
5 changes: 4 additions & 1 deletion native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ pub use conditional_funcs::*;
pub use conversion_funcs::*;
pub use nondetermenistic_funcs::*;

pub use comet_scalar_funcs::{create_comet_physical_fun, register_all_comet_functions};
pub use comet_scalar_funcs::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode,
register_all_comet_functions,
};
pub use datetime_funcs::{
spark_date_add, spark_date_sub, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond,
TimestampTruncExpr,
Expand Down
Loading
Loading