Skip to content

Commit 1bc7a68

Browse files
authored
Merge pull request #157 from ipfs/feat/variable-rebroadcast-interval
broadcast: randomize rebroadcast interval by 30%
2 parents 668b0c2 + e9bd953 commit 1bc7a68

File tree

2 files changed

+29
-11
lines changed

2 files changed

+29
-11
lines changed

crdt.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24+
"math/rand"
2425
"sync"
2526
"sync/atomic"
2627
"time"
@@ -54,6 +55,10 @@ var (
5455
ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
5556
)
5657

58+
func init() {
59+
rand.Seed(time.Now().UnixNano())
60+
}
61+
5762
// A Broadcaster provides a way to send (notify) an opaque payload to
5863
// all replicas and to retrieve payloads broadcasted.
5964
type Broadcaster interface {
@@ -451,19 +456,18 @@ func (store *Datastore) encodeBroadcast(heads []cid.Cid) ([]byte, error) {
451456
return proto.Marshal(&bcastData)
452457
}
453458

459+
func randomizeInterval(d time.Duration) time.Duration {
460+
// 30% of the configured interval
461+
leeway := (d * 30 / 100)
462+
// A random number between -leeway|+leeway
463+
randomInterval := time.Duration(rand.Int63n(int64(leeway*2))) - leeway
464+
return d + randomInterval
465+
}
466+
454467
func (store *Datastore) rebroadcast() {
455-
timer := time.NewTimer(store.opts.RebroadcastInterval)
468+
timer := time.NewTimer(randomizeInterval(store.opts.RebroadcastInterval))
456469

457470
for {
458-
select {
459-
case <-store.ctx.Done():
460-
if !timer.Stop() {
461-
<-timer.C
462-
}
463-
return
464-
default:
465-
}
466-
467471
select {
468472
case <-store.ctx.Done():
469473
if !timer.Stop() {
@@ -472,7 +476,7 @@ func (store *Datastore) rebroadcast() {
472476
return
473477
case <-timer.C:
474478
store.rebroadcastHeads()
475-
timer.Reset(store.opts.RebroadcastInterval)
479+
timer.Reset(randomizeInterval(store.opts.RebroadcastInterval))
476480
}
477481
}
478482
}

crdt_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,3 +896,17 @@ func BenchmarkQueryElements(b *testing.B) {
896896
}
897897
b.Log(totalSize)
898898
}
899+
900+
func TestRandomizeInterval(t *testing.T) {
901+
prevR := 100 * time.Second
902+
for i := 0; i < 1000; i++ {
903+
r := randomizeInterval(100 * time.Second)
904+
if r < 70*time.Second || r > 130*time.Second {
905+
t.Error("r was ", r)
906+
}
907+
if prevR == r {
908+
t.Log("r and prevR were equal")
909+
}
910+
prevR = r
911+
}
912+
}

0 commit comments

Comments
 (0)