Skip to content

Commit f2ba86f

Browse files
DataScientistSamChanleiyang0324
authored andcommitted
[Feature] Support clickhouse jdbc datasource(StarRocks#40894) (StarRocks#42930)
Why I'm doing: Enhance current clickhouse JDBC datasource support. What I'm doing: Support more fine-grained clickhouse JDBC datasource with partition information detection and more fine grained data types. Fixes #issue StarRocks#40894 Signed-off-by: samcchen <[email protected]>
1 parent b1421b5 commit f2ba86f

File tree

10 files changed

+525
-26
lines changed

10 files changed

+525
-26
lines changed

be/src/exec/jdbc_scanner.cpp

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,42 @@ void JDBCScanner::_init_profile() {
195195

196196
StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_class, SlotDescriptor* slot_desc) {
197197
auto type = slot_desc->type().type;
198-
if (java_class == "java.lang.Short") {
198+
if (java_class == "java.lang.Byte") {
199+
if (type != TYPE_BOOLEAN && type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT &&
200+
type != TYPE_BIGINT) {
201+
return Status::NotSupported(
202+
fmt::format("Type mismatches on column[{}], JDBC result type is Byte, please set the type to "
203+
"one of boolean,tinyint,smallint,int,bigint",
204+
slot_desc->col_name()));
205+
}
206+
if (type == TYPE_BOOLEAN) {
207+
return TYPE_BOOLEAN;
208+
}
209+
return TYPE_TINYINT;
210+
} else if (java_class == "com.clickhouse.data.value.UnsignedByte") {
211+
if (type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
212+
return Status::NotSupported(fmt::format(
213+
"Type mismatches on column[{}], JDBC result type is UnsignedByte, please set the type to "
214+
"one of smallint,int,bigint",
215+
slot_desc->col_name()));
216+
}
217+
return TYPE_SMALLINT;
218+
} else if (java_class == "java.lang.Short") {
199219
if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
200220
return Status::NotSupported(
201221
fmt::format("Type mismatches on column[{}], JDBC result type is Short, please set the type to "
202222
"one of tinyint,smallint,int,bigint",
203223
slot_desc->col_name()));
204224
}
205225
return TYPE_SMALLINT;
226+
} else if (java_class == "com.clickhouse.data.value.UnsignedShort") {
227+
if (type != TYPE_INT && type != TYPE_BIGINT) {
228+
return Status::NotSupported(fmt::format(
229+
"Type mismatches on column[{}], JDBC result type is UnsignedShort, please set the type to "
230+
"one of int,bigint",
231+
slot_desc->col_name()));
232+
}
233+
return TYPE_INT;
206234
} else if (java_class == "java.lang.Integer") {
207235
if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
208236
return Status::NotSupported(
@@ -218,6 +246,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
218246
slot_desc->col_name()));
219247
}
220248
return TYPE_VARCHAR;
249+
} else if (java_class == "com.clickhouse.data.value.UnsignedInteger") {
250+
if (type != TYPE_BIGINT) {
251+
return Status::NotSupported(fmt::format(
252+
"Type mismatches on column[{}], JDBC result type is UnsignedInteger, please set the type to bigint",
253+
slot_desc->col_name()));
254+
}
255+
return TYPE_BIGINT;
221256
} else if (java_class == "java.lang.Long") {
222257
if (type != TYPE_BIGINT) {
223258
return Status::NotSupported(fmt::format(
@@ -232,6 +267,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
232267
slot_desc->col_name()));
233268
}
234269
return TYPE_VARCHAR;
270+
} else if (java_class == "com.clickhouse.data.value.UnsignedLong") {
271+
if (type != TYPE_LARGEINT) {
272+
return Status::NotSupported(fmt::format(
273+
"Type mismatches on column[{}], JDBC result type is UnsignedLong, please set the type to largeint",
274+
slot_desc->col_name()));
275+
}
276+
return TYPE_VARCHAR;
235277
} else if (java_class == "java.lang.Boolean") {
236278
if (type != TYPE_BOOLEAN && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
237279
return Status::NotSupported(
@@ -282,6 +324,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
282324
slot_desc->col_name()));
283325
}
284326
return TYPE_VARCHAR;
327+
} else if (java_class == "java.time.LocalDate") {
328+
if (type != TYPE_DATE) {
329+
return Status::NotSupported(fmt::format(
330+
"Type mismatches on column[{}], JDBC result type is LocalDate, please set the type to date",
331+
slot_desc->col_name()));
332+
}
333+
return TYPE_VARCHAR;
285334
} else if (java_class == "java.math.BigDecimal") {
286335
if (type != TYPE_DECIMAL32 && type != TYPE_DECIMAL64 && type != TYPE_DECIMAL128 && type != TYPE_VARCHAR) {
287336
return Status::NotSupported(

fe/fe-core/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,13 @@ under the License.
757757
<artifactId>postgresql</artifactId>
758758
</dependency>
759759

760+
<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc -->
761+
<!-- we need clickhouse driver for jdbc connector -->
762+
<dependency>
763+
<groupId>com.clickhouse</groupId>
764+
<artifactId>clickhouse-jdbc</artifactId>
765+
</dependency>
766+
760767
<!-- https://mvnrepository.com/artifact/com.mockrunner/mockrunner-jdbc -->
761768
<dependency>
762769
<groupId>com.mockrunner</groupId>

fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,8 @@ public enum ProtocolType {
302302
MYSQL,
303303
POSTGRES,
304304
ORACLE,
305-
MARIADB
305+
MARIADB,
306+
307+
CLICKHOUSE
306308
}
307309
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2021-present StarRocks, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.starrocks.connector.jdbc;
16+
17+
import com.google.common.collect.ImmutableSet;
18+
import com.starrocks.catalog.PrimitiveType;
19+
import com.starrocks.catalog.ScalarType;
20+
import com.starrocks.catalog.Type;
21+
import com.starrocks.connector.exception.StarRocksConnectorException;
22+
23+
import java.sql.Connection;
24+
import java.sql.ResultSet;
25+
import java.sql.SQLException;
26+
import java.sql.Types;
27+
import java.util.Arrays;
28+
import java.util.Collection;
29+
import java.util.HashSet;
30+
import java.util.Map;
31+
import java.util.Set;
32+
33+
public class ClickhouseSchemaResolver extends JDBCSchemaResolver {
34+
Map<String, String> properties;
35+
36+
public static final Set<String> SUPPORTED_TABLE_TYPES = new HashSet<>(
37+
Arrays.asList("LOG TABLE", "MEMORY TABLE", "TEMPORARY TABLE", "VIEW", "DICTIONARY", "SYSTEM TABLE",
38+
"REMOTE TABLE", "TABLE"));
39+
40+
public ClickhouseSchemaResolver(Map<String, String> properties) {
41+
this.properties = properties;
42+
}
43+
44+
@Override
45+
public Collection<String> listSchemas(Connection connection) {
46+
try (ResultSet resultSet = connection.getMetaData().getSchemas()) {
47+
ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
48+
while (resultSet.next()) {
49+
String schemaName = resultSet.getString("TABLE_SCHEM");
50+
// skip internal schemas
51+
if (!schemaName.equalsIgnoreCase("INFORMATION_SCHEMA") && !schemaName.equalsIgnoreCase("system")) {
52+
schemaNames.add(schemaName);
53+
}
54+
}
55+
return schemaNames.build();
56+
} catch (SQLException e) {
57+
throw new StarRocksConnectorException(e.getMessage());
58+
}
59+
}
60+
61+
62+
@Override
63+
public ResultSet getTables(Connection connection, String dbName) throws SQLException {
64+
String tableTypes = properties.get("table_types");
65+
if (null != tableTypes) {
66+
String[] tableTypesArray = tableTypes.split(",");
67+
if (tableTypesArray.length == 0) {
68+
throw new StarRocksConnectorException("table_types should be populated with table types separated by " +
69+
"comma, e.g. 'TABLE,VIEW'. Currently supported type includes:" +
70+
String.join(",", SUPPORTED_TABLE_TYPES));
71+
}
72+
73+
for (String tt : tableTypesArray) {
74+
if (!SUPPORTED_TABLE_TYPES.contains(tt)) {
75+
throw new StarRocksConnectorException("Unsupported table type found: " + tt,
76+
",Currently supported table types includes:" + String.join(",", SUPPORTED_TABLE_TYPES));
77+
}
78+
}
79+
return connection.getMetaData().getTables(connection.getCatalog(), dbName, null, tableTypesArray);
80+
}
81+
return connection.getMetaData().getTables(connection.getCatalog(), dbName, null,
82+
SUPPORTED_TABLE_TYPES.toArray(new String[SUPPORTED_TABLE_TYPES.size()]));
83+
84+
}
85+
86+
@Override
87+
public ResultSet getColumns(Connection connection, String dbName, String tblName) throws SQLException {
88+
return connection.getMetaData().getColumns(connection.getCatalog(), dbName, tblName, "%");
89+
}
90+
91+
92+
@Override
93+
public Type convertColumnType(int dataType, String typeName, int columnSize, int digits) {
94+
PrimitiveType primitiveType;
95+
switch (dataType) {
96+
case Types.TINYINT:
97+
primitiveType = PrimitiveType.TINYINT;
98+
break;
99+
case Types.SMALLINT:
100+
primitiveType = PrimitiveType.SMALLINT;
101+
break;
102+
case Types.INTEGER:
103+
primitiveType = PrimitiveType.INT;
104+
break;
105+
case Types.BIGINT:
106+
primitiveType = PrimitiveType.BIGINT;
107+
break;
108+
case Types.NUMERIC:
109+
primitiveType = PrimitiveType.LARGEINT;
110+
break;
111+
case Types.FLOAT:
112+
primitiveType = PrimitiveType.FLOAT;
113+
break;
114+
case Types.DOUBLE:
115+
primitiveType = PrimitiveType.DOUBLE;
116+
break;
117+
case Types.BOOLEAN:
118+
primitiveType = PrimitiveType.BOOLEAN;
119+
break;
120+
case Types.VARCHAR:
121+
return ScalarType.createVarcharType(65533);
122+
case Types.DATE:
123+
primitiveType = PrimitiveType.DATE;
124+
break;
125+
case Types.TIMESTAMP:
126+
primitiveType = PrimitiveType.DATETIME;
127+
break;
128+
case Types.DECIMAL:
129+
// Decimal(9,9), first 9 is precision, second 9 is scale
130+
String[] precisionAndScale =
131+
typeName.replace("Decimal", "").replace("(", "")
132+
.replace(")", "").replace(" ", "")
133+
.split(",");
134+
if (precisionAndScale.length != 2) {
135+
// should not go here, but if it does, we make it DECIMALV2.
136+
throw new StarRocksConnectorException(
137+
"Cannot extract precision and scale from Decimal typename:" + typeName);
138+
} else {
139+
int precision = Integer.parseInt(precisionAndScale[0]);
140+
int scale = Integer.parseInt(precisionAndScale[1]);
141+
return ScalarType.createUnifiedDecimalType(precision, scale);
142+
}
143+
default:
144+
primitiveType = PrimitiveType.UNKNOWN_TYPE;
145+
break;
146+
}
147+
return ScalarType.createType(primitiveType);
148+
}
149+
150+
151+
}

fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public JDBCMetadata(Map<String, String> properties, String catalogName, HikariDa
8080
schemaResolver = new PostgresSchemaResolver();
8181
} else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("mariadb")) {
8282
schemaResolver = new MysqlSchemaResolver();
83+
} else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("clickhouse")) {
84+
schemaResolver = new ClickhouseSchemaResolver(properties);
8385
} else {
8486
LOG.warn("{} not support yet", properties.get(JDBCResource.DRIVER_CLASS));
8587
throw new StarRocksConnectorException(properties.get(JDBCResource.DRIVER_CLASS) + " not support yet");

0 commit comments

Comments
 (0)