Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix start height greater than latest height issue #151

Merged
merged 6 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@
}
}

func (ccp *CosmosChainProcessor) SnapshotHeight(height int) {
func (ccp *CosmosChainProcessor) SnapshotHeight(height int64) {

Check warning on line 265 in relayer/chains/cosmos/cosmos_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/cosmos/cosmos_chain_processor.go#L265

Added line #L265 was not covered by tests
panic("Not implemented for Cosmos")
}

func (ccp *CosmosChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *CosmosChainProcessor) StartFromHeight(ctx context.Context) int64 {

Check warning on line 269 in relayer/chains/cosmos/cosmos_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/cosmos/cosmos_chain_processor.go#L269

Added line #L269 was not covered by tests
panic("Not implemented for Cosmos")
}

Expand Down
43 changes: 25 additions & 18 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
prevNetworkSectionHash []byte
}

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor {

Check warning on line 79 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L79

Added line #L79 was not covered by tests
return &IconChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),

Check warning on line 81 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L81

Added line #L81 was not covered by tests
chainProvider: provider,
latestClientState: make(latestClientState),
connectionStateCache: make(processor.ConnectionStateCache),
Expand All @@ -86,7 +86,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 89 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L89

Added line #L89 was not covered by tests
}
}

Expand Down Expand Up @@ -152,29 +152,30 @@
}

// start_query_cycle
icp.log.Debug("Starting query cycle")

Check warning on line 155 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L155

Added line #L155 was not covered by tests
err := icp.monitoring(ctx, &persistence)
return err
}

func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int {
func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int64 {

Check warning on line 160 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L160

Added line #L160 was not covered by tests
cfg := icp.Provider().ProviderConfig().(*IconProviderConfig)

Check warning on line 162 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L162

Added line #L162 was not covered by tests
if cfg.StartHeight != 0 {
return int(cfg.StartHeight)
return cfg.StartHeight

Check warning on line 164 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L164

Added line #L164 was not covered by tests
}
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil {
icp.log.Warn("Failed to load height from snapshot", zap.Error(err))
} else {
icp.log.Info("Obtained start height from config", zap.Int("height", snapshotHeight))
icp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight))
}
return snapshotHeight

Check warning on line 172 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L170-L172

Added lines #L170 - L172 were not covered by tests
}

func (icp *IconChainProcessor) getLastSavedHeight() int {
func (icp *IconChainProcessor) getLastSavedHeight() int64 {
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil || snapshotHeight < 0 {
return 0

Check warning on line 178 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L175-L178

Added lines #L175 - L178 were not covered by tests
}
return snapshotHeight
}
Expand All @@ -198,9 +199,9 @@
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

icp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),

Check warning on line 204 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L202-L204

Added lines #L202 - L204 were not covered by tests
)
}
return nil
Expand Down Expand Up @@ -232,11 +233,11 @@
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN

icp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 240 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L236-L240

Added lines #L236 - L240 were not covered by tests
}

return nil
Expand Down Expand Up @@ -275,18 +276,24 @@
}

var err error
// processedheight := int64(icp.chainProvider.lastBTPBlockHeight)
// if processedheight == 0 {
processedheight := int64(icp.StartFromHeight(ctx))
processedheight := icp.StartFromHeight(ctx)
latestHeight, err := icp.chainProvider.QueryLatestHeight(ctx)
if err != nil {
icp.log.Error("Error fetching block", zap.Error(err))
return err
}
if processedheight > latestHeight {
icp.log.Warn("Start height set is greater than latest height",
zap.Int64("start height", processedheight),
zap.Int64("latest Height", latestHeight),
)
processedheight = latestHeight
}

Check warning on line 291 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L279-L291

Added lines #L279 - L291 were not covered by tests
if processedheight <= 0 {
processedheight, err = icp.chainProvider.QueryLatestHeight(ctx)
if err != nil {
fmt.Println("Error fetching latest block")
return err
}
processedheight = latestHeight

Check warning on line 293 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L293

Added line #L293 was not covered by tests
}

icp.log.Info("Start to query from height", zap.Int64("height", processedheight))

Check warning on line 296 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L296

Added line #L296 was not covered by tests
// subscribe to monitor block
ctxMonitorBlock, cancelMonitorBlock := context.WithCancel(ctx)
reconnect()
Expand All @@ -296,7 +303,7 @@
icp.firstTime = true

blockReq := &types.BlockRequest{
Height: types.NewHexInt(int64(processedheight)),

Check warning on line 306 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L306

Added line #L306 was not covered by tests
EventFilters: GetMonitorEventFilters(icp.chainProvider.PCfg.IbcHandlerAddress),
}

Expand All @@ -308,8 +315,8 @@
case err := <-errCh:
return err

case <-icp.heightSnapshotChan:
icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height)))

Check warning on line 319 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L318-L319

Added lines #L318 - L319 were not covered by tests

case <-reconnectCh:
cancelMonitorBlock()
Expand All @@ -317,7 +324,7 @@

go func(ctx context.Context, cancel context.CancelFunc) {
blockReq.Height = types.NewHexInt(processedheight)
icp.log.Debug("Try to reconnect from", zap.Int64("height", processedheight))

Check warning on line 327 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L327

Added line #L327 was not covered by tests
err := icp.chainProvider.client.MonitorBlock(ctx, blockReq, func(conn *websocket.Conn, v *types.BlockNotification) error {
if !errors.Is(ctx.Err(), context.Canceled) {
btpBlockNotifCh <- v
Expand All @@ -326,10 +333,10 @@
}, func(conn *websocket.Conn) {
}, func(conn *websocket.Conn, err error) {})
if err != nil {
ht := icp.getHeightToSave(processedheight)
if ht != icp.getLastSavedHeight() {
icp.SnapshotHeight(ht)
}

Check warning on line 339 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L336-L339

Added lines #L336 - L339 were not covered by tests
if errors.Is(err, context.Canceled) {
return
}
Expand All @@ -345,16 +352,16 @@
err := icp.verifyBlock(ctx, br.Header)
if err != nil {
reconnect()
icp.log.Warn("Failed to verify BTP Block",
icp.log.Warn("Failed to verify BTP block",
zap.Int64("height", br.Height),

Check warning on line 356 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L355-L356

Added lines #L355 - L356 were not covered by tests
zap.Error(err),
)
break
}

icp.log.Debug("Verified block ",
zap.Int64("height", int64(processedheight)))

Check warning on line 364 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L362-L364

Added lines #L362 - L364 were not covered by tests
icp.latestBlock = provider.LatestBlock{
Height: uint64(processedheight),
}
Expand All @@ -367,7 +374,7 @@
}

ibcHeaderCache[uint64(br.Height)] = br.Header
icp.log.Debug("Queried block ",

Check warning on line 377 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L377

Added line #L377 was not covered by tests
zap.Int64("height", br.Height))
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone())
if err != nil {
Expand All @@ -378,9 +385,9 @@
break
}
time.Sleep(10 * time.Millisecond)
if icp.firstTime {
time.Sleep(4000 * time.Millisecond)
}

Check warning on line 390 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L388-L390

Added lines #L388 - L390 were not covered by tests
icp.firstTime = false
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
Expand All @@ -402,7 +409,7 @@
if err != nil {
return err
} else if height != processedheight+i {
icp.log.Warn("Reconnect: missing block notification",

Check warning on line 412 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L412

Added line #L412 was not covered by tests
zap.Int64("got", height),
zap.Int64("expected", processedheight+i),
)
Expand Down Expand Up @@ -475,20 +482,20 @@
}
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {
func (icp *IconChainProcessor) getHeightToSave(height int64) int64 {

Check warning on line 485 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L485

Added line #L485 was not covered by tests
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
ht := height - int64(retryAfter)
if ht < 0 {
return 0
}
return ht

Check warning on line 491 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L487-L491

Added lines #L487 - L491 were not covered by tests
}

func (icp *IconChainProcessor) SnapshotHeight(height int) {
icp.log.Info("Save height for snapshot", zap.Int("height", height))
func (icp *IconChainProcessor) SnapshotHeight(height int64) {
icp.log.Info("Save height for snapshot", zap.Int64("height", height))
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))
icp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height))

Check warning on line 498 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L494-L498

Added lines #L494 - L498 were not covered by tests
}
}

Expand Down Expand Up @@ -547,8 +554,8 @@
icp.verifier.nextProofContext = header.Validators
icp.verifier.verifiedHeight = int64(header.Height())
icp.verifier.prevNetworkSectionHash = types.NewNetworkSection(header.Header).Hash()
icp.log.Debug("Verified block ",
zap.Uint64("height", header.Height()))

Check warning on line 558 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L557-L558

Added lines #L557 - L558 were not covered by tests
return nil
}

Expand Down Expand Up @@ -625,8 +632,8 @@
request.err = errors.Wrapf(err, "event.UnmarshalFromBytes: %v", err)
return
}
icp.log.Info("Detected eventlog ", zap.Int64("height", request.height),
zap.String("eventlog", IconCosmosEventMap[string(el.Indexed[0])]))

Check warning on line 636 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L635-L636

Added lines #L635 - L636 were not covered by tests
eventlogs = append(eventlogs, el)
}

Expand All @@ -652,7 +659,7 @@
request.response.IsProcessed = processed
return
}
request.err = errors.Wrapf(err, "Failed to get btp header: %v", err)

Check warning on line 662 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L662

Added line #L662 was not covered by tests
return
}
request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight))
Expand Down
4 changes: 2 additions & 2 deletions relayer/chains/mock/mock_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func NewMockChainProcessor(ctx context.Context, log *zap.Logger, chainID string,
}
}

func (mcp *MockChainProcessor) SnapshotHeight(height int) {
func (mcp *MockChainProcessor) SnapshotHeight(height int64) {
panic("")
}

func (mcp *MockChainProcessor) StartFromHeight(ctx context.Context) int {
func (mcp *MockChainProcessor) StartFromHeight(ctx context.Context) int64 {
return 0
}

Expand Down
4 changes: 2 additions & 2 deletions relayer/chains/penumbra/penumbra_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ func (pcp *PenumbraChainProcessor) initializeChannelState(ctx context.Context) e
return nil
}

func (ccp *PenumbraChainProcessor) SnapshotHeight(height int) {
func (ccp *PenumbraChainProcessor) SnapshotHeight(height int64) {
panic("Not implemented for Penumbra")
}

func (ccp *PenumbraChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *PenumbraChainProcessor) StartFromHeight(ctx context.Context) int64 {
panic("Not implemented for Penumbra")
}

Expand Down
25 changes: 11 additions & 14 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {

Check warning on line 69 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L69

Added line #L69 was not covered by tests
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -76,7 +76,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 79 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L79

Added line #L79 was not covered by tests
}
}

Expand Down Expand Up @@ -178,7 +178,7 @@
// clientState will return the most recent client state if client messages
// have already been observed for the clientID, otherwise it will query for it.
func (ccp *WasmChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) {
if state, ok := ccp.latestClientState[clientID]; ok {

Check warning on line 181 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L181

Added line #L181 was not covered by tests
return state, nil
}
cs, err := ccp.chainProvider.QueryClientState(ctx, int64(ccp.latestBlock.Height), clientID)
Expand All @@ -203,16 +203,16 @@
balanceUpdateWaitDuration time.Duration
}

func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int64 {

Check warning on line 206 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L206

Added line #L206 was not covered by tests
cfg := ccp.Provider().ProviderConfig().(*WasmProviderConfig)
if cfg.StartHeight != 0 {
return int(cfg.StartHeight)
return int64(cfg.StartHeight)

Check warning on line 209 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L209

Added line #L209 was not covered by tests
}
snapshotHeight, err := common.LoadSnapshotHeight(ccp.Provider().ChainId())
if err != nil {
ccp.log.Warn("Failed to load height from snapshot", zap.Error(err))
} else {
ccp.log.Info("Obtained start height from config", zap.Int("height", snapshotHeight))
ccp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight))

Check warning on line 215 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L215

Added line #L215 was not covered by tests
}
return snapshotHeight
}
Expand All @@ -223,7 +223,7 @@
func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanoToMilliRatio)),

Check warning on line 226 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L226

Added line #L226 was not covered by tests
lastBalanceUpdate: time.Unix(0, 0),
balanceUpdateWaitDuration: defaultBalanceUpdateWaitDuration,
}
Expand All @@ -248,16 +248,13 @@

// this will make initial QueryLoop iteration look back initialBlockHistory blocks in history
latestQueriedBlock := ccp.StartFromHeight(ctx)
if latestQueriedBlock < 0 {
latestQueriedBlock = int(persistence.latestHeight - int64(initialBlockHistory))
if latestQueriedBlock < 0 {
latestQueriedBlock = 0
}
if latestQueriedBlock <= 0 || latestQueriedBlock > persistence.latestHeight {
latestQueriedBlock = persistence.latestHeight

Check warning on line 252 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L251-L252

Added lines #L251 - L252 were not covered by tests
}

persistence.latestQueriedBlock = int64(latestQueriedBlock)

ccp.log.Info("Start to query from height ", zap.Int("height", latestQueriedBlock))
ccp.log.Info("Start to query from height ", zap.Int64("height", latestQueriedBlock))

Check warning on line 257 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L257

Added line #L257 was not covered by tests

_, lightBlock, err := ccp.chainProvider.QueryLightBlock(ctx, persistence.latestQueriedBlock)
if err != nil {
Expand Down Expand Up @@ -295,8 +292,8 @@
select {
case <-ctx.Done():
return nil
case <-ccp.heightSnapshotChan:
ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight))

Check warning on line 296 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L295-L296

Added lines #L295 - L296 were not covered by tests
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
}
Expand All @@ -319,11 +316,11 @@
CounterpartyConnID: c.Counterparty.ConnectionId,
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

ccp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),
)

Check warning on line 323 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L319-L323

Added lines #L319 - L323 were not covered by tests
}
return nil
}
Expand Down Expand Up @@ -352,11 +349,11 @@
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
ccp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 356 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L352-L356

Added lines #L352 - L356 were not covered by tests
}
return nil
}
Expand All @@ -370,16 +367,16 @@
zap.Uint("attempts", latestHeightQueryRetries),
zap.Error(err),
)

// TODO: Save height when node status is false?
// ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))

Check warning on line 372 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L370-L372

Added lines #L370 - L372 were not covered by tests
return nil
}

persistence.latestHeight = status.SyncInfo.LatestBlockHeight
// ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version)

ccp.log.Debug("Queried latest height",

Check warning on line 379 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L379

Added line #L379 was not covered by tests
zap.Int64("latest_height", persistence.latestHeight),
)

Expand Down Expand Up @@ -412,14 +409,14 @@
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

syncUpHeight := func() int64 {
if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch {
return persistence.latestQueriedBlock + MaxBlockFetch
}
return persistence.latestHeight

Check warning on line 416 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L412-L416

Added lines #L412 - L416 were not covered by tests
}

for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ {

Check warning on line 419 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L419

Added line #L419 was not covered by tests
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var lightBlock *types.LightBlock
Expand All @@ -443,7 +440,7 @@
}

if err := ccp.Verify(ctx, lightBlock); err != nil {
ccp.log.Warn("Failed to verify block", zap.Int64("height", blockRes.Height), zap.Error(err))

Check warning on line 443 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L443

Added line #L443 was not covered by tests
return err
}

Expand All @@ -469,7 +466,7 @@
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded)

for _, m := range messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", heightUint64))

Check warning on line 469 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L469

Added line #L469 was not covered by tests
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
}
Expand Down Expand Up @@ -519,20 +516,20 @@
return nil
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int {
func (ccp *WasmChainProcessor) getHeightToSave(height int64) int64 {

Check warning on line 519 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L519

Added line #L519 was not covered by tests
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
ht := height - int64(retryAfter)
if ht < 0 {
return 0
}
return ht

Check warning on line 525 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L521-L525

Added lines #L521 - L525 were not covered by tests
}

func (ccp *WasmChainProcessor) SnapshotHeight(height int) {
ccp.log.Info("Save height for snapshot", zap.Int("height", height))
func (ccp *WasmChainProcessor) SnapshotHeight(height int64) {
ccp.log.Info("Save height for snapshot", zap.Int64("height", height))
err := common.SnapshotHeight(ccp.Provider().ChainId(), height)
if err != nil {
ccp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))
ccp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height))

Check warning on line 532 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L528-L532

Added lines #L528 - L532 were not covered by tests
}
}

Expand Down
6 changes: 3 additions & 3 deletions relayer/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func getSnapshotPath(chain_name string) (string, error) {
return snapshot, nil
}

func SnapshotHeight(chain_id string, height int) error {
func SnapshotHeight(chain_id string, height int64) error {
snapshot, err := getSnapshotPath(chain_id)
if err != nil {
return fmt.Errorf("Failed to find snapshot path, %w", err)
Expand All @@ -46,7 +46,7 @@ func SnapshotHeight(chain_id string, height int) error {
return nil
}

func LoadSnapshotHeight(chain_id string) (int, error) {
func LoadSnapshotHeight(chain_id string) (int64, error) {
snapshot, err := getSnapshotPath(chain_id)
if err != nil {
return -1, fmt.Errorf("Failed to find snapshot path, %w", err)
Expand All @@ -56,5 +56,5 @@ func LoadSnapshotHeight(chain_id string) (int, error) {
if err != nil {
return -1, fmt.Errorf("Failed reading file, %w", err)
}
return strconv.Atoi(strings.TrimSuffix(string(content), "\n"))
return strconv.ParseInt(strings.TrimSuffix(string(content), "\n"), 10, 64)
}
4 changes: 2 additions & 2 deletions relayer/processor/chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type ChainProcessor interface {

// Take snapshot of height every N blocks or when the chain processor fails, so that the relayer
// can restart from that height
SnapshotHeight(height int)
SnapshotHeight(height int64)

// If the relay goes down, start chain processor from height returned by this function
// CAN return max(snapshotHeight, latestHeightFromClient)
StartFromHeight(ctx context.Context) int
StartFromHeight(ctx context.Context) int64
}

// ChainProcessors is a slice of ChainProcessor instances.
Expand Down
Loading