From 2d0bd84cb6b8aabed0c6ef136aefbd6e8540a3a4 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 11 Nov 2024 15:21:29 -0500 Subject: [PATCH 1/8] Handle collection commits over P2P --- internal/db/merge.go | 158 +++++---- internal/db/merge_test.go | 16 +- internal/db/messages.go | 31 +- net/peer.go | 14 +- net/server.go | 53 +-- .../commits/branchables/peer_index_test.go | 68 ++++ .../query/commits/branchables/peer_test.go | 129 ++++++-- .../commits/branchables/peer_update_test.go | 303 ++++++++++++++++++ 8 files changed, 636 insertions(+), 136 deletions(-) create mode 100644 tests/integration/query/commits/branchables/peer_index_test.go create mode 100644 tests/integration/query/commits/branchables/peer_update_test.go diff --git a/internal/db/merge.go b/internal/db/merge.go index 47db8740b1..211267e002 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -28,6 +28,7 @@ import ( "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" "github.com/sourcenetwork/defradb/internal/db/base" "github.com/sourcenetwork/defradb/internal/encryption" "github.com/sourcenetwork/defradb/internal/keys" @@ -35,30 +36,30 @@ import ( merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt" ) -func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { +func (db *db) executeMerge(ctx context.Context, col *collection, dagMerge event.Merge) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err } defer txn.Discard(ctx) - col, err := getCollectionFromRootSchema(ctx, db, dagMerge.SchemaRoot) - if err != nil { - return err - } - - docID, err := client.NewDocIDFromString(dagMerge.DocID) - if err != nil { - return err - } - dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()) - - mp, err := db.newMergeProcessor(txn, col, dsKey) - if err != nil { - return err + var mt mergeTarget + if dagMerge.DocID != "" { + mt, err = getHeadsAsMergeTarget(ctx, txn, keys.HeadstoreDocKey{ + DocID: dagMerge.DocID, + FieldID: core.COMPOSITE_NAMESPACE, + }) + if err != nil { + return err + } + } else { + mt, err = getHeadsAsMergeTarget(ctx, txn, keys.NewHeadstoreColKey(col.Description().RootID)) + if err != nil { + return err + } } - mt, err := getHeadsAsMergeTarget(ctx, txn, dsKey.WithFieldID(core.COMPOSITE_NAMESPACE)) + mp, err := db.newMergeProcessor(txn, col) if err != nil { return err } @@ -73,9 +74,15 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { return err } - err = syncIndexedDoc(ctx, docID, col) - if err != nil { - return err + for docID := range mp.docIDs { + docID, err := client.NewDocIDFromString(docID) + if err != nil { + return err + } + err = syncIndexedDoc(ctx, docID, col) + if err != nil { + return err + } } err = txn.Commit(ctx) @@ -94,39 +101,39 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { // mergeQueue is synchronization source to ensure that concurrent // document merges do not cause transaction conflicts. type mergeQueue struct { - docs map[string]chan struct{} + keys map[string]chan struct{} mutex sync.Mutex } func newMergeQueue() *mergeQueue { return &mergeQueue{ - docs: make(map[string]chan struct{}), + keys: make(map[string]chan struct{}), } } -// add adds a docID to the queue. If the docID is already in the queue, it will -// wait for the docID to be removed from the queue. 
For every add call, done must -// be called to remove the docID from the queue. Otherwise, subsequent add calls will +// add adds a key to the queue. If the key is already in the queue, it will +// wait for the key to be removed from the queue. For every add call, done must +// be called to remove the key from the queue. Otherwise, subsequent add calls will // block forever. -func (m *mergeQueue) add(docID string) { +func (m *mergeQueue) add(key string) { m.mutex.Lock() - done, ok := m.docs[docID] + done, ok := m.keys[key] if !ok { - m.docs[docID] = make(chan struct{}) + m.keys[key] = make(chan struct{}) } m.mutex.Unlock() if ok { <-done - m.add(docID) + m.add(key) } } -func (m *mergeQueue) done(docID string) { +func (m *mergeQueue) done(key string) { m.mutex.Lock() defer m.mutex.Unlock() - done, ok := m.docs[docID] + done, ok := m.keys[key] if ok { - delete(m.docs, docID) + delete(m.keys, key) close(done) } } @@ -135,9 +142,11 @@ type mergeProcessor struct { txn datastore.Txn blockLS linking.LinkSystem encBlockLS linking.LinkSystem - mCRDTs map[string]merklecrdt.MerkleCRDT col *collection - dsKey keys.DataStoreKey + + // docIDs contains all docIDs that have been merged so far by the mergeProcessor + docIDs map[string]struct{} + // composites is a list of composites that need to be merged. composites *list.List // missingEncryptionBlocks is a list of blocks that we failed to fetch @@ -149,7 +158,6 @@ type mergeProcessor struct { func (db *db) newMergeProcessor( txn datastore.Txn, col *collection, - dsKey keys.DataStoreKey, ) (*mergeProcessor, error) { blockLS := cidlink.DefaultLinkSystem() blockLS.SetReadStorage(txn.Blockstore().AsIPLDStorage()) @@ -161,9 +169,8 @@ func (db *db) newMergeProcessor( txn: txn, blockLS: blockLS, encBlockLS: encBlockLS, - mCRDTs: make(map[string]merklecrdt.MerkleCRDT), col: col, - dsKey: dsKey, + docIDs: make(map[string]struct{}), composites: list.New(), missingEncryptionBlocks: make(map[cidlink.Link]struct{}), availableEncryptionBlocks: make(map[cidlink.Link]*coreblock.Encryption), @@ -375,7 +382,7 @@ func (mp *mergeProcessor) processBlock( } if canRead { - crdt, err := mp.initCRDTForType(dagBlock.Delta.GetFieldName()) + crdt, err := mp.initCRDTForType(dagBlock.Delta) if err != nil { return err } @@ -435,50 +442,59 @@ func decryptBlock( return newBlock, nil } -func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, error) { - mcrdt, exists := mp.mCRDTs[field] - if exists { - return mcrdt, nil - } - +func (mp *mergeProcessor) initCRDTForType(crdt crdt.CRDT) (merklecrdt.MerkleCRDT, error) { schemaVersionKey := keys.CollectionSchemaVersionKey{ SchemaVersionID: mp.col.Schema().VersionID, CollectionID: mp.col.ID(), } - if field == "" { - mcrdt = merklecrdt.NewMerkleCompositeDAG( + switch { + case crdt.IsComposite(): + docID := string(crdt.GetDocID()) + mp.docIDs[docID] = struct{}{} + + return merklecrdt.NewMerkleCompositeDAG( mp.txn, schemaVersionKey, - mp.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), - ) - mp.mCRDTs[field] = mcrdt - return mcrdt, nil - } + base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), docID).WithFieldID(core.COMPOSITE_NAMESPACE), + ), nil - fd, ok := mp.col.Definition().GetFieldByName(field) - if !ok { - // If the field is not part of the schema, we can safely ignore it. 
- return nil, nil + case crdt.IsCollection(): + return merklecrdt.NewMerkleCollection( + mp.txn, + schemaVersionKey, + keys.NewHeadstoreColKey(mp.col.Description().RootID), + ), nil + + default: + docID := string(crdt.GetDocID()) + mp.docIDs[docID] = struct{}{} + + field := crdt.GetFieldName() + fd, ok := mp.col.Definition().GetFieldByName(field) + if !ok { + // If the field is not part of the schema, we can safely ignore it. + return nil, nil + } + + return merklecrdt.FieldLevelCRDTWithStore( + mp.txn, + schemaVersionKey, + fd.Typ, + fd.Kind, + base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), docID).WithFieldID(fd.ID.String()), + field, + ) } +} - mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( - mp.txn, - schemaVersionKey, - fd.Typ, - fd.Kind, - mp.dsKey.WithFieldID(fd.ID.String()), - field, - ) +func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return nil, err } + defer txn.Discard(ctx) - mp.mCRDTs[field] = mcrdt - return mcrdt, nil -} - -func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { cols, err := db.getCollections( ctx, client.CollectionFetchOptions{ @@ -498,8 +514,8 @@ func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) // getHeadsAsMergeTarget retrieves the heads of the composite DAG for the given document // and returns them as a merge target. -func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) (mergeTarget, error) { - cids, err := getHeads(ctx, txn, dsKey) +func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) (mergeTarget, error) { + cids, err := getHeads(ctx, txn, key) if err != nil { return mergeTarget{}, err @@ -520,8 +536,8 @@ func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.Da } // getHeads retrieves the heads associated with the given datastore key. 
-func getHeads(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) ([]cid.Cid, error) { - headset := clock.NewHeadSet(txn.Headstore(), dsKey.ToHeadStoreKey()) +func getHeads(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) ([]cid.Cid, error) { + headset := clock.NewHeadSet(txn.Headstore(), key) cids, _, err := headset.List(ctx) if err != nil { diff --git a/internal/db/merge_test.go b/internal/db/merge_test.go index f9478be536..ee170bfe54 100644 --- a/internal/db/merge_test.go +++ b/internal/db/merge_test.go @@ -58,7 +58,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) { compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo) require.NoError(t, err) - err = db.executeMerge(ctx, event.Merge{ + err = db.executeMerge(ctx, col.(*collection), event.Merge{ DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), @@ -103,7 +103,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) { compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo) require.NoError(t, err) - err = db.executeMerge(ctx, event.Merge{ + err = db.executeMerge(ctx, col.(*collection), event.Merge{ DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), @@ -113,7 +113,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) { compInfo3, err := d.generateCompositeUpdate(&lsys, map[string]any{"age": 30}, compInfo) require.NoError(t, err) - err = db.executeMerge(ctx, event.Merge{ + err = db.executeMerge(ctx, col.(*collection), event.Merge{ DocID: docID.String(), Cid: compInfo3.link.Cid, SchemaRoot: col.SchemaRoot(), @@ -161,7 +161,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) { compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo) require.NoError(t, err) - err = db.executeMerge(ctx, event.Merge{ + err = db.executeMerge(ctx, col.(*collection), event.Merge{ DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), @@ -180,7 +180,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) { compInfo3, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfoUnkown) require.NoError(t, err) - err = db.executeMerge(ctx, event.Merge{ + err = db.executeMerge(ctx, col.(*collection), event.Merge{ DocID: docID.String(), Cid: compInfo3.link.Cid, SchemaRoot: col.SchemaRoot(), @@ -304,15 +304,15 @@ func TestMergeQueue(t *testing.T) { go q.add(testDocID) // give time for the goroutine to block time.Sleep(10 * time.Millisecond) - require.Len(t, q.docs, 1) + require.Len(t, q.keys, 1) q.done(testDocID) // give time for the goroutine to add the docID time.Sleep(10 * time.Millisecond) q.mutex.Lock() - require.Len(t, q.docs, 1) + require.Len(t, q.keys, 1) q.mutex.Unlock() q.done(testDocID) q.mutex.Lock() - require.Len(t, q.docs, 0) + require.Len(t, q.keys, 0) q.mutex.Unlock() } diff --git a/internal/db/messages.go b/internal/db/messages.go index 51efba982e..2dcc175cfe 100644 --- a/internal/db/messages.go +++ b/internal/db/messages.go @@ -22,7 +22,9 @@ import ( ) func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { - queue := newMergeQueue() + docIdQueue := newMergeQueue() + schemaRootQueue := newMergeQueue() + // This is used to ensure we only trigger loadAndPublishP2PCollections and loadAndPublishReplicators // once per db instanciation. 
 	loadOnce := sync.Once{}
@@ -37,17 +39,34 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
 		switch evt := msg.Data.(type) {
 		case event.Merge:
 			go func() {
-				// ensure only one merge per docID
-				queue.add(evt.DocID)
-				defer queue.done(evt.DocID)
+				col, err := getCollectionFromRootSchema(ctx, db, evt.SchemaRoot)
+				if err != nil {
+					log.ErrorContextE(
+						ctx,
+						"Failed to execute merge",
+						err,
+						corelog.Any("Event", evt))
+					return
+				}
+
+				if col.Description().IsBranchable {
+					// As collection commits link to document composite commits, all events
+					// received for branchable collections must be processed serially,
+					// otherwise they may cause a transaction conflict.
+					schemaRootQueue.add(evt.SchemaRoot)
+					defer schemaRootQueue.done(evt.SchemaRoot)
+				} else {
+					// ensure only one merge per docID
+					docIdQueue.add(evt.DocID)
+					defer docIdQueue.done(evt.DocID)
+				}
 
 				// retry the merge process if a conflict occurs
 				//
 				// conficts occur when a user updates a document
 				// while a merge is in progress.
-				var err error
 				for i := 0; i < db.MaxTxnRetries(); i++ {
-					err = db.executeMerge(ctx, evt)
+					err = db.executeMerge(ctx, col, evt)
 					if errors.Is(err, datastore.ErrTxnConflict) {
 						continue // retry merge
 					}
diff --git a/net/peer.go b/net/peer.go
index e4ebfe8573..d59d6fe150 100644
--- a/net/peer.go
+++ b/net/peer.go
@@ -255,9 +255,11 @@ func (p *Peer) handleMessageLoop() {
 }
 
 func (p *Peer) handleLog(evt event.Update) error {
-	_, err := client.NewDocIDFromString(evt.DocID)
-	if err != nil {
-		return NewErrFailedToGetDocID(err)
+	if evt.DocID != "" {
+		_, err := client.NewDocIDFromString(evt.DocID)
+		if err != nil {
+			return NewErrFailedToGetDocID(err)
+		}
 	}
 
 	// push to each peer (replicator)
@@ -273,8 +275,10 @@ func (p *Peer) handleLog(evt event.Update) error {
 		Block: evt.Block,
 	}
 
-	if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil {
-		return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID)
+	if evt.DocID != "" {
+		if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil {
+			return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID)
+		}
 	}
 
 	if err := p.server.publishLog(p.ctx, evt.SchemaRoot, req); err != nil {
diff --git a/net/server.go b/net/server.go
index c83ba3f6be..0be9def0ce 100644
--- a/net/server.go
+++ b/net/server.go
@@ -110,9 +110,12 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl
 	if err != nil {
 		return nil, err
 	}
-	docID, err := client.NewDocIDFromString(req.DocID)
-	if err != nil {
-		return nil, err
+
+	if req.DocID != "" {
+		_, err := client.NewDocIDFromString(req.DocID)
+		if err != nil {
+			return nil, err
+		}
 	}
 	byPeer, err := libpeer.Decode(req.Creator)
 	if err != nil {
@@ -126,11 +129,11 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl
 	log.InfoContext(ctx, "Received pushlog",
 		corelog.Any("PeerID", pid.String()),
 		corelog.Any("Creator", byPeer.String()),
-		corelog.Any("DocID", docID.String()))
+		corelog.Any("DocID", req.DocID))
 
 	log.InfoContext(ctx, "Starting DAG sync",
 		corelog.Any("PeerID", pid.String()),
-		corelog.Any("DocID", docID.String()))
+		corelog.Any("DocID", req.DocID))
 
 	err = syncDAG(ctx, s.peer.bserv, block)
 	if err != nil {
@@ -139,19 +142,19 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl
 
 	log.InfoContext(ctx, "DAG sync complete",
 		corelog.Any("PeerID", pid.String()),
-		corelog.Any("DocID", docID.String()))
+		corelog.Any("DocID", req.DocID))
 
 	// Once processed, subscribe to the DocID topic on the pubsub 
network unless we already // subscribed to the collection. - if !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) { - err = s.addPubSubTopic(docID.String(), true, nil) + if !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) && req.DocID != "" { + _, err = s.addPubSubTopic(req.DocID, true, nil) if err != nil { return nil, err } } s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{ - DocID: docID.String(), + DocID: req.DocID, ByPeer: byPeer, FromPeer: pid, Cid: headCID, @@ -172,9 +175,9 @@ func (s *server) GetHeadLog( // addPubSubTopic subscribes to a topic on the pubsub network // A custom message handler can be provided to handle incoming messages. If not provided, // the default message handler will be used. -func (s *server) addPubSubTopic(topic string, subscribe bool, handler rpc.MessageHandler) error { +func (s *server) addPubSubTopic(topic string, subscribe bool, handler rpc.MessageHandler) (pubsubTopic, error) { if s.peer.ps == nil { - return nil + return pubsubTopic{}, nil } log.InfoContext(s.peer.ctx, "Adding pubsub topic", @@ -188,16 +191,16 @@ func (s *server) addPubSubTopic(topic string, subscribe bool, handler rpc.Messag // we need to close the existing topic and create a new one. if !t.subscribed && subscribe { if err := t.Close(); err != nil { - return err + return pubsubTopic{}, err } } else { - return nil + return t, nil } } t, err := rpc.NewTopic(s.peer.ctx, s.peer.ps, s.peer.host.ID(), topic, subscribe) if err != nil { - return err + return pubsubTopic{}, err } if handler == nil { @@ -206,15 +209,17 @@ func (s *server) addPubSubTopic(topic string, subscribe bool, handler rpc.Messag t.SetEventHandler(s.pubSubEventHandler) t.SetMessageHandler(handler) - s.topics[topic] = pubsubTopic{ + pst := pubsubTopic{ Topic: t, subscribed: subscribe, } - return nil + s.topics[topic] = pst + return pst, nil } func (s *server) AddPubSubTopic(topicName string, handler rpc.MessageHandler) error { - return s.addPubSubTopic(topicName, true, handler) + _, err := s.addPubSubTopic(topicName, true, handler) + return err } // hasPubSubTopicAndSubscribed checks if we are subscribed to a topic. @@ -274,13 +279,23 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pushLogReque s.mu.Unlock() if !ok { subscribe := topic != req.SchemaRoot && !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) - err := s.addPubSubTopic(topic, subscribe, nil) + _, err := s.addPubSubTopic(topic, subscribe, nil) if err != nil { return errors.Wrap(fmt.Sprintf("failed to created single use topic %s", topic), err) } return s.publishLog(ctx, topic, req) } + if topic == req.SchemaRoot && req.DocID == "" && !t.subscribed { + // If the push log request is scoped to the schema and not to a document, subscribe to the + // schema. 
+		var err error
+		t, err = s.addPubSubTopic(topic, true, nil)
+		if err != nil {
+			return errors.Wrap(fmt.Sprintf("failed to create single use topic %s", topic), err)
+		}
+	}
+
 	log.InfoContext(ctx, "Publish log",
 		corelog.String("PeerID", s.peer.PeerID().String()),
 		corelog.String("Topic", topic))
@@ -356,7 +371,7 @@ func peerIDFromContext(ctx context.Context) (libpeer.ID, error) {
 
 func (s *server) updatePubSubTopics(evt event.P2PTopic) {
 	for _, topic := range evt.ToAdd {
-		err := s.addPubSubTopic(topic, true, nil)
+		_, err := s.addPubSubTopic(topic, true, nil)
 		if err != nil {
 			log.ErrorE("Failed to add pubsub topic.", err)
 		}
diff --git a/tests/integration/query/commits/branchables/peer_index_test.go b/tests/integration/query/commits/branchables/peer_index_test.go
new file mode 100644
index 0000000000..ab03eb3c56
--- /dev/null
+++ b/tests/integration/query/commits/branchables/peer_index_test.go
@@ -0,0 +1,68 @@
+// Copyright 2024 Democratized Data Foundation
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package branchables
+
+import (
+	"testing"
+
+	"github.com/sourcenetwork/immutable"
+
+	testUtils "github.com/sourcenetwork/defradb/tests/integration"
+)
+
+func TestQueryCommitsBranchables_SyncsIndexAcrossPeerConnection(t *testing.T) {
+	test := testUtils.TestCase{
+		Actions: []any{
+			testUtils.RandomNetworkingConfig(),
+			testUtils.RandomNetworkingConfig(),
+			testUtils.SchemaUpdate{
+				Schema: `
+					type Users @branchable {
+						name: String @index
+					}
+				`,
+			},
+			testUtils.ConnectPeers{
+				SourceNodeID: 1,
+				TargetNodeID: 0,
+			},
+			testUtils.SubscribeToCollection{
+				NodeID:        1,
+				CollectionIDs: []int{0},
+			},
+			testUtils.CreateDoc{
+				NodeID: immutable.Some(0),
+				Doc: `{
+					"name": "John"
+				}`,
+			},
+			testUtils.WaitForSync{},
+			testUtils.Request{
+				// This query errors out if the document's index has not been correctly
+				// constructed
+				Request: `query {
+					Users (filter: {name: {_eq: "John"}}){
+						name
+					}
+				}`,
+				Results: map[string]any{
+					"Users": []map[string]any{
+						{
+							"name": "John",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	testUtils.ExecuteTestCase(t, test)
+}
diff --git a/tests/integration/query/commits/branchables/peer_test.go b/tests/integration/query/commits/branchables/peer_test.go
index 81ff77a240..8ba6bf7294 100644
--- a/tests/integration/query/commits/branchables/peer_test.go
+++ b/tests/integration/query/commits/branchables/peer_test.go
@@ -18,8 +18,6 @@ import (
 	testUtils "github.com/sourcenetwork/defradb/tests/integration"
 )
 
-// TODO: This test documents an unimplemented feature. 
Tracked by: -// https://github.com/sourcenetwork/defradb/issues/3212 func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) { test := testUtils.TestCase{ Actions: []any{ @@ -50,76 +48,153 @@ func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) { }, testUtils.WaitForSync{}, testUtils.Request{ - NodeID: immutable.Some(0), Request: `query { - commits { - cid - links { + commits { cid + links { + cid + } } - } - }`, + }`, Results: map[string]any{ "commits": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection"), + "cid": testUtils.NewUniqueCid(0), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("composite"), + "cid": testUtils.NewUniqueCid(1), }, }, }, { - "cid": testUtils.NewUniqueCid("age"), + "cid": testUtils.NewUniqueCid(2), "links": []map[string]any{}, }, { - "cid": testUtils.NewUniqueCid("name"), + "cid": testUtils.NewUniqueCid(3), "links": []map[string]any{}, }, { - "cid": testUtils.NewUniqueCid("composite"), + "cid": testUtils.NewUniqueCid(1), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("age"), + "cid": testUtils.NewUniqueCid(2), }, { - "cid": testUtils.NewUniqueCid("name"), + "cid": testUtils.NewUniqueCid(3), }, }, }, }, }, }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestQueryCommitsBranchables_SyncsMultipleAcrossPeerConnection(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + age: Int + } + `, + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "John", + "age": 21 + }`, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred", + "age": 25 + }`, + }, + testUtils.WaitForSync{}, testUtils.Request{ - NodeID: immutable.Some(1), Request: `query { - commits { - cid - links { + commits { cid + links { + cid + } } - } - }`, + }`, Results: map[string]any{ "commits": []map[string]any{ - // Note: The collection commit has not synced. 
{ - "cid": testUtils.NewUniqueCid("age"), + "cid": testUtils.NewUniqueCid("collection, doc2 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, doc1 create"), + }, + { + "cid": testUtils.NewUniqueCid("doc2 create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, doc1 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc1 create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc1 name"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc1 age"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc1 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc1 name"), + }, + { + "cid": testUtils.NewUniqueCid("doc1 age"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc2 name"), "links": []map[string]any{}, }, { - "cid": testUtils.NewUniqueCid("name"), + "cid": testUtils.NewUniqueCid("doc2 age"), "links": []map[string]any{}, }, { - "cid": testUtils.NewUniqueCid("composite"), + "cid": testUtils.NewUniqueCid("doc2 create"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("age"), + "cid": testUtils.NewUniqueCid("doc2 name"), }, { - "cid": testUtils.NewUniqueCid("name"), + "cid": testUtils.NewUniqueCid("doc2 age"), }, }, }, diff --git a/tests/integration/query/commits/branchables/peer_update_test.go b/tests/integration/query/commits/branchables/peer_update_test.go new file mode 100644 index 0000000000..319baa064e --- /dev/null +++ b/tests/integration/query/commits/branchables/peer_update_test.go @@ -0,0 +1,303 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package branchables + +import ( + "testing" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John" + }`, + }, + testUtils.UpdateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.UpdateDoc{ + NodeID: immutable.Some(1), + Doc: `{ + "name": "Shahzad" + }`, + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.WaitForSync{}, + testUtils.UpdateDoc{ + NodeID: immutable.Some(1), + Doc: `{ + "name": "Chris" + }`, + }, + testUtils.WaitForSync{}, + // Note: node 1 does not recieve the first update from node 0 as it occured before the nodes were connected + // node 0 has it as it recieved it when recieving the second update from node 1. The cids and blocks remain + // consistent across both nodes (minus the missing commits). 
+ testUtils.Request{ + NodeID: immutable.Some(0), + Request: `query { + commits { + cid + links { + cid + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, update2"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, node1 update1"), + }, + { + "cid": testUtils.NewUniqueCid("doc, update2"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, node1 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, create"), + }, + { + "cid": testUtils.NewUniqueCid("doc, node1 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, node0 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, create"), + }, + { + "cid": testUtils.NewUniqueCid("doc, node0 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, node0 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, create"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("name, update2"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, node1 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, node1 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc, update2"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, node1 update1"), + }, + { + "cid": testUtils.NewUniqueCid("name, update2"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc, node1 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, create"), + }, + { + "cid": testUtils.NewUniqueCid("name, node1 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc, create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc, node0 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, create"), + }, + { + "cid": testUtils.NewUniqueCid("name, node0 update1"), + }, + }, + }, + }, + }, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + commits { + cid + links { + cid + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, update2"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, node1 update1"), + }, + { + "cid": testUtils.NewUniqueCid("doc, update2"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, node1 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, create"), + }, + { + "cid": testUtils.NewUniqueCid("doc, node1 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, update2"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, node1 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, node1 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, 
create"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc, update2"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, node1 update1"), + }, + { + "cid": testUtils.NewUniqueCid("name, update2"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc, node1 update1"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc, create"), + }, + { + "cid": testUtils.NewUniqueCid("name, node1 update1"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc, create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + }, + }, + }, + testUtils.Request{ + Request: `query { + Users { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + { + "name": "Chris", + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} From 01fba9c028e769553875bfb293c6a9ba3d9a8c9f Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Fri, 15 Nov 2024 13:05:04 -0500 Subject: [PATCH 2/8] PR FIXUP - Simplify executeMerge getHeadsAsMergeTarget block --- internal/db/merge.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/db/merge.go b/internal/db/merge.go index 211267e002..c361aa8a2b 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -43,20 +43,19 @@ func (db *db) executeMerge(ctx context.Context, col *collection, dagMerge event. } defer txn.Discard(ctx) - var mt mergeTarget + var key keys.HeadstoreKey if dagMerge.DocID != "" { - mt, err = getHeadsAsMergeTarget(ctx, txn, keys.HeadstoreDocKey{ + key = keys.HeadstoreDocKey{ DocID: dagMerge.DocID, FieldID: core.COMPOSITE_NAMESPACE, - }) - if err != nil { - return err } } else { - mt, err = getHeadsAsMergeTarget(ctx, txn, keys.NewHeadstoreColKey(col.Description().RootID)) - if err != nil { - return err - } + key = keys.NewHeadstoreColKey(col.Description().RootID) + } + + mt, err := getHeadsAsMergeTarget(ctx, txn, key) + if err != nil { + return err } mp, err := db.newMergeProcessor(txn, col) From d1906b237c1593eda4fc19059848be33b7ffd55b Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Fri, 15 Nov 2024 14:52:04 -0500 Subject: [PATCH 3/8] PR FIXUP - Fix waitForSync test action --- tests/integration/acp.go | 7 ++- tests/integration/events.go | 104 +++++++++++++++++++++++------------- tests/integration/state.go | 20 ++++--- tests/integration/utils.go | 6 ++- 4 files changed, 90 insertions(+), 47 deletions(-) diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 78a5a50997..7bf6e6151f 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -215,7 +215,12 @@ func addDocActorRelationshipACP( } if action.ExpectedError == "" && !action.ExpectedExistence { - waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ + docID: {}, + } + + waitForUpdateEvents(s, actionNodeID, expect) } } diff --git a/tests/integration/events.go b/tests/integration/events.go index 0e28f3e3df..4efcced75a 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -73,8 +73,8 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { } // all previous documents should be merged on the subscriber node - for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads { - s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid + for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDAGHeads { + 
s.nodes[cfg.TargetNodeID].p2p.expectedDAGHeads[key] = val.cid } // update node connections and replicators @@ -153,7 +153,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio func waitForUpdateEvents( s *state, nodeID immutable.Option[int], - docIDs map[string]struct{}, + docIDs []map[string]struct{}, ) { for i := 0; i < len(s.nodes); i++ { if nodeID.HasValue() && nodeID.Value() != i { @@ -166,8 +166,15 @@ func waitForUpdateEvents( } expect := make(map[string]struct{}, len(docIDs)) - for k := range docIDs { - expect[k] = struct{}{} + for collectionIndex, collectionDocIDs := range docIDs { + for k := range collectionDocIDs { + expect[k] = struct{}{} + + col := node.collections[collectionIndex] + if col.Description().IsBranchable { + expect[col.SchemaRoot()] = struct{}{} + } + } } for len(expect) > 0 { @@ -183,16 +190,10 @@ func waitForUpdateEvents( require.Fail(s.t, "timeout waiting for update event", "Node %d", i) } - if evt.DocID == "" { - // Todo: This will almost certainly need to change once P2P for collection-level commits - // is enabled. See: https://github.com/sourcenetwork/defradb/issues/3212 - continue - } - // make sure the event is expected - _, ok := expect[evt.DocID] - require.True(s.t, ok, "unexpected document update", "Node %d", i) - delete(expect, evt.DocID) + _, ok := expect[getUpdateEventKey(evt)] + require.True(s.t, ok, "unexpected document update", getUpdateEventKey(evt)) + delete(expect, getUpdateEventKey(evt)) // we only need to update the network state if the nodes // are configured for networking @@ -203,7 +204,7 @@ func waitForUpdateEvents( } } -// waitForMergeEvents waits for all expected document heads to be merged to all nodes. +// waitForMergeEvents waits for all expected heads to be merged to all nodes. // // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. 
@@ -214,11 +215,11 @@ func waitForMergeEvents(s *state, action WaitForSync) { continue // node is closed } - expect := node.p2p.expectedDocHeads + expect := node.p2p.expectedDAGHeads - // remove any docs that are already merged - // up to the expected document head - for key, val := range node.p2p.actualDocHeads { + // remove any heads that are already merged + // up to the expected head + for key, val := range node.p2p.actualDAGHeads { if head, ok := expect[key]; ok && head.String() == val.cid.String() { delete(expect, key) } @@ -230,13 +231,13 @@ func waitForMergeEvents(s *state, action WaitForSync) { require.Fail(s.t, "doc index %d out of range", docIndex) } docID := s.docIDs[0][docIndex].String() - actual, hasActual := node.p2p.actualDocHeads[docID] + actual, hasActual := node.p2p.actualDAGHeads[docID] if !hasActual || !actual.decrypted { expectDecrypted[docID] = struct{}{} } } - // wait for all expected doc heads to be merged + // wait for all expected heads to be merged // // the order of merges does not matter as we only // expect the latest head to eventually be merged @@ -260,11 +261,11 @@ func waitForMergeEvents(s *state, action WaitForSync) { delete(expectDecrypted, evt.Merge.DocID) } - head, ok := expect[evt.Merge.DocID] + head, ok := expect[getMergeEventKey(evt.Merge)] if ok && head.String() == evt.Merge.Cid.String() { - delete(expect, evt.Merge.DocID) + delete(expect, getMergeEventKey(evt.Merge)) } - node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} + node.p2p.actualDAGHeads[getMergeEventKey(evt.Merge)] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} } } } @@ -284,23 +285,23 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { // update the actual document head on the node that updated it // as the node created the document, it is already decrypted - node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} + node.p2p.actualDAGHeads[getUpdateEventKey(evt)] = docHeadState{cid: evt.Cid, decrypted: true} // update the expected document heads of replicator targets for id := range node.p2p.replicators { // replicator target nodes push updates to source nodes - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } // update the expected document heads of connected nodes for id := range node.p2p.connections { // connected nodes share updates of documents they have in common - if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok { - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + if _, ok := s.nodes[id].p2p.actualDAGHeads[getUpdateEventKey(evt)]; ok { + s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } // peer collection subscribers receive updates from any other subscriber node if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok { - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } } @@ -312,21 +313,24 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { // getEventsForUpdateDoc returns a map of docIDs that should be // published to the local event bus after an UpdateDoc action. 
-func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} {
+func getEventsForUpdateDoc(s *state, action UpdateDoc) []map[string]struct{} {
 	docID := s.docIDs[action.CollectionID][action.DocID]
 
 	docMap := make(map[string]any)
 	err := json.Unmarshal([]byte(action.Doc), &docMap)
 	require.NoError(s.t, err)
 
-	return map[string]struct{}{
+	expect := make([]map[string]struct{}, action.CollectionID+1)
+	expect[action.CollectionID] = map[string]struct{}{
 		docID.String(): {},
 	}
+
+	return expect
 }
 
 // getEventsForCreateDoc returns a map of docIDs that should be
 // published to the local event bus after a CreateDoc action.
-func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
+func getEventsForCreateDoc(s *state, action CreateDoc) []map[string]struct{} {
 	var collection client.Collection
 	if action.NodeID.HasValue() {
 		collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID]
@@ -337,10 +341,11 @@ func getEventsForCreateDoc(s *state, action CreateDoc) []map[string]struct{} {
 	docs, err := parseCreateDocs(action, collection)
 	require.NoError(s.t, err)
 
-	expect := make(map[string]struct{})
+	expect := make([]map[string]struct{}, action.CollectionID+1)
+	expect[action.CollectionID] = map[string]struct{}{}
 
 	for _, doc := range docs {
-		expect[doc.ID().String()] = struct{}{}
+		expect[action.CollectionID][doc.ID().String()] = struct{}{}
 	}
 
 	return expect
@@ -356,16 +361,41 @@ func getEventsForUpdateWithFilter(
 	s *state,
 	action UpdateWithFilter,
 	result *client.UpdateResult,
-) map[string]struct{} {
+) []map[string]struct{} {
 	var docPatch map[string]any
 	err := json.Unmarshal([]byte(action.Updater), &docPatch)
 	require.NoError(s.t, err)
 
-	expect := make(map[string]struct{})
+	expect := make([]map[string]struct{}, action.CollectionID+1)
+	expect[action.CollectionID] = map[string]struct{}{}
 
 	for _, docID := range result.DocIDs {
-		expect[docID] = struct{}{}
+		expect[action.CollectionID][docID] = struct{}{}
 	}
 
 	return expect
 }
+
+// getUpdateEventKey gets the identifier that this event is scoped to.
+//
+// For example, if this is scoped to a document, the document ID will be
+// returned. If it is scoped to a schema, the schema root will be returned.
+func getUpdateEventKey(evt event.Update) string {
+	if evt.DocID == "" {
+		return evt.SchemaRoot
+	}
+
+	return evt.DocID
+}
+
+// getMergeEventKey gets the identifier that this event is scoped to.
+//
+// For example, if this is scoped to a document, the document ID will be
+// returned. If it is scoped to a schema, the schema root will be returned.
+func getMergeEventKey(evt event.Merge) string {
+	if evt.DocID == "" {
+		return evt.SchemaRoot
+	}
+
+	return evt.DocID
+}
diff --git a/tests/integration/state.go b/tests/integration/state.go
index c163a2d9d3..c495f80d9e 100644
--- a/tests/integration/state.go
+++ b/tests/integration/state.go
@@ -42,15 +42,21 @@ type p2pState struct {
 	// The map key is the node id of the subscriber.
 	peerCollections map[int]struct{}
 
-	// actualDocHeads contains all document heads that exist on a node.
+	// actualDAGHeads contains all DAG heads that exist on a node.
 	//
 	// The map key is the doc id. The map value is the doc head.
-	actualDocHeads map[string]docHeadState
+	//
+	// This tracks composite commits for documents, and collection commits for
+	// branchable collections
+	actualDAGHeads map[string]docHeadState
 
-	// expectedDocHeads contains all document heads that are expected to exist on a node.
+	// expectedDAGHeads contains all DAG heads that are expected to exist on a node. 
// - // The map key is the doc id. The map value is the doc head. - expectedDocHeads map[string]cid.Cid + // The map key is the doc id. The map value is the DAG head. + // + // This tracks composite commits for documents, and collection commits for + // branchable collections + expectedDAGHeads map[string]cid.Cid } // docHeadState contains the state of a document head. @@ -68,8 +74,8 @@ func newP2PState() *p2pState { connections: make(map[int]struct{}), replicators: make(map[int]struct{}), peerCollections: make(map[int]struct{}), - actualDocHeads: make(map[string]docHeadState), - expectedDocHeads: make(map[string]cid.Cid), + actualDAGHeads: make(map[string]docHeadState), + expectedDAGHeads: make(map[string]cid.Cid), } } diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 39c9ea9624..66f4f48fe5 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -1404,10 +1404,12 @@ func deleteDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" { - docIDs := map[string]struct{}{ + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ docID.String(): {}, } - waitForUpdateEvents(s, action.NodeID, docIDs) + + waitForUpdateEvents(s, action.NodeID, expect) } } From 461ca1744bfe84aac16f9c1e67090639aed06e2d Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Fri, 15 Nov 2024 15:17:06 -0500 Subject: [PATCH 4/8] PR FIXUP - Fix capitalisation --- internal/db/messages.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/db/messages.go b/internal/db/messages.go index 2dcc175cfe..e980eb7d84 100644 --- a/internal/db/messages.go +++ b/internal/db/messages.go @@ -22,7 +22,7 @@ import ( ) func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { - docIdQueue := newMergeQueue() + docIDQueue := newMergeQueue() schemaRootQueue := newMergeQueue() // This is used to ensure we only trigger loadAndPublishP2PCollections and loadAndPublishReplicators @@ -57,8 +57,8 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { defer schemaRootQueue.done(evt.SchemaRoot) } else { // ensure only one merge per docID - docIdQueue.add(evt.DocID) - defer docIdQueue.done(evt.DocID) + docIDQueue.add(evt.DocID) + defer docIDQueue.done(evt.DocID) } // retry the merge process if a conflict occurs From 46435f8be8dfa3be1458b89faec21446d1c6a7cc Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Fri, 15 Nov 2024 16:38:06 -0500 Subject: [PATCH 5/8] PR FIXUP - Simplify waitForUpdateEvents --- tests/integration/acp.go | 5 ++--- tests/integration/events.go | 38 ++++++++++++++++--------------------- tests/integration/utils.go | 11 +++++------ 3 files changed, 23 insertions(+), 31 deletions(-) diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 7bf6e6151f..ce50637d4b 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -215,12 +215,11 @@ func addDocActorRelationshipACP( } if action.ExpectedError == "" && !action.ExpectedExistence { - expect := make([]map[string]struct{}, action.CollectionID+1) - expect[action.CollectionID] = map[string]struct{}{ + expect := map[string]struct{}{ docID: {}, } - waitForUpdateEvents(s, actionNodeID, expect) + waitForUpdateEvents(s, actionNodeID, action.CollectionID, expect) } } diff --git a/tests/integration/events.go b/tests/integration/events.go index 4efcced75a..12fc58f8b7 100644 --- a/tests/integration/events.go +++ 
b/tests/integration/events.go
@@ -153,7 +153,8 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio
 func waitForUpdateEvents(
 	s *state,
 	nodeID immutable.Option[int],
-	docIDs []map[string]struct{},
+	collectionIndex int,
+	docIDs map[string]struct{},
 ) {
 	for i := 0; i < len(s.nodes); i++ {
 		if nodeID.HasValue() && nodeID.Value() != i {
@@ -166,15 +167,13 @@ func waitForUpdateEvents(
 		}
 
 		expect := make(map[string]struct{}, len(docIDs))
-		for collectionIndex, collectionDocIDs := range docIDs {
-			for k := range collectionDocIDs {
-				expect[k] = struct{}{}
 
-				col := node.collections[collectionIndex]
-				if col.Description().IsBranchable {
-					expect[col.SchemaRoot()] = struct{}{}
-				}
-			}
+		col := node.collections[collectionIndex]
+		if col.Description().IsBranchable {
+			expect[col.SchemaRoot()] = struct{}{}
+		}
+		for k := range docIDs {
+			expect[k] = struct{}{}
 		}
 
 		for len(expect) > 0 {
@@ -313,24 +312,21 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) {
 
 // getEventsForUpdateDoc returns a map of docIDs that should be
 // published to the local event bus after an UpdateDoc action.
-func getEventsForUpdateDoc(s *state, action UpdateDoc) []map[string]struct{} {
+func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} {
 	docID := s.docIDs[action.CollectionID][action.DocID]
 
 	docMap := make(map[string]any)
 	err := json.Unmarshal([]byte(action.Doc), &docMap)
 	require.NoError(s.t, err)
 
-	expect := make([]map[string]struct{}, action.CollectionID+1)
-	expect[action.CollectionID] = map[string]struct{}{
+	return map[string]struct{}{
 		docID.String(): {},
 	}
-
-	return expect
 }
 
 // getEventsForCreateDoc returns a map of docIDs that should be
 // published to the local event bus after a CreateDoc action.
-func getEventsForCreateDoc(s *state, action CreateDoc) []map[string]struct{} {
+func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
 	var collection client.Collection
 	if action.NodeID.HasValue() {
 		collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID]
@@ -341,11 +337,10 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
 	docs, err := parseCreateDocs(action, collection)
 	require.NoError(s.t, err)
 
-	expect := make([]map[string]struct{}, action.CollectionID+1)
-	expect[action.CollectionID] = map[string]struct{}{}
+	expect := make(map[string]struct{}, len(docs))
 
 	for _, doc := range docs {
-		expect[action.CollectionID][doc.ID().String()] = struct{}{}
+		expect[doc.ID().String()] = struct{}{}
 	}
 
 	return expect
@@ -361,16 +356,15 @@ func getEventsForUpdateWithFilter(
 	s *state,
 	action UpdateWithFilter,
 	result *client.UpdateResult,
-) []map[string]struct{} {
+) map[string]struct{} {
 	var docPatch map[string]any
 	err := json.Unmarshal([]byte(action.Updater), &docPatch)
 	require.NoError(s.t, err)
 
-	expect := make([]map[string]struct{}, action.CollectionID+1)
-	expect[action.CollectionID] = map[string]struct{}{}
+	expect := make(map[string]struct{}, len(result.DocIDs))
 
 	for _, docID := range result.DocIDs {
-		expect[action.CollectionID][docID] = struct{}{}
+		expect[docID] = struct{}{}
 	}
 
 	return expect
diff --git a/tests/integration/utils.go b/tests/integration/utils.go
index 66f4f48fe5..3c0e9baffd 100644
--- a/tests/integration/utils.go
+++ b/tests/integration/utils.go
@@ -1226,7 +1226,7 @@ func createDoc(
 	s.docIDs[action.CollectionID] = append(s.docIDs[action.CollectionID], docIDs...) 
if action.ExpectedError == "" { - waitForUpdateEvents(s, action.NodeID, getEventsForCreateDoc(s, action)) + waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForCreateDoc(s, action)) } } @@ -1404,12 +1404,11 @@ func deleteDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" { - expect := make([]map[string]struct{}, action.CollectionID+1) - expect[action.CollectionID] = map[string]struct{}{ + expect := map[string]struct{}{ docID.String(): {}, } - waitForUpdateEvents(s, action.NodeID, expect) + waitForUpdateEvents(s, action.NodeID, action.CollectionID, expect) } } @@ -1454,7 +1453,7 @@ func updateDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { - waitForUpdateEvents(s, action.NodeID, getEventsForUpdateDoc(s, action)) + waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForUpdateDoc(s, action)) } } @@ -1554,7 +1553,7 @@ func updateWithFilter(s *state, action UpdateWithFilter) { assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { - waitForUpdateEvents(s, action.NodeID, getEventsForUpdateWithFilter(s, action, res)) + waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForUpdateWithFilter(s, action, res)) } } From c4ab4236e752384d57d63850886359d0699b80ad Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 18 Nov 2024 14:01:27 -0500 Subject: [PATCH 6/8] PR FIXUP - Expand concurrent test to correctly establish SEC With Fred's help :) --- .../commits/branchables/peer_update_test.go | 139 ++++++------------ 1 file changed, 48 insertions(+), 91 deletions(-) diff --git a/tests/integration/query/commits/branchables/peer_update_test.go b/tests/integration/query/commits/branchables/peer_update_test.go index 319baa064e..4824c1b981 100644 --- a/tests/integration/query/commits/branchables/peer_update_test.go +++ b/tests/integration/query/commits/branchables/peer_update_test.go @@ -53,17 +53,30 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, testUtils.WaitForSync{}, testUtils.UpdateDoc{ + // Update node 1 after the peer connection has been established, this will cause the `Shahzad` commit + // to be synced to node 0, as well as the related collection commits. NodeID: immutable.Some(1), Doc: `{ "name": "Chris" }`, }, testUtils.WaitForSync{}, - // Note: node 1 does not recieve the first update from node 0 as it occured before the nodes were connected - // node 0 has it as it recieved it when recieving the second update from node 1. The cids and blocks remain - // consistent across both nodes (minus the missing commits). - testUtils.Request{ + testUtils.UpdateDoc{ + // Update node 0 after `Chris` and `Shahzad` have synced to node 0. As this update happens after the peer + // connection has been established, this will cause the `Fred` and `Addo` doc commits, and their corresponding + // collection-level commits to sync to node 1. + // + // Now, all nodes should have a full history, including the 'offline' changes made before establishing the + // peer connection. 
NodeID: immutable.Some(0), + Doc: `{ + "name": "Addo" + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + // Strong eventual consistency must now have been established across both nodes, the result of this query + // *must* exactly match across both nodes. Request: `query { commits { cid @@ -75,13 +88,16 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t Results: map[string]any{ "commits": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection, update2"), + "cid": testUtils.NewUniqueCid("collection, update3"), "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, update2"), + }, { "cid": testUtils.NewUniqueCid("collection, node1 update1"), }, { - "cid": testUtils.NewUniqueCid("doc, update2"), + "cid": testUtils.NewUniqueCid("doc, update3"), }, }, }, @@ -105,49 +121,32 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("collection, node0 update1"), + "cid": testUtils.NewUniqueCid("collection, update2"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection, create"), + "cid": testUtils.NewUniqueCid("collection, node0 update1"), }, { - "cid": testUtils.NewUniqueCid("doc, node0 update1"), - }, - }, - }, - { - "cid": testUtils.NewUniqueCid("name, node0 update1"), - "links": []map[string]any{ - { - "cid": testUtils.NewUniqueCid("name, create"), + "cid": testUtils.NewUniqueCid("doc, update2"), }, }, }, { - "cid": testUtils.NewUniqueCid("name, create"), - "links": []map[string]any{}, - }, - { - "cid": testUtils.NewUniqueCid("name, update2"), + "cid": testUtils.NewUniqueCid("collection, node0 update1"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("name, node1 update1"), + "cid": testUtils.NewUniqueCid("collection, create"), }, - }, - }, - { - "cid": testUtils.NewUniqueCid("name, node1 update1"), - "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("name, create"), + "cid": testUtils.NewUniqueCid("doc, node0 update1"), }, }, }, { - "cid": testUtils.NewUniqueCid("doc, update2"), + "cid": testUtils.NewUniqueCid("name, update3"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("doc, node1 update1"), + "cid": testUtils.NewUniqueCid("name, node1 update1"), }, { "cid": testUtils.NewUniqueCid("name, update2"), @@ -155,18 +154,15 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("doc, node1 update1"), + "cid": testUtils.NewUniqueCid("name, update2"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("doc, create"), - }, - { - "cid": testUtils.NewUniqueCid("name, node1 update1"), + "cid": testUtils.NewUniqueCid("name, node0 update1"), }, }, }, { - "cid": testUtils.NewUniqueCid("doc, create"), + "cid": testUtils.NewUniqueCid("name, node0 update1"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("name, create"), @@ -174,86 +170,55 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("doc, node0 update1"), - "links": []map[string]any{ - { - "cid": testUtils.NewUniqueCid("doc, create"), - }, - { - "cid": testUtils.NewUniqueCid("name, node0 update1"), - }, - }, + "cid": testUtils.NewUniqueCid("name, create"), + "links": []map[string]any{}, }, - }, - }, - }, - testUtils.Request{ - NodeID: immutable.Some(1), - Request: `query { - commits { - cid - links { - cid - } - } - }`, - Results: map[string]any{ - "commits": []map[string]any{ { - "cid": 
testUtils.NewUniqueCid("collection, update2"), + "cid": testUtils.NewUniqueCid("name, node1 update1"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection, node1 update1"), - }, - { - "cid": testUtils.NewUniqueCid("doc, update2"), + "cid": testUtils.NewUniqueCid("name, create"), }, }, }, { - "cid": testUtils.NewUniqueCid("collection, node1 update1"), + "cid": testUtils.NewUniqueCid("doc, update3"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection, create"), + "cid": testUtils.NewUniqueCid("doc, update2"), }, { "cid": testUtils.NewUniqueCid("doc, node1 update1"), }, + { + "cid": testUtils.NewUniqueCid("name, update3"), + }, }, }, { - "cid": testUtils.NewUniqueCid("collection, create"), + "cid": testUtils.NewUniqueCid("doc, node1 update1"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("doc, create"), }, - }, - }, - { - "cid": testUtils.NewUniqueCid("name, update2"), - "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("name, node1 update1"), }, }, }, { - "cid": testUtils.NewUniqueCid("name, node1 update1"), + "cid": testUtils.NewUniqueCid("doc, create"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("name, create"), }, }, }, - { - "cid": testUtils.NewUniqueCid("name, create"), - "links": []map[string]any{}, - }, { "cid": testUtils.NewUniqueCid("doc, update2"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("doc, node1 update1"), + "cid": testUtils.NewUniqueCid("doc, node0 update1"), }, { "cid": testUtils.NewUniqueCid("name, update2"), @@ -261,21 +226,13 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("doc, node1 update1"), + "cid": testUtils.NewUniqueCid("doc, node0 update1"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("doc, create"), }, { - "cid": testUtils.NewUniqueCid("name, node1 update1"), - }, - }, - }, - { - "cid": testUtils.NewUniqueCid("doc, create"), - "links": []map[string]any{ - { - "cid": testUtils.NewUniqueCid("name, create"), + "cid": testUtils.NewUniqueCid("name, node0 update1"), }, }, }, @@ -291,7 +248,7 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t Results: map[string]any{ "Users": []map[string]any{ { - "name": "Chris", + "name": "Addo", }, }, }, From 8b7e88b55c67a90b4337711c383122b5dd446061 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 18 Nov 2024 14:11:28 -0500 Subject: [PATCH 7/8] PR FIXUP - Use strings for cid identifiers --- .../query/commits/branchables/peer_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/query/commits/branchables/peer_test.go b/tests/integration/query/commits/branchables/peer_test.go index 8ba6bf7294..6d864ad9d1 100644 --- a/tests/integration/query/commits/branchables/peer_test.go +++ b/tests/integration/query/commits/branchables/peer_test.go @@ -59,29 +59,29 @@ func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) { Results: map[string]any{ "commits": []map[string]any{ { - "cid": testUtils.NewUniqueCid(0), + "cid": testUtils.NewUniqueCid("collection"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid(1), + "cid": testUtils.NewUniqueCid("composite"), }, }, }, { - "cid": testUtils.NewUniqueCid(2), + "cid": testUtils.NewUniqueCid("age"), "links": []map[string]any{}, }, { - "cid": testUtils.NewUniqueCid(3), + "cid": testUtils.NewUniqueCid("name"), "links": []map[string]any{}, }, { - "cid": testUtils.NewUniqueCid(1), + "cid": 
testUtils.NewUniqueCid("composite"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid(2), + "cid": testUtils.NewUniqueCid("age"), }, { - "cid": testUtils.NewUniqueCid(3), + "cid": testUtils.NewUniqueCid("name"), }, }, }, From 8a9f166450909cb9eb7e915dd5af8745f949ed53 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 18 Nov 2024 15:37:31 -0500 Subject: [PATCH 8/8] PR FIXUP - Note node in update cid descriptions --- .../commits/branchables/peer_update_test.go | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/integration/query/commits/branchables/peer_update_test.go b/tests/integration/query/commits/branchables/peer_update_test.go index 4824c1b981..01789a3bc4 100644 --- a/tests/integration/query/commits/branchables/peer_update_test.go +++ b/tests/integration/query/commits/branchables/peer_update_test.go @@ -88,16 +88,16 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t Results: map[string]any{ "commits": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection, update3"), + "cid": testUtils.NewUniqueCid("collection, node0 update3"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("collection, update2"), + "cid": testUtils.NewUniqueCid("collection, node1 update2"), }, { "cid": testUtils.NewUniqueCid("collection, node1 update1"), }, { - "cid": testUtils.NewUniqueCid("doc, update3"), + "cid": testUtils.NewUniqueCid("doc, node0 update3"), }, }, }, @@ -121,13 +121,13 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("collection, update2"), + "cid": testUtils.NewUniqueCid("collection, node1 update2"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("collection, node0 update1"), }, { - "cid": testUtils.NewUniqueCid("doc, update2"), + "cid": testUtils.NewUniqueCid("doc, node1 update2"), }, }, }, @@ -143,18 +143,18 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("name, update3"), + "cid": testUtils.NewUniqueCid("name, node0 update3"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("name, node1 update1"), }, { - "cid": testUtils.NewUniqueCid("name, update2"), + "cid": testUtils.NewUniqueCid("name, node1 update2"), }, }, }, { - "cid": testUtils.NewUniqueCid("name, update2"), + "cid": testUtils.NewUniqueCid("name, node1 update2"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("name, node0 update1"), @@ -182,16 +182,16 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("doc, update3"), + "cid": testUtils.NewUniqueCid("doc, node0 update3"), "links": []map[string]any{ { - "cid": testUtils.NewUniqueCid("doc, update2"), + "cid": testUtils.NewUniqueCid("doc, node1 update2"), }, { "cid": testUtils.NewUniqueCid("doc, node1 update1"), }, { - "cid": testUtils.NewUniqueCid("name, update3"), + "cid": testUtils.NewUniqueCid("name, node0 update3"), }, }, }, @@ -215,13 +215,13 @@ func TestQueryCommitsBranchables_HandlesConcurrentUpdatesAcrossPeerConnection(t }, }, { - "cid": testUtils.NewUniqueCid("doc, update2"), + "cid": testUtils.NewUniqueCid("doc, node1 update2"), "links": []map[string]any{ { "cid": testUtils.NewUniqueCid("doc, node0 update1"), }, { - "cid": testUtils.NewUniqueCid("name, update2"), + "cid": testUtils.NewUniqueCid("name, node1 update2"), }, }, },