Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions docs/source/contributor-guide/adding_a_new_spark_version.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ own test suites under the new profile.

Promote the new Spark version from the compile-only job to the main test
jobs in `.github/workflows/pr_build_linux.yml` (and `pr_build_macos.yml` if
capacity allows). Use `scan_impl: "auto"` so both `native_datafusion` and
`native_iceberg_compat` get exercised, matching how earlier versions are
configured.
capacity allows). Match how earlier versions are configured.

### Run the Suite Locally First

Expand Down Expand Up @@ -256,14 +254,12 @@ new-version bring-up are:
### CI for the Spark SQL Tests

Spark SQL tests do not run from the main PR build workflows. They have
their own dedicated workflow files:
their own dedicated workflow file:

- `.github/workflows/spark_sql_test.yml`
- `.github/workflows/spark_sql_test_native_iceberg_compat.yml`

Add the new version to the matrix in each of these files (`spark-short`,
`spark-full`, `java`, `scan-impl`). Use the closest existing entry as a
template.
Add the new version to the matrix (`spark-short`, `spark-full`, `java`).
Use the closest existing entry as a template.

Before merging, run `make format`, run clippy
(`cd native && cargo clippy --all-targets --workspace -- -D warnings`), and
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/compatibility/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Comet aims to provide consistent results with the version of Apache Spark that i

This guide documents areas where Comet's behavior is known to differ from Spark. Topics are grouped by subsystem:

- **Parquet**: limitations when reading Parquet files (both scan implementations, shared and per-implementation).
- **Parquet**: limitations when reading Parquet files.
- **Floating-point comparison**: NaN and signed-zero handling in comparisons.
- **Regular expressions**: differences between the Rust regexp crate and Java's regex engine.
- **Operators**: operator-level compatibility notes, including window functions and round-robin partitioning.
Expand Down
76 changes: 24 additions & 52 deletions docs/source/user-guide/latest/compatibility/scans.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,13 @@ under the License.

# Parquet Compatibility

Comet currently has two distinct implementations of the Parquet scan operator.
Comet's Parquet scan offloads decoding to native code and produces Arrow batches for the rest of
the plan. Comet falls back to Spark when the scan cannot be converted (for example, due to one of
the unsupported features listed below).

| Scan Implementation | Notes |
| ----------------------- | ---------------------- |
| `native_datafusion` | Fully native scan |
| `native_iceberg_compat` | Hybrid JVM/native scan |
## Parquet Scan Limitations

The configuration property `spark.comet.scan.impl` is used to select an implementation. The default setting is
`spark.comet.scan.impl=auto`, which attempts to use `native_datafusion` first, and falls back to Spark if the scan
cannot be converted (e.g., due to unsupported features). Most users should not need to change this setting. However,
it is possible to force Comet to use a particular implementation for all scan operations by setting this
configuration property to one of the following implementations. For example: `--conf spark.comet.scan.impl=native_datafusion`.

## Shared Limitations

The following features are not supported by either scan implementation, and Comet will fall back to Spark in these scenarios:
The following features are not supported and cause Comet to fall back to Spark:

- Decimals encoded in binary format.
- `ShortType` columns, by default. When reading Parquet files written by systems other than Spark that contain
Expand All @@ -46,17 +37,30 @@ The following features are not supported by either scan implementation, and Come
columns are always safe because they can only come from signed `INT8`, where truncation preserves the signed value.
- Default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
- Spark's Datasource V2 API. When `spark.sql.sources.useV1SourceList` does not include `parquet`, Spark uses the
V2 API for Parquet scans. The DataFusion-based implementations only support the V1 API.
V2 API for Parquet scans. Comet's Parquet scan only supports the V1 API.
- Spark metadata columns (e.g., `_metadata.file_path`)
- No support for row indexes
- No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions.
Comet's Parquet scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`
- Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns)
are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
matching Spark's behavior.
- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into
Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader
does not replicate. By default Comet falls back to Spark in this case. Set
`spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the
Comet Parquet scan regardless. See
[#4352](https://github.com/apache/datafusion-comet/issues/4352).

The following shared limitation may produce incorrect results without falling back to Spark:
The following limitation may produce incorrect results without falling back to Spark:

- No support for datetime rebasing. When reading Parquet files containing dates or timestamps written before
Spark 3.0 (which used a hybrid Julian/Gregorian calendar), dates/timestamps will be read as if they were
written using the Proleptic Gregorian calendar. This may produce incorrect results for dates before
October 15, 1582.

The following shared limitation raises an error at scan time rather than falling back to Spark:
The following limitation raises an error at scan time rather than falling back to Spark:

- Invalid UTF-8 bytes in `STRING` columns. Spark permits arbitrary byte sequences in a `STRING`
column (for example from `CAST(X'C1' AS STRING)`), but Comet's native execution path is built on
Expand All @@ -65,28 +69,7 @@ The following shared limitation raises an error at scan time rather than falling
query, or cast the column to `BINARY` before persisting, if you need to preserve non-UTF-8 bytes.
See [#4121](https://github.com/apache/datafusion-comet/issues/4121).

## `native_datafusion` Limitations

The `native_datafusion` scan has some additional limitations, mostly related to Parquet metadata. All of these
cause Comet to fall back to Spark (including when using `auto` mode). Note that the `native_datafusion` scan
requires `spark.comet.exec.enabled=true` because the scan node must be wrapped by `CometExecRule`.

- No support for row indexes
- No support for reading Parquet field IDs
- No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions.
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`
- Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns)
are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
matching Spark's behavior.
- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into
Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader
does not replicate. By default Comet falls back to Spark in this case. Set
`spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the
`native_datafusion` scan regardless. See
[#4352](https://github.com/apache/datafusion-comet/issues/4352).

The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0
The following limitation may produce incorrect results on Spark versions prior to 4.0
without falling back to Spark:

- Reading `TimestampLTZ` as `TimestampNTZ`. On Spark 3.x, Spark raises an error per
Expand All @@ -112,8 +95,8 @@ Schema mismatch happens in two real-world scenarios:
table types at read time.

Spark's vectorized Parquet reader fully validates these conversions in `ParquetVectorUpdaterFactory.getUpdater`
and throws `SchemaColumnConvertNotSupportedException` for unsupported pairs. `native_datafusion` mirrors
that validation in its schema adapter; the entries below are the remaining gaps.
and throws `SchemaColumnConvertNotSupportedException` for unsupported pairs. Comet's Parquet scan
mirrors that validation in its schema adapter; the entries below are the remaining gaps.

Note that the exact set of accepted conversions has changed between Spark versions
(for example, Spark 3.x's `schemaEvolution.enabled` flag gates `INT32 → INT64`, `FLOAT → DOUBLE`,
Expand All @@ -136,14 +119,3 @@ SchemaColumnConvertNotSupportedException`) instead of the one-level chain Spark'
`SparkException` instead. Walk the cause chain to recover the
`SchemaColumnConvertNotSupportedException`. Spark 4.0+ produces a single-level chain, matching
vanilla Spark. See [#4354](https://github.com/apache/datafusion-comet/issues/4354).

## `native_iceberg_compat` Limitations

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
without falling back to Spark:

- Some Spark configuration values are hard-coded to their defaults rather than respecting user-specified values.
This may produce incorrect results when non-default values are set. The affected configurations are
`spark.sql.parquet.binaryAsString`, `spark.sql.parquet.int96AsTimestamp`, `spark.sql.caseSensitive`,
`spark.sql.parquet.inferTimestampNTZ.enabled`, and `spark.sql.legacy.parquet.nanosAsLong`. See
[issue #1816](https://github.com/apache/datafusion-comet/issues/1816) for more details.
16 changes: 8 additions & 8 deletions docs/source/user-guide/latest/compatibility/spark-versions.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ Spark 3.4.3 is supported with Java 11/17 and Scala 2.12/2.13.
### Known Limitations

- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.4 raises an error for this operation
(SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.
(SPARK-36182), but Comet's Parquet scan silently returns the raw UTC value instead.
See [Parquet Compatibility](scans.md#parquet-scan-limitations) for details.

- **Unsupported Parquet type conversions**: Spark 3.4 raises schema incompatibility errors for
certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's
`native_datafusion` scan may not detect these and could return unexpected values.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.
Comet's Parquet scan may not detect these and could return unexpected values.
See [Parquet Compatibility](scans.md#parquet-scan-limitations) for details.

## Spark 3.5

Expand All @@ -46,13 +46,13 @@ Spark 3.5.8 is supported with Java 11/17 and Scala 2.12/2.13.
### Known Limitations

- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.5 raises an error for this operation
(SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.
(SPARK-36182), but Comet's Parquet scan silently returns the raw UTC value instead.
See [Parquet Compatibility](scans.md#parquet-scan-limitations) for details.

- **Unsupported Parquet type conversions**: Spark 3.5 raises schema incompatibility errors for
certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's
`native_datafusion` scan may not detect these and could return unexpected values.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.
Comet's Parquet scan may not detect these and could return unexpected values.
See [Parquet Compatibility](scans.md#parquet-scan-limitations) for details.

## Spark 4.0

Expand Down
19 changes: 8 additions & 11 deletions docs/source/user-guide/latest/datasources.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ Or use `spark-shell` with HDFS support as described [above](#building-comet-with

## S3

The `native_datafusion` and `native_iceberg_compat` Parquet scan implementations completely offload data loading
to native code. They use the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and
support configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to
the `object_store` crate's format.
Comet's Parquet scan completely offloads data loading to native code. It uses the
[`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and supports
configuring S3 access using standard
[Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration)
by translating them to the `object_store` crate's format.

This implementation maintains compatibility with existing Hadoop S3A configurations, so existing code will
continue to work as long as the configurations are supported and can be translated without loss of functionality.
Expand Down Expand Up @@ -206,8 +207,7 @@ Multiple credential providers can be specified in a comma-separated list using t

### Additional S3 Configuration Options

Beyond credential providers, the `native_datafusion` and `native_iceberg_compat` implementations support additional
S3 configuration options:
Beyond credential providers, Comet's Parquet scan supports additional S3 configuration options:

| Option | Description |
| ------------------------------- | -------------------------------------------------------------------------------------------------- |
Expand All @@ -220,8 +220,7 @@ All configuration options support bucket-specific overrides using the pattern `f

### Examples

The following examples demonstrate how to configure S3 access with the `native_datafusion` and `native_iceberg_compat`
Parquet scan implementations using different authentication methods.
The following examples demonstrate how to configure S3 access using different authentication methods.

**Example 1: Simple Credentials**

Expand All @@ -230,7 +229,6 @@ This example shows how to access a private S3 bucket using an access key and sec
```shell
$SPARK_HOME/bin/spark-shell \
...
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.s3a.access.key=my-access-key \
--conf spark.hadoop.fs.s3a.secret.key=my-secret-key
...
Expand All @@ -243,7 +241,6 @@ This example demonstrates using an assumed role credential to access a private S
```shell
$SPARK_HOME/bin/spark-shell \
...
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider \
--conf spark.hadoop.fs.s3a.assumed.role.arn=arn:aws:iam::123456789012:role/my-role \
--conf spark.hadoop.fs.s3a.assumed.role.session.name=my-session \
Expand All @@ -253,7 +250,7 @@ $SPARK_HOME/bin/spark-shell \

### Limitations

The S3 support of `native_datafusion` and `native_iceberg_compat` has the following limitations:
Comet's S3 support has the following limitations:

1. **Partial Hadoop S3A configuration support**: Not all Hadoop S3A configurations are currently supported. Only the configurations listed in the tables above are translated and applied to the underlying `object_store` crate.

Expand Down
13 changes: 6 additions & 7 deletions docs/source/user-guide/latest/understanding-comet-plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,12 @@ by role. Names match what is shown in the plan output.

### Scans

| Node | Description |
| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CometScan` | V1 Parquet scan driven by Spark's file-source path through Comet's Parquet reader. Decoding runs in native code; the resulting Arrow batches cross JNI into the native plan. The active scan implementation is shown in brackets, e.g. `CometScan [native_iceberg_compat]`. |
| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, that produces Arrow batches consumed by Comet. |
| `CometNativeScan` | Fully native Parquet scan that runs entirely in DataFusion (no JVM Parquet reader involvement). |
| `CometIcebergNativeScan` | Fully native Iceberg Parquet scan. |
| `CometCsvNativeScan` | Fully native CSV scan (experimental). |
| Node | Description |
| ------------------------ | --------------------------------------------------------------------------------------------- |
| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, that produces Arrow batches consumed by Comet. |
| `CometNativeScan` | Fully native Parquet scan that runs entirely in DataFusion. |
| `CometIcebergNativeScan` | Fully native Iceberg Parquet scan. |
| `CometCsvNativeScan` | Fully native CSV scan (experimental). |

### Native Execution Operators

Expand Down