Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport VReplication: Support reversing read-only traffic in vtctldclient (#16920) #144

Open
wants to merge 7 commits into
base: release-19.0-github
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,32 @@ create table nopk (name varchar(128), age int unsigned);
"create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4"
}]
}
`

materializeCustomerNameSpec = `
{
"workflow": "customer_name",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_name",
"source_expression": "select cid, name from customer",
"create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))"
}]
}
`

materializeCustomerTypeSpec = `
{
"workflow": "enterprise_customer",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "enterprise_customer",
"source_expression": "select cid, name, typ from customer where typ = 'enterprise'",
"create_ddl": "create table if not exists enterprise_customer (cid bigint not null, name varchar(128), typ varchar(64), primary key(cid), key(typ))"
}]
}
`

merchantOrdersVSchema = `
Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
t.Helper()
rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
require.NoError(t, err)
count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
assert.Equalf(t, count0+1, count1, "query %q did not execute in target;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\n", query, matchQuery, body0, body1)
require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
}

func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
Expand Down
9 changes: 8 additions & 1 deletion go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import (
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultRdonly = 0
defaultReplicas = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

Expand Down
128 changes: 97 additions & 31 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"net"
"slices"
"strconv"
"strings"
"testing"
Expand All @@ -31,9 +32,11 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -163,9 +166,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
args = append(args, "--tablet-types", tabletTypes)
}
args = append(args, "--action_timeout=10m") // At this point something is up so fail the test
if debugMode {
t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " "))
}
t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " "))
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
Expand Down Expand Up @@ -326,27 +327,45 @@ func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}

func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) {
if tablet == nil {
return
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select * from customer"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, readQuery)
}
}
// We do NOT want to target a shard as that goes around the routing rules and
// defeats the purpose here. We are using a query w/o a WHERE clause so for
// sharded keyspaces it should hit all shards as a SCATTER query. So all we
// care about is the keyspace and tablet type.
destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType))
readQuery := "select cid from customer limit 50"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}

func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
validateReadsRoute(t, tabletTypes, sourceReplicaTab)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, sourceReplicaTab)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, sourceRdonlyTab)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab)
}
}

func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
validateReadsRoute(t, tabletTypes, targetReplicaTab1)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, targetReplicaTab1)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, targetRdonlyTab1)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1)
}
}

func validateWritesRouteToSource(t *testing.T) {
Expand Down Expand Up @@ -396,6 +415,13 @@ func getCurrentStatus(t *testing.T) string {
// but CI currently fails on creating multiple clusters even after the previous ones are torn down

func TestBasicV2Workflows(t *testing.T) {
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultReplicas = 1
defaultRdonly = 1
extraVTTabletArgs = []string{
parallelInsertWorkers,
Expand Down Expand Up @@ -633,6 +659,12 @@ func testPartialSwitches(t *testing.T) {
tstWorkflowSwitchReads(t, "", "")
checkStates(t, nextState, nextState) // idempotency

tstWorkflowReverseReads(t, "replica,rdonly", "")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

tstWorkflowSwitchWrites(t)
currentState = nextState
nextState = wrangler.WorkflowStateAllSwitched
Expand Down Expand Up @@ -669,12 +701,12 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
Expand All @@ -685,42 +717,45 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReads(t, "", "")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

tstWorkflowReverseWrites(t)
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseWrites(t)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
validateReadsRouteToTarget(t, "replica")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowReverseReads(t, "", "")
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchReadsAndWrites(t)
validateReadsRouteToTarget(t, "replica")
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReadsAndWrites(t)
validateReadsRoute(t, "rdonly", sourceRdonlyTab)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
Expand All @@ -731,8 +766,7 @@ func testRestOfWorkflow(t *testing.T) {
// fully switch and complete
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

err = tstWorkflowComplete(t)
Expand Down Expand Up @@ -787,7 +821,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {

zone1 := vc.Cells["zone1"]

vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)

verifyClusterHealth(t, vc)
insertInitialData(t)
Expand All @@ -800,7 +834,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess {
tablets := make(map[string]*cluster.VttabletProcess)
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-",
customerVSchema, customerSchema, 0, 0, 200, nil); err != nil {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -936,6 +970,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet
targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet
targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet
targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet

sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
Expand All @@ -947,3 +982,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
"--sql", sql, keyspace)
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
}

func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
tabletType = strings.ToLower(strings.TrimSpace(tabletType))
rr := getRoutingRules(t)
// We set matched = true by default because it is possible, if --no-routing-rules is set while creating
// a workflow, that the routing rules are empty when the workflow starts.
// We set it to false below when the rule is found, but before matching the routed keyspace.
matched := true
for _, r := range rr.GetRules() {
fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table)
if tabletType != "" && tabletType != "primary" {
fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType)
}
if r.FromTable == fromRule {
// We found the rule, so we can set matched to false here and check for the routed keyspace below.
matched = false
require.NotEmpty(t, r.ToTables)
toTable := r.ToTables[0]
// The ToTables value is of the form "routedKeyspace.table".
routedKeyspace, routedTable, ok := strings.Cut(toTable, ".")
require.True(t, ok)
require.Equal(t, table, routedTable)
if routedKeyspace == toKeyspace {
// We found the rule, the table and keyspace matches, so our search is done.
matched = true
break
}
}
}
require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace)
}
Loading
Loading