Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logic to detect connector class (MySQL or PostgreSQL) and change offset commit message #975

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
Expand Down
2 changes: 1 addition & 1 deletion sink-connector/deploy/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ services:
clickhouse:
# clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test
container_name: clickhouse
image: clickhouse/clickhouse-server:latest
image: clickhouse/clickhouse-server:23.8
restart: "no"
depends_on:
zookeeper:
Expand Down
1 change: 0 additions & 1 deletion sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ClickHouseSinkConnectorConfig extends AbstractConfig {
private static final Logger log = LogManager.getLogger(ClickHouseSinkConnectorConfig.class);

// Configuration groups

public static final String CONNECTOR_CLASS = "connector.class";
// Configuration group "clickhouse login info"
private static final String CONFIG_GROUP_CLICKHOUSE_LOGIN_INFO = "ClickHouse Login Info";
// Configuration group "connector config"
Expand Down Expand Up @@ -91,6 +91,14 @@ public static String getProperty(final Map<String, String> config, final String
*/
static ConfigDef newConfigDef() {
return new ConfigDef()
.define(
ClickHouseSinkConnectorConfigVariables.CONNECTOR_CLASS.toString(),
Type.STRING,
"",
null,
Importance.HIGH,
"Connector class"
)
// Config Group "Connector config"
.define(
ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_TOPICS_TABLES_MAP.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
REPLICA_STATUS_VIEW("replica.status.view"),
MAX_QUEUE_SIZE("sink.connector.max.queue.size"),

SINGLE_THREADED("single.threaded");
SINGLE_THREADED("single.threaded"),

CONNECTOR_CLASS("connector.class");
private String label;

ClickHouseSinkConnectorConfigVariables(String s) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.altinity.clickhouse.sink.connector.common;

import io.debezium.metadata.ConnectorDescriptor;



/**
 * Identifies the upstream source-database connector feeding the sink.
 * Used to select the appropriate offset-commit log format (binlog
 * coordinates for MySQL, LSN for PostgreSQL).
 */
public enum ConnectorType {
    MYSQL("mysql"),
    POSTGRES("postgres");

    private final String value;

    ConnectorType(String value) {
        this.value = value;
    }

    /** @return the lowercase identifier matched against Debezium display names. */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a connector type from a fully-qualified Debezium connector
     * class name. Falls back to {@link #MYSQL} when the class is unknown
     * or matches neither identifier.
     *
     * @param value fully-qualified connector class name (e.g.
     *              {@code io.debezium.connector.postgresql.PostgresConnector})
     * @return the matching connector type, defaulting to MYSQL
     */
    public static ConnectorType fromString(String value) {
        String displayName = ConnectorDescriptor.getIdForConnectorClass(value);
        if (displayName == null) {
            return MYSQL;
        }
        // Check order preserved: a MySQL match wins over a Postgres match.
        if (displayName.contains(MYSQL.getValue())) {
            return MYSQL;
        }
        if (displayName.contains(POSTGRES.getValue())) {
            return POSTGRES;
        }
        return MYSQL;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.common.Utils;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
Expand All @@ -28,6 +29,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import static com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig.CONNECTOR_CLASS;

/**
* Runnable object that will be called on
* a schedule to perform the batch insert of
Expand Down Expand Up @@ -127,6 +130,22 @@ private DBCredentials parseDBConfiguration() {
return dbCredentials;
}

/**
 * Determines the source connector type (MySQL or PostgreSQL) from the
 * configured {@code connector.class} property.
 *
 * <p>Defaults to {@link ConnectorType#MYSQL} when the configured class is
 * the Kafka sink connector itself (class name contains "sink") or when the
 * property cannot be read/resolved.
 *
 * @return the detected connector type, never null
 */
ConnectorType getConnectorType() {
    ConnectorType connectorType = ConnectorType.MYSQL;

    try {
        // Read once via the shared CONNECTOR_CLASS constant instead of
        // duplicating the "connector.class" literal (was read twice before).
        String connectorClass = config.getString(CONNECTOR_CLASS);
        // For Kafka, connector.class -> com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector
        if (connectorClass.contains("sink")) {
            // Kafka sink connector: no source database to detect, keep the default.
            return connectorType;
        }
        connectorType = ConnectorType.fromString(connectorClass);
    } catch (Exception e) {
        // Best-effort: log and fall back to the MYSQL default rather than failing the run loop.
        log.error("Error while getting connector type", e);
    }
    return connectorType;
}
/**
* Main run loop of the thread
* which is called based on the schedule
Expand All @@ -135,7 +154,7 @@ private DBCredentials parseDBConfiguration() {
@Override
public void run() {


ConnectorType connectorType = getConnectorType();
Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
try {

Expand Down Expand Up @@ -195,7 +214,7 @@ public void run() {

if(result) {
// Step 2: Check if the batch can be committed.
if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch)) {
if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch, connectorType)) {
currentBatch = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.altinity.clickhouse.sink.connector.executor;

import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -74,7 +75,7 @@ public static Pair<Long, Long> calculateMinMaxTimestampFromBatch(List<ClickHouse
/**
* Function to check if there are inflight requests that are within the range of the
* current batch.
* @param batch
* @param currentBatch
*/
static boolean checkIfThereAreInflightRequests(List<ClickHouseStruct> currentBatch) {
boolean result = false;
Expand All @@ -100,7 +101,7 @@ static boolean checkIfThereAreInflightRequests(List<ClickHouseStruct> currentBat
return result;
}

static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStruct> batch) throws InterruptedException {
static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStruct> batch, ConnectorType connectorType) throws InterruptedException {
boolean result = false;

if(true == checkIfThereAreInflightRequests(batch)) {
Expand All @@ -111,13 +112,13 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStr
completedBatches.put(pair, batch);
} else {
// Acknowledge current batch
acknowledgeRecords(batch);
acknowledgeRecords(batch, connectorType);
result = true;
// Check if completed batch can also be acknowledged.
completedBatches.forEach((k, v) -> {
if(false == checkIfThereAreInflightRequests(v)) {
try {
acknowledgeRecords(v);
acknowledgeRecords(v, connectorType);
} catch (InterruptedException e) {
log.error("*** Error acknowlegeRecords ***", e);
throw new RuntimeException(e);
Expand All @@ -130,7 +131,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStr
return result;
}

static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch) throws InterruptedException {
static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch, ConnectorType connectorType ) throws InterruptedException {
// Acknowledge the records.

// acknowledge records
Expand All @@ -140,15 +141,18 @@ static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch) throws
if (record.getCommitter() != null && record.getSourceRecord() != null) {

record.getCommitter().markProcessed(record.getSourceRecord());
// log.debug("***** Record successfully marked as processed ****" + "Binlog file:" +
// record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()
// + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms());

if(record.isLastRecordInBatch()) {
record.getCommitter().markBatchFinished();
log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" +

if(ConnectorType.MYSQL.getValue().equalsIgnoreCase(connectorType.getValue())) {
log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" +
record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()
+ " Sequence Number: " + record.getSequenceNumber() + " Debezium Timestamp: " + record.getDebezium_ts_ms());
} else if(ConnectorType.POSTGRES.getValue().equalsIgnoreCase(connectorType.getValue())) {
log.info("***** BATCH marked as processed to debezium ****" + "LSN: " + record.getLsn()
+ "Debezium Timestamp: " + record.getDebezium_ts_ms());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.altinity.clickhouse.sink.connector.common;

import com.altinity.clickhouse.sink.connector.common.SnowFlakeId;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.altinity.clickhouse.sink.connector.executor;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import org.junit.Assert;
import org.junit.Before;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

Expand All @@ -24,7 +25,7 @@ public class ClickHouseBatchRunnableTest {
LinkedBlockingQueue<List<ClickHouseStruct>> records = new LinkedBlockingQueue<>();
Map<String, String> topic2TableMap = new HashMap<>();

@Before
@BeforeEach
public void initTest() {


Expand Down Expand Up @@ -82,6 +83,30 @@ public Struct getKafkaStruct() {
return kafkaConnectStruct;
}

@Test
public void testGetConnectorType() {
    // connector.class points at the Debezium MySQL source connector.
    HashMap<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");

    ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap);
    ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);

    // assertEquals reports expected vs. actual on failure, unlike assertTrue on ==.
    Assert.assertEquals(ConnectorType.MYSQL, run.getConnectorType());
}

@Test
public void testGetConnectorTypePostgres() {
    // connector.class points at the Debezium PostgreSQL source connector.
    HashMap<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");

    ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap);
    ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);

    // assertEquals reports expected vs. actual on failure, unlike assertTrue on ==.
    Assert.assertEquals(ConnectorType.POSTGRES, run.getConnectorType());
}

@Test
public void testGetTableNameFromTopic() {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap<String, String>());
Expand Down
Loading