diff --git a/config/generate.go b/config/generate.go index e3ed113..857a850 100644 --- a/config/generate.go +++ b/config/generate.go @@ -45,9 +45,9 @@ func generateActorConfigFile(identity string, node string) { "grace-period": P2PConnMgrGracePeriod(), }, "discovery": map[string]interface{}{ - "retry": P2PDiscoveryRetryInterval(), - "timeout": P2PDiscoveryTimeout(), - "limit": P2PDiscoveryLimit(), + "advertise-ttl": P2PDiscoveryAdvertiseTTL(), + "advertise-limit": P2PDiscoveryAdvertiseLimit(), + "allow-all": P2PDiscoveryAllowAll(), }, }, } @@ -135,9 +135,9 @@ func generatePongConfigFile(identity string, node string) { "grace-period": P2PConnMgrGracePeriod(), }, "discovery": map[string]interface{}{ - "retry": P2PDiscoveryRetryInterval(), - "timeout": P2PDiscoveryTimeout(), - "limit": P2PDiscoveryLimit(), + "advertise-ttl": P2PDiscoveryAdvertiseTTL(), + "advertise-limit": P2PDiscoveryAdvertiseLimit(), + "allow-all": P2PDiscoveryAllowAll(), }, }, "mode": map[string]interface{}{ @@ -185,9 +185,9 @@ func generateRelayConfigFile(node string) { "grace-period": P2PConnMgrGracePeriod(), }, "discovery": map[string]interface{}{ - "retry": P2PDiscoveryRetryInterval(), - "timeout": P2PDiscoveryTimeout(), - "limit": P2PDiscoveryLimit(), + "advertise-ttl": P2PDiscoveryAdvertiseTTL(), + "advertise-limit": P2PDiscoveryAdvertiseLimit(), + "allow-all": P2PDiscoveryAllowAll(), }, }, } diff --git a/config/p2p.go b/config/p2p.go index 21a95f7..28e9413 100644 --- a/config/p2p.go +++ b/config/p2p.go @@ -1,7 +1,6 @@ package config import ( - "context" "strconv" "time" @@ -14,12 +13,9 @@ const ( defaultConnmgrHighWatermark int = 100 defaultConnmgrGracePeriod time.Duration = time.Minute * 1 - defaultDiscoveryLimit int = 10 - defaultDiscoveryTimeout time.Duration = time.Second * 30 - defaultDiscoveryRetryInterval time.Duration = time.Second * 60 defaultDiscoveryAdvertiseTTL time.Duration = time.Minute * 60 defaultDiscoveryAdvertiseLimit int = 100 - defaultDiscoveryAllow bool = false + defaultAllowAll bool = true // Allow all peers by default. This is the norm for now. Use connmgr threshold and protection instead. defaultListenPort int = 0 fakeP2PIdentity string = "NO_DEFAULT_NODE_IDENITY" @@ -43,26 +39,17 @@ func init() { viper.SetDefault("p2p.connmgr.low-watermark", defaultConnmgrLowWatermark) // DISCOVERY - pflag.Bool("p2p-discovery-allow", defaultDiscoveryAllow, "Number of concurrent peer discovery routines.") - pflag.Duration("p2p-discovery-retry", defaultDiscoveryRetryInterval, "Retry interval for peer discovery.") - pflag.Duration("p2p-discovery-timeout", defaultDiscoveryTimeout, "Timeout for peer discovery.") - pflag.Duration("p2p-discovery-advertise-ttl", defaultDiscoveryTimeout, "Hint o TimeToLive for advertising peer discovery.") - pflag.Int("p2p-discovery-limit", defaultDiscoveryLimit, "Number of concurrent peer discovery routines.") - pflag.Int("p2p-discovery-advertise-limit", defaultDiscoveryLimit, "Limit for advertising peer discovery.") - - viper.BindPFlag("p2p.discovery.limit", pflag.Lookup("p2p-discovery-limit")) - viper.BindPFlag("p2p.discovery.retry", pflag.Lookup("p2p-discovery-retryl")) - viper.BindPFlag("p2p.discovery.timeout", pflag.Lookup("p2p-discoveryTimeout")) - viper.BindPFlag("p2p.discovery.advertise-ttl", pflag.Lookup("p2p-discovery-advertise-ttl")) + pflag.Int("p2p-discovery-advertise-limit", defaultDiscoveryAdvertiseLimit, "Limit for advertising peer discovery.") + pflag.Duration("p2p-discovery-advertise-ttl", defaultDiscoveryAdvertiseTTL, "Hint o TimeToLive for advertising peer discovery.") + pflag.Bool("p2p-discovery-allow-all", defaultAllowAll, "Number of concurrent peer discovery routines.") + viper.BindPFlag("p2p.discovery.advertise-limit", pflag.Lookup("p2p-discovery-advertise-limit")) - viper.BindPFlag("p2p.discovery.allow", pflag.Lookup("p2p-discovery-allow")) + viper.BindPFlag("p2p.discovery.advertise-ttl", pflag.Lookup("p2p-discovery-advertise-ttl")) + viper.BindPFlag("p2p.discovery.allow-all", pflag.Lookup("p2p-discovery-allow-all")) - viper.SetDefault("p2p.discovery.limit", defaultDiscoveryLimit) - viper.SetDefault("p2p.discovery.retry", defaultDiscoveryRetryInterval) - viper.SetDefault("p2p.discovery.timeout", defaultDiscoveryTimeout) - viper.SetDefault("p2p.discovery.advertise-ttl", defaultDiscoveryAdvertiseTTL) viper.SetDefault("p2p.discovery.advertise-limit", defaultDiscoveryAdvertiseLimit) - viper.SetDefault("p2p.discovery.allow", defaultDiscoveryAllow) + viper.SetDefault("p2p.discovery.advertise-ttl", defaultDiscoveryAdvertiseTTL) + viper.SetDefault("p2p.discovery.allow-all", defaultAllowAll) // Port pflag.Int("p2p-port", defaultListenPort, "Port for libp2p node to listen on.") @@ -81,23 +68,6 @@ func P2PIdentity() string { return viper.GetString("p2p.identity") } -func P2PDiscoveryContext() (context.Context, func()) { - - ctx := context.Background() - - discoveryCtx, cancel := context.WithTimeout(ctx, P2PDiscoveryTimeout()) - - return discoveryCtx, cancel -} - -func P2PDiscoveryTimeout() time.Duration { - return time.Duration(viper.GetDuration("p2p.discovery.timeout")) -} - -func P2PDiscoveryLimit() int { - return viper.GetInt("p2p.discovery.limit") -} - func P2PDiscoveryAdvertiseTTL() time.Duration { return viper.GetDuration("p2p.discovery.advertise-ttl") } @@ -106,8 +76,8 @@ func P2PDiscoveryAdvertiseLimit() int { return viper.GetInt("p2p.discovery.advertise-limit") } -func P2PDiscoveryAllow() bool { - return viper.GetBool("p2p.discovery.allow") +func P2PDiscoveryAllowAll() bool { + return viper.GetBool("p2p.discovery.allow-all") } func P2PConnmgrLowWatermark() int { diff --git a/p2p/connmgr/gater.go b/p2p/connmgr/gater.go index 8436574..5827896 100644 --- a/p2p/connmgr/gater.go +++ b/p2p/connmgr/gater.go @@ -21,7 +21,7 @@ type ConnectionGater struct { // New creates a new CustomConnectionGater instance. func NewConnectionGater(connMgr *p2pConnmgr.BasicConnMgr) *ConnectionGater { return &ConnectionGater{ - AllowAll: config.P2PDiscoveryAllow(), + AllowAll: config.P2PDiscoveryAllowAll(), ConnMgr: connMgr, } } @@ -41,10 +41,12 @@ func (cg *ConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (allow b // For simplicity, they are set to allow all connections in this example. func (cg *ConnectionGater) InterceptSecured(nd network.Direction, p p2peer.ID, _ network.ConnMultiaddrs) (allow bool) { - if nd == network.DirOutbound { + // We should probably run with cg.AllowAll = true in the future + if nd == network.DirOutbound || cg.AllowAll { return true } + // We normally shouldn't arrive here. allow = cg.isAllowed(p) if allow { @@ -66,7 +68,7 @@ func (cg *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason func (cg *ConnectionGater) isAllowed(p p2peer.ID) bool { - if config.P2PDiscoveryAllow() || cg.AllowAll { + if config.P2PDiscoveryAllowAll() || cg.AllowAll { return true } diff --git a/p2p/dht/dht.go b/p2p/dht/dht.go index 944c1c5..f3fae46 100644 --- a/p2p/dht/dht.go +++ b/p2p/dht/dht.go @@ -42,7 +42,7 @@ func New(h host.Host, cg *connmgr.ConnectionGater, dhtOpts ...p2pDHT.Option) (*D d.ConnectionGater.AllowAll = true d.Bootstrap(context.Background()) // Reset the connection gater to its original allow state - d.ConnectionGater.AllowAll = config.P2PDiscoveryAllow() + d.ConnectionGater.AllowAll = config.P2PDiscoveryAllowAll() return d, nil } diff --git a/p2p/dht/discovery.go b/p2p/dht/discovery.go index 6de9a66..78d94ac 100644 --- a/p2p/dht/discovery.go +++ b/p2p/dht/discovery.go @@ -3,84 +3,49 @@ package dht import ( "context" "fmt" - "sync/atomic" "github.com/bahner/go-ma" - "github.com/bahner/go-ma-actor/config" - "github.com/libp2p/go-libp2p/core/discovery" - p2peer "github.com/libp2p/go-libp2p/core/peer" drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" - dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" log "github.com/sirupsen/logrus" ) var ( - ErrAddrInfoAddrsEmpty = fmt.Errorf("addrinfo has no addresses") - ErrNoProtectedPeersFound = fmt.Errorf("no peers were discovered") + ErrFailedToCreateRoutingDiscovery = fmt.Errorf("failed to create routing discovery") + ErrPeerChanClosed = fmt.Errorf("peer channel closed") ) -// Takes a context and a DHT instance and discovers peers using the DHT. -// You might want to se server option or not for the DHT. -// Takes a variadic list of discovery.Option. You'll need this if you want to set a custom routing table. -func (d *DHT) DiscoverPeers(ctx context.Context, discoveryOpts ...discovery.Option) error { - log.Debugf("Starting DHT peer discovery searching for peers with rendezvous string: %s", ma.RENDEZVOUS) - - log.Debugf("Number of open connections: %d", len(d.h.Network().Conns())) - // // Trim connections - // log.Debugf("Trimming open connections to %d", config.P2PConnmgrLowWatermark()) - // d.h.ConnManager().TrimOpenConns(ctx) - - log.Debugf("Peer discovery timeout: %v", config.P2PDiscoveryTimeout()) - log.Debugf("Peer discovery context %v", ctx) - +// Run a continuous discovery loop to find new peers +// The ctx should probably be a background context +func (d *DHT) DiscoveryLoop(ctx context.Context) error { routingDiscovery := drouting.NewRoutingDiscovery(d.IpfsDHT) if routingDiscovery == nil { - return fmt.Errorf("dht:discovery: failed to create routing discovery") + return ErrFailedToCreateRoutingDiscovery } - // discoveryOpts = append(discoveryOpts, - // discovery.Limit(config.P2PDiscoveryAdvertiseLimit()), - // discovery.TTL(config.P2PDiscoveryAdvertiseTTL())) - - dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS, discoveryOpts...) - log.Debugf("Advertising rendezvous string: %s", ma.RENDEZVOUS) - - peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS, discoveryOpts...) + peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS) if err != nil { - return fmt.Errorf("dht:discovery: peer discovery error: %w", err) + return fmt.Errorf("peer discovery error: %w", err) } - sem := make(chan struct{}, config.P2PDiscoveryLimit()) // Semaphore for controlling concurrency - var successCount int32 - - // // Make sure we have set the allowAll flag to it's to it's allowed state - // d.ConnectionGater.AllowAll = config.P2PDiscoveryAllow() - - for p := range peerChan { - sem <- struct{}{} // Acquire a token - go func(peerInfo p2peer.AddrInfo) { - defer func() { <-sem }() // Release the token - - if peerInfo.ID == d.h.ID() { - return // Skip self connection - } - if err := d.PeerConnectAndUpdateIfSuccessful(ctx, peerInfo); err != nil { - log.Warnf("Failed to protect discovered peer: %s: %v", peerInfo.ID.String(), err) - } else { - atomic.AddInt32(&successCount, 1) // Increment on successful operation + for { + select { + case p, ok := <-peerChan: + if !ok { + if !(ctx == nil) { + log.Fatalf("DHT peer channel closed, but it was supposed to be running in the background.") + } + return ErrPeerChanClosed } - }(p) - } - // Wait for all goroutines to finish - for i := 0; i < cap(sem); i++ { - sem <- struct{}{} // Ensure all tokens are returned before proceeding - } + if p.ID == d.h.ID() { + continue // Skip self + } - // After processing all peers, check if there were any successful connections - if atomic.LoadInt32(&successCount) == 0 { - return ErrNoProtectedPeersFound + if err := d.PeerConnectAndUpdateIfSuccessful(ctx, p); err != nil { + log.Warnf("Failed to connect to discovered peer: %s: %v", p.ID.String(), err) + } + case <-ctx.Done(): + return nil + } } - - return nil } diff --git a/p2p/discovery.go b/p2p/discovery.go index 3d95247..d5997b2 100644 --- a/p2p/discovery.go +++ b/p2p/discovery.go @@ -2,14 +2,6 @@ package p2p import ( "context" - "fmt" - "time" - - "github.com/bahner/go-ma" - "github.com/bahner/go-ma-actor/config" - "github.com/bahner/go-ma-actor/p2p/dht" - "github.com/bahner/go-ma-actor/p2p/mdns" - log "github.com/sirupsen/logrus" ) // DiscoverPeers starts the peer discovery process. @@ -20,56 +12,11 @@ import ( // DHT is a Kademlia DHT instance. // If nil, a new DHT instance will be created. // You might want to pass a DHT instance in Server mode here, for long running processes. -func (p *P2P) DiscoverPeers() error { - - ctx, cancel := config.P2PDiscoveryContext() - defer cancel() +func (p *P2P) DiscoveryLoop(ctx context.Context) error { - // Start MDNS discovery in a new goroutine - go func() { - m, err := mdns.New(p.DHT.Host(), ma.RENDEZVOUS) - if err == mdns.ErrNoProtectedPeersFound { - log.Warnf("No protected peers found") - return - } - if err != nil { - log.Errorf("Failed to start MDNS discovery: %s", err) - return - } - m.DiscoverPeers(ctx) - }() + go p.MDNS.DiscoveryLoop(ctx) - // Wait for a discovery process to complete - err := p.DHT.DiscoverPeers(ctx) - if err != dht.ErrNoProtectedPeersFound { - return fmt.Errorf("no new peers found %w", err) - } - if err != nil { - return fmt.Errorf("peer discovery unsuccessful: %w", err) - } + go p.DHT.DiscoveryLoop(ctx) return nil } - -// DiscoveryLoop is a blocking function that will periodically -// call DiscoverPeers() until the context is cancelled. -// This shouldn't be cancelled in normal operation. -// Each iteration will have a timeout of its own. - -func (p *P2P) DiscoveryLoop(ctx context.Context) { - log.Infof("Starting discovery with retry interval %s", config.P2PDiscoveryRetryIntervalString()) - for { - select { - case <-ctx.Done(): - return - default: - err := p.DiscoverPeers() // This will block until discovery is complete or timeout - if err != nil { - log.Debugf("Discovery attempt failed: %s", err) - } - sleepTime := config.P2PDiscoveryRetryInterval() - log.Debugf("Discovery sleeping for %s", sleepTime.String()) - time.Sleep(sleepTime) - } - } -} diff --git a/p2p/mdns/discover.go b/p2p/mdns/discovery.go similarity index 75% rename from p2p/mdns/discover.go rename to p2p/mdns/discovery.go index 43e66ea..238141d 100644 --- a/p2p/mdns/discover.go +++ b/p2p/mdns/discovery.go @@ -2,21 +2,21 @@ package mdns import ( "context" - "errors" log "github.com/sirupsen/logrus" ) -var ErrNoProtectedPeersFound = errors.New("protected peers not found") - -// DiscoverPeers starts the discovery process and connects to discovered peers until the context is cancelled. -func (m *MDNS) DiscoverPeers(ctx context.Context) error { +// DiscoveryLoop starts the discovery process and connects to discovered peers until the context is cancelled. +func (m *MDNS) DiscoveryLoop(ctx context.Context) error { log.Debugf("Discovering MDNS peers for service name: %s", m.rendezvous) for { select { case pai, ok := <-m.PeerChan: if !ok { + if !(ctx == nil) { // conext.Bacground() is nil + log.Fatalf("MDNS peer channel closed, ut was supposed to be running in the background.") + } log.Debug("MDNS peer channel closed.") return nil // Exit if the channel is closed } diff --git a/p2p/peer/peer.go b/p2p/peer/peer.go index 0f73109..b420d63 100644 --- a/p2p/peer/peer.go +++ b/p2p/peer/peer.go @@ -49,7 +49,7 @@ func GetOrCreateFromAddrInfo(addrInfo *p2peer.AddrInfo) (Peer, error) { nodeAlias = createNodeAlias(id) } - return New(addrInfo, nodeAlias, config.P2PDiscoveryAllow()), nil + return New(addrInfo, nodeAlias, config.P2PDiscoveryAllowAll()), nil } diff --git a/ui/commands.go b/ui/commands.go index dbac69b..a6c63b2 100644 --- a/ui/commands.go +++ b/ui/commands.go @@ -17,8 +17,6 @@ func (ui *ChatUI) handleCommands(input string) { switch args[0] { case "/broadcast": ui.handleBroadcastCommand(args) - case "/discover": - ui.triggerDiscovery() case "/edit": ui.handleEditCommand() case "/enter": @@ -33,8 +31,6 @@ func (ui *ChatUI) handleCommands(input string) { go ui.handleResolveCommand(args) // This make take some time. No need to block the UI case "/peer": ui.handlePeerCommand(args) - case "/p2p": - ui.handleP2PCommand(args) case "/reset": ui.handleResetCommand(args) case "/refresh": diff --git a/ui/help.go b/ui/help.go index 003d718..6c27e71 100644 --- a/ui/help.go +++ b/ui/help.go @@ -57,8 +57,6 @@ func (ui *ChatUI) handleHelpCommands(args []string) { switch args[1] { case "broadcast": ui.handleHelpCommand(broadcastUsage, broadcastHelp) - case "discover": - ui.handleHelpCommand(p2pDiscoverUsage, p2pDiscoverHelp) case "enter": ui.handleHelpCommand(enterUsage, enterHelp) case "entity": diff --git a/ui/p2p.go b/ui/p2p.go deleted file mode 100644 index 12a7346..0000000 --- a/ui/p2p.go +++ /dev/null @@ -1,40 +0,0 @@ -package ui - -const ( - p2pUsage = "/p2p discover" - p2pHelp = "P2P commands only feature discovery at the moment" - p2pDiscoverUsage = "/p2p discover" - p2pDiscoverHelp = "Triggers a discovery of peers" -) - -func (ui *ChatUI) triggerDiscovery() { - - ui.displaySystemMessage("Discovery process started...") - ui.p.DiscoverPeers() - ui.displaySystemMessage("Discovery process complete.") - -} -func (ui *ChatUI) handleP2PDiscoverCommand(args []string) { - - if len(args) == 2 { - ui.triggerDiscovery() - } else { - ui.handleHelpCommand(p2pDiscoverUsage, p2pDiscoverHelp) - } -} - -func (ui *ChatUI) handleP2PCommand(args []string) { - - if len(args) == 2 { - command := args[1] - switch command { - case "discover": - ui.handleP2PDiscoverCommand(args) - return - default: - ui.displaySystemMessage("Unknown p2p command: " + command) - } - } - - ui.handleHelpCommand(peerUsage, peerHelp) -} diff --git a/ui/reset.go b/ui/reset.go index e8212bc..b2c5575 100644 --- a/ui/reset.go +++ b/ui/reset.go @@ -17,10 +17,6 @@ func (ui *ChatUI) handleReset() { ui.msgBox.Clear() - // First trigger discovery of peers. - ui.displaySystemMessage("Discovering peers...") - ui.p.DiscoverPeers() - // Cancel the broadcast loop and start it again ui.displaySystemMessage("Resetting broadcast channel...") ui.broadcastCancel()