Skip to content

Commit

Permalink
fix(dot/network): fix receiving notifications messages from substrate…
Browse files Browse the repository at this point in the history
… peers (#1517)
  • Loading branch information
noot authored Apr 14, 2021
1 parent 610366b commit fdf3c53
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 98 deletions.
2 changes: 1 addition & 1 deletion dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (bm *BlockAnnounceMessage) Type() byte {

// string formats a BlockAnnounceMessage as a string
func (bm *BlockAnnounceMessage) String() string {
return fmt.Sprintf("BlockAnnounceMessage ParentHash=%s Number=%d StateRoot=%sx ExtrinsicsRoot=%s Digest=%v",
return fmt.Sprintf("BlockAnnounceMessage ParentHash=%s Number=%d StateRoot=%s ExtrinsicsRoot=%s Digest=%v",
bm.ParentHash,
bm.Number,
bm.StateRoot,
Expand Down
23 changes: 17 additions & 6 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -29,6 +30,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

var (
maxRetries = 12
)

// ConnManager implements connmgr.ConnManager
type ConnManager struct {
sync.Mutex
Expand Down Expand Up @@ -191,10 +196,18 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
Addrs: addrs,
}

err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
}
go func() {
for i := 0; i < maxRetries; i++ {
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
time.Sleep(time.Minute)
continue
}

return
}
}()

// TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers
}
Expand All @@ -207,7 +220,6 @@ func (cm *ConnManager) registerDisconnectHandler(cb func(peer.ID)) {
func (cm *ConnManager) OpenedStream(n network.Network, s network.Stream) {
logger.Trace(
"Opened stream",
"host", s.Conn().LocalPeer(),
"peer", s.Conn().RemotePeer(),
"protocol", s.Protocol(),
)
Expand All @@ -221,7 +233,6 @@ func (cm *ConnManager) registerCloseHandler(protocolID protocol.ID, cb func(id p
func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) {
logger.Trace(
"Closed stream",
"host", s.Conn().LocalPeer(),
"peer", s.Conn().RemotePeer(),
"protocol", s.Protocol(),
)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestPersistentPeers(t *testing.T) {
require.NotEqual(t, 0, len(conns))

// if A disconnects from B, B should reconnect
nodeA.host.h.Network().ClosePeer(nodeA.host.id())
nodeA.host.h.Network().ClosePeer(nodeB.host.id())
time.Sleep(time.Millisecond * 500)
conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))
Expand Down
4 changes: 4 additions & 0 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (bm *BlockRequestMessage) Decode(in []byte) error {
case *pb.BlockRequest_Hash:
startingBlock, err = variadic.NewUint64OrHash(common.BytesToHash(from.Hash))
case *pb.BlockRequest_Number:
// TODO: we are receiving block requests w/ 4-byte From field; did the format change?
if len(from.Number) != 8 {
return errors.New("invalid BlockResponseMessage.From; uint64 is not 8 bytes")
}
startingBlock, err = variadic.NewUint64OrHash(binary.LittleEndian.Uint64(from.Number))
default:
err = errors.New("invalid StartingBlock")
Expand Down
25 changes: 13 additions & 12 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
err := handshakeValidator(peer, hs)
if err != nil {
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Conn().Close()
return errCannotValidateHandshake
}

Expand All @@ -141,17 +140,17 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
// once validated, send back a handshake
resp, err := info.getHandshake()
if err != nil {
logger.Debug("failed to get handshake", "protocol", info.protocolID, "error", err)
logger.Warn("failed to get handshake", "protocol", info.protocolID, "error", err)
return err
}

err = s.host.send(peer, info.protocolID, resp)
err = s.host.writeToStream(stream, resp)
if err != nil {
logger.Trace("failed to send handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Conn().Close()
return err
}
logger.Trace("receiver: sent handshake", "protocol", info.protocolID, "peer", peer)
return nil
}

// if we are the initiator and haven't received the handshake already, validate it
Expand All @@ -161,7 +160,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
if err != nil {
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err)
hsData.validated = false
_ = stream.Conn().Close()
return errCannotValidateHandshake
}

Expand All @@ -175,7 +173,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
// if we are the initiator, send the message
if hsData, has := info.getHandshakeData(peer); has && hsData.validated && hsData.received && hsData.outboundMsg != nil {
logger.Trace("sender: sending message", "protocol", info.protocolID)
err := s.host.send(peer, info.protocolID, hsData.outboundMsg)
err := s.host.writeToStream(stream, hsData.outboundMsg)
if err != nil {
logger.Debug("failed to send message", "protocol", info.protocolID, "peer", peer, "error", err)
return err
Expand All @@ -197,11 +195,14 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
}

// TODO: improve this by keeping track of who you've received/sent messages from
if !s.noGossip {
seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
}
if s.noGossip {
return nil
}

seen := s.gossip.hasSeen(msg)
if !seen {
// TODO: update this to write to stream w/ handshake established
s.broadcastExcluding(info, peer, msg)
}

return nil
Expand Down Expand Up @@ -261,7 +262,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
}

if err != nil {
logger.Error("failed to send message to peer", "peer", peer, "error", err)
logger.Debug("failed to send message to peer", "peer", peer, "error", err)
}
}
}
55 changes: 10 additions & 45 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func NewService(cfg *Config) (*Service, error) {
}

network.syncQueue = newSyncQueue(network)

network.noGossip = true // TODO: remove once duplicate message sending is merged
return network, err
}

Expand Down Expand Up @@ -281,40 +281,8 @@ func (s *Service) logPeerCount() {

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// give new peers a slight weight
// TODO: do this once handshake is received
s.syncQueue.updatePeerScore(conn.RemotePeer(), 1)

s.notificationsMu.Lock()
defer s.notificationsMu.Unlock()

info, has := s.notificationsProtocols[BlockAnnounceMsgType]
if !has {
// this shouldn't happen
logger.Warn("block announce protocol is not yet registered!")
return
}

// open block announce substream
hs, err := info.getHandshake()
if err != nil {
logger.Warn("failed to get handshake", "protocol", blockAnnounceID, "error", err)
return
}

info.mapMu.RLock()
defer info.mapMu.RUnlock()

peer := conn.RemotePeer()
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint
info.handshakeData.Store(peer, &handshakeData{
validated: false,
})

logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs)
err = s.host.send(peer, info.protocolID, hs)
if err != nil {
logger.Trace("failed to send block announce handshake to peer", "peer", peer, "error", err)
}
}
}

func (s *Service) beginDiscovery() error {
Expand Down Expand Up @@ -528,7 +496,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
if err == io.EOF {
continue
} else if err != nil {
logger.Trace("failed to read from stream", "protocol", stream.Protocol(), "error", err)
logger.Trace("failed to read from stream", "peer", stream.Conn().RemotePeer(), "protocol", stream.Protocol(), "error", err)
_ = stream.Close()
return
}
Expand All @@ -541,21 +509,18 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
}

logger.Trace(
"Received message from peer",
"received message from peer",
"host", s.host.id(),
"peer", peer,
"msg", msg.String(),
)

go func() {
// handle message based on peer status and message type
err = handler(stream, msg)
if err != nil {
logger.Trace("Failed to handle message from stream", "message", msg, "error", err)
_ = stream.Close()
return
}
}()
err = handler(stream, msg)
if err != nil {
logger.Debug("failed to handle message from stream", "message", msg, "error", err)
_ = stream.Close()
return
}
}
}

Expand Down
12 changes: 0 additions & 12 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,16 +438,4 @@ func TestHandleConn(t *testing.T) {
aScore, ok := nodeB.syncQueue.peerScore.Load(nodeA.host.id())
require.True(t, ok)
require.Equal(t, 1, aScore)

infoA := nodeA.notificationsProtocols[BlockAnnounceMsgType]
hsDataB, has := infoA.getHandshakeData(nodeB.host.id())
require.True(t, has)
require.True(t, hsDataB.received)
require.True(t, hsDataB.validated)

infoB := nodeB.notificationsProtocols[BlockAnnounceMsgType]
hsDataA, has := infoB.getHandshakeData(nodeA.host.id())
require.True(t, has)
require.True(t, hsDataA.received)
require.True(t, hsDataA.validated)
}
18 changes: 11 additions & 7 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package network

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/common/variadic"

"github.com/ChainSafe/chaindb"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
Expand Down Expand Up @@ -182,7 +184,7 @@ func (q *syncQueue) syncAtHead() {
for {
select {
// sleep for average block time TODO: make this configurable from slot duration
case <-time.After(q.slotDuration):
case <-time.After(q.slotDuration * 2):
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -689,14 +691,14 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) {
}

func (q *syncQueue) handleBlockData(data []*types.BlockData) {
bestNum, err := q.s.blockState.BestBlockNumber()
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
panic(err) // TODO: don't panic but try again. seems blockState needs better concurrency handling
}

end := data[len(data)-1].Number().Int64()
if end <= bestNum.Int64() {
logger.Debug("ignoring block data that is below our head", "got", end, "head", bestNum.Int64())
if end <= finalized.Number.Int64() {
logger.Debug("ignoring block data that is below our head", "got", end, "head", finalized.Number.Int64())
q.pushRequest(uint64(end+1), blockRequestBufferSize, "")
return
}
Expand Down Expand Up @@ -736,7 +738,7 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {
func (q *syncQueue) handleBlockDataFailure(idx int, err error, data []*types.BlockData) {
logger.Warn("failed to handle block data", "failed on block", q.currStart+int64(idx), "error", err)

if err.Error() == "failed to get parent hash: Key not found" { // TODO: unwrap err
if errors.Is(err, chaindb.ErrKeyNotFound) {
header, err := types.NewHeaderFromOptional(data[idx].Header)
if err != nil {
logger.Debug("failed to get header from BlockData", "idx", idx, "error", err)
Expand Down Expand Up @@ -787,7 +789,6 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

logger.Debug("received BlockAnnounce!", "number", msg.Number, "hash", header.Hash(), "from", from)
has, _ := q.s.blockState.HasBlockBody(header.Hash())
if has {
return
Expand All @@ -797,13 +798,16 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

q.goal = header.Number.Int64()

bestNum, err := q.s.blockState.BestBlockNumber()
if err != nil {
logger.Error("failed to get best block number", "error", err)
return
}

q.goal = header.Number.Int64()
// TODO: if we're at the head, this should request by hash instead of number, since there will
// certainly be blocks with the same number.
q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from)
}

Expand Down
6 changes: 4 additions & 2 deletions dot/network/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/utils"

"github.com/ChainSafe/chaindb"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -425,9 +426,10 @@ func TestSyncQueue_SyncAtHead(t *testing.T) {
q.stop()
time.Sleep(time.Second)
q.ctx = context.Background()
q.slotDuration = time.Millisecond * 100

go q.syncAtHead()
time.Sleep(time.Millisecond * 6100)
time.Sleep(q.slotDuration * 3)
select {
case req := <-q.requestCh:
require.Equal(t, uint64(2), req.req.StartingBlock.Uint64())
Expand Down Expand Up @@ -500,7 +502,7 @@ func TestSyncQueue_handleBlockDataFailure_MissingParent(t *testing.T) {
q.ctx = context.Background()

data := testBlockResponseMessage().BlockData
q.handleBlockDataFailure(0, fmt.Errorf("failed to get parent hash: Key not found"), data)
q.handleBlockDataFailure(0, fmt.Errorf("some error: %w", chaindb.ErrKeyNotFound), data)
select {
case req := <-q.requestCh:
require.True(t, req.req.StartingBlock.IsHash())
Expand Down
2 changes: 1 addition & 1 deletion dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {
}

if length == 0 {
return 0, err // TODO: return bytes read from readLEB128ToUint64
return 0, nil // msg length of 0 is allowed, for example transactions handshake
}

// TODO: check if length > len(buf), if so probably log.Crit
Expand Down
Loading

0 comments on commit fdf3c53

Please sign in to comment.