From a76297ca640e4ebbe0c0f63adf813790578281bc Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Thu, 27 Jul 2023 16:16:13 +0545 Subject: [PATCH] fix: packet retention not clear (#116) * fix: packet retention not clear * fix: set key to counterpart * fix: handle tx archway if updateClient gives error * fix: refractor * fix: for write ack * chore: remove unnecessary block of code * fix: handle fixes for packet timeout --------- Co-authored-by: izyak --- relayer/chains/icon/icon_chain_processor.go | 2 +- relayer/chains/icon/query.go | 9 +++-- relayer/chains/icon/tx.go | 11 +++--- relayer/chains/wasm/query.go | 6 ++-- relayer/chains/wasm/tx.go | 12 +++---- relayer/chains/wasm/wasm_chain_processor.go | 6 ++++ relayer/common/const.go | 7 ++-- relayer/common/identifier.go | 7 ++++ relayer/processor/message_processor.go | 1 - relayer/processor/path_end_runtime.go | 10 +++++- relayer/processor/path_processor_internal.go | 36 ++------------------ relayer/processor/types.go | 4 +-- 12 files changed, 50 insertions(+), 61 deletions(-) create mode 100644 relayer/common/identifier.go diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index 9dbd36e72..66c98eb07 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -351,7 +351,7 @@ loop: icp.log.Info("Queried Latest height: ", zap.String("chain id ", icp.chainProvider.ChainId()), zap.Int64("height", br.Height)) - err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache) + err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone()) if err != nil { reconnect() icp.log.Warn("Reconnect: error occured during handle block response ", diff --git a/relayer/chains/icon/query.go b/relayer/chains/icon/query.go index 6da6b5351..b9d5fe453 100644 --- a/relayer/chains/icon/query.go +++ b/relayer/chains/icon/query.go @@ -38,8 +38,7 @@ import ( var _ provider.QueryProvider = &IconProvider{} const ( - epoch = 24 * 3600 * 1000 - clientNameString = "07-tendermint-" + epoch = 24 * 3600 * 1000 ) type CallParamOption func(*types.CallParam) @@ -318,7 +317,7 @@ func (icp *IconProvider) QueryClients(ctx context.Context) (clienttypes.Identifi identifiedClientStates := make(clienttypes.IdentifiedClientStates, 0) for i := 0; i <= int(seq)-1; i++ { - clientIdentifier := fmt.Sprintf("%s%d", clientNameString, i) + clientIdentifier := common.GetIdentifier(common.TendermintLightClient, i) callParams := icp.prepareCallParams(MethodGetClientState, map[string]interface{}{ "clientId": clientIdentifier, }) @@ -407,7 +406,7 @@ func (icp *IconProvider) QueryConnections(ctx context.Context) (conns []*conntyp } for i := 0; i <= int(nextSeq)-1; i++ { - connectionId := fmt.Sprintf("connection-%d", i) + connectionId := common.GetIdentifier(common.ConnectionKey, i) var conn_string_ types.HexBytes err := icp.client.Call(icp.prepareCallParams(MethodGetConnection, map[string]interface{}{ "connectionId": connectionId, @@ -609,7 +608,7 @@ func (icp *IconProvider) QueryChannels(ctx context.Context) ([]*chantypes.Identi for i := 0; i <= int(nextSeq)-1; i++ { for _, portId := range allPorts { - channelId := fmt.Sprintf("channel-%d", i) + channelId := common.GetIdentifier(common.ChannelKey, i) var _channel types.HexBytes err := icp.client.Call(icp.prepareCallParams(MethodGetChannel, map[string]interface{}{ "channelId": channelId, diff --git a/relayer/chains/icon/tx.go b/relayer/chains/icon/tx.go index 61a23f52f..9744bef03 100644 --- a/relayer/chains/icon/tx.go +++ b/relayer/chains/icon/tx.go @@ -738,7 +738,7 @@ func (icp *IconProvider) SendIconTransaction( // If update fails, the subsequent txn will fail, result of update not being fetched concurrently switch m.Method { case MethodUpdateClient: - icp.WaitForTxResult(asyncCtx, txhash, m.Method, defaultBroadcastWaitTimeout, asyncCallback) + return icp.WaitForTxResult(asyncCtx, txhash, m.Method, defaultBroadcastWaitTimeout, asyncCallback) default: go icp.WaitForTxResult(asyncCtx, txhash, m.Method, defaultBroadcastWaitTimeout, asyncCallback) } @@ -753,7 +753,7 @@ func (icp *IconProvider) WaitForTxResult( method string, timeout time.Duration, callback func(*provider.RelayerTxResponse, error), -) { +) error { txhash := types.NewHexBytes(txHash) _, txRes, err := icp.client.WaitForResults(asyncCtx, &types.TransactionHashParam{Hash: txhash}) if err != nil { @@ -761,12 +761,12 @@ func (icp *IconProvider) WaitForTxResult( if callback != nil { callback(nil, err) } - return + return err } height, err := txRes.BlockHeight.Value() if err != nil { - return + return err } var eventLogs []provider.RelayerEvent @@ -787,7 +787,7 @@ func (icp *IconProvider) WaitForTxResult( callback(nil, err) } icp.LogFailedTx(method, txRes, err) - return + return err } @@ -803,6 +803,7 @@ func (icp *IconProvider) WaitForTxResult( } // log successful txn icp.LogSuccessTx(method, txRes) + return nil } func (icp *IconProvider) LogSuccessTx(method string, result *types.TransactionResult) { diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 47e12f9b4..460d07064 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -454,7 +454,7 @@ func (ap *WasmProvider) QueryClients(ctx context.Context) (clienttypes.Identifie identifiedClientStates := make(clienttypes.IdentifiedClientStates, 0) for i := 0; i <= int(seq)-1; i++ { - clientIdentifier := fmt.Sprintf("%s-%d", ClientPrefix, i) + clientIdentifier := common.GetIdentifier(common.IconLightClient, i) clientState, err := ap.QueryClientStateContract(ctx, clientIdentifier) if err != nil { return nil, err @@ -545,7 +545,7 @@ func (ap *WasmProvider) QueryConnections(ctx context.Context) (conns []*conntype } for i := 0; i <= int(seq)-1; i++ { - connectionId := fmt.Sprintf("%s-%d", ConnectionPrefix, i) + connectionId := common.GetIdentifier(common.ConnectionKey, i) conn, err := ap.QueryConnectionContract(ctx, connectionId) if err != nil { continue @@ -664,7 +664,7 @@ func (ap *WasmProvider) QueryChannels(ctx context.Context) ([]*chantypes.Identif for i := 0; i <= int(nextSeq)-1; i++ { for _, portId := range allPorts { - channelId := fmt.Sprintf("%s-%d", ChannelPrefix, i) + channelId := common.GetIdentifier(common.ChannelKey, i) channel, err := ap.QueryChannelContract(ctx, portId, channelId) if err != nil { continue diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index a461a7615..6d4227ee9 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -332,13 +332,11 @@ func (ap *WasmProvider) MsgTimeout(msgTransfer provider.PacketInfo, proof provid func (ap *WasmProvider) MsgTimeoutRequest(msgTransfer provider.PacketInfo, proof provider.PacketProof) (provider.RelayerMessage, error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) - return nil, fmt.Errorf("MsgTimeoutRequest Not implemented for Wasm module") } // panic func (ap *WasmProvider) MsgTimeoutOnClose(msgTransfer provider.PacketInfo, proofUnreceived provider.PacketProof) (provider.RelayerMessage, error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) - return nil, nil } func (ap *WasmProvider) ConnectionHandshakeProof(ctx context.Context, msgOpenInit provider.ConnectionInfo, height uint64) (provider.ConnectionProof, error) { @@ -1013,8 +1011,7 @@ func (ap *WasmProvider) BroadcastTx( ) if shouldWait { - ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback) - return nil + return ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback) } go ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback) return nil @@ -1043,14 +1040,14 @@ func (ap *WasmProvider) waitForTx( msgs []provider.RelayerMessage, // used for logging only waitTimeout time.Duration, callback func(*provider.RelayerTxResponse, error), -) { +) error { res, err := ap.waitForTxResult(ctx, txHash, waitTimeout) if err != nil { ap.log.Error("Failed to wait for block inclusion", zap.Error(err)) if callback != nil { callback(nil, err) } - return + return err } rlyResp := &provider.RelayerTxResponse{ @@ -1076,13 +1073,14 @@ func (ap *WasmProvider) waitForTx( callback(nil, err) } ap.LogFailedTx(rlyResp, nil, msgs) - return + return err } if callback != nil { callback(rlyResp, nil) } ap.LogSuccessTx(res, msgs) + return nil } func (ap *WasmProvider) waitForTxResult( diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 5cce14d72..ef3f2680d 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -340,6 +340,12 @@ func (ccp *WasmChainProcessor) initializeChannelState(ctx context.Context) error CounterpartyChannelID: ch.Counterparty.ChannelId, CounterpartyPortID: ch.Counterparty.PortId, }] = ch.State == chantypes.OPEN + ccp.log.Info("Found channel", + zap.String("channelID", ch.ChannelId), + zap.String("Port id ", ch.PortId)) + zap.String("Counterparty Channel Id ", ch.Counterparty.ChannelId) + zap.String("Counterparty Port Id", ch.Counterparty.PortId) + } return nil } diff --git a/relayer/common/const.go b/relayer/common/const.go index c31af9396..73c7d6ba7 100644 --- a/relayer/common/const.go +++ b/relayer/common/const.go @@ -4,8 +4,11 @@ var ( EventTimeoutRequest = "TimeoutRequest(bytes)" IconModule = "icon" WasmModule = "wasm" - TendermintLightClient = "tendermint" + ArchwayModule = "archway" + TendermintLightClient = "07-tendermint" IconLightClient = "iconclient" + ConnectionKey = "connection" + ChannelKey = "channel" ONE_HOUR = 60 * 60 * 1000 - NanosecondRatio = 1000_000 + NanosecondRatio = 1000_000 ) diff --git a/relayer/common/identifier.go b/relayer/common/identifier.go new file mode 100644 index 000000000..810653844 --- /dev/null +++ b/relayer/common/identifier.go @@ -0,0 +1,7 @@ +package common + +import "fmt" + +func GetIdentifier(name string, i int) string { + return fmt.Sprintf("%s-%d", name, i) +} diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 31e3fe734..6aa0d93f6 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -92,7 +92,6 @@ func (mp *messageProcessor) processMessages( src, dst *pathEndRuntime, ) error { - fmt.Println("inside process Messages") // 2/3 rule enough_time_pass && context change in case of BTPBlock needsClientUpdate, err := mp.shouldUpdateClientNow(ctx, src, dst) if err != nil { diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 059da89c9..e270edced 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -535,10 +535,18 @@ func (pathEnd *pathEndRuntime) removePacketRetention( case chantypes.EventTypeRecvPacket: toDelete[eventType] = []uint64{sequence} toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence} + case chantypes.EventTypeWriteAck: + toDelete[eventType] = []uint64{sequence} + case common.EventTimeoutRequest: + toDelete[eventType] = []uint64{sequence} + toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence} case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose: toDelete[eventType] = []uint64{sequence} toDeleteCounterparty[chantypes.EventTypeRecvPacket] = []uint64{sequence} + toDeleteCounterparty[chantypes.EventTypeWriteAck] = []uint64{sequence} + toDelete[chantypes.EventTypeAcknowledgePacket] = []uint64{sequence} toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence} + toDeleteCounterparty[common.EventTimeoutRequest] = []uint64{sequence} } // delete in progress send for this specific message pathEnd.packetProcessing[k].deleteMessages(map[string][]uint64{ @@ -546,7 +554,7 @@ func (pathEnd *pathEndRuntime) removePacketRetention( }) // delete all packet flow retention history for this sequence pathEnd.messageCache.PacketFlow[k].DeleteMessages(toDelete) - counterparty.messageCache.PacketFlow[k].DeleteMessages(toDeleteCounterparty) + counterparty.messageCache.PacketFlow[k.Counterparty()].DeleteMessages(toDeleteCounterparty) } // shouldSendConnectionMessage determines if the connection handshake message should be sent now. diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 8324119cc..d66abc57d 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -209,18 +209,9 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages( for seq, info := range pathEndPacketFlowMessages.DstMsgRecvPacket { deletePreInitIfMatches(info) toDeleteSrc[chantypes.EventTypeSendPacket] = append(toDeleteSrc[chantypes.EventTypeSendPacket], seq) + toDeleteDst[chantypes.EventTypeRecvPacket] = append(toDeleteDst[chantypes.EventTypeRecvPacket], seq) + } - // if len(info.Ack) == 0 { - // // have recv_packet but not write_acknowledgement yet. skip for now. - // continue - // } - // // msg is received by dst chain, but no ack yet. Need to relay ack from dst to src! - // ackMsg := packetIBCMessage{ - // eventType: chantypes.EventTypeAcknowledgePacket, - // info: info, - // } - // msgs = append(msgs, ackMsg) - // } processRemovals() @@ -234,7 +225,6 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages( for seq, msgTimeoutRequest := range pathEndPacketFlowMessages.DstMsgRequestTimeout { toDeleteSrc[chantypes.EventTypeSendPacket] = append(toDeleteSrc[chantypes.EventTypeSendPacket], seq) - toDeleteDst[common.EventTimeoutRequest] = append(toDeleteDst[common.EventTimeoutRequest], seq) timeoutMsg := packetIBCMessage{ eventType: chantypes.EventTypeTimeoutPacket, info: msgTimeoutRequest, @@ -1000,28 +990,6 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( pathEnd2ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs)) for i, pair := range channelPairs { - // Append acks into recv packet info if present - // pathEnd1DstMsgRecvPacket := - // for seq, ackInfo := range pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeWriteAck] { - // if recvPacketInfo, ok := pathEnd1DstMsgRecvPacket[seq]; ok { - // recvPacketInfo.Ack = ackInfo.Ack - // pathEnd1DstMsgRecvPacket[seq] = recvPacketInfo - // continue - // } - // pathEnd1DstMsgRecvPacket[seq] = ackInfo - - // } - - // pathEnd2DstMsgRecvPacket := pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeRecvPacket] - // for seq, ackInfo := range pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeWriteAck] { - // if recvPacketInfo, ok := pathEnd2DstMsgRecvPacket[seq]; ok { - // recvPacketInfo.Ack = ackInfo.Ack - // pathEnd2DstMsgRecvPacket[seq] = recvPacketInfo - // continue - // } - - // pathEnd2DstMsgRecvPacket[seq] = ackInfo - // } pathEnd1PacketFlowMessages := pathEndPacketFlowMessages{ Src: pp.pathEnd1, diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 34f74779b..594e59f8a 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -587,9 +587,9 @@ func (c IBCHeaderCache) Prune(keep int) { // PacketInfoChannelKey returns the applicable ChannelKey for the chain based on the eventType. func PacketInfoChannelKey(eventType string, info provider.PacketInfo) (ChannelKey, error) { switch eventType { - case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck: + case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck, common.EventTimeoutRequest: return packetInfoChannelKey(info).Counterparty(), nil - case chantypes.EventTypeSendPacket, chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose, common.EventTimeoutRequest: + case chantypes.EventTypeSendPacket, chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose: return packetInfoChannelKey(info), nil } return ChannelKey{}, fmt.Errorf("eventType not expected for packetIBCMessage channelKey: %s", eventType)