fix: Merge retry logic #2719

Merged
merged 16 commits into from
Jun 17, 2024
3 changes: 2 additions & 1 deletion events/dag_sync.go
@@ -14,7 +14,6 @@ import (
"sync"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/immutable"
)

@@ -23,6 +22,8 @@ type DAGMergeChannel = immutable.Option[Channel[DAGMerge]]

// DAGMerge is a notification that a merge can be performed up to the provided CID.
type DAGMerge struct {
// DocID is the unique identifier for the document being merged.
DocID string
// Cid is the id of the composite commit that formed this update in the DAG.
Cid cid.Cid
// SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated.
1 change: 1 addition & 0 deletions go.mod
@@ -24,6 +24,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/jbenet/goprocess v0.1.4
github.com/lens-vm/lens/host-go v0.0.0-20231127204031-8d858ed2926c
github.com/lestrrat-go/jwx/v2 v2.0.21
4 changes: 4 additions & 0 deletions go.sum
@@ -636,6 +636,8 @@ github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 h1:WQVfplCGOHtFNyZH7eOaEqGsbbje3NP8EFeGggUvEQs=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:PVDd/V/Zz9IW+Diz9LEhD+ZYS9pKzawmtVQhVd0hcgQ=
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 h1:adq3fTx2YXmpTPNvBRIM0Zi5lX4JjQTRjdLYKhXMkQg=
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
@@ -1166,6 +1168,8 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M=
github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs=
github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s=
github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
124 changes: 74 additions & 50 deletions internal/db/merge.go
@@ -13,6 +13,7 @@
import (
"container/list"
"context"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/linking"
@@ -34,6 +35,7 @@
)

func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) {
queue := newMergeQueue()
for {
select {
case <-ctx.Done():
@@ -43,28 +45,40 @@
return
}
go func() {
err := db.executeMerge(ctx, merge)
// ensure only one merge per docID
queue.add(merge.DocID)
Collaborator

suggestion: If the DocID is now part of the message, we can get rid of the getDocIDFromBlock function. I hadn't included the DocID before because I wanted to guarantee there could be no mismatch, such as the wrong DocID for the provided CID. Thinking about it now, that was probably a bad reason.

defer queue.done(merge.DocID)

// retry the merge process if a conflict occurs
//
// conflicts occur when a user updates a document
// while a merge is in progress.
var err error
for i := 0; i < db.MaxTxnRetries(); i++ {
Collaborator

thought: If you use the merge queue, I expect that a retry will never be needed.

Member Author

If a user updates a doc while a merge is in progress it could still happen.

Collaborator

😅 very true

Member

> If a user updates a doc while a merge is in progress it could still happen.

It'd be super nice to document that here (a reminder).

err = db.executeMerge(ctx, merge)
if errors.Is(err, badger.ErrTxnConflict) {
continue // retry merge
}
break // merge success or error
}

if err != nil {
log.ErrorContextE(
ctx,
"Failed to execute merge",
err,
corelog.String("CID", merge.Cid.String()),
corelog.String("Error", err.Error()),
)
corelog.Any("Error", err),
corelog.Any("Event", merge))

Codecov / codecov/patch warning on line 71 of internal/db/merge.go: added lines #L70-L71 were not covered by tests.
}
if merge.Wg != nil {
merge.Wg.Done()
}
}()
}
}
}

func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error {
defer func() {
// Notify the caller that the merge is complete.
if dagMerge.Wg != nil {
dagMerge.Wg.Done()
}
}()
ctx, txn, err := ensureContextTxn(ctx, db, false)
if err != nil {
return err
@@ -79,7 +93,7 @@
ls := cidlink.DefaultLinkSystem()
ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())

docID, err := getDocIDFromBlock(ctx, ls, dagMerge.Cid)
docID, err := client.NewDocIDFromString(dagMerge.DocID)
if err != nil {
return err
}
@@ -100,35 +114,57 @@
return err
}

for retry := 0; retry < db.MaxTxnRetries(); retry++ {
err := mp.mergeComposites(ctx)
if err != nil {
return err
}
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
}
err = txn.Commit(ctx)
if err != nil {
if errors.Is(err, badger.ErrTxnConflict) {
txn, err = db.NewTxn(ctx, false)
if err != nil {
return err
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage())
// Reset the CRDTs to avoid reusing the old transaction.
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT)
continue
}
return err
}
break
err = mp.mergeComposites(ctx)
if err != nil {
return err

Codecov / codecov/patch warning on line 119 of internal/db/merge.go: added line #L119 was not covered by tests.
}

return nil
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err

Codecov / codecov/patch warning on line 124 of internal/db/merge.go: added line #L124 was not covered by tests.
}

return txn.Commit(ctx)
}

// mergeQueue is a synchronization source to ensure that concurrent
// document merges do not cause transaction conflicts.
type mergeQueue struct {
docs map[string]chan struct{}
mutex sync.Mutex
}

func newMergeQueue() *mergeQueue {
return &mergeQueue{
docs: make(map[string]chan struct{}),
}
}

// add adds a docID to the queue. If the docID is already in the queue, it will
// wait for the docID to be removed from the queue. For every add call, done must
// be called to remove the docID from the queue. Otherwise, subsequent add calls will
// block forever.
func (m *mergeQueue) add(docID string) {
m.mutex.Lock()
done, ok := m.docs[docID]
if !ok {
m.docs[docID] = make(chan struct{})
}
m.mutex.Unlock()
if ok {
<-done
m.add(docID)
}
}

// done removes the docID from the queue and wakes any add calls waiting on it.
func (m *mergeQueue) done(docID string) {
m.mutex.Lock()
defer m.mutex.Unlock()
done, ok := m.docs[docID]
if ok {
delete(m.docs, docID)
close(done)
}
}
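
The add/done pair above acts as a per-key lock: the first caller for a docID proceeds, and later callers wait on a channel that is closed on release. A self-contained sketch of the same idea is given below, written with a loop instead of the recursive call. The names `keyLock`, `acquire`, `release`, and `runSerialized` are hypothetical and are not part of the PR; the sketch assumes only the standard library.

```go
package main

import (
	"fmt"
	"sync"
)

// keyLock is a hypothetical, loop-based variant of the mergeQueue in the diff.
type keyLock struct {
	mu   sync.Mutex
	held map[string]chan struct{}
}

func newKeyLock() *keyLock {
	return &keyLock{held: make(map[string]chan struct{})}
}

// acquire blocks until the caller holds the given key.
func (k *keyLock) acquire(key string) {
	for {
		k.mu.Lock()
		wait, busy := k.held[key]
		if !busy {
			k.held[key] = make(chan struct{})
			k.mu.Unlock()
			return
		}
		k.mu.Unlock()
		<-wait // closed on release; loop to compete for the key again
	}
}

// release gives up the key and wakes every goroutine waiting on it.
func (k *keyLock) release(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()
	if ch, ok := k.held[key]; ok {
		delete(k.held, key)
		close(ch)
	}
}

// runSerialized starts n goroutines that each increment a shared counter
// while holding the same key, then returns the final count.
func runSerialized(n int) int {
	k := newKeyLock()
	counter := 0 // guarded only by the key lock
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			k.acquire("doc-1")
			defer k.release("doc-1")
			counter++
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(runSerialized(100)) // prints 100
}
```

Because every waiter is woken at once and then competes for the key again, this design does not guarantee first-come, first-served ordering. That is acceptable when the goal is only mutual exclusion per document, which appears to be the intent here.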

type mergeProcessor struct {
@@ -333,18 +369,6 @@
return mcrdt, nil
}

func getDocIDFromBlock(ctx context.Context, ls linking.LinkSystem, cid cid.Cid) (client.DocID, error) {
nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype)
if err != nil {
return client.DocID{}, err
}
block, err := coreblock.GetFromNode(nd)
if err != nil {
return client.DocID{}, err
}
return client.NewDocIDFromString(string(block.Delta.GetDocID()))
}

func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
cols, err := db.getCollections(
ctx,
28 changes: 28 additions & 0 deletions internal/db/merge_test.go
@@ -13,6 +13,7 @@ package db
import (
"context"
"testing"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/ipld/go-ipld-prime"
@@ -59,6 +60,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
@@ -103,6 +105,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
@@ -112,6 +115,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo3.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
@@ -159,6 +163,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
@@ -177,6 +182,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo3.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
@@ -292,3 +298,25 @@ func encodeValue(val any) []byte {
}
return b
}

func TestMergeQueue(t *testing.T) {
q := newMergeQueue()

testDocID := "test"

q.add(testDocID)
go q.add(testDocID)
// give time for the goroutine to block
time.Sleep(10 * time.Millisecond)
require.Len(t, q.docs, 1)
q.done(testDocID)
// give time for the goroutine to add the docID
time.Sleep(10 * time.Millisecond)
q.mutex.Lock()
require.Len(t, q.docs, 1)
q.mutex.Unlock()
q.done(testDocID)
q.mutex.Lock()
require.Len(t, q.docs, 0)
q.mutex.Unlock()
}