From 5b78157b1d419ca60bbf3a61f0d5c68f9a4b79d4 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 2 Feb 2025 19:08:52 -0500 Subject: [PATCH 1/3] Added integration test to cover MySQL demo. --- .../debezium/embedded/ddl/parser/AlterTableAddColumnIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java index 66d77a177..a1f5ed89d 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java @@ -1,10 +1,8 @@ package com.altinity.clickhouse.debezium.embedded.ddl.parser; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; -import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; import com.clickhouse.jdbc.ClickHouseConnection; import org.apache.log4j.BasicConfigurator; From 9e8ca0bbabcdc0386263b29d4fcb654d6f3b9b24 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 2 Feb 2025 19:12:25 -0500 Subject: [PATCH 2/3] Added integration test to cover MySQL demo. --- .../embedded/ddl/parser/MySQLDemoIT.java | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java new file mode 100644 index 000000000..448bd7ef1 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java @@ -0,0 +1,124 @@ +package com.altinity.clickhouse.debezium.embedded.ddl.parser; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.jdbc.ClickHouseConnection; + +public class MySQLDemoIT extends DDLBaseIT { + + + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") + .withInitScript("employees.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(15000); + } + + @Test + public void testAddColumn() throws Exception { + + AtomicReference engine = new AtomicReference<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + Properties properties = getDebeziumProperties(); + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(properties, new SourceRecordParserService(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(10000);// + + Connection conn = connectToMySQL(); + + conn.prepareStatement("update employees set jobTitle = concat('Senior ', jobTitle);").execute(); + Thread.sleep(10000); + + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + + ResultSet rs = conn.prepareStatement("select jobTitle from employees;").executeQuery(); + while(rs.next()) { + System.out.println(rs.getString(1)); + } + conn.prepareStatement("select * from employees;").execute(); + + + + // deletion example, 3 tables + + conn.prepareStatement("select count(*) from payments;").execute(); + conn.prepareStatement("select count(*) from customers;").execute(); + conn.prepareStatement("select count(*) from employees;").execute(); + + + conn.prepareStatement("delete from payments where customerNumber in (select customerNumber from customers where salesRepEmployeeNumber=1621);").execute(); + conn.prepareStatement("delete from customers where salesRepEmployeeNumber=1621;").execute(); + conn.prepareStatement("delete from employees where lastName='Kato';").execute(); + + conn.prepareStatement("select count(*) from payments;").execute(); + conn.prepareStatement("select count(*) from customers;").execute(); + conn.prepareStatement("select count(*) from employees;").execute(); + + // DDL + conn.prepareStatement("alter table employees add column hireDate Date not null default (curdate());").execute(); + conn.prepareStatement("alter table employees drop column hireDate;").execute(); + + // inserts + conn.prepareStatement("insert into employees(employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle) select 2000,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle from employees where lastName='Gerard';").execute(); + + + // alter table employees drop CONSTRAINT `employees_ibfk_2`; + conn.prepareStatement("alter table employees modify column officeCode int not null;").execute(); + conn.prepareStatement("show create table employees;").execute(); + + Thread.sleep(25000); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + + + } + +} From 4b6d0f9c00e2d1f108427e496a33d00f64370bb6 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 3 Feb 2025 11:09:17 -0500 Subject: [PATCH 3/3] Added integration test to cover MySQL demo. --- .../clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java index 448bd7ef1..fccffca8b 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDemoIT.java @@ -42,7 +42,7 @@ public void startContainers() throws InterruptedException { } @Test - public void testAddColumn() throws Exception { + public void testMySQLDemo() throws Exception { AtomicReference engine = new AtomicReference<>(); @@ -59,7 +59,7 @@ public void testAddColumn() throws Exception { } }); - Thread.sleep(10000);// + Thread.sleep(20000);// Connection conn = connectToMySQL();