From 6fe58db097a93d48e64d728e341ada82ac5160ca Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 6 Jun 2023 08:13:15 -0400 Subject: [PATCH 1/2] Thread improvements, process batch messages in thread pool --- doc/Alter_table_ddl.md | 0 sink-connector-lightweight/docker/config.yml | 4 + .../ClickHouseBatchProcessingThread.java | 20 ++++ ...ClickHouseDebeziumEmbeddedApplication.java | 3 +- .../cdc/DebeziumChangeEventCapture.java | 107 ++++++++++++++---- 5 files changed, 109 insertions(+), 25 deletions(-) create mode 100644 doc/Alter_table_ddl.md create mode 100644 sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java diff --git a/doc/Alter_table_ddl.md b/doc/Alter_table_ddl.md new file mode 100644 index 000000000..e69de29bb diff --git a/sink-connector-lightweight/docker/config.yml b/sink-connector-lightweight/docker/config.yml index efbd2c358..7d7fa6fd0 100644 --- a/sink-connector-lightweight/docker/config.yml +++ b/sink-connector-lightweight/docker/config.yml @@ -1,11 +1,15 @@ name: "altinity_sink_connector" +#### MySQL information database.hostname: "mysql-master" database.port: "3306" database.user: "root" database.password: "root" database.server.name: "ER54" +### MySQL Database(to be replicated) database.include.list: sbtest +### MySQL Table filter #table.include.list=sbtest1 +### ClickHouse information clickhouse.server.url: "clickhouse" clickhouse.server.user: "root" clickhouse.server.pass: "root" diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java new file mode 100644 index 000000000..82b9689dd --- /dev/null +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java @@ -0,0 +1,20 @@ +package com.altinity.clickhouse.debezium.embedded; + +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable; +import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; + +import java.util.List; + +public class ClickHouseBatchProcessingThread extends ClickHouseBatchRunnable implements Runnable { + + + public ClickHouseBatchProcessingThread(List chStructs, ClickHouseSinkConnectorConfig config) { + super(null, config, null); + } + + @Override + public void run() { + + } +} diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java index f7ecb15ca..d027e735e 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java @@ -57,8 +57,9 @@ public static void main(String[] args) throws Exception { System.exit(-1); } } else { - props = injector.getInstance(ConfigurationService.class).parse(); + log.error("Error parsing configuration file, USAGE: java -jar "); + System.exit(-1); } ClickHouseDebeziumEmbeddedApplication csg = new ClickHouseDebeziumEmbeddedApplication(); diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 7ab33a191..80f42c25f 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -1,5 +1,6 @@ package com.altinity.clickhouse.debezium.embedded.cdc; +import com.altinity.clickhouse.debezium.embedded.ClickHouseBatchProcessingThread; import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; @@ -20,6 +21,7 @@ import io.debezium.embedded.Connect; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; @@ -29,13 +31,18 @@ import java.io.IOException; import java.sql.SQLException; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Properties; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * Setup Debezium engine with the configuration passed by the user @@ -46,7 +53,12 @@ public class DebeziumChangeEventCapture { private static final Logger log = LoggerFactory.getLogger(DebeziumChangeEventCapture.class); - private ClickHouseBatchExecutor executor; +// private ClickHouseBatchExecutor executor; + + private ThreadPoolExecutor executor; + + private final BlockingQueue sinkRecordsQueue = new LinkedBlockingQueue<>(); + private ClickHouseBatchRunnable runnable; @@ -119,6 +131,24 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr, Metrics.updateDdlMetrics(DDL, currentTime, elapsedTime, ddlProcessingResult); } + /** + * Function to process batch of sink records. + * @param props + * @param records + * @param debeziumRecordParserService + * @param config + */ + private void processBatch(Properties props, List> records, + DebeziumRecordParserService debeziumRecordParserService, + ClickHouseSinkConnectorConfig config) { + + List chStructs = records.stream().map(record -> processEveryChangeRecord(props, record, + debeziumRecordParserService, config)) + .collect(Collectors.toList()); + this.executor.execute(new ClickHouseBatchProcessingThread(chStructs)); + + + } /** * Function to process every change event record @@ -126,9 +156,10 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr, * * @param record ChangeEvent Record */ - private void processEveryChangeRecord(Properties props, ChangeEvent record, + private ClickHouseStruct processEveryChangeRecord(Properties props, ChangeEvent record, DebeziumRecordParserService debeziumRecordParserService, ClickHouseSinkConnectorConfig config) { + ClickHouseStruct chStruct = null; try { SourceRecord sr = record.value(); @@ -144,7 +175,7 @@ private void processEveryChangeRecord(Properties props, ChangeEvent schemaFields = struct.schema().fields(); if (schemaFields == null) { - return; + return null; } Field matchingDDLField = schemaFields.stream() .filter(f -> "DDL".equalsIgnoreCase(f.name())) @@ -164,13 +195,18 @@ private void processEveryChangeRecord(Properties props, ChangeEvent queue = new ConcurrentLinkedQueue(); if (chStruct != null) { @@ -178,7 +214,7 @@ private void processEveryChangeRecord(Properties props, ChangeEvent> changeEventBuilder = DebeziumEngine.create(Connect.class); changeEventBuilder.using(props); - changeEventBuilder.notifying(record -> { - processEveryChangeRecord(props, record, debeziumRecordParserService, config); - + changeEventBuilder.notifying(new DebeziumEngine.ChangeConsumer>() { + @Override + public void handleBatch(List> records, + DebeziumEngine.RecordCommitter> committer) throws InterruptedException { + + //for(ChangeEvent record : records) { + processBatch(props, records, debeziumRecordParserService, config); + //} + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + } }); +// changeEventBuilder.notifying(record -> { +// processEveryChangeRecord(props, record, debeziumRecordParserService, config); +// +// }); this.engine = changeEventBuilder .using(new DebeziumConnectorCallback()).using(new DebeziumEngine.CompletionCallback() { @Override @@ -364,11 +415,17 @@ private DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLParserService ddlParserService) { // Setup separate thread to read messages from shared buffer. this.records = new ConcurrentHashMap<>(); - this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap()); - this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString())); - this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); +// this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap()); +// this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString())); +// this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); + + int maxThreadPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); + this.executor = new ThreadPoolExecutor(maxThreadPoolSize, maxThreadPoolSize, 30, + TimeUnit.SECONDS, sinkRecordsQueue, + new ThreadPoolExecutor.CallerRunsPolicy()); } + /** * Function to write the transformed * records to shared queue. @@ -377,17 +434,19 @@ private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLPars * @param chs */ private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) { - ConcurrentLinkedQueue structs; - if (this.records.containsKey(topicName)) { - structs = this.records.get(topicName); - } else { - structs = new ConcurrentLinkedQueue<>(); - } - structs.add(chs); - synchronized (this.records) { - this.records.put(topicName, structs); - } + + // ConcurrentLinkedQueue structs; + +// if (this.records.containsKey(topicName)) { +// structs = this.records.get(topicName); +// } else { +// structs = new ConcurrentLinkedQueue<>(); +// } +// structs.add(chs); +// synchronized (this.records) { +// this.records.put(topicName, structs); +// } } // db.items.insert({_id:ObjectId(), uuid:ObjectId(), price:22, name:"New record"}); From bf6b9095fdc567e654aa2fa01590e9f528c7468b Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 9 Jun 2023 11:13:19 -0400 Subject: [PATCH 2/2] Changes to process batch of records received from debezium --- sink-connector-lightweight/docker/config.yml | 1 + .../ClickHouseBatchProcessingThread.java | 67 ++++++++++++++++++- .../cdc/DebeziumChangeEventCapture.java | 7 +- .../sink/connector/db/DbWriter.java | 11 +-- .../executor/ClickHouseBatchRunnable.java | 2 +- 5 files changed, 78 insertions(+), 10 deletions(-) diff --git a/sink-connector-lightweight/docker/config.yml b/sink-connector-lightweight/docker/config.yml index 7d7fa6fd0..329a891c6 100644 --- a/sink-connector-lightweight/docker/config.yml +++ b/sink-connector-lightweight/docker/config.yml @@ -46,3 +46,4 @@ schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exis schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history" enable.snapshot.ddl: "false" +thread.pool.size: 100 \ No newline at end of file diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java index 82b9689dd..331f93e0b 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseBatchProcessingThread.java @@ -1,20 +1,85 @@ package com.altinity.clickhouse.debezium.embedded; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.DbWriter; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable; +import com.altinity.clickhouse.sink.connector.model.BlockMetaData; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class ClickHouseBatchProcessingThread extends ClickHouseBatchRunnable implements Runnable { + private static final Logger log = LoggerFactory.getLogger(ClickHouseBatchRunnable.class); + + private List chStructs; + + + private Map>, List>> topicToRecordsMap = null; public ClickHouseBatchProcessingThread(List chStructs, ClickHouseSinkConnectorConfig config) { super(null, config, null); + this.chStructs = chStructs; + topicToRecordsMap = new HashMap<>(); } - @Override public void run() { + if (chStructs == null || chStructs.isEmpty()) { + return; + } + ClickHouseStruct topRecord = chStructs.get(0); + if (topRecord == null) { + return; + } + + String topicName = topRecord.getTopic(); + + if (this.chStructs != null) { + //The user parameter will override the topic mapping to table. + String tableName = getTableFromTopic(topicName); + DbWriter writer = getDbWriterForTable(topicName, tableName, topRecord); + + if (writer == null || writer.wasTableMetaDataRetrieved() == false) { + log.error("*** TABLE METADATA not retrieved, retry next time"); + return; + } + // Step 1: The Batch Insert with preparedStatement in JDBC + // works by forming the Query and then adding records to the Batch. + // This step creates a Map of Query -> Records(List of ClickHouseStruct) + Map>, List> queryToRecordsMap; + + if (topicToRecordsMap.containsKey(topicName)) { + queryToRecordsMap = topicToRecordsMap.get(topicName); + } else { + queryToRecordsMap = new HashMap<>(); + topicToRecordsMap.put(topicName, queryToRecordsMap); + } + + try { + Map partitionToOffsetMap = writer.groupQueryWithRecords(chStructs, queryToRecordsMap); + BlockMetaData bmd = new BlockMetaData(); + + try { + if (flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd)) { + // Remove the entry. + queryToRecordsMap.remove(topicName); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + } catch (Exception e) { + log.error("Error processing records in Thread" + Thread.currentThread().getName(), e); + } + } } } diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 80f42c25f..4426a0641 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -145,7 +145,7 @@ private void processBatch(Properties props, List chStructs = records.stream().map(record -> processEveryChangeRecord(props, record, debeziumRecordParserService, config)) .collect(Collectors.toList()); - this.executor.execute(new ClickHouseBatchProcessingThread(chStructs)); + this.executor.execute(new ClickHouseBatchProcessingThread(chStructs, config)); } @@ -196,7 +196,7 @@ private ClickHouseStruct processEveryChangeRecord(Properties props, ChangeEvent< performDDLOperation(DDL, props, sr, config); int maxThreadPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); - this.executor = new ThreadPoolExecutor(maxThreadPoolSize, maxThreadPoolSize, 30, + this.executor = new ThreadPoolExecutor(100, 100, 30, TimeUnit.SECONDS, sinkRecordsQueue, new ThreadPoolExecutor.CallerRunsPolicy()); // this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString())); @@ -420,7 +420,8 @@ private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLPars // this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); int maxThreadPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); - this.executor = new ThreadPoolExecutor(maxThreadPoolSize, maxThreadPoolSize, 30, + log.info("Setting up thread pool with size:" + maxThreadPoolSize); + this.executor = new ThreadPoolExecutor(100, 100, 30, TimeUnit.SECONDS, sinkRecordsQueue, new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index e17ec2811..72e85882a 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -27,6 +27,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -253,8 +254,8 @@ private void updatePartitionOffsetMap(Map offsetToPartitio * @param records * @return */ - public Map groupQueryWithRecords(ConcurrentLinkedQueue records, - Map>, + public Map groupQueryWithRecords(Collection records, + Map>, List> queryToRecordsMap) { @@ -274,11 +275,11 @@ public Map groupQueryWithRecords(ConcurrentLinkedQueue>, + protected boolean flushRecordsToClickHouse(String topicName, DbWriter writer, Map>, List> queryToRecordsMap, BlockMetaData bmd) throws SQLException { boolean result = false;