Skip to content

Commit fb92918

Browse files
committed
Resolve conflicts
1 parent 6e22be2 commit fb92918

File tree

4 files changed

+89
-80
lines changed

4 files changed

+89
-80
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderAssignEndingFirstTest.java

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,21 @@
2828
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
2929

3030
import io.debezium.relational.TableId;
31+
import org.assertj.core.api.Assertions;
3132
import org.bson.BsonDocument;
32-
import org.junit.Before;
33-
import org.junit.Test;
34-
import org.junit.runner.RunWith;
35-
import org.junit.runners.Parameterized;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837

3938
import java.util.LinkedList;
40-
import java.util.stream.Stream;
4139

4240
import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.BSON_MAX_KEY;
4341
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
4442
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
45-
import static org.junit.Assert.assertTrue;
4643

4744
/** MongoDB snapshot split reader test case. */
48-
@RunWith(Parameterized.class)
49-
public class MongoDBSnapshotSplitReaderAssignEndingFirstTest extends MongoDBSourceTestBase {
45+
class MongoDBSnapshotSplitReaderAssignEndingFirstTest extends MongoDBSourceTestBase {
5046

5147
private static final Logger LOG =
5248
LoggerFactory.getLogger(MongoDBSnapshotSplitReaderAssignEndingFirstTest.class);
@@ -57,22 +53,13 @@ public class MongoDBSnapshotSplitReaderAssignEndingFirstTest extends MongoDBSour
5753

5854
private SplitContext splitContext;
5955

60-
public MongoDBSnapshotSplitReaderAssignEndingFirstTest(String mongoVersion) {
61-
super(mongoVersion);
62-
}
63-
64-
@Parameterized.Parameters(name = "mongoVersion: {0}")
65-
public static Object[] parameters() {
66-
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
67-
}
68-
69-
@Before
70-
public void before() {
71-
database = mongoContainer.executeCommandFileInSeparateDatabase("chunk_test");
56+
@BeforeEach
57+
void before() {
58+
database = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
7259

7360
MongoDBSourceConfigFactory configFactory =
7461
new MongoDBSourceConfigFactory()
75-
.hosts(mongoContainer.getHostAndPort())
62+
.hosts(MONGO_CONTAINER.getHostAndPort())
7663
.databaseList(database)
7764
.collectionList(database + ".shopping_cart")
7865
.username(FLINK_USER)
@@ -88,27 +75,29 @@ public void before() {
8875
}
8976

9077
@Test
91-
public void testMongoDBSnapshotSplitReaderWithSplitVectorSplitter() throws Exception {
78+
void testMongoDBSnapshotSplitReaderWithSplitVectorSplitter() throws Exception {
9279
testMongoDBSnapshotSplitReader(SplitVectorSplitStrategy.INSTANCE);
9380
}
9481

9582
@Test
96-
public void testMongoDBSnapshotSplitReaderWithSamplerSplitter() throws Exception {
83+
void testMongoDBSnapshotSplitReaderWithSamplerSplitter() throws Exception {
9784
testMongoDBSnapshotSplitReader(SampleBucketSplitStrategy.INSTANCE);
9885
}
9986

10087
@Test
101-
public void testMongoDBSnapshotSplitReaderWithSingleSplitter() throws Exception {
88+
void testMongoDBSnapshotSplitReaderWithSingleSplitter() throws Exception {
10289
testMongoDBSnapshotSplitReader(SingleSplitStrategy.INSTANCE);
10390
}
10491

105-
private void testMongoDBSnapshotSplitReader(SplitStrategy splitter) throws Exception {
92+
private void testMongoDBSnapshotSplitReader(SplitStrategy splitter) {
10693
LinkedList<SnapshotSplit> snapshotSplits = new LinkedList<>(splitter.split(splitContext));
107-
assertTrue(snapshotSplits.size() > 0);
108-
assertTrue(snapshotSplits.get(0).getSplitEnd()[1] instanceof BsonDocument);
109-
assertTrue(
110-
((BsonDocument) snapshotSplits.get(0).getSplitEnd()[1])
111-
.get("_id")
112-
.equals(BSON_MAX_KEY));
94+
Assertions.assertThat(snapshotSplits)
95+
.isNotEmpty()
96+
.first()
97+
.extracting(SnapshotSplit::getSplitEnd)
98+
.extracting(o -> o[1])
99+
.isInstanceOf(BsonDocument.class)
100+
.extracting(o -> ((BsonDocument) o).get("_id"))
101+
.isEqualTo(BSON_MAX_KEY);
113102
}
114103
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java

Lines changed: 64 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -175,21 +175,22 @@ public static Stream<Arguments> parameters() {
175175

176176
@ParameterizedTest
177177
@MethodSource("parameters")
178-
void testReadSingleTableWithSingleParallelism(String tableName, String chunkColumnName)
179-
throws Exception {
178+
void testReadSingleTableWithSingleParallelism(
179+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
180180
testMySqlParallelSource(
181181
1,
182182
FailoverType.NONE,
183183
FailoverPhase.NEVER,
184184
new String[] {tableName},
185185
tableName,
186-
chunkColumnName);
186+
chunkColumnName,
187+
assignEndingFirst);
187188
}
188189

189190
@ParameterizedTest
190191
@MethodSource("parameters")
191192
void testReadSingleTableWithSingleParallelismAndSkipBackFill(
192-
String tableName, String chunkColumnName) throws Exception {
193+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
193194
testMySqlParallelSource(
194195
1,
195196
DEFAULT_SCAN_STARTUP_MODE,
@@ -199,77 +200,83 @@ void testReadSingleTableWithSingleParallelismAndSkipBackFill(
199200
RestartStrategies.fixedDelayRestart(1, 0),
200201
true,
201202
tableName,
202-
chunkColumnName);
203+
chunkColumnName,
204+
assignEndingFirst);
203205
}
204206

205207
@ParameterizedTest
206208
@MethodSource("parameters")
207-
void testReadSingleTableWithMultipleParallelism(String tableName, String chunkColumnName)
208-
throws Exception {
209+
void testReadSingleTableWithMultipleParallelism(
210+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
209211
testMySqlParallelSource(
210212
4,
211213
FailoverType.NONE,
212214
FailoverPhase.NEVER,
213215
new String[] {tableName},
214216
tableName,
215-
chunkColumnName);
217+
chunkColumnName,
218+
assignEndingFirst);
216219
}
217220

218221
@ParameterizedTest
219222
@MethodSource("parameters")
220-
void testReadMultipleTableWithSingleParallelism(String tableName, String chunkColumnName)
221-
throws Exception {
223+
void testReadMultipleTableWithSingleParallelism(
224+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
222225
testMySqlParallelSource(
223226
1,
224227
FailoverType.NONE,
225228
FailoverPhase.NEVER,
226229
new String[] {tableName, "customers_1"},
227230
tableName,
228-
chunkColumnName);
231+
chunkColumnName,
232+
assignEndingFirst);
229233
}
230234

231235
@ParameterizedTest
232236
@MethodSource("parameters")
233-
void testReadMultipleTableWithMultipleParallelism(String tableName, String chunkColumnName)
234-
throws Exception {
237+
void testReadMultipleTableWithMultipleParallelism(
238+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
235239
testMySqlParallelSource(
236240
4,
237241
FailoverType.NONE,
238242
FailoverPhase.NEVER,
239243
new String[] {tableName, "customers_1"},
240244
tableName,
241-
chunkColumnName);
245+
chunkColumnName,
246+
assignEndingFirst);
242247
}
243248

244249
// Failover tests
245250
@ParameterizedTest
246251
@MethodSource("parameters")
247-
void testTaskManagerFailoverInSnapshotPhase(String tableName, String chunkColumnName)
248-
throws Exception {
252+
void testTaskManagerFailoverInSnapshotPhase(
253+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
249254
testMySqlParallelSource(
250255
FailoverType.TM,
251256
FailoverPhase.SNAPSHOT,
252257
new String[] {tableName, "customers_1"},
253258
tableName,
254-
chunkColumnName);
259+
chunkColumnName,
260+
assignEndingFirst);
255261
}
256262

257263
@ParameterizedTest
258264
@MethodSource("parameters")
259-
void testTaskManagerFailoverInBinlogPhase(String tableName, String chunkColumnName)
260-
throws Exception {
265+
void testTaskManagerFailoverInBinlogPhase(
266+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
261267
testMySqlParallelSource(
262268
FailoverType.TM,
263269
FailoverPhase.BINLOG,
264270
new String[] {tableName, "customers_1"},
265271
tableName,
266-
chunkColumnName);
272+
chunkColumnName,
273+
assignEndingFirst);
267274
}
268275

269276
@ParameterizedTest
270277
@MethodSource("parameters")
271-
void testTaskManagerFailoverFromLatestOffset(String tableName, String chunkColumnName)
272-
throws Exception {
278+
void testTaskManagerFailoverFromLatestOffset(
279+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
273280
testMySqlParallelSource(
274281
DEFAULT_PARALLELISM,
275282
"latest-offset",
@@ -278,37 +285,40 @@ void testTaskManagerFailoverFromLatestOffset(String tableName, String chunkColum
278285
new String[] {tableName, "customers_1"},
279286
RestartStrategies.fixedDelayRestart(1, 0),
280287
tableName,
281-
chunkColumnName);
288+
chunkColumnName,
289+
assignEndingFirst);
282290
}
283291

284292
@ParameterizedTest
285293
@MethodSource("parameters")
286-
void testJobManagerFailoverInSnapshotPhase(String tableName, String chunkColumnName)
287-
throws Exception {
294+
void testJobManagerFailoverInSnapshotPhase(
295+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
288296
testMySqlParallelSource(
289297
FailoverType.JM,
290298
FailoverPhase.SNAPSHOT,
291299
new String[] {tableName, "customers_1"},
292300
tableName,
293-
chunkColumnName);
301+
chunkColumnName,
302+
assignEndingFirst);
294303
}
295304

296305
@ParameterizedTest
297306
@MethodSource("parameters")
298-
void testJobManagerFailoverInBinlogPhase(String tableName, String chunkColumnName)
299-
throws Exception {
307+
void testJobManagerFailoverInBinlogPhase(
308+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
300309
testMySqlParallelSource(
301310
FailoverType.JM,
302311
FailoverPhase.BINLOG,
303312
new String[] {tableName, "customers_1"},
304313
tableName,
305-
chunkColumnName);
314+
chunkColumnName,
315+
assignEndingFirst);
306316
}
307317

308318
@ParameterizedTest
309319
@MethodSource("parameters")
310-
void testJobManagerFailoverFromLatestOffset(String tableName, String chunkColumnName)
311-
throws Exception {
320+
void testJobManagerFailoverFromLatestOffset(
321+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
312322
testMySqlParallelSource(
313323
DEFAULT_PARALLELISM,
314324
"latest-offset",
@@ -317,33 +327,36 @@ void testJobManagerFailoverFromLatestOffset(String tableName, String chunkColumn
317327
new String[] {tableName, "customers_1"},
318328
RestartStrategies.fixedDelayRestart(1, 0),
319329
tableName,
320-
chunkColumnName);
330+
chunkColumnName,
331+
assignEndingFirst);
321332
}
322333

323334
@ParameterizedTest
324335
@MethodSource("parameters")
325-
void testTaskManagerFailoverSingleParallelism(String tableName, String chunkColumnName)
326-
throws Exception {
336+
void testTaskManagerFailoverSingleParallelism(
337+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
327338
testMySqlParallelSource(
328339
1,
329340
FailoverType.TM,
330341
FailoverPhase.SNAPSHOT,
331342
new String[] {tableName},
332343
tableName,
333-
chunkColumnName);
344+
chunkColumnName,
345+
assignEndingFirst);
334346
}
335347

336348
@ParameterizedTest
337349
@MethodSource("parameters")
338-
void testJobManagerFailoverSingleParallelism(String tableName, String chunkColumnName)
339-
throws Exception {
350+
void testJobManagerFailoverSingleParallelism(
351+
String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
340352
testMySqlParallelSource(
341353
1,
342354
FailoverType.JM,
343355
FailoverPhase.SNAPSHOT,
344356
new String[] {tableName},
345357
tableName,
346-
chunkColumnName);
358+
chunkColumnName,
359+
assignEndingFirst);
347360
}
348361

349362
@ParameterizedTest
@@ -681,6 +694,7 @@ void testSourceMetrics(String tableName, String chunkColumnName) throws Exceptio
681694
.password(customDatabase.getPassword())
682695
.deserializer(new StringDebeziumDeserializationSchema())
683696
.serverId(getServerId())
697+
.serverTimeZone("UTC")
684698
.build();
685699
DataStreamSource<String> stream =
686700
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
@@ -1062,15 +1076,17 @@ private void testMySqlParallelSource(
10621076
FailoverPhase failoverPhase,
10631077
String[] captureCustomerTables,
10641078
String tableName,
1065-
String chunkColumnName)
1079+
String chunkColumnName,
1080+
String assignEndingFirst)
10661081
throws Exception {
10671082
testMySqlParallelSource(
10681083
DEFAULT_PARALLELISM,
10691084
failoverType,
10701085
failoverPhase,
10711086
captureCustomerTables,
10721087
tableName,
1073-
chunkColumnName);
1088+
chunkColumnName,
1089+
assignEndingFirst);
10741090
}
10751091

10761092
private void testMySqlParallelSource(
@@ -1079,7 +1095,8 @@ private void testMySqlParallelSource(
10791095
FailoverPhase failoverPhase,
10801096
String[] captureCustomerTables,
10811097
String tableName,
1082-
String chunkColumnName)
1098+
String chunkColumnName,
1099+
String assignEndingFirst)
10831100
throws Exception {
10841101
testMySqlParallelSource(
10851102
parallelism,
@@ -1089,7 +1106,8 @@ private void testMySqlParallelSource(
10891106
captureCustomerTables,
10901107
RestartStrategies.fixedDelayRestart(1, 0),
10911108
tableName,
1092-
chunkColumnName);
1109+
chunkColumnName,
1110+
assignEndingFirst);
10931111
}
10941112

10951113
private void testMySqlParallelSource(
@@ -1100,7 +1118,8 @@ private void testMySqlParallelSource(
11001118
String[] captureCustomerTables,
11011119
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
11021120
String tableName,
1103-
String chunkColumnName)
1121+
String chunkColumnName,
1122+
String assignEndingFirst)
11041123
throws Exception {
11051124
testMySqlParallelSource(
11061125
parallelism,
@@ -1111,7 +1130,8 @@ private void testMySqlParallelSource(
11111130
restartStrategyConfiguration,
11121131
false,
11131132
tableName,
1114-
chunkColumnName);
1133+
chunkColumnName,
1134+
assignEndingFirst);
11151135
}
11161136

11171137
private void testMySqlParallelSource(

0 commit comments

Comments
 (0)