Skip to content

Commit f1d327f

Browse files
committed
Address comments
1 parent 61b1142 commit f1d327f

File tree

2 files changed

+109
-37
lines changed

2 files changed

+109
-37
lines changed

peer.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ var (
4747
// ErrNoNewPeers indicates that no previously unselected peer is available.
4848
ErrNoNewPeers = errors.New("no new peer available")
4949

50+
// ErrZeroPeerConnectionCount indicates that the peer connection count is set to zero.
51+
ErrZeroPeerConnectionCount = errors.New("peer connection count must be greater than 0")
52+
5053
peerRng = trand.NewSeeded()
5154
)
5255

@@ -95,11 +98,15 @@ func (l *PeerList) SetStrategy(sc ScoreCalculator) {
9598
// SetPeerConnectionCount sets the number of peer connections to be used in
9699
// combination with the ScoreCalculator to achieve a random load balancing
97100
// of a single client node to `peerConnectionCount` number of server nodes
98-
func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) {
101+
func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) error {
99102
l.Lock()
100103
defer l.Unlock()
101104

105+
if peerConnectionCount == 0 {
106+
return ErrZeroPeerConnectionCount
107+
}
102108
l.peerConnectionCount = peerConnectionCount
109+
return nil
103110
}
104111

105112
// Siblings don't share peer lists (though they take care not to double-connect
@@ -186,8 +193,8 @@ func (l *PeerList) Remove(hostPort string) error {
186193
return nil
187194
}
188195
func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) *Peer {
189-
var chosenPSList []*peerScore
190-
var poppedList []*peerScore
196+
var chosenPSList = make([]*peerScore, 0, l.peerConnectionCount)
197+
var poppedList = make([]*peerScore, 0, l.peerConnectionCount)
191198

192199
canChoosePeer := func(hostPort string) bool {
193200
if _, ok := prevSelected[hostPort]; ok {
@@ -226,16 +233,12 @@ func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool)
226233
}
227234

228235
ps := randomSampling(chosenPSList)
229-
if ps == nil {
230-
return nil
231-
}
232236
ps.chosenCount.Inc()
233237
return ps.Peer
234238
}
235239

236240
func randomSampling(psList []*peerScore) *peerScore {
237-
peerRand := trand.NewSeeded()
238-
r := peerRand.Intn(len(psList))
241+
r := peerRng.Intn(len(psList))
239242
return psList[r]
240243
}
241244

peer_test.go

Lines changed: 98 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -697,46 +697,115 @@ func TestPeerSelectionRanking(t *testing.T) {
697697
}
698698
}
699699

700+
func TestZeroPeerConnectionCount(t *testing.T) {
701+
ch := testutils.NewClient(t, nil)
702+
defer ch.Close()
703+
err := ch.Peers().SetPeerConnectionCount(0)
704+
require.Error(t, err, "peerConnectionCount should not accept 0")
705+
}
706+
700707
func TestPeerRandomSampling(t *testing.T) {
701-
const numPeers = 10
702708
const numIterations = 1000
703-
// Using `numPeers + 1` should just do a random load balancing among `numPeers`
704-
// as we only have `numPeers` of server nodes
705-
const peerConnectionCount = numPeers + 1
706709

707-
// Selected is a map from rank -> [peer, count]
708-
// It tracks how often a peer gets selected at a specific rank.
709-
selected := make([]map[string]int, numPeers)
710-
for i := 0; i < numPeers; i++ {
711-
selected[i] = make(map[string]int)
710+
testCases := []struct {
711+
numPeers int
712+
peerConnectionCount uint32
713+
distMin float64
714+
distMax float64
715+
}{
716+
// the higher `peerConnectionCount` is, the smoother the impact of uneven scores
717+
// become as we are random sampling among `peerConnectionCount` peers
718+
{numPeers: 10, peerConnectionCount: 1, distMin: 1000, distMax: 1000},
719+
{numPeers: 10, peerConnectionCount: 5, distMin: 160, distMax: 240},
720+
{numPeers: 10, peerConnectionCount: 10, distMin: 50, distMax: 150},
721+
{numPeers: 10, peerConnectionCount: 15, distMin: 50, distMax: 150},
712722
}
713723

714-
for i := 0; i < numIterations; i++ {
715-
ch := testutils.NewClient(t, nil)
716-
defer ch.Close()
717-
ch.SetRandomSeed(int64(i * 100))
718-
// Using a strategy that has uneven scores
719-
strategy, _ := createScoreStrategy(0, 1)
720-
ch.Peers().SetStrategy(strategy)
721-
// `peerConnectionCount > 1` load balances among the top candidates
722-
// so with `peerConnectionCount` == `numPeers`, the score strategy
723-
// shouldn't have any effect
724-
ch.Peers().SetPeerConnectionCount(peerConnectionCount)
724+
for _, tc := range testCases {
725+
// Selected is a map from rank -> [peer, count]
726+
// It tracks how often a peer gets selected at a specific rank.
727+
selected := make([]map[string]int, tc.numPeers)
728+
for i := 0; i < tc.numPeers; i++ {
729+
selected[i] = make(map[string]int)
730+
}
725731

726-
for i := 0; i < numPeers; i++ {
727-
hp := fmt.Sprintf("127.0.0.1:60%v", i)
728-
ch.Peers().Add(hp)
732+
for i := 0; i < numIterations; i++ {
733+
ch := testutils.NewClient(t, nil)
734+
defer ch.Close()
735+
ch.SetRandomSeed(int64(i * 100))
736+
// Using a strategy that has uneven scores
737+
strategy, _ := createScoreStrategy(0, 1)
738+
ch.Peers().SetStrategy(strategy)
739+
ch.Peers().SetPeerConnectionCount(tc.peerConnectionCount)
740+
741+
for i := 0; i < tc.numPeers; i++ {
742+
hp := fmt.Sprintf("127.0.0.1:60%v", i)
743+
ch.Peers().Add(hp)
744+
}
745+
746+
for i := 0; i < tc.numPeers; i++ {
747+
peer, err := ch.Peers().Get(nil)
748+
require.NoError(t, err, "Peers.Get failed")
749+
selected[i][peer.HostPort()]++
750+
}
729751
}
730752

731-
for i := 0; i < numPeers; i++ {
732-
peer, err := ch.Peers().Get(nil)
733-
require.NoError(t, err, "Peers.Get failed")
734-
selected[i][peer.HostPort()]++
753+
for _, m := range selected {
754+
testDistribution(t, m, tc.distMin, tc.distMax)
735755
}
736756
}
737757

738-
for _, m := range selected {
739-
testDistribution(t, m, 50, 150)
758+
}
759+
760+
func BenchmarkGetPeerWithPeerConnectionCount1(b *testing.B) {
761+
numPeers := 10
762+
peerConnectionCount := uint32(1)
763+
764+
ch := testutils.NewClient(b, nil)
765+
defer ch.Close()
766+
ch.SetRandomSeed(int64(100))
767+
// Using a strategy that has uneven scores
768+
strategy, _ := createScoreStrategy(0, 1)
769+
ch.Peers().SetStrategy(strategy)
770+
ch.Peers().SetPeerConnectionCount(peerConnectionCount)
771+
772+
for i := 0; i < numPeers; i++ {
773+
hp := fmt.Sprintf("127.0.0.1:60%v", i)
774+
ch.Peers().Add(hp)
775+
}
776+
b.ResetTimer()
777+
778+
for i := 0; i < b.N; i++ {
779+
peer, _ := ch.Peers().Get(nil)
780+
if peer == nil {
781+
fmt.Println("Just a dummy check to guard against compiler optimization")
782+
}
783+
}
784+
}
785+
786+
func BenchmarkGetPeerWithPeerConnectionCount10(b *testing.B) {
787+
numPeers := 10
788+
peerConnectionCount := uint32(10)
789+
790+
ch := testutils.NewClient(b, nil)
791+
defer ch.Close()
792+
ch.SetRandomSeed(int64(100))
793+
// Using a strategy that has uneven scores
794+
strategy, _ := createScoreStrategy(0, 1)
795+
ch.Peers().SetStrategy(strategy)
796+
ch.Peers().SetPeerConnectionCount(peerConnectionCount)
797+
798+
for i := 0; i < numPeers; i++ {
799+
hp := fmt.Sprintf("127.0.0.1:60%v", i)
800+
ch.Peers().Add(hp)
801+
}
802+
b.ResetTimer()
803+
804+
for i := 0; i < b.N; i++ {
805+
peer, _ := ch.Peers().Get(nil)
806+
if peer == nil {
807+
fmt.Println("Just a dummy check to guard against compiler optimization")
808+
}
740809
}
741810
}
742811

0 commit comments

Comments
 (0)