@@ -1576,6 +1576,262 @@ for converting data types when reading from ClickHouse into Spark and when inser
| `Object` | | ❌ | | |
| `Nested` | | ❌ | | |

## Supported ClickHouse server versions {#supported-clickhouse-server-versions}

The connector communicates with ClickHouse exclusively over HTTP. There is no minimum version enforced at runtime, but the following version requirements apply for specific features:

| Feature | Minimum ClickHouse Version |
|---------|---------------------------|
| General HTTP connectivity | 20.7+ |
| `spark.clickhouse.read.splitByPartitionId` (partition-id-based filtering) | 21.6+ |
| `VariantType` / `JSON` type support | 25.3+ |

For production deployments, we recommend running ClickHouse 23.x or later. The connector is tested against the latest stable ClickHouse releases.
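The version table above can be applied mechanically. Below is a small illustrative sketch (not part of the connector: `FEATURE_MIN_VERSIONS`, `parse_version`, and `supports` are hypothetical helpers) showing how a client might gate optional features on the version string returned by `SELECT version()`:

```python
# Hypothetical helpers (not connector API): gate optional features on the
# ClickHouse server version, using the minimums from the table above.
FEATURE_MIN_VERSIONS = {
    "http": (20, 7),                    # general HTTP connectivity
    "split_by_partition_id": (21, 6),   # spark.clickhouse.read.splitByPartitionId
    "variant_json": (25, 3),            # VariantType / JSON type support
}

def parse_version(version_string: str) -> tuple:
    """Turn a string like '24.8.2.3' into (24, 8) for major.minor comparison."""
    major, minor = version_string.split(".")[:2]
    return (int(major), int(minor))

def supports(feature: str, server_version: str) -> bool:
    """True when the server meets the minimum version for the feature."""
    return parse_version(server_version) >= FEATURE_MIN_VERSIONS[feature]
```

For example, `supports("variant_json", "24.8.2.3")` is false, while any 25.3+ server passes.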
> **Reviewer comment:** I think we should recommend using the latest, and not a specific version - WDYT?

> **Reviewer comment:** Can you add a link to the GitHub workflow ClickHouse version list? That way, users would be able to see exactly what versions we test with.

## Push-down operations {#push-down-operations}

The connector implements the Spark DataSource V2 push-down interfaces, meaning the following operations are translated into SQL and executed on the ClickHouse server rather than in Spark memory. This significantly reduces data transfer and improves query performance.

| Push-down Type | Spark Interface | What gets pushed |
|---|---|---|
| **Column pruning** | `SupportsPushDownRequiredColumns` | Only the columns selected by the query are fetched from ClickHouse. `SELECT col1, col2` avoids transferring all columns. |
| **Filter predicates** | `SupportsPushDownFilters` | `WHERE` conditions using standard comparison operators, `IN`, `IS NULL`, `LIKE`, etc. Unsupported expressions fall back to Spark-side evaluation. |
| **Limit** | `SupportsPushDownLimit` | `LIMIT N` is sent to ClickHouse, preventing full table scans for small result sets. |
| **Aggregations** | `SupportsPushDownAggregates` | `GROUP BY` + aggregate functions (`SUM`, `COUNT`, `MIN`, `MAX`, `AVG`) are executed in ClickHouse. A probe query (`WHERE 1=0`) is first sent to determine the output schema. |
| **Runtime filters** | `SupportsRuntimeFiltering` | Dynamic filters produced during query execution (e.g. from broadcast joins) are applied at scan time. Must be enabled explicitly. |

To enable runtime filtering:

```python
spark.conf.set("spark.clickhouse.read.runtimeFilter.enabled", "true")
```

:::note
Complex filter expressions that Spark cannot compile to SQL (e.g., user-defined functions, unsupported predicates) are **not** pushed down. They are returned to Spark and evaluated in memory after the data is fetched. Use `EXPLAIN` in Spark to see which filters were actually pushed down.
:::
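The pushed/residual split described above can be sketched in plain Python. This is an illustrative model, not the connector's actual implementation; the filter triples, `compile_filter`, and `split_filters` are hypothetical:

```python
# Illustrative sketch of filter push-down: simple predicates compile to SQL,
# anything unsupported (e.g. a UDF) becomes a residual filter for Spark.
SUPPORTED_OPS = {"=", "<", ">", "<=", ">=", "IN", "IS NULL", "LIKE"}

def compile_filter(f):
    """Return a SQL fragment for a (column, op, value) triple, or None."""
    col, op, value = f
    if op not in SUPPORTED_OPS:
        return None  # unsupported: falls back to Spark-side evaluation
    if op == "IS NULL":
        return f"{col} IS NULL"
    if op == "IN":
        return f"{col} IN ({', '.join(repr(v) for v in value)})"
    return f"{col} {op} {value!r}"

def split_filters(filters):
    """Partition filters into pushed SQL fragments and residual triples."""
    pushed = [sql for f in filters if (sql := compile_filter(f)) is not None]
    residual = [f for f in filters if compile_filter(f) is None]
    return pushed, residual
```

Here `("a", ">", 5)` becomes `a > 5` in the generated `WHERE` clause, while a UDF-based predicate stays behind as a residual filter.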

## Connector parallelism {#connector-parallelism}

Understanding how the connector maps Spark tasks to ClickHouse shards and partitions is important for tuning performance.

### Read parallelism {#read-parallelism}

The connector creates one Spark input partition per ClickHouse physical partition (data part group). The number of Spark read tasks equals the number of distinct partition values in the ClickHouse table.
> **Reviewer comment (suggested change):** The connector creates one Spark input partition per ClickHouse physical partition (data part group). The number of Spark read tasks equals the number of distinct partition values in the ClickHouse table. Please visit [Table Partitions](https://clickhouse.com/docs/partitions) for more information on partitioning.

| Scenario | Spark tasks created |
|---|---|
| `MergeTree` table with N distinct partitions | N tasks, one per partition, all targeting the same ClickHouse node |
| `Distributed` table with `spark.clickhouse.read.distributed.convertLocal=true` (default) | (number of shards) × (partitions per shard) tasks, each targeting a specific shard node directly |
| `Distributed` table with `spark.clickhouse.read.distributed.convertLocal=false` | 1 task, reading through the `Distributed` coordinator node |
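The scenarios above imply simple back-of-the-envelope task counts. The helper below is hypothetical (not connector API), just arithmetic over the table's three rows:

```python
# Hypothetical helper: expected Spark read-task counts for the scenarios above.
def read_task_count(engine, partitions, shards=1, convert_local=True):
    if engine == "Distributed" and not convert_local:
        return 1  # single task, reading through the Distributed coordinator
    if engine == "Distributed":
        return shards * partitions  # one task per (shard, partition) pair
    return partitions  # MergeTree: one task per distinct partition
```

For example, a 4-shard cluster partitioned by day over one year yields `read_task_count("Distributed", 365, shards=4)` = 1,460 tasks with the default settings, but only 1 task with `convert_local=False`.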

For a table with no `PARTITION BY` clause, the connector creates one Spark partition per ClickHouse data part group, which may result in many small tasks. Use `PARTITION BY` in your ClickHouse table to control parallelism granularity.
> **Reviewer comment:** "which may result in many small" - would love to hear your thoughts to see if my understanding here is right:
>
> 1. `planInputPartitions` returns `Array[InputPartition]` and Spark creates one task per element.
> 2. `createReader(partition)` is called once per element from that array. It takes a single `ClickHouseInputPartition` and creates one reader for it.
>
> So, to summarize - the mapping is 1 to 1: one `InputPartition` object mapped to one Spark task (which is mapped to one reader). If that is correct, there is no fan-out.
>
> For the no-`PARTITION BY` case: `queryPartitionSpec` returns `Array(NoPartitionSpec)` (one element) → `inputPartitions` has length 1 → `planInputPartitions` returns an array of 1 → Spark schedules exactly 1 task.

### Write parallelism {#write-parallelism}

Before writing, Spark reshuffles the DataFrame to match ClickHouse's distribution requirements:

1. **Repartition by partition key** (`spark.clickhouse.write.repartitionByPartition=true`, default): Spark groups rows by ClickHouse partition key values so that all rows for the same partition land in the same Spark task.
2. **Sort within partition**: Rows are locally sorted by `[sharding_key, partition_key, order_by_key]` to produce optimally-ordered inserts.
3. **Write tasks**: Each Spark task writes its rows to ClickHouse in batches of `spark.clickhouse.write.batchSize` (default 10,000).

For **Distributed tables**, when `spark.clickhouse.write.distributed.useClusterNodes=true` (default), each Spark task can write to multiple shard nodes in parallel. When `spark.clickhouse.write.distributed.convertLocal=true`, Spark computes the sharding key and routes each row directly to the correct shard's local table.

To control the number of write tasks explicitly:

```python
spark.conf.set("spark.clickhouse.write.repartitionNum", "16") # 16 write tasks
```
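Step 3 above (batched inserts of `write.batchSize` rows) can be sketched as a plain chunking generator. This is an illustrative model, not the connector's actual write path:

```python
# Hypothetical sketch of how a write task chunks its rows into insert
# batches of spark.clickhouse.write.batchSize (default 10,000).
def iter_batches(rows, batch_size=10_000):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch
```

With 25 rows and `batch_size=10`, a task would issue three INSERTs of 10, 10, and 5 rows.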

## Working with Distributed tables {#working-with-distributed-tables}

When using ClickHouse `Distributed` tables with the connector, there is an important networking and architecture consideration:

**The connector bypasses the Distributed engine and connects directly to each shard node.**
> **Reviewer comment (suggested change):** The current phrasing sounds a bit like it's "working around". I'll suggest writing something like: **Following ClickHouse best practices for high-throughput ingestion, the connector writes data directly to the underlying shard nodes rather than using the Distributed engine.** Feel free to suggest other versions if you find them fit.

This means:

1. **All shard hostnames must be reachable** from Spark executors. The connector reads cluster topology from `system.clusters` on the coordinator node, then opens direct connections to each shard. If the shard hostnames returned by `system.clusters` are internal cluster names not resolvable from outside, reads and writes will fail.
2. **Inserts go to local tables**: When writing, data is inserted directly into the local `MergeTree` table on each shard (not through the `Distributed` table). This is more efficient but requires that the local table exists on every shard and that Spark can connect to each shard directly.
3. **Schema must be consistent across all shards**: Since Spark reads schema from one node, the table structure must be identical on all shards.

To use coordinator-only routing (simpler networking, less parallelism):

```text
spark.clickhouse.read.distributed.convertLocal false
spark.clickhouse.write.distributed.useClusterNodes false
spark.clickhouse.write.distributed.convertLocal false
```

With these settings, all reads and writes go through the single coordinator node using the `Distributed` engine, and only the coordinator needs to be accessible from Spark.

## Passing query settings and Java client options {#passing-query-settings}

The connector uses ClickHouse's HTTP interface via the [ClickHouse Java client](https://github.com/ClickHouse/clickhouse-java). You can pass arbitrary ClickHouse session settings and HTTP client options using the `option.<key>` prefix.

> **Reviewer comment (suggested change):** :::note All available Java client options are listed in [ClientConfigProperties.java](https://github.com/ClickHouse/clickhouse-java/blob/main/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java) and [this documentation page](https://clickhouse.com/docs/integrations/language-clients/java/client#configuration). All available server session settings are listed in [this documentation page](https://clickhouse.com/docs/operations/settings/settings). :::

### Via Catalog API {#query-settings-catalog-api}

Add `option.<key>` entries to `spark-defaults.conf` or your Spark session configuration:

```text
spark.sql.catalog.clickhouse.option.ssl false
spark.sql.catalog.clickhouse.option.async false
spark.sql.catalog.clickhouse.option.client_name spark
spark.sql.catalog.clickhouse.option.custom_http_params async_insert=1,wait_for_async_insert=1
```

The `custom_http_params` key lets you pass multiple ClickHouse server settings as a comma-separated list of `key=value` pairs. These are appended to every HTTP request as query parameters.
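The expansion of that comma-separated list into per-request parameters can be modeled in a few lines. This is an illustrative sketch, not the Java client's actual parsing code:

```python
# Sketch: how a custom_http_params value expands into per-request HTTP
# query parameters (illustrative, not the Java client's implementation).
def parse_custom_http_params(value: str) -> dict:
    params = {}
    for pair in value.split(","):
        key, _, val = pair.partition("=")
        params[key.strip()] = val.strip()
    return params
```

So `"async_insert=1,wait_for_async_insert=1"` would be appended to each request as `?async_insert=1&wait_for_async_insert=1`.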

### Via TableProvider API {#query-settings-tableprovider-api}

```python
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .option("option.custom_http_params", "max_execution_time=300,max_memory_usage=10000000000") \
    .load()
```

> **Reviewer comment:** Let's also add Java/Scala examples to align with all the examples we have in this document.

> **Reviewer comment:** Did you mean `spark.read`?

### Common settings {#query-settings-common}

> **Reviewer comment:** The table header is not aligned with the data. The first two columns should be swapped.

| Setting | Example value | Purpose |
|---|---|---|
| `option.custom_http_params` | `async_insert=1,wait_for_async_insert=1` | Enable ClickHouse async inserts; reduces memory pressure during high-frequency writes |
| `option.custom_http_params` | `insert_deduplicate=0` | Disable deduplication for idempotent write pipelines |
| `option.custom_http_params` | `max_insert_block_size=1048576` | Control max block size for inserts |
| `option.custom_http_params` | `max_execution_time=300` | Extend query timeout (seconds) for large reads |
| `option.custom_http_params` | `session_timeout=60` | Extend HTTP session timeout |
| `option.ssl` | `true` / `false` | Enable TLS |
| `option.client_name` | `spark` | Tag requests with a client name visible in `system.query_log` |
> **Reviewer comment:** We don't append multiple `client_name` values in the connector, and the Java Client doesn't do it automatically (in situations where there are multiple `client_name` sets, as in https://github.com/ClickHouse/spark-clickhouse-connector/blob/a1d8b7b32cae27e2fe38133c10ce10613b824013/clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala#L97-L104), so I'm not sure that will work. Did you verify it?


## Performance tuning {#performance-tuning}

### Read performance {#read-performance}

| Tuning | Configuration | Notes |
|---|---|---|
| **Use binary read format** | `spark.clickhouse.read.format=binary` | The binary mode uses ClickHouse's `RowBinaryWithNamesAndTypes` format (not Arrow) and is faster than JSON for large reads. Default is `json` for wider compatibility. |

| **Column pruning** | Use explicit `SELECT col1, col2` | Avoid `SELECT *`; the connector pushes column selection to ClickHouse. |
| **Filter push-down** | Keep `WHERE` clauses on native ClickHouse types | String, numeric, and DateTime filters are pushed down. Complex expressions are evaluated in Spark. |
| **Parallel reads across shards** | `spark.clickhouse.read.distributed.convertLocal=true` (default) | Enables one Spark task per shard partition. Ensure all shard hosts are accessible. |
| **Runtime filtering** | `spark.clickhouse.read.runtimeFilter.enabled=true` | Allows dynamic filters from broadcast joins to reduce data fetched. |
| **Compression** | `spark.clickhouse.read.compression.codec=lz4` (default) | LZ4 reduces network transfer; disable only if CPU is the bottleneck. |

### Write performance {#write-performance}

| Tuning | Configuration | Notes |
|---|---|---|
| **Increase batch size** | `spark.clickhouse.write.batchSize=50000` | Default is 10,000. Larger batches reduce round-trips. Reduce if you see "Broken Pipe" errors. |
| **Arrow write format** | `spark.clickhouse.write.format=arrow` (default) | Arrow is faster than JSON for most data types. Use `json` only for Variant/JSON column types. |
| **Compression** | `spark.clickhouse.write.compression.codec=lz4` (default) | Reduces network transfer during writes. |
| **Async inserts** | `option.custom_http_params=async_insert=1,wait_for_async_insert=1` | Shifts buffering to ClickHouse, reducing memory pressure on the server for high-frequency writes. |
| **Pre-sort data** | `spark.clickhouse.write.localSortByKey=true` (default) | Local sort before writing reduces MergeTree merge pressure. |
| **Repartition by partition** | `spark.clickhouse.write.repartitionByPartition=true` (default) | Groups rows by partition before writing, reducing the number of parts created. |
| **Explicit partition count** | `spark.clickhouse.write.repartitionNum=N` | Set if you want to control write parallelism explicitly. |
| **Strict distribution** | `spark.clickhouse.write.repartitionStrictly=true` | Forces exact distribution matching. Required only if data ordering is critical (Spark 3.4+). |

> **Reviewer comment:** On `spark.clickhouse.write.repartitionNum` - out of curiosity, how does this differ from doing regular narrowing operations like coalesce/repartition?

### Recommended starting configuration for bulk loads {#bulk-load-config}

```python
spark.conf.set("spark.clickhouse.write.batchSize", "50000")
spark.conf.set("spark.clickhouse.write.format", "arrow")
spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
spark.conf.set("spark.clickhouse.write.repartitionByPartition", "true")
spark.conf.set("spark.clickhouse.write.localSortByKey", "true")
```

## Troubleshooting {#troubleshooting}

### Broken pipe or connection reset during write {#troubleshooting-broken-pipe}
> **Reviewer comment:** I see multiple people suggested different solutions, none of them got satisfactory confirmation from the community (ClickHouse/spark-clickhouse-connector#365).
>
> - Just to make sure - did you manage to reproduce the Broken Pipe error and verify the steps mentioned below?
> - In addition, can you create a follow-up issue to improve the error message to include a link/steps for remediation? If you managed to reproduce it, please include reproducible steps there.

**Symptom**: `CHServerException: [HTTP] Broken pipe (Write failed)` or `Connection reset by peer` during INSERT.

**Cause**: ClickHouse closed the connection mid-stream, typically due to memory pressure on the ClickHouse server when the batch is too large.

**Fix**:
1. Reduce `spark.clickhouse.write.batchSize`. Start with 1,000–5,000 and increase gradually.
2. Enable async inserts to shift buffering to ClickHouse:
```python
spark.conf.set("spark.sql.catalog.clickhouse.option.custom_http_params",
               "async_insert=1,wait_for_async_insert=1")
```
3. Increase `max_memory_usage` on the ClickHouse server if possible.
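If you want to automate step 1, one possible pattern is to halve the batch size on a failed insert and retry. This is a hypothetical sketch under unverified assumptions (see the reviewer note above about reproducing the error): `send_batch` stands in for your actual insert call, and re-sending earlier batches on retry assumes your pipeline tolerates duplicates or has deduplication enabled.

```python
# Hypothetical retry wrapper: halve the batch size on a failed insert and
# retry. `send_batch` is a stand-in for the real insert call. Note that a
# retry re-sends all batches, so this assumes idempotent/deduplicated writes.
def insert_with_backoff(rows, send_batch, batch_size=10_000, min_batch_size=1_000):
    while batch_size >= min_batch_size:
        try:
            for start in range(0, len(rows), batch_size):
                send_batch(rows[start:start + batch_size])
            return batch_size  # the batch size that succeeded
        except ConnectionError:
            batch_size //= 2  # e.g. broken pipe: retry with smaller batches
    raise RuntimeError("insert failed even at the minimum batch size")
```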

---

### Schema inference `WHERE 1=0` queries {#troubleshooting-schema-inference}

**Symptom**: A `SELECT ... WHERE 1=0` query is sent to ClickHouse every time a DataFrame is used.
> **Reviewer comment:** Is there any error here? If not, did you add it because of the load of these `SELECT ... WHERE 1=0` queries? If so, it is worth mentioning that the symptom is seeing too many of these. If there's no user-visible problem (no error, no meaningful load), it's connector internals/behavior documentation, not a troubleshooting entry. Having said that, and based on the answer to this question, I would update the title from "Schema inference WHERE 1=0 queries" to "Too many schema inference WHERE 1=0 queries" or "Long running schema inference WHERE 1=0 queries".

**Cause**: When aggregation push-down is triggered, the connector sends a probe query to ClickHouse to determine the output schema of the pushed-down aggregation. This is by design and only occurs when aggregation push-down is active.

> **Reviewer comment:** This "fix" reads the entire table into Spark before aggregating, which in Spark's use case (working with big data) would potentially be far worse than the probe query it is trying to avoid. For most cases, the `WHERE 1=0` probe query is just a schema-determination round-trip (near-zero cost, and if it is not, it might suggest other problems), and the `SELECT *` workaround trades a free probe query for a full table scan. Do we have other alternatives to avoid it? I see the question in ClickHouse/spark-clickhouse-connector#374 was for a situation where a schema is being provided. Is there a feature flag disabling this once a schema is provided? If not, I would encourage us to keep the issue open, and once we develop such functionality, add it to the docs.

**Fix**: The probe query is usually a near-zero-cost schema-determination round-trip, so in most cases no action is needed. If it still causes unacceptable load, you can avoid triggering aggregation push-down by handling the aggregation in Spark instead - but note that this fetches the entire table into Spark before aggregating, which is typically far more expensive than the probe:
```python
# Instead of letting ClickHouse handle the aggregation:
df = spark.sql("SELECT sum(value) FROM clickhouse.default.my_table GROUP BY id")

# Handle the aggregation in Spark memory:
df = spark.sql("SELECT * FROM clickhouse.default.my_table")
result = df.groupBy("id").agg({"value": "sum"})
```

---

### Cannot connect to shard hostname {#troubleshooting-shard-hostname}

**Symptom**: Reads or writes fail with connection errors referencing shard hostnames that are not the coordinator node.

**Cause**: The connector reads cluster topology from `system.clusters` and tries to connect directly to each shard. If shard hostnames are internal cluster DNS names not resolvable from Spark executors, the connections will fail.

**Fix**:
- Configure DNS so all shard hostnames are resolvable from Spark executors.
- Alternatively, use coordinator-only routing (less parallelism):
```text
spark.clickhouse.read.distributed.convertLocal false
spark.clickhouse.write.distributed.useClusterNodes false
```

---

### Too many Spark tasks when reading a partitioned Distributed table {#troubleshooting-too-many-tasks}

**Symptom**: Spark creates far more tasks than expected (e.g., thousands) when reading a `Distributed` table.

**Cause**: With `spark.clickhouse.read.distributed.convertLocal=true` (default), one Spark partition is created per (shard × ClickHouse partition). A 4-shard cluster with a table partitioned by day over 1 year = 4 × 365 = 1,460 Spark tasks.

**Fix**:
- Use `spark.clickhouse.read.distributed.convertLocal=false` to read through the coordinator (1 task total).
- Apply a partition filter in your query so only relevant partitions are read, reducing the partition list the connector discovers.

---

### Write fails with unsupported partition expression {#troubleshooting-partition-expression}

**Symptom**: `AnalysisException` referencing an unsupported transform (e.g. `months(key)`) when writing to a table with `PARTITION BY toYYYYMM(key)`.

**Cause**: Spark cannot evaluate complex ClickHouse partition expressions as partitioning transforms.

**Fix**: Set `spark.clickhouse.ignoreUnsupportedTransform=true` (the default). The connector will log a warning and skip the partition-level clustering requirement. ClickHouse will still handle routing correctly when writing to the `Distributed` table.

:::warning
If you also have `spark.clickhouse.write.distributed.convertLocal=true`, ignoring unsupported sharding keys can cause rows to be written to the wrong shard, leading to data skew or incorrect query results when querying shards directly. In that case, either use a supported sharding key or set `spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding=true` only if you accept that risk.
:::

---

### Table not found or stale schema after DDL {#troubleshooting-stale-schema}

**Symptom**: After creating or altering a ClickHouse table, Spark still sees the old schema or raises a "table not found" error.

**Cause**: Spark caches catalog metadata. For ClickHouse Cloud, replication to all nodes may also take a moment.

**Fix**:
```python
spark.catalog.refreshTable("clickhouse.database.table_name")
```

## Contributing and support {#contributing-and-support}

If you'd like to contribute to the project or report any issues, we welcome your input!