diff --git a/README.md b/README.md index c64403ea..64a0380a 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,8 @@ The Fuji and Mainnet [public API nodes](https://docs.avax.network/tooling/rpc-pr ### Peer-to-Peer Connections -- The AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip). +- By default, the AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip). +- If configured to use the Warp API (see `warp-api-endpoint` in [Configuration](#configuration)) then aggregate signatures are fetched via a single RPC request, rather than `AppRequests` to individual validators. Note that the Warp API is disabled on the public API. ### Private Key Management @@ -252,6 +253,10 @@ The relayer is configured via a JSON file, the path to which is passed in via th - List of addresses on this source blockchain to relay Warp messages from. The sending address is defined by the message protocol. For example, it could be defined as the EOA that initiates the transaction, or the address that calls the message protocol contract. If empty, then all addresses are allowed. + `"warp-api-endpoint": APIConfig` + + - The RPC endpoint configuration for the Warp API, which is used to fetch Warp aggregate signatures. If omitted, then signatures are fetched via AppRequest instead. + `"destination-blockchains": []DestinationBlockchains` - The list of destination blockchains to support. Each `DestinationBlockchain` has the following configuration: diff --git a/config/destination_blockchain.go b/config/destination_blockchain.go index 709c87aa..80cd9ed0 100644 --- a/config/destination_blockchain.go +++ b/config/destination_blockchain.go @@ -91,7 +91,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error { return fmt.Errorf("invalid subnetID in configuration. error: %w", err) } - client, err := utils.DialWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams) + client, err := utils.NewEthClientWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams) if err != nil { return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err) } diff --git a/config/source_blockchain.go b/config/source_blockchain.go index 4d069dd1..d3e6f035 100644 --- a/config/source_blockchain.go +++ b/config/source_blockchain.go @@ -24,11 +24,13 @@ type SourceBlockchain struct { SupportedDestinations []*SupportedDestination `mapstructure:"supported-destinations" json:"supported-destinations"` ProcessHistoricalBlocksFromHeight uint64 `mapstructure:"process-historical-blocks-from-height" json:"process-historical-blocks-from-height"` AllowedOriginSenderAddresses []string `mapstructure:"allowed-origin-sender-addresses" json:"allowed-origin-sender-addresses"` + WarpAPIEndpoint APIConfig `mapstructure:"warp-api-endpoint" json:"warp-api-endpoint"` // convenience fields to access parsed data after initialization subnetID ids.ID blockchainID ids.ID allowedOriginSenderAddresses []common.Address + useAppRequestNetwork bool } // Validates the source subnet configuration, including verifying that the supported destinations are present in destinationBlockchainIDs @@ -47,6 +49,14 @@ func (s *SourceBlockchain) Validate(destinationBlockchainIDs *set.Set[string]) e if err := s.WSEndpoint.Validate(); err != nil { return fmt.Errorf("invalid ws-endpoint in source subnet configuration: %w", err) } + // The Warp API endpoint is optional. If omitted, signatures are fetched from validators via app request. + if s.WarpAPIEndpoint.BaseURL != "" { + if err := s.WarpAPIEndpoint.Validate(); err != nil { + return fmt.Errorf("invalid warp-api-endpoint in source subnet configuration: %w", err) + } + } else { + s.useAppRequestNetwork = true + } // Validate the VM specific settings switch ParseVM(s.VM) { @@ -140,6 +150,10 @@ func (s *SourceBlockchain) GetAllowedOriginSenderAddresses() []common.Address { return s.allowedOriginSenderAddresses } +func (s *SourceBlockchain) UseAppRequestNetwork() bool { + return s.useAppRequestNetwork +} + // Specifies a supported destination blockchain and addresses for a source blockchain. type SupportedDestination struct { BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` diff --git a/main/main.go b/main/main.go index 6ec3340f..e33c5d8e 100644 --- a/main/main.go +++ b/main/main.go @@ -122,13 +122,14 @@ func main() { // The app request network generates P2P networking logs that are verbose at the info level. // Unless the log level is debug or lower, set the network log level to error to avoid spamming the logs. + // We do not collect metrics for the network. networkLogLevel := logging.Error if logLevel <= logging.Debug { networkLogLevel = logLevel } network, err := peers.NewNetwork( networkLogLevel, - registerer, + prometheus.DefaultRegisterer, &cfg, ) if err != nil { @@ -181,7 +182,14 @@ func main() { } // Initialize message creator passed down to relayers for creating app requests. - messageCreator, err := message.NewCreator(logger, registerer, "message_creator", constants.DefaultNetworkCompressionType, constants.DefaultNetworkMaximumInboundTimeout) + // We do not collect metrics for the message creator. + messageCreator, err := message.NewCreator( + logger, + prometheus.DefaultRegisterer, + "message_creator", + constants.DefaultNetworkCompressionType, + constants.DefaultNetworkMaximumInboundTimeout, + ) if err != nil { logger.Error( "Failed to create message creator", @@ -243,7 +251,7 @@ func main() { // errgroup will cancel the context when the first goroutine returns an error errGroup.Go(func() error { // Dial the eth client - ethClient, err := utils.DialWithConfig( + ethClient, err := utils.NewEthClientWithConfig( context.Background(), sourceBlockchain.RPCEndpoint.BaseURL, sourceBlockchain.RPCEndpoint.HTTPHeaders, diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 7097a80b..c1f59600 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -30,7 +30,8 @@ import ( "github.com/ava-labs/awm-relayer/vms" coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message" msg "github.com/ava-labs/subnet-evm/plugin/evm/message" - warpBackend "github.com/ava-labs/subnet-evm/warp" + "github.com/ava-labs/subnet-evm/rpc" + "github.com/ethereum/go-ethereum/common/hexutil" "golang.org/x/sync/errgroup" "go.uber.org/zap" @@ -58,18 +59,19 @@ var ( // to a specific destination address on a specific destination blockchain. This routing information is // encapsulated in [relayerID], which also represents the database key for an ApplicationRelayer. type ApplicationRelayer struct { - logger logging.Logger - metrics *ApplicationRelayerMetrics - network *peers.AppRequestNetwork - messageCreator message.Creator - sourceBlockchain config.SourceBlockchain - signingSubnetID ids.ID - destinationClient vms.DestinationClient - relayerID database.RelayerID - warpQuorum config.WarpQuorum - checkpointManager *checkpoint.CheckpointManager - currentRequestID uint32 - lock *sync.RWMutex + logger logging.Logger + metrics *ApplicationRelayerMetrics + network *peers.AppRequestNetwork + messageCreator message.Creator + sourceBlockchain config.SourceBlockchain + signingSubnetID ids.ID + destinationClient vms.DestinationClient + relayerID database.RelayerID + warpQuorum config.WarpQuorum + checkpointManager *checkpoint.CheckpointManager + currentRequestID uint32 + lock *sync.RWMutex + sourceWarpSignatureClient *rpc.Client // nil if configured to fetch signatures via AppRequest for the source blockchain } func NewApplicationRelayer( @@ -108,19 +110,39 @@ func NewApplicationRelayer( checkpointManager := checkpoint.NewCheckpointManager(logger, db, sub, relayerID, startingHeight) checkpointManager.Run() + var warpClient *rpc.Client + if !sourceBlockchain.UseAppRequestNetwork() { + // The subnet-evm Warp API client does not support query parameters or HTTP headers, and expects the URI to be in a specific form. + // Instead, we invoke the Warp API directly via the RPC client. + warpClient, err = utils.DialWithConfig( + context.Background(), + sourceBlockchain.WarpAPIEndpoint.BaseURL, + sourceBlockchain.WarpAPIEndpoint.HTTPHeaders, + sourceBlockchain.WarpAPIEndpoint.QueryParams, + ) + if err != nil { + logger.Error( + "Failed to create Warp API client", + zap.Error(err), + ) + return nil, err + } + } + ar := ApplicationRelayer{ - logger: logger, - metrics: metrics, - network: network, - messageCreator: messageCreator, - sourceBlockchain: sourceBlockchain, - destinationClient: destinationClient, - relayerID: relayerID, - signingSubnetID: signingSubnet, - warpQuorum: quorum, - checkpointManager: checkpointManager, - currentRequestID: rand.Uint32(), // TODONOW: pass via ctor - lock: &sync.RWMutex{}, + logger: logger, + metrics: metrics, + network: network, + messageCreator: messageCreator, + sourceBlockchain: sourceBlockchain, + destinationClient: destinationClient, + relayerID: relayerID, + signingSubnetID: signingSubnet, + warpQuorum: quorum, + checkpointManager: checkpointManager, + currentRequestID: rand.Uint32(), // TODONOW: pass via ctor + lock: &sync.RWMutex{}, + sourceWarpSignatureClient: warpClient, } return &ar, nil @@ -170,7 +192,6 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) err err := r.relayMessage( reqID, handler, - true, ) return err @@ -183,7 +204,6 @@ func (r *ApplicationRelayer) RelayerID() database.RelayerID { func (r *ApplicationRelayer) relayMessage( requestID uint32, handler messages.MessageHandler, - useAppRequestNetwork bool, ) error { r.logger.Debug( "Relaying message", @@ -209,7 +229,10 @@ func (r *ApplicationRelayer) relayMessage( startCreateSignedMessageTime := time.Now() // Query nodes on the origin chain for signatures, and construct the signed warp message. var signedMessage *avalancheWarp.Message - if useAppRequestNetwork { + + // sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest + if r.sourceWarpSignatureClient == nil { + r.incFetchSignatureAppRequestCount() signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID) if err != nil { r.logger.Error( @@ -220,6 +243,7 @@ func (r *ApplicationRelayer) relayMessage( return err } } else { + r.incFetchSignatureRPCCount() signedMessage, err = r.createSignedMessage(unsignedMessage) if err != nil { r.logger.Error( @@ -257,18 +281,11 @@ func (r *ApplicationRelayer) relayMessage( // will need to be accounted for here. func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.UnsignedMessage) (*avalancheWarp.Message, error) { r.logger.Info("Fetching aggregate signature from the source chain validators via API") - // TODO: To properly support this, we should provide a dedicated Warp API endpoint in the config - uri := utils.StripFromString(r.sourceBlockchain.RPCEndpoint.BaseURL, "/ext") - warpClient, err := warpBackend.NewClient(uri, r.sourceBlockchain.GetBlockchainID().String()) - if err != nil { - r.logger.Error( - "Failed to create Warp API client", - zap.Error(err), - ) - return nil, err - } - var signedWarpMessageBytes []byte + var ( + signedWarpMessageBytes hexutil.Bytes + err error + ) for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ { r.logger.Debug( "Relayer collecting signatures from peers.", @@ -277,8 +294,11 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp. zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), zap.String("signingSubnetID", r.signingSubnetID.String()), ) - signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature( + + err = r.sourceWarpSignatureClient.CallContext( context.Background(), + &signedWarpMessageBytes, + "warp_getMessageAggregateSignature", unsignedMessage.ID(), r.warpQuorum.QuorumNumerator, r.signingSubnetID.String(), @@ -732,3 +752,19 @@ func (r *ApplicationRelayer) setCreateSignedMessageLatencyMS(latency float64) { r.sourceBlockchain.GetBlockchainID().String(), r.sourceBlockchain.GetSubnetID().String()).Set(latency) } + +func (r *ApplicationRelayer) incFetchSignatureRPCCount() { + r.metrics.fetchSignatureRPCCount. + WithLabelValues( + r.relayerID.DestinationBlockchainID.String(), + r.sourceBlockchain.GetBlockchainID().String(), + r.sourceBlockchain.GetSubnetID().String()).Inc() +} + +func (r *ApplicationRelayer) incFetchSignatureAppRequestCount() { + r.metrics.fetchSignatureAppRequestCount. + WithLabelValues( + r.relayerID.DestinationBlockchainID.String(), + r.sourceBlockchain.GetBlockchainID().String(), + r.sourceBlockchain.GetSubnetID().String()).Inc() +} diff --git a/relayer/application_relayer_metrics.go b/relayer/application_relayer_metrics.go index f36d8f2e..e4e74e3e 100644 --- a/relayer/application_relayer_metrics.go +++ b/relayer/application_relayer_metrics.go @@ -14,9 +14,11 @@ var ( ) type ApplicationRelayerMetrics struct { - successfulRelayMessageCount *prometheus.CounterVec - createSignedMessageLatencyMS *prometheus.GaugeVec - failedRelayMessageCount *prometheus.CounterVec + successfulRelayMessageCount *prometheus.CounterVec + createSignedMessageLatencyMS *prometheus.GaugeVec + failedRelayMessageCount *prometheus.CounterVec + fetchSignatureAppRequestCount *prometheus.CounterVec + fetchSignatureRPCCount *prometheus.CounterVec } func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*ApplicationRelayerMetrics, error) { @@ -53,9 +55,35 @@ func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*Applicatio } registerer.MustRegister(failedRelayMessageCount) + fetchSignatureAppRequestCount := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "fetch_signature_app_request_count", + Help: "Number of aggregate signatures constructed via AppRequest", + }, + []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, + ) + if fetchSignatureAppRequestCount == nil { + return nil, ErrFailedToCreateApplicationRelayerMetrics + } + registerer.MustRegister(fetchSignatureAppRequestCount) + + fetchSignatureRPCCount := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "fetch_signature_rpc_count", + Help: "Number of aggregate signatures fetched via Warp API", + }, + []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, + ) + if fetchSignatureRPCCount == nil { + return nil, ErrFailedToCreateApplicationRelayerMetrics + } + registerer.MustRegister(fetchSignatureRPCCount) + return &ApplicationRelayerMetrics{ - successfulRelayMessageCount: successfulRelayMessageCount, - createSignedMessageLatencyMS: createSignedMessageLatencyMS, - failedRelayMessageCount: failedRelayMessageCount, + successfulRelayMessageCount: successfulRelayMessageCount, + createSignedMessageLatencyMS: createSignedMessageLatencyMS, + failedRelayMessageCount: failedRelayMessageCount, + fetchSignatureAppRequestCount: fetchSignatureAppRequestCount, + fetchSignatureRPCCount: fetchSignatureRPCCount, }, nil } diff --git a/relayer/listener.go b/relayer/listener.go index 6bc72bfa..345e79b9 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -66,7 +66,7 @@ func NewListener( ) return nil, err } - ethWSClient, err := utils.DialWithConfig( + ethWSClient, err := utils.NewEthClientWithConfig( context.Background(), sourceBlockchain.WSEndpoint.BaseURL, sourceBlockchain.WSEndpoint.HTTPHeaders, diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 1e1b8df5..a587c874 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -80,4 +80,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { ginkgo.It("Batch Message", func() { BatchRelay(localNetworkInstance) }) + ginkgo.It("Warp API", func() { + WarpAPIRelay(localNetworkInstance) + }) }) diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 54b4bc6d..3d8fd8d1 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -201,6 +201,7 @@ func CreateDefaultRelayerConfig( StorageLocation: StorageLocation, DBWriteIntervalSeconds: DBUpdateSeconds, ProcessMissedBlocks: false, + MetricsPort: 9090, SourceBlockchains: sources, DestinationBlockchains: destinations, } diff --git a/tests/warp_api.go b/tests/warp_api.go new file mode 100644 index 00000000..4a793c7d --- /dev/null +++ b/tests/warp_api.go @@ -0,0 +1,134 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package tests + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + testUtils "github.com/ava-labs/awm-relayer/tests/utils" + "github.com/ava-labs/teleporter/tests/interfaces" + "github.com/ava-labs/teleporter/tests/utils" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + . "github.com/onsi/gomega" +) + +// Fully formed name of the metric that tracks the number aggregate signatures fetched from the Warp API +const rpcSignatureMetricName = "app_fetch_signature_rpc_count" + +// This tests the basic functionality of the relayer using the Warp API/, rather than app requests. Includes: +// - Relaying from Subnet A to Subnet B +// - Relaying from Subnet B to Subnet A +// - Verifying the messages were signed using the Warp API +func WarpAPIRelay(network interfaces.LocalNetwork) { + subnetAInfo := network.GetPrimaryNetworkInfo() + subnetBInfo, _ := utils.GetTwoSubnets(network) + fundedAddress, fundedKey := network.GetFundedAccountInfo() + teleporterContractAddress := network.GetTeleporterContractAddress() + err := testUtils.ClearRelayerStorage() + Expect(err).Should(BeNil()) + + // + // Fund the relayer address on all subnets + // + ctx := context.Background() + + log.Info("Funding relayer address on all subnets") + relayerKey, err := crypto.GenerateKey() + Expect(err).Should(BeNil()) + testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey) + + // + // Set up relayer config + // + relayerConfig := testUtils.CreateDefaultRelayerConfig( + []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + teleporterContractAddress, + fundedAddress, + relayerKey, + ) + // Enable the Warp API for all source blockchains + for _, subnet := range relayerConfig.SourceBlockchains { + subnet.WarpAPIEndpoint = subnet.RPCEndpoint + } + + relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname) + + // + // Test Relaying from Subnet A to Subnet B + // + log.Info("Test Relaying from Subnet A to Subnet B") + + log.Info("Starting the relayer") + relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath) + defer relayerCleanup() + + // Sleep for some time to make sure relayer has started up and subscribed. + log.Info("Waiting for the relayer to start up") + time.Sleep(15 * time.Second) + + log.Info("Sending transaction from Subnet A to Subnet B") + testUtils.RelayBasicMessage( + ctx, + subnetAInfo, + subnetBInfo, + teleporterContractAddress, + fundedKey, + fundedAddress, + ) + + // + // Test Relaying from Subnet B to Subnet A + // + log.Info("Test Relaying from Subnet B to Subnet A") + testUtils.RelayBasicMessage( + ctx, + subnetBInfo, + subnetAInfo, + teleporterContractAddress, + fundedKey, + fundedAddress, + ) + + // + // Verify the messages were signed using the Warp API + // + log.Info("Verifying the messages were signed using the Warp API") + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", relayerConfig.MetricsPort)) + Expect(err).Should(BeNil()) + + body, err := io.ReadAll(resp.Body) + Expect(err).Should(BeNil()) + defer resp.Body.Close() + + var totalCount uint64 + scanner := bufio.NewScanner(strings.NewReader(string(body))) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, rpcSignatureMetricName) { + log.Info("Found metric line", "metric", line) + parts := strings.Fields(line) + + // Fetch the metric count from the last field of the line + value, err := strconv.ParseUint(parts[len(parts)-1], 10, 64) + if err != nil { + continue + } + totalCount += value + } + } + Expect(totalCount).Should(Equal(uint64(2))) + + log.Info("Finished sending warp message, closing down output channel") + // Cancel the command and stop the relayer + relayerCleanup() +} diff --git a/utils/client_utils.go b/utils/client_utils.go index 05d4562a..051a5705 100644 --- a/utils/client_utils.go +++ b/utils/client_utils.go @@ -15,8 +15,17 @@ import ( var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") -// DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options. -func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { +// NewEthClientWithConfig returns an ethclient.Client with the internal RPC client configured with the provided options. +func NewEthClientWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { + client, err := DialWithConfig(ctx, baseURL, httpHeaders, queryParams) + if err != nil { + return nil, err + } + return ethclient.NewClient(client), nil +} + +// DialWithConfig dials the provided baseURL with the provided httpHeaders and queryParams +func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (*rpc.Client, error) { url, err := addQueryParams(baseURL, queryParams) if err != nil { return nil, err @@ -25,7 +34,7 @@ func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParam if err != nil { return nil, err } - return ethclient.NewClient(client), nil + return client, nil } // addQueryParams adds the query parameters to the url diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index 6be100dc..aea6ab90 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -52,7 +52,7 @@ func NewDestinationClient( destinationBlockchain *config.DestinationBlockchain, ) (*destinationClient, error) { // Dial the destination RPC endpoint - client, err := utils.DialWithConfig( + client, err := utils.NewEthClientWithConfig( context.Background(), destinationBlockchain.RPCEndpoint.BaseURL, destinationBlockchain.RPCEndpoint.HTTPHeaders,