The full table of configuration options:

| Property | Description | Default |
|---|---|---|
| `bufferCount` (since v1.3.6) | Number of records to buffer in memory before flushing to ClickHouse. `0` disables internal buffering. Buffering is not supported with `exactlyOnce=true`. | `"0"` |
| `bufferFlushTime` (since v1.3.6) | Maximum time in milliseconds to buffer records before flushing when `exactlyOnce=false`. `0` disables time-based flushing. Only effective when `bufferCount > 0`. | `"0"` |
| `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. This does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` |
| `auto.evolve` (since v1.3.7) | Automatically create new columns when incoming records contain fields not present in the destination table. Only applies to column creation - type changes and column removal are not supported. Requires schema-based data. See [Schema Evolution](#schema-evolution). | `"false"` |
| `auto.evolve.ddl.refresh.retries` (since v1.3.7) | Number of retries when waiting for DDL changes to propagate after schema evolution. | `"3"` |
| `auto.evolve.struct.to.json` (since v1.3.7) | When `auto.evolve=true`, automatically map STRUCT fields to [JSON](/sql-reference/data-types/json) columns. Without this flag, STRUCT fields that need auto-evolution will cause an error. See [Schema Evolution](#schema-evolution). | `"false"` |

### Target tables {#target-tables}


### Schema evolution {#schema-evolution}

The connector supports automatic table schema evolution when `auto.evolve=true`. When incoming Kafka records contain fields not present in the destination ClickHouse table, the connector automatically issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` statements to add the missing columns.

This mirrors the [`auto.evolve` feature](https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html#auto-evolution) in the Confluent JDBC Sink Connector.
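For example, when a new field appears in incoming records, the connector issues DDL of roughly this shape (the table and column names here are hypothetical, for illustration only):

```sql
-- Illustrative DDL of the kind the connector issues for a newly seen field;
-- `events` and `user_agent` are hypothetical names.
ALTER TABLE events
    ADD COLUMN IF NOT EXISTS `user_agent` Nullable(String);
```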

#### Requirements and limitations {#schema-evolution-limitations}

- **Schema required** - `auto.evolve` requires schema-based data (Avro, Protobuf, or JSON Schema). Schemaless or string records will cause an error when `auto.evolve=true`.
- **Add-only** - Columns are only added, never removed or modified. This is the same behavior as the JDBC connector. Stale nullable columns accumulate harmlessly.
- **Union types auto-detected** - The connector automatically detects Avro unions and Protobuf `oneof` fields (via Confluent converter metadata). Unions where all branches map to the same ClickHouse type (e.g. `union(string, bytes)`) collapse to that type. Unions with multiple distinct types (e.g. `union(null, string, int)`) map to [`Variant(T1, T2, ...)`](/sql-reference/data-types/variant), which **requires ClickHouse 24.1+** (experimental, with `allow_experimental_variant_type = 1`) or **25.3+** (stable). If a union contains [suspicious similar types](https://clickhouse.com/docs/operations/settings/settings#allow_suspicious_variant_types) (e.g. `Int32` + `Int64`), it falls back to `Nullable(String)`.
- **STRUCT mapped to JSON (opt-in)** - By default, non-union STRUCT fields cannot be auto-evolved because they could map to Tuple, JSON, or Nested. Set `auto.evolve.struct.to.json=true` to automatically create STRUCT fields as [JSON](/sql-reference/data-types/json) columns. Without this flag, pre-create the column with the desired type manually. The connector supports STRUCT for *insertion* into existing columns regardless of this flag.
- **Schema Registry recommended** - For best results, use a [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) with BACKWARD or FULL compatibility mode. This ensures new fields are always optional (nullable), which is the safest mode for auto-evolution.
- **Table must exist** - Auto-evolution adds columns to existing tables. It does not create new tables.
- **Last record schema check** - The connector checks only the last record in each batch for new fields. If a single batch contains multiple schema versions (e.g., records with schema V1 followed by V2 followed by V1), only the last record's schema is evaluated. In practice this is rarely an issue because schema changes are infrequent and Kafka partitions preserve ordering, so the last record typically carries the newest schema. If you use `ignorePartitionsWhenBatching=true` with producers at different schema versions, consider evolving the table manually before deploying the new schema.
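On ClickHouse versions from 24.1 up to (but not including) 25.3, the `Variant` type used for multi-type unions is experimental and must be enabled, for example in the profile of the user the connector writes with:

```sql
-- Needed on ClickHouse 24.1 – 25.2, where Variant is still experimental;
-- from 25.3 the type is stable and this setting is not required.
SET allow_experimental_variant_type = 1;
```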

#### Configuration {#schema-evolution-configuration}

| Property | Description | Default |
|---|---|---|
| `auto.evolve` | Enable automatic column creation for new fields | `false` |
| `auto.evolve.ddl.refresh.retries` | Number of retries waiting for DDL propagation after schema evolution | `3` |
| `auto.evolve.struct.to.json` | Map Connect STRUCT fields to ClickHouse [JSON](/sql-reference/data-types/json) columns during schema evolution | `false` |

To use schema evolution, your upstream schemas should be configured so that new fields are either optional or have default values. For Avro, this means using `["null", "type"]` union types or specifying a `default`. For Protobuf, use `optional` fields.
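As an illustration, a backward-compatible Avro schema change might add the new field as a nullable union with a default (the record and field names here are hypothetical):

```json
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "user_agent", "type": ["null", "string"], "default": null}
  ]
}
```

Because the field is optional with a `default`, the Schema Registry accepts it under BACKWARD compatibility, and the connector can create it as a `Nullable` column that older records simply leave `NULL`.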

#### Example configuration {#schema-evolution-example-configuration}

Sink connector properties:

```json
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "auto.evolve": true,
    "auto.evolve.ddl.refresh.retries": 3,
    "auto.evolve.struct.to.json": false
  }
}
```

#### How it works {#how-schema-evolution-works}

1. For each batch of records, the connector checks the last record's schema against the table's column list.
2. If new fields are detected, it maps the Kafka Connect types to ClickHouse types and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` with all missing columns in a single DDL statement.
3. DDL is executed with [`alter_sync=1`](/sql-reference/statements/alter#synchronicity-of-alter-queries) to wait for the local replica to apply the change.
4. After DDL, the connector retries `DESCRIBE TABLE` (configurable via `auto.evolve.ddl.refresh.retries`, default 3 retries with 200ms backoff) to verify the new columns are visible.
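Steps 2–4 correspond to SQL of roughly this shape (table and column names are hypothetical):

```sql
-- Steps 2–3: add all missing columns in a single DDL statement,
-- waiting for the local replica to apply the change.
ALTER TABLE events
    ADD COLUMN IF NOT EXISTS `user_agent` Nullable(String),
    ADD COLUMN IF NOT EXISTS `session_id` Nullable(String)
SETTINGS alter_sync = 1;

-- Step 4: re-read the column list to confirm the new columns are visible.
DESCRIBE TABLE events;
```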

#### Type mapping for new columns {#schema-evolution-type-mapping}

All new columns are created as `Nullable(...)` so that records predating the schema change can still be inserted - the new column receives `NULL` for these older records. `ARRAY`, `MAP`, and `Variant` types are the exceptions, as [ClickHouse does not allow Nullable for these types](/sql-reference/data-types/nullable) (`Variant` implicitly supports `NULL`). `JSON` columns are wrapped in `Nullable(JSON)`, which ClickHouse supports. Elements and values inside composite types can still be Nullable (e.g., `Array(Nullable(String))`).

| Kafka Connect Type | ClickHouse Type | Notes |
|---|---|---|
| `org.apache.kafka.connect.data.Decimal` | `Nullable(Decimal(38, S))` | Scale from schema parameters |
| `org.apache.kafka.connect.data.Date` | `Nullable(Date32)` | |
| `org.apache.kafka.connect.data.Time` | `Nullable(Int64)` | Native `Time`/`Time64` types available from ClickHouse 25.6 ([release blog](https://clickhouse.com/blog/clickhouse-release-25-06#timetime64-data-types)). Requires `SET enable_time_time64_type = 1` before v25.12; enabled by default from [v25.12](https://clickhouse.com/docs/whats-new/changelog/2025#2512). |
| `org.apache.kafka.connect.data.Timestamp` | `Nullable(DateTime64(3))` | |
| `INT8` .. `INT64` | `Nullable(Int8)` .. `Nullable(Int64)` | |
| `FLOAT32` / `FLOAT64` | `Nullable(Float32)` / `Nullable(Float64)` | |
| `BOOLEAN` | `Nullable(Bool)` | |
| `STRING` / `BYTES` | `Nullable(String)` | |
| `ARRAY` | `Array(<element_type>)` | Not wrapped in Nullable |
| `MAP` | `Map(<key_type>, <value_type>)` | Not wrapped in Nullable |
| `STRUCT` (union, same types) | `Nullable(<type>)` | e.g. Avro `union(string, bytes)` → `Nullable(String)` |
| `STRUCT` (union, distinct types) | `Variant(T1, T2, ...)` | e.g. Avro `union(null, string, int)` → `Variant(String, Int32)`. **Requires ClickHouse 24.1+** |
| `STRUCT` (union, suspicious similar types) | `Nullable(String)` | e.g. `union(int, long)` - falls back to String rather than requiring `allow_suspicious_variant_types` |
| `STRUCT` (non-union) | `Nullable(JSON)` (when `auto.evolve.struct.to.json=true`) | Requires opt-in; throws error by default |
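To illustrate the union rows above, the Avro field `union(null, string, int)` from the table would (with Variant support enabled) produce a column like the following; the table and column names are hypothetical:

```sql
-- Hypothetical column created for an Avro union(null, string, int) field.
-- Variant implicitly supports NULL, so it is not wrapped in Nullable.
ALTER TABLE events
    ADD COLUMN IF NOT EXISTS `payload` Variant(String, Int32);
```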

#### Concurrent tasks {#schema-evolution-concurrent-tasks}

Schema evolution is safe to use with multiple connector tasks. `ADD COLUMN IF NOT EXISTS` is idempotent - if two tasks race to add the same column, both succeed silently.
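The idempotency is easy to see directly, assuming a hypothetical `events` table:

```sql
-- Running the same statement twice succeeds both times;
-- the second run is a no-op because the column already exists.
ALTER TABLE events ADD COLUMN IF NOT EXISTS `user_agent` Nullable(String);
ALTER TABLE events ADD COLUMN IF NOT EXISTS `user_agent` Nullable(String);
```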

### Logging {#logging}

Logging is automatically provided by the Kafka Connect platform.