Skip to content

Commit

Permalink
Dev etcd lease session (#162)
Browse files Browse the repository at this point in the history
* using etcd client session
  • Loading branch information
mlboy authored Aug 13, 2020
1 parent 8020b1c commit 0692ff8
Showing 1 changed file with 51 additions and 78 deletions.
129 changes: 51 additions & 78 deletions pkg/registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync"
"time"

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/douyu/jupiter/pkg"
"github.com/douyu/jupiter/pkg/constant"

Expand All @@ -42,9 +42,9 @@ type etcdv3Registry struct {
client *etcdv3.Client
kvs sync.Map
*Config
cancel context.CancelFunc
leases map[string]clientv3.LeaseID
rmu *sync.RWMutex
cancel context.CancelFunc
rmu *sync.RWMutex
sessions map[string]*concurrency.Session
}

func newETCDRegistry(config *Config) *etcdv3Registry {
Expand All @@ -53,11 +53,11 @@ func newETCDRegistry(config *Config) *etcdv3Registry {
}
config.logger = config.logger.With(xlog.FieldMod(ecode.ModRegistryETCD), xlog.FieldAddrAny(config.Config.Endpoints))
reg := &etcdv3Registry{
client: config.Config.Build(),
Config: config,
kvs: sync.Map{},
leases: make(map[string]clientv3.LeaseID),
rmu: &sync.RWMutex{},
client: config.Config.Build(),
Config: config,
kvs: sync.Map{},
rmu: &sync.RWMutex{},
sessions: make(map[string]*concurrency.Session),
}
return reg
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error {
defer cancel()
}

if err := reg.delLeaseID(ctx, key); err != nil {
if err := reg.delSession(key); err != nil {
return err
}

Expand Down Expand Up @@ -223,14 +223,13 @@ func (reg *etcdv3Registry) registerMetric(ctx context.Context, info *server.Serv

opOptions := make([]clientv3.OpOption, 0)
// opOptions = append(opOptions, clientv3.WithSerializable())
if reg.Config.ServiceTTL > 0 {
leaseID, err := reg.getLeaseID(ctx, key)
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
//todo ctx without timeout for same as service life?
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
if err != nil {
return err
}
opOptions = append(opOptions, clientv3.WithLease(leaseID))
//KeepAlive ctx without timeout for same as service life
reg.keepLeaseID(ctx, leaseID)
opOptions = append(opOptions, clientv3.WithLease(sess.Lease()))
}
_, err := reg.client.Put(ctx, key, val, opOptions...)
if err != nil {
Expand All @@ -243,64 +242,6 @@ func (reg *etcdv3Registry) registerMetric(ctx context.Context, info *server.Serv
return nil

}
func (reg *etcdv3Registry) getLeaseID(ctx context.Context, k string) (clientv3.LeaseID, error) {
reg.rmu.RLock()
leaseID, ok := reg.leases[k]
reg.rmu.RUnlock()
if ok {
//from map try keep alive once
if _, err := reg.client.KeepAliveOnce(ctx, leaseID); err != nil {
if err == rpctypes.ErrLeaseNotFound {
goto grant
}
return leaseID, err
}
return leaseID, nil
}
grant:
//grant
rsp, err := reg.client.Grant(ctx, int64(reg.Config.ServiceTTL.Seconds()))
if err != nil {
return leaseID, err
}
//cache to map
reg.rmu.Lock()
reg.leases[k] = rsp.ID
reg.rmu.Unlock()
return rsp.ID, nil
}
func (reg *etcdv3Registry) keepLeaseID(ctx context.Context, leaseID clientv3.LeaseID) {
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ch, err := reg.client.KeepAlive(ctx, leaseID)
if err != nil {
return
}
for {
select {
case lkp := <-ch:
if lkp == nil {
return
}
}
}
}()
}
func (reg *etcdv3Registry) delLeaseID(ctx context.Context, k string) error {
if reg.Config.ServiceTTL > 0 {
reg.rmu.Lock()
id, ok := reg.leases[k]
delete(reg.leases, k)
reg.rmu.Unlock()
if ok {
if _, err := reg.client.Revoke(ctx, id); err != nil {
return err
}
}
}
return nil
}
func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error {
var readCtx context.Context
var readCancel context.CancelFunc
Expand All @@ -314,14 +255,13 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service

opOptions := make([]clientv3.OpOption, 0)
// opOptions = append(opOptions, clientv3.WithSerializable())
if reg.Config.ServiceTTL > 0 {
leaseID, err := reg.getLeaseID(readCtx, key)
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
//todo ctx without timeout for same as service life?
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
if err != nil {
return err
}
opOptions = append(opOptions, clientv3.WithLease(leaseID))
//KeepAlive ctx without timeout for same as service life
reg.keepLeaseID(ctx, leaseID)
opOptions = append(opOptions, clientv3.WithLease(sess.Lease()))
}
_, err := reg.client.Put(readCtx, key, val, opOptions...)
if err != nil {
Expand All @@ -331,7 +271,40 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service
reg.logger.Info("register service", xlog.FieldKeyAny(key), xlog.FieldValueAny(val))
reg.kvs.Store(key, val)
return nil
}

func (reg *etcdv3Registry) getSession(k string, opts ...concurrency.SessionOption) (*concurrency.Session, error) {
reg.rmu.RLock()
sess, ok := reg.sessions[k]
reg.rmu.RUnlock()
if ok {
return sess, nil
}
sess, err := concurrency.NewSession(reg.client.Client)
if err != nil {
return sess, err
}
reg.rmu.Lock()
reg.sessions[k] = sess
reg.rmu.Unlock()
return sess, nil
}

func (reg *etcdv3Registry) delSession(k string) error {
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
reg.rmu.RLock()
sess, ok := reg.sessions[k]
reg.rmu.RUnlock()
if ok {
reg.rmu.Lock()
delete(reg.sessions, k)
reg.rmu.Unlock()
if err := sess.Close(); err != nil {
return err
}
}
}
return nil
}

func (reg *etcdv3Registry) registerKey(info *server.ServiceInfo) string {
Expand Down

0 comments on commit 0692ff8

Please sign in to comment.