
Merge pull request #324 from kshlm/elastic-etcd
ElasticEtcd
prashanthpai authored Jun 30, 2017
2 parents 82dc32e + c665062 commit 0be2455
Showing 43 changed files with 2,026 additions and 1,160 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -11,3 +11,4 @@ _projects/
# Ignore build and release directories
build/
releases/
pkg/elasticetcd/example/example
149 changes: 41 additions & 108 deletions commands/peers/addpeer.go
@@ -3,41 +3,22 @@ package peercommands
import (
"fmt"
"net/http"
"strings"

"github.com/gluster/glusterd2/errors"
"github.com/gluster/glusterd2/etcdmgmt"
"github.com/gluster/glusterd2/peer"
restutils "github.com/gluster/glusterd2/servers/rest/utils"
"github.com/gluster/glusterd2/store"
"github.com/gluster/glusterd2/utils"

log "github.com/Sirupsen/logrus"
"github.com/pborman/uuid"
)

func isPeerInCluster(peerID string) bool {

mlist, err := etcdmgmt.EtcdMemberList()
if err != nil {
log.WithError(err).Error("isPeerInCluster: Failed to list etcd cluster members.")
return false
}

for _, memb := range mlist {
if memb.Name == peerID {
return true
}
}

return false
type peerAddReq struct {
Addresses []string
}

func addPeerHandler(w http.ResponseWriter, r *http.Request) {

// FIXME: This is not txn based, yet. Behaviour when multiple simultaneous
// add peer requests are sent to same node is unknown.

var req PeerAddReq
var req peerAddReq
if e := utils.GetJSONFromRequest(r, &req); e != nil {
restutils.SendHTTPError(w, http.StatusBadRequest, e.Error())
return
@@ -47,107 +28,59 @@ func addPeerHandler(w http.ResponseWriter, r *http.Request) {
restutils.SendHTTPError(w, http.StatusBadRequest, errors.ErrNoHostnamesPresent.Error())
return
}
log.WithField("addresses", req.Addresses).Debug("recieved request to add new peer with given addresses")

p, _ := peer.GetPeerByAddrs(req.Addresses)
if p != nil {
restutils.SendHTTPError(w, http.StatusConflict, fmt.Sprintf("Peer exists with given addresses (ID: %s)", p.ID.String()))
return
}

// A peer can have multiple addresses. For now, we use only the first
// address present in the req.Addresses list.

remotePeerAddress, err := utils.FormRemotePeerAddress(req.Addresses[0])
if err != nil {
restutils.SendHTTPError(w, http.StatusBadRequest, err.Error())
return
}

// This remote call will return the remote peer's ID (UUID), name
// and etcd peer url.
remotePeer, e := ValidateAddPeer(remotePeerAddress, &req)
if e != nil {
restutils.SendHTTPError(w, http.StatusInternalServerError, remotePeer.OpError)
return
}

// TODO: Parse addresses considering ports to figure this out.
if isPeerInCluster(remotePeer.UUID) {
restutils.SendHTTPError(w, http.StatusInternalServerError, "Peer already in cluster")
log.WithError(err).WithField("address", req.Addresses[0]).Error("failed to parse peer address")
restutils.SendHTTPError(w, http.StatusBadRequest, "failed to parse remote address")
return
}

// If user hasn't specified peer name, use the name returned by remote
// peer which defaults to its hostname.
if req.Name == "" {
req.Name = remotePeer.PeerName
}

// Adding a member is a two step process:
// 1. Add the new member to the cluster via the members API. This is
// performed on this node i.e the one that just accepted peer add
// request from the user.
// 2. Start the new member on the target node (the new peer) with the new
// cluster configuration, including a list of the updated members
// (existing members + the new member).

newMember, e := etcdmgmt.EtcdMemberAdd("http://" + remotePeer.EtcdPeerAddress)
if e != nil {
log.WithFields(log.Fields{
"error": e,
"uuid": remotePeer.UUID,
"name": req.Name,
"address": remotePeer.EtcdPeerAddress,
}).Error("Failed to add member to etcd cluster.")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
// TODO: Try all addresses till the first one connects
client, err := getPeerServiceClient(remotePeerAddress)
if err != nil {
restutils.SendHTTPError(w, http.StatusInternalServerError, err.Error())
return
}
defer client.conn.Close()
logger := log.WithField("peer", remotePeerAddress)

log.WithField("member-id", newMember.ID).Info("Added new member to etcd cluster")
newconfig := &StoreConfig{store.Store.Endpoints()}
logger.WithField("endpoints", newconfig.Endpoints).Debug("asking new peer to join cluster with given endpoints")

mlist, e := etcdmgmt.EtcdMemberList()
if e != nil {
log.WithField("error", e).Error("Failed to list members in etcd cluster")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
// Ask the peer to join the cluster
rsp, err := client.JoinCluster(newconfig)
if err != nil {
log.WithError(err).Error("sending Join request failed")
restutils.SendHTTPError(w, http.StatusInternalServerError, "failed to send join cluster request")
return
}

// Member name of the newly added etcd member has not been set at this point.
conf := []string{}
for _, memb := range mlist {
for _, u := range memb.PeerURLs {
n := memb.Name
if memb.ID == newMember.ID {
n = remotePeer.UUID
}
conf = append(conf, fmt.Sprintf("%s=%s", n, u))
}
}

var etcdConf EtcdConfigReq
etcdConf.EtcdName = remotePeer.UUID
etcdConf.InitialCluster = strings.Join(conf, ",")
etcdConf.ClusterState = "existing"

log.WithField("initial-cluster", etcdConf.InitialCluster).Debug("Reconfiguring etcd on remote peer")

etcdrsp, e := ConfigureRemoteETCD(remotePeerAddress, &etcdConf)
if e != nil {
log.WithField("err", e).Error("Failed to configure remote etcd")
restutils.SendHTTPError(w, http.StatusInternalServerError, etcdrsp.OpError)
} else if Error(rsp.Err) != ErrNone {
err = Error(rsp.Err)
logger.WithError(err).Error("join request failed")
restutils.SendHTTPError(w, http.StatusInternalServerError, err.Error())
return
}
logger = logger.WithField("peerid", rsp.PeerID)
logger.Info("new peer joined our cluster")

// Create a new peer object and add it to the store.
p := &peer.Peer{
ID: uuid.Parse(remotePeer.UUID),
Name: req.Name,
Addresses: req.Addresses,
MemberID: newMember.ID,
}
if e = peer.AddOrUpdatePeer(p); e != nil {
log.WithFields(log.Fields{
"error": e,
"peer/node": p.Name,
}).Error("Failed to add peer into the etcd store")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
// Get the new peer information to reply back with
newpeer, err := peer.GetPeer(rsp.PeerID)
if err != nil {
// XXX: Don't know the correct error to send here
restutils.SendHTTPError(w, http.StatusInternalServerError, "new peer was added, but could not find peer in store. Try again later.")
} else {
restutils.SendHTTPResponse(w, http.StatusCreated, newpeer)
}

body := map[string]uuid.UUID{"id": p.ID}
restutils.SendHTTPResponse(w, http.StatusCreated, body)
// Save updated store endpoints for restarts
store.Store.UpdateEndpoints()
}
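For reference, the reworked handler decodes a JSON body matching peerAddReq and replies 201 Created with the stored peer. A minimal client-side sketch of driving it, assuming the handler is mounted at /peers on port 24007 (the route registration and port are configured outside this diff):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// peerAddReq mirrors the request body addPeerHandler decodes above.
type peerAddReq struct {
    Addresses []string
}

func main() {
    body, _ := json.Marshal(peerAddReq{Addresses: []string{"192.168.1.2:24008"}})

    // "/peers" and the port are assumptions; check the REST route registration.
    rsp, err := http.Post("http://localhost:24007/peers", "application/json", bytes.NewReader(body))
    if err != nil {
        fmt.Println("add peer request failed:", err)
        return
    }
    defer rsp.Body.Close()

    // On success addPeerHandler responds 201 Created with the new peer.
    fmt.Println("status:", rsp.Status)
}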
122 changes: 81 additions & 41 deletions commands/peers/deletepeer.go
@@ -3,85 +3,125 @@ package peercommands
import (
"net/http"

"github.com/gluster/glusterd2/etcdmgmt"
"github.com/gluster/glusterd2/gdctx"
"github.com/gluster/glusterd2/peer"
restutils "github.com/gluster/glusterd2/servers/rest/utils"
"github.com/gluster/glusterd2/store"
"github.com/gluster/glusterd2/utils"
"github.com/gluster/glusterd2/volume"

log "github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/pborman/uuid"
)

func deletePeerHandler(w http.ResponseWriter, r *http.Request) {

// FIXME: This is not txn based, yet. Behaviour when multiple simultaneous
// delete peer requests are sent to same node is unknown.

peerReq := mux.Vars(r)

id := peerReq["peerid"]
if id == "" {
restutils.SendHTTPError(w, http.StatusBadRequest, "peerid not present in the request")
return
}

// Deleting a peer from the cluster happens as follows,
// - Check if the peer is a member of the cluster
// - Check if the peer can be removed
// - Delete the peer info from the store
// - Send the Leave request

logger := log.WithField("peerid", id)
logger.Debug("recieved delete peer request")

// Check whether the member exists
p, e := peer.GetPeerF(id)
if e != nil || p == nil {
p, err := peer.GetPeerF(id)
if err != nil {
logger.WithError(err).Error("failed to get peer")
restutils.SendHTTPError(w, http.StatusInternalServerError, "could not validate delete request")
return
} else if p == nil {
logger.Debug("request denied, recieved request to remove unknown peer")
restutils.SendHTTPError(w, http.StatusNotFound, "peer not found in cluster")
return
}

// Removing self should be disallowed (like in glusterd1)
// You cannot remove yourself
if id == gdctx.MyUUID.String() {
restutils.SendHTTPError(w, http.StatusBadRequest, "Removing self is disallowed.")
logger.Debug("request denied, recieved request to delete self from cluster")
restutils.SendHTTPError(w, http.StatusBadRequest, "removing self is disallowed.")
return
}

// Check if any volumes exist with bricks on this peer
if exists, err := bricksExist(id); err != nil {
logger.WithError(err).Error("failed to check if bricks exist on peer")
restutils.SendHTTPError(w, http.StatusInternalServerError, "could not validate delete request")
return
} else if exists {
logger.Debug("request denied, peer has bricks")
restutils.SendHTTPError(w, http.StatusForbidden, "cannot delete peer, peer has bricks")
return
}

// Remove the peer details from the store
if err := peer.DeletePeer(id); err != nil {
log.WithError(err).WithField("peer", id).Error("failed to remove peer from the store")
restutils.SendHTTPError(w, http.StatusInternalServerError, err.Error())
return
}

remotePeerAddress, err := utils.FormRemotePeerAddress(p.Addresses[0])
if err != nil {
restutils.SendHTTPError(w, http.StatusBadRequest, err.Error())
log.WithError(err).WithField("address", p.Addresses[0]).Error("failed to parse peer address")
restutils.SendHTTPError(w, http.StatusBadRequest, "failed to parse remote address")
return
}

// Validate whether the peer can be deleted
rsp, e := ValidateDeletePeer(remotePeerAddress, id)
if e != nil {
restutils.SendHTTPError(w, http.StatusInternalServerError, rsp.OpError)
client, err := getPeerServiceClient(remotePeerAddress)
if err != nil {
restutils.SendHTTPError(w, http.StatusInternalServerError, err.Error())
return
}
defer client.conn.Close()

// Remove the peer from the store
if e := peer.DeletePeer(id); e != nil {
log.WithFields(log.Fields{
"er": e,
"peer": id,
}).Error("Failed to remove peer from the store")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
} else {
restutils.SendHTTPResponse(w, http.StatusNoContent, nil)
// TODO: Need to do a better job of handling failures here. If this fails the
// peer being removed still thinks it's a part of the cluster, and could
// potentially still send commands to the cluster
rsp, err := client.LeaveCluster()
if err != nil {
logger.WithError(err).Error("sending Leave request failed")
restutils.SendHTTPError(w, http.StatusInternalServerError, "failed to send leave cluster request")
return
} else if Error(rsp.Err) != ErrNone {
err = Error(rsp.Err)
logger.WithError(err).Error("leave request failed")
restutils.SendHTTPError(w, http.StatusInternalServerError, err.Error())
return
}
logger.Debug("peer left cluster")

// Delete member from etcd cluster
e = etcdmgmt.EtcdMemberRemove(p.MemberID)
if e != nil {
log.WithFields(log.Fields{
"er": e,
"peer": id,
}).Error("Failed to remove member from etcd cluster")
restutils.SendHTTPResponse(w, http.StatusNoContent, nil)

restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
// Save updated store endpoints for restarts
store.Store.UpdateEndpoints()
}

// bricksExist checks if the given peer has any bricks on it
// TODO: Move this to a more appropriate place
func bricksExist(id string) (bool, error) {
pid := uuid.Parse(id)

vols, err := volume.GetVolumes()
if err != nil {
return true, err
}

// Remove data dir of etcd on remote machine. Restart etcd on remote machine
// in standalone (single cluster) mode.
var etcdConf EtcdConfigReq
etcdConf.DeletePeer = true
etcdrsp, e := ConfigureRemoteETCD(remotePeerAddress, &etcdConf)
if e != nil {
log.WithField("err", e).Error("Failed to configure remote etcd.")
restutils.SendHTTPError(w, http.StatusInternalServerError, etcdrsp.OpError)
return
for _, v := range vols {
for _, b := range v.Bricks {
if uuid.Equal(pid, b.NodeID) {
return true, nil
}
}
}
return false, nil
}
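bricksExist above walks every volume and compares each brick's node ID against the peer's UUID. A self-contained sketch of the same check, with Volume and Brick trimmed to hypothetical stand-ins for the real types defined elsewhere in the repo:

package main

import (
    "fmt"

    "github.com/pborman/uuid"
)

// Brick and Volume are stand-ins carrying only the fields the
// ownership check needs.
type Brick struct{ NodeID uuid.UUID }
type Volume struct{ Bricks []Brick }

// bricksExist reports whether any volume has a brick on the given peer,
// mirroring the check in deletepeer.go.
func bricksExist(id string, vols []Volume) bool {
    pid := uuid.Parse(id)
    for _, v := range vols {
        for _, b := range v.Bricks {
            if uuid.Equal(pid, b.NodeID) {
                return true
            }
        }
    }
    return false
}

func main() {
    nodeID := uuid.NewRandom()
    vols := []Volume{{Bricks: []Brick{{NodeID: nodeID}}}}
    fmt.Println(bricksExist(nodeID.String(), vols)) // true
}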
36 changes: 36 additions & 0 deletions commands/peers/errors.go
@@ -0,0 +1,36 @@
package peercommands

// Error is the error type returned by this package
type Error int32

// Errors returned by this package
// TODO: Add more errors
const (
ErrNone Error = iota
ErrAnotherCluster
ErrHaveVolumes
ErrStoreReconfigFailed
ErrUnknownPeer
ErrMax
)

var errorStrings [ErrMax]string

func init() {
errorStrings[ErrNone] = "not an error"
errorStrings[ErrAnotherCluster] = "peer is part of another cluster"
errorStrings[ErrHaveVolumes] = "peer has existing volumes"
errorStrings[ErrStoreReconfigFailed] = "store reconfigure failed on peer"
errorStrings[ErrUnknownPeer] = "request received from unknown peer"
}

func (e Error) String() string {
if e <= ErrNone || e >= ErrMax {
return "unknown error"
}
return errorStrings[e]
}

func (e Error) Error() string {
return e.String()
}
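Since Error has an Error() string method, values of the type satisfy Go's built-in error interface and can flow through ordinary error handling, which is how the handlers above convert rsp.Err into an HTTP error. A small standalone sketch of the pattern:

package main

import "fmt"

// Error mirrors the int32 enum pattern from errors.go above,
// trimmed to a single real error.
type Error int32

const (
    ErrNone Error = iota
    ErrAnotherCluster
    ErrMax
)

var errorStrings = [ErrMax]string{
    ErrAnotherCluster: "peer is part of another cluster",
}

func (e Error) String() string {
    if e <= ErrNone || e >= ErrMax {
        return "unknown error"
    }
    return errorStrings[e]
}

// Error makes the type satisfy the built-in error interface.
func (e Error) Error() string { return e.String() }

func main() {
    var err error = ErrAnotherCluster
    fmt.Println(err) // peer is part of another cluster
}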