Skip to content

Commit

Permalink
refactor HandleZkEvent logic to help application deal with event (#79)
Browse files Browse the repository at this point in the history
Co-authored-by: dongjianhui03 <[email protected]>
  • Loading branch information
Mulavar and dongjianhui03 authored Dec 13, 2021
1 parent 7eddead commit 9bf0b24
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 25 deletions.
32 changes: 14 additions & 18 deletions database/kv/zk/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type ZookeeperClient struct {
share bool
initialized uint32
reconnectCh chan struct{}
eventRegistry map[string][]*chan struct{}
eventRegistry map[string][]chan zk.Event
eventRegistryLock sync.RWMutex
zkEventHandler ZkEventHandler
Session <-chan zk.Event
Expand Down Expand Up @@ -98,10 +98,6 @@ func StateToString(state zk.State) string {
return "zookeeper has Session"
case zk.StateUnknown:
return "zookeeper unknown state"
case zk.State(zk.EventNodeDeleted):
return "zookeeper node deleted"
case zk.State(zk.EventNodeDataChanged):
return "zookeeper node data changed"
default:
return state.String()
}
Expand Down Expand Up @@ -138,7 +134,7 @@ func newClient(name string, zkAddrs []string, share bool, opts ...zkClientOption
activeNumber: 0,
share: share,
reconnectCh: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
eventRegistry: make(map[string][]chan zk.Event),
Session: make(<-chan zk.Event),
zkEventHandler: &DefaultHandler{},
}
Expand Down Expand Up @@ -188,7 +184,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
Timeout: timeout,
share: false,
reconnectCh: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
eventRegistry: make(map[string][]chan zk.Event),
Session: make(<-chan zk.Event),
zkEventHandler: &DefaultHandler{},
}
Expand Down Expand Up @@ -226,20 +222,20 @@ func (d *DefaultHandler) HandleZkEvent(z *ZookeeperClient) {
for {
select {
case event = <-z.Session:
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
switch event.State {
case zk.StateDisconnected:
atomic.StoreUint32(&z.valid, 0)
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
case zk.StateConnected:
z.eventRegistryLock.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
for path, a := range z.eventRegistry {
if strings.HasPrefix(event.Path, path) {
for _, e := range a {
*e <- struct{}{}
e <- event
}
}
}
z.eventRegistryLock.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
case zk.StateConnecting, zk.StateHasSession:
if state == (int)(zk.StateHasSession) {
continue
}
Expand All @@ -254,7 +250,7 @@ func (d *DefaultHandler) HandleZkEvent(z *ZookeeperClient) {
z.eventRegistryLock.RLock()
if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
for _, e := range a {
*e <- struct{}{}
e <- event
}
}
z.eventRegistryLock.RUnlock()
Expand All @@ -265,8 +261,8 @@ func (d *DefaultHandler) HandleZkEvent(z *ZookeeperClient) {
}

// RegisterEvent registers zookeeper events
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" || event == nil {
func (z *ZookeeperClient) RegisterEvent(zkPath string, event chan zk.Event) {
if zkPath == "" {
return
}

Expand All @@ -278,7 +274,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
}

// UnregisterEvent unregisters zookeeper events
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event chan zk.Event) {
if zkPath == "" {
return
}
Expand Down
9 changes: 5 additions & 4 deletions database/kv/zk/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ func TestRegisterTempSeq(t *testing.T) {

func Test_UnregisterEvent(t *testing.T) {
client := &ZookeeperClient{}
client.eventRegistry = make(map[string][]*chan struct{})
array := []*chan struct{}{}
array = append(array, new(chan struct{}))
client.eventRegistry = make(map[string][]chan zk.Event)
mockEvent := make(chan zk.Event, 1)
var array []chan zk.Event
array = append(array, mockEvent)
client.eventRegistry["test"] = array
client.UnregisterEvent("test", new(chan struct{}))
client.UnregisterEvent("test", mockEvent)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dubbogo/gost
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/davecgh/go-spew v1.1.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5
github.com/dubbogo/jsonparser v1.0.1
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl83me8g=
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5 h1:XoR8SSVziXe698dt4uZYDfsmHpKLemqAgFyndQsq5Kw=
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/jsonparser v1.0.1 h1:sAIr8gk+gkahkIm6CnUxh9wTCkbgwLEQ8dTXTnAXyzo=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down

0 comments on commit 9bf0b24

Please sign in to comment.