Skip to content

Commit

Permalink
(dot/parachain): handle view update message for receiver side of the…
Browse files Browse the repository at this point in the history
… network bridge (#4249)

Over the network through collation and validation protocol, as part of WireMessage we could get a PeerMessage or a ViewUpdate.

ViewUpdate tells us about latest blocks (heads) and finalized number with our peers.

We are supposed to be update our peer data with these views.

This commit, reads ViewUpdate from both collation and validation protocol and handles them inside network bridge by updating our peer data. Later, it sends that view to other subsystems so that each subsystem can update their view too.

Fixes #3864
  • Loading branch information
kishansagathiya authored Oct 17, 2024
1 parent 20b85ad commit 252dc13
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 30 deletions.
18 changes: 3 additions & 15 deletions dot/parachain/network-bridge/collation_protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,19 @@ import (
"fmt"

"github.com/ChainSafe/gossamer/dot/network"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
"github.com/ChainSafe/gossamer/pkg/scale"
"github.com/libp2p/go-libp2p/core/peer"
)

func decodeCollationMessage(in []byte) (network.NotificationsMessage, error) {
wireMessage := WireMessage{}
err := wireMessage.SetValue(collatorprotocolmessages.CollationProtocol{})
if err != nil {
return nil, fmt.Errorf("setting collation protocol message: %w", err)
}
err = scale.Unmarshal(in, &wireMessage)
err := scale.Unmarshal(in, &wireMessage)
if err != nil {
return nil, fmt.Errorf("decoding message: %w", err)
}

collationMessageV, err := wireMessage.Value()
if err != nil {
return nil, fmt.Errorf("getting collation protocol message value: %w", err)
}
collationMessage, ok := collationMessageV.(collatorprotocolmessages.CollationProtocol)
if !ok {
return nil, fmt.Errorf("casting to collation protocol message")
}
return &collationMessage, nil
wireMessage.SetType(network.CollationMsgType)
return wireMessage, nil
}

func getCollatorHandshake() (network.Handshake, error) {
Expand Down
102 changes: 87 additions & 15 deletions dot/parachain/network-bridge/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
events "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/events"
networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages"
validationprotocol "github.com/ChainSafe/gossamer/dot/parachain/validation-protocol"
"github.com/ChainSafe/gossamer/dot/peerset"

parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"

Expand All @@ -28,6 +29,8 @@ import (

var logger = log.NewFromGlobal(log.AddContext("pkg", "network-bridge"))

const newMaxHeads = 5

var (
ErrFinalizedNumber = errors.New("finalized number is greater than or equal to the block number")
ErrInvalidStringFormat = errors.New("invalid string format for fetched collation info")
Expand All @@ -53,6 +56,11 @@ type NetworkBridgeReceiver struct {
networkEventInfoChan chan *network.NetworkEventInfo

authorityDiscoveryService AuthorityDiscoveryService

peerData map[peer.ID]struct {
view View
protocolVersion uint32
}
}

type CollationStatus int
Expand Down Expand Up @@ -240,46 +248,110 @@ func (nbr *NetworkBridgeReceiver) handleCollationMessage(
network.CollationMsgType, msg.Type())
}

collatorProtocol, ok := msg.(*collatorprotocolmessages.CollationProtocol)
wireMessage, ok := msg.(*WireMessage)
if !ok {
return propagate, fmt.Errorf(
"failed to cast into collator protocol message, expected: *CollationProtocol, got: %T",
msg)
return propagate, fmt.Errorf("failed to cast into wire message, expected: *WireMessage, got: %T", msg)
}

nbr.SubsystemsToOverseer <- events.PeerMessage[collatorprotocolmessages.CollationProtocol]{
PeerID: sender,
Message: *collatorProtocol,
_, value, err := wireMessage.IndexValue()
if err != nil {
return propagate, fmt.Errorf("getting index value: %w", err)
}

switch v := value.(type) {
case *collatorprotocolmessages.CollationProtocol:
nbr.SubsystemsToOverseer <- events.PeerMessage[collatorprotocolmessages.CollationProtocol]{
PeerID: sender,
Message: *v,
}
case *ViewUpdate:
err = nbr.handleViewUpdate(sender, *v)
if err != nil {
return propagate, fmt.Errorf("handling view update: %w", err)
}
default:
return propagate, fmt.Errorf("unexpected message type: %T", value)
}

return propagate, nil
}

func (nbr *NetworkBridgeReceiver) handleValidationMessage(
sender peer.ID, msg network.NotificationsMessage) (bool, error) {
// we don't propagate collation messages, so it will always be false

propagate := false

if msg.Type() != network.ValidationMsgType {
return propagate, fmt.Errorf("%w, expected: %d, found:%d", ErrUnexpectedMessageOnValidationProtocol,
network.ValidationMsgType, msg.Type())
}

validationProtocol, ok := msg.(*validationprotocol.ValidationProtocol)
wireMessage, ok := msg.(*WireMessage)
if !ok {
return propagate, fmt.Errorf(
"failed to cast into collator protocol message, expected: *CollationProtocol, got: %T",
msg)
return propagate, fmt.Errorf("failed to cast into wire message, expected: *WireMessage, got: %T", msg)
}

nbr.SubsystemsToOverseer <- events.PeerMessage[validationprotocol.ValidationProtocol]{
PeerID: sender,
Message: *validationProtocol,
_, value, err := wireMessage.IndexValue()
if err != nil {
return propagate, fmt.Errorf("getting index value: %w", err)
}

switch v := value.(type) {
case *validationprotocol.ValidationProtocol:
nbr.SubsystemsToOverseer <- events.PeerMessage[validationprotocol.ValidationProtocol]{
PeerID: sender,
Message: *v,
}
case *ViewUpdate:
err = nbr.handleViewUpdate(sender, *v)
if err != nil {
return propagate, fmt.Errorf("handling view update: %w", err)
}
}

return propagate, nil
}

func (nbr *NetworkBridgeReceiver) handleViewUpdate(peer peer.ID, view ViewUpdate) error {

peerData, ok := nbr.peerData[peer]
if !ok {
return errors.New("peer not found")
}
if len(view.Heads) > newMaxHeads || view.FinalizedNumber < peerData.view.FinalizedNumber {
nbr.net.ReportPeer(peerset.ReputationChange{
Value: peerset.CostMajor,
Reason: "malformed view",
}, peer)
} else if len(view.Heads) == 0 {
nbr.net.ReportPeer(peerset.ReputationChange{
Value: peerset.CostMinor,
Reason: "peer sent us empty view",
}, peer)
} else if View(view).checkHeadsEqual(peerData.view) {
// nothing
} else {
peerData.view = View(view)
nbr.peerData[peer] = peerData

nbr.SubsystemsToOverseer <- events.Event[collatorprotocolmessages.CollationProtocol]{
Inner: events.PeerViewChange{
PeerID: peer,
View: events.View(view),
},
}

nbr.SubsystemsToOverseer <- events.Event[validationprotocol.ValidationProtocol]{
Inner: events.PeerViewChange{
PeerID: peer,
View: events.View(view),
},
}
}

return nil
}

func (nbr *NetworkBridgeReceiver) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error {
if nbr.finalizedNumber >= signal.BlockNumber {
return ErrFinalizedNumber
Expand Down
1 change: 1 addition & 0 deletions dot/parachain/network-bridge/wire_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (mvdt WireMessage) ValueAt(index uint) (value any, err error) {
}

func (w *WireMessage) SetType(messageType network.MessageType) {
// NOTE: We need a message type only to know where to send it
w.messageType = messageType
}

Expand Down

0 comments on commit 252dc13

Please sign in to comment.