Skip to content

Commit 4d32f9e

Browse files
committed
Improve behaviour on Commits and irretrievable heads
This makes some minor changes to improve behaviour when shutting down an application. In this case, perhaps recently broadcasted heads become irretrievable, or perhaps we attempt to broadcast things with a cancelled context. So: - First: do not mark the datastore as dirty when a broadcasted head cannot be fetched. - Second: do not broadcast a head when the context that it was committed with is Done.
1 parent 8313605 commit 4d32f9e

File tree

1 file changed

+40
-18
lines changed

1 file changed

+40
-18
lines changed

crdt.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,15 @@ func (store *Datastore) handleNext() {
364364
processHead := func(c cid.Cid) {
365365
err = store.handleBlock(c) //handleBlock blocks
366366
if err != nil {
367-
store.logger.Error(err)
368-
store.markDirty()
367+
store.logger.Errorf("error processing new head: %s", err)
368+
// For posterity: do not mark the store as
369+
// Dirty if we could not handle a block. If an
370+
// error happens here, it means the node could
371+
// not be fetched, thus it could not be
372+
// processed, thus it did not leave a branch
373+
// half-processed and there's nothign to
374+
// recover.
375+
// disabled: store.markDirty()
369376
}
370377
}
371378

@@ -530,7 +537,7 @@ func (store *Datastore) rebroadcastHeads() {
530537
store.seenHeadsMux.RUnlock()
531538

532539
// Send them out
533-
err = store.broadcast(headsToBroadcast)
540+
err = store.broadcast(store.ctx, headsToBroadcast)
534541
if err != nil {
535542
store.logger.Warn("broadcast failed: %v", err)
536543
}
@@ -661,11 +668,13 @@ func (store *Datastore) sendNewJobs(session *sync.WaitGroup, ng *crdtNodeGetter,
661668

662669
goodDeltas := make(map[cid.Cid]struct{})
663670

664-
// This gets deltas but it is unable to tells us which childrens
665-
// failed to be fetched though.
671+
var err error
672+
loop:
666673
for deltaOpt := range ng.GetDeltas(ctx, children) {
674+
// we abort whenever we a delta comes back in error.
667675
if deltaOpt.err != nil {
668-
return errors.Wrapf(deltaOpt.err, "error getting delta")
676+
err = errors.Wrapf(deltaOpt.err, "error getting delta")
677+
break
669678
}
670679
goodDeltas[deltaOpt.node.Cid()] = struct{}{}
671680

@@ -685,18 +694,23 @@ func (store *Datastore) sendNewJobs(session *sync.WaitGroup, ng *crdtNodeGetter,
685694
session.Done()
686695
// We are in the middle of sending jobs, thus we left
687696
// something unprocessed.
688-
return store.ctx.Err()
697+
err = store.ctx.Err()
698+
break loop
689699
}
690700
}
691701

692-
// Clear up any children that could not be fetched. The rest will
693-
// remove themselves in processNode().
702+
// This is a safe-guard in case GetDeltas() returns less deltas than
703+
// asked for. It clears up any children that could not be fetched from
704+
// the queue. The rest will remove themselves in processNode().
705+
// Hector: as far as I know, this should not execute unless errors
706+
// happened.
694707
for _, child := range children {
695708
if _, ok := goodDeltas[child]; !ok {
709+
store.logger.Warn("GetDeltas did not include all children")
696710
store.queuedChildren.Remove(child)
697711
}
698712
}
699-
return nil
713+
return err
700714
}
701715

702716
// the only purpose of this worker is to be able to orderly shut-down job
@@ -1029,7 +1043,7 @@ func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results
10291043
// Put stores the object `value` named by `key`.
10301044
func (store *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
10311045
delta := store.set.Add(ctx, key.String(), value)
1032-
return store.publish(delta)
1046+
return store.publish(ctx, delta)
10331047
}
10341048

10351049
// Delete removes the value for given `key`.
@@ -1042,7 +1056,7 @@ func (store *Datastore) Delete(ctx context.Context, key ds.Key) error {
10421056
if len(delta.Tombstones) == 0 {
10431057
return nil
10441058
}
1045-
return store.publish(delta)
1059+
return store.publish(ctx, delta)
10461060
}
10471061

10481062
// Sync ensures that all the data under the given prefix is flushed to disk in
@@ -1170,10 +1184,10 @@ func (store *Datastore) updateDelta(newDelta *pb.Delta) int {
11701184
return size
11711185
}
11721186

1173-
func (store *Datastore) publishDelta() error {
1187+
func (store *Datastore) publishDelta(ctx context.Context) error {
11741188
store.curDeltaMux.Lock()
11751189
defer store.curDeltaMux.Unlock()
1176-
err := store.publish(store.curDelta)
1190+
err := store.publish(ctx, store.curDelta)
11771191
if err != nil {
11781192
return err
11791193
}
@@ -1200,7 +1214,7 @@ func (store *Datastore) putBlock(heads []cid.Cid, height uint64, delta *pb.Delta
12001214
return node, nil
12011215
}
12021216

1203-
func (store *Datastore) publish(delta *pb.Delta) error {
1217+
func (store *Datastore) publish(ctx context.Context, delta *pb.Delta) error {
12041218
// curDelta might be nil if nothing has been added to it
12051219
if delta == nil {
12061220
return nil
@@ -1209,7 +1223,7 @@ func (store *Datastore) publish(delta *pb.Delta) error {
12091223
if err != nil {
12101224
return err
12111225
}
1212-
return store.broadcast([]cid.Cid{c})
1226+
return store.broadcast(ctx, []cid.Cid{c})
12131227
}
12141228

12151229
func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) {
@@ -1253,7 +1267,7 @@ func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) {
12531267
return nd.Cid(), nil
12541268
}
12551269

1256-
func (store *Datastore) broadcast(cids []cid.Cid) error {
1270+
func (store *Datastore) broadcast(ctx context.Context, cids []cid.Cid) error {
12571271
if store.broadcaster == nil { // offline
12581272
return nil
12591273
}
@@ -1262,6 +1276,12 @@ func (store *Datastore) broadcast(cids []cid.Cid) error {
12621276
return nil
12631277
}
12641278

1279+
select {
1280+
case <-ctx.Done():
1281+
store.logger.Debugf("skipping broadcast: %s", ctx.Err())
1282+
default:
1283+
}
1284+
12651285
store.logger.Debugf("broadcasting %s", cids)
12661286

12671287
bcastBytes, err := store.encodeBroadcast(cids)
@@ -1305,8 +1325,10 @@ func (b *batch) Delete(ctx context.Context, key ds.Key) error {
13051325
return nil
13061326
}
13071327

1328+
// Commit writes the current delta as a new DAG node and publishes the new
1329+
// head. The publish step is skipped if the context is cancelled.
13081330
func (b *batch) Commit(ctx context.Context) error {
1309-
return b.store.publishDelta()
1331+
return b.store.publishDelta(ctx)
13101332
}
13111333

13121334
// PrintDAG pretty prints the current Merkle-DAG to stdout in a pretty

0 commit comments

Comments
 (0)