diff --git a/replication/event.go b/replication/event.go index 6fb614f61..4d900fea9 100644 --- a/replication/event.go +++ b/replication/event.go @@ -229,15 +229,59 @@ type PreviousGTIDsEvent struct { GTIDSets string } +type GtidFormat int + +const ( + GtidFormatClassic = iota + GtidFormatTagged +) + +func decodeSid(data []byte) (format GtidFormat, sidnr uint64) { + if data[7] == 1 { + format = GtidFormatTagged + } + + if format == GtidFormatTagged { + sid_mask := []byte{0, 255, 255, 255, 255, 255, 255, 0} + + // Apply the mask + for i, _ := range data[:8] { + data[i] &= sid_mask[i] + } + data = append(data, 0) + + // sidnr + sidnr = binary.LittleEndian.Uint64(data[1:]) + return + } + sidnr = binary.LittleEndian.Uint64(data) + return +} + func (e *PreviousGTIDsEvent) Decode(data []byte) error { pos := 0 - uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8]) + + gtidinfo := make([]byte, 8) + copy(gtidinfo, data[:8]) + format, uuidCount := decodeSid(gtidinfo) pos += 8 previousGTIDSets := make([]string, uuidCount) - for i := range previousGTIDSets { + currentSetnr := 0 + for range previousGTIDSets { uuid := e.decodeUuid(data[pos : pos+16]) pos += 16 + isTag := false + var tag string + if format == GtidFormatTagged { + tagLength := int(data[pos]) / 2 + pos += 1 + if tagLength > 0 { + isTag = true + tag = string(data[pos : pos+tagLength]) + pos += tagLength + } + } sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8]) pos += 8 intervals := make([]string, sliceCount) @@ -254,9 +298,14 @@ func (e *PreviousGTIDsEvent) Decode(data []byte) error { } intervals[i] = interval } - previousGTIDSets[i] = fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")) + if isTag { + previousGTIDSets[currentSetnr-1] += fmt.Sprintf(":%s:%s", tag, strings.Join(intervals, ":")) + } else { + previousGTIDSets[currentSetnr] = fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")) + currentSetnr += 1 + } } - e.GTIDSets = strings.Join(previousGTIDSets, ",") + e.GTIDSets = strings.Join(previousGTIDSets[:currentSetnr], ",") return nil } diff --git a/replication/event_test.go b/replication/event_test.go index 1333cd5ef..91184412c 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -140,3 +140,53 @@ func TestIntVarEvent(t *testing.T) { require.Equal(t, INSERT_ID, ev.Type) require.Equal(t, uint64(23), ev.Value) } + +func TestDecodeSid(t *testing.T) { + testcases := []struct { + input []byte + gtidFormat GtidFormat + uuidCount uint64 + }{ + {[]byte{1, 2, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 2}, + {[]byte{1, 1, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 1}, + {[]byte{1, 0, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 0}, + {[]byte{1, 0, 0, 0, 0, 0, 0, 0}, GtidFormatClassic, 1}, + } + + for _, tc := range testcases { + format, uuidCount := decodeSid(tc.input) + assert.Equal(t, tc.gtidFormat, format) + assert.Equal(t, tc.uuidCount, uuidCount) + } +} + +func TestPreviousGTIDEvent(t *testing.T) { + testcases := []struct { + input []byte + GTIDSets string + }{ + { + []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + "", + }, + { + []byte{0x1, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + "896e7882-18fe-11ef-ab88-22222d34d411:1-3", + }, + { + []byte{0x1, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x8, 0x61, 0x61, 0x61, 0x61, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + "896e7882-18fe-11ef-ab88-22222d34d411:1-4:aaaa:1", + }, + { + []byte{0x1, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x8, 0x61, 0x61, 0x61, 0x61, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x6, 0x61, 0x62, 0x63, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0xa, 0x62, 0x62, 0x62, 0x62, 0x62, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0xc, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x2, 0x78, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x12, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + "896e7882-18fe-11ef-ab88-22222d34d411:1-4:aaaa:1:abc:1-3:bbbbb:1:bbbbbb:1:x:1,896e7882-18fe-11ef-ab88-22222d34d412:1-2", + }, + } + + for _, tc := range testcases { + e := PreviousGTIDsEvent{} + err := e.Decode(tc.input) + require.NoError(t, err) + require.Equal(t, tc.GTIDSets, e.GTIDSets) + } +}