feat: enable CometLocalTableScanExec by default #4393

Draft: mbutrovich wants to merge 8 commits into apache:main from mbutrovich:enable_localtablescan

@mbutrovich
Contributor

Which issue does this PR close?

Closes #4347.

Rationale for this change

Currently CometLocalTableScanExec is disabled by default. A lot of Spark SQL tests (UDFs, expressions) don't write their input to sources that Comet reads natively (e.g., Parquet, Iceberg) so they are likely not being exercised through Comet.

What changes are included in this PR?

Enable localTableScan translation to Comet by default.

How are these changes tested?

Existing tests.

@mbutrovich mbutrovich self-assigned this May 22, 2026
@mbutrovich mbutrovich marked this pull request as draft May 22, 2026 11:18
@mbutrovich
Contributor Author

Failure counts

| Log archive | Job set | Failures | Notes |
| --- | --- | --- | --- |
| logs_70206724473 | Comet (Linux, Spark 3.4–4.2) | 5 | All same test |
| logs_70206740518 | Comet (macOS, Spark 4.0–4.2) | 3 | Same test; no platform-specific issue |
| logs_70206737291 | Upstream Spark SQL (3.4.3–4.1.1) | ~691 reported + 5 jobs hit 6h timeout/OOM | True total likely 800–1000 |
| logs_70206724472 | Iceberg integration (Spark 3.4/3.5 × Iceberg 1.8/1.9/1.10) | 220 | Silent assertion mismatches; no Comet stack frames |

Clean: catalyst modules, all iceberg-spark-runtime runs, TPC-DS, TPC-H, Rust tests, lint, native builds, sql_hive-2.

Root-cause buckets (ranked by blast radius)

B1 — NullType rejected by Utils.toArrowType (~600 failures, ~75–85% of Spark SQL)

Stack identical across occurrences:

java.lang.UnsupportedOperationException: Unsupported data type: NullType
  at org.apache.spark.sql.comet.util.Utils$.toArrowType(Utils.scala:155)
  at CometArrowConverters$ArrowBatchIterBase.<init>(54)
  at CometLocalTableScanExec.doExecuteColumnar(78)

Triggers whenever Seq(...).toDF or VALUES (..., NULL) yields a void (NullType) column, including nested cases such as MapType(_, NullType) (e.g. DatasetPrimitiveSuite "special floating point values").

Fix options:

  • (a) Reject schemas containing NullType (including nested occurrences) in the rule that builds CometLocalTableScanExec, so the plan falls back to Spark. Smallest blast radius.
  • (b) Wire NullType → Arrow Null in Utils.toArrowType (+ corresponding ArrowWriter case).
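
A minimal sketch of the check option (a) would need, assuming spark-sql is on the classpath. The object and method names (LocalScanTypeCheck, existsInType, containsNullType) are hypothetical and not part of Comet; where the guard would be called from is left open. It only shows how to find NullType anywhere in a schema, including inside struct, array, and map types.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper, not existing Comet code.
object LocalScanTypeCheck {

  /** True if `dt`, or any type nested inside it, satisfies `p`. */
  def existsInType(dt: DataType)(p: DataType => Boolean): Boolean =
    p(dt) || (dt match {
      case s: StructType => s.fields.exists(f => existsInType(f.dataType)(p))
      case a: ArrayType  => existsInType(a.elementType)(p)
      case m: MapType    => existsInType(m.keyType)(p) || existsInType(m.valueType)(p)
      case _             => false
    })

  /** True if any column of `schema` contains NullType at any depth. */
  def containsNullType(schema: StructType): Boolean =
    existsInType(schema)(_ == NullType)
}
```

Because the predicate is a parameter, the same traversal could in principle be reused for other unsupported types (such as the TimeType case in B2) without hard-coding a type that only exists in newer Spark versions.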

Refs:

  • spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:155
  • spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala:54
  • spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:66-81

B2 — TimeType rejected (~25 failures, Spark 4.1.x only)

SparkException: not support type: TimeType(6)
  at ArrowWriter$.createFieldWriter(ArrowWriters.scala:89)

Hits Spark 4.1 TIME-type tests (SPARK-51402, SPARK-52883, SPARK-53929, SPARK-53109, SPARK-53107, SPARK-53108, SPARK-52626, SPARK-52660, etc.). Same fix shape as B1; fallback is the immediate move since the rest of Comet does not yet support TIME.

Refs:

  • spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala:89

B3 — Nested-type nullability mismatch (~20 Spark SQL + likely root cause of all 220 Iceberg failures)

DataFusion error shape:

Incorrect datatype for StructArray field "nested",
  expected Struct("a": Int32, "b": Int64),
  got      Struct("a": non-null Int32, "b": non-null Int64)

CometArrowConverters derives child nullability from the Spark schema; downstream native operators were planned against schemas with different child-nullability inference. One side has to normalize.
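
One possible shape of "normalizing", sketched under the assumption that the relaxed (all-nullable) form is the one both sides agree on; the source does not say which side should change, and NullabilityNormalizer / relaxNullability are hypothetical names. It assumes spark-sql on the classpath and recursively marks every struct field, array element, and map value as nullable.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper, not existing Comet code.
object NullabilityNormalizer {

  /** Returns `dt` with every nested field, array element, and map value marked nullable. */
  def relaxNullability(dt: DataType): DataType = dt match {
    case StructType(fields) =>
      StructType(fields.map { f =>
        f.copy(dataType = relaxNullability(f.dataType), nullable = true)
      })
    case ArrayType(elementType, _) =>
      ArrayType(relaxNullability(elementType), containsNull = true)
    case MapType(keyType, valueType, _) =>
      MapType(relaxNullability(keyType), relaxNullability(valueType), valueContainsNull = true)
    case other => other
  }
}
```

Relaxing is the conservative direction, since a nullable schema still admits non-null data; tightening the planner side instead would require proving that the data never contains nulls.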

Iceberg connection:

  • Every failing Iceberg test seeds rows via spark.createDataFrame(input, Employee.class) (TestDelete.java:1526) — a LocalRelation that now becomes CometLocalTableScanExec.
  • Failures concentrate on the conjunction branch=test, distributionMode=none, fanout=false, vectorized=true (other parameterizations of the same test methods pass).
  • Symptoms are silent: assertion failures like expected:<199> but was:<200>, [Snapshot property added-data-files has unexpected value], View should have correct data: expected:<2> but was:<0>. No exception, no Comet stack frame.
  • CometLocalTableScanExec.isFfiSafe = false (line 110) due to array reuse in CometArrowConverters. Safe for Comet-native consumers (the native side copies based on the proto flag). Unsafe for non-Comet consumers that buffer batches across next() calls. The Iceberg branch-write path likely buffers, which combined with B3-style nullability mismatches could produce the silent loss.
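
To make the buffering hazard in the last bullet concrete, here is a self-contained toy in plain Scala. It does not use Comet, Arrow, or Iceberg code; plain Array[Int] batches stand in for columnar batches, and all names are hypothetical. A producer that reuses one backing array is correct for a consumer that finishes each batch before calling next(), but a consumer that buffers batches sees only the last batch's contents unless each batch is copied first.

```scala
object BufferReuseDemo {

  /** Producer that refills one shared array for every batch (stand-in for array reuse). */
  def reusingBatches(numBatches: Int, batchSize: Int): Iterator[Array[Int]] = {
    val shared = new Array[Int](batchSize)
    Iterator.tabulate(numBatches) { b =>
      var i = 0
      while (i < batchSize) { shared(i) = b * batchSize + i; i += 1 }
      shared // the same array instance is handed out every time
    }
  }

  /** Consumer that holds all batches across next() calls, then sums their values. */
  def bufferThenSum(batches: Iterator[Array[Int]]): Long =
    batches.toVector.map(_.map(_.toLong).sum).sum

  def main(args: Array[String]): Unit = {
    // Without a copy, every buffered entry aliases the final batch.
    val unsafe = bufferThenSum(reusingBatches(3, 2))
    // Copying each batch as it is produced restores the expected result.
    val safe = bufferThenSum(reusingBatches(3, 2).map(_.clone()))
    println(s"buffered without copy: $unsafe, with copy: $safe")
  }
}
```

In this toy the corruption changes values rather than row counts, so it illustrates the mechanism only; whether the Iceberg write path actually buffers this way is exactly what the investigation below needs to establish.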

Investigate first:

  • Run TestCopyOnWriteDelete.testSkewDelete with the failing parameterization locally; dump physical plan and per-batch row counts.
  • Determine whether Iceberg's DSv2 columnar branch write honors isFfiSafe=false, or whether the gap is purely nullability.

Refs:

  • spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:48-81,110
  • spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala:44,68
  • Iceberg spark-extensions TestDelete.append(String, Employee...) (TestDelete.java:1520-1528) and SparkRowLevelOperationsTestBase.createAndInitTable / append / commitTarget (SparkRowLevelOperationsTestBase.java:282,294,433)

B4 — Stale CometWindowExecSuite assertion (8 failures: Comet's own suites, all platforms)

Test: window function: partition and order expressions, all 5 Spark versions on Linux + 3 on macOS. Inverted assertion:

// CometWindowExecSuite.scala:111-119
} else {
  // we fall back to Spark for shuffle because we do not support
  // native shuffle with a LocalTableScan input, and we do not fall
  // back to Comet columnar shuffle due to
  // https://github.com/apache/datafusion-comet/issues/1248
  assert(cometShuffles.isEmpty)
}

The premise is no longer true; native shuffle now composes with the new local scan.

Action: update the assertion to cometShuffles.length == 1, drop the comment, re-evaluate whether #1248 is still relevant. Grep for siblings: "LocalTableScan input", "issues/1248", "fall back to Spark for shuffle".

Refs:

  • spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala:108-119

B5 — Plan-shape / explain / metrics assertions in upstream Spark SQL (~40 failures)

Tests grep the physical plan for Spark-only nodes or check WholeStageCodegen / SQL metrics that Comet doesn't emit. Examples:

  • ExplainSuite "Support ExplainMode in Dataset.explain"
  • SQLAppStatusListenerSuite "SPARK-29894 test Codegen Stage Id", "SPARK-32615,SPARK-33016"
  • BatchEvalPythonExecSuite "Python UDF: push down deterministic FilterExec predicates"
  • JoinHintSuite "broadcast", "shuffle-replicate-nl"
  • DataFrameSetOperationsSuite SPARK-37371
  • OptimizeLocalRelationsSuite SPARK-25860 / SPARK-33847

Not Comet defects. Either extend the upstream-test skip list or disable spark.comet.exec.localTableScan.enabled for the affected tests.

B6 — Subquery not found when plan root is CometLocalTableScan (~8 failures)

CometRuntimeException: Subquery NNN not found for plan MMM

Hits CTEHintSuite "subquery in repartition", SPARK-36447, CTE Predicate push-down and column pruning. Real Comet bug — subquery registration path doesn't handle the new root operator.

B7 — Long tail (~20 failures total)

  • ULP-level float math diffs in MathFunctionsSuite (asinh, acosh, cosh, tan, cot, cbrt, pow, atan2, etc.). Known JVM-libm vs Rust-libm gap; no Comet carve-out for upstream suite.
  • bit_length / octet_length on BinaryType rejected by DataFusion's string-only UDF (SPARK-36751).
  • null IN () returns false in Comet, should be null (EmptyInSuite).
  • RuntimeNullChecksV2Writes expects SparkRuntimeException, gets SparkException (wrapper depth).
  • to_binary/unhex error class differs from Spark's CONVERSION_INVALID_INPUT.
  • collect_set element order differs.
  • 5 Spark SQL shards hit GitHub Actions 6h wall clock / exit 137 — likely cascade from B1 stack-trace volume; should resolve once B1 is fixed.
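
For the ULP-level diffs in the first bullet, a tolerance-based comparison is one way a carve-out could look. The sketch below is plain Scala with hypothetical names (UlpCompare, withinUlps); it is not how the upstream suite or Comet compares results today, and the acceptable number of ULPs is an assumption that would have to be chosen per function.

```scala
object UlpCompare {

  /** True if `a` and `b` are equal, or differ by at most `maxUlps` units in the
    * last place, measured at the larger magnitude of the two. NaN never matches. */
  def withinUlps(a: Double, b: Double, maxUlps: Int): Boolean =
    a == b ||
      math.abs(a - b) <= maxUlps * java.lang.Math.ulp(math.max(math.abs(a), math.abs(b)))
}
```

For example, UlpCompare.withinUlps(1.0, java.lang.Math.nextUp(1.0), 1) holds, while values two representable steps apart fail a 1-ULP check.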

Suggested fix order

| Order | Action | Eliminates | Risk |
| --- | --- | --- | --- |
| 1 | B1 fallback (NullType in schema) | ~600 + likely unblocks the 5 timed-out shards | Low |
| 2 | B2 fallback (TimeType in schema) | ~25 | Low |
| 3 | B3 investigation + fix (Iceberg silent corruption) | 220 + ~20 Spark SQL | Medium (data correctness) |
| 4 | B4 stale assertion update + sibling grep | 8 | Trivial |
| 5 | B6 subquery registration | ~8 | Low-medium |
| 6 | B5 skip list (or per-test config) | ~40 | Low |
| 7 | B7 individually | ~20 | Varies |

After (1)–(2), failure count should drop from ~1,200 to ~250 and most remaining failures will be the substantive ones.

Open questions

  • B3: does Iceberg's columnar DSv2 branch-write path honor setArrowFfiSafe(false), or does the contract only apply to native consumers? If only native, CometLocalTableScanExec needs to copy batches before exposing them to non-Comet consumers (or be disallowed as a direct columnar input to non-Comet writers).
  • B5: skip list vs per-test disable — preference? A skip list is easier to maintain; per-test config keeps Comet exercised.
  • Whether to keep the default flip in this PR (#4393) and land fixes incrementally, or to gate the flip behind B1–B3 landing first.

Development

Successfully merging this pull request may close these issues.

Enable spark.comet.exec.localTableScan.enabled when running Spark SQL tests
