-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bitswap client #856
bitswap client #856
Changes from all commits
85b7358
98c2e19
14ed4a5
87dc643
5ddd9de
e0e1d73
58163ca
a58ea5a
c9d91e6
0e75016
957e65d
e2c6012
e3557d7
5b425ea
e9da747
0bd7736
1cfa691
d3b5593
f2f9729
55c278d
c1f57f3
c4c3f1d
bf618c6
f1a1ccd
f4246da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package bitswap | ||
|
||
import ( | ||
"github.com/ipfs/go-bitswap/network" | ||
"github.com/libp2p/go-libp2p/core/protocol" | ||
) | ||
|
||
var Protocols = []protocol.ID{ | ||
network.ProtocolBitswap, | ||
network.ProtocolBitswapNoVers, | ||
network.ProtocolBitswapOneOne, | ||
network.ProtocolBitswapOneZero, | ||
} | ||
|
||
var ProtocolStrings = []string{} | ||
|
||
func init() { | ||
for _, p := range Protocols { | ||
ProtocolStrings = append(ProtocolStrings, string(p)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"crypto/rand" | ||
"fmt" | ||
"net/http" | ||
_ "net/http/pprof" | ||
"sort" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap" | ||
lcli "github.com/filecoin-project/lotus/cli" | ||
"github.com/ipfs/go-bitswap/client" | ||
bsnetwork "github.com/ipfs/go-bitswap/network" | ||
blocks "github.com/ipfs/go-block-format" | ||
"github.com/ipfs/go-cid" | ||
nilrouting "github.com/ipfs/go-ipfs-routing/none" | ||
ipldlegacy "github.com/ipfs/go-ipld-legacy" | ||
"github.com/ipld/go-car/v2/blockstore" | ||
"github.com/libp2p/go-libp2p" | ||
"github.com/libp2p/go-libp2p/core/crypto" | ||
"github.com/libp2p/go-libp2p/core/network" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
"github.com/libp2p/go-libp2p/p2p/muxer/mplex" | ||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux" | ||
quic "github.com/libp2p/go-libp2p/p2p/transport/quic" | ||
"github.com/libp2p/go-libp2p/p2p/transport/tcp" | ||
"github.com/urfave/cli/v2" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
var fetchCmd = &cli.Command{ | ||
Name: "fetch", | ||
Usage: "fetch <multiaddr> <root cid> <output car path>", | ||
Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr", | ||
Before: before, | ||
Flags: []cli.Flag{ | ||
&cli.BoolFlag{ | ||
Name: "pprof", | ||
Usage: "run pprof web server on localhost:6071", | ||
}, | ||
&cli.IntFlag{ | ||
Name: "concurrency", | ||
Usage: "concurrent request limit - 0 means unlimited", | ||
Value: 10, | ||
}, | ||
}, | ||
Action: func(cctx *cli.Context) error { | ||
if cctx.Bool("pprof") { | ||
go func() { | ||
err := http.ListenAndServe("localhost:6071", nil) | ||
if err != nil { | ||
log.Error(err) | ||
} | ||
}() | ||
} | ||
|
||
if cctx.Args().Len() != 3 { | ||
return fmt.Errorf("usage: fetch <multiaddr> <root cid> <output car path>") | ||
} | ||
|
||
addrInfoStr := cctx.Args().Get(0) | ||
serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr) | ||
if err != nil { | ||
return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err) | ||
} | ||
|
||
rootCidStr := cctx.Args().Get(1) | ||
rootCid, err := cid.Parse(rootCidStr) | ||
if err != nil { | ||
return fmt.Errorf("parsing cid %s: %w", rootCidStr, err) | ||
} | ||
|
||
outputCarPath := cctx.Args().Get(2) | ||
|
||
ctx := lcli.ReqContext(cctx) | ||
|
||
// setup libp2p host | ||
log.Infow("generating libp2p key") | ||
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
host, err := libp2p.New( | ||
libp2p.Transport(tcp.NewTCPTransport), | ||
libp2p.Transport(quic.NewTransport), | ||
Comment on lines
+88
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these the only transports you're supporting because they're the only ones Boost supports? |
||
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), | ||
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), | ||
libp2p.Identity(privKey), | ||
libp2p.ResourceManager(network.NullResourceManager), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Create a bitswap client | ||
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil) | ||
if err != nil { | ||
return err | ||
} | ||
net := bsnetwork.NewFromIpfsHost(host, nilRouter) | ||
bs, err := blockstore.OpenReadWrite(outputCarPath, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true)) | ||
if err != nil { | ||
return fmt.Errorf("creating blockstore at %s: %w", outputCarPath, err) | ||
} | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel} | ||
bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn)) | ||
defer bsClient.Close() | ||
net.Start(bsClient) | ||
|
||
// Connect to host | ||
connectStart := time.Now() | ||
log.Infow("connecting to server", "server", serverAddrInfo.String()) | ||
err = host.Connect(ctx, *serverAddrInfo) | ||
if err != nil { | ||
return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err) | ||
Comment on lines
+120
to
+122
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will likely cause you problems with large files since if the connection gets pruned on the server side it'll never get re-established and your download will stall (e.g. like ipfs/ipget#103). It's resolvable by trying to keep the connection alive. |
||
} | ||
log.Debugw("connected to server", "duration", time.Since(connectStart).String()) | ||
|
||
// Check host's libp2p protocols | ||
protos, err := host.Peerstore().GetProtocols(serverAddrInfo.ID) | ||
if err != nil { | ||
return fmt.Errorf("getting protocols from peer store for %s: %w", serverAddrInfo.ID, err) | ||
} | ||
sort.Slice(protos, func(i, j int) bool { | ||
return protos[i] < protos[j] | ||
}) | ||
log.Debugw("host libp2p protocols", "protocols", protos) | ||
p, err := host.Peerstore().FirstSupportedProtocol(serverAddrInfo.ID, bitswap.ProtocolStrings...) | ||
if err != nil { | ||
return fmt.Errorf("getting first supported protocol from peer store for %s: %w", serverAddrInfo.ID, err) | ||
} | ||
if p == "" { | ||
return fmt.Errorf("host %s does not support any know bitswap protocols: %s", serverAddrInfo.ID, bitswap.ProtocolStrings) | ||
} | ||
|
||
var throttle chan struct{} | ||
concurrency := cctx.Int("concurrency") | ||
if concurrency > 0 { | ||
throttle = make(chan struct{}, concurrency) | ||
} | ||
|
||
// Fetch all blocks under the root cid | ||
log.Infow("fetch", "cid", rootCid, "concurrency", concurrency) | ||
start := time.Now() | ||
count, size, err := getBlocks(ctx, bsClient, rootCid, throttle) | ||
if err != nil { | ||
return fmt.Errorf("getting blocks: %w", err) | ||
} | ||
|
||
log.Infow("fetch complete", "count", count, "size", size, "duration", time.Since(start).String()) | ||
log.Debug("finalizing") | ||
finalizeStart := time.Now() | ||
defer func() { log.Infow("finalize complete", "duration", time.Since(finalizeStart).String()) }() | ||
return bs.Finalize() | ||
}, | ||
} | ||
|
||
func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) { | ||
if throttle != nil { | ||
throttle <- struct{}{} | ||
} | ||
// Get the block | ||
start := time.Now() | ||
blk, err := bsClient.GetBlock(ctx, c) | ||
if throttle != nil { | ||
<-throttle | ||
} | ||
if err != nil { | ||
return 0, 0, err | ||
} | ||
|
||
var size = uint64(len(blk.RawData())) | ||
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String()) | ||
|
||
// Read the links from the block to child nodes in the DAG | ||
var count = uint64(1) | ||
nd, err := ipldlegacy.DecodeNode(ctx, blk) | ||
if err != nil { | ||
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err) | ||
} | ||
|
||
var eg errgroup.Group | ||
lnks := nd.Links() | ||
for _, l := range lnks { | ||
l := l | ||
// Launch a go routine to fetch the blocks underneath each link | ||
eg.Go(func() error { | ||
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle) | ||
if err != nil { | ||
return err | ||
} | ||
atomic.AddUint64(&count, cnt) | ||
atomic.AddUint64(&size, sz) | ||
return nil | ||
}) | ||
} | ||
|
||
return count, size, eg.Wait() | ||
} | ||
|
||
type blockReceiver struct { | ||
bs *blockstore.ReadWrite | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
} | ||
|
||
func (b blockReceiver) ReceivedBlocks(id peer.ID, blks []blocks.Block) { | ||
err := b.bs.PutMany(b.ctx, blks) | ||
if err != nil { | ||
log.Errorw("failed to write blocks to blockstore: %s", err) | ||
b.cancel() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ func main() { | |
Commands: []*cli.Command{ | ||
initCmd, | ||
runCmd, | ||
fetchCmd, | ||
}, | ||
} | ||
app.Setup() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you went with ECDSA over ED25519, I tend to see the latter as the default in more places.