Skip to content

Commit

Permalink
fixed fetching states and gists to reduce RCP calls
Browse files Browse the repository at this point in the history
  • Loading branch information
olegfomenko committed Dec 5, 2023
1 parent 6157053 commit f1b6ab9
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 37 deletions.
4 changes: 3 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ profiler:

state_contract_cfg:
issuer_id: ['', '']
disable_filtration: true
disable_filtration: true
states_per_request: 15
max_blocks_per_request: 100
6 changes: 4 additions & 2 deletions internal/config/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
)

type StateV2Config struct {
IssuerID []string `fig:"issuer_id"`
DisableFiltration bool `fig:"disable_filtration"`
IssuerID []string `fig:"issuer_id"`
DisableFiltration bool `fig:"disable_filtration"`
StatesPerRequest int64 `fig:"states_per_request"`
MaxBlocksPerRequest uint64 `fig:"max_blocks_per_request"`
}

func (c *config) States() StateV2Config {
Expand Down
59 changes: 30 additions & 29 deletions internal/rarimo/stateupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"

statebind "github.com/rarimo/evm-identity-saver-svc/pkg/state"
oracletypes "github.com/rarimo/rarimo-core/x/oraclemanager/types"
Expand All @@ -28,11 +27,12 @@ type StateUpdateMessageMaker struct {
txCreatorAddr string
contract string
homeChain string
statesPerRequest int64
stateDataProvider StateDataProvider
}

func NewStateUpdateMessageMaker(txCreatorAddr string, contract string, homeChain string, stateDataProvider StateDataProvider) *StateUpdateMessageMaker {
return &StateUpdateMessageMaker{txCreatorAddr: txCreatorAddr, contract: contract, homeChain: homeChain, stateDataProvider: stateDataProvider}
func NewStateUpdateMessageMaker(txCreatorAddr string, contract string, homeChain string, statesPerRequest int64, stateDataProvider StateDataProvider) *StateUpdateMessageMaker {
return &StateUpdateMessageMaker{txCreatorAddr: txCreatorAddr, contract: contract, homeChain: homeChain, statesPerRequest: statesPerRequest, stateDataProvider: stateDataProvider}
}

func (m *StateUpdateMessageMaker) StateUpdateMsgByBlock(ctx context.Context, issuer, block *big.Int) (*oracletypes.MsgCreateIdentityStateTransferOp, error) {
Expand Down Expand Up @@ -138,32 +138,32 @@ func (m *StateUpdateMessageMaker) getStatesOnBlock(ctx context.Context, issuer,
return nil, nil, errors.Wrap(err, "failed to get overall states count")
}

if length.Cmp(big.NewInt(1)) == 0 {
// It is a new state. Only one state transition exist. Ignore that state transition.
// FIXME
if length.Cmp(big.NewInt(m.statesPerRequest)) == 0 {
// We need more states on contract. Ignore that state transition.
return nil, nil, nil
}

length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest))

for {
states, err := m.stateDataProvider.GetStateInfoHistoryById(&bind.CallOpts{
Context: ctx,
}, issuer, new(big.Int).Sub(length, big.NewInt(2)), big.NewInt(2))
}, issuer, length, big.NewInt(m.statesPerRequest))

if err != nil {
return nil, nil, errors.Wrap(err, "failed to get last two states")
}

replacedState := states[0]
latestState := states[1]

if latestState.CreatedAtBlock.Cmp(block) == 0 {
return &latestState, &replacedState, nil
for i := 1; i < len(states); i++ {
replacedState := states[0]
latestState := states[1]
if latestState.CreatedAtBlock.Cmp(block) == 0 {
return &latestState, &replacedState, nil
}
}

// FIXME maybe increase step to reduce RPC calls amount
length.Sub(length, big.NewInt(1))

if length.Cmp(big.NewInt(1)) == 0 {
length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest-1))
if length.Cmp(big.NewInt(1)) <= 0 {
return nil, nil, errors.Wrap(err, "requested state on block does not exist")
}
}
Expand All @@ -177,32 +177,33 @@ func (m *StateUpdateMessageMaker) getGISTsOnBlock(ctx context.Context, block *bi
return nil, nil, errors.Wrap(err, "failed to get overall gists count")
}

if length.Cmp(big.NewInt(1)) == 0 {
// Only one state transition exist. Ignore that state transition.
// FIXME
if length.Cmp(big.NewInt(m.statesPerRequest)) == 0 {
// We need more states on contract. Ignore that state transition.
return nil, nil, nil
}

length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest))

for {
gists, err := m.stateDataProvider.GetGISTRootHistory(&bind.CallOpts{
Context: ctx,
}, new(big.Int).Sub(length, big.NewInt(2)), big.NewInt(2))
}, length, big.NewInt(m.statesPerRequest))

if err != nil {
return nil, nil, errors.Wrap(err, "failed to get last two gists")
}

replacedGIST := gists[0]
latestGIST := gists[1]
for i := 1; i < len(gists); i++ {
replacedGIST := gists[i-1]
latestGIST := gists[i]

if latestGIST.CreatedAtBlock.Cmp(block) == 0 {
return &latestGIST, &replacedGIST, nil
if latestGIST.CreatedAtBlock.Cmp(block) == 0 {
return &latestGIST, &replacedGIST, nil
}
}

// FIXME maybe increase step to reduce RPC calls amount
length.Sub(length, big.NewInt(1))

if length.Cmp(big.NewInt(1)) == 0 {
length = new(big.Int).Sub(length, big.NewInt(m.statesPerRequest-1))
if length.Cmp(big.NewInt(1)) <= 0 {
return nil, nil, errors.Wrap(err, "requested gist on block does not exist")
}
}
Expand Down
11 changes: 6 additions & 5 deletions internal/services/evm/stateChangeListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"gitlab.com/distributed_lab/running"
)

const MaxBlocksPerRequest = 100

func RunStateChangeListener(ctx context.Context, cfg config.Config) {
const runnerName = "state_change_listener"

Expand Down Expand Up @@ -54,11 +52,13 @@ func RunStateChangeListener(ctx context.Context, cfg config.Config) {
cfg.Broadcaster().Sender(),
cfg.Ethereum().ContractAddr.String(),
cfg.Ethereum().NetworkName,
cfg.States().StatesPerRequest,
stateData,
),
filter: filter,
fromBlock: cfg.Ethereum().StartFromBlock,
blockWindow: cfg.Ethereum().BlockWindow,
maxBlocks: cfg.States().MaxBlocksPerRequest,
}

running.WithBackOff(ctx, log, runnerName,
Expand All @@ -85,6 +85,7 @@ type stateChangeListener struct {
filter func(string) bool
fromBlock uint64
blockWindow uint64
maxBlocks uint64
}

func (l *stateChangeListener) subscription(ctx context.Context) error {
Expand All @@ -100,9 +101,9 @@ func (l *stateChangeListener) subscription(ctx context.Context) error {
return nil
}

if l.fromBlock+MaxBlocksPerRequest < lastBlock {
l.log.Debugf("maxBlockPerRequest limit exceeded: setting last block to %d instead of %d", l.fromBlock+MaxBlocksPerRequest, lastBlock)
lastBlock = l.fromBlock + MaxBlocksPerRequest
if l.fromBlock+l.maxBlocks < lastBlock {
l.log.Debugf("maxBlockPerRequest limit exceeded: setting last block to %d instead of %d", l.fromBlock+l.maxBlocks, lastBlock)
lastBlock = l.fromBlock + l.maxBlocks
}

l.log.Infof("Starting subscription from %d to %d", l.fromBlock, lastBlock)
Expand Down

0 comments on commit f1b6ab9

Please sign in to comment.