Skip to content

Commit

Permalink
use ipld prime for dag sync
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 14, 2024
1 parent 5ee9616 commit 3f0a9bf
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/jbenet/goprocess v0.1.4
github.com/lens-vm/lens/host-go v0.0.0-20231127204031-8d858ed2926c
github.com/lestrrat-go/jwx/v2 v2.0.21
Expand Down Expand Up @@ -185,12 +186,10 @@ require (
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/kubo v0.25.0 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 h1:WQVfplCGOHtFNyZH7eOaEqGsbbje3NP8EFeGggUvEQs=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:PVDd/V/Zz9IW+Diz9LEhD+ZYS9pKzawmtVQhVd0hcgQ=
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 h1:adq3fTx2YXmpTPNvBRIM0Zi5lX4JjQTRjdLYKhXMkQg=
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
Expand Down
67 changes: 47 additions & 20 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@ import (
"sync"
"time"

"github.com/ipfs/boxo/ipld/merkledag"
cid "github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/schema"
"github.com/ipld/go-ipld-prime/storage/bsrvadapter"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p/core/event"
libpeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcenetwork/corelog"
Expand All @@ -31,9 +39,9 @@ import (
"google.golang.org/protobuf/proto"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/events"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
pb "github.com/sourcenetwork/defradb/net/pb"
)

Expand Down Expand Up @@ -184,7 +192,11 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
}
}()

err = s.syncDAG(ctx, headCID)
block, err := coreblock.GetFromBytes(req.Body.Log.Block)
if err != nil {
return nil, err

Check warning on line 197 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L197

Added line #L197 was not covered by tests
}
err = s.syncDAG(ctx, block)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -361,30 +373,45 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
//
// This process will walk the entire DAG until the issue below is resolved.
// https://github.com/sourcenetwork/defradb/issues/2722
func (s *server) syncDAG(ctx context.Context, c cid.Cid) error {
func (s *server) syncDAG(ctx context.Context, block *coreblock.Block) error {
ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout)
defer cancel()

dserv := merkledag.NewDAGService(s.peer.bserv)
var ng format.NodeGetter = merkledag.NewSession(ctx, dserv)
// Store the block in the DAG store
storage := &bsrvadapter.Adapter{Wrapped: s.peer.bserv}
lsys := cidlink.DefaultLinkSystem()
lsys.SetWriteStorage(storage)
lsys.SetReadStorage(storage)
_, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode())
if err != nil {
return err

Check warning on line 387 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L387

Added line #L387 was not covered by tests
}

set := make(map[cid.Cid]struct{})
// visit each node only once
visit := func(c cid.Cid) bool {
_, ok := set[c]
set[c] = struct{}{}
return !ok
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
matchAllSelector, err := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
)).Selector()
if err != nil {
return err

Check warning on line 396 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L396

Added line #L396 was not covered by tests
}
// ignore transaction conflict errors
onError := func(c cid.Cid, err error) error {
if errors.Is(err, badger.ErrTxnConflict) {
return nil

prototypeChooser := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil

Check warning on line 401 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L401

Added line #L401 was not covered by tests
}
return err
return basicnode.Prototype.Any, nil
}

opts := []merkledag.WalkOption{merkledag.Concurrent(), merkledag.OnError(onError)}
return merkledag.Walk(ctx, merkledag.GetLinksDirect(ng), c, visit, opts...)
config := traversal.Config{
Ctx: ctx,
LinkSystem: lsys,
LinkVisitOnlyOnce: true,
LinkTargetNodePrototypeChooser: prototypeChooser,
}
return traversal.Progress{Cfg: &config}.WalkMatching(block.GenerateNode(), matchAllSelector, func(p traversal.Progress, n datamodel.Node) error {

Check failure on line 412 in net/server.go

View workflow job for this annotation

GitHub Actions / Lint GoLang job

line is 146 characters (lll)
return nil
})
}

// addr implements net.Addr and holds a libp2p peer ID.
Expand Down

0 comments on commit 3f0a9bf

Please sign in to comment.