Skip to content

Commit

Permalink
Consolidate P2P
Browse files Browse the repository at this point in the history
The p2p package had ended up in dependency hell, but was also
overengineered. With a little gardening to get out the weeds,
the package is now much better balanced and easy to use.

It's much nicer to call peer.ConnectAndProtect() than some
DHT function.
  • Loading branch information
bahner committed Mar 9, 2024
1 parent 0e3c840 commit e0f6044
Show file tree
Hide file tree
Showing 28 changed files with 377 additions and 463 deletions.
6 changes: 3 additions & 3 deletions cmd/actor/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"fmt"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/p2p"
"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/dht"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/libp2p/go-libp2p"
)

func DHT(cg *connmgr.ConnectionGater) (*dht.DHT, error) {
func DHT(cg *connmgr.ConnectionGater) (*p2p.DHT, error) {

// THese are the relay specific parts.
p2pOpts := []libp2p.Option{
Expand All @@ -22,7 +22,7 @@ func DHT(cg *connmgr.ConnectionGater) (*dht.DHT, error) {
return nil, fmt.Errorf("pong: failed to create libp2p node: %w", err)
}

d, err := dht.New(n, cg)
d, err := p2p.NewDHT(n, cg)
if err != nil {
return nil, fmt.Errorf("pong: failed to create DHT: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/actor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {
// So let's just start it quickly and stop here.
if config.RelayMode() {
fmt.Println("Starting relay mode...")
go p2P.DiscoveryLoop(context.Background())
go p2P.StartDiscoveryLoop(context.Background())
startWebServer(p2P, nil)
os.Exit(0) // This won't be reached.
}
Expand Down
4 changes: 2 additions & 2 deletions entity/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (e *Entity) ConnectPeer() (pi p2peer.AddrInfo, err error) {
}

// If we're already connected, return
if p.DHT.Host().Network().Connectedness(pid) == network.Connected {
if p.Host.Network().Connectedness(pid) == network.Connected {
log.Debugf("Already connected to peer: %s", pid.String())
return pi, peer.ErrAlreadyConnected
}
Expand All @@ -37,7 +37,7 @@ func (e *Entity) ConnectPeer() (pi p2peer.AddrInfo, err error) {

// Connect to the peer
log.Debugf("Connecting to peer with addrs: %v", pi.Addrs)
err = p.DHT.Host().Connect(e.Ctx, pi)
err = p.Host.Connect(e.Ctx, pi)

return pi, err

Expand Down
4 changes: 2 additions & 2 deletions entity/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func webHandler(w http.ResponseWriter, _ *http.Request, p *p2p.P2P, e *Entity) {

doc.Title = titleStr
doc.H1 = h1str
doc.H2 = fmt.Sprintf("%s@%s", ma.RENDEZVOUS, (p.DHT.Host().ID().String()))
doc.Addrs = p.DHT.Host().Addrs()
doc.H2 = fmt.Sprintf("%s@%s", ma.RENDEZVOUS, (p.Host.ID().String()))
doc.Addrs = p.Host.Addrs()
doc.AllConnectedPeers = p.AllConnectedPeers()
doc.PeersWithSameRendez = p.ConnectedProtectedPeers()

Expand Down
6 changes: 3 additions & 3 deletions mode/pong/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"fmt"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/p2p"
"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/dht"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/libp2p/go-libp2p"
p2pDHT "github.com/libp2p/go-libp2p-kad-dht"
)

func DHT(cg *connmgr.ConnectionGater) (*dht.DHT, error) {
func DHT(cg *connmgr.ConnectionGater) (*p2p.DHT, error) {

// THese are the relay specific parts.
p2pOpts := []libp2p.Option{
Expand All @@ -28,7 +28,7 @@ func DHT(cg *connmgr.ConnectionGater) (*dht.DHT, error) {
return nil, fmt.Errorf("pong: failed to create libp2p node: %w", err)
}

d, err := dht.New(n, cg, dhtOpts...)
d, err := p2p.NewDHT(n, cg, dhtOpts...)
if err != nil {
return nil, fmt.Errorf("pong: failed to create DHT: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions mode/pong/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func Run(a *actor.Actor, p *p2p.P2P) {
viper.SetDefault("mode.pong.fortune", config.DEFAULT_PONG_FORTUNE_MODE)

fmt.Printf("Starting pong mode as %s\n", a.Entity.DID.Id)
go p.DiscoveryLoop(ctx)
go p.StartDiscoveryLoop(ctx)
fmt.Println("Discovery loop started.")
go a.Subscribe(ctx, a.Entity)
fmt.Println("Subscribed to self.")
Expand All @@ -38,7 +38,7 @@ func Run(a *actor.Actor, p *p2p.P2P) {
actor.HelloWorld(ctx, a)
fmt.Println("Sent hello world.")

fmt.Printf("Running in pong mode as %s@%s\n", a.Entity.DID.Id, p.DHT.Host().ID())
fmt.Printf("Running in pong mode as %s@%s\n", a.Entity.DID.Id, p.Host.ID())
fmt.Println("Press Ctrl-C to stop.")

for {
Expand Down
6 changes: 3 additions & 3 deletions mode/relay/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"fmt"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/p2p"
"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/dht"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/libp2p/go-libp2p"
p2pDHT "github.com/libp2p/go-libp2p-kad-dht"
)

func DHT(cg *connmgr.ConnectionGater) (*dht.DHT, error) {
func DHT(cg *connmgr.ConnectionGater) (*p2p.DHT, error) {

// THese are the relay specific parts.
p2pOpts := []libp2p.Option{
Expand All @@ -27,7 +27,7 @@ func DHT(cg *connmgr.ConnectionGater) (*dht.DHT, error) {
return nil, fmt.Errorf("pong: failed to create libp2p node: %w", err)
}

d, err := dht.New(n, cg, dhtOpts...)
d, err := p2p.NewDHT(n, cg, dhtOpts...)
if err != nil {
return nil, fmt.Errorf("pong: failed to create DHT: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions mode/relay/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func webHandler(w http.ResponseWriter, _ *http.Request, p *p2p.P2P) {
doc := NewWebHandlerDocument()

doc.Title = fmt.Sprintf("Bootstrap peer for rendezvous %s.", ma.RENDEZVOUS)
doc.H1 = fmt.Sprintf("%s@%s", ma.RENDEZVOUS, (p.DHT.Host().ID().String()))
doc.H1 = fmt.Sprintf("%s@%s", ma.RENDEZVOUS, (p.Host.ID().String()))
doc.H1 += fmt.Sprintf("<br>Found %d peers with rendezvous %s", len(p.ConnectedProtectedPeers()), ma.RENDEZVOUS)
doc.Addrs = p.DHT.Host().Addrs()
doc.Addrs = p.Host.Addrs()
doc.ProtectedPeers = p.ConnectedProtectedPeers()
doc.UnprotectedPeers = p.ConnectedUnprotectedPeers()
// doc.AllConnectedPeers = p.GetAllConnectedPeers()
Expand Down
29 changes: 29 additions & 0 deletions p2p/advertise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package p2p

import (
"context"
"time"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/config"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
log "github.com/sirupsen/logrus"
)

func advertisementLoop(ctx context.Context, routingDiscovery *routing.RoutingDiscovery, discoveryOpts ...discovery.Option) {

ticker := time.NewTicker(config.P2PDiscoveryAdvertiseInterval())
defer ticker.Stop()

for {
select {
case <-ticker.C:
util.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS, discoveryOpts...)
log.Debugf("Advertising rendezvous string: %s", ma.RENDEZVOUS)
case <-ctx.Done():
return
}
}
}
6 changes: 3 additions & 3 deletions p2p/dht/bootstrap.go → p2p/bootstrap.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dht
package p2p

import (
"context"
Expand All @@ -14,7 +14,7 @@ import (
func (d *DHT) Bootstrap(ctx context.Context) error {
log.Info("Initialising Kademlia DHT.")

err := d.IpfsDHT.Bootstrap(context.Background())
err := d.IpfsDHT.Bootstrap(ctx)
if err != nil {
return fmt.Errorf("failed to bootstrap Kademlia DHT: %w", err)
}
Expand All @@ -36,7 +36,7 @@ func (d *DHT) Bootstrap(ctx context.Context) error {
go func(pInfo peer.AddrInfo) {
defer wg.Done()

if err := d.h.Connect(ctx, pInfo); err != nil {
if err := d.Host.Connect(ctx, pInfo); err != nil {
log.Warnf("Bootstrap warning: %v", err)
}
}(*peerinfo)
Expand Down
93 changes: 93 additions & 0 deletions p2p/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package p2p

import (
"context"
"fmt"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/peer"
p2pDHT "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
p2peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
log "github.com/sirupsen/logrus"
)

type DHT struct {
*p2pDHT.IpfsDHT
Host host.Host
ConnectionGater *connmgr.ConnectionGater
}

// Initialise The Kademlia DHT and bootstrap it.
// The context is used to abort the process, but context.Background() probably works fine.
// If nil is passed, a background context will be used.
//
// The host is a libp2p host.
//
// Takes a variadic list of dht.Option. You'll need this if you want to set a custom routing table.
// or set the mode to server. None is fine for ordinary use.

func NewDHT(h host.Host, cg *connmgr.ConnectionGater, dhtOpts ...p2pDHT.Option) (*DHT, error) {

var err error

d := &DHT{
Host: h,
ConnectionGater: cg,
}

d.IpfsDHT, err = p2pDHT.New(context.Background(), h, dhtOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create Kademlia DHT: %w", err)
}

d.Bootstrap(context.Background())

return d, nil
}

var (
ErrFailedToCreateRoutingDiscovery = fmt.Errorf("failed to create routing discovery")
)

// Run a continuous discovery loop to find new peers
// The ctx should probably be a background context
func (d *DHT) discoveryLoop(ctx context.Context) error {
routingDiscovery := routing.NewRoutingDiscovery(d.IpfsDHT)
if routingDiscovery == nil {
return ErrFailedToCreateRoutingDiscovery
}

peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS)
if err != nil {
return fmt.Errorf("peer discovery error: %w", err)
}

go advertisementLoop(ctx, routingDiscovery) // Run advertisement continuously in the background
go discover(ctx, peerChan, d) // Run discovery continuously in the background

return nil
}

func discover(ctx context.Context, peerChan <-chan p2peer.AddrInfo, d *DHT) error {
for {
select {
case p, ok := <-peerChan:
if !ok {
log.Fatalf("DHT peer channel closed, but it was supposed to be running in the background.")
}

if p.ID == d.Host.ID() {
continue // Skip self
}

if err := peer.ConnectAndProtect(context.Background(), d.Host, p); err != nil {
log.Warnf("Failed to connect to discovered peer: %s: %v", p.ID.String(), err)
}
case <-ctx.Done():
return nil
}
}
}
69 changes: 0 additions & 69 deletions p2p/dht/connect.go

This file was deleted.

Loading

0 comments on commit e0f6044

Please sign in to comment.