Skip to content

Commit

Permalink
Refactor P2P to be a struct
Browse files Browse the repository at this point in the history
This should greatly simply both use and configuration of P2P
functionality.

It only takes a Kademlia DHT instance or libp2p Options as aparameters.

The DHT Instance can be nil, but if you want to confifure routing or set
ServerMode that is possible and makes sense.

As for libp2p options, these are required to enable relay services and such.
  • Loading branch information
bahner committed Nov 27, 2023
1 parent 4ffa0bd commit ffc21ad
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
18 changes: 12 additions & 6 deletions p2p/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,32 @@ package connmgr

import (
"github.com/bahner/go-ma-actor/config"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
p2pConnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
log "github.com/sirupsen/logrus"
)

func Init() (*p2pConnmgr.BasicConnMgr, error) {
var connmgr *p2pConnmgr.BasicConnMgr

gracePeriod := config.GetConnMgrGracePeriod()
withGracePeriod := connmgr.WithGracePeriod(gracePeriod)
log.Infof("Connection manager grace period: %v", gracePeriod)
func Init(opts ...p2pConnmgr.Option) (*p2pConnmgr.BasicConnMgr, error) {

if connmgr != nil {
return connmgr, nil
}

lowWaterMark := config.GetLowWaterMark()
log.Infof("Connection manager low water mark: %v", lowWaterMark)

highWaterMark := config.GetHighWaterMark()
log.Infof("Connection manager high water mark: %v", highWaterMark)

gracePeriod := config.GetConnMgrGracePeriod()
opts = append(opts, p2pConnmgr.WithGracePeriod(gracePeriod))
log.Infof("Connection manager grace period: %v", gracePeriod)

return p2pConnmgr.NewConnManager(
lowWaterMark,
highWaterMark,
withGracePeriod,
opts...,
)

}
41 changes: 30 additions & 11 deletions p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,33 @@ import (
log "github.com/sirupsen/logrus"
)

func Init(ctx context.Context, h host.Host, dhtOpts ...p2pDHT.Option) (*p2pDHT.IpfsDHT, error) {
log.Info("Initializing DHT.")
var dhtStructInstance *dhtStruct
var _ DHT = (*dhtStruct)(nil)

kademliaDHT, err := p2pDHT.New(ctx, h)
type DHT interface {
DiscoverPeers(context.Context) error
}

type dhtStruct struct {
*p2pDHT.IpfsDHT
h host.Host
}

func Init(ctx context.Context, h host.Host, dhtOpts ...p2pDHT.Option) (DHT, error) {
log.Info("Initializing Kademlia DHT.")

var err error
dhtStructInstance = &dhtStruct{h: h}

dhtStructInstance.IpfsDHT, err = p2pDHT.New(ctx, h, dhtOpts...)
if err != nil {
log.Error("Failed to create Kademlia DHT.")
return nil, err
} else {
log.Debug("Kademlia DHT created.")
}

err = kademliaDHT.Bootstrap(ctx)
err = dhtStructInstance.IpfsDHT.Bootstrap(ctx)
if err != nil {
log.Error("Failed to bootstrap Kademlia DHT.")
return nil, err
Expand Down Expand Up @@ -79,16 +94,16 @@ func Init(ctx context.Context, h host.Host, dhtOpts ...p2pDHT.Option) (*p2pDHT.I
}

log.Info("Kademlia DHT bootstrapped successfully.")
return kademliaDHT, nil
return dhtStructInstance, nil
}

// Takes a context and a DHT instance and discovers peers using the DHT.
// You might want to se server option or not for the DHT.
func DiscoverPeers(ctx context.Context, dhtInstance *p2pDHT.IpfsDHT, h host.Host) error {
func (d *dhtStruct) DiscoverPeers(ctx context.Context) error {

log.Debug("Starting DHT route discovery.")

routingDiscovery := drouting.NewRoutingDiscovery(dhtInstance)
routingDiscovery := drouting.NewRoutingDiscovery(d.IpfsDHT)
dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS)

log.Infof("Starting DHT peer discovery for rendezvous string: %s", ma.RENDEZVOUS)
Expand All @@ -107,20 +122,20 @@ discoveryLoop:
peerChan = nil
break
}
if p.ID == h.ID() {
if p.ID == d.h.ID() {
continue // Skip self connection
}

err := h.Connect(ctx, p) // Using the outer context directly
err := d.h.Connect(ctx, p) // Using the outer context directly
if err != nil {
log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
} else {
log.Infof("Connected to DHT peer: %s", p.ID.String())

// Add peer to list of known peers
log.Debugf("Protecting peer: %s", p.ID.String())
h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
d.h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
d.h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)

break discoveryLoop

Expand All @@ -138,3 +153,7 @@ discoveryLoop:
log.Info("DHT Peer discovery complete")
return nil
}

func Get() *p2pDHT.IpfsDHT {
return dhtStructInstance.IpfsDHT
}
6 changes: 4 additions & 2 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
func StartPeerDiscovery(ctx context.Context, h host.Host, dhtInstance *p2pdht.IpfsDHT) error {
log.Debug("Starting peer discovery...")
var err error
done := make(chan struct{}, 2) // Buffered channel to avoid blocking

// Only required for the first discovery process
// We *need* DHT, but MDNS is just a bonus.
done := make(chan struct{}, 1) // Buffered channel to avoid blocking

if dhtInstance == nil {
dhtInstance, err = dht.Init(ctx, h)

Check failure on line 30 in p2p/discovery.go

View workflow job for this annotation

GitHub Actions / build

cannot use dht.Init(ctx, h) (value of type "github.com/bahner/go-ma-actor/p2p/dht".DHT) as *"github.com/libp2p/go-libp2p-kad-dht".IpfsDHT value in assignment
Expand All @@ -47,7 +50,6 @@ func StartPeerDiscovery(ctx context.Context, h host.Host, dhtInstance *p2pdht.Ip
// Start MDNS discovery in a new goroutine
go func() {
mdns.DiscoverPeers(ctx, h)
done <- struct{}{} // Signal completion
}()

// Wait for a discovery process to complete
Expand Down
5 changes: 3 additions & 2 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package p2p
import (
"context"
"fmt"
"time"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/bahner/go-ma-actor/p2p/pubsub"
Expand Down Expand Up @@ -37,7 +37,7 @@ var (
//
// The function return the libp2p node and a PubSub Service

func Init(ctx context.Context, i *ipns.Key, discoveryTimeout time.Duration, p2pOpts ...libp2p.Option) (host.Host, *p2ppubsub.PubSub, error) {
func Init(ctx context.Context, i *ipns.Key, p2pOpts ...libp2p.Option) (host.Host, *p2ppubsub.PubSub, error) {

// Initiate libp2p options, if none are provided
if p2pOpts == nil {
Expand Down Expand Up @@ -67,6 +67,7 @@ func Init(ctx context.Context, i *ipns.Key, discoveryTimeout time.Duration, p2pO
ctx = context.Background()
}

discoveryTimeout := config.GetDiscoveryTimeout()
ctxDiscovery, cancel = context.WithTimeout(ctx, discoveryTimeout)
defer cancel()

Expand Down

0 comments on commit ffc21ad

Please sign in to comment.