From 80f5f5a455f360695867aba86ea13997bcfafb0f Mon Sep 17 00:00:00 2001 From: pharr117 Date: Sun, 3 Sep 2023 13:26:06 -0400 Subject: [PATCH 1/2] Refactor RPC server HTTP request codebase to move it into rpc package instead of osmosis package --- cmd/index.go | 6 +- osmosis/helpers.go | 179 --------------------------------------------- osmosis/types.go | 35 --------- rpc/blocks.go | 175 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 178 insertions(+), 217 deletions(-) create mode 100644 rpc/blocks.go diff --git a/cmd/index.go b/cmd/index.go index 0aeb26e4..0a556f2f 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -516,7 +516,7 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor config.Log.Infof("Indexing block events from block: %v to %v", startHeight, endHeight) // TODO: Strip this out of the Osmosis module and make it generalized - rpcClient := osmosis.URIClient{ + rpcClient := rpc.URIClient{ Address: idxr.cl.Config.RPCAddr, Client: &http.Client{}, } @@ -619,7 +619,7 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber) - rpcClient := osmosis.URIClient{ + rpcClient := rpc.URIClient{ Address: idxr.cl.Config.RPCAddr, Client: &http.Client{}, } @@ -689,7 +689,7 @@ func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifi return epochsBetween, dbResp.Error } -func getBlockResult(client osmosis.URIClient, height int64) (*ctypes.ResultBlockResults, error) { +func getBlockResult(client rpc.URIClient, height int64) (*ctypes.ResultBlockResults, error) { brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) defer cancel() diff --git a/osmosis/helpers.go b/osmosis/helpers.go index 2d2e8b35..110ffdf4 100644 --- a/osmosis/helpers.go +++ b/osmosis/helpers.go @@ -1,180 +1 @@ package osmosis - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "net/url" - "reflect" - - tmjson "github.com/tendermint/tendermint/libs/json" - ctypes "github.com/tendermint/tendermint/rpc/core/types" - jsonrpc "github.com/tendermint/tendermint/rpc/jsonrpc/client" - types "github.com/tendermint/tendermint/rpc/jsonrpc/types" -) - -func DoHTTPReq(url string, authHeader string) (*http.Response, error) { - // Send req using http Client - client := &http.Client{} - req, _ := http.NewRequest("GET", url, nil) - req.Header.Add("Authorization", authHeader) - return client.Do(req) -} - -func argsToURLValues(args map[string]interface{}) (url.Values, error) { - values := make(url.Values) - if len(args) == 0 { - return values, nil - } - - err := argsToJSON(args) - if err != nil { - return nil, err - } - - for key, val := range args { - values.Set(key, val.(string)) - } - - return values, nil -} - -func argsToJSON(args map[string]interface{}) error { - for k, v := range args { - rt := reflect.TypeOf(v) - isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 - if isByteSlice { - bytes := reflect.ValueOf(v).Bytes() - args[k] = fmt.Sprintf("0x%X", bytes) - continue - } - - data, err := tmjson.Marshal(v) - if err != nil { - return err - } - args[k] = string(data) - } - return nil -} - -// Call issues a POST form HTTP request. -func (c *URIClient) DoHTTPGet(ctx context.Context, method string, params map[string]interface{}, result interface{}) (interface{}, error) { - values, err := argsToURLValues(params) - if err != nil { - return nil, fmt.Errorf("failed to encode params: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.Address+"/"+method, nil) - if err != nil { - return nil, fmt.Errorf("error creating new request: %w", err) - } - - req.URL.RawQuery = values.Encode() - // fmt.Printf("Query string: %s\n", values.Encode()) - - // req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - if c.AuthHeader != "" { - req.Header.Add("Authorization", c.AuthHeader) - } - - resp, err := c.Client.Do(req) - if err != nil { - return nil, fmt.Errorf("get: %w", err) - } - defer resp.Body.Close() - - responseBytes, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("read response body: %w", err) - } - - return unmarshalResponseBytes(responseBytes, jsonrpc.URIClientRequestID, result) -} - -// From the JSON-RPC 2.0 spec: -// id: It MUST be the same as the value of the id member in the Request Object. -func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error { - if err := validateResponseID(res.ID); err != nil { - return err - } - if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type - return fmt.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID) - } - return nil -} - -func validateResponseID(id interface{}) error { - if id == nil { - return errors.New("no ID") - } - _, ok := id.(types.JSONRPCIntID) - if !ok { - return fmt.Errorf("expected JSONRPCIntID, but got: %T", id) - } - return nil -} - -func unmarshalResponseBytes(responseBytes []byte, expectedID types.JSONRPCIntID, result interface{}) (interface{}, error) { - // Read response. If rpc/core/types is imported, the result will unmarshal - // into the correct type. - response := &types.RPCResponse{} - if err := json.Unmarshal(responseBytes, response); err != nil { - return nil, fmt.Errorf("error unmarshalling: %w", err) - } - - if response.Error != nil { - return nil, response.Error - } - - if err := validateAndVerifyID(response, expectedID); err != nil { - return nil, fmt.Errorf("wrong ID: %w", err) - } - - // Unmarshal the RawMessage into the result. - if err := tmjson.Unmarshal(response.Result, result); err != nil { - return nil, fmt.Errorf("error unmarshalling result: %w", err) - } - - return result, nil -} - -func (c *URIClient) DoBlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { - result := new(ctypes.ResultBlockSearch) - params := map[string]interface{}{ - "query": query, - "order_by": orderBy, - } - - if page != nil { - params["page"] = page - } - if perPage != nil { - params["per_page"] = perPage - } - - _, err := c.DoHTTPGet(ctx, "block_search", params, result) - if err != nil { - return nil, err - } - - return result, nil -} - -func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { - result := new(ctypes.ResultBlockResults) - params := make(map[string]interface{}) - if height != nil { - params["height"] = height - } - - _, err := c.DoHTTPGet(ctx, "block_results", params, result) - if err != nil { - return nil, err - } - - return result, nil -} diff --git a/osmosis/types.go b/osmosis/types.go index dc287dfe..52414efe 100644 --- a/osmosis/types.go +++ b/osmosis/types.go @@ -1,41 +1,6 @@ package osmosis -import ( - "net/http" - "time" -) - const ( ChainID = "osmosis-1" Name = "osmosis" ) - -type Result struct { - Data Data -} - -type Data struct { - Value EventDataNewBlockHeader -} - -type EventDataNewBlockHeader struct { - Header Header `json:"header"` - NumTxs string `json:"num_txs"` // Number of txs in a block -} - -type Header struct { - // basic block info - ChainID string `json:"chain_id"` - Height string `json:"height"` - Time time.Time `json:"time"` -} - -type TendermintNewBlockHeader struct { - Result Result -} - -type URIClient struct { - Address string - Client *http.Client - AuthHeader string -} diff --git a/rpc/blocks.go b/rpc/blocks.go new file mode 100644 index 00000000..9ea4b55c --- /dev/null +++ b/rpc/blocks.go @@ -0,0 +1,175 @@ +package rpc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "reflect" + + tmjson "github.com/tendermint/tendermint/libs/json" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + jsonrpc "github.com/tendermint/tendermint/rpc/jsonrpc/client" + types "github.com/tendermint/tendermint/rpc/jsonrpc/types" +) + +func argsToURLValues(args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + + err := argsToJSON(args) + if err != nil { + return nil, err + } + + for key, val := range args { + values.Set(key, val.(string)) + } + + return values, nil +} + +func argsToJSON(args map[string]interface{}) error { + for k, v := range args { + rt := reflect.TypeOf(v) + isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 + if isByteSlice { + bytes := reflect.ValueOf(v).Bytes() + args[k] = fmt.Sprintf("0x%X", bytes) + continue + } + + data, err := tmjson.Marshal(v) + if err != nil { + return err + } + args[k] = string(data) + } + return nil +} + +func (c *URIClient) DoHTTPGet(ctx context.Context, method string, params map[string]interface{}, result interface{}) (interface{}, error) { + values, err := argsToURLValues(params) + if err != nil { + return nil, fmt.Errorf("failed to encode params: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.Address+"/"+method, nil) + if err != nil { + return nil, fmt.Errorf("error creating new request: %w", err) + } + + req.URL.RawQuery = values.Encode() + // fmt.Printf("Query string: %s\n", values.Encode()) + + // req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + if c.AuthHeader != "" { + req.Header.Add("Authorization", c.AuthHeader) + } + + resp, err := c.Client.Do(req) + if err != nil { + return nil, fmt.Errorf("get: %w", err) + } + defer resp.Body.Close() + + responseBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response body: %w", err) + } + + return unmarshalResponseBytes(responseBytes, jsonrpc.URIClientRequestID, result) +} + +type URIClient struct { + Address string + Client *http.Client + AuthHeader string +} + +func unmarshalResponseBytes(responseBytes []byte, expectedID types.JSONRPCIntID, result interface{}) (interface{}, error) { + // Read response. If rpc/core/types is imported, the result will unmarshal + // into the correct type. + response := &types.RPCResponse{} + if err := json.Unmarshal(responseBytes, response); err != nil { + return nil, fmt.Errorf("error unmarshalling: %w", err) + } + + if response.Error != nil { + return nil, response.Error + } + + if err := validateAndVerifyID(response, expectedID); err != nil { + return nil, fmt.Errorf("wrong ID: %w", err) + } + + // Unmarshal the RawMessage into the result. + if err := tmjson.Unmarshal(response.Result, result); err != nil { + return nil, fmt.Errorf("error unmarshalling result: %w", err) + } + + return result, nil +} + +func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error { + if err := validateResponseID(res.ID); err != nil { + return err + } + if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type + return fmt.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID) + } + return nil +} + +func validateResponseID(id interface{}) error { + if id == nil { + return errors.New("no ID") + } + _, ok := id.(types.JSONRPCIntID) + if !ok { + return fmt.Errorf("expected JSONRPCIntID, but got: %T", id) + } + return nil +} + +func (c *URIClient) DoBlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { + result := new(ctypes.ResultBlockSearch) + params := map[string]interface{}{ + "query": query, + "order_by": orderBy, + } + + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + + _, err := c.DoHTTPGet(ctx, "block_search", params, result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { + result := new(ctypes.ResultBlockResults) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + + _, err := c.DoHTTPGet(ctx, "block_results", params, result) + if err != nil { + return nil, err + } + + return result, nil +} From 2205486be1637e0b78703c6dc25a39493b95fcbb Mon Sep 17 00:00:00 2001 From: pharr117 Date: Sun, 3 Sep 2023 17:55:30 -0400 Subject: [PATCH 2/2] Add the following: * New configs for RPC retry values * Function for exponential backoff calculation to determine how long to wait based on a number of attempts * Backoff wrapper around GetBlockResult for use in the event and epoch indexer --- cmd/index.go | 19 ++----------- cmd/root.go | 2 ++ config.toml.example | 2 ++ config/app_config.go | 2 ++ rpc/blocks.go | 68 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 17 deletions(-) diff --git a/cmd/index.go b/cmd/index.go index 0a556f2f..d869bcb2 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -1,7 +1,6 @@ package cmd import ( - "context" "fmt" "log" "math" @@ -21,7 +20,6 @@ import ( "github.com/DefiantLabs/cosmos-indexer/rpc" "github.com/DefiantLabs/cosmos-indexer/tasks" "github.com/spf13/cobra" - ctypes "github.com/tendermint/tendermint/rpc/core/types" "gorm.io/gorm" ) @@ -515,7 +513,6 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor config.Log.Infof("Indexing block events from block: %v to %v", startHeight, endHeight) - // TODO: Strip this out of the Osmosis module and make it generalized rpcClient := rpc.URIClient{ Address: idxr.cl.Config.RPCAddr, Client: &http.Client{}, @@ -524,7 +521,7 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor currentHeight := startHeight for endHeight == -1 || currentHeight <= endHeight { - bresults, err := getBlockResult(rpcClient, currentHeight) + bresults, err := rpc.GetBlockResultWithRetry(rpcClient, currentHeight, idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait) if err != nil { config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", currentHeight), err) failedBlockHandler(currentHeight, core.FailedBlockEventHandling, err) @@ -627,7 +624,7 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor for _, epoch := range epochsBetween { config.Log.Infof("Indexing epoch events for epoch %v at height %d", epoch.EpochNumber, epoch.StartHeight) - bresults, err := getBlockResult(rpcClient, int64(epoch.StartHeight)) + bresults, err := rpc.GetBlockResultWithRetry(rpcClient, int64(epoch.StartHeight), idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait) if err != nil { config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", epoch.StartHeight), err) failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err) @@ -689,18 +686,6 @@ func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifi return epochsBetween, dbResp.Error } -func getBlockResult(client rpc.URIClient, height int64) (*ctypes.ResultBlockResults, error) { - brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) - defer cancel() - - bresults, err := client.DoBlockResults(brctx, &height) - if err != nil { - return nil, err - } - - return bresults, nil -} - // doDBUpdates will read the data out of the db data chan that had been processed by the workers // if this is a dry run, we will simply empty the channel and track progress // otherwise we will index the data in the DB. diff --git a/cmd/root.go b/cmd/root.go index 43729acf..0cd2f62e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -68,6 +68,8 @@ func init() { rootCmd.PersistentFlags().Int64Var(&conf.Base.WaitForChainDelay, "base.wait-for-chain-delay", 10, "seconds to wait between each check for node to catch up to the chain") rootCmd.PersistentFlags().Int64Var(&conf.Base.BlockTimer, "base.block-timer", 10000, "print out how long it takes to process this many blocks") rootCmd.PersistentFlags().BoolVar(&conf.Base.ExitWhenCaughtUp, "base.exit-when-caught-up", true, "mainly used for Osmosis rewards indexing") + rootCmd.PersistentFlags().Int64Var(&conf.Base.RPCRetryAttempts, "base.rpc-retry-attempts", 0, "number of RPC query retries to make") + rootCmd.PersistentFlags().Uint64Var(&conf.Base.RPCRetryMaxWait, "base.rpc-retry-max-wait", 30, "max retry incremental backoff wait time in seconds") // Lens rootCmd.PersistentFlags().StringVar(&conf.Lens.RPC, "lens.rpc", "", "node rpc endpoint") diff --git a/config.toml.example b/config.toml.example index 80615bf8..98924748 100644 --- a/config.toml.example +++ b/config.toml.example @@ -26,6 +26,8 @@ epoch-events-end-epoch=752 dry = false # if true, indexing will occur but data will not be written to the database. api = "" # node api endpoint rpc-workers = 1 +rpc-retry-attempts=0 #RPC queries are configured to retry if failed. This value sets how many retries to do before giving up. (-1 for indefinite retries) +rpc-retry-max-wait=30 #RPC query failure backoff max wait time in seconds #Lens config options [lens] diff --git a/config/app_config.go b/config/app_config.go index 9e9d9f75..e9b71521 100644 --- a/config/app_config.go +++ b/config/app_config.go @@ -169,6 +169,8 @@ type base struct { EpochIndexingIdentifier string `mapstructure:"epoch-indexing-identifier"` EpochEventsStartEpoch int64 `mapstructure:"epoch-events-start-epoch"` EpochEventsEndEpoch int64 `mapstructure:"epoch-events-end-epoch"` + RPCRetryAttempts int64 `mapstructure:"rpc-retry-attempts"` + RPCRetryMaxWait uint64 `mapstructure:"rpc-retry-max-wait"` } type log struct { diff --git a/rpc/blocks.go b/rpc/blocks.go index 9ea4b55c..d5cf591c 100644 --- a/rpc/blocks.go +++ b/rpc/blocks.go @@ -6,10 +6,13 @@ import ( "errors" "fmt" "io" + "math" "net/http" "net/url" "reflect" + "time" + "github.com/DefiantLabs/cosmos-indexer/config" tmjson "github.com/tendermint/tendermint/libs/json" ctypes "github.com/tendermint/tendermint/rpc/core/types" jsonrpc "github.com/tendermint/tendermint/rpc/jsonrpc/client" @@ -173,3 +176,68 @@ func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes. return result, nil } + +func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults, error) { + brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() + + bresults, err := client.DoBlockResults(brctx, &height) + if err != nil { + return nil, err + } + + return bresults, nil +} + +func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*ctypes.ResultBlockResults, error) { + if retryMaxAttempts == 0 { + return GetBlockResult(client, height) + } + + if retryMaxWaitSeconds < 2 { + retryMaxWaitSeconds = 2 + } + + var attempts int64 + maxRetryTime := time.Duration(retryMaxWaitSeconds) * time.Second + if maxRetryTime < 0 { + config.Log.Warn("Detected maxRetryTime overflow, setting time to sane maximum of 30s") + maxRetryTime = 30 * time.Second + } + + currentBackoffDuration, maxReached := getBackoffDurationForAttempts(attempts, maxRetryTime) + + for { + resp, err := GetBlockResult(client, height) + attempts++ + if err != nil && (retryMaxAttempts < 0 || (attempts <= retryMaxAttempts)) { + config.Log.Error("Error getting RPC response, backing off and trying again", err) + config.Log.Debugf("Attempt %d with wait time %+v", attempts, currentBackoffDuration) + time.Sleep(currentBackoffDuration) + + // guard against overflow + if !maxReached { + currentBackoffDuration, maxReached = getBackoffDurationForAttempts(attempts, maxRetryTime) + } + + } else { + if err != nil { + config.Log.Error("Error getting RPC response, reached max retry attempts") + } + return resp, err + } + } +} + +func getBackoffDurationForAttempts(numAttempts int64, maxRetryTime time.Duration) (time.Duration, bool) { + backoffBase := 1.5 + backoffDuration := time.Duration(math.Pow(backoffBase, float64(numAttempts)) * float64(time.Second)) + + maxReached := false + if backoffDuration > maxRetryTime || backoffDuration < 0 { + maxReached = true + backoffDuration = maxRetryTime + } + + return backoffDuration, maxReached +}