Skip to content

Commit 2fbac7e

Browse files
committed
Resolve conflicts
Signed-off-by: yuxiqian <[email protected]>
1 parent 3052e56 commit 2fbac7e

File tree

6 files changed

+55
-68
lines changed

6 files changed

+55
-68
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
7272
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
7373
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
74-
import static org.junit.Assert.fail;
7574

7675
/** IT tests for {@link DorisMetadataApplier}. */
7776
class DorisMetadataApplierITCase extends DorisSinkTestBase {
@@ -439,8 +438,9 @@ void testDorisNarrowingAlterColumnType(boolean batchMode) {
439438
.isExactlyInstanceOf(JobExecutionException.class);
440439
}
441440

442-
@Test
443-
public void testDorisTruncateTable() throws Exception {
441+
@ParameterizedTest(name = "batchMode: {0}")
442+
@ValueSource(booleans = {true, false})
443+
void testDorisTruncateTable(boolean batchMode) throws Exception {
444444
TableId tableId =
445445
TableId.tableId(
446446
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
@@ -458,7 +458,7 @@ public void testDorisTruncateTable() throws Exception {
458458
new CreateTableEvent(tableId, schema),
459459
DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")),
460460
DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")));
461-
runJobWithEvents(preparationTestingEvents);
461+
runJobWithEvents(preparationTestingEvents, batchMode);
462462
waitAndVerify(
463463
tableId,
464464
3,
@@ -471,16 +471,17 @@ public void testDorisTruncateTable() throws Exception {
471471
new TruncateTableEvent(tableId),
472472
DataChangeEvent.insertEvent(tableId, generate(schema, 3, 4.5, "Cecily")),
473473
DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida")));
474-
runJobWithEvents(truncateTestingEvents);
474+
runJobWithEvents(truncateTestingEvents, batchMode);
475475
waitAndVerify(
476476
tableId,
477477
3,
478478
Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"),
479479
DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L);
480480
}
481481

482-
@Test
483-
public void testDorisDropTable() throws Exception {
482+
@ParameterizedTest(name = "batchMode: {0}")
483+
@ValueSource(booleans = {true, false})
484+
void testDorisDropTable(boolean batchMode) throws Exception {
484485
TableId tableId =
485486
TableId.tableId(
486487
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
@@ -498,7 +499,7 @@ public void testDorisDropTable() throws Exception {
498499
new CreateTableEvent(tableId, schema),
499500
DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")),
500501
DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")));
501-
runJobWithEvents(preparationTestingEvents);
502+
runJobWithEvents(preparationTestingEvents, batchMode);
502503

503504
waitAndVerify(
504505
tableId,
@@ -507,17 +508,15 @@ public void testDorisDropTable() throws Exception {
507508
DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L);
508509

509510
runJobWithEvents(
510-
Arrays.asList(new CreateTableEvent(tableId, schema), new DropTableEvent(tableId)));
511-
512-
SQLSyntaxErrorException thrown =
513-
Assertions.assertThrows(
514-
SQLSyntaxErrorException.class, () -> fetchTableContent(tableId, 3));
515-
Assertions.assertTrue(
516-
thrown.getMessage()
517-
.contains(
518-
String.format(
519-
"errCode = 2, detailMessage = Unknown table '%s'",
520-
tableId.getTableName())));
511+
Arrays.asList(new CreateTableEvent(tableId, schema), new DropTableEvent(tableId)),
512+
batchMode);
513+
514+
Assertions.assertThatThrownBy(() -> fetchTableContent(tableId, 3))
515+
.isExactlyInstanceOf(SQLSyntaxErrorException.class)
516+
.hasMessageContaining(
517+
String.format(
518+
"errCode = 2, detailMessage = Unknown table '%s'",
519+
tableId.getTableName()));
521520
}
522521

523522
private void runJobWithEvents(List<Event> events, boolean batchMode) throws Exception {
@@ -601,6 +600,6 @@ private void waitAndVerify(
601600
actual);
602601
Thread.sleep(1000L);
603602
}
604-
fail(String.format("Failed to verify content of %s.", tableId));
603+
Assertions.fail("Failed to verify content of {}.", tableId);
605604
}
606605
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,8 @@ void testApplySchemaChange(String metastore)
140140
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
141141
.isEqualTo(tableSchema);
142142

143-
Assertions.assertEquals(
144-
"col3DefValue",
145-
catalog.getTable(Identifier.fromString("test.table1"))
146-
.options()
147-
.get("fields.col3.default-value"));
143+
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).options())
144+
.containsEntry("fields.col3.default-value", "col3DefValue");
148145

149146
Map<String, String> nameMapping = new HashMap<>();
150147
nameMapping.put("col2", "newcol2");

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@
4949
import org.apache.flink.streaming.api.datastream.DataStream;
5050
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5151

52+
import com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException;
5253
import org.assertj.core.api.Assertions;
5354
import org.junit.jupiter.api.AfterEach;
5455
import org.junit.jupiter.api.BeforeAll;
5556
import org.junit.jupiter.api.BeforeEach;
5657
import org.junit.jupiter.api.Disabled;
5758
import org.junit.jupiter.api.Test;
58-
import com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException;
5959

6060
import java.util.ArrayList;
6161
import java.util.Arrays;
@@ -414,13 +414,12 @@ public void testStarRocksDropTable() throws Exception {
414414
new DropTableEvent(tableId));
415415
runJobWithEvents(dropTableTestingEvents);
416416

417-
Assertions.assertThatThrownBy(
418-
() -> fetchTableContent(tableId, 3)
419-
).isExactlyInstanceOf(MySQLSyntaxErrorException.class)
420-
.hasMessage(
421-
String.format(
422-
"Getting analyzing error. Detail message: Unknown table '%s.%s'.",
423-
tableId.getSchemaName(), tableId.getTableName()));
417+
Assertions.assertThatThrownBy(() -> fetchTableContent(tableId, 3))
418+
.isExactlyInstanceOf(MySQLSyntaxErrorException.class)
419+
.hasMessageContaining(
420+
String.format(
421+
"Getting analyzing error. Detail message: Unknown table '%s.%s'.",
422+
tableId.getSchemaName(), tableId.getTableName()));
424423
}
425424

426425
private void runJobWithEvents(List<Event> events) throws Exception {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import static org.apache.flink.table.api.DataTypes.STRING;
7979
import static org.apache.flink.table.catalog.Column.physical;
8080
import static org.apache.flink.util.Preconditions.checkState;
81+
import static org.assertj.core.api.Assertions.assertThat;
8182

8283
/** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */
8384
@Timeout(value = 300, unit = TimeUnit.SECONDS)
@@ -687,16 +688,17 @@ void testTableWithChunkColumnOfNoPrimaryKey(String scanStartupMode) {
687688
}
688689
}
689690

690-
@Test
691-
public void testHeartBeat() throws Exception {
691+
@ParameterizedTest
692+
@ValueSource(strings = {"initial", "latest-offset"})
693+
void testHeartBeat(String scanStartupMode) throws Exception {
692694
try (PostgresConnection connection = getConnection()) {
693695
connection.execute("CREATE TABLE IF NOT EXISTS heart_beat_table(a int)");
694696
connection.commit();
695697
}
696698

697699
TableId tableId = new TableId(null, "public", "heart_beat_table");
698700
try (PostgresConnection connection = getConnection()) {
699-
assertEquals(0, getCountOfTable(connection, tableId));
701+
Assertions.assertThat(getCountOfTable(connection, tableId)).isZero();
700702
}
701703

702704
Map<String, String> options = new HashMap<>();
@@ -711,7 +713,7 @@ public void testHeartBeat() throws Exception {
711713
RestartStrategies.noRestart(),
712714
options);
713715
try (PostgresConnection connection = getConnection()) {
714-
assertTrue(getCountOfTable(connection, tableId) > 0);
716+
assertThat(getCountOfTable(connection, tableId)).isGreaterThan(0);
715717
}
716718
}
717719

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.junit.jupiter.api.Test;
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
34-
import org.testcontainers.containers.Container;
3534
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
3635
import org.testcontainers.lifecycle.Startables;
3736

@@ -47,12 +46,9 @@
4746
import java.util.Arrays;
4847
import java.util.Collections;
4948
import java.util.List;
49+
import java.util.stream.Collectors;
5050
import java.util.stream.Stream;
5151

52-
import static org.junit.Assert.assertThrows;
53-
import static org.junit.Assert.assertTrue;
54-
import static org.junit.Assert.fail;
55-
5652
/** End-to-end tests for mysql cdc to Doris pipeline job. */
5753
class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
5854
private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);
@@ -755,9 +751,7 @@ public void testSchemaEvolution() throws Exception {
755751

756752
stat.execute("DROP TABLE products;");
757753
Thread.sleep(5000L);
758-
SQLException thrown =
759-
assertThrows(
760-
SQLSyntaxErrorException.class,
754+
Assertions.assertThatThrownBy(
761755
() -> {
762756
try (Connection connection =
763757
DriverManager.getConnection(
@@ -767,10 +761,9 @@ public void testSchemaEvolution() throws Exception {
767761
Statement statement = connection.createStatement()) {
768762
statement.executeQuery("SELECT * FROM products;");
769763
}
770-
});
771-
assertTrue(
772-
thrown.getMessage()
773-
.contains("errCode = 2, detailMessage = Unknown table 'products'"));
764+
})
765+
.isExactlyInstanceOf(SQLSyntaxErrorException.class)
766+
.hasMessageContaining("errCode = 2, detailMessage = Unknown table 'products'");
774767
} catch (SQLException e) {
775768
throw new RuntimeException("Failed to trigger schema change.", e);
776769
}
@@ -872,7 +865,7 @@ private void waitAndVerify(
872865
}
873866
Thread.sleep(1000L);
874867
}
875-
fail(String.format("Failed to verify content of %s::%s.", databaseName, sql));
868+
Assertions.fail("Failed to verify content of {}::{}.", databaseName, sql);
876869
}
877870

878871
private List<String> fetchTableContent(String databaseName, String sql, int columnCount)

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,15 @@
3636
import org.apache.kafka.clients.consumer.ConsumerRecords;
3737
import org.apache.kafka.clients.consumer.KafkaConsumer;
3838
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
39-
import org.junit.After;
40-
import org.junit.Before;
41-
import org.junit.BeforeClass;
42-
import org.junit.ClassRule;
43-
import org.junit.Test;
44-
import org.junit.runner.RunWith;
45-
import org.junit.runners.Parameterized;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeAll;
41+
import org.junit.jupiter.api.BeforeEach;
42+
import org.junit.jupiter.api.Test;
4643
import org.slf4j.Logger;
4744
import org.slf4j.LoggerFactory;
4845
import org.testcontainers.containers.KafkaContainer;
4946
import org.testcontainers.containers.output.Slf4jLogConsumer;
47+
import org.testcontainers.junit.jupiter.Container;
5048
import org.testcontainers.lifecycle.Startables;
5149
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
5250

@@ -76,8 +74,7 @@
7674
import static org.assertj.core.api.Assertions.assertThat;
7775

7876
/** End-to-end tests for mysql cdc to Kafka pipeline job. */
79-
@RunWith(Parameterized.class)
80-
public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
77+
class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
8178
private static final Logger LOG = LoggerFactory.getLogger(MysqlToKafkaE2eITCase.class);
8279

8380
// ------------------------------------------------------------------------------------------
@@ -97,8 +94,8 @@ public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
9794
private String topic;
9895
private KafkaConsumer<byte[], byte[]> consumer;
9996

100-
@ClassRule
101-
public static final MySqlContainer MYSQL =
97+
@Container
98+
static final MySqlContainer MYSQL =
10299
(MySqlContainer)
103100
new MySqlContainer(
104101
MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
@@ -111,8 +108,8 @@ public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
111108
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
112109
.withLogConsumer(new Slf4jLogConsumer(LOG));
113110

114-
@ClassRule
115-
public static final KafkaContainer KAFKA_CONTAINER =
111+
@Container
112+
static final KafkaContainer KAFKA_CONTAINER =
116113
KafkaUtil.createKafkaContainer(KAFKA, LOG)
117114
.withNetworkAliases("kafka")
118115
.withEmbeddedZookeeper()
@@ -122,8 +119,8 @@ public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
122119
protected final UniqueDatabase mysqlInventoryDatabase =
123120
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
124121

125-
@BeforeClass
126-
public static void initializeContainers() {
122+
@BeforeAll
123+
static void initializeContainers() {
127124
LOG.info("Starting containers...");
128125
Startables.deepStart(Stream.of(MYSQL)).join();
129126
Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
@@ -135,7 +132,7 @@ public static void initializeContainers() {
135132
LOG.info("Containers are started.");
136133
}
137134

138-
@Before
135+
@BeforeEach
139136
public void before() throws Exception {
140137
super.before();
141138
createTestTopic(1, TOPIC_REPLICATION_FACTOR);
@@ -145,7 +142,7 @@ public void before() throws Exception {
145142
mysqlInventoryDatabase.createAndInitialize();
146143
}
147144

148-
@After
145+
@AfterEach
149146
public void after() {
150147
super.after();
151148
admin.deleteTopics(Collections.singletonList(topic));
@@ -154,7 +151,7 @@ public void after() {
154151
}
155152

156153
@Test
157-
public void testSyncWholeDatabaseWithDebeziumJson() throws Exception {
154+
void testSyncWholeDatabaseWithDebeziumJson() throws Exception {
158155
String pipelineJob =
159156
String.format(
160157
"source:\n"

0 commit comments

Comments
 (0)