Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ShouldSendMessage decider #344

Merged
merged 32 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1b161e6
`ShouldSendMessage` decider service support
feuGeneA Jun 21, 2024
5c6422e
rm superfluous/duplicate comment
feuGeneA Jul 19, 2024
584556e
msg handler: extract `deciderRejectedMsg()` helper
feuGeneA Jul 19, 2024
562cf7c
extract helper `initializeDeciderClient()`
feuGeneA Jul 19, 2024
b01b14b
main: deciderClient as main() local, not global
feuGeneA Jul 19, 2024
0d6b07b
Merge remote-tracking branch 'origin/main' into decider
feuGeneA Jul 19, 2024
249de65
Apply suggestions from code review
feuGeneA Jul 19, 2024
81b30bc
proto .sh: pull protoc-gen-go version from go.mod
feuGeneA Jul 22, 2024
ed3f4e7
rm unused github_token from .github workflow
feuGeneA Jul 22, 2024
f897c89
Merge branch 'main' into decider
feuGeneA Jul 22, 2024
f89bd35
add timeout to decider delegation
feuGeneA Jul 22, 2024
3631b7b
follow ava-go: exclude ver from proto svc name
feuGeneA Jul 22, 2024
be38f17
add descriptions of new config options
feuGeneA Jul 22, 2024
aab66d5
validate uri
feuGeneA Jul 22, 2024
df26aa2
improve readbility of decider invocation
feuGeneA Jul 24, 2024
a9c374c
validate decider host/port in config, not main
feuGeneA Jul 24, 2024
6a9c276
explain `createDeciderClient()`
feuGeneA Jul 24, 2024
c37fd41
add to README
feuGeneA Jul 24, 2024
7eb5112
config: just DeciderURL, not Host/Port
feuGeneA Jul 24, 2024
598f9bb
Merge branch 'main' into decider
feuGeneA Jul 25, 2024
7e02e1a
stop returning bool pointer
feuGeneA Jul 25, 2024
0a22497
rename and document helper method
feuGeneA Jul 25, 2024
742a6ee
e2e test: panic if decider exits abnormally
feuGeneA Jul 25, 2024
1837d4b
Update messages/teleporter/message_handler.go
feuGeneA Jul 25, 2024
8ed8112
streamline getShouldSendMessageFromDecider flow
feuGeneA Jul 25, 2024
6488c81
Update config/config.go
feuGeneA Jul 25, 2024
9a7944d
stop decoding warpMsgID
feuGeneA Jul 25, 2024
19e8689
rename parameter
feuGeneA Jul 25, 2024
682fead
remove redundant assignment of nil to interface
feuGeneA Jul 25, 2024
79c82b2
`deciderConnection`, not just `deciderClient`
feuGeneA Jul 26, 2024
c4b2262
add comment to separate two "halves" of function
feuGeneA Jul 26, 2024
f8662fd
when conn==nil, init client to "empty" impl
feuGeneA Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,10 @@ jobs:
with:
submodules: recursive

- name: Install buf
uses: bufbuild/[email protected]
with:
github_token: ${{ github.token }}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the github token necessary? Does the repo even have access to a gh token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently not. I removed this in ed3f4e7 and everything still ran fine. I had just copied this from avalanchego. Thanks for noticing it.


- name: Run E2E Tests
run: AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego DATA_DIR=/tmp/e2e-test/data ./scripts/e2e_test.sh
10 changes: 10 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,15 @@ jobs:
with:
go-version-file: 'go.mod'

- name: Install buf
uses: bufbuild/[email protected]

- name: Run Lint
run: ./scripts/lint.sh --go-lint

- name: Ensure protobuf changes are checked in
run: |
scripts/protobuf_codegen.sh
git update-index --really-refresh >> /dev/null
git diff-index HEAD # to show the differences
git diff-index --quiet HEAD || (echo 'protobuf generated code changes have not all been checked in' && exit 1)
Comment on lines +39 to +41
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this method of checking that generated files are checked in versus the one we use in teleporter? https://github.com/ava-labs/teleporter/blob/main/.github/workflows/abi_bindings_checker.yml#L41

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
build/
__debug_bin
tests/cmd/decider/decider

.vscode*

Expand Down
6 changes: 5 additions & 1 deletion README.md
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add descriptions of the new config options.

Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ awm-relayer --help Display awm-relayer usag

### Building

Before building, be sure to install Go, which is required even if you're just building the Docker image.
Before building, be sure to install Go, which is required even if you're just building the Docker image. You'll also need to install [buf](github.com/bufbuild/buf/).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is buf required even if the Decider is unused (i.e. the relevant config options are omitted)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to build, yes, because the relayer binary is built to be aware of the protobuf interfaces so that that same binary can be used later if you decide you do want to include those config options.


Build the relayer by running the script:

Expand Down Expand Up @@ -291,6 +291,10 @@ The relayer is configured via a JSON file, the path to which is passed in via th

- The AWS region in which the KMS key is located. Required if `kms-key-id` is provided.

`"decider-url": string`

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit odd that there's no description for decider-host. Even a token description would be good. The description of the semantics when this field is omitted should be included here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got rid of decider-host altogether. See #344 (comment)

- The URL of a service implementing the gRPC service defined by `proto/decider`, which will be queried for each message to determine whether that message should be relayed.

## Architecture

### Components
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"net/url"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/constants"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Config struct {
SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"`
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"`
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"`
DeciderURL string `mapstructure:"decider-url" json:"decider-url"`

// convenience field to fetch a blockchain's subnet ID
blockchainIDToSubnetID map[ids.ID]ids.ID
Expand Down Expand Up @@ -119,6 +121,12 @@ func (c *Config) Validate() error {
}
c.blockchainIDToSubnetID = blockchainIDToSubnetID

if len(c.DeciderURL) != 0 {
if _, err := url.ParseRequestURI(c.DeciderURL); err != nil {
return fmt.Errorf("Invalid decider URL: %w", err)
}
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ require (
github.com/stretchr/testify v1.9.0
go.uber.org/mock v0.4.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
)

require (
Expand Down Expand Up @@ -158,8 +160,6 @@ require (
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.3.0 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
46 changes: 45 additions & 1 deletion main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"net/http"
"os"
"runtime"
"strings"

"github.com/ava-labs/avalanchego/api/metrics"
Expand All @@ -33,6 +34,8 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var version = "v0.0.0-dev"
Expand Down Expand Up @@ -174,7 +177,20 @@ func main() {

relayerHealth := createHealthTrackers(&cfg)

messageHandlerFactories, err := createMessageHandlerFactories(logger, &cfg)
deciderConnection, err := createDeciderConnection(cfg.DeciderURL)
if err != nil {
logger.Fatal(
"Failed to instantiate decider connection",
zap.Error(err),
)
panic(err)
}

messageHandlerFactories, err := createMessageHandlerFactories(
logger,
&cfg,
deciderConnection,
)
if err != nil {
logger.Fatal("Failed to create message handler factories", zap.Error(err))
panic(err)
Expand Down Expand Up @@ -240,6 +256,7 @@ func main() {
func createMessageHandlerFactories(
logger logging.Logger,
globalConfig *config.Config,
deciderConnection *grpc.ClientConn,
) (map[ids.ID]map[common.Address]messages.MessageHandlerFactory, error) {
messageHandlerFactories := make(map[ids.ID]map[common.Address]messages.MessageHandlerFactory)
for _, sourceBlockchain := range globalConfig.SourceBlockchains {
Expand All @@ -258,6 +275,7 @@ func createMessageHandlerFactories(
logger,
address,
cfg,
deciderConnection,
)
case config.OFF_CHAIN_REGISTRY:
m, err = offchainregistry.NewMessageHandlerFactory(
Expand Down Expand Up @@ -432,6 +450,32 @@ func createApplicationRelayersForSourceChain(
return applicationRelayers, minHeight, nil
}

// create a connection to the "should send message" decider service.
// if url is unspecified, returns a nil client pointer
func createDeciderConnection(url string) (*grpc.ClientConn, error) {
if len(url) == 0 {
return nil, nil
}

connection, err := grpc.NewClient(
url,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf(
"Failed to instantiate grpc client: %w",
err,
)
}

runtime.SetFinalizer(
connection,
func(c *grpc.ClientConn) { c.Close() },
)

return connection, nil
}

func createHealthTrackers(cfg *config.Config) map[ids.ID]*atomic.Bool {
healthTrackers := make(map[ids.ID]*atomic.Bool, len(cfg.SourceBlockchains))
for _, sourceBlockchain := range cfg.SourceBlockchains {
Expand Down
70 changes: 69 additions & 1 deletion messages/teleporter/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/messages"
pbDecider "github.com/ava-labs/awm-relayer/proto/pb/decider"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/accounts/abi/bind"
Expand All @@ -25,25 +26,40 @@ import (
teleporterUtils "github.com/ava-labs/teleporter/utils/teleporter-utils"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type factory struct {
messageConfig Config
protocolAddress common.Address
logger logging.Logger
deciderClient pbDecider.DeciderServiceClient
}

type messageHandler struct {
logger logging.Logger
teleporterMessage *teleportermessenger.TeleporterMessage
unsignedMessage *warp.UnsignedMessage
factory *factory
deciderClient pbDecider.DeciderServiceClient
}

// define an "empty" decider client to use when a connection isn't provided:
type emptyDeciderClient struct{}

func (s *emptyDeciderClient) ShouldSendMessage(
_ context.Context,
_ *pbDecider.ShouldSendMessageRequest,
_ ...grpc.CallOption,
) (*pbDecider.ShouldSendMessageResponse, error) {
return &pbDecider.ShouldSendMessageResponse{ShouldSendMessage: true}, nil
}

func NewMessageHandlerFactory(
logger logging.Logger,
messageProtocolAddress common.Address,
messageProtocolConfig config.MessageProtocolConfig,
deciderClientConn *grpc.ClientConn,
) (messages.MessageHandlerFactory, error) {
// Marshal the map and unmarshal into the Teleporter config
data, err := json.Marshal(messageProtocolConfig.Settings)
Expand All @@ -65,10 +81,18 @@ func NewMessageHandlerFactory(
return nil, err
}

var deciderClient pbDecider.DeciderServiceClient
if deciderClientConn == nil {
deciderClient = &emptyDeciderClient{}
} else {
deciderClient = pbDecider.NewDeciderServiceClient(deciderClientConn)
}

return &factory{
messageConfig: messageConfig,
protocolAddress: messageProtocolAddress,
logger: logger,
deciderClient: deciderClient,
}, nil
}

Expand All @@ -86,6 +110,7 @@ func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (mess
teleporterMessage: teleporterMessage,
unsignedMessage: unsignedMessage,
factory: f,
deciderClient: f.deciderClient,
}, nil
}

Expand Down Expand Up @@ -166,8 +191,51 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie
)
return false, nil
}
// Dispatch to the external decider service. If the service is unavailable or returns
// an error, then use the decision that has already been made, i.e. return true
decision, err := m.getShouldSendMessageFromDecider()
if err != nil {
m.logger.Warn(
"Error delegating to decider",
zap.String("warpMessageID", m.unsignedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
)
return true, nil
}
if !decision {
m.logger.Info(
"Decider rejected message",
zap.String("warpMessageID", m.unsignedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
zap.String("destinationBlockchainID", destinationBlockchainID.String()),
)
}
return decision, nil
}

// Queries the decider service to determine whether this message should be
// sent. If the decider client is nil, returns true.
func (m *messageHandler) getShouldSendMessageFromDecider() (bool, error) {
warpMsgID := m.unsignedMessage.ID()

ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelCtx()
response, err := m.deciderClient.ShouldSendMessage(
ctx,
&pbDecider.ShouldSendMessageRequest{
NetworkId: m.unsignedMessage.NetworkID,
SourceChainId: m.unsignedMessage.SourceChainID[:],
Payload: m.unsignedMessage.Payload,
BytesRepresentation: m.unsignedMessage.Bytes(),
Id: warpMsgID[:],
},
)
if err != nil {
m.logger.Error("Error response from decider.", zap.Error(err))
return false, err
}

return true, nil
return response.ShouldSendMessage, nil
}

// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage
Expand Down
1 change: 1 addition & 0 deletions messages/teleporter/message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func TestShouldSendMessage(t *testing.T) {
logger,
messageProtocolAddress,
messageProtocolConfig,
nil,
)
require.NoError(t, err)
messageHandler, err := factory.NewMessageHandler(test.warpUnsignedMessage)
Expand Down
11 changes: 11 additions & 0 deletions proto/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Protobuf linting and generation for this project is managed by
[buf](https://github.com/bufbuild/buf).

Please find installation instructions at
[https://docs.buf.build/installation/](https://docs.buf.build/installation/).

When changes are made to the proto definition, the generated source code can be updated by running
`protobuf_codegen.sh` located in the `scripts/` directory of this repo.

Introduction to `buf`
[https://docs.buf.build/tour/introduction](https://docs.buf.build/tour/introduction)
8 changes: 8 additions & 0 deletions proto/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: v1
plugins:
- name: go
out: pb
opt: paths=source_relative
- name: go-grpc
out: pb
opt: paths=source_relative
8 changes: 8 additions & 0 deletions proto/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: prometheus
repository: client-model
commit: e171c0b235c546d5a9a597c2961bd357
digest: shake256:7db3f73ac0f1dce71e70f304f318e9741e857fd78b7b42f0df7a3da353fbb2f387899da7b0a77ac9ee9565194510e39a913cdb9a8ab3c2ff4b8713428c795213
10 changes: 10 additions & 0 deletions proto/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: v1
name: buf.build/ava-labs/awm-relayer
breaking:
use:
- FILE
lint:
use:
- DEFAULT
except:
- PACKAGE_VERSION_SUFFIX # versioned naming <service>.v1beta
21 changes: 21 additions & 0 deletions proto/decider/decider.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";

package decider;

option go_package = "github.com/ava-labs/awm-relayer/proto/pb/decider";

service DeciderService {
rpc ShouldSendMessage(ShouldSendMessageRequest) returns (ShouldSendMessageResponse);
}

message ShouldSendMessageRequest {
uint32 network_id = 1;
bytes source_chain_id = 2;
bytes payload = 3;
bytes bytes_representation = 4;
bytes id = 5;
}

message ShouldSendMessageResponse {
bool should_send_message = 1;
}
Loading
Loading