Skip to content

Commit

Permalink
Refactor api.getSchemas => cluster.GetSchemas (vitessio#10108)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <[email protected]>
  • Loading branch information
Andrew Mason authored Apr 19, 2022
1 parent 2571b36 commit 7e7456d
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 97 deletions.
99 changes: 2 additions & 97 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package vtadmin
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -492,27 +491,11 @@ func (api *API) FindSchema(ctx context.Context, req *vtadminpb.FindSchemaRequest
go func(c *cluster.Cluster) {
defer wg.Done()

tablets, err := c.FindTablets(ctx, func(t *vtadminpb.Tablet) bool {
// Filter out all the non-serving tablets once, to make the
// later, per-keyspace filtering slightly faster (fewer
// potentially-redundant iterations).
return t.State == vtadminpb.Tablet_SERVING
}, -1)
if err != nil {
err := fmt.Errorf("could not find any serving tablets for cluster %s: %w", c.ID, err)
rec.RecordError(err)

return
}

schemas, err := api.getSchemas(ctx, c, cluster.GetSchemaOptions{
Tablets: tablets,
schemas, err := c.GetSchemas(ctx, cluster.GetSchemaOptions{
TableSizeOptions: req.TableSizeOptions,
})
if err != nil {
err := fmt.Errorf("%w: while collecting schemas for cluster %s", err, c.ID)
rec.RecordError(err)

return
}

Expand Down Expand Up @@ -800,16 +783,7 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest
go func(c *cluster.Cluster) {
defer wg.Done()

// Since tablets are per-cluster, we can fetch them once
// and use them throughout the other waitgroups.
tablets, err := c.GetTablets(ctx)
if err != nil {
er.RecordError(err)
return
}

ss, err := api.getSchemas(ctx, c, cluster.GetSchemaOptions{
Tablets: tablets,
ss, err := c.GetSchemas(ctx, cluster.GetSchemaOptions{
TableSizeOptions: req.TableSizeOptions,
})
if err != nil {
Expand Down Expand Up @@ -838,75 +812,6 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest
}, nil
}

// getSchemas returns all of the schemas across all keyspaces in the given cluster.
func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, opts cluster.GetSchemaOptions) ([]*vtadminpb.Schema, error) {
if err := c.Vtctld.Dial(ctx); err != nil {
return nil, err
}

getKeyspacesSpan, getKeyspacesCtx := trace.NewSpan(ctx, "Cluster.GetKeyspaces")
cluster.AnnotateSpan(c, getKeyspacesSpan)

resp, err := c.Vtctld.GetKeyspaces(getKeyspacesCtx, &vtctldatapb.GetKeyspacesRequest{})
if err != nil {
getKeyspacesSpan.Finish()
return nil, err
}

getKeyspacesSpan.Finish()

var (
schemas []*vtadminpb.Schema
wg sync.WaitGroup
er concurrency.AllErrorRecorder
m sync.Mutex
)

for _, ks := range resp.Keyspaces {
wg.Add(1)

// Get schemas for the cluster/keyspace
go func(c *cluster.Cluster, ks *vtctldatapb.Keyspace) {
defer wg.Done()

ss, err := c.GetSchema(ctx, ks.Name, opts)
if err != nil {
// Ignore keyspaces without any serving tablets.
if stderrors.Is(err, errors.ErrNoServingTablet) {
log.Infof(err.Error())
return
}

er.RecordError(err)
return
}

// Ignore keyspaces without schemas
if ss == nil {
log.Infof("No schemas for %s", ks.Name)
return
}

if len(ss.TableDefinitions) == 0 {
log.Infof("No tables in schema for %s", ks.Name)
return
}

m.Lock()
schemas = append(schemas, ss)
m.Unlock()
}(c, ks)
}

wg.Wait()

if er.HasErrors() {
return nil, er.Error()
}

return schemas, nil
}

// GetShardReplicationPositions is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetShardReplicationPositions(ctx context.Context, req *vtadminpb.GetShardReplicationPositionsRequest) (*vtadminpb.GetShardReplicationPositionsResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetShardReplicationPositions")
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtadmin/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ func TestFindSchema(t *testing.T) {
DBConfig: vtadmintestutil.Dbcfg{
ShouldErr: true,
},
VtctldClient: &fakevtctldclient.VtctldClient{
GetKeyspacesResults: struct {
Keyspaces []*vtctldatapb.Keyspace
Error error
}{
Keyspaces: []*vtctldatapb.Keyspace{
{Name: "testkeyspace"},
},
},
},
},
},
req: &vtadminpb.FindSchemaRequest{
Expand Down
146 changes: 146 additions & 0 deletions go/vt/vtadmin/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"database/sql"
"encoding/json"
stderrors "errors"
"fmt"
"math/rand"
"sort"
Expand Down Expand Up @@ -917,6 +918,9 @@ type GetSchemaOptions struct {
//
// If empty, GetSchema will first call (*Cluster).FindTablets() to fetch all
// tablets for the keyspace.
//
// DEPRECATED: this is currently used only for Cluster.GetSchema, and is
// being phased out. Do not depend on this field.
Tablets []*vtadminpb.Tablet
// BaseRequest is used to share some common parameters to use for the
// individual tablet GetSchema RPCs made by (*Cluster).GetSchema, which
Expand Down Expand Up @@ -1026,6 +1030,148 @@ func (c *Cluster) GetSchema(ctx context.Context, keyspace string, opts GetSchema
return c.getSchemaFromTablets(ctx, keyspace, tabletsToQuery, opts)
}

// GetSchemas returns all of the schemas across all keyspaces in the cluster.
func (c *Cluster) GetSchemas(ctx context.Context, opts GetSchemaOptions) ([]*vtadminpb.Schema, error) {
span, ctx := trace.NewSpan(ctx, "Cluster.GetSchemas")
defer span.Finish()

if opts.TableSizeOptions == nil {
opts.TableSizeOptions = &vtadminpb.GetSchemaTableSizeOptions{
AggregateSizes: false,
IncludeNonServingShards: false,
}
}

if opts.BaseRequest == nil {
opts.BaseRequest = &vtctldatapb.GetSchemaRequest{}
}

if opts.TableSizeOptions.AggregateSizes && opts.BaseRequest.TableNamesOnly {
log.Warningf("GetSchemas(cluster = %s) size aggregation is incompatible with TableNamesOnly, ignoring the latter in favor of aggregating sizes", c.ID)
opts.BaseRequest.TableNamesOnly = false
}

AnnotateSpan(c, span)
annotateGetSchemaRequest(opts.BaseRequest, span)
vtadminproto.AnnotateSpanWithGetSchemaTableSizeOptions(opts.TableSizeOptions, span)

var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder

tablets []*vtadminpb.Tablet
keyspaces []*vtadminpb.Keyspace

schemas []*vtadminpb.Schema
)

// Start by collecting the tablets and keyspace names concurrently.
wg.Add(1)
go func() {
defer wg.Done()

var err error
tablets, err = c.GetTablets(ctx)
if err != nil {
rec.RecordError(err)
return
}
}()

wg.Add(1)
go func() {
defer wg.Done()

// TODO: (ajm188) we can't use c.GetKeyspaces because it also makes a
// FindAllShardsInKeyspace call for each keyspace, which we may or may
// not need. Refactor that method so we can get better code reuse.
span, ctx := trace.NewSpan(ctx, "Cluster.GetKeyspaces")
defer span.Finish()

if err := c.Vtctld.Dial(ctx); err != nil {
rec.RecordError(fmt.Errorf("Vtctld.Dial(cluster=%s) failed: %w", c.ID, err))
return
}

if err := c.topoReadPool.Acquire(ctx); err != nil {
rec.RecordError(fmt.Errorf("GetKeyspaces() failed to acquire topoReadPool: %w", err))
return
}

resp, err := c.Vtctld.GetKeyspaces(ctx, &vtctldatapb.GetKeyspacesRequest{})
c.topoReadPool.Release()

if err != nil {
rec.RecordError(err)
return
}

keyspaces = make([]*vtadminpb.Keyspace, len(resp.Keyspaces))
for i, ks := range resp.Keyspaces {
keyspaces[i] = &vtadminpb.Keyspace{
Cluster: c.ToProto(),
Keyspace: ks,
}
}
}()

wg.Wait()
if rec.HasErrors() {
return nil, rec.Error()
}

opts.Tablets = tablets

// Now, fan out to collect the schemas.
for _, ks := range keyspaces {
wg.Add(1)
go func(ctx context.Context, ks *vtadminpb.Keyspace) {
defer wg.Done()

tablets, err := c.getTabletsToQueryForSchemas(ctx, ks.Keyspace.Name, opts)
if err != nil {
// Ignore keyspaces without any serving tablets.
if stderrors.Is(err, errors.ErrNoServingTablet) {
log.Infof(err.Error())
return
}

rec.RecordError(fmt.Errorf("opts %+v, err: %w", opts, err))
return
}

schema, err := c.getSchemaFromTablets(ctx, ks.Keyspace.Name, tablets, opts)
if err != nil {
rec.RecordError(err)
return
}

// Ignore keyspaces without schemas
if schema == nil {
log.Infof("No schemas for %s", ks.Keyspace.Name)
return
}

if len(schema.TableDefinitions) == 0 {
log.Infof("No tables in schema for %s", ks.Keyspace.Name)
return
}

m.Lock()
schemas = append(schemas, schema)
m.Unlock()
}(ctx, ks)
}

wg.Wait()
if rec.HasErrors() {
return nil, rec.Error()
}

return schemas, nil
}

// Note that for this function we use the tablets parameter, ignoring the
// opts.Tablets value completely.
func (c *Cluster) getSchemaFromTablets(ctx context.Context, keyspace string, tablets []*vtadminpb.Tablet, opts GetSchemaOptions) (*vtadminpb.Schema, error) {
Expand Down

0 comments on commit 7e7456d

Please sign in to comment.