From 7e7456d95cc66dc2231264defb20664034a651bc Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Tue, 19 Apr 2022 12:27:25 -0400 Subject: [PATCH] Refactor api.getSchemas => cluster.GetSchemas (#10108) Signed-off-by: Andrew Mason --- go/vt/vtadmin/api.go | 99 +-------------------- go/vt/vtadmin/api_test.go | 10 +++ go/vt/vtadmin/cluster/cluster.go | 146 +++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 97 deletions(-) diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go index cda283dd74b..ce6faa7af65 100644 --- a/go/vt/vtadmin/api.go +++ b/go/vt/vtadmin/api.go @@ -19,7 +19,6 @@ package vtadmin import ( "context" "encoding/json" - stderrors "errors" "fmt" "net/http" "net/http/pprof" @@ -492,27 +491,11 @@ func (api *API) FindSchema(ctx context.Context, req *vtadminpb.FindSchemaRequest go func(c *cluster.Cluster) { defer wg.Done() - tablets, err := c.FindTablets(ctx, func(t *vtadminpb.Tablet) bool { - // Filter out all the non-serving tablets once, to make the - // later, per-keyspace filtering slightly faster (fewer - // potentially-redundant iterations). - return t.State == vtadminpb.Tablet_SERVING - }, -1) - if err != nil { - err := fmt.Errorf("could not find any serving tablets for cluster %s: %w", c.ID, err) - rec.RecordError(err) - - return - } - - schemas, err := api.getSchemas(ctx, c, cluster.GetSchemaOptions{ - Tablets: tablets, + schemas, err := c.GetSchemas(ctx, cluster.GetSchemaOptions{ TableSizeOptions: req.TableSizeOptions, }) if err != nil { - err := fmt.Errorf("%w: while collecting schemas for cluster %s", err, c.ID) rec.RecordError(err) - return } @@ -800,16 +783,7 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest go func(c *cluster.Cluster) { defer wg.Done() - // Since tablets are per-cluster, we can fetch them once - // and use them throughout the other waitgroups. - tablets, err := c.GetTablets(ctx) - if err != nil { - er.RecordError(err) - return - } - - ss, err := api.getSchemas(ctx, c, cluster.GetSchemaOptions{ - Tablets: tablets, + ss, err := c.GetSchemas(ctx, cluster.GetSchemaOptions{ TableSizeOptions: req.TableSizeOptions, }) if err != nil { @@ -838,75 +812,6 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest }, nil } -// getSchemas returns all of the schemas across all keyspaces in the given cluster. -func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, opts cluster.GetSchemaOptions) ([]*vtadminpb.Schema, error) { - if err := c.Vtctld.Dial(ctx); err != nil { - return nil, err - } - - getKeyspacesSpan, getKeyspacesCtx := trace.NewSpan(ctx, "Cluster.GetKeyspaces") - cluster.AnnotateSpan(c, getKeyspacesSpan) - - resp, err := c.Vtctld.GetKeyspaces(getKeyspacesCtx, &vtctldatapb.GetKeyspacesRequest{}) - if err != nil { - getKeyspacesSpan.Finish() - return nil, err - } - - getKeyspacesSpan.Finish() - - var ( - schemas []*vtadminpb.Schema - wg sync.WaitGroup - er concurrency.AllErrorRecorder - m sync.Mutex - ) - - for _, ks := range resp.Keyspaces { - wg.Add(1) - - // Get schemas for the cluster/keyspace - go func(c *cluster.Cluster, ks *vtctldatapb.Keyspace) { - defer wg.Done() - - ss, err := c.GetSchema(ctx, ks.Name, opts) - if err != nil { - // Ignore keyspaces without any serving tablets. - if stderrors.Is(err, errors.ErrNoServingTablet) { - log.Infof(err.Error()) - return - } - - er.RecordError(err) - return - } - - // Ignore keyspaces without schemas - if ss == nil { - log.Infof("No schemas for %s", ks.Name) - return - } - - if len(ss.TableDefinitions) == 0 { - log.Infof("No tables in schema for %s", ks.Name) - return - } - - m.Lock() - schemas = append(schemas, ss) - m.Unlock() - }(c, ks) - } - - wg.Wait() - - if er.HasErrors() { - return nil, er.Error() - } - - return schemas, nil -} - // GetShardReplicationPositions is part of the vtadminpb.VTAdminServer interface. func (api *API) GetShardReplicationPositions(ctx context.Context, req *vtadminpb.GetShardReplicationPositionsRequest) (*vtadminpb.GetShardReplicationPositionsResponse, error) { span, ctx := trace.NewSpan(ctx, "API.GetShardReplicationPositions") diff --git a/go/vt/vtadmin/api_test.go b/go/vt/vtadmin/api_test.go index 7b81dfee1e8..71029406f4a 100644 --- a/go/vt/vtadmin/api_test.go +++ b/go/vt/vtadmin/api_test.go @@ -144,6 +144,16 @@ func TestFindSchema(t *testing.T) { DBConfig: vtadmintestutil.Dbcfg{ ShouldErr: true, }, + VtctldClient: &fakevtctldclient.VtctldClient{ + GetKeyspacesResults: struct { + Keyspaces []*vtctldatapb.Keyspace + Error error + }{ + Keyspaces: []*vtctldatapb.Keyspace{ + {Name: "testkeyspace"}, + }, + }, + }, }, }, req: &vtadminpb.FindSchemaRequest{ diff --git a/go/vt/vtadmin/cluster/cluster.go b/go/vt/vtadmin/cluster/cluster.go index 192c8eed5cf..cce503613a7 100644 --- a/go/vt/vtadmin/cluster/cluster.go +++ b/go/vt/vtadmin/cluster/cluster.go @@ -20,6 +20,7 @@ import ( "context" "database/sql" "encoding/json" + stderrors "errors" "fmt" "math/rand" "sort" @@ -917,6 +918,9 @@ type GetSchemaOptions struct { // // If empty, GetSchema will first call (*Cluster).FindTablets() to fetch all // tablets for the keyspace. + // + // DEPRECATED: this is currently used only for Cluster.GetSchema, and is + // being phased out. Do not depend on this field. Tablets []*vtadminpb.Tablet // BaseRequest is used to share some common parameters to use for the // individual tablet GetSchema RPCs made by (*Cluster).GetSchema, which @@ -1026,6 +1030,148 @@ func (c *Cluster) GetSchema(ctx context.Context, keyspace string, opts GetSchema return c.getSchemaFromTablets(ctx, keyspace, tabletsToQuery, opts) } +// GetSchemas returns all of the schemas across all keyspaces in the cluster. +func (c *Cluster) GetSchemas(ctx context.Context, opts GetSchemaOptions) ([]*vtadminpb.Schema, error) { + span, ctx := trace.NewSpan(ctx, "Cluster.GetSchemas") + defer span.Finish() + + if opts.TableSizeOptions == nil { + opts.TableSizeOptions = &vtadminpb.GetSchemaTableSizeOptions{ + AggregateSizes: false, + IncludeNonServingShards: false, + } + } + + if opts.BaseRequest == nil { + opts.BaseRequest = &vtctldatapb.GetSchemaRequest{} + } + + if opts.TableSizeOptions.AggregateSizes && opts.BaseRequest.TableNamesOnly { + log.Warningf("GetSchemas(cluster = %s) size aggregation is incompatible with TableNamesOnly, ignoring the latter in favor of aggregating sizes", c.ID) + opts.BaseRequest.TableNamesOnly = false + } + + AnnotateSpan(c, span) + annotateGetSchemaRequest(opts.BaseRequest, span) + vtadminproto.AnnotateSpanWithGetSchemaTableSizeOptions(opts.TableSizeOptions, span) + + var ( + m sync.Mutex + wg sync.WaitGroup + rec concurrency.AllErrorRecorder + + tablets []*vtadminpb.Tablet + keyspaces []*vtadminpb.Keyspace + + schemas []*vtadminpb.Schema + ) + + // Start by collecting the tablets and keyspace names concurrently. + wg.Add(1) + go func() { + defer wg.Done() + + var err error + tablets, err = c.GetTablets(ctx) + if err != nil { + rec.RecordError(err) + return + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + // TODO: (ajm188) we can't use c.GetKeyspaces because it also makes a + // FindAllShardsInKeyspace call for each keyspace, which we may or may + // not need. Refactor that method so we can get better code reuse. + span, ctx := trace.NewSpan(ctx, "Cluster.GetKeyspaces") + defer span.Finish() + + if err := c.Vtctld.Dial(ctx); err != nil { + rec.RecordError(fmt.Errorf("Vtctld.Dial(cluster=%s) failed: %w", c.ID, err)) + return + } + + if err := c.topoReadPool.Acquire(ctx); err != nil { + rec.RecordError(fmt.Errorf("GetKeyspaces() failed to acquire topoReadPool: %w", err)) + return + } + + resp, err := c.Vtctld.GetKeyspaces(ctx, &vtctldatapb.GetKeyspacesRequest{}) + c.topoReadPool.Release() + + if err != nil { + rec.RecordError(err) + return + } + + keyspaces = make([]*vtadminpb.Keyspace, len(resp.Keyspaces)) + for i, ks := range resp.Keyspaces { + keyspaces[i] = &vtadminpb.Keyspace{ + Cluster: c.ToProto(), + Keyspace: ks, + } + } + }() + + wg.Wait() + if rec.HasErrors() { + return nil, rec.Error() + } + + opts.Tablets = tablets + + // Now, fan out to collect the schemas. + for _, ks := range keyspaces { + wg.Add(1) + go func(ctx context.Context, ks *vtadminpb.Keyspace) { + defer wg.Done() + + tablets, err := c.getTabletsToQueryForSchemas(ctx, ks.Keyspace.Name, opts) + if err != nil { + // Ignore keyspaces without any serving tablets. + if stderrors.Is(err, errors.ErrNoServingTablet) { + log.Infof(err.Error()) + return + } + + rec.RecordError(fmt.Errorf("opts %+v, err: %w", opts, err)) + return + } + + schema, err := c.getSchemaFromTablets(ctx, ks.Keyspace.Name, tablets, opts) + if err != nil { + rec.RecordError(err) + return + } + + // Ignore keyspaces without schemas + if schema == nil { + log.Infof("No schemas for %s", ks.Keyspace.Name) + return + } + + if len(schema.TableDefinitions) == 0 { + log.Infof("No tables in schema for %s", ks.Keyspace.Name) + return + } + + m.Lock() + schemas = append(schemas, schema) + m.Unlock() + }(ctx, ks) + } + + wg.Wait() + if rec.HasErrors() { + return nil, rec.Error() + } + + return schemas, nil +} + // Note that for this function we use the tablets parameter, ignoring the // opts.Tablets value completely. func (c *Cluster) getSchemaFromTablets(ctx context.Context, keyspace string, tablets []*vtadminpb.Tablet, opts GetSchemaOptions) (*vtadminpb.Schema, error) {