[BFT] Node ejected before epoch recovery #6632

Open · wants to merge 14 commits into base: feature/efm-recovery

66 changes: 31 additions & 35 deletions cmd/bootstrap/run/epochs.go
@@ -19,6 +19,7 @@ import (
)

// GenerateRecoverEpochTxArgs generates the required transaction arguments for the `recoverEpoch` transaction.
// No errors are expected during normal operation.
func GenerateRecoverEpochTxArgs(log zerolog.Logger,
internalNodePrivInfoDir string,
nodeConfigJson string,
@@ -56,7 +57,7 @@ func GenerateRecoverEpochTxArgs(log zerolog.Logger,
internalNodesMap := make(map[flow.Identifier]struct{})
for _, node := range internalNodes {
if !currentEpochIdentities.Exists(node.Identity()) {
Member:
This was originally put here to communicate an unusual state to operators when running the efm-recover-args command. I agree it should not cause an error, but maybe we can replace this with a WARN log. Then when it is run as part of the CLI tool we will still surface this info to operators.

return nil, fmt.Errorf("node ID found in internal node infos missing from protocol snapshot identities %s: %w", node.NodeID, err)
log.Warn().Msgf("node ID found in internal node infos missing from protocol snapshot identities %s", node.NodeID)
}
internalNodesMap[node.NodeID] = struct{}{}
}
@@ -74,7 +75,7 @@ func GenerateRecoverEpochTxArgs(log zerolog.Logger,

assignments, clusters, err := common.ConstructClusterAssignment(log, partnerCollectors, internalCollectors, collectionClusters)
if err != nil {
log.Fatal().Err(err).Msg("unable to generate cluster assignment")
return nil, fmt.Errorf("unable to generate cluster assignment: %w", err)
}
log.Info().Msg("")

@@ -90,60 +91,55 @@ func GenerateRecoverEpochTxArgs(log zerolog.Logger,
if err != nil {
return nil, fmt.Errorf("failed to get epoch protocol state from snapshot: %w", err)
}
currentEpochDKG, err := epochProtocolState.DKG()
if err != nil {
return nil, fmt.Errorf("failed to get DKG for current epoch: %w", err)
}
currentEpochCommit := epochProtocolState.EpochCommit()

// NOTE: The RecoveryEpoch will re-use the last successful DKG output. This means that the consensus
// committee in the RecoveryEpoch must be identical to the committee which participated in that DKG.
dkgGroupKeyCdc, cdcErr := cadence.NewString(hex.EncodeToString(currentEpochDKG.GroupKey().Encode()))
// NOTE: The RecoveryEpoch will re-use the last successful DKG output. This means that the random beacon committee can be
// different from the consensus committee. This could happen if a node was ejected from the consensus committee, but it still has to be
// included in the DKG committee, since the threshold signature scheme operates on a pre-defined number of participants and cannot be changed.
dkgGroupKeyCdc, cdcErr := cadence.NewString(hex.EncodeToString(currentEpochCommit.DKGGroupKey.Encode()))
if cdcErr != nil {
log.Fatal().Err(cdcErr).Msg("failed to get dkg group key cadence string")
return nil, fmt.Errorf("failed to get dkg group key cadence string: %w", cdcErr)
}

// copy DKG index map from the current epoch
dkgIndexMapPairs := make([]cadence.KeyValuePair, 0)
for nodeID, index := range currentEpochCommit.DKGIndexMap {
dkgIndexMapPairs = append(dkgIndexMapPairs, cadence.KeyValuePair{
Key: cadence.String(nodeID.String()),
Value: cadence.NewInt(index),
})
}
// copy DKG public keys from the current epoch
dkgPubKeys := make([]cadence.Value, 0)
for _, dkgPubKey := range currentEpochCommit.DKGParticipantKeys {
dkgPubKeyCdc, cdcErr := cadence.NewString(hex.EncodeToString(dkgPubKey.Encode()))
if cdcErr != nil {
return nil, fmt.Errorf("failed to get dkg pub key cadence string for node: %w", cdcErr)
}
dkgPubKeys = append(dkgPubKeys, dkgPubKeyCdc)
}
// fill node IDs
nodeIds := make([]cadence.Value, 0)
for _, id := range currentEpochIdentities {
if id.GetRole() == flow.RoleConsensus {
dkgPubKey, keyShareErr := currentEpochDKG.KeyShare(id.GetNodeID())
if keyShareErr != nil {
log.Fatal().Err(keyShareErr).Msg(fmt.Sprintf("failed to get dkg pub key share for node: %s", id.GetNodeID()))
}
dkgPubKeyCdc, cdcErr := cadence.NewString(hex.EncodeToString(dkgPubKey.Encode()))
if cdcErr != nil {
log.Fatal().Err(cdcErr).Msg(fmt.Sprintf("failed to get dkg pub key cadence string for node: %s", id.GetNodeID()))
}
dkgPubKeys = append(dkgPubKeys, dkgPubKeyCdc)
}
nodeIdCdc, err := cadence.NewString(id.GetNodeID().String())
if err != nil {
log.Fatal().Err(err).Msg(fmt.Sprintf("failed to convert node ID to cadence string: %s", id.GetNodeID()))
return nil, fmt.Errorf("failed to convert node ID to cadence string %s: %w", id.GetNodeID(), err)
}
nodeIds = append(nodeIds, nodeIdCdc)
}

dkgIndexMapPairs := make([]cadence.KeyValuePair, 0)
for nodeID, index := range epochProtocolState.EpochCommit().DKGIndexMap {
dkgIndexMapPairs = append(dkgIndexMapPairs, cadence.KeyValuePair{
Key: cadence.String(nodeID.String()),
Value: cadence.NewInt(index),
})
}

clusterQCAddress := systemcontracts.SystemContractsForChain(rootChainID).ClusterQC.Address.String()
qcVoteData, err := common.ConvertClusterQcsCdc(clusterQCs, clusters, clusterQCAddress)
if err != nil {
log.Fatal().Err(err).Msg("failed to convert cluster qcs to cadence type")
return nil, fmt.Errorf("failed to convert cluster qcs to cadence type")
}

currEpochFinalView, err := epoch.FinalView()
if err != nil {
log.Fatal().Err(err).Msg("failed to get final view of current epoch")
return nil, fmt.Errorf("failed to get final view of current epoch")
}

currEpochTargetEndTime, err := epoch.TargetEndTime()
if err != nil {
log.Fatal().Err(err).Msg("failed to get target end time of current epoch")
return nil, fmt.Errorf("failed to get target end time of current epoch")
}

args := []cadence.Value{
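Throughout epochs.go, the change replaces log.Fatal calls with returned, wrapped errors and downgrades the tolerable identity mismatch to a WARN log, leaving termination decisions to the CLI entry point, per the review discussion above. A minimal, self-contained sketch of that pattern; the function name and command wiring are hypothetical stand-ins, not the real efm-recover-args implementation:

package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/rs/zerolog"
)

// generateArgs stands in for GenerateRecoverEpochTxArgs: unusual but tolerable
// states become WARN logs, while real failures are returned as wrapped errors.
func generateArgs(log zerolog.Logger) ([]string, error) {
	missingFromSnapshot := true // placeholder for the identity check in the diff
	if missingFromSnapshot {
		log.Warn().Msg("node ID found in internal node infos missing from protocol snapshot identities")
	}
	if failed := false; failed {
		return nil, fmt.Errorf("unable to generate cluster assignment: %w", errors.New("example"))
	}
	return []string{"recoverEpochArg"}, nil
}

func main() {
	log := zerolog.New(os.Stderr).With().Timestamp().Logger()
	args, err := generateArgs(log)
	if err != nil {
		// only the top-level command terminates the process
		log.Fatal().Err(err).Msg("failed to generate recover epoch transaction arguments")
	}
	fmt.Println(args)
}
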
10 changes: 9 additions & 1 deletion consensus/hotstuff/committees/static.go
@@ -2,7 +2,6 @@ package committees

import (
"fmt"

"github.com/onflow/crypto"

"github.com/onflow/flow-go/consensus/hotstuff"
@@ -148,6 +147,15 @@ func (s staticDKG) KeyShare(nodeID flow.Identifier) (crypto.PublicKey, error) {
return participant.KeyShare, nil
}

// KeyShares returns all public key shares that are the result of the distributed key generation.
func (s staticDKG) KeyShares() []crypto.PublicKey {
participants := make([]crypto.PublicKey, len(s.dkgParticipants))
for _, participant := range s.dkgParticipants {
participants[participant.Index] = participant.KeyShare
}
return participants
}

// NodeID returns the node identifier for the given index.
// An exception is returned if the index is ≥ Size().
// Intended for use outside the hotpath, with runtime
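For context, a minimal sketch of how the new KeyShares() accessor is meant to be consumed, mirroring the vote-processor changes below. The import paths and the hotstuff.RandomBeaconInspector return type are assumptions based on the packages referenced in this diff:

package sketch

import (
	"fmt"

	"github.com/onflow/flow-go/consensus/hotstuff"
	"github.com/onflow/flow-go/consensus/hotstuff/signature"
	msig "github.com/onflow/flow-go/module/signature"
)

// buildInspector is a hypothetical helper: the full DKG key-share vector now comes
// straight from the DKG interface, so callers no longer iterate the current
// participants (some of whom may lack a share, and some shares may belong to
// nodes no longer in the consensus committee).
func buildInspector(dkg hotstuff.DKG, msg []byte) (hotstuff.RandomBeaconInspector, error) {
	// the threshold is defined over the DKG committee size, not the current consensus committee
	threshold := msig.RandomBeaconThreshold(int(dkg.Size()))
	inspector, err := signature.NewRandomBeaconInspector(dkg.GroupKey(), dkg.KeyShares(), threshold, msg)
	if err != nil {
		return nil, fmt.Errorf("could not create random beacon inspector: %w", err)
	}
	return inspector, nil
}
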
20 changes: 20 additions & 0 deletions consensus/hotstuff/mocks/dkg.go

(Generated file; diff not rendered by default.)

10 changes: 1 addition & 9 deletions consensus/hotstuff/votecollector/combined_vote_processor_v2.go
@@ -55,21 +55,13 @@ func (f *combinedVoteProcessorFactoryBaseV2) Create(log zerolog.Logger, block *m
return nil, fmt.Errorf("could not create aggregator for staking signatures at block %v: %w", block.BlockID, err)
}

publicKeyShares := make([]crypto.PublicKey, 0, len(allParticipants))
dkg, err := f.committee.DKG(block.View)
if err != nil {
return nil, fmt.Errorf("could not get DKG info at block %v: %w", block.BlockID, err)
}
for _, participant := range allParticipants {
pk, err := dkg.KeyShare(participant.NodeID)
if err != nil {
return nil, fmt.Errorf("could not get random beacon key share for %x at block %v: %w", participant.NodeID, block.BlockID, err)
}
publicKeyShares = append(publicKeyShares, pk)
}

threshold := msig.RandomBeaconThreshold(int(dkg.Size()))
randomBeaconInspector, err := signature.NewRandomBeaconInspector(dkg.GroupKey(), publicKeyShares, threshold, msg)
randomBeaconInspector, err := signature.NewRandomBeaconInspector(dkg.GroupKey(), dkg.KeyShares(), threshold, msg)
if err != nil {
return nil, fmt.Errorf("could not create random beacon inspector at block %v: %w", block.BlockID, err)
}
30 changes: 12 additions & 18 deletions consensus/hotstuff/votecollector/combined_vote_processor_v3.go
@@ -47,39 +47,33 @@ func (f *combinedVoteProcessorFactoryBaseV3) Create(log zerolog.Logger, block *m
// message that has to be verified against aggregated signature
msg := verification.MakeVoteMessage(block.View, block.BlockID)

dkg, err := f.committee.DKG(block.View)
if err != nil {
return nil, fmt.Errorf("could not get DKG info at block %v: %w", block.BlockID, err)
}

// prepare the staking public keys of participants
stakingKeys := make([]crypto.PublicKey, 0, len(allParticipants))
stakingBeaconKeys := make([]crypto.PublicKey, 0, len(allParticipants))
Member:
Suggested change
stakingBeaconKeys := make([]crypto.PublicKey, 0, len(allParticipants))
beaconKeys := make([]crypto.PublicKey, 0, len(allParticipants))

In other parts of the code, we generally use the term "beacon key". Since what we are mainly differentiating it from is the "staking key", I think omitting "staking" from the name makes sense.

Member Author:
I think this is a special case: it's basically the beacon keys that take part in staking. There is a separate beaconKeys down the line that is used only for the random beacon protocol.

for _, participant := range allParticipants {
stakingKeys = append(stakingKeys, participant.StakingPubKey)
if pk, err := dkg.KeyShare(participant.NodeID); err == nil {
stakingBeaconKeys = append(stakingBeaconKeys, pk)
}
}

stakingSigAggtor, err := signature.NewWeightedSignatureAggregator(allParticipants, stakingKeys, msg, msig.ConsensusVoteTag)
if err != nil {
return nil, fmt.Errorf("could not create aggregator for staking signatures: %w", err)
}

dkg, err := f.committee.DKG(block.View)
if err != nil {
return nil, fmt.Errorf("could not get DKG info at block %v: %w", block.BlockID, err)
}

// prepare the random beacon public keys of participants
beaconKeys := make([]crypto.PublicKey, 0, len(allParticipants))
for _, participant := range allParticipants {
pk, err := dkg.KeyShare(participant.NodeID)
if err != nil {
return nil, fmt.Errorf("could not get random beacon key share for %x: %w", participant.NodeID, err)
}
beaconKeys = append(beaconKeys, pk)
}

rbSigAggtor, err := signature.NewWeightedSignatureAggregator(allParticipants, beaconKeys, msg, msig.RandomBeaconTag)
rbSigAggtor, err := signature.NewWeightedSignatureAggregator(allParticipants, stakingBeaconKeys, msg, msig.RandomBeaconTag)
Member:
Suggested change
rbSigAggtor, err := signature.NewWeightedSignatureAggregator(allParticipants, stakingBeaconKeys, msg, msig.RandomBeaconTag)
beaconAggregator, err := signature.NewWeightedSignatureAggregator(allParticipants, stakingBeaconKeys, msg, msig.RandomBeaconTag)

Nit: Reuse the "beacon" terminology from above, expand Aggtor so it's pronounceable. (Feel free to ignore if you disagree)

if err != nil {
return nil, fmt.Errorf("could not create aggregator for thershold signatures: %w", err)
return nil, fmt.Errorf("could not create aggregator for threshold signatures: %w", err)
}

threshold := msig.RandomBeaconThreshold(int(dkg.Size()))
randomBeaconInspector, err := signature.NewRandomBeaconInspector(dkg.GroupKey(), beaconKeys, threshold, msg)
randomBeaconInspector, err := signature.NewRandomBeaconInspector(dkg.GroupKey(), dkg.KeyShares(), threshold, msg)
if err != nil {
return nil, fmt.Errorf("could not create random beacon inspector: %w", err)
}
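A minimal sketch of the distinction discussed in the review thread above (the parameter types, flow.IdentitySkeletonList and hotstuff.DKG, are assumptions based on this diff): the weighted aggregator only needs the beacon keys of current, staked participants, while the inspector works over every DKG key share, which may include shares held by nodes ejected from the consensus committee after the DKG completed.

package sketch

import (
	"github.com/onflow/crypto"

	"github.com/onflow/flow-go/consensus/hotstuff"
	"github.com/onflow/flow-go/model/flow"
)

// splitBeaconKeys is a hypothetical illustration, not code from this PR.
func splitBeaconKeys(dkg hotstuff.DKG, allParticipants flow.IdentitySkeletonList) ([]crypto.PublicKey, []crypto.PublicKey) {
	stakingBeaconKeys := make([]crypto.PublicKey, 0, len(allParticipants))
	for _, participant := range allParticipants {
		// a current participant may or may not hold a DKG key share
		if pk, err := dkg.KeyShare(participant.NodeID); err == nil {
			stakingBeaconKeys = append(stakingBeaconKeys, pk)
		}
	}
	// every share produced by the DKG, in DKG index order
	return stakingBeaconKeys, dkg.KeyShares()
}
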
11 changes: 9 additions & 2 deletions integration/tests/epochs/base_suite.go
@@ -48,13 +48,18 @@ type BaseSuite struct {
RequiredSealApprovals uint // defaults to 0 (no approvals required)
// Consensus Node proposal duration
ConsensusProposalDuration time.Duration
// NumOfConsensusNodes is the number of consensus nodes in the network
NumOfConsensusNodes uint
}

// SetupTest is run automatically by the testing framework before each test case.
func (s *BaseSuite) SetupTest() {
if s.ConsensusProposalDuration == 0 {
s.ConsensusProposalDuration = time.Millisecond * 250
}
if s.NumOfConsensusNodes == 0 {
s.NumOfConsensusNodes = 2
}

minEpochLength := s.StakingAuctionLen + s.DKGPhaseLen*3 + 20
// ensure epoch lengths are set correctly
@@ -95,14 +100,16 @@ func (s *BaseSuite) SetupTest() {
testnet.NewNodeConfig(flow.RoleAccess, accessConfig...),
testnet.NewNodeConfig(flow.RoleAccess, testnet.WithLogLevel(zerolog.WarnLevel)),
testnet.NewNodeConfig(flow.RoleCollection, collectionConfigs...),
testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...),
testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...),
testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.WarnLevel), testnet.WithAdditionalFlag("--extensive-logging=true")),
testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.WarnLevel)),
testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.WarnLevel)),
ghostNode,
}

for i := uint(0); i < s.NumOfConsensusNodes; i++ {
confs = append(confs, testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...))
}

netConf := testnet.NewNetworkConfigWithEpochConfig("epochs-tests", confs, s.StakingAuctionLen, s.DKGPhaseLen, s.EpochLen, s.FinalizationSafetyThreshold)

// initialize the network
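With NumOfConsensusNodes configurable, a derived test suite can request a larger consensus committee before the base setup builds the network. A minimal sketch, assuming a suite defined in the same package as BaseSuite; the suite name and node count are hypothetical:

// RecoverEpochSuite is a hypothetical suite embedding BaseSuite.
type RecoverEpochSuite struct {
	BaseSuite
}

func (s *RecoverEpochSuite) SetupTest() {
	s.NumOfConsensusNodes = 4 // illustrative value; left unset, the default of 2 applies
	s.BaseSuite.SetupTest()
}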