-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ShouldSendMessage
decider
#344
Changes from 1 commit
1b161e6
5c6422e
584556e
562cf7c
b01b14b
0d6b07b
249de65
81b30bc
ed3f4e7
f897c89
f89bd35
3631b7b
be38f17
aab66d5
df26aa2
a9c374c
6a9c276
c37fd41
7eb5112
598f9bb
7e02e1a
0a22497
742a6ee
1837d4b
8ed8112
6488c81
9a7944d
19e8689
682fead
79c82b2
c4b2262
f8662fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,5 +49,10 @@ jobs: | |
with: | ||
submodules: recursive | ||
|
||
- name: Install buf | ||
uses: bufbuild/[email protected] | ||
with: | ||
github_token: ${{ github.token }} | ||
|
||
- name: Run E2E Tests | ||
run: AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego DATA_DIR=/tmp/e2e-test/data ./scripts/e2e_test.sh |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,5 +27,17 @@ jobs: | |
with: | ||
go-version-file: 'go.mod' | ||
|
||
- name: Install buf | ||
uses: bufbuild/[email protected] | ||
with: | ||
github_token: ${{ github.token }} | ||
|
||
- name: Run Lint | ||
run: ./scripts/lint.sh --go-lint | ||
|
||
- name: Ensure protobuf changes are checked in | ||
run: | | ||
scripts/protobuf_codegen.sh | ||
git update-index --really-refresh >> /dev/null | ||
git diff-index HEAD # to show the differences | ||
git diff-index --quiet HEAD || (echo 'protobuf generated code changes have not all been checked in' && exit 1) | ||
Comment on lines
+39
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference between this method of checking that generated files are checked in versus the one we use in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure to be honest. I pulled this method from https://github.com/ava-labs/avalanchego/blob/49299868db0bdf5b3755c58993cc728173b3eedd/.github/workflows/check-clean-branch.sh#L8-L9 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
build/ | ||
__debug_bin | ||
tests/cmd/decider/decider | ||
|
||
.vscode* | ||
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add descriptions of the new config options. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,7 +101,7 @@ awm-relayer --help Display awm-relayer usag | |
|
||
### Building | ||
|
||
Before building, be sure to install Go, which is required even if you're just building the Docker image. | ||
Before building, be sure to install Go, which is required even if you're just building the Docker image. You'll also need to install [buf](github.com/bufbuild/buf/). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In order to build, yes, because the relayer binary is built to be aware of the protobuf interfaces so that that same binary can be used later if you decide you do want to include those config options. |
||
|
||
Build the relayer by running the script: | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,8 @@ type Config struct { | |
SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"` | ||
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"` | ||
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"` | ||
DeciderHost string `mapstructure:"decider-host" json:"decider-host"` | ||
DeciderPort *uint16 `mapstructure:"decider-port" json:"decider-port"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add some validation on these new fields? If they're provided, they should be able to construct a valid URI. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in a9c374c There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thoughts on combining these into a unified URI? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way I have it coded right now, you can skip specifying a host, and it will assume There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just calling this out because I wasn't aware how it worked exactly myself: AvalancheGo communicates with VM plugins via gprc, but instead of assuming the VMs are running and providing their host/port in the configuration, it starts the configured VMs itself on the first port available on That type of pattern may be less error prone for relayers to set up. i.e: the relayer config would specify the decider plugin(s) (if any) to run for a given source chain, and the application would handle initializing/configuring them itself. Not suggesting we do this now, but could be worth considering going forward. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that what Michael described is the generally preferred pattern for colocated services, and I agree that less config is better and less error prone. A counter example for why we would want to keep it, is that this allows for using remote deciders. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cam and I also discussed this. For this iteration I was going for the simplest thing possible, and sub-process management proved non-trivial, so I went with this host/port approach instead, since we said that ultimately we want to be able to support both local and remote deciders. Going forward, I'm open to doing the sub-process/plugin approach; we may be able to leverage the sub-process modules available in avalanchego, but if not then the complexity added to the code base for this would be a lot, at least as compared to the host/port approach. Also, at the moment I'm leaning towards just the host/port approach so that there's one single config interface, rather than one for a local decider (the plugin path) and a separate/different one for a remote decider (the host/port). |
||
|
||
// convenience field to fetch a blockchain's subnet ID | ||
blockchainIDToSubnetID map[ids.ID]ids.ID | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ import ( | |
"log" | ||
"net/http" | ||
"os" | ||
"runtime" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/ava-labs/avalanchego/api/metrics" | ||
|
@@ -33,9 +35,16 @@ import ( | |
"go.uber.org/atomic" | ||
"go.uber.org/zap" | ||
"golang.org/x/sync/errgroup" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/credentials/insecure" | ||
) | ||
|
||
var version = "v0.0.0-dev" | ||
var ( | ||
version = "v0.0.0-dev" | ||
|
||
grpcClient *grpc.ClientConn // for connecting to the decider service | ||
geoff-vball marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
|
||
func main() { | ||
fs := config.BuildFlagSet() | ||
|
@@ -216,6 +225,33 @@ func main() { | |
|
||
// Create listeners for each of the subnets configured as a source | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this comment is duplicated on line 254. Was that intended? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved it, but I think the original crept back in as a merge conflict. Thanks for pointing it out. Addressed in 5c6422e |
||
errGroup, ctx := errgroup.WithContext(context.Background()) | ||
|
||
if cfg.DeciderPort != nil { | ||
geoff-vball marked this conversation as resolved.
Show resolved
Hide resolved
|
||
port := strconv.FormatUint(uint64(*cfg.DeciderPort), 10) | ||
|
||
host := cfg.DeciderHost | ||
if len(host) == 0 { | ||
host = "localhost" | ||
} | ||
|
||
grpcClient, err = grpc.NewClient( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use a more descriptive variable name, such as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in b01b14b |
||
strings.Join([]string{host, port}, ":"), | ||
grpc.WithTransportCredentials( | ||
insecure.NewCredentials(), | ||
), | ||
) | ||
if err != nil { | ||
logger.Fatal( | ||
"Failed to instantiate decider client", | ||
zap.Error(err), | ||
) | ||
panic(err) | ||
} | ||
runtime.SetFinalizer(grpcClient, func(c *grpc.ClientConn) { c.Close() }) | ||
grpcClient.WaitForStateChange(ctx, connectivity.Ready) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe WaitForStateChange is currently marked as experimental: https://pkg.go.dev/google.golang.org/grpc#ClientConn.WaitForStateChange Not sure if this is an issue, just wanted to point it out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for pointing that out, I removed it in 584556e#diff-327181d0a8c5e6b164561d7910f4eeffd41442d55b2a2788fda2aa2692f17ec0 |
||
} | ||
|
||
// Create listeners for each of the subnets configured as a source | ||
for _, s := range cfg.SourceBlockchains { | ||
sourceBlockchain := s | ||
|
||
|
@@ -259,6 +295,7 @@ func createMessageHandlerFactories( | |
logger, | ||
address, | ||
cfg, | ||
grpcClient, | ||
) | ||
case config.OFF_CHAIN_REGISTRY: | ||
m, err = offchainregistry.NewMessageHandlerFactory( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,10 @@ package teleporter | |
|
||
import ( | ||
"context" | ||
"encoding/hex" | ||
"encoding/json" | ||
"fmt" | ||
"reflect" | ||
"time" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
|
@@ -15,6 +17,7 @@ import ( | |
warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" | ||
"github.com/ava-labs/awm-relayer/config" | ||
"github.com/ava-labs/awm-relayer/messages" | ||
pbDecider "github.com/ava-labs/awm-relayer/proto/pb/decider/v1" | ||
"github.com/ava-labs/awm-relayer/utils" | ||
"github.com/ava-labs/awm-relayer/vms" | ||
"github.com/ava-labs/subnet-evm/accounts/abi/bind" | ||
|
@@ -25,25 +28,29 @@ import ( | |
teleporterUtils "github.com/ava-labs/teleporter/utils/teleporter-utils" | ||
"github.com/ethereum/go-ethereum/common" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
type factory struct { | ||
messageConfig Config | ||
protocolAddress common.Address | ||
logger logging.Logger | ||
deciderClient pbDecider.DeciderServiceClient | ||
} | ||
|
||
type messageHandler struct { | ||
logger logging.Logger | ||
teleporterMessage *teleportermessenger.TeleporterMessage | ||
unsignedMessage *warp.UnsignedMessage | ||
factory *factory | ||
deciderClient pbDecider.DeciderServiceClient | ||
} | ||
|
||
func NewMessageHandlerFactory( | ||
logger logging.Logger, | ||
messageProtocolAddress common.Address, | ||
messageProtocolConfig config.MessageProtocolConfig, | ||
grpcClient *grpc.ClientConn, | ||
) (messages.MessageHandlerFactory, error) { | ||
// Marshal the map and unmarshal into the Teleporter config | ||
data, err := json.Marshal(messageProtocolConfig.Settings) | ||
|
@@ -65,10 +72,18 @@ func NewMessageHandlerFactory( | |
return nil, err | ||
} | ||
|
||
var deciderClient pbDecider.DeciderServiceClient | ||
if grpcClient == nil { | ||
deciderClient = nil | ||
geoff-vball marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
deciderClient = pbDecider.NewDeciderServiceClient(grpcClient) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in b01b14b |
||
|
||
return &factory{ | ||
messageConfig: messageConfig, | ||
protocolAddress: messageProtocolAddress, | ||
logger: logger, | ||
deciderClient: deciderClient, | ||
}, nil | ||
} | ||
|
||
|
@@ -86,6 +101,7 @@ func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (mess | |
teleporterMessage: teleporterMessage, | ||
unsignedMessage: unsignedMessage, | ||
factory: f, | ||
deciderClient: f.deciderClient, | ||
}, nil | ||
} | ||
|
||
|
@@ -167,7 +183,47 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie | |
return false, nil | ||
} | ||
|
||
feuGeneA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return true, nil | ||
var decision bool = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be initialized to false? It looks like the error cases are just returning decision without modifying it, and I assume we should return false in those cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intended control flow here is to separate the existing I think this could be made clearer by adding a new helper, and calling it like so:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I extracted the logic into a new helper, and adapted my changes to |
||
deciderClientValue := reflect.ValueOf(m.deciderClient) | ||
if deciderClientValue.IsValid() && !deciderClientValue.IsNil() { | ||
geoff-vball marked this conversation as resolved.
Show resolved
Hide resolved
|
||
warpMsgIDStr := m.unsignedMessage.ID().Hex() | ||
|
||
warpMsgID, err := hex.DecodeString(warpMsgIDStr) | ||
if err != nil { | ||
m.logger.Error( | ||
"Error decoding message ID", | ||
zap.String("warpMsgIDStr", warpMsgIDStr), | ||
zap.Error(err), | ||
) | ||
return decision, err | ||
} | ||
|
||
// TODO: add a timeout to the context | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's include timeouts in this PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in f89bd35 |
||
response, err := m.deciderClient.ShouldSendMessage( | ||
context.Background(), | ||
&pbDecider.ShouldSendMessageRequest{ | ||
NetworkId: m.unsignedMessage.NetworkID, | ||
SourceChainId: m.unsignedMessage.SourceChainID[:], | ||
Payload: m.unsignedMessage.Payload, | ||
BytesRepresentation: m.unsignedMessage.Bytes(), | ||
Id: warpMsgID, | ||
}, | ||
) | ||
if err != nil { | ||
m.logger.Error( | ||
"Error response from decider.", | ||
zap.String("destinationBlockchainID", destinationBlockchainID.String()), | ||
zap.String("teleporterMessageID", teleporterMessageID.String()), | ||
zap.Any("warpMessageID", warpMsgIDStr), | ||
zap.Error(err), | ||
) | ||
return decision, err | ||
} | ||
geoff-vball marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
decision = response.ShouldSendMessage | ||
} | ||
|
||
return decision, nil | ||
} | ||
|
||
// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
Protobuf linting and generation for this project is managed by | ||
[buf](https://github.com/bufbuild/buf). | ||
|
||
Please find installation instructions on | ||
feuGeneA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
[https://docs.buf.build/installation/](https://docs.buf.build/installation/). | ||
|
||
Any changes made to proto definition can be updated by running | ||
feuGeneA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
`protobuf_codegen.sh` located in the `scripts/` directory of this repo. | ||
|
||
Introduction to `buf` | ||
[https://docs.buf.build/tour/introduction](https://docs.buf.build/tour/introduction) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
version: v1 | ||
plugins: | ||
- name: go | ||
out: pb | ||
opt: paths=source_relative | ||
- name: go-grpc | ||
out: pb | ||
opt: paths=source_relative |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Generated by buf. DO NOT EDIT. | ||
version: v1 | ||
deps: | ||
- remote: buf.build | ||
owner: prometheus | ||
repository: client-model | ||
commit: e171c0b235c546d5a9a597c2961bd357 | ||
digest: shake256:7db3f73ac0f1dce71e70f304f318e9741e857fd78b7b42f0df7a3da353fbb2f387899da7b0a77ac9ee9565194510e39a913cdb9a8ab3c2ff4b8713428c795213 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the github token necessary? Does the repo even have access to a gh token?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently not. I removed this in ed3f4e7 and everything still ran fine. I had just copied this from avalanchego. Thanks for noticing it.