diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index 0a324007fa2..fbfba01da3f 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -51,6 +51,7 @@ type ( signal func() // a function that we'll call whenever we have new schema data // map of keyspace currently tracked + trackedMu sync.Mutex tracked map[keyspaceStr]*updateController consumeDelay time.Duration @@ -96,7 +97,7 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T return err } - t.tracked[target.Keyspace].setLoaded(true) + t.setLoaded(target.Keyspace, true) return nil } @@ -209,8 +210,8 @@ func (t *Tracker) Start() { // getKeyspaceUpdateController returns the updateController for the given keyspace // the updateController will be created if there was none. func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updateController { - t.mu.Lock() - defer t.mu.Unlock() + t.trackedMu.Lock() + defer t.trackedMu.Unlock() ksUpdater, exists := t.tracked[th.Target.Keyspace] if !exists { @@ -224,6 +225,16 @@ func (t *Tracker) newUpdateController() *updateController { return &updateController{update: t.updateSchema, reloadKeyspace: t.initKeyspace, signal: t.signal, consumeDelay: t.consumeDelay} } +// setLoaded sets the loaded status for the given keyspace. +func (t *Tracker) setLoaded(ks keyspaceStr, loaded bool) { + t.trackedMu.Lock() + defer t.trackedMu.Unlock() + + if ksUpdater, exists := t.tracked[ks]; exists { + ksUpdater.setLoaded(loaded) + } +} + func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error { err := t.LoadKeyspace(th.Conn, th.Target) if err != nil { @@ -343,7 +354,7 @@ func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool { return nil }) if err != nil { - t.tracked[th.Target.Keyspace].setLoaded(false) + t.setLoaded(th.Target.Keyspace, false) // TODO: optimize for the tables that got errored out. 
log.Warningf("error fetching new schema for %v, making them non-authoritative: %v", tablesUpdated, err) return false @@ -451,7 +462,7 @@ func (t *Tracker) updatedViewSchema(th *discovery.TabletHealth) bool { return nil }) if err != nil { - t.tracked[th.Target.Keyspace].setLoaded(false) + t.setLoaded(th.Target.Keyspace, false) // TODO: optimize for the views that got errored out. - log.Warningf("error fetching new views definition for %v", viewsUpdated, err) + log.Warningf("error fetching new views definition for %v: %v", viewsUpdated, err) return false @@ -467,8 +478,9 @@ func (t *Tracker) updateViews(keyspace string, res map[string]string) { // RegisterSignalReceiver allows a function to register to be called when new schema is available func (t *Tracker) RegisterSignalReceiver(f func()) { - t.mu.Lock() - defer t.mu.Unlock() + t.trackedMu.Lock() + defer t.trackedMu.Unlock() + for _, controller := range t.tracked { controller.signal = f } diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go index e5280c78ed8..41f4a33f4ce 100644 --- a/go/vt/vtgate/schema/tracker_test.go +++ b/go/vt/vtgate/schema/tracker_test.go @@ -170,6 +170,38 @@ func TestTrackerGetKeyspaceUpdateController(t *testing.T) { assert.Nil(t, ks3.reloadKeyspace, "ks3 already initialized") } +// TestTrackerNoLock tests that processing of health check is not blocked while tracking is making GetSchema rpc calls. 
+func TestTrackerNoLock(t *testing.T) { + ch := make(chan *discovery.TabletHealth) + tracker := NewTracker(ch, true, false, sqlparser.NewTestParser()) + tracker.consumeDelay = 1 * time.Millisecond + tracker.Start() + defer tracker.Stop() + + target := &querypb.Target{Cell: cell, Keyspace: keyspace, Shard: "-80", TabletType: topodatapb.TabletType_PRIMARY} + tablet := &topodatapb.Tablet{Keyspace: target.Keyspace, Shard: target.Shard, Type: target.TabletType} + + sbc := sandboxconn.NewSandboxConn(tablet) + sbc.GetSchemaDelayResponse = 100 * time.Millisecond + + th := &discovery.TabletHealth{ + Conn: sbc, + Tablet: tablet, + Target: target, + Serving: true, + Stats: &querypb.RealtimeStats{TableSchemaChanged: []string{"t1"}}, + } + + for i := 0; i < 500000; i++ { + select { + case ch <- th: + case <-time.After(10 * time.Millisecond): + t.Fatalf("failed to send health check to tracker") + } + } + require.GreaterOrEqual(t, sbc.GetSchemaCount.Load(), int64(1), "GetSchema rpc should be called") +} + type myTable struct { name, create string } diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 618a87b1d81..830ded0b234 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -26,17 +26,15 @@ import ( "time" "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/queryservice" - + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/queryservice" ) // SandboxConn satisfies the QueryService interface @@ -65,22 +63,24 @@ type SandboxConn struct { // 
These Count vars report how often the corresponding // functions were called. - ExecCount atomic.Int64 - BeginCount atomic.Int64 - CommitCount atomic.Int64 - RollbackCount atomic.Int64 - AsTransactionCount atomic.Int64 - PrepareCount atomic.Int64 - CommitPreparedCount atomic.Int64 - RollbackPreparedCount atomic.Int64 - CreateTransactionCount atomic.Int64 - StartCommitCount atomic.Int64 - SetRollbackCount atomic.Int64 - ConcludeTransactionCount atomic.Int64 - ReadTransactionCount atomic.Int64 - ReserveCount atomic.Int64 - ReleaseCount atomic.Int64 - GetSchemaCount atomic.Int64 + ExecCount atomic.Int64 + BeginCount atomic.Int64 + CommitCount atomic.Int64 + RollbackCount atomic.Int64 + AsTransactionCount atomic.Int64 + PrepareCount atomic.Int64 + CommitPreparedCount atomic.Int64 + RollbackPreparedCount atomic.Int64 + CreateTransactionCount atomic.Int64 + StartCommitCount atomic.Int64 + SetRollbackCount atomic.Int64 + ConcludeTransactionCount atomic.Int64 + ReadTransactionCount atomic.Int64 + UnresolvedTransactionsCount atomic.Int64 + ReserveCount atomic.Int64 + ReleaseCount atomic.Int64 + GetSchemaCount atomic.Int64 + GetSchemaDelayResponse time.Duration queriesRequireLocking bool queriesMu sync.Mutex @@ -677,6 +677,9 @@ func (sbc *SandboxConn) Release(ctx context.Context, target *querypb.Target, tra // GetSchema implements the QueryService interface func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { sbc.GetSchemaCount.Add(1) + if sbc.GetSchemaDelayResponse > 0 { + time.Sleep(sbc.GetSchemaDelayResponse) + } if len(sbc.getSchemaResult) == 0 { return nil }