|
66 | 66 | import org.junit.jupiter.api.AfterEach; |
67 | 67 | import org.junit.jupiter.api.BeforeAll; |
68 | 68 | import org.junit.jupiter.api.Test; |
| 69 | +import org.junit.jupiter.api.Timeout; |
69 | 70 | import org.testcontainers.lifecycle.Startables; |
70 | 71 |
|
71 | 72 | import java.sql.Connection; |
|
80 | 81 | import java.util.Map; |
81 | 82 | import java.util.Optional; |
82 | 83 | import java.util.Properties; |
| 84 | +import java.util.concurrent.TimeUnit; |
83 | 85 | import java.util.function.Predicate; |
84 | 86 | import java.util.stream.Collectors; |
85 | 87 | import java.util.stream.Stream; |
@@ -786,6 +788,94 @@ void testReadBinlogFromGtidSet() throws Exception { |
786 | 788 | assertEqualsInOrder(Arrays.asList(expected), actual); |
787 | 789 | } |
788 | 790 |
|
| 791 | + /** |
| 792 | + * In a bad case, it will skip the rest records whitch causes infinite wait for empty data. So |
| 793 | + * it should has a timeout to avoid it. |
| 794 | + */ |
| 795 | + @Test |
| 796 | + @Timeout(value = 600, unit = TimeUnit.SECONDS) |
| 797 | + void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Exception { |
| 798 | + // Preparations |
| 799 | + customerDatabase.createAndInitialize(); |
| 800 | + MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); |
| 801 | + binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); |
| 802 | + mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); |
| 803 | + DataType dataType = |
| 804 | + DataTypes.ROW( |
| 805 | + DataTypes.FIELD("id", DataTypes.BIGINT()), |
| 806 | + DataTypes.FIELD("name", DataTypes.STRING()), |
| 807 | + DataTypes.FIELD("address", DataTypes.STRING()), |
| 808 | + DataTypes.FIELD("phone_number", DataTypes.STRING())); |
| 809 | + |
| 810 | + // Capture the current binlog offset, and we will start the reader from here |
| 811 | + BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection); |
| 812 | + |
| 813 | + // In this case, the binlog is: |
| 814 | + // Event 0: QUERY,BEGIN |
| 815 | + // Event 1: TABLE_MAP |
| 816 | + // Event 2: Update id = 101 and id = 102 |
| 817 | + // ROW 1 : Update id=101 |
| 818 | + // ROW 2 : Update id=102 |
| 819 | + // Event 3: TABLE_MAP |
| 820 | + // Event 4: Update id = 103 and id = 109 |
| 821 | + // ROW 1 : Update id=103 |
| 822 | + // ROW 2 : Update id=109 |
| 823 | + |
| 824 | + // When a checkpoint is triggered |
| 825 | + // after id=103 ,before id=109 , |
| 826 | + // the position restored from checkpoint will be event=4 and row=1 |
| 827 | + BinlogOffset checkpointOffset = |
| 828 | + BinlogOffset.builder() |
| 829 | + .setBinlogFilePosition("", 0) |
| 830 | + .setGtidSet(startingOffset.getGtidSet()) |
| 831 | + // Because the position restored from checkpoint |
| 832 | + // will skip 4 events to drop the first update: |
| 833 | + // QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP |
| 834 | + .setSkipEvents(4) |
| 835 | + // The position restored from checkpoint |
| 836 | + // will skip 1 rows to drop the first |
| 837 | + .setSkipRows(1) |
| 838 | + .build(); |
| 839 | + |
| 840 | + // Create a new config to start reading from the offset captured above |
| 841 | + MySqlSourceConfig sourceConfig = |
| 842 | + getConfig( |
| 843 | + StartupOptions.specificOffset(checkpointOffset), |
| 844 | + new String[] {"customers"}); |
| 845 | + |
| 846 | + // Create reader and submit splits |
| 847 | + MySqlBinlogSplit split = createBinlogSplit(sourceConfig); |
| 848 | + BinlogSplitReader reader = createBinlogReader(sourceConfig); |
| 849 | + reader.submitSplit(split); |
| 850 | + |
| 851 | + // Create some binlog events: |
| 852 | + // Event 0: QUERY,BEGIN |
| 853 | + // Event 1: TABLE_MAP |
| 854 | + // Event 2: Update id = 101 and id = 102 |
| 855 | + // ROW 1 : Update id=101 |
| 856 | + // ROW 2 : Update id=102 |
| 857 | + // Event 3: TABLE_MAP |
| 858 | + // Event 4: Update id = 103 and id = 109 |
| 859 | + // ROW 1 : Update id=103 |
| 860 | + // ROW 2 : Update id=109 |
| 861 | + // The event 0-3 will be dropped because skipEvents = 4. |
| 862 | + // The row 1 in event 4 will be dropped because skipRows = 1. |
| 863 | + // Only the update on 109 will be captured. |
| 864 | + updateCustomersTableInBulk( |
| 865 | + mySqlConnection, customerDatabase.qualifiedTableName("customers")); |
| 866 | + |
| 867 | + // Read with binlog split reader and validate |
| 868 | + String[] expected = |
| 869 | + new String[] { |
| 870 | + "-U[109, user_4, Shanghai, 123567891234]", |
| 871 | + "+U[109, user_4, Pittsburgh, 123567891234]" |
| 872 | + }; |
| 873 | + List<String> actual = readBinlogSplits(dataType, reader, expected.length); |
| 874 | + |
| 875 | + reader.close(); |
| 876 | + assertEqualsInOrder(Arrays.asList(expected), actual); |
| 877 | + } |
| 878 | + |
789 | 879 | @Test |
790 | 880 | void testReadBinlogFromTimestamp() throws Exception { |
791 | 881 | // Preparations |
|
0 commit comments