Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove context timeout from initalize connection/channel state #164

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
prevNetworkSectionHash []byte
}

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor {

Check warning on line 79 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L79

Added line #L79 was not covered by tests
return &IconChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),

Check warning on line 81 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L81

Added line #L81 was not covered by tests
chainProvider: provider,
latestClientState: make(latestClientState),
connectionStateCache: make(processor.ConnectionStateCache),
Expand All @@ -86,7 +86,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 89 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L89

Added line #L89 was not covered by tests
}
}

Expand Down Expand Up @@ -152,38 +152,37 @@
}

// start_query_cycle
icp.log.Debug("Starting query cycle")

Check warning on line 155 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L155

Added line #L155 was not covered by tests
err := icp.monitoring(ctx, &persistence)
return err
}

func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int64 {

Check warning on line 160 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L160

Added line #L160 was not covered by tests
cfg := icp.Provider().ProviderConfig().(*IconProviderConfig)

Check warning on line 162 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L162

Added line #L162 was not covered by tests
if cfg.StartHeight != 0 {
return cfg.StartHeight

Check warning on line 164 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L164

Added line #L164 was not covered by tests
}
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil {
icp.log.Warn("Failed to load height from snapshot", zap.Error(err))
} else {
icp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight))
}
return snapshotHeight

Check warning on line 172 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L170-L172

Added lines #L170 - L172 were not covered by tests
}

func (icp *IconChainProcessor) getLastSavedHeight() int64 {
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil || snapshotHeight < 0 {
return 0

Check warning on line 178 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L175-L178

Added lines #L175 - L178 were not covered by tests
}
return snapshotHeight
}

func (icp *IconChainProcessor) initializeConnectionState(ctx context.Context) error {
// TODO: review
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
// ctx, cancel := context.WithTimeout(ctx, queryTimeout)
// defer cancel()

Check warning on line 185 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L184-L185

Added lines #L184 - L185 were not covered by tests

connections, err := icp.chainProvider.QueryConnections(ctx)
if err != nil {
Expand All @@ -199,18 +198,17 @@
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

icp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),

Check warning on line 203 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L201-L203

Added lines #L201 - L203 were not covered by tests
)
}
return nil
}

func (icp *IconChainProcessor) initializeChannelState(ctx context.Context) error {
// TODO:
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
// ctx, cancel := context.WithTimeout(ctx, queryTimeout)
// defer cancel()

Check warning on line 211 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L210-L211

Added lines #L210 - L211 were not covered by tests
channels, err := icp.chainProvider.QueryChannels(ctx)
if err != nil {
return fmt.Errorf("error querying channels: %w", err)
Expand All @@ -233,11 +231,11 @@
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN

icp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 238 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L234-L238

Added lines #L234 - L238 were not covered by tests
}

return nil
Expand Down Expand Up @@ -276,24 +274,24 @@
}

var err error
processedheight := icp.StartFromHeight(ctx)
latestHeight, err := icp.chainProvider.QueryLatestHeight(ctx)
if err != nil {
icp.log.Error("Error fetching block", zap.Error(err))
return err
}
if processedheight > latestHeight {
icp.log.Warn("Start height set is greater than latest height",
zap.Int64("start height", processedheight),
zap.Int64("latest Height", latestHeight),
)
processedheight = latestHeight
}

Check warning on line 289 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L277-L289

Added lines #L277 - L289 were not covered by tests
if processedheight <= 0 {
processedheight = latestHeight

Check warning on line 291 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L291

Added line #L291 was not covered by tests
}

icp.log.Info("Start to query from height", zap.Int64("height", processedheight))

Check warning on line 294 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L294

Added line #L294 was not covered by tests
// subscribe to monitor block
ctxMonitorBlock, cancelMonitorBlock := context.WithCancel(ctx)
reconnect()
Expand All @@ -303,7 +301,7 @@
icp.firstTime = true

blockReq := &types.BlockRequest{
Height: types.NewHexInt(int64(processedheight)),

Check warning on line 304 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L304

Added line #L304 was not covered by tests
EventFilters: GetMonitorEventFilters(icp.chainProvider.PCfg.IbcHandlerAddress),
}

Expand All @@ -315,8 +313,8 @@
case err := <-errCh:
return err

case <-icp.heightSnapshotChan:
icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height)))

Check warning on line 317 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L316-L317

Added lines #L316 - L317 were not covered by tests

case <-reconnectCh:
cancelMonitorBlock()
Expand All @@ -324,7 +322,7 @@

go func(ctx context.Context, cancel context.CancelFunc) {
blockReq.Height = types.NewHexInt(processedheight)
icp.log.Debug("Try to reconnect from", zap.Int64("height", processedheight))

Check warning on line 325 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L325

Added line #L325 was not covered by tests
err := icp.chainProvider.client.MonitorBlock(ctx, blockReq, func(conn *websocket.Conn, v *types.BlockNotification) error {
if !errors.Is(ctx.Err(), context.Canceled) {
btpBlockNotifCh <- v
Expand All @@ -333,10 +331,10 @@
}, func(conn *websocket.Conn) {
}, func(conn *websocket.Conn, err error) {})
if err != nil {
ht := icp.getHeightToSave(processedheight)
if ht != icp.getLastSavedHeight() {
icp.SnapshotHeight(ht)
}

Check warning on line 337 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L334-L337

Added lines #L334 - L337 were not covered by tests
if errors.Is(err, context.Canceled) {
return
}
Expand All @@ -352,16 +350,16 @@
err := icp.verifyBlock(ctx, br.Header)
if err != nil {
reconnect()
icp.log.Warn("Failed to verify BTP block",
zap.Int64("height", br.Height),

Check warning on line 354 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L353-L354

Added lines #L353 - L354 were not covered by tests
zap.Error(err),
)
break
}

icp.log.Debug("Verified block ",
zap.Int64("height", int64(processedheight)))

Check warning on line 362 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L360-L362

Added lines #L360 - L362 were not covered by tests
icp.latestBlock = provider.LatestBlock{
Height: uint64(processedheight),
}
Expand All @@ -374,7 +372,7 @@
}

ibcHeaderCache[uint64(br.Height)] = br.Header
icp.log.Debug("Queried block ",

Check warning on line 375 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L375

Added line #L375 was not covered by tests
zap.Int64("height", br.Height))
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone())
if err != nil {
Expand All @@ -385,9 +383,9 @@
break
}
time.Sleep(10 * time.Millisecond)
if icp.firstTime {
time.Sleep(4000 * time.Millisecond)
}

Check warning on line 388 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L386-L388

Added lines #L386 - L388 were not covered by tests
icp.firstTime = false
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
Expand All @@ -409,7 +407,7 @@
if err != nil {
return err
} else if height != processedheight+i {
icp.log.Warn("Reconnect: missing block notification",

Check warning on line 410 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L410

Added line #L410 was not covered by tests
zap.Int64("got", height),
zap.Int64("expected", processedheight+i),
)
Expand Down Expand Up @@ -482,20 +480,20 @@
}
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int64 {

Check warning on line 483 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L483

Added line #L483 was not covered by tests
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := height - int64(retryAfter)
if ht < 0 {
return 0
}
return ht

Check warning on line 489 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L485-L489

Added lines #L485 - L489 were not covered by tests
}

func (icp *IconChainProcessor) SnapshotHeight(height int64) {
icp.log.Info("Save height for snapshot", zap.Int64("height", height))
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height))

Check warning on line 496 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L492-L496

Added lines #L492 - L496 were not covered by tests
}
}

Expand Down Expand Up @@ -554,8 +552,8 @@
icp.verifier.nextProofContext = header.Validators
icp.verifier.verifiedHeight = int64(header.Height())
icp.verifier.prevNetworkSectionHash = types.NewNetworkSection(header.Header).Hash()
icp.log.Debug("Verified block ",
zap.Uint64("height", header.Height()))

Check warning on line 556 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L555-L556

Added lines #L555 - L556 were not covered by tests
return nil
}

Expand Down Expand Up @@ -632,8 +630,8 @@
request.err = errors.Wrapf(err, "event.UnmarshalFromBytes: %v", err)
return
}
icp.log.Info("Detected eventlog ", zap.Int64("height", request.height),
zap.String("eventlog", IconCosmosEventMap[string(el.Indexed[0])]))

Check warning on line 634 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L633-L634

Added lines #L633 - L634 were not covered by tests
eventlogs = append(eventlogs, el)
}

Expand All @@ -659,7 +657,7 @@
request.response.IsProcessed = processed
return
}
request.err = errors.Wrapf(err, "Failed to get btp header: %v", err)

Check warning on line 660 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L660

Added line #L660 was not covered by tests
return
}
request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight))
Expand Down
8 changes: 4 additions & 4 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {

Check warning on line 69 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L69

Added line #L69 was not covered by tests
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -76,7 +76,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 79 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L79

Added line #L79 was not covered by tests
}
}

Expand Down Expand Up @@ -178,7 +178,7 @@
// clientState will return the most recent client state if client messages
// have already been observed for the clientID, otherwise it will query for it.
func (ccp *WasmChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) {
if state, ok := ccp.latestClientState[clientID]; ok {

Check warning on line 181 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L181

Added line #L181 was not covered by tests
return state, nil
}
cs, err := ccp.chainProvider.QueryClientState(ctx, int64(ccp.latestBlock.Height), clientID)
Expand All @@ -203,16 +203,16 @@
balanceUpdateWaitDuration time.Duration
}

func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int64 {

Check warning on line 206 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L206

Added line #L206 was not covered by tests
cfg := ccp.Provider().ProviderConfig().(*WasmProviderConfig)
if cfg.StartHeight != 0 {
return int64(cfg.StartHeight)

Check warning on line 209 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L209

Added line #L209 was not covered by tests
}
snapshotHeight, err := common.LoadSnapshotHeight(ccp.Provider().ChainId())
if err != nil {
ccp.log.Warn("Failed to load height from snapshot", zap.Error(err))
} else {
ccp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight))

Check warning on line 215 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L215

Added line #L215 was not covered by tests
}
return snapshotHeight
}
Expand Down Expand Up @@ -248,13 +248,13 @@

// this will make initial QueryLoop iteration look back initialBlockHistory blocks in history
latestQueriedBlock := ccp.StartFromHeight(ctx)
if latestQueriedBlock <= 0 || latestQueriedBlock > persistence.latestHeight {
latestQueriedBlock = persistence.latestHeight

Check warning on line 252 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L251-L252

Added lines #L251 - L252 were not covered by tests
}

persistence.latestQueriedBlock = int64(latestQueriedBlock)

ccp.log.Info("Start to query from height ", zap.Int64("height", latestQueriedBlock))

Check warning on line 257 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L257

Added line #L257 was not covered by tests

_, lightBlock, err := ccp.chainProvider.QueryLightBlock(ctx, persistence.latestQueriedBlock)
if err != nil {
Expand Down Expand Up @@ -292,8 +292,8 @@
select {
case <-ctx.Done():
return nil
case <-ccp.heightSnapshotChan:
ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight))

Check warning on line 296 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L295-L296

Added lines #L295 - L296 were not covered by tests
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
}
Expand All @@ -302,8 +302,8 @@

// initializeConnectionState will bootstrap the connectionStateCache with the open connection state.
func (ccp *WasmChainProcessor) initializeConnectionState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
// ctx, cancel := context.WithTimeout(ctx, queryTimeout)
// defer cancel()

Check warning on line 306 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L305-L306

Added lines #L305 - L306 were not covered by tests
connections, err := ccp.chainProvider.QueryConnections(ctx)
if err != nil {
return fmt.Errorf("error querying connections: %w", err)
Expand All @@ -316,19 +316,19 @@
CounterpartyConnID: c.Counterparty.ConnectionId,
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

ccp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),
)

Check warning on line 323 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L319-L323

Added lines #L319 - L323 were not covered by tests
}
return nil
}

// initializeChannelState will bootstrap the channelStateCache with the open channel state.
func (ccp *WasmChainProcessor) initializeChannelState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
// ctx, cancel := context.WithTimeout(ctx, queryTimeout)
// defer cancel()

Check warning on line 331 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L330-L331

Added lines #L330 - L331 were not covered by tests
channels, err := ccp.chainProvider.QueryChannels(ctx)
if err != nil {
return fmt.Errorf("error querying channels: %w", err)
Expand All @@ -349,11 +349,11 @@
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
ccp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 356 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L352-L356

Added lines #L352 - L356 were not covered by tests
}
return nil
}
Expand All @@ -367,16 +367,16 @@
zap.Uint("attempts", latestHeightQueryRetries),
zap.Error(err),
)

// TODO: Save height when node status is false?
// ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))

Check warning on line 372 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L370-L372

Added lines #L370 - L372 were not covered by tests
return nil
}

persistence.latestHeight = status.SyncInfo.LatestBlockHeight
// ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version)

ccp.log.Debug("Queried latest height",

Check warning on line 379 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L379

Added line #L379 was not covered by tests
zap.Int64("latest_height", persistence.latestHeight),
)

Expand Down Expand Up @@ -409,14 +409,14 @@
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

syncUpHeight := func() int64 {
if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch {
return persistence.latestQueriedBlock + MaxBlockFetch
}
return persistence.latestHeight

Check warning on line 416 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L412-L416

Added lines #L412 - L416 were not covered by tests
}

for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ {

Check warning on line 419 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L419

Added line #L419 was not covered by tests
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var lightBlock *types.LightBlock
Expand All @@ -440,7 +440,7 @@
}

if err := ccp.Verify(ctx, lightBlock); err != nil {
ccp.log.Warn("Failed to verify block", zap.Int64("height", blockRes.Height), zap.Error(err))

Check warning on line 443 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L443

Added line #L443 was not covered by tests
return err
}

Expand All @@ -466,7 +466,7 @@
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded)

for _, m := range messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", heightUint64))

Check warning on line 469 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L469

Added line #L469 was not covered by tests
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
}
Expand Down Expand Up @@ -516,20 +516,20 @@
return nil
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int64 {

Check warning on line 519 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L519

Added line #L519 was not covered by tests
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := height - int64(retryAfter)
if ht < 0 {
return 0
}
return ht

Check warning on line 525 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L521-L525

Added lines #L521 - L525 were not covered by tests
}

func (ccp *WasmChainProcessor) SnapshotHeight(height int64) {
ccp.log.Info("Save height for snapshot", zap.Int64("height", height))
err := common.SnapshotHeight(ccp.Provider().ChainId(), height)
if err != nil {
ccp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height))

Check warning on line 532 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L528-L532

Added lines #L528 - L532 were not covered by tests
}
}

Expand Down
Loading