Skip to content
This repository has been archived by the owner on Mar 26, 2020. It is now read-only.

Commit

Permalink
Merge pull request #284 from kshlm/rm-libkv
Browse files Browse the repository at this point in the history
Switch to etcd/clientv3 for store and remove libkv
  • Loading branch information
prashanthpai authored Apr 12, 2017
2 parents 3d2e217 + 09ceb28 commit 9062b80
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 196 deletions.
8 changes: 6 additions & 2 deletions commands/peers/peer-rpc-svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (p *PeerService) ExportAndStoreETCDConfig(nc netctx.Context, c *EtcdConfigR
var opRet int32
var opError string

// Stop the store first
gdctx.Store.Close()

newEtcdConfig, err := etcdmgmt.GetEtcdConfig(false)
if err != nil {
opRet = -1
Expand Down Expand Up @@ -112,15 +115,16 @@ func (p *PeerService) ExportAndStoreETCDConfig(nc netctx.Context, c *EtcdConfigR
goto Out
}

// Reinitialize the store now that a new etcd instance is running
gdctx.InitStore()

if c.DeletePeer {
// After being detached from the cluster, this glusterd instance
// now should get back to clean slate i.e state of a single node
// standalone cluster.
gdctx.InitStore(true)
peer.AddSelfDetails()
} else {
// Store the etcd config in a file for use during restarts.

err = etcdmgmt.StoreEtcdConfig(newEtcdConfig)
if err != nil {
opRet = -1
Expand Down
12 changes: 11 additions & 1 deletion commands/volumes/volume-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,12 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
return
}

reqid := uuid.NewRandom().String()
logger := log.WithField("reqid", reqid)

nodes, e := nodesForVolCreate(req)
if e != nil {
log.WithError(e).Error("could not prepare node list")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}
Expand All @@ -200,29 +204,33 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
Store: "vol-create.Store",
Rollback: "vol-create.Rollback",
LogFields: &log.Fields{
"reqid": uuid.NewRandom().String(),
"reqid": reqid,
},
}).NewTxn()
if e != nil {
logger.WithError(e).Error("failed to create transaction")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}
defer txn.Cleanup()

e = txn.Ctx.Set("req", req)
if e != nil {
logger.WithError(e).Error("failed to set request in transaction context")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}

vol, e := createVolinfo(req)
if e != nil {
logger.WithError(e).Error("failed to create volinfo")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}

e = txn.Ctx.Set("volinfo", vol)
if e != nil {
logger.WithError(e).Error("failed to set volinfo in transaction context")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}
Expand All @@ -234,12 +242,14 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
}
e = txn.Ctx.Set("volauth", volAuth)
if e != nil {
logger.WithError(e).Error("failed to set trusted credentials in transaction context")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}

c, e := txn.Do()
if e != nil {
logger.WithError(e).Error("volume create transaction failed")
restutils.SendHTTPError(w, http.StatusInternalServerError, e.Error())
return
}
Expand Down
2 changes: 1 addition & 1 deletion commands/volumes/volume-start.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func volumeStartHandler(w http.ResponseWriter, r *http.Request) {

vol, e := volume.GetVolume(volname)
if e != nil {
restutils.SendHTTPError(w, http.StatusBadRequest, errors.ErrVolNotFound.Error())
restutils.SendHTTPError(w, http.StatusNotFound, errors.ErrVolNotFound.Error())
return
}
if vol.Status == volume.VolStarted {
Expand Down
2 changes: 1 addition & 1 deletion commands/volumes/volume-stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func volumeStopHandler(w http.ResponseWriter, r *http.Request) {

vol, e := volume.GetVolume(volname)
if e != nil {
restutils.SendHTTPError(w, http.StatusBadRequest, errors.ErrVolNotFound.Error())
restutils.SendHTTPError(w, http.StatusNotFound, errors.ErrVolNotFound.Error())
return
}
if vol.Status == volume.VolStopped {
Expand Down
2 changes: 1 addition & 1 deletion gdctx/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func doInit() {
// false. That is when we'll have to initialize prefixes by passing true to
// InitStore(). On subsequent restarts of glusterd, we would want to skip
// initializing prefixes by passing false to InitStore()
InitStore(!Restart)
InitStore()
}

// Init initializes the GlusterD context. This should be called once before doing anything else.
Expand Down
29 changes: 1 addition & 28 deletions gdctx/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package gdctx

import (
"github.com/gluster/glusterd2/store"

log "github.com/Sirupsen/logrus"
)

// If someone needs to use the GD2 store, all they need to do is just import context and use context.Store
Expand All @@ -12,32 +10,7 @@ var (
prefixes []string
)

// RegisterStorePrefix allows other packages to register prefixes to be initalized on the store.
// The prefixes will be created on the store during GD2 context intialization
func RegisterStorePrefix(prefix string) {
prefixes = append(prefixes, prefix)
}

// InitStore is to initialize the store
func InitStore(initPrefix bool) {
func InitStore() {
Store = store.New()

if initPrefix {

if e := Store.InitPrefix(store.GlusterPrefix); e != nil {
log.WithFields(log.Fields{
"prefix": store.GlusterPrefix,
"error": e,
}).Error("InitPrefix failed.")
}

for _, prefix := range prefixes {
if e := Store.InitPrefix(prefix); e != nil {
log.WithFields(log.Fields{
"prefix": prefix,
"error": e,
}).Error("InitPrefix failed.")
}
}
}
}
17 changes: 7 additions & 10 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package: github.com/gluster/glusterd2
import:
- package: github.com/coreos/etcd
version: ~v3.1.0
version: ~v3.1.5
subpackages:
- client
- clientv3
- embed
- package: github.com/prashanthpai/libkv
subpackages:
- store
- store/etcd
- clientv3/concurrency
- package: github.com/pborman/uuid
- package: github.com/gorilla/mux
- package: github.com/Sirupsen/logrus
Expand Down
72 changes: 29 additions & 43 deletions peer/store-utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package peer
// peer information stores in the store

import (
"context"
"encoding/json"

"github.com/gluster/glusterd2/errors"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/gluster/glusterd2/utils"

log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
"github.com/pborman/uuid"
)

Expand All @@ -32,10 +34,6 @@ var (
GetPeerIDByAddrF = GetPeerIDByAddr
)

func init() {
gdctx.RegisterStorePrefix(peerPrefix)
}

// AddOrUpdatePeer adds/updates given peer in the store
func AddOrUpdatePeer(p *Peer) error {
json, err := json.Marshal(p)
Expand All @@ -45,7 +43,7 @@ func AddOrUpdatePeer(p *Peer) error {

idStr := p.ID.String()

if err := gdctx.Store.Put(peerPrefix+idStr, json, nil); err != nil {
if _, err := gdctx.Store.Put(context.TODO(), peerPrefix+idStr, string(json)); err != nil {
return err
}

Expand All @@ -54,13 +52,19 @@ func AddOrUpdatePeer(p *Peer) error {

// GetPeer returns specified peer from the store
func GetPeer(id string) (*Peer, error) {
pair, err := gdctx.Store.Get(peerPrefix + id)
if err != nil || pair == nil {
resp, err := gdctx.Store.Get(context.TODO(), peerPrefix+id)
if err != nil {
return nil, err
}

// We cannot have more than one peer with a given ID
// TODO: Fix this to return a proper error
if resp.Count != 1 {
return nil, errors.ErrPeerNotFound
}

var p Peer
if err := json.Unmarshal(pair.Value, &p); err != nil {
if err := json.Unmarshal(resp.Kvs[0].Value, &p); err != nil {
return nil, err
}
return &p, nil
Expand All @@ -86,46 +90,36 @@ func GetInitialCluster() (string, error) {

// GetPeers returns all available peers in the store
func GetPeers() ([]Peer, error) {
pairs, err := gdctx.Store.List(peerPrefix)
if err != nil || pairs == nil {
resp, err := gdctx.Store.Get(context.TODO(), peerPrefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
// There will be at least one peer (current node)
peers := make([]Peer, len(pairs))
i := 0
for _, pair := range pairs {
peers := make([]Peer, len(resp.Kvs))
for i, kv := range resp.Kvs {
var p Peer

if err := json.Unmarshal(pair.Value, &p); err != nil {
if err := json.Unmarshal(kv.Value, &p); err != nil {
log.WithFields(log.Fields{
"peer": pair.Key,
"peer": string(kv.Key),
"error": err,
}).Error("Failed to unmarshal peer")
continue
}
peers[i] = p
i = i + 1
}

return peers, nil
}

// GetPeerByName returns the peer with the given name from store
func GetPeerByName(name string) (*Peer, error) {
pairs, err := gdctx.Store.List(peerPrefix)
if err != nil || pairs == nil {
peers, err := GetPeers()
if err != nil {
return nil, err
}

for _, pair := range pairs {
var p Peer
if err := json.Unmarshal(pair.Value, &p); err != nil {
log.WithFields(log.Fields{
"peer": pair.Key,
"error": err,
}).Error("Failed to unmarshal peer")
continue
}
for _, p := range peers {
if p.Name == name {
return &p, nil
}
Expand All @@ -136,36 +130,28 @@ func GetPeerByName(name string) (*Peer, error) {

// DeletePeer deletes given peer from the store
func DeletePeer(id string) error {
return gdctx.Store.Delete(peerPrefix + id)
_, e := gdctx.Store.Delete(context.TODO(), peerPrefix+id)
return e
}

// Exists checks if given peer is present in the store
func Exists(id string) bool {
b, e := gdctx.Store.Exists(peerPrefix + id)
resp, e := gdctx.Store.Get(context.TODO(), peerPrefix+id)
if e != nil {
return false
}

return b
return resp.Count == 1
}

//GetPeerByAddr returns the peer with the given address from the store
func GetPeerByAddr(addr string) (*Peer, error) {
pairs, err := gdctx.Store.List(peerPrefix)
if err != nil || pairs == nil {
return nil, err
peers, e := GetPeers()
if e != nil {
return nil, e
}

for _, pair := range pairs {
var p Peer
if err := json.Unmarshal(pair.Value, &p); err != nil {
log.WithFields(log.Fields{
"peer": pair.Key,
"error": err,
}).Error("Failed to unmarshal peer")
continue
}

for _, p := range peers {
for _, paddr := range p.Addresses {
if utils.IsPeerAddressSame(addr, paddr) {
return &p, nil
Expand Down
Loading

0 comments on commit 9062b80

Please sign in to comment.