diff --git a/config.yaml b/config.yaml index 0ef6372..575ce99 100644 --- a/config.yaml +++ b/config.yaml @@ -35,4 +35,6 @@ profiler: state_contract_cfg: issuer_id: ['', ''] - disable_filtration: true \ No newline at end of file + disable_filtration: true + states_per_request: 15 + max_blocks_per_request: 100 \ No newline at end of file diff --git a/internal/config/states.go b/internal/config/states.go index b12ef2c..c9c6d05 100644 --- a/internal/config/states.go +++ b/internal/config/states.go @@ -7,8 +7,10 @@ import ( ) type StateV2Config struct { - IssuerID []string `fig:"issuer_id"` - DisableFiltration bool `fig:"disable_filtration"` + IssuerID []string `fig:"issuer_id"` + DisableFiltration bool `fig:"disable_filtration"` + StatesPerRequest int64 `fig:"states_per_request"` + MaxBlocksPerRequest uint64 `fig:"max_blocks_per_request"` } func (c *config) States() StateV2Config { diff --git a/internal/rarimo/stateupdate.go b/internal/rarimo/stateupdate.go index a25ce15..2f0a035 100644 --- a/internal/rarimo/stateupdate.go +++ b/internal/rarimo/stateupdate.go @@ -4,9 +4,8 @@ import ( "context" "math/big" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common/hexutil" statebind "github.com/rarimo/evm-identity-saver-svc/pkg/state" oracletypes "github.com/rarimo/rarimo-core/x/oraclemanager/types" @@ -28,11 +27,12 @@ type StateUpdateMessageMaker struct { txCreatorAddr string contract string homeChain string + statesPerRequest int64 stateDataProvider StateDataProvider } -func NewStateUpdateMessageMaker(txCreatorAddr string, contract string, homeChain string, stateDataProvider StateDataProvider) *StateUpdateMessageMaker { - return &StateUpdateMessageMaker{txCreatorAddr: txCreatorAddr, contract: contract, homeChain: homeChain, stateDataProvider: stateDataProvider} +func NewStateUpdateMessageMaker(txCreatorAddr string, contract string, homeChain string, statesPerRequest int64, stateDataProvider StateDataProvider) *StateUpdateMessageMaker { + return &StateUpdateMessageMaker{txCreatorAddr: txCreatorAddr, contract: contract, homeChain: homeChain, statesPerRequest: statesPerRequest, stateDataProvider: stateDataProvider} } func (m *StateUpdateMessageMaker) StateUpdateMsgByBlock(ctx context.Context, issuer, block *big.Int) (*oracletypes.MsgCreateIdentityStateTransferOp, error) { @@ -138,32 +138,32 @@ func (m *StateUpdateMessageMaker) getStatesOnBlock(ctx context.Context, issuer, return nil, nil, errors.Wrap(err, "failed to get overall states count") } - if length.Cmp(big.NewInt(1)) == 0 { - // It is a new state. Only one state transition exist. Ignore that state transition. - // FIXME + if length.Cmp(big.NewInt(m.statesPerRequest)) == 0 { + // We need more states on contract. Ignore that state transition. return nil, nil, nil } + length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest)) + for { states, err := m.stateDataProvider.GetStateInfoHistoryById(&bind.CallOpts{ Context: ctx, - }, issuer, new(big.Int).Sub(length, big.NewInt(2)), big.NewInt(2)) + }, issuer, length, big.NewInt(m.statesPerRequest)) if err != nil { return nil, nil, errors.Wrap(err, "failed to get last two states") } - replacedState := states[0] - latestState := states[1] - - if latestState.CreatedAtBlock.Cmp(block) == 0 { - return &latestState, &replacedState, nil + for i := 1; i < len(states); i++ { + replacedState := states[0] + latestState := states[1] + if latestState.CreatedAtBlock.Cmp(block) == 0 { + return &latestState, &replacedState, nil + } } - // FIXME maybe increase step to reduce RPC calls amount - length.Sub(length, big.NewInt(1)) - - if length.Cmp(big.NewInt(1)) == 0 { + length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest-1)) + if length.Cmp(big.NewInt(1)) <= 0 { return nil, nil, errors.Wrap(err, "requested state on block does not exist") } } @@ -177,32 +177,33 @@ func (m *StateUpdateMessageMaker) getGISTsOnBlock(ctx context.Context, block *bi return nil, nil, errors.Wrap(err, "failed to get overall gists count") } - if length.Cmp(big.NewInt(1)) == 0 { - // Only one state transition exist. Ignore that state transition. - // FIXME + if length.Cmp(big.NewInt(m.statesPerRequest)) == 0 { + // We need more states on contract. Ignore that state transition. return nil, nil, nil } + length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest)) + for { gists, err := m.stateDataProvider.GetGISTRootHistory(&bind.CallOpts{ Context: ctx, - }, new(big.Int).Sub(length, big.NewInt(2)), big.NewInt(2)) + }, length, big.NewInt(m.statesPerRequest)) if err != nil { return nil, nil, errors.Wrap(err, "failed to get last two gists") } - replacedGIST := gists[0] - latestGIST := gists[1] + for i := 1; i < len(gists); i++ { + replacedGIST := gists[i-1] + latestGIST := gists[i] - if latestGIST.CreatedAtBlock.Cmp(block) == 0 { - return &latestGIST, &replacedGIST, nil + if latestGIST.CreatedAtBlock.Cmp(block) == 0 { + return &latestGIST, &replacedGIST, nil + } } - // FIXME maybe increase step to reduce RPC calls amount - length.Sub(length, big.NewInt(1)) - - if length.Cmp(big.NewInt(1)) == 0 { + length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest-1)) + if length.Cmp(big.NewInt(1)) <= 0 { return nil, nil, errors.Wrap(err, "requested gist on block does not exist") } } diff --git a/internal/services/evm/stateChangeListener.go b/internal/services/evm/stateChangeListener.go index 1a16056..78b0940 100644 --- a/internal/services/evm/stateChangeListener.go +++ b/internal/services/evm/stateChangeListener.go @@ -17,8 +17,6 @@ import ( "gitlab.com/distributed_lab/running" ) -const MaxBlocksPerRequest = 100 - func RunStateChangeListener(ctx context.Context, cfg config.Config) { const runnerName = "state_change_listener" @@ -54,11 +52,13 @@ func RunStateChangeListener(ctx context.Context, cfg config.Config) { cfg.Broadcaster().Sender(), cfg.Ethereum().ContractAddr.String(), cfg.Ethereum().NetworkName, + cfg.States().StatesPerRequest, stateData, ), filter: filter, fromBlock: cfg.Ethereum().StartFromBlock, blockWindow: cfg.Ethereum().BlockWindow, + maxBlocks: cfg.States().MaxBlocksPerRequest, } running.WithBackOff(ctx, log, runnerName, @@ -85,6 +85,7 @@ type stateChangeListener struct { filter func(string) bool fromBlock uint64 blockWindow uint64 + maxBlocks uint64 } func (l *stateChangeListener) subscription(ctx context.Context) error { @@ -100,9 +101,9 @@ func (l *stateChangeListener) subscription(ctx context.Context) error { return nil } - if l.fromBlock+MaxBlocksPerRequest < lastBlock { - l.log.Debugf("maxBlockPerRequest limit exceeded: setting last block to %d instead of %d", l.fromBlock+MaxBlocksPerRequest, lastBlock) - lastBlock = l.fromBlock + MaxBlocksPerRequest + if l.fromBlock+l.maxBlocks < lastBlock { + l.log.Debugf("maxBlockPerRequest limit exceeded: setting last block to %d instead of %d", l.fromBlock+l.maxBlocks, lastBlock) + lastBlock = l.fromBlock + l.maxBlocks } l.log.Infof("Starting subscription from %d to %d", l.fromBlock, lastBlock)