Skip to content

Commit

Permalink
Merge pull request #260 from Altinity/sink-connector-cli
Browse files Browse the repository at this point in the history
Sink connector cli
  • Loading branch information
subkanthi authored Jun 23, 2023
2 parents 70c50ee + a96e724 commit 351c0cf
Show file tree
Hide file tree
Showing 20 changed files with 1,147 additions and 12 deletions.
Binary file added doc/img/sink_connector_cli.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 22 additions & 0 deletions doc/sink_connector_cli.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
### Sink Connector Lightweight (CLI)

![arch](img/sink_connector_cli.drawio.png)

The CLI application will translate the CLI commands to REST payload messages.

Option in sink connector lightweight to not start automatically unless
the user specifies the **start_replica** flag.(skip-replica-start) in the yaml file.
This will give users option to set the binlog status/position, gtid
**change_replication_source** before starting the replication.

## Operations

1. **Startup**, the CLI application will send a REST API call to the server, if it gets a 200, it will continue or throw an error.
2. **Start_replica**, CLI application will send a REST API call to the server to start replication(start Debezium event loop)
3. **Stop_replica**, CLI application will send a REST API call to the server to stop replication(stop Debezium event loop)
4. **change_replication_source**, CLI application will send the gtid, binlog file, and binlog position to the server to change the replication source
Server will update the table with this information will restart the debezium event loop.
5. **show replica status** Return the information from the **replica_status** table.

## API

15 changes: 15 additions & 0 deletions sink-connector-client/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module sink-connector-client

go 1.18

require (
github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5
github.com/urfave/cli v1.22.13
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 // indirect
)
30 changes: 30 additions & 0 deletions sink-connector-client/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5 h1:AsF9Q1mQoyLv0HzvHFW7O+19dHilOcKU74k7E5ufI1A=
github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5/go.mod h1:XfzeIE2WC7CGDhlZJY/rUdqUPy0IPcyI6hoIjhAMNbQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/urfave/cli v1.22.13 h1:wsLILXG8qCJNse/qAgLNf23737Cx05GflHg/PJGe1Ok=
github.com/urfave/cli v1.22.13/go.mod h1:VufqObjsMTF2BBwKawpx9R8eAneNEWhoO0yx8Vd+FkE=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
205 changes: 205 additions & 0 deletions sink-connector-client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package main

import (
"encoding/json"
"fmt"
"github.com/levigross/grequests"
cli "github.com/urfave/cli"
"log"
"os"
"time"
)

var requestOptions = &grequests.RequestOptions{}

type UpdateBinLog struct {
File string `json:"binlog_file"`
Position string `json:"binlog_position"`
Gtid string `json:"gtid"`
}

const (
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "update_binlog"
)
const (
START_REPLICATION = "start"
STOP_REPLICATION = "stop"
STATUS = "status"
UPDATE_BINLOG = "binlog"
)

// Fetches the repos for the given Github users
func getHTTPCall(url string) *grequests.Response {
resp, err := grequests.Get(url, requestOptions)
// you can modify the request by passing an optional RequestOptions struct
if err != nil {
log.Fatalln("Unable to make request: ", err)
}
return resp
}

/**
Function to get server url based on the parameters passed
*/
func getServerUrl(action string, c *cli.Context) string {

var scheme = "http://"
var hostname = "localhost"
var port = "7000"
if c.GlobalString("host") != "" {
hostname = c.GlobalString("host")
}
if c.GlobalString("port") != "" {
port = c.GlobalString("port")
}
if c.GlobalBool("secure") {
scheme = "https://"
}

var serverUrl = scheme + hostname + ":" + port + "/" + action

return serverUrl
}

func main() {
app := cli.NewApp()
app.Name = "Sink Connector Lightweight CLI"
app.Usage = "CLI for Sink Connector Lightweight, operations to get status, start/stop replication and set binlog/gtid position"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "host",
Usage: "Host server address of sink connector",
Required: false,
},
cli.StringFlag{
Name: "port",
Usage: "Port of sink connector",
Required: false,
},
cli.BoolFlag{
Name: "secure",
Usage: "If true, then use https, else http",
Required: false,
},
}

// define command for our client
app.Commands = []cli.Command{

{
Name: START_REPLICATION_COMMAND,
Usage: "Start the replication",
Action: func(c *cli.Context) error {
var serverUrl = getServerUrl(START_REPLICATION, c)
resp := getHTTPCall(serverUrl)
log.Println(resp)
return nil
},
},
{
Name: STOP_REPLICATION_COMAND,
Usage: "Stop the replication",
Action: func(c *cli.Context) error {
log.Println("***** Stopping replication..... *****")
var serverUrl = getServerUrl(STOP_REPLICATION, c)
resp := getHTTPCall(serverUrl)
log.Println(resp.String())
log.Println("***** Replication stopped successfully *****")
return nil
},
},
{
Name: STATUS_COMMAND,
//Aliases: []string{"c"},
Usage: "Status of replication",
Action: func(c *cli.Context) error {
var serverUrl = getServerUrl(STATUS, c)
resp := getHTTPCall(serverUrl)
log.Println(resp.String())
return nil
},
},
{
Name: UPDATE_BINLOG_COMMAND,
Usage: "Update binlog file/position and gtids",
Flags: []cli.Flag{
cli.StringFlag{
Name: "binlog_file",
Usage: "Set binlog file",
Required: true,
},
cli.StringFlag{
Name: "binlog_position",
Usage: "Set binlog position",
Required: true,
},
cli.StringFlag{
Name: "gtid",
Usage: "Set GTID",
Required: true,
},
},
Action: func(c *cli.Context) error {
handleUpdateBinLogAction(c)
return nil
},
},
}

app.Version = "1.0"
app.Run(os.Args)
}

/**
Function to handle update binlog action
which is used to set binlog file/position and gtids
*/
func handleUpdateBinLogAction(c *cli.Context) bool {
var binlogFile = c.String("binlog_file")
var binlogPos = c.String("binlog_position")
var gtid = c.String("gtid")

log.Println("***** binlog file: ", binlogFile+" *****")
log.Println("***** binlog position:", binlogPos+" *****")
log.Println("***** GTID:", gtid+" *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}

// Step1: Stop replication
log.Println("Stopping replication...")
var stopUrl = getServerUrl(STOP_REPLICATION, c)
resp := getHTTPCall(stopUrl)
time.Sleep(5 * time.Second)

// Step2: Update binlog position
log.Println("Updating binlog file/position and gtids...")
var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid}
var postBody, _ = json.Marshal(updateBinLogBody)
var requestOptions_copy = requestOptions
// Add data to JSON field
requestOptions_copy.JSON = string(postBody)
var serverUrl = getServerUrl(UPDATE_BINLOG, c)
resp, err := grequests.Post(serverUrl, requestOptions_copy)
log.Println(resp.String())
if err != nil {
log.Println(err)
log.Println("Create request failed for Github API")
}

// Step3: Start replication
time.Sleep(10 * time.Second)
var startUrl = getServerUrl(START_REPLICATION, c)
resp1 := getHTTPCall(startUrl)
log.Println(resp1.String())
return true
}
Binary file added sink-connector-client/sink-connector-client
Binary file not shown.
1 change: 1 addition & 0 deletions sink-connector-lightweight/build_docker.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/bin/bash

mvn clean install -DskipTests=true
today_date=$(date +%F)

docker login registry.gitlab.com
Expand Down
Loading

0 comments on commit 351c0cf

Please sign in to comment.