
Commit 90b27cf

[FLINK-38618][base] Fix stream split reassignment causing an offset error when the startup mode is latest-offset and a TaskManager fails over
Parent: 6a18cd7

2 files changed: +135 −10 lines

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java

Lines changed: 5 additions & 3 deletions
@@ -129,9 +129,11 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
 
     @Override
     public void addSplits(Collection<SourceSplitBase> splits) {
-        // we don't store the split, but will re-create stream split later
-        isStreamSplitAssigned = false;
-        enumeratorMetrics.exitStreamReading();
+        if (!CollectionUtil.isNullOrEmpty(splits)) {
+            // we don't store the split, but will re-create stream split later
+            isStreamSplitAssigned = false;
+            enumeratorMetrics.exitStreamReading();
+        }
     }
 
     @Override
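
The guard above is the whole fix: the old code reset isStreamSplitAssigned unconditionally, so an addSplits call that carries no splits (which can happen while a TaskManager recovers) also cleared the flag, and per the commit title the stream split was then re-created at the wrong offset under the latest-offset startup mode. A minimal, self-contained sketch of the guarded behavior, using hypothetical class and field names rather than the real StreamSplitAssigner API:

import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for the assigner state; it only illustrates the
// empty-collection guard from this commit, not the real StreamSplitAssigner.
class GuardedAssignerSketch {
    boolean isStreamSplitAssigned = true; // the stream split was handed out before the failover

    void addSplits(Collection<String> splits) {
        // Only treat the call as a genuine "split returned" event if it actually contains splits.
        if (!(splits == null || splits.isEmpty())) {
            // we don't store the split, but will re-create the stream split later
            isStreamSplitAssigned = false;
        }
    }

    public static void main(String[] args) {
        GuardedAssignerSketch assigner = new GuardedAssignerSketch();
        // An empty reassignment must not reset the flag; otherwise the enumerator
        // would rebuild the stream split from the current latest offset and skip events.
        assigner.addSplits(Collections.emptyList());
        System.out.println("still assigned: " + assigner.isStreamSplitAssigned); // prints true
    }
}

The actual change expresses the same check with CollectionUtil.isNullOrEmpty(splits), as shown in the hunk above.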

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java

Lines changed: 130 additions & 7 deletions
@@ -48,15 +48,22 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.catalog.Column.physical;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
 
 /** IT tests for {@link SqlServerSourceBuilder.SqlServerIncrementalSource}. */
@@ -67,6 +74,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
 
     private static final int USE_POST_LOWWATERMARK_HOOK = 1;
     private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
+    private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
 
     @Test
     void testReadSingleTableWithSingleParallelism() throws Exception {
@@ -111,6 +119,26 @@ void testJobManagerFailoverSingleParallelism() throws Exception {
                 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"dbo.customers"});
     }
 
+    @Test
+    public void testJobManagerFailoverFromLatestOffset() throws Exception {
+        testSqlServerParallelSource(
+                DEFAULT_PARALLELISM,
+                "latest-offset",
+                FailoverType.JM,
+                FailoverPhase.STREAM,
+                new String[] {"dbo.customers"});
+    }
+
+    @Test
+    public void testTaskManagerFailoverFromLatestOffset() throws Exception {
+        testSqlServerParallelSource(
+                DEFAULT_PARALLELISM,
+                "latest-offset",
+                FailoverType.TM,
+                FailoverPhase.STREAM,
+                new String[] {"dbo.customers"});
+    }
+
     @Test
     void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
         testSqlServerParallelSource(
@@ -375,12 +403,14 @@ private void testSqlServerParallelSource(
 
     private void testSqlServerParallelSource(
             int parallelism,
+            String scanStartupMode,
             FailoverType failoverType,
             FailoverPhase failoverPhase,
             String[] captureCustomerTables)
             throws Exception {
         testSqlServerParallelSource(
                 parallelism,
+                scanStartupMode,
                 failoverType,
                 failoverPhase,
                 captureCustomerTables,
@@ -393,6 +423,43 @@ private void testSqlServerParallelSource(
             int parallelism,
             FailoverType failoverType,
             FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        testSqlServerParallelSource(
+                parallelism,
+                failoverType,
+                failoverPhase,
+                captureCustomerTables,
+                false,
+                RestartStrategies.fixedDelayRestart(1, 0),
+                null);
+    }
+
+    private void testSqlServerParallelSource(
+            int parallelism,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables,
+            boolean skipSnapshotBackfill,
+            RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
+            String chunkColumn)
+            throws Exception {
+        testSqlServerParallelSource(
+                parallelism,
+                DEFAULT_SCAN_STARTUP_MODE,
+                failoverType,
+                failoverPhase,
+                captureCustomerTables,
+                skipSnapshotBackfill,
+                restartStrategyConfiguration,
+                chunkColumn);
+    }
+
+    private void testSqlServerParallelSource(
+            int parallelism,
+            String scanStartupMode,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
             String[] captureCustomerTables,
             boolean skipSnapshotBackfill,
             RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
@@ -418,6 +485,7 @@ private void testSqlServerParallelSource(
                         + " phone_number STRING,"
                         + " primary key (id) not enforced"
                         + ") WITH ("
+                        + " 'scan.startup.mode' = '%s',"
                        + " 'connector' = 'sqlserver-cdc',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
@@ -430,6 +498,7 @@ private void testSqlServerParallelSource(
                        + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
                        + "%s"
                        + ")",
+                scanStartupMode,
                 MSSQL_SERVER_CONTAINER.getHost(),
                 MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
                 MSSQL_SERVER_CONTAINER.getUsername(),
@@ -442,8 +511,26 @@ private void testSqlServerParallelSource(
                        : ",'scan.incremental.snapshot.chunk.key-column'='"
                                + chunkColumn
                                + "'");
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from customers");
 
         // first step: check the snapshot data
+        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+            checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
+        }
+
+        // second step: check the binlog data
+        checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);
+
+        tableResult.getJobClient().get().cancel().get();
+    }
+
+    private void checkSnapshotData(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
         String[] snapshotForSingleTable =
                 new String[] {
                     "+I[101, user_1, Shanghai, 123567891234]",
@@ -468,15 +555,15 @@ private void testSqlServerParallelSource(
                     "+I[1019, user_20, Shanghai, 123567891234]",
                     "+I[2000, user_21, Shanghai, 123567891234]"
                 };
-        tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("select * from customers");
-        CloseableIterator<Row> iterator = tableResult.collect();
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+
         List<String> expectedSnapshotData = new ArrayList<>();
         for (int i = 0; i < captureCustomerTables.length; i++) {
            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
         }
 
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
         // trigger failover after some snapshot splits read finished
         if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
             triggerFailover(
@@ -486,20 +573,35 @@ private void testSqlServerParallelSource(
                     () -> sleepMs(100));
         }
 
-        LOG.info("snapshot data start");
         assertEqualsInAnyOrder(
                 expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+    }
+
+    private void checkBinlogData(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        String databaseName = "customer";
+        waitUntilJobRunning(tableResult);
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
 
-        // second step: check the change stream data
         for (String tableId : captureCustomerTables) {
             makeFirstPartChangeStreamEvents(databaseName + "." + tableId);
         }
+
+        // wait for the binlog reading
+        Thread.sleep(2000L);
+
         if (failoverPhase == FailoverPhase.STREAM) {
             triggerFailover(
                     failoverType,
                     jobId,
                     miniClusterResource.get().getMiniCluster(),
                     () -> sleepMs(200));
+            waitUntilJobRunning(tableResult);
         }
         for (String tableId : captureCustomerTables) {
             makeSecondPartBinlogEvents(databaseName + "." + tableId);
@@ -524,7 +626,28 @@ private void testSqlServerParallelSource(
             expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
         }
         assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
-        tableResult.getJobClient().get().cancel().get();
+        assertFalse(hasNextData(iterator));
+    }
+
+    private void waitUntilJobRunning(TableResult tableResult)
+            throws InterruptedException, ExecutionException {
+        do {
+            Thread.sleep(5000L);
+        } while (tableResult.getJobClient().get().getJobStatus().get() != RUNNING);
+    }
+
+    private boolean hasNextData(final CloseableIterator<?> iterator)
+            throws InterruptedException, ExecutionException {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            FutureTask<Boolean> future = new FutureTask(iterator::hasNext);
+            executor.execute(future);
+            return future.get(3, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            return false;
+        } finally {
+            executor.shutdown();
+        }
     }
 
     private void makeFirstPartChangeStreamEvents(String tableId) {
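
Two helpers close out the new test path: waitUntilJobRunning polls the job status so change events are only produced once the (restarted) job is consuming again, and hasNextData bounds CloseableIterator#hasNext, which would otherwise block indefinitely on an unbounded changelog, so the test can assert that no extra rows arrive after the expected binlog data. Below is a standalone sketch of that timeout-bounded probe against a plain java.util.Iterator, with hypothetical names and independent of the Flink test harness:

import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: run the potentially blocking hasNext() on a separate thread and
// interpret "no answer within the timeout" as "no pending data".
final class BoundedHasNextSketch {

    static boolean hasNextWithin(Iterator<?> iterator, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Boolean> probe = new FutureTask<>(iterator::hasNext);
            executor.execute(probe);
            return probe.get(timeout, unit);
        } catch (TimeoutException e) {
            return false; // hasNext() is still blocking, so treat the stream as drained
        } finally {
            executor.shutdownNow(); // do not leave the blocked probe thread alive
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for an unbounded changelog iterator whose hasNext() never returns.
        Iterator<Object> blocking =
                new Iterator<Object>() {
                    @Override
                    public boolean hasNext() {
                        try {
                            Thread.sleep(Long.MAX_VALUE);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return false;
                    }

                    @Override
                    public Object next() {
                        throw new UnsupportedOperationException();
                    }
                };
        System.out.println(hasNextWithin(blocking, 1, TimeUnit.SECONDS)); // prints false
    }
}

The test's hasNextData follows the same pattern with a fixed 3-second timeout and a plain shutdown() on the single-thread executor.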
