diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java index 2ca580a54..85adc7bcc 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java @@ -67,10 +67,13 @@ public void start(Map config) { this.id = "task-" + this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString()); this.records = new ConcurrentHashMap<>(); - ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, this.config, topic2TableMap); - this.executor = new ClickHouseBatchExecutor(this.config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString())); - this.executor.scheduleAtFixedRate(runnable, 0, this.config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); - + long runIntervalMs = this.config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()); + int threadPoolSize = this.config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); + this.executor = new ClickHouseBatchExecutor(threadPoolSize); + for (int i = 0; i < threadPoolSize; i++) { + ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, this.config, topic2TableMap); + this.executor.scheduleAtFixedRate(runnable, 0, runIntervalMs, TimeUnit.MILLISECONDS); + } this.deduplicator = new DeDuplicator(this.config); }