Skip to content

Commit 1e015c2

Browse files
author
bronzehuang
committed
[FLINK-38531][cdc-connector-mysql]Fix data loss when restoring from a checkpoint positioned in the middle of a bulk DML operation.
1 parent 7c7a74d commit 1e015c2

File tree

2 files changed

+117
-22
lines changed

2 files changed

+117
-22
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -186,33 +186,38 @@ public int compareTo(BinlogOffset that) {
186186
String targetGtidSetStr = that.getGtidSet();
187187
if (StringUtils.isNotEmpty(targetGtidSetStr)) {
188188
// The target offset uses GTIDs, so we ideally compare using GTIDs ...
189-
if (StringUtils.isNotEmpty(gtidSetStr)) {
190-
// Both have GTIDs, so base the comparison entirely on the GTID sets.
191-
GtidSet gtidSet = new GtidSet(gtidSetStr);
192-
GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
193-
if (gtidSet.equals(targetGtidSet)) {
194-
long restartSkipEvents = this.getRestartSkipEvents();
195-
long targetRestartSkipEvents = that.getRestartSkipEvents();
196-
return Long.compare(restartSkipEvents, targetRestartSkipEvents);
197-
}
189+
if (StringUtils.isEmpty(gtidSetStr)) {
190+
// The target offset did use GTIDs while this did not use GTIDs. So, we assume
191+
// that this offset is older since GTIDs are often enabled but rarely disabled.
192+
// And if they are disabled,
193+
// it is likely that this offset would not include GTIDs as we would be trying
194+
// to read the binlog of a
195+
// server that no longer has GTIDs. And if they are enabled, disabled, and
196+
// re-enabled, per
197+
// https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html
198+
// all properly configured slaves that
199+
// use GTIDs should always have the complete set of GTIDs copied from the master, in
200+
// which case
201+
// again we know that this offset not having GTIDs is before the target offset ...
202+
return -1;
203+
}
204+
// Both have GTIDs, so base the comparison entirely on the GTID sets.
205+
GtidSet gtidSet = new GtidSet(gtidSetStr);
206+
GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
207+
if (!gtidSet.equals(targetGtidSet)) {
198208
// The GTIDs are not an exact match, so figure out if this is a subset of the target
199209
// offset
200210
// ...
201211
return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
202212
}
203-
// The target offset did use GTIDs while this did not use GTIDs. So, we assume
204-
// that this offset is older since GTIDs are often enabled but rarely disabled.
205-
// And if they are disabled,
206-
// it is likely that this offset would not include GTIDs as we would be trying
207-
// to read the binlog of a
208-
// server that no longer has GTIDs. And if they are enabled, disabled, and re-enabled,
209-
// per
210-
// https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html all properly
211-
// configured slaves that
212-
// use GTIDs should always have the complete set of GTIDs copied from the master, in
213-
// which case
214-
// again we know that this offset not having GTIDs is before the target offset ...
215-
return -1;
213+
long restartSkipEvents = this.getRestartSkipEvents();
214+
long targetRestartSkipEvents = that.getRestartSkipEvents();
215+
// The GTID sets are the same, so compare the completed events in the transaction ...
216+
if (restartSkipEvents != targetRestartSkipEvents) {
217+
return Long.compare(restartSkipEvents, targetRestartSkipEvents);
218+
}
219+
// The completed events are the same, so compare the number of rows to skip ...
220+
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
216221
} else if (StringUtils.isNotEmpty(gtidSetStr)) {
217222
// This offset has a GTID but the target offset does not, so per the previous paragraph
218223
// we

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.junit.jupiter.api.AfterEach;
6767
import org.junit.jupiter.api.BeforeAll;
6868
import org.junit.jupiter.api.Test;
69+
import org.junit.jupiter.api.Timeout;
6970
import org.testcontainers.lifecycle.Startables;
7071

7172
import java.sql.Connection;
@@ -80,6 +81,7 @@
8081
import java.util.Map;
8182
import java.util.Optional;
8283
import java.util.Properties;
84+
import java.util.concurrent.TimeUnit;
8385
import java.util.function.Predicate;
8486
import java.util.stream.Collectors;
8587
import java.util.stream.Stream;
@@ -786,6 +788,94 @@ void testReadBinlogFromGtidSet() throws Exception {
786788
assertEqualsInOrder(Arrays.asList(expected), actual);
787789
}
788790

791+
/**
792+
* In a bad case, the reader skips the remaining records, which causes an infinite wait for data. So
793+
* this test has a timeout to avoid hanging forever.
794+
*/
795+
@Test
796+
@Timeout(value = 600, unit = TimeUnit.SECONDS)
797+
void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Exception {
798+
// Preparations
799+
customerDatabase.createAndInitialize();
800+
MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
801+
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
802+
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
803+
DataType dataType =
804+
DataTypes.ROW(
805+
DataTypes.FIELD("id", DataTypes.BIGINT()),
806+
DataTypes.FIELD("name", DataTypes.STRING()),
807+
DataTypes.FIELD("address", DataTypes.STRING()),
808+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
809+
810+
// Capture the current binlog offset, and we will start the reader from here
811+
BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection);
812+
813+
// In this case, the binlog is:
814+
// Event 0: QUERY,BEGIN
815+
// Event 1: TABLE_MAP
816+
// Event 2: Update id = 101 and id = 102
817+
// ROW 1 : Update id=101
818+
// ROW 2 : Update id=102
819+
// Event 3: TABLE_MAP
820+
// Event 4: Update id = 103 and id = 109
821+
// ROW 1 : Update id=103
822+
// ROW 2 : Update id=109
823+
824+
// When a checkpoint is triggered
825+
// after id=103 but before id=109,
826+
// the position restored from checkpoint will be event=4 and row=1
827+
BinlogOffset checkpointOffset =
828+
BinlogOffset.builder()
829+
.setBinlogFilePosition("", 0)
830+
.setGtidSet(startingOffset.getGtidSet())
831+
// Because the position restored from checkpoint
832+
// will skip 4 events to drop the first update:
833+
// QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP
834+
.setSkipEvents(4)
835+
// The position restored from checkpoint
836+
// will skip 1 row to drop the first row (id=103)
837+
.setSkipRows(1)
838+
.build();
839+
840+
// Create a new config to start reading from the offset captured above
841+
MySqlSourceConfig sourceConfig =
842+
getConfig(
843+
StartupOptions.specificOffset(checkpointOffset),
844+
new String[] {"customers"});
845+
846+
// Create reader and submit splits
847+
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
848+
BinlogSplitReader reader = createBinlogReader(sourceConfig);
849+
reader.submitSplit(split);
850+
851+
// Create some binlog events:
852+
// Event 0: QUERY,BEGIN
853+
// Event 1: TABLE_MAP
854+
// Event 2: Update id = 101 and id = 102
855+
// ROW 1 : Update id=101
856+
// ROW 2 : Update id=102
857+
// Event 3: TABLE_MAP
858+
// Event 4: Update id = 103 and id = 109
859+
// ROW 1 : Update id=103
860+
// ROW 2 : Update id=109
861+
// Events 0-3 will be dropped because skipEvents = 4.
862+
// Row 1 in event 4 will be dropped because skipRows = 1.
863+
// Only the update on 109 will be captured.
864+
updateCustomersTableInBulk(
865+
mySqlConnection, customerDatabase.qualifiedTableName("customers"));
866+
867+
// Read with binlog split reader and validate
868+
String[] expected =
869+
new String[] {
870+
"-U[109, user_4, Shanghai, 123567891234]",
871+
"+U[109, user_4, Pittsburgh, 123567891234]"
872+
};
873+
List<String> actual = readBinlogSplits(dataType, reader, expected.length);
874+
875+
reader.close();
876+
assertEqualsInOrder(Arrays.asList(expected), actual);
877+
}
878+
789879
@Test
790880
void testReadBinlogFromTimestamp() throws Exception {
791881
// Preparations

0 commit comments

Comments
 (0)