diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java index 737fa3e4..d7fdce7b 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java @@ -8,6 +8,7 @@ import com.clickhouse.kafka.connect.sink.db.mapping.Column; import com.clickhouse.kafka.connect.sink.db.mapping.Table; import com.clickhouse.kafka.connect.util.Mask; +import com.clickhouse.kafka.connect.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,10 +176,13 @@ public List extractTablesMapping() { public List
extractTablesMapping(Map cache) { List
tableList = new ArrayList<>(); for (String tableName : showTables() ) { + // Table names are escaped in the cache + String escapedTableName = Utils.escapeTopicName(tableName); + // Read from cache if we already described this table before // This means we won't pick up edited table configs until the connector is restarted - if (cache.containsKey(tableName)) { - tableList.add(cache.get(tableName)); + if (cache.containsKey(escapedTableName)) { + tableList.add(cache.get(escapedTableName)); continue; }