Skip to content

Commit

Permalink
Merge pull request #345 from ava-labs/warp-api-support
Browse files Browse the repository at this point in the history
Warp API support
  • Loading branch information
cam-schultz authored Jun 26, 2024
2 parents 0479088 + b83aa49 commit 5cbcdb0
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 56 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ The Fuji and Mainnet [public API nodes](https://docs.avax.network/tooling/rpc-pr

### Peer-to-Peer Connections

- The AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip).
- By default, the AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip).
- If configured to use the Warp API (see `warp-api-endpoint` in [Configuration](#configuration)) then aggregate signatures are fetched via a single RPC request, rather than `AppRequests` to individual validators. Note that the Warp API is disabled on the public API.

### Private Key Management

Expand Down Expand Up @@ -252,6 +253,10 @@ The relayer is configured via a JSON file, the path to which is passed in via th

- List of addresses on this source blockchain to relay Warp messages from. The sending address is defined by the message protocol. For example, it could be defined as the EOA that initiates the transaction, or the address that calls the message protocol contract. If empty, then all addresses are allowed.

`"warp-api-endpoint": APIConfig`

- The RPC endpoint configuration for the Warp API, which is used to fetch Warp aggregate signatures. If omitted, then signatures are fetched via AppRequest instead.

`"destination-blockchains": []DestinationBlockchains`

- The list of destination blockchains to support. Each `DestinationBlockchain` has the following configuration:
Expand Down
2 changes: 1 addition & 1 deletion config/destination_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error {
return fmt.Errorf("invalid subnetID in configuration. error: %w", err)
}

client, err := utils.DialWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams)
client, err := utils.NewEthClientWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams)
if err != nil {
return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err)
}
Expand Down
14 changes: 14 additions & 0 deletions config/source_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type SourceBlockchain struct {
SupportedDestinations []*SupportedDestination `mapstructure:"supported-destinations" json:"supported-destinations"`
ProcessHistoricalBlocksFromHeight uint64 `mapstructure:"process-historical-blocks-from-height" json:"process-historical-blocks-from-height"`
AllowedOriginSenderAddresses []string `mapstructure:"allowed-origin-sender-addresses" json:"allowed-origin-sender-addresses"`
WarpAPIEndpoint APIConfig `mapstructure:"warp-api-endpoint" json:"warp-api-endpoint"`

// convenience fields to access parsed data after initialization
subnetID ids.ID
blockchainID ids.ID
allowedOriginSenderAddresses []common.Address
useAppRequestNetwork bool
}

// Validates the source subnet configuration, including verifying that the supported destinations are present in destinationBlockchainIDs
Expand All @@ -47,6 +49,14 @@ func (s *SourceBlockchain) Validate(destinationBlockchainIDs *set.Set[string]) e
if err := s.WSEndpoint.Validate(); err != nil {
return fmt.Errorf("invalid ws-endpoint in source subnet configuration: %w", err)
}
// The Warp API endpoint is optional. If omitted, signatures are fetched from validators via app request.
if s.WarpAPIEndpoint.BaseURL != "" {
if err := s.WarpAPIEndpoint.Validate(); err != nil {
return fmt.Errorf("invalid warp-api-endpoint in source subnet configuration: %w", err)
}
} else {
s.useAppRequestNetwork = true
}

// Validate the VM specific settings
switch ParseVM(s.VM) {
Expand Down Expand Up @@ -140,6 +150,10 @@ func (s *SourceBlockchain) GetAllowedOriginSenderAddresses() []common.Address {
return s.allowedOriginSenderAddresses
}

func (s *SourceBlockchain) UseAppRequestNetwork() bool {
return s.useAppRequestNetwork
}

// Specifies a supported destination blockchain and addresses for a source blockchain.
type SupportedDestination struct {
BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"`
Expand Down
14 changes: 11 additions & 3 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ func main() {

// The app request network generates P2P networking logs that are verbose at the info level.
// Unless the log level is debug or lower, set the network log level to error to avoid spamming the logs.
// We do not collect metrics for the network.
networkLogLevel := logging.Error
if logLevel <= logging.Debug {
networkLogLevel = logLevel
}
network, err := peers.NewNetwork(
networkLogLevel,
registerer,
prometheus.DefaultRegisterer,
&cfg,
)
if err != nil {
Expand Down Expand Up @@ -181,7 +182,14 @@ func main() {
}

// Initialize message creator passed down to relayers for creating app requests.
messageCreator, err := message.NewCreator(logger, registerer, "message_creator", constants.DefaultNetworkCompressionType, constants.DefaultNetworkMaximumInboundTimeout)
// We do not collect metrics for the message creator.
messageCreator, err := message.NewCreator(
logger,
prometheus.DefaultRegisterer,
"message_creator",
constants.DefaultNetworkCompressionType,
constants.DefaultNetworkMaximumInboundTimeout,
)
if err != nil {
logger.Error(
"Failed to create message creator",
Expand Down Expand Up @@ -243,7 +251,7 @@ func main() {
// errgroup will cancel the context when the first goroutine returns an error
errGroup.Go(func() error {
// Dial the eth client
ethClient, err := utils.DialWithConfig(
ethClient, err := utils.NewEthClientWithConfig(
context.Background(),
sourceBlockchain.RPCEndpoint.BaseURL,
sourceBlockchain.RPCEndpoint.HTTPHeaders,
Expand Down
116 changes: 76 additions & 40 deletions relayer/application_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
"github.com/ava-labs/awm-relayer/vms"
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
warpBackend "github.com/ava-labs/subnet-evm/warp"
"github.com/ava-labs/subnet-evm/rpc"
"github.com/ethereum/go-ethereum/common/hexutil"
"golang.org/x/sync/errgroup"

"go.uber.org/zap"
Expand Down Expand Up @@ -58,18 +59,19 @@ var (
// to a specific destination address on a specific destination blockchain. This routing information is
// encapsulated in [relayerID], which also represents the database key for an ApplicationRelayer.
type ApplicationRelayer struct {
logger logging.Logger
metrics *ApplicationRelayerMetrics
network *peers.AppRequestNetwork
messageCreator message.Creator
sourceBlockchain config.SourceBlockchain
signingSubnetID ids.ID
destinationClient vms.DestinationClient
relayerID database.RelayerID
warpQuorum config.WarpQuorum
checkpointManager *checkpoint.CheckpointManager
currentRequestID uint32
lock *sync.RWMutex
logger logging.Logger
metrics *ApplicationRelayerMetrics
network *peers.AppRequestNetwork
messageCreator message.Creator
sourceBlockchain config.SourceBlockchain
signingSubnetID ids.ID
destinationClient vms.DestinationClient
relayerID database.RelayerID
warpQuorum config.WarpQuorum
checkpointManager *checkpoint.CheckpointManager
currentRequestID uint32
lock *sync.RWMutex
sourceWarpSignatureClient *rpc.Client // nil if configured to fetch signatures via AppRequest for the source blockchain
}

func NewApplicationRelayer(
Expand Down Expand Up @@ -108,19 +110,39 @@ func NewApplicationRelayer(
checkpointManager := checkpoint.NewCheckpointManager(logger, db, sub, relayerID, startingHeight)
checkpointManager.Run()

var warpClient *rpc.Client
if !sourceBlockchain.UseAppRequestNetwork() {
// The subnet-evm Warp API client does not support query parameters or HTTP headers, and expects the URI to be in a specific form.
// Instead, we invoke the Warp API directly via the RPC client.
warpClient, err = utils.DialWithConfig(
context.Background(),
sourceBlockchain.WarpAPIEndpoint.BaseURL,
sourceBlockchain.WarpAPIEndpoint.HTTPHeaders,
sourceBlockchain.WarpAPIEndpoint.QueryParams,
)
if err != nil {
logger.Error(
"Failed to create Warp API client",
zap.Error(err),
)
return nil, err
}
}

ar := ApplicationRelayer{
logger: logger,
metrics: metrics,
network: network,
messageCreator: messageCreator,
sourceBlockchain: sourceBlockchain,
destinationClient: destinationClient,
relayerID: relayerID,
signingSubnetID: signingSubnet,
warpQuorum: quorum,
checkpointManager: checkpointManager,
currentRequestID: rand.Uint32(), // TODONOW: pass via ctor
lock: &sync.RWMutex{},
logger: logger,
metrics: metrics,
network: network,
messageCreator: messageCreator,
sourceBlockchain: sourceBlockchain,
destinationClient: destinationClient,
relayerID: relayerID,
signingSubnetID: signingSubnet,
warpQuorum: quorum,
checkpointManager: checkpointManager,
currentRequestID: rand.Uint32(), // TODONOW: pass via ctor
lock: &sync.RWMutex{},
sourceWarpSignatureClient: warpClient,
}

return &ar, nil
Expand Down Expand Up @@ -170,7 +192,6 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) err
err := r.relayMessage(
reqID,
handler,
true,
)

return err
Expand All @@ -183,7 +204,6 @@ func (r *ApplicationRelayer) RelayerID() database.RelayerID {
func (r *ApplicationRelayer) relayMessage(
requestID uint32,
handler messages.MessageHandler,
useAppRequestNetwork bool,
) error {
r.logger.Debug(
"Relaying message",
Expand All @@ -209,7 +229,10 @@ func (r *ApplicationRelayer) relayMessage(
startCreateSignedMessageTime := time.Now()
// Query nodes on the origin chain for signatures, and construct the signed warp message.
var signedMessage *avalancheWarp.Message
if useAppRequestNetwork {

// sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest
if r.sourceWarpSignatureClient == nil {
r.incFetchSignatureAppRequestCount()
signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID)
if err != nil {
r.logger.Error(
Expand All @@ -220,6 +243,7 @@ func (r *ApplicationRelayer) relayMessage(
return err
}
} else {
r.incFetchSignatureRPCCount()
signedMessage, err = r.createSignedMessage(unsignedMessage)
if err != nil {
r.logger.Error(
Expand Down Expand Up @@ -257,18 +281,11 @@ func (r *ApplicationRelayer) relayMessage(
// will need to be accounted for here.
func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.UnsignedMessage) (*avalancheWarp.Message, error) {
r.logger.Info("Fetching aggregate signature from the source chain validators via API")
// TODO: To properly support this, we should provide a dedicated Warp API endpoint in the config
uri := utils.StripFromString(r.sourceBlockchain.RPCEndpoint.BaseURL, "/ext")
warpClient, err := warpBackend.NewClient(uri, r.sourceBlockchain.GetBlockchainID().String())
if err != nil {
r.logger.Error(
"Failed to create Warp API client",
zap.Error(err),
)
return nil, err
}

var signedWarpMessageBytes []byte
var (
signedWarpMessageBytes hexutil.Bytes
err error
)
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
r.logger.Debug(
"Relayer collecting signatures from peers.",
Expand All @@ -277,8 +294,11 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature(

err = r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpQuorum.QuorumNumerator,
r.signingSubnetID.String(),
Expand Down Expand Up @@ -732,3 +752,19 @@ func (r *ApplicationRelayer) setCreateSignedMessageLatencyMS(latency float64) {
r.sourceBlockchain.GetBlockchainID().String(),
r.sourceBlockchain.GetSubnetID().String()).Set(latency)
}

func (r *ApplicationRelayer) incFetchSignatureRPCCount() {
r.metrics.fetchSignatureRPCCount.
WithLabelValues(
r.relayerID.DestinationBlockchainID.String(),
r.sourceBlockchain.GetBlockchainID().String(),
r.sourceBlockchain.GetSubnetID().String()).Inc()
}

func (r *ApplicationRelayer) incFetchSignatureAppRequestCount() {
r.metrics.fetchSignatureAppRequestCount.
WithLabelValues(
r.relayerID.DestinationBlockchainID.String(),
r.sourceBlockchain.GetBlockchainID().String(),
r.sourceBlockchain.GetSubnetID().String()).Inc()
}
40 changes: 34 additions & 6 deletions relayer/application_relayer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ var (
)

type ApplicationRelayerMetrics struct {
successfulRelayMessageCount *prometheus.CounterVec
createSignedMessageLatencyMS *prometheus.GaugeVec
failedRelayMessageCount *prometheus.CounterVec
successfulRelayMessageCount *prometheus.CounterVec
createSignedMessageLatencyMS *prometheus.GaugeVec
failedRelayMessageCount *prometheus.CounterVec
fetchSignatureAppRequestCount *prometheus.CounterVec
fetchSignatureRPCCount *prometheus.CounterVec
}

func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*ApplicationRelayerMetrics, error) {
Expand Down Expand Up @@ -53,9 +55,35 @@ func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*Applicatio
}
registerer.MustRegister(failedRelayMessageCount)

fetchSignatureAppRequestCount := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "fetch_signature_app_request_count",
Help: "Number of aggregate signatures constructed via AppRequest",
},
[]string{"destination_chain_id", "source_chain_id", "source_subnet_id"},
)
if fetchSignatureAppRequestCount == nil {
return nil, ErrFailedToCreateApplicationRelayerMetrics
}
registerer.MustRegister(fetchSignatureAppRequestCount)

fetchSignatureRPCCount := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "fetch_signature_rpc_count",
Help: "Number of aggregate signatures fetched via Warp API",
},
[]string{"destination_chain_id", "source_chain_id", "source_subnet_id"},
)
if fetchSignatureRPCCount == nil {
return nil, ErrFailedToCreateApplicationRelayerMetrics
}
registerer.MustRegister(fetchSignatureRPCCount)

return &ApplicationRelayerMetrics{
successfulRelayMessageCount: successfulRelayMessageCount,
createSignedMessageLatencyMS: createSignedMessageLatencyMS,
failedRelayMessageCount: failedRelayMessageCount,
successfulRelayMessageCount: successfulRelayMessageCount,
createSignedMessageLatencyMS: createSignedMessageLatencyMS,
failedRelayMessageCount: failedRelayMessageCount,
fetchSignatureAppRequestCount: fetchSignatureAppRequestCount,
fetchSignatureRPCCount: fetchSignatureRPCCount,
}, nil
}
2 changes: 1 addition & 1 deletion relayer/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewListener(
)
return nil, err
}
ethWSClient, err := utils.DialWithConfig(
ethWSClient, err := utils.NewEthClientWithConfig(
context.Background(),
sourceBlockchain.WSEndpoint.BaseURL,
sourceBlockchain.WSEndpoint.HTTPHeaders,
Expand Down
3 changes: 3 additions & 0 deletions tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() {
ginkgo.It("Batch Message", func() {
BatchRelay(localNetworkInstance)
})
ginkgo.It("Warp API", func() {
WarpAPIRelay(localNetworkInstance)
})
})
1 change: 1 addition & 0 deletions tests/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func CreateDefaultRelayerConfig(
StorageLocation: StorageLocation,
DBWriteIntervalSeconds: DBUpdateSeconds,
ProcessMissedBlocks: false,
MetricsPort: 9090,
SourceBlockchains: sources,
DestinationBlockchains: destinations,
}
Expand Down
Loading

0 comments on commit 5cbcdb0

Please sign in to comment.