diff --git a/sink-connector-client/main.go b/sink-connector-client/main.go index b82c40b6f..2cd7bdc87 100644 --- a/sink-connector-client/main.go +++ b/sink-connector-client/main.go @@ -1,14 +1,16 @@ package main import ( + "bufio" "encoding/json" "fmt" - "github.com/levigross/grequests" - "github.com/tidwall/pretty" - cli "github.com/urfave/cli" "log" "os" "time" + + "github.com/levigross/grequests" + "github.com/tidwall/pretty" + cli "github.com/urfave/cli" ) var requestOptions = &grequests.RequestOptions{} @@ -28,24 +30,26 @@ type UpdateLsn struct { } const ( - START_REPLICATION_COMMAND = "start_replica" - STOP_REPLICATION_COMAND = "stop_replica" - STATUS_COMMAND = "show_replica_status" - UPDATE_BINLOG_COMMAND = "change_replication_source" - UPDATE_LSN_COMMAND = "lsn" - DELETE_OFFSETS_COMMAND = "delete_offsets" + START_REPLICATION_COMMAND = "start_replica" + STOP_REPLICATION_COMAND = "stop_replica" + DDL_TRANSLATE_COMMAND = "ddl_translate" + STATUS_COMMAND = "show_replica_status" + UPDATE_BINLOG_COMMAND = "change_replication_source" + UPDATE_LSN_COMMAND = "lsn" + DELETE_OFFSETS_COMMAND = "delete_offsets" DELETE_SCHEMA_HISTORY_COMMAND = "delete_schema_history" ) const ( - START_REPLICATION = "start" - STOP_REPLICATION = "stop" - RESTART_REPLICATION = "restart" - STATUS = "status" - UPDATE_BINLOG = "binlog" - UPDATE_LSN = "lsn" - DELETE_OFFSETS = "offsets" + START_REPLICATION = "start" + STOP_REPLICATION = "stop" + RESTART_REPLICATION = "restart" + STATUS = "status" + UPDATE_BINLOG = "binlog" + UPDATE_LSN = "lsn" + DELETE_OFFSETS = "offsets" DELETE_SCHEMA_HISTORY = "schema-history" + DDL_TRANSLATE = "ddl-translate" ) // Fetches the repos for the given Github users @@ -59,14 +63,27 @@ func getHTTPCall(url string) *grequests.Response { } func getHTTPDeleteCall(url string) *grequests.Response { - resp, err := grequests.Delete(url, requestOptions) - // you can modify the request by passing an optional RequestOptions struct - if err != nil { - log.Fatalln("Unable to make request: ", err) - } - return resp + resp, err := grequests.Delete(url, requestOptions) + // you can modify the request by passing an optional RequestOptions struct + if err != nil { + log.Fatalln("Unable to make request: ", err) + } + return resp +} + +func getHTTPPostCall(url string, body string) *grequests.Response { + // Add body + // create a JSON with key as "ddl" and value as the body + requestOptions.JSON = map[string]string{"ddl": body} + resp, err := grequests.Post(url, requestOptions) + if err != nil { + log.Fatalln("Unable to make request: ", err) + } + return resp } -/** + +/* +* Function to get server url based on the parameters passed */ func getServerUrl(action string, c *cli.Context) string { @@ -163,6 +180,14 @@ func main() { return nil }, }, + { + Name: DDL_TRANSLATE_COMMAND, + Usage: "Translate DDL to target database", + Action: func(c *cli.Context) error { + handleDDLTranslate(c) + return nil + }, + }, { Name: UPDATE_BINLOG_COMMAND, Usage: "Update binlog file/position and gtids", @@ -228,77 +253,102 @@ func main() { return nil }, }, - { - Name: DELETE_OFFSETS_COMMAND, - Usage: "Delete offsets from the sink connector", - Action: func(c *cli.Context) error { - handleDeleteOffsets(c) - return nil - }, - }, - { - Name: DELETE_SCHEMA_HISTORY_COMMAND, - Usage: "Delete schema history from the sink connector", - Action: func(c *cli.Context) error { - handleDeleteSchemaHistory(c) - return nil - }, - }, - } + { + Name: DELETE_OFFSETS_COMMAND, + Usage: "Delete offsets from the sink connector", + Action: func(c *cli.Context) error { + handleDeleteOffsets(c) + return nil + }, + }, + { + Name: DELETE_SCHEMA_HISTORY_COMMAND, + Usage: "Delete schema history from the sink connector", + Action: func(c *cli.Context) error { + handleDeleteSchemaHistory(c) + return nil + }, + }, + } app.Version = "1.0" app.Run(os.Args) } func handleDeleteOffsets(c *cli.Context) bool { - log.Println("***** Delete offsets from the sink connector *****") - log.Println("Are you sure you want to continue? (y/n): ") - var userInput string - fmt.Scanln(&userInput) - if userInput != "y" { - log.Println("Exiting...") - return false - } else { - log.Println("Continuing...") - } - // Call a REST DELETE API to delete offsets from the sink connector - var deleteOffsetsUrl = getServerUrl(DELETE_OFFSETS, c) - log.Println("Sending request to URL: " + deleteOffsetsUrl) - resp := getHTTPDeleteCall(deleteOffsetsUrl) - time.Sleep(5 * time.Second) - if resp.StatusCode == 200 { - log.Println("Offsets deleted successfully") - return true - } else { - log.Println("Response Status Code:", resp.StatusCode) - log.Println("Error deleting offsets") - return false - } + log.Println("***** Delete offsets from the sink connector *****") + log.Println("Are you sure you want to continue? (y/n): ") + var userInput string + fmt.Scanln(&userInput) + if userInput != "y" { + log.Println("Exiting...") + return false + } else { + log.Println("Continuing...") + } + // Call a REST DELETE API to delete offsets from the sink connector + var deleteOffsetsUrl = getServerUrl(DELETE_OFFSETS, c) + log.Println("Sending request to URL: " + deleteOffsetsUrl) + resp := getHTTPDeleteCall(deleteOffsetsUrl) + time.Sleep(5 * time.Second) + if resp.StatusCode == 200 { + log.Println("Offsets deleted successfully") + return true + } else { + log.Println("Response Status Code:", resp.StatusCode) + log.Println("Error deleting offsets") + return false + } +} + +func handleDDLTranslate(c *cli.Context) bool { + log.Println("***** Translate MySQL DDL to ClickHouse DDL *****") + log.Println("Enter DDL to translate: ") + var userInput string + scanner := bufio.NewScanner(os.Stdin) + + if scanner.Scan() { + userInput = scanner.Text() + } + // Call a REST POST API to translate DDL to target database + var ddlTranslateUrl = getServerUrl(DDL_TRANSLATE, c) + log.Println("Sending request to URL: with DDL: " + userInput + " to " + ddlTranslateUrl) + resp := getHTTPPostCall(ddlTranslateUrl, userInput) + time.Sleep(5 * time.Second) + if resp.StatusCode == 200 { + log.Println(resp.String()) + //log.Println("DDL translated successfully") + return true + } else { + log.Println("Response Status Code:", resp.StatusCode) + log.Println("Error translating DDL") + return false + } } func handleDeleteSchemaHistory(c *cli.Context) bool { - log.Println("***** Delete schema history from the sink connector *****") - log.Println("Are you sure you want to continue? (y/n): ") - var userInput string - fmt.Scanln(&userInput) - if userInput != "y" { - log.Println("Exiting...") - return false - } else { - log.Println("Continuing...") - } - // Call a REST DELETE API to delete offsets from the sink connector - var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c) - log.Println("Sending request to URL: " + deleteOffsetsUrl) - resp := getHTTPDeleteCall(deleteOffsetsUrl) - time.Sleep(5 * time.Second) - if resp.StatusCode == 200 { - log.Println("Schema history deleted successfully") - return true - } else { - log.Println("Response Status Code:", resp.StatusCode) - log.Println("Error deleting schema history") - return false - } + log.Println("***** Delete schema history from the sink connector *****") + log.Println("Are you sure you want to continue? (y/n): ") + var userInput string + fmt.Scanln(&userInput) + if userInput != "y" { + log.Println("Exiting...") + return false + } else { + log.Println("Continuing...") + } + // Call a REST DELETE API to delete offsets from the sink connector + var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c) + log.Println("Sending request to URL: " + deleteOffsetsUrl) + resp := getHTTPDeleteCall(deleteOffsetsUrl) + time.Sleep(5 * time.Second) + if resp.StatusCode == 200 { + log.Println("Schema history deleted successfully") + return true + } else { + log.Println("Response Status Code:", resp.StatusCode) + log.Println("Error deleting schema history") + return false + } } func handleUpdateLsn(c *cli.Context) bool { @@ -343,7 +393,8 @@ func handleUpdateLsn(c *cli.Context) bool { return true } -/** +/* +* Function to handle update binlog action which is used to set binlog file/position and gtids */ diff --git a/sink-connector-client/sink-connector-client b/sink-connector-client/sink-connector-client index 12a0abf1e..a575eaebe 100755 Binary files a/sink-connector-client/sink-connector-client and b/sink-connector-client/sink-connector-client differ diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java index b2772f63b..bc8359471 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java @@ -4,6 +4,7 @@ import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.google.inject.Injector; import io.javalin.Javalin; @@ -12,7 +13,9 @@ import org.apache.logging.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.junit.Assert; +import java.util.HashMap; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -28,6 +31,9 @@ public static void startRestApi(Properties props, Injector injector, DebeziumChangeEventCapture debeziumChangeEventCapture, Properties userProperties) { String cliPort = props.getProperty(SinkConnectorLightWeightConfig.CLI_PORT); + MySQLDDLParserService sqlddlParserService = new MySQLDDLParserService(); + sqlddlParserService = new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), + "employees"); if(cliPort == null || cliPort.isEmpty()) { cliPort = "7000"; } @@ -163,6 +169,18 @@ public static void startRestApi(Properties props, Injector injector, ctx.result("Started Replication...."); }); + + MySQLDDLParserService finalSqlddlParserService = sqlddlParserService; + app.post("/ddl-translate", ctx -> { + String ddl = ctx.body(); + log.info(String.format("Received DDL for translation %s", ddl)); + // get the ddl value from JSON. + JSONObject jsonObject = (JSONObject) new JSONParser().parse(ddl); + String ddlValue = (String) jsonObject.get("ddl"); + StringBuffer clickHouseQuery = new StringBuffer(); + finalSqlddlParserService.parseSql(ddlValue, "employees", clickHouseQuery); + ctx.result(clickHouseQuery.toString()); + }); } // Stop the javalin server public static void stop() {