diff --git a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java
index 848d7089..4df02d6a 100644
--- a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java
+++ b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java
@@ -15,7 +15,9 @@
package com.starrocks.connector.flink.connection;
import org.apache.flink.annotation.Internal;
+
import java.sql.Connection;
+import java.sql.SQLException;
/**
* connection provider.
@@ -27,6 +29,17 @@ public interface StarRocksJdbcConnectionIProvider {
Connection reestablishConnection() throws Exception;
+ boolean isConnectionValid() throws SQLException;
+
+ /**
+ * Get the existing connection or establish a new one if there is none.
+ *
+ * @return existing connection or newly established connection
+ * @throws SQLException if a database access error occurs
+ * @throws ClassNotFoundException if the JDBC driver class cannot be found
+ */
+ Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
+
void close();
-
+
}
diff --git a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java
index 6b3843b1..7f2a8693 100644
--- a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java
+++ b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java
@@ -63,7 +63,30 @@ public Connection getConnection() throws SQLException, ClassNotFoundException {
@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
close();
- connection = getConnection();
+ return getOrEstablishConnection();
+ }
+
+ public boolean isConnectionValid() throws SQLException {
+ return connection != null && connection.isValid(60);
+ }
+
+ @Override
+ public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
+ if (isConnectionValid() && !connection.isClosed()) {
+ return connection;
+ }
+ try {
+ Class.forName(jdbcOptions.getCjDriverName());
+ } catch (ClassNotFoundException ex) {
+ LOG.warn("can not found class {}, try class {}", jdbcOptions.getCjDriverName(), jdbcOptions.getDriverName());
+ Class.forName(jdbcOptions.getDriverName());
+ }
+ if (jdbcOptions.getUsername().isPresent()) {
+ connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(),
+ jdbcOptions.getPassword().orElse(null));
+ } else {
+ connection = DriverManager.getConnection(jdbcOptions.getDbURL());
+ }
return connection;
}
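For review context, the new provider API is easiest to read end to end. A minimal usage sketch, assuming the StarRocksJdbcConnectionOptions(url, username, password) constructor used later in this PR (the options values are placeholders):

    static void example(StarRocksJdbcConnectionOptions options) throws Exception {
        StarRocksJdbcConnectionProvider provider = new StarRocksJdbcConnectionProvider(options);
        try {
            Connection conn = provider.getOrEstablishConnection(); // reuses the cached connection if still valid
            if (!provider.isConnectionValid()) {                   // Connection#isValid(60) under the hood
                conn = provider.reestablishConnection();           // close() then reconnect
            }
            // ... run queries with conn ...
        } finally {
            provider.close();
        }
    }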
diff --git a/src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java b/src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java
new file mode 100644
index 00000000..2cf2ef5a
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.converter;
+
+import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement;
+import com.starrocks.connector.flink.util.JdbcTypeUtil;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all converters that convert between JDBC objects and Flink internal objects.
+ */
+public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
+
+ protected final RowType rowType;
+ protected final JdbcDeserializationConverter[] toInternalConverters;
+ protected final JdbcSerializationConverter[] toExternalConverters;
+ protected final LogicalType[] fieldTypes;
+
+ public abstract String converterName();
+
+ public AbstractJdbcRowConverter(RowType rowType) {
+ this.rowType = checkNotNull(rowType);
+ this.fieldTypes =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+ this.toInternalConverters = new JdbcDeserializationConverter[rowType.getFieldCount()];
+ this.toExternalConverters = new JdbcSerializationConverter[rowType.getFieldCount()];
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ toInternalConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
+ toExternalConverters[i] = createNullableExternalConverter(fieldTypes[i]);
+ }
+ }
+
+ @Override
+ public RowData toInternal(ResultSet resultSet) throws SQLException {
+ GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
+ for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
+ Object field = resultSet.getObject(pos + 1);
+ genericRowData.setField(pos, toInternalConverters[pos].deserialize(field));
+ }
+ return genericRowData;
+ }
+
+ @Override
+ public FieldNamedPreparedStatement toExternal(
+ RowData rowData, FieldNamedPreparedStatement statement) throws SQLException {
+ for (int index = 0; index < rowData.getArity(); index++) {
+ toExternalConverters[index].serialize(rowData, index, statement);
+ }
+ return statement;
+ }
+
+ /** Runtime converter to convert JDBC field to {@link RowData} type object. */
+ @FunctionalInterface
+ public interface JdbcDeserializationConverter extends Serializable {
+ /**
+ * Convert a jdbc field object of {@link ResultSet} to the internal data structure object.
+ *
+ * @param jdbcField A single field of a {@link ResultSet}
+ * @return the converted internal data structure object
+ * @throws SQLException if reading the JDBC field fails
+ */
+ Object deserialize(Object jdbcField) throws SQLException;
+ }
+
+ /**
+ * Runtime converter to convert {@link RowData} field to java object and fill into the {@link
+ * PreparedStatement}.
+ */
+ @FunctionalInterface
+ public interface JdbcSerializationConverter extends Serializable {
+ void serialize(RowData rowData, int index, FieldNamedPreparedStatement statement)
+ throws SQLException;
+ }
+
+ /**
+ * Create a nullable runtime {@link JdbcDeserializationConverter} from given {@link
+ * LogicalType}.
+ * @param type the field logical type
+ * @return a converter for deserialization
+ */
+ protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) {
+ return wrapIntoNullableInternalConverter(createInternalConverter(type));
+ }
+
+ /**
+ * Wraps the given converter so that SQL NULL values are mapped to null before delegating.
+ *
+ * @param jdbcDeserializationConverter converter for deserialization
+ * @return the null-safe wrapped converter
+ */
+ protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
+ JdbcDeserializationConverter jdbcDeserializationConverter) {
+ return val -> {
+ if (val == null) {
+ return null;
+ } else {
+ return jdbcDeserializationConverter.deserialize(val);
+ }
+ };
+ }
+
+ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return val -> null;
+ case BOOLEAN:
+ case FLOAT:
+ case DOUBLE:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ case BINARY:
+ case VARBINARY:
+ case BIGINT:
+ case INTEGER:
+ return val -> val;
+ case TINYINT:
+ return val -> ((Integer) val).byteValue();
+ case SMALLINT:
+ // Converter for SMALLINT that casts the value to int and then returns a short,
+ // since JDBC 1.0 uses the int type for small values.
+ return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ // Use DECIMAL(20, 0) to support the DB type BIGINT UNSIGNED; users should define
+ // DECIMAL(20, 0) in SQL, but for leniency other precisions such as DECIMAL(30, 0)
+ // work as well.
+ return val ->
+ val instanceof BigInteger
+ ? DecimalData.fromBigDecimal(
+ new BigDecimal((BigInteger) val, 0), precision, scale)
+ : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+ case DATE:
+ return val -> (int) (((Date) val).toLocalDate().toEpochDay());
+ case TIME_WITHOUT_TIME_ZONE:
+ return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return val ->
+ val instanceof LocalDateTime
+ ? TimestampData.fromLocalDateTime((LocalDateTime) val)
+ : TimestampData.fromTimestamp((Timestamp) val);
+ case CHAR:
+ case VARCHAR:
+ return val -> StringData.fromString((String) val);
+ case ARRAY:
+ case ROW:
+ case MAP:
+ case MULTISET:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
+ return wrapIntoNullableExternalConverter(createExternalConverter(type), type);
+ }
+
+ protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
+ JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
+ final int sqlType = JdbcTypeUtil.logicalTypeToSqlType(type.getTypeRoot());
+ return (val, index, statement) -> {
+ if (val == null
+ || val.isNullAt(index)
+ || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+ statement.setNull(index, sqlType);
+ } else {
+ jdbcSerializationConverter.serialize(val, index, statement);
+ }
+ };
+ }
+
+ protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return (val, index, statement) ->
+ statement.setBoolean(index, val.getBoolean(index));
+ case TINYINT:
+ return (val, index, statement) -> statement.setByte(index, val.getByte(index));
+ case SMALLINT:
+ return (val, index, statement) -> statement.setShort(index, val.getShort(index));
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return (val, index, statement) -> statement.setInt(index, val.getInt(index));
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return (val, index, statement) -> statement.setLong(index, val.getLong(index));
+ case FLOAT:
+ return (val, index, statement) -> statement.setFloat(index, val.getFloat(index));
+ case DOUBLE:
+ return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
+ case CHAR:
+ case VARCHAR:
+ // value is BinaryString
+ return (val, index, statement) ->
+ statement.setString(index, val.getString(index).toString());
+ case BINARY:
+ case VARBINARY:
+ return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
+ case DATE:
+ return (val, index, statement) ->
+ statement.setDate(
+ index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
+ case TIME_WITHOUT_TIME_ZONE:
+ return (val, index, statement) ->
+ statement.setTime(
+ index,
+ Time.valueOf(
+ LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int timestampPrecision = ((TimestampType) type).getPrecision();
+ return (val, index, statement) ->
+ statement.setTimestamp(
+ index, val.getTimestamp(index, timestampPrecision).toTimestamp());
+ case DECIMAL:
+ final int decimalPrecision = ((DecimalType) type).getPrecision();
+ final int decimalScale = ((DecimalType) type).getScale();
+ return (val, index, statement) ->
+ statement.setBigDecimal(
+ index,
+ val.getDecimal(index, decimalPrecision, decimalScale)
+ .toBigDecimal());
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case ROW:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+}
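A concrete converter only needs to supply converterName(); all per-type conversion logic is inherited. A minimal sketch (the class name is illustrative; compare MySQLRowConverter later in this PR):

    public class StarRocksRowConverter extends AbstractJdbcRowConverter {
        private static final long serialVersionUID = 1L;

        public StarRocksRowConverter(RowType rowType) {
            super(rowType); // builds per-field internal/external converters from the row type
        }

        @Override
        public String converterName() {
            return "StarRocks";
        }
    }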
diff --git a/src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java b/src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java
new file mode 100644
index 00000000..e53129b0
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.converter;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Converter that is responsible for converting between JDBC objects and the Flink SQL internal data
+ * structure {@link RowData}.
+ */
+@PublicEvolving
+public interface JdbcRowConverter extends Serializable {
+
+ /**
+ * Convert data retrieved from {@link ResultSet} to internal {@link RowData}.
+ *
+ * @param resultSet ResultSet from JDBC
+ * @return resultSet to row
+ * @throws SQLException sql exception
+ */
+ RowData toInternal(ResultSet resultSet) throws SQLException;
+
+ /**
+ * Convert data retrieved from Flink internal RowData to JDBC Object.
+ *
+ * @param rowData The given internal {@link RowData}.
+ * @param statement The statement to be filled.
+ * @return The filled statement.
+ * @throws SQLException if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ FieldNamedPreparedStatement toExternal(RowData rowData, FieldNamedPreparedStatement statement)
+ throws SQLException;
+}
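Read-side use of this interface is a plain drain loop; a fragment sketch, assuming converter and resultSet come from the surrounding code:

    List<RowData> rows = new ArrayList<>();
    while (resultSet.next()) {
        rows.add(converter.toInternal(resultSet)); // one RowData per JDBC row
    }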
diff --git a/src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java b/src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java
new file mode 100644
index 00000000..c78f7ea8
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Base class for {@link JdbcDialect JdbcDialects} that implements basic data type validation and
+ * the construction of basic {@code INSERT}, {@code UPDATE}, {@code DELETE}, and {@code SELECT}
+ * statements.
+ *
+ * Implementors should be careful to check that the default SQL statements are performant for their
+ * specific dialect and override them if necessary.
+ */
+@PublicEvolving
+public abstract class AbstractDialect implements JdbcDialect {
+
+ @Override
+ public void validate(RowType rowType) throws ValidationException {
+ for (RowType.RowField field : rowType.getFields()) {
+ // TODO: We can't convert VARBINARY(n) data type to
+ // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+ // LegacyTypeInfoDataTypeConverter when n is smaller
+ // than Integer.MAX_VALUE
+ if (!supportedTypes().contains(field.getType().getTypeRoot())
+ || (field.getType() instanceof VarBinaryType
+ && Integer.MAX_VALUE
+ != ((VarBinaryType) field.getType()).getLength())) {
+ throw new ValidationException(
+ String.format(
+ "The %s dialect doesn't support type: %s.",
+ dialectName(), field.getType()));
+ }
+
+ if (field.getType() instanceof DecimalType) {
+ Range range =
+ decimalPrecisionRange()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ String.format(
+ "JdbcDialect %s supports DECIMAL type but no precision range has been set. "
+ + "Ensure AbstractDialect#decimalPrecisionRange() is overriden to return a valid Range",
+ dialectName())));
+ int precision = ((DecimalType) field.getType()).getPrecision();
+ if (precision > range.max || precision < range.min) {
+ throw new ValidationException(
+ String.format(
+ "The precision of field '%s' is out of the DECIMAL "
+ + "precision range [%d, %d] supported by %s dialect.",
+ field.getName(), range.min, range.max, dialectName()));
+ }
+ }
+
+ if (field.getType() instanceof TimestampType) {
+ Range range =
+ timestampPrecisionRange()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ String.format(
+ "JdbcDialect %s supports TIMESTAMP type but no precision range has been set."
+ + "Ensure AbstractDialect#timestampPrecisionRange() is overriden to return a valid Range",
+ dialectName())));
+ int precision = ((TimestampType) field.getType()).getPrecision();
+ if (precision > range.max || precision < range.min) {
+ throw new ValidationException(
+ String.format(
+ "The precision of field '%s' is out of the TIMESTAMP "
+ + "precision range [%d, %d] supported by %s dialect.",
+ field.getName(), range.min, range.max, dialectName()));
+ }
+ }
+ }
+ }
+
+
+ /**
+ * @return The inclusive range [min,max] of supported precisions for {@link TimestampType}
+ * columns. None if timestamp type is not supported.
+ */
+ public Optional<Range> timestampPrecisionRange() {
+ return Optional.empty();
+ }
+
+ /**
+ * @return The inclusive range [min,max] of supported precisions for {@link DecimalType}
+ * columns. None if decimal type is not supported.
+ */
+ public Optional<Range> decimalPrecisionRange() {
+ return Optional.empty();
+ }
+
+ /**
+ * Defines the set of supported types for the dialect. If the dialect supports {@code DECIMAL}
+ * or {@code TIMESTAMP} types, be sure to override {@link #decimalPrecisionRange()} and {@link
+ * #timestampPrecisionRange()} respectively.
+ *
+ * @return a set of logical type roots.
+ */
+ public abstract Set<LogicalTypeRoot> supportedTypes();
+
+ @PublicEvolving
+ public static class Range {
+ private final int min;
+
+ private final int max;
+
+ public static Range of(int min, int max) {
+ Preconditions.checkArgument(
+ min <= max,
+ String.format(
+ "The range min value in range %d must be <= max value %d", min, max));
+ return new Range(min, max);
+ }
+
+ private Range(int min, int max) {
+ this.min = min;
+ this.max = max;
+ }
+ }
+}
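The contract between supportedTypes() and the precision ranges matters: declaring DECIMAL (or TIMESTAMP) support without overriding the matching range makes validate() throw IllegalStateException. An illustrative dialect sketch (bounds are examples, not StarRocks limits):

    public class ExampleDialect extends AbstractDialect {
        private static final long serialVersionUID = 1L;

        @Override
        public String dialectName() { return "Example"; }

        @Override
        public JdbcRowConverter getRowConverter(RowType rowType) {
            return new MySQLRowConverter(rowType); // reuses the converter added in this PR
        }

        @Override
        public String quoteIdentifier(String identifier) { return "`" + identifier + "`"; }

        @Override
        public Set<LogicalTypeRoot> supportedTypes() {
            return EnumSet.of(LogicalTypeRoot.INTEGER, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.DECIMAL);
        }

        @Override
        public Optional<Range> decimalPrecisionRange() {
            return Optional.of(Range.of(1, 38)); // required because DECIMAL is declared above
        }
    }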
diff --git a/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java
new file mode 100644
index 00000000..c32eac7f
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.dialect;
+
+import java.io.Serializable;
+import com.starrocks.connector.flink.converter.JdbcRowConverter;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable
+ * and stateless.
+ *
+ */
+@PublicEvolving
+public interface JdbcDialect extends Serializable {
+
+ /**
+ * Get the name of jdbc dialect.
+ *
+ * @return the dialect name.
+ */
+ String dialectName();
+
+ /**
+ * Get the converter that converts between JDBC objects and Flink internal objects.
+ *
+ * @param rowType the given row type
+ * @return a row converter for the database
+ */
+ JdbcRowConverter getRowConverter(RowType rowType);
+
+
+ /**
+ * Check if this dialect instance supports a specific data type in the table schema.
+ *
+ * @param rowType the physical data type of a row in the database table.
+ * @exception ValidationException in case the table schema contains an unsupported type.
+ */
+ void validate(RowType rowType) throws ValidationException;
+
+
+ /**
+ * Quotes the identifier.
+ *
+ * Used to put quotes around the identifier if the column name is a reserved keyword or
+ * contains characters requiring quotes (e.g., space).
+ * @param identifier identifier
+ *
+ * @return the quoted identifier.
+ */
+ String quoteIdentifier(String identifier);
+
+}
diff --git a/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialectFactory.java b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialectFactory.java
new file mode 100644
index 00000000..0704cc47
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialectFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A factory to create a specific {@link JdbcDialect}. This factory is used with Java's Service
+ * Provider Interfaces (SPI) for discovering.
+ *
+ *
+ * <p>Classes that implement this interface can be added to the
+ * "META-INF/services/com.starrocks.connector.flink.dialect.JdbcDialectFactory" file of a JAR file
+ * in the current classpath to be found.
+ *
+ * @see JdbcDialect
+ */
+@PublicEvolving
+public interface JdbcDialectFactory {
+
+ /**
+ * Retrieves whether the dialect thinks that it can open a connection to the given URL.
+ * Typically, dialects will return {@code true} if they understand the sub-protocol
+ * specified in the URL and {@code false} if they do not.
+ *
+ * @param url the URL of the database
+ * @return {@code true} if this dialect understands the given URL; {@code false} otherwise.
+ */
+ boolean acceptsURL(String url);
+
+ /** @return a new instance of the {@link JdbcDialect}. */
+ JdbcDialect create();
+}
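A hypothetical factory implementation; it is registered by listing its fully qualified name in META-INF/services/com.starrocks.connector.flink.dialect.JdbcDialectFactory:

    public class MySqlDialectFactory implements JdbcDialectFactory {
        @Override
        public boolean acceptsURL(String url) {
            return url.startsWith("jdbc:mysql:"); // claim MySQL-protocol URLs
        }

        @Override
        public JdbcDialect create() {
            return new MySqlDialect(); // the dialect added in this PR
        }
    }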
diff --git a/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialectLoader.java b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialectLoader.java
new file mode 100644
index 00000000..be012758
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialectLoader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.dialect;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility for working with {@link JdbcDialect}. */
+@Internal
+public final class JdbcDialectLoader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectLoader.class);
+
+ private JdbcDialectLoader() {}
+
+ /**
+ * Loads the unique JDBC Dialect that can handle the given database url.
+ *
+ * @param url A database URL.
+ * @param classLoader the classloader used to load the factory
+ * @throws IllegalStateException if the loader cannot find exactly one dialect that can
+ * unambiguously process the given database URL.
+ * @return The loaded dialect.
+ */
+ public static JdbcDialect load(String url, ClassLoader classLoader) {
+ List<JdbcDialectFactory> foundFactories = discoverFactories(classLoader);
+
+ if (foundFactories.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find any jdbc dialect factories that implement '%s' in the classpath.",
+ JdbcDialectFactory.class.getName()));
+ }
+
+ final List<JdbcDialectFactory> matchingFactories =
+ foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n"
+ + "Available factories are:\n\n"
+ + "%s",
+ url,
+ JdbcDialectFactory.class.getName(),
+ foundFactories.stream()
+ .map(f -> f.getClass().getName())
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new IllegalStateException(
+ String.format(
+ "Multiple jdbc dialect factories can handle url '%s' that implement '%s' found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ url,
+ JdbcDialectFactory.class.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return matchingFactories.get(0).create();
+ }
+
+ private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) {
+ try {
+ final List<JdbcDialectFactory> result = new LinkedList<>();
+ ServiceLoader.load(JdbcDialectFactory.class, classLoader)
+ .iterator()
+ .forEachRemaining(result::add);
+ return result;
+ } catch (ServiceConfigurationError e) {
+ LOG.error("Could not load service provider for jdbc dialects factory.", e);
+ throw new RuntimeException(
+ "Could not load service provider for jdbc dialects factory.", e);
+ }
+ }
+}
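Callers then resolve a dialect in one call; load() fails fast with IllegalStateException when zero factories, or more than one, claim the URL. A sketch (the URL is a placeholder):

    JdbcDialect dialect = JdbcDialectLoader.load(
            "jdbc:mysql://fe-host:9030/test_db",
            Thread.currentThread().getContextClassLoader());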
diff --git a/src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java b/src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java
new file mode 100644
index 00000000..5948f474
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.dialect;
+
+import com.starrocks.connector.flink.converter.AbstractJdbcRowConverter;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal objects for
+ * MySQL.
+ */
+@Internal
+public class MySQLRowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String converterName() {
+ return "MySQL";
+ }
+
+ public MySQLRowConverter(RowType rowType) {
+ super(rowType);
+ }
+}
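Wiring the converter to a schema is a two-liner; a sketch using standard Flink logical types:

    RowType rowType = RowType.of(
            new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
            new String[] {"id", "name"});
    MySQLRowConverter converter = new MySQLRowConverter(rowType);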
diff --git a/src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java b/src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java
new file mode 100644
index 00000000..5bb3dc80
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.dialect;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import com.starrocks.connector.flink.converter.JdbcRowConverter;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+/** JDBC dialect for MySQL. */
+@Internal
+public class MySqlDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
+ // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
+ private static final int MAX_TIMESTAMP_PRECISION = 6;
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+ // Define MAX/MIN precision of DECIMAL type according to Mysql docs:
+ // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
+ private static final int MAX_DECIMAL_PRECISION = 65;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new MySQLRowConverter(rowType);
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ @Override
+ public String dialectName() {
+ return "MySQL";
+ }
+
+ @Override
+ public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ @Override
+ public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+ public Set<LogicalTypeRoot> supportedTypes() {
+ // The data types used in Mysql are list at:
+ // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
+
+ // TODO: We can't convert BINARY data type to
+ // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+ // LegacyTypeInfoDataTypeConverter.
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ }
+
+}
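With these bounds, validate() rejects out-of-range precisions; for example a TIMESTAMP(9) column fails because MySQL caps fractional seconds at 6. A sketch:

    RowType rowType = RowType.of(
            new LogicalType[] {new TimestampType(9)},
            new String[] {"ts"});
    new MySqlDialect().validate(rowType); // throws ValidationException: precision out of [0, 6]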
diff --git a/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java
new file mode 100644
index 00000000..81ff4f0d
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.statement;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This is a wrapper around {@link PreparedStatement} and allows the users to set parameters by name
+ * instead of by index. This allows users to use the same variable parameter multiple times in a
+ * statement.
+ *
+ * Code such as this:
+ *
+ * <pre>{@code
+ * Connection con = getConnection();
+ * String query = "select * from my_table where first_name=? or last_name=?";
+ * PreparedStatement st = con.prepareStatement(query);
+ * st.setString(1, "bob");
+ * st.setString(2, "bob");
+ * ResultSet rs = st.executeQuery();
+ * }</pre>
+ *
+ * Can be replaced with:
+ *
+ * <pre>{@code
+ * Connection con = getConnection();
+ * String query = "select * from my_table where first_name=:name or last_name=:name";
+ * FieldNamedPreparedStatement st = FieldNamedPreparedStatement.prepareStatement(con, query, new String[]{"name"});
+ * st.setString(0, "bob");
+ * ResultSet rs = st.executeQuery();
+ * }</pre>
+ */
+@PublicEvolving
+public interface FieldNamedPreparedStatement extends AutoCloseable {
+
+ /**
+ * Creates a {@code NamedPreparedStatement} object for sending parameterized SQL statements
+ * to the database.
+ *
+ * @param connection the connection used to connect to database.
+ * @param sql an SQL statement that may contain one or more ':fieldName' as parameter
+ * placeholders
+ * @param fieldNames the field names in schema order used as the parameter names
+ * @return filled prepared statement
+ * @throws SQLException – if a database access error occurs or this method is called on a closed connection
+ */
+ static FieldNamedPreparedStatement prepareStatement(
+ Connection connection, String sql, String[] fieldNames) throws SQLException {
+ return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames);
+ }
+
+ /**
+ * Executes the SQL query in this {@code NamedPreparedStatement} object and returns the
+ * {@code ResultSet} object generated by the query.
+ *
+ * @see PreparedStatement#executeQuery()
+ * @return a ResultSet object that contains the data produced by the query; never null
+ * @throws SQLException – if a database access error occurs; this method is called on a closed
+ * PreparedStatement or the SQL statement does not return a ResultSet object
+ */
+ ResultSet executeQuery() throws SQLException;
+
+
+ /**
+ * Sets the designated parameter to SQL {@code NULL}.
+ *
+ * Note: You must specify the parameter's SQL type.
+ *
+ * @see PreparedStatement#setNull(int, int)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param sqlType – the SQL type code defined in java.sql.Types
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setNull(int fieldIndex, int sqlType) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code boolean} value. The driver
+ * converts this to an SQL {@code BIT} or {@code BOOLEAN} value when it sends it to
+ * the database.
+ *
+ * @see PreparedStatement#setBoolean(int, boolean)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setBoolean(int fieldIndex, boolean x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code byte} value. The driver converts
+ * this to an SQL {@code TINYINT} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setByte(int, byte)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setByte(int fieldIndex, byte x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code short} value. The driver converts
+ * this to an SQL {@code SMALLINT} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setShort(int, short)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setShort(int fieldIndex, short x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code int} value. The driver converts
+ * this to an SQL {@code INTEGER} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setInt(int, int)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setInt(int fieldIndex, int x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code long} value. The driver converts
+ * this to an SQL {@code BIGINT} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setLong(int, long)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setLong(int fieldIndex, long x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code float} value. The driver converts
+ * this to an SQL {@code REAL} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setFloat(int, float)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setFloat(int fieldIndex, float x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code double} value. The driver
+ * converts this to an SQL {@code DOUBLE} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setDouble(int, double)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setDouble(int fieldIndex, double x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given {@code java.math.BigDecimal} value. The
+ * driver converts this to an SQL {@code NUMERIC} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setBigDecimal(int, BigDecimal)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setBigDecimal(int fieldIndex, BigDecimal x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java {@code String} value. The driver
+ * converts this to an SQL {@code VARCHAR} or {@code LONGVARCHAR} value (depending on
+ * the argument's size relative to the driver's limits on {@code VARCHAR} values) when it
+ * sends it to the database.
+ *
+ * @see PreparedStatement#setString(int, String)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setString(int fieldIndex, String x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given Java array of bytes. The driver converts this to
+ * an SQL {@code VARBINARY} or {@code LONGVARBINARY} (depending on the argument's size
+ * relative to the driver's limits on {@code VARBINARY} values) when it sends it to the
+ * database.
+ *
+ * @see PreparedStatement#setBytes(int, byte[])
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setBytes(int fieldIndex, byte[] x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given {@code java.sql.Date} value using the default
+ * time zone of the virtual machine that is running the application. The driver converts this to
+ * an SQL {@code DATE} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setDate(int, Date)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setDate(int fieldIndex, Date x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given {@code java.sql.Time} value. The driver
+ * converts this to an SQL {@code TIME} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setTime(int, Time)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setTime(int fieldIndex, Time x) throws SQLException;
+
+ /**
+ * Sets the designated parameter to the given {@code java.sql.Timestamp} value. The driver
+ * converts this to an SQL {@code TIMESTAMP} value when it sends it to the database.
+ *
+ * @see PreparedStatement#setTimestamp(int, Timestamp)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setTimestamp(int fieldIndex, Timestamp x) throws SQLException;
+
+ /**
+ * Sets the value of the designated parameter using the given object.
+ *
+ * @see PreparedStatement#setObject(int, Object)
+ * @param fieldIndex – the first parameter is 1, the second is 2, ...
+ * @param x – the parameter value
+ * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement;
+ * if a database access error occurs or this method is called on a closed PreparedStatement
+ */
+ void setObject(int fieldIndex, Object x) throws SQLException;
+
+ /**
+ * Releases this {@code Statement} object's database and JDBC resources immediately instead
+ * of waiting for this to happen when it is automatically closed. It is generally good practice
+ * to release resources as soon as you are finished with them to avoid tying up database
+ * resources.
+ *
+ * @see PreparedStatement#close()
+ * @throws SQLException – if a database access error occurs
+ */
+ void close() throws SQLException;
+}
diff --git a/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java
new file mode 100644
index 00000000..32407663
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.statement;
+
+import java.math.BigDecimal;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple implementation of {@link FieldNamedPreparedStatement}.
+ */
+public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatement {
+
+ private final PreparedStatement statement;
+ private final int[][] indexMapping;
+
+ private FieldNamedPreparedStatementImpl(PreparedStatement statement, int[][] indexMapping) {
+ this.statement = statement;
+ this.indexMapping = indexMapping;
+ }
+
+
+ @Override
+ public ResultSet executeQuery() throws SQLException {
+ return statement.executeQuery();
+ }
+
+ @Override
+ public void setNull(int fieldIndex, int sqlType) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setNull(index, sqlType);
+ }
+ }
+
+ @Override
+ public void setBoolean(int fieldIndex, boolean x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setBoolean(index, x);
+ }
+ }
+
+ @Override
+ public void setByte(int fieldIndex, byte x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setByte(index, x);
+ }
+ }
+
+ @Override
+ public void setShort(int fieldIndex, short x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setShort(index, x);
+ }
+ }
+
+ @Override
+ public void setInt(int fieldIndex, int x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setInt(index, x);
+ }
+ }
+
+ @Override
+ public void setLong(int fieldIndex, long x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setLong(index, x);
+ }
+ }
+
+ @Override
+ public void setFloat(int fieldIndex, float x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setFloat(index, x);
+ }
+ }
+
+ @Override
+ public void setDouble(int fieldIndex, double x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setDouble(index, x);
+ }
+ }
+
+ @Override
+ public void setBigDecimal(int fieldIndex, BigDecimal x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setBigDecimal(index, x);
+ }
+ }
+
+ @Override
+ public void setString(int fieldIndex, String x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setString(index, x);
+ }
+ }
+
+ @Override
+ public void setBytes(int fieldIndex, byte[] x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setBytes(index, x);
+ }
+ }
+
+ @Override
+ public void setDate(int fieldIndex, Date x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setDate(index, x);
+ }
+ }
+
+ @Override
+ public void setTime(int fieldIndex, Time x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setTime(index, x);
+ }
+ }
+
+ @Override
+ public void setTimestamp(int fieldIndex, Timestamp x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setTimestamp(index, x);
+ }
+ }
+
+ @Override
+ public void setObject(int fieldIndex, Object x) throws SQLException {
+ for (int index : indexMapping[fieldIndex]) {
+ statement.setObject(index, x);
+ }
+ }
+
+ @Override
+ public void close() throws SQLException {
+ statement.close();
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ public static FieldNamedPreparedStatement prepareStatement(
+ Connection connection, String sql, String[] fieldNames) throws SQLException {
+ checkNotNull(connection, "connection must not be null.");
+ checkNotNull(sql, "sql must not be null.");
+ checkNotNull(fieldNames, "fieldNames must not be null.");
+
+ if (sql.contains("?")) {
+ throw new IllegalArgumentException("SQL statement must not contain ? character.");
+ }
+
+ HashMap<String, List<Integer>> parameterMap = new HashMap<>();
+ String parsedSQL = parseNamedStatement(sql, parameterMap);
+ // currently, the statements must contain all the field parameters
+ checkArgument(parameterMap.size() == fieldNames.length);
+ int[][] indexMapping = new int[fieldNames.length][];
+ for (int i = 0; i < fieldNames.length; i++) {
+ String fieldName = fieldNames[i];
+ checkArgument(
+ parameterMap.containsKey(fieldName),
+ fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
+ indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
+ }
+
+ return new FieldNamedPreparedStatementImpl(
+ connection.prepareStatement(parsedSQL), indexMapping);
+ }
+
+ /**
+ * Parses a sql with named parameters. The parameter-index mappings are put into the map, and
+ * the parsed sql is returned.
+ *
+ * @param sql sql to parse
+ * @param paramMap map to hold parameter-index mappings
+ * @return the parsed sql
+ */
+ public static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
+ StringBuilder parsedSql = new StringBuilder();
+ int fieldIndex = 1; // SQL statement parameter index starts from 1
+ int length = sql.length();
+ for (int i = 0; i < length; i++) {
+ char c = sql.charAt(i);
+ if (':' == c) {
+ int j = i + 1;
+ while (j < length && Character.isJavaIdentifierPart(sql.charAt(j))) {
+ j++;
+ }
+ String parameterName = sql.substring(i + 1, j);
+ checkArgument(
+ !parameterName.isEmpty(),
+ "Named parameters in SQL statement must not be empty.");
+ paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex);
+ fieldIndex++;
+ i = j - 1;
+ parsedSql.append('?');
+ } else {
+ parsedSql.append(c);
+ }
+ }
+ return parsedSql.toString();
+ }
+}
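To make the index mapping concrete: a named parameter used twice maps one field to several JDBC positions. A sketch of parseNamedStatement's behavior:

    Map<String, List<Integer>> params = new HashMap<>();
    String parsed = FieldNamedPreparedStatementImpl.parseNamedStatement(
            "SELECT * FROM t WHERE first_name = :name OR last_name = :name", params);
    // parsed -> "SELECT * FROM t WHERE first_name = ? OR last_name = ?"
    // params -> {name=[1, 2]}, so field "name" fills JDBC positions 1 and 2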
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java
index 719cd35e..6e13a935 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java
@@ -14,122 +14,169 @@
package com.starrocks.connector.flink.table.source;
-import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
-import com.starrocks.connector.flink.table.source.struct.QueryInfo;
-import com.starrocks.connector.flink.table.source.struct.SelectColumn;
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.converter.JdbcRowConverter;
+import com.starrocks.connector.flink.dialect.MySqlDialect;
+import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement;
import com.starrocks.connector.flink.tools.EnvUtils;
-import org.apache.flink.table.data.GenericRowData;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+public class StarRocksDynamicLookupFunction extends LookupFunction {
-public class StarRocksDynamicLookupFunction extends TableFunction<RowData> {
-
private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicLookupFunction.class);
-
- private final ColumnRichInfo[] filterRichInfos;
- private final StarRocksSourceOptions sourceOptions;
- private QueryInfo queryInfo;
- private final SelectColumn[] selectColumns;
- private final List<ColumnRichInfo> columnRichInfos;
-
- private final long cacheMaxSize;
- private final long cacheExpireMs;
+
private final int maxRetryTimes;
- // cache for lookup data
- private Map<Row, List<RowData>> cacheMap;
+ private final String[] keyNames;
+
+ private final String query;
+
+ private final transient StarRocksJdbcConnectionProvider connectionProvider;
+
+ private transient FieldNamedPreparedStatement statement;
- private transient long nextLoadTime;
+ private final JdbcRowConverter lookupKeyRowConverter;
- public StarRocksDynamicLookupFunction(StarRocksSourceOptions sourceOptions,
- ColumnRichInfo[] filterRichInfos,
- List<ColumnRichInfo> columnRichInfos,
- SelectColumn[] selectColumns
- ) {
- this.sourceOptions = sourceOptions;
- this.filterRichInfos = filterRichInfos;
- this.columnRichInfos = columnRichInfos;
- this.selectColumns = selectColumns;
+ private final JdbcRowConverter jdbcRowConverter;
+
+ public StarRocksDynamicLookupFunction(StarRocksSourceOptions sourceOptions,
+ String[] fieldNames,
+ DataType[] fieldTypes,
+ String[] keyNames,
+ RowType rowType
+ ) {
- this.cacheMaxSize = sourceOptions.getLookupCacheMaxRows();
- this.cacheExpireMs = sourceOptions.getLookupCacheTTL();
this.maxRetryTimes = sourceOptions.getLookupMaxRetries();
-
- this.cacheMap = new HashMap<>();
- this.nextLoadTime = -1L;
+
+ this.keyNames = keyNames;
+ MySqlDialect mySqlDialect = new MySqlDialect();
+ this.query = lookupSql(sourceOptions.getTableName(), fieldNames, keyNames);
+ connectionProvider = new StarRocksJdbcConnectionProvider(
+ new StarRocksJdbcConnectionOptions(
+ sourceOptions.getJdbcUrl(), sourceOptions.getUsername(),
+ sourceOptions.getPassword()));
+ this.jdbcRowConverter = mySqlDialect.getRowConverter(rowType);
+ List<String> nameList = Arrays.asList(fieldNames);
+ DataType[] keyTypes =
+ Arrays.stream(keyNames)
+ .map(
+ s -> {
+ checkArgument(
+ nameList.contains(s),
+ "keyName %s can't find in fieldNames %s.",
+ s,
+ nameList);
+ return fieldTypes[nameList.indexOf(s)];
+ })
+ .toArray(DataType[]::new);
+
+ this.lookupKeyRowConverter = mySqlDialect.getRowConverter(RowType.of(
+ Arrays.stream(keyTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new)));
+
}
-
+
+ private String lookupSql(String tableName, String[] selectFields, String[] conditionFields) {
+ String selectExpressions =
+ Arrays.stream(selectFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String fieldExpressions =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+ return "SELECT "
+ + selectExpressions
+ + " FROM "
+ + quoteIdentifier(tableName)
+ + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+ }
+
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
+ establishConnectionAndStatement();
LOG.info("Open lookup function. {}", EnvUtils.getGitInformation());
}
- public void eval(Object... keys) {
- reloadData();
- Row keyRow = Row.of(keys);
- List<RowData> curList = cacheMap.get(keyRow);
- if (curList != null) {
- curList.parallelStream().forEach(this::collect);
- }
- }
-
- private void reloadData() {
- if (nextLoadTime > System.currentTimeMillis()) {
- return;
- }
- if (nextLoadTime > 0) {
- LOG.info("Lookup join cache has expired after {} (ms), reloading", this.cacheExpireMs);
- } else {
- LOG.info("Populating lookup join cache");
- }
- cacheMap.clear();
-
- StringBuilder sqlSb = new StringBuilder("select * from ");
- sqlSb.append("`").append(sourceOptions.getDatabaseName()).append("`");
- sqlSb.append(".");
- sqlSb.append("`" + sourceOptions.getTableName() + "`");
- LOG.info("LookUpFunction SQL [{}]", sqlSb.toString());
- this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, sqlSb.toString());
- List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
- cacheMap = lists.get(0).parallelStream().flatMap(beXTablets -> {
- StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(
- beXTablets.getBeNode(),
- columnRichInfos,
- selectColumns,
- sourceOptions);
- beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions);
- beReader.startToRead();
- List<RowData> tmpDataList = new ArrayList<>();
- while (beReader.hasNext()) {
- RowData row = beReader.getNext();
- tmpDataList.add(row);
- }
- return tmpDataList.stream();
- }).collect(Collectors.groupingBy(row -> {
- GenericRowData gRowData = (GenericRowData)row;
- Object[] keyObj = new Object[filterRichInfos.length];
- for (int i = 0; i < filterRichInfos.length; i ++) {
- keyObj[i] = gRowData.getField(filterRichInfos[i].getColumnIndexInSchema());
- }
- return Row.of(keyObj);
- }));
- nextLoadTime = System.currentTimeMillis() + this.cacheExpireMs;
- }
+
+ @Override
+ public Collection<RowData> lookup(RowData keyRow) {
+ for (int retry = 0; retry <= maxRetryTimes; retry++) {
+ try {
+ statement = lookupKeyRowConverter.toExternal(keyRow, statement);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ ArrayList<RowData> rows = new ArrayList<>();
+ while (resultSet.next()) {
+ RowData row = jdbcRowConverter.toInternal(resultSet);
+ rows.add(row);
+ }
+ rows.trimToSize();
+ return rows;
+ }
+ } catch (SQLException e) {
+ LOG.error(String.format("StarRocks executeBatch error, retry times = %d", retry), e);
+ if (retry >= maxRetryTimes) {
+ throw new RuntimeException("Execution of StarRocks statement failed.", e);
+ }
+ try {
+ if (!connectionProvider.isConnectionValid()) {
+ statement.close();
+ connectionProvider.close();
+ establishConnectionAndStatement();
+ }
+ } catch (SQLException | ClassNotFoundException exception) {
+ LOG.error(
+ "StarRocks connection is not valid, and reestablish connection failed",
+ exception);
+ throw new RuntimeException("Reestablish StarRocks connection failed", exception);
+ }
+
+ try {
+ Thread.sleep(1000L * retry);
+ } catch (InterruptedException e1) {
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+ return Collections.emptyList();
+ }
+
@Override
public void close() throws Exception {
+ connectionProvider.close();
super.close();
}
+
+ private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
+ Connection dbConn = connectionProvider.getConnection();
+ statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
+ }
}
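
The query string assembled by the new lookupSql(...) helper above can be reproduced standalone. The sketch below is a minimal illustration with hypothetical table and column names (t, id, name); it mirrors the string-building logic in the patch rather than calling the connector classes.

    import static java.lang.String.format;

    import java.util.Arrays;
    import java.util.stream.Collectors;

    // Standalone sketch of the SELECT built by lookupSql(tableName, selectFields, conditionFields).
    public class LookupSqlSketch {
        public static void main(String[] args) {
            String[] selectFields = {"id", "name"};   // hypothetical projection
            String[] conditionFields = {"id"};        // hypothetical lookup keys
            String selectExpressions = Arrays.stream(selectFields)
                    .map(f -> "`" + f + "`")
                    .collect(Collectors.joining(", "));
            String fieldExpressions = Arrays.stream(conditionFields)
                    .map(f -> format("`%s` = :%s", f, f))
                    .collect(Collectors.joining(" AND "));
            // Prints: SELECT `id`, `name` FROM `t` WHERE `id` = :id
            System.out.println("SELECT " + selectExpressions + " FROM `t`"
                    + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : ""));
        }
    }

The :id placeholder is the named-parameter form that FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames) later rewrites into positional JDBC parameters.
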
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
index 0764ed5d..0bd06cb6 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
@@ -14,7 +14,6 @@
package com.starrocks.connector.flink.table.source;
-import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
@@ -25,29 +24,47 @@
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-public class StarRocksDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown {
+public class StarRocksDynamicTableSource implements ScanTableSource, LookupTableSource,
+ SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown {
private final TableSchema flinkSchema;
private final StarRocksSourceOptions options;
private final PushDownHolder pushDownHolder;
- public StarRocksDynamicTableSource(StarRocksSourceOptions options, TableSchema schema, PushDownHolder pushDownHolder) {
+ @Nullable
+ private final LookupCache cache;
+
+ private final DataType physicalRowDataType;
+
+ public StarRocksDynamicTableSource(StarRocksSourceOptions options,
+ TableSchema schema,
+ PushDownHolder pushDownHolder,
+ @Nullable LookupCache cache,
+ DataType physicalRowDataType
+ ) {
this.options = options;
this.flinkSchema = schema;
this.pushDownHolder = pushDownHolder;
+ this.cache = cache;
+ this.physicalRowDataType = physicalRowDataType;
}
@Override
@@ -58,39 +75,45 @@ public ChangelogMode getChangelogMode() {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction(
- options, flinkSchema,
- this.pushDownHolder.getFilter(),
- this.pushDownHolder.getLimit(),
- this.pushDownHolder.getSelectColumns(),
- this.pushDownHolder.getColumns(),
+ options, flinkSchema,
+ this.pushDownHolder.getFilter(),
+ this.pushDownHolder.getLimit(),
+ this.pushDownHolder.getSelectColumns(),
+ this.pushDownHolder.getColumns(),
this.pushDownHolder.getQueryType());
return SourceFunctionProvider.of(sourceFunction, true);
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
- int[] projectedFields = Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
- ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length];
- for (int i = 0; i < projectedFields.length; i ++) {
- ColumnRichInfo columnRichInfo = new ColumnRichInfo(
- this.flinkSchema.getFieldName(projectedFields[i]).get(),
- projectedFields[i],
- this.flinkSchema.getFieldDataType(projectedFields[i]).get()
- );
- filerRichInfo[i] = columnRichInfo;
+ // StarRocks only supports non-nested lookup keys
+ String[] keyNames = new String[context.getKeys().length];
+ for (int i = 0; i < keyNames.length; i++) {
+ int[] innerKeyArr = context.getKeys()[i];
+ Preconditions.checkArgument(
+ innerKeyArr.length == 1, "StarRocks only supports non-nested lookup keys");
+ keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
+ }
+ final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+ StarRocksDynamicLookupFunction lookupFunction =
+ new StarRocksDynamicLookupFunction(
+ options,
+ DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+ DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
+ keyNames,
+ rowType);
+ if (cache != null) {
+ return PartialCachingLookupProvider.of(lookupFunction, cache);
+ } else {
+ return LookupFunctionProvider.of(lookupFunction);
}
-
- Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
- List<ColumnRichInfo> ColumnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
- SelectColumn[] selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, this.options, ColumnRichInfos);
-
- StarRocksDynamicLookupFunction tableFunction = new StarRocksDynamicLookupFunction(this.options, filerRichInfo, ColumnRichInfos, selectColumns);
- return TableFunctionProvider.of(tableFunction);
}
@Override
public DynamicTableSource copy() {
- return new StarRocksDynamicTableSource(this.options, this.flinkSchema, this.pushDownHolder);
+ return new StarRocksDynamicTableSource(this.options, this.flinkSchema, this.pushDownHolder,
+ cache, physicalRowDataType);
}
@Override
@@ -106,15 +129,16 @@ public boolean supportsNestedProjection() {
@Override
public void applyProjection(int[][] projectedFields) {
// if columns = "*", this func will not be called, so 'selectColumns' will be null
- int[] curProjectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
- if (curProjectedFields.length == 0 ) {
+ int[] curProjectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0])
+ .toArray();
+ if (curProjectedFields.length == 0) {
this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QueryCount);
return;
}
this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns);
ArrayList<String> columnList = new ArrayList<>();
- ArrayList<SelectColumn> selectColumns = new ArrayList();
+ ArrayList<SelectColumn> selectColumns = new ArrayList<>();
for (int index : curProjectedFields) {
String columnName = flinkSchema.getFieldName(index).get();
columnList.add(columnName);
@@ -122,7 +146,8 @@ public void applyProjection(int[][] projectedFields) {
}
String columns = String.join(", ", columnList);
this.pushDownHolder.setColumns(columns);
- this.pushDownHolder.setSelectColumns(selectColumns.toArray(new SelectColumn[selectColumns.size()]));
+ this.pushDownHolder.setSelectColumns(
+ selectColumns.toArray(new SelectColumn[selectColumns.size()]));
}
@Override
@@ -133,14 +158,15 @@ public Result applyFilters(List<ResolvedExpression> filtersExpressions) {
StarRocksExpressionExtractor extractor = new StarRocksExpressionExtractor();
for (ResolvedExpression expression : filtersExpressions) {
- if (expression.getOutputDataType().equals(DataTypes.BOOLEAN()) && expression.getChildren().size() == 0) {
+ if (expression.getOutputDataType().equals(DataTypes.BOOLEAN())
+ && expression.getChildren().size() == 0) {
filters.add(expression.accept(extractor) + " = true");
ac.add(expression);
continue;
}
String str = expression.accept(extractor);
+ remain.add(expression);
if (str == null) {
- remain.add(expression);
continue;
}
filters.add(str);
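
The key-name resolution in getLookupRuntimeProvider above can be exercised in isolation. A minimal sketch with a hypothetical physical schema (id, name, dt); a nested key such as {1, 0} would fail the non-nested check.

    import java.util.Arrays;
    import java.util.List;

    // Sketch of mapping LookupContext#getKeys() indices onto physical field names.
    public class LookupKeySketch {
        public static void main(String[] args) {
            List<String> fieldNames = Arrays.asList("id", "name", "dt"); // physical row fields
            int[][] keys = {{0}, {2}};  // what LookupContext#getKeys() could return
            String[] keyNames = new String[keys.length];
            for (int i = 0; i < keys.length; i++) {
                if (keys[i].length != 1) {
                    throw new IllegalArgumentException("StarRocks only supports non-nested lookup keys");
                }
                keyNames[i] = fieldNames.get(keys[i][0]);
            }
            System.out.println(Arrays.toString(keyNames)); // [id, dt]
        }
    }
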
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
index e340b2cd..bd32ca0a 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
@@ -19,13 +19,21 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
+import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
+import static com.starrocks.connector.flink.table.source.StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS;
+import static com.starrocks.connector.flink.table.source.StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS;
+
public final class StarRocksDynamicTableSourceFactory implements DynamicTableSourceFactory {
@@ -41,10 +49,34 @@ public DynamicTableSource createDynamicTableSource(Context context, boolean need
}
ReadableConfig options = helper.getOptions();
// validate some special properties
- StarRocksSourceOptions sourceOptions = new StarRocksSourceOptions(options, context.getCatalogTable().getOptions());
- TableSchema flinkSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ StarRocksSourceOptions sourceOptions = new StarRocksSourceOptions(options,
+ context.getCatalogTable().getOptions());
+ TableSchema flinkSchema = TableSchemaUtils.getPhysicalSchema(
+ context.getCatalogTable().getSchema());
PushDownHolder pushDownHolder = new PushDownHolder();
- return new StarRocksDynamicTableSource(sourceOptions, flinkSchema, pushDownHolder);
+ return new StarRocksDynamicTableSource(sourceOptions, flinkSchema, pushDownHolder,
+ getLookupCache(options), context.getPhysicalRowDataType());
+ }
+
+ @Nullable
+ private LookupCache getLookupCache(ReadableConfig tableOptions) {
+ LookupCache cache = null;
+ // Legacy cache options
+ if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+ && tableOptions.get(LOOKUP_CACHE_TTL_MS).compareTo(0L) > 0) {
+ cache =
+ DefaultLookupCache.newBuilder()
+ .maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+ .expireAfterWrite(Duration.ofMillis(tableOptions.get(LOOKUP_CACHE_TTL_MS)))
+ .cacheMissingKey(tableOptions.get(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY))
+ .build();
+ }
+ if (tableOptions
+ .get(LookupOptions.CACHE_TYPE)
+ .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+ cache = DefaultLookupCache.fromConfig(tableOptions);
+ }
+ return cache;
}
@Override
@@ -76,9 +108,13 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(StarRocksSourceOptions.SCAN_MEM_LIMIT);
options.add(StarRocksSourceOptions.SCAN_MAX_RETRIES);
options.add(StarRocksSourceOptions.SCAN_BE_HOST_MAPPING_LIST);
- options.add(StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS);
- options.add(StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS);
- options.add(StarRocksSourceOptions.LOOKUP_MAX_RETRIES);
+ options.add(LookupOptions.CACHE_TYPE);
+ options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+ options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+ options.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
+ options.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+ options.add(LookupOptions.MAX_RETRIES);
+
return options;
}
}
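
When only the deprecated options are set, getLookupCache above builds a DefaultLookupCache by hand. A minimal sketch of that construction, with hypothetical values (10000 rows, 5s TTL):

    import java.time.Duration;

    import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
    import org.apache.flink.table.connector.source.lookup.cache.LookupCache;

    // Sketch: the cache the factory builds from the legacy options
    // (maximumSize <- lookup.cache.max-rows, expireAfterWrite <- lookup.cache.ttl-ms).
    public class LegacyCacheSketch {
        public static void main(String[] args) {
            LookupCache cache = DefaultLookupCache.newBuilder()
                    .maximumSize(10_000L)                        // hypothetical row limit
                    .expireAfterWrite(Duration.ofMillis(5_000L)) // hypothetical TTL
                    .cacheMissingKey(true)
                    .build();
            System.out.println(cache);
        }
    }

With lookup.cache set to PARTIAL, the method instead delegates to DefaultLookupCache.fromConfig(tableOptions), so the standard Flink LookupOptions override the legacy pair.
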
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java
index cd739d76..d94c9931 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java
@@ -53,24 +53,24 @@ public class StarRocksSourceOptions implements Serializable {
public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
.stringType().noDefaultValue().withDescription("Table name");
-
-
+
+
// optional Options
public static final ConfigOption<Integer> SCAN_CONNECT_TIMEOUT = ConfigOptions.key("scan.connect.timeout-ms")
.intType().defaultValue(1000).withDescription("Connect timeout");
-
+
public static final ConfigOption<Integer> SCAN_BATCH_ROWS = ConfigOptions.key("scan.params.batch-rows")
.intType().defaultValue(1000).withDescription("Batch rows");
public static final ConfigOption<String> SCAN_PROPERTIES = ConfigOptions.key("scan.params.properties")
.stringType().noDefaultValue().withDescription("Reserved params for use");
-
+
public static final ConfigOption<Integer> SCAN_LIMIT = ConfigOptions.key("scan.params.limit")
.intType().defaultValue(1).withDescription("The query limit, if specified.");
public static final ConfigOption<Integer> SCAN_KEEP_ALIVE_MIN = ConfigOptions.key("scan.params.keep-alive-min")
.intType().defaultValue(10).withDescription("Max keep alive time min");
-
+
public static final ConfigOption<Integer> SCAN_QUERTY_TIMEOUT_S = ConfigOptions.key("scan.params.query-timeout-s")
.intType().defaultValue(600).withDescription("Query timeout for a single query");
@@ -88,21 +88,23 @@ public class StarRocksSourceOptions implements Serializable {
public static final ConfigOption<String> SCAN_BE_HOST_MAPPING_LIST = ConfigOptions.key("scan.be-host-mapping-list")
.stringType().defaultValue("").withDescription("List of be host mapping");
-
+
// lookup Options
+ @Deprecated
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
.longType().defaultValue(-1L).withDescription(
"the max number of rows of lookup cache, over this value, the oldest rows will "
+ "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+ "specified. Cache is not enabled as default.");
+ @Deprecated
public static final ConfigOption<Long> LOOKUP_CACHE_TTL_MS = ConfigOptions.key("lookup.cache.ttl-ms")
.longType().defaultValue(5000L).withDescription("the cache time to live.");
+ @Deprecated
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
.intType().defaultValue(1).withDescription("the max retry times if lookup database failed.");
-
public static final String SOURCE_PROPERTIES_PREFIX = "scan.params.";
public StarRocksSourceOptions(ReadableConfig options, Map<String, String> optionsMap) {
@@ -150,7 +152,7 @@ public String getScanUrl() {
public String getJdbcUrl() {
return tableOptions.get(JDBC_URL);
}
-
+
public String getUsername() {
return tableOptions.get(USERNAME);
}
@@ -169,8 +171,8 @@ public String getTableName() {
// optional Options
- public int getConnectTimeoutMs() {
- return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
+ public int getConnectTimeoutMs() {
+ return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
}
public int getBatchRows() {
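
The deprecated lookup options above keep their defaults, and the -1 default of lookup.cache.max-rows is how the factory detects that the legacy cache is disabled. A minimal sketch of typed ConfigOption resolution (the configuration values are hypothetical):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    // Sketch: how a typed ConfigOption such as LOOKUP_CACHE_MAX_ROWS resolves.
    public class OptionReadSketch {
        private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
                ConfigOptions.key("lookup.cache.max-rows")
                        .longType()
                        .defaultValue(-1L)
                        .withDescription("the max number of rows of lookup cache");

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // When the key is absent, get() falls back to the declared default (-1L),
            // which the factory treats as "legacy cache disabled".
            System.out.println(conf.get(LOOKUP_CACHE_MAX_ROWS)); // -1
            conf.setString("lookup.cache.max-rows", "10000");
            System.out.println(conf.get(LOOKUP_CACHE_MAX_ROWS)); // 10000
        }
    }
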
diff --git a/src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java b/src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java
deleted file mode 100644
index e1f9be8d..00000000
--- a/src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.tools;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/** Utilities for HTTP connection. */
-public class ConnectionUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);
-
- /** Select an available host from the list. Each host is like 'ip:port'. */
- @Nullable
- public static String selectAvailableHttpHost(List<String> hostList, int connectionTimeout) {
- for (String host : hostList) {
- if (host == null) {
- continue;
- }
- if (!host.startsWith("http")) {
- host = "http://" + host;
- }
- if (testHttpConnection(host, connectionTimeout)) {
- return host;
- }
- }
-
- return null;
- }
-
- public static boolean testHttpConnection(String urlStr, int connectionTimeout) {
- try {
- URL url = new URL(urlStr);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(connectionTimeout);
- co.connect();
- co.disconnect();
- return true;
- } catch (Exception e) {
- LOG.warn("Failed to connect to {}", urlStr, e);
- return false;
- }
- }
-}
diff --git a/src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java b/src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java
new file mode 100644
index 00000000..19e512a4
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.util;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BOOLEAN;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.CHAR;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TINYINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+/**
+ * Utils for jdbc type.
+ */
+@Internal
+public class JdbcTypeUtil {
+
+ private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
+
+ static {
+ HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
+ m.put(STRING_TYPE_INFO, Types.VARCHAR);
+ m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
+ m.put(BYTE_TYPE_INFO, Types.TINYINT);
+ m.put(SHORT_TYPE_INFO, Types.SMALLINT);
+ m.put(INT_TYPE_INFO, Types.INTEGER);
+ m.put(LONG_TYPE_INFO, Types.BIGINT);
+ m.put(FLOAT_TYPE_INFO, Types.REAL);
+ m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
+ m.put(SqlTimeTypeInfo.DATE, Types.DATE);
+ m.put(SqlTimeTypeInfo.TIME, Types.TIME);
+ m.put(SqlTimeTypeInfo.TIMESTAMP, Types.TIMESTAMP);
+ m.put(LocalTimeTypeInfo.LOCAL_DATE, Types.DATE);
+ m.put(LocalTimeTypeInfo.LOCAL_TIME, Types.TIME);
+ m.put(LocalTimeTypeInfo.LOCAL_DATE_TIME, Types.TIMESTAMP);
+ m.put(BIG_DEC_TYPE_INFO, Types.DECIMAL);
+ m.put(BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.BINARY);
+ TYPE_MAPPING = Collections.unmodifiableMap(m);
+ }
+
+ private static final Map<LogicalTypeRoot, Integer> LOGICAL_TYPE_MAPPING =
+ Collections.unmodifiableMap(
+ new HashMap<LogicalTypeRoot, Integer>() {
+ {
+ put(VARCHAR, Types.VARCHAR);
+ put(CHAR, Types.CHAR);
+ put(VARBINARY, Types.VARBINARY);
+ put(BOOLEAN, Types.BOOLEAN);
+ put(BINARY, Types.BINARY);
+ put(TINYINT, Types.TINYINT);
+ put(SMALLINT, Types.SMALLINT);
+ put(INTEGER, Types.INTEGER);
+ put(BIGINT, Types.BIGINT);
+ put(FLOAT, Types.REAL);
+ put(DOUBLE, Types.DOUBLE);
+ put(DATE, Types.DATE);
+ put(TIMESTAMP_WITHOUT_TIME_ZONE, Types.TIMESTAMP);
+ put(TIMESTAMP_WITH_TIME_ZONE, Types.TIMESTAMP_WITH_TIMEZONE);
+ put(TIME_WITHOUT_TIME_ZONE, Types.TIME);
+ put(DECIMAL, Types.DECIMAL);
+ put(ARRAY, Types.ARRAY);
+ }
+ });
+
+ private JdbcTypeUtil() {
+ }
+
+ public static int logicalTypeToSqlType(LogicalTypeRoot typeRoot) {
+ if (LOGICAL_TYPE_MAPPING.containsKey(typeRoot)) {
+ return LOGICAL_TYPE_MAPPING.get(typeRoot);
+ } else {
+ throw new IllegalArgumentException("Unsupported typeRoot: " + typeRoot);
+ }
+ }
+}
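
A minimal usage sketch for the logicalTypeToSqlType mapping above (the import of the new utility class is assumed to resolve against this patch):

    import java.sql.Types;

    import org.apache.flink.table.types.logical.LogicalTypeRoot;

    import com.starrocks.connector.flink.util.JdbcTypeUtil;

    // Sketch: translating a Flink logical type root into a java.sql.Types constant.
    public class TypeMappingSketch {
        public static void main(String[] args) {
            int sqlType = JdbcTypeUtil.logicalTypeToSqlType(LogicalTypeRoot.VARCHAR);
            System.out.println(sqlType == Types.VARCHAR); // true
            // Roots absent from the map (e.g. LogicalTypeRoot.MAP) raise IllegalArgumentException.
        }
    }
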
diff --git a/src/main/java/com/starrocks/connector/flink/util/JdbcUtils.java b/src/main/java/com/starrocks/connector/flink/util/JdbcUtils.java
new file mode 100644
index 00000000..91fe8afa
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/util/JdbcUtils.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.util;
+
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/** Utils for jdbc connectors. */
+public class JdbcUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
+
+ /**
+ * Adds a record to the prepared statement.
+ *
+ * <p>When this method is called, the output format is guaranteed to be opened.
+ *
+ * <p>WARNING: this may fail when no column types specified (because a best effort approach is
+ * attempted in order to insert a null value but it's not guaranteed that the JDBC driver
+ * handles PreparedStatement.setObject(pos, null))
+ *
+ * @param upload The prepared statement.
+ * @param typesArray The jdbc types of the row.
+ * @param row The records to add to the output.
+ * @see PreparedStatement
+ */
+ public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, Row row)
+ throws SQLException {
+ if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
+ LOG.warn(
+ "Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+ }
+ if (typesArray == null) {
+ // no types provided
+ for (int index = 0; index < row.getArity(); index++) {
+ LOG.warn(
+ "Unknown column type for column {}. Best effort approach to set its value: {}.",
+ index + 1,
+ row.getField(index));
+ upload.setObject(index + 1, row.getField(index));
+ }
+ } else {
+ // types provided
+ for (int i = 0; i < row.getArity(); i++) {
+ setField(upload, typesArray[i], row.getField(i), i);
+ }
+ }
+ }
+
+ public static void setField(PreparedStatement upload, int type, Object field, int index)
+ throws SQLException {
+ if (field == null) {
+ upload.setNull(index + 1, type);
+ } else {
+ try {
+ // casting values as suggested by
+ // http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+ switch (type) {
+ case java.sql.Types.NULL:
+ upload.setNull(index + 1, type);
+ break;
+ case java.sql.Types.BOOLEAN:
+ case java.sql.Types.BIT:
+ upload.setBoolean(index + 1, (boolean) field);
+ break;
+ case java.sql.Types.CHAR:
+ case java.sql.Types.NCHAR:
+ case java.sql.Types.VARCHAR:
+ case java.sql.Types.LONGVARCHAR:
+ case java.sql.Types.LONGNVARCHAR:
+ upload.setString(index + 1, (String) field);
+ break;
+ case java.sql.Types.TINYINT:
+ upload.setByte(index + 1, (byte) field);
+ break;
+ case java.sql.Types.SMALLINT:
+ upload.setShort(index + 1, (short) field);
+ break;
+ case java.sql.Types.INTEGER:
+ upload.setInt(index + 1, (int) field);
+ break;
+ case java.sql.Types.BIGINT:
+ upload.setLong(index + 1, (long) field);
+ break;
+ case java.sql.Types.REAL:
+ upload.setFloat(index + 1, (float) field);
+ break;
+ case java.sql.Types.FLOAT:
+ case java.sql.Types.DOUBLE:
+ upload.setDouble(index + 1, (double) field);
+ break;
+ case java.sql.Types.DECIMAL:
+ case java.sql.Types.NUMERIC:
+ upload.setBigDecimal(index + 1, (java.math.BigDecimal) field);
+ break;
+ case java.sql.Types.DATE:
+ upload.setDate(index + 1, (java.sql.Date) field);
+ break;
+ case java.sql.Types.TIME:
+ upload.setTime(index + 1, (java.sql.Time) field);
+ break;
+ case java.sql.Types.TIMESTAMP:
+ upload.setTimestamp(index + 1, (java.sql.Timestamp) field);
+ break;
+ case java.sql.Types.BINARY:
+ case java.sql.Types.VARBINARY:
+ case java.sql.Types.LONGVARBINARY:
+ upload.setBytes(index + 1, (byte[]) field);
+ break;
+ default:
+ upload.setObject(index + 1, field);
+ LOG.warn(
+ "Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
+ type,
+ index + 1,
+ field);
+ // case java.sql.Types.SQLXML
+ // case java.sql.Types.ARRAY:
+ // case java.sql.Types.JAVA_OBJECT:
+ // case java.sql.Types.BLOB:
+ // case java.sql.Types.CLOB:
+ // case java.sql.Types.NCLOB:
+ // case java.sql.Types.DATALINK:
+ // case java.sql.Types.DISTINCT:
+ // case java.sql.Types.OTHER:
+ // case java.sql.Types.REF:
+ // case java.sql.Types.ROWID:
+ // case java.sql.Types.STRUCT
+ }
+ } catch (ClassCastException e) {
+ // enrich the exception with detailed information.
+ String errorMessage =
+ String.format(
+ "%s, field index: %s, field value: %s.",
+ e.getMessage(), index, field);
+ ClassCastException enrichedException = new ClassCastException(errorMessage);
+ enrichedException.setStackTrace(e.getStackTrace());
+ throw enrichedException;
+ }
+ }
+ }
+
+ public static Row getPrimaryKey(Row row, int[] pkFields) {
+ Row pkRow = new Row(pkFields.length);
+ for (int i = 0; i < pkFields.length; i++) {
+ pkRow.setField(i, row.getField(pkFields[i]));
+ }
+ return pkRow;
+ }
+}
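
getPrimaryKey above needs no JDBC connection, so it can be sketched directly; the row contents here are hypothetical:

    import org.apache.flink.types.Row;

    import com.starrocks.connector.flink.util.JdbcUtils;

    // Sketch: projecting primary-key fields out of a Row by position.
    public class PkSketch {
        public static void main(String[] args) {
            Row row = Row.of(1, "alice", 3.14);
            Row pk = JdbcUtils.getPrimaryKey(row, new int[] {0, 1});
            System.out.println(pk); // prints the row kind and fields, e.g. +I[1, alice]
        }
    }
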
diff --git a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
index edfd49b7..77967e1b 100644
--- a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
@@ -44,7 +44,7 @@ public class StarRocksDynamicTableSourceTest extends StarRocksSourceBaseTest {
@Before
public void init() {
pushDownHolder = new PushDownHolder();
- dynamicTableSource = new StarRocksDynamicTableSource(OPTIONS, TABLE_SCHEMA, pushDownHolder);
dynamicTableSource = new StarRocksDynamicTableSource(OPTIONS, TABLE_SCHEMA, pushDownHolder, null, null);
}
@Test
@@ -55,7 +55,7 @@ public void testApplyProjection() {
for (int i = 0; i < SELECT_COLUMNS.length; i ++) {
assertEquals(SELECT_COLUMNS[i].getColumnIndexInFlinkTable(), pushDownHolder.getSelectColumns()[i].getColumnIndexInFlinkTable());
assertEquals(SELECT_COLUMNS[i].getColumnName(), pushDownHolder.getSelectColumns()[i].getColumnName());
- }
+ }
assertEquals(StarRocksSourceQueryType.QuerySomeColumns, pushDownHolder.getQueryType());
dynamicTableSource.applyProjection(PROJECTION_ARRAY_NULL);
@@ -121,7 +121,7 @@ public void testFilter() {
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
-
+
dynamicTableSource.applyFilters(Arrays.asList(c1Exp,
new CallExpression(
BuiltInFunctionDefinitions.NOT_EQUALS,
@@ -162,7 +162,7 @@ public void testFilter() {
BuiltInFunctionDefinitions.LIKE,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
- try {
+ try {
dynamicTableSource.applyFilters(Collections.singletonList(c6Exp));
} catch (Exception e) {
e.printStackTrace();
@@ -174,7 +174,7 @@ public void testFilter() {
BuiltInFunctionDefinitions.IN,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
- try {
+ try {
dynamicTableSource.applyFilters(Collections.singletonList(c7Exp));
} catch (Exception e) {
e.printStackTrace();
@@ -186,7 +186,7 @@ public void testFilter() {
BuiltInFunctionDefinitions.BETWEEN,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
- try {
+ try {
dynamicTableSource.applyFilters(Collections.singletonList(c8Exp));
} catch (Exception e) {
e.printStackTrace();