Skip to content

Commit

Permalink
Use connmgr and modularise p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
bahner committed Nov 25, 2023
1 parent bc114f9 commit 72e426b
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 83 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ ifneq (,$(wildcard ./.env))
export
endif

default: clean tidy $(NAME) install
default: clean tidy $(NAME)

build: $(NAME)

init: go.mod tidy

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ eval $(go run . -genenv -forcePublish | tee .env)
type `./go-ma-actor -help`. Most config settings can be set with environment variables, as follows:

```bash
export GO_ACTOR_LOG_LEVEL="error"
export GO_ACTOR_DISCOVERY_TIMEOUT="300"
export GO_ACTOR_KEYSET="myBase58EncodedPrivkeyGeneratedByGenerate"
export GO_MA_ACTOR_LOG_LEVEL="error"
export GO_MA_ACTOR_DISCOVERY_TIMEOUT="300"
export GO_MA_ACTOR_KEYSET="myBase58EncodedPrivkeyGeneratedByGenerate"
```

## Identity
Expand All @@ -30,7 +30,7 @@ Just don't store somewhere insecure. It's your future identity.

```bash
unset HISTFILE
export GO_ACTOR_KEYSET=FooBarABCDEFbase58
export GO_MA_ACTOR_KEYSET=FooBarABCDEFbase58
```

or specified on the command line:
Expand Down
7 changes: 1 addition & 6 deletions TODO
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
## Bugs
* fix redraw of ui when issuing commands
* redirect logs from stdout to file

## Features
* integrate ipns
- publish did with key
- There is some logical error wityh handling existing keys
* fix redraw of ui when issuing commands
46 changes: 37 additions & 9 deletions config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ import (
)

const (
name = "go-ma-actor"
keyset_var = "GO_ACTOR_KEYSET"
entity_var = "GO_ACTOR_ENTITY"
discovery_timeout_var = "GO_ACTOR_DISCOVERY_TIMEOUT"
log_level_var = "GO_ACTOR_LOG_LEVEL"
defaultDiscoveryTimeout = 300
name = "go-ma-actor"
keyset_var = "GO_MA_ACTOR_KEYSET"
entity_var = "GO_MA_ACTOR_ENTITY"
discovery_timeout_var = "GO_MA_ACTOR_DISCOVERY_TIMEOUT"
low_watermark_var = "GO_MA_ACTOR_LOW_WATERMARK"
high_watermark_var = "GO_MA_ACTOR_HIGH_WATERMARK"
connmgr_grace_var = "GO_MA_ACTOR_CONNMGR_GRACE"
log_level_var = "GO_MA_ACTOR_LOG_LEVEL"

defaultDiscoveryTimeout int = 300
defaultLowWaterMark int = 2
defaultHighWaterMark int = 10
defaultConnMgrGrace time.Duration = time.Minute * 1
)

var (
Expand All @@ -31,9 +38,13 @@ var (
)

var (
discoveryTimeout int = env.GetInt(discovery_timeout_var, defaultDiscoveryTimeout)
logLevel string = env.Get(log_level_var, "info")
logfile string = env.Get("GO_ACTOR_LOG_FILE", name+"log")
discoveryTimeout int = env.GetInt(discovery_timeout_var, defaultDiscoveryTimeout)
lowWaterMark int = env.GetInt(low_watermark_var, defaultLowWaterMark)
highWaterMark int = env.GetInt(high_watermark_var, defaultHighWaterMark)
connmgrGracePeriod time.Duration = env.GetDuration(connmgr_grace_var, defaultConnMgrGrace)

logLevel string = env.Get(log_level_var, "info")
logfile string = env.Get("GO_MA_LOG_FILE", name+"log")

// What we want to communicate with initially
entity string = env.Get(entity_var, "")
Expand All @@ -50,7 +61,12 @@ func init() {
// Flags - user configurations
flag.StringVar(&logLevel, "loglevel", logLevel, "Loglevel to use for application")
flag.StringVar(&logfile, "logfile", logfile, "Logfile to use for application")

// P"P Settings
flag.IntVar(&discoveryTimeout, "discoveryTimeout", discoveryTimeout, "Timeout for peer discovery")
flag.IntVar(&lowWaterMark, "lowWaterMark", lowWaterMark, "Low watermark for peer discovery")
flag.IntVar(&highWaterMark, "highWaterMark", highWaterMark, "High watermark for peer discovery")
flag.DurationVar(&connmgrGracePeriod, "connmgrGracePeriod", connmgrGracePeriod, "Grace period for connection manager")

// Actor
flag.StringVar(&nick, "nick", nick, "Nickname to use in character creation")
Expand Down Expand Up @@ -110,3 +126,15 @@ func GetForcePublish() bool {
func GetDiscoveryTimeout() time.Duration {
return time.Duration(discoveryTimeout) * time.Second
}

func GetLowWaterMark() int {
return lowWaterMark
}

func GetHighWaterMark() int {
return highWaterMark
}

func GetConnMgrGracePeriod() time.Duration {
return connmgrGracePeriod
}
19 changes: 19 additions & 0 deletions p2p/connmgr/connmgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package connmgr

import (
"github.com/bahner/go-ma-actor/config"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
p2pConnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

func Init() (*p2pConnmgr.BasicConnMgr, error) {

withGracePeriod := connmgr.WithGracePeriod(config.GetConnMgrGracePeriod())

return p2pConnmgr.NewConnManager(
config.GetLowWaterMark(),
config.GetHighWaterMark(),
withGracePeriod,
)

}
26 changes: 12 additions & 14 deletions p2p/dht.go → p2p/dht/dht.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package p2p
package dht

import (
"context"
"fmt"
"sync"

"github.com/bahner/go-ma"
dht "github.com/libp2p/go-libp2p-kad-dht"
p2pDHT "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
log "github.com/sirupsen/logrus"
)

func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
func Init(ctx context.Context, h host.Host, dhtOpts ...p2pDHT.Option) (*p2pDHT.IpfsDHT, error) {
log.Info("Initializing DHT.")

kademliaDHT, err := dht.New(ctx, h)
kademliaDHT, err := p2pDHT.New(ctx, h)
if err != nil {
log.Error("Failed to create Kademlia DHT.")
return nil, err
Expand All @@ -34,7 +34,7 @@ func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
}

var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
for _, peerAddr := range p2pDHT.DefaultBootstrapPeers {
peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil {
log.Warnf("Failed to convert bootstrap peer address: %v", err)
Expand Down Expand Up @@ -81,14 +81,12 @@ func initDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
log.Info("Kademlia DHT bootstrapped successfully.")
return kademliaDHT, nil
}
func DiscoverDHTPeers(ctx context.Context, h host.Host) error {

log.Debug("Starting DHT route discovery.")
// Takes a context and a DHT instance and discovers peers using the DHT.
// You might want to se server option or not for the DHT.
func DiscoverPeers(ctx context.Context, dhtInstance *p2pDHT.IpfsDHT, h host.Host) error {

dhtInstance, err := initDHT(ctx, h)
if err != nil {
return err
}
log.Debug("Starting DHT route discovery.")

routingDiscovery := drouting.NewRoutingDiscovery(dhtInstance)
dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS)
Expand Down Expand Up @@ -120,9 +118,9 @@ discoveryLoop:
log.Infof("Connected to DHT peer: %s", p.ID.String())

// Add peer to list of known peers
peerMutex.Lock()
connectedPeers[p.ID.String()] = &p
peerMutex.Unlock()
log.Debugf("Protecting peer: %s", p.ID.String())
h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)

break discoveryLoop
}
Expand Down
12 changes: 10 additions & 2 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package p2p
import (
"context"

"github.com/bahner/go-ma-actor/p2p/dht"
"github.com/bahner/go-ma-actor/p2p/mdns"
"github.com/libp2p/go-libp2p/core/host"
log "github.com/sirupsen/logrus"
)
Expand All @@ -14,13 +16,19 @@ func StartPeerDiscovery(ctx context.Context, h host.Host) error {

// Start DHT discovery in a new goroutine
go func() {
DiscoverDHTPeers(ctx, h)
dhtINstance, err := dht.Init(ctx, h)
if err != nil {
log.Error("Failed to initialise DHT. Peer discovery unsuccessful.")
done <- struct{}{} // Signal completion
return
}
dht.DiscoverPeers(ctx, dhtINstance, h)
done <- struct{}{} // Signal completion
}()

// Start MDNS discovery in a new goroutine
go func() {
DiscoverMDNSPeers(ctx, h)
mdns.DiscoverPeers(ctx, h)
done <- struct{}{} // Signal completion
}()

Expand Down
18 changes: 10 additions & 8 deletions p2p/mdns.go → p2p/mdns/mdns.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package p2p
package mdns

import (
"context"
Expand All @@ -7,7 +7,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
p2pmdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
log "github.com/sirupsen/logrus"
)

Expand All @@ -27,16 +27,18 @@ func initMDNS(h host.Host, rendezvous string) chan peer.AddrInfo {
n.PeerChan = make(chan peer.AddrInfo)

// An hour might be a long long period in practical applications. But this is fine for us
ser := mdns.NewMdnsService(h, rendezvous, n)
ser := p2pmdns.NewMdnsService(h, rendezvous, n)
if err := ser.Start(); err != nil {
panic(err)
}
return n.PeerChan
}
func DiscoverMDNSPeers(ctx context.Context, h host.Host) error {
func DiscoverPeers(ctx context.Context, h host.Host) error {
log.Debugf("Discovering MDNS peers for servicename: %s", ma.RENDEZVOUS)

peerChan := initMDNS(h, ma.RENDEZVOUS)
// Start trimming connections, so we have room for new friends
h.ConnManager().TrimOpenConns(context.Background())

discoveryLoop:
for {
Expand All @@ -58,12 +60,12 @@ discoveryLoop:
log.Infof("Connected to MDNS peer: %s", p.ID.String())

// Add peer to list of known peers
peerMutex.Lock()
connectedPeers[p.ID.String()] = &p
peerMutex.Unlock()
log.Debugf("Protecting discovered MDNS peer: %s", p.ID.String())
h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)

break discoveryLoop
}

case <-ctx.Done():
log.Info("Context cancelled, stopping MDNS peer discovery.")
return nil
Expand Down
51 changes: 12 additions & 39 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,23 @@ package p2p
import (
"context"
"fmt"
"sync"
"time"

"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/bahner/go-ma-actor/p2p/pubsub"
"github.com/bahner/go-ma/key/ipns"
libp2p "github.com/libp2p/go-libp2p"
p2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)

var (
err error

ctxDiscovery context.Context
cancel context.CancelFunc

n host.Host
ps *p2ppubsub.PubSub

connectedPeers = make(map[string]*peer.AddrInfo)
peerMutex sync.Mutex
)

// Initialise everything needed for p2p communication.
Expand All @@ -42,8 +36,17 @@ var (

func Init(ctx context.Context, i *ipns.Key, discoveryTimeout time.Duration) (host.Host, *p2ppubsub.PubSub, error) {

connMgr, err := connmgr.Init()
if err != nil {
return nil, nil, fmt.Errorf("p2p.Init: failed to create connection manager: %w", err)
}

p2pOpts := []libp2p.Option{
libp2p.ConnectionManager(connMgr),
}

// Create a new libp2p Host that listens on a random TCP port
n, err = node.New(i, nil)
n, err = node.New(i, p2pOpts...)
if err != nil {
return nil, nil, fmt.Errorf("p2p.Init: failed to create libp2p node: %w", err)
}
Expand Down Expand Up @@ -76,33 +79,3 @@ func GetPubSub() *p2ppubsub.PubSub {
func GetNode() host.Host {
return n
}

// Get list of connectpeers.
// The connectTimeout is how long to wait for a connection to be established.
// This applies to each host in turn.
// If set to 0 a default timeout of 5 seconds will be used.
func GetConnectedPeers(connectTimeout time.Duration) map[string]*peer.AddrInfo {
defaultTimeoutSeconds := 5

if connectTimeout == 0 {
connectTimeout = time.Duration(defaultTimeoutSeconds) * time.Second
}

for p, addrs := range connectedPeers {

ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()

// Try connecting to the peer
if err := n.Connect(ctx, *addrs); err != nil {
log.Debugf("Failed connecting to %s, error: %v. Pruning.", p, err)

peerMutex.Lock()
delete(connectedPeers, p)
peerMutex.Unlock()
}
}

// No need to copy the peers again, as the new hosts are already live
return connectedPeers
}
Loading

0 comments on commit 72e426b

Please sign in to comment.