
Commit 1b344de

fix: Support auto scan mode with Spark 4.0.0 (#1975)
1 parent 2ac9c43 commit 1b344de
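This commit teaches Comet's Arrow conversion (Utils.toArrowType) to accept Spark 4.0 collated string types through a new CometTypeShim, and updates the Spark 4.0.0 test diff so the auto scan mode is gated like both native_datafusion and native_iceberg_compat. As a reviewer aid, a minimal sketch of exercising the auto scan mode follows; the configuration keys are assumptions based on CometConf and the Comet docs, not something shown in this commit.

// Hedged sketch: start a Spark session with Comet and the auto scan mode this commit
// targets. "spark.comet.scan.impl" and its "auto" value are assumed from CometConf
// (COMET_NATIVE_SCAN_IMPL / SCAN_AUTO); they are not part of this diff.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("comet-auto-scan-demo")
  .config("spark.plugins", "org.apache.spark.CometPlugin") // Comet driver plugin
  .config("spark.comet.enabled", "true")                   // enable Comet
  .config("spark.comet.scan.impl", "auto")                 // let Comet pick a native scan
  .getOrCreate()

// Any Parquet scan now goes through whichever native implementation Comet selects.
spark.range(10).write.mode("overwrite").parquet("/tmp/comet_auto_scan_demo")
spark.read.parquet("/tmp/comet_auto_scan_demo").show()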

File tree

5 files changed: +195 -35 lines changed

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 6 additions & 3 deletions

@@ -39,9 +39,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 
+import org.apache.comet.shims.CometTypeShim
 import org.apache.comet.vector.CometVector
 
-object Utils {
+object Utils extends CometTypeShim {
   def getConfPath(confFileName: String): String = {
     sys.env
       .get("COMET_CONF_DIR")
@@ -124,7 +125,8 @@ object Utils {
     case LongType => new ArrowType.Int(8 * 8, true)
     case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
     case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-    case StringType => ArrowType.Utf8.INSTANCE
+    case _: StringType => ArrowType.Utf8.INSTANCE
+    case dt if isStringCollationType(dt) => ArrowType.Utf8.INSTANCE
     case BinaryType => ArrowType.Binary.INSTANCE
     case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale, 128)
     case DateType => new ArrowType.Date(DateUnit.DAY)
@@ -138,7 +140,8 @@ object Utils {
     case TimestampNTZType =>
       new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
     case _ =>
-      throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
+      throw new UnsupportedOperationException(
+        s"Unsupported data type: [${dt.getClass.getName}] ${dt.catalogString}")
   }
 
   /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */
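A note on the match arms above: with Spark 4.0's collation support, StringType is a class and the StringType companion object is only the default (UTF8_BINARY) instance, so an equality pattern no longer covers collated strings; the type pattern does, and the extra isStringCollationType guard lets the Spark 4.0 shim route collation-specific string types to Arrow Utf8 as well. The snippet below is an illustration of that distinction, not Comet code, and only assumes spark-catalyst on the classpath.

// Illustration only: why `case StringType` became `case _: StringType` in toArrowType.
// On Spark 4.0, a collated string column is still an instance of StringType, but it is
// not equal to the StringType companion object (the UTF8_BINARY default).
import org.apache.spark.sql.types.{DataType, StringType}

def matchesByEquality(dt: DataType): Boolean = dt match {
  case StringType => true // stable-identifier pattern: only the default-collation instance
  case _ => false
}

def matchesByType(dt: DataType): Boolean = dt match {
  case _: StringType => true // type pattern: any StringType, whatever its collation
  case _ => false
}

On Spark 3.x both functions behave identically because StringType has a single instance there; correspondingly, one CometTypeShim variant below hard-codes isStringCollationType to false, while the other checks StringTypeWithCollation, which only exists in Spark 4.0.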
Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.comet.shims
+
+import org.apache.spark.sql.types.DataType
+
+trait CometTypeShim {
+  def isStringCollationType(dt: DataType): Boolean = false
+}
Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.comet.shims
+
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.DataType
+
+trait CometTypeShim {
+  def isStringCollationType(dt: DataType): Boolean = dt.isInstanceOf[StringTypeWithCollation]
+}
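The two new files above are the default and the Spark 4.0 variants of CometTypeShim (their target source directories are not shown in this view, but only Spark 4.0 provides StringTypeWithCollation). The shared code in Utils depends only on the trait, and whichever variant is compiled in decides whether collation types are recognised. A standalone sketch of that dispatch, using invented names (ArrowStringMapping, isUtf8Compatible), is:

// Hypothetical, self-contained illustration of the shim pattern used above. The trait
// copies the default variant; a Spark 4.0 build would override isStringCollationType
// to check StringTypeWithCollation instead.
import org.apache.spark.sql.types.{DataType, StringType}

trait CometTypeShim {
  def isStringCollationType(dt: DataType): Boolean = false // default: no collation types
}

object ArrowStringMapping extends CometTypeShim {
  // Mirrors the new arms of Utils.toArrowType: plain strings and, when the shim says so,
  // collation string types all map to Arrow Utf8.
  def isUtf8Compatible(dt: DataType): Boolean = dt match {
    case _: StringType => true
    case other => isStringCollationType(other)
  }
}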

dev/diffs/4.0.0.diff

Lines changed: 116 additions & 18 deletions

@@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644
 }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
 new file mode 100644
-index 00000000000..5eb3fa17ca8
+index 00000000000..5691536c114
 --- /dev/null
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
-@@ -0,0 +1,43 @@
+@@ -0,0 +1,45 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
@@ -732,6 +732,8 @@ index 00000000000..5eb3fa17ca8
 + * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
 + */
 +case class IgnoreComet(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
 +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
 +
 +/**
@@ -1277,11 +1279,26 @@ index 2e33f6505ab..e1e93ab3bad 100644
   }
 
     withTable("t1", "t2") {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+index fee375db10a..8c2c24e2c5f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
+ import org.apache.spark.types.variant._
+ import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+ 
+-class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest {
++class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest
++  // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
++  with IgnoreCometSuite {
+   def parseJson(s: String): VariantVal = {
+     val v = VariantBuilder.parseJson(s, false)
+     new VariantVal(v.getValue, v.getMetadata)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-index 11e9547dfc5..df5678c8d82 100644
+index 11e9547dfc5..be9ae40ab3d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation
+@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation
 import scala.jdk.CollectionConverters.MapHasAsJava
 
 import org.apache.spark.SparkException
@@ -1290,7 +1307,21 @@ index 11e9547dfc5..df5678c8d82 100644
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.CollationFactory
-@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
++import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec}
+ import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema}
+ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+       assert(
+         collectFirst(queryPlan) {
+           case _: SortMergeJoinExec => assert(isSortMergeForced)
++          case _: CometSortMergeJoinExec => assert(isSortMergeForced)
+           case _: HashJoin => assert(!isSortMergeForced)
++          case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => assert(!isSortMergeForced)
+         }.nonEmpty
+       )
+     }
+@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
     }
   }
 
@@ -1301,7 +1332,23 @@ index 11e9547dfc5..df5678c8d82 100644
     val t1 = "T_1"
     val t2 = "T_2"
 
-@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+       } else {
+         assert(!collectFirst(queryPlan) {
+           case b: BroadcastHashJoinExec => b.leftKeys.head
++          case b: CometBroadcastHashJoinExec => b.leftKeys.head
+         }.head.isInstanceOf[ArrayTransform])
+       }
+     }
+@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+       } else {
+         assert(!collectFirst(queryPlan) {
+           case b: BroadcastHashJoinExec => b.leftKeys.head
++          case b: CometBroadcastHashJoinExec => b.leftKeys.head
+         }.head.isInstanceOf[ArrayTransform])
+       }
+     }
+@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
     }
   }
 
@@ -2636,10 +2683,23 @@ index 22839d3f0d2..7e66d100e90 100644
       checkAnswer(
         // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..4f33ce4b3f2 100644
+index bba71f1c48d..38c60ee2584 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+       Seq(Some("A"), Some("A"), None).toDF().repartition(1)
+         .write.parquet(path.getAbsolutePath)
+       val df = spark.read.parquet(path.getAbsolutePath)
+-      checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
++      // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies
++      // on Spark to apply the data filters after columnar batches are returned
++      if (!isCometEnabled) {
++        checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
++      }
+     }
+   }
+ }
+@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       checkAnswer(readParquet(schema2, path), df)
     }
 
@@ -2649,7 +2709,7 @@ index bba71f1c48d..4f33ce4b3f2 100644
     val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
     checkAnswer(readParquet(schema1, path), df)
     val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
     df.write.parquet(path.toString)
 
@@ -3330,40 +3390,64 @@ index 86c4e49f6f6..2e639e5f38d 100644
     val tblTargetName = "tbl_target"
     val tblSourceQualified = s"default.$tblSourceName"
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..486a436afb2 100644
+index f0f3f94b811..d64e4e54e22 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
+@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
+ import scala.language.implicitConversions
+ import scala.util.control.NonFatal
+ 
++import org.apache.comet.CometConf
+ import org.apache.hadoop.fs.Path
+ import org.scalactic.source.Position
+ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 -import org.apache.spark.sql.{AnalysisException, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
-@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
+@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits}
-+import org.apache.spark.sql.comet._
++import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
 import org.apache.spark.sql.execution.FilterExec
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
+@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
       }
     }
   } else {
 -    super.test(testName, testTags: _*)(testFun)
 +    if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
 +      ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
 +    } else {
-+      super.test(testName, testTags: _*)(testFun)
++      val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++      val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      if (isCometEnabled && isNativeIcebergCompat &&
++          testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++        ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
++      } else if (isCometEnabled && isNativeDataFusion &&
++          testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
++      } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) &&
++          testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
++          testTags: _*)(testFun)
++      } else {
++        super.test(testName, testTags: _*)(testFun)
++      }
 +    }
   }
 }
 
-@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase
+@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase
   override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter
 }
 
@@ -3397,7 +3481,7 @@ index f0f3f94b811..486a436afb2 100644
     super.withSQLConf(pairs: _*)(f)
   }
 
-@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase
     val schema = df.schema
     val withoutFilters = df.queryExecution.executedPlan.transform {
       case FilterExec(_, child) => child
@@ -3509,6 +3593,20 @@ index 4b27082e188..09f591dfed3 100644
     Utils.withContextClassLoader(Utils.getSparkClassLoader) {
       withUserDefinedFunction(udfInfo.funcName -> false) {
         val sparkClassLoader = Thread.currentThread().getContextClassLoader
+diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+index cc7bb193731..06555d48da7 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+     }
+   }
+ 
+-  test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT") {
++  test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT",
++    IgnoreComet("Comet does not support reading non UTF-8 strings")) {
+     withTable("t1", "t2") {
+       withTempDir { dir =>
+         val file = new File(dir, "test.hex")
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
 index b67370f6eb9..746b3974b29 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
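A note on the dev/diffs/4.0.0.diff changes above: the new IgnoreCometNativeIcebergCompat, IgnoreCometNativeDataFusion, and IgnoreCometNativeScan tags let individual Spark tests be skipped per scan implementation, and the patched SQLTestUtils treats CometConf.SCAN_AUTO as both native_datafusion and native_iceberg_compat, which is what makes the auto scan mode usable in this test setup. A self-contained sketch of that gating decision, with the CometConf constants replaced by plain strings and an invented object name (ScanTagGate), is:

// Simplified, hypothetical restatement of the tag gating added to SQLTestUtils above;
// scan implementation names are written as plain strings here instead of CometConf fields,
// and Comet is assumed to be enabled.
object ScanTagGate {
  val ScanAuto = "auto"
  val ScanNativeDataFusion = "native_datafusion"
  val ScanNativeIcebergCompat = "native_iceberg_compat"

  // Returns true when a test carrying the given tag should be ignored under scanImpl.
  def shouldIgnore(scanImpl: String, tag: String): Boolean = {
    val icebergCompat = scanImpl == ScanNativeIcebergCompat || scanImpl == ScanAuto
    val dataFusion = scanImpl == ScanNativeDataFusion || scanImpl == ScanAuto
    tag match {
      case "IgnoreCometNativeIcebergCompat" => icebergCompat
      case "IgnoreCometNativeDataFusion"    => dataFusion
      case "IgnoreCometNativeScan"          => dataFusion || icebergCompat
      case "IgnoreComet"                    => true // skipped whenever Comet is enabled
      case _                                => false
    }
  }
}

// Example: under the auto scan mode, every scan-specific tag leads to a skip, e.g.
// ScanTagGate.shouldIgnore("auto", "IgnoreCometNativeDataFusion") returns true.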