Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: smart discovery (dead/solo remedy subnets) #1949

Open
wants to merge 4 commits into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"
"tailscale.com/util/singleflight"

"github.com/ssvlabs/ssv/logging/fields"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/slotticker"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
"github.com/ssvlabs/ssv/utils/casts"
"go.uber.org/zap"
"tailscale.com/util/singleflight"
Comment on lines +27 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previous location looks better

)

const (
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ require (
github.com/herumi/bls-eth-go-binary v1.29.1
github.com/holiman/uint256 v1.3.1
github.com/ilyakaznacheev/cleanenv v1.4.2
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/libp2p/go-libp2p v0.36.3
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/libp2p/go-libp2p-pubsub v0.12.0
github.com/microsoft/go-crypto-openssl v0.2.9
github.com/multiformats/go-multiaddr v0.13.0
github.com/oleiade/lane/v2 v2.0.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -537,6 +537,8 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/oleiade/lane/v2 v2.0.0 h1:XW/ex/Inr+bPkLd3O240xrFOhUkTd4Wy176+Gv0E3Qw=
github.com/oleiade/lane/v2 v2.0.0/go.mod h1:i5FBPFAYSWCgLh58UkUGCChjcCzef/MI7PlQm2TKCeg=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
3 changes: 1 addition & 2 deletions network/discovery/dv5_bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package discovery
import (
"context"

"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/ssvlabs/ssv/utils"
"go.uber.org/zap"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previous import place looked better

)

// BootnodeOptions contains options to create the node
Expand Down
21 changes: 21 additions & 0 deletions network/discovery/dv5_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -48,6 +49,26 @@
}
}

func (dvs *DiscV5Service) alreadyConnectedFilter() func(node *enode.Node) bool {
return func(node *enode.Node) bool {
pid, err := PeerID(node)
if err != nil {
return false
}

Check warning on line 57 in network/discovery/dv5_filters.go

View check run for this annotation

Codecov / codecov/patch

network/discovery/dv5_filters.go#L56-L57

Added lines #L56 - L57 were not covered by tests
return dvs.conns.Connectedness(pid) != libp2pnetwork.Connected
}
}

func (dvs *DiscV5Service) recentlyTrimmedFilter() func(node *enode.Node) bool {
return func(node *enode.Node) bool {
pid, err := PeerID(node)
if err != nil {
return false
}

Check warning on line 67 in network/discovery/dv5_filters.go

View check run for this annotation

Codecov / codecov/patch

network/discovery/dv5_filters.go#L66-L67

Added lines #L66 - L67 were not covered by tests
return !peers.TrimmedRecently.Has(pid)
}
}

// subnetFilter checks if the node has an interest in the given subnet
func (dvs *DiscV5Service) subnetFilter(subnets ...uint64) func(node *enode.Node) bool {
return func(node *enode.Node) bool {
Expand Down
5 changes: 2 additions & 3 deletions network/discovery/dv5_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
"go.uber.org/zap"
)

// implementing discovery.Discovery
Expand Down Expand Up @@ -56,7 +55,7 @@

dvs.discover(ctx, func(e PeerEvent) {
cn <- e.AddrInfo
}, time.Millisecond, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger), dvs.subnetFilter(subnet))
}, time.Millisecond, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger), dvs.subnetFilter(subnet), dvs.alreadyConnectedFilter(), dvs.recentlyTrimmedFilter())

return cn, nil
}
31 changes: 15 additions & 16 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
Expand Down Expand Up @@ -68,25 +67,25 @@ type DiscV5Service struct {
publishLock chan struct{}
}

func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Options) (Service, error) {
func newDiscV5Service(pctx context.Context, logger *zap.Logger, opts *Options) (Service, error) {
ctx, cancel := context.WithCancel(pctx)
dvs := DiscV5Service{
ctx: ctx,
cancel: cancel,
conns: discOpts.ConnIndex,
subnetsIdx: discOpts.SubnetsIdx,
networkConfig: discOpts.NetworkConfig,
subnets: discOpts.DiscV5Opts.Subnets,
conns: opts.ConnIndex,
subnetsIdx: opts.SubnetsIdx,
networkConfig: opts.NetworkConfig,
subnets: opts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
}

logger.Debug(
"configuring discv5 discovery",
zap.Any("discV5Opts", discOpts.DiscV5Opts),
zap.Any("hostAddress", discOpts.HostAddress),
zap.Any("hostDNS", discOpts.HostDNS),
zap.Any("discV5Opts", opts.DiscV5Opts),
zap.Any("hostAddress", opts.HostAddress),
zap.Any("hostDNS", opts.HostDNS),
)
if err := dvs.initDiscV5Listener(logger, discOpts); err != nil {
if err := dvs.initDiscV5Listener(logger, opts); err != nil {
return nil, err
}
return &dvs, nil
Expand Down Expand Up @@ -162,7 +161,7 @@ func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) e
return
}
handler(e)
}, defaultDiscoveryInterval) // , dvs.forkVersionFilter) //, dvs.badNodeFilter)
}, defaultDiscoveryInterval, dvs.ssvNodeFilter(logger), dvs.sharedSubnetsFilter(1), dvs.badNodeFilter(logger), dvs.alreadyConnectedFilter(), dvs.recentlyTrimmedFilter())
iurii-ssv marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand All @@ -183,16 +182,16 @@ func (dvs *DiscV5Service) checkPeer(ctx context.Context, logger *zap.Logger, e P
}

// Get the peer's subnets, skipping if it has none.
nodeSubnets, err := records.GetSubnetsEntry(e.Node.Record())
peerSubnets, err := records.GetSubnetsEntry(e.Node.Record())
if err != nil {
return fmt.Errorf("could not read subnets: %w", err)
}
if bytes.Equal(zeroSubnets, nodeSubnets) {
if bytes.Equal(zeroSubnets, peerSubnets) {
recordPeerSkipped(ctx, skipReasonZeroSubnets)
return errors.New("zero subnets")
}

dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, nodeSubnets)
dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, peerSubnets)

// Filters
if !dvs.limitNodeFilter(e.Node) {
Expand Down
8 changes: 3 additions & 5 deletions network/discovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"crypto/ecdsa"
"net"

"github.com/ssvlabs/ssv/logging"
compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger"

"github.com/ssvlabs/ssv/network/commons"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/pkg/errors"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/network/commons"
compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger"
"go.uber.org/zap"
)

Expand Down
3 changes: 1 addition & 2 deletions network/discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/networkconfig"
"go.uber.org/zap"
)

const (
Expand Down
7 changes: 3 additions & 4 deletions network/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func CheckBootnodes(t *testing.T, dvs *DiscV5Service, netConfig networkconfig.NetworkConfig) {
Expand Down
10 changes: 6 additions & 4 deletions network/discovery/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/elliptic"
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"sync"
"testing"
Expand Down Expand Up @@ -190,6 +191,7 @@ func CustomNode(t *testing.T,
record := enr.Record{}

// Set entries
record.Set(enr.WithEntry("ssv", true)) // marks node as SSV-related (we filter out SSV-unrelated ones)
record.Set(enr.IP(net.IPv4(127, 0, 0, 1)))
record.Set(enr.UDP(12000))
record.Set(enr.TCP(13000))
Expand Down Expand Up @@ -303,13 +305,13 @@ func (mc *MockConnection) Connectedness(id peer.ID) network.Connectedness {
return network.NotConnected
}

func (mc *MockConnection) CanConnect(id peer.ID) bool {
func (mc *MockConnection) CanConnect(id peer.ID) error {
mc.mu.RLock()
defer mc.mu.RUnlock()
if can, ok := mc.canConnect[id]; ok {
return can
if can, ok := mc.canConnect[id]; ok && can {
return nil
}
return false
return fmt.Errorf("cannot connect")
}

func (mc *MockConnection) AtLimit(dir network.Direction) bool {
Expand Down
6 changes: 3 additions & 3 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"io"

"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/records"
protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p"
"go.uber.org/zap"
)

// DecodedSSVMessage serves as a marker interface for any SSV message types.
Expand Down Expand Up @@ -45,6 +43,8 @@ type P2PNetwork interface {
SubscribeAll(logger *zap.Logger) error
// SubscribeRandoms subscribes to random subnets
SubscribeRandoms(logger *zap.Logger, numSubnets int) error
// SubscribeFillerSubnets
SubscribeFillerSubnets(logger *zap.Logger) error
// UpdateScoreParams will update the scoring parameters of GossipSub
UpdateScoreParams(logger *zap.Logger)
// ActiveSubnets returns active subnets
Expand Down
47 changes: 20 additions & 27 deletions network/p2p/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
"fmt"
"sort"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -55,33 +54,25 @@
return fmt.Sprintf("%s.%s", observabilityNamespace, name)
}

func recordPeerCount(ctx context.Context, logger *zap.Logger, host host.Host) func() {
func recordPeerCount(ctx context.Context, logger *zap.Logger, h host.Host) func() {
return func() {
peers := host.Network().Peers()
var (
numOfOutbound,
numOfInbound int64
numOfInbound, numOfOutbound := connectionStats(h)
numTotal := numOfInbound + numOfOutbound

logger.Debug(
"connected peers status",
zap.Int("peers_inbound", numOfInbound),
zap.Int("peers_outbound", numOfOutbound),
zap.Int("peers_total", numTotal),
)
for _, peer := range peers {
conns := host.Network().ConnsToPeer(peer)
for _, conn := range conns {
direction := conn.Stat().Direction
if direction == network.DirInbound {
numOfInbound++
} else if direction == network.DirOutbound {
numOfOutbound++
}
}
}
connectionsGauge.Record(ctx, numOfInbound, metric.WithAttributes(

connectionsGauge.Record(ctx, int64(numOfInbound), metric.WithAttributes(

Check warning on line 69 in network/p2p/observability.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/observability.go#L68-L69

Added lines #L68 - L69 were not covered by tests
observability.NetworkDirectionAttribute(network.DirInbound),
))
connectionsGauge.Record(ctx, numOfOutbound, metric.WithAttributes(
connectionsGauge.Record(ctx, int64(numOfOutbound), metric.WithAttributes(

Check warning on line 72 in network/p2p/observability.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/observability.go#L72

Added line #L72 was not covered by tests
observability.NetworkDirectionAttribute(network.DirOutbound),
))

logger.Debug("connected peers status", fields.Count(len(peers)))
peersConnectedGauge.Record(ctx, int64(len(peers)))
peersConnectedGauge.Record(ctx, int64(numTotal))

Check warning on line 75 in network/p2p/observability.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/observability.go#L75

Added line #L75 was not covered by tests
}
}

Expand Down Expand Up @@ -122,7 +113,9 @@
median = subnetPeerCounts[len(subnetPeerCounts)/2]
max = subnetPeerCounts[len(subnetPeerCounts)-1]
}
logger.Debug("topic peers distribution",
logger.Debug(
"topic peers distribution",
zap.Int("subnets_subscribed_total", len(ctrl.Topics())),

Check warning on line 118 in network/p2p/observability.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/observability.go#L116-L118

Added lines #L116 - L118 were not covered by tests
zap.Int("min", min),
zap.Int("median", median),
zap.Int("max", max),
Expand Down
Loading