Commit

fix merge retry logic
nasdf committed Jun 13, 2024
1 parent 6e96888 · commit c67b272
Showing 5 changed files with 96 additions and 110 deletions.
5 changes: 4 additions & 1 deletion events/dag_sync.go
@@ -14,7 +14,6 @@ import (
"sync"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/immutable"
)

@@ -23,11 +22,15 @@ type DAGMergeChannel = immutable.Option[Channel[DAGMerge]]

// DAGMerge is a notification that a merge can be performed up to the provided CID.
type DAGMerge struct {
// DocID is the unique identifier for the document being merged.
DocID string
// Cid is the id of the composite commit that formed this update in the DAG.
Cid cid.Cid
// SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated.
SchemaRoot string
// Wg is a wait group that can be used to synchronize the merge,
// allowing the caller to optionally block until the merge is complete.
Wg *sync.WaitGroup
// RetryCount is the number of times this merge has been retried due to a conflict.
RetryCount int
}
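
For context, a brief sketch of how a caller might use the Wg field described above to block until handleMerges finishes; it mirrors the publish pattern PushLog uses in net/server.go further down. Illustrative only: the helper name and local variables are hypothetical and not part of this commit.

// Illustrative sketch (not part of this diff): publish a DAGMerge and wait for completion.
func publishMergeAndWait(database *db, docID string, head cid.Cid, schemaRoot string) {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	database.Events().DAGMerges.Value().Publish(events.DAGMerge{
		DocID:      docID,
		Cid:        head,
		SchemaRoot: schemaRoot,
		Wg:         wg,
	})
	// handleMerges signals Wg.Done() once the merge attempt completes
	// (successfully or with a logged error).
	wg.Wait()
}
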
110 changes: 68 additions & 42 deletions internal/db/merge.go
@@ -13,6 +13,7 @@ package db
import (
"container/list"
"context"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/linking"
@@ -34,6 +35,7 @@ import (
)

func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) {
queue := newMergeQueue()
for {
select {
case <-ctx.Done():
@@ -43,28 +45,30 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event
return
}
go func() {
err := db.executeMerge(ctx, merge)
if err != nil {
log.ErrorContextE(
ctx,
"Failed to execute merge",
err,
corelog.String("CID", merge.Cid.String()),
corelog.String("Error", err.Error()),
)
// ensure only one merge per docID
queue.add(merge.DocID)
defer queue.done(merge.DocID)

// retry merge up to max txn retries
for i := 0; i < db.MaxTxnRetries(); i++ {
err := db.executeMerge(ctx, merge)
if errors.Is(err, badger.ErrTxnConflict) {
continue // retry merge
}
if err != nil {
log.ErrorContextE(ctx, "Failed to execute merge", err, corelog.Any("Merge", merge))

}
if merge.Wg != nil {
merge.Wg.Done()
}
break // merge completed
}
}()
}
}
}

func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error {
defer func() {
// Notify the caller that the merge is complete.
if dagMerge.Wg != nil {
dagMerge.Wg.Done()
}
}()
ctx, txn, err := ensureContextTxn(ctx, db, false)
if err != nil {
return err
@@ -100,35 +104,57 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error
return err
}

for retry := 0; retry < db.MaxTxnRetries(); retry++ {
err := mp.mergeComposites(ctx)
if err != nil {
return err
}
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
}
err = txn.Commit(ctx)
if err != nil {
if errors.Is(err, badger.ErrTxnConflict) {
txn, err = db.NewTxn(ctx, false)
if err != nil {
return err
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage())
// Reset the CRDTs to avoid reusing the old transaction.
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT)
continue
}
return err
}
break
err = mp.mergeComposites(ctx)
if err != nil {
return err

}

return nil
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err

}

return txn.Commit(ctx)
}

// mergeQueue is a synchronization mechanism used to ensure that concurrent
// document merges do not cause transaction conflicts.
type mergeQueue struct {
docs map[string]chan struct{}
mu sync.Mutex
}

func newMergeQueue() *mergeQueue {
return &mergeQueue{
docs: make(map[string]chan struct{}),
}
}

// add adds a docID to the queue. If the docID is already in the queue, it will
// wait for the docID to be removed from the queue. For every add call, done must
// be called to remove the docID from the queue. Otherwise, subsequent add calls will
// block forever.
func (dq *mergeQueue) add(docID string) {
dq.mu.Lock()
done, ok := dq.docs[docID]
if !ok {
dq.docs[docID] = make(chan struct{})
}
dq.mu.Unlock()
if ok {
<-done
dq.add(docID)
}
}

func (dq *mergeQueue) done(docID string) {
dq.mu.Lock()
defer dq.mu.Unlock()
done, ok := dq.docs[docID]
if ok {
delete(dq.docs, docID)
close(done)
}
}
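
As a usage sketch (not part of the diff): this is the pattern handleMerges follows above — each goroutine registers the DocID before merging and releases it when done, so merges for the same document run one at a time while merges for different documents still proceed concurrently. The function and variable names below are illustrative.

// Illustrative sketch only: serializing merges per document with mergeQueue.
func serveMerges(incoming <-chan events.DAGMerge) {
	queue := newMergeQueue()
	for merge := range incoming {
		merge := merge
		go func() {
			queue.add(merge.DocID)        // blocks while another merge for this DocID is in flight
			defer queue.done(merge.DocID) // wakes the next waiter, if any
			// ... perform the merge for merge.DocID ...
		}()
	}
}
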

type mergeProcessor struct {
23 changes: 23 additions & 0 deletions internal/db/merge_test.go
@@ -13,6 +13,7 @@ package db
import (
"context"
"testing"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/ipld/go-ipld-prime"
@@ -292,3 +293,25 @@ func encodeValue(val any) []byte {
}
return b
}

func TestMergeQueue(t *testing.T) {
q := newMergeQueue()

testDocID := "test"

q.add(testDocID)
go q.add(testDocID)
// give time for the goroutine to block
time.Sleep(10 * time.Millisecond)
require.Len(t, q.docs, 1)
q.done(testDocID)
// give time for the goroutine to add the docID
time.Sleep(10 * time.Millisecond)
q.mu.Lock()
require.Len(t, q.docs, 1)
q.mu.Unlock()
q.done(testDocID)
q.mu.Lock()
require.Len(t, q.docs, 0)
q.mu.Unlock()
}
43 changes: 1 addition & 42 deletions net/server.go
@@ -51,11 +51,6 @@ type server struct {
pubSubEmitter event.Emitter
pushLogEmitter event.Emitter

// docQueue is used to track which documents are currently being processed.
// This is used to prevent multiple concurrent processing of the same document and
// limit unnecessary transaction conflicts.
docQueue *docQueue

pb.UnimplementedServiceServer
}

@@ -73,9 +68,6 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) {
peer: p,
conns: make(map[libpeer.ID]*grpc.ClientConn),
topics: make(map[string]pubsubTopic),
docQueue: &docQueue{
docs: make(map[string]chan struct{}),
},
}

cred := insecure.NewCredentials()
@@ -152,38 +144,6 @@ func (s *server) GetLog(ctx context.Context, req *pb.GetLogRequest) (*pb.GetLogR
return nil, nil
}

type docQueue struct {
docs map[string]chan struct{}
mu sync.Mutex
}

// add adds a docID to the queue. If the docID is already in the queue, it will
// wait for the docID to be removed from the queue. For every add call, done must
// be called to remove the docID from the queue. Otherwise, subsequent add calls will
// block forever.
func (dq *docQueue) add(docID string) {
dq.mu.Lock()
done, ok := dq.docs[docID]
if !ok {
dq.docs[docID] = make(chan struct{})
}
dq.mu.Unlock()
if ok {
<-done
dq.add(docID)
}
}

func (dq *docQueue) done(docID string) {
dq.mu.Lock()
defer dq.mu.Unlock()
done, ok := dq.docs[docID]
if ok {
delete(dq.docs, docID)
close(done)
}
}

// PushLog receives a push log request
func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushLogReply, error) {
pid, err := peerIDFromContext(ctx)
@@ -199,9 +159,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
return nil, err
}

s.docQueue.add(docID.String())
defer func() {
s.docQueue.done(docID.String())
if s.pushLogEmitter != nil {
byPeer, err := libpeer.Decode(req.Body.Creator)
if err != nil {
@@ -249,6 +207,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
wg := &sync.WaitGroup{}
wg.Add(1)
s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{
DocID: docID.String(),
Cid: cid,
SchemaRoot: string(req.Body.SchemaRoot),
Wg: wg,
25 changes: 0 additions & 25 deletions net/server_test.go
@@ -13,7 +13,6 @@ package net
import (
"context"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore/query"
@@ -227,30 +226,6 @@ func TestGetHeadLog(t *testing.T) {
require.Nil(t, err)
}

func TestDocQueue(t *testing.T) {
q := docQueue{
docs: make(map[string]chan struct{}),
}

testDocID := "test"

q.add(testDocID)
go q.add(testDocID)
// give time for the goroutine to block
time.Sleep(10 * time.Millisecond)
require.Len(t, q.docs, 1)
q.done(testDocID)
// give time for the goroutine to add the docID
time.Sleep(10 * time.Millisecond)
q.mu.Lock()
require.Len(t, q.docs, 1)
q.mu.Unlock()
q.done(testDocID)
q.mu.Lock()
require.Len(t, q.docs, 0)
q.mu.Unlock()
}

func getHead(ctx context.Context, db client.DB, docID client.DocID) (cid.Cid, error) {
prefix := core.DataStoreKeyFromDocID(docID).ToHeadStoreKey().WithFieldId(core.COMPOSITE_NAMESPACE).ToString()
results, err := db.Headstore().Query(ctx, query.Query{Prefix: prefix})
