diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 54410fdf2..4359550a0 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -47,8 +47,10 @@ func (pp *PathProcessor) getMessagesToSend( return m[i].info.Sequence < m[j].info.Sequence }) + mi := m[0].info + if e == chantypes.EventTypeRecvPacket { - dstChan, dstPort := m[0].info.DestChannel, m[0].info.DestPort + dstChan, dstPort := mi.DestChannel, mi.DestPort res, err := dst.chainProvider.QueryNextSeqRecv(ctx, 0, dstChan, dstPort) if err != nil { dst.log.Error("Failed to query next sequence recv", @@ -59,19 +61,29 @@ func (pp *PathProcessor) getMessagesToSend( return } - if m[0].info.Sequence != res.NextSequenceReceive { - dst.log.Error("Unexpected next sequence recv", + if mi.Sequence < res.NextSequenceReceive { + dst.log.Error("Unexpected next recv sequence", + zap.String("channel_id", dstChan), + zap.String("port_id", dstPort), + zap.Uint64("expected", res.NextSequenceReceive), + zap.Uint64("actual", mi.Sequence), + ) + return + } + + if mi.Sequence > res.NextSequenceReceive { + dst.log.Warn("Not yet ready to relay this recv sequence", zap.String("channel_id", dstChan), zap.String("port_id", dstPort), zap.Uint64("expected", res.NextSequenceReceive), - zap.Uint64("actual", m[0].info.Sequence), + zap.Uint64("actual", mi.Sequence), ) return } } if e == chantypes.EventTypeAcknowledgePacket { - srcChan, srcPort := m[0].info.SourceChannel, m[0].info.SourcePort + srcChan, srcPort := mi.SourceChannel, mi.SourcePort res, err := src.chainProvider.QueryNextSeqAck(ctx, 0, srcChan, srcPort) if err != nil { src.log.Error("Failed to query next sequence ack", @@ -82,12 +94,22 @@ func (pp *PathProcessor) getMessagesToSend( return } - if m[0].info.Sequence != res.NextSequenceReceive { - src.log.Error("Unexpected next sequence ack", + if mi.Sequence < res.NextSequenceReceive { + src.log.Error("Unexpected next ack sequence", + zap.String("channel_id", srcChan), + zap.String("port_id", srcPort), + zap.Uint64("expected", res.NextSequenceReceive), + zap.Uint64("actual", mi.Sequence), + ) + return + } + + if mi.Sequence > res.NextSequenceReceive { + src.log.Warn("Not yet ready to relay this ack sequence", zap.String("channel_id", srcChan), zap.String("port_id", srcPort), zap.Uint64("expected", res.NextSequenceReceive), - zap.Uint64("actual", m[0].info.Sequence), + zap.Uint64("actual", mi.Sequence), ) return } @@ -96,9 +118,9 @@ func (pp *PathProcessor) getMessagesToSend( for i, msg := range m { // only handle consecutive sequences on ordered channels if i > 0 && msg.info.Sequence-1 != m[i-1].info.Sequence { - dst.log.Error("Packets are not consecutive", - zap.String("channel_id", m[0].info.DestChannel), - zap.String("port_id", m[0].info.DestChannel), + dst.log.Warn("Skipping non-consecutive packet(s)", + zap.String("channel_id", mi.DestChannel), + zap.String("port_id", mi.DestChannel), zap.Uint64("seq", msg.info.Sequence), zap.Uint64("prior_seq", m[i-1].info.Sequence), )