diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
deleted file mode 100644
index 950be36b67..0000000000
--- a/native/core/src/execution/operators/copy.rs
+++ /dev/null
@@ -1,307 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::compute::{cast_with_options, CastOptions};
-use futures::{Stream, StreamExt};
-use std::{
-    any::Any,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
-
-use arrow::array::{
-    downcast_dictionary_array, make_array, Array, ArrayRef, MutableArrayData, RecordBatch,
-    RecordBatchOptions,
-};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
-use arrow::error::ArrowError;
-use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
-use datafusion::physical_plan::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
-
-/// An utility execution node which makes deep copies of input batches.
-///
-/// In certain scenarios like sort, DF execution nodes only make shallow copy of input batches.
-/// This could cause issues for Comet, since we re-use column vectors across different batches.
-/// For those scenarios, this can be used as an adapter node.
-#[derive(Debug)]
-pub struct CopyExec {
-    input: Arc<dyn ExecutionPlan>,
-    schema: SchemaRef,
-    cache: PlanProperties,
-    metrics: ExecutionPlanMetricsSet,
-    mode: CopyMode,
-}
-
-#[derive(Debug, PartialEq, Clone)]
-pub enum CopyMode {
-    /// Perform a deep copy and also unpack dictionaries
-    UnpackOrDeepCopy,
-    /// Perform a clone and also unpack dictionaries
-    UnpackOrClone,
-}
-
-impl CopyExec {
-    pub fn new(input: Arc<dyn ExecutionPlan>, mode: CopyMode) -> Self {
-        // change schema to remove dictionary types because CopyExec always unpacks
-        // dictionaries
-
-        let fields: Vec<Field> = input
-            .schema()
-            .fields
-            .iter()
-            .map(|f: &FieldRef| match f.data_type() {
-                DataType::Dictionary(_, value_type) => {
-                    Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable())
-                }
-                _ => f.as_ref().clone(),
-            })
-            .collect();
-
-        let schema = Arc::new(Schema::new(fields));
-
-        let cache = PlanProperties::new(
-            EquivalenceProperties::new(Arc::clone(&schema)),
-            Partitioning::UnknownPartitioning(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        );
-
-        Self {
-            input,
-            schema,
-            cache,
-            metrics: ExecutionPlanMetricsSet::default(),
-            mode,
-        }
-    }
-
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    pub fn mode(&self) -> &CopyMode {
-        &self.mode
-    }
-}
-
-impl DisplayAs for CopyExec {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CopyExec [{:?}]", self.mode)
-            }
-            DisplayFormatType::TreeRender => unimplemented!(),
-        }
-    }
-}
-
-impl ExecutionPlan for CopyExec {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![&self.input]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let input = Arc::clone(&self.input);
-        let new_input = input.with_new_children(children)?;
-        Ok(Arc::new(CopyExec {
-            input: new_input,
-            schema: Arc::clone(&self.schema),
-            cache: self.cache.clone(),
-            metrics: self.metrics.clone(),
-            mode: self.mode.clone(),
-        }))
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> DataFusionResult<SendableRecordBatchStream> {
-        let child_stream = self.input.execute(partition, context)?;
-        Ok(Box::pin(CopyStream::new(
-            self,
-            self.schema(),
-            child_stream,
-            partition,
-            self.mode.clone(),
-        )))
-    }
-
-    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
-        self.input.partition_statistics(partition)
-    }
-
-    fn properties(&self) -> &PlanProperties {
-        &self.cache
-    }
-
-    fn name(&self) -> &str {
-        "CopyExec"
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics.clone_inner())
-    }
-
-    fn maintains_input_order(&self) -> Vec<bool> {
-        vec![true; self.children().len()]
-    }
-
-    fn supports_limit_pushdown(&self) -> bool {
-        true
-    }
-
-    fn cardinality_effect(&self) -> CardinalityEffect {
-        CardinalityEffect::Equal
-    }
-}
-
-struct CopyStream {
-    schema: SchemaRef,
-    child_stream: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
-    mode: CopyMode,
-}
-
-impl CopyStream {
-    fn new(
-        exec: &CopyExec,
-        schema: SchemaRef,
-        child_stream: SendableRecordBatchStream,
-        partition: usize,
-        mode: CopyMode,
-    ) -> Self {
-        Self {
-            schema,
-            child_stream,
-            baseline_metrics: BaselineMetrics::new(&exec.metrics, partition),
-            mode,
-        }
-    }
-
-    // TODO: replace copy_or_cast_array with copy_array if upstream sort kernel fixes
-    // dictionary array sorting issue.
-    fn copy(&self, batch: RecordBatch) -> DataFusionResult<RecordBatch> {
-        let mut timer = self.baseline_metrics.elapsed_compute().timer();
-        let vectors = batch
-            .columns()
-            .iter()
-            .map(|v| copy_or_unpack_array(v, &self.mode))
-            .collect::<Result<Vec<_>, _>>()?;
-
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-        let maybe_batch =
-            RecordBatch::try_new_with_options(Arc::clone(&self.schema), vectors, &options)
-                .map_err(|e| arrow_datafusion_err!(e));
-        timer.stop();
-        self.baseline_metrics.record_output(batch.num_rows());
-        maybe_batch
-    }
-}
-
-impl Stream for CopyStream {
-    type Item = DataFusionResult<RecordBatch>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.child_stream.poll_next_unpin(cx).map(|x| match x {
-            Some(Ok(batch)) => Some(self.copy(batch)),
-            other => other,
-        })
-    }
-}
-
-impl RecordBatchStream for CopyStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
-/// Copy an Arrow Array
-pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef {
-    let capacity = array.len();
-    let data = array.to_data();
-
-    let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
-
-    mutable.extend(0, 0, capacity);
-
-    if matches!(array.data_type(), DataType::Dictionary(_, _)) {
-        let copied_dict = make_array(mutable.freeze());
-        let ref_copied_dict = &copied_dict;
-
-        downcast_dictionary_array!(
-            ref_copied_dict => {
-                // Copying dictionary value array
-                let values = ref_copied_dict.values();
-                let data = values.to_data();
-
-                let mut mutable = MutableArrayData::new(vec![&data], false, values.len());
-                mutable.extend(0, 0, values.len());
-
-                let copied_dict = ref_copied_dict.with_values(make_array(mutable.freeze()));
-                Arc::new(copied_dict)
-            }
-            t => unreachable!("Should not reach here: {}", t)
-        )
-    } else {
-        make_array(mutable.freeze())
-    }
-}
-
-/// Copy an Arrow Array or cast to primitive type if it is a dictionary array.
-/// This is used for `CopyExec` to copy/cast the input array. If the input array
-/// is a dictionary array, we will cast the dictionary array to primitive type
-/// (i.e., unpack the dictionary array) and copy the primitive array. If the input
-/// array is a primitive array, we simply copy the array.
-pub(crate) fn copy_or_unpack_array(
-    array: &Arc<dyn Array>,
-    mode: &CopyMode,
-) -> Result<ArrayRef, ArrowError> {
-    match array.data_type() {
-        DataType::Dictionary(_, value_type) => {
-            let options = CastOptions::default();
-            // We need to copy the array after `cast` because arrow-rs `take` kernel which is used
-            // to unpack dictionary array might reuse the input array's null buffer.
-            Ok(copy_array(&cast_with_options(
-                array,
-                value_type.as_ref(),
-                &options,
-            )?))
-        }
-        _ => {
-            if mode == &CopyMode::UnpackOrDeepCopy {
-                Ok(copy_array(array))
-            } else {
-                Ok(Arc::clone(array))
-            }
-        }
-    }
-}
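[Reviewer note, outside the patch] The deleted copy_or_unpack_array above leaned on the stock arrow-rs cast kernel to unpack dictionary-encoded arrays before copying. For readers unfamiliar with that pattern, here is a minimal standalone sketch of the unpacking step; it assumes a recent arrow-rs release and uses made-up sample data, so treat it as illustrative rather than part of this change.

    use std::sync::Arc;
    use arrow::array::{Array, DictionaryArray, Int32Array, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Int32Type};

    fn main() -> Result<(), arrow::error::ArrowError> {
        // Dictionary-encoded string column: 4 rows referencing 3 distinct values
        let keys = Int32Array::from(vec![0, 0, 1, 2]);
        let values = StringArray::from(vec!["a", "b", "c"]);
        let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;

        // "Unpacking" is just a cast from Dictionary(Int32, Utf8) to plain Utf8
        let unpacked = cast(&dict, &DataType::Utf8)?;
        assert_eq!(unpacked.data_type(), &DataType::Utf8);
        assert_eq!(unpacked.len(), 4);
        Ok(())
    }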
diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index c8cfebd45e..f292c83b38 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -21,10 +21,8 @@ use std::fmt::Debug;
 
 use jni::objects::GlobalRef;
 
-pub use copy::*;
 pub use scan::*;
 
-mod copy;
 mod expand;
 pub use expand::ExpandExec;
 mod scan;
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index fef4a5b9e9..9f356b1729 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode};
 use crate::{
     errors::CometError,
     execution::{
@@ -23,9 +22,12 @@ use crate::{
     },
     jvm_bridge::{jni_call, JVMClasses},
 };
-use arrow::array::{make_array, ArrayData, ArrayRef, RecordBatch, RecordBatchOptions};
+use arrow::array::{
+    make_array, Array, ArrayData, ArrayRef, MutableArrayData, RecordBatch, RecordBatchOptions,
+};
 use arrow::compute::{cast_with_options, take, CastOptions};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::downcast_dictionary_array;
 use arrow::ffi::FFI_ArrowArray;
 use arrow::ffi::FFI_ArrowSchema;
 use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
@@ -81,8 +83,6 @@ pub struct ScanExec {
     jvm_fetch_time: Time,
     /// Time spent in FFI
     arrow_ffi_time: Time,
-    /// Whether native code can assume ownership of batches that it receives
-    arrow_ffi_safe: bool,
 }
 
 impl ScanExec {
@@ -91,7 +91,6 @@ impl ScanExec {
         input_source: Option<Arc<GlobalRef>>,
         input_source_description: &str,
         data_types: Vec<DataType>,
-        arrow_ffi_safe: bool,
     ) -> Result<Self, CometError> {
         let metrics_set = ExecutionPlanMetricsSet::default();
         let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
@@ -112,7 +111,6 @@ impl ScanExec {
                 data_types.len(),
                 &jvm_fetch_time,
                 &arrow_ffi_time,
-                arrow_ffi_safe,
             )?;
             timer.stop();
             batch
@@ -143,7 +141,6 @@ impl ScanExec {
             jvm_fetch_time,
             arrow_ffi_time,
             schema,
-            arrow_ffi_safe,
         })
     }
 
@@ -178,7 +175,6 @@ impl ScanExec {
                 self.data_types.len(),
                 &self.jvm_fetch_time,
                 &self.arrow_ffi_time,
-                self.arrow_ffi_safe,
             )?;
             *current_batch = Some(next_batch);
         }
@@ -195,7 +191,6 @@ impl ScanExec {
         num_cols: usize,
         jvm_fetch_time: &Time,
         arrow_ffi_time: &Time,
-        arrow_ffi_safe: bool,
    ) -> Result<InputBatch, CometError> {
        if exec_context_id == TEST_EXEC_CONTEXT_ID {
            // This is a unit test. We don't need to call JNI.
@@ -264,15 +259,10 @@ impl ScanExec {
                     array
                 };
 
-                let array = if arrow_ffi_safe {
-                    // ownership of this array has been transferred to native
-                    // but we still need to unpack dictionary arrays
-                    copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
-                } else {
-                    // it is necessary to copy the array because the contents may be
-                    // overwritten on the JVM side in the future
-                    copy_array(&array)
-                };
+                // copy array immediately to release JVM-side ArrowArray/ArrowSchema wrappers
+                // and also unpack dictionaries because not all DataFusion operators and expressions
+                // support them
+                let array = copy_array(&array);
 
                 inputs.push(array);
 
@@ -653,3 +643,35 @@ impl InputBatch {
         InputBatch::Batch(columns, num_rows)
     }
 }
+
+/// Copy an Arrow Array
+fn copy_array(array: &dyn Array) -> ArrayRef {
+    let capacity = array.len();
+    let data = array.to_data();
+
+    let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
+
+    mutable.extend(0, 0, capacity);
+
+    if matches!(array.data_type(), DataType::Dictionary(_, _)) {
+        let copied_dict = make_array(mutable.freeze());
+        let ref_copied_dict = &copied_dict;
+
+        downcast_dictionary_array!(
+            ref_copied_dict => {
+                // Copying dictionary value array
+                let values = ref_copied_dict.values();
+                let data = values.to_data();
+
+                let mut mutable = MutableArrayData::new(vec![&data], false, values.len());
+                mutable.extend(0, 0, values.len());
+
+                let copied_dict = ref_copied_dict.with_values(make_array(mutable.freeze()));
+                Arc::new(copied_dict)
+            }
+            t => unreachable!("Should not reach here: {}", t)
+        )
+    } else {
+        make_array(mutable.freeze())
+    }
+}
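[Reviewer note, outside the patch] The copy_array moved into scan.rs keeps the MutableArrayData-based deep copy so that imported batches stop aliasing JVM-owned buffers before the ArrowArray/ArrowSchema wrappers are released. A small standalone sketch of why that pattern yields an owned copy, again assuming a recent arrow-rs and using illustrative Int32 data only:

    use arrow::array::{make_array, Array, Int32Array, MutableArrayData};

    fn main() {
        let src = Int32Array::from(vec![1, 2, 3, 4]);
        let data = src.to_data();

        // Same MutableArrayData pattern as the copy_array above
        let mut mutable = MutableArrayData::new(vec![&data], false, src.len());
        mutable.extend(0, 0, src.len());
        let copied = make_array(mutable.freeze());

        // The copy owns its buffers: equal values, distinct allocations
        assert_eq!(copied.len(), src.len());
        let src_ptr = src.to_data().buffers()[0].as_ptr();
        let dst_ptr = copied.to_data().buffers()[0].as_ptr();
        assert_ne!(src_ptr, dst_ptr);
    }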
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index da56e01bb2..23dba9e8ce 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -17,12 +17,11 @@
 
 //! Converts Spark physical plan to DataFusion physical plan
 
-use crate::execution::operators::CopyMode;
 use crate::{
     errors::ExpressionError,
     execution::{
         expressions::subquery::Subquery,
-        operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
+        operators::{ExecutionError, ExpandExec, ScanExec},
         serde::to_arrow_datatype,
         shuffle::ShuffleWriterExec,
     },
@@ -1091,17 +1090,10 @@ impl PhysicalPlanner {
             let predicate =
                 self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?;
 
-            let filter: Arc<dyn ExecutionPlan> = if filter.wrap_child_in_copy_exec {
-                Arc::new(FilterExec::try_new(
-                    predicate,
-                    Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
-                )?)
-            } else {
-                Arc::new(FilterExec::try_new(
-                    predicate,
-                    Arc::clone(&child.native_plan),
-                )?)
-            };
+            let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+                predicate,
+                Arc::clone(&child.native_plan),
+            )?);
 
             Ok((
                 scans,
@@ -1229,15 +1221,13 @@ impl PhysicalPlanner {
                 .collect();
 
             let fetch = sort.fetch.map(|num| num as usize);
 
-            // SortExec caches batches so we need to make a copy of incoming batches. Also,
-            // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
-            // it would be more efficient if we could avoid that.
-            // https://github.com/apache/datafusion-comet/issues/963
-            let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
-            let mut sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
-                SortExec::new(LexOrdering::new(exprs?).unwrap(), Arc::clone(&child_copied))
-                    .with_fetch(fetch),
+            let mut sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
+                SortExec::new(
+                    LexOrdering::new(exprs?).unwrap(),
+                    Arc::clone(&child.native_plan),
+                )
+                .with_fetch(fetch),
             );
 
             if let Some(skip) = sort.skip.filter(|&n| n > 0).map(|n| n as usize) {
@@ -1384,13 +1374,8 @@ impl PhysicalPlanner {
                 };
 
                 // The `ScanExec` operator will take actual arrays from Spark during execution
-                let scan = ScanExec::new(
-                    self.exec_context_id,
-                    input_source,
-                    &scan.source,
-                    data_types,
-                    scan.arrow_ffi_safe,
-                )?;
+                let scan =
+                    ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
 
                 Ok((
                     vec![scan.clone()],
@@ -1401,13 +1386,9 @@ impl PhysicalPlanner {
             assert_eq!(children.len(), 1);
             let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
 
-            // We wrap native shuffle in a CopyExec. This existed previously, but for
-            // RangePartitioning at least we want to ensure that dictionaries are unpacked.
-            let wrapped_child = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
-
             let partitioning = self.create_partitioning(
                 writer.partitioning.as_ref().unwrap(),
-                wrapped_child.schema(),
+                child.native_plan.schema(),
             )?;
 
             let codec = match writer.codec.try_into() {
@@ -1424,7 +1405,7 @@ impl PhysicalPlanner {
             }?;
 
             let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
-                wrapped_child,
+                Arc::clone(&child.native_plan),
                 partitioning,
                 codec,
                 writer.output_data_file.clone(),
@@ -1514,8 +1495,8 @@ impl PhysicalPlanner {
             })
             .collect();
 
-        let left = Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
-        let right = Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
+        let left = Arc::clone(&join_params.left.native_plan);
+        let right = Arc::clone(&join_params.right.native_plan);
 
         let join = Arc::new(SortMergeJoinExec::try_new(
             Arc::clone(&left),
@@ -1577,12 +1558,8 @@ impl PhysicalPlanner {
             partition_count,
         )?;
 
-        // HashJoinExec may cache the input batch internally. We need
-        // to copy the input batch to avoid the data corruption from reusing the input
-        // batch. We also need to unpack dictionary arrays, because the join operators
-        // do not support them.
-        let left = Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
-        let right = Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
+        let left = Arc::clone(&join_params.left.native_plan);
+        let right = Arc::clone(&join_params.right.native_plan);
 
         let hash_join = Arc::new(HashJoinExec::try_new(
             left,
@@ -1812,11 +1789,6 @@ impl PhysicalPlanner {
         ))
     }
 
-    /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays.
-    fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
-        Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
-    }
-
     /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression
     fn create_agg_expr(
         &self,
@@ -2985,7 +2957,6 @@ mod tests {
                     type_info: None,
                 }],
                 source: "".to_string(),
-                arrow_ffi_safe: false,
             })),
         };
 
@@ -3059,7 +3030,6 @@ mod tests {
                     type_info: None,
                 }],
                 source: "".to_string(),
-                arrow_ffi_safe: false,
             })),
         };
 
@@ -3267,7 +3237,6 @@ mod tests {
             op_struct: Some(OpStruct::Scan(spark_operator::Scan {
                 fields: vec![create_proto_datatype()],
                 source: "".to_string(),
-                arrow_ffi_safe: false,
             })),
         }
     }
@@ -3310,7 +3279,6 @@ mod tests {
                     },
                 ],
                 source: "".to_string(),
-                arrow_ffi_safe: false,
             })),
         };
 
@@ -3426,7 +3394,6 @@ mod tests {
                     },
                 ],
                 source: "".to_string(),
-                arrow_ffi_safe: false,
             })),
         };
 
diff --git a/native/core/src/execution/spark_plan.rs b/native/core/src/execution/spark_plan.rs
index 3334292629..560d00c5cd 100644
--- a/native/core/src/execution/spark_plan.rs
+++ b/native/core/src/execution/spark_plan.rs
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::operators::CopyExec;
 use arrow::datatypes::SchemaRef;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
 
 /// Wrapper around a native plan that maps to a Spark plan and can optionally contain
 /// references to other native plans that should contribute to the Spark SQL metrics
-/// for the root plan (such as CopyExec and ScanExec nodes)
+/// for the root plan (such as ScanExec nodes and additional projections)
 #[derive(Debug, Clone)]
 pub(crate) struct SparkPlan {
     /// Spark plan ID (used for informational purposes only)
@@ -43,15 +42,11 @@ impl SparkPlan {
         native_plan: Arc<dyn ExecutionPlan>,
         children: Vec<Arc<SparkPlan>>,
     ) -> Self {
-        let mut additional_native_plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
-        for child in &children {
-            collect_additional_plans(Arc::clone(&child.native_plan), &mut additional_native_plans);
-        }
         Self {
             plan_id,
             native_plan,
            children,
-            additional_native_plans,
+            additional_native_plans: vec![],
        }
    }
 
@@ -66,9 +61,6 @@ impl SparkPlan {
         for plan in &additional_native_plans {
             accum.push(Arc::clone(plan));
         }
-        for child in &children {
-            collect_additional_plans(Arc::clone(&child.native_plan), &mut accum);
-        }
         Self {
             plan_id,
             native_plan,
@@ -87,12 +79,3 @@ impl SparkPlan {
         &self.children
     }
 }
-
-fn collect_additional_plans(
-    child: Arc<dyn ExecutionPlan>,
-    additional_native_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
-) {
-    if child.as_any().is::<CopyExec>() {
-        additional_native_plans.push(Arc::clone(&child));
-    }
-}
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index a243ab6b03..1f4abd7dde 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -77,8 +77,6 @@ message Scan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
-  // Whether native code can assume ownership of batches that it receives
-  bool arrow_ffi_safe = 3;
 }
 
 message NativeScan {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9f418e3068..e7e64f0f90 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1501,23 +1501,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
           scanBuilder.setSource(source)
         }
 
-        val ffiSafe = op match {
-          case _ if isExchangeSink(op) =>
-            // Source of broadcast exchange batches is ArrowStreamReader
-            // Source of shuffle exchange batches is NativeBatchDecoderIterator
-            true
-          case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
-            // native_comet scan reuses mutable buffers
-            false
-          case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
-            // native_iceberg_compat scan reuses mutable buffers for constant columns
-            // https://github.com/apache/datafusion-comet/issues/2152
-            false
-          case _ =>
-            false
-        }
-        scanBuilder.setArrowFfiSafe(ffiSafe)
-
         val scanTypes = op.output.flatten { attr =>
           serializeDataType(attr.dataType)
         }
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 006112d2b0..8043b81b3d 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -200,6 +200,10 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }
 
   test("join") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     df.createOrReplaceTempView("t2")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index d47b4e0c1a..010757d3a2 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -74,6 +74,10 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin without join filter") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -101,6 +105,10 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin with join filter") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",