[WIP][Enhancement] refactor StarRocks lookup function #300

Closed · wants to merge 4 commits
pom.xml (3 additions, 0 deletions)
@@ -527,6 +527,9 @@ limitations under the License.
</goals>
</execution>
</executions>
<configuration>
<source>${maven.compiler.source}</source>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
StarRocksJdbcConnectionIProvider.java
@@ -15,7 +15,9 @@
package com.starrocks.connector.flink.connection;

import org.apache.flink.annotation.Internal;

import java.sql.Connection;
import java.sql.SQLException;

/**
* Connection provider.
@@ -27,6 +29,17 @@ public interface StarRocksJdbcConnectionIProvider {

Connection reestablishConnection() throws Exception;

boolean isConnectionValid() throws SQLException;

/**
* Get the existing connection, or establish a new one if there is none.
*
* @return the existing connection, or a newly established one
* @throws SQLException if a database access error occurs
* @throws ClassNotFoundException if no suitable JDBC driver class can be found
*/
Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;

void close();

}
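As a reading aid: a lookup function would typically drive this interface as sketched below. This is illustrative only, assuming a provider field initialized in open(); queryByKeys is a hypothetical helper, not part of this PR.

    // Hypothetical caller: validate-or-open first, reestablish once on failure.
    public void eval(Object... keys) throws Exception {
        try {
            queryByKeys(provider.getOrEstablishConnection(), keys);
        } catch (SQLException e) {
            // Drop the stale connection and retry once on a fresh one.
            queryByKeys(provider.reestablishConnection(), keys);
        }
    }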
StarRocksJdbcConnectionProvider.java
@@ -63,7 +63,30 @@ public Connection getConnection() throws SQLException, ClassNotFoundException {
@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
close();
connection = getConnection();
return getOrEstablishConnection();
}

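/** Liveness check backed by {@link Connection#isValid(int)} with a 60-second timeout; a null connection is invalid. */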
@Override
public boolean isConnectionValid() throws SQLException {
return connection != null && connection.isValid(60);
}

@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (isConnectionValid() && !connection.isClosed()) {
return connection;
}
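// Prefer the newer Connector/J driver class; fall back to the legacy driver class name if it is absent.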
try {
Class.forName(jdbcOptions.getCjDriverName());
} catch (ClassNotFoundException ex) {
LOG.warn("could not find class {}, trying class {}", jdbcOptions.getCjDriverName(), jdbcOptions.getDriverName());
Class.forName(jdbcOptions.getDriverName());
}
if (jdbcOptions.getUsername().isPresent()) {
connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(),
jdbcOptions.getPassword().orElse(null));
} else {
connection = DriverManager.getConnection(jdbcOptions.getDbURL());
}
return connection;
}

AbstractJdbcRowConverter.java (new file)
@@ -0,0 +1,272 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.converter;

import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement;
import com.starrocks.connector.flink.util.JdbcTypeUtil;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Base class for all converters that convert between JDBC object and Flink internal object.
*/
public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {

protected final RowType rowType;
protected final JdbcDeserializationConverter[] toInternalConverters;
protected final JdbcSerializationConverter[] toExternalConverters;
protected final LogicalType[] fieldTypes;

public abstract String converterName();

public AbstractJdbcRowConverter(RowType rowType) {
this.rowType = checkNotNull(rowType);
this.fieldTypes =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.toArray(LogicalType[]::new);
this.toInternalConverters = new JdbcDeserializationConverter[rowType.getFieldCount()];
this.toExternalConverters = new JdbcSerializationConverter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
toExternalConverters[i] = createNullableExternalConverter(fieldTypes[i]);
}
}

@Override
public RowData toInternal(ResultSet resultSet) throws SQLException {
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
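// JDBC ResultSet columns are 1-based, hence pos + 1 below.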
Object field = resultSet.getObject(pos + 1);
genericRowData.setField(pos, toInternalConverters[pos].deserialize(field));
}
return genericRowData;
}

@Override
public FieldNamedPreparedStatement toExternal(
RowData rowData, FieldNamedPreparedStatement statement) throws SQLException {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, statement);
}
return statement;
}

/** Runtime converter to convert JDBC field to {@link RowData} type object. */
@FunctionalInterface
public interface JdbcDeserializationConverter extends Serializable {
/**
* Convert a jdbc field object of {@link ResultSet} to the internal data structure object.
*
* @param jdbcField A single field of a {@link ResultSet}
* @return Object
* @throws SQLException if reading or converting the field fails
*/
Object deserialize(Object jdbcField) throws SQLException;
}

/**
* Runtime converter to convert {@link RowData} field to java object and fill into the {@link
* PreparedStatement}.
*/
@FunctionalInterface
public interface JdbcSerializationConverter extends Serializable {
void serialize(RowData rowData, int index, FieldNamedPreparedStatement statement)
throws SQLException;
}

/**
* Create a nullable runtime {@link JdbcDeserializationConverter} from the given {@link
* LogicalType}.
*
* @param type the logical type of the field
* @return a null-safe deserialization converter
*/
protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) {
return wrapIntoNullableInternalConverter(createInternalConverter(type));
}

/**
* Wrap the given converter so that SQL NULL values are mapped straight to {@code null}.
*
* @param jdbcDeserializationConverter converter for non-null values
* @return wrapped, null-safe converter
*/
protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
JdbcDeserializationConverter jdbcDeserializationConverter) {
return val -> {
if (val == null) {
return null;
} else {
return jdbcDeserializationConverter.deserialize(val);
}
};
}

protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return val -> null;
case BOOLEAN:
case FLOAT:
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case BINARY:
case VARBINARY:
case BIGINT:
case INTEGER:
return val -> val;
case TINYINT:
return val -> ((Integer) val).byteValue();
case SMALLINT:
// Cast to int and then narrow to short, since JDBC 1.0 drivers report SMALLINT values as Integer.
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
// DECIMAL(20, 0) is used to support the database type BIGINT UNSIGNED: users should declare
// DECIMAL(20, 0) in SQL, though other precisions such as DECIMAL(30, 0) are leniently accepted too.
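// e.g. the BIGINT UNSIGNED maximum 18446744073709551615 (20 digits) is returned by the driver as a BigInteger.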
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case DATE:
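// Flink's internal DATE value is the number of days since 1970-01-01, which is exactly LocalDate.toEpochDay().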
return val -> (int) (((Date) val).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
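// Flink's internal TIME value is milliseconds of the day; dividing nano-of-day by 1_000_000 yields that.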
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val ->
val instanceof LocalDateTime
? TimestampData.fromLocalDateTime((LocalDateTime) val)
: TimestampData.fromTimestamp((Timestamp) val);
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case ARRAY:
case ROW:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}

protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
return wrapIntoNullableExternalConverter(createExternalConverter(type), type);
}

protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
final int sqlType = JdbcTypeUtil.logicalTypeToSqlType(type.getTypeRoot());
return (val, index, statement) -> {
if (val == null
|| val.isNullAt(index)
|| LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
statement.setNull(index, sqlType);
} else {
jdbcSerializationConverter.serialize(val, index, statement);
}
};
}

protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return (val, index, statement) ->
statement.setBoolean(index, val.getBoolean(index));
case TINYINT:
return (val, index, statement) -> statement.setByte(index, val.getByte(index));
case SMALLINT:
return (val, index, statement) -> statement.setShort(index, val.getShort(index));
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, index, statement) -> statement.setInt(index, val.getInt(index));
case BIGINT:
case INTERVAL_DAY_TIME:
return (val, index, statement) -> statement.setLong(index, val.getLong(index));
case FLOAT:
return (val, index, statement) -> statement.setFloat(index, val.getFloat(index));
case DOUBLE:
return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
case CHAR:
case VARCHAR:
// value is BinaryString
return (val, index, statement) ->
statement.setString(index, val.getString(index).toString());
case BINARY:
case VARBINARY:
return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
case DATE:
return (val, index, statement) ->
statement.setDate(
index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
case TIME_WITHOUT_TIME_ZONE:
return (val, index, statement) ->
statement.setTime(
index,
Time.valueOf(
LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = ((TimestampType) type).getPrecision();
return (val, index, statement) ->
statement.setTimestamp(
index, val.getTimestamp(index, timestampPrecision).toTimestamp());
case DECIMAL:
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
return (val, index, statement) ->
statement.setBigDecimal(
index,
val.getDecimal(index, decimalPrecision, decimalScale)
.toBigDecimal());
case ARRAY:
case MAP:
case MULTISET:
case ROW:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
}
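Since converterName() is the only abstract member, a concrete converter stays tiny. A minimal sketch of such a subclass, assuming nothing beyond what the base class provides (the class name is hypothetical; the PR's actual subclass is not shown in this excerpt):

    package com.starrocks.connector.flink.converter;

    import org.apache.flink.table.types.logical.RowType;

    // Hypothetical subclass: all per-type conversion logic is inherited from AbstractJdbcRowConverter.
    public class StarRocksRowConverter extends AbstractJdbcRowConverter {

        public StarRocksRowConverter(RowType rowType) {
            super(rowType);
        }

        @Override
        public String converterName() {
            return "StarRocks";
        }
    }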
JdbcRowConverter.java (new file)
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.converter;

import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* Converter that is responsible to convert between JDBC object and Flink SQL internal data
* structure {@link RowData}.
*/
@PublicEvolving
public interface JdbcRowConverter extends Serializable {

/**
* Convert data retrieved from {@link ResultSet} to internal {@link RowData}.
*
* @param resultSet ResultSet from JDBC
* @return the converted internal {@link RowData}
* @throws SQLException if a database access error occurs
*/
RowData toInternal(ResultSet resultSet) throws SQLException;

/**
* Convert data retrieved from Flink internal RowData to JDBC Object.
*
* @param rowData The given internal {@link RowData}.
* @param statement The statement to be filled.
* @return The filled statement.
* @throws SQLException if parameterIndex does not correspond to a parameter marker in the SQL statement;
* if a database access error occurs or this method is called on a closed PreparedStatement
*/
FieldNamedPreparedStatement toExternal(RowData rowData, FieldNamedPreparedStatement statement)
throws SQLException;
}
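Putting both directions together, the intended call pattern in a lookup path is roughly the following. A sketch only: it assumes FieldNamedPreparedStatement mirrors the Flink JDBC connector API and exposes executeQuery(), and names such as keyConverter and rowConverter are illustrative.

    // Fill the lookup keys, run the query, and convert every matched row back to RowData.
    FieldNamedPreparedStatement filled = keyConverter.toExternal(keyRow, statement); // keyConverter built from the key RowType
    try (ResultSet resultSet = filled.executeQuery()) {
        while (resultSet.next()) {
            collector.collect(rowConverter.toInternal(resultSet)); // rowConverter built from the table RowType
        }
    }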