diff --git a/cmd/flags.go b/cmd/flags.go index 9c7f439df..b1aae36fe 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -9,21 +9,23 @@ import ( ) var ( - flagHash = "hash" - flagURL = "url" - flagForce = "force" - flagFlags = "flags" - flagTimeout = "timeout" - flagConfig = "config" - flagJSON = "json" - flagYAML = "yaml" - flagFile = "file" - flagPath = "path" - flagListenAddr = "listen" - flagTx = "no-tx" - flagBlock = "no-block" - flagData = "data" - flagOrder = "unordered" + flagHash = "hash" + flagURL = "url" + flagForce = "force" + flagFlags = "flags" + flagTimeout = "timeout" + flagConfig = "config" + flagJSON = "json" + flagYAML = "yaml" + flagFile = "file" + flagPath = "path" + flagListenAddr = "listen" + flagTx = "no-tx" + flagBlock = "no-block" + flagData = "data" + flagOrder = "unordered" + flagMaxTxSize = "max-tx-size" + flagMaxMsgLength = "max-msgs" ) func liteFlags(cmd *cobra.Command) *cobra.Command { @@ -172,6 +174,18 @@ func urlFlag(cmd *cobra.Command) *cobra.Command { return cmd } +func strategyFlag(cmd *cobra.Command) *cobra.Command { + cmd.Flags().StringP(flagMaxTxSize, "s", "2", "maximum size (in MB) of the messages in a relay transaction") + cmd.Flags().StringP(flagMaxMsgLength, "l", "5", "maximum number of messages in a relay transaction") + if err := viper.BindPFlag(flagMaxTxSize, cmd.Flags().Lookup(flagMaxTxSize)); err != nil { + panic(err) + } + if err := viper.BindPFlag(flagMaxMsgLength, cmd.Flags().Lookup(flagMaxMsgLength)); err != nil { + panic(err) + } + return cmd +} + func getAddInputs(cmd *cobra.Command) (file string, url string, err error) { file, err = cmd.Flags().GetString(flagFile) if err != nil { diff --git a/cmd/root.go b/cmd/root.go index 30372486e..1764b2b88 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -28,6 +28,10 @@ import ( "github.com/spf13/viper" ) +const ( + MB = 1048576 // in bytes +) + var ( cfgPath string homePath string diff --git a/cmd/start.go b/cmd/start.go index c63dbd893..40c3e6934 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -40,7 +40,12 @@ func startCmd() *cobra.Command { } path := config.Paths.MustGet(args[0]) - done, err := relayer.RunStrategy(c[src], c[dst], path.MustGetStrategy(), path.Ordered()) + strategy, err := GetStrategyWithOptions(cmd, path.MustGetStrategy()) + if err != nil { + return err + } + + done, err := relayer.RunStrategy(c[src], c[dst], strategy, path.Ordered()) if err != nil { return err } @@ -49,7 +54,7 @@ func startCmd() *cobra.Command { return nil }, } - return cmd + return strategyFlag(cmd) } // trap signal waits for a SIGINT or SIGTERM and then sends down the done channel diff --git a/cmd/strategy.go b/cmd/strategy.go new file mode 100644 index 000000000..e3318d7ea --- /dev/null +++ b/cmd/strategy.go @@ -0,0 +1,51 @@ +package cmd + +import ( + "fmt" + "strconv" + + "github.com/iqlusioninc/relayer/relayer" + "github.com/spf13/cobra" +) + +// GetStrategyWithOptions sets strategy specific fields. +func GetStrategyWithOptions(cmd *cobra.Command, strategy relayer.Strategy) (relayer.Strategy, error) { + switch strategy.GetType() { + case (&relayer.NaiveStrategy{}).GetType(): + ns, ok := strategy.(*relayer.NaiveStrategy) + if !ok { + return strategy, fmt.Errorf("strategy.GetType() returns naive, but strategy type (%T) is not type NaiveStrategy", strategy) + + } + + maxTxSize, err := cmd.Flags().GetString(flagMaxTxSize) + if err != nil { + return ns, err + } + + txSize, err := strconv.ParseUint(maxTxSize, 10, 64) + if err != nil { + return ns, err + } + + // set max size of messages in a relay transaction + ns.MaxTxSize = txSize * MB // in MB + + maxMsgLength, err := cmd.Flags().GetString(flagMaxMsgLength) + if err != nil { + return ns, err + } + + msgLen, err := strconv.ParseUint(maxMsgLength, 10, 64) + if err != nil { + return ns, err + } + + // set max length messages in relay transaction + ns.MaxMsgLength = msgLen + + return ns, nil + default: + return strategy, nil + } +} diff --git a/cmd/tx.go b/cmd/tx.go index 2007c01e6..a4cd5b95b 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -200,7 +200,12 @@ func relayMsgsCmd() *cobra.Command { return err } - strategy := &relayer.NaiveStrategy{} + path := config.Paths.MustGet(args[0]) + strategy, err := GetStrategyWithOptions(cmd, path.MustGetStrategy()) + if err != nil { + return err + } + if err = strategy.RelayPacketsOrderedChan(c[src], c[dst], sp, sh); err != nil { return err } @@ -209,7 +214,7 @@ func relayMsgsCmd() *cobra.Command { }, } - return cmd + return strategyFlag(cmd) } func sendPacketCmd() *cobra.Command { diff --git a/relayer/naive-strategy.go b/relayer/naive-strategy.go index 18228327f..4ff695116 100644 --- a/relayer/naive-strategy.go +++ b/relayer/naive-strategy.go @@ -9,6 +9,8 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) +var _ Strategy = &NaiveStrategy{} + // NewNaiveStrategy returns the proper config for the NaiveStrategy func NewNaiveStrategy() *StrategyCfg { return &StrategyCfg{ @@ -16,9 +18,11 @@ func NewNaiveStrategy() *StrategyCfg { } } -// NaiveStrategy is an implementation of Strategy +// NaiveStrategy is an implementation of Strategy. type NaiveStrategy struct { - Ordered bool + Ordered bool + MaxTxSize uint64 // maximum permitted size of the msgs in a bundled relay transaction + MaxMsgLength uint64 // maximum amount of messages in a bundled relay transaction } // GetType implements Strategy @@ -40,7 +44,7 @@ func (nrs *NaiveStrategy) UnrelayedSequencesUnordered(src, dst *Chain, sh *SyncH func (nrs *NaiveStrategy) HandleEvents(src, dst *Chain, sh *SyncHeaders, events map[string][]string) { rlyPackets, err := relayPacketsFromEventListener(src.PathEnd, dst.PathEnd, events) if len(rlyPackets) > 0 && err == nil { - sendTxFromEventPackets(src, dst, rlyPackets, sh) + nrs.sendTxFromEventPackets(src, dst, rlyPackets, sh) } } @@ -142,7 +146,7 @@ func relayPacketsFromEventListener(src, dst *PathEnd, events map[string][]string return } -func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncHeaders) { +func (nrs *NaiveStrategy) sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncHeaders) { // fetch the proofs for the relayPackets for _, rp := range rlyPackets { if err := rp.FetchCommitResponse(src, dst, sh); err != nil { @@ -159,7 +163,9 @@ func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncH Src: []sdk.Msg{ src.PathEnd.UpdateClient(sh.GetHeader(dst.ChainID), src.MustGetAddress()), }, - Dst: []sdk.Msg{}, + Dst: []sdk.Msg{}, + MaxTxSize: nrs.MaxTxSize, + MaxMsgLength: nrs.MaxMsgLength, } // add the packet msgs to RelayPackets @@ -170,6 +176,7 @@ func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncH if txs.Send(src, dst); !txs.success { return fmt.Errorf("failed to send packets") } + return nil }); err != nil { src.Error(err) @@ -184,10 +191,14 @@ func (nrs *NaiveStrategy) RelayPacketsUnorderedChan(src, dst *Chain, sp *RelaySe // RelayPacketsOrderedChan creates transactions to clear both queues // CONTRACT: the SyncHeaders passed in here must be up to date or being kept updated -func (*NaiveStrategy) RelayPacketsOrderedChan(src, dst *Chain, sp *RelaySequences, sh *SyncHeaders) error { - - // create the appropriate update client messages - msgs := &RelayMsgs{Src: []sdk.Msg{}, Dst: []sdk.Msg{}} +func (nrs *NaiveStrategy) RelayPacketsOrderedChan(src, dst *Chain, sp *RelaySequences, sh *SyncHeaders) error { + // set the maximum relay transaction constraints + msgs := &RelayMsgs{ + Src: []sdk.Msg{}, + Dst: []sdk.Msg{}, + MaxTxSize: nrs.MaxTxSize, + MaxMsgLength: nrs.MaxMsgLength, + } // add messages for src -> dst for _, seq := range sp.Src { diff --git a/relayer/relayMsgs.go b/relayer/relayMsgs.go index 35dcc0ca7..9e5daa4d3 100644 --- a/relayer/relayMsgs.go +++ b/relayer/relayMsgs.go @@ -8,10 +8,13 @@ import ( ) // RelayMsgs contains the msgs that need to be sent to both a src and dst chain -// after a given relay round +// after a given relay round. MaxTxSize and MaxMsgLength are ignored if they are +// set to zero. type RelayMsgs struct { - Src []sdk.Msg - Dst []sdk.Msg + Src []sdk.Msg + Dst []sdk.Msg + MaxTxSize uint64 // maximum permitted size of the msgs in a bundled relay transaction + MaxMsgLength uint64 // maximum amount of messages in a bundled relay transaction last bool success bool @@ -34,42 +37,77 @@ func (r *RelayMsgs) Success() bool { return r.success } +func (r *RelayMsgs) IsMaxTx(msgLen, txSize uint64) bool { + return (r.MaxMsgLength != 0 && msgLen > r.MaxMsgLength) || + (r.MaxTxSize != 0 && txSize > r.MaxTxSize) +} + // Send sends the messages with appropriate output +// TODO: Parallelize? Maybe? func (r *RelayMsgs) Send(src, dst *Chain) { - var failed = false - // TODO: maybe figure out a better way to indicate error here? - - // TODO: Parallelize? Maybe? - if len(r.Src) > 0 { - // Submit the transactions to src chain - res, err := src.SendMsgs(r.Src) - if err != nil || res.Code != 0 { - src.LogFailedTx(res, err, r.Src) - failed = true - } else { - // NOTE: Add more data to this such as identifiers - src.LogSuccessTx(res, r.Src) + var msgLen, txSize uint64 + var msgs []sdk.Msg + + r.success = true + + // submit batches of relay transactions + for _, msg := range r.Src { + msgLen++ + txSize += uint64(len(msg.GetSignBytes())) + + if r.IsMaxTx(msgLen, txSize) { + // Submit the transactions to src chain and update its status + r.success = r.success && send(src, msgs) + + // clear the current batch and reset variables + msgLen, txSize = 1, uint64(len(msg.GetSignBytes())) + msgs = []sdk.Msg{} } + msgs = append(msgs, msg) + } + + // submit leftover msgs + if len(msgs) > 0 && !send(src, msgs) { + r.success = false } - if len(r.Dst) > 0 { - // Submit the transactions to dst chain - res, err := dst.SendMsgs(r.Dst) - if err != nil || res.Code != 0 { - dst.LogFailedTx(res, err, r.Dst) - failed = true - } else { - // NOTE: Add more data to this such as identifiers - dst.LogSuccessTx(res, r.Dst) + // reset variables + msgLen, txSize = 0, 0 + msgs = []sdk.Msg{} + + for _, msg := range r.Dst { + msgLen++ + txSize += uint64(len(msg.GetSignBytes())) + + if r.IsMaxTx(msgLen, txSize) { + // Submit the transaction to dst chain and update its status + r.success = r.success && send(dst, msgs) + // clear the current batch and reset variables + msgLen, txSize = 1, uint64(len(msg.GetSignBytes())) + msgs = []sdk.Msg{} } + msgs = append(msgs, msg) } - if failed { + // submit leftover msgs + if len(msgs) > 0 && !send(dst, msgs) { r.success = false - return } - r.success = true +} + +// Submits the messages to the provided chain and logs the result of the transaction. +// Returns true upon success and false otherwise. +func send(chain *Chain, msgs []sdk.Msg) bool { + res, err := chain.SendMsgs(msgs) + if err != nil || res.Code != 0 { + chain.LogFailedTx(res, err, msgs) + return false + } else { + // NOTE: Add more data to this such as identifiers + chain.LogSuccessTx(res, msgs) + } + return true } func getMsgAction(msgs []sdk.Msg) string {