Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sql server support 2 4 0 #797

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions sink-connector-lightweight/docker/config_sql_server.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: "debezium-embedded-sql-server"
database.hostname: "sql-server-db"
database.port: "1433"
database.user: "sa"
database.password: "Root1234$$"
database.names: "Prime"
table.include.list: "MyTable"
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
clickhouse.server.port: "8123"
clickhouse.server.database: "test"
database.allowPublicKeyRetrieval: "true"
snapshot.mode: "initial"
offset.flush.interval.ms: 5000
connector.class: "io.debezium.connector.sqlserver.SqlServerConnector"
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
offset.storage.offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
offset.storage.jdbc.user: "root"
offset.storage.jdbc.password: "root"
offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
schema.history.internal.jdbc.user: "root"
schema.history.internal.jdbc.password: "root"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"

schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
enable.snapshot.ddl: "true"
auto.create.tables: "true"
database.dbname: "public"
database.ssl.truststore: "${project.basedir}/src/test/resources/ssl"
database.ssl.truststore.password: "debezium"
database.trustServerCertificate: "true"
3 changes: 3 additions & 0 deletions sink-connector-lightweight/docker/data_sql_server/MyTable.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id,value
1,yes
2,it works:)
8 changes: 8 additions & 0 deletions sink-connector-lightweight/docker/data_sql_server/MyTable.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
USE Prime;
GO

CREATE TABLE MyTable(
Id nvarchar(max),
Value nvarchar(max)
);
GO
28 changes: 28 additions & 0 deletions sink-connector-lightweight/docker/data_sql_server/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
database=Prime
wait_time=15s
password=Root1234$$

# wait for SQL Server to come up
echo importing data will start in $wait_time...
sleep $wait_time
echo importing data...

# run the init script to create the DB and the tables in /table
/opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i ./init.sql

for entry in "*.sql"
do
echo executing $entry
/opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i $entry
done

#import the data from the csv files
for entry in "*.csv"
do
# i.e: transform /data/MyTable.csv to MyTable
shortname=$(echo $entry | cut -f 1 -d '.' | cut -f 2 -d '/')
tableName=$database.dbo.$shortname
echo importing $tableName from $entry
/opt/mssql-tools/bin/bcp $tableName in $entry -c -t',' -F 2 -S 0.0.0.0 -U sa -P $password
done
4 changes: 4 additions & 0 deletions sink-connector-lightweight/docker/data_sql_server/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP DATABASE Prime

CREATE DATABASE Prime;
GO
68 changes: 68 additions & 0 deletions sink-connector-lightweight/docker/docker-compose-sql-server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
version: "3.4"

# Ubuntu , set this for redpanda to start
# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm

# Clickhouse Table Schema
# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;

services:

sql-server-db:
container_name: sql-server-db
image: mcr.microsoft.com/mssql/server:2022-latest
#command: sh -c ' chmod +x /tmp/entrypoint.sh; /tmp/./entrypoint.sh & /opt/mssql/bin/sqlservr;'
ports:
- "1433:1433"
environment:
MSSQL_SA_PASSWORD: "Root1234$$"
ACCEPT_EULA: "Y"
volumes:
- ./data_sqlserver:/tmp

clickhouse:
extends:
file: clickhouse-service.yml
service: clickhouse
depends_on:
zookeeper:
condition: service_healthy

zookeeper:
extends:
file: zookeeper-service.yml
service: zookeeper

clickhouse-sink-connector-lt:
extends:
file: clickhouse-sink-connector-lt-service.yml
service: clickhouse-sink-connector-lt
depends_on:
- clickhouse
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
JAVA_OPTS: >
-Xmx5G
-Xms128m
volumes:
- ./config_sql_server.yml:/config.yml

### MONITORING ####
prometheus:
extends:
file: prometheus-service.yml
service: prometheus


grafana:
extends:
file: grafana-service.yml
service: grafana
volumes:
- ./config/grafana/config/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml
- ./config/grafana/config/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml
- ./config/grafana/config/altinity_sink_connector.json:/var/lib/grafana/dashboards/altinity_sink_connector.json
depends_on:
- prometheus
## END OF MONITORING ###
14 changes: 14 additions & 0 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down Expand Up @@ -274,6 +279,12 @@
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>

<!--junit for unit test-->
<dependency>
Expand Down Expand Up @@ -539,6 +550,9 @@
<artifact>*:*</artifact>
<excludes>
<exclude>org/apache/log4j/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.altinity.clickhouse.debezium.embedded;


import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static com.altinity.clickhouse.debezium.embedded.PostgresProperties.getDefaultProperties;

@Testcontainers
public class SQLServerIT {

@Container
public static org.testcontainers.clickhouse.ClickHouseContainer clickHouseContainer =
new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

@Container
public static MSSQLServerContainer sqlServerContainer = new MSSQLServerContainer<>
(DockerImageName.parse("mcr.microsoft.com/mssql/server:2019-latest")
.asCompatibleSubstituteFor("sqlserver"))
.withInitScript("init_sqlserver.sql")
.withUsername("sa")

Check failure on line 48 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

SQLServerIT.com.altinity.clickhouse.debezium.embedded.SQLServerIT

java.lang.ExceptionInInitializerError at com.altinity.clickhouse.debezium.embedded.SQLServerIT.<clinit>(SQLServerIT.java:48)
Raw output
java.lang.ExceptionInInitializerError
	at com.altinity.clickhouse.debezium.embedded.SQLServerIT.<clinit>(SQLServerIT.java:48)
.withPassword("Password!")
.withDatabaseName("employees")
.withExposedPorts(1433);


public Properties getProperties() throws Exception {

Properties properties = new Properties();
// Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer);
// properties.put("plugin.name", "decoderbufs");
// properties.put("plugin.path", "/");
// properties.put("table.include.list", "public.tm");
// properties.put("slot.max.retries", "6");
// properties.put("slot.retry.delay.ms", "5000");
// properties.put("database.allowPublicKeyRetrieval", "true");
// properties.put("table.include.list", "public.tm,public.tm2");

return properties;
}

@Test
@DisplayName("Integration Test - Validates PostgreSQL replication when the plugin is set to DecoderBufs")
public void testDecoderBufsPlugin() throws Exception {
Network network = Network.newNetwork();

sqlServerContainer.withNetwork(network).start();
clickHouseContainer.withNetwork(network).start();
Thread.sleep(10000);

org.testcontainers.Testcontainers.exposeHostPorts(sqlServerContainer.getFirstMappedPort());
AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(10000);//
Thread.sleep(50000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
//Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));


int tmCount = 0;
ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
while(chRs.next()) {
tmCount = chRs.getInt(1);
}

Assert.assertTrue(tmCount == 2);

if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();

}
}
57 changes: 57 additions & 0 deletions sink-connector-lightweight/src/test/resources/ssl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
This directory contains the truststore (used for validating DB server certificate).

The files are generated based on the certificates in src/test/docker, which are generated following the
guidelines outlined in https://docs.microsoft.com/en-us/sql/linux/sql-server-linux-docker-container-security?view=sql-server-ver16.

To generate the server certificate and key:

```
openssl req -x509 -nodes -newkey rsa:2048 -subj '/CN=localhost' -keyout mssql.key -out mssql.pem -days 3650
```

This will output two files, `mssql.key` and `mssql.pem`.

These files are mounted to the SQL Server docker container and therefore the correct permissions must be set
on these files so that they can be read properly by SQL Server's database process. To set the right permissions,
you need to run the following:

```
chmod 444 mssql.key
chmod 444 mssql.pem
```

Note, we explicitly use `444` rather than `440` since the volume is mounted as root but the SQL Server database
is started by the `mssql` user, this allows the key and certificate to be read.

In addition, a special `mssql.conf` file will also be mounted in order for Microsoft SQL Server to know where
to read the database certificates from. This configuration file will be mounted automatically to
`/var/opt/mssql/mssql.conf`. The contents of this file is as follows:

```
[network]
tlscert = /etc/ssl/debezium/certs/mssql.pem
tlskey = /etc/ssl/debezium/private/mssql.key
tlsprotocols = 1.2
forceencryption = 1
```

This configuration enforces TLS 1.2 and encryption in order to operate with the SQL Server instance.
If the client does not support encryption or cannot negotiate TLS 1.2, the client connection will be rejected.

Finally, the truststore is generated as follows:

```
keytool -import -v -trustcacerts -alias localhost -file ../../docker/mssql.pem -keystore truststore.ks -storepass debezium -noprompt
```

This imports the `mssql.pem` public certificate into the truststore that we mount into the container and
this truststore will be configured as part of the SQL Server connector's configuration using:

```
database.ssl.truststore=${project.basedir}/src/test/resources/ssl
database.ssl.truststore.password=debezium
database.trustServerCertificate=true
```

We specifically set `trustServerCertificate` because this certificate is self-signed.
This certificate is also valid for 10 years from August 9th, 2022, so expires August 9th, 2032.
Binary file not shown.
Loading