diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
index 3f102b4ed..670c5f2dc 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
@@ -2,7 +2,6 @@
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
-import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
diff --git a/sink-connector/deploy/docker/docker-compose.yaml b/sink-connector/deploy/docker/docker-compose.yaml
index c7116086a..d674eb8d5 100755
--- a/sink-connector/deploy/docker/docker-compose.yaml
+++ b/sink-connector/deploy/docker/docker-compose.yaml
@@ -122,7 +122,7 @@ services:
clickhouse:
# clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test
container_name: clickhouse
- image: clickhouse/clickhouse-server:latest
+ image: clickhouse/clickhouse-server:23.8
restart: "no"
depends_on:
zookeeper:
diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml
index 597bb9865..82a63dfb2 100644
--- a/sink-connector/pom.xml
+++ b/sink-connector/pom.xml
@@ -313,7 +313,6 @@
debezium-core
${version.debezium}
-
org.projectlombok
lombok
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
index 11a9a0bd7..635524944 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
@@ -28,7 +28,7 @@ public class ClickHouseSinkConnectorConfig extends AbstractConfig {
private static final Logger log = LogManager.getLogger(ClickHouseSinkConnectorConfig.class);
// Configuration groups
-
+ public static final String CONNECTOR_CLASS = "connector.class";
// Configuration group "clickhouse login info"
private static final String CONFIG_GROUP_CLICKHOUSE_LOGIN_INFO = "ClickHouse Login Info";
// Configuration group "connector config"
@@ -91,6 +91,14 @@ public static String getProperty(final Map config, final String
*/
static ConfigDef newConfigDef() {
return new ConfigDef()
+ .define(
+ ClickHouseSinkConnectorConfigVariables.CONNECTOR_CLASS.toString(),
+ Type.STRING,
+ "",
+ null,
+ Importance.HIGH,
+ "Connector class"
+ )
// Config Group "Connector config"
.define(
ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_TOPICS_TABLES_MAP.toString(),
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
index b4fddc77c..2140eee93 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
@@ -75,8 +75,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
REPLICA_STATUS_VIEW("replica.status.view"),
MAX_QUEUE_SIZE("sink.connector.max.queue.size"),
- SINGLE_THREADED("single.threaded");
+ SINGLE_THREADED("single.threaded"),
+ CONNECTOR_CLASS("connector.class");
private String label;
ClickHouseSinkConnectorConfigVariables(String s) {
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java
new file mode 100644
index 000000000..97745a79f
--- /dev/null
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java
@@ -0,0 +1,35 @@
+package com.altinity.clickhouse.sink.connector.common;
+
+import io.debezium.metadata.ConnectorDescriptor;
+
+
+
+ public enum ConnectorType {
+ MYSQL("mysql"),
+ POSTGRES("postgres");
+
+ private final String value;
+
+ ConnectorType(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public static ConnectorType fromString(String value) {
+ ConnectorType connectorType = ConnectorType.MYSQL;
+
+ String displayName = ConnectorDescriptor.getIdForConnectorClass(value);
+ if(displayName != null) {
+ //connectorType =ConnectorType.valueOf(displayName);
+ if(displayName.contains(MYSQL.getValue())) {
+ connectorType = ConnectorType.MYSQL;
+ } else if(displayName.contains(POSTGRES.getValue())) {
+ connectorType = ConnectorType.POSTGRES;
+ }
+ }
+ return connectorType;
+ }
+ }
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
index 7d199b2fc..8e0970693 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
@@ -2,6 +2,7 @@
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.common.Utils;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
@@ -28,6 +29,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import static com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig.CONNECTOR_CLASS;
+
/**
* Runnable object that will be called on
* a schedule to perform the batch insert of
@@ -127,6 +130,22 @@ private DBCredentials parseDBConfiguration() {
return dbCredentials;
}
+ ConnectorType getConnectorType() {
+ ConnectorType connectorType = ConnectorType.MYSQL;
+
+ try {
+ String connectorClass = config.getString("connector.class");
+ // For Kafka. connector.class -> com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector
+ if(connectorClass.contains("sink")) {
+ // Skip kafka check.
+ return connectorType;
+ }
+ connectorType = ConnectorType.fromString(config.getString("connector.class"));
+ } catch (Exception e) {
+ log.error("Error while getting connector type", e);
+ }
+ return connectorType;
+ }
/**
* Main run loop of the thread
* which is called based on the schedule
@@ -135,7 +154,7 @@ private DBCredentials parseDBConfiguration() {
@Override
public void run() {
-
+ ConnectorType connectorType = getConnectorType();
Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
try {
@@ -195,7 +214,7 @@ public void run() {
if(result) {
// Step 2: Check if the batch can be committed.
- if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch)) {
+ if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch, connectorType)) {
currentBatch = null;
}
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java
index b4d219102..b138cd47c 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java
@@ -1,6 +1,7 @@
package com.altinity.clickhouse.sink.connector.executor;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
+import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.apache.commons.lang3.tuple.Pair;
@@ -74,7 +75,7 @@ public static Pair calculateMinMaxTimestampFromBatch(List currentBatch) {
boolean result = false;
@@ -100,7 +101,7 @@ static boolean checkIfThereAreInflightRequests(List currentBat
return result;
}
- static synchronized public boolean checkIfBatchCanBeCommitted(List batch) throws InterruptedException {
+ static synchronized public boolean checkIfBatchCanBeCommitted(List batch, ConnectorType connectorType) throws InterruptedException {
boolean result = false;
if(true == checkIfThereAreInflightRequests(batch)) {
@@ -111,13 +112,13 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List {
if(false == checkIfThereAreInflightRequests(v)) {
try {
- acknowledgeRecords(v);
+ acknowledgeRecords(v, connectorType);
} catch (InterruptedException e) {
log.error("*** Error acknowlegeRecords ***", e);
throw new RuntimeException(e);
@@ -130,7 +131,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List batch) throws InterruptedException {
+ static synchronized void acknowledgeRecords(List batch, ConnectorType connectorType ) throws InterruptedException {
// Acknowledge the records.
// acknowledge records
@@ -140,15 +141,18 @@ static synchronized void acknowledgeRecords(List batch) throws
if (record.getCommitter() != null && record.getSourceRecord() != null) {
record.getCommitter().markProcessed(record.getSourceRecord());
-// log.debug("***** Record successfully marked as processed ****" + "Binlog file:" +
-// record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()
-// + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms());
if(record.isLastRecordInBatch()) {
record.getCommitter().markBatchFinished();
- log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" +
+
+ if(ConnectorType.MYSQL.getValue().equalsIgnoreCase(connectorType.getValue())) {
+ log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" +
record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()
+ " Sequence Number: " + record.getSequenceNumber() + " Debezium Timestamp: " + record.getDebezium_ts_ms());
+ } else if(ConnectorType.POSTGRES.getValue().equalsIgnoreCase(connectorType.getValue())) {
+ log.info("***** BATCH marked as processed to debezium ****" + "LSN: " + record.getLsn()
+ + "Debezium Timestamp: " + record.getDebezium_ts_ms());
+ }
}
}
}
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java
index eccf705a6..b4d47974e 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java
@@ -1,6 +1,5 @@
package com.altinity.clickhouse.sink.connector.common;
-import com.altinity.clickhouse.sink.connector.common.SnowFlakeId;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java
index b359c3c3a..6798bef24 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java
@@ -1,20 +1,21 @@
package com.altinity.clickhouse.sink.connector.executor;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+
import org.junit.Assert;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -24,7 +25,7 @@ public class ClickHouseBatchRunnableTest {
LinkedBlockingQueue> records = new LinkedBlockingQueue<>();
Map topic2TableMap = new HashMap<>();
- @Before
+ @BeforeEach
public void initTest() {
@@ -82,6 +83,30 @@ public Struct getKafkaStruct() {
return kafkaConnectStruct;
}
+ @Test
+ public void testGetConnectorType() {
+ HashMap configMap = new HashMap<>();
+ configMap.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
+
+ ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap);
+ ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);
+
+ ConnectorType connectorType = run.getConnectorType();
+ Assert.assertTrue(connectorType == ConnectorType.MYSQL);
+ }
+
+ @Test
+ public void testGetConnectorTypePostgres() {
+ HashMap configMap = new HashMap<>();
+ configMap.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
+
+ ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap);
+ ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);
+
+ ConnectorType connectorType = run.getConnectorType();
+ Assert.assertTrue(connectorType == ConnectorType.POSTGRES);
+ }
+
@Test
public void testGetTableNameFromTopic() {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap());