From 9bf0b246a19c37ba671d28491d190e5c7dc13d38 Mon Sep 17 00:00:00 2001 From: Mulavar <978007503@qq.com> Date: Mon, 13 Dec 2021 23:28:55 +0800 Subject: [PATCH] refactor HandleZkEvent logic to help application deal with event (#79) Co-authored-by: dongjianhui03 --- database/kv/zk/client.go | 32 ++++++++++++++------------------ database/kv/zk/client_test.go | 9 +++++---- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 22 insertions(+), 25 deletions(-) diff --git a/database/kv/zk/client.go b/database/kv/zk/client.go index 5c29669..724f4da 100644 --- a/database/kv/zk/client.go +++ b/database/kv/zk/client.go @@ -58,7 +58,7 @@ type ZookeeperClient struct { share bool initialized uint32 reconnectCh chan struct{} - eventRegistry map[string][]*chan struct{} + eventRegistry map[string][]chan zk.Event eventRegistryLock sync.RWMutex zkEventHandler ZkEventHandler Session <-chan zk.Event @@ -98,10 +98,6 @@ func StateToString(state zk.State) string { return "zookeeper has Session" case zk.StateUnknown: return "zookeeper unknown state" - case zk.State(zk.EventNodeDeleted): - return "zookeeper node deleted" - case zk.State(zk.EventNodeDataChanged): - return "zookeeper node data changed" default: return state.String() } @@ -138,7 +134,7 @@ func newClient(name string, zkAddrs []string, share bool, opts ...zkClientOption activeNumber: 0, share: share, reconnectCh: make(chan struct{}), - eventRegistry: make(map[string][]*chan struct{}), + eventRegistry: make(map[string][]chan zk.Event), Session: make(<-chan zk.Event), zkEventHandler: &DefaultHandler{}, } @@ -188,7 +184,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) Timeout: timeout, share: false, reconnectCh: make(chan struct{}), - eventRegistry: make(map[string][]*chan struct{}), + eventRegistry: make(map[string][]chan zk.Event), Session: make(<-chan zk.Event), zkEventHandler: &DefaultHandler{}, } @@ -226,20 +222,20 @@ func (d *DefaultHandler) HandleZkEvent(z *ZookeeperClient) { for { select { case event = <-z.Session: - switch (int)(event.State) { - case (int)(zk.StateDisconnected): + switch event.State { + case zk.StateDisconnected: atomic.StoreUint32(&z.valid, 0) - case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged): + case zk.StateConnected: z.eventRegistryLock.RLock() - for p, a := range z.eventRegistry { - if strings.HasPrefix(p, event.Path) { + for path, a := range z.eventRegistry { + if strings.HasPrefix(event.Path, path) { for _, e := range a { - *e <- struct{}{} + e <- event } } } z.eventRegistryLock.RUnlock() - case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession): + case zk.StateConnecting, zk.StateHasSession: if state == (int)(zk.StateHasSession) { continue } @@ -254,7 +250,7 @@ func (d *DefaultHandler) HandleZkEvent(z *ZookeeperClient) { z.eventRegistryLock.RLock() if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) { for _, e := range a { - *e <- struct{}{} + e <- event } } z.eventRegistryLock.RUnlock() @@ -265,8 +261,8 @@ func (d *DefaultHandler) HandleZkEvent(z *ZookeeperClient) { } // RegisterEvent registers zookeeper events -func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { - if zkPath == "" || event == nil { +func (z *ZookeeperClient) RegisterEvent(zkPath string, event chan zk.Event) { + if zkPath == "" { return } @@ -278,7 +274,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { } // UnregisterEvent unregisters zookeeper events -func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { +func (z *ZookeeperClient) UnregisterEvent(zkPath string, event chan zk.Event) { if zkPath == "" { return } diff --git a/database/kv/zk/client_test.go b/database/kv/zk/client_test.go index 8e05414..338b076 100644 --- a/database/kv/zk/client_test.go +++ b/database/kv/zk/client_test.go @@ -206,9 +206,10 @@ func TestRegisterTempSeq(t *testing.T) { func Test_UnregisterEvent(t *testing.T) { client := &ZookeeperClient{} - client.eventRegistry = make(map[string][]*chan struct{}) - array := []*chan struct{}{} - array = append(array, new(chan struct{})) + client.eventRegistry = make(map[string][]chan zk.Event) + mockEvent := make(chan zk.Event, 1) + var array []chan zk.Event + array = append(array, mockEvent) client.eventRegistry["test"] = array - client.UnregisterEvent("test", new(chan struct{})) + client.UnregisterEvent("test", mockEvent) } diff --git a/go.mod b/go.mod index ef65712..5e7fb6f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/dubbogo/gost require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/davecgh/go-spew v1.1.1 - github.com/dubbogo/go-zookeeper v1.0.3 + github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5 github.com/dubbogo/jsonparser v1.0.1 github.com/go-ole/go-ole v1.2.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect diff --git a/go.sum b/go.sum index c1de259..07f7c5c 100644 --- a/go.sum +++ b/go.sum @@ -90,8 +90,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= -github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl83me8g= -github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= +github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5 h1:XoR8SSVziXe698dt4uZYDfsmHpKLemqAgFyndQsq5Kw= +github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/jsonparser v1.0.1 h1:sAIr8gk+gkahkIm6CnUxh9wTCkbgwLEQ8dTXTnAXyzo= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=