Skip to content

Commit

Permalink
Merge PR #245: Breakup oversize relay msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-axner authored May 21, 2020
1 parent 672917a commit 39360aa
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 56 deletions.
44 changes: 29 additions & 15 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@ import (
)

var (
flagHash = "hash"
flagURL = "url"
flagForce = "force"
flagFlags = "flags"
flagTimeout = "timeout"
flagConfig = "config"
flagJSON = "json"
flagYAML = "yaml"
flagFile = "file"
flagPath = "path"
flagListenAddr = "listen"
flagTx = "no-tx"
flagBlock = "no-block"
flagData = "data"
flagOrder = "unordered"
flagHash = "hash"
flagURL = "url"
flagForce = "force"
flagFlags = "flags"
flagTimeout = "timeout"
flagConfig = "config"
flagJSON = "json"
flagYAML = "yaml"
flagFile = "file"
flagPath = "path"
flagListenAddr = "listen"
flagTx = "no-tx"
flagBlock = "no-block"
flagData = "data"
flagOrder = "unordered"
flagMaxTxSize = "max-tx-size"
flagMaxMsgLength = "max-msgs"
)

func liteFlags(cmd *cobra.Command) *cobra.Command {
Expand Down Expand Up @@ -172,6 +174,18 @@ func urlFlag(cmd *cobra.Command) *cobra.Command {
return cmd
}

func strategyFlag(cmd *cobra.Command) *cobra.Command {
cmd.Flags().StringP(flagMaxTxSize, "s", "2", "maximum size (in MB) of the messages in a relay transaction")
cmd.Flags().StringP(flagMaxMsgLength, "l", "5", "maximum number of messages in a relay transaction")
if err := viper.BindPFlag(flagMaxTxSize, cmd.Flags().Lookup(flagMaxTxSize)); err != nil {
panic(err)
}
if err := viper.BindPFlag(flagMaxMsgLength, cmd.Flags().Lookup(flagMaxMsgLength)); err != nil {
panic(err)
}
return cmd
}

func getAddInputs(cmd *cobra.Command) (file string, url string, err error) {
file, err = cmd.Flags().GetString(flagFile)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
"github.com/spf13/viper"
)

const (
MB = 1048576 // in bytes
)

var (
cfgPath string
homePath string
Expand Down
9 changes: 7 additions & 2 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ func startCmd() *cobra.Command {
}

path := config.Paths.MustGet(args[0])
done, err := relayer.RunStrategy(c[src], c[dst], path.MustGetStrategy(), path.Ordered())
strategy, err := GetStrategyWithOptions(cmd, path.MustGetStrategy())
if err != nil {
return err
}

done, err := relayer.RunStrategy(c[src], c[dst], strategy, path.Ordered())
if err != nil {
return err
}
Expand All @@ -49,7 +54,7 @@ func startCmd() *cobra.Command {
return nil
},
}
return cmd
return strategyFlag(cmd)
}

// trap signal waits for a SIGINT or SIGTERM and then sends down the done channel
Expand Down
51 changes: 51 additions & 0 deletions cmd/strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package cmd

import (
"fmt"
"strconv"

"github.com/iqlusioninc/relayer/relayer"
"github.com/spf13/cobra"
)

// GetStrategyWithOptions sets strategy specific fields.
func GetStrategyWithOptions(cmd *cobra.Command, strategy relayer.Strategy) (relayer.Strategy, error) {
switch strategy.GetType() {
case (&relayer.NaiveStrategy{}).GetType():
ns, ok := strategy.(*relayer.NaiveStrategy)
if !ok {
return strategy, fmt.Errorf("strategy.GetType() returns naive, but strategy type (%T) is not type NaiveStrategy", strategy)

}

maxTxSize, err := cmd.Flags().GetString(flagMaxTxSize)
if err != nil {
return ns, err
}

txSize, err := strconv.ParseUint(maxTxSize, 10, 64)
if err != nil {
return ns, err
}

// set max size of messages in a relay transaction
ns.MaxTxSize = txSize * MB // in MB

maxMsgLength, err := cmd.Flags().GetString(flagMaxMsgLength)
if err != nil {
return ns, err
}

msgLen, err := strconv.ParseUint(maxMsgLength, 10, 64)
if err != nil {
return ns, err
}

// set max length messages in relay transaction
ns.MaxMsgLength = msgLen

return ns, nil
default:
return strategy, nil
}
}
9 changes: 7 additions & 2 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ func relayMsgsCmd() *cobra.Command {
return err
}

strategy := &relayer.NaiveStrategy{}
path := config.Paths.MustGet(args[0])
strategy, err := GetStrategyWithOptions(cmd, path.MustGetStrategy())
if err != nil {
return err
}

if err = strategy.RelayPacketsOrderedChan(c[src], c[dst], sp, sh); err != nil {
return err
}
Expand All @@ -209,7 +214,7 @@ func relayMsgsCmd() *cobra.Command {
},
}

return cmd
return strategyFlag(cmd)
}

func sendPacketCmd() *cobra.Command {
Expand Down
29 changes: 20 additions & 9 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

var _ Strategy = &NaiveStrategy{}

// NewNaiveStrategy returns the proper config for the NaiveStrategy
func NewNaiveStrategy() *StrategyCfg {
return &StrategyCfg{
Type: (&NaiveStrategy{}).GetType(),
}
}

// NaiveStrategy is an implementation of Strategy
// NaiveStrategy is an implementation of Strategy.
type NaiveStrategy struct {
Ordered bool
Ordered bool
MaxTxSize uint64 // maximum permitted size of the msgs in a bundled relay transaction
MaxMsgLength uint64 // maximum amount of messages in a bundled relay transaction
}

// GetType implements Strategy
Expand All @@ -40,7 +44,7 @@ func (nrs *NaiveStrategy) UnrelayedSequencesUnordered(src, dst *Chain, sh *SyncH
func (nrs *NaiveStrategy) HandleEvents(src, dst *Chain, sh *SyncHeaders, events map[string][]string) {
rlyPackets, err := relayPacketsFromEventListener(src.PathEnd, dst.PathEnd, events)
if len(rlyPackets) > 0 && err == nil {
sendTxFromEventPackets(src, dst, rlyPackets, sh)
nrs.sendTxFromEventPackets(src, dst, rlyPackets, sh)
}
}

Expand Down Expand Up @@ -142,7 +146,7 @@ func relayPacketsFromEventListener(src, dst *PathEnd, events map[string][]string
return
}

func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncHeaders) {
func (nrs *NaiveStrategy) sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncHeaders) {
// fetch the proofs for the relayPackets
for _, rp := range rlyPackets {
if err := rp.FetchCommitResponse(src, dst, sh); err != nil {
Expand All @@ -159,7 +163,9 @@ func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncH
Src: []sdk.Msg{
src.PathEnd.UpdateClient(sh.GetHeader(dst.ChainID), src.MustGetAddress()),
},
Dst: []sdk.Msg{},
Dst: []sdk.Msg{},
MaxTxSize: nrs.MaxTxSize,
MaxMsgLength: nrs.MaxMsgLength,
}

// add the packet msgs to RelayPackets
Expand All @@ -170,6 +176,7 @@ func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncH
if txs.Send(src, dst); !txs.success {
return fmt.Errorf("failed to send packets")
}

return nil
}); err != nil {
src.Error(err)
Expand All @@ -184,10 +191,14 @@ func (nrs *NaiveStrategy) RelayPacketsUnorderedChan(src, dst *Chain, sp *RelaySe

// RelayPacketsOrderedChan creates transactions to clear both queues
// CONTRACT: the SyncHeaders passed in here must be up to date or being kept updated
func (*NaiveStrategy) RelayPacketsOrderedChan(src, dst *Chain, sp *RelaySequences, sh *SyncHeaders) error {

// create the appropriate update client messages
msgs := &RelayMsgs{Src: []sdk.Msg{}, Dst: []sdk.Msg{}}
func (nrs *NaiveStrategy) RelayPacketsOrderedChan(src, dst *Chain, sp *RelaySequences, sh *SyncHeaders) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []sdk.Msg{},
Dst: []sdk.Msg{},
MaxTxSize: nrs.MaxTxSize,
MaxMsgLength: nrs.MaxMsgLength,
}

// add messages for src -> dst
for _, seq := range sp.Src {
Expand Down
94 changes: 66 additions & 28 deletions relayer/relayMsgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
)

// RelayMsgs contains the msgs that need to be sent to both a src and dst chain
// after a given relay round
// after a given relay round. MaxTxSize and MaxMsgLength are ignored if they are
// set to zero.
type RelayMsgs struct {
Src []sdk.Msg
Dst []sdk.Msg
Src []sdk.Msg
Dst []sdk.Msg
MaxTxSize uint64 // maximum permitted size of the msgs in a bundled relay transaction
MaxMsgLength uint64 // maximum amount of messages in a bundled relay transaction

last bool
success bool
Expand All @@ -34,42 +37,77 @@ func (r *RelayMsgs) Success() bool {
return r.success
}

func (r *RelayMsgs) IsMaxTx(msgLen, txSize uint64) bool {
return (r.MaxMsgLength != 0 && msgLen > r.MaxMsgLength) ||
(r.MaxTxSize != 0 && txSize > r.MaxTxSize)
}

// Send sends the messages with appropriate output
// TODO: Parallelize? Maybe?
func (r *RelayMsgs) Send(src, dst *Chain) {
var failed = false
// TODO: maybe figure out a better way to indicate error here?

// TODO: Parallelize? Maybe?
if len(r.Src) > 0 {
// Submit the transactions to src chain
res, err := src.SendMsgs(r.Src)
if err != nil || res.Code != 0 {
src.LogFailedTx(res, err, r.Src)
failed = true
} else {
// NOTE: Add more data to this such as identifiers
src.LogSuccessTx(res, r.Src)
var msgLen, txSize uint64
var msgs []sdk.Msg

r.success = true

// submit batches of relay transactions
for _, msg := range r.Src {
msgLen++
txSize += uint64(len(msg.GetSignBytes()))

if r.IsMaxTx(msgLen, txSize) {
// Submit the transactions to src chain and update its status
r.success = r.success && send(src, msgs)

// clear the current batch and reset variables
msgLen, txSize = 1, uint64(len(msg.GetSignBytes()))
msgs = []sdk.Msg{}
}
msgs = append(msgs, msg)
}

// submit leftover msgs
if len(msgs) > 0 && !send(src, msgs) {
r.success = false
}

if len(r.Dst) > 0 {
// Submit the transactions to dst chain
res, err := dst.SendMsgs(r.Dst)
if err != nil || res.Code != 0 {
dst.LogFailedTx(res, err, r.Dst)
failed = true
} else {
// NOTE: Add more data to this such as identifiers
dst.LogSuccessTx(res, r.Dst)
// reset variables
msgLen, txSize = 0, 0
msgs = []sdk.Msg{}

for _, msg := range r.Dst {
msgLen++
txSize += uint64(len(msg.GetSignBytes()))

if r.IsMaxTx(msgLen, txSize) {
// Submit the transaction to dst chain and update its status
r.success = r.success && send(dst, msgs)

// clear the current batch and reset variables
msgLen, txSize = 1, uint64(len(msg.GetSignBytes()))
msgs = []sdk.Msg{}
}
msgs = append(msgs, msg)
}

if failed {
// submit leftover msgs
if len(msgs) > 0 && !send(dst, msgs) {
r.success = false
return
}
r.success = true
}

// Submits the messages to the provided chain and logs the result of the transaction.
// Returns true upon success and false otherwise.
func send(chain *Chain, msgs []sdk.Msg) bool {
res, err := chain.SendMsgs(msgs)
if err != nil || res.Code != 0 {
chain.LogFailedTx(res, err, msgs)
return false
} else {
// NOTE: Add more data to this such as identifiers
chain.LogSuccessTx(res, msgs)
}
return true
}

func getMsgAction(msgs []sdk.Msg) string {
Expand Down

0 comments on commit 39360aa

Please sign in to comment.