Skip to content

Commit 668b0c2

Browse files
authored
Fix: ensure timely shutdown of rebroadcast heads. (#156)
I have observed this routine sometimes just stayed running, potentially preventing the shutdown of the whole datastore.
2 parents 641e9fd + fdb6d41 commit 668b0c2

File tree

1 file changed

+30
-20
lines changed

1 file changed

+30
-20
lines changed

crdt.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,6 @@ type Datastore struct {
179179
seenHeadsMux sync.RWMutex
180180
seenHeads map[cid.Cid]struct{}
181181

182-
rebroadcastTicker *time.Ticker
183-
184182
curDeltaMux sync.Mutex
185183
curDelta *pb.Delta // current, unpublished delta
186184

@@ -270,21 +268,20 @@ func New(
270268
}
271269

272270
dstore := &Datastore{
273-
ctx: ctx,
274-
cancel: cancel,
275-
opts: opts,
276-
logger: opts.Logger,
277-
store: store,
278-
namespace: namespace,
279-
set: set,
280-
heads: heads,
281-
dagService: dagSyncer,
282-
broadcaster: bcast,
283-
seenHeads: make(map[cid.Cid]struct{}),
284-
rebroadcastTicker: time.NewTicker(opts.RebroadcastInterval),
285-
jobQueue: make(chan *dagJob, opts.NumWorkers),
286-
sendJobs: make(chan *dagJob),
287-
queuedChildren: newCidSafeSet(),
271+
ctx: ctx,
272+
cancel: cancel,
273+
opts: opts,
274+
logger: opts.Logger,
275+
store: store,
276+
namespace: namespace,
277+
set: set,
278+
heads: heads,
279+
dagService: dagSyncer,
280+
broadcaster: bcast,
281+
seenHeads: make(map[cid.Cid]struct{}),
282+
jobQueue: make(chan *dagJob, opts.NumWorkers),
283+
sendJobs: make(chan *dagJob),
284+
queuedChildren: newCidSafeSet(),
288285
}
289286

290287
headList, maxHeight, err := dstore.heads.List()
@@ -455,14 +452,27 @@ func (store *Datastore) encodeBroadcast(heads []cid.Cid) ([]byte, error) {
455452
}
456453

457454
func (store *Datastore) rebroadcast() {
458-
ticker := store.rebroadcastTicker
459-
defer ticker.Stop()
455+
timer := time.NewTimer(store.opts.RebroadcastInterval)
456+
460457
for {
461458
select {
462459
case <-store.ctx.Done():
460+
if !timer.Stop() {
461+
<-timer.C
462+
}
463463
return
464-
case <-ticker.C:
464+
default:
465+
}
466+
467+
select {
468+
case <-store.ctx.Done():
469+
if !timer.Stop() {
470+
<-timer.C
471+
}
472+
return
473+
case <-timer.C:
465474
store.rebroadcastHeads()
475+
timer.Reset(store.opts.RebroadcastInterval)
466476
}
467477
}
468478
}

0 commit comments

Comments
 (0)