Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ddl translation rest api #936

Open
wants to merge 5 commits into
base: 2.6.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 138 additions & 87 deletions sink-connector-client/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"github.com/levigross/grequests"
"github.com/tidwall/pretty"
cli "github.com/urfave/cli"
"log"
"os"
"time"

"github.com/levigross/grequests"
"github.com/tidwall/pretty"
cli "github.com/urfave/cli"
)

var requestOptions = &grequests.RequestOptions{}
Expand All @@ -28,24 +30,26 @@ type UpdateLsn struct {
}

const (
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
DELETE_OFFSETS_COMMAND = "delete_offsets"
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
DDL_TRANSLATE_COMMAND = "ddl_translate"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
DELETE_OFFSETS_COMMAND = "delete_offsets"
DELETE_SCHEMA_HISTORY_COMMAND = "delete_schema_history"
)

const (
START_REPLICATION = "start"
STOP_REPLICATION = "stop"
RESTART_REPLICATION = "restart"
STATUS = "status"
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
DELETE_OFFSETS = "offsets"
START_REPLICATION = "start"
STOP_REPLICATION = "stop"
RESTART_REPLICATION = "restart"
STATUS = "status"
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
DELETE_OFFSETS = "offsets"
DELETE_SCHEMA_HISTORY = "schema-history"
DDL_TRANSLATE = "ddl-translate"
)

// Fetches the repos for the given Github users
Expand All @@ -59,14 +63,27 @@ func getHTTPCall(url string) *grequests.Response {
}

func getHTTPDeleteCall(url string) *grequests.Response {
resp, err := grequests.Delete(url, requestOptions)
// you can modify the request by passing an optional RequestOptions struct
if err != nil {
log.Fatalln("Unable to make request: ", err)
}
return resp
resp, err := grequests.Delete(url, requestOptions)
// you can modify the request by passing an optional RequestOptions struct
if err != nil {
log.Fatalln("Unable to make request: ", err)
}
return resp
}

func getHTTPPostCall(url string, body string) *grequests.Response {
// Add body
// create a JSON with key as "ddl" and value as the body
requestOptions.JSON = map[string]string{"ddl": body}
resp, err := grequests.Post(url, requestOptions)
if err != nil {
log.Fatalln("Unable to make request: ", err)
}
return resp
}
/**

/*
*
Function to get server url based on the parameters passed
*/
func getServerUrl(action string, c *cli.Context) string {
Expand Down Expand Up @@ -163,6 +180,14 @@ func main() {
return nil
},
},
{
Name: DDL_TRANSLATE_COMMAND,
Usage: "Translate DDL to target database",
Action: func(c *cli.Context) error {
handleDDLTranslate(c)
return nil
},
},
{
Name: UPDATE_BINLOG_COMMAND,
Usage: "Update binlog file/position and gtids",
Expand Down Expand Up @@ -228,77 +253,102 @@ func main() {
return nil
},
},
{
Name: DELETE_OFFSETS_COMMAND,
Usage: "Delete offsets from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteOffsets(c)
return nil
},
},
{
Name: DELETE_SCHEMA_HISTORY_COMMAND,
Usage: "Delete schema history from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteSchemaHistory(c)
return nil
},
},
}
{
Name: DELETE_OFFSETS_COMMAND,
Usage: "Delete offsets from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteOffsets(c)
return nil
},
},
{
Name: DELETE_SCHEMA_HISTORY_COMMAND,
Usage: "Delete schema history from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteSchemaHistory(c)
return nil
},
},
}
app.Version = "1.0"
app.Run(os.Args)
}

func handleDeleteOffsets(c *cli.Context) bool {
log.Println("***** Delete offsets from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_OFFSETS, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Offsets deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting offsets")
return false
}
log.Println("***** Delete offsets from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_OFFSETS, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Offsets deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting offsets")
return false
}
}

func handleDDLTranslate(c *cli.Context) bool {
log.Println("***** Translate MySQL DDL to ClickHouse DDL *****")
log.Println("Enter DDL to translate: ")
var userInput string
scanner := bufio.NewScanner(os.Stdin)

if scanner.Scan() {
userInput = scanner.Text()
}
// Call a REST POST API to translate DDL to target database
var ddlTranslateUrl = getServerUrl(DDL_TRANSLATE, c)
log.Println("Sending request to URL: with DDL: " + userInput + " to " + ddlTranslateUrl)
resp := getHTTPPostCall(ddlTranslateUrl, userInput)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println(resp.String())
//log.Println("DDL translated successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error translating DDL")
return false
}
}

func handleDeleteSchemaHistory(c *cli.Context) bool {
log.Println("***** Delete schema history from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Schema history deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting schema history")
return false
}
log.Println("***** Delete schema history from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Schema history deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting schema history")
return false
}
}

func handleUpdateLsn(c *cli.Context) bool {
Expand Down Expand Up @@ -343,7 +393,8 @@ func handleUpdateLsn(c *cli.Context) bool {
return true
}

/**
/*
*
Function to handle update binlog action
which is used to set binlog file/position and gtids
*/
Expand Down
Binary file modified sink-connector-client/sink-connector-client
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.google.inject.Injector;
import io.javalin.Javalin;
Expand All @@ -12,7 +13,9 @@
import org.apache.logging.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.Assert;

import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

Expand All @@ -28,6 +31,9 @@ public static void startRestApi(Properties props, Injector injector,
DebeziumChangeEventCapture debeziumChangeEventCapture,
Properties userProperties) {
String cliPort = props.getProperty(SinkConnectorLightWeightConfig.CLI_PORT);
MySQLDDLParserService sqlddlParserService = new MySQLDDLParserService();
sqlddlParserService = new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees");
if(cliPort == null || cliPort.isEmpty()) {
cliPort = "7000";
}
Expand Down Expand Up @@ -163,6 +169,18 @@ public static void startRestApi(Properties props, Injector injector,
ctx.result("Started Replication....");

});

MySQLDDLParserService finalSqlddlParserService = sqlddlParserService;
app.post("/ddl-translate", ctx -> {
String ddl = ctx.body();
log.info(String.format("Received DDL for translation %s", ddl));
// get the ddl value from JSON.
JSONObject jsonObject = (JSONObject) new JSONParser().parse(ddl);
String ddlValue = (String) jsonObject.get("ddl");
StringBuffer clickHouseQuery = new StringBuffer();
finalSqlddlParserService.parseSql(ddlValue, "employees", clickHouseQuery);
ctx.result(clickHouseQuery.toString());
});
}
// Stop the javalin server
public static void stop() {
Expand Down
Loading