Commit

Merge branch 'ClickHouse:main' into nix
hekike authored Sep 15, 2023
2 parents c698dd4 + a93eceb commit a7d9c53
Showing 13 changed files with 160 additions and 32 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
@@ -1,7 +1,11 @@
## 0.0.19 2023-08-23
## 1.0.1 2023-09-11
* Added support for `tableRefreshInterval` to re-fetch table changes from ClickHouse

## 1.0.0 2023-08-23
* Additional tests for ExactlyOnce
* Allows customized settings (WIP)
* Allows customized ClickHouse settings using the `clickhouse.settings` property
* Tweaked deduplication behavior to account for dynamic fields
* Added support for `errors.tolerance` and the DLQ

## 0.0.18 2023-07-17
* Support inline schema with org.apache.kafka.connect.data.Timestamp type
1 change: 0 additions & 1 deletion README.md
@@ -1,5 +1,4 @@
# ClickHouse Kafka Connect Sink
**The connector is available in beta stage for early adopters. If you notice a problem, please [file an issue.](https://github.com/ClickHouse/clickhouse-kafka-connect/issues/new)**

## About
clickhouse-kafka-connect is the official Kafka Connect sink connector for [ClickHouse](https://clickhouse.com/).
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v0.0.19
v1.0.1
src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
@@ -22,6 +22,9 @@ public class ClickHouseSinkConfig {
public static final String RETRY_COUNT = "retryCount";
public static final String EXACTLY_ONCE = "exactlyOnce";
public static final String SUPPRESS_TABLE_EXISTENCE_EXCEPTION = "suppressTableExistenceException";
public static final String CLICKHOUSE_SETTINGS = "clickhouseSettings";
public static final String ERRORS_TOLERANCE = "errors.tolerance";
public static final String TABLE_REFRESH_INTERVAL = "tableRefreshInterval";



@@ -35,6 +38,7 @@ public class ClickHouseSinkConfig {
public static final Boolean sslDefault = Boolean.TRUE;
public static final Integer timeoutSecondsDefault = 30;
public static final Integer retryCountDefault = 3;
public static final Integer tableRefreshIntervalDefault = 0;
public static final Boolean exactlyOnceDefault = Boolean.FALSE;
public enum StateStores {
NONE,
@@ -52,7 +56,9 @@ public enum StateStores {
private final boolean exactlyOnce;
private final int timeout;
private final int retry;
private final long tableRefreshInterval;
private final boolean suppressTableExistenceException;
private final boolean errorsTolerance;

private final Map<String, String> clickhouseSettings;

@@ -82,9 +88,13 @@ public ClickHouseSinkConfig(Map<String, String> props) {
sslEnabled = Boolean.parseBoolean(props.getOrDefault(SSL_ENABLED,"false"));
timeout = Integer.parseInt(props.getOrDefault(TIMEOUT_SECONDS, timeoutSecondsDefault.toString())) * MILLI_IN_A_SEC; // convert seconds to milliseconds
retry = Integer.parseInt(props.getOrDefault(RETRY_COUNT, retryCountDefault.toString()));
tableRefreshInterval = Long.parseLong(props.getOrDefault(TABLE_REFRESH_INTERVAL, tableRefreshIntervalDefault.toString())) * MILLI_IN_A_SEC; // convert seconds to milliseconds
exactlyOnce = Boolean.parseBoolean(props.getOrDefault(EXACTLY_ONCE,"false"));
suppressTableExistenceException = Boolean.parseBoolean(props.getOrDefault("suppressTableExistenceException","false"));

String errorsToleranceString = props.getOrDefault("errors.tolerance", "none").trim();
errorsTolerance = errorsToleranceString.equalsIgnoreCase("all");

Map<String, String> clickhouseSettings = new HashMap<>();
String clickhouseSettingsString = props.getOrDefault("clickhouseSettings", "").trim();

@@ -201,6 +211,16 @@ private static ConfigDef createConfigDef() {
++orderInGroup,
ConfigDef.Width.SHORT,
"ClickHouse driver retry");
configDef.define(TABLE_REFRESH_INTERVAL,
ConfigDef.Type.LONG,
tableRefreshIntervalDefault,
ConfigDef.Range.between(0, 60 * 10),
ConfigDef.Importance.LOW,
"table refresh interval in sec, default: 0",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"table refresh interval");
configDef.define(EXACTLY_ONCE,
ConfigDef.Type.BOOLEAN,
exactlyOnceDefault,
@@ -219,6 +239,24 @@ private static ConfigDef createConfigDef() {
++orderInGroup,
ConfigDef.Width.SHORT,
"suppress table existence exception.");
configDef.define(CLICKHOUSE_SETTINGS,
ConfigDef.Type.LIST,
"",
ConfigDef.Importance.LOW,
"A comma-separated list of ClickHouse settings to be appended to the JDBC URL",
group,
++orderInGroup,
ConfigDef.Width.LONG,
"ClickHouse settings.");
configDef.define(ERRORS_TOLERANCE,
ConfigDef.Type.STRING,
"none",
ConfigDef.Importance.LOW,
"Should we tolerate exceptions? default: none",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Tolerate errors.");

return configDef;
}
@@ -250,10 +288,14 @@ public int getTimeout() {
return timeout;
}
public int getRetry() { return retry; }
public long getTableRefreshInterval() {
return tableRefreshInterval;
}
public boolean getExactlyOnce() { return exactlyOnce; }
public boolean getSuppressTableExistenceException() {
return suppressTableExistenceException;
}
public Map<String, String> getClickhouseSettings() {return clickhouseSettings;}
public boolean getErrorsTolerance() { return errorsTolerance; }

}
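Taken together, the new knobs can be exercised straight against the config class. Below is a minimal sketch, assuming the connection-related properties not shown in this hunk also fall back to defaults, and assuming `clickhouseSettings` takes `key=value` pairs (the actual parsing is elided above):

```java
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

import java.util.HashMap;
import java.util.Map;

public class SinkConfigExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Re-fetch table metadata from ClickHouse every 60 seconds (0 disables it).
        props.put("tableRefreshInterval", "60");
        // Send failed records to the dead letter queue instead of failing the task.
        props.put("errors.tolerance", "all");
        // Extra ClickHouse settings; the key=value format is an assumption here.
        props.put("clickhouseSettings", "insert_quorum=2");

        ClickHouseSinkConfig config = new ClickHouseSinkConfig(props);
        System.out.println(config.getTableRefreshInterval()); // 60000
        System.out.println(config.getErrorsTolerance());      // true
        System.out.println(config.getClickhouseSettings());
    }
}
```

Note that `getTableRefreshInterval()` returns milliseconds: the constructor multiplies the configured seconds by `MILLI_IN_A_SEC`, mirroring the existing `timeout` handling.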
21 changes: 16 additions & 5 deletions src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
@@ -3,14 +3,15 @@
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.db.ClickHouseWriter;
import com.clickhouse.kafka.connect.sink.db.DBWriter;
import com.clickhouse.kafka.connect.sink.db.TableMappingRefresher;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.sink.processing.Processing;
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider;
import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import com.clickhouse.kafka.connect.util.jmx.Timer;
import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Timer;

public class ProxySinkTask {

@@ -34,13 +36,22 @@ public class ProxySinkTask {
private int id = NEXT_ID.getAndAdd(1);

public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final ErrorReporter errorReporter) {
LOGGER.info(String.format("enable ExactlyOnce %s", Boolean.toString(clickHouseSinkConfig.getExactlyOnce())));
LOGGER.info("Enable ExactlyOnce? {}", clickHouseSinkConfig.getExactlyOnce());
if ( clickHouseSinkConfig.getExactlyOnce() ) {
this.stateProvider = new KeeperStateProvider(clickHouseSinkConfig);
} else {
this.stateProvider = new InMemoryState();
}
this.dbWriter = new ClickHouseWriter();

ClickHouseWriter chWriter = new ClickHouseWriter();
this.dbWriter = chWriter;

// Add table mapping refresher
if (clickHouseSinkConfig.getTableRefreshInterval() > 0) {
TableMappingRefresher tableMappingRefresher = new TableMappingRefresher(chWriter);
Timer tableRefreshTimer = new Timer();
tableRefreshTimer.schedule(tableMappingRefresher, clickHouseSinkConfig.getTableRefreshInterval(), clickHouseSinkConfig.getTableRefreshInterval());
}

// Add dead letter queue
boolean isStarted = dbWriter.start(clickHouseSinkConfig);
@@ -65,10 +76,10 @@ public void put(final Collection<SinkRecord> records) {
return;
}
// Group by topic & partition
Timer taskTime = Timer.start();
ExecutionTimer taskTime = ExecutionTimer.start();
statistics.receivedRecords(records.size());
LOGGER.trace(String.format("Got %d records from put API.", records.size()));
Timer processingTime = Timer.start();
ExecutionTimer processingTime = ExecutionTimer.start();
Map<String, List<Record>> dataRecords = records.stream()
.map(v -> Record.convert(v))
.collect(Collectors.groupingBy(Record::getTopicAndPartition));
src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -13,6 +13,8 @@
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.clickhouse.kafka.connect.sink.dlq.DuplicateException;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.Mask;

import com.clickhouse.kafka.connect.util.Utils;
@@ -37,7 +39,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class ClickHouseWriter implements DBWriter{
public class ClickHouseWriter implements DBWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseWriter.class);

@@ -49,7 +51,7 @@ public class ClickHouseWriter implements DBWriter{
private boolean isBinary = false;

public ClickHouseWriter() {
this.mapping = new HashMap<>();
this.mapping = new HashMap<String, Table>();
}

@Override
@@ -72,17 +74,34 @@ public boolean start(ClickHouseSinkConfig csc) {

LOGGER.debug("Ping was successful.");

List<Table> tableList = chc.extractTablesMapping();
if (tableList.isEmpty()) {
this.updateMapping();
if (mapping.isEmpty()) {
LOGGER.error("Did not find any tables in destination Please create before running.");
return false;
}

for (Table table: tableList) {//TODO: Should we pull ALL tables in memory?
this.mapping.put(table.getName(), table);
}
return true;
}
public void updateMapping() {
LOGGER.debug("Update table mapping.");

// Getting tables from ClickHouse
List<Table> tableList = this.chc.extractTablesMapping();
if (tableList.isEmpty()) {
return;
}

HashMap<String, Table> mapping = new HashMap<String, Table>();

// Adding new tables to mapping
// TODO: check Kafka Connect's topics or topics.regex config and
// only add tables to the in-memory mapping that match the topics we consume.
for (Table table: tableList) {
mapping.put(table.getName(), table);
}

this.mapping = mapping;
}

@Override
public void stop() {
@@ -119,8 +138,11 @@ public ClickHouseNode getServer() {
return chc.getServer();
}

@Override
public void doInsert(List<Record> records) {
doInsert(records, null);
}
@Override
public void doInsert(List<Record> records, ErrorReporter errorReporter) {
if ( records.isEmpty() )
return;

@@ -148,7 +170,12 @@ public void doInsert(List<Record> records) {
}
} catch (Exception e) {
LOGGER.trace("Passing the exception to the exception handler.");
Utils.handleException(e);
Utils.handleException(e, csc.getErrorsTolerance());
if (csc.getErrorsTolerance() && errorReporter != null) {
records.forEach( r ->
Utils.sendTODlq(errorReporter, r, e)
);
}
}
}

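`Utils.handleException` and `Utils.sendTODlq` themselves are outside this diff. The DLQ helper's behavior is recoverable, though, from the private `sendTODlq` that this commit deletes from `Processing` (further down): it forwards the original `SinkRecord` plus the exception to Kafka Connect's `ErrorReporter`. A hypothetical reconstruction:

```java
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;

public final class Utils {
    // Hypothetical reconstruction of Utils.sendTODlq, mirroring the private
    // helper this commit removes from Processing (shown below).
    public static void sendTODlq(ErrorReporter errorReporter, Record record, Exception exception) {
        // Report only when a DLQ reporter is configured and the wrapper still
        // carries the original Kafka Connect SinkRecord.
        if (errorReporter != null && record.getSinkRecord() != null) {
            errorReporter.report(record.getSinkRecord(), exception);
        }
    }
}
```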
src/main/java/com/clickhouse/kafka/connect/sink/db/DBWriter.java
@@ -2,6 +2,7 @@

import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;

import java.util.List;
import java.util.Map;
@@ -11,5 +12,6 @@ public interface DBWriter {
public boolean start(ClickHouseSinkConfig csc);
public void stop();
public void doInsert(List<Record> records);
public void doInsert(List<Record> records, ErrorReporter errorReporter);
public long recordsInserted();
}
src/main/java/com/clickhouse/kafka/connect/sink/db/InMemoryDBWriter.java
@@ -2,6 +2,7 @@

import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;

import java.util.ArrayList;
import java.util.HashMap;
@@ -33,6 +34,11 @@ public void doInsert(List<Record> records) {
records.stream().forEach( r -> this.recordMap.put(r.getRecordOffsetContainer().getOffset(), r) );
}

@Override
public void doInsert(List<Record> records, ErrorReporter errorReporter) {
doInsert(records);
}

@Override
public long recordsInserted() {
return this.recordMap.size();
src/main/java/com/clickhouse/kafka/connect/sink/db/TableMappingRefresher.java (new file)
@@ -0,0 +1,24 @@
package com.clickhouse.kafka.connect.sink.db;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.TimerTask;

public class TableMappingRefresher extends TimerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(TableMappingRefresher.class);
private ClickHouseWriter chWriter = null;

public TableMappingRefresher(final ClickHouseWriter chWriter) {
this.chWriter = chWriter;
}

@Override
public void run() {
try {
chWriter.updateMapping();
} catch (Exception e) {
LOGGER.error("Update mapping Error: {}", e.getMessage());
}

}
}
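`TableMappingRefresher` is a plain `java.util.TimerTask`, so the scheduling in `ProxySinkTask` follows the standard semantics: `Timer.schedule(task, delay, period)` runs the task on a single background thread, and any exception escaping `run()` cancels the timer for good, which is why `run()` catches and logs everything. A standalone sketch of the pattern (names are illustrative):

```java
import java.util.Timer;
import java.util.TimerTask;

public class RefreshTimerSketch {
    public static void main(String[] args) {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    System.out.println("refreshing table mapping...");
                } catch (Exception e) {
                    // Swallow, or the Timer thread dies and all future runs stop.
                    System.err.println("refresh failed: " + e.getMessage());
                }
            }
        };
        // First run after 10s, then every 10s, matching how ProxySinkTask passes
        // tableRefreshInterval (already in milliseconds) as both delay and period.
        new Timer().schedule(task, 10_000L, 10_000L);
    }
}
```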
src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java
@@ -9,6 +9,7 @@
import com.clickhouse.kafka.connect.sink.state.State;
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.StateRecord;
import com.clickhouse.kafka.connect.util.Utils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
@@ -42,10 +43,12 @@ public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter
* @param records
*/
private void doInsert(List<Record> records) {
dbWriter.doInsert(records);
dbWriter.doInsert(records, errorReporter);
}




private RangeContainer extractRange(List<Record> records, String topic, int partition) {
RangeContainer rangeContainer = new RangeContainer(topic, partition);
records.stream().forEach(record -> rangeContainer.defineInRange(record.getRecordOffsetContainer().getOffset()));
@@ -72,11 +75,7 @@ private List<List<Record>> splitRecordsByOffset(List<Record> records, long offset
.values()
);
}
private void sendTODlq(Record record, Exception exception) {
if (errorReporter != null && record.getSinkRecord() != null) {
errorReporter.report(record.getSinkRecord(), exception);
}
}


public void doLogic(List<Record> records) {
List<Record> trimmedRecords;
@@ -124,7 +123,7 @@ public void doLogic(List<Record> records) {
LOGGER.warn(String.format("Records seemingly missing compared to prior batch for topic [%s] partition [%s].", topic, partition));
// Do nothing - write to dead letter queue
records.forEach( r ->
sendTODlq(record, new DuplicateException(String.format(record.getTopicAndPartition())))
Utils.sendTODlq(errorReporter, r, new DuplicateException(String.format(record.getTopicAndPartition())))
);
break;
case OVER_LAPPING: