Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(grandpa): implement neighbor tracker and message handling #4230

Merged
merged 23 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ type Service struct {
bestFinalCandidate map[uint64]*Vote // map of round number -> best final candidate

// channels for communication with other services
finalisedCh chan *types.FinalisationInfo
finalisedCh chan *types.FinalisationInfo
neighborMsgChan chan neighborData

telemetry Telemetry

neighborTracker *neighborTracker
}

// Config represents a GRANDPA service configuration
Expand Down Expand Up @@ -130,6 +133,8 @@ func NewService(cfg *Config) (*Service, error) {
cfg.Interval = defaultGrandpaInterval
}

neighborMsgChan := make(chan neighborData)

ctx, cancel := context.WithCancel(context.Background())
s := &Service{
ctx: ctx,
Expand All @@ -151,8 +156,11 @@ func NewService(cfg *Config) (*Service, error) {
finalisedCh: finalisedCh,
interval: cfg.Interval,
telemetry: cfg.Telemetry,
neighborMsgChan: neighborMsgChan,
}

s.neighborTracker = newNeighborTracker(s, neighborMsgChan)

if err := s.registerProtocol(); err != nil {
return nil, err
}
Expand All @@ -165,6 +173,8 @@ func NewService(cfg *Config) (*Service, error) {

// Start begins the GRANDPA finality service
func (s *Service) Start() error {
s.neighborTracker.Start()

// if we're not an authority, we don't need to worry about the voting process.
// the grandpa service is only used to verify incoming block justifications
if !s.authority {
Expand All @@ -191,6 +201,9 @@ func (s *Service) Stop() error {
s.cancel()
s.blockState.FreeFinalisedNotifierChannel(s.finalisedCh)

s.neighborTracker.Stop()
close(s.neighborTracker.neighborMsgChan)

if !s.authority {
return nil
}
Expand Down Expand Up @@ -1146,6 +1159,7 @@ func (s *Service) handleCommitMessage(commitMessage *CommitMessage) error {
commitMessage.Vote.Hash, uint(commitMessage.Vote.Number))
if err != nil {
if errors.Is(err, database.ErrNotFound) {
logger.Warnf("Not able to verify, adding commit to tracker")
s.tracker.addCommit(commitMessage)
}

Expand Down
18 changes: 9 additions & 9 deletions lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

var (
ErrNeighbourVersionNotSupported = errors.New("neighbour version not supported")
)

// MessageHandler handles GRANDPA consensus messages
type MessageHandler struct {
grandpa *Service
Expand Down Expand Up @@ -63,8 +59,8 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.

return nil, nil //nolint:nilnil
case *NeighbourPacketV1:
// we can afford to not retry handling neighbour message, if it errors.
return nil, h.handleNeighbourMessage(msg)
h.handleNeighbourMessage(msg, from)
return nil, nil //nolint:nilnil
case *CatchUpRequest:
return h.handleCatchUpRequest(msg)
case *CatchUpResponse:
Expand All @@ -82,9 +78,13 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
}
}

func (*MessageHandler) handleNeighbourMessage(_ *NeighbourPacketV1) error {
// TODO(#2931)
return nil
func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) {
logger.Debugf("handling neighbour message from peer %v with set id %v and round %v",
from.ShortString(), packet.SetID, packet.Round)
h.grandpa.neighborMsgChan <- neighborData{
peer: from,
neighborMsg: packet,
}
}

func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) {
Expand Down
4 changes: 4 additions & 0 deletions lib/grandpa/message_handler_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) {

gs, st := newTestService(t, aliceKeyPair)

gs.neighborTracker.Start()

ctrl := gomock.NewController(t)
telemetryMock := NewMockTelemetry(ctrl)

Expand Down Expand Up @@ -250,6 +252,8 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) {
out, err := h.handleMessage("", NeighbourPacketV1)
require.NoError(t, err)
require.Nil(t, out)

gs.neighborTracker.Stop()
}

func TestMessageHandler_VerifyJustification_InvalidSig(t *testing.T) {
Expand Down
150 changes: 150 additions & 0 deletions lib/grandpa/neighbor_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package grandpa

import (
"fmt"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/libp2p/go-libp2p/core/peer"
)

// How often neighbour messages should be rebroadcast in the case where no new packets are created
const neighbourBroadcastPeriod = time.Minute * 2

type neighborData struct {
peer peer.ID
neighborMsg *NeighbourPacketV1
}

type neighborState struct {
setID uint64
round uint64
highestFinalized uint32
}

type neighborTracker struct {
sync.Mutex
grandpa *Service

peerview map[peer.ID]neighborState
currentSetID uint64
currentRound uint64
highestFinalized uint32

finalizationCha chan *types.FinalisationInfo
neighborMsgChan chan neighborData
stoppedNeighbor chan struct{}
wg sync.WaitGroup
}

func newNeighborTracker(grandpa *Service, neighborChan chan neighborData) *neighborTracker {
return &neighborTracker{
grandpa: grandpa,
peerview: make(map[peer.ID]neighborState),
finalizationCha: grandpa.blockState.GetFinalisedNotifierChannel(),
neighborMsgChan: neighborChan,
stoppedNeighbor: make(chan struct{}),
}
}

func (nt *neighborTracker) Start() {
nt.wg.Add(1)
go nt.run()
}

func (nt *neighborTracker) Stop() {
nt.grandpa.blockState.FreeFinalisedNotifierChannel(nt.finalizationCha)
close(nt.stoppedNeighbor)
jimjbrettj marked this conversation as resolved.
Show resolved Hide resolved

nt.wg.Wait()
}

func (nt *neighborTracker) run() {
logger.Info("starting neighbour tracker")
ticker := time.NewTicker(neighbourBroadcastPeriod)
defer func() {
ticker.Stop()
nt.wg.Done()
}()

for {
select {
case <-ticker.C:
logger.Debugf("neighbour message broadcast triggered by ticker")
err := nt.BroadcastNeighborMsg()
if err != nil {
logger.Errorf("broadcasting neighbour message: %v", err)
}

case block := <-nt.finalizationCha:
if block != nil {
nt.updateState(block.SetID, block.Round, uint32(block.Header.Number)) //nolint
err := nt.BroadcastNeighborMsg()
if err != nil {
logger.Errorf("broadcasting neighbour message: %v", err)
}
ticker.Reset(neighbourBroadcastPeriod)
}
case neighborData := <-nt.neighborMsgChan:
if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized {
nt.updatePeer(
neighborData.peer,
neighborData.neighborMsg.SetID,
neighborData.neighborMsg.Round,
neighborData.neighborMsg.Number,
)
}
case <-nt.stoppedNeighbor:
logger.Info("stopping neighbour tracker")
return
}
}
}

func (nt *neighborTracker) updateState(setID uint64, round uint64, highestFinalized uint32) {
nt.Lock()
defer nt.Unlock()

nt.currentSetID = setID
nt.currentRound = round
nt.highestFinalized = highestFinalized
}

func (nt *neighborTracker) updatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) {
nt.Lock()
defer nt.Unlock()
peerState := neighborState{setID, round, highestFinalized}
nt.peerview[p] = peerState
}

func (nt *neighborTracker) getPeer(p peer.ID) neighborState {
nt.Lock()
defer nt.Unlock()
return nt.peerview[p]
}

func (nt *neighborTracker) BroadcastNeighborMsg() error {
packet := NeighbourPacketV1{
Round: nt.currentRound,
SetID: nt.currentSetID,
Number: nt.highestFinalized,
}

cm, err := packet.ToConsensusMessage()
if err != nil {
return fmt.Errorf("converting NeighbourPacketV1 to network message: %w", err)
}
for id, peerState := range nt.peerview {
if peerState.round >= nt.currentRound && peerState.setID >= nt.currentSetID {
haikoschol marked this conversation as resolved.
Show resolved Hide resolved
err = nt.grandpa.network.SendMessage(id, cm)
if err != nil {
return fmt.Errorf("sending message to peer: %v", id)
}
}
}
return nil
}
Loading
Loading