Skip to content

Commit b4879f4

Browse files
authored
Merge pull request #164 from ipfs/feat/improve-head-processing
Improve behaviour on Commits and irretrievable heads
2 parents 8313605 + 4d32f9e commit b4879f4

File tree

1 file changed

+40
-18
lines changed

1 file changed

+40
-18
lines changed

crdt.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,15 @@ func (store *Datastore) handleNext() {
364364
processHead := func(c cid.Cid) {
365365
err = store.handleBlock(c) //handleBlock blocks
366366
if err != nil {
367-
store.logger.Error(err)
368-
store.markDirty()
367+
store.logger.Errorf("error processing new head: %s", err)
368+
// For posterity: do not mark the store as
369+
// Dirty if we could not handle a block. If an
370+
// error happens here, it means the node could
371+
// not be fetched, thus it could not be
372+
// processed, thus it did not leave a branch
373+
// half-processed and there's nothign to
374+
// recover.
375+
// disabled: store.markDirty()
369376
}
370377
}
371378

@@ -530,7 +537,7 @@ func (store *Datastore) rebroadcastHeads() {
530537
store.seenHeadsMux.RUnlock()
531538

532539
// Send them out
533-
err = store.broadcast(headsToBroadcast)
540+
err = store.broadcast(store.ctx, headsToBroadcast)
534541
if err != nil {
535542
store.logger.Warn("broadcast failed: %v", err)
536543
}
@@ -661,11 +668,13 @@ func (store *Datastore) sendNewJobs(session *sync.WaitGroup, ng *crdtNodeGetter,
661668

662669
goodDeltas := make(map[cid.Cid]struct{})
663670

664-
// This gets deltas but it is unable to tells us which childrens
665-
// failed to be fetched though.
671+
var err error
672+
loop:
666673
for deltaOpt := range ng.GetDeltas(ctx, children) {
674+
// we abort whenever we a delta comes back in error.
667675
if deltaOpt.err != nil {
668-
return errors.Wrapf(deltaOpt.err, "error getting delta")
676+
err = errors.Wrapf(deltaOpt.err, "error getting delta")
677+
break
669678
}
670679
goodDeltas[deltaOpt.node.Cid()] = struct{}{}
671680

@@ -685,18 +694,23 @@ func (store *Datastore) sendNewJobs(session *sync.WaitGroup, ng *crdtNodeGetter,
685694
session.Done()
686695
// We are in the middle of sending jobs, thus we left
687696
// something unprocessed.
688-
return store.ctx.Err()
697+
err = store.ctx.Err()
698+
break loop
689699
}
690700
}
691701

692-
// Clear up any children that could not be fetched. The rest will
693-
// remove themselves in processNode().
702+
// This is a safe-guard in case GetDeltas() returns less deltas than
703+
// asked for. It clears up any children that could not be fetched from
704+
// the queue. The rest will remove themselves in processNode().
705+
// Hector: as far as I know, this should not execute unless errors
706+
// happened.
694707
for _, child := range children {
695708
if _, ok := goodDeltas[child]; !ok {
709+
store.logger.Warn("GetDeltas did not include all children")
696710
store.queuedChildren.Remove(child)
697711
}
698712
}
699-
return nil
713+
return err
700714
}
701715

702716
// the only purpose of this worker is to be able to orderly shut-down job
@@ -1029,7 +1043,7 @@ func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results
10291043
// Put stores the object `value` named by `key`.
10301044
func (store *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
10311045
delta := store.set.Add(ctx, key.String(), value)
1032-
return store.publish(delta)
1046+
return store.publish(ctx, delta)
10331047
}
10341048

10351049
// Delete removes the value for given `key`.
@@ -1042,7 +1056,7 @@ func (store *Datastore) Delete(ctx context.Context, key ds.Key) error {
10421056
if len(delta.Tombstones) == 0 {
10431057
return nil
10441058
}
1045-
return store.publish(delta)
1059+
return store.publish(ctx, delta)
10461060
}
10471061

10481062
// Sync ensures that all the data under the given prefix is flushed to disk in
@@ -1170,10 +1184,10 @@ func (store *Datastore) updateDelta(newDelta *pb.Delta) int {
11701184
return size
11711185
}
11721186

1173-
func (store *Datastore) publishDelta() error {
1187+
func (store *Datastore) publishDelta(ctx context.Context) error {
11741188
store.curDeltaMux.Lock()
11751189
defer store.curDeltaMux.Unlock()
1176-
err := store.publish(store.curDelta)
1190+
err := store.publish(ctx, store.curDelta)
11771191
if err != nil {
11781192
return err
11791193
}
@@ -1200,7 +1214,7 @@ func (store *Datastore) putBlock(heads []cid.Cid, height uint64, delta *pb.Delta
12001214
return node, nil
12011215
}
12021216

1203-
func (store *Datastore) publish(delta *pb.Delta) error {
1217+
func (store *Datastore) publish(ctx context.Context, delta *pb.Delta) error {
12041218
// curDelta might be nil if nothing has been added to it
12051219
if delta == nil {
12061220
return nil
@@ -1209,7 +1223,7 @@ func (store *Datastore) publish(delta *pb.Delta) error {
12091223
if err != nil {
12101224
return err
12111225
}
1212-
return store.broadcast([]cid.Cid{c})
1226+
return store.broadcast(ctx, []cid.Cid{c})
12131227
}
12141228

12151229
func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) {
@@ -1253,7 +1267,7 @@ func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) {
12531267
return nd.Cid(), nil
12541268
}
12551269

1256-
func (store *Datastore) broadcast(cids []cid.Cid) error {
1270+
func (store *Datastore) broadcast(ctx context.Context, cids []cid.Cid) error {
12571271
if store.broadcaster == nil { // offline
12581272
return nil
12591273
}
@@ -1262,6 +1276,12 @@ func (store *Datastore) broadcast(cids []cid.Cid) error {
12621276
return nil
12631277
}
12641278

1279+
select {
1280+
case <-ctx.Done():
1281+
store.logger.Debugf("skipping broadcast: %s", ctx.Err())
1282+
default:
1283+
}
1284+
12651285
store.logger.Debugf("broadcasting %s", cids)
12661286

12671287
bcastBytes, err := store.encodeBroadcast(cids)
@@ -1305,8 +1325,10 @@ func (b *batch) Delete(ctx context.Context, key ds.Key) error {
13051325
return nil
13061326
}
13071327

1328+
// Commit writes the current delta as a new DAG node and publishes the new
1329+
// head. The publish step is skipped if the context is cancelled.
13081330
func (b *batch) Commit(ctx context.Context) error {
1309-
return b.store.publishDelta()
1331+
return b.store.publishDelta(ctx)
13101332
}
13111333

13121334
// PrintDAG pretty prints the current Merkle-DAG to stdout in a pretty

0 commit comments

Comments
 (0)