**BentsiLeviav** left a comment:
Besides the comments I left, addressing these issues would make our docs even better:
- Let's fix the `UInt64` mapping. Right now, it maps to `LongType`; after your change here (ClickHouse/spark-clickhouse-connector#477), it is `DecimalType(20, 0)`.
- For the option `allowUnsupportedSharding`, the docs say since `0.10.0`, but the code says since `0.9.0`.
- There is an SBT syntax issue (line 141): the artifact name is unquoted.
- We wrongly use `:` in the jar path instead of `-`. `--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar` should be changed to `--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar`.
- We have a typo in the Gradle snippet in line 131 (`repositries`).
- In the compatibility matrix, let's update that `main` uses JDBC `0.9.5` and not `0.9.4`.
- In version `0.9.0`, we added the option to provide settings on read using `spark.clickhouse.read.settings`; let's document it in the configuration table. `spark.clickhouse.read.jsonAs` is missing from the configuration table as well.
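For context on the first bullet, a quick sketch of why `UInt64` cannot safely map to `LongType` (plain Python, no Spark required):

```python
# Spark's LongType is a signed 64-bit integer; ClickHouse UInt64 is unsigned.
UINT64_MAX = 2**64 - 1   # 18446744073709551615
LONG_MAX = 2**63 - 1     # largest value LongType can hold

print(UINT64_MAX > LONG_MAX)   # True: the upper half of the UInt64 range overflows
print(len(str(UINT64_MAX)))    # 20 digits, hence DecimalType(20, 0)
```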
> | `spark.clickhouse.read.splitByPartitionId` (partition-id-based filtering) | 21.6+ |
> | `VariantType` / `JSON` type support | 25.3+ |
>
> For production deployments, we recommend running ClickHouse 23.x or later. The connector is tested against the latest stable ClickHouse releases.
I think we should recommend using the latest, and not a specific version - WDYT?
Can you add a link to the GitHub workflow's ClickHouse version list? That way, users would be able to see exactly which versions we test with.
> When using ClickHouse `Distributed` tables with the connector, there is an important networking and architecture consideration:
>
> **The connector bypasses the Distributed engine and connects directly to each shard node.**
The current phrasing sounds a bit like it's "working around" something. I'll suggest writing something like:
```suggestion
**Following ClickHouse best practices for high-throughput ingestion, the connector writes data directly to the underlying shard nodes rather than using the Distributed engine.**
```
Feel free to suggest other versions if you find a better fit.
> This means:
>
> 1. **All shard hostnames must be reachable** from Spark executors. The connector reads cluster topology from `system.clusters` on the coordinator node, then opens direct connections to each shard. If the shard hostnames returned by `system.clusters` are internal cluster names not resolvable from outside, reads and writes will fail.
> 2. **Inserts go to local tables**: When writing, data is inserted directly into the local `MergeTree` table on each shard (not through the `Distributed` table). This is more efficient but requires that the local table exists on every shard and that Spark can connect to each shard directly.
It would be nice to include a link to this doc page: https://clickhouse.com/docs/engines/table-engines/special/distributed#distributed-writing-data
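To make point 1 above concrete, here is a minimal sketch of checking whether shard hostnames are resolvable from the Spark side (the cluster name and query text in the comment are illustrative placeholders, not the connector's actual code):

```python
import socket

# The connector discovers shard hostnames via system.clusters, e.g. with:
#   SELECT shard_num, host_name FROM system.clusters WHERE cluster = 'my_cluster'
# ('my_cluster' is a placeholder cluster name.)
def resolvable(host: str) -> bool:
    """Return True if the host resolves from this machine (a Spark executor)."""
    try:
        socket.gethostbyname(host)
        return True
    except OSError:
        return False

print(resolvable("localhost"))  # True on a typical setup
```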
> ## Passing query settings and Java client options {#passing-query-settings}
>
> The connector uses ClickHouse's HTTP interface via the [ClickHouse Java client](https://github.com/ClickHouse/clickhouse-java). You can pass arbitrary ClickHouse session settings and HTTP client options using the `option.<key>` prefix.
```suggestion
:::note
All available Java client options are listed in [ClientConfigProperties.java](https://github.com/ClickHouse/clickhouse-java/blob/main/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java) and [this documentation page](https://clickhouse.com/docs/integrations/language-clients/java/client#configuration).
All available server session settings are listed in [this documentation page](https://clickhouse.com/docs/operations/settings/settings).
:::
```
> | `max_execution_time=300` | `option.custom_http_params` | Extend query timeout (seconds) for large reads |
> | `session_timeout=60` | `option.custom_http_params` | Extend HTTP session timeout |
> | `ssl` | `option.ssl` | Enable TLS (`true`/`false`) |
> | `client_name` | `option.client_name` | Tag requests with a client name visible in `system.query_log` |
We don't append multiple `client_name` values in the connector, and the Java client doesn't do it automatically when `client_name` is set in multiple places (as in https://github.com/ClickHouse/spark-clickhouse-connector/blob/a1d8b7b32cae27e2fe38133c10ce10613b824013/clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala#L97-L104), so I'm not sure that will work. Did you verify it?
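For reference, the `option.<key>` mechanism quoted above boils down to a prefix-stripping step, roughly like this sketch (the dict keys come from the table above; the stripping logic and the client name are illustrative, not the connector's actual code):

```python
# Options as they would be passed to the Spark reader/writer.
spark_options = {
    "option.ssl": "true",
    "option.client_name": "spark-etl",  # illustrative client name
    "option.custom_http_params": "max_execution_time=300,session_timeout=60",
}

# The connector forwards everything under the "option." prefix to the
# underlying Java client / HTTP layer with the prefix removed.
client_settings = {
    key[len("option."):]: value
    for key, value in spark_options.items()
    if key.startswith("option.")
}
print(sorted(client_settings))  # ['client_name', 'custom_http_params', 'ssl']
```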
> ### Via TableProvider API {#query-settings-tableprovider-api}
>
> ```python
> df.read \
> ```
Did you mean `spark.read`?
> ### Common settings {#query-settings-common}
>
> | Setting | Example value | Purpose |
> **Cause**: When aggregation push-down is triggered, the connector sends a probe query to ClickHouse to determine the output schema of the pushed-down aggregation. This is by design and only occurs when aggregation push-down is active.
>
> **Fix**: If this causes unacceptable load, avoid triggering aggregation push-down by handling the aggregation in Spark instead:
This "fix" reads the entire table into Spark before aggregating, which in Spark's use case (working with big data) could be far worse than the probe query it is trying to avoid.
For most cases, the `WHERE 1=0` probe query is just a schema-determination round trip (near-zero cost, and if it is not, that might suggest other problems), and the `SELECT *` workaround trades a free probe query for a full table scan.
Do we have other alternatives to avoid it? I see the question in ClickHouse/spark-clickhouse-connector#374 was for a situation where a schema is being provided. Is there a feature flag disabling this once a schema is provided? If not, I would encourage us to keep the issue open and, once we develop such functionality, add it to the docs.
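For readers following along, the probe pattern under discussion can be sketched like this (the table and aggregation are made up; only the `WHERE 1=0` wrapping is the point):

```python
# A zero-row wrapper returns only result-set metadata (column names/types),
# which is why the probe is normally a near-free round trip.
pushed_down = "SELECT region, count(*) AS cnt FROM orders GROUP BY region"
probe = f"SELECT * FROM ({pushed_down}) WHERE 1=0"
print(probe)
```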
> ### Read parallelism {#read-parallelism}
>
> The connector creates one Spark input partition per ClickHouse physical partition (data part group). The number of Spark read tasks equals the number of distinct partition values in the ClickHouse table.
```suggestion
The connector creates one Spark input partition per ClickHouse physical partition (data part group). The number of Spark read tasks equals the number of distinct partition values in the ClickHouse table. Please visit [Table Partitions](https://clickhouse.com/docs/partitions) for more information on partitioning.
```
> | `Distributed` table with `spark.clickhouse.read.distributed.convertLocal=true` (default) | (number of shards) × (partitions per shard) tasks, each targeting a specific shard node directly |
> | `Distributed` table with `spark.clickhouse.read.distributed.convertLocal=false` | 1 task, reading through the `Distributed` coordinator node |
>
> For a table with no `PARTITION BY` clause, the connector creates one Spark partition per ClickHouse data part group, which may result in many small tasks. Use `PARTITION BY` in your ClickHouse table to control parallelism granularity.
> which may result in many small

Would love to hear your thoughts to see if my understanding here is right:

- `planInputPartitions` returns `Array[InputPartition]`, and Spark creates one task per element.
- `createReader(partition)` is called once per element from that array. It takes a single `ClickHouseInputPartition` and creates one reader for it.

So, to summarize: the mapping is 1 to 1. One `InputPartition` object maps to one Spark task (which maps to one reader).
If that is correct, there is no fan-out.
For the no-`PARTITION BY` case: `queryPartitionSpec` returns `Array(NoPartitionSpec)` (one element) → `inputPartitions` has length 1 → `planInputPartitions` returns an array of 1 → Spark schedules exactly 1 task.
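My understanding above, as a toy model (the function names mirror the connector's API, but the code is purely illustrative):

```python
# One InputPartition per element of planInputPartitions(); Spark then creates
# one task, and one reader, per InputPartition: a strict 1:1:1 mapping.
def plan_input_partitions(partition_specs):
    return list(partition_specs)          # one InputPartition per spec

def schedule(input_partitions):
    return [f"reader({p})" for p in input_partitions]  # one reader per task

# No PARTITION BY: queryPartitionSpec yields a single NoPartitionSpec.
tasks = schedule(plan_input_partitions(["NoPartitionSpec"]))
print(len(tasks))  # exactly 1 task
```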

- `option.<key>` / `option.custom_http_params` mechanism for both Catalog and TableProvider APIs, with a common settings table
- `WHERE 1=0` schema inference overhead, shard hostname resolution, too many tasks, partition expression errors, and stale schema