diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..260c97c89 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11 +COPY sink-connector-client/sink-connector-client /sink-connector-client +COPY sink-connector-lightweight/target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar +ENV JAVA_OPTS="-Dlog4jDebug=true" +ENTRYPOINT ["java", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005", "-jar","/app.jar", "/config.yml", "com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"] diff --git a/build_docker.sh b/build_docker.sh new file mode 100755 index 000000000..69fd0a448 --- /dev/null +++ b/build_docker.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +cd sink-connector-lightweight +mvn clean install -DskipTests=true +today_date=$(date +%F) + +cd .. +cd sink-connector-client +CGO_ENABLED=0 go build + +cd .. + +docker login registry.gitlab.com +docker build -f sink-connector-lightweight/Dockerfile -t clickhouse_debezium_embedded:${today_date} . --no-cache +docker tag clickhouse_debezium_embedded:${today_date} registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${today_date} +docker push registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${today_date} diff --git a/doc/postgres_wal.md b/doc/postgres_wal.md new file mode 100644 index 000000000..3b29db016 --- /dev/null +++ b/doc/postgres_wal.md @@ -0,0 +1,11 @@ +Using the `pg_waldump` utility to dump the WAL log information. `pg_waldump` utility needs to be provided the postgresql data directory path. +``` +pg_waldump pg_wal/000000010000000000000001 +``` + +``` +┌─id───────────────────────────────────┬─offset_key────────────────────────────────────┬─offset_val──────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────record_insert_ts─┬─record_insert_seq─┐ +│ 03750062-c862-48c5-9f37-451c0d33511b │ ["\"engine\"",{"server":"embeddedconnector"}] │ {"transaction_id":null,"lsn_proc":27485360,"messageType":"UPDATE","lsn":27485360,"txId":743,"ts_usec":1687876724804733} │ 2023-06-27 14:38:45 │ 1 │ +└──────────────────────────────────────┴───────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴───────────────────┘ + +``` \ No newline at end of file diff --git a/sink-connector-client/go.mod b/sink-connector-client/go.mod index f8e5e09f7..e79de5ecf 100644 --- a/sink-connector-client/go.mod +++ b/sink-connector-client/go.mod @@ -4,12 +4,17 @@ go 1.18 require ( github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5 + github.com/stretchr/testify v1.8.2 + github.com/tidwall/pretty v1.2.1 github.com/urfave/cli v1.22.13 ) require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/go-querystring v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sink-connector-client/go.sum b/sink-connector-client/go.sum index d1c20d58a..df7432496 100644 --- a/sink-connector-client/go.sum +++ b/sink-connector-client/go.sum @@ -19,10 +19,13 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/urfave/cli v1.22.13 h1:wsLILXG8qCJNse/qAgLNf23737Cx05GflHg/PJGe1Ok= github.com/urfave/cli v1.22.13/go.mod h1:VufqObjsMTF2BBwKawpx9R8eAneNEWhoO0yx8Vd+FkE= golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w= golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sink-connector-client/main.go b/sink-connector-client/main.go index 4f6916635..2f690fe83 100644 --- a/sink-connector-client/main.go +++ b/sink-connector-client/main.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "github.com/levigross/grequests" + "github.com/tidwall/pretty" cli "github.com/urfave/cli" "log" "os" @@ -13,22 +14,32 @@ import ( var requestOptions = &grequests.RequestOptions{} type UpdateBinLog struct { - File string `json:"binlog_file"` - Position string `json:"binlog_position"` - Gtid string `json:"gtid"` + File string `json:"binlog_file"` + Position string `json:"binlog_position"` + Gtid string `json:"gtid"` + SourceHost string `json:"source_host"` + SourcePort string `json:"source_port"` + SourceUser string `json:"source_user"` + SourcePassword string `json:"source_password"` +} + +type UpdateLsn struct { + Lsn string `json:"lsn"` } const ( START_REPLICATION_COMMAND = "start_replica" STOP_REPLICATION_COMAND = "stop_replica" STATUS_COMMAND = "show_replica_status" - UPDATE_BINLOG_COMMAND = "update_binlog" + UPDATE_BINLOG_COMMAND = "change_replication_source" + UPDATE_LSN_COMMAND = "lsn" ) const ( START_REPLICATION = "start" STOP_REPLICATION = "stop" STATUS = "status" UPDATE_BINLOG = "binlog" + UPDATE_LSN = "lsn" ) // Fetches the repos for the given Github users @@ -118,7 +129,9 @@ func main() { Action: func(c *cli.Context) error { var serverUrl = getServerUrl(STATUS, c) resp := getHTTPCall(serverUrl) - log.Println(resp.String()) + + //var j, _ = json.MarshalIndent(resp, "", " ") + fmt.Println(string(pretty.Pretty([]byte(resp.String())))) return nil }, }, @@ -129,30 +142,112 @@ func main() { cli.StringFlag{ Name: "binlog_file", Usage: "Set binlog file", - Required: true, + Required: false, }, cli.StringFlag{ Name: "binlog_position", Usage: "Set binlog position", - Required: true, + Required: false, }, cli.StringFlag{ Name: "gtid", Usage: "Set GTID", - Required: true, + Required: false, + }, + + cli.StringFlag{ + Name: "source_host", + Usage: "Source Hostname", + Required: false, + }, + + cli.StringFlag{ + Name: "source_port", + Usage: "Source Port", + Required: false, + }, + + cli.StringFlag{ + Name: "source_username", + Usage: "Source Username", + Required: false, + }, + + cli.StringFlag{ + Name: "source_password", + Usage: "Source Password", + Required: false, }, }, Action: func(c *cli.Context) error { + handleUpdateBinLogAction(c) return nil }, }, + { + Name: UPDATE_LSN_COMMAND, + Usage: "Update lsn(For postgreSQL)", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "lsn", + Usage: "Set LSN position(For PostgreSQL)", + Required: true, + }, + }, + Action: func(c *cli.Context) error { + handleUpdateLsn(c) + return nil + }, + }, } app.Version = "1.0" app.Run(os.Args) } +func handleUpdateLsn(c *cli.Context) bool { + var lsnPosition = c.String("lsn") + log.Println("***** lsn position:", lsnPosition+" *****") + log.Println("Are you sure you want to continue? (y/n): ") + var userInput string + fmt.Scanln(&userInput) + if userInput != "y" { + log.Println("Exiting...") + return false + } else { + log.Println("Continuing...") + } + + // Step1: Stop replication + log.Println("Stopping replication...") + var stopUrl = getServerUrl(STOP_REPLICATION, c) + resp := getHTTPCall(stopUrl) + time.Sleep(5 * time.Second) + + // Step2: Update binlog position + log.Println("Updating lsn position..") + var updateLsnBody = UpdateLsn{Lsn: lsnPosition} + var postBody, _ = json.Marshal(updateLsnBody) + var requestOptions_copy = requestOptions + // Add data to JSON field + requestOptions_copy.JSON = string(postBody) + var serverUrl = getServerUrl(UPDATE_LSN, c) + resp, err := grequests.Post(serverUrl, requestOptions_copy) + log.Println(resp.String()) + if err != nil { + log.Println(err) + log.Println("Create request failed for Github API") + } + + // Step3: Start replication + time.Sleep(10 * time.Second) + var startUrl = getServerUrl(START_REPLICATION, c) + resp1 := getHTTPCall(startUrl) + log.Println(resp1.String()) + return true +} + /** Function to handle update binlog action which is used to set binlog file/position and gtids @@ -161,10 +256,32 @@ func handleUpdateBinLogAction(c *cli.Context) bool { var binlogFile = c.String("binlog_file") var binlogPos = c.String("binlog_position") var gtid = c.String("gtid") + var sourceHost = c.String("source_host") + var sourcePort = c.String("source_port") + var sourceUsername = c.String("source_username") + var sourcePassword = c.String("source_password") + if gtid == "" { + // If gtid is empty, then a valid binlog file and position + // needs to be passed. + //if binlogPos == "" || binlogFile == "" { + // log.Println(" ****** A Valid binlog position/file or GTID set is required") + // cli.ShowCommandHelp(c, UPDATE_BINLOG_COMMAND) + // return false + //} else if sourceHost == "" || sourcePort == "" || sourceUsername == "" || sourcePassword == "" { + // log.Println(" ****** A Valid source host/port/username/password is required") + // cli.ShowCommandHelp(c, UPDATE_BINLOG_COMMAND) + // return false + //} + } log.Println("***** binlog file: ", binlogFile+" *****") log.Println("***** binlog position:", binlogPos+" *****") log.Println("***** GTID:", gtid+" *****") + log.Println("***** Source Host:", sourceHost+" *****") + log.Println("***** Source Port:", sourcePort+" *****") + log.Println("***** Source Username:", sourceUsername+" *****") + log.Println("***** Source Password:", sourcePassword+" *****") + log.Println("Are you sure you want to continue? (y/n): ") var userInput string fmt.Scanln(&userInput) @@ -175,31 +292,38 @@ func handleUpdateBinLogAction(c *cli.Context) bool { log.Println("Continuing...") } - // Step1: Stop replication - log.Println("Stopping replication...") - var stopUrl = getServerUrl(STOP_REPLICATION, c) - resp := getHTTPCall(stopUrl) - time.Sleep(5 * time.Second) + //// Step1: Stop replication + //log.Println("Stopping replication...") + //var stopUrl = getServerUrl(STOP_REPLICATION, c) + //resp := getHTTPCall(stopUrl) + //time.Sleep(5 * time.Second) // Step2: Update binlog position log.Println("Updating binlog file/position and gtids...") - var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid} + var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid, SourceHost: sourceHost, SourcePort: sourcePort, SourceUser: sourceUsername, SourcePassword: sourcePassword} var postBody, _ = json.Marshal(updateBinLogBody) var requestOptions_copy = requestOptions // Add data to JSON field requestOptions_copy.JSON = string(postBody) var serverUrl = getServerUrl(UPDATE_BINLOG, c) resp, err := grequests.Post(serverUrl, requestOptions_copy) + if resp.StatusCode == 400 { + log.Println("***** Error: Replication is running, please stop it first ******") + log.Println("***** Use stop_replica command to stop replication ******") + log.Println("***** After change_replication_source is successful, use start_replica to start replication *******") + return false + } + log.Println(resp.String()) if err != nil { log.Println(err) - log.Println("Create request failed for Github API") + log.Println("Create request failed") } - - // Step3: Start replication - time.Sleep(10 * time.Second) - var startUrl = getServerUrl(START_REPLICATION, c) - resp1 := getHTTPCall(startUrl) - log.Println(resp1.String()) + // + //// Step3: Start replication + //time.Sleep(10 * time.Second) + //var startUrl = getServerUrl(START_REPLICATION, c) + //resp1 := getHTTPCall(startUrl) + //log.Println(resp1.String()) return true } diff --git a/sink-connector-client/main_test.go b/sink-connector-client/main_test.go new file mode 100644 index 000000000..b01fec12e --- /dev/null +++ b/sink-connector-client/main_test.go @@ -0,0 +1,34 @@ +package main + +import ( + "github.com/stretchr/testify/assert" + "github.com/urfave/cli" + "testing" +) + +func TestGetServerUrl(t *testing.T) { + var c = cli.Context{ + App: cli.NewApp(), + Command: cli.Command{}, + } + c.App.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "host", + Usage: "Host server address of sink connector", + Required: false, + }, + cli.StringFlag{ + Name: "port", + Usage: "Port of sink connector", + Required: false, + }, + cli.BoolFlag{ + Name: "secure", + Usage: "If true, then use https, else http", + Required: false, + }, + } + //var c = cli.NewContext(nil, nil, nil) + var serverUrl = getServerUrl("start", &c) + assert.Equal(t, "http://localhost:7000/start_replica", serverUrl, "they should be equal") +} diff --git a/sink-connector-client/sink-connector-client b/sink-connector-client/sink-connector-client index c13f13a59..a0b351291 100755 Binary files a/sink-connector-client/sink-connector-client and b/sink-connector-client/sink-connector-client differ diff --git a/sink-connector-lightweight/Dockerfile b/sink-connector-lightweight/Dockerfile index a958eb605..260c97c89 100644 --- a/sink-connector-lightweight/Dockerfile +++ b/sink-connector-lightweight/Dockerfile @@ -1,4 +1,5 @@ FROM openjdk:11 -COPY target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar +COPY sink-connector-client/sink-connector-client /sink-connector-client +COPY sink-connector-lightweight/target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar ENV JAVA_OPTS="-Dlog4jDebug=true" ENTRYPOINT ["java", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005", "-jar","/app.jar", "/config.yml", "com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"] diff --git a/sink-connector-lightweight/clickhouse-debezium-embedded (2).iml b/sink-connector-lightweight/clickhouse-debezium-embedded (2).iml new file mode 100644 index 000000000..219cba0fc --- /dev/null +++ b/sink-connector-lightweight/clickhouse-debezium-embedded (2).iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/sink-connector-lightweight/docker/config_postgres.yml b/sink-connector-lightweight/docker/config_postgres.yml index 00b7ed14a..eef680390 100644 --- a/sink-connector-lightweight/docker/config_postgres.yml +++ b/sink-connector-lightweight/docker/config_postgres.yml @@ -1,3 +1,4 @@ +name: "debezium-embedded-postgres" database.hostname: "postgres" database.port: "5432" database.user: "root" diff --git a/sink-connector-lightweight/docker/docker-compose-postgres.yml b/sink-connector-lightweight/docker/docker-compose-postgres.yml index 7d3e874c6..42038936f 100644 --- a/sink-connector-lightweight/docker/docker-compose-postgres.yml +++ b/sink-connector-lightweight/docker/docker-compose-postgres.yml @@ -44,25 +44,6 @@ services: volumes: #- ../sql/init_ch.sql:/docker-entrypoint-initdb.d/init_clickhouse.sql - ../clickhouse/users.xml:/etc/clickhouse-server/users.xml - depends_on: - zookeeper: - condition: service_healthy - - zookeeper: - image: zookeeper:3.6.2 - expose: - - "2181" - environment: - ZOO_TICK_TIME: 500 - ZOO_MY_ID: 1 - healthcheck: - test: echo stat | nc localhost 2181 - interval: 3s - timeout: 2s - retries: 5 - start_period: 2s - security_opt: - - label:disable debezium-embedded: image: registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${SINK_LIGHTWEIGHT_VERSION} @@ -72,6 +53,7 @@ services: ports: - "8083:8083" - "5005:5005" + - "7000:7000" depends_on: - clickhouse env_file: @@ -81,3 +63,34 @@ services: volumes: #- ./data:/data - ./config_postgres.yml:/config.yml + + ### MONITORING #### + prometheus: + container_name: prometheus + image: bitnami/prometheus:2.36.0 + restart: "no" + ports: + - "9090:9090" + volumes: + - ./config/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml + + grafana: + build: + context: ./config/grafana + args: + GRAFANA_VERSION: latest + #container_name: grafana + #image: grafana/grafana + restart: "no" + #volumes: + # - ../config/grafana/dashboards:/etc/grafana/provisioning/dashboards + ports: + - "3000:3000" + environment: + - DS_PROMETHEUS=prometheus + - GF_USERS_DEFAULT_THEME=light + - GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource + - GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource + depends_on: + - prometheus + ## END OF MONITORING ### \ No newline at end of file diff --git a/sink-connector-lightweight/docker/docker-compose.yml b/sink-connector-lightweight/docker/docker-compose.yml index d0a918c9a..19190de2c 100644 --- a/sink-connector-lightweight/docker/docker-compose.yml +++ b/sink-connector-lightweight/docker/docker-compose.yml @@ -30,6 +30,43 @@ services: timeout: 5s retries: 6 + mysql-slave: + # mysql --host=127.0.0.1 --port=3306 --user=root --password=root --database=test + # SHOW VARIABLES LIKE 'server_id'; + # SHOW VARIABLES LIKE 'log_bin'; + # SHOW MASTER STATUS; + container_name: mysql-slave + image: docker.io/bitnami/mysql:8.0 + # command: + # --default-authentication-plugin=mysql_native_password + # --server-id=1 + # --binlog_do_db=test + restart: "no" + ports: + - "3306" + environment: + - MYSQL_ROOT_PASSWORD=root + - MYSQL_DATABASE=test + - MYSQL_REPLICATION_MODE=slave + - MYSQL_REPLICATION_USER=repl_user + - MYSQL_MASTER_HOST=mysql-master + - MYSQL_MASTER_PORT_NUMBER=3306 + - MYSQL_MASTER_ROOT_PASSWORD=root + - ALLOW_EMPTY_PASSWORD=yes + volumes: + - ./mysqld-slave.cnf:/opt/bitnami/mysql/conf/my_custom.cnf + # volumes: + # - ../sql/init_mysql.sql:/docker-entrypoint-initdb.d/init_mysql.sql + healthcheck: + test: [ 'CMD', '/opt/bitnami/scripts/mysql/healthcheck.sh' ] + interval: 15s + timeout: 5s + retries: 6 + #security_opt: + # - seccomp:unconfined + depends_on: + - mysql-master + clickhouse: # clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test container_name: clickhouse diff --git a/sink-connector-lightweight/docker/mysqld-slave.cnf b/sink-connector-lightweight/docker/mysqld-slave.cnf new file mode 100644 index 000000000..2e43de952 --- /dev/null +++ b/sink-connector-lightweight/docker/mysqld-slave.cnf @@ -0,0 +1,12 @@ +[mysqld] +max_connections=100000 + + +gtid-mode = on +enforce-gtid-consistency = true +# for loading files +local_infile = on +# to support tables without PK +sql_generate_invisible_primary_key=1 + +log-slave-updates diff --git a/sink-connector-lightweight/jar/debezium-storage-jdbc-2.4.0-SNAPSHOT.jar b/sink-connector-lightweight/jar/debezium-storage-jdbc-2.4.0-SNAPSHOT.jar new file mode 100644 index 000000000..b53f539a8 Binary files /dev/null and b/sink-connector-lightweight/jar/debezium-storage-jdbc-2.4.0-SNAPSHOT.jar differ diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 64d398fe8..f2832df81 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -120,8 +120,8 @@ io.debezium debezium-storage-jdbc 2.4.0-SNAPSHOT - - + diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java index 5461baf80..141932852 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java @@ -1,6 +1,7 @@ package com.altinity.clickhouse.debezium.embedded; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import static com.altinity.clickhouse.debezium.embedded.cdc.DebeziumOffsetStorage.*; import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader; import com.altinity.clickhouse.debezium.embedded.config.ConfigurationService; @@ -10,7 +11,9 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.google.inject.Guice; import com.google.inject.Injector; +import io.debezium.engine.DebeziumEngine; import io.javalin.Javalin; +import io.javalin.http.HttpStatus; import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -20,11 +23,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.xml.transform.Result; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import java.util.concurrent.Future; public class ClickHouseDebeziumEmbeddedApplication { @@ -33,6 +34,10 @@ public class ClickHouseDebeziumEmbeddedApplication { private static ClickHouseDebeziumEmbeddedApplication embeddedApplication; private static DebeziumChangeEventCapture debeziumChangeEventCapture; + + + private static Properties userProperties = new Properties(); + /** * Main Entry for the application * @param args arguments @@ -97,22 +102,65 @@ public static void main(String[] args) throws Exception { }); app.post("/binlog", ctx -> { + if(debeziumChangeEventCapture.isReplicationRunning()) { + ctx.status(HttpStatus.BAD_REQUEST); + return; + } + String body = ctx.body(); + JSONObject jsonObject = (JSONObject) new JSONParser().parse(body); + String binlogFile = (String) jsonObject.get(BINLOG_FILE); + String binlogPosition = (String) jsonObject.get(BINLOG_POS); + String gtid = (String) jsonObject.get(GTID); + + String sourceHost = (String) jsonObject.get(SOURCE_HOST); + String sourcePort = (String) jsonObject.get(SOURCE_PORT); + String sourceUser = (String) jsonObject.get(SOURCE_USER); + String sourcePassword = (String) jsonObject.get(SOURCE_PASSWORD); + + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1)); + + if(sourceHost != null && !sourceHost.isEmpty()) { + userProperties.setProperty("database.hostname", sourceHost); + } + + if(sourcePort != null && !sourcePort.isEmpty()) { + userProperties.setProperty("database.port", sourcePort); + } + + if(sourceUser != null && !sourceUser.isEmpty()) { + userProperties.setProperty("database.user", sourceUser); + } + + if(sourcePassword != null && !sourcePassword.isEmpty()) { + userProperties.setProperty("database.password", sourcePassword); + } + + if(userProperties.size() > 0) { + log.info("User Overridden properties: " + userProperties); + + } + + debeziumChangeEventCapture.updateDebeziumStorageStatus(config, finalProps1, binlogFile, binlogPosition, + gtid, sourceHost, sourcePort, sourceUser, sourcePassword); + log.info("Received update-binlog request: " + body); + }); + + app.post("/lsn", ctx -> { String body = ctx.body(); JSONObject jsonObject = (JSONObject) new JSONParser().parse(body); - String binlogFile = (String) jsonObject.get("binlog_file"); - String binlogPosition = (String) jsonObject.get("binlog_position"); - String gtid = (String) jsonObject.get("gtid"); + String lsn = (String) jsonObject.get(LSN); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1)); - debeziumChangeEventCapture.updateDebeziumStorageStatus(config, finalProps1, binlogFile, binlogPosition, gtid); + debeziumChangeEventCapture.updateDebeziumStorageStatus(config, finalProps1, lsn); log.info("Received update-binlog request: " + body); }); Properties finalProps = props; app.get("/start", ctx -> { - + finalProps.putAll(userProperties); CompletableFuture cf = startDebeziumEventLoop(injector, finalProps); - ctx.result("Started Debezium Event Loop"); + ctx.result("Started Replication...."); }); // app.get("/updateBinLogStatus", ctx -> { @@ -124,14 +172,7 @@ public static void main(String[] args) throws Exception { embeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(ConfigurationService.class), injector.getInstance(DDLParserService.class), props); - - - - - - } public static CompletableFuture startDebeziumEventLoop(Injector injector, Properties props) throws InterruptedException { @@ -144,7 +185,6 @@ public static CompletableFuture startDebeziumEventLoop(Injector injector embeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(ConfigurationService.class), injector.getInstance(DDLParserService.class), props); return null; }); @@ -154,7 +194,6 @@ public static CompletableFuture startDebeziumEventLoop(Injector injector public void start(DebeziumRecordParserService recordParserService, - ConfigurationService configurationService, DDLParserService ddlParserService, Properties props) throws Exception { // Define the configuration for the Debezium Engine with MySQL connector... // log.debug("Loading properties"); diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 8ba3b7c6e..19f39bc7f 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -9,14 +9,12 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.common.Metrics; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.altinity.clickhouse.sink.connector.db.DbWriter; import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.altinity.clickhouse.sink.connector.model.DBCredentials; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.connection.PostgresConnection; @@ -39,9 +37,7 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -70,6 +66,16 @@ public class DebeziumChangeEventCapture { static public boolean isNewReplacingMergeTreeEngine = true; + private long replicationLag = 0; + + private boolean isReplicationRunning = false; + + private String binLogFile = ""; + + private String binLogPosition = ""; + + private String gtid = ""; + DebeziumEngine> engine; private void performDDLOperation(String DDL, Properties props, SourceRecord sr, ClickHouseSinkConnectorConfig config) { @@ -177,18 +183,21 @@ private void processEveryChangeRecord(Properties props, ChangeEvent queue = new ConcurrentLinkedQueue(); if (chStruct != null) { queue.add(chStruct); @@ -316,6 +325,13 @@ public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pro .collect(Collectors.toList()); JSONArray result = new JSONArray(); + JSONObject replicationLag = new JSONObject(); + replicationLag.put("Seconds_Behind_Source", this.replicationLag/1000); + result.add(replicationLag); + + JSONObject replicationRunning = new JSONObject(); + replicationRunning.put("Replica_Running", this.isReplicationRunning); + result.add(replicationRunning); // Add Database name and table name. JSONObject dbName = new JSONObject(); @@ -348,7 +364,9 @@ public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pro * @param gtid */ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props, - String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException { + String binlogFile, String binLogPosition, String gtid, + String sourceHost, String sourcePort, String sourceUsername, + String sourcePassword) throws SQLException, ParseException { String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + @@ -370,6 +388,37 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr } + /** + * Function to update the status of Debezium storage (LSN). + * @param config + * @param props + * @param lsn + * @throws SQLException + * @throws ParseException + */ + public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props, + String lsn) throws SQLException, ParseException { + + + String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + + JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); + DBCredentials dbCredentials = parseDBConfiguration(config); + + BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), + dbCredentials.getDatabase(), dbCredentials.getUserName(), + dbCredentials.getPassword(), config); + String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer); + + String offsetKey = new DebeziumOffsetStorage().getOffsetKey(props); + String updateOffsetValue = new DebeziumOffsetStorage().updateLsnInformation(offsetValue, + Long.parseLong(lsn)); + + new DebeziumOffsetStorage().deleteOffsetStorageRow(offsetKey, props, writer); + new DebeziumOffsetStorage().updateDebeziumStorageRow(writer, tableName, offsetKey, updateOffsetValue, + System.currentTimeMillis()); + + } + public static int MAX_RETRIES = 25; public static int SLEEP_TIME = 10000; @@ -413,7 +462,22 @@ public void handle(boolean b, String s, Throwable throwable) { } log.debug("Completion callback"); } - }).build(); + + }).using( + new DebeziumEngine.ConnectorCallback() { + @Override + public void connectorStarted() { + isReplicationRunning = true; + log.debug("Connector started"); + } + + @Override + public void connectorStopped() { + isReplicationRunning = false; + log.debug("Connector stopped"); + } + } + ).build(); engine.run(); } catch (Exception e) { @@ -452,6 +516,18 @@ public void stop() throws IOException { Metrics.stop(); } + public long getReplicationLag() { + return this.replicationLag; + } + + public long getReplicationLagInSecs() { + return this.replicationLag / 1000; + } + + public boolean isReplicationRunning() { + return this.isReplicationRunning; + } + DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) { DBCredentials dbCredentials = new DBCredentials(); @@ -498,6 +574,7 @@ private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) { } } + // db.items.insert({_id:ObjectId(), uuid:ObjectId(), price:22, name:"New record"}); private void trySomething(Configuration config){ PostgresConnectorConfig postgresConfig = new PostgresConnectorConfig(config); diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java index 22547bd6b..2e8bfa0d9 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java @@ -1,10 +1,9 @@ package com.altinity.clickhouse.debezium.embedded.cdc; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.altinity.clickhouse.sink.connector.model.DBCredentials; -import com.fasterxml.jackson.databind.annotation.JsonAppend; + import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; +import org.apache.kafka.common.protocol.types.Field; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -17,6 +16,24 @@ public class DebeziumOffsetStorage { + // MySQL parameters + public static final String BINLOG_POS = "binlog_pos"; + public static final String BINLOG_FILE = "binlog_file"; + + public static final String GTID = "gtid"; + + // PostgreSQL parameters + public static final String LSN_PROCESSED = "lsn_proc"; + public static final String LSN = "lsn"; + + // Source Host + public static final String SOURCE_HOST = "source_host"; + public static final String SOURCE_PORT = "source_port"; + public static final String SOURCE_USER = "source_user"; + public static final String SOURCE_PASSWORD = "source_password"; + + + public String getOffsetKey(Properties props) { String connectorName = props.getProperty("name"); return String.format("[\"%s\",{\"server\":\"embeddedconnector\"}]", connectorName); @@ -62,15 +79,42 @@ public String updateBinLogInformation(String record, String binLogFile, String b jsonObject.put("transaction_id", null); } - jsonObject.put("file", binLogFile); - jsonObject.put("pos", binLogPosition); - if(gtids != null) { + if(binLogFile != null && !binLogFile.isEmpty()) { + jsonObject.put("file", binLogFile); + } + + if(binLogPosition != null && !binLogPosition.isEmpty()) { + jsonObject.put("pos", binLogPosition); + } + + if(gtids != null && !gtids.isEmpty()) { jsonObject.put("gtids", gtids); } return jsonObject.toJSONString(); } + /** + * ┌─id───────────────────────────────────┬─offset_key────────────────────────────────────┬─offset_val──────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────record_insert_ts─┬─record_insert_seq─┐ + * │ 03750062-c862-48c5-9f37-451c0d33511b │ ["\"engine\"",{"server":"embeddedconnector"}] │ {"transaction_id":null,"lsn_proc":27485360,"messageType":"UPDATE","lsn":27485360,"txId":743,"ts_usec":1687876724804733} │ 2023-06-27 14:38:45 │ 1 │ + * └──────────────────────────────────────┴───────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴───────────────────┘ + * + * @param record + * @return + * @throws ParseException + */ + public String updateLsnInformation(String record, long lsn) throws ParseException { + JSONObject jsonObject = new JSONObject(); + if(record != null || !record.isEmpty()) { + jsonObject = (JSONObject) new JSONParser().parse(record); + } + + jsonObject.put(LSN_PROCESSED, lsn); + jsonObject.put(LSN, lsn); + + return jsonObject.toJSONString(); + } + public void updateDebeziumStorageRow(BaseDbWriter writer, String tableName, String offsetKey, String offsetVal, long currentTs) throws SQLException { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java index efd899518..e2076df15 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java @@ -49,13 +49,4 @@ public void testDeleteOffsetStorageRow2() { } } - @Test - public void testUpdateBingLogInformation() throws ParseException { - String record = "{\"transaction_id\":null,\"ts_sec\":1687278006,\"file\":\"mysql-bin.000003\",\"pos\":1156385,\"gtids\":\"30fd82c7-0f86-11ee-9e3b-0242c0a86002:1-2442\",\"row\":1,\"server_id\":266,\"event\":2}"; - - String updatedRecord = new DebeziumOffsetStorage().updateBinLogInformation(record , "mysql-bin.001", "1222", "232232323"); - - assertTrue(updatedRecord != null); - } - } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java index c7afd3392..ab15feb85 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java @@ -1,2 +1,27 @@ -package com.altinity.clickhouse.debezium.embedded.cdc;public class DebeziumChangeEventCaptureTest { -} +package com.altinity.clickhouse.debezium.embedded.cdc; + +import org.json.simple.parser.ParseException; +import static org.junit.Assert.assertTrue; +import org.junit.jupiter.api.Test; + +public class DebeziumChangeEventCaptureTest { + + @Test + public void testUpdateBingLogInformation() throws ParseException { + String record = "{\"transaction_id\":null,\"ts_sec\":1687278006,\"file\":\"mysql-bin.000003\",\"pos\":1156385,\"gtids\":\"30fd82c7-0f86-11ee-9e3b-0242c0a86002:1-2442\",\"row\":1,\"server_id\":266,\"event\":2}"; + + String updatedRecord = new DebeziumOffsetStorage().updateBinLogInformation(record , "mysql-bin.001", "1222", "232232323"); + + assertTrue(updatedRecord.equalsIgnoreCase("{\"transaction_id\":null,\"ts_sec\":1687278006,\"file\":\"mysql-bin.001\",\"pos\":\"1222\",\"gtids\":\"232232323\",\"row\":1,\"server_id\":266,\"event\":2}")); + } + + @Test + public void testUpdateLsn() throws ParseException { + String record = "{\"transaction_id\":null,\"lsn_proc\":27485360,\"messageType\":\"UPDATE\",\"lsn\":27485360,\"txId\":743,\"ts_usec\":1687876724804733}"; + + String updatedRecord = new DebeziumOffsetStorage().updateLsnInformation(record, 1232323L); + + assertTrue(updatedRecord.equalsIgnoreCase("{\"transaction_id\":null,\"lsn_proc\":1232323,\"messageType\":\"UPDATE\",\"lsn\":1232323,\"txId\":743,\"ts_usec\":1687876724804733}")); + } + +} \ No newline at end of file diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLAddColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLAddColumnIT.java index 736b262c9..db39e3f76 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLAddColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLAddColumnIT.java @@ -52,7 +52,7 @@ public void testAddColumn() throws Exception { } }); - Thread.sleep(10000); + Thread.sleep(10000);// Connection conn = connectToMySQL(); // alter table ship_class change column class_name class_name_new int; diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLBaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLBaseIT.java index ee825fe1f..804107ef0 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLBaseIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/ClickHouseDebeziumEmbeddedDDLBaseIT.java @@ -2,20 +2,19 @@ import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader; -import com.altinity.clickhouse.debezium.embedded.config.EnvironmentConfigurationService; import org.apache.log4j.BasicConfigurator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.BindMode; import org.testcontainers.containers.ClickHouseContainer; import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.SelinuxContext; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.shaded.com.fasterxml.jackson.databind.annotation.JsonAppend; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; -import java.nio.file.Files; -import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -26,10 +25,20 @@ public class ClickHouseDebeziumEmbeddedDDLBaseIT { protected MySQLContainer mySqlContainer; @Container - public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest") + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer + ("clickhouse/clickhouse-server:latest") .withInitScript("init_clickhouse_it.sql") .withExposedPorts(8123); - + //.withClasspathResourceMapping("users.xml", "/etc/clickhouse-server/users.xml", BindMode.READ_WRITE, + // SelinuxContext.SHARED) ; + //.withEnv("CLICKHOUSE_USER", "default") + //.withEnv("CLICKHOUSE_PASSWORD", "root") + //.withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "0 "); +// .withFileSystemBind("src/test/resources/users.xml", +// "/etc/clickhouse-server/users.xml", BindMode.READ_WRITE); + + // .withCopyFileToContainer(MountableFile.forClasspathResource("users.xml"), + // "/etc/clickhouse-server/users.xml"); @BeforeEach public void startContainers() throws InterruptedException { diff --git a/sink-connector-lightweight/src/test/resources/config.yml b/sink-connector-lightweight/src/test/resources/config.yml index e053e5639..520e11ded 100644 --- a/sink-connector-lightweight/src/test/resources/config.yml +++ b/sink-connector-lightweight/src/test/resources/config.yml @@ -8,7 +8,7 @@ database.include.list: employees #table.include.list=sbtest1 clickhouse.server.url: "clickhouse" clickhouse.server.user: "default" -clickhouse.server.pass: "root" +clickhouse.server.pass: "" clickhouse.server.port: "8123" clickhouse.server.database: "test" database.allowPublicKeyRetrieval: "true" diff --git a/sink-connector-lightweight/src/test/resources/init_clickhouse.sql b/sink-connector-lightweight/src/test/resources/init_clickhouse.sql index 5f27e6b16..7bd13e80a 100644 --- a/sink-connector-lightweight/src/test/resources/init_clickhouse.sql +++ b/sink-connector-lightweight/src/test/resources/init_clickhouse.sql @@ -49,6 +49,11 @@ INSERT INTO altinity_sink_connector.replica_source_info (id,offset_key,offset_va INSERT INTO altinity_sink_connector.replica_source_info (id,offset_key,offset_val,record_insert_ts,record_insert_seq) VALUES ('f15e5c94-4338-409b-b3ed-5044fa20e38e','["company-1",{"server":"embeddedconnector"}]','{"transaction_id":null,"ts_sec":1687278006,"file":"mysql-bin.000003","pos":1156385,"gtids":"30fd82c7-0f86-11ee-9e3b-0242c0a86002:1-2442","row":1,"server_id":266,"event":2}','2023-06-20 16:20:07',3); +-- PostgreSQL +--INSERT INTO altinity_sink_connector.replica_source_info +--(id, offset_key, offset_val, record_insert_ts, record_insert_seq) +--VALUES('03750062-c862-48c5-9f37-451c0d33511b', '["\"engine\"",{"server":"embeddedconnector"}]', '{"transaction_id":null,"lsn_proc":27485360,"messageType":"UPDATE","lsn":27485360,"txId":743,"ts_usec":1687876724804733}', 2023-06-27 14:38:45.000, 1); + --CREATE TABLE employees.rmt_test --( -- `id` Int64, diff --git a/sink-connector-lightweight/src/test/resources/init_clickhouse_it.sql b/sink-connector-lightweight/src/test/resources/init_clickhouse_it.sql index 83ca8ac09..bb7bc0d10 100644 --- a/sink-connector-lightweight/src/test/resources/init_clickhouse_it.sql +++ b/sink-connector-lightweight/src/test/resources/init_clickhouse_it.sql @@ -1,3 +1,8 @@ +--CREATE USER 'ch_user' IDENTIFIED WITH plaintext_password BY 'root'; +--SET allow_introspection_functions=1; +--GRANT ALL ON . TO 'ch_user' WITH GRANT OPTION + + CREATE database datatypes; CREATE database employees; CREATE database public; diff --git a/sink-connector-lightweight/src/test/resources/users.xml b/sink-connector-lightweight/src/test/resources/users.xml new file mode 100644 index 000000000..b9896eb70 --- /dev/null +++ b/sink-connector-lightweight/src/test/resources/users.xml @@ -0,0 +1,121 @@ + + + + + + + + + + 1 + random + + + + + 1 + + + + + + + + + root + + + + ::/0 + + + + default + + + default + + + + + + + + + + + + + + 3600 + + + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/model/ClickHouseStruct.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/model/ClickHouseStruct.java index f405ba9e1..fe6af9be8 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/model/ClickHouseStruct.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/model/ClickHouseStruct.java @@ -231,6 +231,12 @@ public void setAdditionalMetaData(Map convertedValue) { } } + public long getReplicationLag() { + if(this.getTs_ms() > 0) { + return System.currentTimeMillis() - this.getTs_ms(); + } + return 0; + } @Override public String toString() { return new StringBuffer()