Skip to content

Commit

Permalink
Merge branch 'master' into remove_failover
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Nov 5, 2024
2 parents 167a302 + 5acb569 commit af687be
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 67 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ build:
${GO} build -o bin/go-mysqldump cmd/go-mysqldump/main.go
${GO} build -o bin/go-canal cmd/go-canal/main.go
${GO} build -o bin/go-binlogparser cmd/go-binlogparser/main.go
${GO} build -o bin/go-mysqlserver cmd/go-mysqlserver/main.go

test:
${GO} test --race -timeout 2m ./...
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ func main() {
}
}
}

```

Another shell
Expand Down
42 changes: 42 additions & 0 deletions cmd/go-mysqlserver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"log"
"net"

"github.com/go-mysql-org/go-mysql/server"
)

func main() {
// Listen for connections on localhost port 4000
l, err := net.Listen("tcp", "127.0.0.1:4000")
if err != nil {
log.Fatal(err)
}

log.Println("Listening on port 4000, connect with 'mysql -h 127.0.0.1 -P 4000 -u root'")

// Accept a new connection once
c, err := l.Accept()
if err != nil {
log.Fatal(err)
}

log.Println("Accepted connection")

// Create a connection with user root and an empty password.
// You can use your own handler to handle command here.
conn, err := server.NewConn(c, "root", "", server.EmptyHandler{})
if err != nil {
log.Fatal(err)
}

log.Println("Registered the connection with the server")

// as long as the client keeps sending commands, keep handling them
for {
if err := conn.HandleCommand(); err != nil {
log.Fatal(err)
}
}
}
62 changes: 56 additions & 6 deletions dump/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"os/exec"
"regexp"
"strings"

. "github.com/go-mysql-org/go-mysql/mysql"
Expand Down Expand Up @@ -44,6 +45,9 @@ type Dumper struct {

// see detectColumnStatisticsParamSupported
isColumnStatisticsParamSupported bool

mysqldumpVersion string
sourceDataSupported bool
}

func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
Expand All @@ -67,7 +71,14 @@ func NewDumper(executionPath string, addr string, user string, password string)
d.IgnoreTables = make(map[string][]string)
d.ExtraOptions = make([]string, 0, 5)
d.masterDataSkipped = false
d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported()

out, err := exec.Command(d.ExecutionPath, `--help`).CombinedOutput()
if err != nil {
return d, err
}
d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported(out)
d.mysqldumpVersion = d.getMysqldumpVersion(out)
d.sourceDataSupported = d.detectSourceDataSupported(d.mysqldumpVersion)

d.ErrOut = os.Stderr

Expand All @@ -81,12 +92,47 @@ func NewDumper(executionPath string, addr string, user string, password string)
// But this parameter exists only for versions >=8.0.2 (https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html).
//
// For environments where the version of mysql-server and mysqldump differs, we try to check this parameter and use it if available.
func (d *Dumper) detectColumnStatisticsParamSupported() bool {
out, err := exec.Command(d.ExecutionPath, `--help`).CombinedOutput()
if err != nil {
func (d *Dumper) detectColumnStatisticsParamSupported(helpOutput []byte) bool {
return bytes.Contains(helpOutput, []byte(`--column-statistics`))
}

// mysqldump Ver 10.19 Distrib 10.3.37-MariaDB, for linux-systemd (x86_64)`, `10.3.37-MariaDB
// opt/mysql/11.0.0/bin/mysqldump from 11.0.0-preview-MariaDB, client 10.19 for linux-systemd (x86_64)
func (d *Dumper) getMysqldumpVersion(helpOutput []byte) string {
mysqldumpVersionRegexpNew := regexp.MustCompile(`mysqldump Ver ([0-9][^ ]*) for`)
if m := mysqldumpVersionRegexpNew.FindSubmatch(helpOutput); m != nil {
return string(m[1])
}

mysqldumpVersionRegexpOld := regexp.MustCompile(`mysqldump Ver .* Distrib ([0-9][^ ]*),`)
if m := mysqldumpVersionRegexpOld.FindSubmatch(helpOutput); m != nil {
return string(m[1])
}

mysqldumpVersionRegexpMaria := regexp.MustCompile(`mysqldump from ([0-9][^ ]*), `)
if m := mysqldumpVersionRegexpMaria.FindSubmatch(helpOutput); m != nil {
return string(m[1])
}

return ""
}

func (d *Dumper) detectSourceDataSupported(version string) bool {
// Failed to detect mysqldump version
if version == "" {
return false
}

// MySQL 5.x
if version[0] == byte('5') {
return false
}

if strings.Contains(version, "MariaDB") {
return false
}
return bytes.Contains(out, []byte(`--column-statistics`))

return true
}

func (d *Dumper) SetCharset(charset string) {
Expand Down Expand Up @@ -169,7 +215,11 @@ func (d *Dumper) Dump(w io.Writer) error {
passwordArgIndex := len(args) - 1

if !d.masterDataSkipped {
args = append(args, "--master-data")
if d.sourceDataSupported {
args = append(args, "--source-data")
} else {
args = append(args, "--master-data")
}
}

if d.maxAllowedPacket > 0 {
Expand Down
62 changes: 62 additions & 0 deletions dump/dumper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package dump

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetMysqldumpVersion(t *testing.T) {
versions := []struct {
line string // mysqldump --help | head -1
version string // 9.1.0
}{
// Oracle MySQL
{`mysqldump Ver 10.13 Distrib 5.5.62, for linux-glibc2.12 (x86_64)`, `5.5.62`},
{`mysqldump Ver 10.13 Distrib 5.6.44, for linux-glibc2.12 (x86_64)`, `5.6.44`},
{`mysqldump Ver 10.13 Distrib 5.7.31, for linux-glibc2.12 (x86_64)`, `5.7.31`},
{`mysqldump Ver 10.13 Distrib 5.7.36, for linux-glibc2.12 (x86_64)`, `5.7.36`},
{`mysqldump Ver 8.0.11 for linux-glibc2.12 on x86_64 (MySQL Community Server - GPL)`, `8.0.11`},
{`mysqldump Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.22`},
{`mysqldump Ver 8.0.25 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.25`},
{`mysqldump Ver 8.0.26 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.26`},
{`mysqldump Ver 8.0.27 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.27`},
{`mysqldump Ver 8.0.28 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.28`},
{`mysqldump Ver 8.0.31 for Linux on x86_64 (Source distribution)`, `8.0.31`},
{`mysqldump Ver 8.0.32 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.32`},
{`mysqldump Ver 8.4.2 for FreeBSD14.0 on amd64 (Source distribution)`, `8.4.2`},
{`mysqldump Ver 9.1.0 for Linux on x86_64 (MySQL Community Server - GPL)`, `9.1.0`},

// MariaDB
{`mysqldump Ver 10.19 Distrib 10.3.37-MariaDB, for linux-systemd (x86_64)`, `10.3.37-MariaDB`},
{`mysqldump Ver 10.19 Distrib 10.6.11-MariaDB, for linux-systemd (x86_64)`, `10.6.11-MariaDB`},
{`opt/mysql/11.0.0/bin/mysqldump from 11.0.0-preview-MariaDB, client 10.19 for linux-systemd (x86_64)`, `11.0.0-preview-MariaDB`},
{`opt/mysql/11.2.2/bin/mysqldump from 11.2.2-MariaDB, client 10.19 for linux-systemd (x86_64)`, `11.2.2-MariaDB`},
}

d := new(Dumper)
for _, v := range versions {
ver := d.getMysqldumpVersion([]byte(v.line))
require.Equal(t, v.version, ver, v.line)
}
}

func TestDetectSourceDataSupported(t *testing.T) {
versions := []struct {
version string
supported bool
}{
{`5.7.40`, false},
{`8.0.11`, true},
{`8.4.1`, true},
{`9.1.0`, true},
{``, false},
{`10.3.37-MariaDB`, false},
{`11.2.2-MariaDB`, false},
}

d := new(Dumper)
for _, v := range versions {
require.Equal(t, v.supported, d.detectSourceDataSupported(v.version), v.version)
}
}
6 changes: 3 additions & 3 deletions dump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var valuesExp *regexp.Regexp
var gtidExp *regexp.Regexp

func init() {
binlogExp = regexp.MustCompile(`^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\d+);`)
binlogExp = regexp.MustCompile(`^CHANGE (MASTER|REPLICATION SOURCE) TO (MASTER_LOG_FILE|SOURCE_LOG_FILE)='(.+)', (MASTER_LOG_POS|SOURCE_LOG_POS)=(\d+);`)
useExp = regexp.MustCompile("^USE `(.+)`;")
valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$")
// The pattern will only match MySQL GTID, as you know SET GLOBAL gtid_slave_pos='0-1-4' is used for MariaDB.
Expand Down Expand Up @@ -71,8 +71,8 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
}
}
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
name := m[0][1]
pos, err := strconv.ParseUint(m[0][2], 10, 64)
name := m[0][3]
pos, err := strconv.ParseUint(m[0][5], 10, 64)
if err != nil {
return errors.Errorf("parse binlog %v err, invalid number", line)
}
Expand Down
28 changes: 28 additions & 0 deletions dump/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,34 @@ import (
"github.com/stretchr/testify/require"
)

// This tests the binlogExp regexp that matches the line that mysqldump adds when called with --master-data or --source-data
func TestBinlogExp(t *testing.T) {
stmts := []struct {
input string
file string
pos string
}{
{
// MySQL 9.1.0
`CHANGE REPLICATION SOURCE TO SOURCE_LOG_FILE='binlog.000002', SOURCE_LOG_POS=170923;`,
`binlog.000002`,
`170923`,
},
{
`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`,
`mysql-bin.008995`,
`102052485`,
},
}

for _, stmt := range stmts {
m := binlogExp.FindAllStringSubmatch(stmt.input, -1)
require.NotNil(t, m)
require.Equal(t, stmt.file, m[0][3])
require.Equal(t, stmt.pos, m[0][5])
}
}

func TestParseGtidExp(t *testing.T) {
// binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
// gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)")
Expand Down
4 changes: 3 additions & 1 deletion replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (b *BinlogSyncer) registerSlave() error {

serverUUID, err := uuid.NewUUID()
if err != nil {
b.cfg.Logger.Errorf("failed to get new uud %v", err)
b.cfg.Logger.Errorf("failed to get new uuid %v", err)
return errors.Trace(err)
}
if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil {
Expand Down Expand Up @@ -404,6 +404,8 @@ func (b *BinlogSyncer) prepare() error {
return errors.Trace(err)
}

b.cfg.Logger.Infof("Connected to %s %s server", b.cfg.Flavor, b.c.GetServerVersion())

return nil
}

Expand Down
Loading

0 comments on commit af687be

Please sign in to comment.