Skip to content

Commit

Permalink
Modularise p2p and add connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
bahner committed Nov 25, 2023
1 parent 0e23f20 commit 075ecbd
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 91 deletions.
19 changes: 19 additions & 0 deletions p2p/connmgr/connmgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package connmgr

import (
"github.com/bahner/go-ma-actor/config"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
p2pConnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

func Init() (*p2pConnmgr.BasicConnMgr, error) {

withGracePeriod := connmgr.WithGracePeriod(config.GetConnMgrGracePeriod())

return p2pConnmgr.NewConnManager(
config.GetLowWaterMark(),
config.GetHighWaterMark(),
withGracePeriod,
)

}
39 changes: 11 additions & 28 deletions p2p/dht.go → p2p/dht/dht.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package p2p
package dht

import (
"context"
"fmt"
"sync"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/config"
dht "github.com/libp2p/go-libp2p-kad-dht"
p2pDHT "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
log "github.com/sirupsen/logrus"
)

func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
func Init(ctx context.Context, h host.Host, dhtOpts ...p2pDHT.Option) (*p2pDHT.IpfsDHT, error) {
log.Info("Initializing DHT.")

kademliaDHT, err := dht.New(ctx, h)
kademliaDHT, err := p2pDHT.New(ctx, h)
if err != nil {
log.Error("Failed to create Kademlia DHT.")
return nil, err
Expand All @@ -35,7 +34,7 @@ func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
}

var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
for _, peerAddr := range p2pDHT.DefaultBootstrapPeers {
peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil {
log.Warnf("Failed to convert bootstrap peer address: %v", err)
Expand Down Expand Up @@ -82,14 +81,12 @@ func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
log.Info("Kademlia DHT bootstrapped successfully.")
return kademliaDHT, nil
}
func DiscoverDHTPeers(ctx context.Context, h host.Host) error {

log.Debug("Starting DHT route discovery.")
// Takes a context and a DHT instance and discovers peers using the DHT.
// You might want to se server option or not for the DHT.
func DiscoverPeers(ctx context.Context, dhtInstance *p2pDHT.IpfsDHT, h host.Host) error {

dhtInstance, err := initDHT(ctx, h)
if err != nil {
return err
}
log.Debug("Starting DHT route discovery.")

routingDiscovery := drouting.NewRoutingDiscovery(dhtInstance)
dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS)
Expand All @@ -114,30 +111,16 @@ discoveryLoop:
continue // Skip self connection
}

highWaterMark := config.GetHighWaterMark()
peerMutex.Lock()
currentPeerCount := len(connectedPeers)
peerMutex.Unlock()
if currentPeerCount >= highWaterMark {
log.Debugf("Current peer count: %d, high water mark: %d.", currentPeerCount, highWaterMark)
log.Debug("High water mark reached, trimming open connections.")
n.ConnManager().TrimOpenConns(context.Background())
break discoveryLoop
}

err := h.Connect(ctx, p) // Using the outer context directly
if err != nil {
log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
} else {
log.Infof("Connected to DHT peer: %s", p.ID.String())

// Add peer to list of known peers
peerMutex.Lock()
connectedPeers[p.ID.String()] = &p
log.Debugf("Protecting peer: %s", p.ID.String())
n.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
n.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
peerMutex.Unlock()
h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)

break discoveryLoop
}
Expand Down
12 changes: 10 additions & 2 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package p2p
import (
"context"

"github.com/bahner/go-ma-actor/p2p/dht"
"github.com/bahner/go-ma-actor/p2p/mdns"
"github.com/libp2p/go-libp2p/core/host"
log "github.com/sirupsen/logrus"
)
Expand All @@ -14,13 +16,19 @@ func StartPeerDiscovery(ctx context.Context, h host.Host) error {

// Start DHT discovery in a new goroutine
go func() {
DiscoverDHTPeers(ctx, h)
dhtINstance, err := dht.Init(ctx, h)
if err != nil {
log.Error("Failed to initialise DHT. Peer discovery unsuccessful.")
done <- struct{}{} // Signal completion
return
}
dht.DiscoverPeers(ctx, dhtINstance, h)
done <- struct{}{} // Signal completion
}()

// Start MDNS discovery in a new goroutine
go func() {
DiscoverMDNSPeers(ctx, h)
mdns.DiscoverPeers(ctx, h)
done <- struct{}{} // Signal completion
}()

Expand Down
30 changes: 9 additions & 21 deletions p2p/mdns.go → p2p/mdns/mdns.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package p2p
package mdns

import (
"context"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/config"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
p2pmdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
log "github.com/sirupsen/logrus"
)

Expand All @@ -28,16 +27,18 @@ func initMDNS(h host.Host, rendezvous string) chan peer.AddrInfo {
n.PeerChan = make(chan peer.AddrInfo)

// An hour might be a long long period in practical applications. But this is fine for us
ser := mdns.NewMdnsService(h, rendezvous, n)
ser := p2pmdns.NewMdnsService(h, rendezvous, n)
if err := ser.Start(); err != nil {
panic(err)
}
return n.PeerChan
}
func DiscoverMDNSPeers(ctx context.Context, h host.Host) error {
func DiscoverPeers(ctx context.Context, h host.Host) error {
log.Debugf("Discovering MDNS peers for servicename: %s", ma.RENDEZVOUS)

peerChan := initMDNS(h, ma.RENDEZVOUS)
// Start trimming connections, so we have room for new friends
h.ConnManager().TrimOpenConns(context.Background())

discoveryLoop:
for {
Expand All @@ -51,16 +52,6 @@ discoveryLoop:
continue // Skip self connection
}

highWaterMark := config.GetHighWaterMark()
peerMutex.Lock()
currentPeerCount := len(connectedPeers)
peerMutex.Unlock()
if currentPeerCount >= highWaterMark {
log.Debugf("Current peer count: %d, high water mark: %d.", currentPeerCount, highWaterMark)
n.ConnManager().TrimOpenConns(context.Background())
break discoveryLoop
}

log.Infof("Found MDNS peer: %s connecting", p.ID.String())
err := h.Connect(ctx, p)
if err != nil {
Expand All @@ -69,15 +60,12 @@ discoveryLoop:
log.Infof("Connected to MDNS peer: %s", p.ID.String())

// Add peer to list of known peers
peerMutex.Lock()
connectedPeers[p.ID.String()] = &p
log.Debugf("Protecting discovered MDNS peer: %s", p.ID.String())
n.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
n.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
peerMutex.Unlock()
h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)

break discoveryLoop
}

case <-ctx.Done():
log.Info("Context cancelled, stopping MDNS peer discovery.")
return nil
Expand Down
51 changes: 12 additions & 39 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,23 @@ package p2p
import (
"context"
"fmt"
"sync"
"time"

"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/bahner/go-ma-actor/p2p/pubsub"
"github.com/bahner/go-ma/key/ipns"
libp2p "github.com/libp2p/go-libp2p"
p2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)

var (
err error

ctxDiscovery context.Context
cancel context.CancelFunc

n host.Host
ps *p2ppubsub.PubSub

connectedPeers = make(map[string]*peer.AddrInfo)
peerMutex sync.Mutex
)

// Initialise everything needed for p2p communication.
Expand All @@ -42,8 +36,17 @@ var (

func Init(ctx context.Context, i *ipns.Key, discoveryTimeout time.Duration) (host.Host, *p2ppubsub.PubSub, error) {

connMgr, err := connmgr.Init()
if err != nil {
return nil, nil, fmt.Errorf("p2p.Init: failed to create connection manager: %w", err)
}

p2pOpts := []libp2p.Option{
libp2p.ConnectionManager(connMgr),
}

// Create a new libp2p Host that listens on a random TCP port
n, err = node.New(i, nil)
n, err = node.New(i, p2pOpts...)
if err != nil {
return nil, nil, fmt.Errorf("p2p.Init: failed to create libp2p node: %w", err)
}
Expand Down Expand Up @@ -76,33 +79,3 @@ func GetPubSub() *p2ppubsub.PubSub {
func GetNode() host.Host {
return n
}

// Get list of connectpeers.
// The connectTimeout is how long to wait for a connection to be established.
// This applies to each host in turn.
// If set to 0 a default timeout of 5 seconds will be used.
func GetConnectedPeers(connectTimeout time.Duration) map[string]*peer.AddrInfo {
defaultTimeoutSeconds := 5

if connectTimeout == 0 {
connectTimeout = time.Duration(defaultTimeoutSeconds) * time.Second
}

for p, addrs := range connectedPeers {

ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()

// Try connecting to the peer
if err := n.Connect(ctx, *addrs); err != nil {
log.Debugf("Failed connecting to %s, error: %v. Pruning.", p, err)

peerMutex.Lock()
delete(connectedPeers, p)
peerMutex.Unlock()
}
}

// No need to copy the peers again, as the new hosts are already live
return connectedPeers
}
33 changes: 33 additions & 0 deletions p2p/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package p2p

import (
"time"

"github.com/bahner/go-ma"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

var (
connectedPeers map[string]*peer.AddrInfo
)

// Get list of connected peers.
func GetConnectedPeers(connectTimeout time.Duration) map[string]*peer.AddrInfo {

for _, p := range n.Network().Peers() {

if n.ConnManager().IsProtected(p, ma.RENDEZVOUS) {

if n.Network().Connectedness(p) == network.Connected {

connectedPeer := n.Peerstore().PeerInfo(p)

connectedPeers[p.String()] = &connectedPeer
}
}

}

return connectedPeers
}
2 changes: 1 addition & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Peer struct {
AddrInfo *p2peer.AddrInfo
}

func NewWithAlias(addrInfo *p2peer.AddrInfo, alias string) *Peer {
func NewWithAlias(addrInfo p2peer.AddrInfo, alias string) *Peer {

id := addrInfo.ID.String()
return &Peer{
Expand Down

0 comments on commit 075ecbd

Please sign in to comment.