diff --git a/go.mod b/go.mod index 3bdbf60438..de665e784f 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-ipld-prime v0.21.0 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 + github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 github.com/jbenet/goprocess v0.1.4 github.com/lens-vm/lens/host-go v0.0.0-20231127204031-8d858ed2926c github.com/lestrrat-go/jwx/v2 v2.0.21 @@ -185,12 +186,10 @@ require ( github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect - github.com/ipfs/go-ipld-legacy v0.2.1 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/kubo v0.25.0 // indirect - github.com/ipld/go-codec-dagpb v1.6.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect diff --git a/go.sum b/go.sum index 9842b09146..eade42f4c3 100644 --- a/go.sum +++ b/go.sum @@ -636,6 +636,8 @@ github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 h1:WQVfplCGOHtFNyZH7eOaEqGsbbje3NP8EFeGggUvEQs= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:PVDd/V/Zz9IW+Diz9LEhD+ZYS9pKzawmtVQhVd0hcgQ= +github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 h1:adq3fTx2YXmpTPNvBRIM0Zi5lX4JjQTRjdLYKhXMkQg= +github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= diff --git a/net/server.go b/net/server.go index 868a2eb6a3..2ff62b10f8 100644 --- a/net/server.go +++ b/net/server.go @@ -18,9 +18,17 @@ import ( "sync" "time" - "github.com/ipfs/boxo/ipld/merkledag" cid "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/schema" + "github.com/ipld/go-ipld-prime/storage/bsrvadapter" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p/core/event" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" @@ -31,9 +39,9 @@ import ( "google.golang.org/protobuf/proto" "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/datastore/badger/v4" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" pb "github.com/sourcenetwork/defradb/net/pb" ) @@ -184,7 +192,11 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL } }() - err = s.syncDAG(ctx, headCID) + block, err := coreblock.GetFromBytes(req.Body.Log.Block) + if err != nil { + return nil, err + } + err = s.syncDAG(ctx, block) if err != nil { return nil, err } @@ -361,30 +373,45 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) { // // This process will walk the entire DAG until the issue below is resolved. // https://github.com/sourcenetwork/defradb/issues/2722 -func (s *server) syncDAG(ctx context.Context, c cid.Cid) error { +func (s *server) syncDAG(ctx context.Context, block *coreblock.Block) error { ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout) defer cancel() - dserv := merkledag.NewDAGService(s.peer.bserv) - var ng format.NodeGetter = merkledag.NewSession(ctx, dserv) + // Store the block in the DAG store + storage := &bsrvadapter.Adapter{Wrapped: s.peer.bserv} + lsys := cidlink.DefaultLinkSystem() + lsys.SetWriteStorage(storage) + lsys.SetReadStorage(storage) + _, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode()) + if err != nil { + return err + } - set := make(map[cid.Cid]struct{}) - // visit each node only once - visit := func(c cid.Cid) bool { - _, ok := set[c] - set[c] = struct{}{} - return !ok + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + matchAllSelector, err := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion( + ssb.Matcher(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + )).Selector() + if err != nil { + return err } - // ignore transaction conflict errors - onError := func(c cid.Cid, err error) error { - if errors.Is(err, badger.ErrTxnConflict) { - return nil + + prototypeChooser := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { + if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { + return tlnkNd.LinkTargetNodePrototype(), nil } - return err + return basicnode.Prototype.Any, nil } - opts := []merkledag.WalkOption{merkledag.Concurrent(), merkledag.OnError(onError)} - return merkledag.Walk(ctx, merkledag.GetLinksDirect(ng), c, visit, opts...) + config := traversal.Config{ + Ctx: ctx, + LinkSystem: lsys, + LinkVisitOnlyOnce: true, + LinkTargetNodePrototypeChooser: prototypeChooser, + } + return traversal.Progress{Cfg: &config}.WalkMatching(block.GenerateNode(), matchAllSelector, func(p traversal.Progress, n datamodel.Node) error { + return nil + }) } // addr implements net.Addr and holds a libp2p peer ID.