Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap client #856

Merged
merged 25 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
85b7358
booster bitswap MVP executable (#707)
hannahhoward Sep 12, 2022
98c2e19
booster-bitswap devnet and tracing (#796)
nonsense Sep 13, 2022
14ed4a5
return ipld ErrNotFound from remote blockstore interface (#798)
dirkmc Sep 14, 2022
87dc643
test: comment out part of TestDummydealOnline that is flaky due to a …
dirkmc Sep 14, 2022
5ddd9de
fix normaliseError nil ptr dereference (#803)
nonsense Sep 14, 2022
e0e1d73
merge main into release/lotus1.17.2
nonsense Sep 16, 2022
58163ca
feat: shard selector (#807)
dirkmc Sep 21, 2022
a58ea5a
LoadBalancer for bitswap (and later, more of libp2p) (#786)
hannahhoward Sep 22, 2022
c9d91e6
Add block filter via BadBits (#825)
hannahhoward Sep 26, 2022
0e75016
Libp2p 0.22 upgrade (#837)
hannahhoward Sep 27, 2022
957e65d
Protocol Proxy cleanup (#836)
hannahhoward Sep 27, 2022
e2c6012
feat: update to dagstore v0.5.5 (#849)
dirkmc Sep 30, 2022
e3557d7
refactor: merge main into release v1.17.2
dirkmc Oct 3, 2022
5b425ea
feat: bitswap client
dirkmc Oct 3, 2022
e9da747
feat: bitswap client - output car file
dirkmc Oct 3, 2022
0bd7736
refactor: bitswap client - remove tracing
dirkmc Oct 3, 2022
1cfa691
feat: debug logs
dirkmc Oct 3, 2022
d3b5593
fix: write blocks to blockstore
dirkmc Oct 3, 2022
f2f9729
fix: duration output
dirkmc Oct 3, 2022
55c278d
fix: duration output for block received
dirkmc Oct 3, 2022
c1f57f3
feat: add pprof to bitswap client
dirkmc Oct 3, 2022
c4c3f1d
feat: protocol proxy logging
dirkmc Oct 3, 2022
bf618c6
feat: bitswap client - check host supports bitswap protocol
dirkmc Oct 4, 2022
f1a1ccd
feat: listen for bitswap requests locally as well as through forwardi…
dirkmc Oct 4, 2022
f4246da
refactor: merge main
dirkmc Oct 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions cmd/booster-bitswap/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package bitswap

import (
"github.com/ipfs/go-bitswap/network"
"github.com/libp2p/go-libp2p/core/protocol"
)

var Protocols = []protocol.ID{
network.ProtocolBitswap,
network.ProtocolBitswapNoVers,
network.ProtocolBitswapOneOne,
network.ProtocolBitswapOneZero,
}

var ProtocolStrings = []string{}

func init() {
for _, p := range Protocols {
ProtocolStrings = append(ProtocolStrings, string(p))
}
}
220 changes: 220 additions & 0 deletions cmd/booster-bitswap/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package main

import (
"context"
"crypto/rand"
"fmt"
"net/http"
_ "net/http/pprof"
"sort"
"sync/atomic"
"time"

"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-bitswap/client"
bsnetwork "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipld/go-car/v2/blockstore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)

var fetchCmd = &cli.Command{
Name: "fetch",
Usage: "fetch <multiaddr> <root cid> <output car path>",
Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr",
Before: before,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "pprof",
Usage: "run pprof web server on localhost:6071",
},
&cli.IntFlag{
Name: "concurrency",
Usage: "concurrent request limit - 0 means unlimited",
Value: 10,
},
},
Action: func(cctx *cli.Context) error {
if cctx.Bool("pprof") {
go func() {
err := http.ListenAndServe("localhost:6071", nil)
if err != nil {
log.Error(err)
}
}()
}

if cctx.Args().Len() != 3 {
return fmt.Errorf("usage: fetch <multiaddr> <root cid> <output car path>")
}

addrInfoStr := cctx.Args().Get(0)
serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr)
if err != nil {
return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err)
}

rootCidStr := cctx.Args().Get(1)
rootCid, err := cid.Parse(rootCidStr)
if err != nil {
return fmt.Errorf("parsing cid %s: %w", rootCidStr, err)
}

outputCarPath := cctx.Args().Get(2)

ctx := lcli.ReqContext(cctx)

// setup libp2p host
log.Infow("generating libp2p key")
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you went with ECDSA over ED25519, I tend to see the latter as the default in more places.

if err != nil {
return err
}

host, err := libp2p.New(
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
Comment on lines +88 to +89
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these the only transports you're supporting because they're the only ones Boost supports?

libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
libp2p.Identity(privKey),
libp2p.ResourceManager(network.NullResourceManager),
)
if err != nil {
return err
}

// Create a bitswap client
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
if err != nil {
return err
}
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
bs, err := blockstore.OpenReadWrite(outputCarPath, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true))
if err != nil {
return fmt.Errorf("creating blockstore at %s: %w", outputCarPath, err)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel}
bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn))
defer bsClient.Close()
net.Start(bsClient)

// Connect to host
connectStart := time.Now()
log.Infow("connecting to server", "server", serverAddrInfo.String())
err = host.Connect(ctx, *serverAddrInfo)
if err != nil {
return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err)
Comment on lines +120 to +122
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely cause you problems with large files since if the connection gets pruned on the server side it'll never get re-established and your download will stall (e.g. like ipfs/ipget#103).

It's resolvable by trying to keep the connection alive.

}
log.Debugw("connected to server", "duration", time.Since(connectStart).String())

// Check host's libp2p protocols
protos, err := host.Peerstore().GetProtocols(serverAddrInfo.ID)
if err != nil {
return fmt.Errorf("getting protocols from peer store for %s: %w", serverAddrInfo.ID, err)
}
sort.Slice(protos, func(i, j int) bool {
return protos[i] < protos[j]
})
log.Debugw("host libp2p protocols", "protocols", protos)
p, err := host.Peerstore().FirstSupportedProtocol(serverAddrInfo.ID, bitswap.ProtocolStrings...)
if err != nil {
return fmt.Errorf("getting first supported protocol from peer store for %s: %w", serverAddrInfo.ID, err)
}
if p == "" {
return fmt.Errorf("host %s does not support any know bitswap protocols: %s", serverAddrInfo.ID, bitswap.ProtocolStrings)
}

var throttle chan struct{}
concurrency := cctx.Int("concurrency")
if concurrency > 0 {
throttle = make(chan struct{}, concurrency)
}

// Fetch all blocks under the root cid
log.Infow("fetch", "cid", rootCid, "concurrency", concurrency)
start := time.Now()
count, size, err := getBlocks(ctx, bsClient, rootCid, throttle)
if err != nil {
return fmt.Errorf("getting blocks: %w", err)
}

log.Infow("fetch complete", "count", count, "size", size, "duration", time.Since(start).String())
log.Debug("finalizing")
finalizeStart := time.Now()
defer func() { log.Infow("finalize complete", "duration", time.Since(finalizeStart).String()) }()
return bs.Finalize()
},
}

func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) {
if throttle != nil {
throttle <- struct{}{}
}
// Get the block
start := time.Now()
blk, err := bsClient.GetBlock(ctx, c)
if throttle != nil {
<-throttle
}
if err != nil {
return 0, 0, err
}

var size = uint64(len(blk.RawData()))
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())

// Read the links from the block to child nodes in the DAG
var count = uint64(1)
nd, err := ipldlegacy.DecodeNode(ctx, blk)
if err != nil {
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
}

var eg errgroup.Group
lnks := nd.Links()
for _, l := range lnks {
l := l
// Launch a go routine to fetch the blocks underneath each link
eg.Go(func() error {
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle)
if err != nil {
return err
}
atomic.AddUint64(&count, cnt)
atomic.AddUint64(&size, sz)
return nil
})
}

return count, size, eg.Wait()
}

type blockReceiver struct {
bs *blockstore.ReadWrite
ctx context.Context
cancel context.CancelFunc
}

func (b blockReceiver) ReceivedBlocks(id peer.ID, blks []blocks.Block) {
err := b.bs.PutMany(b.ctx, blks)
if err != nil {
log.Errorw("failed to write blocks to blockstore: %s", err)
b.cancel()
}
}
1 change: 1 addition & 0 deletions cmd/booster-bitswap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
Commands: []*cli.Command{
initCmd,
runCmd,
fetchCmd,
},
}
app.Setup()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/ipfs/go-ipfs-files v0.1.1
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-ipld-legacy v0.1.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.6.0
github.com/ipfs/go-metrics-interface v0.0.1
Expand Down Expand Up @@ -217,7 +218,6 @@ require (
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-ipns v0.2.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-path v0.3.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion node/impl/boost_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/filecoin-project/dagstore/shard"
"github.com/multiformats/go-multihash"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand All @@ -18,7 +20,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
)

func (sm *BoostAPI) MarketListDataTransfers(ctx context.Context) ([]lapi.DataTransferChannel, error) {
Expand Down
15 changes: 4 additions & 11 deletions node/modules/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,18 @@ import (
"context"
"fmt"

"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/protocolproxy"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/ipfs/go-bitswap/network"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
)

var bitswapProtocols = []protocol.ID{
network.ProtocolBitswap,
network.ProtocolBitswapNoVers,
network.ProtocolBitswapOneOne,
network.ProtocolBitswapOneZero,
}

func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) {
return func(h host.Host) (*lp2pimpl.TransportsListener, error) {
protos := []types.Protocol{}
Expand Down Expand Up @@ -84,7 +77,7 @@ func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.Proto
if err != nil {
return nil, err
}
peerConfig[bsPeerID] = bitswapProtocols
peerConfig[bsPeerID] = bitswap.Protocols
}
return protocolproxy.NewProtocolProxy(h, peerConfig)
}
Expand All @@ -93,12 +86,12 @@ func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.Proto
func HandleProtocolProxy(lc fx.Lifecycle, pp *protocolproxy.ProtocolProxy) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
log.Info("starting load balancer")
log.Info("starting libp2p protocol proxy")
pp.Start(ctx)
return nil
},
OnStop: func(context.Context) error {
log.Info("stopping load balancer")
log.Info("stopping libp2p protocol proxy")
pp.Close()
return nil
},
Expand Down
Loading