Skip to content

Commit

Permalink
Merge pull request #467 from DefiantLabs/feat/rpc-incremental-backoff
Browse files Browse the repository at this point in the history
Feat/rpc incremental backoff for BlockResult querying
  • Loading branch information
pharr117 authored Sep 4, 2023
2 parents b7ab9bb + 2205486 commit 94f5573
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 233 deletions.
23 changes: 4 additions & 19 deletions cmd/index.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"context"
"fmt"
"log"
"math"
Expand All @@ -21,7 +20,6 @@ import (
"github.com/DefiantLabs/cosmos-indexer/rpc"
"github.com/DefiantLabs/cosmos-indexer/tasks"
"github.com/spf13/cobra"
ctypes "github.com/tendermint/tendermint/rpc/core/types"

"gorm.io/gorm"
)
Expand Down Expand Up @@ -515,16 +513,15 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor

config.Log.Infof("Indexing block events from block: %v to %v", startHeight, endHeight)

// TODO: Strip this out of the Osmosis module and make it generalized
rpcClient := osmosis.URIClient{
rpcClient := rpc.URIClient{
Address: idxr.cl.Config.RPCAddr,
Client: &http.Client{},
}

currentHeight := startHeight

for endHeight == -1 || currentHeight <= endHeight {
bresults, err := getBlockResult(rpcClient, currentHeight)
bresults, err := rpc.GetBlockResultWithRetry(rpcClient, currentHeight, idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
if err != nil {
config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", currentHeight), err)
failedBlockHandler(currentHeight, core.FailedBlockEventHandling, err)
Expand Down Expand Up @@ -619,15 +616,15 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor

config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber)

rpcClient := osmosis.URIClient{
rpcClient := rpc.URIClient{
Address: idxr.cl.Config.RPCAddr,
Client: &http.Client{},
}

for _, epoch := range epochsBetween {
config.Log.Infof("Indexing epoch events for epoch %v at height %d", epoch.EpochNumber, epoch.StartHeight)

bresults, err := getBlockResult(rpcClient, int64(epoch.StartHeight))
bresults, err := rpc.GetBlockResultWithRetry(rpcClient, int64(epoch.StartHeight), idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
if err != nil {
config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", epoch.StartHeight), err)
failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
Expand Down Expand Up @@ -689,18 +686,6 @@ func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifi
return epochsBetween, dbResp.Error
}

func getBlockResult(client osmosis.URIClient, height int64) (*ctypes.ResultBlockResults, error) {
brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

bresults, err := client.DoBlockResults(brctx, &height)
if err != nil {
return nil, err
}

return bresults, nil
}

// doDBUpdates will read the data out of the db data chan that had been processed by the workers
// if this is a dry run, we will simply empty the channel and track progress
// otherwise we will index the data in the DB.
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func init() {
rootCmd.PersistentFlags().Int64Var(&conf.Base.WaitForChainDelay, "base.wait-for-chain-delay", 10, "seconds to wait between each check for node to catch up to the chain")
rootCmd.PersistentFlags().Int64Var(&conf.Base.BlockTimer, "base.block-timer", 10000, "print out how long it takes to process this many blocks")
rootCmd.PersistentFlags().BoolVar(&conf.Base.ExitWhenCaughtUp, "base.exit-when-caught-up", true, "mainly used for Osmosis rewards indexing")
rootCmd.PersistentFlags().Int64Var(&conf.Base.RPCRetryAttempts, "base.rpc-retry-attempts", 0, "number of RPC query retries to make")
rootCmd.PersistentFlags().Uint64Var(&conf.Base.RPCRetryMaxWait, "base.rpc-retry-max-wait", 30, "max retry incremental backoff wait time in seconds")

// Lens
rootCmd.PersistentFlags().StringVar(&conf.Lens.RPC, "lens.rpc", "", "node rpc endpoint")
Expand Down
2 changes: 2 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ epoch-events-end-epoch=752
dry = false # if true, indexing will occur but data will not be written to the database.
api = "" # node api endpoint
rpc-workers = 1
rpc-retry-attempts=0 #RPC queries are configured to retry if failed. This value sets how many retries to do before giving up. (-1 for indefinite retries)
rpc-retry-max-wait=30 #RPC query failure backoff max wait time in seconds

#Lens config options
[lens]
Expand Down
2 changes: 2 additions & 0 deletions config/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ type base struct {
EpochIndexingIdentifier string `mapstructure:"epoch-indexing-identifier"`
EpochEventsStartEpoch int64 `mapstructure:"epoch-events-start-epoch"`
EpochEventsEndEpoch int64 `mapstructure:"epoch-events-end-epoch"`
RPCRetryAttempts int64 `mapstructure:"rpc-retry-attempts"`
RPCRetryMaxWait uint64 `mapstructure:"rpc-retry-max-wait"`
}

type log struct {
Expand Down
179 changes: 0 additions & 179 deletions osmosis/helpers.go
Original file line number Diff line number Diff line change
@@ -1,180 +1 @@
package osmosis

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"reflect"

tmjson "github.com/tendermint/tendermint/libs/json"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
jsonrpc "github.com/tendermint/tendermint/rpc/jsonrpc/client"
types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)

func DoHTTPReq(url string, authHeader string) (*http.Response, error) {
// Send req using http Client
client := &http.Client{}
req, _ := http.NewRequest("GET", url, nil)
req.Header.Add("Authorization", authHeader)
return client.Do(req)
}

func argsToURLValues(args map[string]interface{}) (url.Values, error) {
values := make(url.Values)
if len(args) == 0 {
return values, nil
}

err := argsToJSON(args)
if err != nil {
return nil, err
}

for key, val := range args {
values.Set(key, val.(string))
}

return values, nil
}

func argsToJSON(args map[string]interface{}) error {
for k, v := range args {
rt := reflect.TypeOf(v)
isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8
if isByteSlice {
bytes := reflect.ValueOf(v).Bytes()
args[k] = fmt.Sprintf("0x%X", bytes)
continue
}

data, err := tmjson.Marshal(v)
if err != nil {
return err
}
args[k] = string(data)
}
return nil
}

// Call issues a POST form HTTP request.
func (c *URIClient) DoHTTPGet(ctx context.Context, method string, params map[string]interface{}, result interface{}) (interface{}, error) {
values, err := argsToURLValues(params)
if err != nil {
return nil, fmt.Errorf("failed to encode params: %w", err)
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.Address+"/"+method, nil)
if err != nil {
return nil, fmt.Errorf("error creating new request: %w", err)
}

req.URL.RawQuery = values.Encode()
// fmt.Printf("Query string: %s\n", values.Encode())

// req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if c.AuthHeader != "" {
req.Header.Add("Authorization", c.AuthHeader)
}

resp, err := c.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("get: %w", err)
}
defer resp.Body.Close()

responseBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response body: %w", err)
}

return unmarshalResponseBytes(responseBytes, jsonrpc.URIClientRequestID, result)
}

// From the JSON-RPC 2.0 spec:
// id: It MUST be the same as the value of the id member in the Request Object.
func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error {
if err := validateResponseID(res.ID); err != nil {
return err
}
if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type
return fmt.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID)
}
return nil
}

func validateResponseID(id interface{}) error {
if id == nil {
return errors.New("no ID")
}
_, ok := id.(types.JSONRPCIntID)
if !ok {
return fmt.Errorf("expected JSONRPCIntID, but got: %T", id)
}
return nil
}

func unmarshalResponseBytes(responseBytes []byte, expectedID types.JSONRPCIntID, result interface{}) (interface{}, error) {
// Read response. If rpc/core/types is imported, the result will unmarshal
// into the correct type.
response := &types.RPCResponse{}
if err := json.Unmarshal(responseBytes, response); err != nil {
return nil, fmt.Errorf("error unmarshalling: %w", err)
}

if response.Error != nil {
return nil, response.Error
}

if err := validateAndVerifyID(response, expectedID); err != nil {
return nil, fmt.Errorf("wrong ID: %w", err)
}

// Unmarshal the RawMessage into the result.
if err := tmjson.Unmarshal(response.Result, result); err != nil {
return nil, fmt.Errorf("error unmarshalling result: %w", err)
}

return result, nil
}

func (c *URIClient) DoBlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) {
result := new(ctypes.ResultBlockSearch)
params := map[string]interface{}{
"query": query,
"order_by": orderBy,
}

if page != nil {
params["page"] = page
}
if perPage != nil {
params["per_page"] = perPage
}

_, err := c.DoHTTPGet(ctx, "block_search", params, result)
if err != nil {
return nil, err
}

return result, nil
}

func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
result := new(ctypes.ResultBlockResults)
params := make(map[string]interface{})
if height != nil {
params["height"] = height
}

_, err := c.DoHTTPGet(ctx, "block_results", params, result)
if err != nil {
return nil, err
}

return result, nil
}
35 changes: 0 additions & 35 deletions osmosis/types.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,6 @@
package osmosis

import (
"net/http"
"time"
)

const (
ChainID = "osmosis-1"
Name = "osmosis"
)

type Result struct {
Data Data
}

type Data struct {
Value EventDataNewBlockHeader
}

type EventDataNewBlockHeader struct {
Header Header `json:"header"`
NumTxs string `json:"num_txs"` // Number of txs in a block
}

type Header struct {
// basic block info
ChainID string `json:"chain_id"`
Height string `json:"height"`
Time time.Time `json:"time"`
}

type TendermintNewBlockHeader struct {
Result Result
}

type URIClient struct {
Address string
Client *http.Client
AuthHeader string
}
Loading

0 comments on commit 94f5573

Please sign in to comment.