From 075ecbd4f704892e466ff0b23e4985b368fa8c08 Mon Sep 17 00:00:00 2001
From: Lars Bahner
Date: Sat, 25 Nov 2023 21:04:34 +0100
Subject: [PATCH] Modularise p2p and add connection manager

---
 p2p/connmgr/connmgr.go | 19 ++++++++++++++++
 p2p/{ => dht}/dht.go   | 39 +++++++++-----------------------
 p2p/discovery.go       | 12 ++++++++--
 p2p/{ => mdns}/mdns.go | 30 ++++++++-----------------
 p2p/p2p.go             | 51 ++++++++++--------------------------------
 p2p/peers.go           | 33 +++++++++++++++++++++++++++
 peer/peer.go           |  2 +-
 7 files changed, 95 insertions(+), 91 deletions(-)
 create mode 100644 p2p/connmgr/connmgr.go
 rename p2p/{ => dht}/dht.go (73%)
 rename p2p/{ => mdns}/mdns.go (67%)
 create mode 100644 p2p/peers.go

diff --git a/p2p/connmgr/connmgr.go b/p2p/connmgr/connmgr.go
new file mode 100644
index 0000000..2089b8c
--- /dev/null
+++ b/p2p/connmgr/connmgr.go
@@ -0,0 +1,19 @@
+package connmgr
+
+import (
+	"github.com/bahner/go-ma-actor/config"
+
+	p2pConnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
+)
+
+func Init() (*p2pConnmgr.BasicConnMgr, error) {
+
+	withGracePeriod := p2pConnmgr.WithGracePeriod(config.GetConnMgrGracePeriod())
+
+	return p2pConnmgr.NewConnManager(
+		config.GetLowWaterMark(),
+		config.GetHighWaterMark(),
+		withGracePeriod,
+	)
+
+}
diff --git a/p2p/dht.go b/p2p/dht/dht.go
similarity index 73%
rename from p2p/dht.go
rename to p2p/dht/dht.go
index 3728678..067cdeb 100644
--- a/p2p/dht.go
+++ b/p2p/dht/dht.go
@@ -1,4 +1,4 @@
-package p2p
+package dht
 
 import (
 	"context"
@@ -6,8 +6,7 @@ import (
 	"sync"
 
 	"github.com/bahner/go-ma"
-	"github.com/bahner/go-ma-actor/config"
-	dht "github.com/libp2p/go-libp2p-kad-dht"
+	p2pDHT "github.com/libp2p/go-libp2p-kad-dht"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
@@ -15,10 +14,10 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
+func Init(ctx context.Context, h host.Host, dhtOpts ...p2pDHT.Option) (*p2pDHT.IpfsDHT, error) {
 
 	log.Info("Initializing DHT.")
-	kademliaDHT, err := dht.New(ctx, h)
+	kademliaDHT, err := p2pDHT.New(ctx, h, dhtOpts...)
 	if err != nil {
 		log.Error("Failed to create Kademlia DHT.")
 		return nil, err
@@ -35,7 +34,7 @@ func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
 	}
 
 	var wg sync.WaitGroup
-	for _, peerAddr := range dht.DefaultBootstrapPeers {
+	for _, peerAddr := range p2pDHT.DefaultBootstrapPeers {
 		peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
 		if err != nil {
 			log.Warnf("Failed to convert bootstrap peer address: %v", err)
@@ -82,14 +81,12 @@ func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
 	log.Info("Kademlia DHT bootstrapped successfully.")
 	return kademliaDHT, nil
 }
 
-func DiscoverDHTPeers(ctx context.Context, h host.Host) error {
-	log.Debug("Starting DHT route discovery.")
+// Takes a context and a DHT instance and discovers peers using the DHT.
+// You might want to set the server option for the DHT or not.
+func DiscoverPeers(ctx context.Context, dhtInstance *p2pDHT.IpfsDHT, h host.Host) error {
 
-	dhtInstance, err := initDHT(ctx, h)
-	if err != nil {
-		return err
-	}
+	log.Debug("Starting DHT route discovery.")
 
 	routingDiscovery := drouting.NewRoutingDiscovery(dhtInstance)
 	dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS)
@@ -114,17 +111,6 @@ discoveryLoop:
 				continue // Skip self connection
 			}
 
-			highWaterMark := config.GetHighWaterMark()
-			peerMutex.Lock()
-			currentPeerCount := len(connectedPeers)
-			peerMutex.Unlock()
-			if currentPeerCount >= highWaterMark {
-				log.Debugf("Current peer count: %d, high water mark: %d.", currentPeerCount, highWaterMark)
-				log.Debug("High water mark reached, trimming open connections.")
-				n.ConnManager().TrimOpenConns(context.Background())
-				break discoveryLoop
-			}
-
 			err := h.Connect(ctx, p) // Using the outer context directly
 			if err != nil {
 				log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
@@ -132,12 +118,9 @@ discoveryLoop:
 				log.Infof("Connected to DHT peer: %s", p.ID.String())
 
 				// Add peer to list of known peers
-				peerMutex.Lock()
-				connectedPeers[p.ID.String()] = &p
 				log.Debugf("Protecting peer: %s", p.ID.String())
-				n.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
-				n.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
-				peerMutex.Unlock()
+				h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
+				h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
 
 				break discoveryLoop
 			}
diff --git a/p2p/discovery.go b/p2p/discovery.go
index bab622d..ecb5219 100644
--- a/p2p/discovery.go
+++ b/p2p/discovery.go
@@ -3,6 +3,8 @@ package p2p
 import (
 	"context"
 
+	"github.com/bahner/go-ma-actor/p2p/dht"
+	"github.com/bahner/go-ma-actor/p2p/mdns"
 	"github.com/libp2p/go-libp2p/core/host"
 	log "github.com/sirupsen/logrus"
 )
@@ -14,13 +16,19 @@ func StartPeerDiscovery(ctx context.Context, h host.Host) error {
 
 	// Start DHT discovery in a new goroutine
 	go func() {
-		DiscoverDHTPeers(ctx, h)
+		dhtInstance, err := dht.Init(ctx, h)
+		if err != nil {
+			log.Error("Failed to initialise DHT. Peer discovery unsuccessful.")
+			done <- struct{}{} // Signal completion
+			return
+		}
+		dht.DiscoverPeers(ctx, dhtInstance, h)
 		done <- struct{}{} // Signal completion
 	}()
 
 	// Start MDNS discovery in a new goroutine
 	go func() {
-		DiscoverMDNSPeers(ctx, h)
+		mdns.DiscoverPeers(ctx, h)
 		done <- struct{}{} // Signal completion
 	}()
 
diff --git a/p2p/mdns.go b/p2p/mdns/mdns.go
similarity index 67%
rename from p2p/mdns.go
rename to p2p/mdns/mdns.go
index 4b7e7e5..25ca45f 100644
--- a/p2p/mdns.go
+++ b/p2p/mdns/mdns.go
@@ -1,14 +1,13 @@
-package p2p
+package mdns
 
 import (
 	"context"
 
 	"github.com/bahner/go-ma"
-	"github.com/bahner/go-ma-actor/config"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
+	p2pmdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
 	log "github.com/sirupsen/logrus"
 )
@@ -28,16 +27,18 @@ func initMDNS(h host.Host, rendezvous string) chan peer.AddrInfo {
 	n.PeerChan = make(chan peer.AddrInfo)
 
 	// An hour might be a long long period in practical applications. But this is fine for us
-	ser := mdns.NewMdnsService(h, rendezvous, n)
+	ser := p2pmdns.NewMdnsService(h, rendezvous, n)
 	if err := ser.Start(); err != nil {
 		panic(err)
 	}
 	return n.PeerChan
 }
 
-func DiscoverMDNSPeers(ctx context.Context, h host.Host) error {
+func DiscoverPeers(ctx context.Context, h host.Host) error {
 	log.Debugf("Discovering MDNS peers for servicename: %s", ma.RENDEZVOUS)
 	peerChan := initMDNS(h, ma.RENDEZVOUS)
+	// Start trimming connections, so we have room for new friends
+	h.ConnManager().TrimOpenConns(context.Background())
 
 discoveryLoop:
 	for {
@@ -51,16 +52,6 @@ discoveryLoop:
 				continue // Skip self connection
 			}
 
-			highWaterMark := config.GetHighWaterMark()
-			peerMutex.Lock()
-			currentPeerCount := len(connectedPeers)
-			peerMutex.Unlock()
-			if currentPeerCount >= highWaterMark {
-				log.Debugf("Current peer count: %d, high water mark: %d.", currentPeerCount, highWaterMark)
-				n.ConnManager().TrimOpenConns(context.Background())
-				break discoveryLoop
-			}
-
 			log.Infof("Found MDNS peer: %s connecting", p.ID.String())
 			err := h.Connect(ctx, p)
 			if err != nil {
@@ -69,15 +60,12 @@ discoveryLoop:
 			log.Infof("Connected to MDNS peer: %s", p.ID.String())
 
 			// Add peer to list of known peers
-			peerMutex.Lock()
-			connectedPeers[p.ID.String()] = &p
 			log.Debugf("Protecting discovered MDNS peer: %s", p.ID.String())
-			n.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
-			n.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
-			peerMutex.Unlock()
+			h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
+			h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
 
-			break discoveryLoop
 		}
+
 	case <-ctx.Done():
 		log.Info("Context cancelled, stopping MDNS peer discovery.")
 		return nil
diff --git a/p2p/p2p.go b/p2p/p2p.go
index b0c1de6..d92c254 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -3,29 +3,23 @@ package p2p
 import (
 	"context"
 	"fmt"
-	"sync"
 	"time"
 
+	"github.com/bahner/go-ma-actor/p2p/connmgr"
 	"github.com/bahner/go-ma-actor/p2p/node"
 	"github.com/bahner/go-ma-actor/p2p/pubsub"
 	"github.com/bahner/go-ma/key/ipns"
+	libp2p "github.com/libp2p/go-libp2p"
 	p2ppubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/core/host"
-	"github.com/libp2p/go-libp2p/core/peer"
-	log "github.com/sirupsen/logrus"
 )
 
 var (
-	err error
-
 	ctxDiscovery context.Context
 	cancel       context.CancelFunc
 	n            host.Host
 	ps           *p2ppubsub.PubSub
-
-	connectedPeers = make(map[string]*peer.AddrInfo)
-	peerMutex      sync.Mutex
 )
 
 // Initialise everything needed for p2p communication.
@@ -42,8 +36,17 @@
 func Init(ctx context.Context, i *ipns.Key, discoveryTimeout time.Duration) (host.Host, *p2ppubsub.PubSub, error) {
 
+	connMgr, err := connmgr.Init()
+	if err != nil {
+		return nil, nil, fmt.Errorf("p2p.Init: failed to create connection manager: %w", err)
+	}
+
+	p2pOpts := []libp2p.Option{
+		libp2p.ConnectionManager(connMgr),
+	}
+
 	// Create a new libp2p Host that listens on a random TCP port
-	n, err = node.New(i, nil)
+	n, err = node.New(i, p2pOpts...)
 	if err != nil {
 		return nil, nil, fmt.Errorf("p2p.Init: failed to create libp2p node: %w", err)
 	}
 
@@ -76,33 +79,3 @@ func GetPubSub() *p2ppubsub.PubSub {
 func GetNode() host.Host {
 	return n
 }
-
-// Get list of connectpeers.
-// The connectTimeout is how long to wait for a connection to be established.
-// This applies to each host in turn.
-// If set to 0 a default timeout of 5 seconds will be used.
-func GetConnectedPeers(connectTimeout time.Duration) map[string]*peer.AddrInfo {
-	defaultTimeoutSeconds := 5
-
-	if connectTimeout == 0 {
-		connectTimeout = time.Duration(defaultTimeoutSeconds) * time.Second
-	}
-
-	for p, addrs := range connectedPeers {
-
-		ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
-		defer cancel()
-
-		// Try connecting to the peer
-		if err := n.Connect(ctx, *addrs); err != nil {
-			log.Debugf("Failed connecting to %s, error: %v. Pruning.", p, err)
-
-			peerMutex.Lock()
-			delete(connectedPeers, p)
-			peerMutex.Unlock()
-		}
-	}
-
-	// No need to copy the peers again, as the new hosts are already live
-	return connectedPeers
-}
diff --git a/p2p/peers.go b/p2p/peers.go
new file mode 100644
index 0000000..eefc878
--- /dev/null
+++ b/p2p/peers.go
@@ -0,0 +1,33 @@
+package p2p
+
+import (
+	"time"
+
+	"github.com/bahner/go-ma"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+)
+
+var (
+	connectedPeers = make(map[string]*peer.AddrInfo)
+)
+
+// GetConnectedPeers returns the protected peers that are currently connected.
+func GetConnectedPeers(connectTimeout time.Duration) map[string]*peer.AddrInfo {
+
+	for _, p := range n.Network().Peers() {
+
+		if n.ConnManager().IsProtected(p, ma.RENDEZVOUS) {
+
+			if n.Network().Connectedness(p) == network.Connected {
+
+				connectedPeer := n.Peerstore().PeerInfo(p)
+
+				connectedPeers[p.String()] = &connectedPeer
+			}
+		}
+
+	}
+
+	return connectedPeers
+}
diff --git a/peer/peer.go b/peer/peer.go
index 004888b..c856b8b 100644
--- a/peer/peer.go
+++ b/peer/peer.go
@@ -11,7 +11,7 @@ type Peer struct {
 	AddrInfo *p2peer.AddrInfo
 }
 
-func NewWithAlias(addrInfo *p2peer.AddrInfo, alias string) *Peer {
+func NewWithAlias(addrInfo p2peer.AddrInfo, alias string) *Peer {
 	id := addrInfo.ID.String()
 
 	return &Peer{