Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add missing mutex lock. #145

Merged
merged 8 commits into from
Sep 1, 2023
Merged
9 changes: 6 additions & 3 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
prevNetworkSectionHash []byte
}

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor {

Check warning on line 79 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L79

Added line #L79 was not covered by tests
return &IconChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),

Check warning on line 81 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L81

Added line #L81 was not covered by tests
chainProvider: provider,
latestClientState: make(latestClientState),
connectionStateCache: make(processor.ConnectionStateCache),
Expand All @@ -86,7 +86,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 89 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L89

Added line #L89 was not covered by tests
}
}

Expand Down Expand Up @@ -152,7 +152,7 @@
}

// start_query_cycle
icp.log.Debug("Starting query cycle")

Check warning on line 155 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L155

Added line #L155 was not covered by tests
err := icp.monitoring(ctx, &persistence)
return err
}
Expand All @@ -171,12 +171,12 @@
return snapshotHeight
}

func (icp *IconChainProcessor) getLastSavedHeight() int {
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil || snapshotHeight < 0 {
return 0
}
return snapshotHeight

Check warning on line 179 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L174-L179

Added lines #L174 - L179 were not covered by tests
}

func (icp *IconChainProcessor) initializeConnectionState(ctx context.Context) error {
Expand All @@ -198,9 +198,9 @@
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

icp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),

Check warning on line 203 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L201-L203

Added lines #L201 - L203 were not covered by tests
)
}
return nil
Expand Down Expand Up @@ -232,11 +232,11 @@
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN

icp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 239 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L235-L239

Added lines #L235 - L239 were not covered by tests
}

return nil
Expand Down Expand Up @@ -285,9 +285,8 @@
return err
}
}
// }

icp.log.Info("Start to query from height", zap.Int64("height", processedheight))

Check warning on line 289 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L289

Added line #L289 was not covered by tests
// subscribe to monitor block
ctxMonitorBlock, cancelMonitorBlock := context.WithCancel(ctx)
reconnect()
Expand All @@ -297,7 +296,7 @@
icp.firstTime = true

blockReq := &types.BlockRequest{
Height: types.NewHexInt(int64(icp.chainProvider.PCfg.StartHeight)),
Height: types.NewHexInt(int64(processedheight)),

Check warning on line 299 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L299

Added line #L299 was not covered by tests
EventFilters: GetMonitorEventFilters(icp.chainProvider.PCfg.IbcHandlerAddress),
}

Expand All @@ -309,8 +308,8 @@
case err := <-errCh:
return err

case <-icp.heightSnapshotChan:
icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height)))

Check warning on line 312 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L311-L312

Added lines #L311 - L312 were not covered by tests

case <-reconnectCh:
cancelMonitorBlock()
Expand All @@ -318,7 +317,7 @@

go func(ctx context.Context, cancel context.CancelFunc) {
blockReq.Height = types.NewHexInt(processedheight)
icp.log.Debug("Try to reconnect from", zap.Int64("height", processedheight))

Check warning on line 320 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L320

Added line #L320 was not covered by tests
err := icp.chainProvider.client.MonitorBlock(ctx, blockReq, func(conn *websocket.Conn, v *types.BlockNotification) error {
if !errors.Is(ctx.Err(), context.Canceled) {
btpBlockNotifCh <- v
Expand All @@ -327,10 +326,10 @@
}, func(conn *websocket.Conn) {
}, func(conn *websocket.Conn, err error) {})
if err != nil {
ht := icp.getHeightToSave(processedheight)
if ht != icp.getLastSavedHeight() {
icp.SnapshotHeight(ht)
}

Check warning on line 332 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L329-L332

Added lines #L329 - L332 were not covered by tests
if errors.Is(err, context.Canceled) {
return
}
Expand All @@ -346,16 +345,16 @@
err := icp.verifyBlock(ctx, br.Header)
if err != nil {
reconnect()
icp.log.Warn("Failed to verify BTP Block",
zap.Int64("height", br.Height),

Check warning on line 349 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L348-L349

Added lines #L348 - L349 were not covered by tests
zap.Error(err),
)
break
}

icp.log.Debug("Verified block ",
zap.Int64("height", int64(processedheight)))

Check warning on line 357 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L355-L357

Added lines #L355 - L357 were not covered by tests
icp.latestBlock = provider.LatestBlock{
Height: uint64(processedheight),
}
Expand All @@ -368,7 +367,7 @@
}

ibcHeaderCache[uint64(br.Height)] = br.Header
icp.log.Debug("Queried block ",

Check warning on line 370 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L370

Added line #L370 was not covered by tests
zap.Int64("height", br.Height))
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone())
if err != nil {
Expand All @@ -379,6 +378,9 @@
break
}
time.Sleep(10 * time.Millisecond)
if icp.firstTime {
time.Sleep(4000 * time.Millisecond)
}

Check warning on line 383 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L381-L383

Added lines #L381 - L383 were not covered by tests
icp.firstTime = false
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
Expand All @@ -400,7 +402,7 @@
if err != nil {
return err
} else if height != processedheight+i {
icp.log.Warn("Reconnect: missing block notification ",
icp.log.Warn("Reconnect: missing block notification",

Check warning on line 405 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L405

Added line #L405 was not covered by tests
zap.Int64("got", height),
zap.Int64("expected", processedheight+i),
)
Expand Down Expand Up @@ -473,20 +475,20 @@
}
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {

Check warning on line 478 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L478

Added line #L478 was not covered by tests
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
if ht < 0 {
return 0
}
return ht

Check warning on line 484 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L480-L484

Added lines #L480 - L484 were not covered by tests
}

func (icp *IconChainProcessor) SnapshotHeight(height int) {
icp.log.Info("Save height for snapshot", zap.Int("height", height))
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))

Check warning on line 491 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L487-L491

Added lines #L487 - L491 were not covered by tests
}
}

Expand Down Expand Up @@ -545,8 +547,8 @@
icp.verifier.nextProofContext = header.Validators
icp.verifier.verifiedHeight = int64(header.Height())
icp.verifier.prevNetworkSectionHash = types.NewNetworkSection(header.Header).Hash()
icp.log.Debug("Verified block ",
zap.Uint64("height", header.Height()))

Check warning on line 551 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L550-L551

Added lines #L550 - L551 were not covered by tests
return nil
}

Expand Down Expand Up @@ -623,8 +625,8 @@
request.err = errors.Wrapf(err, "event.UnmarshalFromBytes: %v", err)
return
}
icp.log.Info("Detected eventlog ", zap.Int64("height", request.height),
zap.String("eventlog", IconCosmosEventMap[string(el.Indexed[0])]))

Check warning on line 629 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L628-L629

Added lines #L628 - L629 were not covered by tests
eventlogs = append(eventlogs, el)
}

Expand All @@ -650,7 +652,7 @@
request.response.IsProcessed = processed
return
}
request.err = errors.Wrapf(err, "Failed to get btp header: %v", err)

Check warning on line 655 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L655

Added line #L655 was not covered by tests
return
}
request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight))
Expand Down Expand Up @@ -697,6 +699,7 @@
if state, ok := icp.latestClientState[clientID]; ok {
return state, nil
}

cs, err := icp.chainProvider.QueryClientStateWithoutProof(ctx, int64(icp.latestBlock.Height), clientID)
if err != nil {
return provider.ClientState{}, err
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,20 +191,20 @@
//ChainProvider Methods

func (icp *IconProvider) Init(ctx context.Context) error {
// if _, err := os.Stat(icp.PCfg.Keystore); err != nil {
// return err
// }

// ksByte, err := os.ReadFile(icp.PCfg.Keystore)
// if err != nil {
// return err
// }

// wallet, err := wallet.NewFromKeyStore(ksByte, []byte(icp.PCfg.Password))
// if err != nil {
// return err
// }
// icp.AddWallet(wallet)

Check warning on line 207 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L194-L207

Added lines #L194 - L207 were not covered by tests
return nil
}

Expand All @@ -225,7 +225,7 @@
return nil, fmt.Errorf("Blockinterval cannot be empty in Icon config")
}

trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanosecondRatio))
trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanoToMilliRatio))

Check warning on line 228 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L228

Added line #L228 was not covered by tests

return &icon.ClientState{
// In case of Icon: Trusting Period is block Difference // see: light.proto in ibc-integration
Expand Down Expand Up @@ -262,7 +262,7 @@

func (icp *IconProvider) ConnectionProof(ctx context.Context, msgOpenAck provider.ConnectionInfo, height uint64) (provider.ConnectionProof, error) {

connState, err := icp.QueryConnection(ctx, int64(msgOpenAck.Height), msgOpenAck.ConnID)

Check warning on line 265 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L265

Added line #L265 was not covered by tests
if err != nil {
return provider.ConnectionProof{}, err
}
Expand All @@ -273,16 +273,16 @@
}

func (icp *IconProvider) ChannelProof(ctx context.Context, msg provider.ChannelInfo, height uint64) (provider.ChannelProof, error) {

Check warning on line 276 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L276

Added line #L276 was not covered by tests
channelResult, err := icp.QueryChannel(ctx, int64(msg.Height), msg.ChannelID, msg.PortID)
if err != nil {
return provider.ChannelProof{}, nil
}
return provider.ChannelProof{
Proof: channelResult.Proof,
ProofHeight: channelResult.ProofHeight,
Ordering: chantypes.Order(channelResult.Channel.GetOrdering()),
Version: channelResult.Channel.Version,

Check warning on line 285 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L282-L285

Added lines #L282 - L285 were not covered by tests
}, nil
}

Expand Down Expand Up @@ -332,7 +332,7 @@
}
return provider.PacketProof{
Proof: packetAckResponse.Proof,
ProofHeight: packetAckResponse.ProofHeight,

Check warning on line 335 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L335

Added line #L335 was not covered by tests
}, nil

}
Expand Down Expand Up @@ -435,7 +435,7 @@
}

func (icp *IconProvider) Key() string {
return icp.PCfg.Keystore

Check warning on line 438 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L438

Added line #L438 was not covered by tests
}

func (icp *IconProvider) Wallet() (module.Wallet, error) {
Expand All @@ -443,7 +443,7 @@
}

func (icp *IconProvider) Address() (string, error) {
return icp.ShowAddress(icp.PCfg.Keystore)

Check warning on line 446 in relayer/chains/icon/provider.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/provider.go#L446

Added line #L446 was not covered by tests
}

func (icp *IconProvider) Timeout() string {
Expand Down
30 changes: 23 additions & 7 deletions relayer/chains/wasm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"time"

wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
"github.com/avast/retry-go/v4"
abci "github.com/cometbft/cometbft/abci/types"
rpcclient "github.com/cometbft/cometbft/rpc/client"
tmtypes "github.com/cometbft/cometbft/types"
Expand All @@ -19,6 +20,7 @@
"github.com/cosmos/gogoproto/proto"
tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
"github.com/icon-project/IBC-Integration/libraries/go/common/icon"
"go.uber.org/zap"

querytypes "github.com/cosmos/cosmos-sdk/types/query"
bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types"
Expand Down Expand Up @@ -313,33 +315,45 @@
}

func (ap *WasmProvider) QueryClientConsensusState(ctx context.Context, chainHeight int64, clientid string, clientHeight ibcexported.Height) (*clienttypes.QueryConsensusStateResponse, error) {

consensusStateParam, err := types.NewConsensusStateByHeight(clientid, uint64(clientHeight.GetRevisionHeight())).Bytes()

Check warning on line 319 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L318-L319

Added lines #L318 - L319 were not covered by tests
consensusState, err := ap.QueryIBCHandlerContractProcessed(ctx, consensusStateParam)
if err != nil {
return nil, err
}

cdc := codec.NewProtoCodec(ap.Cdc.InterfaceRegistry)
csState, err := clienttypes.UnmarshalConsensusState(cdc, consensusState)
if err != nil {

Check warning on line 327 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L325-L327

Added lines #L325 - L327 were not covered by tests
return nil, err
}

anyConsensusState, err := clienttypes.PackConsensusState(csState)

Check warning on line 331 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L331

Added line #L331 was not covered by tests
if err != nil {
return nil, err
}
return clienttypes.NewQueryConsensusStateResponse(anyConsensusState, nil, clienttypes.NewHeight(0, uint64(chainHeight))), nil
}

func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (*wasmtypes.QuerySmartContractStateResponse, error) {
done := ap.SetSDKContext()
defer done()
return ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{
Address: ap.PCfg.IbcHandlerAddress,
QueryData: param,
})
func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (op *wasmtypes.QuerySmartContractStateResponse, err error) {
return op, retry.Do(func() error {
done := ap.SetSDKContext()
defer done()
op, err = ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{
Address: ap.PCfg.IbcHandlerAddress,
QueryData: param,
})
return err
}, retry.Context(ctx), retry.Attempts(latestHeightQueryRetries), retry.Delay(50*time.Millisecond), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) {
ap.log.Error(
"Failed to query",
zap.Uint("attempt", n+1),
zap.Uint("max_attempts", latestHeightQueryRetries),
zap.Any("Param", param),
zap.Error(err),
)
}))

Check warning on line 355 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L338-L355

Added lines #L338 - L355 were not covered by tests

}

func (ap *WasmProvider) QueryIBCHandlerContractProcessed(ctx context.Context, param wasmtypes.RawContractMessage) ([]byte, error) {
Expand Down Expand Up @@ -492,6 +506,8 @@
}

func (ap *WasmProvider) QueryWasmProof(ctx context.Context, storageKey []byte, height int64) ([]byte, error) {
done := ap.SetSDKContext()
defer done()

Check warning on line 510 in relayer/chains/wasm/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/query.go#L509-L510

Added lines #L509 - L510 were not covered by tests
ibcAddr, err := sdk.AccAddressFromBech32(ap.PCfg.IbcHandlerAddress)
if err != nil {
return nil, err
Expand Down
16 changes: 12 additions & 4 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {

Check warning on line 69 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L69

Added line #L69 was not covered by tests
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -76,7 +76,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 79 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L79

Added line #L79 was not covered by tests
}
}

Expand All @@ -90,6 +90,7 @@
defaultMinQueryLoopDuration = 1 * time.Second
defaultBalanceUpdateWaitDuration = 60 * time.Second
inSyncNumBlocksThreshold = 2
MaxBlockFetch = 100
)

// latestClientState is a map of clientID to the latest clientInfo for that client.
Expand Down Expand Up @@ -176,7 +177,7 @@
// clientState will return the most recent client state if client messages
// have already been observed for the clientID, otherwise it will query for it.
func (ccp *WasmChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) {
if state, ok := ccp.latestClientState[clientID]; ok && state.TrustingPeriod > 0 {
if state, ok := ccp.latestClientState[clientID]; ok {

Check warning on line 180 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L180

Added line #L180 was not covered by tests
return state, nil
}
cs, err := ccp.chainProvider.QueryClientState(ctx, int64(ccp.latestBlock.Height), clientID)
Expand Down Expand Up @@ -221,7 +222,7 @@
func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanosecondRatio)),
minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanoToMilliRatio)),

Check warning on line 225 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L225

Added line #L225 was not covered by tests
lastBalanceUpdate: time.Unix(0, 0),
balanceUpdateWaitDuration: defaultBalanceUpdateWaitDuration,
}
Expand Down Expand Up @@ -293,8 +294,8 @@
select {
case <-ctx.Done():
return nil
case <-ccp.heightSnapshotChan:
ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight))

Check warning on line 298 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L297-L298

Added lines #L297 - L298 were not covered by tests
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
}
Expand All @@ -317,11 +318,11 @@
CounterpartyConnID: c.Counterparty.ConnectionId,
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

ccp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),
)

Check warning on line 325 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L321-L325

Added lines #L321 - L325 were not covered by tests
}
return nil
}
Expand Down Expand Up @@ -350,11 +351,11 @@
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
ccp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 358 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L354-L358

Added lines #L354 - L358 were not covered by tests
}
return nil
}
Expand All @@ -368,16 +369,16 @@
zap.Uint("attempts", latestHeightQueryRetries),
zap.Error(err),
)

// TODO: Save height when node status is false?
// ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))

Check warning on line 374 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L372-L374

Added lines #L372 - L374 were not covered by tests
return nil
}

persistence.latestHeight = status.SyncInfo.LatestBlockHeight
// ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version)

ccp.log.Debug("Queried latest height",

Check warning on line 381 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L381

Added line #L381 was not covered by tests
zap.Int64("latest_height", persistence.latestHeight),
)

Expand Down Expand Up @@ -410,7 +411,14 @@
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
syncUpHeight := func() int64 {
if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch {
return persistence.latestQueriedBlock + MaxBlockFetch
}
return persistence.latestHeight

Check warning on line 418 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L414-L418

Added lines #L414 - L418 were not covered by tests
}

for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ {

Check warning on line 421 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L421

Added line #L421 was not covered by tests
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var lightBlock *types.LightBlock
Expand All @@ -434,7 +442,7 @@
}

if err := ccp.Verify(ctx, lightBlock); err != nil {
ccp.log.Warn("Failed to verify block", zap.Int64("height", blockRes.Height), zap.Error(err))

Check warning on line 445 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L445

Added line #L445 was not covered by tests
return err
}

Expand All @@ -460,7 +468,7 @@
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded)

for _, m := range messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType))
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", heightUint64))

Check warning on line 471 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L471

Added line #L471 was not covered by tests
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
}
Expand Down Expand Up @@ -510,20 +518,20 @@
return nil
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int {

Check warning on line 521 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L521

Added line #L521 was not covered by tests
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
if ht < 0 {
return 0
}
return ht

Check warning on line 527 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L523-L527

Added lines #L523 - L527 were not covered by tests
}

func (ccp *WasmChainProcessor) SnapshotHeight(height int) {
ccp.log.Info("Save height for snapshot", zap.Int("height", height))
err := common.SnapshotHeight(ccp.Provider().ChainId(), height)
if err != nil {
ccp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))

Check warning on line 534 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L530-L534

Added lines #L530 - L534 were not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion relayer/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
ConnectionKey = "connection"
ChannelKey = "channel"
ONE_HOUR = 60 * 60 * 1000
NanosecondRatio = 1000_000
NanoToMilliRatio = 1000_000
)

var (
Expand Down
Loading