diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java index aa8b1ca2..90f3e849 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompleteTypesTest.java @@ -16,6 +16,7 @@ import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; @@ -207,6 +208,145 @@ public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming( assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); } + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldOpenConnectionOnceArrayTypeColumnIsAdded( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + TestHelper.execute("CREATE TABLE test_table (id INT PRIMARY KEY);"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_table", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_table", dbStreamId); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record to the table. + TestHelper.execute("INSERT INTO test_table VALUES (1);"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertEquals(1, records.size()); + + // We do not have any array type present yet, so there should not be any connection. + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + // Alter the table and add an array type column. + TestHelper.execute("ALTER TABLE test_table ADD COLUMN textarr text[];"); + TestHelper.execute("INSERT INTO test_table VALUES (2, '{\"element1\",\"element2\"}');"); + + // Consume another record. + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + SourceRecord recordWithArrayType = records.get(1); + Struct recordValue = (Struct) recordWithArrayType.value(); + List textArrayList = recordValue.getStruct("after").getStruct("textarr").getArray("value"); + assertEquals(textArrayList.get(0).toString(), "element1"); + assertEquals(textArrayList.get(1).toString(), "element2"); + + // Now we should see a database connection being opened since we have to decode an array type. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldNotOpenConnectionWithEnumTypes( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + TestHelper.executeDDL("yugabyte_create_tables.ddl"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_enum", dbStreamId); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record. + TestHelper.execute("INSERT INTO test_enum VALUES (1, 'ONE');"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertValueField(records.get(0), "after/enum_col/value", "ONE"); + + // There should not be any connection open to server. + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + + @ParameterizedTest + @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming") + public void shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped( + boolean consistentSnapshot, boolean useSnapshot) throws Exception { + LogInterceptor logInterceptor = new LogInterceptor(YugabyteDBStreamingChangeEventSource.class); + + TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[]);"); + + String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot); + Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId); + configBuilder.with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, "initial"); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Insert a record. + TestHelper.execute("INSERT INTO test_arr VALUES (1, '{\"element1\",\"element2\"}');"); + + List records = new ArrayList<>(); + waitAndFailIfCannotConsume(records, 1 /* records to consume */); + + assertEquals(1, records.size()); + + // No need to verify record values since they are being verified in another test, simply + // verify that we have only a single connection open. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + // Drop the array column on the table and insert another record. + TestHelper.execute("ALTER TABLE test_arr DROP COLUMN textarr;"); + TestHelper.execute("INSERT INTO test_arr VALUES (2);"); + + waitAndFailIfCannotConsume(records, 1 /* 1 new record to consume */); + + SourceRecord recordWithOutArrayTypeColumn = records.get(1); + Struct recordValue = (Struct) recordWithOutArrayTypeColumn.value(); + assertEquals(2, recordValue.getStruct("after").getStruct("id").getInt32("value")); + for (Field field : recordValue.schema().fields()) { + assertNotEquals("textarr", field.name()); + } + + // The connection will still remain open as we do not close the existing connection once + // the array column is removed from the schema. + assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + + // Restart the connector after a wait. + stopConnector(); + TestHelper.waitFor(Duration.ofSeconds(10)); + startEngine(configBuilder); + + awaitUntilConnectorIsReady(); + + // Wait till we reach the streaming phase. + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> logInterceptor.countOccurrences("Beginning to poll the changes from the server") == 2); + + // Insert a new record once the connector is up and since we do not have any array columns + // now, there should not be any connection open. + TestHelper.execute("INSERT INTO test_arr VALUES (3);"); + waitAndFailIfCannotConsume(records, 1 /* 1 new record to consume */); + + SourceRecord recordAfterRestart = records.get(2); + Struct recordValueAfterRestart = (Struct) recordAfterRestart.value(); + assertEquals(3, recordValueAfterRestart.getStruct("after").getStruct("id").getInt32("value")); + for (Field field : recordValueAfterRestart.schema().fields()) { + assertNotEquals("textarr", field.name()); + } + + LOGGER.info("Check if a connection is still open till a timeout"); + TestHelper.waitFor(Duration.ofMinutes(2)); + + assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL)); + } + @ParameterizedTest @ValueSource(strings = {"adaptive", "adaptive_time_microseconds", "connect"}) public void shouldEmitTimestampValuesWithCorrectPrecision(String temporalPrecisionMode) throws Exception {