diff --git a/sink-connector-lightweight/dependency-reduced-pom.xml b/sink-connector-lightweight/dependency-reduced-pom.xml index f24a5763b..e3f79deb7 100644 --- a/sink-connector-lightweight/dependency-reduced-pom.xml +++ b/sink-connector-lightweight/dependency-reduced-pom.xml @@ -116,10 +116,22 @@ + + io.javalin + javalin-testtools + 5.5.0 + test + + + okhttp + com.squareup.okhttp3 + + + io.debezium debezium-connector-mongodb - 2.7.0.Beta2 + 3.0.0.Final test @@ -128,6 +140,46 @@ + + io.debezium + debezium-testing-testcontainers + 3.0.0.Final + test + + + kafka + org.testcontainers + + + awaitility + org.awaitility + + + mssqlserver + org.testcontainers + + + oracle-xe + org.testcontainers + + + junit-platform-launcher + org.junit.platform + + + junit-jupiter + org.junit.jupiter + + + quarkus-test-common + io.quarkus + + + okhttp + com.squareup.okhttp3 + + + org.mongodb mongo-java-driver @@ -261,14 +313,14 @@ junit-jupiter-params org.junit.jupiter - - junit-platform-launcher - org.junit.platform - junit-jupiter-api org.junit.jupiter + + junit-platform-launcher + org.junit.platform + @@ -308,12 +360,13 @@ 3.0.0-M7 UTF-8 0.0.9 + 5.5.0 5.9.1 17 UTF-8 3.1.1 17 - 2.7.0.Beta2 + 3.0.0.Final io.quarkus.platform diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 24039460d..3c727d7c2 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -142,6 +142,13 @@ ${version.debezium} test + + + io.debezium + debezium-testing-testcontainers + ${version.debezium} + test + diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedMongoIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedMongoIT.java index 1279ff487..1e264329a 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedMongoIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedMongoIT.java @@ -1,168 +1,138 @@ -//package com.altinity.clickhouse.debezium.embedded; -// -//import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; -//import com.altinity.clickhouse.debezium.embedded.config.EnvironmentConfigurationService; -//import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; -//import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; -//import com.mongodb.MongoException; -//import com.mongodb.client.MongoClient; -//import com.mongodb.client.MongoClients; -//import com.mongodb.client.MongoCollection; -//import com.mongodb.client.MongoDatabase; -//import com.mongodb.client.result.InsertOneResult; -//import org.bson.Document; -//import org.bson.types.ObjectId; -//import org.junit.jupiter.api.Test; -//import org.testcontainers.containers.ClickHouseContainer; -//import org.testcontainers.containers.MongoDBContainer; -//import org.testcontainers.containers.wait.strategy.Wait; -//import org.testcontainers.junit.jupiter.Container; -//import org.testcontainers.junit.jupiter.Testcontainers; -//import org.testcontainers.utility.MountableFile; -// -//import java.nio.file.Files; -//import java.nio.file.Path; -//import java.util.Properties; -//import java.util.concurrent.ExecutorService; -//import java.util.concurrent.Executors; -// -//@Testcontainers -//public class ClickHouseDebeziumEmbeddedMongoIT { -// +package com.altinity.clickhouse.debezium.embedded; + +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.config.EnvironmentConfigurationService; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.mongodb.MongoException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.result.InsertOneResult; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.ClassRule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +@Testcontainers +public class ClickHouseDebeziumEmbeddedMongoIT { + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest") + .withInitScript("init_clickhouse.sql") + .withExposedPorts(8123); + + //https://github.com/testcontainers/testcontainers-java/issues/3066 // @Container -// public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest") -// .withInitScript("init_clickhouse.sql") -// .withExposedPorts(8123); -// -// //https://github.com/testcontainers/testcontainers-java/issues/3066 -// @Container -// public static MongoDBContainer mongoContainer = new MongoDBContainer("mongo:latest") -//// .withEnv("MONGO_INITDB_ROOT_USERNAME", "project") -//// .withEnv("MONGO_INITDB_ROOT_PASSWORD", "project") +// public static GenericContainer mongoContainer = new GenericContainer("mongo:latest") // .withEnv("MONGO_INITDB_DATABASE", "project") +// .withEnv("MONGO_REPLICA_SET_NAME", "docker-rs") // .withCopyFileToContainer(MountableFile.forClasspathResource("mongo-init.js"), // "/docker-entrypoint-initdb.d/mongo-init.js") +// .withExposedPorts(27017) +// .withCommand("--replSet docker-rs") +// .withNetworkAliases("mongo") +// .withCommand("--replSet docker-rs").withCommand("--bind_ip_all").withCommand("--port 27017") // .waitingFor(Wait.forLogMessage("(?i).*Waiting for connections*.*", 1)); -// // .waitingFor(Wait.forLogMessage("(?i).*waiting for connections.*", 2)) -// // .withStartupTimeout(Duration.ofSeconds(10)); -// -//// .withInitScript("init_postgres.sql") -//// .withDatabaseName("public") -//// .withUsername("root") -//// .withPassword("adminpass") -//// .withExposedPorts(5432) -//// .withCommand("postgres -c wal_level=logical"); -// -// @Test -// //@Disabled -// public void testDataTypesDB() throws Exception { -// -// -// // Start the debezium embedded application. -// -// Properties defaultProps = (new EnvironmentConfigurationService()).parse(); -// System.out.println("MYSQL HOST" + mongoContainer.getHost()); -// System.out.println("Connection string" + mongoContainer.getConnectionString()); -// defaultProps.setProperty("mongodb.connection.string", mongoContainer.getConnectionString()); -// defaultProps.setProperty("mongodb.members.auto.discover", "true"); -// defaultProps.setProperty("topic.prefix", "mongo-ch"); -// defaultProps.setProperty("collection.include.list", "project.items"); -// defaultProps.setProperty("snapshot.include.collection.list", "project.items"); -// defaultProps.setProperty("database.include.list", "project"); -// defaultProps.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter"); -// -// defaultProps.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter"); -// defaultProps.setProperty("value.converter.schemas.enable", "true"); -// -// //defaultProps.setProperty("mongodb.hosts", mongoContainer.getHost() + ":" + mongoContainer.getFirstMappedPort()); -// // defaultProps.setProperty("topic.prefix", mongoContainer.getC()); -// //System.out.println("JDBC URL" + mySqlContainer.getJdbcUrl()); -//// defaultProps.setProperty("database.hostname", mongoContainer.getHost()); -//// defaultProps.setProperty("database.port", String.valueOf(mongoContainer.getFirstMappedPort())); -// defaultProps.setProperty("database.dbname", "project"); -// defaultProps.setProperty("database.user", "project"); -// defaultProps.setProperty("database.password", "project"); -// -// // defaultProps.setProperty("database.include.list", "public"); -// defaultProps.setProperty("snapshot.mode", "initial"); -// defaultProps.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); -// //defaultProps.setProperty("plugin.name", "pgoutput"); -// //defaultProps.setProperty("table.include.list", "public.tm"); -// -// -// defaultProps.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); -// -// //String tempOffsetPath = "/tmp/2/offsets" + System.currentTimeMillis() + ".dat"; -// Path tmpFilePath = Files.createTempFile("offsets", ".dat"); -// -// if (tmpFilePath != null) { -// System.out.println("TEMP FILE PATH" + tmpFilePath); -// } -// -// Files.deleteIfExists(tmpFilePath); -// defaultProps.setProperty("offset.storage.file.filename", tmpFilePath.toString()); -// defaultProps.setProperty("offset.flush.interval.ms", "60000"); -// defaultProps.setProperty("auto.create.tables", "true"); -// defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost()); -// defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort())); -// defaultProps.setProperty("clickhouse.server.user", "default"); -// defaultProps.setProperty("clickhouse.server.password", ""); -// defaultProps.setProperty("clickhouse.server.database", "project"); -// defaultProps.setProperty("replacingmergetree.delete.column", "_sign"); -// defaultProps.setProperty("metrics.port", "8087"); -// defaultProps.setProperty("database.allowPublicKeyRetrieval", "true"); -// -// ExecutorService executorService = Executors.newFixedThreadPool(1); -// executorService.execute(() -> { -// try { -// new DebeziumChangeEventCapture().setup(defaultProps, new SourceRecordParserService(), -// new MySQLDDLParserService()); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// }); -// Thread.sleep(15000); -// -// insertNewDocument(); -// Thread.sleep(60000); -// -//// BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), -//// "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null); -//// Map tmColumns = writer.getColumnsDataTypesForTable("tm"); -//// Assert.assertTrue(tmColumns.size() == 22); -//// Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); -//// Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)")); -//// //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))")); -//// Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))")); -//// -//// -//// int tmCount = 0; -//// ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery(); -//// while(chRs.next()) { -//// tmCount = chRs.getInt(1); -//// } -// -// // Assert.assertTrue(tmCount == 2); -// -// executorService.shutdown(); -// Files.deleteIfExists(tmpFilePath); -// -// -// } -// -// private void insertNewDocument() { -// try (MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString())) { -// MongoDatabase database = mongoClient.getDatabase("project"); -// MongoCollection collection = database.getCollection("items"); -// try { -// InsertOneResult result = collection.insertOne(new Document() -// .append("uuid", new ObjectId()) -// .append("price", 44) -// .append("name", "Record one")); -// System.out.println("Success! Inserted document id: " + result.getInsertedId()); -// } catch (MongoException me) { -// System.err.println("Unable to insert due to an error: " + me); -// } + + + @ClassRule + public static DockerComposeContainer environment = + new DockerComposeContainer(new File("src/test/resources/docker-compose-mongodb.yml")) + .withExposedService("mongo", 27017); + + @BeforeEach + + public void startContainers() throws InterruptedException { + environment.start(); + } + @Test + //@Disabled + public void testDataTypesDB() throws Exception { + AtomicReference engine = new AtomicReference<>(); + + Properties defaultProps = ITCommon.getDebeziumProperties(clickHouseContainer); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(defaultProps, new SourceRecordParserService(), + new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Thread.sleep(15000); + + insertNewDocument(); + Thread.sleep(60000); + +// BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), +// "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null); +// Map tmColumns = writer.getColumnsDataTypesForTable("tm"); +// Assert.assertTrue(tmColumns.size() == 22); +// Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); +// Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)")); +// //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))")); +// Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))")); +// +// +// int tmCount = 0; +// ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery(); +// while(chRs.next()) { +// tmCount = chRs.getInt(1); // } -// } -//} + + // Assert.assertTrue(tmCount == 2); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + //writer.getConnection().close(); + + + } + + private void insertNewDocument() { + String mongoConnectionString = String.format("mongodb://%s:%s", "mongo", "27017"); + + try (MongoClient mongoClient = MongoClients.create(mongoConnectionString)) { + MongoDatabase database = mongoClient.getDatabase("project"); + MongoCollection collection = database.getCollection("items"); + try { + InsertOneResult result = collection.insertOne(new Document() + .append("uuid", new ObjectId()) + .append("price", 44) + .append("name", "Record one")); + System.out.println("Success! Inserted document id: " + result.getInsertedId()); + } catch (MongoException me) { + System.err.println("Unable to insert due to an error: " + me); + } + } + } +} diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java index c24ac4241..e358d2ab2 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java @@ -3,8 +3,7 @@ import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader; import org.testcontainers.clickhouse.ClickHouseContainer; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.*; import java.sql.Connection; import java.sql.DriverManager; @@ -96,6 +95,58 @@ static public Properties getDebeziumProperties(String mySQLHost, String mySQLPor } + static public Properties getDebeziumProperties( ClickHouseContainer clickHouseContainer) throws Exception { + + Properties defaultProps = new Properties(); + Properties defaultProperties = PropertiesHelper.getProperties("config.properties"); + + defaultProps.putAll(defaultProperties); + Properties fileProps = new ConfigLoader().load("config.yml"); + defaultProps.putAll(fileProps); + + + defaultProps.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); + + // Construct mongodb connection string + String mongoConnectionString = String.format("mongodb://%s:%s", "mongo", + "27017"); + + defaultProps.setProperty("mongodb.connection.string", mongoConnectionString +"/?replicaSet=rs0"); + //defaultProps.setProperty("mongodb.connection.string", mongoConnectionString ); + + //defaultProps.setProperty("mongodb.connection.string", mongoConnectionString + "/?replicaSet=docker-rs"); + + defaultProps.setProperty("capture.scope", "database"); + defaultProps.setProperty("mongodb.members.auto.discover", "true"); + defaultProps.setProperty("topic.prefix", "mongo-ch"); + defaultProps.setProperty("collection.include.list", "project.items"); + defaultProps.setProperty("snapshot.include.collection.list", "project.items"); + defaultProps.setProperty("database.include.list", "project"); + defaultProps.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + + defaultProps.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter"); + defaultProps.setProperty("value.converter.schemas.enable", "true"); + + defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost()); + defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort())); + defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername()); + defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword()); + + defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + + return defaultProps; + } static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, ClickHouseContainer clickHouseContainer) throws Exception { // Start the debezium embedded application. diff --git a/sink-connector-lightweight/src/test/resources/docker-compose-mongodb.yml b/sink-connector-lightweight/src/test/resources/docker-compose-mongodb.yml new file mode 100644 index 000000000..9759ffd8e --- /dev/null +++ b/sink-connector-lightweight/src/test/resources/docker-compose-mongodb.yml @@ -0,0 +1,21 @@ +version: "3.8" + +services: + mongo: + image: mongo:7.0 + command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"] + ports: + - 27017:27017 + healthcheck: + test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'mongo:27017'}]}) }" | mongosh --port 27017 --quiet + interval: 5s + timeout: 30s + start_period: 0s + retries: 30 + volumes: + - "mongo1_data:/data/db" + - "mongo1_config:/data/configdb" + +volumes: + mongo1_data: + mongo1_config: diff --git a/sink-connector-lightweight/src/test/resources/mongo-init.js b/sink-connector-lightweight/src/test/resources/mongo-init.js index 13c59e4e0..10f44b56d 100644 --- a/sink-connector-lightweight/src/test/resources/mongo-init.js +++ b/sink-connector-lightweight/src/test/resources/mongo-init.js @@ -1,3 +1,10 @@ +# initiate replica set +rs.initiate({ + _id: "docker-rs", + members: [ + { _id: 0, host: "mongo:27017" } + ] +}); db.createUser({ user: 'project', pwd: 'project',