From 376337058c411eead1dbeaee334ed30086360fd7 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 21 May 2026 16:44:09 -0400 Subject: [PATCH 1/6] enable CometLocalTableScanExec by default --- spark/src/main/scala/org/apache/comet/CometConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index fdd1ae2073..faee23a8eb 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -273,7 +273,7 @@ object CometConf extends ShimCometConf { val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("takeOrderedAndProject", defaultValue = true) val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig("localTableScan", defaultValue = false) + createExecEnabledConfig("localTableScan", defaultValue = true) val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled") From 810e5d5c38d106fae4a3bff6563137e3a5fcfd01 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 07:46:49 -0400 Subject: [PATCH 2/6] add NullType to toArrowType --- .../main/scala/org/apache/spark/sql/comet/util/Utils.scala | 1 + .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 783367c054..4605e641f1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -148,6 +148,7 @@ object Utils extends CometTypeShim with Logging { } case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) + case NullType => ArrowType.Null.INSTANCE case dt if isTimeType(dt) => new ArrowType.Time(TimeUnit.NANOSECOND, 64) case _ => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 16601d056b..8bf00de20c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3925,6 +3925,13 @@ class CometExecSuite extends CometTestBase { } } + test("CometLocalTableScanExec handles NullType column") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = spark.sql("SELECT * FROM VALUES ('a', null), ('b', null) AS t(x, y)") + checkSparkAnswer(df) + } + } + test("Native_datafusion reports correct files and bytes scanned") { val inputFiles = 2 From 174c939540a0be46d184f8e8b9d57a52ceae722a Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 08:02:09 -0400 Subject: [PATCH 3/6] add NullType to shuffles --- native/shuffle/src/spark_unsafe/row.rs | 15 ++++++++++-- .../shuffle/CometShuffleExchangeExec.scala | 6 ++--- .../exec/CometColumnarShuffleSuite.scala | 23 ++++--------------- .../comet/exec/CometNativeShuffleSuite.scala | 6 +++++ 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/row.rs b/native/shuffle/src/spark_unsafe/row.rs index ec0903bc56..6ffe9d0b6e 100644 --- a/native/shuffle/src/spark_unsafe/row.rs +++ b/native/shuffle/src/spark_unsafe/row.rs @@ -28,8 +28,8 @@ use arrow::array::{ builder::{ ArrayBuilder, BinaryBuilder, BinaryDictionaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, - Int64Builder, Int8Builder, ListBuilder, MapBuilder, StringBuilder, StringDictionaryBuilder, - StructBuilder, TimestampMicrosecondBuilder, + Int64Builder, Int8Builder, ListBuilder, MapBuilder, NullBuilder, StringBuilder, + StringDictionaryBuilder, StructBuilder, TimestampMicrosecondBuilder, }, types::Int32Type, Array, ArrayRef, RecordBatch, RecordBatchOptions, @@ -267,6 +267,10 @@ pub(super) fn append_field( append_field_to_builder!(Date32Builder, |builder: &mut Date32Builder| builder .append_value(row.get_date(idx))); } + DataType::Null => { + let field_builder = get_field_builder!(struct_builder, NullBuilder, idx); + field_builder.append_null(); + } DataType::Timestamp(TimeUnit::Microsecond, _) => { append_field_to_builder!( TimestampMicrosecondBuilder, @@ -1148,6 +1152,12 @@ fn append_columns( .append_value(row.get_date(idx)) ); } + DataType::Null => { + let null_builder = downcast_builder_ref!(NullBuilder, builder); + for _ in row_start..row_end { + null_builder.append_null(); + } + } DataType::Timestamp(TimeUnit::Microsecond, _) => { append_column_to_builder!( TimestampMicrosecondBuilder, @@ -1252,6 +1262,7 @@ fn make_builders( } } DataType::Date32 => Box::new(Date32Builder::with_capacity(row_num)), + DataType::Null => Box::new(NullBuilder::new()), DataType::Timestamp(TimeUnit::Microsecond, _) => { Box::new(TimestampMicrosecondBuilder::with_capacity(row_num).with_data_type(dt.clone())) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 493c20f8b7..16e7a8b774 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, NullType, ShortType, StringType, StructType, TimestampNTZType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.MutablePair import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} @@ -364,7 +364,7 @@ object CometShuffleExchangeExec def supportedSerializableDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | - _: TimestampNTZType | _: DecimalType | _: DateType => + _: TimestampNTZType | _: DecimalType | _: DateType | _: NullType => true case StructType(fields) => fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType)) @@ -487,7 +487,7 @@ object CometShuffleExchangeExec def supportedSerializableDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | - _: TimestampNTZType | _: DecimalType | _: DateType => + _: TimestampNTZType | _: DecimalType | _: DateType | _: NullType => true case StructType(fields) => fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType)) && diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 86c6a6aa4b..70d427972a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -22,14 +22,13 @@ package org.apache.comet.exec import java.nio.file.{Files, Paths} import scala.reflect.runtime.universe._ -import scala.util.Random import org.scalactic.source.Position import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.{Partitioner, SparkConf} -import org.apache.spark.sql.{CometTestBase, DataFrame, RandomDataGenerator, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec @@ -94,22 +93,10 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar """.stripMargin)) } - test("Fallback to Spark for unsupported input besides ordering") { - val dataGenerator = RandomDataGenerator - .forType( - dataType = NullType, - nullable = true, - new Random(System.nanoTime()), - validJulianDatetime = false) - .get - - val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", NullType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - checkSparkAnswer(df) + test("columnar shuffle with NullType passthrough column") { + val df = sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)") + val shuffled = df.repartition(2, $"x") + checkShuffleAnswer(shuffled, 1) } test("columnar shuffle on nested struct including nulls") { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index e0ef1df1f4..60637102f0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -218,6 +218,12 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + test("native shuffle with NullType passthrough column") { + val df = spark.sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)") + val shuffled = df.repartition(2, $"x") + checkShuffleAnswer(shuffled, 1) + } + test("fix: Comet native shuffle with binary data") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") From 3790c1022b71cbe47cbde8e4553304373cfde1f4 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 09:12:23 -0400 Subject: [PATCH 4/6] fix windowexec test and nulltype. fix timetype issues --- .../sql/comet/CometLocalTableScanExec.scala | 32 +++++++++++++++++-- .../apache/comet/exec/CometExecSuite.scala | 18 +++++++++++ .../comet/exec/CometWindowExecSuite.scala | 10 +----- 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 622168bcc9..0a836bc389 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.comet +import scala.collection.mutable.ListBuffer + import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -27,11 +29,13 @@ import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.types.{DataType, NullType} import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.common.base.Objects -import org.apache.comet.{CometConf, ConfigEntry} +import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport} +import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.operator.CometSink @@ -104,7 +108,7 @@ case class CometLocalTableScanExec( override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output) } -object CometLocalTableScanExec extends CometSink[LocalTableScanExec] { +object CometLocalTableScanExec extends CometSink[LocalTableScanExec] with DataTypeSupport { // uses CometArrowConverters, which re-uses arrays override def isFfiSafe: Boolean = false @@ -112,6 +116,30 @@ object CometLocalTableScanExec extends CometSink[LocalTableScanExec] { override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED) + // CometArrowConverters / ArrowWriter support NullType (via Utils.toArrowType + + // NullWriter). Other types not on DataTypeSupport's allow list (e.g. TimeType, + // intervals) lack ArrowWriter coverage and must fall back to Spark. + override def isTypeSupported( + dt: DataType, + name: String, + fallbackReasons: ListBuffer[String]): Boolean = dt match { + case _: NullType => true + case _ => super.isTypeSupported(dt, name, fallbackReasons) + } + + override def convert( + op: LocalTableScanExec, + builder: Operator.Builder, + childOp: Operator*): Option[Operator] = { + val fallbackReasons = new ListBuffer[String]() + if (!isSchemaSupported(op.schema, fallbackReasons)) { + withInfo(op, fallbackReasons.mkString("; ")) + None + } else { + super.convert(op, builder, childOp: _*) + } + } + override def createExec(nativeOp: Operator, op: LocalTableScanExec): CometNativeExec = { CometScanWrapper(nativeOp, CometLocalTableScanExec(op, op.rows, op.output)) } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 8bf00de20c..c3e903f883 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3932,6 +3932,24 @@ class CometExecSuite extends CometTestBase { } } + test("CometLocalTableScanExec handles NullType nested in struct/array/map") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + checkSparkAnswer( + spark.sql("SELECT named_struct('a', 1, 'b', null) AS s, array(null, null) AS a, " + + "map('k', null) AS m")) + } + } + + test("CometLocalTableScanExec falls back when schema contains TimeType") { + assume( + org.apache.comet.CometSparkSessionExtensions.isSpark41Plus, + "TimeType requires Spark 4.1+") + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = spark.sql("SELECT TIME '12:34:56' AS t, 1 AS id") + checkSparkAnswer(df) + } + } + test("Native_datafusion reports correct files and bytes scanned") { val inputFiles = 2 diff --git a/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala index 544cd91bd2..a9fdc96231 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala @@ -108,15 +108,7 @@ class CometWindowExecSuite extends CometTestBase { val cometShuffles = collect(df2.queryExecution.executedPlan) { case _: CometShuffleExchangeExec => true } - if (shuffleMode == "jvm" || shuffleMode == "auto") { - assert(cometShuffles.length == 1) - } else { - // we fall back to Spark for shuffle because we do not support - // native shuffle with a LocalTableScan input, and we do not fall - // back to Comet columnar shuffle due to - // https://github.com/apache/datafusion-comet/issues/1248 - assert(cometShuffles.isEmpty) - } + assert(cometShuffles.length == 1) } } } From 18cd14b0940eeb9a95e1a22e681b5a1c84f5b650 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 10:03:50 -0400 Subject: [PATCH 5/6] Fix TimeType test. --- .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index c3e903f883..71a30adecd 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3945,8 +3945,12 @@ class CometExecSuite extends CometTestBase { org.apache.comet.CometSparkSessionExtensions.isSpark41Plus, "TimeType requires Spark 4.1+") withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { - val df = spark.sql("SELECT TIME '12:34:56' AS t, 1 AS id") - checkSparkAnswer(df) + // Spark 4.1's row encoder cannot serialize TIME columns to the JVM, so we cannot + // collect rows. count() exercises the LocalRelation -> scan path without materializing + // the TIME value, which is sufficient to verify the fallback (without the fallback the + // CometLocalTableScanExec ArrowWriter would crash on TimeType). + val cnt = spark.sql("SELECT TIME '12:34:56' AS t, 1 AS id").count() + assert(cnt == 1) } } From fc40d59a81abfc70c91ab1fb5d2453e00fa11e8e Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 16:59:33 -0400 Subject: [PATCH 6/6] fix null value type in map in native shuffle --- native/shuffle/src/spark_unsafe/list.rs | 8 +++++++- .../comet/exec/CometColumnarShuffleSuite.scala | 6 ++++++ .../org/apache/comet/exec/CometExecSuite.scala | 16 +++++++++------- .../comet/exec/CometNativeShuffleSuite.scala | 6 ++++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index 3fea3fadeb..14f9feb843 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -24,7 +24,7 @@ use arrow::array::{ builder::{ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, - ListBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, + ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, }, MapBuilder, }; @@ -393,6 +393,12 @@ pub fn append_to_builder( let builder = downcast_builder_ref!(Date32Builder, builder); array.append_dates_to_builder::(builder); } + DataType::Null => { + let builder = downcast_builder_ref!(NullBuilder, builder); + for _ in 0..array.get_num_elements() { + builder.append_null(); + } + } DataType::Binary => { add_values!( BinaryBuilder, diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 70d427972a..b0be2b90ac 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -99,6 +99,12 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar checkShuffleAnswer(shuffled, 1) } + test("columnar shuffle with Map[_, NullType] column") { + val df = sql("SELECT id, map(id, null) AS m FROM VALUES (1), (2), (3) AS t(id)") + val shuffled = df.repartition(2, $"id") + checkShuffleAnswer(shuffled, 1) + } + test("columnar shuffle on nested struct including nulls") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 71a30adecd..8c8c19bb9c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3944,13 +3944,15 @@ class CometExecSuite extends CometTestBase { assume( org.apache.comet.CometSparkSessionExtensions.isSpark41Plus, "TimeType requires Spark 4.1+") - withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { - // Spark 4.1's row encoder cannot serialize TIME columns to the JVM, so we cannot - // collect rows. count() exercises the LocalRelation -> scan path without materializing - // the TIME value, which is sufficient to verify the fallback (without the fallback the - // CometLocalTableScanExec ArrowWriter would crash on TimeType). - val cnt = spark.sql("SELECT TIME '12:34:56' AS t, 1 AS id").count() - assert(cnt == 1) + // spark.sql.timeType.enabled defaults to Utils.isTesting; enable explicitly so the + // row encoder accepts TIME (matches Spark's own TimeFunctionsSuiteBase setup). + withSQLConf( + "spark.sql.timeType.enabled" -> "true", + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + // VALUES folds to a LocalRelation, exercising the CometLocalTableScanExec convert + // path; the TimeType column should drive the schema-level fallback. + val df = spark.sql("SELECT * FROM VALUES (TIME '12:34:56'), (TIME '01:02:03') AS t(c)") + checkSparkAnswer(df) } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 60637102f0..b34e75d137 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -224,6 +224,12 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper checkShuffleAnswer(shuffled, 1) } + test("native shuffle with Map[_, NullType] column") { + val df = spark.sql("SELECT id, map(id, null) AS m FROM VALUES (1), (2), (3) AS t(id)") + val shuffled = df.repartition(2, $"id") + checkShuffleAnswer(shuffled, 1) + } + test("fix: Comet native shuffle with binary data") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl")