Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logic to create history tables #599

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ static ConfigDef newConfigDef() {
1,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString())
.define(
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString(),
Type.BOOLEAN,
false,
Importance.HIGH,
"If enabled, history tables, table names suffixed with _history are created in ClickHouse",
CONFIG_GROUP_CONNECTOR_CONFIG,
1,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString())
.define(
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(),
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public enum ClickHouseSinkConnectorConfigVariables {
//Config variable for auto creating tables if they dont exist.
AUTO_CREATE_TABLES("auto.create.tables"),

AUTO_CREATE_HISTORY_TABLES("auto.create.history.tables"),

// Config variable for auto creating ReplicatedReplacingMergeTree
AUTO_CREATE_TABLES_REPLICATED("auto.create.tables.replicated"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,15 @@ public DbWriter(
}
boolean useReplicatedReplacingMergeTree = this.config.getBoolean(
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString());
String rmtDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString());
act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn,

if(this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_HISTORY_TABLES.toString())) {
act.createHistoryTableSyntax(record.getPrimaryKey(), tableName, database, fields, columnNameToDataTypeMap,
useReplicatedReplacingMergeTree);
} else {
String rmtDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString());
act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn,
isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree, rmtDeleteColumn);
}
} catch (Exception e) {
log.error("**** Error creating table ***" + tableName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{

private static final Logger log = LogManager.getLogger(ClickHouseAutoCreateTable.class.getName());

/**
* Function to create a new table in ClickHouse.
* @param primaryKey
* @param tableName
* @param databaseName
* @param fields
* @param connection
* @param isNewReplacingMergeTree
* @param useReplicatedReplacingMergeTree
* @throws SQLException
*/
public void createNewTable(ArrayList<String> primaryKey, String tableName, String databaseName, Field[] fields,
ClickHouseConnection connection, boolean isNewReplacingMergeTree,
boolean useReplicatedReplacingMergeTree, String rmtDeleteColumn) throws SQLException {
Expand All @@ -36,6 +47,62 @@ public void createNewTable(ArrayList<String> primaryKey, String tableName, Strin
this.runQuery(createTableQuery, connection);
}

// Create matereialized view
// CREATE MATERIALIZED VIEW user_mv to user as select * from user_history;
public void createMaterializedView(String tableName, String databaseName, ClickHouseConnection connection) throws SQLException {
String createMaterializedViewQuery = "CREATE MATERIALIZED VIEW " + databaseName + "." + tableName + "_mv TO " + databaseName + "." + tableName + " AS SELECT * FROM " + databaseName + "." + tableName + "_history";
log.info("**** AUTO CREATE MATERIALIZED VIEW " + createMaterializedViewQuery);
this.runQuery(createMaterializedViewQuery, connection);
}

/**
* Function to create history table, table suffixed with _history.
* @param primaryKey
* @param tableName
* @param databaseName
* @param fields
* @param columnToDataTypesMap
* @param useReplicatedReplacingMergeTree
* @return
*/
public String createHistoryTableSyntax(ArrayList<String> primaryKey, String tableName, String databaseName, Field[] fields,
Map<String, String> columnToDataTypesMap,
boolean useReplicatedReplacingMergeTree) {

StringBuilder createTableSyntax = new StringBuilder();

createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName)
.append("_history").append("`").append("(");

for(Field f: fields) {
appendFieldToCreateTableSyntax(createTableSyntax, f, columnToDataTypesMap);
}

createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE);
createTableSyntax.append(")");
createTableSyntax.append(" ");

if(useReplicatedReplacingMergeTree == true)
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
else
createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")");

createTableSyntax.append(" ");

if(primaryKey != null && isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) {
createTableSyntax.append(PRIMARY_KEY).append("(");
createTableSyntax.append(primaryKey.stream().map(Object::toString).collect(Collectors.joining(",")));
createTableSyntax.append(") ");

createTableSyntax.append(ORDER_BY).append("(");
createTableSyntax.append(primaryKey.stream().map(Object::toString).collect(Collectors.joining(",")));
createTableSyntax.append(")");
} else {
// ToDO:
createTableSyntax.append(ORDER_BY_TUPLE);
}
return createTableSyntax.toString();
}
/**
* Function to generate CREATE TABLE for ClickHouse.
*
Expand All @@ -53,29 +120,8 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("(");

for (Field f : fields) {
String colName = f.name();
String dataType = columnToDataTypesMap.get(colName);
boolean isNull = false;
if (f.schema().isOptional() == true) {
isNull = true;
}
createTableSyntax.append("`").append(colName).append("`").append(" ").append(dataType);

// Ignore setting NULL OR not NULL for JSON and Array
if (dataType != null &&
(dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) ||
dataType.contains(ClickHouseDataType.Array.name()))) {
// ignore adding nulls;
} else {
if (isNull) {
createTableSyntax.append(" ").append(NULL);
} else {
createTableSyntax.append(" ").append(NOT_NULL);
}
}
createTableSyntax.append(",");

for(Field f: fields) {
appendFieldToCreateTableSyntax(createTableSyntax, f, columnToDataTypesMap);
}

String isDeletedColumn = IS_DELETED_COLUMN;
Expand Down Expand Up @@ -123,6 +169,24 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t
return createTableSyntax.toString();
}

private void appendFieldToCreateTableSyntax(StringBuilder createTableSyntax, Field f, Map<String, String> columnToDataTypesMap) {
String colName = f.name();
String dataType = columnToDataTypesMap.get(colName);
boolean isNull = f.schema().isOptional();

createTableSyntax.append("`").append(colName).append("` ").append(dataType);

// Ignore setting NULL OR not NULL for JSON and Array
if(dataType != null &&
!(dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) ||
dataType.contains(ClickHouseDataType.Array.name()))) {
createTableSyntax.append(isNull ? " NULL" : " NOT NULL");
}

createTableSyntax.append(",");
}


@VisibleForTesting
boolean isPrimaryKeyColumnPresent(ArrayList<String> primaryKeys, Map<String, String> columnToDataTypesMap) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ public class ClickHouseAutoCreateTableTest {
@BeforeAll
static void initialize() {

columnToDataTypesMap = getExpectedColumnToDataTypesMap();
columnToDataTypesMap = getExpectedColumnToDataTypesMap();

// this.columnToDataTypesMap.put("customer_id", "Int32");
// this.columnToDataTypesMap.put("address", "String");
// this.columnToDataTypesMap.put("first_name", "String");
// this.columnToDataTypesMap.put("amount", "Int32");

String hostName = "localhost";
Integer port = 8123;
Expand Down Expand Up @@ -210,4 +206,51 @@ public void testIsPrimaryKeyColumnPresent() {
Assert.assertFalse(act.isPrimaryKeyColumnPresent(primaryKeys2, columnToDataTypesMap));
}


@Test
public void createHistoryTableSyntax_withReplicatedReplacingMergeTreeEngine() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
ArrayList<String> primaryKeys = new ArrayList<>();
primaryKeys.add("customerName");
Field[] fields = new Field[1];
fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA);
Map<String, String> columnToDataTypesMap = new HashMap<>();
columnToDataTypesMap.put("customerName", "String");

String result = act.createHistoryTableSyntax(primaryKeys, "testTable", "testDB", fields, columnToDataTypesMap, true);

String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/testTable', '{replica}', _version) PRIMARY KEY(customerName) ORDER BY(customerName)";
Assert.assertTrue(expected.equalsIgnoreCase(result));
}

@Test
public void createHistoryTableSyntax_withReplacingMergeTreeEngine() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
ArrayList<String> primaryKeys = new ArrayList<>();
primaryKeys.add("customerName");
Field[] fields = new Field[1];
fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA);
Map<String, String> columnToDataTypesMap = new HashMap<>();
columnToDataTypesMap.put("customerName", "String");

String result = act.createHistoryTableSyntax(primaryKeys, "testTable", "testDB", fields, columnToDataTypesMap, false);

String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)";
Assert.assertTrue(expected.equalsIgnoreCase(result));
}

@Test
public void createHistoryTableSyntax_withoutPrimaryKey() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
Field[] fields = new Field[1];
fields[0] = new Field("customerName", 0, Schema.STRING_SCHEMA);
Map<String, String> columnToDataTypesMap = new HashMap<>();
columnToDataTypesMap.put("customerName", "String");

String result = act.createHistoryTableSyntax(null, "testTable", "testDB", fields, columnToDataTypesMap, false);

String expected = "CREATE TABLE testDB.`testTable_history`(`customerName` String NOT NULL,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(expected.equalsIgnoreCase(result));
}

}
Loading