Skip to content

Commit

Permalink
Refactor ConectinGater and Discovery
Browse files Browse the repository at this point in the history
The discovery process had become way to complex and I
hadn't realised how important it is for the host to constantly
have a connection to a slew of other hosts to function properly.

It's not enough to know each other, kademlia works best, when
you have a reasonable amount of anonymous peers. The gater is
still there for the future, but it's not in use.
  • Loading branch information
bahner committed Mar 8, 2024
1 parent 2204818 commit 6a83c52
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 226 deletions.
18 changes: 9 additions & 9 deletions config/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func generateActorConfigFile(identity string, node string) {
"grace-period": P2PConnMgrGracePeriod(),
},
"discovery": map[string]interface{}{
"retry": P2PDiscoveryRetryInterval(),
"timeout": P2PDiscoveryTimeout(),
"limit": P2PDiscoveryLimit(),
"advertise-ttl": P2PDiscoveryAdvertiseTTL(),
"advertise-limit": P2PDiscoveryAdvertiseLimit(),
"allow-all": P2PDiscoveryAllowAll(),
},
},
}
Expand Down Expand Up @@ -135,9 +135,9 @@ func generatePongConfigFile(identity string, node string) {
"grace-period": P2PConnMgrGracePeriod(),
},
"discovery": map[string]interface{}{
"retry": P2PDiscoveryRetryInterval(),
"timeout": P2PDiscoveryTimeout(),
"limit": P2PDiscoveryLimit(),
"advertise-ttl": P2PDiscoveryAdvertiseTTL(),
"advertise-limit": P2PDiscoveryAdvertiseLimit(),
"allow-all": P2PDiscoveryAllowAll(),
},
},
"mode": map[string]interface{}{
Expand Down Expand Up @@ -185,9 +185,9 @@ func generateRelayConfigFile(node string) {
"grace-period": P2PConnMgrGracePeriod(),
},
"discovery": map[string]interface{}{
"retry": P2PDiscoveryRetryInterval(),
"timeout": P2PDiscoveryTimeout(),
"limit": P2PDiscoveryLimit(),
"advertise-ttl": P2PDiscoveryAdvertiseTTL(),
"advertise-limit": P2PDiscoveryAdvertiseLimit(),
"allow-all": P2PDiscoveryAllowAll(),
},
},
}
Expand Down
52 changes: 11 additions & 41 deletions config/p2p.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"context"
"strconv"
"time"

Expand All @@ -14,12 +13,9 @@ const (
defaultConnmgrHighWatermark int = 100
defaultConnmgrGracePeriod time.Duration = time.Minute * 1

defaultDiscoveryLimit int = 10
defaultDiscoveryTimeout time.Duration = time.Second * 30
defaultDiscoveryRetryInterval time.Duration = time.Second * 60
defaultDiscoveryAdvertiseTTL time.Duration = time.Minute * 60
defaultDiscoveryAdvertiseLimit int = 100
defaultDiscoveryAllow bool = false
defaultAllowAll bool = true // Allow all peers by default. This is the norm for now. Use connmgr threshold and protection instead.

defaultListenPort int = 0
fakeP2PIdentity string = "NO_DEFAULT_NODE_IDENITY"
Expand All @@ -43,26 +39,17 @@ func init() {
viper.SetDefault("p2p.connmgr.low-watermark", defaultConnmgrLowWatermark)

// DISCOVERY
pflag.Bool("p2p-discovery-allow", defaultDiscoveryAllow, "Number of concurrent peer discovery routines.")
pflag.Duration("p2p-discovery-retry", defaultDiscoveryRetryInterval, "Retry interval for peer discovery.")
pflag.Duration("p2p-discovery-timeout", defaultDiscoveryTimeout, "Timeout for peer discovery.")
pflag.Duration("p2p-discovery-advertise-ttl", defaultDiscoveryTimeout, "Hint o TimeToLive for advertising peer discovery.")
pflag.Int("p2p-discovery-limit", defaultDiscoveryLimit, "Number of concurrent peer discovery routines.")
pflag.Int("p2p-discovery-advertise-limit", defaultDiscoveryLimit, "Limit for advertising peer discovery.")

viper.BindPFlag("p2p.discovery.limit", pflag.Lookup("p2p-discovery-limit"))
viper.BindPFlag("p2p.discovery.retry", pflag.Lookup("p2p-discovery-retryl"))
viper.BindPFlag("p2p.discovery.timeout", pflag.Lookup("p2p-discoveryTimeout"))
viper.BindPFlag("p2p.discovery.advertise-ttl", pflag.Lookup("p2p-discovery-advertise-ttl"))
pflag.Int("p2p-discovery-advertise-limit", defaultDiscoveryAdvertiseLimit, "Limit for advertising peer discovery.")
pflag.Duration("p2p-discovery-advertise-ttl", defaultDiscoveryAdvertiseTTL, "Hint o TimeToLive for advertising peer discovery.")
pflag.Bool("p2p-discovery-allow-all", defaultAllowAll, "Number of concurrent peer discovery routines.")

viper.BindPFlag("p2p.discovery.advertise-limit", pflag.Lookup("p2p-discovery-advertise-limit"))
viper.BindPFlag("p2p.discovery.allow", pflag.Lookup("p2p-discovery-allow"))
viper.BindPFlag("p2p.discovery.advertise-ttl", pflag.Lookup("p2p-discovery-advertise-ttl"))
viper.BindPFlag("p2p.discovery.allow-all", pflag.Lookup("p2p-discovery-allow-all"))

viper.SetDefault("p2p.discovery.limit", defaultDiscoveryLimit)
viper.SetDefault("p2p.discovery.retry", defaultDiscoveryRetryInterval)
viper.SetDefault("p2p.discovery.timeout", defaultDiscoveryTimeout)
viper.SetDefault("p2p.discovery.advertise-ttl", defaultDiscoveryAdvertiseTTL)
viper.SetDefault("p2p.discovery.advertise-limit", defaultDiscoveryAdvertiseLimit)
viper.SetDefault("p2p.discovery.allow", defaultDiscoveryAllow)
viper.SetDefault("p2p.discovery.advertise-ttl", defaultDiscoveryAdvertiseTTL)
viper.SetDefault("p2p.discovery.allow-all", defaultAllowAll)

// Port
pflag.Int("p2p-port", defaultListenPort, "Port for libp2p node to listen on.")
Expand All @@ -81,23 +68,6 @@ func P2PIdentity() string {
return viper.GetString("p2p.identity")
}

func P2PDiscoveryContext() (context.Context, func()) {

ctx := context.Background()

discoveryCtx, cancel := context.WithTimeout(ctx, P2PDiscoveryTimeout())

return discoveryCtx, cancel
}

func P2PDiscoveryTimeout() time.Duration {
return time.Duration(viper.GetDuration("p2p.discovery.timeout"))
}

func P2PDiscoveryLimit() int {
return viper.GetInt("p2p.discovery.limit")
}

func P2PDiscoveryAdvertiseTTL() time.Duration {
return viper.GetDuration("p2p.discovery.advertise-ttl")
}
Expand All @@ -106,8 +76,8 @@ func P2PDiscoveryAdvertiseLimit() int {
return viper.GetInt("p2p.discovery.advertise-limit")
}

func P2PDiscoveryAllow() bool {
return viper.GetBool("p2p.discovery.allow")
func P2PDiscoveryAllowAll() bool {
return viper.GetBool("p2p.discovery.allow-all")
}

func P2PConnmgrLowWatermark() int {
Expand Down
8 changes: 5 additions & 3 deletions p2p/connmgr/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ConnectionGater struct {
// New creates a new CustomConnectionGater instance.
func NewConnectionGater(connMgr *p2pConnmgr.BasicConnMgr) *ConnectionGater {
return &ConnectionGater{
AllowAll: config.P2PDiscoveryAllow(),
AllowAll: config.P2PDiscoveryAllowAll(),
ConnMgr: connMgr,
}
}
Expand All @@ -41,10 +41,12 @@ func (cg *ConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (allow b
// For simplicity, they are set to allow all connections in this example.
func (cg *ConnectionGater) InterceptSecured(nd network.Direction, p p2peer.ID, _ network.ConnMultiaddrs) (allow bool) {

if nd == network.DirOutbound {
// We should probably run with cg.AllowAll = true in the future
if nd == network.DirOutbound || cg.AllowAll {
return true
}

// We normally shouldn't arrive here.
allow = cg.isAllowed(p)

if allow {
Expand All @@ -66,7 +68,7 @@ func (cg *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason

func (cg *ConnectionGater) isAllowed(p p2peer.ID) bool {

if config.P2PDiscoveryAllow() || cg.AllowAll {
if config.P2PDiscoveryAllowAll() || cg.AllowAll {
return true
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func New(h host.Host, cg *connmgr.ConnectionGater, dhtOpts ...p2pDHT.Option) (*D
d.ConnectionGater.AllowAll = true
d.Bootstrap(context.Background())
// Reset the connection gater to its original allow state
d.ConnectionGater.AllowAll = config.P2PDiscoveryAllow()
d.ConnectionGater.AllowAll = config.P2PDiscoveryAllowAll()

return d, nil
}
85 changes: 25 additions & 60 deletions p2p/dht/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,49 @@ package dht
import (
"context"
"fmt"
"sync/atomic"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/config"
"github.com/libp2p/go-libp2p/core/discovery"
p2peer "github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
log "github.com/sirupsen/logrus"
)

var (
ErrAddrInfoAddrsEmpty = fmt.Errorf("addrinfo has no addresses")
ErrNoProtectedPeersFound = fmt.Errorf("no peers were discovered")
ErrFailedToCreateRoutingDiscovery = fmt.Errorf("failed to create routing discovery")
ErrPeerChanClosed = fmt.Errorf("peer channel closed")
)

// Takes a context and a DHT instance and discovers peers using the DHT.
// You might want to se server option or not for the DHT.
// Takes a variadic list of discovery.Option. You'll need this if you want to set a custom routing table.
func (d *DHT) DiscoverPeers(ctx context.Context, discoveryOpts ...discovery.Option) error {
log.Debugf("Starting DHT peer discovery searching for peers with rendezvous string: %s", ma.RENDEZVOUS)

log.Debugf("Number of open connections: %d", len(d.h.Network().Conns()))
// // Trim connections
// log.Debugf("Trimming open connections to %d", config.P2PConnmgrLowWatermark())
// d.h.ConnManager().TrimOpenConns(ctx)

log.Debugf("Peer discovery timeout: %v", config.P2PDiscoveryTimeout())
log.Debugf("Peer discovery context %v", ctx)

// Run a continuous discovery loop to find new peers
// The ctx should probably be a background context
func (d *DHT) DiscoveryLoop(ctx context.Context) error {
routingDiscovery := drouting.NewRoutingDiscovery(d.IpfsDHT)
if routingDiscovery == nil {
return fmt.Errorf("dht:discovery: failed to create routing discovery")
return ErrFailedToCreateRoutingDiscovery
}

// discoveryOpts = append(discoveryOpts,
// discovery.Limit(config.P2PDiscoveryAdvertiseLimit()),
// discovery.TTL(config.P2PDiscoveryAdvertiseTTL()))

dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS, discoveryOpts...)
log.Debugf("Advertising rendezvous string: %s", ma.RENDEZVOUS)

peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS, discoveryOpts...)
peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS)
if err != nil {
return fmt.Errorf("dht:discovery: peer discovery error: %w", err)
return fmt.Errorf("peer discovery error: %w", err)
}

sem := make(chan struct{}, config.P2PDiscoveryLimit()) // Semaphore for controlling concurrency
var successCount int32

// // Make sure we have set the allowAll flag to it's to it's allowed state
// d.ConnectionGater.AllowAll = config.P2PDiscoveryAllow()

for p := range peerChan {
sem <- struct{}{} // Acquire a token
go func(peerInfo p2peer.AddrInfo) {
defer func() { <-sem }() // Release the token

if peerInfo.ID == d.h.ID() {
return // Skip self connection
}
if err := d.PeerConnectAndUpdateIfSuccessful(ctx, peerInfo); err != nil {
log.Warnf("Failed to protect discovered peer: %s: %v", peerInfo.ID.String(), err)
} else {
atomic.AddInt32(&successCount, 1) // Increment on successful operation
for {
select {
case p, ok := <-peerChan:
if !ok {
if !(ctx == nil) {
log.Fatalf("DHT peer channel closed, but it was supposed to be running in the background.")
}
return ErrPeerChanClosed
}
}(p)
}

// Wait for all goroutines to finish
for i := 0; i < cap(sem); i++ {
sem <- struct{}{} // Ensure all tokens are returned before proceeding
}
if p.ID == d.h.ID() {
continue // Skip self
}

// After processing all peers, check if there were any successful connections
if atomic.LoadInt32(&successCount) == 0 {
return ErrNoProtectedPeersFound
if err := d.PeerConnectAndUpdateIfSuccessful(ctx, p); err != nil {
log.Warnf("Failed to connect to discovered peer: %s: %v", p.ID.String(), err)
}
case <-ctx.Done():
return nil
}
}

return nil
}
59 changes: 3 additions & 56 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@ package p2p

import (
"context"
"fmt"
"time"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/p2p/dht"
"github.com/bahner/go-ma-actor/p2p/mdns"
log "github.com/sirupsen/logrus"
)

// DiscoverPeers starts the peer discovery process.
Expand All @@ -20,56 +12,11 @@ import (
// DHT is a Kademlia DHT instance.
// If nil, a new DHT instance will be created.
// You might want to pass a DHT instance in Server mode here, for long running processes.
func (p *P2P) DiscoverPeers() error {

ctx, cancel := config.P2PDiscoveryContext()
defer cancel()
func (p *P2P) DiscoveryLoop(ctx context.Context) error {

// Start MDNS discovery in a new goroutine
go func() {
m, err := mdns.New(p.DHT.Host(), ma.RENDEZVOUS)
if err == mdns.ErrNoProtectedPeersFound {
log.Warnf("No protected peers found")
return
}
if err != nil {
log.Errorf("Failed to start MDNS discovery: %s", err)
return
}
m.DiscoverPeers(ctx)
}()
go p.MDNS.DiscoveryLoop(ctx)

// Wait for a discovery process to complete
err := p.DHT.DiscoverPeers(ctx)
if err != dht.ErrNoProtectedPeersFound {
return fmt.Errorf("no new peers found %w", err)
}
if err != nil {
return fmt.Errorf("peer discovery unsuccessful: %w", err)
}
go p.DHT.DiscoveryLoop(ctx)

return nil
}

// DiscoveryLoop is a blocking function that will periodically
// call DiscoverPeers() until the context is cancelled.
// This shouldn't be cancelled in normal operation.
// Each iteration will have a timeout of its own.

func (p *P2P) DiscoveryLoop(ctx context.Context) {
log.Infof("Starting discovery with retry interval %s", config.P2PDiscoveryRetryIntervalString())
for {
select {
case <-ctx.Done():
return
default:
err := p.DiscoverPeers() // This will block until discovery is complete or timeout
if err != nil {
log.Debugf("Discovery attempt failed: %s", err)
}
sleepTime := config.P2PDiscoveryRetryInterval()
log.Debugf("Discovery sleeping for %s", sleepTime.String())
time.Sleep(sleepTime)
}
}
}
10 changes: 5 additions & 5 deletions p2p/mdns/discover.go → p2p/mdns/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package mdns

import (
"context"
"errors"

log "github.com/sirupsen/logrus"
)

var ErrNoProtectedPeersFound = errors.New("protected peers not found")

// DiscoverPeers starts the discovery process and connects to discovered peers until the context is cancelled.
func (m *MDNS) DiscoverPeers(ctx context.Context) error {
// DiscoveryLoop starts the discovery process and connects to discovered peers until the context is cancelled.
func (m *MDNS) DiscoveryLoop(ctx context.Context) error {
log.Debugf("Discovering MDNS peers for service name: %s", m.rendezvous)

for {
select {
case pai, ok := <-m.PeerChan:
if !ok {
if !(ctx == nil) { // conext.Bacground() is nil
log.Fatalf("MDNS peer channel closed, ut was supposed to be running in the background.")
}
log.Debug("MDNS peer channel closed.")
return nil // Exit if the channel is closed
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func GetOrCreateFromAddrInfo(addrInfo *p2peer.AddrInfo) (Peer, error) {
nodeAlias = createNodeAlias(id)
}

return New(addrInfo, nodeAlias, config.P2PDiscoveryAllow()), nil
return New(addrInfo, nodeAlias, config.P2PDiscoveryAllowAll()), nil

}

Expand Down
Loading

0 comments on commit 6a83c52

Please sign in to comment.