Skip to content

Commit

Permalink
clock.TimeSource for future test of time-sensitive changes
Browse files Browse the repository at this point in the history
Otherwise we would have to time.Sleep for 10s :-(
  • Loading branch information
dkrotx committed Sep 12, 2024
1 parent 9e14a56 commit 21f3717
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 99 deletions.
14 changes: 9 additions & 5 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/ringpop-go/membership"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -68,6 +69,7 @@ type ring struct {
refreshChan chan *ChangedEvent
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
timeSource clock.TimeSource
scope metrics.Scope
logger log.Logger

Expand All @@ -88,6 +90,7 @@ type ring struct {
func newHashring(
service string,
provider PeerProvider,
timeSource clock.TimeSource,
logger log.Logger,
scope metrics.Scope,
) *ring {
Expand All @@ -96,8 +99,9 @@ func newHashring(
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
logger: logger,
refreshChan: make(chan *ChangedEvent),
timeSource: timeSource,
logger: logger,
scope: scope,
}

Expand Down Expand Up @@ -237,7 +241,7 @@ func (r *ring) Members() []HostInfo {
}

func (r *ring) refresh() (refreshed bool, err error) {
if r.members.refreshed.After(time.Now().Add(-minRefreshInternal)) {
if r.members.refreshed.After(r.timeSource.Now().Add(-minRefreshInternal)) {
// refreshed too frequently
return false, nil
}
Expand All @@ -258,7 +262,7 @@ func (r *ring) refresh() (refreshed bool, err error) {
ring.AddMembers(castToMembers(members)...)

r.members.keys = newMembersMap
r.members.refreshed = time.Now()
r.members.refreshed = r.timeSource.Now()
r.value.Store(ring)
r.logger.Info("refreshed ring members", tag.Value(members))

Expand All @@ -278,15 +282,15 @@ func (r *ring) refreshAndNotifySubscribers(event *ChangedEvent) {
func (r *ring) refreshRingWorker() {
defer r.shutdownWG.Done()

refreshTicker := time.NewTicker(defaultRefreshInterval)
refreshTicker := r.timeSource.NewTicker(defaultRefreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-r.shutdownCh:
return
case event := <-r.refreshChan: // local signal or signal from provider
r.refreshAndNotifySubscribers(event)
case <-refreshTicker.C: // periodically refresh membership
case <-refreshTicker.Chan(): // periodically refresh membership

Check warning on line 293 in common/membership/hashring.go

View check run for this annotation

Codecov / codecov/patch

common/membership/hashring.go#L293

Added line #L293 was not covered by tests
r.emitHashIdentifier()
r.refreshAndNotifySubscribers(emptyEvent)

Check warning on line 295 in common/membership/hashring.go

View check run for this annotation

Codecov / codecov/patch

common/membership/hashring.go#L295

Added line #L295 was not covered by tests
}
Expand Down
180 changes: 87 additions & 93 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)
Expand Down Expand Up @@ -97,46 +98,60 @@ func Test_ring_compareMembers(t *testing.T) {

}

func TestFailedLookupWillAskProvider(t *testing.T) {
type hashringTestData struct {
mockPeerProvider *MockPeerProvider
mockTimeSource clock.TimeSource
hashRing *ring
}

func newHashringTestData(t *testing.T) *hashringTestData {
var td hashringTestData

ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td.mockPeerProvider = NewMockPeerProvider(ctrl)
td.mockTimeSource = clock.NewMockedTimeSourceAt(time.Now())

td.hashRing = newHashring("test-service", td.mockPeerProvider, td.mockTimeSource, log.NewNoop(), metrics.NoopScope(0))

return &td
}

pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(1)
func TestFailedLookupWillAskProvider(t *testing.T) {
td := newHashringTestData(t)

td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(1)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
_, err := hr.Lookup("a")
td.hashRing.Start()
_, err := td.hashRing.Lookup("a")

assert.Error(t, err)
}

func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)

pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(1).Return(randomHostInfo(3), nil)
td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(1).Return(randomHostInfo(3), nil)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
// Start will also call .refresh()
hr.Start()
updatedAt := hr.members.refreshed
hr.refresh()
refreshed, err := hr.refresh()
td.hashRing.Start()
updatedAt := td.hashRing.members.refreshed
td.hashRing.refresh()
refreshed, err := td.hashRing.refresh()

assert.NoError(t, err)
assert.False(t, refreshed)
assert.Equal(t, updatedAt, hr.members.refreshed)
assert.Equal(t, updatedAt, td.hashRing.members.refreshed)

}

func TestRefreshWillNotifySubscribers(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)

var hostsToReturn []HostInfo
pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(2).DoAndReturn(func(service string) ([]HostInfo, error) {
td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(2).DoAndReturn(func(service string) ([]HostInfo, error) {
hostsToReturn = randomHostInfo(5)
time.Sleep(time.Millisecond * 70)
return hostsToReturn, nil
Expand All @@ -148,13 +163,12 @@ func TestRefreshWillNotifySubscribers(t *testing.T) {
HostsRemoved: []string{"c"},
}

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
td.hashRing.Start()

var changeCh = make(chan *ChangedEvent, 2)
// Check if multiple subscribers will get notified
assert.NoError(t, hr.Subscribe("subscriber1", changeCh))
assert.NoError(t, hr.Subscribe("subscriber2", changeCh))
assert.NoError(t, td.hashRing.Subscribe("subscriber1", changeCh))
assert.NoError(t, td.hashRing.Subscribe("subscriber2", changeCh))

wg := sync.WaitGroup{}
wg.Add(1)
Expand All @@ -167,114 +181,99 @@ func TestRefreshWillNotifySubscribers(t *testing.T) {
}()

// to bypass internal check
hr.members.refreshed = time.Now().AddDate(0, 0, -1)
hr.refreshChan <- changed
td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1)
td.hashRing.refreshChan <- changed
wg.Wait() // wait until both subscribers will get notification
// Test if internal members are updated
assert.ElementsMatch(t, hr.Members(), hostsToReturn, "members should contain just-added nodes")
assert.ElementsMatch(t, td.hashRing.Members(), hostsToReturn, "members should contain just-added nodes")
}

func TestSubscribeIgnoresDuplicates(t *testing.T) {
var changeCh = make(chan *ChangedEvent)
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-watcher", pp, log.NewNoop(), metrics.NoopScope(0))
td := newHashringTestData(t)

assert.NoError(t, hr.Subscribe("test-watcher", changeCh))
assert.Error(t, hr.Subscribe("test-watcher", changeCh))
assert.Equal(t, 1, len(hr.subscribers.keys))
assert.NoError(t, td.hashRing.Subscribe("test-service", changeCh))
assert.Error(t, td.hashRing.Subscribe("test-service", changeCh))
assert.Equal(t, 1, len(td.hashRing.subscribers.keys))
}

func TestUnsubcribeIgnoresDeletionOnEmpty(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Equal(t, 0, len(hr.subscribers.keys))
assert.NoError(t, hr.Unsubscribe("test-service"))
assert.NoError(t, hr.Unsubscribe("test-service"))
assert.NoError(t, hr.Unsubscribe("test-service"))
assert.Equal(t, 0, len(td.hashRing.subscribers.keys))
assert.NoError(t, td.hashRing.Unsubscribe("test-service"))
assert.NoError(t, td.hashRing.Unsubscribe("test-service"))
assert.NoError(t, td.hashRing.Unsubscribe("test-service"))
}

func TestUnsubcribeDeletes(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)
var changeCh = make(chan *ChangedEvent)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.Equal(t, 0, len(hr.subscribers.keys))
assert.NoError(t, hr.Subscribe("testservice1", changeCh))
assert.Equal(t, 1, len(hr.subscribers.keys))
assert.NoError(t, hr.Unsubscribe("test-service"))
assert.Equal(t, 1, len(hr.subscribers.keys))
assert.NoError(t, hr.Unsubscribe("testservice1"))
assert.Equal(t, 0, len(hr.subscribers.keys))
assert.Equal(t, 0, len(td.hashRing.subscribers.keys))
assert.NoError(t, td.hashRing.Subscribe("testservice1", changeCh))
assert.Equal(t, 1, len(td.hashRing.subscribers.keys))
assert.NoError(t, td.hashRing.Unsubscribe("test-service"))
assert.Equal(t, 1, len(td.hashRing.subscribers.keys))
assert.NoError(t, td.hashRing.Unsubscribe("testservice1"))
assert.Equal(t, 0, len(td.hashRing.subscribers.keys))

}

func TestMemberCountReturnsNumber(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Equal(t, 0, hr.MemberCount())
assert.Equal(t, 0, td.hashRing.MemberCount())

ring := emptyHashring()
for _, addr := range []string{"127", "128"} {
host := NewHostInfo(addr)
ring.AddMembers(host)
}
hr.value.Store(ring)
assert.Equal(t, 2, hr.MemberCount())
td.hashRing.value.Store(ring)
assert.Equal(t, 2, td.hashRing.MemberCount())
}

func TestErrorIsPropagatedWhenProviderFails(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error"))
td := newHashringTestData(t)

td.mockPeerProvider.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error"))

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
_, err := hr.refresh()
_, err := td.hashRing.refresh()
assert.Error(t, err)
}

func TestStopWillStopProvider(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)

pp.EXPECT().Stop().Times(1)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.status = common.DaemonStatusStarted
hr.Stop()
td.mockPeerProvider.EXPECT().Stop().Times(1)

td.hashRing.status = common.DaemonStatusStarted
td.hashRing.Stop()
}

func TestLookupAndRefreshRaceCondition(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
td := newHashringTestData(t)
var wg sync.WaitGroup

pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) {
td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
td.mockPeerProvider.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) {
return randomHostInfo(5), nil
})
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()

td.hashRing.Start()
wg.Add(2)
go func() {
for i := 0; i < 50; i++ {
_, _ = hr.Lookup("a")
_, _ = td.hashRing.Lookup("a")
}
wg.Done()
}()
go func() {
for i := 0; i < 50; i++ {
// to bypass internal check
hr.members.refreshed = time.Now().AddDate(0, 0, -1)
_, err := hr.refresh()
td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1)
_, err := td.hashRing.refresh()
assert.NoError(t, err)
}
wg.Done()
Expand Down Expand Up @@ -322,24 +321,19 @@ func TestEmitHashringView(t *testing.T) {
},
}

for name, td := range tests {
for testName, testInput := range tests {

t.Run(name, func(t *testing.T) {
t.Run(testName, func(t *testing.T) {
td := newHashringTestData(t)

ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

pp.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
return td.hosts, td.lookuperr
td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
return testInput.hosts, testInput.lookuperr
})

pp.EXPECT().WhoAmI().DoAndReturn(func() (HostInfo, error) {
return td.selfInfo, td.selfErr
td.mockPeerProvider.EXPECT().WhoAmI().DoAndReturn(func() (HostInfo, error) {
return testInput.selfInfo, testInput.selfErr
})

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.Equal(t, td.expectedResult, hr.emitHashIdentifier())
assert.Equal(t, testInput.expectedResult, td.hashRing.emitHashIdentifier())
})
}
}
3 changes: 2 additions & 1 deletion common/membership/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync/atomic"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewMultiringResolver(
}

for _, s := range services {
rpo.rings[s] = newHashring(s, provider, logger, metricsClient.Scope(metrics.HashringScope))
rpo.rings[s] = newHashring(s, provider, clock.NewRealTimeSource(), logger, metricsClient.Scope(metrics.HashringScope))
}
return rpo
}
Expand Down

0 comments on commit 21f3717

Please sign in to comment.