From 1df9a37ba9ab8db050925b297f1c290cd6655bf9 Mon Sep 17 00:00:00 2001 From: Guillermo Ovejero Date: Mon, 23 Mar 2026 23:52:11 +0100 Subject: [PATCH 1/3] docs: add docs for kafka sink auto evolve option --- .../kafka/kafka-clickhouse-connect-sink.md | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 0995ef1b0d8..02310085fcc 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -122,6 +122,7 @@ The full table of configuration options: | `bufferCount` (since v1.3.6) | Number of records to buffer in memory before flushing to ClickHouse. `0` disables internal buffering. Buffering is not supported with `exactlyOnce=true`. | `"0"` | | `bufferFlushTime` (since v1.3.6) | Maximum time in milliseconds to buffer records before flush when `exactlyOnce=false`. `0` disables time-based flushing. Default value is `0`. Only required for time-base threshold. Only effective when `bufferCount > 0`. | `"0"` | | `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. This does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` | +| `auto.evolve` (since v1.3.7) | Automatically add columns to the ClickHouse table when incoming records contain new fields not present in the table. See [Schema Evolution](#schema-evolution). | `"false"` | ### Target tables {#target-tables} @@ -327,6 +328,57 @@ Example: } ``` +### Schema Evolution {#schema-evolution} + +The connector supports automatic table schema evolution when `auto.evolve=true`. 
When incoming Kafka records contain fields not present in the destination ClickHouse table, the connector automatically issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` statements to add the missing columns.
+
+This mirrors the [`auto.evolve` feature](https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html#auto-evolution) in the Confluent JDBC Sink Connector.
+
+#### How it works {#how-schema-evolution-works}
+
+1. For each batch of records, the connector compares the record schema against the table's column list.
+2. If new fields are detected, it maps the Kafka Connect types to ClickHouse types and issues DDL.
+3. If multiple schema versions appear in a single batch, the batch is split at schema boundaries - each sub-batch is flushed and the table is evolved before continuing.
+
+#### Type mapping for new columns {#schema-evolution-type-mapping}
+
+When creating new columns, the connector maps Connect types to ClickHouse types as follows:
+
+| Kafka Connect Type | ClickHouse Type | Notes |
+|---|---|---|
+| `org.apache.kafka.connect.data.Decimal` | `Decimal(38, S)` | Scale from schema parameters |
+| `org.apache.kafka.connect.data.Date` | `Date32` | |
+| `org.apache.kafka.connect.data.Time` | `Int64` | |
+| `org.apache.kafka.connect.data.Timestamp` | `DateTime64(3)` | |
+| `INT8` .. `INT64` | `Int8` .. `Int64` | |
+| `FLOAT32` / `FLOAT64` | `Float32` / `Float64` | |
+| `BOOLEAN` | `Bool` | |
+| `STRING` / `BYTES` | `String` | |
+| `ARRAY` | `Array(<T>)` | Recursive |
+| `MAP` | `Map(<K>, <V>)` | Recursive |
+| `STRUCT` | Not supported | Throws an error |
+
+Optional (nullable) fields are wrapped in `Nullable(...)`, except for `ARRAY` and `MAP` types which [cannot be Nullable in ClickHouse](/sql-reference/data-types/nullable). Elements and values inside composite types can still be Nullable.
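To make the mapping rules above concrete, here is a minimal, hypothetical Python sketch. The connector itself is written in Java; the function and dictionary names here are illustrative only, not the connector's actual API:

```python
# Illustrative sketch of the Connect -> ClickHouse type mapping described above.
# All names are hypothetical; the connector's real implementation is in Java.

LOGICAL_TYPES = {
    "org.apache.kafka.connect.data.Date": "Date32",
    "org.apache.kafka.connect.data.Time": "Int64",
    "org.apache.kafka.connect.data.Timestamp": "DateTime64(3)",
}

PRIMITIVES = {
    "INT8": "Int8", "INT16": "Int16", "INT32": "Int32", "INT64": "Int64",
    "FLOAT32": "Float32", "FLOAT64": "Float64",
    "BOOLEAN": "Bool", "STRING": "String", "BYTES": "String",
}

def clickhouse_type(kind, optional=False, logical=None, scale=None,
                    element=None, key=None, value=None):
    """Map one Kafka Connect field type to a ClickHouse column type."""
    if kind == "STRUCT":
        # STRUCT fields are rejected for auto-evolution
        raise ValueError("STRUCT is not supported for auto-evolution")
    if kind == "ARRAY":
        return f"Array({element})"     # Array itself is never wrapped in Nullable
    if kind == "MAP":
        return f"Map({key}, {value})"  # Map itself is never wrapped in Nullable
    if logical == "org.apache.kafka.connect.data.Decimal":
        base = f"Decimal(38, {scale})"  # scale taken from schema parameters
    elif logical in LOGICAL_TYPES:
        base = LOGICAL_TYPES[logical]
    else:
        base = PRIMITIVES[kind]
    return f"Nullable({base})" if optional else base
```

For example, an optional `STRING` field maps to `Nullable(String)`, while an array whose elements are optional maps to `Array(Nullable(String))` - the composite stays non-Nullable, only its elements are wrapped.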
+ +#### Guards {#schema-evolution-guards} + +The connector rejects schema evolution in the following cases with a clear error message: + +- **Non-nullable field without a default value** - ClickHouse requires new columns to be either `Nullable` or have a `DEFAULT`. +- **STRUCT fields** - Mapping Connect STRUCT to ClickHouse is non-trivial (could be Tuple, JSON, or Nested). Not supported for auto-evolution. +- **Schemaless or string records** - No Connect schema is available to derive ClickHouse types. Evolution is skipped with a warning. + +#### Concurrent tasks {#schema-evolution-concurrent-tasks} + +Schema evolution is safe to use with multiple connector tasks. `ADD COLUMN IF NOT EXISTS` is idempotent - if two tasks race to add the same column, both succeed silently. DDL statements are executed with [`alter_sync=1`](/sql-reference/statements/alter#synchronicity-of-alter-queries) to wait for the local replica to apply the change. A retry loop on `DESCRIBE TABLE` (5 retries, 200ms backoff) handles propagation to other replicas. + +#### Limitations {#schema-evolution-limitations} + +- **Add-only** - Columns are only added, never removed or modified. This is the same behavior as the JDBC connector. Stale nullable columns accumulate harmlessly. +- **Schema required** - Evolution only works with schema-based data (Avro, Protobuf, JSON Schema). Schemaless JSON and string records are not evolved. +- **STRUCT not supported** - Individual STRUCT fields cannot be auto-evolved. The top-level value must be a STRUCT (i.e., a row), but nested STRUCT fields are rejected. +- **Schema Registry recommended** - For best results, use a [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) with BACKWARD or FULL compatibility mode. This ensures new fields are always optional (nullable), which is the safest mode for auto-evolution. + ### Logging {#logging} Logging is automatically provided by Kafka Connect Platform. 
From 73f24d0cd9878467ac7896b93f3966f7f7711d81 Mon Sep 17 00:00:00 2001 From: Guillermo Ovejero Date: Thu, 26 Mar 2026 08:59:36 +0100 Subject: [PATCH 2/3] docs: kafka-clickhouse-connect-sink schema evolution changes --- .../kafka/kafka-clickhouse-connect-sink.md | 91 ++++++++++++------- 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 02310085fcc..3464b527427 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -122,7 +122,9 @@ The full table of configuration options: | `bufferCount` (since v1.3.6) | Number of records to buffer in memory before flushing to ClickHouse. `0` disables internal buffering. Buffering is not supported with `exactlyOnce=true`. | `"0"` | | `bufferFlushTime` (since v1.3.6) | Maximum time in milliseconds to buffer records before flush when `exactlyOnce=false`. `0` disables time-based flushing. Default value is `0`. Only required for time-base threshold. Only effective when `bufferCount > 0`. | `"0"` | | `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. This does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` | -| `auto.evolve` (since v1.3.7) | Automatically add columns to the ClickHouse table when incoming records contain new fields not present in the table. See [Schema Evolution](#schema-evolution). | `"false"` | +| `auto.evolve` (since v1.3.7) | Automatically create new columns when incoming records contain fields not present in the destination table. Only applies to column creation - type changes and column removal are not supported. Requires schema-based data. 
See [Schema Evolution](#schema-evolution). | `"false"` | +| `auto.evolve.ddl.refresh.retries` (since v1.3.7) | Number of retries when waiting for DDL changes to propagate after schema evolution. | `"3"` | +| `auto.evolve.struct.to.json` (since v1.3.7) | When `auto.evolve=true`, automatically map STRUCT fields to [JSON](/sql-reference/data-types/json) columns. Without this flag, STRUCT fields that need auto-evolution will cause an error. See [Schema Evolution](#schema-evolution). | `"false"` | ### Target tables {#target-tables} @@ -334,50 +336,73 @@ The connector supports automatic table schema evolution when `auto.evolve=true`. This mirrors the [`auto.evolve` feature](https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html#auto-evolution) in the Confluent JDBC Sink Connector. -#### How it works {#how-schema-evolution-works} - -1. For each batch of records, the connector compares the record schema against the table's column list. -2. If new fields are detected, it maps the Kafka Connect types to ClickHouse types and issues DDL. -3. If multiple schema versions appear in a single batch, the batch is split at schema boundaries - each sub-batch is flushed and the table is evolved before continuing. +#### Requirements and limitations {#schema-evolution-limitations} -#### Type mapping for new columns {#schema-evolution-type-mapping} +- **Schema required** - `auto.evolve` requires schema-based data (Avro, Protobuf, or JSON Schema). Schemaless or string records will cause an error when `auto.evolve=true`. +- **Add-only** - Columns are only added, never removed or modified. This is the same behavior as the JDBC connector. Stale nullable columns accumulate harmlessly. +- **Union types auto-detected** - The connector automatically detects Avro unions and Protobuf `oneof` fields (via Confluent converter metadata). Unions where all branches map to the same ClickHouse type (e.g. `union(string, bytes)`) collapse to that type. 
Unions with multiple distinct types (e.g. `union(null, string, int)`) map to [`Variant(T1, T2, ...)`](/sql-reference/data-types/variant), which **requires ClickHouse 24.1+** (experimental, with `allow_experimental_variant_type = 1`) or **25.3+** (stable). If a union contains [suspicious similar types](https://clickhouse.com/docs/operations/settings/settings#allow_suspicious_variant_types) (e.g. `Int32` + `Int64`), it falls back to `Nullable(String)`. +- **STRUCT mapped to JSON (opt-in)** - By default, non-union STRUCT fields cannot be auto-evolved because they could map to Tuple, JSON, or Nested. Set `auto.evolve.struct.to.json=true` to automatically create STRUCT fields as [JSON](/sql-reference/data-types/json) columns. Without this flag, pre-create the column with the desired type manually. The connector supports STRUCT for *insertion* into existing columns regardless of this flag. +- **Schema Registry recommended** - For best results, use a [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) with BACKWARD or FULL compatibility mode. This ensures new fields are always optional (nullable), which is the safest mode for auto-evolution. +- **Table must exist** - Auto-evolution adds columns to existing tables. It does not create new tables. -When creating new columns, the connector maps Connect types to ClickHouse types as follows: +#### Configuration {#schema-evolution-configuration} -| Kafka Connect Type | ClickHouse Type | Notes | +| Property | Description | Default | |---|---|---| -| `org.apache.kafka.connect.data.Decimal` | `Decimal(38, S)` | Scale from schema parameters | -| `org.apache.kafka.connect.data.Date` | `Date32` | | -| `org.apache.kafka.connect.data.Time` | `Int64` | | -| `org.apache.kafka.connect.data.Timestamp` | `DateTime64(3)` | | -| `INT8` .. `INT64` | `Int8` .. 
`Int64` | |
-| `FLOAT32` / `FLOAT64` | `Float32` / `Float64` | |
-| `BOOLEAN` | `Bool` | |
-| `STRING` / `BYTES` | `String` | |
-| `ARRAY` | `Array(<T>)` | Recursive |
-| `MAP` | `Map(<K>, <V>)` | Recursive |
-| `STRUCT` | Not supported | Throws an error |
+| `auto.evolve` | Enable automatic column creation for new fields | `false` |
+| `auto.evolve.ddl.refresh.retries` | Number of retries waiting for DDL propagation after schema evolution | `3` |
+| `auto.evolve.struct.to.json` | Map Connect STRUCT fields to ClickHouse [JSON](/sql-reference/data-types/json) columns during schema evolution | `false` |
 
-Optional (nullable) fields are wrapped in `Nullable(...)`, except for `ARRAY` and `MAP` types which [cannot be Nullable in ClickHouse](/sql-reference/data-types/nullable). Elements and values inside composite types can still be Nullable.
+To use schema evolution, your upstream schemas should be configured so that new fields are either optional or have default values. For Avro, this means using `["null", "type"]` union types or specifying a `default`. For Protobuf, use `optional` fields.
+
+#### Example configuration {#schema-evolution-example-configuration}
+
+Sink connector properties:
+
+```json
+{
+  "name": "clickhouse-connect",
+  "config": {
+    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
+    ...
+    "auto.evolve": true,
+    "auto.evolve.ddl.refresh.retries": 3,
+    "auto.evolve.struct.to.json": false
+  }
+}
+```
 
-#### Guards {#schema-evolution-guards}
+#### How it works {#how-schema-evolution-works}
 
-The connector rejects schema evolution in the following cases with a clear error message:
+1. For each batch of records, the connector compares the record schema against the table's column list.
+2. If new fields are detected, it maps the Kafka Connect types to ClickHouse types and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` with all missing columns in a single DDL statement.
+3. 
DDL is executed with [`alter_sync=1`](/sql-reference/statements/alter#synchronicity-of-alter-queries) to wait for the local replica to apply the change. +4. After DDL, the connector retries `DESCRIBE TABLE` (configurable via `auto.evolve.ddl.refresh.retries`, default 3 retries with 200ms backoff) to verify the new columns are visible. -- **Non-nullable field without a default value** - ClickHouse requires new columns to be either `Nullable` or have a `DEFAULT`. -- **STRUCT fields** - Mapping Connect STRUCT to ClickHouse is non-trivial (could be Tuple, JSON, or Nested). Not supported for auto-evolution. -- **Schemaless or string records** - No Connect schema is available to derive ClickHouse types. Evolution is skipped with a warning. +#### Type mapping for new columns {#schema-evolution-type-mapping} -#### Concurrent tasks {#schema-evolution-concurrent-tasks} +All new columns are created as `Nullable(...)` so that records predating the schema change can still be inserted - the new column receives `NULL` for these older records. `ARRAY`, `MAP`, and `Variant` types are the exceptions, as [ClickHouse does not allow Nullable for these types](/sql-reference/data-types/nullable) (`Variant` implicitly supports `NULL`). `JSON` columns are wrapped in `Nullable(JSON)`, which ClickHouse supports. Elements and values inside composite types can still be Nullable (e.g., `Array(Nullable(String))`). -Schema evolution is safe to use with multiple connector tasks. `ADD COLUMN IF NOT EXISTS` is idempotent - if two tasks race to add the same column, both succeed silently. DDL statements are executed with [`alter_sync=1`](/sql-reference/statements/alter#synchronicity-of-alter-queries) to wait for the local replica to apply the change. A retry loop on `DESCRIBE TABLE` (5 retries, 200ms backoff) handles propagation to other replicas. 
+| Kafka Connect Type | ClickHouse Type | Notes |
+|---|---|---|
+| `org.apache.kafka.connect.data.Decimal` | `Nullable(Decimal(38, S))` | Scale from schema parameters |
+| `org.apache.kafka.connect.data.Date` | `Nullable(Date32)` | |
+| `org.apache.kafka.connect.data.Time` | `Nullable(Int64)` | Native `Time`/`Time64` types available from ClickHouse 25.6 ([release blog](https://clickhouse.com/blog/clickhouse-release-25-06#timetime64-data-types)). Requires `SET enable_time_time64_type = 1` before v25.12; enabled by default from [v25.12](https://clickhouse.com/docs/whats-new/changelog/2025#2512). |
+| `org.apache.kafka.connect.data.Timestamp` | `Nullable(DateTime64(3))` | |
+| `INT8` .. `INT64` | `Nullable(Int8)` .. `Nullable(Int64)` | |
+| `FLOAT32` / `FLOAT64` | `Nullable(Float32)` / `Nullable(Float64)` | |
+| `BOOLEAN` | `Nullable(Bool)` | |
+| `STRING` / `BYTES` | `Nullable(String)` | |
+| `ARRAY` | `Array(<T>)` | Not wrapped in Nullable |
+| `MAP` | `Map(<K>, <V>)` | Not wrapped in Nullable |
+| `STRUCT` (union, same types) | `Nullable(<T>)` | e.g. Avro `union(string, bytes)` → `Nullable(String)` |
+| `STRUCT` (union, distinct types) | `Variant(T1, T2, ...)` | e.g. Avro `union(null, string, int)` → `Variant(String, Int32)`. **Requires ClickHouse 24.1+** |
+| `STRUCT` (union, suspicious similar types) | `Nullable(String)` | e.g. `union(int, long)` - falls back to String to avoid `allow_suspicious_variant_types` |
+| `STRUCT` (non-union) | `Nullable(JSON)` (when `auto.evolve.struct.to.json=true`) | Requires opt-in; throws error by default |
 
-#### Limitations {#schema-evolution-limitations}
+#### Concurrent tasks {#schema-evolution-concurrent-tasks}
 
-- **Add-only** - Columns are only added, never removed or modified. This is the same behavior as the JDBC connector. Stale nullable columns accumulate harmlessly.
-- **Schema required** - Evolution only works with schema-based data (Avro, Protobuf, JSON Schema). Schemaless JSON and string records are not evolved.
-- **STRUCT not supported** - Individual STRUCT fields cannot be auto-evolved. The top-level value must be a STRUCT (i.e., a row), but nested STRUCT fields are rejected. -- **Schema Registry recommended** - For best results, use a [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) with BACKWARD or FULL compatibility mode. This ensures new fields are always optional (nullable), which is the safest mode for auto-evolution. +Schema evolution is safe to use with multiple connector tasks. `ADD COLUMN IF NOT EXISTS` is idempotent - if two tasks race to add the same column, both succeed silently. ### Logging {#logging} From cfc7e6b0d33abea6a4ee3c011e5bdcdfb8c62c82 Mon Sep 17 00:00:00 2001 From: Guillermo Ovejero Date: Sun, 29 Mar 2026 20:06:33 +0200 Subject: [PATCH 3/3] docs: clarify last record schema check in auto.evolve setting --- .../data-ingestion/kafka/kafka-clickhouse-connect-sink.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 3464b527427..677d689713c 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -344,6 +344,7 @@ This mirrors the [`auto.evolve` feature](https://docs.confluent.io/kafka-connect - **STRUCT mapped to JSON (opt-in)** - By default, non-union STRUCT fields cannot be auto-evolved because they could map to Tuple, JSON, or Nested. Set `auto.evolve.struct.to.json=true` to automatically create STRUCT fields as [JSON](/sql-reference/data-types/json) columns. Without this flag, pre-create the column with the desired type manually. The connector supports STRUCT for *insertion* into existing columns regardless of this flag. 
- **Schema Registry recommended** - For best results, use a [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) with BACKWARD or FULL compatibility mode. This ensures new fields are always optional (nullable), which is the safest mode for auto-evolution. - **Table must exist** - Auto-evolution adds columns to existing tables. It does not create new tables. +- **Last record schema check** - The connector checks only the last record in each batch for new fields. If a single batch contains multiple schema versions (e.g., records with schema V1 followed by V2 followed by V1), only the last record's schema is evaluated. In practice this is rarely an issue because schema changes are infrequent and Kafka partitions preserve ordering, so the last record typically carries the newest schema. If you use `ignorePartitionsWhenBatching=true` with producers at different schema versions, consider evolving the table manually before deploying the new schema. #### Configuration {#schema-evolution-configuration} @@ -374,7 +375,7 @@ Sink connector properties: #### How it works {#how-schema-evolution-works} -1. For each batch of records, the connector compares the record schema against the table's column list. +1. For each batch of records, the connector checks the last record's schema against the table's column list. 2. If new fields are detected, it maps the Kafka Connect types to ClickHouse types and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` with all missing columns in a single DDL statement. 3. DDL is executed with [`alter_sync=1`](/sql-reference/statements/alter#synchronicity-of-alter-queries) to wait for the local replica to apply the change. 4. After DDL, the connector retries `DESCRIBE TABLE` (configurable via `auto.evolve.ddl.refresh.retries`, default 3 retries with 200ms backoff) to verify the new columns are visible.
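The evolution flow in the steps above - check the last record's schema, diff it against `DESCRIBE TABLE`, and issue one combined `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` - can be sketched as follows. This is a hypothetical Python illustration of the documented behavior, not the connector's Java internals; the function name, argument shapes, and the `SETTINGS alter_sync = 1` placement are assumptions for demonstration:

```python
# Hedged sketch of the schema-evolution flow: diff the last record's fields
# against the table's columns and build a single ALTER TABLE statement.
# Names and structure are illustrative, not the connector's actual code.

def evolution_ddl(table, table_columns, last_record_fields):
    """Return the DDL to evolve `table`, or None if no columns are missing.

    table_columns: {name: type} as reported by DESCRIBE TABLE.
    last_record_fields: {name: clickhouse_type} derived from the *last*
    record's Connect schema (only the last record in a batch is checked).
    """
    missing = {name: ch_type for name, ch_type in last_record_fields.items()
               if name not in table_columns}
    if not missing:
        return None
    # All missing columns go into one DDL statement; IF NOT EXISTS makes the
    # statement idempotent if concurrent tasks race to add the same column.
    clauses = ", ".join(f"ADD COLUMN IF NOT EXISTS `{name}` {ch_type}"
                        for name, ch_type in missing.items())
    # The docs state DDL runs with alter_sync=1 to wait for the local replica;
    # DESCRIBE TABLE is then retried (auto.evolve.ddl.refresh.retries) until
    # the new columns are visible.
    return f"ALTER TABLE {table} {clauses} SETTINGS alter_sync = 1"

ddl = evolution_ddl(
    "events",
    {"id": "Int64", "name": "String"},
    {"id": "Nullable(Int64)", "name": "Nullable(String)",
     "score": "Nullable(Float64)"},
)
print(ddl)
# ALTER TABLE events ADD COLUMN IF NOT EXISTS `score` Nullable(Float64) SETTINGS alter_sync = 1
```

Note how only the genuinely missing field (`score`) produces a clause; existing columns are left untouched, matching the add-only behavior described above.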