-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Merge retry logic #2719
fix: Merge retry logic #2719
Changes from 8 commits
c67b272
10e35c3
b46c5ec
6c4eab1
f28473b
b4b456b
f89456b
37c72ba
5ee9616
3f0a9bf
e8bd194
15a603c
d32736c
01f5157
d9bbd87
32634a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import ( | ||
"container/list" | ||
"context" | ||
"sync" | ||
|
||
"github.com/ipfs/go-cid" | ||
"github.com/ipld/go-ipld-prime/linking" | ||
|
@@ -34,6 +35,7 @@ | |
) | ||
|
||
func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) { | ||
queue := newMergeQueue() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -43,28 +45,37 @@ | |
return | ||
} | ||
go func() { | ||
err := db.executeMerge(ctx, merge) | ||
// ensure only one merge per docID | ||
queue.add(merge.DocID) | ||
defer queue.done(merge.DocID) | ||
|
||
var err error | ||
// retry merge up to max txn retries | ||
for i := 0; i < db.MaxTxnRetries(); i++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thought: If you use the merge queue, I expect that a retry will never be needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a user updates a doc while a merge is in progress it could still happen. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😅 very true There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It'd be super nice (as a reminder) to document that here. |
||
err = db.executeMerge(ctx, merge) | ||
if errors.Is(err, badger.ErrTxnConflict) { | ||
continue // retry merge | ||
} | ||
break // merge success or error | ||
} | ||
|
||
if err != nil { | ||
log.ErrorContextE( | ||
ctx, | ||
"Failed to execute merge", | ||
err, | ||
corelog.String("CID", merge.Cid.String()), | ||
corelog.String("Error", err.Error()), | ||
) | ||
corelog.Any("Error", err), | ||
corelog.Any("Event", merge)) | ||
} | ||
if merge.Wg != nil { | ||
merge.Wg.Done() | ||
} | ||
}() | ||
} | ||
} | ||
} | ||
|
||
func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error { | ||
defer func() { | ||
// Notify the caller that the merge is complete. | ||
if dagMerge.Wg != nil { | ||
dagMerge.Wg.Done() | ||
} | ||
}() | ||
ctx, txn, err := ensureContextTxn(ctx, db, false) | ||
if err != nil { | ||
return err | ||
|
@@ -79,7 +90,7 @@ | |
ls := cidlink.DefaultLinkSystem() | ||
ls.SetReadStorage(txn.DAGstore().AsIPLDStorage()) | ||
|
||
docID, err := getDocIDFromBlock(ctx, ls, dagMerge.Cid) | ||
docID, err := client.NewDocIDFromString(dagMerge.DocID) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -100,35 +111,57 @@ | |
return err | ||
} | ||
|
||
for retry := 0; retry < db.MaxTxnRetries(); retry++ { | ||
err := mp.mergeComposites(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
err = syncIndexedDoc(ctx, docID, col) | ||
if err != nil { | ||
return err | ||
} | ||
err = txn.Commit(ctx) | ||
if err != nil { | ||
if errors.Is(err, badger.ErrTxnConflict) { | ||
txn, err = db.NewTxn(ctx, false) | ||
if err != nil { | ||
return err | ||
} | ||
ctx = SetContextTxn(ctx, txn) | ||
mp.txn = txn | ||
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage()) | ||
// Reset the CRDTs to avoid reusing the old transaction. | ||
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT) | ||
continue | ||
} | ||
return err | ||
} | ||
break | ||
err = mp.mergeComposites(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
err = syncIndexedDoc(ctx, docID, col) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return txn.Commit(ctx) | ||
} | ||
|
||
// mergeQueue is a synchronization primitive that ensures at most one merge
// is in flight per document at any time, so that concurrent merges of the
// same document do not cause transaction conflicts with each other.
//
// Each docID currently being merged has an entry in docs. The channel stored
// for it is closed by done, which wakes every goroutine waiting in add.
type mergeQueue struct {
	// docs maps an in-flight docID to the channel that is closed on release.
	docs map[string]chan struct{}
	// mu guards docs.
	mu sync.Mutex
}

// newMergeQueue returns an empty, ready-to-use mergeQueue.
func newMergeQueue() *mergeQueue {
	return &mergeQueue{
		docs: make(map[string]chan struct{}),
	}
}

// add adds a docID to the queue. If the docID is already in the queue, it
// blocks until the docID has been removed and then tries again. For every add
// call, done must be called to remove the docID from the queue. Otherwise,
// subsequent add calls for the same docID will block forever.
func (dq *mergeQueue) add(docID string) {
	// Loop rather than recurse: several waiters may be woken by the same
	// close and only one of them can claim the slot, so a waiter may need
	// to go around more than once. A loop keeps the stack depth constant.
	for {
		dq.mu.Lock()
		wait, busy := dq.docs[docID]
		if !busy {
			dq.docs[docID] = make(chan struct{})
			dq.mu.Unlock()
			return
		}
		dq.mu.Unlock()

		// Wait outside the lock so that done can acquire it and release us.
		<-wait
	}
}

// done removes a docID from the queue and wakes every goroutine that is
// blocked in add for that docID. Calling done for a docID that is not in
// the queue is a no-op.
func (dq *mergeQueue) done(docID string) {
	dq.mu.Lock()
	defer dq.mu.Unlock()

	wait, ok := dq.docs[docID]
	if !ok {
		return
	}
	delete(dq.docs, docID)
	close(wait)
}
|
||
type mergeProcessor struct { | ||
|
@@ -333,18 +366,6 @@ | |
return mcrdt, nil | ||
} | ||
|
||
func getDocIDFromBlock(ctx context.Context, ls linking.LinkSystem, cid cid.Cid) (client.DocID, error) { | ||
nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype) | ||
if err != nil { | ||
return client.DocID{}, err | ||
} | ||
block, err := coreblock.GetFromNode(nd) | ||
if err != nil { | ||
return client.DocID{}, err | ||
} | ||
return client.NewDocIDFromString(string(block.Delta.GetDocID())) | ||
} | ||
|
||
func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { | ||
cols, err := db.getCollections( | ||
ctx, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: If the DocID is now part of the message, we can get rid of the `getDocIDFromBlock` function. The reason I hadn't included the DocID before is that I wanted to guarantee that there was no mistake with it, such as the wrong DocID being supplied for the provided CID. That was probably a bad reason, now that I think about it.