Skip to content

Commit

Permalink
[release-20.0] Fix: Separate Lock for Keyspace to Update Controller M…
Browse files Browse the repository at this point in the history
…apping in Schema Tracking (vitessio#17873) (vitessio#17884)

Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Harshit Gangal <[email protected]>
  • Loading branch information
vitess-bot[bot] and harshit-gangal authored Mar 4, 2025
1 parent 7ea4bb9 commit ab95cb4
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 29 deletions.
26 changes: 19 additions & 7 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
signal func() // a function that we'll call whenever we have new schema data

// map of keyspace currently tracked
trackedMu sync.Mutex
tracked map[keyspaceStr]*updateController
consumeDelay time.Duration

Expand Down Expand Up @@ -96,7 +97,7 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
return err
}

t.tracked[target.Keyspace].setLoaded(true)
t.setLoaded(target.Keyspace, true)
return nil
}

Expand Down Expand Up @@ -209,8 +210,8 @@ func (t *Tracker) Start() {
// getKeyspaceUpdateController returns the updateController for the given keyspace
// the updateController will be created if there was none.
func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updateController {
t.mu.Lock()
defer t.mu.Unlock()
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

ksUpdater, exists := t.tracked[th.Target.Keyspace]
if !exists {
Expand All @@ -224,6 +225,16 @@ func (t *Tracker) newUpdateController() *updateController {
return &updateController{update: t.updateSchema, reloadKeyspace: t.initKeyspace, signal: t.signal, consumeDelay: t.consumeDelay}
}

// setLoaded sets the loaded status for the given keyspace.
func (t *Tracker) setLoaded(ks keyspaceStr, loaded bool) {
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

if ksUpdater, exists := t.tracked[ks]; exists {
ksUpdater.setLoaded(loaded)
}
}

func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
Expand Down Expand Up @@ -343,7 +354,7 @@ func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool {
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
t.setLoaded(th.Target.Keyspace, false)
// TODO: optimize for the tables that got errored out.
log.Warningf("error fetching new schema for %v, making them non-authoritative: %v", tablesUpdated, err)
return false
Expand Down Expand Up @@ -451,7 +462,7 @@ func (t *Tracker) updatedViewSchema(th *discovery.TabletHealth) bool {
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
t.setLoaded(th.Target.Keyspace, false)
// TODO: optimize for the views that got errored out.
log.Warningf("error fetching new views definition for %v", viewsUpdated, err)
return false
Expand All @@ -467,8 +478,9 @@ func (t *Tracker) updateViews(keyspace string, res map[string]string) {

// RegisterSignalReceiver allows a function to register to be called when new schema is available
func (t *Tracker) RegisterSignalReceiver(f func()) {
t.mu.Lock()
defer t.mu.Unlock()
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

for _, controller := range t.tracked {
controller.signal = f
}
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,38 @@ func TestTrackerGetKeyspaceUpdateController(t *testing.T) {
assert.Nil(t, ks3.reloadKeyspace, "ks3 already initialized")
}

// TestTrackerNoLock tests that processing of health check is not blocked while tracking is making GetSchema rpc calls.
func TestTrackerNoLock(t *testing.T) {
ch := make(chan *discovery.TabletHealth)
tracker := NewTracker(ch, true, false, sqlparser.NewTestParser())
tracker.consumeDelay = 1 * time.Millisecond
tracker.Start()
defer tracker.Stop()

target := &querypb.Target{Cell: cell, Keyspace: keyspace, Shard: "-80", TabletType: topodatapb.TabletType_PRIMARY}
tablet := &topodatapb.Tablet{Keyspace: target.Keyspace, Shard: target.Shard, Type: target.TabletType}

sbc := sandboxconn.NewSandboxConn(tablet)
sbc.GetSchemaDelayResponse = 100 * time.Millisecond

th := &discovery.TabletHealth{
Conn: sbc,
Tablet: tablet,
Target: target,
Serving: true,
Stats: &querypb.RealtimeStats{TableSchemaChanged: []string{"t1"}},
}

for i := 0; i < 500000; i++ {
select {
case ch <- th:
case <-time.After(10 * time.Millisecond):
t.Fatalf("failed to send health check to tracker")
}
}
require.GreaterOrEqual(t, sbc.GetSchemaCount.Load(), int64(1), "GetSchema rpc should be called")
}

type myTable struct {
name, create string
}
Expand Down
47 changes: 25 additions & 22 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ import (
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/queryservice"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/queryservice"
)

// SandboxConn satisfies the QueryService interface
Expand Down Expand Up @@ -65,22 +63,24 @@ type SandboxConn struct {

// These Count vars report how often the corresponding
// functions were called.
ExecCount atomic.Int64
BeginCount atomic.Int64
CommitCount atomic.Int64
RollbackCount atomic.Int64
AsTransactionCount atomic.Int64
PrepareCount atomic.Int64
CommitPreparedCount atomic.Int64
RollbackPreparedCount atomic.Int64
CreateTransactionCount atomic.Int64
StartCommitCount atomic.Int64
SetRollbackCount atomic.Int64
ConcludeTransactionCount atomic.Int64
ReadTransactionCount atomic.Int64
ReserveCount atomic.Int64
ReleaseCount atomic.Int64
GetSchemaCount atomic.Int64
ExecCount atomic.Int64
BeginCount atomic.Int64
CommitCount atomic.Int64
RollbackCount atomic.Int64
AsTransactionCount atomic.Int64
PrepareCount atomic.Int64
CommitPreparedCount atomic.Int64
RollbackPreparedCount atomic.Int64
CreateTransactionCount atomic.Int64
StartCommitCount atomic.Int64
SetRollbackCount atomic.Int64
ConcludeTransactionCount atomic.Int64
ReadTransactionCount atomic.Int64
UnresolvedTransactionsCount atomic.Int64
ReserveCount atomic.Int64
ReleaseCount atomic.Int64
GetSchemaCount atomic.Int64
GetSchemaDelayResponse time.Duration

queriesRequireLocking bool
queriesMu sync.Mutex
Expand Down Expand Up @@ -677,6 +677,9 @@ func (sbc *SandboxConn) Release(ctx context.Context, target *querypb.Target, tra
// GetSchema implements the QueryService interface
func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
sbc.GetSchemaCount.Add(1)
if sbc.GetSchemaDelayResponse > 0 {
time.Sleep(sbc.GetSchemaDelayResponse)
}
if len(sbc.getSchemaResult) == 0 {
return nil
}
Expand Down

0 comments on commit ab95cb4

Please sign in to comment.