Skip to content

Commit

Permalink
Always notify subscribers on membership change
Browse files Browse the repository at this point in the history
Currently there is an issue:
 - we loose lots of updates (a separate PR to address this)
 - we rely on periodic (every 10s) update to update membership
   regardless
 - but it doesn't notify subscribers

Because of that we were sometimes missing notifications in history and matching and didn't get any before the next rollout / restart.
  • Loading branch information
dkrotx committed Sep 12, 2024
1 parent 04add2d commit 9e14a56
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}
var emptyEvent = &ChangedEvent{}

const (
minRefreshInternal = time.Second * 4
Expand Down Expand Up @@ -162,7 +163,7 @@ func (r *ring) Lookup(
addr, found := r.ring().Lookup(key)
if !found {
select {
case r.refreshChan <- &ChangedEvent{}:
case r.refreshChan <- emptyEvent:

Check warning on line 166 in common/membership/hashring.go

View check run for this annotation

Codecov / codecov/patch

common/membership/hashring.go#L166

Added line #L166 was not covered by tests
default:
}
return HostInfo{}, ErrInsufficientHosts
Expand Down Expand Up @@ -264,6 +265,16 @@ func (r *ring) refresh() (refreshed bool, err error) {
return true, nil
}

func (r *ring) refreshAndNotifySubscribers(event *ChangedEvent) {
refreshed, err := r.refresh()
if err != nil {
r.logger.Error("refreshing ring", tag.Error(err))

Check warning on line 271 in common/membership/hashring.go

View check run for this annotation

Codecov / codecov/patch

common/membership/hashring.go#L271

Added line #L271 was not covered by tests
}
if refreshed {
r.notifySubscribers(event)
}
}

func (r *ring) refreshRingWorker() {
defer r.shutdownWG.Done()

Expand All @@ -274,18 +285,10 @@ func (r *ring) refreshRingWorker() {
case <-r.shutdownCh:
return
case event := <-r.refreshChan: // local signal or signal from provider
refreshed, err := r.refresh()
if err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
}
if refreshed {
r.notifySubscribers(event)
}
r.refreshAndNotifySubscribers(event)
case <-refreshTicker.C: // periodically refresh membership
r.emitHashIdentifier()
if _, err := r.refresh(); err != nil {
r.logger.Error("periodically refreshing ring", tag.Error(err))
}
r.refreshAndNotifySubscribers(emptyEvent)

Check warning on line 291 in common/membership/hashring.go

View check run for this annotation

Codecov / codecov/patch

common/membership/hashring.go#L291

Added line #L291 was not covered by tests
}
}
}
Expand Down

0 comments on commit 9e14a56

Please sign in to comment.