diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index ec64946c9..f9e7f8b99 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -284,6 +284,16 @@ static ConfigDef newConfigDef() { 1, ConfigDef.Width.NONE, ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString()) + .define( + ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString(), + Type.BOOLEAN, + false, + Importance.HIGH, + "If enabled, history tables, table names suffixed with _history are created in ClickHouse", + CONFIG_GROUP_CONNECTOR_CONFIG, + 1, + ConfigDef.Width.NONE, + ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString()) .define( ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), Type.BOOLEAN, diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java index bdd6224f4..f27f4e1a2 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java @@ -53,6 +53,8 @@ public enum ClickHouseSinkConnectorConfigVariables { //Config variable for auto creating tables if they dont exist. AUTO_CREATE_TABLES("auto.create.tables"), + AUTO_CREATE_HISTORY_TABLES("auto.create.history.tables"), + // Config variable for auto creating ReplicatedReplacingMergeTree AUTO_CREATE_TABLES_REPLICATED("auto.create.tables.replicated"), diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index 3fa7e3e6b..3b491291e 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -119,9 +119,15 @@ public DbWriter( } boolean useReplicatedReplacingMergeTree = this.config.getBoolean( ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString()); - String rmtDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString()); - act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn, + + if(this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString())) { + act.createHistoryTableSyntax(record.getPrimaryKey(), tableName, database, fields, columnNameToDataTypeMap, + useReplicatedReplacingMergeTree); + } else { + String rmtDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString()); + act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn, isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree, rmtDeleteColumn); + } } catch (Exception e) { log.error("**** Error creating table ***" + tableName, e); } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 45ba44922..0c1fccf1c 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -25,6 +25,17 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{ private static final Logger log = LogManager.getLogger(ClickHouseAutoCreateTable.class.getName()); + /** + * Function to create a new table in ClickHouse. + * @param primaryKey + * @param tableName + * @param databaseName + * @param fields + * @param connection + * @param isNewReplacingMergeTree + * @param useReplicatedReplacingMergeTree + * @throws SQLException + */ public void createNewTable(ArrayList primaryKey, String tableName, String databaseName, Field[] fields, ClickHouseConnection connection, boolean isNewReplacingMergeTree, boolean useReplicatedReplacingMergeTree, String rmtDeleteColumn) throws SQLException { @@ -36,6 +47,62 @@ public void createNewTable(ArrayList primaryKey, String tableName, Strin this.runQuery(createTableQuery, connection); } + // Create matereialized view + // CREATE MATERIALIZED VIEW user_mv to user as select * from user_history; + public void createMaterializedView(String tableName, String databaseName, ClickHouseConnection connection) throws SQLException { + String createMaterializedViewQuery = "CREATE MATERIALIZED VIEW " + databaseName + "." + tableName + "_mv TO " + databaseName + "." + tableName + " AS SELECT * FROM " + databaseName + "." + tableName + "_history"; + log.info("**** AUTO CREATE MATERIALIZED VIEW " + createMaterializedViewQuery); + this.runQuery(createMaterializedViewQuery, connection); + } + + /** + * Function to create history table, table suffixed with _history. + * @param primaryKey + * @param tableName + * @param databaseName + * @param fields + * @param columnToDataTypesMap + * @param useReplicatedReplacingMergeTree + * @return + */ + public String createHistoryTableSyntax(ArrayList primaryKey, String tableName, String databaseName, Field[] fields, + Map columnToDataTypesMap, + boolean useReplicatedReplacingMergeTree) { + + StringBuilder createTableSyntax = new StringBuilder(); + + createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName) + .append("_history").append("`").append("("); + + for(Field f: fields) { + appendFieldToCreateTableSyntax(createTableSyntax, f, columnToDataTypesMap); + } + + createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE); + createTableSyntax.append(")"); + createTableSyntax.append(" "); + + if(useReplicatedReplacingMergeTree == true) + createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN)); + else + createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")"); + + createTableSyntax.append(" "); + + if(primaryKey != null && isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) { + createTableSyntax.append(PRIMARY_KEY).append("("); + createTableSyntax.append(primaryKey.stream().map(Object::toString).collect(Collectors.joining(","))); + createTableSyntax.append(") "); + + createTableSyntax.append(ORDER_BY).append("("); + createTableSyntax.append(primaryKey.stream().map(Object::toString).collect(Collectors.joining(","))); + createTableSyntax.append(")"); + } else { + // ToDO: + createTableSyntax.append(ORDER_BY_TUPLE); + } + return createTableSyntax.toString(); + } /** * Function to generate CREATE TABLE for ClickHouse. * @@ -53,29 +120,8 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("("); - for (Field f : fields) { - String colName = f.name(); - String dataType = columnToDataTypesMap.get(colName); - boolean isNull = false; - if (f.schema().isOptional() == true) { - isNull = true; - } - createTableSyntax.append("`").append(colName).append("`").append(" ").append(dataType); - - // Ignore setting NULL OR not NULL for JSON and Array - if (dataType != null && - (dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) || - dataType.contains(ClickHouseDataType.Array.name()))) { - // ignore adding nulls; - } else { - if (isNull) { - createTableSyntax.append(" ").append(NULL); - } else { - createTableSyntax.append(" ").append(NOT_NULL); - } - } - createTableSyntax.append(","); - + for(Field f: fields) { + appendFieldToCreateTableSyntax(createTableSyntax, f, columnToDataTypesMap); } String isDeletedColumn = IS_DELETED_COLUMN; @@ -123,6 +169,24 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t return createTableSyntax.toString(); } + private void appendFieldToCreateTableSyntax(StringBuilder createTableSyntax, Field f, Map columnToDataTypesMap) { + String colName = f.name(); + String dataType = columnToDataTypesMap.get(colName); + boolean isNull = f.schema().isOptional(); + + createTableSyntax.append("`").append(colName).append("` ").append(dataType); + + // Ignore setting NULL OR not NULL for JSON and Array + if(dataType != null && + !(dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) || + dataType.contains(ClickHouseDataType.Array.name()))) { + createTableSyntax.append(isNull ? " NULL" : " NOT NULL"); + } + + createTableSyntax.append(","); + } + + @VisibleForTesting boolean isPrimaryKeyColumnPresent(ArrayList primaryKeys, Map columnToDataTypesMap) { diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java index e8bbe1e9c..a04e1cfe3 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java @@ -37,12 +37,8 @@ public class ClickHouseAutoCreateTableTest { @BeforeAll static void initialize() { - columnToDataTypesMap = getExpectedColumnToDataTypesMap(); + columnToDataTypesMap = getExpectedColumnToDataTypesMap(); -// this.columnToDataTypesMap.put("customer_id", "Int32"); -// this.columnToDataTypesMap.put("address", "String"); -// this.columnToDataTypesMap.put("first_name", "String"); -// this.columnToDataTypesMap.put("amount", "Int32"); String hostName = "localhost"; Integer port = 8123; @@ -210,4 +206,51 @@ public void testIsPrimaryKeyColumnPresent() { Assert.assertFalse(act.isPrimaryKeyColumnPresent(primaryKeys2, columnToDataTypesMap)); } + + @Test + public void createHistoryTableSyntax_withReplicatedReplacingMergeTreeEngine() { + ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + ArrayList primaryKeys = new ArrayList<>(); + primaryKeys.add("customerName"); + Field[] fields = new Field[1]; + fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA); + Map columnToDataTypesMap = new HashMap<>(); + columnToDataTypesMap.put("customerName", "String"); + + String result = act.createHistoryTableSyntax(primaryKeys, "testTable", "testDB", fields, columnToDataTypesMap, true); + + String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/testTable', '{replica}', _version) PRIMARY KEY(customerName) ORDER BY(customerName)"; + Assert.assertTrue(expected.equalsIgnoreCase(result)); + } + + @Test + public void createHistoryTableSyntax_withReplacingMergeTreeEngine() { + ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + ArrayList primaryKeys = new ArrayList<>(); + primaryKeys.add("customerName"); + Field[] fields = new Field[1]; + fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA); + Map columnToDataTypesMap = new HashMap<>(); + columnToDataTypesMap.put("customerName", "String"); + + String result = act.createHistoryTableSyntax(primaryKeys, "testTable", "testDB", fields, columnToDataTypesMap, false); + + String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)"; + Assert.assertTrue(expected.equalsIgnoreCase(result)); + } + + @Test + public void createHistoryTableSyntax_withoutPrimaryKey() { + ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); + Field[] fields = new Field[1]; + fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA); + Map columnToDataTypesMap = new HashMap<>(); + columnToDataTypesMap.put("customerName", "String"); + + String result = act.createHistoryTableSyntax(null, "testTable", "testDB", fields, columnToDataTypesMap, false); + + String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()"; + Assert.assertTrue(expected.equalsIgnoreCase(result)); + } + }