Skip to content

Commit

Permalink
fix: handle mempool error (#222)
Browse files Browse the repository at this point in the history
* add: sleep when there is mempool error

* rf: use retry delay rather sleep

* rf: unlimited retry for update client
  • Loading branch information
debendraoli authored Jul 10, 2024
1 parent 478dea5 commit fef22fd
Showing 1 changed file with 8 additions and 18 deletions.
26 changes: 8 additions & 18 deletions relayer/chains/wasm/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
rtyAttNum = uint(5)
rtyAtt = retry.Attempts(rtyAttNum)
rtyDel = retry.Delay(time.Millisecond * 400)
memPoolDel = retry.Delay(time.Second * 30)
rtyErr = retry.LastErrorOnly(true)
numRegex = regexp.MustCompile("[0-9]+")
defaultBroadcastWaitTimeout = 10 * time.Minute
Expand Down Expand Up @@ -151,7 +152,6 @@ func (pc *WasmProviderConfig) SignMode() signing.SignMode {
}

func (ap *WasmProvider) NewClientState(dstChainID string, dstIBCHeader provider.IBCHeader, dstTrustingPeriod, dstUbdPeriod time.Duration, allowUpdateAfterExpiry, allowUpdateAfterMisbehaviour bool) (ibcexported.ClientState, error) {

return &itm.ClientState{
ChainId: dstChainID,
TrustLevel: &itm.Fraction{Numerator: light.DefaultTrustLevel.Numerator, Denominator: light.DefaultTrustLevel.Denominator},
Expand Down Expand Up @@ -198,7 +198,6 @@ func (ap *WasmProvider) MsgSubmitMisbehaviour(clientID string, misbehaviour ibce
}

func (ap *WasmProvider) ValidatePacket(msgTransfer provider.PacketInfo, latest provider.LatestBlock) error {

if msgTransfer.Sequence == 0 {
return errors.New("refusing to relay packet with sequence: 0")
}
Expand All @@ -215,7 +214,6 @@ func (ap *WasmProvider) ValidatePacket(msgTransfer provider.PacketInfo, latest p
revisionNumber := 0
latestClientTypesHeight := clienttypes.NewHeight(uint64(revisionNumber), latest.Height)
if !msgTransfer.TimeoutHeight.IsZero() && latestClientTypesHeight.GTE(msgTransfer.TimeoutHeight) {

return provider.NewTimeoutHeightError(latest.Height, msgTransfer.TimeoutHeight.RevisionHeight)
}
// latestTimestamp := uint64(latest.Time.UnixNano())
Expand All @@ -230,7 +228,6 @@ func (ap *WasmProvider) PacketCommitment(ctx context.Context, msgTransfer provid
packetCommitmentResponse, err := ap.QueryPacketCommitment(
ctx, int64(height), msgTransfer.SourceChannel, msgTransfer.SourcePort, msgTransfer.Sequence,
)

if err != nil {
return provider.PacketProof{}, err
}
Expand All @@ -252,9 +249,7 @@ func (ap *WasmProvider) PacketAcknowledgement(ctx context.Context, msgRecvPacket
}

func (ap *WasmProvider) PacketReceipt(ctx context.Context, msgTransfer provider.PacketInfo, height uint64) (provider.PacketProof, error) {

packetReceiptResponse, err := ap.QueryPacketReceipt(ctx, int64(height), msgTransfer.DestChannel, msgTransfer.DestPort, msgTransfer.Sequence)

if err != nil {
return provider.PacketProof{}, err
}
Expand Down Expand Up @@ -308,7 +303,6 @@ func (ap *WasmProvider) MsgAcknowledgement(msgRecvPacket provider.PacketInfo, pr
Signer: signer,
}
return ap.NewWasmContractMessage(MethodAcknowledgePacket, params)

}

func (ap *WasmProvider) MsgTimeout(msgTransfer provider.PacketInfo, proof provider.PacketProof) (provider.RelayerMessage, error) {
Expand Down Expand Up @@ -381,7 +375,6 @@ func (ap *WasmProvider) MsgConnectionOpenInit(info provider.ConnectionInfo, proo
}

return ap.NewWasmContractMessage(MethodConnectionOpenInit, params)

}

func (ap *WasmProvider) MsgConnectionOpenTry(msgOpenInit provider.ConnectionInfo, proof provider.ConnectionProof) (provider.RelayerMessage, error) {
Expand Down Expand Up @@ -416,7 +409,6 @@ func (ap *WasmProvider) MsgConnectionOpenTry(msgOpenInit provider.ConnectionInfo
}

return ap.NewWasmContractMessage(MethodConnectionOpenTry, params)

}

func (ap *WasmProvider) MsgConnectionOpenAck(msgOpenTry provider.ConnectionInfo, proof provider.ConnectionProof) (provider.RelayerMessage, error) {
Expand Down Expand Up @@ -633,7 +625,6 @@ func (ap *WasmProvider) MsgUpdateClient(clientID string, dstHeader ibcexported.C
}

return ap.NewWasmContractMessage(MethodUpdateClient, params)

}

func (ap *WasmProvider) QueryICQWithProof(ctx context.Context, msgType string, request []byte, height uint64) (provider.ICQProof, error) {
Expand Down Expand Up @@ -760,7 +751,6 @@ func (ap *WasmProvider) SendCustomMessage(ctx context.Context, contract string,
}

return rlyResp, true, callbackErr

}

func (ap *WasmProvider) SendTransactionCosmWasm(
Expand Down Expand Up @@ -845,14 +835,20 @@ func (ap *WasmProvider) SendMessagesToMempool(
}

if msg.Type() == MethodUpdateClient {
delay := retry.Delay(time.Millisecond * time.Duration(ap.PCfg.BlockInterval))
retryAttempt := rtyAtt
if err := retry.Do(func() error {
if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, true); err != nil {
if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) {
ap.handleAccountSequenceMismatchError(err)
} else if strings.Contains(err.Error(), sdkerrors.ErrMempoolIsFull.Error()) {
ap.log.Info("Mempool is full, retrying later with increased delay")
delay = memPoolDel
retryAttempt = retry.Attempts(0)
}
}
return err
}, retry.Context(ctx), rtyAtt, retry.Delay(time.Millisecond*time.Duration(ap.PCfg.BlockInterval)), rtyErr); err != nil {
}, retry.Context(ctx), retryAttempt, delay, rtyErr); err != nil {
ap.log.Error("Failed to update client", zap.Any("Message", msg))
return err
}
Expand All @@ -868,11 +864,9 @@ func (ap *WasmProvider) SendMessagesToMempool(
}

return nil

}

func (ap *WasmProvider) LogFailedTx(res *provider.RelayerTxResponse, err error, msgs []provider.RelayerMessage) {

fields := []zapcore.Field{zap.String("chain_id", ap.ChainId())}
// if res != nil {
// channels := getChannelsIfPresent(res.Events)
Expand Down Expand Up @@ -945,7 +939,6 @@ func (ap *WasmProvider) LogSuccessTx(res *sdk.TxResponse, msgs []provider.Relaye
"Successful transaction",
fields...,
)

}

// getFeePayer returns the bech32 address of the fee payer of a transaction.
Expand All @@ -968,7 +961,6 @@ func getFeePayer(tx *txtypes.Tx) string {
default:
return firstMsg.GetSigners()[0].String()
}

}

func (ap *WasmProvider) sdkError(codespace string, code uint32) error {
Expand Down Expand Up @@ -1361,10 +1353,8 @@ func (cc *WasmProvider) QueryABCI(ctx context.Context, req abci.RequestQuery) (a
}

func (cc *WasmProvider) handleAccountSequenceMismatchError(err error) {

clientCtx := cc.ClientContext()
_, seq, err := cc.ClientCtx.AccountRetriever.GetAccountNumberSequence(clientCtx, clientCtx.GetFromAddress())

// sequences := numRegex.FindAllString(err.Error(), -1)
// if len(sequences) != 2 {
// return
Expand Down

0 comments on commit fef22fd

Please sign in to comment.