Thread improvements, process batch messages in thread pool #262

Closed · wants to merge 2 commits
Empty file added: doc/Alter_table_ddl.md
5 changes: 5 additions & 0 deletions sink-connector-lightweight/docker/config.yml
@@ -1,11 +1,15 @@
name: "altinity_sink_connector"
#### MySQL information
database.hostname: "mysql-master"
database.port: "3306"
database.user: "root"
database.password: "root"
database.server.name: "ER54"
### MySQL Database(to be replicated)
database.include.list: sbtest
### MySQL Table filter
#table.include.list=sbtest1
### ClickHouse information
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
@@ -42,3 +46,4 @@ schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exis

schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
enable.snapshot.ddl: "false"
thread.pool.size: 100
New file: ClickHouseBatchProcessingThread.java
@@ -0,0 +1,85 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
import com.altinity.clickhouse.sink.connector.model.BlockMetaData;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClickHouseBatchProcessingThread extends ClickHouseBatchRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(ClickHouseBatchProcessingThread.class);

private List<ClickHouseStruct> chStructs;


private Map<String, Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>> topicToRecordsMap = null;


public ClickHouseBatchProcessingThread(List<ClickHouseStruct> chStructs, ClickHouseSinkConnectorConfig config) {
super(null, config, null);
this.chStructs = chStructs;
topicToRecordsMap = new HashMap<>();
}
@Override
public void run() {

if (chStructs == null || chStructs.isEmpty()) {
return;
}
ClickHouseStruct topRecord = chStructs.get(0);
if (topRecord == null) {
return;
}

String topicName = topRecord.getTopic();

if (this.chStructs != null) {
//The user parameter will override the topic mapping to table.
String tableName = getTableFromTopic(topicName);
DbWriter writer = getDbWriterForTable(topicName, tableName, topRecord);

if (writer == null || !writer.wasTableMetaDataRetrieved()) {
log.error("*** TABLE METADATA not retrieved, retry next time");
return;
}
// Step 1: The Batch Insert with preparedStatement in JDBC
// works by forming the Query and then adding records to the Batch.
// This step creates a Map of Query -> Records(List of ClickHouseStruct)
Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap;

if (topicToRecordsMap.containsKey(topicName)) {
queryToRecordsMap = topicToRecordsMap.get(topicName);
} else {
queryToRecordsMap = new HashMap<>();
topicToRecordsMap.put(topicName, queryToRecordsMap);
}

try {
Map<TopicPartition, Long> partitionToOffsetMap = writer.groupQueryWithRecords(chStructs, queryToRecordsMap);
BlockMetaData bmd = new BlockMetaData();

try {
if (flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd)) {
// Remove the entry.
topicToRecordsMap.remove(topicName);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}

} catch (Exception e) {
log.error("Error processing records in Thread" + Thread.currentThread().getName(), e);
}
}
}
}
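The "Step 1" comment above refers to the standard JDBC batching pattern: group incoming records by the INSERT statement they map to, bind each record's values to a PreparedStatement, and send the whole group as one batch. Below is a minimal sketch of that pattern, assuming a ClickHouse JDBC driver on the classpath; the connection URL, table, and column names are placeholders, and the connector's own DbWriter/ClickHouseStruct types are deliberately not used.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

public class BatchInsertSketch {

    // Flush a map of INSERT query -> rows, one PreparedStatement batch per query.
    // Each row is an Object[] whose order matches the query's bind parameters.
    static void flush(Connection conn, Map<String, List<Object[]>> queryToRows) throws SQLException {
        for (Map.Entry<String, List<Object[]>> entry : queryToRows.entrySet()) {
            try (PreparedStatement ps = conn.prepareStatement(entry.getKey())) {
                for (Object[] row : entry.getValue()) {
                    for (int i = 0; i < row.length; i++) {
                        ps.setObject(i + 1, row[i]); // JDBC parameters are 1-based
                    }
                    ps.addBatch();                   // accumulate instead of executing per row
                }
                ps.executeBatch();                   // one round trip per query
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // Placeholder URL and schema; adjust to your environment.
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default")) {
            flush(conn, Map.of(
                    "INSERT INTO sbtest1 (id, k) VALUES (?, ?)",
                    List.of(new Object[]{1, 10}, new Object[]{2, 20})));
        }
    }
}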
File: ClickHouseDebeziumEmbeddedApplication.java
@@ -57,8 +57,9 @@ public static void main(String[] args) throws Exception {
System.exit(-1);
}
} else {

props = injector.getInstance(ConfigurationService.class).parse();
log.error("Error parsing configuration file, USAGE: java -jar <jar_file> <yaml_config_file>");
System.exit(-1);
}

ClickHouseDebeziumEmbeddedApplication csg = new ClickHouseDebeziumEmbeddedApplication();
File: DebeziumChangeEventCapture.java
@@ -1,5 +1,6 @@
package com.altinity.clickhouse.debezium.embedded.cdc;

import com.altinity.clickhouse.debezium.embedded.ClickHouseBatchProcessingThread;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
@@ -20,6 +21,7 @@
import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
@@ -29,13 +31,18 @@

import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* Setup Debezium engine with the configuration passed by the user
@@ -46,7 +53,12 @@ public class DebeziumChangeEventCapture {

private static final Logger log = LoggerFactory.getLogger(DebeziumChangeEventCapture.class);

private ClickHouseBatchExecutor executor;
// private ClickHouseBatchExecutor executor;

private ThreadPoolExecutor executor;

private final BlockingQueue<Runnable> sinkRecordsQueue = new LinkedBlockingQueue<>();


private ClickHouseBatchRunnable runnable;

@@ -119,16 +131,35 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
Metrics.updateDdlMetrics(DDL, currentTime, elapsedTime, ddlProcessingResult);
}

/**
* Function to process a batch of sink records: parse each change event
* and hand the resulting batch to the thread pool for flushing.
* @param props Debezium engine properties
* @param records batch of change events received from Debezium
* @param debeziumRecordParserService service that parses a SourceRecord into a ClickHouseStruct
* @param config sink connector configuration
*/
private void processBatch(Properties props, List<ChangeEvent<SourceRecord, SourceRecord>> records,
DebeziumRecordParserService debeziumRecordParserService,
ClickHouseSinkConnectorConfig config) {

List<ClickHouseStruct> chStructs = records.stream().map(record -> processEveryChangeRecord(props, record,
debeziumRecordParserService, config))
.collect(Collectors.toList());
this.executor.execute(new ClickHouseBatchProcessingThread(chStructs, config));


}

/**
* Function to process every change event record
* as received from Debezium
*
* @param record ChangeEvent Record
* @return the parsed ClickHouseStruct, or null if the record did not produce one
*/
private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord, SourceRecord> record,
private ClickHouseStruct processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord, SourceRecord> record,
DebeziumRecordParserService debeziumRecordParserService,
ClickHouseSinkConnectorConfig config) {
ClickHouseStruct chStruct = null;
try {

SourceRecord sr = record.value();
@@ -144,7 +175,7 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord

List<Field> schemaFields = struct.schema().fields();
if (schemaFields == null) {
return;
return null;
}
Field matchingDDLField = schemaFields.stream()
.filter(f -> "DDL".equalsIgnoreCase(f.name()))
@@ -164,21 +195,26 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord


performDDLOperation(DDL, props, sr, config);
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));

this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
int maxThreadPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString());
this.executor = new ThreadPoolExecutor(maxThreadPoolSize, maxThreadPoolSize, 30,
TimeUnit.SECONDS, sinkRecordsQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
// this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));
// for(int i = 0; i < config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); i++) {
// this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
// }
}

} else {
ClickHouseStruct chStruct = debeziumRecordParserService.parse(sr);
chStruct = debeziumRecordParserService.parse(sr);

ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue<ClickHouseStruct>();
if (chStruct != null) {
queue.add(chStruct);
}
synchronized (this.records) {
if (chStruct != null) {
addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
//addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
}
}
}
@@ -188,6 +224,8 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord
} catch (Exception e) {
log.error("Exception processing record", e);
}

return chStruct;
}

private boolean isSnapshotDDL(SourceRecord sr) {
@@ -279,10 +317,23 @@ public void setupDebeziumEventCapture(Properties props, DebeziumRecordParserServ
try {
DebeziumEngine.Builder<ChangeEvent<SourceRecord, SourceRecord>> changeEventBuilder = DebeziumEngine.create(Connect.class);
changeEventBuilder.using(props);
changeEventBuilder.notifying(record -> {
processEveryChangeRecord(props, record, debeziumRecordParserService, config);

changeEventBuilder.notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
@Override
public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {

//for(ChangeEvent<SourceRecord, SourceRecord> record : records) {
processBatch(props, records, debeziumRecordParserService, config);
//}
for (ChangeEvent<SourceRecord, SourceRecord> record : records) {
committer.markProcessed(record);
}
}
});
// changeEventBuilder.notifying(record -> {
// processEveryChangeRecord(props, record, debeziumRecordParserService, config);
//
// });
this.engine = changeEventBuilder
.using(new DebeziumConnectorCallback()).using(new DebeziumEngine.CompletionCallback() {
@Override
@@ -364,11 +415,18 @@ private DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config)
private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLParserService ddlParserService) {
// Setup separate thread to read messages from shared buffer.
this.records = new ConcurrentHashMap<>();
this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));
this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
// this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
// this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));
// this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);

int maxThreadPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString());
log.info("Setting up thread pool with size:" + maxThreadPoolSize);
this.executor = new ThreadPoolExecutor(100, 100, 30,
TimeUnit.SECONDS, sinkRecordsQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
}


/**
* Function to write the transformed
* records to shared queue.
@@ -377,17 +435,19 @@ private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLPars
* @param chs
*/
private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) {
ConcurrentLinkedQueue<ClickHouseStruct> structs;

if (this.records.containsKey(topicName)) {
structs = this.records.get(topicName);
} else {
structs = new ConcurrentLinkedQueue<>();
}
structs.add(chs);
synchronized (this.records) {
this.records.put(topicName, structs);
}

// ConcurrentLinkedQueue<ClickHouseStruct> structs;

// if (this.records.containsKey(topicName)) {
// structs = this.records.get(topicName);
// } else {
// structs = new ConcurrentLinkedQueue<>();
// }
// structs.add(chs);
// synchronized (this.records) {
// this.records.put(topicName, structs);
// }
}

// db.items.insert({_id:ObjectId(), uuid:ObjectId(), price:22, name:"New record"});
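processBatch above submits each Debezium batch to the ThreadPoolExecutor built in setupProcessingThread, which is backed by a LinkedBlockingQueue and a CallerRunsPolicy; when the executor rejects a task, that policy makes the submitting thread run it, which throttles the producer. A self-contained sketch of the same submission pattern follows; the pool size, the bounded queue (used here so the rejection path can actually trigger), and the simulated flush are illustrative assumptions, not the connector's values.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BatchPoolSketch {

    public static void main(String[] args) throws InterruptedException {
        // Illustrative sizes: 4 workers, queue bounded at 8 batches so that
        // CallerRunsPolicy can kick in and slow the submitting thread down.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int batch = 0; batch < 100; batch++) {
            List<Integer> records = List.of(batch, batch + 1, batch + 2); // stand-in for a parsed batch
            executor.execute(() -> {
                // Stand-in for grouping the batch into queries and flushing to ClickHouse.
                long checksum = records.stream().mapToLong(Integer::longValue).sum();
                System.out.println(Thread.currentThread().getName() + " flushed batch, checksum=" + checksum);
            });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}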
File: DbWriter.java
@@ -27,6 +27,7 @@
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -253,8 +254,8 @@ private void updatePartitionOffsetMap(Map<TopicPartition, Long> offsetToPartitio
* @param records collection of parsed records to group by insert query
* @return map of Kafka partition to the offset recorded for that partition
*/
public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<ClickHouseStruct> records,
Map<MutablePair<String, Map<String, Integer>>,
public Map<TopicPartition, Long> groupQueryWithRecords(Collection<ClickHouseStruct> records,
Map<MutablePair<String, Map<String, Integer>>,
List<ClickHouseStruct>> queryToRecordsMap) {


@@ -274,11 +275,11 @@ public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<Cli
while (iterator.hasNext()) {
ClickHouseStruct record = (ClickHouseStruct) iterator.next();

if(record == null) {
continue;
}
updatePartitionOffsetMap(partitionToOffsetMap, record.getKafkaPartition(), record.getTopic(), record.getKafkaOffset());

// Identify the min and max offsets of the bulk
// that's inserted.
int recordPartition = record.getKafkaPartition();

boolean enableSchemaEvolution = this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_SCHEMA_EVOLUTION.toString());

File: ClickHouseBatchRunnable.java
@@ -190,7 +190,7 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<Click
* @param queryToRecordsMap map of insert query to the records accumulated for it
* @return true if the records were flushed to ClickHouse
*/
private boolean flushRecordsToClickHouse(String topicName, DbWriter writer, Map<MutablePair<String, Map<String, Integer>>,
protected boolean flushRecordsToClickHouse(String topicName, DbWriter writer, Map<MutablePair<String, Map<String, Integer>>,
List<ClickHouseStruct>> queryToRecordsMap, BlockMetaData bmd) throws SQLException {

boolean result = false;
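flushRecordsToClickHouse changes from private to protected so that the new ClickHouseBatchProcessingThread, which extends ClickHouseBatchRunnable from a different package, can call it from its run() method. A minimal sketch of that inheritance pattern, with placeholder class names:

class Base {
    // private would not be visible to subclasses, especially across packages.
    protected boolean flush(String topic) {
        System.out.println("flushing " + topic);
        return true;
    }
}

class Worker extends Base implements Runnable {
    @Override
    public void run() {
        flush("sbtest.sbtest1"); // inherited protected method is callable here
    }
}

public class VisibilitySketch {
    public static void main(String[] args) {
        new Worker().run();
    }
}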