Skip to content

Commit

Permalink
backport fix 13557 to v15
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Jul 27, 2023
1 parent 5bb447a commit bc97a34
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 56 deletions.
10 changes: 6 additions & 4 deletions go/test/endtoend/recovery/recovery_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@ func VerifyQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, q
}

// RestoreTablet performs a PITR restore.
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string) {
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string, restoreTime time.Time) {
tablet.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg

_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("GetKeyspace", restoreKSName)

if restoreTime.IsZero() {
restoreTime = time.Now().UTC()
}

if err != nil {
tm := time.Now().UTC()
tm.Format(time.RFC3339)
_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace", "--",
"--keyspace_type=SNAPSHOT", "--base_keyspace="+keyspaceName,
"--snapshot_time", tm.Format(time.RFC3339), restoreKSName)
"--snapshot_time", restoreTime.Format(time.RFC3339), restoreKSName)
require.Nil(t, err)
}

Expand Down
118 changes: 66 additions & 52 deletions go/test/endtoend/recovery/unshardedrecovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,103 +173,118 @@ SET GLOBAL old_alter_table = ON;

}

// TestRecoveryImpl does following
// - create a shard with primary and replica1 only
// - run InitShardPrimary
// - insert some data
// - take a backup
// - insert more data on the primary
// - take another backup
// - create a recovery keyspace after first backup
// - bring up tablet_replica2 in the new keyspace
// - check that new tablet does not have data created after backup1
// - create second recovery keyspace after second backup
// - bring up tablet_replica3 in second keyspace
// - check that new tablet has data created after backup1 but not data created after backup2
// - check that vtgate queries work correctly
// 1. create a shard with primary and replica1 only
// - run InitShardPrimary
// - insert some data
//
// 2. take a backup
// 3.create a recovery keyspace after first backup
// - bring up tablet_replica2 in the new keyspace
// - check that new tablet has data from backup1
//
// 4. insert more data on the primary
//
// 5. take another backup
// 6. create a recovery keyspace after second backup
// - bring up tablet_replica3 in the new keyspace
// - check that new tablet has data from backup2
//
// 7. check that vtgate queries work correctly
func TestRecoveryImpl(t *testing.T) {
defer cluster.PanicHandler(t)
defer tabletsTeardown()
verifyInitialReplication(t)

// take first backup of value = test1
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)

backups := listBackups(t)
require.Equal(t, len(backups), 1)
assert.Contains(t, backups[0], replica1.Alias)

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)

err = localCluster.VtctlclientProcess.ApplyVSchema(keyspaceName, vSchema)
assert.NoError(t, err)

output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", keyspaceName)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg)
// restore with latest backup
restoreTime := time.Now().UTC()
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg, restoreTime)

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvVSchema", cell)
assert.NoError(t, err)
assert.Contains(t, output, keyspaceName)
assert.Contains(t, output, recoveryKS1)

err = localCluster.VtctlclientProcess.ExecuteCommand("GetSrvKeyspace", cell, keyspaceName)
assert.NoError(t, err)

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS1)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 1)

cluster.VerifyLocalMetadata(t, replica2, recoveryKS1, shardName, cell)

// verify that restored replica has value = test1
qr, err := replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "test1", qr.Rows[0][0].ToString())

// insert new row on primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)

// update the original row in primary
_, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx1' where id = 1", keyspaceName, true)
assert.NoError(t, err)

// verify that primary has new value
qr, err := primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx1", qr.Rows[0][0].ToString())

// verify that restored replica has old value
qr, err = replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "test1", qr.Rows[0][0].ToString())
// check that replica1, used for the backup, has the new value
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 3)
for {
qr, err = replica1.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
if qr.Rows[0][0].ToString() == "msgx1" {
break
}

recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg)
select {
case <-ctx.Done():
t.Error("timeout waiting for new value to be replicated on replica 1")
break
case <-ticker.C:
}
}

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2)
// take second backup of value = msgx1
err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 2)
// restore to first backup
recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg, restoreTime)

// update the original row in primary
_, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx2' where id = 1", keyspaceName, true)
output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

// verify that primary has new value
qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx2", qr.Rows[0][0].ToString())
// only one row from first backup
cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 1)

// verify that restored replica has old value
// verify that restored replica has value = test1
qr, err = replica3.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx1", qr.Rows[0][0].ToString())
assert.Equal(t, "test1", qr.Rows[0][0].ToString())

vtgateInstance := localCluster.NewVtgateInstance()
vtgateInstance.TabletTypesToWait = "REPLICA"
Expand All @@ -281,7 +296,6 @@ func TestRecoveryImpl(t *testing.T) {
assert.NoError(t, vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shardName), 1, 30*time.Second))
assert.NoError(t, vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", recoveryKS1, shardName), 1, 30*time.Second))
assert.NoError(t, vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", recoveryKS2, shardName), 1, 30*time.Second))

// Build vtgate grpc connection
grpcAddress := fmt.Sprintf("%s:%d", localCluster.Hostname, localCluster.VtgateGrpcPort)
vtgateConn, err := vtgateconn.Dial(context.Background(), grpcAddress)
Expand All @@ -290,26 +304,26 @@ func TestRecoveryImpl(t *testing.T) {
session := vtgateConn.Session("@replica", nil)

// check that vtgate doesn't route queries to new tablet
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(3)")
recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx2")`)
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS1), "INT64(1)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS1), `VARCHAR("test1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("msgx1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(1)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("test1")`)

// check that new keyspace is accessible with 'use ks'
cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS1+"@replica")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS2+"@replica")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

// check that new tablet is accessible with use `ks:shard`
cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS1+":0@replica`")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS2+":0@replica`")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")
}

// verifyInitialReplication will create schema in primary, insert some data to primary and verify the same data in replica.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
log.Infof("Using base_keyspace %v to restore keyspace %v using a backup time of %v", keyspace, tablet.Keyspace, backupTime)
}

if backupTime.IsZero() {
backupTime = logutil.ProtoToTime(keyspaceInfo.SnapshotTime)
}

params := mysqlctl.RestoreParams{
Cnf: tm.Cnf,
Mysqld: tm.MysqlDaemon,
Expand Down

0 comments on commit bc97a34

Please sign in to comment.