Skip to content

Commit

Permalink
[vtadmin] Remove Tablets option for GetSchema(s) (vitessio#10111)
Browse files Browse the repository at this point in the history
* Remove Tablets option for GetSchema(s)

Signed-off-by: Andrew Mason <[email protected]>

* Fix tests

Signed-off-by: Andrew Mason <[email protected]>
  • Loading branch information
Andrew Mason authored Apr 19, 2022
1 parent 7e7456d commit 619de68
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 63 deletions.
4 changes: 1 addition & 3 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1664,9 +1664,7 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
go func(c *cluster.Cluster) {
defer wg.Done()

res, err := c.GetSchema(ctx, req.Keyspace, cluster.GetSchemaOptions{
Tablets: []*vtadminpb.Tablet{tablet},
})
res, err := c.GetSchema(ctx, req.Keyspace, cluster.GetSchemaOptions{})
if err != nil {
er.RecordError(fmt.Errorf("GetSchema(%s): %w", topoproto.TabletAliasString(tablet.Tablet.Alias), err))
return
Expand Down
66 changes: 36 additions & 30 deletions go/vt/vtadmin/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,15 +913,6 @@ func (c *Cluster) getTablets(ctx context.Context) ([]*vtadminpb.Tablet, error) {
// GetSchemaOptions contains the options that modify the behavior of the
// (*Cluster).GetSchema method.
type GetSchemaOptions struct {
// Tablets is the starting set of tablets that GetSchema will filter to find
// suitable tablet(s) to make GetSchema RPC(s) to.
//
// If empty, GetSchema will first call (*Cluster).FindTablets() to fetch all
// tablets for the keyspace.
//
// DEPRECATED: this is currently used only for Cluster.GetSchema, and is
// being phased out. Do not depend on this field.
Tablets []*vtadminpb.Tablet
// BaseRequest is used to share some common parameters to use for the
// individual tablet GetSchema RPCs made by (*Cluster).GetSchema, which
// takes a copy of this request in order to makeb certain overrides as
Expand Down Expand Up @@ -959,22 +950,18 @@ type GetSchemaOptions struct {
// GetSchema returns the schema for a given keyspace. GetSchema has a few
// different behaviors depending on the GetSchemaOptions provided, as follows:
//
// (1) If opts.Tablets is empty, we will first use FindTablets to fetch all
// tablets for the keyspace, regardless of their serving state. Additional
// filtering of either this set, or the provided Tablets, will happen later.
//
// (2) If opts.SizeOpts.AggregateSizes is true, we will also make a call to
// (1) If opts.SizeOpts.AggregateSizes is true, we will also make a call to
// FindAllShardsInKeyspace, in order to fan out GetSchema RPCs to a tablet in
// each shard. If this option is false, we make exactly one GetSchema request to
// a single, randomly-chosen, tablet in the keyspace.
//
// (2.1) If, in size aggregation mode, opts.SizeOpts.IncludeNonServingShards is
// (1.1) If, in size aggregation mode, opts.SizeOpts.IncludeNonServingShards is
// false (the default), then we will filter out any shards for which
// IsPrimaryServing is false in the topo, and make GetSchema RPCs to one tablet
// in every _serving_ shard. Otherwise we will make a GetSchema RPC to one
// tablet in _every_ shard.
//
// (3) Irrespective of whether we're including nonserving shards, or whether
// (2) Irrespective of whether we're including nonserving shards, or whether
// we're doing size aggregation at all, we will only make GetSchema RPCs to
// tablets that are in SERVING state; we don't want to use a tablet that might
// be in a bad state as the source of truth for a schema. Therefore if we can't
Expand Down Expand Up @@ -1006,23 +993,44 @@ func (c *Cluster) GetSchema(ctx context.Context, keyspace string, opts GetSchema
annotateGetSchemaRequest(opts.BaseRequest, span)
vtadminproto.AnnotateSpanWithGetSchemaTableSizeOptions(opts.TableSizeOptions, span)

if len(opts.Tablets) == 0 {
var (
wg sync.WaitGroup
rec concurrency.AllErrorRecorder

tablets []*vtadminpb.Tablet
)

// First, dial vtctld and fetch tablets concurrently.
wg.Add(1)
go func() {
defer wg.Done()

if err := c.Vtctld.Dial(ctx); err != nil {
rec.RecordError(fmt.Errorf("failed to Dial vtctld for cluster = %s for GetSchema: %w", c.ID, err))
return
}
}()

wg.Add(1)
go func() {
defer wg.Done()
// Fetch all tablets for the keyspace.
var err error

opts.Tablets, err = c.FindTablets(ctx, func(tablet *vtadminpb.Tablet) bool {
tablets, err = c.FindTablets(ctx, func(tablet *vtadminpb.Tablet) bool {
return tablet.Tablet.Keyspace == keyspace
}, -1)
if err != nil {
return nil, fmt.Errorf("%w for keyspace %s", errors.ErrNoTablet, keyspace)
rec.RecordError(fmt.Errorf("%w for keyspace %s", errors.ErrNoTablet, keyspace))
}
}
}()

if err := c.Vtctld.Dial(ctx); err != nil {
return nil, fmt.Errorf("failed to Dial vtctld for cluster = %s for GetSchema: %w", c.ID, err)
wg.Wait()
if rec.HasErrors() {
return nil, rec.Error()
}

tabletsToQuery, err := c.getTabletsToQueryForSchemas(ctx, keyspace, opts)
tabletsToQuery, err := c.getTabletsToQueryForSchemas(ctx, keyspace, tablets, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1121,15 +1129,13 @@ func (c *Cluster) GetSchemas(ctx context.Context, opts GetSchemaOptions) ([]*vta
return nil, rec.Error()
}

opts.Tablets = tablets

// Now, fan out to collect the schemas.
for _, ks := range keyspaces {
wg.Add(1)
go func(ctx context.Context, ks *vtadminpb.Keyspace) {
defer wg.Done()

tablets, err := c.getTabletsToQueryForSchemas(ctx, ks.Keyspace.Name, opts)
tablets, err := c.getTabletsToQueryForSchemas(ctx, ks.Keyspace.Name, tablets, opts)
if err != nil {
// Ignore keyspaces without any serving tablets.
if stderrors.Is(err, errors.ErrNoServingTablet) {
Expand Down Expand Up @@ -1284,7 +1290,7 @@ func (c *Cluster) getSchemaFromTablets(ctx context.Context, keyspace string, tab
return schema, nil
}

func (c *Cluster) getTabletsToQueryForSchemas(ctx context.Context, keyspace string, opts GetSchemaOptions) ([]*vtadminpb.Tablet, error) {
func (c *Cluster) getTabletsToQueryForSchemas(ctx context.Context, keyspace string, tablets []*vtadminpb.Tablet, opts GetSchemaOptions) ([]*vtadminpb.Tablet, error) {
if opts.TableSizeOptions.AggregateSizes {
shards, err := c.FindAllShardsInKeyspace(ctx, keyspace, FindAllShardsInKeyspaceOptions{SkipDial: true})
if err != nil {
Expand All @@ -1308,7 +1314,7 @@ func (c *Cluster) getTabletsToQueryForSchemas(ctx context.Context, keyspace stri

shardTablets := vtadminproto.FilterTablets(func(tablet *vtadminpb.Tablet) bool {
return tablet.Tablet.Keyspace == keyspace && tablet.Tablet.Shard == shard.Name && tablet.State == vtadminpb.Tablet_SERVING
}, opts.Tablets, len(opts.Tablets))
}, tablets, len(tablets))

if len(shardTablets) == 0 {
return nil, fmt.Errorf("%w for shard %s/%s", errors.ErrNoServingTablet, shard.Keyspace, shard.Name)
Expand All @@ -1323,11 +1329,11 @@ func (c *Cluster) getTabletsToQueryForSchemas(ctx context.Context, keyspace stri

keyspaceTablets := vtadminproto.FilterTablets(func(tablet *vtadminpb.Tablet) bool {
return tablet.Tablet.Keyspace == keyspace && tablet.State == vtadminpb.Tablet_SERVING
}, opts.Tablets, len(opts.Tablets))
}, tablets, len(tablets))

if len(keyspaceTablets) == 0 {
err := fmt.Errorf("%w for keyspace %s", errors.ErrNoServingTablet, keyspace)
log.Warningf("%s. Searched tablets: %v", err, vtadminproto.Tablets(opts.Tablets).AliasStringList())
log.Warningf("%s. Searched tablets: %v", err, vtadminproto.Tablets(tablets).AliasStringList())
return nil, err
}

Expand Down
34 changes: 4 additions & 30 deletions go/vt/vtadmin/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,15 +1365,14 @@ func TestGetSchema(t *testing.T) {
Name: fmt.Sprintf("cluster%d", i),
},
VtctldClient: tt.vtctld,
Tablets: nil,
Tablets: []*vtadminpb.Tablet{tt.tablet},
DBConfig: testutil.Dbcfg{},
})

err := c.Vtctld.Dial(ctx)
require.NoError(t, err, "could not dial test vtctld")

schema, err := c.GetSchema(ctx, "testkeyspace", cluster.GetSchemaOptions{
Tablets: []*vtadminpb.Tablet{tt.tablet},
BaseRequest: tt.req,
})
if tt.shouldErr {
Expand Down Expand Up @@ -1429,7 +1428,6 @@ func TestGetSchema(t *testing.T) {

_, _ = c.GetSchema(ctx, "testkeyspace", cluster.GetSchemaOptions{
BaseRequest: req,
Tablets: []*vtadminpb.Tablet{tablet},
})

assert.NotEqual(t, req.TabletAlias, tablet.Tablet.Alias, "expected GetSchema to not modify original request object")
Expand Down Expand Up @@ -2234,9 +2232,10 @@ func TestGetSchema(t *testing.T) {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
Uid: 300,
},
Keyspace: "testkeyspace",
// Note this is for another keyspace, so we fail to find a tablet for testkeyspace/-80.
Keyspace: "otherkeyspace",
Shard: "80-",
},
State: vtadminpb.Tablet_SERVING,
Expand Down Expand Up @@ -2331,31 +2330,6 @@ func TestGetSchema(t *testing.T) {
TableSizeOptions: &vtadminpb.GetSchemaTableSizeOptions{
AggregateSizes: true,
},
Tablets: []*vtadminpb.Tablet{
{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Keyspace: "testkeyspace",
Shard: "-80",
},
State: vtadminpb.Tablet_SERVING,
},
{
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 300,
},
// Note this is for another keyspace, so we fail to find a tablet for testkeyspace/-80.
Keyspace: "otherkeyspace",
Shard: "80-",
},
State: vtadminpb.Tablet_SERVING,
},
},
},
expected: nil,
shouldErr: true,
Expand Down

0 comments on commit 619de68

Please sign in to comment.