From b3bb7273f7fce881eca2cd698cb5268b7ae5da3f Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 12 Jun 2024 01:44:34 +0530 Subject: [PATCH 01/10] working set of changes --- .../YugabyteDBChangeEventSourceFactory.java | 16 +--- .../YugabyteDBChangeRecordEmitter.java | 6 +- .../yugabytedb/YugabyteDBConnector.java | 4 +- .../yugabytedb/YugabyteDBConnectorTask.java | 43 ++------- .../YugabyteDBConsistentStreamingSource.java | 3 +- .../yugabytedb/YugabyteDBEventDispatcher.java | 5 +- .../yugabytedb/YugabyteDBOffsetContext.java | 7 +- .../yugabytedb/YugabyteDBSchema.java | 10 ++- .../YugabyteDBSnapshotChangeEventSource.java | 16 +++- .../YugabyteDBStreamingChangeEventSource.java | 7 +- .../yugabytedb/YugabyteDBTypeRegistry.java | 66 +++----------- .../connection/AbstractColumnValue.java | 7 +- .../connection/ReplicationMessage.java | 7 +- ...ReplicationMessageColumnValueResolver.java | 7 +- .../connection/YugabyteDBConnection.java | 2 + .../pgoutput/YbOutputMessageDecoder.java | 4 +- .../pgoutput/YbOutputReplicationMessage.java | 3 +- .../pgproto/YbProtoColumnValue.java | 8 +- .../pgproto/YbProtoReplicationMessage.java | 7 +- .../wal2json/Wal2JsonReplicationMessage.java | 5 +- .../connector/yugabytedb/TestHelper.java | 41 +++++++-- .../YugabyteDBCompleteTypesTest.java | 87 +++++++++++++++++-- .../yugabytedb/YugabyteDBDatatypesTest.java | 10 ++- 23 files changed, 206 insertions(+), 165 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java index cd4c0c08..6bf9363f 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java @@ -32,7 +32,6 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBChangeEventSourceFactory.class); private final YugabyteDBConnectorConfig configuration; - private final YugabyteDBConnection jdbcConnection; private final ErrorHandler errorHandler; private final YugabyteDBEventDispatcher dispatcher; private final Clock clock; @@ -46,7 +45,6 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuration, Snapshotter snapshotter, - YugabyteDBConnection jdbcConnection, ErrorHandler errorHandler, YugabyteDBEventDispatcher dispatcher, Clock clock, YugabyteDBSchema schema, @@ -56,7 +54,6 @@ public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuratio SlotState startingSlotInfo, ChangeEventQueue queue) { this.configuration = configuration; - this.jdbcConnection = jdbcConnection; this.errorHandler = errorHandler; this.dispatcher = dispatcher; this.clock = clock; @@ -76,7 +73,6 @@ public SnapshotChangeEventSource getSnapsh configuration, taskContext, snapshotter, - jdbcConnection, schema, dispatcher, clock, @@ -91,7 +87,6 @@ public StreamingChangeEventSource getStrea return new YugabyteDBStreamingChangeEventSource( configuration, snapshotter, - jdbcConnection, dispatcher, errorHandler, clock, @@ -104,7 +99,6 @@ public StreamingChangeEventSource getStrea return new YugabyteDBConsistentStreamingSource( configuration, snapshotter, - jdbcConnection, dispatcher, errorHandler, clock, @@ -119,14 +113,6 @@ public StreamingChangeEventSource getStrea public Optional> getIncrementalSnapshotChangeEventSource(YugabyteDBOffsetContext offsetContext, SnapshotProgressListener snapshotProgressListener, DataChangeEventListener dataChangeEventListener) { - final SignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource( - configuration, - jdbcConnection, - dispatcher, - schema, - clock, - snapshotProgressListener, - dataChangeEventListener); - return Optional.of(incrementalSnapshotChangeEventSource); + return Optional.empty(); } } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index 2e2d2050..9eb2db07 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -196,8 +196,7 @@ private Object[] columnValues(List columns, TableId t undeliveredToastableColumns.remove(columnName); int position = getPosition(columnName, table, values); if (position != -1) { - Object value = column.getValue(() -> (BaseConnection) connection.connection(), - connectorConfig.includeUnknownDatatypes()); + Object value = column.getValue(connection, connectorConfig.includeUnknownDatatypes()); // values[position] = value; values[position] = new Object[]{ value, Boolean.TRUE }; } @@ -240,8 +239,7 @@ private Object[] updatedColumnValues(List columns, Ta int position = getPosition(columnName, table, values); if (position != -1) { - Object value = column.getValue(() -> (BaseConnection) connection.connection(), - connectorConfig.includeUnknownDatatypes()); + Object value = column.getValue(connection, connectorConfig.includeUnknownDatatypes()); values[position] = new Object[]{ value, Boolean.TRUE }; } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java index 6886a0d7..7691130d 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java @@ -115,9 +115,11 @@ public List> taskConfigs(int maxTasks) { } String serializedNameToType = ""; String serializedOidToType = ""; - try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL)) { + try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_VALIDATE_CONNECTION)) { if (yugabyteDBConnectorConfig.isYSQLDbType()) { + LOGGER.info("Creating type registry"); YugabyteDBTypeRegistry typeRegistry = new YugabyteDBTypeRegistry(connection); + LOGGER.info("After creating type registry"); Map nameToType = typeRegistry.getNameToType(); Map oidToType = typeRegistry.getOidToType(); try { diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index 284cbd19..c91e04d6 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -50,7 +50,7 @@ /** * Kafka connect source task which uses YugabyteDB CDC API to process DB changes. * - * @author Suranjan Kumar (skumar@yugabyte.com) + * @author Suranjan Kumar (skumar@yugabyte.com), Vaibhav Kushwaha (vkushwaha@yugabyte.com) */ public class YugabyteDBConnectorTask extends BaseSourceTask { @@ -60,8 +60,6 @@ public class YugabyteDBConnectorTask private volatile YugabyteDBTaskContext taskContext; private volatile ChangeEventQueue queue; - private volatile YugabyteDBConnection jdbcConnection; - private volatile YugabyteDBConnection heartbeatConnection; private volatile YugabyteDBSchema schema; @Override @@ -81,19 +79,12 @@ public ChangeEventSourceCoordinator start( final String databaseCharsetName = config.getString(YugabyteDBConnectorConfig.CHAR_SET); final Charset databaseCharset = Charset.forName(databaseCharsetName); - Encoding encoding = Encoding.defaultEncoding(); // UTF-8 - YugabyteDBTaskConnection taskConnection = new YugabyteDBTaskConnection(encoding); - - final YugabyteDBValueConverterBuilder valueConverterBuilder = (typeRegistry) -> YugabyteDBValueConverter.of( connectorConfig, databaseCharset, typeRegistry); - - if (connectorConfig.isYSQLDbType()) { - String nameToTypeStr = config.getString(YugabyteDBConnectorConfig.NAME_TO_TYPE.toString()); String oidToTypeStr = config.getString(YugabyteDBConnectorConfig.OID_TO_TYPE.toString()); @@ -114,15 +105,10 @@ public ChangeEventSourceCoordinator start( LOGGER.error("Error while deserializing object to type string", e); } - // Global JDBC connection used both for snapshotting and streaming. - // Must be able to resolve datatypes. - jdbcConnection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), valueConverterBuilder, - YugabyteDBConnection.CONNECTION_GENERAL); - // CDCSDK We can just build the type registry on the co-ordinator and then send - // the map of Postgres Type and Oid to the Task using Config - final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = new YugabyteDBTypeRegistry(taskConnection, nameToType, - oidToType, jdbcConnection); + // This type registry is being build with the nameToType and oidToType map populated. + final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = + new YugabyteDBTypeRegistry(connectorConfig.getJdbcConfig(), nameToType, oidToType); schema = new YugabyteDBSchema(connectorConfig, yugabyteDBTypeRegistry, topicSelector, valueConverterBuilder.build(yugabyteDBTypeRegistry)); @@ -146,15 +132,9 @@ public ChangeEventSourceCoordinator start( new YugabyteDBOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); - YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffsets, - connectorConfig); - LoggingContext.PreviousContext previousContext = taskContext .configureLoggingContext(CONTEXT_NAME + "|" + taskId); try { - // Print out the server information - // CDCSDK Get the table, - queue = new ChangeEventQueue.Builder() .pollInterval(connectorConfig.getPollInterval()) .maxBatchSize(connectorConfig.getMaxBatchSize()) @@ -167,7 +147,7 @@ public ChangeEventSourceCoordinator start( final YugabyteDBEventMetadataProvider metadataProvider = new YugabyteDBEventMetadataProvider(); - Configuration configuration = connectorConfig.getConfig(); + // todo Vaibhav: see if we can get rid of heartbeat factory HeartbeatFactory heartbeatFactory = new HeartbeatFactory<>( connectorConfig, topicSelector, @@ -197,8 +177,7 @@ public ChangeEventSourceCoordinator start( YugabyteDBChangeRecordEmitter::updateSchema, metadataProvider, heartbeatFactory, - schemaNameAdjuster, - jdbcConnection); + schemaNameAdjuster); YugabyteDBChangeEventSourceCoordinator coordinator = new YugabyteDBChangeEventSourceCoordinator( previousOffsets, @@ -208,7 +187,6 @@ public ChangeEventSourceCoordinator start( new YugabyteDBChangeEventSourceFactory( connectorConfig, snapshotter, - jdbcConnection, errorHandler, dispatcher, clock, @@ -233,6 +211,7 @@ public ChangeEventSourceCoordinator start( } } + // todo Vaibhav: not being used. Map getPreviousOffsetss( Partition.Provider provider, OffsetContext.Loader loader) { @@ -340,14 +319,6 @@ public List doPoll() throws InterruptedException { @Override protected void doStop() { - if (jdbcConnection != null) { - jdbcConnection.close(); - } - - if (heartbeatConnection != null) { - heartbeatConnection.close(); - } - if (schema != null) { schema.close(); } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java index 9399d237..24fba4a3 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java @@ -29,14 +29,13 @@ public class YugabyteDBConsistentStreamingSource extends YugabyteDBStreamingChan public YugabyteDBConsistentStreamingSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter, - YugabyteDBConnection connection, YugabyteDBEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection, ChangeEventQueue queue) { - super(connectorConfig, snapshotter, connection, dispatcher, errorHandler, clock, schema, + super(connectorConfig, snapshotter, dispatcher, errorHandler, clock, schema, taskContext, replicationConnection, queue); } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java index 48602545..b62004bd 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java @@ -55,7 +55,6 @@ public class YugabyteDBEventDispatcher extends Event private final InconsistentSchemaHandler inconsistentSchemaHandler; private final Signal signal; private final boolean neverSkip; - private final Heartbeat heartbeat; private final EnumSet skippedOperations; private final DataCollectionFilters.DataCollectionFilter filter; private final YugabyteDBTransactionMonitor transactionMonitor; @@ -64,8 +63,7 @@ public class YugabyteDBEventDispatcher extends Event public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilters.DataCollectionFilter filter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler inconsistentSchemaHandler, - EventMetadataProvider metadataProvider, HeartbeatFactory heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster, - JdbcConnection jdbcConnection) { + EventMetadataProvider metadataProvider, HeartbeatFactory heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) { super(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, metadataProvider, heartbeatFactory, schemaNameAdjuster); this.connectorConfig = connectorConfig; @@ -74,7 +72,6 @@ public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, Topi this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage); this.messageFilter = connectorConfig.getMessageFilter(); this.topicSelector = topicSelector; - this.heartbeat = heartbeatFactory.createHeartbeat(); this.streamingReceiver = new YugabyteDBStreamingChangeRecordReceiver(); this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema; this.signal = new Signal<>(connectorConfig, this); diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 2e133302..03f71044 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -83,24 +83,21 @@ public YugabyteDBOffsetContext(Offsets pre } public static YugabyteDBOffsetContext initialContextForSnapshot(YugabyteDBConnectorConfig connectorConfig, - YugabyteDBConnection jdbcConnection, Clock clock, Set partitions) { - return initialContext(connectorConfig, jdbcConnection, clock, snapshotStartLsn(), + return initialContext(connectorConfig, clock, snapshotStartLsn(), snapshotStartLsn(), partitions); } public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, - YugabyteDBConnection jdbcConnection, Clock clock, Set partitions) { LOGGER.info("Initializing streaming context"); - return initialContext(connectorConfig, jdbcConnection, clock, streamingStartLsn(), + return initialContext(connectorConfig, clock, streamingStartLsn(), streamingStartLsn(), partitions); } public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, - YugabyteDBConnection jdbcConnection, Clock clock, OpId lastCommitLsn, OpId lastCompletelyProcessedLsn, diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java index ea698bc8..e051a6c0 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java @@ -373,15 +373,19 @@ protected Optional readTableColumn(CdcService.CDCSDKColumnInfoPB c } private int getLength(int oid) { - return getTypeRegistry().get(oid).getDefaultLength(); +// return getTypeRegistry().get(oid).getDefaultLength(); + return -1; } private int getScale(int oid) { - return getTypeRegistry().get(oid).getDefaultScale(); +// return getTypeRegistry().get(oid).getDefaultScale(); + return -1; } private int resolveNativeType(int oid) { - return getTypeRegistry().get(oid).getRootType().getOid(); + int nativeOid = getTypeRegistry().get(oid).getRootType().getOid(); + LOGGER.info("VKVK native type oid is " + nativeOid); + return nativeOid; } private int resolveQLType(QLType type) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 7e68aeab..5e8c1648 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -87,7 +87,7 @@ public class YugabyteDBSnapshotChangeEventSource extends AbstractSnapshotChangeE public YugabyteDBSnapshotChangeEventSource(YugabyteDBConnectorConfig connectorConfig, YugabyteDBTaskContext taskContext, - Snapshotter snapshotter, YugabyteDBConnection connection, + Snapshotter snapshotter, YugabyteDBSchema schema, YugabyteDBEventDispatcher dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) { super(connectorConfig, snapshotProgressListener); @@ -97,7 +97,7 @@ public YugabyteDBSnapshotChangeEventSource(YugabyteDBConnectorConfig connectorCo this.dispatcher = dispatcher; this.clock = clock; this.snapshotter = snapshotter; - this.connection = connection; + this.connection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL); this.snapshotProgressListener = snapshotProgressListener; AsyncYBClient asyncClient = new AsyncYBClient.AsyncYBClientBuilder(connectorConfig.masterAddresses()) @@ -155,7 +155,7 @@ public SnapshotResult execute(ChangeEventSourceContext partitions.forEach(YBPartition::markTableAsColocated); LOGGER.info("Setting offsetContext/previousOffset for snapshot..."); - previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, connection, clock, partitions); + previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, clock, partitions); this.partitionRanges = YugabyteDBConnectorUtils.populatePartitionRanges( connectorConfig.getConfig().getString(YugabyteDBConnectorConfig.HASH_RANGES_LIST)); @@ -188,6 +188,16 @@ public SnapshotResult execute(ChangeEventSourceContext else { snapshotProgressListener.snapshotAborted(partition); } + + // Close YugabyteDBConnection if opened. + try { + if (connection.isConnected()) { + LOGGER.info("Closing YugabyteDBConnection after snapshot"); + connection.close(); + } + } catch (SQLException sqle) { + LOGGER.error("Error while closing JDBC connection during snapshot", sqle); + } } } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 26d1dad9..edea8a71 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -101,12 +101,11 @@ public class YugabyteDBStreamingChangeEventSource implements protected Set splitTabletsWaitingForCallback; protected List partitionRanges; - public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter, - YugabyteDBConnection connection, YugabyteDBEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, + public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter, YugabyteDBEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection, ChangeEventQueue queue) { this.connectorConfig = connectorConfig; - this.connection = connection; + this.connection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL); this.dispatcher = dispatcher; this.errorHandler = errorHandler; this.clock = clock; @@ -144,7 +143,7 @@ public void execute(ChangeEventSourceContext context, YBPartition partition, Yug if (!hasStartLsnStoredInContext) { LOGGER.info("No start opid found in the context."); - offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock, partitions); + offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, clock, partitions); } try { // Populate partition ranges. diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java index 617bcbc6..457a6297 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java @@ -19,10 +19,13 @@ import java.util.Map; import java.util.Set; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; import org.apache.kafka.connect.errors.ConnectException; import org.postgresql.core.BaseConnection; import org.postgresql.core.TypeInfo; import org.postgresql.jdbc.PgDatabaseMetaData; +import org.postgresql.jdbc.TypeInfoCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,8 +118,9 @@ public Map getOidToType() { private final int maxConnectionRetries = 5; - private final YugabyteDBConnection yugabyteDBConnection; - private final transient TypeInfo typeInfo; + private YugabyteDBConnection yugabyteDBConnection; + private transient JdbcConfiguration jdbcConfig; + private transient TypeInfo typeInfo; private SqlTypeMapper sqlTypeMapper; private int geometryOid = Integer.MIN_VALUE; @@ -139,8 +143,8 @@ public Map getOidToType() { public YugabyteDBTypeRegistry(YugabyteDBConnection connection) { try { - this.connection = connection.connection(); this.yugabyteDBConnection = connection; + this.connection = this.yugabyteDBConnection.connection(); this.oidToType = new HashMap<>(); this.nameToType = new HashMap<>(); typeInfo = ((BaseConnection) this.connection).getTypeInfo(); @@ -152,13 +156,10 @@ public YugabyteDBTypeRegistry(YugabyteDBConnection connection) { } } - public YugabyteDBTypeRegistry(YugabyteDBTaskConnection connection, + public YugabyteDBTypeRegistry(JdbcConfiguration jdbcConfig, Map nameToType, - Map oidToType, - YugabyteDBConnection yugabyteDBConnection) { - this.connection = connection; - this.yugabyteDBConnection = yugabyteDBConnection; - typeInfo = ((BaseConnection) this.connection).getTypeInfo(); + Map oidToType) { + this.jdbcConfig = jdbcConfig; this.oidToType = oidToType; this.nameToType = nameToType; for (YugabyteDBType t : oidToType.values()) { @@ -171,7 +172,6 @@ private void addType(YugabyteDBType type) { nameToType.put(type.getName(), type); updateType(type); - // updateType(type.getOid()); } private void updateType(YugabyteDBType type) { @@ -208,39 +208,6 @@ else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { } } - private void updateTypeByOid(YugabyteDBType type) { - if (TYPE_NAME_GEOMETRY.equals(type.getName())) { - geometryOid = type.getOid(); - } - else if (TYPE_NAME_GEOGRAPHY.equals(type.getName())) { - geographyOid = type.getOid(); - } - else if (TYPE_NAME_CITEXT.equals(type.getName())) { - citextOid = type.getOid(); - } - else if (TYPE_NAME_HSTORE.equals(type.getName())) { - hstoreOid = type.getOid(); - } - else if (TYPE_NAME_LTREE.equals(type.getName())) { - ltreeOid = type.getOid(); - } - else if (TYPE_NAME_HSTORE_ARRAY.equals(type.getName())) { - hstoreArrayOid = type.getOid(); - } - else if (TYPE_NAME_GEOMETRY_ARRAY.equals(type.getName())) { - geometryArrayOid = type.getOid(); - } - else if (TYPE_NAME_GEOGRAPHY_ARRAY.equals(type.getName())) { - geographyArrayOid = type.getOid(); - } - else if (TYPE_NAME_CITEXT_ARRAY.equals(type.getName())) { - citextArrayOid = type.getOid(); - } - else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { - ltreeArrayOid = type.getOid(); - } - } - /** * @param oid - PostgreSQL OID * @return type associated with the given OID @@ -248,13 +215,13 @@ else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { public YugabyteDBType get(int oid) { YugabyteDBType r = oidToType.get(oid); if (r == null) { + LOGGER.trace("Looking up type from database for oid {}", oid); r = resolveUnknownType(oid); if (r == null) { LOGGER.warn("Unknown OID {} requested", oid); r = YugabyteDBType.UNKNOWN; } } - // new IllegalArgumentException().printStackTrace(); return r; } @@ -381,9 +348,6 @@ private void prime() throws SQLException { while (retryCount <= maxConnectionRetries) { try { final List delayResolvedBuilders = new ArrayList<>(); - if (retryCount > 0) { - this.connection = yugabyteDBConnection.connection(); - } final Statement statement = connection.createStatement(); final ResultSet rs = statement.executeQuery(SQL_TYPES); while (rs.next()) { @@ -411,7 +375,7 @@ private void prime() throws SQLException { LOGGER.error("Error while executing query on database, all the {} retries failed.", maxConnectionRetries); throw e; } - LOGGER.warn("Error while executing query on database, will retry. Attempt {} out of {}",retryCount, maxConnectionRetries ); + LOGGER.warn("Error while executing query on database, will retry. Attempt {} out of {} for error:", retryCount, maxConnectionRetries, e); } } } @@ -449,9 +413,6 @@ private YugabyteDBType resolveUnknownType(String name) { Exception exception = null; while (retryCount <= maxConnectionRetries) { try { - if (retryCount > 0) { - connection = yugabyteDBConnection.connection(); - } final PreparedStatement statement = connection.prepareStatement(SQL_NAME_LOOKUP); statement.setString(1, name); return loadType(statement); @@ -477,9 +438,6 @@ private YugabyteDBType resolveUnknownType(int lookupOid) { Exception exception = null; while (retryCount <= maxConnectionRetries) { try { - if (retryCount > 0) { - connection = yugabyteDBConnection.connection(); - } final PreparedStatement statement = connection.prepareStatement(SQL_OID_LOOKUP); statement.setInt(1, lookupOid); return loadType(statement); diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java b/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java index 7feda593..f35e4858 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java @@ -13,6 +13,7 @@ import java.time.ZoneOffset; import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; import org.postgresql.geometric.PGbox; import org.postgresql.geometric.PGcircle; import org.postgresql.geometric.PGline; @@ -191,10 +192,10 @@ public boolean isArray(YugabyteDBType type) { } @Override - public Object asArray(String columnName, YugabyteDBType type, String fullType, PgConnectionSupplier connection) { + public Object asArray(String columnName, YugabyteDBType type, String fullType, YugabyteDBConnection connection) { try { final String dataString = asString(); - return new PgArray(connection.get(), type.getOid(), dataString); + return new PgArray((BaseConnection) connection.connection(), type.getOid(), dataString); } catch (SQLException e) { LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e); @@ -204,7 +205,7 @@ public Object asArray(String columnName, YugabyteDBType type, String fullType, P @Override public Object asDefault(YugabyteDBTypeRegistry yugabyteDBTypeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, - PgConnectionSupplier connection) { + YugabyteDBConnection connection) { if (includeUnknownDatatypes) { // this includes things like PostGIS geoemetries or other custom types // leave up to the downstream message recipient to deal with diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java index ab634772..d1469877 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java @@ -12,6 +12,7 @@ import java.time.OffsetTime; import java.util.List; +import io.debezium.jdbc.JdbcConnection; import org.postgresql.geometric.PGbox; import org.postgresql.geometric.PGcircle; import org.postgresql.geometric.PGline; @@ -65,7 +66,7 @@ public interface Column { */ ColumnTypeMetadata getTypeMetadata(); - Object getValue(final PgConnectionSupplier connection, boolean includeUnknownDatatypes); + Object getValue(final YugabyteDBConnection connection, boolean includeUnknownDatatypes); boolean isOptional(); @@ -133,9 +134,9 @@ public interface ColumnValue { boolean isArray(YugabyteDBType type); - Object asArray(String columnName, YugabyteDBType type, String fullType, PgConnectionSupplier connection); + Object asArray(String columnName, YugabyteDBType type, String fullType, YugabyteDBConnection connection); - Object asDefault(YugabyteDBTypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, PgConnectionSupplier connection); + Object asDefault(YugabyteDBTypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, YugabyteDBConnection connection); } /** diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java index 4eb44512..f7b6cfed 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java @@ -16,6 +16,8 @@ import io.debezium.connector.yugabytedb.connection.ReplicationMessage.ColumnValue; import org.yb.QLType; +import java.sql.SQLException; + /** * @author Chris Cranford */ @@ -37,7 +39,7 @@ public class ReplicationMessageColumnValueResolver { * @return */ public static Object resolveValue(String columnName, YugabyteDBType type, String fullType, - ColumnValue value, final PgConnectionSupplier connection, + ColumnValue value, final YugabyteDBConnection connection, boolean includeUnknownDatatypes, YugabyteDBTypeRegistry yugabyteDBTypeRegistry) { if (value.isNull()) { @@ -52,8 +54,7 @@ public static Object resolveValue(String columnName, YugabyteDBType type, String includeUnknownDatatypes, yugabyteDBTypeRegistry); } - // CDCSDK this too we can avoid using connection. - // only date and string requires + // We will only open a connection once we detect that it is an array type. if (value.isArray(type)) { return value.asArray(columnName, type, fullType, connection); } diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java index 1645b131..79481869 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java @@ -58,6 +58,8 @@ public class YugabyteDBConnection extends JdbcConnection { public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection"; public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat"; public static final String CONNECTION_GENERAL = "Debezium General"; + public static final String CONNECTION_TYPE_REGISTRY = "Debezium Type Registry"; + public static final String CONNECTION_TEST = "Debezium Test"; private static Logger LOGGER = LoggerFactory.getLogger(YugabyteDBConnection.class); diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java index 75c36b8b..9008e71a 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java @@ -683,7 +683,7 @@ private static List resolveColumnsFromStreamTupleData(ByteBuffer buffer, new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { @Override - public Object getValue(PgConnectionSupplier connection, + public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { return YbOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, @@ -702,7 +702,7 @@ else if (type == 'n') { new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, true, true) { @Override - public Object getValue(PgConnectionSupplier connection, + public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { return null; } diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java index b1298d78..8688326d 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java @@ -14,6 +14,7 @@ import io.debezium.connector.yugabytedb.YugabyteDBTypeRegistry; import io.debezium.connector.yugabytedb.connection.ReplicationMessage; import io.debezium.connector.yugabytedb.connection.ReplicationMessageColumnValueResolver; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; /** * @author Gunnar Morling @@ -91,7 +92,7 @@ public boolean shouldSchemaBeSynchronized() { * * @return the value; may be null */ - public static Object getValue(String columnName, YugabyteDBType type, String fullType, String rawValue, final PgConnectionSupplier connection, + public static Object getValue(String columnName, YugabyteDBType type, String fullType, String rawValue, final YugabyteDBConnection connection, boolean includeUnknownDataTypes, YugabyteDBTypeRegistry yugabyteDBTypeRegistry) { final YbOutputColumnValue columnValue = new YbOutputColumnValue(rawValue); return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDataTypes, yugabyteDBTypeRegistry); diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java index 531e7058..bda47a84 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java @@ -16,6 +16,8 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; +import org.postgresql.core.BaseConnection; import org.postgresql.geometric.PGpoint; import org.postgresql.jdbc.PgArray; import org.postgresql.util.PGmoney; @@ -310,7 +312,7 @@ public boolean isArray(YugabyteDBType type) { } @Override - public Object asArray(String columnName, YugabyteDBType type, String fullType, PgConnectionSupplier connection) { + public Object asArray(String columnName, YugabyteDBType type, String fullType, YugabyteDBConnection connection) { // Currently the logical decoding plugin sends unhandled types as a byte array containing the string // representation (in Postgres) of the array value. // The approach to decode this is sub-optimal but the only way to improve this is to update the plugin. @@ -328,7 +330,7 @@ public Object asArray(String columnName, YugabyteDBType type, String fullType, P * } */ final String dataString = asString(); - return new PgArray(connection.get(), type.getOid(), dataString); + return new PgArray((BaseConnection) connection.connection(), type.getOid(), dataString); /* * String dataString = new String(data, Charset.forName("UTF-8")); * PgArray arrayData = new PgArray(connection.get(), (int) value.getColumnType(), dataString); @@ -344,7 +346,7 @@ public Object asArray(String columnName, YugabyteDBType type, String fullType, P @Override public Object asDefault(YugabyteDBTypeRegistry yugabyteDBTypeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, - PgConnectionSupplier connection) { + YugabyteDBConnection connection) { final YugabyteDBType type = yugabyteDBTypeRegistry.get(columnType); if (type.getOid() == yugabyteDBTypeRegistry.geometryOid() || type.getOid() == yugabyteDBTypeRegistry.geographyOid() || diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java index 7fb4fb50..8b3af2d9 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java @@ -13,6 +13,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yb.Common; @@ -124,7 +125,7 @@ private List transform(List me return new AbstractReplicationMessageColumn(columnName, type, fullType, typeInfo.map(CdcService.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) { @Override - public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { + public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { return YbProtoReplicationMessage.this.getValue(columnName, type, fullType, datum, connection, includeUnknownDatatypes); } @@ -139,7 +140,7 @@ public String toString() { return new AbstractReplicationMessageColumn(columnName, type, fullType, typeInfo.map(CdcService.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) { @Override - public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { + public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { return YbProtoReplicationMessage.this.getValue(columnName, type, datum); } @Override @@ -160,7 +161,7 @@ public boolean isLastEventForLsn() { public Object getValue(String columnName, YugabyteDBType type, String fullType, Common.DatumMessagePB datumMessage, - final PgConnectionSupplier connection, + final YugabyteDBConnection connection, boolean includeUnknownDatatypes) { final YbProtoColumnValue columnValue = new YbProtoColumnValue(datumMessage); return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java index 4927ce8f..8f577599 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java @@ -12,6 +12,7 @@ import java.util.OptionalLong; import java.util.regex.Matcher; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; @@ -123,7 +124,7 @@ private List transform(final Document data, final Str columns.add(new AbstractReplicationMessageColumn(columnName, columnType, columnTypeName, columnOptional, true) { @Override - public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { + public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { return Wal2JsonReplicationMessage.this.getValue(columnName, columnType, columnTypeName, rawValue, connection, includeUnknownDatatypes); } @@ -167,7 +168,7 @@ private String parseType(String columnName, String typeWithModifiers) { * * @return the value; may be null */ - public Object getValue(String columnName, YugabyteDBType type, String fullType, Value rawValue, final PgConnectionSupplier connection, + public Object getValue(String columnName, YugabyteDBType type, String fullType, Value rawValue, final YugabyteDBConnection connection, boolean includeUnknownDatatypes) { final Wal2JsonColumnValue columnValue = new Wal2JsonColumnValue(rawValue); return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes, yugabyteDBTypeRegistry); diff --git a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java index c58bc7f1..4546708a 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java +++ b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java @@ -176,11 +176,11 @@ public static YugabyteDBConnectorConfig.LogicalDecoder decoderPlugin() { */ public static YugabyteDBConnection create() { - return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT), YugabyteDBConnection.CONNECTION_GENERAL); + return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT), YugabyteDBConnection.CONNECTION_TEST); } public static YugabyteDBConnection createConnectionTo(String databaseName) { - return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT, databaseName), YugabyteDBConnection.CONNECTION_GENERAL); + return new YugabyteDBConnection(defaultJdbcConfig(CONTAINER_YSQL_HOST, CONTAINER_YSQL_PORT, databaseName), YugabyteDBConnection.CONNECTION_TEST); } /** @@ -193,7 +193,7 @@ public static YugabyteDBConnection createWithTypeRegistry() { return new YugabyteDBConnection( config.getJdbcConfig(), - getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_GENERAL); + getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_TEST); } /** @@ -204,7 +204,7 @@ public static YugabyteDBConnection createWithTypeRegistry() { * @return the PostgresConnection instance; never null */ public static YugabyteDBConnection create(String appName) { - return new YugabyteDBConnection(Objects.requireNonNull(defaultJdbcConfigBuilder()).with("ApplicationName", appName).build(), YugabyteDBConnection.CONNECTION_GENERAL); + return new YugabyteDBConnection(Objects.requireNonNull(defaultJdbcConfigBuilder()).with("ApplicationName", appName).build(), YugabyteDBConnection.CONNECTION_TEST); } /** @@ -346,7 +346,7 @@ public static String getMasterAddress() { public static YugabyteDBTypeRegistry getTypeRegistry() { final YugabyteDBConnectorConfig config = new YugabyteDBConnectorConfig(defaultConfig().build()); - try (final YugabyteDBConnection connection = new YugabyteDBConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_GENERAL)) { + try (final YugabyteDBConnection connection = new YugabyteDBConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), YugabyteDBConnection.CONNECTION_TEST)) { return connection.getTypeRegistry(); } } @@ -758,8 +758,8 @@ private static YugabyteDBValueConverterBuilder getPostgresValueConverterBuilder( public static Stream streamTypeProviderForStreaming() { return Stream.of( - Arguments.of(false, false), // Older stream - Arguments.of(true, false)); // NO_EXPORT stream + Arguments.of(false, false)); // Older stream +// Arguments.of(true, false)); // NO_EXPORT stream } public static Stream streamTypeProviderForSnapshot() { @@ -767,4 +767,31 @@ public static Stream streamTypeProviderForSnapshot() { Arguments.of(false, false), // Older stream Arguments.of(true, true)); // USE_SNAPSHOT stream } + + public static int getConnectionCount() throws SQLException { + return getConnectionCount(YugabyteDBConnection.CONNECTION_TEST); + } + + /** + * @param applicationName the name of the application to get connection count of + * @return the number of connections established on YugabyteDB service with the given application name + * @throws SQLException if a connection cannot be established + */ + public static int getConnectionCount(String applicationName) throws SQLException { + try (Connection conn = create().connection()) { + Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery( + String.format("SELECT COUNT(*) FROM pg_stat_activity WHERE application_name = '%s'", + applicationName)); + + if (rs.next()) { + return rs.getInt(1); + } else { + LOGGER.warn("No row returned while querying for connection count"); + } + } + + // Indicates some error while fetching connection count. + return -1; + } } diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java index 61be5e97..aa8b1ca2 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java @@ -3,17 +3,22 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.SQLException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; import io.debezium.util.HexConverter; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.awaitility.Awaitility; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; @@ -26,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.*; -public class YugabyteDBCompleteTypesTest extends YugabyteDBContainerTestBase { +public class YugabyteDBCompleteTypesTest extends YugabytedTestBase { @BeforeAll public static void beforeClass() throws SQLException { initializeYBContainer(); @@ -34,8 +39,9 @@ public static void beforeClass() throws SQLException { } @BeforeEach - public void before() { + public void before() throws SQLException { initializeConnectorTestFramework(); + TestHelper.dropAllSchemas(); } @AfterEach @@ -72,7 +78,6 @@ private void consumeRecords(List records, long recordsCount) { @ParameterizedTest @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") public void verifyAllWorkingDataTypesInSingleTable(boolean consistentSnapshot, boolean useSnapshot) throws Exception { - TestHelper.dropAllSchemas(); TestHelper.executeDDL("yugabyte_create_tables.ddl"); Thread.sleep(1000); @@ -127,12 +132,84 @@ public void verifyAllWorkingDataTypesInSingleTable(boolean consistentSnapshot, b assertValueField(record, "after/ts/value", 1637841600123456L); assertValueField(record, "after/tstz/value", "2021-11-25T06:30:00Z"); assertValueField(record, "after/uuidval/value", "ffffffff-ffff-ffff-ffff-ffffffffffff"); + + // No array types are present here so we should not be opening any connection to database. + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldCreateConnectionWhenArrayPresent(boolean consistentSnapshot, boolean useSnapshot) throws Exception { + TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[], intarr int[]);"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record to the table. + TestHelper.execute("INSERT INTO test_arr VALUES (1, '{\"element1\",\"element2\"}', '{0,1,2}');"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertEquals(1, records.size()); + + SourceRecord record = records.get(0); + Struct recordValue = (Struct) record.value(); + + List textArrayList = recordValue.getStruct("after").getStruct("textarr").getArray("value"); + assertEquals(textArrayList.get(0).toString(), "element1"); + assertEquals(textArrayList.get(1).toString(), "element2"); + + List intArrayList = recordValue.getStruct("after").getStruct("intarr").getArray("value"); + assertEquals(intArrayList.get(0).toString(), String.valueOf(0)); + assertEquals(intArrayList.get(1).toString(), String.valueOf(1)); + assertEquals(intArrayList.get(2).toString(), String.valueOf(2)); + + // Since we have one task running which has a array type present in it, we should expect + // a single JDBC connection on YugabyteDB service. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + LogInterceptor logInterceptor = new LogInterceptor(YugabyteDBStreamingChangeEventSource.class); + + TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[], intarr int[]);"); + TestHelper.execute("INSERT INTO test_arr VALUES (1, '{\"element1\",\"element2\"}', '{0,1,2}');"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); + configBuilder.with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, "initial"); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> logInterceptor.containsMessage("Beginning to poll the changes from the server")); + + // Insert another record in streaming phase. + TestHelper.execute("INSERT INTO test_arr VALUES (2, '{\"element1\",\"element2\"}', '{0,1,2}');"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 2 /* records to consume */); + + assertEquals(2, records.size()); + + // No need to verify record values since they are being verified in another test, simply + // verify that we have only a single connection open. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } @ParameterizedTest @ValueSource(strings = {"adaptive", "adaptive_time_microseconds", "connect"}) public void shouldEmitTimestampValuesWithCorrectPrecision(String temporalPrecisionMode) throws Exception { - TestHelper.dropAllSchemas(); TestHelper.executeDDL("yugabyte_create_tables.ddl"); Thread.sleep(1000); @@ -174,7 +251,6 @@ public void shouldEmitTimestampValuesWithCorrectPrecision(String temporalPrecisi @ParameterizedTest @ValueSource(strings = {"precise", "double", "string"}) public void shouldWorkWithAllDecimalTypes(String decimalMode) throws Exception { - TestHelper.dropAllSchemas(); TestHelper.executeDDL("yugabyte_create_tables.ddl"); Thread.sleep(1000); @@ -216,7 +292,6 @@ public void shouldWorkWithAllDecimalTypes(String decimalMode) throws Exception { @ParameterizedTest @ValueSource(strings = {"precise", "double", "string"}) public void shouldHaveLossOfPrecisionWithDecimalModeWithLargeValue(String decimalMode) throws Exception { - TestHelper.dropAllSchemas(); TestHelper.executeDDL("yugabyte_create_tables.ddl"); Thread.sleep(1000); diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java index ab1ef7ee..048950e3 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java @@ -8,6 +8,7 @@ import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import io.debezium.junit.logging.LogInterceptor; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -29,7 +30,7 @@ * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) */ -public class YugabyteDBDatatypesTest extends YugabyteDBContainerTestBase { +public class YugabyteDBDatatypesTest extends YugabytedTestBase { private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);" + "INSERT INTO s2.a (aa) VALUES (1);"; private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" + @@ -192,6 +193,13 @@ public void testRecordConsumption(boolean consistentSnapshot, boolean useSnapsho .exceptionally(throwable -> { throw new RuntimeException(throwable); }).get(); + + // Since this table has no datatypes which require a JDBC connection to be decoded, + // we shouldn't have opened any connections. + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + LOGGER.info("Waiting for 2 minutes"); + TestHelper.waitFor(Duration.ofMinutes(2)); } @ParameterizedTest From eafead728ab85bc28145d0d2c19bd1c5cca97ef7 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 12 Jun 2024 11:22:38 +0530 Subject: [PATCH 02/10] pass null to the jdbc parameters --- .../YugabyteDBChangeEventSourceFactory.java | 13 ++++++++++++- .../connector/yugabytedb/YugabyteDBConnector.java | 2 -- .../yugabytedb/YugabyteDBConnectorTask.java | 3 ++- .../YugabyteDBConsistentStreamingSource.java | 3 ++- .../yugabytedb/YugabyteDBEventDispatcher.java | 5 ++++- .../yugabytedb/YugabyteDBOffsetContext.java | 7 +++++-- .../connector/yugabytedb/YugabyteDBSchema.java | 12 +++++++----- .../YugabyteDBSnapshotChangeEventSource.java | 4 ++-- .../YugabyteDBStreamingChangeEventSource.java | 5 +++-- 9 files changed, 37 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java index 6bf9363f..5fa0fb51 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java @@ -73,6 +73,7 @@ public SnapshotChangeEventSource getSnapsh configuration, taskContext, snapshotter, + null, /* yugabyteDBConnection */ schema, dispatcher, clock, @@ -87,6 +88,7 @@ public StreamingChangeEventSource getStrea return new YugabyteDBStreamingChangeEventSource( configuration, snapshotter, + null, /* yugabyteDBConnection */ dispatcher, errorHandler, clock, @@ -99,6 +101,7 @@ public StreamingChangeEventSource getStrea return new YugabyteDBConsistentStreamingSource( configuration, snapshotter, + null, /* yugabyteDBConnection */ dispatcher, errorHandler, clock, @@ -113,6 +116,14 @@ public StreamingChangeEventSource getStrea public Optional> getIncrementalSnapshotChangeEventSource(YugabyteDBOffsetContext offsetContext, SnapshotProgressListener snapshotProgressListener, DataChangeEventListener dataChangeEventListener) { - return Optional.empty(); + final SignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource( + configuration, + null, /* jdbcConnection */ + dispatcher, + schema, + clock, + snapshotProgressListener, + dataChangeEventListener); + return Optional.of(incrementalSnapshotChangeEventSource); } } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java index 7691130d..0f5807aa 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java @@ -117,9 +117,7 @@ public List> taskConfigs(int maxTasks) { String serializedOidToType = ""; try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_VALIDATE_CONNECTION)) { if (yugabyteDBConnectorConfig.isYSQLDbType()) { - LOGGER.info("Creating type registry"); YugabyteDBTypeRegistry typeRegistry = new YugabyteDBTypeRegistry(connection); - LOGGER.info("After creating type registry"); Map nameToType = typeRegistry.getNameToType(); Map oidToType = typeRegistry.getOidToType(); try { diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index c91e04d6..1139911b 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -177,7 +177,8 @@ public ChangeEventSourceCoordinator start( YugabyteDBChangeRecordEmitter::updateSchema, metadataProvider, heartbeatFactory, - schemaNameAdjuster); + schemaNameAdjuster, + null /* jdbcConnection */); YugabyteDBChangeEventSourceCoordinator coordinator = new YugabyteDBChangeEventSourceCoordinator( previousOffsets, diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java index 24fba4a3..9399d237 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java @@ -29,13 +29,14 @@ public class YugabyteDBConsistentStreamingSource extends YugabyteDBStreamingChan public YugabyteDBConsistentStreamingSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter, + YugabyteDBConnection connection, YugabyteDBEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection, ChangeEventQueue queue) { - super(connectorConfig, snapshotter, dispatcher, errorHandler, clock, schema, + super(connectorConfig, snapshotter, connection, dispatcher, errorHandler, clock, schema, taskContext, replicationConnection, queue); } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java index b62004bd..48602545 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBEventDispatcher.java @@ -55,6 +55,7 @@ public class YugabyteDBEventDispatcher extends Event private final InconsistentSchemaHandler inconsistentSchemaHandler; private final Signal signal; private final boolean neverSkip; + private final Heartbeat heartbeat; private final EnumSet skippedOperations; private final DataCollectionFilters.DataCollectionFilter filter; private final YugabyteDBTransactionMonitor transactionMonitor; @@ -63,7 +64,8 @@ public class YugabyteDBEventDispatcher extends Event public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilters.DataCollectionFilter filter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler inconsistentSchemaHandler, - EventMetadataProvider metadataProvider, HeartbeatFactory heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) { + EventMetadataProvider metadataProvider, HeartbeatFactory heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster, + JdbcConnection jdbcConnection) { super(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, metadataProvider, heartbeatFactory, schemaNameAdjuster); this.connectorConfig = connectorConfig; @@ -72,6 +74,7 @@ public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, Topi this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage); this.messageFilter = connectorConfig.getMessageFilter(); this.topicSelector = topicSelector; + this.heartbeat = heartbeatFactory.createHeartbeat(); this.streamingReceiver = new YugabyteDBStreamingChangeRecordReceiver(); this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema; this.signal = new Signal<>(connectorConfig, this); diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 03f71044..2e133302 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -83,21 +83,24 @@ public YugabyteDBOffsetContext(Offsets pre } public static YugabyteDBOffsetContext initialContextForSnapshot(YugabyteDBConnectorConfig connectorConfig, + YugabyteDBConnection jdbcConnection, Clock clock, Set partitions) { - return initialContext(connectorConfig, clock, snapshotStartLsn(), + return initialContext(connectorConfig, jdbcConnection, clock, snapshotStartLsn(), snapshotStartLsn(), partitions); } public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, + YugabyteDBConnection jdbcConnection, Clock clock, Set partitions) { LOGGER.info("Initializing streaming context"); - return initialContext(connectorConfig, clock, streamingStartLsn(), + return initialContext(connectorConfig, jdbcConnection, clock, streamingStartLsn(), streamingStartLsn(), partitions); } public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, + YugabyteDBConnection jdbcConnection, Clock clock, OpId lastCommitLsn, OpId lastCompletelyProcessedLsn, diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java index e051a6c0..a712225d 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java @@ -373,19 +373,21 @@ protected Optional readTableColumn(CdcService.CDCSDKColumnInfoPB c } private int getLength(int oid) { -// return getTypeRegistry().get(oid).getDefaultLength(); + // YB Note: Since we are anyway getting the default length as -1 as the service doesn't send + // this value, it will not make any difference to return it directly without looking it up. + // return getTypeRegistry().get(oid).getDefaultLength(); return -1; } private int getScale(int oid) { -// return getTypeRegistry().get(oid).getDefaultScale(); + // YB Note: Since we are anyway getting the default scale as -1 as the service doesn't send + // this value, it will not make any difference to return it directly without looking it up. + // return getTypeRegistry().get(oid).getDefaultScale(); return -1; } private int resolveNativeType(int oid) { - int nativeOid = getTypeRegistry().get(oid).getRootType().getOid(); - LOGGER.info("VKVK native type oid is " + nativeOid); - return nativeOid; + return getTypeRegistry().get(oid).getRootType().getOid(); } private int resolveQLType(QLType type) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 5e8c1648..5bf7056e 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -87,7 +87,7 @@ public class YugabyteDBSnapshotChangeEventSource extends AbstractSnapshotChangeE public YugabyteDBSnapshotChangeEventSource(YugabyteDBConnectorConfig connectorConfig, YugabyteDBTaskContext taskContext, - Snapshotter snapshotter, + Snapshotter snapshotter, YugabyteDBConnection connection, YugabyteDBSchema schema, YugabyteDBEventDispatcher dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) { super(connectorConfig, snapshotProgressListener); @@ -155,7 +155,7 @@ public SnapshotResult execute(ChangeEventSourceContext partitions.forEach(YBPartition::markTableAsColocated); LOGGER.info("Setting offsetContext/previousOffset for snapshot..."); - previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, clock, partitions); + previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, null /* connection */, clock, partitions); this.partitionRanges = YugabyteDBConnectorUtils.populatePartitionRanges( connectorConfig.getConfig().getString(YugabyteDBConnectorConfig.HASH_RANGES_LIST)); diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index edea8a71..3368850e 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -101,7 +101,8 @@ public class YugabyteDBStreamingChangeEventSource implements protected Set splitTabletsWaitingForCallback; protected List partitionRanges; - public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter, YugabyteDBEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, + public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter, + YugabyteDBConnection connection, YugabyteDBEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection, ChangeEventQueue queue) { this.connectorConfig = connectorConfig; @@ -143,7 +144,7 @@ public void execute(ChangeEventSourceContext context, YBPartition partition, Yug if (!hasStartLsnStoredInContext) { LOGGER.info("No start opid found in the context."); - offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, clock, partitions); + offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, null /* connection */, clock, partitions); } try { // Populate partition ranges. From 4e492ca770b3c5dce8fcc6600b63feb86a201f74 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 12 Jun 2024 14:25:03 +0530 Subject: [PATCH 03/10] added tests with shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped still failing --- .../YugabyteDBCompleteTypesTest.java | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java index aa8b1ca2..90f3e849 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java @@ -16,6 +16,7 @@ import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; @@ -207,6 +208,145 @@ public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming( assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldOpenConnectionOnceArrayTypeColumnIsAdded( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + TestHelper.execute("CREATE TABLE test_table (id INT PRIMARY KEY);"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_table", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_table", dbStreamId); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record to the table. + TestHelper.execute("INSERT INTO test_table VALUES (1);"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertEquals(1, records.size()); + + // We do not have any array type present yet, so there should not be any connection. + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + // Alter the table and add an array type column. + TestHelper.execute("ALTER TABLE test_table ADD COLUMN textarr text[];"); + TestHelper.execute("INSERT INTO test_table VALUES (2, '{\"element1\",\"element2\"}');"); + + // Consume another record. + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + SourceRecord recordWithArrayType = records.get(1); + Struct recordValue = (Struct) recordWithArrayType.value(); + List textArrayList = recordValue.getStruct("after").getStruct("textarr").getArray("value"); + assertEquals(textArrayList.get(0).toString(), "element1"); + assertEquals(textArrayList.get(1).toString(), "element2"); + + // Now we should see a database connection being opened since we have to decode an array type. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldNotOpenConnectionWithEnumTypes( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + TestHelper.executeDDL("yugabyte_create_tables.ddl"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_enum", dbStreamId); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record. + TestHelper.execute("INSERT INTO test_enum VALUES (1, 'ONE');"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertValueField(records.get(0), "after/enum_col/value", "ONE"); + + // There should not be any connection open to server. + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + LogInterceptor logInterceptor = new LogInterceptor(YugabyteDBStreamingChangeEventSource.class); + + TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[]);"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); + configBuilder.with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, "initial"); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record. + TestHelper.execute("INSERT INTO test_arr VALUES (1, '{\"element1\",\"element2\"}');"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertEquals(1, records.size()); + + // No need to verify record values since they are being verified in another test, simply + // verify that we have only a single connection open. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + // Drop the array column on the table and insert another record. + TestHelper.execute("ALTER TABLE test_arr DROP COLUMN textarr;"); + TestHelper.execute("INSERT INTO test_arr VALUES (2);"); + + waitAndFailIfCannotConsume(records, 1 /* 1 new record to consume */); + + SourceRecord recordWithOutArrayTypeColumn = records.get(1); + Struct recordValue = (Struct) recordWithOutArrayTypeColumn.value(); + assertEquals(2, recordValue.getStruct("after").getStruct("id").getInt32("value")); + for (Field field : recordValue.schema().fields()) { + assertNotEquals("textarr", field.name()); + } + + // The connection will still remain open as we do not close the existing connection once + // the array column is removed from the schema. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + // Restart the connector after a wait. + stopConnector(); + TestHelper.waitFor(Duration.ofSeconds(10)); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Wait till we reach the streaming phase. + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> logInterceptor.countOccurrences("Beginning to poll the changes from the server") == 2); + + // Insert a new record once the connector is up and since we do not have any array columns + // now, there should not be any connection open. + TestHelper.execute("INSERT INTO test_arr VALUES (3);"); + waitAndFailIfCannotConsume(records, 1 /* 1 new record to consume */); + + SourceRecord recordAfterRestart = records.get(2); + Struct recordValueAfterRestart = (Struct) recordAfterRestart.value(); + assertEquals(3, recordValueAfterRestart.getStruct("after").getStruct("id").getInt32("value")); + for (Field field : recordValueAfterRestart.schema().fields()) { + assertNotEquals("textarr", field.name()); + } + + LOGGER.info("Check if a connection is still open till a timeout"); + TestHelper.waitFor(Duration.ofMinutes(2)); + + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + @ParameterizedTest @ValueSource(strings = {"adaptive", "adaptive_time_microseconds", "connect"}) public void shouldEmitTimestampValuesWithCorrectPrecision(String temporalPrecisionMode) throws Exception { From eaaa0eb3debca9caf9420d8621899f8268a74af1 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 12 Jun 2024 17:30:28 +0530 Subject: [PATCH 04/10] addressed review comments --- .../YugabyteDBChangeEventSourceFactory.java | 27 ++++++++++--------- .../yugabytedb/YugabyteDBConnectorTask.java | 1 + 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java index 5fa0fb51..cd4c0c08 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java @@ -32,6 +32,7 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBChangeEventSourceFactory.class); private final YugabyteDBConnectorConfig configuration; + private final YugabyteDBConnection jdbcConnection; private final ErrorHandler errorHandler; private final YugabyteDBEventDispatcher dispatcher; private final Clock clock; @@ -45,6 +46,7 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuration, Snapshotter snapshotter, + YugabyteDBConnection jdbcConnection, ErrorHandler errorHandler, YugabyteDBEventDispatcher dispatcher, Clock clock, YugabyteDBSchema schema, @@ -54,6 +56,7 @@ public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuratio SlotState startingSlotInfo, ChangeEventQueue queue) { this.configuration = configuration; + this.jdbcConnection = jdbcConnection; this.errorHandler = errorHandler; this.dispatcher = dispatcher; this.clock = clock; @@ -73,7 +76,7 @@ public SnapshotChangeEventSource getSnapsh configuration, taskContext, snapshotter, - null, /* yugabyteDBConnection */ + jdbcConnection, schema, dispatcher, clock, @@ -88,7 +91,7 @@ public StreamingChangeEventSource getStrea return new YugabyteDBStreamingChangeEventSource( configuration, snapshotter, - null, /* yugabyteDBConnection */ + jdbcConnection, dispatcher, errorHandler, clock, @@ -101,7 +104,7 @@ public StreamingChangeEventSource getStrea return new YugabyteDBConsistentStreamingSource( configuration, snapshotter, - null, /* yugabyteDBConnection */ + jdbcConnection, dispatcher, errorHandler, clock, @@ -116,14 +119,14 @@ public StreamingChangeEventSource getStrea public Optional> getIncrementalSnapshotChangeEventSource(YugabyteDBOffsetContext offsetContext, SnapshotProgressListener snapshotProgressListener, DataChangeEventListener dataChangeEventListener) { - final SignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource( - configuration, - null, /* jdbcConnection */ - dispatcher, - schema, - clock, - snapshotProgressListener, - dataChangeEventListener); - return Optional.of(incrementalSnapshotChangeEventSource); + final SignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource( + configuration, + jdbcConnection, + dispatcher, + schema, + clock, + snapshotProgressListener, + dataChangeEventListener); + return Optional.of(incrementalSnapshotChangeEventSource); } } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index 1139911b..d5de8196 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -188,6 +188,7 @@ public ChangeEventSourceCoordinator start( new YugabyteDBChangeEventSourceFactory( connectorConfig, snapshotter, + null, /* jdbcConnection */ errorHandler, dispatcher, clock, From b86300c7b325e1b15a303648a6807c8149c9bf38 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Jun 2024 13:48:07 +0530 Subject: [PATCH 05/10] reverted the change to pass YugabyteDBConnection instead of PgConnectionSupplier --- .../YugabyteDBChangeRecordEmitter.java | 4 +- .../yugabytedb/YugabyteDBConnectorTask.java | 7 +-- .../yugabytedb/YugabyteDBTypeRegistry.java | 49 +++++++++++++++++-- .../connection/AbstractColumnValue.java | 6 +-- .../connection/ReplicationMessage.java | 6 +-- ...ReplicationMessageColumnValueResolver.java | 2 +- .../connection/YugabyteDBConnection.java | 1 + .../pgoutput/YbOutputMessageDecoder.java | 4 +- .../pgoutput/YbOutputReplicationMessage.java | 2 +- .../pgproto/YbProtoColumnValue.java | 6 +-- .../pgproto/YbProtoReplicationMessage.java | 6 +-- .../wal2json/Wal2JsonReplicationMessage.java | 4 +- 12 files changed, 70 insertions(+), 27 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index 9eb2db07..2e451790 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -196,7 +196,7 @@ private Object[] columnValues(List columns, TableId t undeliveredToastableColumns.remove(columnName); int position = getPosition(columnName, table, values); if (position != -1) { - Object value = column.getValue(connection, connectorConfig.includeUnknownDatatypes()); + Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); // values[position] = value; values[position] = new Object[]{ value, Boolean.TRUE }; } @@ -239,7 +239,7 @@ private Object[] updatedColumnValues(List columns, Ta int position = getPosition(columnName, table, values); if (position != -1) { - Object value = column.getValue(connection, connectorConfig.includeUnknownDatatypes()); + Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); values[position] = new Object[]{ value, Boolean.TRUE }; } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index d5de8196..1d7f16e1 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -79,6 +79,9 @@ public ChangeEventSourceCoordinator start( final String databaseCharsetName = config.getString(YugabyteDBConnectorConfig.CHAR_SET); final Charset databaseCharset = Charset.forName(databaseCharsetName); + Encoding encoding = Encoding.defaultEncoding(); // UTF-8 + YugabyteDBTaskConnection taskConnection = new YugabyteDBTaskConnection(encoding); + final YugabyteDBValueConverterBuilder valueConverterBuilder = (typeRegistry) -> YugabyteDBValueConverter.of( connectorConfig, databaseCharset, @@ -108,7 +111,7 @@ public ChangeEventSourceCoordinator start( // This type registry is being build with the nameToType and oidToType map populated. final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = - new YugabyteDBTypeRegistry(connectorConfig.getJdbcConfig(), nameToType, oidToType); + new YugabyteDBTypeRegistry(taskConnection, nameToType, oidToType, null /* yugabyteDBConnection */); schema = new YugabyteDBSchema(connectorConfig, yugabyteDBTypeRegistry, topicSelector, valueConverterBuilder.build(yugabyteDBTypeRegistry)); @@ -147,7 +150,6 @@ public ChangeEventSourceCoordinator start( final YugabyteDBEventMetadataProvider metadataProvider = new YugabyteDBEventMetadataProvider(); - // todo Vaibhav: see if we can get rid of heartbeat factory HeartbeatFactory heartbeatFactory = new HeartbeatFactory<>( connectorConfig, topicSelector, @@ -213,7 +215,6 @@ public ChangeEventSourceCoordinator start( } } - // todo Vaibhav: not being used. Map getPreviousOffsetss( Partition.Provider provider, OffsetContext.Loader loader) { diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java index 457a6297..13a15787 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBTypeRegistry.java @@ -119,7 +119,6 @@ public Map getOidToType() { private final int maxConnectionRetries = 5; private YugabyteDBConnection yugabyteDBConnection; - private transient JdbcConfiguration jdbcConfig; private transient TypeInfo typeInfo; private SqlTypeMapper sqlTypeMapper; @@ -156,10 +155,10 @@ public YugabyteDBTypeRegistry(YugabyteDBConnection connection) { } } - public YugabyteDBTypeRegistry(JdbcConfiguration jdbcConfig, + public YugabyteDBTypeRegistry(YugabyteDBTaskConnection connection, Map nameToType, - Map oidToType) { - this.jdbcConfig = jdbcConfig; + Map oidToType, + YugabyteDBConnection yugabyteDBConnection) { this.oidToType = oidToType; this.nameToType = nameToType; for (YugabyteDBType t : oidToType.values()) { @@ -208,6 +207,39 @@ else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { } } + private void updateTypeByOid(YugabyteDBType type) { + if (TYPE_NAME_GEOMETRY.equals(type.getName())) { + geometryOid = type.getOid(); + } + else if (TYPE_NAME_GEOGRAPHY.equals(type.getName())) { + geographyOid = type.getOid(); + } + else if (TYPE_NAME_CITEXT.equals(type.getName())) { + citextOid = type.getOid(); + } + else if (TYPE_NAME_HSTORE.equals(type.getName())) { + hstoreOid = type.getOid(); + } + else if (TYPE_NAME_LTREE.equals(type.getName())) { + ltreeOid = type.getOid(); + } + else if (TYPE_NAME_HSTORE_ARRAY.equals(type.getName())) { + hstoreArrayOid = type.getOid(); + } + else if (TYPE_NAME_GEOMETRY_ARRAY.equals(type.getName())) { + geometryArrayOid = type.getOid(); + } + else if (TYPE_NAME_GEOGRAPHY_ARRAY.equals(type.getName())) { + geographyArrayOid = type.getOid(); + } + else if (TYPE_NAME_CITEXT_ARRAY.equals(type.getName())) { + citextArrayOid = type.getOid(); + } + else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { + ltreeArrayOid = type.getOid(); + } + } + /** * @param oid - PostgreSQL OID * @return type associated with the given OID @@ -348,6 +380,9 @@ private void prime() throws SQLException { while (retryCount <= maxConnectionRetries) { try { final List delayResolvedBuilders = new ArrayList<>(); + if (retryCount > 0) { + this.connection = yugabyteDBConnection.connection(); + } final Statement statement = connection.createStatement(); final ResultSet rs = statement.executeQuery(SQL_TYPES); while (rs.next()) { @@ -413,6 +448,9 @@ private YugabyteDBType resolveUnknownType(String name) { Exception exception = null; while (retryCount <= maxConnectionRetries) { try { + if (retryCount > 0) { + connection = yugabyteDBConnection.connection(); + } final PreparedStatement statement = connection.prepareStatement(SQL_NAME_LOOKUP); statement.setString(1, name); return loadType(statement); @@ -438,6 +476,9 @@ private YugabyteDBType resolveUnknownType(int lookupOid) { Exception exception = null; while (retryCount <= maxConnectionRetries) { try { + if (retryCount > 0) { + connection = yugabyteDBConnection.connection(); + } final PreparedStatement statement = connection.prepareStatement(SQL_OID_LOOKUP); statement.setInt(1, lookupOid); return loadType(statement); diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java b/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java index f35e4858..454aaa75 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java @@ -192,10 +192,10 @@ public boolean isArray(YugabyteDBType type) { } @Override - public Object asArray(String columnName, YugabyteDBType type, String fullType, YugabyteDBConnection connection) { + public Object asArray(String columnName, YugabyteDBType type, String fullType, PgConnectionSupplier connection) { try { final String dataString = asString(); - return new PgArray((BaseConnection) connection.connection(), type.getOid(), dataString); + return new PgArray(connection.get(), type.getOid(), dataString); } catch (SQLException e) { LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e); @@ -205,7 +205,7 @@ public Object asArray(String columnName, YugabyteDBType type, String fullType, Y @Override public Object asDefault(YugabyteDBTypeRegistry yugabyteDBTypeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, - YugabyteDBConnection connection) { + PgConnectionSupplier connection) { if (includeUnknownDatatypes) { // this includes things like PostGIS geoemetries or other custom types // leave up to the downstream message recipient to deal with diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java index d1469877..9632c28c 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java @@ -66,7 +66,7 @@ public interface Column { */ ColumnTypeMetadata getTypeMetadata(); - Object getValue(final YugabyteDBConnection connection, boolean includeUnknownDatatypes); + Object getValue(final PgConnectionSupplier connection, boolean includeUnknownDatatypes); boolean isOptional(); @@ -134,9 +134,9 @@ public interface ColumnValue { boolean isArray(YugabyteDBType type); - Object asArray(String columnName, YugabyteDBType type, String fullType, YugabyteDBConnection connection); + Object asArray(String columnName, YugabyteDBType type, String fullType, PgConnectionSupplier connection); - Object asDefault(YugabyteDBTypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, YugabyteDBConnection connection); + Object asDefault(YugabyteDBTypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, PgConnectionSupplier connection); } /** diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java index f7b6cfed..98dcd6dc 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java @@ -39,7 +39,7 @@ public class ReplicationMessageColumnValueResolver { * @return */ public static Object resolveValue(String columnName, YugabyteDBType type, String fullType, - ColumnValue value, final YugabyteDBConnection connection, + ColumnValue value, final PgConnectionSupplier connection, boolean includeUnknownDatatypes, YugabyteDBTypeRegistry yugabyteDBTypeRegistry) { if (value.isNull()) { diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java index 79481869..c239a8da 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java @@ -7,6 +7,7 @@ package io.debezium.connector.yugabytedb.connection; import java.nio.charset.Charset; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java index 9008e71a..75c36b8b 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputMessageDecoder.java @@ -683,7 +683,7 @@ private static List resolveColumnsFromStreamTupleData(ByteBuffer buffer, new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { @Override - public Object getValue(YugabyteDBConnection connection, + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { return YbOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, @@ -702,7 +702,7 @@ else if (type == 'n') { new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, true, true) { @Override - public Object getValue(YugabyteDBConnection connection, + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { return null; } diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java index 8688326d..412592b9 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java @@ -92,7 +92,7 @@ public boolean shouldSchemaBeSynchronized() { * * @return the value; may be null */ - public static Object getValue(String columnName, YugabyteDBType type, String fullType, String rawValue, final YugabyteDBConnection connection, + public static Object getValue(String columnName, YugabyteDBType type, String fullType, String rawValue, final PgConnectionSupplier connection, boolean includeUnknownDataTypes, YugabyteDBTypeRegistry yugabyteDBTypeRegistry) { final YbOutputColumnValue columnValue = new YbOutputColumnValue(rawValue); return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDataTypes, yugabyteDBTypeRegistry); diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java index bda47a84..d17bfedd 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java @@ -312,7 +312,7 @@ public boolean isArray(YugabyteDBType type) { } @Override - public Object asArray(String columnName, YugabyteDBType type, String fullType, YugabyteDBConnection connection) { + public Object asArray(String columnName, YugabyteDBType type, String fullType, PgConnectionSupplier connection) { // Currently the logical decoding plugin sends unhandled types as a byte array containing the string // representation (in Postgres) of the array value. // The approach to decode this is sub-optimal but the only way to improve this is to update the plugin. @@ -330,7 +330,7 @@ public Object asArray(String columnName, YugabyteDBType type, String fullType, Y * } */ final String dataString = asString(); - return new PgArray((BaseConnection) connection.connection(), type.getOid(), dataString); + return new PgArray(connection.get(), type.getOid(), dataString); /* * String dataString = new String(data, Charset.forName("UTF-8")); * PgArray arrayData = new PgArray(connection.get(), (int) value.getColumnType(), dataString); @@ -346,7 +346,7 @@ public Object asArray(String columnName, YugabyteDBType type, String fullType, Y @Override public Object asDefault(YugabyteDBTypeRegistry yugabyteDBTypeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, - YugabyteDBConnection connection) { + PgConnectionSupplier connection) { final YugabyteDBType type = yugabyteDBTypeRegistry.get(columnType); if (type.getOid() == yugabyteDBTypeRegistry.geometryOid() || type.getOid() == yugabyteDBTypeRegistry.geographyOid() || diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java index 8b3af2d9..21487afb 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java @@ -125,7 +125,7 @@ private List transform(List me return new AbstractReplicationMessageColumn(columnName, type, fullType, typeInfo.map(CdcService.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) { @Override - public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { return YbProtoReplicationMessage.this.getValue(columnName, type, fullType, datum, connection, includeUnknownDatatypes); } @@ -140,7 +140,7 @@ public String toString() { return new AbstractReplicationMessageColumn(columnName, type, fullType, typeInfo.map(CdcService.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) { @Override - public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { return YbProtoReplicationMessage.this.getValue(columnName, type, datum); } @Override @@ -161,7 +161,7 @@ public boolean isLastEventForLsn() { public Object getValue(String columnName, YugabyteDBType type, String fullType, Common.DatumMessagePB datumMessage, - final YugabyteDBConnection connection, + final PgConnectionSupplier connection, boolean includeUnknownDatatypes) { final YbProtoColumnValue columnValue = new YbProtoColumnValue(datumMessage); return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java index 8f577599..622f1126 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java @@ -124,7 +124,7 @@ private List transform(final Document data, final Str columns.add(new AbstractReplicationMessageColumn(columnName, columnType, columnTypeName, columnOptional, true) { @Override - public Object getValue(YugabyteDBConnection connection, boolean includeUnknownDatatypes) { + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { return Wal2JsonReplicationMessage.this.getValue(columnName, columnType, columnTypeName, rawValue, connection, includeUnknownDatatypes); } @@ -168,7 +168,7 @@ private String parseType(String columnName, String typeWithModifiers) { * * @return the value; may be null */ - public Object getValue(String columnName, YugabyteDBType type, String fullType, Value rawValue, final YugabyteDBConnection connection, + public Object getValue(String columnName, YugabyteDBType type, String fullType, Value rawValue, final PgConnectionSupplier connection, boolean includeUnknownDatatypes) { final Wal2JsonColumnValue columnValue = new Wal2JsonColumnValue(rawValue); return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes, yugabyteDBTypeRegistry); From e56f4febaa05fc3e31890f2d10938c5f4616c7d7 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Jun 2024 13:53:20 +0530 Subject: [PATCH 06/10] remove unused imports --- .../connector/yugabytedb/connection/AbstractColumnValue.java | 1 - .../connector/yugabytedb/connection/ReplicationMessage.java | 1 - .../connection/ReplicationMessageColumnValueResolver.java | 2 -- .../connector/yugabytedb/connection/YugabyteDBConnection.java | 1 - .../connection/pgoutput/YbOutputReplicationMessage.java | 1 - .../yugabytedb/connection/pgproto/YbProtoColumnValue.java | 2 -- .../connection/pgproto/YbProtoReplicationMessage.java | 1 - .../connection/wal2json/Wal2JsonReplicationMessage.java | 1 - 8 files changed, 10 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java b/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java index 454aaa75..7feda593 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/AbstractColumnValue.java @@ -13,7 +13,6 @@ import java.time.ZoneOffset; import org.apache.kafka.connect.errors.ConnectException; -import org.postgresql.core.BaseConnection; import org.postgresql.geometric.PGbox; import org.postgresql.geometric.PGcircle; import org.postgresql.geometric.PGline; diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java index 9632c28c..ab634772 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java @@ -12,7 +12,6 @@ import java.time.OffsetTime; import java.util.List; -import io.debezium.jdbc.JdbcConnection; import org.postgresql.geometric.PGbox; import org.postgresql.geometric.PGcircle; import org.postgresql.geometric.PGline; diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java index 98dcd6dc..3f439207 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessageColumnValueResolver.java @@ -16,8 +16,6 @@ import io.debezium.connector.yugabytedb.connection.ReplicationMessage.ColumnValue; import org.yb.QLType; -import java.sql.SQLException; - /** * @author Chris Cranford */ diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java index c239a8da..79481869 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YugabyteDBConnection.java @@ -7,7 +7,6 @@ package io.debezium.connector.yugabytedb.connection; import java.nio.charset.Charset; -import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java index 412592b9..b1298d78 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgoutput/YbOutputReplicationMessage.java @@ -14,7 +14,6 @@ import io.debezium.connector.yugabytedb.YugabyteDBTypeRegistry; import io.debezium.connector.yugabytedb.connection.ReplicationMessage; import io.debezium.connector.yugabytedb.connection.ReplicationMessageColumnValueResolver; -import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; /** * @author Gunnar Morling diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java index d17bfedd..531e7058 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoColumnValue.java @@ -16,8 +16,6 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; -import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; -import org.postgresql.core.BaseConnection; import org.postgresql.geometric.PGpoint; import org.postgresql.jdbc.PgArray; import org.postgresql.util.PGmoney; diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java index 21487afb..7fb4fb50 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java @@ -13,7 +13,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yb.Common; diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java b/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java index 622f1126..4927ce8f 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/wal2json/Wal2JsonReplicationMessage.java @@ -12,7 +12,6 @@ import java.util.OptionalLong; import java.util.regex.Matcher; -import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; From 3796d0e4511d3f45bd2c84e2e741a77f55d8d961 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Jun 2024 13:55:44 +0530 Subject: [PATCH 07/10] formatting changes revert --- .../connector/yugabytedb/YugabyteDBChangeRecordEmitter.java | 6 ++++-- .../connector/yugabytedb/YugabyteDBConnectorTask.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index 2e451790..2e2d2050 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -196,7 +196,8 @@ private Object[] columnValues(List columns, TableId t undeliveredToastableColumns.remove(columnName); int position = getPosition(columnName, table, values); if (position != -1) { - Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); + Object value = column.getValue(() -> (BaseConnection) connection.connection(), + connectorConfig.includeUnknownDatatypes()); // values[position] = value; values[position] = new Object[]{ value, Boolean.TRUE }; } @@ -239,7 +240,8 @@ private Object[] updatedColumnValues(List columns, Ta int position = getPosition(columnName, table, values); if (position != -1) { - Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); + Object value = column.getValue(() -> (BaseConnection) connection.connection(), + connectorConfig.includeUnknownDatatypes()); values[position] = new Object[]{ value, Boolean.TRUE }; } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index 1d7f16e1..f21a8fb4 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -111,7 +111,8 @@ public ChangeEventSourceCoordinator start( // This type registry is being build with the nameToType and oidToType map populated. final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = - new YugabyteDBTypeRegistry(taskConnection, nameToType, oidToType, null /* yugabyteDBConnection */); + new YugabyteDBTypeRegistry(taskConnection, nameToType, + oidToType, null /* yugabyteDBConnection */); schema = new YugabyteDBSchema(connectorConfig, yugabyteDBTypeRegistry, topicSelector, valueConverterBuilder.build(yugabyteDBTypeRegistry)); From e38ce225e4de74f121478563e4d7d4bdea4f69ef Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Jun 2024 13:57:41 +0530 Subject: [PATCH 08/10] formatting changes revert --- .../connector/yugabytedb/YugabyteDBConnectorTask.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index f21a8fb4..d4c82017 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -110,8 +110,7 @@ public ChangeEventSourceCoordinator start( // This type registry is being build with the nameToType and oidToType map populated. - final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = - new YugabyteDBTypeRegistry(taskConnection, nameToType, + final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = new YugabyteDBTypeRegistry(taskConnection, nameToType, oidToType, null /* yugabyteDBConnection */); schema = new YugabyteDBSchema(connectorConfig, yugabyteDBTypeRegistry, topicSelector, @@ -139,6 +138,9 @@ public ChangeEventSourceCoordinator start( LoggingContext.PreviousContext previousContext = taskContext .configureLoggingContext(CONTEXT_NAME + "|" + taskId); try { + // Print out the server information + // CDCSDK Get the table, + queue = new ChangeEventQueue.Builder() .pollInterval(connectorConfig.getPollInterval()) .maxBatchSize(connectorConfig.getMaxBatchSize()) From 37b219a6dd833bef9140fcd86250b887d43496eb Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Jun 2024 14:31:33 +0530 Subject: [PATCH 09/10] formatting changes --- .../java/io/debezium/connector/yugabytedb/TestHelper.java | 4 ++-- .../connector/yugabytedb/YugabyteDBCompleteTypesTest.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java index 4546708a..0938a7cc 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java +++ b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java @@ -758,8 +758,8 @@ private static YugabyteDBValueConverterBuilder getPostgresValueConverterBuilder( public static Stream streamTypeProviderForStreaming() { return Stream.of( - Arguments.of(false, false)); // Older stream -// Arguments.of(true, false)); // NO_EXPORT stream + Arguments.of(false, false), // Older stream + Arguments.of(true, false)); // NO_EXPORT stream } public static Stream streamTypeProviderForSnapshot() { diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java index 90f3e849..b5d52793 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java @@ -272,6 +272,7 @@ public void shouldNotOpenConnectionWithEnumTypes( assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } + @Disabled @ParameterizedTest @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") public void shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped( From 6b640b28e3df8b47b97ee88d946eac9ae375263e Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Jun 2024 16:10:14 +0530 Subject: [PATCH 10/10] fixed failing test --- .../YugabyteDBCompleteTypesTest.java | 39 +++++++------------ 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java index b5d52793..3a223ba4 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java @@ -138,12 +138,11 @@ public void verifyAllWorkingDataTypesInSingleTable(boolean consistentSnapshot, b assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } - @ParameterizedTest - @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") - public void shouldCreateConnectionWhenArrayPresent(boolean consistentSnapshot, boolean useSnapshot) throws Exception { + @Test + public void shouldCreateConnectionWhenArrayPresent() throws Exception { TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[], intarr int[]);"); - String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); startEngine(configBuilder); @@ -174,16 +173,14 @@ public void shouldCreateConnectionWhenArrayPresent(boolean consistentSnapshot, b assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } - @ParameterizedTest - @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") - public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming( - boolean consistentSnapshot, boolean useSnapshot) throws Exception { + @Test + public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming() throws Exception { LogInterceptor logInterceptor = new LogInterceptor(YugabyteDBStreamingChangeEventSource.class); TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[], intarr int[]);"); TestHelper.execute("INSERT INTO test_arr VALUES (1, '{\"element1\",\"element2\"}', '{0,1,2}');"); - String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); configBuilder.with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, "initial"); startEngine(configBuilder); @@ -208,13 +205,11 @@ public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming( assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } - @ParameterizedTest - @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") - public void shouldOpenConnectionOnceArrayTypeColumnIsAdded( - boolean consistentSnapshot, boolean useSnapshot) throws Exception { + @Test + public void shouldOpenConnectionOnceArrayTypeColumnIsAdded() throws Exception { TestHelper.execute("CREATE TABLE test_table (id INT PRIMARY KEY);"); - String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_table", consistentSnapshot, useSnapshot); + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_table"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_table", dbStreamId); startEngine(configBuilder); @@ -248,13 +243,11 @@ public void shouldOpenConnectionOnceArrayTypeColumnIsAdded( assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } - @ParameterizedTest - @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") - public void shouldNotOpenConnectionWithEnumTypes( - boolean consistentSnapshot, boolean useSnapshot) throws Exception { + @Test + public void shouldNotOpenConnectionWithEnumTypes() throws Exception { TestHelper.executeDDL("yugabyte_create_tables.ddl"); - String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum", consistentSnapshot, useSnapshot); + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_enum", dbStreamId); startEngine(configBuilder); @@ -273,15 +266,13 @@ public void shouldNotOpenConnectionWithEnumTypes( } @Disabled - @ParameterizedTest - @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") - public void shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped( - boolean consistentSnapshot, boolean useSnapshot) throws Exception { + @Test + public void shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped() throws Exception { LogInterceptor logInterceptor = new LogInterceptor(YugabyteDBStreamingChangeEventSource.class); TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[]);"); - String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); configBuilder.with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, "initial"); startEngine(configBuilder);