diff --git a/sink-connector-lightweight/docker/config_sql_server.yml b/sink-connector-lightweight/docker/config_sql_server.yml
new file mode 100644
index 000000000..17d15db7f
--- /dev/null
+++ b/sink-connector-lightweight/docker/config_sql_server.yml
@@ -0,0 +1,48 @@
+name: "debezium-embedded-sql-server"
+database.hostname: "sql-server-db"
+database.port: "1433"
+database.user: "sa"
+database.password: "Root1234$$"
+database.names: "Prime"
+table.include.list: "MyTable"
+clickhouse.server.url: "clickhouse"
+clickhouse.server.user: "root"
+clickhouse.server.pass: "root"
+clickhouse.server.port: "8123"
+clickhouse.server.database: "test"
+database.allowPublicKeyRetrieval: "true"
+snapshot.mode: "initial"
+offset.flush.interval.ms: 5000
+connector.class: "io.debezium.connector.sqlserver.SqlServerConnector"
+offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
+offset.storage.offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
+offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
+offset.storage.jdbc.user: "root"
+offset.storage.jdbc.password: "root"
+offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
+(
+ `id` String,
+ `offset_key` String,
+ `offset_val` String,
+ `record_insert_ts` DateTime,
+ `record_insert_seq` UInt64,
+ `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
+)
+ENGINE = ReplacingMergeTree(_version)
+ORDER BY id
+SETTINGS index_granularity = 8198"
+offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
+schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
+schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
+schema.history.internal.jdbc.user: "root"
+schema.history.internal.jdbc.password: "root"
+schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
+(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"
+
+schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
+enable.snapshot.ddl: "true"
+auto.create.tables: "true"
+database.dbname: "public"
+database.ssl.truststore: "${project.basedir}/src/test/resources/ssl"
+database.ssl.truststore.password: "debezium"
+database.trustServerCertificate: "true"
\ No newline at end of file
diff --git a/sink-connector-lightweight/docker/data_sql_server/MyTable.csv b/sink-connector-lightweight/docker/data_sql_server/MyTable.csv
new file mode 100644
index 000000000..4b02e279f
--- /dev/null
+++ b/sink-connector-lightweight/docker/data_sql_server/MyTable.csv
@@ -0,0 +1,3 @@
+id,value
+1,yes
+2,it works:)
\ No newline at end of file
diff --git a/sink-connector-lightweight/docker/data_sql_server/MyTable.sql b/sink-connector-lightweight/docker/data_sql_server/MyTable.sql
new file mode 100644
index 000000000..bb0cfa830
--- /dev/null
+++ b/sink-connector-lightweight/docker/data_sql_server/MyTable.sql
@@ -0,0 +1,8 @@
+USE Prime;
+GO
+
+CREATE TABLE MyTable(
+ Id nvarchar(max),
+ Value nvarchar(max)
+);
+GO
\ No newline at end of file
diff --git a/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh b/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh
new file mode 100644
index 000000000..12e1155e6
--- /dev/null
+++ b/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+database=Prime
+wait_time=15s
+password=Root1234$$
+
+# wait for SQL Server to come up
+echo importing data will start in $wait_time...
+sleep $wait_time
+echo importing data...
+
+# run the init script to create the DB and the tables in /table
+/opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i ./init.sql
+
+for entry in "*.sql"
+do
+ echo executing $entry
+ /opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i $entry
+done
+
+#import the data from the csv files
+for entry in "*.csv"
+do
+ # i.e: transform /data/MyTable.csv to MyTable
+ shortname=$(echo $entry | cut -f 1 -d '.' | cut -f 2 -d '/')
+ tableName=$database.dbo.$shortname
+ echo importing $tableName from $entry
+ /opt/mssql-tools/bin/bcp $tableName in $entry -c -t',' -F 2 -S 0.0.0.0 -U sa -P $password
+done
\ No newline at end of file
diff --git a/sink-connector-lightweight/docker/data_sql_server/init.sql b/sink-connector-lightweight/docker/data_sql_server/init.sql
new file mode 100644
index 000000000..0d0e7fdc0
--- /dev/null
+++ b/sink-connector-lightweight/docker/data_sql_server/init.sql
@@ -0,0 +1,4 @@
+DROP DATABASE Prime
+
+CREATE DATABASE Prime;
+GO
\ No newline at end of file
diff --git a/sink-connector-lightweight/docker/docker-compose-sql-server.yaml b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml
new file mode 100644
index 000000000..115c51a8d
--- /dev/null
+++ b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml
@@ -0,0 +1,68 @@
+version: "3.4"
+
+# Ubuntu , set this for redpanda to start
+# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm
+
+# Clickhouse Table Schema
+# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;
+
+services:
+
+ sql-server-db:
+ container_name: sql-server-db
+ image: mcr.microsoft.com/mssql/server:2022-latest
+ #command: sh -c ' chmod +x /tmp/entrypoint.sh; /tmp/./entrypoint.sh & /opt/mssql/bin/sqlservr;'
+ ports:
+ - "1433:1433"
+ environment:
+ MSSQL_SA_PASSWORD: "Root1234$$"
+ ACCEPT_EULA: "Y"
+ volumes:
+ - ./data_sqlserver:/tmp
+
+ clickhouse:
+ extends:
+ file: clickhouse-service.yml
+ service: clickhouse
+ depends_on:
+ zookeeper:
+ condition: service_healthy
+
+ zookeeper:
+ extends:
+ file: zookeeper-service.yml
+ service: zookeeper
+
+ clickhouse-sink-connector-lt:
+ extends:
+ file: clickhouse-sink-connector-lt-service.yml
+ service: clickhouse-sink-connector-lt
+ depends_on:
+ - clickhouse
+ extra_hosts:
+ - "host.docker.internal:host-gateway"
+ environment:
+ JAVA_OPTS: >
+ -Xmx5G
+ -Xms128m
+ volumes:
+ - ./config_sql_server.yml:/config.yml
+
+ ### MONITORING ####
+ prometheus:
+ extends:
+ file: prometheus-service.yml
+ service: prometheus
+
+
+ grafana:
+ extends:
+ file: grafana-service.yml
+ service: grafana
+ volumes:
+ - ./config/grafana/config/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml
+ - ./config/grafana/config/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml
+ - ./config/grafana/config/altinity_sink_connector.json:/var/lib/grafana/dashboards/altinity_sink_connector.json
+ depends_on:
+ - prometheus
+ ## END OF MONITORING ###
\ No newline at end of file
diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml
index 250bbd953..af7eb7574 100644
--- a/sink-connector-lightweight/pom.xml
+++ b/sink-connector-lightweight/pom.xml
@@ -113,6 +113,11 @@
debezium-connector-postgres
${version.debezium}
+
+ io.debezium
+ debezium-connector-sqlserver
+ ${version.debezium}
+
org.postgresql
postgresql
@@ -274,6 +279,12 @@
${version.testcontainers}
test
+
+ org.testcontainers
+ mssqlserver
+ ${version.testcontainers}
+ test
+
@@ -539,6 +550,9 @@
*:*
org/apache/log4j/**
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java
new file mode 100644
index 000000000..76d2e7fc4
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java
@@ -0,0 +1,127 @@
+package com.altinity.clickhouse.debezium.embedded;
+
+
+import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import org.junit.Assert;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.altinity.clickhouse.debezium.embedded.PostgresProperties.getDefaultProperties;
+
+@Testcontainers
+public class SQLServerIT {
+
+ @Container
+ public static org.testcontainers.clickhouse.ClickHouseContainer clickHouseContainer =
+ new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_it.sql")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withExposedPorts(8123);
+
+ @Container
+ public static MSSQLServerContainer sqlServerContainer = new MSSQLServerContainer<>
+ (DockerImageName.parse("mcr.microsoft.com/mssql/server:2019-latest")
+ .asCompatibleSubstituteFor("sqlserver"))
+ .withInitScript("init_sqlserver.sql")
+ .withUsername("sa")
+ .withPassword("Password!")
+ .withDatabaseName("employees")
+ .withExposedPorts(1433);
+
+
+ public Properties getProperties() throws Exception {
+
+ Properties properties = new Properties();
+// Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer);
+// properties.put("plugin.name", "decoderbufs");
+// properties.put("plugin.path", "/");
+// properties.put("table.include.list", "public.tm");
+// properties.put("slot.max.retries", "6");
+// properties.put("slot.retry.delay.ms", "5000");
+// properties.put("database.allowPublicKeyRetrieval", "true");
+// properties.put("table.include.list", "public.tm,public.tm2");
+
+ return properties;
+ }
+
+ @Test
+ @DisplayName("Integration Test - Validates PostgreSQL replication when the plugin is set to DecoderBufs")
+ public void testDecoderBufsPlugin() throws Exception {
+ Network network = Network.newNetwork();
+
+ sqlServerContainer.withNetwork(network).start();
+ clickHouseContainer.withNetwork(network).start();
+ Thread.sleep(10000);
+
+ org.testcontainers.Testcontainers.exposeHostPorts(sqlServerContainer.getFirstMappedPort());
+ AtomicReference engine = new AtomicReference<>();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+
+ engine.set(new DebeziumChangeEventCapture());
+ engine.get().setup(getProperties(), new SourceRecordParserService(),
+ new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
+ "employees"), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Thread.sleep(10000);//
+ Thread.sleep(50000);
+
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "public");
+ ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
+ clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
+ Map tmColumns = writer.getColumnsDataTypesForTable("tm");
+ Assert.assertTrue(tmColumns.size() == 22);
+ Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
+ Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
+ //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
+ Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));
+
+
+ int tmCount = 0;
+ ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
+ while(chRs.next()) {
+ tmCount = chRs.getInt(1);
+ }
+
+ Assert.assertTrue(tmCount == 2);
+
+ if(engine.get() != null) {
+ engine.get().stop();
+ }
+ // Files.deleteIfExists(tmpFilePath);
+ executorService.shutdown();
+
+ }
+}
diff --git a/sink-connector-lightweight/src/test/resources/ssl/README.md b/sink-connector-lightweight/src/test/resources/ssl/README.md
new file mode 100644
index 000000000..4ba59537b
--- /dev/null
+++ b/sink-connector-lightweight/src/test/resources/ssl/README.md
@@ -0,0 +1,57 @@
+This directory contains the truststore (used for validating DB server certificate).
+
+The files are generated based on the certificates in src/test/docker, which are generated following the
+guidelines outlined in https://docs.microsoft.com/en-us/sql/linux/sql-server-linux-docker-container-security?view=sql-server-ver16.
+
+To generate the server certificate and key:
+
+```
+openssl req -x509 -nodes -newkey rsa:2048 -subj '/CN=localhost' -keyout mssql.key -out mssql.pem -days 3650
+```
+
+This will output two files, `mssql.key` and `mssql.pem`.
+
+These files are mounted to the SQL Server docker container and therefore the correct permissions must be set
+on these files so that they can be read properly by SQL Server's database process. To set the right permissions,
+you need to run the following:
+
+```
+chmod 444 mssql.key
+chmod 444 mssql.pem
+```
+
+Note, we explicitly use `444` rather than `440` since the volume is mounted as root but the SQL Server database
+is started by the `mssql` user, this allows the key and certificate to be read.
+
+In addition, a special `mssql.conf` file will also be mounted in order for Microsoft SQL Server to know where
+to read the database certificates from. This configuration file will be mounted automatically to
+`/var/opt/mssql/mssql.conf`. The contents of this file is as follows:
+
+```
+[network]
+tlscert = /etc/ssl/debezium/certs/mssql.pem
+tlskey = /etc/ssl/debezium/private/mssql.key
+tlsprotocols = 1.2
+forceencryption = 1
+```
+
+This configuration enforces TLS 1.2 and encryption in order to operate with the SQL Server instance.
+If the client does not support encryption or cannot negotiate TLS 1.2, the client connection will be rejected.
+
+Finally, the truststore is generated as follows:
+
+```
+keytool -import -v -trustcacerts -alias localhost -file ../../docker/mssql.pem -keystore truststore.ks -storepass debezium -noprompt
+```
+
+This imports the `mssql.pem` public certificate into the truststore that we mount into the container and
+this truststore will be configured as part of the SQL Server connector's configuration using:
+
+```
+database.ssl.truststore=${project.basedir}/src/test/resources/ssl
+database.ssl.truststore.password=debezium
+database.trustServerCertificate=true
+```
+
+We specifically set `trustServerCertificate` because this certificate is self-signed.
+This certificate is also valid for 10 years from August 9th, 2022, so expires August 9th, 2032.
\ No newline at end of file
diff --git a/sink-connector-lightweight/src/test/resources/ssl/truststore.ks b/sink-connector-lightweight/src/test/resources/ssl/truststore.ks
new file mode 100644
index 000000000..8c5e79201
Binary files /dev/null and b/sink-connector-lightweight/src/test/resources/ssl/truststore.ks differ