Skip to content

Commit

Permalink
path processor race (#995)
Browse files Browse the repository at this point in the history
* fix path processor indexing

* clone maps before passing to pathprocessor

* Merge should Clone

* pre-size maps
  • Loading branch information
agouin authored Sep 20, 2022
1 parent a4206f0 commit 2c2c6a7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
4 changes: 2 additions & 2 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
IBCMessagesCache: ibcMessagesCache,
IBCMessagesCache: ibcMessagesCache.Clone(),
InSync: ccp.inSync,
ClientState: clientState,
ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID),
ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients),
IBCHeaderCache: ibcHeaderCache,
IBCHeaderCache: ibcHeaderCache.Clone(),
})
}

Expand Down
52 changes: 32 additions & 20 deletions relayer/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ type IBCMessagesCache struct {
ChannelHandshake ChannelMessagesCache
}

// Clone makes a deep copy of an IBCMessagesCache.
func (c IBCMessagesCache) Clone() IBCMessagesCache {
x := IBCMessagesCache{
PacketFlow: make(ChannelPacketMessagesCache, len(c.PacketFlow)),
ConnectionHandshake: make(ConnectionMessagesCache, len(c.ConnectionHandshake)),
ChannelHandshake: make(ChannelMessagesCache, len(c.ChannelHandshake)),
}
x.PacketFlow.Merge(c.PacketFlow)
x.ConnectionHandshake.Merge(c.ConnectionHandshake)
x.ChannelHandshake.Merge(c.ChannelHandshake)
return x
}

// NewIBCMessagesCache returns an empty IBCMessagesCache.
func NewIBCMessagesCache() IBCMessagesCache {
return IBCMessagesCache{
Expand Down Expand Up @@ -257,12 +270,10 @@ func (c PacketMessagesCache) DeleteMessages(toDelete ...map[string][]uint64) {
// Merge merges another ChannelPacketMessagesCache into this one.
func (c ChannelPacketMessagesCache) Merge(other ChannelPacketMessagesCache) {
for channelKey, messageCache := range other {
_, ok := c[channelKey]
if !ok {
c[channelKey] = messageCache
} else {
c[channelKey].Merge(messageCache)
if _, ok := c[channelKey]; !ok {
c[channelKey] = make(PacketMessagesCache)
}
c[channelKey].Merge(messageCache)
}
}

Expand Down Expand Up @@ -304,12 +315,10 @@ func (c ChannelPacketMessagesCache) Retain(k ChannelKey, m string, pi provider.P
// Merge merges another PacketMessagesCache into this one.
func (c PacketMessagesCache) Merge(other PacketMessagesCache) {
for ibcMessage, messageCache := range other {
_, ok := c[ibcMessage]
if !ok {
c[ibcMessage] = messageCache
} else {
c[ibcMessage].Merge(messageCache)
if _, ok := c[ibcMessage]; !ok {
c[ibcMessage] = make(PacketSequenceCache)
}
c[ibcMessage].Merge(messageCache)
}
}

Expand All @@ -323,12 +332,10 @@ func (c PacketSequenceCache) Merge(other PacketSequenceCache) {
// Merge merges another ConnectionMessagesCache into this one.
func (c ConnectionMessagesCache) Merge(other ConnectionMessagesCache) {
for ibcMessage, messageCache := range other {
_, ok := c[ibcMessage]
if !ok {
c[ibcMessage] = messageCache
} else {
c[ibcMessage].Merge(messageCache)
if _, ok := c[ibcMessage]; !ok {
c[ibcMessage] = make(ConnectionMessageCache)
}
c[ibcMessage].Merge(messageCache)
}
}

Expand Down Expand Up @@ -360,12 +367,10 @@ func (c ConnectionMessageCache) Merge(other ConnectionMessageCache) {
// Merge merges another ChannelMessagesCache into this one.
func (c ChannelMessagesCache) Merge(other ChannelMessagesCache) {
for ibcMessage, messageCache := range other {
_, ok := c[ibcMessage]
if !ok {
c[ibcMessage] = messageCache
} else {
c[ibcMessage].Merge(messageCache)
if _, ok := c[ibcMessage]; !ok {
c[ibcMessage] = make(ChannelMessageCache)
}
c[ibcMessage].Merge(messageCache)
}
}

Expand Down Expand Up @@ -397,6 +402,13 @@ func (c ChannelMessageCache) Merge(other ChannelMessageCache) {
// IBCHeaderCache holds a mapping of IBCHeaders for their block height.
type IBCHeaderCache map[uint64]provider.IBCHeader

// Clone makes a deep copy of an IBCHeaderCache.
func (c IBCHeaderCache) Clone() IBCHeaderCache {
x := make(IBCHeaderCache, len(c))
x.Merge(c)
return x
}

// Merge merges another IBCHeaderCache into this one.
func (c IBCHeaderCache) Merge(other IBCHeaderCache) {
for k, v := range other {
Expand Down
6 changes: 3 additions & 3 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func StartRelayer(
}

ePaths := make([]path, len(paths))
for _, np := range paths {
for i, np := range paths {
pathName := np.Name
p := np.Path

Expand All @@ -62,10 +62,10 @@ func StartRelayer(
filterSrc = append(filterSrc, ruleSrc)
filterDst = append(filterDst, ruleDst)
}
ePaths = append(ePaths, path{
ePaths[i] = path{
src: processor.NewPathEnd(pathName, p.Src.ChainID, p.Src.ClientID, filter.Rule, filterSrc),
dst: processor.NewPathEnd(pathName, p.Dst.ChainID, p.Dst.ClientID, filter.Rule, filterDst),
})
}
}

go relayerStartEventProcessor(ctx, log, chainProcessors, ePaths, initialBlockHistory, maxTxSize, maxMsgLength, memo, errorChan, metrics)
Expand Down

0 comments on commit 2c2c6a7

Please sign in to comment.