Skip to content

Commit

Permalink
[Feature] Support clickhouse jdbc datasource(#40894)
Browse files Browse the repository at this point in the history
Signed-off-by: samcchen <[email protected]>
  • Loading branch information
DataScientistSamChan committed Apr 18, 2024
1 parent 67bcade commit 4014e46
Show file tree
Hide file tree
Showing 10 changed files with 519 additions and 26 deletions.
51 changes: 50 additions & 1 deletion be/src/exec/jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,42 @@ void JDBCScanner::_init_profile() {

StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_class, SlotDescriptor* slot_desc) {
auto type = slot_desc->type().type;
if (java_class == "java.lang.Short") {
if (java_class == "java.lang.Byte") {
if (type != TYPE_BOOLEAN && type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT &&
type != TYPE_BIGINT) {
return Status::NotSupported(
fmt::format("Type mismatches on column[{}], JDBC result type is Byte, please set the type to "
"one of boolean,tinyint,smallint,int,bigint",
slot_desc->col_name()));
}
if (type == TYPE_BOOLEAN) {
return TYPE_BOOLEAN;
}
return TYPE_TINYINT;
} else if (java_class == "com.clickhouse.data.value.UnsignedByte") {
if (type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedByte, please set the type to "
"one of smallint,int,bigint",
slot_desc->col_name()));
}
return TYPE_SMALLINT;
} else if (java_class == "java.lang.Short") {
if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(
fmt::format("Type mismatches on column[{}], JDBC result type is Short, please set the type to "
"one of tinyint,smallint,int,bigint",
slot_desc->col_name()));
}
return TYPE_SMALLINT;
} else if (java_class == "com.clickhouse.data.value.UnsignedShort") {
if (type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedShort, please set the type to "
"one of int,bigint",
slot_desc->col_name()));
}
return TYPE_INT;
} else if (java_class == "java.lang.Integer") {
if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(
Expand All @@ -218,6 +246,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "com.clickhouse.data.value.UnsignedInteger") {
if (type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedInteger, please set the type to bigint",
slot_desc->col_name()));
}
return TYPE_BIGINT;
} else if (java_class == "java.lang.Long") {
if (type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
Expand All @@ -232,6 +267,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "com.clickhouse.data.value.UnsignedLong") {
if (type != TYPE_LARGEINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedLong, please set the type to largeint",
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "java.lang.Boolean") {
if (type != TYPE_BOOLEAN && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(
Expand Down Expand Up @@ -282,6 +324,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "java.time.LocalDate") {
if (type != TYPE_DATE) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is LocalDate, please set the type to date",
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "java.math.BigDecimal") {
if (type != TYPE_DECIMAL32 && type != TYPE_DECIMAL64 && type != TYPE_DECIMAL128 && type != TYPE_VARCHAR) {
return Status::NotSupported(
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,13 @@ under the License.
<artifactId>postgresql</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc -->
<!-- we need clickhouse driver for jdbc connector -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.mockrunner/mockrunner-jdbc -->
<dependency>
<groupId>com.mockrunner</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public enum ProtocolType {
MYSQL,
POSTGRES,
ORACLE,
MARIADB
MARIADB,

CLICKHOUSE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.jdbc;

import com.google.common.collect.ImmutableSet;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Type;
import com.starrocks.connector.exception.StarRocksConnectorException;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ClickhouseSchemaResolver extends JDBCSchemaResolver {
Map<String, String> properties;

public static final Set<String> SUPPORTED_TABLE_TYPES = new HashSet<>(
Arrays.asList("LOG TABLE", "MEMORY TABLE", "TEMPORARY TABLE", "VIEW", "DICTIONARY", "SYSTEM TABLE",
"REMOTE TABLE", "TABLE"));

public ClickhouseSchemaResolver(Map<String, String> properties) {
this.properties = properties;
}

@Override
public Collection<String> listSchemas(Connection connection) {
try (ResultSet resultSet = connection.getMetaData().getSchemas()) {
ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
while (resultSet.next()) {
String schemaName = resultSet.getString("TABLE_SCHEM");
// skip internal schemas
if (!schemaName.equalsIgnoreCase("INFORMATION_SCHEMA") && !schemaName.equalsIgnoreCase("system")) {
schemaNames.add(schemaName);
}
}
return schemaNames.build();
} catch (SQLException e) {
throw new StarRocksConnectorException(e.getMessage());
}
}


@Override
public ResultSet getTables(Connection connection, String dbName) throws SQLException {
String tableTypes = properties.get("table_types");
if (null != tableTypes) {
String[] tableTypesArray = tableTypes.split(",");
if (tableTypesArray.length == 0) {
throw new StarRocksConnectorException("table_types should be populated with table types separated by " +
"comma, e.g. 'TABLE,VIEW'. Currently supported type includes:" +
String.join(",", SUPPORTED_TABLE_TYPES));
}

for (String tt : tableTypesArray) {
if (!SUPPORTED_TABLE_TYPES.contains(tt)) {
throw new StarRocksConnectorException("Unsupported table type found: " + tt,
",Currently supported table types includes:" + String.join(",", SUPPORTED_TABLE_TYPES));
}
}
return connection.getMetaData().getTables(connection.getCatalog(), dbName, null, tableTypesArray);
}
return connection.getMetaData().getTables(connection.getCatalog(), dbName, null,
SUPPORTED_TABLE_TYPES.toArray(new String[SUPPORTED_TABLE_TYPES.size()]));

}

@Override
public ResultSet getColumns(Connection connection, String dbName, String tblName) throws SQLException {
return connection.getMetaData().getColumns(connection.getCatalog(), dbName, tblName, "%");
}


@Override
public Type convertColumnType(int dataType, String typeName, int columnSize, int digits) {
PrimitiveType primitiveType;
switch (dataType) {
case Types.TINYINT:
primitiveType = PrimitiveType.TINYINT;
break;
case Types.SMALLINT:
primitiveType = PrimitiveType.SMALLINT;
break;
case Types.INTEGER:
primitiveType = PrimitiveType.INT;
break;
case Types.BIGINT:
primitiveType = PrimitiveType.BIGINT;
break;
case Types.NUMERIC:
primitiveType = PrimitiveType.LARGEINT;
break;
case Types.FLOAT:
primitiveType = PrimitiveType.FLOAT;
break;
case Types.DOUBLE:
primitiveType = PrimitiveType.DOUBLE;
break;
case Types.BOOLEAN:
primitiveType = PrimitiveType.BOOLEAN;
break;
case Types.VARCHAR:
return ScalarType.createVarcharType(65533);
case Types.DATE:
primitiveType = PrimitiveType.DATE;
break;
case Types.TIMESTAMP:
primitiveType = PrimitiveType.DATETIME;
break;
case Types.DECIMAL:
// Decimal(9,9), first 9 is precision, second 9 is scale
String[] precisionAndScale =
typeName.replace("Decimal", "").replace("(", "")
.replace(")", "").replace(" ", "")
.split(",");
if (precisionAndScale.length != 2) {
// should not go here, but if it does, we make it DECIMALV2.
throw new StarRocksConnectorException(
"Cannot extract precision and scale from Decimal typename:" + typeName);
} else {
int precision = Integer.parseInt(precisionAndScale[0]);
int scale = Integer.parseInt(precisionAndScale[1]);
return ScalarType.createUnifiedDecimalType(precision, scale);
}
default:
primitiveType = PrimitiveType.UNKNOWN_TYPE;
break;
}
return ScalarType.createType(primitiveType);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public JDBCMetadata(Map<String, String> properties, String catalogName, HikariDa
schemaResolver = new PostgresSchemaResolver();
} else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("mariadb")) {
schemaResolver = new MysqlSchemaResolver();
} else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("clickhouse")) {
schemaResolver = new ClickhouseSchemaResolver(properties);
} else {
LOG.warn("{} not support yet", properties.get(JDBCResource.DRIVER_CLASS));
throw new StarRocksConnectorException(properties.get(JDBCResource.DRIVER_CLASS) + " not support yet");
Expand Down
Loading

0 comments on commit 4014e46

Please sign in to comment.