Skip to content

Commit

Permalink
iterate the channelStateCache and add counts for each channel
Browse files Browse the repository at this point in the history
  • Loading branch information
danbryan committed Aug 18, 2023
1 parent 5a4be03 commit 41285e6
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,12 @@ SeqLoop:
return skipped, nil
}

// Define the Unrelayed struct outside the function
type Unrelayed struct {
Recv uint64
Ack uint64
}

// flush runs queries to relay any pending messages which may have been
// in blocks before the height that the chain processors started querying.
func (pp *PathProcessor) flush(ctx context.Context) error {
Expand Down Expand Up @@ -1499,9 +1505,31 @@ func (pp *PathProcessor) flush(ctx context.Context) error {
pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync)
pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync)

// Build a map before the if len(skipped) > 0 check
unrelayed := make(map[ChannelKey]Unrelayed)

// Initialize the map with Recv: 0 and Ack: 0 for each channel in pathEnd1.channelStateCache
for k := range pp.pathEnd1.channelStateCache {
unrelayed[k] = Unrelayed{Recv: 0, Ack: 0}
}

// Initialize the map with Recv: 0 and Ack: 0 for each channel in pathEnd2.channelStateCache
for k := range pp.pathEnd2.channelStateCache {
unrelayed[k] = Unrelayed{Recv: 0, Ack: 0}
}

// Update the map with actual counts from the skipped map
for _, chainSkipped := range skipped {
for channelKey, skipped := range chainSkipped {
if _, exists := unrelayed[channelKey]; exists {
unrelayed[channelKey] = Unrelayed{Recv: skipped.Recv, Ack: skipped.Ack}
}
}
}

totalUnrelayed := 0.0 // Initialize a variable to store the total number of unrelayed packets
if len(skipped) > 0 {
skippedPacketsString := ""
totalUnrelayed := 0.0 // Initialize a variable to store the total number of unrelayed packets

for chainID, chainSkipped := range skipped {
for channelKey, skipped := range chainSkipped {
Expand All @@ -1513,25 +1541,15 @@ func (pp *PathProcessor) flush(ctx context.Context) error {
}
}

// Assuming chainID or channelKey contains the path details
// pathName := chainID // or some derivation using chainID and channelKey
// srcChain := ... // derive or access the source chain
// dstChain := ... // derive or access the destination chain
// srcSequence := ... // derive or access the source sequence
// dstSequence := ... // derive or access the destination sequence

// Call the SetUnrelayedPackets function here for each path
// metrics.SetUnrelayedPackets(pathName, srcChain, dstChain, srcSequence, dstSequence, float64(skipped.Recv))

// hard code test if metric is exposed.
pp.metrics.SetUnrelayedPackets("mainnet-kujira-noble", "kujira", "noble", []uint64{1, 2, 3}, []uint64{0}, totalUnrelayed)

return fmt.Errorf(
"flush was successful, but packets are still pending. %s",
skippedPacketsString,
)
}

// hard code test if metric is exposed.
pp.metrics.SetUnrelayedPackets("mainnet-kujira-kava", "kujira", "noble", []uint64{1, 2, 3}, []uint64{0}, totalUnrelayed)

return nil
}

Expand Down

0 comments on commit 41285e6

Please sign in to comment.