From cf59cae006f16dc28eb0eab76dd0bb5205de1ed2 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 16 May 2024 19:46:01 -0400 Subject: [PATCH 1/2] Added logic to create history tables --- .../ClickHouseSinkConnectorConfig.java | 10 ++ ...lickHouseSinkConnectorConfigVariables.java | 2 + .../operations/ClickHouseAutoCreateTable.java | 100 ++++++++++++++---- .../ClickHouseAutoCreateTableTest.java | 67 ++++++++++-- 4 files changed, 146 insertions(+), 33 deletions(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index ec64946c9..f9e7f8b99 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -284,6 +284,16 @@ static ConfigDef newConfigDef() { 1, ConfigDef.Width.NONE, ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString()) + .define( + ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString(), + Type.BOOLEAN, + false, + Importance.HIGH, + "If enabled, history tables, table names suffixed with _history are created in ClickHouse", + CONFIG_GROUP_CONNECTOR_CONFIG, + 1, + ConfigDef.Width.NONE, + ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString()) .define( ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), Type.BOOLEAN, diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java index bdd6224f4..f27f4e1a2 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java +++ 
b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java @@ -53,6 +53,8 @@ public enum ClickHouseSinkConnectorConfigVariables { //Config variable for auto creating tables if they dont exist. AUTO_CREATE_TABLES("auto.create.tables"), + AUTO_CREATE_HISTORY_TABLES("auto.create.history.tables"), /* Config variable for auto creating history tables (table names suffixed with _history). */ + // Config variable for auto creating ReplicatedReplacingMergeTree AUTO_CREATE_TABLES_REPLICATED("auto.create.tables.replicated"), diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 41d3f63a6..7229860c0 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -24,6 +24,17 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{ private static final Logger log = LogManager.getLogger(ClickHouseAutoCreateTable.class.getName()); + /** + * Function to create a new table in ClickHouse: builds the CREATE TABLE query and runs it on the given connection. + * @param primaryKey primary key column names of the table (presumably used for the PRIMARY KEY and ORDER BY clauses; confirm in createTableSyntax) + * @param tableName name of the table to create + * @param databaseName database in which the table is created + * @param fields record fields the table columns are built from + * @param connection ClickHouse connection on which the generated query is run + * @param isNewReplacingMergeTree NOTE(review): presumably selects the newer ReplacingMergeTree syntax; confirm in createTableSyntax + * @param useReplicatedReplacingMergeTree NOTE(review): presumably selects the ReplicatedReplacingMergeTree engine; confirm in createTableSyntax + * @throws SQLException declared by this method; presumably raised when running the query fails + */ public void createNewTable(ArrayList primaryKey, String tableName, String databaseName, Field[] fields, ClickHouseConnection connection, boolean isNewReplacingMergeTree, boolean useReplicatedReplacingMergeTree) throws SQLException { @@ -35,6 +46,54 @@ public void createNewTable(ArrayList primaryKey, String tableName, Strin this.runQuery(createTableQuery, connection); } + /** + * Function to build the CREATE TABLE statement of a history table, i.e. the table named tableName suffixed with _history. 
+ * @param primaryKey column names joined with "," into the PRIMARY_KEY and ORDER_BY clauses; when null, or when isPrimaryKeyColumnPresent returns false, ORDER_BY_TUPLE is emitted instead + * @param tableName base table name; "_history" is appended and the result is back-quoted + * @param databaseName database name, emitted unquoted in front of the table name + * @param fields fields emitted as columns, in order; a VERSION_COLUMN column is appended after them + * @param columnToDataTypesMap column name to data type mapping used for the column definitions and the primary key check + * @param useReplicatedReplacingMergeTree when true the engine is ReplicatedReplacingMergeTree, otherwise ReplacingMergeTree; both are versioned by VERSION_COLUMN + * @return the CREATE TABLE statement as a string; this method does not run it + */ + public String createHistoryTableSyntax(ArrayList primaryKey, String tableName, String databaseName, Field[] fields, + Map columnToDataTypesMap, + boolean useReplicatedReplacingMergeTree) { + + StringBuilder createTableSyntax = new StringBuilder(); + + createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName) + .append("_history").append("`").append("("); + + for(Field f: fields) { + appendFieldToCreateTableSyntax(createTableSyntax, f, columnToDataTypesMap); + } + + createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE); + createTableSyntax.append(")"); + createTableSyntax.append(" "); + + if(useReplicatedReplacingMergeTree == true) + /* NOTE(review): the replica path is built from tableName without the _history suffix; confirm it cannot clash with the path of the base table */ createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN)); + else + createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")"); + + createTableSyntax.append(" "); + + if(primaryKey != null && isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) { + createTableSyntax.append(PRIMARY_KEY).append("("); + createTableSyntax.append(primaryKey.stream().map(Object::toString).collect(Collectors.joining(","))); + createTableSyntax.append(") "); + + createTableSyntax.append(ORDER_BY).append("("); + createTableSyntax.append(primaryKey.stream().map(Object::toString).collect(Collectors.joining(","))); + createTableSyntax.append(")"); + } else { + // No primary key was passed, or isPrimaryKeyColumnPresent returned false: fall back to ORDER_BY_TUPLE. + createTableSyntax.append(ORDER_BY_TUPLE); + } + return createTableSyntax.toString(); + } /** * Function to generate CREATE TABLE for ClickHouse. 
* @@ -52,28 +111,7 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("("); for(Field f: fields) { - String colName = f.name(); - String dataType = columnToDataTypesMap.get(colName); - boolean isNull = false; - if(f.schema().isOptional() == true) { - isNull = true; - } - createTableSyntax.append("`").append(colName).append("`").append(" ").append(dataType); - - // Ignore setting NULL OR not NULL for JSON and Array - if(dataType != null && - (dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) || - dataType.contains(ClickHouseDataType.Array.name()))) { - // ignore adding nulls; - } else { - if (isNull) { - createTableSyntax.append(" ").append(NULL); - } else { - createTableSyntax.append(" ").append(NOT_NULL); - } - } - createTableSyntax.append(","); - + appendFieldToCreateTableSyntax(createTableSyntax, f, columnToDataTypesMap); } String isDeletedColumn = IS_DELETED_COLUMN; @@ -117,6 +155,25 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t return createTableSyntax.toString(); } + /** Appends the column definition of field f (back-quoted name, mapped data type, nullability keyword, trailing comma) to createTableSyntax. */ + private void appendFieldToCreateTableSyntax(StringBuilder createTableSyntax, Field f, Map<String, String> columnToDataTypesMap) { + String colName = f.name(); + String dataType = columnToDataTypesMap.get(colName); + boolean isNull = f.schema().isOptional(); + + createTableSyntax.append("`").append(colName).append("` ").append(dataType); + + // JSON and Array columns get no NULL/NOT NULL keyword; every other column does, including one without a mapped type (same as the inline code this helper replaced). + if(dataType == null || + !(dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) || + dataType.contains(ClickHouseDataType.Array.name()))) { + createTableSyntax.append(" ").append(isNull ? 
NULL : NOT_NULL); + } + + createTableSyntax.append(","); + } + + @VisibleForTesting boolean isPrimaryKeyColumnPresent(ArrayList primaryKeys, Map columnToDataTypesMap) { diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java index 060741219..8daf9f370 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java @@ -34,15 +34,12 @@ public class ClickHouseAutoCreateTableTest { @Container private ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest") .withInitScript("./init_clickhouse.sql"); + @BeforeAll static void initialize() { - columnToDataTypesMap = getExpectedColumnToDataTypesMap(); + columnToDataTypesMap = getExpectedColumnToDataTypesMap(); -// this.columnToDataTypesMap.put("customer_id", "Int32"); -// this.columnToDataTypesMap.put("address", "String"); -// this.columnToDataTypesMap.put("first_name", "String"); -// this.columnToDataTypesMap.put("amount", "Int32"); String hostName = "localhost"; Integer port = 8123; @@ -51,7 +48,7 @@ static void initialize() { String password = "root"; String tableName = "auto_create_table"; - ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); @@ -74,8 +71,8 @@ protected Field[] createFields() { name(Decimal.LOGICAL_NAME).build())); Schema decimalSchema = SchemaBuilder.type(Schema.BYTES_SCHEMA.type()).parameter("scale", "10") - .parameter("connect.decimal.precision", "30") - 
.name(Decimal.LOGICAL_NAME).build(); + .parameter("connect.decimal.precision", "30") + .name(Decimal.LOGICAL_NAME).build(); fields.add(new Field("blob_storage_scale", 7, decimalSchema)); fields.add(new Field("json_output", 8, Json.schema())); @@ -113,7 +110,7 @@ public void getColumnNameToCHDataTypeMappingTest() { Map expectedColNameToDataTypeMap = getExpectedColumnToDataTypesMap(); - // Assert.assertTrue(colNameToDataTypeMap.equals(expectedColNameToDataTypeMap)); + // Assert.assertTrue(colNameToDataTypeMap.equals(expectedColNameToDataTypeMap)); Assert.assertFalse(colNameToDataTypeMap.isEmpty()); } @@ -142,6 +139,7 @@ public void testCreateTableEmptyPrimaryKey() { String expectedQuery = "CREATE TABLE employees.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()"; Assert.assertTrue(query.equalsIgnoreCase(expectedQuery)); } + @Test public void testCreateTableMultiplePrimaryKeys() { ArrayList primaryKeys = new ArrayList<>(); @@ -182,13 +180,13 @@ public void testCreateNewTable() { try { act.createNewTable(primaryKeys, "auto_create_table", "default", this.createFields(), writer.getConnection(), false, false); - } catch(SQLException se) { + } catch (SQLException se) { Assert.assertTrue(false); } } @Test - public void testIsPrimaryKeyColumnPresent() { + public void testIsPrimaryKeyColumnPresent() { ArrayList primaryKeys = new ArrayList<>(); primaryKeys.add("customerName"); primaryKeys.add("id"); @@ -210,4 +208,51 @@ public void testIsPrimaryKeyColumnPresent() { Assert.assertFalse(act.isPrimaryKeyColumnPresent(primaryKeys2, columnToDataTypesMap)); } + + @Test + public void createHistoryTableSyntax_withReplicatedReplacingMergeTreeEngine() { 
+ ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + ArrayList primaryKeys = new ArrayList<>(); + primaryKeys.add("customerName"); + Field[] fields = new Field[1]; + fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA); + Map columnToDataTypesMap = new HashMap<>(); + columnToDataTypesMap.put("customerName", "String"); + + String result = act.createHistoryTableSyntax(primaryKeys, "testTable", "testDB", fields, columnToDataTypesMap, true); + + String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/testTable', '{replica}', _version) PRIMARY KEY(customerName) ORDER BY(customerName)"; + Assert.assertTrue(expected.equalsIgnoreCase(result)); + } + + @Test + public void createHistoryTableSyntax_withReplacingMergeTreeEngine() { + ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + ArrayList primaryKeys = new ArrayList<>(); + primaryKeys.add("customerName"); + Field[] fields = new Field[1]; + fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA); + Map columnToDataTypesMap = new HashMap<>(); + columnToDataTypesMap.put("customerName", "String"); + + String result = act.createHistoryTableSyntax(primaryKeys, "testTable", "testDB", fields, columnToDataTypesMap, false); + + String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)"; + Assert.assertTrue(expected.equalsIgnoreCase(result)); + } + + @Test + public void createHistoryTableSyntax_withoutPrimaryKey() { + ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + Field[] fields = new Field[1]; + fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA); + Map columnToDataTypesMap = new HashMap<>(); + columnToDataTypesMap.put("customerName", "String"); + + String result = act.createHistoryTableSyntax(null, 
"testTable", "testDB", fields, columnToDataTypesMap, false); + + String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()"; + Assert.assertTrue(expected.equalsIgnoreCase(result)); + } + } From b2ad682fd8e0534d680f98454cf3f173e750376f Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 16 May 2024 21:26:44 -0400 Subject: [PATCH 2/2] Added logic to create materialized view --- .../altinity/clickhouse/sink/connector/db/DbWriter.java | 8 +++++++- .../db/operations/ClickHouseAutoCreateTable.java | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index bae2efd2a..2b8b8c1ee 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -108,6 +108,7 @@ public DbWriter( //ToDO: Is this a reliable way of checking if the table exists already. 
if (this.engine == null) { if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString())) { + log.info(String.format("**** Task(%s), AUTO CREATE TABLE (%s) *** ",taskId, tableName)); ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); try { @@ -119,7 +120,12 @@ public DbWriter( } boolean useReplicatedReplacingMergeTree = this.config.getBoolean( ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString()); - act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn, + + if(this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString())) { + /* FIXME(review): createHistoryTableSyntax takes the column-to-data-type Map as its fifth argument, not the connection, and only returns the SQL string; as written nothing is executed in this branch and createNewTable is skipped, so neither the history table nor the base table is created. */ act.createHistoryTableSyntax(record.getPrimaryKey(), tableName, database, fields, this.conn, + useReplicatedReplacingMergeTree); + } else + act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn, isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree); } catch (Exception e) { log.error("**** Error creating table ***" + tableName, e); diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 7229860c0..48955b8a2 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -46,6 +46,14 @@ public void createNewTable(ArrayList primaryKey, String tableName, Strin this.runQuery(createTableQuery, connection); } + // Creates the materialized view named tableName suffixed with _mv, which selects from the tableName_history table into the tableName table, and runs the statement on the given connection. + // Table identifiers are back-quoted like in the CREATE TABLE statements, e.g. CREATE MATERIALIZED VIEW db.`user_mv` TO db.`user` AS SELECT * FROM db.`user_history`; + public void createMaterializedView(String tableName, String databaseName, ClickHouseConnection connection) throws SQLException { + String createMaterializedViewQuery = String.format(
"CREATE MATERIALIZED VIEW %s.`%s_mv` TO %s.`%s` AS SELECT * FROM %s.`%s_history`", databaseName, tableName, databaseName, tableName, databaseName, tableName); + log.info("**** AUTO CREATE MATERIALIZED VIEW " + createMaterializedViewQuery); + this.runQuery(createMaterializedViewQuery, connection); + } + /** * Function to create history table, table suffixed with _history. * @param primaryKey