Skip to content

Commit

Permalink
p2p: dead/solo subnets remedy (enhanced)
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Dec 18, 2024
1 parent 4ed1fad commit a364c2c
Show file tree
Hide file tree
Showing 28 changed files with 484 additions and 469 deletions.
5 changes: 2 additions & 3 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"
"tailscale.com/util/singleflight"

"github.com/ssvlabs/ssv/logging/fields"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/slotticker"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
"github.com/ssvlabs/ssv/utils/casts"
"go.uber.org/zap"
"tailscale.com/util/singleflight"
)

const (
Expand Down
5 changes: 2 additions & 3 deletions network/discovery/dv5_bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package discovery
import (
"context"

"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/ssvlabs/ssv/utils"
"go.uber.org/zap"
)

// BootnodeOptions contains options to create the node
Expand Down Expand Up @@ -68,5 +67,5 @@ func createBootnodeDiscovery(ctx context.Context, logger *zap.Logger, networkCfg
Bootnodes: []string{},
},
}
return newDiscV5Service(ctx, logger, discOpts)
return newDiscV5Service(ctx, logger, false, nil, discOpts)
}
21 changes: 21 additions & 0 deletions network/discovery/dv5_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -48,6 +49,26 @@ func (dvs *DiscV5Service) ssvNodeFilter(logger *zap.Logger) func(node *enode.Nod
}
}

func (dvs *DiscV5Service) alreadyConnectedFilter() func(node *enode.Node) bool {
return func(node *enode.Node) bool {
pid, err := PeerID(node)
if err != nil {
return false
}
return dvs.conns.Connectedness(pid) != libp2pnetwork.Connected
}
}

func (dvs *DiscV5Service) recentlyTrimmedFilter() func(node *enode.Node) bool {
return func(node *enode.Node) bool {
pid, err := PeerID(node)
if err != nil {
return false
}
return !peers.TrimmedRecently.Has(pid)
}
}

// subnetFilter checks if the node has an interest in the given subnet
func (dvs *DiscV5Service) subnetFilter(subnets ...uint64) func(node *enode.Node) bool {
return func(node *enode.Node) bool {
Expand Down
7 changes: 3 additions & 4 deletions network/discovery/dv5_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
"go.uber.org/zap"
)

// implementing discovery.Discovery
Expand Down Expand Up @@ -54,9 +53,9 @@ func (dvs *DiscV5Service) FindPeers(ctx context.Context, ns string, opt ...disco
}
cn := make(chan peer.AddrInfo, 32)

dvs.discover(ctx, func(e PeerEvent) {
dvs.discover(ctx, logger, func(e PeerEvent) {
cn <- e.AddrInfo
}, time.Millisecond, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger), dvs.subnetFilter(subnet))
}, time.Millisecond, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger), dvs.subnetFilter(subnet), dvs.alreadyConnectedFilter(), dvs.recentlyTrimmedFilter())

return cn, nil
}
97 changes: 75 additions & 22 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ import (
"context"
"fmt"
"net"
"strconv"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/networkconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
Expand Down Expand Up @@ -56,6 +57,14 @@ type DiscV5Service struct {
dv5Listener Listener
bootnodes []*enode.Node

// withExtraFiltering specifies whether we want to additionally filter discovered peers,
// since operator node and boot node are using the same discovery code (`DiscV5Service`) we
// want to do this extra filtering for operator node only
withExtraFiltering bool
// topicsCtrl must be non-nil when withExtraFiltering is set to true since it will be used
// to help us decide which peers need to be filtered out
topicsCtrl topics.Controller

conns peers.ConnectionIndex
subnetsIdx peers.SubnetsIndex

Expand All @@ -68,20 +77,33 @@ type DiscV5Service struct {
publishLock chan struct{}
}

func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Options) (Service, error) {
func newDiscV5Service(
pctx context.Context,
logger *zap.Logger,
withExtraFiltering bool,
topicsController topics.Controller,
opts *Options,
) (Service, error) {
ctx, cancel := context.WithCancel(pctx)
dvs := DiscV5Service{
ctx: ctx,
cancel: cancel,
conns: discOpts.ConnIndex,
subnetsIdx: discOpts.SubnetsIdx,
networkConfig: discOpts.NetworkConfig,
subnets: discOpts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
}

logger.Debug("configuring discv5 discovery", zap.Any("discOpts", discOpts))
if err := dvs.initDiscV5Listener(logger, discOpts); err != nil {
ctx: ctx,
cancel: cancel,
withExtraFiltering: withExtraFiltering,
topicsCtrl: topicsController,
conns: opts.ConnIndex,
subnetsIdx: opts.SubnetsIdx,
networkConfig: opts.NetworkConfig,
subnets: opts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
}

logger.Debug(
"configuring discv5 discovery",
zap.Any("discV5Opts", opts.DiscV5Opts),
zap.Any("hostAddress", opts.HostAddress),
zap.Any("hostDNS", opts.HostDNS),
)
if err := dvs.initDiscV5Listener(logger, opts); err != nil {
return nil, err
}
return &dvs, nil
Expand Down Expand Up @@ -143,11 +165,12 @@ func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) e
const logFrequency = 10
var skippedPeers uint64 = 0

dvs.discover(dvs.ctx, func(e PeerEvent) {
dvs.discover(dvs.ctx, logger, func(e PeerEvent) {
logger := logger.With(
fields.ENR(e.Node),
fields.PeerID(e.AddrInfo.ID),
)

err := dvs.checkPeer(logger, e)
if err != nil {
if skippedPeers%logFrequency == 0 {
Expand All @@ -157,7 +180,7 @@ func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) e
return
}
handler(e)
}, defaultDiscoveryInterval) // , dvs.forkVersionFilter) //, dvs.badNodeFilter)
}, defaultDiscoveryInterval, dvs.ssvNodeFilter(logger), dvs.sharedSubnetsFilter(1), dvs.badNodeFilter(logger), dvs.alreadyConnectedFilter(), dvs.recentlyTrimmedFilter())

return nil
}
Expand All @@ -176,15 +199,15 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
}

// Get the peer's subnets, skipping if it has none.
nodeSubnets, err := records.GetSubnetsEntry(e.Node.Record())
peerSubnets, err := records.GetSubnetsEntry(e.Node.Record())
if err != nil {
return fmt.Errorf("could not read subnets: %w", err)
}
if bytes.Equal(zeroSubnets, nodeSubnets) {
if bytes.Equal(zeroSubnets, peerSubnets) {
return errors.New("zero subnets")
}

dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, nodeSubnets)
dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, peerSubnets)

// Filters
if !dvs.limitNodeFilter(e.Node) {
Expand All @@ -196,6 +219,36 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
return errors.New("no shared subnets")
}

if dvs.withExtraFiltering {
helpfulPeer := false // whether this peer helps us with getting rid of dead/solo subnets
subscribedTopics := dvs.topicsCtrl.Topics()
for _, topic := range subscribedTopics {
topicPeers, err := dvs.topicsCtrl.Peers(topic)
if err != nil {
return errors.Wrap(err, "could not get subscribed topic peers")
}

if len(topicPeers) >= 2 {
continue // this topic has enough peers
}

// we've got a dead subnet here, see if this peer can help with that
subnet, err := strconv.Atoi(topic)
if err != nil {
return errors.Wrap(err, "could not convert topic name to subnet id")
}
peerSubnet := peerSubnets[subnet]
if peerSubnet != 1 {
continue // peer doesn't have this subnet either, lets check other dead subnets we have
}
helpfulPeer = true // this peer helps with at least 1 dead subnet for us
break
}
if !helpfulPeer {
return errors.New("this peer doesn't help with dead subnets")
}
}

metricFoundNodes.Inc()
return nil
}
Expand Down Expand Up @@ -281,7 +334,7 @@ func (dvs *DiscV5Service) initDiscV5Listener(logger *zap.Logger, discOpts *Optio
// interval enables to control the rate of new nodes that we find.
// filters will be applied on each new node before the handler is called,
// enabling to apply custom access control for different scenarios.
func (dvs *DiscV5Service) discover(ctx context.Context, handler HandleNewPeer, interval time.Duration, filters ...NodeFilter) {
func (dvs *DiscV5Service) discover(ctx context.Context, logger *zap.Logger, handler HandleNewPeer, interval time.Duration, filters ...NodeFilter) {
iterator := dvs.dv5Listener.RandomNodes()
for _, f := range filters {
iterator = enode.Filter(iterator, f)
Expand Down Expand Up @@ -391,7 +444,7 @@ func (dvs *DiscV5Service) PublishENR(logger *zap.Logger) {
peerIDs := map[peer.ID]struct{}{}

// Publish ENR.
dvs.discover(ctx, func(e PeerEvent) {
dvs.discover(ctx, logger, func(e PeerEvent) {
metricPublishEnrPings.Inc()
err := dvs.dv5Listener.Ping(e.Node)
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions network/discovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"crypto/ecdsa"
"net"

"github.com/ssvlabs/ssv/logging"
compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger"

"github.com/ssvlabs/ssv/network/commons"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/pkg/errors"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/network/commons"
compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger"
"go.uber.org/zap"
)

Expand Down
8 changes: 4 additions & 4 deletions network/discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/networkconfig"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -54,9 +54,9 @@ type Service interface {
}

// NewService creates new discovery.Service
func NewService(ctx context.Context, logger *zap.Logger, opts Options) (Service, error) {
func NewService(ctx context.Context, logger *zap.Logger, topicsController topics.Controller, opts Options) (Service, error) {
if opts.DiscV5Opts == nil {
return NewLocalDiscovery(ctx, logger, opts.Host)
}
return newDiscV5Service(ctx, logger, &opts)
return newDiscV5Service(ctx, logger, true, topicsController, &opts)
}
13 changes: 6 additions & 7 deletions network/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package discovery

import (
"context"
"github.com/ethereum/go-ethereum/p2p/discover"
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func CheckBootnodes(t *testing.T, dvs *DiscV5Service, netConfig networkconfig.NetworkConfig) {
Expand Down Expand Up @@ -145,7 +144,7 @@ func TestDiscV5Service_PublishENR(t *testing.T) {
defer cancel()

opts := testingDiscoveryOptions(t, testNetConfig)
service, err := newDiscV5Service(ctx, testLogger, opts)
service, err := newDiscV5Service(ctx, testLogger, false, nil, opts)
require.NoError(t, err)
dvs := service.(*DiscV5Service)

Expand Down Expand Up @@ -174,7 +173,7 @@ func TestDiscV5Service_Bootstrap(t *testing.T) {

opts := testingDiscoveryOptions(t, testNetConfig)

service, err := newDiscV5Service(testCtx, testLogger, opts)
service, err := newDiscV5Service(testCtx, testLogger, false, nil, opts)
require.NoError(t, err)

dvs := service.(*DiscV5Service)
Expand Down
11 changes: 6 additions & 5 deletions network/discovery/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/elliptic"
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"sync"
"testing"
Expand Down Expand Up @@ -73,7 +74,7 @@ func testingDiscoveryOptions(t *testing.T, networkConfig networkconfig.NetworkCo
// Testing discovery with a given NetworkConfig
func testingDiscoveryWithNetworkConfig(t *testing.T, netConfig networkconfig.NetworkConfig) *DiscV5Service {
opts := testingDiscoveryOptions(t, netConfig)
service, err := newDiscV5Service(testCtx, testLogger, opts)
service, err := newDiscV5Service(testCtx, testLogger, false, nil, opts)
require.NoError(t, err)
require.NotNil(t, service)

Expand Down Expand Up @@ -303,13 +304,13 @@ func (mc *MockConnection) Connectedness(id peer.ID) network.Connectedness {
return network.NotConnected
}

func (mc *MockConnection) CanConnect(id peer.ID) bool {
func (mc *MockConnection) CanConnect(id peer.ID) error {
mc.mu.RLock()
defer mc.mu.RUnlock()
if can, ok := mc.canConnect[id]; ok {
return can
if can, ok := mc.canConnect[id]; ok && can {
return nil
}
return false
return fmt.Errorf("cannot connect")
}

func (mc *MockConnection) AtLimit(dir network.Direction) bool {
Expand Down
Loading

0 comments on commit a364c2c

Please sign in to comment.