Fix for possible race for new block post consensus job.
Frozen committed May 24, 2024
1 parent c4d24fd commit aff96ee
Showing 8 changed files with 255 additions and 211 deletions.
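The change removes the PostConsensusJob function field on Consensus (assigned from main.go after construction and called from commitBlock) in favor of a post-consensus method guarded by consensus.mutex. A minimal before/after sketch of that pattern, with placeholder types that are not the actual harmony definitions:

package sketch

import "sync"

// Block is a stand-in for types.Block; it is not the harmony type.
type Block struct{ Number uint64 }

// Before: the hook is a bare function field. Assigning it from node setup
// while consensus goroutines may already read it is a possible data race.
type consensusBefore struct {
	PostConsensusJob func(*Block) error
}

// After: the work is a method on the consensus object and is only entered
// with the consensus mutex held, so no externally assigned callback is needed.
type consensusAfter struct {
	mutex sync.Mutex
}

// PostConsensusProcessing is the exported, locking entry point.
func (c *consensusAfter) PostConsensusProcessing(b *Block) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return c.postConsensusProcessing(b)
}

// postConsensusProcessing assumes the caller already holds c.mutex,
// mirroring how finalCommit invokes it in this commit.
func (c *consensusAfter) postConsensusProcessing(b *Block) error {
	_ = b // the real code broadcasts the new block and CX receipts here
	return nil
}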
2 changes: 1 addition & 1 deletion cmd/harmony/main.go
@@ -820,6 +820,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi

// Consensus object.
registry.SetIsBackup(isBackup(hc))
registry.SetNodeConfig(nodeConfig)
currentConsensus, err := consensus.New(
myHost, nodeConfig.ShardID, nodeConfig.ConsensusPriKey, registry, decider, minPeers, aggregateSig)

@@ -877,7 +878,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
Uint64("viewID", viewID).
Msg("Init Blockchain")

currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())
currentConsensus.NextBlockDue = time.Now()
3 changes: 0 additions & 3 deletions consensus/consensus.go
@@ -103,9 +103,6 @@ type Consensus struct {
readySignal chan Proposal
// Channel to send full commit signatures to finish new block proposal
commitSigChannel chan []byte
// The post-consensus job func passed from Node object
// Called when consensus on a new block is done
PostConsensusJob func(*types.Block) error
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf
6 changes: 1 addition & 5 deletions consensus/consensus_service.go
@@ -620,14 +620,10 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
return nil
}

// NumSignaturesIncludedInBlock returns the number of signatures included in the block
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
func (consensus *Consensus) numSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
consensus.mutex.Lock()
members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock()

// TODO(audit): do not reconstruct the Mask
mask := bls.NewMask(members)
err := mask.SetMask(block.Header().LastCommitBitmap())
5 changes: 2 additions & 3 deletions consensus/consensus_v2.go
@@ -248,7 +248,7 @@ func (consensus *Consensus) finalCommit() {
Int("numTxns", len(block.Transactions())).
Int("numStakingTxns", len(block.StakingTransactions())).
Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!")

consensus.postConsensusProcessing(block)
consensus.UpdateLeaderMetrics(float64(numCommits), float64(block.NumberU64()))

// If still the leader, send commit sig/bitmap to finish the new block proposal,
@@ -257,7 +257,7 @@ func (consensus *Consensus) finalCommit() {
if block.IsLastBlockInEpoch() {
// No pipelining
go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.GetLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.ReadySignal(NewProposal(SyncProposal))
}()
} else {
@@ -669,7 +669,6 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
}

consensus.FinishFinalityCount()
consensus.PostConsensusJob(blk)
consensus.setupForNewConsensus(blk, committedMsg)
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
230 changes: 230 additions & 0 deletions consensus/post_processing.go
@@ -0,0 +1,230 @@
package consensus

import (
"math/rand"

proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/staking/availability"
"github.com/harmony-one/harmony/webhooks"
)

// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. [leader] send new block to the client
// 2. [leader] send cross shard tx receipts to destination shard
func (consensus *Consensus) PostConsensusProcessing(newBlock *types.Block) error {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.postConsensusProcessing(newBlock)
}

func (consensus *Consensus) postConsensusProcessing(newBlock *types.Block) error {
if consensus.isLeader() {
if consensus.IsRunningBeaconChain() {
// TODO: consider removing this and letting other nodes broadcast new blocks.
// But need to make sure there is at least 1 node that will do the job.
consensus.broadcastNewBlock(newBlock, consensus.registry.GetNodeConfig().GetClientGroupID())
}
consensus.broadcastCXReceipts(newBlock)
} else {
if mode := consensus.mode(); mode != Listening {
numSignatures := consensus.numSignaturesIncludedInBlock(newBlock)
utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()).
Uint64("epochNum", newBlock.Epoch().Uint64()).
Uint64("ViewId", newBlock.Header().ViewID().Uint64()).
Str("blockHash", newBlock.Hash().String()).
Int("numTxns", len(newBlock.Transactions())).
Int("numStakingTxns", len(newBlock.StakingTransactions())).
Uint32("numSignatures", numSignatures).
Str("mode", mode.String()).
Msg("BINGO !!! Reached Consensus")
if consensus.mode() == Syncing {
mode = consensus.updateConsensusInformation()
utils.Logger().Info().Msgf("Switching to mode %s", mode)
consensus.setMode(mode)
}

consensus.UpdateValidatorMetrics(float64(numSignatures), float64(newBlock.NumberU64()))

// 1% of the validators also need to do broadcasting
rnd := rand.Intn(100)
if rnd < 1 {
// Beacon validators also broadcast new blocks to make sure beacon sync is strong.
if consensus.IsRunningBeaconChain() {
consensus.broadcastNewBlock(newBlock, consensus.registry.GetNodeConfig().GetClientGroupID())
}
consensus.broadcastCXReceipts(newBlock)
}
}
}

// Broadcast client-requested missing cross shard receipts, if any
consensus.broadcastMissingCXReceipts()

if h := consensus.registry.GetNodeConfig().WebHooks.Hooks; h != nil {
if h.Availability != nil {
for _, addr := range node.GetAddresses(newBlock.Epoch()) {
wrapper, err := node.Beaconchain().ReadValidatorInformation(addr)
if err != nil {
utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator info")
return nil
}
snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr)
if err != nil {
utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator snapshot")
return nil
}
computed := availability.ComputeCurrentSigning(
snapshot.Validator, wrapper,
)
lastBlockOfEpoch := shard.Schedule.EpochLastBlock(node.Beaconchain().CurrentBlock().Header().Epoch().Uint64())

computed.BlocksLeftInEpoch = lastBlockOfEpoch - node.Beaconchain().CurrentBlock().Header().Number().Uint64()

if err != nil && computed.IsBelowThreshold {
url := h.Availability.OnDroppedBelowThreshold
go func() {
webhooks.DoPost(url, computed)
}()
}
}
}
}

return nil
}

func (consensus *Consensus) IsRunningBeaconChain() bool {
return consensus.ShardID == shard.BeaconChainShardID
}

// broadcastNewBlock is called by the consensus leader to sync new blocks with other clients/nodes.
// NOTE: For now, just send to the client (basically not broadcasting)
// TODO (lc): broadcast the new blocks to new nodes doing state sync
func (consensus *Consensus) broadcastNewBlock(newBlock *types.Block, groupID nodeconfig.GroupID) {
groups := []nodeconfig.GroupID{groupID}
utils.Logger().Info().
Msgf(
"broadcasting new block %d, group %s", newBlock.NumberU64(), groups[0],
)
msg := p2p.ConstructMessage(
proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}),
)
if err := consensus.host.SendMessageToGroups(groups, msg); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot broadcast new block")
}
}

// broadcastCXReceipts broadcasts cross shard receipts to the corresponding
// destination shards
func (consensus *Consensus) broadcastCXReceipts(newBlock *types.Block) {
commitSigAndBitmap := newBlock.GetCurrentCommitSig()
//#### Read payload data from committed msg
if len(commitSigAndBitmap) <= 96 {
utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length")
return
}
commitSig := make([]byte, 96)
commitBitmap := make([]byte, len(commitSigAndBitmap)-96)
offset := 0
copy(commitSig[:], commitSigAndBitmap[offset:offset+96])
offset += 96
copy(commitBitmap[:], commitSigAndBitmap[offset:])
//#### END Read payload data from committed msg

epoch := newBlock.Header().Epoch()
shardingConfig := shard.Schedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
myShardID := consensus.ShardID
utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]")

for i := 0; i < shardNum; i++ {
if i == int(myShardID) {
continue
}
consensus.broadcastCXReceiptsWithShardID(newBlock.Header(), commitSig, commitBitmap, uint32(i))
}
}

// broadcastCXReceiptsWithShardID broadcasts cross shard receipts to the given toShardID
func (consensus *Consensus) broadcastCXReceiptsWithShardID(block *block.Header, commitSig []byte, commitBitmap []byte, toShardID uint32) {
myShardID := consensus.ShardID
utils.Logger().Debug().
Uint32("toShardID", toShardID).
Uint32("myShardID", myShardID).
Uint64("blockNum", block.NumberU64()).
Msg("[BroadcastCXReceiptsWithShardID]")

cxReceipts, err := consensus.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash())
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Debug().Uint32("ToShardID", toShardID).
Int("numCXReceipts", len(cxReceipts)).
Msg("[CXMerkleProof] No receipts found for the destination shard")
return
}

merkleProof, err := consensus.Blockchain().CXMerkleProof(toShardID, block)
if err != nil {
utils.Logger().Warn().
Uint32("ToShardID", toShardID).
Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof")
return
}

cxReceiptsProof := &types.CXReceiptsProof{
Receipts: cxReceipts,
MerkleProof: merkleProof,
Header: block,
CommitSig: commitSig,
CommitBitmap: commitBitmap,
}

groupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(toShardID))
utils.Logger().Info().Uint32("ToShardID", toShardID).
Str("GroupID", string(groupID)).
Interface("cxp", cxReceiptsProof).
Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof ready. Sending CX receipts...")
// TODO ek – limit concurrency
go consensus.GetHost().SendMessageToGroups([]nodeconfig.GroupID{groupID},
p2p.ConstructMessage(proto_node.ConstructCXReceiptsProof(cxReceiptsProof)),
)
}

// broadcastMissingCXReceipts broadcasts missing cross shard receipts per request
func (consensus *Consensus) broadcastMissingCXReceipts() {
var (
sendNextTime = make([]core.CxEntry, 0)
cxPool = consensus.Registry().GetCxPool()
blockchain = consensus.Blockchain()
)
it := cxPool.Pool().Iterator()
for entry := range it.C {
cxEntry := entry.(core.CxEntry)
toShardID := cxEntry.ToShardID
blk := blockchain.GetBlockByHash(cxEntry.BlockHash)
if blk == nil {
continue
}
blockNum := blk.NumberU64()
nextHeader := blockchain.GetHeaderByNumber(blockNum + 1)
if nextHeader == nil {
sendNextTime = append(sendNextTime, cxEntry)
continue
}
sig := nextHeader.LastCommitSignature()
bitmap := nextHeader.LastCommitBitmap()
consensus.broadcastCXReceiptsWithShardID(blk.Header(), sig[:], bitmap, toShardID)
}
cxPool.Clear()
// This should not happen, but may occur for an impatient user.
for _, entry := range sendNextTime {
cxPool.Add(entry)
}
}
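One detail worth calling out from broadcastCXReceipts above: the block's commit payload is treated as a 96-byte BLS aggregate signature followed by the signer bitmap, and any payload of 96 bytes or fewer is skipped. A standalone sketch of that split, using plain byte slices instead of the harmony block types:

package sketch

// splitCommitSigAndBitmap splits an aggregated commit payload into the
// 96-byte BLS signature and the trailing signer bitmap, mirroring the
// length check and copies done in broadcastCXReceipts.
func splitCommitSigAndBitmap(payload []byte) (sig, bitmap []byte, ok bool) {
	const blsSigLen = 96
	if len(payload) <= blsSigLen {
		// Not enough data: there is no bitmap after the signature,
		// so there is nothing to broadcast.
		return nil, nil, false
	}
	sig = make([]byte, blsSigLen)
	bitmap = make([]byte, len(payload)-blsSigLen)
	copy(sig, payload[:blsSigLen])
	copy(bitmap, payload[blsSigLen:])
	return sig, bitmap, true
}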
19 changes: 19 additions & 0 deletions internal/registry/registry.go
@@ -5,6 +5,7 @@ import (

"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/webhooks"
)
@@ -20,6 +21,7 @@ type Registry struct {
isBackup bool
engine engine.Engine
collection *shardchain.CollectionImpl
nodeconfig *nodeconfig.ConfigType
}

// New creates a new registry.
@@ -160,3 +162,20 @@ func (r *Registry) GetShardChainCollection() *shardchain.CollectionImpl {

return r.collection
}

// SetNodeConfig sets the node config in the registry.
func (r *Registry) SetNodeConfig(config *nodeconfig.ConfigType) *Registry {
r.mu.Lock()
defer r.mu.Unlock()

r.nodeconfig = config
return r
}

// GetNodeConfig gets the node config from the registry.
func (r *Registry) GetNodeConfig() *nodeconfig.ConfigType {
r.mu.Lock()
defer r.mu.Unlock()

return r.nodeconfig
}
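A hedged usage sketch tying the registry accessors to the rest of the commit; wireNodeConfig and exampleBroadcast are hypothetical names added for illustration, while SetNodeConfig, GetNodeConfig, GetClientGroupID, and broadcastNewBlock are the calls that appear in the diff above:

// Hypothetical wiring step, as done in cmd/harmony/main.go before consensus.New:
func wireNodeConfig(reg *registry.Registry, nodeConfig *nodeconfig.ConfigType) {
	reg.SetNodeConfig(nodeConfig)
}

// Hypothetical helper showing how consensus now reads the config back through
// the registry instead of relying on a callback into the node package:
func (consensus *Consensus) exampleBroadcast(newBlock *types.Block) {
	groupID := consensus.registry.GetNodeConfig().GetClientGroupID()
	consensus.broadcastNewBlock(newBlock, groupID)
}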