diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java index acefcf08567..a9a8c34db0d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java @@ -40,6 +40,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; @@ -126,7 +127,9 @@ public void addSplitsBack(List splits, int subtaskId) { LOG.info("The enumerator adds add stream split back: {}", streamSplit); this.streamSplitTaskId = null; } - splitAssigner.addSplits(splits); + if (!CollectionUtil.isNullOrEmpty(splits)) { + splitAssigner.addSplits(splits); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java index aa35e47a6d9..b8d1f54fc11 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java @@ -25,7 +25,6 @@ import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; -import org.apache.flink.util.CollectionUtil; import java.io.IOException; import java.util.ArrayList; @@ -90,10 +89,8 @@ public void onFinishedSplits(Map splitFinishedOffsets) { @Override public void addSplits(Collection splits) { - if (!CollectionUtil.isNullOrEmpty(splits)) { - // we don't store the split, but will re-create binlog split later - isBinlogSplitAssigned = false; - } + // we don't store the split, but will re-create binlog split later + isBinlogSplitAssigned = false; } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index bd2c8886405..0451db76aa9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -40,6 +40,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; @@ -126,7 +127,9 @@ public void addSplitsBack(List splits, int subtaskId) { LOG.info("The enumerator adds add binlog split back: {}", binlogSplit); this.binlogSplitTaskId = null; } - splitAssigner.addSplits(splits); + if (!CollectionUtil.isNullOrEmpty(splits)) { + splitAssigner.addSplits(splits); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java index d92ce7cc9a5..099878e83ea 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java @@ -37,6 +37,7 @@ import io.debezium.jdbc.JdbcConnection; import org.apache.commons.lang3.StringUtils; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -48,11 +49,17 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; +import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.catalog.Column.physical; @@ -67,6 +74,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase { private static final int USE_POST_LOWWATERMARK_HOOK = 1; private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; + private static final String DEFAULT_SCAN_STARTUP_MODE = "initial"; @Test void testReadSingleTableWithSingleParallelism() throws Exception { @@ -111,6 +119,26 @@ void testJobManagerFailoverSingleParallelism() throws Exception { 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"dbo.customers"}); } + @Test + public void testJobManagerFailoverFromLatestOffset() throws Exception { + testSqlServerParallelSource( + DEFAULT_PARALLELISM, + "latest-offset", + FailoverType.JM, + FailoverPhase.STREAM, + new String[] {"dbo.customers"}); + } + + @Test + public void testTaskManagerFailoverFromLatestOffset() throws Exception { + testSqlServerParallelSource( + DEFAULT_PARALLELISM, + "latest-offset", + FailoverType.TM, + FailoverPhase.STREAM, + new String[] {"dbo.customers"}); + } + @Test void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception { testSqlServerParallelSource( @@ -375,12 +403,14 @@ private void testSqlServerParallelSource( private void testSqlServerParallelSource( int parallelism, + String scanStartupMode, FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception { testSqlServerParallelSource( parallelism, + scanStartupMode, failoverType, failoverPhase, captureCustomerTables, @@ -393,6 +423,43 @@ private void testSqlServerParallelSource( int parallelism, FailoverType failoverType, FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { + testSqlServerParallelSource( + parallelism, + failoverType, + failoverPhase, + captureCustomerTables, + false, + RestartStrategies.fixedDelayRestart(1, 0), + null); + } + + private void testSqlServerParallelSource( + int parallelism, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables, + boolean skipSnapshotBackfill, + RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, + String chunkColumn) + throws Exception { + testSqlServerParallelSource( + parallelism, + DEFAULT_SCAN_STARTUP_MODE, + failoverType, + failoverPhase, + captureCustomerTables, + skipSnapshotBackfill, + restartStrategyConfiguration, + chunkColumn); + } + + private void testSqlServerParallelSource( + int parallelism, + String scanStartupMode, + FailoverType failoverType, + FailoverPhase failoverPhase, String[] captureCustomerTables, boolean skipSnapshotBackfill, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, @@ -418,6 +485,7 @@ private void testSqlServerParallelSource( + " phone_number STRING," + " primary key (id) not enforced" + ") WITH (" + + " 'scan.startup.mode' = '%s'," + " 'connector' = 'sqlserver-cdc'," + " 'hostname' = '%s'," + " 'port' = '%s'," @@ -430,6 +498,7 @@ private void testSqlServerParallelSource( + " 'scan.incremental.snapshot.backfill.skip' = '%s'" + "%s" + ")", + scanStartupMode, MSSQL_SERVER_CONTAINER.getHost(), MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT), MSSQL_SERVER_CONTAINER.getUsername(), @@ -442,8 +511,26 @@ private void testSqlServerParallelSource( : ",'scan.incremental.snapshot.chunk.key-column'='" + chunkColumn + "'"); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); // first step: check the snapshot data + if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { + checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables); + } + + // second step: check the binlog data + checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables); + + tableResult.getJobClient().get().cancel().get(); + } + + private void checkSnapshotData( + TableResult tableResult, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { String[] snapshotForSingleTable = new String[] { "+I[101, user_1, Shanghai, 123567891234]", @@ -468,15 +555,15 @@ private void testSqlServerParallelSource( "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]" }; - tEnv.executeSql(sourceDDL); - TableResult tableResult = tEnv.executeSql("select * from customers"); - CloseableIterator iterator = tableResult.collect(); - JobID jobId = tableResult.getJobClient().get().getJobID(); + List expectedSnapshotData = new ArrayList<>(); for (int i = 0; i < captureCustomerTables.length; i++) { expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); } + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + // trigger failover after some snapshot splits read finished if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) { triggerFailover( @@ -486,20 +573,35 @@ private void testSqlServerParallelSource( () -> sleepMs(100)); } - LOG.info("snapshot data start"); assertEqualsInAnyOrder( expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + } + + private void checkBinlogData( + TableResult tableResult, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { + String databaseName = "customer"; + waitUntilJobRunning(tableResult); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); - // second step: check the change stream data for (String tableId : captureCustomerTables) { makeFirstPartChangeStreamEvents(databaseName + "." + tableId); } + + // wait for the binlog reading + Thread.sleep(2000L); + if (failoverPhase == FailoverPhase.STREAM) { triggerFailover( failoverType, jobId, miniClusterResource.get().getMiniCluster(), () -> sleepMs(200)); + waitUntilJobRunning(tableResult); } for (String tableId : captureCustomerTables) { makeSecondPartBinlogEvents(databaseName + "." + tableId); @@ -524,7 +626,28 @@ private void testSqlServerParallelSource( expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable)); } assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size())); - tableResult.getJobClient().get().cancel().get(); + Assertions.assertThat(hasNextData(iterator)).isFalse(); + } + + private void waitUntilJobRunning(TableResult tableResult) + throws InterruptedException, ExecutionException { + do { + Thread.sleep(5000L); + } while (tableResult.getJobClient().get().getJobStatus().get() != RUNNING); + } + + private boolean hasNextData(final CloseableIterator iterator) + throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + FutureTask future = new FutureTask(iterator::hasNext); + executor.execute(future); + return future.get(3, TimeUnit.SECONDS); + } catch (TimeoutException e) { + return false; + } finally { + executor.shutdown(); + } } private void makeFirstPartChangeStreamEvents(String tableId) {