diff --git a/cmd/query.go b/cmd/query.go
index 15ccd743a..b6733663f 100644
--- a/cmd/query.go
+++ b/cmd/query.go
@@ -983,25 +983,29 @@ $ %s query unrelayed-pkts demo-path channel-0`,
 
 			src, dst := path.Src.ChainID, path.Dst.ChainID
 
-			c, err := a.config.Chains.Gets(src, dst)
+			chains, err := a.config.Chains.Gets(src, dst)
 			if err != nil {
 				return err
 			}
 
-			if err = c[src].SetPath(path.Src); err != nil {
-				return err
+			if chain, ok := chains[src]; ok {
+				if err = chain.SetPath(path.Src); err != nil {
+					return err
+				}
 			}
-			if err = c[dst].SetPath(path.Dst); err != nil {
-				return err
+			if chain, ok := chains[dst]; ok {
+				if err = chain.SetPath(path.Dst); err != nil {
+					return err
+				}
 			}
 
 			channelID := args[1]
-			channel, err := relayer.QueryChannel(cmd.Context(), c[src], channelID)
+			channel, err := relayer.QueryChannel(cmd.Context(), chains[src], channelID)
 			if err != nil {
 				return err
 			}
 
-			sp := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel)
+			sp := relayer.UnrelayedSequences(cmd.Context(), chains[src], chains[dst], channel)
 
 			out, err := json.Marshal(sp)
 			if err != nil {
diff --git a/relayer/processor/metrics.go b/relayer/processor/metrics.go
index 1048eed1e..ed0850e86 100644
--- a/relayer/processor/metrics.go
+++ b/relayer/processor/metrics.go
@@ -1,6 +1,7 @@
 package processor
 
 import (
+	"strconv"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -18,6 +19,7 @@ type PrometheusMetrics struct {
 	BlockQueryFailure *prometheus.CounterVec
 	ClientExpiration *prometheus.GaugeVec
 	ClientTrustingPeriod *prometheus.GaugeVec
+	UnrelayedPackets *prometheus.GaugeVec
 }
 
 func (m *PrometheusMetrics) AddPacketsObserved(pathName, chain, channel, port, eventType string, count int) {
@@ -56,6 +58,32 @@ func (m *PrometheusMetrics) IncTxFailure(pathName, chain, errDesc string) {
 	m.TxFailureError.WithLabelValues(pathName, chain, errDesc).Inc()
 }
 
+// sequenceToString formats sequence as comma-separated base-10 numbers.
+// The separator keeps distinct lists distinct: without it [1 23] and [12 3]
+// would both render as "123".
+func sequenceToString(sequence []uint64) string {
+	var buf []byte
+	for i, seq := range sequence {
+		if i > 0 {
+			buf = append(buf, ',')
+		}
+		// AppendUint formats the full uint64 range; an int64 cast would turn
+		// values above math.MaxInt64 negative.
+		buf = strconv.AppendUint(buf, seq, 10)
+	}
+	return string(buf)
+}
+
+// SetUnrelayedPackets sets the unrelayed-packets gauge to numPackets for the
+// series labelled with the path, both chains and the two sequence lists.
+//
+// NOTE(review): the sequence lists are used as label values, so each distinct
+// set of pending sequences creates a new time series - confirm that this
+// label cardinality is acceptable.
+func (m *PrometheusMetrics) SetUnrelayedPackets(pathName, srcChain, dstChain string, srcSequence []uint64, dstSequence []uint64, numPackets float64) {
+	m.UnrelayedPackets.WithLabelValues(pathName, srcChain, dstChain, sequenceToString(srcSequence), sequenceToString(dstSequence)).Set(numPackets)
+}
+
 func NewPrometheusMetrics() *PrometheusMetrics {
 	packetLabels := []string{"path_name", "chain", "channel", "port", "type"}
 	heightLabels := []string{"chain"}
@@ -64,6 +92,7 @@ func NewPrometheusMetrics() *PrometheusMetrics {
 	walletLabels := []string{"chain", "gas_price", "key", "address", "denom"}
 	clientExpirationLables := []string{"path_name", "chain", "client_id", "trusting_period"}
 	clientTrustingPeriodLables := []string{"path_name", "chain", "client_id"}
+	unrelayedLabels := []string{"path_name", "src_chain", "dst_chain", "src_sequence", "dst_sequence"}
 	registry := prometheus.NewRegistry()
 	registerer := promauto.With(registry)
 	return &PrometheusMetrics{
@@ -104,5 +133,9 @@ func NewPrometheusMetrics() *PrometheusMetrics {
 			Name: "cosmos_relayer_client_trusting_period_seconds",
 			Help: "The trusting period (in seconds) of the client",
 		}, clientTrustingPeriodLables),
+		UnrelayedPackets: registerer.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "cosmos_relayer_unrelayed_packets",
+			Help: "The number of unrelayed packets between two chains",
+		}, unrelayedLabels),
 	}
 }
diff --git a/relayer/strategies.go b/relayer/strategies.go
index fd2856aad..69672e458 100644
--- a/relayer/strategies.go
+++ b/relayer/strategies.go
@@ -103,7 +103,7 @@ func StartRelayer(
 		src, dst := chains[p.Src.ChainID], chains[p.Dst.ChainID]
 		src.PathEnd = p.Src
 		dst.PathEnd = p.Dst
-		go relayerStartLegacy(ctx, log, src, dst, p.Filter, TwoMB, maxMsgLength, memo, errorChan)
+		go relayerStartLegacy(ctx, log, src, dst, p.Filter, TwoMB, maxMsgLength, memo, errorChan, metrics)
 		return errorChan
 	default:
 		panic(fmt.Errorf("unexpected processor type: %s, supports one of: [%s, %s]", processorType, ProcessorEvents, ProcessorLegacy))
@@ -175,7 +175,7 @@ func relayerStartEventProcessor(
 }
 
 // relayerStartLegacy is the main loop of the relayer.
-func relayerStartLegacy(ctx context.Context, log *zap.Logger, src, dst *Chain, filter ChannelFilter, maxTxSize, maxMsgLength uint64, memo string, errCh chan<- error) {
+func relayerStartLegacy(ctx context.Context, log *zap.Logger, src, dst *Chain, filter ChannelFilter, maxTxSize, maxMsgLength uint64, memo string, errCh chan<- error, metrics *processor.PrometheusMetrics) {
 	defer close(errCh)
 
 	// Query the list of channels on the src connection.
@@ -219,7 +219,7 @@ func relayerStartLegacy(ctx context.Context, log *zap.Logger, src, dst *Chain, f
 			if !channel.active {
 				channel.active = true
 				wg.Add(1)
-				go relayUnrelayedPacketsAndAcks(ctx, log, &wg, src, dst, maxTxSize, maxMsgLength, memo, channel, channels)
+				go relayUnrelayedPacketsAndAcks(ctx, log, &wg, src, dst, maxTxSize, maxMsgLength, memo, channel, channels, metrics)
 			}
 		}
 
@@ -346,7 +346,7 @@ func applyChannelFilterRule(filter ChannelFilter, channels []*types.IdentifiedCh
 }
 
 // relayUnrelayedPacketsAndAcks will relay all the pending packets and acknowledgements on both the src and dst chains.
-func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync.WaitGroup, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *ActiveChannel, channels chan<- *ActiveChannel) {
+func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync.WaitGroup, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *ActiveChannel, channels chan<- *ActiveChannel, metrics *processor.PrometheusMetrics) {
 	// make goroutine signal its death, whether it's a panic or a return
 	defer func() {
 		wg.Done()
@@ -354,7 +354,7 @@ func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync
 	}()
 
 	for {
-		if ok := relayUnrelayedPackets(ctx, log, src, dst, maxTxSize, maxMsgLength, memo, srcChannel.channel); !ok {
+		if ok := relayUnrelayedPackets(ctx, log, src, dst, maxTxSize, maxMsgLength, memo, srcChannel.channel, metrics); !ok {
 			return
 		}
 		if ok := relayUnrelayedAcks(ctx, log, src, dst, maxTxSize, maxMsgLength, memo, srcChannel.channel); !ok {
@@ -374,7 +374,7 @@ func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync
 // relayUnrelayedPackets fetches unrelayed packet sequence numbers and attempts to relay the associated packets.
 // relayUnrelayedPackets returns true if packets were empty or were successfully relayed.
 // Otherwise, it logs the errors and returns false.
-func relayUnrelayedPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *types.IdentifiedChannel) bool {
+func relayUnrelayedPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *types.IdentifiedChannel, metrics *processor.PrometheusMetrics) bool {
 	// Fetch any unrelayed sequences depending on the channel order
 	sp := UnrelayedSequences(ctx, src, dst, srcChannel)
 
@@ -393,6 +393,11 @@ func relayUnrelayedPackets(ctx context.Context, log *zap.Logger, src, dst *Chain
 	}
 
 	if len(sp.Src) > 0 {
+		count := len(sp.Src) + len(sp.Dst)
+		// NOTE(review): the path name label below is hard-coded - presumably it
+		// should be the configured path's name. metrics is also used without a
+		// nil check; confirm callers never pass nil.
+		metrics.SetUnrelayedPackets("mainnet-kujira-noble", src.ChainID(), dst.ChainID(), sp.Src, sp.Dst, float64(count))
 		src.log.Info(
 			"Unrelayed source packets",
 			zap.String("src_chain_id", src.ChainID()),