From 9422d67cd317617d004464951c3e4b003f58f57d Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 22 Jan 2025 16:56:18 -0500 Subject: [PATCH 01/11] Added logic to detect connector class(MySQL or PostgreSQL) and change log messages. --- .../embedded/cdc/DebeziumChangeEventCapture.java | 1 - .../executor/DebeziumOffsetManagement.java | 16 ++++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 3f102b4ed..670c5f2dc 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -2,7 +2,6 @@ import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java index b4d219102..3420d7546 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java @@ -1,6 +1,7 @@ package com.altinity.clickhouse.sink.connector.executor; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; +import com.altinity.clickhouse.sink.connector.common.ConnectorType; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import org.apache.commons.lang3.tuple.Pair; @@ -74,7 +75,7 @@ public static Pair calculateMinMaxTimestampFromBatch(List currentBatch) { boolean result = false; @@ -130,7 +131,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List batch) throws InterruptedException { + static synchronized void acknowledgeRecords(List batch, ConnectorType connectorType ) throws InterruptedException { // Acknowledge the records. // acknowledge records @@ -140,15 +141,18 @@ static synchronized void acknowledgeRecords(List batch) throws if (record.getCommitter() != null && record.getSourceRecord() != null) { record.getCommitter().markProcessed(record.getSourceRecord()); -// log.debug("***** Record successfully marked as processed ****" + "Binlog file:" + -// record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid() -// + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms()); if(record.isLastRecordInBatch()) { record.getCommitter().markBatchFinished(); - log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" + + + if(ConnectorType.MYSQL.getValue().equalsIgnoreCase(connectorType.getValue())) { + log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" + record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid() + " Sequence Number: " + record.getSequenceNumber() + " Debezium Timestamp: " + record.getDebezium_ts_ms()); + } else if(ConnectorType.POSTGRES.getValue().equalsIgnoreCase(connectorType.getValue())) { + log.info("***** BATCH marked as processed to debezium ****" + "LSN: " + record.getLsn() + + "Debezium Timestamp: " + record.getDebezium_ts_ms()); + } } } } From 80506f1757002a712ac9d0c4d8ef07f5cbf0a3b2 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 22 Jan 2025 16:57:22 -0500 Subject: [PATCH 02/11] Added logic to detect connector class(MySQL or PostgreSQL) and change log messages. --- .../sink/connector/common/ConnectorType.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java new file mode 100644 index 000000000..3beb3d6e1 --- /dev/null +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java @@ -0,0 +1,26 @@ +package com.altinity.clickhouse.sink.connector.common; + +import io.debezium.metadata.ConnectorDescriptor; + + + + public enum ConnectorType { + MYSQL("mysql"), + POSTGRES("postgres"); + + private final String value; + + ConnectorType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ConnectorType fromString(String value) { + + ConnectorDescriptor.getDisplayNameForConnectorClass(value); + return ConnectorType.valueOf(value); + } + } From 6fd6fb72619cd45402d5957cb25839395e314e75 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 28 Jan 2025 11:19:08 -0500 Subject: [PATCH 03/11] Pass connectorType to acknowledgeRecords. --- .../sink/connector/ClickHouseSinkConnectorConfig.java | 2 +- .../sink/connector/executor/ClickHouseBatchRunnable.java | 7 +++++-- .../sink/connector/executor/DebeziumOffsetManagement.java | 6 +++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index 11a9a0bd7..e93816da4 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -28,7 +28,7 @@ public class ClickHouseSinkConnectorConfig extends AbstractConfig { private static final Logger log = LogManager.getLogger(ClickHouseSinkConnectorConfig.class); // Configuration groups - + public static final String CONNECTOR_CLASS = "connector.class"; // Configuration group "clickhouse login info" private static final String CONFIG_GROUP_CLICKHOUSE_LOGIN_INFO = "ClickHouse Login Info"; // Configuration group "connector config" diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 7d199b2fc..a7c5a44e8 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -2,6 +2,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; +import com.altinity.clickhouse.sink.connector.common.ConnectorType; import com.altinity.clickhouse.sink.connector.common.Metrics; import com.altinity.clickhouse.sink.connector.common.Utils; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; @@ -28,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import static com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig.CONNECTOR_CLASS; + /** * Runnable object that will be called on * a schedule to perform the batch insert of @@ -135,7 +138,7 @@ private DBCredentials parseDBConfiguration() { @Override public void run() { - + ConnectorType connectorType = ConnectorType.fromString(config.getString(CONNECTOR_CLASS)); Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString()); try { @@ -195,7 +198,7 @@ public void run() { if(result) { // Step 2: Check if the batch can be committed. - if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch)) { + if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch, connectorType)) { currentBatch = null; } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java index 3420d7546..b138cd47c 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java @@ -101,7 +101,7 @@ static boolean checkIfThereAreInflightRequests(List currentBat return result; } - static synchronized public boolean checkIfBatchCanBeCommitted(List batch) throws InterruptedException { + static synchronized public boolean checkIfBatchCanBeCommitted(List batch, ConnectorType connectorType) throws InterruptedException { boolean result = false; if(true == checkIfThereAreInflightRequests(batch)) { @@ -112,13 +112,13 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List { if(false == checkIfThereAreInflightRequests(v)) { try { - acknowledgeRecords(v); + acknowledgeRecords(v, connectorType); } catch (InterruptedException e) { log.error("*** Error acknowlegeRecords ***", e); throw new RuntimeException(e); From 2e4efae509afacc92a2271ef7ec6f43ca9f9f8d3 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 28 Jan 2025 14:46:25 -0500 Subject: [PATCH 04/11] Added unit test to test get connector type call. --- sink-connector/pom.xml | 12 ++++++++++- .../ClickHouseSinkConnectorConfig.java | 8 +++++++ ...lickHouseSinkConnectorConfigVariables.java | 3 ++- .../sink/connector/common/ConnectorType.java | 13 ++++++++++-- .../executor/ClickHouseBatchRunnable.java | 13 +++++++++++- .../connector/common/SnowFlakeIdTest.java | 1 - .../executor/ClickHouseBatchRunnableTest.java | 21 ++++++++++++++++--- 7 files changed, 62 insertions(+), 9 deletions(-) diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml index 597bb9865..3bc2fca9d 100644 --- a/sink-connector/pom.xml +++ b/sink-connector/pom.xml @@ -313,7 +313,17 @@ debezium-core ${version.debezium} - + + io.debezium + debezium-embedded + ${version.debezium} + + + org.slf4j + slf4j-api + + + org.projectlombok lombok diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index e93816da4..635524944 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -91,6 +91,14 @@ public static String getProperty(final Map config, final String */ static ConfigDef newConfigDef() { return new ConfigDef() + .define( + ClickHouseSinkConnectorConfigVariables.CONNECTOR_CLASS.toString(), + Type.STRING, + "", + null, + Importance.HIGH, + "Connector class" + ) // Config Group "Connector config" .define( ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_TOPICS_TABLES_MAP.toString(), diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java index b4fddc77c..2140eee93 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java @@ -75,8 +75,9 @@ public enum ClickHouseSinkConnectorConfigVariables { REPLICA_STATUS_VIEW("replica.status.view"), MAX_QUEUE_SIZE("sink.connector.max.queue.size"), - SINGLE_THREADED("single.threaded"); + SINGLE_THREADED("single.threaded"), + CONNECTOR_CLASS("connector.class"); private String label; ClickHouseSinkConnectorConfigVariables(String s) { diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java index 3beb3d6e1..e3ed23da7 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java @@ -19,8 +19,17 @@ public String getValue() { } public static ConnectorType fromString(String value) { + ConnectorType connectorType = ConnectorType.MYSQL; - ConnectorDescriptor.getDisplayNameForConnectorClass(value); - return ConnectorType.valueOf(value); + String displayName = ConnectorDescriptor.getIdForConnectorClass(value); + if(displayName != null) { + connectorType =ConnectorType.valueOf(displayName); + if(displayName.contains(MYSQL.getValue())) { + connectorType = ConnectorType.MYSQL; + } else if(displayName.contains(POSTGRES.getValue())) { + connectorType = ConnectorType.POSTGRES; + } + } + return connectorType; } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index a7c5a44e8..5e75c10d7 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -15,6 +15,7 @@ import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.altinity.clickhouse.sink.connector.model.DBCredentials; import com.clickhouse.jdbc.ClickHouseConnection; +import io.debezium.embedded.EmbeddedEngineConfig; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.kafka.common.TopicPartition; import org.apache.logging.log4j.LogManager; @@ -130,6 +131,16 @@ private DBCredentials parseDBConfiguration() { return dbCredentials; } + ConnectorType getConnectorType() { + ConnectorType connectorType = ConnectorType.MYSQL; + + try { + connectorType = ConnectorType.fromString(config.getString(EmbeddedEngineConfig.CONNECTOR_CLASS.toString())); + } catch (Exception e) { + log.error("Error while getting connector type", e); + } + return connectorType; + } /** * Main run loop of the thread * which is called based on the schedule @@ -138,7 +149,7 @@ private DBCredentials parseDBConfiguration() { @Override public void run() { - ConnectorType connectorType = ConnectorType.fromString(config.getString(CONNECTOR_CLASS)); + ConnectorType connectorType = getConnectorType(); Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString()); try { diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java index eccf705a6..b4d47974e 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java @@ -1,6 +1,5 @@ package com.altinity.clickhouse.sink.connector.common; -import com.altinity.clickhouse.sink.connector.common.SnowFlakeId; import org.junit.Assert; import org.junit.jupiter.api.Test; diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java index b359c3c3a..e57a9170f 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java @@ -1,20 +1,21 @@ package com.altinity.clickhouse.sink.connector.executor; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.common.ConnectorType; import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; + import org.junit.Assert; -import org.junit.Before; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -24,7 +25,7 @@ public class ClickHouseBatchRunnableTest { LinkedBlockingQueue> records = new LinkedBlockingQueue<>(); Map topic2TableMap = new HashMap<>(); - @Before + @BeforeEach public void initTest() { @@ -82,6 +83,20 @@ public Struct getKafkaStruct() { return kafkaConnectStruct; } + @Test + public void testGetConnectorType() { + HashMap configMap = new HashMap<>(); + configMap.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); + + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap); + ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap); + + ConnectorType connectorType = run.getConnectorType(); + Assert.assertTrue(connectorType == ConnectorType.MYSQL); + } + + + @Test public void testGetTableNameFromTopic() { ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap()); From 4d076318dd049807871e7d50b4e99b847e6de2d1 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 28 Jan 2025 14:57:15 -0500 Subject: [PATCH 05/11] Added unit test to test get connector type call. --- .../clickhouse/sink/connector/common/ConnectorType.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java index e3ed23da7..97745a79f 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java @@ -23,7 +23,7 @@ public static ConnectorType fromString(String value) { String displayName = ConnectorDescriptor.getIdForConnectorClass(value); if(displayName != null) { - connectorType =ConnectorType.valueOf(displayName); + //connectorType =ConnectorType.valueOf(displayName); if(displayName.contains(MYSQL.getValue())) { connectorType = ConnectorType.MYSQL; } else if(displayName.contains(POSTGRES.getValue())) { From 1004aa582957699a5e60dfa54c7474b4b9b94a9d Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 28 Jan 2025 15:01:18 -0500 Subject: [PATCH 06/11] Added unit test to test get connector type call for postgres. --- .../executor/ClickHouseBatchRunnableTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java index e57a9170f..6798bef24 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java @@ -95,7 +95,17 @@ public void testGetConnectorType() { Assert.assertTrue(connectorType == ConnectorType.MYSQL); } + @Test + public void testGetConnectorTypePostgres() { + HashMap configMap = new HashMap<>(); + configMap.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap); + ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap); + + ConnectorType connectorType = run.getConnectorType(); + Assert.assertTrue(connectorType == ConnectorType.POSTGRES); + } @Test public void testGetTableNameFromTopic() { From e74c1fa76d0e1ef58dd4a5c609142c3a99f64705 Mon Sep 17 00:00:00 2001 From: Selfeer Date: Mon, 3 Feb 2025 14:36:03 +0400 Subject: [PATCH 07/11] change kafka image --- sink-connector/tests/integration/env/kafka-service.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector/tests/integration/env/kafka-service.yml b/sink-connector/tests/integration/env/kafka-service.yml index 0724d6907..696b4eb5a 100644 --- a/sink-connector/tests/integration/env/kafka-service.yml +++ b/sink-connector/tests/integration/env/kafka-service.yml @@ -4,7 +4,7 @@ services: kafka: container_name: kafka hostname: kafka - image: redpandadata/redpanda + image: redpandadata/redpanda:v24.3.3 restart: "no" expose: - "19092" From 188d2686ea8a0905b36518eceef924cf8a29a5be Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 3 Feb 2025 11:49:30 -0500 Subject: [PATCH 08/11] Removed debezium-embedded dependency to get connector class. --- sink-connector/pom.xml | 11 ----------- .../connector/executor/ClickHouseBatchRunnable.java | 3 +-- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml index 3bc2fca9d..82a63dfb2 100644 --- a/sink-connector/pom.xml +++ b/sink-connector/pom.xml @@ -313,17 +313,6 @@ debezium-core ${version.debezium} - - io.debezium - debezium-embedded - ${version.debezium} - - - org.slf4j - slf4j-api - - - org.projectlombok lombok diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 5e75c10d7..31032fdfb 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -15,7 +15,6 @@ import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.altinity.clickhouse.sink.connector.model.DBCredentials; import com.clickhouse.jdbc.ClickHouseConnection; -import io.debezium.embedded.EmbeddedEngineConfig; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.kafka.common.TopicPartition; import org.apache.logging.log4j.LogManager; @@ -135,7 +134,7 @@ ConnectorType getConnectorType() { ConnectorType connectorType = ConnectorType.MYSQL; try { - connectorType = ConnectorType.fromString(config.getString(EmbeddedEngineConfig.CONNECTOR_CLASS.toString())); + connectorType = ConnectorType.fromString(config.getString("connector.class")); } catch (Exception e) { log.error("Error while getting connector type", e); } From dd57abcf23be0fcda067dfd651a0bcb5d459f90e Mon Sep 17 00:00:00 2001 From: selfeer Date: Tue, 4 Feb 2025 18:00:21 +0400 Subject: [PATCH 09/11] try to bring back old kafka version --- sink-connector/tests/integration/env/kafka-service.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector/tests/integration/env/kafka-service.yml b/sink-connector/tests/integration/env/kafka-service.yml index 696b4eb5a..0724d6907 100644 --- a/sink-connector/tests/integration/env/kafka-service.yml +++ b/sink-connector/tests/integration/env/kafka-service.yml @@ -4,7 +4,7 @@ services: kafka: container_name: kafka hostname: kafka - image: redpandadata/redpanda:v24.3.3 + image: redpandadata/redpanda restart: "no" expose: - "19092" From 24517d928fc62739563c2e290ec83d3868649def Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 16 Feb 2025 22:28:01 -0500 Subject: [PATCH 10/11] Skip connectorType check for kafka. --- .../sink/connector/executor/ClickHouseBatchRunnable.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 31032fdfb..8e0970693 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -134,6 +134,12 @@ ConnectorType getConnectorType() { ConnectorType connectorType = ConnectorType.MYSQL; try { + String connectorClass = config.getString("connector.class"); + // For Kafka. connector.class -> com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector + if(connectorClass.contains("sink")) { + // Skip kafka check. + return connectorType; + } connectorType = ConnectorType.fromString(config.getString("connector.class")); } catch (Exception e) { log.error("Error while getting connector type", e); From 39ec975417149fe384b55bd3420f7b831544c617 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 16 Feb 2025 22:30:57 -0500 Subject: [PATCH 11/11] Set clickhouse server version to 23.8 --- sink-connector/deploy/docker/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector/deploy/docker/docker-compose.yaml b/sink-connector/deploy/docker/docker-compose.yaml index c7116086a..d674eb8d5 100755 --- a/sink-connector/deploy/docker/docker-compose.yaml +++ b/sink-connector/deploy/docker/docker-compose.yaml @@ -122,7 +122,7 @@ services: clickhouse: # clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test container_name: clickhouse - image: clickhouse/clickhouse-server:latest + image: clickhouse/clickhouse-server:23.8 restart: "no" depends_on: zookeeper: