Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validators at height #506

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions peers/app_request_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (

type AppRequestNetwork interface {
ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[ids.NodeID]
ConnectToCanonicalValidators(subnetID ids.ID) (
ConnectToCanonicalValidators(height uint64, subnetID ids.ID) (
*ConnectedCanonicalValidators,
error,
)
Expand Down Expand Up @@ -222,14 +222,31 @@ func (c *ConnectedCanonicalValidators) GetValidator(nodeID ids.NodeID) (*warp.Va

// ConnectToCanonicalValidators connects to the canonical validators of the given subnet and returns the connected
// validator information
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
func (n *appRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*ConnectedCanonicalValidators, error) {
// Get the subnet's current canonical validator set
func (n *appRequestNetwork) ConnectToCanonicalValidators(
height uint64,
subnetID ids.ID,
) (*ConnectedCanonicalValidators, error) {
var err error

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to set a new variable here instead of usual err := assignment, will only pass line 231 if err = nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm overriding the passed in height if it's 0 so I do need the err defined prior to that so that line 232 is pure assignment to already declared vars.

I could declare a new variable to replace height instead but I think this is cleaner

Also hi 😄


if height == 0 {
height, err = n.validatorClient.GetCurrentHeight(context.Background())
if err != nil {
n.logger.Error(
"Failed to get P-Chain height",
zap.Error(err),
)
return nil, err
}
}

startPChainAPICall := time.Now()
validatorSet, totalValidatorWeight, err := n.validatorClient.GetCurrentCanonicalValidatorSet(subnetID)
n.setPChainAPICallLatencyMS(float64(time.Since(startPChainAPICall).Milliseconds()))
// Get the subnet's current canonical validator set at the supplied height
validatorSet, totalValidatorWeight, err := n.validatorClient.GetCanonicalValidatorSet(height, subnetID)
if err != nil {
return nil, err
}
n.setPChainAPICallLatencyMS(float64(time.Since(startPChainAPICall).Milliseconds()))

// We make queries to node IDs, not unique validators as represented by a BLS pubkey, so we need this map to track
// responses from nodes and populate the signatureMap with the corresponding validator signature
// This maps node IDs to the index in the canonical validator set
Expand Down
8 changes: 4 additions & 4 deletions peers/mocks/mock_app_request_network.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 7 additions & 76 deletions peers/validators/canonical_validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,10 @@ func NewCanonicalValidatorClient(logger logging.Logger, apiConfig *config.APICon
}
}

func (v *CanonicalValidatorClient) GetCurrentCanonicalValidatorSet(
func (v *CanonicalValidatorClient) GetCanonicalValidatorSet(
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
height uint64,
subnetID ids.ID,
) ([]*avalancheWarp.Validator, uint64, error) {
height, err := v.GetCurrentHeight(context.Background())
if err != nil {
v.logger.Error(
"Failed to get P-Chain height",
zap.Error(err),
)
return nil, 0, err
}

// Get the current canonical validator set of the source subnet.
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
canonicalSubnetValidators, totalValidatorWeight, err := avalancheWarp.GetCanonicalValidatorSet(
context.Background(),
Expand Down Expand Up @@ -75,13 +67,15 @@ func (v *CanonicalValidatorClient) GetCurrentHeight(ctx context.Context) (uint64
return v.client.GetHeight(ctx, v.options...)
}

func (v *CanonicalValidatorClient) GetBlockByHeight(ctx context.Context, height uint64) ([]byte, error) {
return v.client.GetBlockByHeight(ctx, height, v.options...)
}

func (v *CanonicalValidatorClient) GetSubnetID(ctx context.Context, blockchainID ids.ID) (ids.ID, error) {
return v.client.ValidatedBy(ctx, blockchainID, v.options...)
}

// Gets the validator set of the given subnet at the given P-chain block height.
// Attempts to use the "getValidatorsAt" API first. If not available, falls back
// to use "getCurrentValidators", ignoring the specified P-chain block height.
func (v *CanonicalValidatorClient) GetValidatorSet(
ctx context.Context,
height uint64,
Expand All @@ -91,75 +85,12 @@ func (v *CanonicalValidatorClient) GetValidatorSet(
// all API nodes, in which case we can fall back to using "getCurrentValidators" if needed.
res, err := v.client.GetValidatorsAt(ctx, subnetID, height, v.options...)
if err != nil {
v.logger.Debug(
v.logger.Warn(
"P-chain RPC to getValidatorAt returned error. Falling back to getCurrentValidators",
zap.String("subnetID", subnetID.String()),
zap.Uint64("pChainHeight", height),
zap.Error(err))
return v.getCurrentValidatorSet(ctx, subnetID)
}
return res, nil
}

// Gets the current validator set of the given subnet ID, including the validators' BLS public
// keys. The implementation currently makes two RPC requests, one to get the subnet validators,
// and another to get their BLS public keys. This is necessary in order to enable the use of
// the public APIs (which don't support "GetValidatorsAt") because BLS keys are currently only
// associated with primary network validation periods. If ACP-13 is implemented in the future
// (https://github.com/avalanche-foundation/ACPs/blob/main/ACPs/13-subnet-only-validators.md), it
// may become possible to reduce this to a single RPC request that returns both the subnet validators
// as well as their BLS public keys.
func (v *CanonicalValidatorClient) getCurrentValidatorSet(
ctx context.Context,
subnetID ids.ID,
) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
// Get the current subnet validators. These validators are not expected to include
// BLS signing information given that addPermissionlessValidatorTx is only used to
// add primary network validators.
subnetVdrs, err := v.client.GetCurrentValidators(ctx, subnetID, nil, v.options...)
if err != nil {
return nil, err
}

// Look up the primary network validators of the NodeIDs validating the subnet
// in order to get their BLS keys.
res := make(map[ids.NodeID]*validators.GetValidatorOutput, len(subnetVdrs))
subnetNodeIDs := make([]ids.NodeID, 0, len(subnetVdrs))
for _, subnetVdr := range subnetVdrs {
subnetNodeIDs = append(subnetNodeIDs, subnetVdr.NodeID)
res[subnetVdr.NodeID] = &validators.GetValidatorOutput{
NodeID: subnetVdr.NodeID,
Weight: subnetVdr.Weight,
}
}
primaryVdrs, err := v.client.GetCurrentValidators(ctx, ids.Empty, subnetNodeIDs, v.options...)
if err != nil {
return nil, err
}

// Set the BLS keys of the result.
for _, primaryVdr := range primaryVdrs {
// We expect all of the primary network validators to already be in `res` because
// we filtered the request to node IDs that were identified as validators of the
// specific subnet ID.
vdr, ok := res[primaryVdr.NodeID]
if !ok {
v.logger.Warn(
"Unexpected primary network validator returned by getCurrentValidators request",
zap.String("subnetID", subnetID.String()),
zap.String("nodeID", primaryVdr.NodeID.String()))
continue
}

// Validators that do not have a BLS public key registered on the P-chain are still
// included in the result because they affect the stake weight of the subnet validators.
// Such validators will not be queried for BLS signatures of warp messages. As long as
// sufficient stake percentage of subnet validators have registered BLS public keys,
// messages can still be successfully relayed.
if primaryVdr.Signer != nil {
vdr.PublicKey = primaryVdr.Signer.Key()
}
}

return res, nil
}
1 change: 1 addition & 0 deletions relayer/application_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (co
nil,
r.signingSubnetID,
r.warpQuorum.QuorumNumerator,
0,
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
)
r.incFetchSignatureAppRequestCount()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions relayer/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"runtime"
"strings"
"time"

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -219,6 +220,17 @@ func main() {
panic(err)
}

proposerHeightCache, err := aggregator.NewProposerHeightCache(
logger,
cfg.GetPChainAPI(),
time.Second*2,
)
if err != nil {
logger.Fatal("Failed to create proposer height cache", zap.Error(err))
panic(err)
}
proposerHeightCache.Start(context.Background())

signatureAggregator, err := aggregator.NewSignatureAggregator(
network,
logger,
Expand All @@ -227,6 +239,7 @@ func main() {
prometheus.DefaultRegisterer,
),
messageCreator,
proposerHeightCache,
cfg.EtnaTime,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions relayer/network_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func connectToNonPrimaryNetworkPeers(
sourceBlockchain *config.SourceBlockchain,
) error {
subnetID := sourceBlockchain.GetSubnetID()
connectedValidators, err := network.ConnectToCanonicalValidators(subnetID)
connectedValidators, err := network.ConnectToCanonicalValidators(0, subnetID)
if err != nil {
logger.Error(
"Failed to connect to canonical validators",
Expand Down Expand Up @@ -94,7 +94,7 @@ func connectToPrimaryNetworkPeers(
for _, destination := range sourceBlockchain.SupportedDestinations {
blockchainID := destination.GetBlockchainID()
subnetID := cfg.GetSubnetID(blockchainID)
connectedValidators, err := network.ConnectToCanonicalValidators(subnetID)
connectedValidators, err := network.ConnectToCanonicalValidators(0, subnetID)
if err != nil {
logger.Error(
"Failed to connect to canonical validators",
Expand Down
10 changes: 9 additions & 1 deletion signature-aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type SignatureAggregator struct {
subnetsMapLock sync.RWMutex
metrics *metrics.SignatureAggregatorMetrics
cache *cache.Cache
proposerHeightCache *ProposerHeightCache
etnaTime time.Time
}

Expand All @@ -73,6 +74,7 @@ func NewSignatureAggregator(
signatureCacheSize uint64,
metrics *metrics.SignatureAggregatorMetrics,
messageCreator message.Creator,
proposerHeightCache *ProposerHeightCache,
etnaTime time.Time,
) (*SignatureAggregator, error) {
cache, err := cache.NewCache(signatureCacheSize, logger)
Expand Down Expand Up @@ -101,6 +103,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
justification []byte,
inputSigningSubnet ids.ID,
quorumPercentage uint64,
pChainHeight uint64,
) (*avalancheWarp.Message, error) {
var signingSubnet ids.ID
var err error
Expand All @@ -118,7 +121,12 @@ func (s *SignatureAggregator) CreateSignedMessage(
signingSubnet = inputSigningSubnet
}

connectedValidators, err := s.network.ConnectToCanonicalValidators(signingSubnet)
// only fetch proposer height if it's not provided and the proposerHeightCache is configured
if pChainHeight == 0 && s.proposerHeightCache != nil {
pChainHeight = s.proposerHeightCache.GetOptimalHeight()
}

connectedValidators, err := s.network.ConnectToCanonicalValidators(pChainHeight, signingSubnet)
if err != nil {
msg := "Failed to connect to canonical validators"
s.logger.Error(
Expand Down
16 changes: 9 additions & 7 deletions signature-aggregator/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func instantiateAggregator(t *testing.T) (
1024,
sigAggMetrics,
messageCreator,
nil,
// Setting the etnaTime to a minute ago so that the post-etna code path is used in the test
time.Now().Add(-1*time.Minute),
)
Expand Down Expand Up @@ -134,15 +135,15 @@ func TestCreateSignedMessageFailsWithNoValidators(t *testing.T) {
msg, err := warp.NewUnsignedMessage(0, ids.Empty, []byte{})
require.NoError(t, err)
mockNetwork.EXPECT().GetSubnetID(ids.Empty).Return(ids.Empty, nil)
mockNetwork.EXPECT().ConnectToCanonicalValidators(ids.Empty).Return(
mockNetwork.EXPECT().ConnectToCanonicalValidators(uint64(0), ids.Empty).Return(
&peers.ConnectedCanonicalValidators{
ConnectedWeight: 0,
TotalValidatorWeight: 0,
ValidatorSet: []*warp.Validator{},
},
nil,
)
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80)
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80, 0)
require.ErrorContains(t, err, "no signatures")
}

Expand All @@ -151,15 +152,15 @@ func TestCreateSignedMessageFailsWithoutSufficientConnectedStake(t *testing.T) {
msg, err := warp.NewUnsignedMessage(0, ids.Empty, []byte{})
require.NoError(t, err)
mockNetwork.EXPECT().GetSubnetID(ids.Empty).Return(ids.Empty, nil)
mockNetwork.EXPECT().ConnectToCanonicalValidators(ids.Empty).Return(
mockNetwork.EXPECT().ConnectToCanonicalValidators(uint64(0), ids.Empty).Return(
&peers.ConnectedCanonicalValidators{
ConnectedWeight: 0,
TotalValidatorWeight: 1,
ValidatorSet: []*warp.Validator{},
},
nil,
)
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80)
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80, 0)
require.ErrorContains(
t,
err,
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) {
nil,
)

mockNetwork.EXPECT().ConnectToCanonicalValidators(subnetID).Return(
mockNetwork.EXPECT().ConnectToCanonicalValidators(uint64(0), subnetID).Return(
connectedValidators,
nil,
)
Expand Down Expand Up @@ -240,7 +241,7 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) {
subnets.NoOpAllower,
).Times(maxRelayerQueryAttempts)

_, err = aggregator.CreateSignedMessage(msg, nil, subnetID, 80)
_, err = aggregator.CreateSignedMessage(msg, nil, subnetID, 80, 0)
require.ErrorContains(
t,
err,
Expand Down Expand Up @@ -272,7 +273,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) {
nil,
)

mockNetwork.EXPECT().ConnectToCanonicalValidators(subnetID).Return(
mockNetwork.EXPECT().ConnectToCanonicalValidators(uint64(0), subnetID).Return(
connectedValidators,
nil,
)
Expand Down Expand Up @@ -328,6 +329,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) {
nil,
subnetID,
quorumPercentage,
0,
)
require.NoError(t, err)

Expand Down
Loading