Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions daemon/restserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/couchbaselabs/cbdynclusterd/helper"
"github.com/couchbaselabs/cbdynclusterd/service"
"github.com/couchbaselabs/cbdynclusterd/service/cloud"
"github.com/couchbaselabs/cbdynclusterd/service/common"
"github.com/couchbaselabs/cbdynclusterd/store"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -157,31 +156,40 @@ func (d *daemon) HttpCreateCluster(w http.ResponseWriter, r *http.Request) {
s = d.ec2Service
}

for {
err = s.AllocateCluster(reqCtx, clusterOpts)
if err == service.MaxCapacityError {
// update timeout and try again
clusterOpts.Deadline = time.Now().Add(timeout)

err = d.metaStore.UpdateClusterMeta(clusterID, func(meta store.ClusterMeta) (store.ClusterMeta, error) {
meta.Timeout = clusterOpts.Deadline
return meta, nil
})

if err == nil {
time.Sleep(time.Duration(1) * time.Minute)
continue
}
timer := time.NewTimer(0)
const sleep = time.Minute

// fall through if error updating timeout
}
// allocate or meta update failed
if err != nil {
writeJSONError(w, err)
Loop:
for {
select {
case <-reqCtx.Done():
return
case <-timer.C:
err = s.AllocateCluster(reqCtx, clusterOpts)
if err == service.MaxCapacityError {
// update timeout and try again
clusterOpts.Deadline = time.Now().Add(timeout)

err = d.metaStore.UpdateClusterMeta(clusterID, func(meta store.ClusterMeta) (store.ClusterMeta, error) {
meta.Timeout = clusterOpts.Deadline
return meta, nil
})

if err == nil {
timer.Reset(sleep)
continue
}

// fall through if error updating timeout
}
// allocate or meta update failed
if err != nil {
writeJSONError(w, err)
return
}
// successfully allocated cluster
break Loop
}
// successfully allocated cluster
break
}

newClusterJson := NewClusterJSON{
Expand Down Expand Up @@ -248,15 +256,13 @@ func (d *daemon) HttpGetDockerHost(w http.ResponseWriter, r *http.Request) {
Port: hostURI.Port(),
}
writeJsonResponse(w, jsonResp)
return
}

func HttpGetVersion(w http.ResponseWriter, r *http.Request) {
jsonResp := &VersionJSON{
Version: Version,
}
writeJsonResponse(w, jsonResp)
return
}

func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -305,7 +311,7 @@ func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
return
}

epnode, err := common.SetupCluster(common.ClusterSetupOptions{
epnode, err := s.SetupCluster(clusterID, service.ClusterSetupOptions{
Nodes: c.Nodes,
Services: reqData.Services,
UseHostname: reqData.UseHostname || meta.Platform == store.ClusterPlatformEC2,
Expand All @@ -315,7 +321,7 @@ func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
StorageMode: reqData.StorageMode,
Bucket: reqData.Bucket,
UseDeveloperPreview: reqData.UseDeveloperPreview,
}, service.ConnectContext{})
})
if err != nil {
writeJSONError(w, err)
return
Expand Down Expand Up @@ -378,7 +384,6 @@ func (d *daemon) HttpSetupCluster(w http.ResponseWriter, r *http.Request) {
}

writeJsonResponse(w, jsonCluster)
return
}

func (d *daemon) HttpSetupClusterEncryption(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -886,7 +891,6 @@ func (d *daemon) HttpSetupClientCertAuth(w http.ResponseWriter, r *http.Request)
ClientKey: certData.ClientKey,
ClientCert: certData.ClientCert,
})
return
}

func (d *daemon) HttpBuildImage(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1007,6 +1011,10 @@ func (d *daemon) HttpCreateCloudCluster(w http.ResponseWriter, r *http.Request)
meta.Timeout = time.Now().Add(timeout)
return meta, nil
})
if err != nil {
writeJSONError(w, err)
return
}

dCtx, cancel := context.WithDeadline(reqCtx, time.Now().Add(helper.RestTimeout))
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
PFailover = "/controller/failOver"
PEject = "/controller/ejectNode"
PSetupServices = "/node/controller/setupServices"
ClusterInit = "/clusterInit"
PPoolsDefault = "/pools/default"
PSettingsWeb = "/settings/web"
PRbacUsers = "/settings/rbac/users/local"
Expand Down
11 changes: 6 additions & 5 deletions service/common/alias.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"errors"
"io/ioutil"
"log"
Expand Down Expand Up @@ -38,20 +39,20 @@ func parseYaml(data []byte) (Products, error) {
}

//Clones/pulls the github alias repo
func GetConfigRepo(aliasRepoPath string) error {
func GetConfigRepo(ctx context.Context, aliasRepoPath string) error {
log.Printf("Cloning products repo to %s", aliasRepoPath)
_, err := git.PlainClone(aliasRepoPath, false, &git.CloneOptions{
_, err := git.PlainCloneContext(ctx, aliasRepoPath, false, &git.CloneOptions{
URL: helper.AliasRepo,
Progress: os.Stdout,
})

if errors.Is(err, git.ErrRepositoryAlreadyExists) {
return pullConfigRepo(aliasRepoPath)
return pullConfigRepo(ctx, aliasRepoPath)
}
return err
}

func pullConfigRepo(aliasRepoPath string) error {
func pullConfigRepo(ctx context.Context, aliasRepoPath string) error {
r, err := git.PlainOpen(aliasRepoPath)
if err != nil {
return err
Expand All @@ -63,7 +64,7 @@ func pullConfigRepo(aliasRepoPath string) error {
}

log.Printf("Pulling products repo")
err = w.Pull(&git.PullOptions{RemoteName: "origin"})
err = w.PullContext(ctx, &git.PullOptions{RemoteName: "origin"})
if errors.Is(err, git.NoErrAlreadyUpToDate) {
log.Printf("%v", err)
return nil
Expand Down
4 changes: 2 additions & 2 deletions service/common/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewNode(hostname string, clusterVersion string, connCtx service.ConnectCont
}
}

func SetupCluster(opts ClusterSetupOptions, connCtx service.ConnectContext) (string, error) {
func SetupCluster(opts service.ClusterSetupOptions, connCtx service.ConnectContext) (string, error) {
services := opts.Services
clusterVersion := opts.Nodes[0].InitialServerVersion

Expand Down Expand Up @@ -106,7 +106,7 @@ func AddSampleBucket(ctx context.Context, s service.ClusterService, clusterID st
}

if helper.SampleBucketsCount[opts.SampleBucket] == 0 {
return errors.New("Unknown sample bucket")
return errors.New("unknown sample bucket")
}

if len(c.Nodes) == 0 {
Expand Down
Loading