Skip to content

Commit

Permalink
pairing session
Browse files Browse the repository at this point in the history
  • Loading branch information
danbryan committed Aug 18, 2023
1 parent d567f85 commit c81ba83
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
18 changes: 11 additions & 7 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,25 +983,29 @@ $ %s query unrelayed-pkts demo-path channel-0`,

src, dst := path.Src.ChainID, path.Dst.ChainID

c, err := a.config.Chains.Gets(src, dst)
chains, err := a.config.Chains.Gets(src, dst)
if err != nil {
return err
}

if err = c[src].SetPath(path.Src); err != nil {
return err
if chain, ok := chains[src]; ok {
if err = chain.SetPath(path.Src); err != nil {
return err
}
}
if err = c[dst].SetPath(path.Dst); err != nil {
return err
if chain, ok := chains[dst]; ok {
if err = chain.SetPath(path.Dst); err != nil {
return err
}
}

channelID := args[1]
channel, err := relayer.QueryChannel(cmd.Context(), c[src], channelID)
channel, err := relayer.QueryChannel(cmd.Context(), chains[src], channelID)
if err != nil {
return err
}

sp := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel)
sp := relayer.UnrelayedSequences(cmd.Context(), chains[src], chains[dst], channel)

out, err := json.Marshal(sp)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions relayer/processor/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package processor

import (
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -18,6 +19,7 @@ type PrometheusMetrics struct {
BlockQueryFailure *prometheus.CounterVec
ClientExpiration *prometheus.GaugeVec
ClientTrustingPeriod *prometheus.GaugeVec
UnrelayedPackets *prometheus.GaugeVec
}

func (m *PrometheusMetrics) AddPacketsObserved(pathName, chain, channel, port, eventType string, count int) {
Expand Down Expand Up @@ -56,6 +58,18 @@ func (m *PrometheusMetrics) IncTxFailure(pathName, chain, errDesc string) {
m.TxFailureError.WithLabelValues(pathName, chain, errDesc).Inc()
}

func sequenceToString(sequence []uint64) string {
var sequenceString string
for _, seq := range sequence {
sequenceString += strconv.FormatInt(int64(seq), 10)
}
return sequenceString
}

func (m *PrometheusMetrics) SetUnrelayedPackets(pathName, srcChain, dstChain string, srcSequence []uint64, dstSequence []uint64, numPackets float64) {
m.UnrelayedPackets.WithLabelValues(pathName, srcChain, dstChain, sequenceToString(srcSequence), sequenceToString(dstSequence)).Set(numPackets)
}

func NewPrometheusMetrics() *PrometheusMetrics {
packetLabels := []string{"path_name", "chain", "channel", "port", "type"}
heightLabels := []string{"chain"}
Expand All @@ -64,6 +78,7 @@ func NewPrometheusMetrics() *PrometheusMetrics {
walletLabels := []string{"chain", "gas_price", "key", "address", "denom"}
clientExpirationLables := []string{"path_name", "chain", "client_id", "trusting_period"}
clientTrustingPeriodLables := []string{"path_name", "chain", "client_id"}
unrelayedLabels := []string{"path_name", "src_chain", "dst_chain", "src_sequence", "dst_sequence"}
registry := prometheus.NewRegistry()
registerer := promauto.With(registry)
return &PrometheusMetrics{
Expand Down Expand Up @@ -104,5 +119,9 @@ func NewPrometheusMetrics() *PrometheusMetrics {
Name: "cosmos_relayer_client_trusting_period_seconds",
Help: "The trusting period (in seconds) of the client",
}, clientTrustingPeriodLables),
UnrelayedPackets: registerer.NewGaugeVec(prometheus.GaugeOpts{
Name: "cosmos_relayer_unrelayed_packets",
Help: "The number of unrelayed packets between two chains",
}, unrelayedLabels),
}
}
14 changes: 8 additions & 6 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func StartRelayer(
src, dst := chains[p.Src.ChainID], chains[p.Dst.ChainID]
src.PathEnd = p.Src
dst.PathEnd = p.Dst
go relayerStartLegacy(ctx, log, src, dst, p.Filter, TwoMB, maxMsgLength, memo, errorChan)
go relayerStartLegacy(ctx, log, src, dst, p.Filter, TwoMB, maxMsgLength, memo, errorChan, metrics)
return errorChan
default:
panic(fmt.Errorf("unexpected processor type: %s, supports one of: [%s, %s]", processorType, ProcessorEvents, ProcessorLegacy))
Expand Down Expand Up @@ -175,7 +175,7 @@ func relayerStartEventProcessor(
}

// relayerStartLegacy is the main loop of the relayer.
func relayerStartLegacy(ctx context.Context, log *zap.Logger, src, dst *Chain, filter ChannelFilter, maxTxSize, maxMsgLength uint64, memo string, errCh chan<- error) {
func relayerStartLegacy(ctx context.Context, log *zap.Logger, src, dst *Chain, filter ChannelFilter, maxTxSize, maxMsgLength uint64, memo string, errCh chan<- error, metrics *processor.PrometheusMetrics) {
defer close(errCh)

// Query the list of channels on the src connection.
Expand Down Expand Up @@ -219,7 +219,7 @@ func relayerStartLegacy(ctx context.Context, log *zap.Logger, src, dst *Chain, f
if !channel.active {
channel.active = true
wg.Add(1)
go relayUnrelayedPacketsAndAcks(ctx, log, &wg, src, dst, maxTxSize, maxMsgLength, memo, channel, channels)
go relayUnrelayedPacketsAndAcks(ctx, log, &wg, src, dst, maxTxSize, maxMsgLength, memo, channel, channels, metrics)
}
}

Expand Down Expand Up @@ -346,15 +346,15 @@ func applyChannelFilterRule(filter ChannelFilter, channels []*types.IdentifiedCh
}

// relayUnrelayedPacketsAndAcks will relay all the pending packets and acknowledgements on both the src and dst chains.
func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync.WaitGroup, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *ActiveChannel, channels chan<- *ActiveChannel) {
func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync.WaitGroup, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *ActiveChannel, channels chan<- *ActiveChannel, metrics *processor.PrometheusMetrics) {
// make goroutine signal its death, whether it's a panic or a return
defer func() {
wg.Done()
channels <- srcChannel
}()

for {
if ok := relayUnrelayedPackets(ctx, log, src, dst, maxTxSize, maxMsgLength, memo, srcChannel.channel); !ok {
if ok := relayUnrelayedPackets(ctx, log, src, dst, maxTxSize, maxMsgLength, memo, srcChannel.channel, metrics); !ok {
return
}
if ok := relayUnrelayedAcks(ctx, log, src, dst, maxTxSize, maxMsgLength, memo, srcChannel.channel); !ok {
Expand All @@ -374,7 +374,7 @@ func relayUnrelayedPacketsAndAcks(ctx context.Context, log *zap.Logger, wg *sync
// relayUnrelayedPackets fetches unrelayed packet sequence numbers and attempts to relay the associated packets.
// relayUnrelayedPackets returns true if packets were empty or were successfully relayed.
// Otherwise, it logs the errors and returns false.
func relayUnrelayedPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *types.IdentifiedChannel) bool {
func relayUnrelayedPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, maxTxSize, maxMsgLength uint64, memo string, srcChannel *types.IdentifiedChannel, metrics *processor.PrometheusMetrics) bool {
// Fetch any unrelayed sequences depending on the channel order
sp := UnrelayedSequences(ctx, src, dst, srcChannel)

Expand All @@ -393,6 +393,8 @@ func relayUnrelayedPackets(ctx context.Context, log *zap.Logger, src, dst *Chain
}

if len(sp.Src) > 0 {
count := len(sp.Src) + len(sp.Dst)
metrics.SetUnrelayedPackets("mainnet-kujira-noble", src.ChainID(), dst.ChainID(), sp.Src, sp.Dst, float64(count))
src.log.Info(
"Unrelayed source packets",
zap.String("src_chain_id", src.ChainID()),
Expand Down

0 comments on commit c81ba83

Please sign in to comment.