Skip to content

Commit

Permalink
feat: add flush unreceived ack logic
Browse files Browse the repository at this point in the history
  • Loading branch information
viveksharmapoudel committed Sep 11, 2023
1 parent 7fff473 commit 7e10932
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 77 deletions.
3 changes: 2 additions & 1 deletion relayer/chains/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,9 +1175,10 @@ func (ap *CosmosProvider) QueryClientPrevConsensusStateHeight(ctx context.Contex
panic("QueryClientPrevConsensusStateHeight not implemented")
}

func (ap *CosmosProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {
func (ap *CosmosProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {
panic("QuerySendPacketByHeight not implemented")

Check warning on line 1179 in relayer/chains/cosmos/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/cosmos/query.go#L1178-L1179

Added lines #L1178 - L1179 were not covered by tests
}

func (ap *CosmosProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) {
panic("QueryPacketHeights not implemented")

Check warning on line 1183 in relayer/chains/cosmos/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/cosmos/query.go#L1182-L1183

Added lines #L1182 - L1183 were not covered by tests
}
Expand Down
12 changes: 10 additions & 2 deletions relayer/chains/icon/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,15 @@ func (icp *IconProvider) QueryMessageHeights(ctx context.Context, methodName str
return packetHeights, nil

Check warning on line 850 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L850

Added line #L850 was not covered by tests
}

func (ap *IconProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {
func (ap *IconProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType string, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {
var eventName = ""
switch eventType {
case chantypes.EventTypeSendPacket:
eventName = EventTypeSendPacket
case chantypes.EventTypeWriteAck:
eventName = EventTypeWriteAcknowledgement

Check warning on line 859 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L853-L859

Added lines #L853 - L859 were not covered by tests
}

block, err := ap.client.GetBlockByHeight(&types.BlockHeightParam{
Height: types.NewHexInt(int64(seqHeight)),
})
Expand All @@ -870,7 +878,7 @@ func (ap *IconProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID,
if el.Addr != types.Address(ap.PCfg.IbcHandlerAddress) &&
// sendPacket will be of index length 2
len(el.Indexed) != 2 &&
el.Indexed[0] != EventTypeSendPacket {
el.Indexed[0] != eventName {
continue

Check warning on line 882 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L869-L882

Added lines #L869 - L882 were not covered by tests
}
packetStr := el.Indexed[1]
Expand Down
3 changes: 2 additions & 1 deletion relayer/chains/penumbra/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,9 +989,10 @@ func (cc *PenumbraProvider) QueryClientPrevConsensusStateHeight(ctx context.Cont
panic("QueryClientPrevConsensusStateHeight not implemented")
}

func (ap *PenumbraProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {
func (ap *PenumbraProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, height uint64) (provider.PacketInfo, error) {
panic("QuerySendPacketByHeight not implemented")
}

func (ap *PenumbraProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) {
panic("QueryPacketHeights not implemented")
}
Expand Down
6 changes: 5 additions & 1 deletion relayer/chains/wasm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ func (ap *WasmProvider) QueryAckHeights(ctx context.Context, latestHeight int64,
return ap.QueryMessageHeights(ctx, MethodGetAckHeights, latestHeight, channelId, portId, startSeq, endSeq)

Check warning on line 974 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L973-L974

Added lines #L973 - L974 were not covered by tests
}

func (ap *WasmProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {
func (ap *WasmProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) {

h := int64(seqHeight)
blockRes, err := ap.RPCClient.BlockResults(ctx, &h)
Expand All @@ -990,6 +990,10 @@ func (ap *WasmProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID,
}
messages := ibcMessagesFromEvents(ap.log, tx.Events, ap.ChainId(), seqHeight, ap.PCfg.IbcHandlerAddress, base64Encoded)
for _, m := range messages {
// in case eventtype donot match
if m.eventType != eventType {
continue

Check warning on line 995 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L991-L995

Added lines #L991 - L995 were not covered by tests
}
switch t := m.info.(type) {
case *packetInfo:
packet := provider.PacketInfo(*t)
Expand Down
141 changes: 71 additions & 70 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,7 +1815,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcksByHeights(
seq := seq

eg.Go(func() error {
sendPacket, err := src.chainProvider.QuerySendPacketByHeight(ctx, k.ChannelID, k.PortID, seq, seqHeight)
sendPacket, err := src.chainProvider.QueryPacketMessageByEventHeight(ctx, chantypes.EventTypeSendPacket, k.ChannelID, k.PortID, seq, seqHeight)
if err != nil {
return err
}
Expand Down Expand Up @@ -1853,75 +1853,76 @@ func (pp *PathProcessor) queuePendingRecvAndAcksByHeights(
)
}

Check warning on line 1854 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1843-L1854

Added lines #L1843 - L1854 were not covered by tests

// TODO: for ackedMessage
// var unacked []uint64

// SeqLoop:
// for _, seq := range seqs {
// for _, unrecvSeq := range unrecv {
// if seq == unrecvSeq {
// continue SeqLoop
// }
// }
// // does not exist in unrecv, so this is an ack that must be written
// unacked = append(unacked, seq)
// }

// for i, seq := range unacked {
// dstMu.Lock()
// ck := k.Counterparty()
// if dstCache.IsCached(chantypes.EventTypeRecvPacket, ck, seq) &&
// dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) {
// continue // already cached
// }
// dstMu.Unlock()

// if i >= maxPacketsPerFlush {
// skipped = true
// break
// }

// seq := seq

// dst.log.Debug("Querying recv packet",
// zap.String("channel", k.CounterpartyChannelID),
// zap.String("port", k.CounterpartyPortID),
// zap.Uint64("sequence", seq),
// )

// eg.Go(func() error {
// recvPacket, err := dst.chainProvider.QueryRecvPacket(ctx, k.CounterpartyChannelID, k.CounterpartyPortID, seq)
// if err != nil {
// return err
// }

// ck := k.Counterparty()
// dstMu.Lock()
// dstCache.Cache(chantypes.EventTypeRecvPacket, ck, seq, recvPacket)
// dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, recvPacket)
// dstMu.Unlock()

// return nil
// })
// }

// if err := eg.Wait(); err != nil {
// return false, err
// }

// if len(unacked) > 0 {
// dst.log.Debug(
// "Will flush MsgAcknowledgement",
// zap.Object("channel", k),
// zap.Uint64s("sequences", unacked),
// )
// } else {
// dst.log.Debug(
// "No MsgAcknowledgement to flush",
// zap.String("channel", k.CounterpartyChannelID),
// zap.String("port", k.CounterpartyPortID),
// )
// }
var unacked []uint64

ackHeights, err := dst.chainProvider.QueryAckHeights(ctx, int64(dst.latestBlock.Height), dstChan, dstPort, packetHeights.StartSeq, packetHeights.EndSeq)
if err != nil {
return false, err
}

Check warning on line 1861 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1856-L1861

Added lines #L1856 - L1861 were not covered by tests

counter := 0
for seq, height := range ackHeights {

// Is packetHeights is present then Ack not received
// not present means ack already received
_, ok := packetHeights.MessageHeights[seq]
if !ok {
continue

Check warning on line 1870 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1863-L1870

Added lines #L1863 - L1870 were not covered by tests
}

dstMu.Lock()
ck := k.Counterparty()
if dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) {
continue // already cached

Check warning on line 1876 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1873-L1876

Added lines #L1873 - L1876 were not covered by tests
}
dstMu.Unlock()

if counter >= maxPacketsPerFlush {
skipped = true
break

Check warning on line 1882 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1878-L1882

Added lines #L1878 - L1882 were not covered by tests
}

seq := seq

dst.log.Debug("Querying write Ack",
zap.String("channel", k.CounterpartyChannelID),
zap.String("port", k.CounterpartyPortID),
zap.Uint64("sequence", seq),
)

eg.Go(func() error {
AckPacket, err := dst.chainProvider.QueryPacketMessageByEventHeight(ctx, chantypes.EventTypeWriteAck, k.CounterpartyChannelID, k.CounterpartyPortID, seq, height)
if err != nil {
return err
}

Check warning on line 1897 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1885-L1897

Added lines #L1885 - L1897 were not covered by tests

ck := k.Counterparty()
dstMu.Lock()
dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, AckPacket)
dstMu.Unlock()

return nil

Check warning on line 1904 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1899-L1904

Added lines #L1899 - L1904 were not covered by tests
})
counter++

Check warning on line 1906 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1906

Added line #L1906 was not covered by tests
}

if err := eg.Wait(); err != nil {
return false, err
}

Check warning on line 1911 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1909-L1911

Added lines #L1909 - L1911 were not covered by tests

if len(unacked) > 0 {
dst.log.Debug(
"Will flush MsgAcknowledgement",
zap.Object("channel", k),
zap.Uint64s("sequences", unacked),
)
} else {
dst.log.Debug(
"No MsgAcknowledgement to flush",
zap.String("channel", k.CounterpartyChannelID),
zap.String("port", k.CounterpartyPortID),
)
}

Check warning on line 1925 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1913-L1925

Added lines #L1913 - L1925 were not covered by tests

return !skipped, nil

Check warning on line 1927 in relayer/processor/path_processor_internal.go

View check run for this annotation

Codecov / codecov/patch

relayer/processor/path_processor_internal.go#L1927

Added line #L1927 was not covered by tests
}
3 changes: 1 addition & 2 deletions relayer/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,7 @@ type QueryProvider interface {
// query packet info for sequence
QuerySendPacket(ctx context.Context, srcChanID, srcPortID string, sequence uint64) (PacketInfo, error)
QueryRecvPacket(ctx context.Context, dstChanID, dstPortID string, sequence uint64) (PacketInfo, error)
QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (PacketInfo, error)

QueryPacketMessageByEventHeight(ctx context.Context, eventType string, srcChanID, srcPortID string, sequence uint64, height uint64) (PacketInfo, error)
// bank
QueryBalance(ctx context.Context, keyName string) (sdk.Coins, error)
QueryBalanceWithAddress(ctx context.Context, addr string) (sdk.Coins, error)
Expand Down

0 comments on commit 7e10932

Please sign in to comment.