Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication: Support GTID tag in PreviousGTIDsEvent #952

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,59 @@ type PreviousGTIDsEvent struct {
GTIDSets string
}

type GtidFormat int

const (
GtidFormatClassic = iota
GtidFormatTagged
)

func decodeSid(data []byte) (format GtidFormat, sidnr uint64) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put the referenced MySQL source code or document in comment, so other reviewers can double check the implementation?

if data[7] == 1 {
format = GtidFormatTagged
}

if format == GtidFormatTagged {
sid_mask := []byte{0, 255, 255, 255, 255, 255, 255, 0}

// Apply the mask
for i, _ := range data[:8] {
data[i] &= sid_mask[i]
}
data = append(data, 0)

// sidnr
sidnr = binary.LittleEndian.Uint64(data[1:])
return
}
sidnr = binary.LittleEndian.Uint64(data)
return
}

func (e *PreviousGTIDsEvent) Decode(data []byte) error {
pos := 0
uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8])

gtidinfo := make([]byte, 8)
copy(gtidinfo, data[:8])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the new copy is caused by decodeSid will modify data in-place. How about let decodeSid copy the needed data? So there's no risk that in future we need to change the code and caller forget to copy it.

format, uuidCount := decodeSid(gtidinfo)
pos += 8

previousGTIDSets := make([]string, uuidCount)
for i := range previousGTIDSets {
currentSetnr := 0
for _ = range previousGTIDSets {
dveeden marked this conversation as resolved.
Show resolved Hide resolved
uuid := e.decodeUuid(data[pos : pos+16])
pos += 16
isTag := false
var tag string
if format == GtidFormatTagged {
tagLength := int(data[pos]) / 2
pos += 1
if tagLength > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the tag have zero length? please add comments

isTag = true
tag = string(data[pos : pos+tagLength])
pos += tagLength
}
}
sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8])
pos += 8
intervals := make([]string, sliceCount)
Expand All @@ -254,9 +298,14 @@ func (e *PreviousGTIDsEvent) Decode(data []byte) error {
}
intervals[i] = interval
}
previousGTIDSets[i] = fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":"))
if isTag {
previousGTIDSets[currentSetnr-1] += fmt.Sprintf(":%s:%s", tag, strings.Join(intervals, ":"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because string is immutable, string += will create a new string object each time. If there are too many tags the performance will drop. Maybe store every "tag+intervals" and join them once at all

} else {
previousGTIDSets[currentSetnr] = fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":"))
currentSetnr += 1
}
}
e.GTIDSets = strings.Join(previousGTIDSets, ",")
e.GTIDSets = strings.Join(previousGTIDSets[:currentSetnr], ",")
return nil
}

Expand Down
50 changes: 50 additions & 0 deletions replication/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,53 @@ func TestIntVarEvent(t *testing.T) {
require.Equal(t, INSERT_ID, ev.Type)
require.Equal(t, uint64(23), ev.Value)
}

func TestDecodeSid(t *testing.T) {
testcases := []struct {
input []byte
gtidFormat GtidFormat
uuidCount uint64
}{
{[]byte{1, 2, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 2},
{[]byte{1, 1, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 1},
{[]byte{1, 0, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 0},
{[]byte{1, 0, 0, 0, 0, 0, 0, 0}, GtidFormatClassic, 1},
}

for _, tc := range testcases {
format, uuidCount := decodeSid(tc.input)
assert.Equal(t, tc.gtidFormat, format)
assert.Equal(t, tc.uuidCount, uuidCount)
}
}

func TestPreviousGTIDEvent(t *testing.T) {
testcases := []struct {
input []byte
GTIDSets string
}{
{
[]byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
"",
},
{
[]byte{0x1, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
"896e7882-18fe-11ef-ab88-22222d34d411:1-3",
},
{
[]byte{0x1, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x8, 0x61, 0x61, 0x61, 0x61, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
"896e7882-18fe-11ef-ab88-22222d34d411:1-4:aaaa:1",
},
{
[]byte{0x1, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x8, 0x61, 0x61, 0x61, 0x61, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x6, 0x61, 0x62, 0x63, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0xa, 0x62, 0x62, 0x62, 0x62, 0x62, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0xc, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x2, 0x78, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x12, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
"896e7882-18fe-11ef-ab88-22222d34d411:1-4:aaaa:1:abc:1-3:bbbbb:1:bbbbbb:1:x:1,896e7882-18fe-11ef-ab88-22222d34d412:1-2",
},
}

for _, tc := range testcases {
e := PreviousGTIDsEvent{}
err := e.Decode(tc.input)
require.NoError(t, err)
require.Equal(t, tc.GTIDSets, e.GTIDSets)
}
}
Loading