diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 3f102b4ed..670c5f2dc 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -2,7 +2,6 @@ import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; diff --git a/sink-connector/deploy/docker/docker-compose.yaml b/sink-connector/deploy/docker/docker-compose.yaml index c7116086a..d674eb8d5 100755 --- a/sink-connector/deploy/docker/docker-compose.yaml +++ b/sink-connector/deploy/docker/docker-compose.yaml @@ -122,7 +122,7 @@ services: clickhouse: # clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test container_name: clickhouse - image: clickhouse/clickhouse-server:latest + image: clickhouse/clickhouse-server:23.8 restart: "no" depends_on: zookeeper: diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml index 597bb9865..82a63dfb2 100644 --- a/sink-connector/pom.xml +++ b/sink-connector/pom.xml @@ -313,7 +313,6 @@ debezium-core ${version.debezium} - org.projectlombok lombok diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index 11a9a0bd7..635524944 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -28,7 +28,7 @@ public class ClickHouseSinkConnectorConfig extends AbstractConfig { private static final Logger log = LogManager.getLogger(ClickHouseSinkConnectorConfig.class); // Configuration groups - + public static final String CONNECTOR_CLASS = "connector.class"; // Configuration group "clickhouse login info" private static final String CONFIG_GROUP_CLICKHOUSE_LOGIN_INFO = "ClickHouse Login Info"; // Configuration group "connector config" @@ -91,6 +91,14 @@ public static String getProperty(final Map config, final String */ static ConfigDef newConfigDef() { return new ConfigDef() + .define( + ClickHouseSinkConnectorConfigVariables.CONNECTOR_CLASS.toString(), + Type.STRING, + "", + null, + Importance.HIGH, + "Connector class" + ) // Config Group "Connector config" .define( ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_TOPICS_TABLES_MAP.toString(), diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java index b4fddc77c..2140eee93 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java @@ -75,8 +75,9 @@ public enum ClickHouseSinkConnectorConfigVariables { REPLICA_STATUS_VIEW("replica.status.view"), MAX_QUEUE_SIZE("sink.connector.max.queue.size"), - SINGLE_THREADED("single.threaded"); + SINGLE_THREADED("single.threaded"), + CONNECTOR_CLASS("connector.class"); private String label; ClickHouseSinkConnectorConfigVariables(String s) { diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java new file mode 100644 index 000000000..97745a79f --- /dev/null +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/ConnectorType.java @@ -0,0 +1,35 @@ +package com.altinity.clickhouse.sink.connector.common; + +import io.debezium.metadata.ConnectorDescriptor; + + + + public enum ConnectorType { + MYSQL("mysql"), + POSTGRES("postgres"); + + private final String value; + + ConnectorType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ConnectorType fromString(String value) { + ConnectorType connectorType = ConnectorType.MYSQL; + + String displayName = ConnectorDescriptor.getIdForConnectorClass(value); + if(displayName != null) { + //connectorType =ConnectorType.valueOf(displayName); + if(displayName.contains(MYSQL.getValue())) { + connectorType = ConnectorType.MYSQL; + } else if(displayName.contains(POSTGRES.getValue())) { + connectorType = ConnectorType.POSTGRES; + } + } + return connectorType; + } + } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 7d199b2fc..8e0970693 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -2,6 +2,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; +import com.altinity.clickhouse.sink.connector.common.ConnectorType; import com.altinity.clickhouse.sink.connector.common.Metrics; import com.altinity.clickhouse.sink.connector.common.Utils; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; @@ -28,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import static com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig.CONNECTOR_CLASS; + /** * Runnable object that will be called on * a schedule to perform the batch insert of @@ -127,6 +130,22 @@ private DBCredentials parseDBConfiguration() { return dbCredentials; } + ConnectorType getConnectorType() { + ConnectorType connectorType = ConnectorType.MYSQL; + + try { + String connectorClass = config.getString("connector.class"); + // For Kafka. connector.class -> com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector + if(connectorClass.contains("sink")) { + // Skip kafka check. + return connectorType; + } + connectorType = ConnectorType.fromString(config.getString("connector.class")); + } catch (Exception e) { + log.error("Error while getting connector type", e); + } + return connectorType; + } /** * Main run loop of the thread * which is called based on the schedule @@ -135,7 +154,7 @@ private DBCredentials parseDBConfiguration() { @Override public void run() { - + ConnectorType connectorType = getConnectorType(); Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString()); try { @@ -195,7 +214,7 @@ public void run() { if(result) { // Step 2: Check if the batch can be committed. - if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch)) { + if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch, connectorType)) { currentBatch = null; } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java index b4d219102..b138cd47c 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java @@ -1,6 +1,7 @@ package com.altinity.clickhouse.sink.connector.executor; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; +import com.altinity.clickhouse.sink.connector.common.ConnectorType; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import org.apache.commons.lang3.tuple.Pair; @@ -74,7 +75,7 @@ public static Pair calculateMinMaxTimestampFromBatch(List currentBatch) { boolean result = false; @@ -100,7 +101,7 @@ static boolean checkIfThereAreInflightRequests(List currentBat return result; } - static synchronized public boolean checkIfBatchCanBeCommitted(List batch) throws InterruptedException { + static synchronized public boolean checkIfBatchCanBeCommitted(List batch, ConnectorType connectorType) throws InterruptedException { boolean result = false; if(true == checkIfThereAreInflightRequests(batch)) { @@ -111,13 +112,13 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List { if(false == checkIfThereAreInflightRequests(v)) { try { - acknowledgeRecords(v); + acknowledgeRecords(v, connectorType); } catch (InterruptedException e) { log.error("*** Error acknowlegeRecords ***", e); throw new RuntimeException(e); @@ -130,7 +131,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List batch) throws InterruptedException { + static synchronized void acknowledgeRecords(List batch, ConnectorType connectorType ) throws InterruptedException { // Acknowledge the records. // acknowledge records @@ -140,15 +141,18 @@ static synchronized void acknowledgeRecords(List batch) throws if (record.getCommitter() != null && record.getSourceRecord() != null) { record.getCommitter().markProcessed(record.getSourceRecord()); -// log.debug("***** Record successfully marked as processed ****" + "Binlog file:" + -// record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid() -// + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms()); if(record.isLastRecordInBatch()) { record.getCommitter().markBatchFinished(); - log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" + + + if(ConnectorType.MYSQL.getValue().equalsIgnoreCase(connectorType.getValue())) { + log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" + record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid() + " Sequence Number: " + record.getSequenceNumber() + " Debezium Timestamp: " + record.getDebezium_ts_ms()); + } else if(ConnectorType.POSTGRES.getValue().equalsIgnoreCase(connectorType.getValue())) { + log.info("***** BATCH marked as processed to debezium ****" + "LSN: " + record.getLsn() + + "Debezium Timestamp: " + record.getDebezium_ts_ms()); + } } } } diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java index eccf705a6..b4d47974e 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/common/SnowFlakeIdTest.java @@ -1,6 +1,5 @@ package com.altinity.clickhouse.sink.connector.common; -import com.altinity.clickhouse.sink.connector.common.SnowFlakeId; import org.junit.Assert; import org.junit.jupiter.api.Test; diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java index b359c3c3a..6798bef24 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java @@ -1,20 +1,21 @@ package com.altinity.clickhouse.sink.connector.executor; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.common.ConnectorType; import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; + import org.junit.Assert; -import org.junit.Before; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -24,7 +25,7 @@ public class ClickHouseBatchRunnableTest { LinkedBlockingQueue> records = new LinkedBlockingQueue<>(); Map topic2TableMap = new HashMap<>(); - @Before + @BeforeEach public void initTest() { @@ -82,6 +83,30 @@ public Struct getKafkaStruct() { return kafkaConnectStruct; } + @Test + public void testGetConnectorType() { + HashMap configMap = new HashMap<>(); + configMap.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); + + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap); + ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap); + + ConnectorType connectorType = run.getConnectorType(); + Assert.assertTrue(connectorType == ConnectorType.MYSQL); + } + + @Test + public void testGetConnectorTypePostgres() { + HashMap configMap = new HashMap<>(); + configMap.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap); + ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap); + + ConnectorType connectorType = run.getConnectorType(); + Assert.assertTrue(connectorType == ConnectorType.POSTGRES); + } + @Test public void testGetTableNameFromTopic() { ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap());