Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Create a JDBC connection only while decoding array types #335

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
}
String serializedNameToType = "";
String serializedOidToType = "";
try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL)) {
try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_VALIDATE_CONNECTION)) {
if (yugabyteDBConnectorConfig.isYSQLDbType()) {
YugabyteDBTypeRegistry typeRegistry = new YugabyteDBTypeRegistry(connection);
Map<String, YugabyteDBType> nameToType = typeRegistry.getNameToType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
/**
* Kafka connect source task which uses YugabyteDB CDC API to process DB changes.
*
* @author Suranjan Kumar ([email protected])
* @author Suranjan Kumar ([email protected]), Vaibhav Kushwaha ([email protected])
*/
public class YugabyteDBConnectorTask
extends BaseSourceTask<YBPartition, YugabyteDBOffsetContext> {
Expand All @@ -60,8 +60,6 @@ public class YugabyteDBConnectorTask

private volatile YugabyteDBTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile YugabyteDBConnection jdbcConnection;
private volatile YugabyteDBConnection heartbeatConnection;
private volatile YugabyteDBSchema schema;

@Override
Expand All @@ -83,17 +81,13 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(

Encoding encoding = Encoding.defaultEncoding(); // UTF-8
YugabyteDBTaskConnection taskConnection = new YugabyteDBTaskConnection(encoding);


final YugabyteDBValueConverterBuilder valueConverterBuilder = (typeRegistry) -> YugabyteDBValueConverter.of(
connectorConfig,
databaseCharset,
typeRegistry);



if (connectorConfig.isYSQLDbType()) {

String nameToTypeStr = config.getString(YugabyteDBConnectorConfig.NAME_TO_TYPE.toString());
String oidToTypeStr = config.getString(YugabyteDBConnectorConfig.OID_TO_TYPE.toString());

Expand All @@ -114,15 +108,10 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
LOGGER.error("Error while deserializing object to type string", e);
}

// Global JDBC connection used both for snapshotting and streaming.
// Must be able to resolve datatypes.
jdbcConnection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), valueConverterBuilder,
YugabyteDBConnection.CONNECTION_GENERAL);

// CDCSDK We can just build the type registry on the co-ordinator and then send
// the map of Postgres Type and Oid to the Task using Config
// This type registry is being build with the nameToType and oidToType map populated.
final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = new YugabyteDBTypeRegistry(taskConnection, nameToType,
oidToType, jdbcConnection);
oidToType, null /* yugabyteDBConnection */);

schema = new YugabyteDBSchema(connectorConfig, yugabyteDBTypeRegistry, topicSelector,
valueConverterBuilder.build(yugabyteDBTypeRegistry));
Expand All @@ -146,9 +135,6 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
new YugabyteDBOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();

YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffsets,
connectorConfig);

LoggingContext.PreviousContext previousContext = taskContext
.configureLoggingContext(CONTEXT_NAME + "|" + taskId);
try {
Expand All @@ -167,7 +153,6 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(

final YugabyteDBEventMetadataProvider metadataProvider = new YugabyteDBEventMetadataProvider();

Configuration configuration = connectorConfig.getConfig();
HeartbeatFactory heartbeatFactory = new HeartbeatFactory<>(
connectorConfig,
topicSelector,
Expand Down Expand Up @@ -198,7 +183,7 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
metadataProvider,
heartbeatFactory,
schemaNameAdjuster,
jdbcConnection);
null /* jdbcConnection */);

YugabyteDBChangeEventSourceCoordinator coordinator = new YugabyteDBChangeEventSourceCoordinator(
previousOffsets,
Expand All @@ -208,7 +193,7 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
new YugabyteDBChangeEventSourceFactory(
connectorConfig,
snapshotter,
jdbcConnection,
null, /* jdbcConnection */
errorHandler,
dispatcher,
clock,
Expand Down Expand Up @@ -340,14 +325,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {

@Override
protected void doStop() {
if (jdbcConnection != null) {
jdbcConnection.close();
}

if (heartbeatConnection != null) {
heartbeatConnection.close();
}

if (schema != null) {
schema.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,17 @@ protected Optional<ColumnEditor> readTableColumn(CdcService.CDCSDKColumnInfoPB c
}

private int getLength(int oid) {
return getTypeRegistry().get(oid).getDefaultLength();
// YB Note: Since we are anyway getting the default length as -1 as the service doesn't send
// this value, it will not make any difference to return it directly without looking it up.
// return getTypeRegistry().get(oid).getDefaultLength();
return -1;
}

private int getScale(int oid) {
return getTypeRegistry().get(oid).getDefaultScale();
// YB Note: Since we are anyway getting the default scale as -1 as the service doesn't send
// this value, it will not make any difference to return it directly without looking it up.
// return getTypeRegistry().get(oid).getDefaultScale();
return -1;
}

private int resolveNativeType(int oid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public YugabyteDBSnapshotChangeEventSource(YugabyteDBConnectorConfig connectorCo
this.dispatcher = dispatcher;
this.clock = clock;
this.snapshotter = snapshotter;
this.connection = connection;
this.connection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL);
this.snapshotProgressListener = snapshotProgressListener;

AsyncYBClient asyncClient = new AsyncYBClient.AsyncYBClientBuilder(connectorConfig.masterAddresses())
Expand Down Expand Up @@ -155,7 +155,7 @@ public SnapshotResult<YugabyteDBOffsetContext> execute(ChangeEventSourceContext
partitions.forEach(YBPartition::markTableAsColocated);

LOGGER.info("Setting offsetContext/previousOffset for snapshot...");
previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, connection, clock, partitions);
previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, null /* connection */, clock, partitions);

this.partitionRanges = YugabyteDBConnectorUtils.populatePartitionRanges(
connectorConfig.getConfig().getString(YugabyteDBConnectorConfig.HASH_RANGES_LIST));
Expand Down Expand Up @@ -188,6 +188,16 @@ public SnapshotResult<YugabyteDBOffsetContext> execute(ChangeEventSourceContext
else {
snapshotProgressListener.snapshotAborted(partition);
}

// Close YugabyteDBConnection if opened.
try {
if (connection.isConnected()) {
LOGGER.info("Closing YugabyteDBConnection after snapshot");
connection.close();
}
} catch (SQLException sqle) {
LOGGER.error("Error while closing JDBC connection during snapshot", sqle);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorC
YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection,
ChangeEventQueue<DataChangeEvent> queue) {
this.connectorConfig = connectorConfig;
this.connection = connection;
this.connection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL);
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void execute(ChangeEventSourceContext context, YBPartition partition, Yug

if (!hasStartLsnStoredInContext) {
LOGGER.info("No start opid found in the context.");
offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock, partitions);
offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, null /* connection */, clock, partitions);
}
try {
// Populate partition ranges.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import java.util.Map;
import java.util.Set;

import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.TypeInfo;
import org.postgresql.jdbc.PgDatabaseMetaData;
import org.postgresql.jdbc.TypeInfoCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -115,8 +118,8 @@ public Map<Integer, YugabyteDBType> getOidToType() {

private final int maxConnectionRetries = 5;

private final YugabyteDBConnection yugabyteDBConnection;
private final transient TypeInfo typeInfo;
private YugabyteDBConnection yugabyteDBConnection;
private transient TypeInfo typeInfo;
private SqlTypeMapper sqlTypeMapper;

private int geometryOid = Integer.MIN_VALUE;
Expand All @@ -139,8 +142,8 @@ public Map<Integer, YugabyteDBType> getOidToType() {

public YugabyteDBTypeRegistry(YugabyteDBConnection connection) {
try {
this.connection = connection.connection();
this.yugabyteDBConnection = connection;
this.connection = this.yugabyteDBConnection.connection();
this.oidToType = new HashMap<>();
this.nameToType = new HashMap<>();
typeInfo = ((BaseConnection) this.connection).getTypeInfo();
Expand All @@ -156,9 +159,6 @@ public YugabyteDBTypeRegistry(YugabyteDBTaskConnection connection,
Map<String, YugabyteDBType> nameToType,
Map<Integer, YugabyteDBType> oidToType,
YugabyteDBConnection yugabyteDBConnection) {
this.connection = connection;
this.yugabyteDBConnection = yugabyteDBConnection;
typeInfo = ((BaseConnection) this.connection).getTypeInfo();
this.oidToType = oidToType;
this.nameToType = nameToType;
for (YugabyteDBType t : oidToType.values()) {
Expand All @@ -171,7 +171,6 @@ private void addType(YugabyteDBType type) {
nameToType.put(type.getName(), type);

updateType(type);
// updateType(type.getOid());
}

private void updateType(YugabyteDBType type) {
Expand Down Expand Up @@ -248,13 +247,13 @@ else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) {
public YugabyteDBType get(int oid) {
YugabyteDBType r = oidToType.get(oid);
if (r == null) {
LOGGER.trace("Looking up type from database for oid {}", oid);
r = resolveUnknownType(oid);
if (r == null) {
LOGGER.warn("Unknown OID {} requested", oid);
r = YugabyteDBType.UNKNOWN;
}
}
// new IllegalArgumentException().printStackTrace();
return r;
}

Expand Down Expand Up @@ -411,7 +410,7 @@ private void prime() throws SQLException {
LOGGER.error("Error while executing query on database, all the {} retries failed.", maxConnectionRetries);
throw e;
}
LOGGER.warn("Error while executing query on database, will retry. Attempt {} out of {}",retryCount, maxConnectionRetries );
LOGGER.warn("Error while executing query on database, will retry. Attempt {} out of {} for error:", retryCount, maxConnectionRetries, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public static Object resolveValue(String columnName, YugabyteDBType type, String
includeUnknownDatatypes, yugabyteDBTypeRegistry);
}

// CDCSDK this too we can avoid using connection.
// only date and string requires
// We will only open a connection once we detect that it is an array type.
if (value.isArray(type)) {
return value.asArray(columnName, type, fullType, connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class YugabyteDBConnection extends JdbcConnection {
public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection";
public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
public static final String CONNECTION_GENERAL = "Debezium General";
public static final String CONNECTION_TYPE_REGISTRY = "Debezium Type Registry";
public static final String CONNECTION_TEST = "Debezium Test";

private static Logger LOGGER = LoggerFactory.getLogger(YugabyteDBConnection.class);

Expand Down
37 changes: 32 additions & 5 deletions src/test/java/io/debezium/connector/yugabytedb/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ public static YugabyteDBConnectorConfig.LogicalDecoder decoderPlugin() {
*/
public static YugabyteDBConnection create() {

return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT), YugabyteDBConnection.CONNECTION_GENERAL);
return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT), YugabyteDBConnection.CONNECTION_TEST);
}

public static YugabyteDBConnection createConnectionTo(String databaseName) {
return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT, databaseName), YugabyteDBConnection.CONNECTION_GENERAL);
return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT, databaseName), YugabyteDBConnection.CONNECTION_TEST);
}

/**
Expand All @@ -193,7 +193,7 @@ public static YugabyteDBConnection createWithTypeRegistry() {

return new YugabyteDBConnection(
config.getJdbcConfig(),
getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_GENERAL);
getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_TEST);
}

/**
Expand All @@ -204,7 +204,7 @@ public static YugabyteDBConnection createWithTypeRegistry() {
* @return the PostgresConnection instance; never null
*/
public static YugabyteDBConnection create(String appName) {
return new YugabyteDBConnection(Objects.requireNonNull(defaultJdbcConfigBuilder()).with("ApplicationName", appName).build(), YugabyteDBConnection.CONNECTION_GENERAL);
return new YugabyteDBConnection(Objects.requireNonNull(defaultJdbcConfigBuilder()).with("ApplicationName", appName).build(), YugabyteDBConnection.CONNECTION_TEST);
}

/**
Expand Down Expand Up @@ -346,7 +346,7 @@ public static String getMasterAddress() {

public static YugabyteDBTypeRegistry getTypeRegistry() {
final YugabyteDBConnectorConfig config = new YugabyteDBConnectorConfig(defaultConfig().build());
try (final YugabyteDBConnection connection = new YugabyteDBConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_GENERAL)) {
try (final YugabyteDBConnection connection = new YugabyteDBConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_TEST)) {
return connection.getTypeRegistry();
}
}
Expand Down Expand Up @@ -767,4 +767,31 @@ public static Stream<Arguments> streamTypeProviderForSnapshot() {
Arguments.of(false, false), // Older stream
Arguments.of(true, true)); // USE_SNAPSHOT stream
}

public static int getConnectionCount() throws SQLException {
return getConnectionCount(YugabyteDBConnection.CONNECTION_TEST);
}

/**
* @param applicationName the name of the application to get connection count of
* @return the number of connections established on YugabyteDB service with the given application name
* @throws SQLException if a connection cannot be established
*/
public static int getConnectionCount(String applicationName) throws SQLException {
try (Connection conn = create().connection()) {
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(
String.format("SELECT COUNT(*) FROM pg_stat_activity WHERE application_name = '%s'",
applicationName));

if (rs.next()) {
return rs.getInt(1);
} else {
LOGGER.warn("No row returned while querying for connection count");
}
}

// Indicates some error while fetching connection count.
return -1;
}
}
Loading
Loading