Skip to content

Commit

Permalink
Merge pull request #275 from Altinity/add_postgres_lsn_support
Browse files Browse the repository at this point in the history
Added logic to set lsn through REST API.
  • Loading branch information
subkanthi authored Jul 20, 2023
2 parents 3848ec0 + eddee8c commit 497847c
Show file tree
Hide file tree
Showing 28 changed files with 690 additions and 98 deletions.
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM openjdk:11
COPY sink-connector-client/sink-connector-client /sink-connector-client
COPY sink-connector-lightweight/target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar
ENV JAVA_OPTS="-Dlog4jDebug=true"
ENTRYPOINT ["java", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005", "-jar","/app.jar", "/config.yml", "com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]
16 changes: 16 additions & 0 deletions build_docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

cd sink-connector-lightweight
mvn clean install -DskipTests=true
today_date=$(date +%F)

cd ..
cd sink-connector-client
CGO_ENABLED=0 go build

cd ..

docker login registry.gitlab.com
docker build -f sink-connector-lightweight/Dockerfile -t clickhouse_debezium_embedded:${today_date} . --no-cache
docker tag clickhouse_debezium_embedded:${today_date} registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${today_date}
docker push registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${today_date}
11 changes: 11 additions & 0 deletions doc/postgres_wal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Using the `pg_waldump` utility to dump the WAL log information. `pg_waldump` utility needs to be provided the postgresql data directory path.
```
pg_waldump pg_wal/000000010000000000000001
```

```
┌─id───────────────────────────────────┬─offset_key────────────────────────────────────┬─offset_val──────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────record_insert_ts─┬─record_insert_seq─┐
│ 03750062-c862-48c5-9f37-451c0d33511b │ ["\"engine\"",{"server":"embeddedconnector"}] │ {"transaction_id":null,"lsn_proc":27485360,"messageType":"UPDATE","lsn":27485360,"txId":743,"ts_usec":1687876724804733} │ 2023-06-27 14:38:45 │ 1 │
└──────────────────────────────────────┴───────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴───────────────────┘
```
5 changes: 5 additions & 0 deletions sink-connector-client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ go 1.18

require (
github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5
github.com/stretchr/testify v1.8.2
github.com/tidwall/pretty v1.2.1
github.com/urfave/cli v1.22.13
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
3 changes: 3 additions & 0 deletions sink-connector-client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/urfave/cli v1.22.13 h1:wsLILXG8qCJNse/qAgLNf23737Cx05GflHg/PJGe1Ok=
github.com/urfave/cli v1.22.13/go.mod h1:VufqObjsMTF2BBwKawpx9R8eAneNEWhoO0yx8Vd+FkE=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
166 changes: 145 additions & 21 deletions sink-connector-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/levigross/grequests"
"github.com/tidwall/pretty"
cli "github.com/urfave/cli"
"log"
"os"
Expand All @@ -13,22 +14,32 @@ import (
var requestOptions = &grequests.RequestOptions{}

type UpdateBinLog struct {
File string `json:"binlog_file"`
Position string `json:"binlog_position"`
Gtid string `json:"gtid"`
File string `json:"binlog_file"`
Position string `json:"binlog_position"`
Gtid string `json:"gtid"`
SourceHost string `json:"source_host"`
SourcePort string `json:"source_port"`
SourceUser string `json:"source_user"`
SourcePassword string `json:"source_password"`
}

type UpdateLsn struct {
Lsn string `json:"lsn"`
}

const (
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "update_binlog"
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
)
const (
START_REPLICATION = "start"
STOP_REPLICATION = "stop"
STATUS = "status"
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
)

// Fetches the repos for the given Github users
Expand Down Expand Up @@ -118,7 +129,9 @@ func main() {
Action: func(c *cli.Context) error {
var serverUrl = getServerUrl(STATUS, c)
resp := getHTTPCall(serverUrl)
log.Println(resp.String())

//var j, _ = json.MarshalIndent(resp, "", " ")
fmt.Println(string(pretty.Pretty([]byte(resp.String()))))
return nil
},
},
Expand All @@ -129,30 +142,112 @@ func main() {
cli.StringFlag{
Name: "binlog_file",
Usage: "Set binlog file",
Required: true,
Required: false,
},
cli.StringFlag{
Name: "binlog_position",
Usage: "Set binlog position",
Required: true,
Required: false,
},
cli.StringFlag{
Name: "gtid",
Usage: "Set GTID",
Required: true,
Required: false,
},

cli.StringFlag{
Name: "source_host",
Usage: "Source Hostname",
Required: false,
},

cli.StringFlag{
Name: "source_port",
Usage: "Source Port",
Required: false,
},

cli.StringFlag{
Name: "source_username",
Usage: "Source Username",
Required: false,
},

cli.StringFlag{
Name: "source_password",
Usage: "Source Password",
Required: false,
},
},
Action: func(c *cli.Context) error {

handleUpdateBinLogAction(c)
return nil
},
},
{
Name: UPDATE_LSN_COMMAND,
Usage: "Update lsn(For postgreSQL)",
Flags: []cli.Flag{
cli.StringFlag{
Name: "lsn",
Usage: "Set LSN position(For PostgreSQL)",
Required: true,
},
},
Action: func(c *cli.Context) error {
handleUpdateLsn(c)
return nil
},
},
}

app.Version = "1.0"
app.Run(os.Args)
}

func handleUpdateLsn(c *cli.Context) bool {
var lsnPosition = c.String("lsn")
log.Println("***** lsn position:", lsnPosition+" *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}

// Step1: Stop replication
log.Println("Stopping replication...")
var stopUrl = getServerUrl(STOP_REPLICATION, c)
resp := getHTTPCall(stopUrl)
time.Sleep(5 * time.Second)

// Step2: Update binlog position
log.Println("Updating lsn position..")
var updateLsnBody = UpdateLsn{Lsn: lsnPosition}
var postBody, _ = json.Marshal(updateLsnBody)
var requestOptions_copy = requestOptions
// Add data to JSON field
requestOptions_copy.JSON = string(postBody)
var serverUrl = getServerUrl(UPDATE_LSN, c)
resp, err := grequests.Post(serverUrl, requestOptions_copy)
log.Println(resp.String())
if err != nil {
log.Println(err)
log.Println("Create request failed for Github API")
}

// Step3: Start replication
time.Sleep(10 * time.Second)
var startUrl = getServerUrl(START_REPLICATION, c)
resp1 := getHTTPCall(startUrl)
log.Println(resp1.String())
return true
}

/**
Function to handle update binlog action
which is used to set binlog file/position and gtids
Expand All @@ -161,10 +256,32 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
var binlogFile = c.String("binlog_file")
var binlogPos = c.String("binlog_position")
var gtid = c.String("gtid")
var sourceHost = c.String("source_host")
var sourcePort = c.String("source_port")
var sourceUsername = c.String("source_username")
var sourcePassword = c.String("source_password")

if gtid == "" {
// If gtid is empty, then a valid binlog file and position
// needs to be passed.
//if binlogPos == "" || binlogFile == "" {
// log.Println(" ****** A Valid binlog position/file or GTID set is required")
// cli.ShowCommandHelp(c, UPDATE_BINLOG_COMMAND)
// return false
//} else if sourceHost == "" || sourcePort == "" || sourceUsername == "" || sourcePassword == "" {
// log.Println(" ****** A Valid source host/port/username/password is required")
// cli.ShowCommandHelp(c, UPDATE_BINLOG_COMMAND)
// return false
//}
}
log.Println("***** binlog file: ", binlogFile+" *****")
log.Println("***** binlog position:", binlogPos+" *****")
log.Println("***** GTID:", gtid+" *****")
log.Println("***** Source Host:", sourceHost+" *****")
log.Println("***** Source Port:", sourcePort+" *****")
log.Println("***** Source Username:", sourceUsername+" *****")
log.Println("***** Source Password:", sourcePassword+" *****")

log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
Expand All @@ -175,31 +292,38 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
log.Println("Continuing...")
}

// Step1: Stop replication
log.Println("Stopping replication...")
var stopUrl = getServerUrl(STOP_REPLICATION, c)
resp := getHTTPCall(stopUrl)
time.Sleep(5 * time.Second)
//// Step1: Stop replication
//log.Println("Stopping replication...")
//var stopUrl = getServerUrl(STOP_REPLICATION, c)
//resp := getHTTPCall(stopUrl)
//time.Sleep(5 * time.Second)

// Step2: Update binlog position
log.Println("Updating binlog file/position and gtids...")
var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid}
var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid, SourceHost: sourceHost, SourcePort: sourcePort, SourceUser: sourceUsername, SourcePassword: sourcePassword}
var postBody, _ = json.Marshal(updateBinLogBody)
var requestOptions_copy = requestOptions
// Add data to JSON field
requestOptions_copy.JSON = string(postBody)
var serverUrl = getServerUrl(UPDATE_BINLOG, c)
resp, err := grequests.Post(serverUrl, requestOptions_copy)
if resp.StatusCode == 400 {
log.Println("***** Error: Replication is running, please stop it first ******")
log.Println("***** Use stop_replica command to stop replication ******")
log.Println("***** After change_replication_source is successful, use start_replica to start replication *******")
return false
}

log.Println(resp.String())
if err != nil {
log.Println(err)
log.Println("Create request failed for Github API")
log.Println("Create request failed")
}

// Step3: Start replication
time.Sleep(10 * time.Second)
var startUrl = getServerUrl(START_REPLICATION, c)
resp1 := getHTTPCall(startUrl)
log.Println(resp1.String())
//
//// Step3: Start replication
//time.Sleep(10 * time.Second)
//var startUrl = getServerUrl(START_REPLICATION, c)
//resp1 := getHTTPCall(startUrl)
//log.Println(resp1.String())
return true
}
34 changes: 34 additions & 0 deletions sink-connector-client/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"github.com/stretchr/testify/assert"
"github.com/urfave/cli"
"testing"
)

func TestGetServerUrl(t *testing.T) {
var c = cli.Context{
App: cli.NewApp(),
Command: cli.Command{},
}
c.App.Flags = []cli.Flag{
cli.StringFlag{
Name: "host",
Usage: "Host server address of sink connector",
Required: false,
},
cli.StringFlag{
Name: "port",
Usage: "Port of sink connector",
Required: false,
},
cli.BoolFlag{
Name: "secure",
Usage: "If true, then use https, else http",
Required: false,
},
}
//var c = cli.NewContext(nil, nil, nil)
var serverUrl = getServerUrl("start", &c)
assert.Equal(t, "http://localhost:7000/start_replica", serverUrl, "they should be equal")
}
Binary file modified sink-connector-client/sink-connector-client
Binary file not shown.
3 changes: 2 additions & 1 deletion sink-connector-lightweight/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM openjdk:11
COPY target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar
COPY sink-connector-client/sink-connector-client /sink-connector-client
COPY sink-connector-lightweight/target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar
ENV JAVA_OPTS="-Dlog4jDebug=true"
ENTRYPOINT ["java", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005", "-jar","/app.jar", "/config.yml", "com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="AdditionalModuleElements">
<content url="file://$MODULE_DIR$" dumb="true">
<sourceFolder url="file://$MODULE_DIR$/src/test" isTestSource="true" />
</content>
</component>
</module>
1 change: 1 addition & 0 deletions sink-connector-lightweight/docker/config_postgres.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
name: "debezium-embedded-postgres"
database.hostname: "postgres"
database.port: "5432"
database.user: "root"
Expand Down
Loading

0 comments on commit 497847c

Please sign in to comment.