Skip to content

Commit

Permalink
fix: packet retention not clear (icon-project#116)
Browse files Browse the repository at this point in the history
* fix: packet retention not clear

* fix: set key to counterpart

* fix: handle tx archway if updateClient gives error

* fix: refractor

* fix: for write ack

* chore: remove unnecessary block of code

* fix: handle fixes for packet timeout

---------

Co-authored-by: izyak <[email protected]>
  • Loading branch information
2 people authored and izyak committed Sep 7, 2023
1 parent 666c07e commit a76297c
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 61 deletions.
2 changes: 1 addition & 1 deletion relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ loop:
icp.log.Info("Queried Latest height: ",
zap.String("chain id ", icp.chainProvider.ChainId()),
zap.Int64("height", br.Height))
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache)
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone())
if err != nil {
reconnect()
icp.log.Warn("Reconnect: error occured during handle block response ",
Expand Down
9 changes: 4 additions & 5 deletions relayer/chains/icon/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ import (
var _ provider.QueryProvider = &IconProvider{}

const (
epoch = 24 * 3600 * 1000
clientNameString = "07-tendermint-"
epoch = 24 * 3600 * 1000
)

type CallParamOption func(*types.CallParam)
Expand Down Expand Up @@ -318,7 +317,7 @@ func (icp *IconProvider) QueryClients(ctx context.Context) (clienttypes.Identifi

identifiedClientStates := make(clienttypes.IdentifiedClientStates, 0)
for i := 0; i <= int(seq)-1; i++ {
clientIdentifier := fmt.Sprintf("%s%d", clientNameString, i)
clientIdentifier := common.GetIdentifier(common.TendermintLightClient, i)
callParams := icp.prepareCallParams(MethodGetClientState, map[string]interface{}{
"clientId": clientIdentifier,
})
Expand Down Expand Up @@ -407,7 +406,7 @@ func (icp *IconProvider) QueryConnections(ctx context.Context) (conns []*conntyp
}

for i := 0; i <= int(nextSeq)-1; i++ {
connectionId := fmt.Sprintf("connection-%d", i)
connectionId := common.GetIdentifier(common.ConnectionKey, i)
var conn_string_ types.HexBytes
err := icp.client.Call(icp.prepareCallParams(MethodGetConnection, map[string]interface{}{
"connectionId": connectionId,
Expand Down Expand Up @@ -609,7 +608,7 @@ func (icp *IconProvider) QueryChannels(ctx context.Context) ([]*chantypes.Identi

for i := 0; i <= int(nextSeq)-1; i++ {
for _, portId := range allPorts {
channelId := fmt.Sprintf("channel-%d", i)
channelId := common.GetIdentifier(common.ChannelKey, i)
var _channel types.HexBytes
err := icp.client.Call(icp.prepareCallParams(MethodGetChannel, map[string]interface{}{
"channelId": channelId,
Expand Down
11 changes: 6 additions & 5 deletions relayer/chains/icon/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func (icp *IconProvider) SendIconTransaction(
// If update fails, the subsequent txn will fail, result of update not being fetched concurrently
switch m.Method {
case MethodUpdateClient:
icp.WaitForTxResult(asyncCtx, txhash, m.Method, defaultBroadcastWaitTimeout, asyncCallback)
return icp.WaitForTxResult(asyncCtx, txhash, m.Method, defaultBroadcastWaitTimeout, asyncCallback)
default:
go icp.WaitForTxResult(asyncCtx, txhash, m.Method, defaultBroadcastWaitTimeout, asyncCallback)
}
Expand All @@ -753,20 +753,20 @@ func (icp *IconProvider) WaitForTxResult(
method string,
timeout time.Duration,
callback func(*provider.RelayerTxResponse, error),
) {
) error {
txhash := types.NewHexBytes(txHash)
_, txRes, err := icp.client.WaitForResults(asyncCtx, &types.TransactionHashParam{Hash: txhash})
if err != nil {
icp.log.Error("Failed to get txn result", zap.String("txHash", string(txhash)), zap.String("method", method), zap.Error(err))
if callback != nil {
callback(nil, err)
}
return
return err
}

height, err := txRes.BlockHeight.Value()
if err != nil {
return
return err
}

var eventLogs []provider.RelayerEvent
Expand All @@ -787,7 +787,7 @@ func (icp *IconProvider) WaitForTxResult(
callback(nil, err)
}
icp.LogFailedTx(method, txRes, err)
return
return err

}

Expand All @@ -803,6 +803,7 @@ func (icp *IconProvider) WaitForTxResult(
}
// log successful txn
icp.LogSuccessTx(method, txRes)
return nil
}

func (icp *IconProvider) LogSuccessTx(method string, result *types.TransactionResult) {
Expand Down
6 changes: 3 additions & 3 deletions relayer/chains/wasm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (ap *WasmProvider) QueryClients(ctx context.Context) (clienttypes.Identifie

identifiedClientStates := make(clienttypes.IdentifiedClientStates, 0)
for i := 0; i <= int(seq)-1; i++ {
clientIdentifier := fmt.Sprintf("%s-%d", ClientPrefix, i)
clientIdentifier := common.GetIdentifier(common.IconLightClient, i)
clientState, err := ap.QueryClientStateContract(ctx, clientIdentifier)
if err != nil {
return nil, err
Expand Down Expand Up @@ -545,7 +545,7 @@ func (ap *WasmProvider) QueryConnections(ctx context.Context) (conns []*conntype
}

for i := 0; i <= int(seq)-1; i++ {
connectionId := fmt.Sprintf("%s-%d", ConnectionPrefix, i)
connectionId := common.GetIdentifier(common.ConnectionKey, i)
conn, err := ap.QueryConnectionContract(ctx, connectionId)
if err != nil {
continue
Expand Down Expand Up @@ -664,7 +664,7 @@ func (ap *WasmProvider) QueryChannels(ctx context.Context) ([]*chantypes.Identif

for i := 0; i <= int(nextSeq)-1; i++ {
for _, portId := range allPorts {
channelId := fmt.Sprintf("%s-%d", ChannelPrefix, i)
channelId := common.GetIdentifier(common.ChannelKey, i)
channel, err := ap.QueryChannelContract(ctx, portId, channelId)
if err != nil {
continue
Expand Down
12 changes: 5 additions & 7 deletions relayer/chains/wasm/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,11 @@ func (ap *WasmProvider) MsgTimeout(msgTransfer provider.PacketInfo, proof provid

func (ap *WasmProvider) MsgTimeoutRequest(msgTransfer provider.PacketInfo, proof provider.PacketProof) (provider.RelayerMessage, error) {
panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED))
return nil, fmt.Errorf("MsgTimeoutRequest Not implemented for Wasm module")
}

// panic
func (ap *WasmProvider) MsgTimeoutOnClose(msgTransfer provider.PacketInfo, proofUnreceived provider.PacketProof) (provider.RelayerMessage, error) {
panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED))
return nil, nil
}

func (ap *WasmProvider) ConnectionHandshakeProof(ctx context.Context, msgOpenInit provider.ConnectionInfo, height uint64) (provider.ConnectionProof, error) {
Expand Down Expand Up @@ -1013,8 +1011,7 @@ func (ap *WasmProvider) BroadcastTx(
)

if shouldWait {
ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback)
return nil
return ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback)
}
go ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback)
return nil
Expand Down Expand Up @@ -1043,14 +1040,14 @@ func (ap *WasmProvider) waitForTx(
msgs []provider.RelayerMessage, // used for logging only
waitTimeout time.Duration,
callback func(*provider.RelayerTxResponse, error),
) {
) error {
res, err := ap.waitForTxResult(ctx, txHash, waitTimeout)
if err != nil {
ap.log.Error("Failed to wait for block inclusion", zap.Error(err))
if callback != nil {
callback(nil, err)
}
return
return err
}

rlyResp := &provider.RelayerTxResponse{
Expand All @@ -1076,13 +1073,14 @@ func (ap *WasmProvider) waitForTx(
callback(nil, err)
}
ap.LogFailedTx(rlyResp, nil, msgs)
return
return err
}

if callback != nil {
callback(rlyResp, nil)
}
ap.LogSuccessTx(res, msgs)
return nil
}

func (ap *WasmProvider) waitForTxResult(
Expand Down
6 changes: 6 additions & 0 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ func (ccp *WasmChainProcessor) initializeChannelState(ctx context.Context) error
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
ccp.log.Info("Found channel",
zap.String("channelID", ch.ChannelId),
zap.String("Port id ", ch.PortId))
zap.String("Counterparty Channel Id ", ch.Counterparty.ChannelId)
zap.String("Counterparty Port Id", ch.Counterparty.PortId)

}
return nil
}
Expand Down
7 changes: 5 additions & 2 deletions relayer/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ var (
EventTimeoutRequest = "TimeoutRequest(bytes)"
IconModule = "icon"
WasmModule = "wasm"
TendermintLightClient = "tendermint"
ArchwayModule = "archway"
TendermintLightClient = "07-tendermint"
IconLightClient = "iconclient"
ConnectionKey = "connection"
ChannelKey = "channel"
ONE_HOUR = 60 * 60 * 1000
NanosecondRatio = 1000_000
NanosecondRatio = 1000_000
)
7 changes: 7 additions & 0 deletions relayer/common/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package common

import "fmt"

func GetIdentifier(name string, i int) string {
return fmt.Sprintf("%s-%d", name, i)
}
1 change: 0 additions & 1 deletion relayer/processor/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (mp *messageProcessor) processMessages(
src, dst *pathEndRuntime,
) error {

fmt.Println("inside process Messages")
// 2/3 rule enough_time_pass && context change in case of BTPBlock
needsClientUpdate, err := mp.shouldUpdateClientNow(ctx, src, dst)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion relayer/processor/path_end_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,18 +535,26 @@ func (pathEnd *pathEndRuntime) removePacketRetention(
case chantypes.EventTypeRecvPacket:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence}
case chantypes.EventTypeWriteAck:
toDelete[eventType] = []uint64{sequence}
case common.EventTimeoutRequest:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence}
case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeRecvPacket] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeWriteAck] = []uint64{sequence}
toDelete[chantypes.EventTypeAcknowledgePacket] = []uint64{sequence}
toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence}
toDeleteCounterparty[common.EventTimeoutRequest] = []uint64{sequence}
}
// delete in progress send for this specific message
pathEnd.packetProcessing[k].deleteMessages(map[string][]uint64{
eventType: {sequence},
})
// delete all packet flow retention history for this sequence
pathEnd.messageCache.PacketFlow[k].DeleteMessages(toDelete)
counterparty.messageCache.PacketFlow[k].DeleteMessages(toDeleteCounterparty)
counterparty.messageCache.PacketFlow[k.Counterparty()].DeleteMessages(toDeleteCounterparty)
}

// shouldSendConnectionMessage determines if the connection handshake message should be sent now.
Expand Down
36 changes: 2 additions & 34 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,9 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages(
for seq, info := range pathEndPacketFlowMessages.DstMsgRecvPacket {
deletePreInitIfMatches(info)
toDeleteSrc[chantypes.EventTypeSendPacket] = append(toDeleteSrc[chantypes.EventTypeSendPacket], seq)
toDeleteDst[chantypes.EventTypeRecvPacket] = append(toDeleteDst[chantypes.EventTypeRecvPacket], seq)

}
// if len(info.Ack) == 0 {
// // have recv_packet but not write_acknowledgement yet. skip for now.
// continue
// }
// // msg is received by dst chain, but no ack yet. Need to relay ack from dst to src!
// ackMsg := packetIBCMessage{
// eventType: chantypes.EventTypeAcknowledgePacket,
// info: info,
// }
// msgs = append(msgs, ackMsg)
// }

processRemovals()

Expand All @@ -234,7 +225,6 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages(

for seq, msgTimeoutRequest := range pathEndPacketFlowMessages.DstMsgRequestTimeout {
toDeleteSrc[chantypes.EventTypeSendPacket] = append(toDeleteSrc[chantypes.EventTypeSendPacket], seq)
toDeleteDst[common.EventTimeoutRequest] = append(toDeleteDst[common.EventTimeoutRequest], seq)
timeoutMsg := packetIBCMessage{
eventType: chantypes.EventTypeTimeoutPacket,
info: msgTimeoutRequest,
Expand Down Expand Up @@ -1000,28 +990,6 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func(
pathEnd2ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs))

for i, pair := range channelPairs {
// Append acks into recv packet info if present
// pathEnd1DstMsgRecvPacket :=
// for seq, ackInfo := range pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeWriteAck] {
// if recvPacketInfo, ok := pathEnd1DstMsgRecvPacket[seq]; ok {
// recvPacketInfo.Ack = ackInfo.Ack
// pathEnd1DstMsgRecvPacket[seq] = recvPacketInfo
// continue
// }
// pathEnd1DstMsgRecvPacket[seq] = ackInfo

// }

// pathEnd2DstMsgRecvPacket := pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeRecvPacket]
// for seq, ackInfo := range pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeWriteAck] {
// if recvPacketInfo, ok := pathEnd2DstMsgRecvPacket[seq]; ok {
// recvPacketInfo.Ack = ackInfo.Ack
// pathEnd2DstMsgRecvPacket[seq] = recvPacketInfo
// continue
// }

// pathEnd2DstMsgRecvPacket[seq] = ackInfo
// }

pathEnd1PacketFlowMessages := pathEndPacketFlowMessages{
Src: pp.pathEnd1,
Expand Down
4 changes: 2 additions & 2 deletions relayer/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,9 @@ func (c IBCHeaderCache) Prune(keep int) {
// PacketInfoChannelKey returns the applicable ChannelKey for the chain based on the eventType.
func PacketInfoChannelKey(eventType string, info provider.PacketInfo) (ChannelKey, error) {
switch eventType {
case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck:
case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck, common.EventTimeoutRequest:
return packetInfoChannelKey(info).Counterparty(), nil
case chantypes.EventTypeSendPacket, chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose, common.EventTimeoutRequest:
case chantypes.EventTypeSendPacket, chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose:
return packetInfoChannelKey(info), nil
}
return ChannelKey{}, fmt.Errorf("eventType not expected for packetIBCMessage channelKey: %s", eventType)
Expand Down

0 comments on commit a76297c

Please sign in to comment.