-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Merge retry logic #2719
fix: Merge retry logic #2719
Changes from 2 commits
c67b272
10e35c3
b46c5ec
6c4eab1
f28473b
b4b456b
f89456b
37c72ba
5ee9616
3f0a9bf
e8bd194
15a603c
d32736c
01f5157
d9bbd87
32634a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import ( | ||
"container/list" | ||
"context" | ||
"sync" | ||
|
||
"github.com/ipfs/go-cid" | ||
"github.com/ipld/go-ipld-prime/linking" | ||
|
@@ -34,6 +35,7 @@ | |
) | ||
|
||
func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) { | ||
queue := newMergeQueue() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -43,28 +45,37 @@ | |
return | ||
} | ||
go func() { | ||
err := db.executeMerge(ctx, merge) | ||
// ensure only one merge per docID | ||
queue.add(merge.DocID) | ||
defer queue.done(merge.DocID) | ||
|
||
var err error | ||
// retry merge up to max txn retries | ||
for i := 0; i < db.MaxTxnRetries(); i++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thought: If you use the merge queue, I expect that a retry will never be needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a user updates a doc while a merge is in progress it could still happen. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😅 very true There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'd be super nice / reminder to document that here. |
||
err = db.executeMerge(ctx, merge) | ||
if errors.Is(err, badger.ErrTxnConflict) { | ||
continue // retry merge | ||
} | ||
break // merge success or error | ||
} | ||
|
||
if err != nil { | ||
log.ErrorContextE( | ||
ctx, | ||
"Failed to execute merge", | ||
err, | ||
corelog.String("CID", merge.Cid.String()), | ||
corelog.String("Error", err.Error()), | ||
) | ||
corelog.Any("Error", err), | ||
corelog.Any("Event", merge)) | ||
} | ||
if merge.Wg != nil { | ||
merge.Wg.Done() | ||
} | ||
}() | ||
} | ||
} | ||
} | ||
|
||
func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error { | ||
defer func() { | ||
// Notify the caller that the merge is complete. | ||
if dagMerge.Wg != nil { | ||
dagMerge.Wg.Done() | ||
} | ||
}() | ||
ctx, txn, err := ensureContextTxn(ctx, db, false) | ||
if err != nil { | ||
return err | ||
|
@@ -100,35 +111,57 @@ | |
return err | ||
} | ||
|
||
for retry := 0; retry < db.MaxTxnRetries(); retry++ { | ||
err := mp.mergeComposites(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
err = syncIndexedDoc(ctx, docID, col) | ||
if err != nil { | ||
return err | ||
} | ||
err = txn.Commit(ctx) | ||
if err != nil { | ||
if errors.Is(err, badger.ErrTxnConflict) { | ||
txn, err = db.NewTxn(ctx, false) | ||
if err != nil { | ||
return err | ||
} | ||
ctx = SetContextTxn(ctx, txn) | ||
mp.txn = txn | ||
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage()) | ||
// Reset the CRDTs to avoid reusing the old transaction. | ||
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT) | ||
continue | ||
} | ||
return err | ||
} | ||
break | ||
err = mp.mergeComposites(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
err = syncIndexedDoc(ctx, docID, col) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return txn.Commit(ctx) | ||
} | ||
|
||
// mergeQueue is synchronization source to ensure that concurrent | ||
// document merges do not cause transaction conflicts. | ||
type mergeQueue struct { | ||
docs map[string]chan struct{} | ||
mu sync.Mutex | ||
} | ||
|
||
func newMergeQueue() *mergeQueue { | ||
return &mergeQueue{ | ||
docs: make(map[string]chan struct{}), | ||
} | ||
} | ||
|
||
// add adds a docID to the queue. If the docID is already in the queue, it will | ||
// wait for the docID to be removed from the queue. For every add call, done must | ||
// be called to remove the docID from the queue. Otherwise, subsequent add calls will | ||
// block forever. | ||
func (dq *mergeQueue) add(docID string) { | ||
dq.mu.Lock() | ||
done, ok := dq.docs[docID] | ||
if !ok { | ||
dq.docs[docID] = make(chan struct{}) | ||
} | ||
dq.mu.Unlock() | ||
if ok { | ||
<-done | ||
dq.add(docID) | ||
} | ||
} | ||
|
||
func (dq *mergeQueue) done(docID string) { | ||
dq.mu.Lock() | ||
defer dq.mu.Unlock() | ||
done, ok := dq.docs[docID] | ||
if ok { | ||
delete(dq.docs, docID) | ||
close(done) | ||
} | ||
} | ||
|
||
type mergeProcessor struct { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,11 +51,6 @@ type server struct { | |
pubSubEmitter event.Emitter | ||
pushLogEmitter event.Emitter | ||
|
||
Comment on lines
52
to
53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: If you remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's still there but in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is that if the conflict happens in exactly the wrong way, blocks will be missing from the blockstore. I wanted to remove it when I refactored the dag sync process and I found out that we have to keep it in. Alternatively we need to change the dags sync logic to deal with conflicts. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh right I remember now. Would a simple fix be to ignore transaction conflict errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The updated sync process should now handle conflicts. |
||
// docQueue is used to track which documents are currently being processed. | ||
// This is used to prevent multiple concurrent processing of the same document and | ||
// limit unecessary transaction conflicts. | ||
docQueue *docQueue | ||
|
||
pb.UnimplementedServiceServer | ||
} | ||
|
||
|
@@ -73,9 +68,6 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { | |
peer: p, | ||
conns: make(map[libpeer.ID]*grpc.ClientConn), | ||
topics: make(map[string]pubsubTopic), | ||
docQueue: &docQueue{ | ||
docs: make(map[string]chan struct{}), | ||
}, | ||
} | ||
|
||
cred := insecure.NewCredentials() | ||
|
@@ -152,38 +144,6 @@ func (s *server) GetLog(ctx context.Context, req *pb.GetLogRequest) (*pb.GetLogR | |
return nil, nil | ||
} | ||
|
||
type docQueue struct { | ||
docs map[string]chan struct{} | ||
mu sync.Mutex | ||
} | ||
|
||
// add adds a docID to the queue. If the docID is already in the queue, it will | ||
// wait for the docID to be removed from the queue. For every add call, done must | ||
// be called to remove the docID from the queue. Otherwise, subsequent add calls will | ||
// block forever. | ||
func (dq *docQueue) add(docID string) { | ||
dq.mu.Lock() | ||
done, ok := dq.docs[docID] | ||
if !ok { | ||
dq.docs[docID] = make(chan struct{}) | ||
} | ||
dq.mu.Unlock() | ||
if ok { | ||
<-done | ||
dq.add(docID) | ||
} | ||
} | ||
|
||
func (dq *docQueue) done(docID string) { | ||
dq.mu.Lock() | ||
defer dq.mu.Unlock() | ||
done, ok := dq.docs[docID] | ||
if ok { | ||
delete(dq.docs, docID) | ||
close(done) | ||
} | ||
} | ||
|
||
// PushLog receives a push log request | ||
func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushLogReply, error) { | ||
pid, err := peerIDFromContext(ctx) | ||
|
@@ -199,9 +159,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL | |
return nil, err | ||
} | ||
|
||
s.docQueue.add(docID.String()) | ||
defer func() { | ||
s.docQueue.done(docID.String()) | ||
if s.pushLogEmitter != nil { | ||
byPeer, err := libpeer.Decode(req.Body.Creator) | ||
if err != nil { | ||
|
@@ -249,6 +207,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL | |
wg := &sync.WaitGroup{} | ||
wg.Add(1) | ||
s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{ | ||
DocID: docID.String(), | ||
Cid: cid, | ||
SchemaRoot: string(req.Body.SchemaRoot), | ||
Wg: wg, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: If the DocID is now part of the message, we can get rid of the
getDocIDFromBlock
function. The reason I hadn't included DocID before is that I wanted to guarantee that there was no mistake with it. Like the wrong DocID for the provided CID. That was probably a bad reason when I think about it.