feat(i): Handle collection commits over P2P #3247

Merged
149 changes: 82 additions & 67 deletions internal/db/merge.go
@@ -28,37 +28,37 @@ import (
 	"github.com/sourcenetwork/defradb/event"
 	"github.com/sourcenetwork/defradb/internal/core"
 	coreblock "github.com/sourcenetwork/defradb/internal/core/block"
+	"github.com/sourcenetwork/defradb/internal/core/crdt"
 	"github.com/sourcenetwork/defradb/internal/db/base"
 	"github.com/sourcenetwork/defradb/internal/encryption"
 	"github.com/sourcenetwork/defradb/internal/keys"
 	"github.com/sourcenetwork/defradb/internal/merkle/clock"
 	merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt"
 )
 
-func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
+func (db *db) executeMerge(ctx context.Context, col *collection, dagMerge event.Merge) error {
 	ctx, txn, err := ensureContextTxn(ctx, db, false)
 	if err != nil {
 		return err
 	}
 	defer txn.Discard(ctx)
 
-	col, err := getCollectionFromRootSchema(ctx, db, dagMerge.SchemaRoot)
-	if err != nil {
-		return err
-	}
-
-	docID, err := client.NewDocIDFromString(dagMerge.DocID)
-	if err != nil {
-		return err
+	var key keys.HeadstoreKey
+	if dagMerge.DocID != "" {
+		key = keys.HeadstoreDocKey{
+			DocID:   dagMerge.DocID,
+			FieldID: core.COMPOSITE_NAMESPACE,
+		}
+	} else {
+		key = keys.NewHeadstoreColKey(col.Description().RootID)
 	}
-	dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String())
 
-	mp, err := db.newMergeProcessor(txn, col, dsKey)
+	mt, err := getHeadsAsMergeTarget(ctx, txn, key)
 	if err != nil {
 		return err
 	}
 
-	mt, err := getHeadsAsMergeTarget(ctx, txn, dsKey.WithFieldID(core.COMPOSITE_NAMESPACE))
+	mp, err := db.newMergeProcessor(txn, col)
 	if err != nil {
 		return err
 	}
@@ -73,9 +73,15 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
 		return err
 	}
 
-	err = syncIndexedDoc(ctx, docID, col)
-	if err != nil {
-		return err
+	for docID := range mp.docIDs {
+		docID, err := client.NewDocIDFromString(docID)
+		if err != nil {
+			return err
+		}
+		err = syncIndexedDoc(ctx, docID, col)
+		if err != nil {
+			return err
+		}
 	}
 
 	err = txn.Commit(ctx)
@@ -94,39 +100,39 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
 // mergeQueue is a synchronization source to ensure that concurrent
 // document merges do not cause transaction conflicts.
 type mergeQueue struct {
-	docs  map[string]chan struct{}
+	keys  map[string]chan struct{}
 	mutex sync.Mutex
 }
 
 func newMergeQueue() *mergeQueue {
 	return &mergeQueue{
-		docs: make(map[string]chan struct{}),
+		keys: make(map[string]chan struct{}),
 	}
 }
 
-// add adds a docID to the queue. If the docID is already in the queue, it will
-// wait for the docID to be removed from the queue. For every add call, done must
-// be called to remove the docID from the queue. Otherwise, subsequent add calls will
+// add adds a key to the queue. If the key is already in the queue, it will
+// wait for the key to be removed from the queue. For every add call, done must
+// be called to remove the key from the queue. Otherwise, subsequent add calls will
 // block forever.
-func (m *mergeQueue) add(docID string) {
+func (m *mergeQueue) add(key string) {
 	m.mutex.Lock()
-	done, ok := m.docs[docID]
+	done, ok := m.keys[key]
 	if !ok {
-		m.docs[docID] = make(chan struct{})
+		m.keys[key] = make(chan struct{})
 	}
 	m.mutex.Unlock()
 	if ok {
 		<-done
-		m.add(docID)
+		m.add(key)
 	}
 }
 
-func (m *mergeQueue) done(docID string) {
+func (m *mergeQueue) done(key string) {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
-	done, ok := m.docs[docID]
+	done, ok := m.keys[key]
 	if ok {
-		delete(m.docs, docID)
+		delete(m.keys, key)
 		close(done)
 	}
 }
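
For context, a minimal usage sketch of the add/done contract above. The key may be a docID or, after this PR, a schema root for branchable collections; everything apart from the mergeQueue type itself is illustrative:

```go
q := newMergeQueue()

var wg sync.WaitGroup
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		q.add("doc-1")        // blocks while another goroutine holds "doc-1"
		defer q.done("doc-1") // always pair with add, or later add calls block forever
		// ... perform the merge for "doc-1" serially here ...
	}()
}
wg.Wait()
```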
@@ -135,9 +141,11 @@ type mergeProcessor struct {
 	txn        datastore.Txn
 	blockLS    linking.LinkSystem
 	encBlockLS linking.LinkSystem
-	mCRDTs     map[string]merklecrdt.MerkleCRDT
 	col        *collection
-	dsKey      keys.DataStoreKey
+
+	// docIDs contains all docIDs that have been merged so far by the mergeProcessor
+	docIDs map[string]struct{}
 
 	// composites is a list of composites that need to be merged.
 	composites *list.List
 	// missingEncryptionBlocks is a list of blocks that we failed to fetch
@@ -149,7 +157,6 @@ type mergeProcessor struct {
 func (db *db) newMergeProcessor(
 	txn datastore.Txn,
 	col *collection,
-	dsKey keys.DataStoreKey,
 ) (*mergeProcessor, error) {
 	blockLS := cidlink.DefaultLinkSystem()
 	blockLS.SetReadStorage(txn.Blockstore().AsIPLDStorage())
@@ -161,9 +168,8 @@ func (db *db) newMergeProcessor(
 		txn:        txn,
 		blockLS:    blockLS,
 		encBlockLS: encBlockLS,
-		mCRDTs:     make(map[string]merklecrdt.MerkleCRDT),
 		col:        col,
-		dsKey:      dsKey,
+		docIDs:     make(map[string]struct{}),
 		composites: list.New(),
 		missingEncryptionBlocks:   make(map[cidlink.Link]struct{}),
 		availableEncryptionBlocks: make(map[cidlink.Link]*coreblock.Encryption),
@@ -375,7 +381,7 @@ func (mp *mergeProcessor) processBlock(
 	}
 
 	if canRead {
-		crdt, err := mp.initCRDTForType(dagBlock.Delta.GetFieldName())
+		crdt, err := mp.initCRDTForType(dagBlock.Delta)
 		if err != nil {
 			return err
 		}
@@ -435,50 +441,59 @@ func decryptBlock(
 	return newBlock, nil
 }
 
-func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, error) {
-	mcrdt, exists := mp.mCRDTs[field]
-	if exists {
-		return mcrdt, nil
-	}
-
+func (mp *mergeProcessor) initCRDTForType(crdt crdt.CRDT) (merklecrdt.MerkleCRDT, error) {
 	schemaVersionKey := keys.CollectionSchemaVersionKey{
 		SchemaVersionID: mp.col.Schema().VersionID,
 		CollectionID:    mp.col.ID(),
 	}
 
-	if field == "" {
-		mcrdt = merklecrdt.NewMerkleCompositeDAG(
+	switch {

Collaborator commented: praise: Nice use of the block's IsFoo methods.

+	case crdt.IsComposite():
+		docID := string(crdt.GetDocID())
+		mp.docIDs[docID] = struct{}{}
+
+		return merklecrdt.NewMerkleCompositeDAG(
 			mp.txn,
 			schemaVersionKey,
-			mp.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
-		)
-		mp.mCRDTs[field] = mcrdt
-		return mcrdt, nil
-	}
+			base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), docID).WithFieldID(core.COMPOSITE_NAMESPACE),
+		), nil
 
-	fd, ok := mp.col.Definition().GetFieldByName(field)
-	if !ok {
-		// If the field is not part of the schema, we can safely ignore it.
-		return nil, nil
-	}
+	case crdt.IsCollection():
+		return merklecrdt.NewMerkleCollection(
+			mp.txn,
+			schemaVersionKey,
+			keys.NewHeadstoreColKey(mp.col.Description().RootID),
+		), nil
 
-	mcrdt, err := merklecrdt.FieldLevelCRDTWithStore(
-		mp.txn,
-		schemaVersionKey,
-		fd.Typ,
-		fd.Kind,
-		mp.dsKey.WithFieldID(fd.ID.String()),
-		field,
-	)
+	default:
+		docID := string(crdt.GetDocID())
+		mp.docIDs[docID] = struct{}{}
+
+		field := crdt.GetFieldName()
+		fd, ok := mp.col.Definition().GetFieldByName(field)
+		if !ok {
+			// If the field is not part of the schema, we can safely ignore it.
+			return nil, nil
+		}
+
+		return merklecrdt.FieldLevelCRDTWithStore(
+			mp.txn,
+			schemaVersionKey,
+			fd.Typ,
+			fd.Kind,
+			base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), docID).WithFieldID(fd.ID.String()),
+			field,
+		)
+	}
+}
+
+func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
+	ctx, txn, err := ensureContextTxn(ctx, db, false)
 	if err != nil {
 		return nil, err
 	}
+	defer txn.Discard(ctx)
 
-	mp.mCRDTs[field] = mcrdt
-	return mcrdt, nil
-}
-
-func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
 	cols, err := db.getCollections(
 		ctx,
 		client.CollectionFetchOptions{
@@ -498,8 +513,8 @@ func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
 
 // getHeadsAsMergeTarget retrieves the heads of the composite DAG for the given document
 // and returns them as a merge target.
-func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) (mergeTarget, error) {
-	cids, err := getHeads(ctx, txn, dsKey)
+func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) (mergeTarget, error) {
+	cids, err := getHeads(ctx, txn, key)
 
 	if err != nil {
 		return mergeTarget{}, err
@@ -520,8 +535,8 @@ func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) (mergeTarget, error) {
 }
 
 // getHeads retrieves the heads associated with the given datastore key.
-func getHeads(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) ([]cid.Cid, error) {
-	headset := clock.NewHeadSet(txn.Headstore(), dsKey.ToHeadStoreKey())
+func getHeads(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) ([]cid.Cid, error) {
+	headset := clock.NewHeadSet(txn.Headstore(), key)
 
 	cids, _, err := headset.List(ctx)
 	if err != nil {
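Taken together, the merge.go changes select the headstore key by commit scope: document merges use a HeadstoreDocKey, while merges for branchable collections use the collection's key. A hedged sketch consolidating that dispatch (headsFor is hypothetical, composed only from helpers in this diff):

```go
// headsFor returns the current DAG heads for one document or, when docID
// is empty, for the whole (branchable) collection.
func headsFor(ctx context.Context, txn datastore.Txn, col *collection, docID string) ([]cid.Cid, error) {
	var key keys.HeadstoreKey
	if docID != "" {
		key = keys.HeadstoreDocKey{DocID: docID, FieldID: core.COMPOSITE_NAMESPACE}
	} else {
		key = keys.NewHeadstoreColKey(col.Description().RootID)
	}
	return getHeads(ctx, txn, key)
}
```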
16 changes: 8 additions & 8 deletions internal/db/merge_test.go
@@ -58,7 +58,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) {
 	compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID:      docID.String(),
 		Cid:        compInfo2.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -103,7 +103,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
 	compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID:      docID.String(),
 		Cid:        compInfo2.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -113,7 +113,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
 	compInfo3, err := d.generateCompositeUpdate(&lsys, map[string]any{"age": 30}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID:      docID.String(),
 		Cid:        compInfo3.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -161,7 +161,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
 	compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID:      docID.String(),
 		Cid:        compInfo2.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -180,7 +180,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
 	compInfo3, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfoUnkown)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID:      docID.String(),
 		Cid:        compInfo3.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -304,15 +304,15 @@ func TestMergeQueue(t *testing.T) {
 	go q.add(testDocID)
 	// give time for the goroutine to block
 	time.Sleep(10 * time.Millisecond)
-	require.Len(t, q.docs, 1)
+	require.Len(t, q.keys, 1)
 	q.done(testDocID)
 	// give time for the goroutine to add the docID
 	time.Sleep(10 * time.Millisecond)
 	q.mutex.Lock()
-	require.Len(t, q.docs, 1)
+	require.Len(t, q.keys, 1)
 	q.mutex.Unlock()
 	q.done(testDocID)
 	q.mutex.Lock()
-	require.Len(t, q.docs, 0)
+	require.Len(t, q.keys, 0)
 	q.mutex.Unlock()
 }
31 changes: 25 additions & 6 deletions internal/db/messages.go
@@ -22,7 +22,9 @@ import (
 )
 
 func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
-	queue := newMergeQueue()
+	docIdQueue := newMergeQueue()

Collaborator commented: suggestion: docIDQueue would be more idiomatic.

AndrewSisley (Contributor Author) replied on Nov 15, 2024: 2 years later and I'm still messing that up 😁 (suggested change: docIDQueue)

+	schemaRootQueue := newMergeQueue()
+
 	// This is used to ensure we only trigger loadAndPublishP2PCollections and loadAndPublishReplicators
 	// once per db instantiation.
 	loadOnce := sync.Once{}
@@ -37,17 +39,34 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
 		switch evt := msg.Data.(type) {
 		case event.Merge:
 			go func() {
-				// ensure only one merge per docID
-				queue.add(evt.DocID)
-				defer queue.done(evt.DocID)
+				col, err := getCollectionFromRootSchema(ctx, db, evt.SchemaRoot)
+				if err != nil {
+					log.ErrorContextE(
+						ctx,
+						"Failed to execute merge",
+						err,
+						corelog.Any("Event", evt))
+					return
+				}
+
+				if col.Description().IsBranchable {
+					// As collection commits link to document composite commits, all events
+					// received for branchable collections must be processed serially else
+					// they may otherwise cause a transaction conflict.
+					schemaRootQueue.add(evt.SchemaRoot)
+					defer schemaRootQueue.done(evt.SchemaRoot)
+				} else {
+					// ensure only one merge per docID
+					docIdQueue.add(evt.DocID)
+					defer docIdQueue.done(evt.DocID)
+				}
 
 				// retry the merge process if a conflict occurs
 				//
 				// conflicts occur when a user updates a document
 				// while a merge is in progress.
-				var err error
 				for i := 0; i < db.MaxTxnRetries(); i++ {
-					err = db.executeMerge(ctx, evt)
+					err = db.executeMerge(ctx, col, evt)
 					if errors.Is(err, datastore.ErrTxnConflict) {
 						continue // retry merge
 					}
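
The handler's queue selection reduces to a small pure function; a sketch of the rule (mergeQueueKeyFor is hypothetical, not part of the PR):

```go
// mergeQueueKeyFor picks the serialization key for a merge event:
// branchable collections serialize on the schema root, because collection
// commits link to document composite commits; all other merges serialize
// on the docID.
func mergeQueueKeyFor(isBranchable bool, evt event.Merge) string {
	if isBranchable {
		return evt.SchemaRoot
	}
	return evt.DocID
}
```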