Skip to content

Commit

Permalink
chore: propagate block announce
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Sep 10, 2024
1 parent d339546 commit 195a9be
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
10 changes: 5 additions & 5 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,21 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc

stream, err := s.sendHandshake(peer, hs, info)
if err != nil {
logger.Debugf("failed to send handshake to peer %s on protocol %s: %s", peer, info.protocolID, err)
logger.Infof("failed to send handshake to peer %s on protocol %s: %s", peer, info.protocolID, err)
return
}

_, isConsensusMsg := msg.(*ConsensusMessage)

if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) && !isConsensusMsg {
logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
logger.Infof("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
return
}

// we've completed the handshake with the peer, send message directly
logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
logger.Infof("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
if err := s.host.writeToStream(stream, msg); err != nil {
logger.Debugf("failed to send message to peer %s: %s", peer, err)
logger.Errorf("failed to send message to peer %s: %s", peer, err)

// the stream was closed or reset, close it on our end and delete it from our peer's data
if errors.Is(err, io.EOF) || errors.Is(err, network.ErrReset) {
Expand All @@ -300,7 +300,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
}
}

logger.Tracef("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg)
logger.Infof("successfully sent message on protocol %s to peer %s: message= %v", info.protocolID, peer, msg)
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.GossipSuccessValue,
Reason: peerset.GossipSuccessReason,
Expand Down
25 changes: 25 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,31 @@ func (s *Service) GossipMessage(msg NotificationsMessage) {
logger.Errorf("message type %d not supported by any notifications protocol", msg.Type())
}

// GossipMessage gossips a notifications protocol message to our peers
func (s *Service) GossipMessageExcluding(msg NotificationsMessage, excluding peer.ID) {
if s.host == nil || msg == nil || s.IsStopped() {
return
}

logger.Infof("gossiping from host %s message of type %d: %s",
s.host.id(), msg.Type(), msg)

// check if the message is part of a notifications protocol
s.notificationsMu.Lock()
defer s.notificationsMu.Unlock()

for msgID, prtl := range s.notificationsProtocols {
if msg.Type() != msgID || prtl == nil {
continue
}

s.broadcastExcluding(prtl, excluding, msg)
return
}

logger.Errorf("message type %d not supported by any notifications protocol", msg.Type())
}

// SendMessage sends a message to the given peer
func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
s.notificationsMu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (f *FullSyncStrategy) OnBlockAnnounce(from peer.ID, msg *network.BlockAnnou
f.requestQueue.PushBack(request)
}

logger.Infof("block announced already exists #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
logger.Infof("announced block already exists #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
return true, &Change{
who: from,
rep: peerset.ReputationChange{
Expand Down
7 changes: 4 additions & 3 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Network interface {
BlockAnnounceHandshake(*types.Header) error
GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *network.RequestResponseProtocol
GossipMessage(network.NotificationsMessage)
GossipMessageExcluding(network.NotificationsMessage, peer.ID)
}

type BlockState interface {
Expand Down Expand Up @@ -162,7 +162,7 @@ func (s *SyncService) Stop() error {
}

func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error {
logger.Infof("receiving a block announce handshake: %s", from.String())
logger.Infof("receiving a block announce handshake from %s", from.String())
if err := s.workerPool.fromBlockAnnounceHandshake(from); err != nil {
return err
}
Expand All @@ -184,7 +184,8 @@ func (s *SyncService) HandleBlockAnnounce(from peer.ID, msg *network.BlockAnnoun
}

if gossip {
s.network.GossipMessage(msg)
logger.Infof("propagating block announcement #%d excluding %s", msg.Number, from.String())
s.network.GossipMessageExcluding(msg, from)
}

return nil
Expand Down

0 comments on commit 195a9be

Please sign in to comment.