Skip to content

Commit

Permalink
added tests with shouldNotOpenConnectionAfterRestartOnceArrayColumnIs…
Browse files Browse the repository at this point in the history
…Dropped still failing
  • Loading branch information
vaibhav-yb committed Jun 12, 2024
1 parent eafead7 commit 4e492ca
Showing 1 changed file with 140 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -207,6 +208,145 @@ public void shouldCreateSingleConnectionAcrossSnapshotAndStreaming(
assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));
}

@ParameterizedTest
@MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming")
public void shouldOpenConnectionOnceArrayTypeColumnIsAdded(
boolean consistentSnapshot, boolean useSnapshot) throws Exception {
TestHelper.execute("CREATE TABLE test_table (id INT PRIMARY KEY);");

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_table", consistentSnapshot, useSnapshot);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_table", dbStreamId);
startEngine(configBuilder);

awaitUntilConnectorIsReady();

// Insert a record to the table.
TestHelper.execute("INSERT INTO test_table VALUES (1);");

List<SourceRecord> records = new ArrayList<>();
waitAndFailIfCannotConsume(records, 1 /* records to consume */);

assertEquals(1, records.size());

// We do not have any array type present yet, so there should not be any connection.
assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));

// Alter the table and add an array type column.
TestHelper.execute("ALTER TABLE test_table ADD COLUMN textarr text[];");
TestHelper.execute("INSERT INTO test_table VALUES (2, '{\"element1\",\"element2\"}');");

// Consume another record.
waitAndFailIfCannotConsume(records, 1 /* records to consume */);

SourceRecord recordWithArrayType = records.get(1);
Struct recordValue = (Struct) recordWithArrayType.value();
List<Object> textArrayList = recordValue.getStruct("after").getStruct("textarr").getArray("value");
assertEquals(textArrayList.get(0).toString(), "element1");
assertEquals(textArrayList.get(1).toString(), "element2");

// Now we should see a database connection being opened since we have to decode an array type.
assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));
}

@ParameterizedTest
@MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming")
public void shouldNotOpenConnectionWithEnumTypes(
boolean consistentSnapshot, boolean useSnapshot) throws Exception {
TestHelper.executeDDL("yugabyte_create_tables.ddl");

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum", consistentSnapshot, useSnapshot);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_enum", dbStreamId);
startEngine(configBuilder);

awaitUntilConnectorIsReady();

// Insert a record.
TestHelper.execute("INSERT INTO test_enum VALUES (1, 'ONE');");

List<SourceRecord> records = new ArrayList<>();
waitAndFailIfCannotConsume(records, 1 /* records to consume */);

assertValueField(records.get(0), "after/enum_col/value", "ONE");

// There should not be any connection open to server.
assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));
}

@ParameterizedTest
@MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForStreaming")
public void shouldNotOpenConnectionAfterRestartOnceArrayColumnIsDropped(
boolean consistentSnapshot, boolean useSnapshot) throws Exception {
LogInterceptor logInterceptor = new LogInterceptor(YugabyteDBStreamingChangeEventSource.class);

TestHelper.execute("CREATE TABLE test_arr (id INT PRIMARY KEY, textarr text[]);");

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_arr", consistentSnapshot, useSnapshot);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_arr", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, "initial");
startEngine(configBuilder);

awaitUntilConnectorIsReady();

// Insert a record.
TestHelper.execute("INSERT INTO test_arr VALUES (1, '{\"element1\",\"element2\"}');");

List<SourceRecord> records = new ArrayList<>();
waitAndFailIfCannotConsume(records, 1 /* records to consume */);

assertEquals(1, records.size());

// No need to verify record values since they are being verified in another test, simply
// verify that we have only a single connection open.
assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));

// Drop the array column on the table and insert another record.
TestHelper.execute("ALTER TABLE test_arr DROP COLUMN textarr;");
TestHelper.execute("INSERT INTO test_arr VALUES (2);");

waitAndFailIfCannotConsume(records, 1 /* 1 new record to consume */);

SourceRecord recordWithOutArrayTypeColumn = records.get(1);
Struct recordValue = (Struct) recordWithOutArrayTypeColumn.value();
assertEquals(2, recordValue.getStruct("after").getStruct("id").getInt32("value"));
for (Field field : recordValue.schema().fields()) {
assertNotEquals("textarr", field.name());
}

// The connection will still remain open as we do not close the existing connection once
// the array column is removed from the schema.
assertEquals(1, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));

// Restart the connector after a wait.
stopConnector();
TestHelper.waitFor(Duration.ofSeconds(10));
startEngine(configBuilder);

awaitUntilConnectorIsReady();

// Wait till we reach the streaming phase.
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.until(() -> logInterceptor.countOccurrences("Beginning to poll the changes from the server") == 2);

// Insert a new record once the connector is up and since we do not have any array columns
// now, there should not be any connection open.
TestHelper.execute("INSERT INTO test_arr VALUES (3);");
waitAndFailIfCannotConsume(records, 1 /* 1 new record to consume */);

SourceRecord recordAfterRestart = records.get(2);
Struct recordValueAfterRestart = (Struct) recordAfterRestart.value();
assertEquals(3, recordValueAfterRestart.getStruct("after").getStruct("id").getInt32("value"));
for (Field field : recordValueAfterRestart.schema().fields()) {
assertNotEquals("textarr", field.name());
}

LOGGER.info("Check if a connection is still open till a timeout");
TestHelper.waitFor(Duration.ofMinutes(2));

assertEquals(0, TestHelper.getConnectionCount(YugabyteDBConnection.CONNECTION_GENERAL));
}

@ParameterizedTest
@ValueSource(strings = {"adaptive", "adaptive_time_microseconds", "connect"})
public void shouldEmitTimestampValuesWithCorrectPrecision(String temporalPrecisionMode) throws Exception {
Expand Down

0 comments on commit 4e492ca

Please sign in to comment.