diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 38f29cee699..b99bdd17cf1 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -44,8 +44,7 @@ import ( // it implements the BackupEngine interface and contains all the logic // required to implement a backup/restore by invoking xtrabackup with // the appropriate parameters -type XtrabackupEngine struct { -} +type XtrabackupEngine struct{} var ( // path where backup engine program is located @@ -68,6 +67,7 @@ const ( streamModeTar = "tar" xtrabackupBinaryName = "xtrabackup" xtrabackupEngineName = "xtrabackup" + xtrabackupInfoFile = "xtrabackup_info" xbstream = "xbstream" // closeTimeout is the timeout for closing backup files after writing. @@ -155,7 +155,6 @@ func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalE // ExecuteBackup returns a boolean that indicates if the backup is usable, // and an overall error. func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (complete bool, finalErr error) { - if xtrabackupUser == "" { return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.") } @@ -238,15 +237,23 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara return true, nil } -func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) { - +func (be *XtrabackupEngine) backupFiles( + ctx context.Context, + params BackupParams, + bh backupstorage.BackupHandle, + backupFileName string, + numStripes int, + flavor string, +) (replicationPosition mysql.Position, finalErr error) { backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName) - flagsToExec := []string{"--defaults-file=" + params.Cnf.Path, + flagsToExec := []string{ + "--defaults-file=" + params.Cnf.Path, "--backup", "--socket=" + params.Cnf.SocketFile, "--slave-info", "--user=" + xtrabackupUser, "--target-dir=" + params.Cnf.TmpDir, + "--extra-lsndir=" + params.Cnf.TmpDir, } if xtrabackupStreamMode != "" { flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode) @@ -345,27 +352,14 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // the replication position. Note that if we don't read stderr as we go, the // xtrabackup process gets blocked when the write buffer fills up. stderrBuilder := &strings.Builder{} - posBuilder := &strings.Builder{} stderrDone := make(chan struct{}) go func() { defer close(stderrDone) scanner := bufio.NewScanner(backupErr) - capture := false for scanner.Scan() { line := scanner.Text() params.Logger.Infof("xtrabackup stderr: %s", line) - - // Wait until we see the first line of the binlog position. - // Then capture all subsequent lines. We need multiple lines since - // the value we're looking for has newlines in it. - if !capture { - if !strings.Contains(line, "MySQL binlog position") { - continue - } - capture = true - } - fmt.Fprintln(posBuilder, line) } if err := scanner.Err(); err != nil { params.Logger.Errorf("error reading from xtrabackup stderr: %v", err) @@ -409,8 +403,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput)) } - posOutput := posBuilder.String() - replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger) + replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger) if rerr != nil { return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position") } @@ -420,7 +413,6 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // ExecuteRestore restores from a backup. Any error is returned. func (be *XtrabackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) { - var bm xtraBackupManifest if err := getBackupManifestInto(ctx, bh, &bm); err != nil { @@ -483,7 +475,8 @@ func (be *XtrabackupEngine) restoreFromBackup(ctx context.Context, cnf *Mycnf, b logger.Infof("Restore: Preparing the extracted files") // prepare the backup restoreProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName) - flagsToExec := []string{"--defaults-file=" + cnf.Path, + flagsToExec := []string{ + "--defaults-file=" + cnf.Path, "--prepare", "--target-dir=" + tempDir, } @@ -518,7 +511,8 @@ func (be *XtrabackupEngine) restoreFromBackup(ctx context.Context, cnf *Mycnf, b // then move-back logger.Infof("Restore: Move extracted and prepared files to final locations") - flagsToExec = []string{"--defaults-file=" + cnf.Path, + flagsToExec = []string{ + "--defaults-file=" + cnf.Path, "--move-back", "--target-dir=" + tempDir, } @@ -585,7 +579,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log // Create the decompressor if needed. if compressed { var decompressor io.ReadCloser - var deCompressionEngine = bm.CompressionEngine + deCompressionEngine := bm.CompressionEngine if deCompressionEngine == "" { // For backward compatibility. Incase if Manifest is from N-1 binary // then we assign the default value of compressionEngine. @@ -694,6 +688,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log return nil } +func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (mysql.Position, error) { + f, err := os.Open(path.Join(directory, xtrabackupInfoFile)) + if err != nil { + return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, + "couldn't open %q to read GTID position", path.Join(directory, xtrabackupInfoFile)) + } + defer f.Close() + + contents, err := io.ReadAll(f) + if err != nil { + return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name()) + } + + return findReplicationPosition(string(contents), flavor, logger) +} + var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`) func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql.Position, error) { diff --git a/go/vt/mysqlctl/xtrabackupengine_test.go b/go/vt/mysqlctl/xtrabackupengine_test.go index 26e53c6c949..3d81d269608 100644 --- a/go/vt/mysqlctl/xtrabackupengine_test.go +++ b/go/vt/mysqlctl/xtrabackupengine_test.go @@ -20,8 +20,11 @@ import ( "bytes" "io" "math/rand" + "os" + "path" "testing" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/vt/logutil" ) @@ -51,26 +54,48 @@ func TestFindReplicationPosition(t *testing.T) { } } -func TestFindReplicationPositionNoMatch(t *testing.T) { +func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) { + input := `tool_version = 8.0.35-30 + binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3, + 1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3, + 47b59de1-b368-11e9-b48b-624401d35560:1-152981, + 557def0a-b368-11e9-84ed-f6fffd91cc57:1-3, + 599ef589-ae55-11e9-9688-ca1f44501925:1-14857169, + b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262' + format = xbstream + ` + want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262" + + tmp, err := os.MkdirTemp(t.TempDir(), "test") + assert.NoError(t, err) + + f, err := os.Create(path.Join(tmp, xtrabackupInfoFile)) + assert.NoError(t, err) + _, err = f.WriteString(input) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger()) + assert.NoError(t, err) + assert.Equal(t, want, pos.String()) +} + +func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `nothing` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err == nil { - t.Fatalf("expected error from findReplicationPosition but got nil") - } + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) + assert.Error(t, err) } -func TestFindReplicationPositionEmptyMatch(t *testing.T) { +func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `GTID of the last change ' '` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err == nil { - t.Fatalf("expected error from findReplicationPosition but got nil") - } + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) + assert.Error(t, err) } func TestStripeRoundTrip(t *testing.T) {