From 03b6cf350bd1050c4eacc493ddfca05619e7d6f6 Mon Sep 17 00:00:00 2001
From: Jake Rawsthorne
Date: Sat, 22 Oct 2022 21:32:17 +0100
Subject: [PATCH 1/5] speed up setup command

remove unnecessary rename that takes ~10s
---
 helper/helper.go          |   1 +
 service/common/manager.go |  19 ++-----
 service/common/node.go    | 112 +++++---------------------------------
 3 files changed, 19 insertions(+), 113 deletions(-)

diff --git a/helper/helper.go b/helper/helper.go
index da5773c..0729abe 100644
--- a/helper/helper.go
+++ b/helper/helper.go
@@ -58,6 +58,7 @@ const (
 	PFailover      = "/controller/failOver"
 	PEject         = "/controller/ejectNode"
 	PSetupServices = "/node/controller/setupServices"
+	ClusterInit    = "/clusterInit"
 	PPoolsDefault  = "/pools/default"
 	PSettingsWeb   = "/settings/web"
 	PRbacUsers     = "/settings/rbac/users/local"
diff --git a/service/common/manager.go b/service/common/manager.go
index 8a9d162..829e864 100644
--- a/service/common/manager.go
+++ b/service/common/manager.go
@@ -194,24 +194,13 @@ func (m *Manager) setupNewCluster() (string, error) {
 	if err != nil {
 		return "", err
 	}
-	glog.Infof("Set data memory quota to %d", memoryQuota)
-	if err := epnode.SetupMemoryQuota(memoryQuota); err != nil {
-		return "", err
-	}
-
-	if m.Config.UseHostname {
-		glog.Infof("Set hostname to entry point node")
-		if err := epnode.Rename(epnode.HostName); err != nil {
-			return "", err
-		}
-	}
-
-	glog.Info("SetupInititalService")
-	if err := epnode.SetupInitialService(); err != nil {
-		return "", err
+	clusterInitOpts := ClusterInitOpts{
+		KVMemoryQuota:      memoryQuota,
+		IndexerStorageMode: m.Config.StorageMode,
 	}
 
-	if err := epnode.InitNewCluster(m.Config); err != nil {
+	if err := epnode.ClusterInit(clusterInitOpts); err != nil {
 		return "", err
 	}
diff --git a/service/common/node.go b/service/common/node.go
index 2cc06f7..e427478 100644
--- a/service/common/node.go
+++ b/service/common/node.go
@@ -141,35 +141,6 @@ func (n *Node) AddNode(newNode *Node, services string) error {
 	return err
 }
 
-func (n *Node) InitNewCluster(config Config) error {
-	err := n.SetMemoryQuota(config.MemoryQuota)
-	if err != nil {
-		return err
-	}
-	if config.StorageMode != "" {
-		glog.Infof("Set storage mode to %s", config.StorageMode)
-		if err = n.SetStorageMode(config.StorageMode); err != nil {
-			return err
-		}
-	}
-	return n.Provision()
-}
-
-func (n *Node) Provision() error {
-	body := fmt.Sprintf("port=SAME&username=%s&password=%s", n.RestLogin.Username, n.RestLogin.Password)
-
-	restParam := &helper.RestCall{
-		ExpectedCode: 200,
-		Method:       "POST",
-		Path:         helper.PSettingsWeb,
-		Cred:         n.RestLogin,
-		Body:         body,
-		Header:       map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
-	}
-	_, err := helper.RestRetryer(helper.RestRetry, restParam, helper.GetResponse)
-	return err
-}
-
 func (n *Node) ManageExternalListener(enable bool) error {
 	setting := "off"
 	path := helper.PDisableExternalListener
@@ -249,21 +220,6 @@ func (n *Node) AllowStrictEncryption() error {
 	return err
 }
 
-func (n *Node) SetMemoryQuota(quota string) error {
-	body := fmt.Sprintf("memoryQuota=%s&ftsMemoryQuota=%d", quota, helper.FtsDefaultMemoryQuota)
-
-	restParam := &helper.RestCall{
-		ExpectedCode: 200,
-		Method:       "POST",
-		Path:         helper.PPoolsDefault,
-		Cred:         n.RestLogin,
-		Body:         body,
-		Header:       map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
-	}
-	_, err := helper.RestRetryer(helper.RestRetry, restParam, helper.GetResponse)
-	return err
-}
-
 func (n *Node) GetMemUsedStats(bucket string) (*helper.MemUsedStats, error) {
 	var stdoutBuf, stderrBuf bytes.Buffer
 	err := n.RunSsh(&stdoutBuf, &stderrBuf, "/opt/couchbase/bin/cbstats localhost -u "+n.RestLogin.Username+" -p "+n.RestLogin.Password+" all -b "+bucket)
@@ -306,50 +262,6 @@ func (n *Node) GetMemUsedStats(bucket string) (*helper.MemUsedStats, error) {
 	}, nil
 }
 
-func (n *Node) Rename(hostname string) error {
-	body := fmt.Sprintf("hostname=%s", hostname)
-
-	restParam := &helper.RestCall{
-		ExpectedCode: 200,
-		Method:       "POST",
-		Path:         helper.PRename,
-		Cred:         n.RestLogin,
-		Body:         body,
-		Header:       map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
-		RetryOnCode:  400,
-	}
-	_, err := helper.RestRetryer(helper.RestRetry, restParam, helper.GetResponse)
-	if err == nil {
-		glog.Infof("Succesfully renamed to %s", hostname)
-	} else {
-		glog.Errorf("Error while renaming to %s:%s", hostname, err)
-	}
-
-	return err
-
-}
-
-func (n *Node) SetupMemoryQuota(memoryQuota int) error {
-	body := fmt.Sprintf("memoryQuota=%d", memoryQuota)
-
-	restParam := &helper.RestCall{
-		ExpectedCode: 200,
-		Method:       "POST",
-		Path:         helper.PPoolsDefault,
-		Cred:         n.RestLogin,
-		Body:         body,
-		Header:       map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
-	}
-	_, err := helper.RestRetryer(helper.RestRetry, restParam, helper.GetResponse)
-	if err == nil {
-		glog.Infof("Succesfully set memoryQuota to %d", memoryQuota)
-	} else {
-		glog.Errorf("Error while setting memoryQuota:%s", err)
-	}
-
-	return err
-}
-
 func (n *Node) setAutoFailover(body string) error {
 	restParam := &helper.RestCall{
 		ExpectedCode: 200,
@@ -392,26 +304,30 @@ func (n *Node) IsAutoFailoverEnabled() (bool, int, error) {
 	return enabled, timeout, nil
 }
 
-func (n *Node) SetupInitialService() error {
-	glog.Infof("SetupInitialService for %s", n.HostName)
-	body := fmt.Sprintf("services=%s", url.QueryEscape(n.Services))
+type ClusterInitOpts struct {
+	KVMemoryQuota      int
+	IndexerStorageMode string
+}
 
+func (n *Node) ClusterInit(opts ClusterInitOpts) error {
+	body := fmt.Sprintf("hostname=%s", n.HostName)
+	body += fmt.Sprintf("&services=%s", url.QueryEscape(n.Services))
+	body += fmt.Sprintf("&memoryQuota=%d", opts.KVMemoryQuota)
+	body += fmt.Sprintf("&ftsMemoryQuota=%d", helper.FtsDefaultMemoryQuota)
+	if opts.IndexerStorageMode != "" {
+		body += fmt.Sprintf("&indexerStorageMode=%s", opts.IndexerStorageMode)
+	}
+	body += fmt.Sprintf("&port=SAME&username=%s&password=%s", n.RestLogin.Username, n.RestLogin.Password)
 	restParam := &helper.RestCall{
 		ExpectedCode: 200,
 		RetryOnCode:  400,
 		Method:       "POST",
-		Path:         helper.PSetupServices,
+		Path:         helper.ClusterInit,
 		Cred:         n.RestLogin,
 		Body:         body,
 		Header:       map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
 	}
 	_, err := helper.RestRetryer(20, restParam, helper.GetResponse)
-	if err == nil {
-		glog.Infof("SetupInitialService for %s with services %s", n.HostName, n.Services)
-	} else {
-		glog.Errorf("SetupInitialService:%s", err)
-	}
-
 	return err
 }
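The speedup in patch 1 comes from collapsing three round trips (POST /settings/web, POST /pools/default, POST /node/controller/setupServices) plus the slow rename call into one form-encoded POST to the new /clusterInit path registered in helper.go; the hostname now rides along in the same request. A minimal standalone sketch of that single request — the address, quotas, and credentials below are placeholder values, not taken from the patch:

    package main

    import (
    	"fmt"
    	"net/http"
    	"net/url"
    	"strings"
    )

    func main() {
    	// Placeholder values; the real ones come from Node and its RestLogin.
    	form := url.Values{}
    	form.Set("hostname", "10.0.0.1")
    	form.Set("services", "kv,n1ql,index")
    	form.Set("memoryQuota", "1024")
    	form.Set("ftsMemoryQuota", "256")
    	form.Set("indexerStorageMode", "plasma") // only sent when a storage mode is configured
    	form.Set("port", "SAME")
    	form.Set("username", "Administrator")
    	form.Set("password", "password")

    	req, err := http.NewRequest(http.MethodPost,
    		"http://10.0.0.1:8091/clusterInit", strings.NewReader(form.Encode()))
    	if err != nil {
    		panic(err)
    	}
    	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	fmt.Println("status:", resp.StatusCode) // the patch retries on 400 until it sees 200
    }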
From 7081b42de02e49c368b2af0a45d45e0d0a095e41 Mon Sep 17 00:00:00 2001
From: Jake Rawsthorne
Date: Sat, 22 Oct 2022 21:40:45 +0100
Subject: [PATCH 2/5] remove unused code and fix lint warnings
---
 service/common/clusters.go |   2 +-
 service/common/manager.go  | 139 +------------------------------------
 service/common/node.go     |  98 +++++---------------------
 service/common/nodes.go    |   6 +-
 4 files changed, 22 insertions(+), 223 deletions(-)

diff --git a/service/common/clusters.go b/service/common/clusters.go
index c6f8509..c1cde45 100644
--- a/service/common/clusters.go
+++ b/service/common/clusters.go
@@ -106,7 +106,7 @@ func AddSampleBucket(ctx context.Context, s service.ClusterService, clusterID st
 	}
 
 	if helper.SampleBucketsCount[opts.SampleBucket] == 0 {
-		return errors.New("Unknown sample bucket")
+		return errors.New("unknown sample bucket")
 	}
 
 	if len(c.Nodes) == 0 {
diff --git a/service/common/manager.go b/service/common/manager.go
index 829e864..f6cf23b 100644
--- a/service/common/manager.go
+++ b/service/common/manager.go
@@ -1,15 +1,12 @@
 package common
 
 import (
-	"bufio"
 	"bytes"
 	"errors"
 	"fmt"
 	"io"
-	"os"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/couchbaselabs/cbdynclusterd/cluster"
@@ -58,37 +55,6 @@ type Config struct {
 	UseDevPreview bool
 }
 
-func (m *Manager) GetMemUsedStats(bucket string) (*helper.MemUsedStats, error) {
-	ret := &helper.MemUsedStats{}
-	for _, n := range m.Nodes {
-		curr, err := n.GetMemUsedStats(bucket)
-		if err != nil {
-			return nil, err
-		}
-		ret.Uncompressed += curr.Uncompressed
-		ret.Used += curr.Used
-	}
-	return ret, nil
-}
-
-func (m *Manager) ScpToLocal(hostname, src, dest string) error {
-	for _, nn := range m.Nodes {
-		if nn.HostName == hostname {
-			return nn.ScpToLocal(src, dest)
-		}
-	}
-	return errors.New("Can not find matching host" + hostname)
-}
-
-func (m *Manager) ScpToLocalDir(hostname, src, dest string) error {
-	for _, nn := range m.Nodes {
-		if nn.HostName == hostname {
-			return nn.ScpToLocalDir(src, dest)
-		}
-	}
-	return errors.New("Can not find matching host" + hostname)
-}
-
 func (m *Manager) StartCluster() (string, error) {
 	existingCluster := make(map[string][]*Node)
 	// Ensure we can connect to the REST port
@@ -121,40 +87,11 @@ func (m *Manager) StartCluster() (string, error) {
 			}
 		}
 	}
-	wg := sync.WaitGroup{}
-	for _, n := range m.Nodes {
-		wg.Add(1)
-		glog.Infof("Calling n.StartServer for n:%p:%s", n, n.HostName)
-		go n.StartServer(&wg)
-	}
-	wg.Wait()
 
 	glog.Info("Server started. Setting up a cluster")
 	return m.setupNewCluster()
 }
 
-func (m *Manager) Rebalance() error {
-	epnode := m.Nodes[m.epNode]
-
-	// in case rebalance fails, just try one more time
-	numRetry := 2
-	for i := 0; i < numRetry; i++ {
-		if err := epnode.Rebalance(nil, nil, nil); err != nil {
-			return err
-		}
-		err := epnode.PollRebalance()
-		if err == nil {
-			break
-		} else if i < numRetry {
-			glog.Infof("Rebalance failed, retrying:%s", err)
-		} else {
-			glog.Infof("Rebalance failed, exiting:%s", err)
-			return err
-		}
-	}
-	return nil
-}
-
 func (m *Manager) pollJoinReadyAll(epnode *Node) error {
 	chErr := make(chan error)
 	size := 0
@@ -180,7 +117,7 @@ func (m *Manager) pollJoinReadyAll(epnode *Node) error {
 			glog.Infof("%d/%d node is ready to join", finished, size)
 		}
 	case <-time.After(helper.RestTimeout):
-		return errors.New("Timeout while polling rest of nodes")
+		return errors.New("timeout while polling rest of nodes")
 	}
 }
@@ -277,11 +214,6 @@ func (m *Manager) setupNewCluster() (string, error) {
 	return fmt.Sprintf("http://%s:%s", epnode.HostName, epnode.Port), nil
 }
 
-func (m *Manager) PollCompressionMode(bucket, mode string) error {
-	epnode := m.Nodes[m.epNode]
-	return epnode.PollCompressionMode(bucket, mode)
-}
-
 func getEpNode(force bool, nodes []*Node) (*Node, error) {
 	// select epnode (entry point node)
 	var epnode *Node
@@ -297,7 +229,7 @@
 		}
 	}
 	if epnode == nil {
-		return nil, errors.New("Could not find active nodes")
+		return nil, errors.New("could not find active nodes")
 	}
 	return epnode, nil
 }
@@ -503,70 +435,3 @@ func (m *Manager) SetupBucket(bucketName, bucketType, bucketPassword string, sto
 
 	return m.Nodes[m.epNode].WaitForBucketReady()
 }
-
-func getNodes(ini []string) []*Node {
-	var nodes []*Node
-	for _, spec := range ini {
-		nodeInfo := strings.Split(spec, ":")
-		host := nodeInfo[0]
-		port := nodeInfo[1]
-		version := nodeInfo[2]
-		services := nodeInfo[3]
-		nodes = append(nodes, fromSpec(host, port, version, services))
-	}
-
-	return nodes
-}
-
-func fromSpec(host, port, version, services string) *Node {
-
-	nPort := 8091
-	if len(port) > 0 {
-		num, err := strconv.Atoi(port)
-		if err != nil {
-			panic(err)
-		}
-		nPort = num
-	}
-	nodeHost := &Node{
-		HostName:  host,
-		Port:      strconv.Itoa(nPort),
-		SshLogin:  &helper.Cred{Username: "root", Password: "couchbase", Hostname: host, Port: 22},
-		RestLogin: &helper.Cred{Username: RestUsername, Password: RestPassword, Hostname: host, Port: nPort},
-		N1qlLogin: &helper.Cred{Username: RestUsername, Password: RestPassword, Hostname: host, Port: 8093},
-		FtsLogin:  &helper.Cred{Username: RestUsername, Password: RestPassword, Hostname: host, Port: 8094},
-		Version:   version,
-		Services:  services,
-	}
-
-	return nodeHost
-
-}
-
-func parse(iniFileName string) ([]string, error) {
-
-	file, err := os.Open(iniFileName)
-
-	if err != nil {
-		return nil, err
-	}
-	defer file.Close()
-
-	scanner := bufio.NewScanner(file)
-	var parsed []string
-	// currently we parse only 'node'
-	for scanner.Scan() {
-		line := scanner.Text()
-		if line[0] == '#' {
-			continue
-		}
-		lineSlice := strings.Split(strings.TrimSpace(line), "=")
-		if len(lineSlice) == 2 && lineSlice[0] == "node" {
-			parsed = append(parsed, lineSlice[1])
-		}
-	}
-	if err := scanner.Err(); err != nil {
-		return nil, err
-	}
-	return parsed, nil
-}
diff --git a/service/common/node.go b/service/common/node.go
index e427478..eb67fb8 100644
--- a/service/common/node.go
+++ b/service/common/node.go
@@ -15,7 +15,6 @@ import (
 	"path"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/couchbaselabs/cbdynclusterd/cluster"
@@ -74,7 +73,7 @@ func (n *Node) getId() string {
 
 func (n *Node) Rebalance(remaining, failedOver, toRemove []Node) error {
 	var ejectedId []string
-	var ejectedIds map[string]bool
+	ejectedIds := make(map[string]bool)
 	for _, n := range failedOver {
 		ejectedId = append(ejectedId, n.OtpNode)
 		ejectedIds[n.OtpNode] = true
@@ -206,7 +205,7 @@ func (n *Node) SetLowMagmaMinMemoryQuote() error {
 }
 
 func (n *Node) AllowStrictEncryption() error {
-	body := fmt.Sprintf("canEnableStrictEncryption=true")
+	body := "canEnableStrictEncryption=true"
 
 	restParam := &helper.RestCall{
 		ExpectedCode: 200,
@@ -296,6 +295,9 @@ func (n *Node) IsAutoFailoverEnabled() (bool, int, error) {
 		Cred:         n.RestLogin,
 	}
 	resp, err := helper.RestRetryer(20, restParam, helper.GetResponse)
+	if err != nil {
+		return false, 0, err
+	}
 	if err = json.Unmarshal([]byte(resp), &parsed); err != nil {
 		return false, 0, err
 	}
@@ -514,7 +516,7 @@ func (n *Node) WaitForBucketReady() error {
 			return nil
 		}
 	case <-time.After(helper.WaitTimeout):
-		return errors.New("Timeout while waiting for bucket ready")
+		return errors.New("timeout while waiting for bucket ready")
 	}
 }
 }
@@ -635,60 +637,13 @@ func (n *Node) PollJoinReady(chErr chan error) {
 				time.Sleep(1 * time.Second)
 			}
 		case <-time.After(helper.RestTimeout):
-			chErr <- errors.New("Timeout while polling join ready")
+			chErr <- errors.New("timeout while polling join ready")
 			return
 		}
 	}
 }
 
-func (n *Node) PollCompressionMode(bucket, mode string) error {
-	var err error
-	params := &helper.RestCall{
-		ExpectedCode: 200,
-		Method:       "GET",
-		Path:         helper.PBuckets + "/" + bucket,
-		Cred:         n.RestLogin,
-	}
-
-	info := make(chan map[string]interface{})
-
-CompressionLoop:
-	for {
-		resp, err := helper.RestRetryer(helper.RestRetry, params, helper.GetResponse)
-		if err != nil {
-			return err
-		}
-
-		//glog.Infof("resp=%s", resp)
-		var parsed map[string]interface{}
-		if err := json.Unmarshal([]byte(resp), &parsed); err != nil {
-			return err
-		}
-
-		go func() {
-			info <- parsed
-		}()
-		select {
-		case status := <-info:
-			if status["compressionMode"].(string) == mode {
-				err = nil
-				glog.Infof("Compression mode switched to %s", mode)
-				break CompressionLoop
-			} else {
-				err = nil
-				glog.Infof("Compression mode is still %s", status["compressionMode"].(string))
-				time.Sleep(1 * time.Second)
-			}
-		case <-time.After(helper.RestTimeout):
-			err = errors.New("Timeout while checking compression mode")
-			break CompressionLoop
-		}
-	}
-
-	return err
-}
-
 func (n *Node) PollRebalance() error {
 	params := &helper.RestCall{
 		ExpectedCode: 200,
@@ -733,7 +688,7 @@ func (n *Node) PollRebalance() error {
 			time.Sleep(1 * time.Second)
 		}
 	case <-time.After(helper.RestTimeout):
-		return errors.New("Timeout while rebalancing")
+		return errors.New("timeout while rebalancing")
 	}
 }
@@ -821,7 +776,7 @@
 
 	for {
 		if time.Now().After(deadline) {
-			return errors.New("Timeout while loading sample bucket.")
+			return errors.New("timeout while loading sample bucket")
 		}
 
 		resp, err := helper.RestRetryer(helper.RestRetry, params, helper.GetResponse)
@@ -879,7 +834,7 @@ func (n *Node) pollSampleBucket(s string) error {
 			return nil
 		}
 		if time.Now().After(deadline) {
-			return errors.New("Timeout while loading sample bucket.")
+			return errors.New("timeout while loading sample bucket")
 		}
 	}
 }
@@ -929,7 +884,7 @@ func (n *Node) restCallToAux(fn func(RespNode, *helper.Cred, chan error), restLo
 			return res
 		}
 	case <-time.After(helper.RestTimeout):
-		return errors.New("Timeout while restCallToAux")
+		return errors.New("timeout while restCallToAux")
 	}
 }
 }
@@ -1060,27 +1015,6 @@ func (n *Node) GetInfo() (*RefInfo, error) {
 	return &refInfo, nil
 }
 
-func (n *Node) StartServer(wg *sync.WaitGroup) {
-	glog.Infof("In StartServer, my host is :%p:%s", n, n.HostName)
-	/*var stdoutBuf, stderrBuf bytes.Buffer
-	var cmdList []string
-	cmdList = append(cmdList,"service couchbase-server start");
-	cmdList = append(cmdList,"pkill -CONT -f memcached");
-	cmdList = append(cmdList,"pkill -CONT -f beam.smp");
-	cmdList = append(cmdList,"iptables -F");
-	cmdList = append(cmdList,"iptables -t nat -F");
-
-	for _, cmd := range cmdList {
-		glog.Infof("running %s on %s", cmd, n.HostName)
-		stdoutBuf.Reset()
-		stderrBuf.Reset()
-		err := n.RunSsh(&stdoutBuf, &stderrBuf, cmd)
-
-		if err != nil { glog.Fatalf("Failed to start couchbase server:%s:%s", n.HostName, err) }
-	}*/
-	wg.Done()
-}
-
 func (n *Node) GetSystemInfo() OsInfo {
 	var stdoutBuf, stderrBuf bytes.Buffer
 	err := n.RunSsh(&stdoutBuf, &stderrBuf, "cat /etc/os-release")
@@ -1239,11 +1173,11 @@ func newClient(sshLogin *helper.Cred) (*ssh.Client, error) {
 	} else {
 		key, err := ioutil.ReadFile(sshLogin.KeyPath)
 		if err != nil {
-			return nil, fmt.Errorf("Reading private key file failed %v", err)
+			return nil, fmt.Errorf("reading private key file failed %v", err)
 		}
 		signer, err := ssh.ParsePrivateKey(key)
 		if err != nil {
-			return nil, fmt.Errorf("Parsing private key file failed %v", err)
+			return nil, fmt.Errorf("parsing private key file failed %v", err)
 		}
 		sshConfig.Auth = []ssh.AuthMethod{
 			ssh.PublicKeys(signer),
@@ -1258,7 +1192,7 @@ func newSession(sshLogin *helper.Cred) (*ssh.Session, error) {
 
 	connection, err := newClient(sshLogin)
 	if err != nil {
-		return nil, errors.New(fmt.Sprintf("Failed to dial:%s", err))
+		return nil, fmt.Errorf("failed to dial:%s", err)
 	}
 
 	session, err := connection.NewSession()
@@ -1304,7 +1238,7 @@ func (n *Node) SetupCert(cas []*x509.Certificate, caPrivateKeys []*rsa.PrivateKe
 
 	err = sftpClient.MkdirAll("/opt/couchbase/var/lib/couchbase/inbox")
 	if err != nil {
-		return fmt.Errorf("Failed to create node inbox: %s\n", err)
+		return fmt.Errorf("failed to create node inbox: %s", err)
 	}
 
 	err = cbcerthelper.WriteRemoteCert("/opt/couchbase/var/lib/couchbase/inbox/chain.pem", cbcerthelper.CertTypeCertificate,
@@ -1321,7 +1255,7 @@ func (n *Node) SetupCert(cas []*x509.Certificate, caPrivateKeys []*rsa.PrivateKe
 	if supportsMultipleRoots {
 		err = sftpClient.MkdirAll("/opt/couchbase/var/lib/couchbase/inbox/CA")
 		if err != nil {
-			return fmt.Errorf("Failed to create CA inbox: %v\n", err)
+			return fmt.Errorf("failed to create CA inbox: %v", err)
 		}
 
 		for i, cert := range cas {
diff --git a/service/common/nodes.go b/service/common/nodes.go
index b369638..156e3f3 100644
--- a/service/common/nodes.go
+++ b/service/common/nodes.go
@@ -103,12 +103,12 @@ func flavorFromVersion(version string) (string, error) {
 
 	major, err := strconv.Atoi(versionSplit[0])
 	if err != nil {
-		return "", errors.New("Could not convert version major to int")
+		return "", errors.New("could not convert version major to int")
 	}
 
 	minor, err := strconv.Atoi(versionSplit[1])
 	if err != nil {
-		return "", errors.New("Could not convert version minor to int")
+		return "", errors.New("could not convert version minor to int")
 	}
 
 	if minor >= 5 {
@@ -183,7 +183,7 @@ func AliasServerVersion(version, aliasRepoPath string) (string, error) {
 	}
 
 	if serverBuild == "" {
-		return "", fmt.Errorf("No build version found for %s", version)
+		return "", fmt.Errorf("no build version found for %s", version)
 	}
 
 	log.Printf("Using %s version for %s -> %s", buildParts[1], buildParts[0], serverBuild)

From a91e78f97b29b77e9bb725c619ab5a54836984a6 Mon Sep 17 00:00:00 2001
From: Jake Rawsthorne
Date: Sat, 22 Oct 2022 21:42:52 +0100
Subject: [PATCH 3/5] remove wait until status ok

This was an unnecessary wait. Replace it with waiting for DNS record
propagation and retrying the update-hosts-file function.
---
 service/ec2/ec2service.go | 67 ++++++++++++++++++++++++++++-----------
 1 file changed, 49 insertions(+), 18 deletions(-)

diff --git a/service/ec2/ec2service.go b/service/ec2/ec2service.go
index 11474b8..7fb9150 100644
--- a/service/ec2/ec2service.go
+++ b/service/ec2/ec2service.go
@@ -337,11 +337,34 @@ func (s *EC2Service) hasARecords(ctx context.Context, cluster *cluster.Cluster)
 }
 
 func (s *EC2Service) createARecords(ctx context.Context, cluster *cluster.Cluster) error {
-	return s.changeARecords(ctx, cluster, route53types.ChangeActionCreate)
+	changeInfo, err := s.changeARecords(ctx, cluster, route53types.ChangeActionCreate)
+	if err != nil {
+		return err
+	}
+
+	// wait for dns records to propagate
+	timeout := time.Now().Add(time.Minute * 2)
+	for {
+		if time.Now().After(timeout) {
+			return errors.New("timeout waiting for dns records to propagate")
+		}
+
+		out, err := s.route53Client.GetChange(ctx, &route53.GetChangeInput{Id: changeInfo.Id})
+		if err != nil {
+			return err
+		}
+
+		if out.ChangeInfo.Status == route53types.ChangeStatusInsync {
+			return nil
+		}
+
+		time.Sleep(time.Second * 5)
+	}
 }
 
 func (s *EC2Service) removeARecords(ctx context.Context, cluster *cluster.Cluster) error {
-	return s.changeARecords(ctx, cluster, route53types.ChangeActionDelete)
+	_, err := s.changeARecords(ctx, cluster, route53types.ChangeActionDelete)
+	return err
 }
 
 func (s *EC2Service) removeSrvRecords(ctx context.Context, cluster *cluster.Cluster) error {
@@ -398,7 +421,7 @@ func (s *EC2Service) changeSrvRecords(ctx context.Context, cluster *cluster.Clus
 	return err
 }
 
-func (s *EC2Service) changeARecords(ctx context.Context, cluster *cluster.Cluster, action route53types.ChangeAction) error {
+func (s *EC2Service) changeARecords(ctx context.Context, cluster *cluster.Cluster, action route53types.ChangeAction) (*route53types.ChangeInfo, error) {
 	changes := make([]route53types.Change, 0, len(cluster.Nodes))
 	for _, node := range cluster.Nodes {
 		changes = append(changes, route53types.Change{
@@ -416,14 +439,14 @@ func (s *EC2Service) changeARecords(ctx context.Context, cluster *cluster.Cluste
 		})
 	}
 
-	_, err := s.route53Client.ChangeResourceRecordSets(ctx, &route53.ChangeResourceRecordSetsInput{
+	out, err := s.route53Client.ChangeResourceRecordSets(ctx, &route53.ChangeResourceRecordSetsInput{
 		ChangeBatch: &route53types.ChangeBatch{
 			Changes: changes,
 		},
 		HostedZoneId: aws.String(s.hostedZoneId),
 	})
 
-	return err
+	return out.ChangeInfo, err
 }
 
 func (s *EC2Service) runInstances(ctx context.Context, clusterID, serverVersion string, ami *string, instanceCount int, instanceType types.InstanceType) ([]string, error) {
@@ -572,30 +595,38 @@ func (s *EC2Service) allocateNodes(ctx context.Context, clusterID string, opts [
 		if err != nil {
 			return nil, err
 		}
-	}
 
-	if s.route53Enabled {
-		// we need to wait until the instance is ready before we can ssh in and update the hosts file
-		err = ec2.NewInstanceStatusOkWaiter(s.client).Wait(ctx, &ec2.DescribeInstanceStatusInput{
-			InstanceIds: instanceIds,
-		}, 5*time.Minute)
+		err = tryUpdateHostsFile(ctx, s, clusterID)
 		if err != nil {
 			return nil, err
 		}
+	}
 
-		connCtx, err := s.connectionContext(clusterID)
-		if err != nil {
-			return nil, err
+	return instanceIds, nil
+}
+
+func tryUpdateHostsFile(ctx context.Context, s *EC2Service, clusterID string) error {
+	connCtx, err := s.connectionContext(clusterID)
+	if err != nil {
+		return err
+	}
+
+	timeout := time.Now().Add(time.Minute * 5)
+	for {
+		if time.Now().After(timeout) {
+			return errors.New("timeout trying to update hosts file")
 		}
 
 		// allow nodes to listen on custom hostnames
 		err = common.UpdateHostsFile(ctx, s, clusterID, connCtx)
-		if err != nil {
-			return nil, err
+		if err == nil {
+			return nil
 		}
-	}
 
-	return instanceIds, nil
+		log.Printf("update hosts file failed with %v", err)
+
+		time.Sleep(time.Second * 5)
+	}
 }
 
 func (s *EC2Service) getFilteredClusters(ctx context.Context, filters []types.Filter) ([]*cluster.Cluster, error) {
From fd7b382242a53091b1eb36d2d706e7ef969bd238 Mon Sep 17 00:00:00 2001
From: Jake Rawsthorne
Date: Mon, 24 Oct 2022 10:13:59 +0100
Subject: [PATCH 4/5] replace sleeps with time.NewTimer()
---
 daemon/restserver.go            |  61 ++++++++-------
 service/common/alias.go         |  11 +--
 service/docker/dockerservice.go |   2 +-
 service/docker/images.go        |   8 +-
 service/ec2/ec2service.go       | 134 +++++++++++++++++++-------------
 service/ec2/packer.go           |   5 +-
 6 files changed, 130 insertions(+), 91 deletions(-)

diff --git a/daemon/restserver.go b/daemon/restserver.go
index bdecb92..4539f4e 100644
--- a/daemon/restserver.go
+++ b/daemon/restserver.go
@@ -157,31 +157,40 @@ func (d *daemon) HttpCreateCluster(w http.ResponseWriter, r *http.Request) {
 		s = d.ec2Service
 	}
 
-	for {
-		err = s.AllocateCluster(reqCtx, clusterOpts)
-		if err == service.MaxCapacityError {
-			// update timeout and try again
-			clusterOpts.Deadline = time.Now().Add(timeout)
-
-			err = d.metaStore.UpdateClusterMeta(clusterID, func(meta store.ClusterMeta) (store.ClusterMeta, error) {
-				meta.Timeout = clusterOpts.Deadline
-				return meta, nil
-			})
-
-			if err == nil {
-				time.Sleep(time.Duration(1) * time.Minute)
-				continue
-			}
+	timer := time.NewTimer(0)
+	const sleep = time.Minute
 
-			// fall through if error updating timeout
-		}
-		// allocate or meta update failed
-		if err != nil {
-			writeJSONError(w, err)
+Loop:
+	for {
+		select {
+		case <-reqCtx.Done():
 			return
+		case <-timer.C:
+			err = s.AllocateCluster(reqCtx, clusterOpts)
+			if err == service.MaxCapacityError {
+				// update timeout and try again
+				clusterOpts.Deadline = time.Now().Add(timeout)
+
+				err = d.metaStore.UpdateClusterMeta(clusterID, func(meta store.ClusterMeta) (store.ClusterMeta, error) {
+					meta.Timeout = clusterOpts.Deadline
+					return meta, nil
+				})
+
+				if err == nil {
+					timer.Reset(sleep)
+					continue
+				}
+
+				// fall through if error updating timeout
+			}
+			// allocate or meta update failed
+			if err != nil {
+				writeJSONError(w, err)
+				return
+			}
+			// successfully allocated cluster
+			break Loop
 		}
-		// successfully allocated cluster
-		break
 	}
 
 	newClusterJson := NewClusterJSON{
@@ -248,7 +257,6 @@ func (d *daemon) HttpGetDockerHost(w http.ResponseWriter, r *http.Request) {
 		Port: hostURI.Port(),
 	}
 	writeJsonResponse(w, jsonResp)
-	return
 }
 
 func HttpGetVersion(w http.ResponseWriter, r *http.Request) {
@@ -256,7 +264,6 @@ func HttpGetVersion(w http.ResponseWriter, r *http.Request) {
 		Version: Version,
 	}
 	writeJsonResponse(w, jsonResp)
-	return
 }
 
 func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
@@ -378,7 +385,6 @@ func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
 	}
 
 	writeJsonResponse(w, jsonCluster)
-	return
 }
 
 func (d *daemon) HttpSetupClusterEncryption(w http.ResponseWriter, r *http.Request) {
@@ -886,7 +892,6 @@ func (d *daemon) HttpSetupClientCertAuth(w http.ResponseWriter, r *http.Request)
 		ClientKey:  certData.ClientKey,
 		ClientCert: certData.ClientCert,
 	})
-	return
 }
 
 func (d *daemon) HttpBuildImage(w http.ResponseWriter, r *http.Request) {
@@ -1007,6 +1012,10 @@ func (d *daemon) HttpCreateCloudCluster(w http.ResponseWriter, r *http.Request)
 		meta.Timeout = time.Now().Add(timeout)
 		return meta, nil
 	})
+	if err != nil {
+		writeJSONError(w, err)
+		return
+	}
 
 	dCtx, cancel := context.WithDeadline(reqCtx, time.Now().Add(helper.RestTimeout))
 	defer cancel()
diff --git a/service/common/alias.go b/service/common/alias.go
index 4fcc6ec..5b857a2 100644
--- a/service/common/alias.go
+++ b/service/common/alias.go
@@ -1,6 +1,7 @@
 package common
 
 import (
+	"context"
 	"errors"
 	"io/ioutil"
 	"log"
@@ -38,20 +39,20 @@ func parseYaml(data []byte) (Products, error) {
 }
 
 //Clones/pulls the github alias repo
-func GetConfigRepo(aliasRepoPath string) error {
+func GetConfigRepo(ctx context.Context, aliasRepoPath string) error {
 	log.Printf("Cloning products repo to %s", aliasRepoPath)
-	_, err := git.PlainClone(aliasRepoPath, false, &git.CloneOptions{
+	_, err := git.PlainCloneContext(ctx, aliasRepoPath, false, &git.CloneOptions{
 		URL:      helper.AliasRepo,
 		Progress: os.Stdout,
 	})
 
 	if errors.Is(err, git.ErrRepositoryAlreadyExists) {
-		return pullConfigRepo(aliasRepoPath)
+		return pullConfigRepo(ctx, aliasRepoPath)
 	}
 
 	return err
 }
 
-func pullConfigRepo(aliasRepoPath string) error {
+func pullConfigRepo(ctx context.Context, aliasRepoPath string) error {
 	r, err := git.PlainOpen(aliasRepoPath)
 	if err != nil {
 		return err
@@ -63,7 +64,7 @@ func pullConfigRepo(aliasRepoPath string) error {
 	}
 
 	log.Printf("Pulling products repo")
-	err = w.Pull(&git.PullOptions{RemoteName: "origin"})
+	err = w.PullContext(ctx, &git.PullOptions{RemoteName: "origin"})
 	if errors.Is(err, git.NoErrAlreadyUpToDate) {
 		log.Printf("%v", err)
 		return nil
diff --git a/service/docker/dockerservice.go b/service/docker/dockerservice.go
index c37e041..4f5746a 100644
--- a/service/docker/dockerservice.go
+++ b/service/docker/dockerservice.go
@@ -200,7 +200,7 @@ func (ds *DockerService) AllocateCluster(ctx context.Context, opts service.Alloc
 		return errors.New("cannot allocate clusters with more than 10 nodes")
 	}
 
-	if err := common.GetConfigRepo(ds.aliasRepoPath); err != nil {
+	if err := common.GetConfigRepo(ctx, ds.aliasRepoPath); err != nil {
 		log.Printf("Get config failed: %v", err)
 		return err
 	}
diff --git a/service/docker/images.go b/service/docker/images.go
index b5af125..20cdd7a 100644
--- a/service/docker/images.go
+++ b/service/docker/images.go
@@ -27,11 +27,6 @@ type imageEvent struct {
 	} `json:"progressDetail"`
 }
 
-type imageBuildArgs struct {
-	Version string
-	BuildNo string
-}
-
 func (ds *DockerService) imagePush(ctx context.Context, nodeVersion *common.NodeVersion) error {
 	eventReader, err := ds.docker.ImagePush(ctx, nodeVersion.ToImageName(ds.dockerRegistry), types.ImagePushOptions{
 		RegistryAuth: ds.dockerRegistry,
@@ -79,6 +74,9 @@ func (ds *DockerService) imageBuild(ctx context.Context, nodeVersion *common.Nod
 	}
 
 	buildCtx, err := os.Open(tarPath)
+	if err != nil {
+		return err
+	}
 	defer buildCtx.Close()
 
 	resp, err := ds.docker.ImageBuild(ctx, buildCtx, types.ImageBuildOptions{
diff --git a/service/ec2/ec2service.go b/service/ec2/ec2service.go
index 7fb9150..a2c943d 100644
--- a/service/ec2/ec2service.go
+++ b/service/ec2/ec2service.go
@@ -121,7 +121,7 @@ func (s *EC2Service) AllocateCluster(ctx context.Context, opts service.AllocateC
 		return errors.New("cannot allocate clusters with more than 10 nodes")
 	}
 
-	if err := common.GetConfigRepo(s.aliasRepoPath); err != nil {
+	if err := common.GetConfigRepo(ctx, s.aliasRepoPath); err != nil {
 		log.Printf("Get config failed: %v", err)
 		return err
 	}
@@ -152,7 +152,7 @@ func (s *EC2Service) AllocateCluster(ctx context.Context, opts service.AllocateC
 	return nil
 }
 
-func (s *EC2Service) buildAMI(imageName string, nodeVersion *common.NodeVersion) error {
+func (s *EC2Service) buildAMI(ctx context.Context, imageName string, nodeVersion *common.NodeVersion) error {
 	log.Printf("Downloading build %s to /tmp", nodeVersion.ToPkgName())
 
 	localFileName := fmt.Sprintf("/tmp/%s", nodeVersion.ToPkgName())
@@ -169,7 +169,12 @@ func (s *EC2Service) buildAMI(imageName string, nodeVersion *common.NodeVersion)
 
 	client := http.Client{}
 
-	resp, err := client.Get(remoteFileName)
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, remoteFileName, nil)
+	if err != nil {
+		return err
+	}
+
+	resp, err := client.Do(req)
 	if err != nil {
 		return err
 	}
@@ -181,7 +186,7 @@ func (s *EC2Service) buildAMI(imageName string, nodeVersion *common.NodeVersion)
 		return err
 	}
 
-	err = CallPacker(PackerOptions{
+	err = CallPacker(ctx, PackerOptions{
 		BuildPkg: nodeVersion.ToPkgName(),
 		AmiName:  imageName,
 		Arch:     nodeVersion.Arch,
@@ -223,11 +228,15 @@ func (s *EC2Service) ensureImageExists(ctx context.Context, nodeVersion *common.
 		*waitChans = append(*waitChans, waitChan)
 		s.mu.Unlock()
 		log.Printf("Image %s is already being built, waiting...", imageName)
+
+		childCtx, cancel := context.WithTimeout(ctx, time.Hour)
+		defer cancel()
+
 		select {
+		case <-childCtx.Done():
+			return errors.New("context timed out waiting for image to be built")
 		case err := <-waitChan:
 			return err
-		case <-time.After(1 * time.Hour):
-			return errors.New("timed out waiting for image to be built")
 		}
 	}
 
@@ -237,7 +246,7 @@ func (s *EC2Service) ensureImageExists(ctx context.Context, nodeVersion *common.
 
 	log.Printf("No image found for %s, building...", imageName)
 
-	err = s.buildAMI(imageName, nodeVersion)
+	err = s.buildAMI(ctx, imageName, nodeVersion)
 
 	// inform any waiters
 	s.mu.Lock()
@@ -284,7 +293,6 @@ func (s *EC2Service) hasSrvRecords(ctx context.Context, clusterID string) (bool,
 	}
 
 	out, err := s.route53Client.ListResourceRecordSets(ctx, &input)
-
 	if err != nil {
 		return false, err
 	}
@@ -315,7 +323,6 @@ func (s *EC2Service) hasARecords(ctx context.Context, cluster *cluster.Cluster)
 	}
 
 	out, err := s.route53Client.ListResourceRecordSets(ctx, &input)
-
 	if err != nil {
 		return false, err
 	}
@@ -336,29 +343,33 @@ func (s *EC2Service) hasARecords(ctx context.Context, cluster *cluster.Cluster)
 	return true, nil
 }
 
-func (s *EC2Service) createARecords(ctx context.Context, cluster *cluster.Cluster) error {
-	changeInfo, err := s.changeARecords(ctx, cluster, route53types.ChangeActionCreate)
-	if err != nil {
-		return err
-	}
+func (s *EC2Service) createARecords(ctx context.Context, cluster *cluster.Cluster) (*route53types.ChangeInfo, error) {
+	return s.changeARecords(ctx, cluster, route53types.ChangeActionCreate)
+}
+
+func (s *EC2Service) waitForDNSPropagation(ctx context.Context, changeID *string) error {
+	childCtx, cancel := context.WithTimeout(ctx, time.Minute*2)
+	defer cancel()
+
+	const sleep = time.Second * 5
+	timer := time.NewTimer(0)
 
-	// wait for dns records to propagate
-	timeout := time.Now().Add(time.Minute * 2)
 	for {
-		if time.Now().After(timeout) {
-			return errors.New("timeout waiting for dns records to propagate")
-		}
+		select {
+		case <-childCtx.Done():
+			return childCtx.Err()
+		case <-timer.C:
+			out, err := s.route53Client.GetChange(ctx, &route53.GetChangeInput{Id: changeID})
+			if err != nil {
+				return err
+			}
 
-		out, err := s.route53Client.GetChange(ctx, &route53.GetChangeInput{Id: changeInfo.Id})
-		if err != nil {
-			return err
-		}
+			if out.ChangeInfo.Status == route53types.ChangeStatusInsync {
+				return nil
+			}
 
-		if out.ChangeInfo.Status == route53types.ChangeStatusInsync {
-			return nil
+			timer.Reset(sleep)
 		}
-
-		time.Sleep(time.Second * 5)
 	}
 }
@@ -368,14 +379,15 @@ func (s *EC2Service) removeARecords(ctx context.Context, cluster *cluster.Cluste
 }
 
 func (s *EC2Service) removeSrvRecords(ctx context.Context, cluster *cluster.Cluster) error {
-	return s.changeSrvRecords(ctx, cluster, route53types.ChangeActionDelete)
+	_, err := s.changeSrvRecords(ctx, cluster, route53types.ChangeActionDelete)
+	return err
 }
 
-func (s *EC2Service) createSrvRecords(ctx context.Context, cluster *cluster.Cluster) error {
+func (s *EC2Service) createSrvRecords(ctx context.Context, cluster *cluster.Cluster) (*route53types.ChangeInfo, error) {
 	return s.changeSrvRecords(ctx, cluster, route53types.ChangeActionCreate)
 }
 
-func (s *EC2Service) changeSrvRecords(ctx context.Context, cluster *cluster.Cluster, action route53types.ChangeAction) error {
+func (s *EC2Service) changeSrvRecords(ctx context.Context, cluster *cluster.Cluster, action route53types.ChangeAction) (*route53types.ChangeInfo, error) {
 	hostnames := make([]string, 0, len(cluster.Nodes))
 	for _, node := range cluster.Nodes {
 		hostnames = append(hostnames, node.Hostname)
@@ -411,14 +423,14 @@ func (s *EC2Service) changeSrvRecords(ctx context.Context, cluster *cluster.Clus
 		})
 	}
 
-	_, err := s.route53Client.ChangeResourceRecordSets(ctx, &route53.ChangeResourceRecordSetsInput{
+	out, err := s.route53Client.ChangeResourceRecordSets(ctx, &route53.ChangeResourceRecordSetsInput{
 		ChangeBatch: &route53types.ChangeBatch{
 			Changes: changes,
 		},
 		HostedZoneId: aws.String(s.hostedZoneId),
 	})
 
-	return err
+	return out.ChangeInfo, err
 }
 
 func (s *EC2Service) changeARecords(ctx context.Context, cluster *cluster.Cluster, action route53types.ChangeAction) (*route53types.ChangeInfo, error) {
@@ -524,7 +536,6 @@ func (s *EC2Service) allocateNodes(ctx context.Context, clusterID string, opts [
 			},
 		},
 	})
-
 	if err != nil {
 		return nil, err
 	}
@@ -542,7 +553,6 @@ func (s *EC2Service) allocateNodes(ctx context.Context, clusterID string, opts [
 
 	var instanceIds []string
 	instanceIds, err = s.runInstances(ctx, clusterID, options.ServerVersion, ami, instanceCount, instanceType)
-
 	if err != nil {
 		return nil, err
 	}
@@ -585,13 +595,23 @@ func (s *EC2Service) allocateNodes(ctx context.Context, clusterID string, opts [
 		return nil, err
 	}
 
-	err = s.createARecords(ctx, cluster)
+	aRecordsChangeInfo, err := s.createARecords(ctx, cluster)
 	if err != nil {
 		return nil, err
 	}
 
 	// srv records point to the custom hostnames
-	err = s.createSrvRecords(ctx, cluster)
+	srvRecordsChangeInfo, err := s.createSrvRecords(ctx, cluster)
+	if err != nil {
+		return nil, err
+	}
+
+	err = s.waitForDNSPropagation(ctx, aRecordsChangeInfo.Id)
+	if err != nil {
+		return nil, err
+	}
+
+	err = s.waitForDNSPropagation(ctx, srvRecordsChangeInfo.Id)
 	if err != nil {
 		return nil, err
 	}
@@ -611,21 +631,27 @@ func tryUpdateHostsFile(ctx context.Context, s *EC2Service, clusterID string) er
 		return err
 	}
 
-	timeout := time.Now().Add(time.Minute * 5)
-	for {
-		if time.Now().After(timeout) {
-			return errors.New("timeout trying to update hosts file")
-		}
+	childCtx, cancel := context.WithTimeout(ctx, time.Minute*5)
+	defer cancel()
 
-		// allow nodes to listen on custom hostnames
-		err = common.UpdateHostsFile(ctx, s, clusterID, connCtx)
-		if err == nil {
-			return nil
-		}
+	timer := time.NewTimer(0)
+	const sleep = time.Second * 5
+
+	for {
+		select {
+		case <-childCtx.Done():
+			return childCtx.Err()
+		case <-timer.C:
+			// allow nodes to listen on custom hostnames
+			err = common.UpdateHostsFile(ctx, s, clusterID, connCtx)
+			if err == nil {
+				return nil
+			}
 
-		log.Printf("update hosts file failed with %v", err)
+			log.Printf("update hosts file failed with %v", err)
 
-		time.Sleep(time.Second * 5)
+			timer.Reset(sleep)
+		}
 	}
 }
@@ -925,10 +951,14 @@ func (s *EC2Service) SetupTrustedCert(ctx context.Context, clusterID string) err
 	}
 
 	for range nodes {
-		hostsErr := <-recv
-		if hostsErr != nil {
-			err = hostsErr
-			continue
+		select {
+		case hostsErr := <-recv:
+			if hostsErr != nil {
+				err = hostsErr
+				continue
+			}
+		case <-ctx.Done():
+			return ctx.Err()
 		}
 	}
diff --git a/service/ec2/packer.go b/service/ec2/packer.go
index 4ad0474..4089b8e 100644
--- a/service/ec2/packer.go
+++ b/service/ec2/packer.go
@@ -1,6 +1,7 @@
 package ec2
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"os/exec"
@@ -45,7 +46,7 @@ var osToDeviceName = map[string]string{
 	"centos7": "/dev/sda1",
 }
 
-func CallPacker(opts PackerOptions) error {
+func CallPacker(ctx context.Context, opts PackerOptions) error {
 	filePath := fmt.Sprintf("packerfiles/aws-%s.pkr.hcl", osToPackerfilePrefix[opts.OS])
 
 	err := exec.Command("packer", "init", filePath).Run()
@@ -67,7 +68,7 @@ func CallPacker(opts PackerOptions) error {
 	addArg(&args, "serverless_mode="+fmt.Sprintf("%t", opts.ServerlessMode))
 
 	args = append(args, filePath)
-	cmd := exec.Command("packer", args...)
+	cmd := exec.CommandContext(ctx, "packer", args...)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
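Patch 4's recurring shape — a zero-initialized time.NewTimer inside a select against a context — reduced to a standalone helper (retryEvery is illustrative, not a function from this codebase). Unlike a bare time.Sleep, the wait can be interrupted the moment the request context is cancelled:

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"
    )

    // retryEvery runs attempt on a fixed cadence until it succeeds or ctx is
    // cancelled.
    func retryEvery(ctx context.Context, interval time.Duration, attempt func() error) error {
    	timer := time.NewTimer(0) // fire immediately on the first iteration
    	defer timer.Stop()

    	for {
    		select {
    		case <-ctx.Done():
    			return ctx.Err()
    		case <-timer.C:
    			if err := attempt(); err == nil {
    				return nil
    			}
    			timer.Reset(interval)
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    	defer cancel()

    	attempts := 0
    	err := retryEvery(ctx, time.Second, func() error {
    		attempts++
    		fmt.Println("attempt", attempts)
    		return errors.New("not ready yet")
    	})
    	fmt.Println(err) // context.DeadlineExceeded after a few attempts
    }

Resetting the timer only after an attempt finishes also avoids the backlog a time.Ticker can build up behind a slow attempt.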
From 473df02cca17087c2f686229227b731605759ca1 Mon Sep 17 00:00:00 2001
From: Jake Rawsthorne
Date: Thu, 27 Oct 2022 13:38:48 +0100
Subject: [PATCH 5/5] allow running setup multiple times on ec2 cluster

So that we can tear down a previous cluster, we need to pass the SSH
username and SSH key path into setup for EC2 clusters.
---
 daemon/restserver.go            |  5 ++---
 service/common/clusters.go      |  2 +-
 service/common/options.go       | 18 ------------------
 service/docker/dockerservice.go |  4 ++++
 service/ec2/ec2service.go       |  9 +++++++++
 service/service.go              | 14 ++++++++++++++
 6 files changed, 30 insertions(+), 22 deletions(-)
 delete mode 100644 service/common/options.go

diff --git a/daemon/restserver.go b/daemon/restserver.go
index 4539f4e..89040a3 100644
--- a/daemon/restserver.go
+++ b/daemon/restserver.go
@@ -15,7 +15,6 @@ import (
 	"github.com/couchbaselabs/cbdynclusterd/helper"
 	"github.com/couchbaselabs/cbdynclusterd/service"
 	"github.com/couchbaselabs/cbdynclusterd/service/cloud"
-	"github.com/couchbaselabs/cbdynclusterd/service/common"
 	"github.com/couchbaselabs/cbdynclusterd/store"
 
 	"github.com/gorilla/mux"
@@ -312,7 +311,7 @@ func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	epnode, err := common.SetupCluster(common.ClusterSetupOptions{
+	epnode, err := s.SetupCluster(clusterID, service.ClusterSetupOptions{
 		Nodes:               c.Nodes,
 		Services:            reqData.Services,
 		UseHostname:         reqData.UseHostname || meta.Platform == store.ClusterPlatformEC2,
@@ -322,7 +321,7 @@
 		StorageMode:         reqData.StorageMode,
 		Bucket:              reqData.Bucket,
 		UseDeveloperPreview: reqData.UseDeveloperPreview,
-	}, service.ConnectContext{})
+	})
 	if err != nil {
 		writeJSONError(w, err)
 		return
diff --git a/service/common/clusters.go b/service/common/clusters.go
index c1cde45..7570196 100644
--- a/service/common/clusters.go
+++ b/service/common/clusters.go
@@ -42,7 +42,7 @@ func NewNode(hostname string, clusterVersion string, connCtx service.ConnectCont
 	}
 }
 
-func SetupCluster(opts ClusterSetupOptions, connCtx service.ConnectContext) (string, error) {
+func SetupCluster(opts service.ClusterSetupOptions, connCtx service.ConnectContext) (string, error) {
 	services := opts.Services
 	clusterVersion := opts.Nodes[0].InitialServerVersion
 
diff --git a/service/common/options.go b/service/common/options.go
deleted file mode 100644
index 090f8aa..0000000
--- a/service/common/options.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package common
-
-import (
-	"github.com/couchbaselabs/cbdynclusterd/cluster"
-	"github.com/couchbaselabs/cbdynclusterd/helper"
-)
-
-type ClusterSetupOptions struct {
-	Services            []string
-	Nodes               []*cluster.Node
-	UseHostname         bool
-	UseIpv6             bool
-	MemoryQuota         string
-	User                *helper.UserOption
-	StorageMode         string
-	Bucket              *helper.BucketOption
-	UseDeveloperPreview bool
-}
diff --git a/service/docker/dockerservice.go b/service/docker/dockerservice.go
index 4f5746a..4cda178 100644
--- a/service/docker/dockerservice.go
+++ b/service/docker/dockerservice.go
@@ -349,3 +349,7 @@ func (ds *DockerService) ConnString(ctx context.Context, clusterID string, useSS
 func (ds *DockerService) RunCBCollect(ctx context.Context, clusterID string) (*service.CBCollectResult, error) {
 	return common.RunCBCollect(ctx, ds, clusterID, service.ConnectContext{})
 }
+
+func (ds *DockerService) SetupCluster(clusterID string, opts service.ClusterSetupOptions) (string, error) {
+	return common.SetupCluster(opts, service.ConnectContext{})
+}
diff --git a/service/ec2/ec2service.go b/service/ec2/ec2service.go
index a2c943d..1257703 100644
--- a/service/ec2/ec2service.go
+++ b/service/ec2/ec2service.go
@@ -964,3 +964,12 @@ func (s *EC2Service) SetupTrustedCert(ctx context.Context, clusterID string) err
 
 	return err
 }
+
+func (s *EC2Service) SetupCluster(clusterID string, opts service.ClusterSetupOptions) (string, error) {
+	connCtx, err := s.connectionContext(clusterID)
+	if err != nil {
+		return "", err
+	}
+
+	return common.SetupCluster(opts, connCtx)
+}
diff --git a/service/service.go b/service/service.go
index 3236ac3..492d212 100644
--- a/service/service.go
+++ b/service/service.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/couchbaselabs/cbdynclusterd/cluster"
+	"github.com/couchbaselabs/cbdynclusterd/helper"
 )
 
 type CertAuthResult struct {
@@ -43,6 +44,18 @@ type ConnectContext struct {
 	SshKeyPath string
 }
 
+type ClusterSetupOptions struct {
+	Services            []string
+	Nodes               []*cluster.Node
+	UseHostname         bool
+	UseIpv6             bool
+	MemoryQuota         string
+	User                *helper.UserOption
+	StorageMode         string
+	Bucket              *helper.BucketOption
+	UseDeveloperPreview bool
+}
+
 type ClusterService interface {
 	GetCluster(ctx context.Context, clusterID string) (*cluster.Cluster, error)
 	GetAllClusters(ctx context.Context) ([]*cluster.Cluster, error)
@@ -62,6 +75,7 @@ type UnmanagedClusterService interface {
 	ClusterService
 	AllocateCluster(ctx context.Context, opts AllocateClusterOptions) error
 	RunCBCollect(ctx context.Context, clusterID string) (*CBCollectResult, error)
+	SetupCluster(clusterID string, opts ClusterSetupOptions) (string, error)
 }
 
 var MaxCapacityError = errors.New("max capacity reached")
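With SetupCluster promoted onto UnmanagedClusterService, callers such as the daemon stay backend-agnostic: the EC2 implementation injects the SSH credentials it stored at allocation time through connectionContext, while Docker passes an empty ConnectContext. A compile-oriented sketch of such a caller against the patched interface (setupViaService and the literal option values are illustrative):

    package example

    import (
    	"github.com/couchbaselabs/cbdynclusterd/cluster"
    	"github.com/couchbaselabs/cbdynclusterd/service"
    )

    // setupViaService depends only on the interface, so re-running setup on
    // an EC2 cluster can tear down the previous cluster over SSH without the
    // caller handling credentials itself.
    func setupViaService(s service.UnmanagedClusterService, clusterID string, nodes []*cluster.Node) (string, error) {
    	return s.SetupCluster(clusterID, service.ClusterSetupOptions{
    		Nodes:       nodes,
    		Services:    []string{"kv", "kv,index,n1ql"},
    		UseHostname: true,
    		MemoryQuota: "1024",
    	})
    }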