diff --git a/sink-connector-lightweight/docker/config_sql_server.yml b/sink-connector-lightweight/docker/config_sql_server.yml new file mode 100644 index 000000000..17d15db7f --- /dev/null +++ b/sink-connector-lightweight/docker/config_sql_server.yml @@ -0,0 +1,48 @@ +name: "debezium-embedded-sql-server" +database.hostname: "sql-server-db" +database.port: "1433" +database.user: "sa" +database.password: "Root1234$$" +database.names: "Prime" +table.include.list: "MyTable" +clickhouse.server.url: "clickhouse" +clickhouse.server.user: "root" +clickhouse.server.pass: "root" +clickhouse.server.port: "8123" +clickhouse.server.database: "test" +database.allowPublicKeyRetrieval: "true" +snapshot.mode: "initial" +offset.flush.interval.ms: 5000 +connector.class: "io.debezium.connector.sqlserver.SqlServerConnector" +offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore" +offset.storage.offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info" +offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector" +offset.storage.jdbc.user: "root" +offset.storage.jdbc.password: "root" +offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s +( + `id` String, + `offset_key` String, + `offset_val` String, + `record_insert_ts` DateTime, + `record_insert_seq` UInt64, + `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)) +) +ENGINE = ReplacingMergeTree(_version) +ORDER BY id +SETTINGS index_granularity = 8198" +offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1" +schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory" +schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector" +schema.history.internal.jdbc.user: "root" +schema.history.internal.jdbc.password: "root" +schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s +(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id" + +schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history" +enable.snapshot.ddl: "true" +auto.create.tables: "true" +database.dbname: "public" +database.ssl.truststore: "${project.basedir}/src/test/resources/ssl" +database.ssl.truststore.password: "debezium" +database.trustServerCertificate: "true" \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/MyTable.csv b/sink-connector-lightweight/docker/data_sql_server/MyTable.csv new file mode 100644 index 000000000..4b02e279f --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/MyTable.csv @@ -0,0 +1,3 @@ +id,value +1,yes +2,it works:) \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/MyTable.sql b/sink-connector-lightweight/docker/data_sql_server/MyTable.sql new file mode 100644 index 000000000..bb0cfa830 --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/MyTable.sql @@ -0,0 +1,8 @@ +USE Prime; +GO + +CREATE TABLE MyTable( + Id nvarchar(max), + Value nvarchar(max) +); +GO \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh b/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh new file mode 100644 index 000000000..12e1155e6 --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh @@ -0,0 +1,28 @@ +#!/bin/bash +database=Prime +wait_time=15s +password=Root1234$$ + +# wait for SQL Server to come up +echo importing data will start in $wait_time... +sleep $wait_time +echo importing data... + +# run the init script to create the DB and the tables in /table +/opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i ./init.sql + +for entry in "*.sql" +do + echo executing $entry + /opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i $entry +done + +#import the data from the csv files +for entry in "*.csv" +do + # i.e: transform /data/MyTable.csv to MyTable + shortname=$(echo $entry | cut -f 1 -d '.' | cut -f 2 -d '/') + tableName=$database.dbo.$shortname + echo importing $tableName from $entry + /opt/mssql-tools/bin/bcp $tableName in $entry -c -t',' -F 2 -S 0.0.0.0 -U sa -P $password +done \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/init.sql b/sink-connector-lightweight/docker/data_sql_server/init.sql new file mode 100644 index 000000000..0d0e7fdc0 --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/init.sql @@ -0,0 +1,4 @@ +DROP DATABASE Prime + +CREATE DATABASE Prime; +GO \ No newline at end of file diff --git a/sink-connector-lightweight/docker/docker-compose-sql-server.yaml b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml new file mode 100644 index 000000000..115c51a8d --- /dev/null +++ b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml @@ -0,0 +1,68 @@ +version: "3.4" + +# Ubuntu , set this for redpanda to start +# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm + +# Clickhouse Table Schema +# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id; + +services: + + sql-server-db: + container_name: sql-server-db + image: mcr.microsoft.com/mssql/server:2022-latest + #command: sh -c ' chmod +x /tmp/entrypoint.sh; /tmp/./entrypoint.sh & /opt/mssql/bin/sqlservr;' + ports: + - "1433:1433" + environment: + MSSQL_SA_PASSWORD: "Root1234$$" + ACCEPT_EULA: "Y" + volumes: + - ./data_sqlserver:/tmp + + clickhouse: + extends: + file: clickhouse-service.yml + service: clickhouse + depends_on: + zookeeper: + condition: service_healthy + + zookeeper: + extends: + file: zookeeper-service.yml + service: zookeeper + + clickhouse-sink-connector-lt: + extends: + file: clickhouse-sink-connector-lt-service.yml + service: clickhouse-sink-connector-lt + depends_on: + - clickhouse + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + JAVA_OPTS: > + -Xmx5G + -Xms128m + volumes: + - ./config_sql_server.yml:/config.yml + + ### MONITORING #### + prometheus: + extends: + file: prometheus-service.yml + service: prometheus + + + grafana: + extends: + file: grafana-service.yml + service: grafana + volumes: + - ./config/grafana/config/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml + - ./config/grafana/config/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml + - ./config/grafana/config/altinity_sink_connector.json:/var/lib/grafana/dashboards/altinity_sink_connector.json + depends_on: + - prometheus + ## END OF MONITORING ### \ No newline at end of file diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 250bbd953..af7eb7574 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -113,6 +113,11 @@ debezium-connector-postgres ${version.debezium} + + io.debezium + debezium-connector-sqlserver + ${version.debezium} + org.postgresql postgresql @@ -274,6 +279,12 @@ ${version.testcontainers} test + + org.testcontainers + mssqlserver + ${version.testcontainers} + test + @@ -539,6 +550,9 @@ *:* org/apache/log4j/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java new file mode 100644 index 000000000..76d2e7fc4 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java @@ -0,0 +1,127 @@ +package com.altinity.clickhouse.debezium.embedded; + + +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.jdbc.ClickHouseConnection; +import org.junit.Assert; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static com.altinity.clickhouse.debezium.embedded.PostgresProperties.getDefaultProperties; + +@Testcontainers +public class SQLServerIT { + + @Container + public static org.testcontainers.clickhouse.ClickHouseContainer clickHouseContainer = + new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + @Container + public static MSSQLServerContainer sqlServerContainer = new MSSQLServerContainer<> + (DockerImageName.parse("mcr.microsoft.com/mssql/server:2019-latest") + .asCompatibleSubstituteFor("sqlserver")) + .withInitScript("init_sqlserver.sql") + .withUsername("sa") + .withPassword("Password!") + .withDatabaseName("employees") + .withExposedPorts(1433); + + + public Properties getProperties() throws Exception { + + Properties properties = new Properties(); +// Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer); +// properties.put("plugin.name", "decoderbufs"); +// properties.put("plugin.path", "/"); +// properties.put("table.include.list", "public.tm"); +// properties.put("slot.max.retries", "6"); +// properties.put("slot.retry.delay.ms", "5000"); +// properties.put("database.allowPublicKeyRetrieval", "true"); +// properties.put("table.include.list", "public.tm,public.tm2"); + + return properties; + } + + @Test + @DisplayName("Integration Test - Validates PostgreSQL replication when the plugin is set to DecoderBufs") + public void testDecoderBufsPlugin() throws Exception { + Network network = Network.newNetwork(); + + sqlServerContainer.withNetwork(network).start(); + clickHouseContainer.withNetwork(network).start(); + Thread.sleep(10000); + + org.testcontainers.Testcontainers.exposeHostPorts(sqlServerContainer.getFirstMappedPort()); + AtomicReference engine = new AtomicReference<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(getProperties(), new SourceRecordParserService(), + new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), + "employees"), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(10000);// + Thread.sleep(50000); + + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "public"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); + Assert.assertTrue(tmColumns.size() == 22); + Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); + Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)")); + //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))")); + Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))")); + + + int tmCount = 0; + ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery(); + while(chRs.next()) { + tmCount = chRs.getInt(1); + } + + Assert.assertTrue(tmCount == 2); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + } +} diff --git a/sink-connector-lightweight/src/test/resources/ssl/README.md b/sink-connector-lightweight/src/test/resources/ssl/README.md new file mode 100644 index 000000000..4ba59537b --- /dev/null +++ b/sink-connector-lightweight/src/test/resources/ssl/README.md @@ -0,0 +1,57 @@ +This directory contains the truststore (used for validating DB server certificate). + +The files are generated based on the certificates in src/test/docker, which are generated following the +guidelines outlined in https://docs.microsoft.com/en-us/sql/linux/sql-server-linux-docker-container-security?view=sql-server-ver16. + +To generate the server certificate and key: + +``` +openssl req -x509 -nodes -newkey rsa:2048 -subj '/CN=localhost' -keyout mssql.key -out mssql.pem -days 3650 +``` + +This will output two files, `mssql.key` and `mssql.pem`. + +These files are mounted to the SQL Server docker container and therefore the correct permissions must be set +on these files so that they can be read properly by SQL Server's database process. To set the right permissions, +you need to run the following: + +``` +chmod 444 mssql.key +chmod 444 mssql.pem +``` + +Note, we explicitly use `444` rather than `440` since the volume is mounted as root but the SQL Server database +is started by the `mssql` user, this allows the key and certificate to be read. + +In addition, a special `mssql.conf` file will also be mounted in order for Microsoft SQL Server to know where +to read the database certificates from. This configuration file will be mounted automatically to +`/var/opt/mssql/mssql.conf`. The contents of this file is as follows: + +``` +[network] +tlscert = /etc/ssl/debezium/certs/mssql.pem +tlskey = /etc/ssl/debezium/private/mssql.key +tlsprotocols = 1.2 +forceencryption = 1 +``` + +This configuration enforces TLS 1.2 and encryption in order to operate with the SQL Server instance. +If the client does not support encryption or cannot negotiate TLS 1.2, the client connection will be rejected. + +Finally, the truststore is generated as follows: + +``` +keytool -import -v -trustcacerts -alias localhost -file ../../docker/mssql.pem -keystore truststore.ks -storepass debezium -noprompt +``` + +This imports the `mssql.pem` public certificate into the truststore that we mount into the container and +this truststore will be configured as part of the SQL Server connector's configuration using: + +``` +database.ssl.truststore=${project.basedir}/src/test/resources/ssl +database.ssl.truststore.password=debezium +database.trustServerCertificate=true +``` + +We specifically set `trustServerCertificate` because this certificate is self-signed. +This certificate is also valid for 10 years from August 9th, 2022, so expires August 9th, 2032. \ No newline at end of file diff --git a/sink-connector-lightweight/src/test/resources/ssl/truststore.ks b/sink-connector-lightweight/src/test/resources/ssl/truststore.ks new file mode 100644 index 000000000..8c5e79201 Binary files /dev/null and b/sink-connector-lightweight/src/test/resources/ssl/truststore.ks differ