Skip to content

Commit

Permalink
Fix: reinitialization periodic networks after reset (#1002)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Nov 8, 2023
1 parent f14d399 commit ae8eca6
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 14 deletions.
46 changes: 40 additions & 6 deletions cmd/indexer/indexer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"time"

"github.com/baking-bad/bcdhub/internal/models"
"github.com/baking-bad/bcdhub/internal/models/block"
Expand Down Expand Up @@ -39,22 +40,55 @@ func (initializer Initializer) Init(ctx context.Context) error {
// check first block in node and in database, compare its hash.
// if hash is differed new periodic chain was started.
log.Info().Str("network", initializer.network.String()).Msg("checking for new periodic chain...")
blockHash, err := initializer.rpc.BlockHash(ctx, 1)

var (
notRunning = true
)

for notRunning {
header, err := initializer.rpc.GetHead(ctx)
if err != nil {
return err
}
notRunning = header.Level == 0
log.Info().Bool("running", !notRunning).Str("network", initializer.network.String()).Msg("chain status")
if notRunning {
time.Sleep(time.Second * 10)
}
}

header, err := initializer.rpc.GetHeader(ctx, 1)
if err != nil {
return err
}

firstBlock, err := initializer.block.Get(ctx, 1)
log.Info().Str("node_hash", blockHash).Str("indexer_hash", firstBlock.Hash).Msg("checking first block hash...")
if err == nil && firstBlock.Hash != blockHash {
if err != nil {
return nil
}

log.Info().
Str("network", initializer.network.String()).
Str("node_hash", header.Hash).
Str("indexer_hash", firstBlock.Hash).
Msg("checking first block hash...")
if firstBlock.Hash != header.Hash {
log.Info().Str("network", initializer.network.String()).Msg("found new periodic chain")
log.Warn().Str("network", initializer.network.String()).Msg("drop database...")
if err := initializer.repo.Drop(ctx); err != nil {
if err := initializer.drop(ctx); err != nil {
return err
}
log.Warn().Str("network", initializer.network.String()).Msg("database was dropped")
}
}
}

return initializer.repo.InitDatabase(ctx)
}

func (initializer Initializer) drop(ctx context.Context) error {
log.Warn().Str("network", initializer.network.String()).Msg("drop database...")
if err := initializer.repo.Drop(ctx); err != nil {
return err
}
log.Warn().Str("network", initializer.network.String()).Msg("database was dropped")
return nil
}
3 changes: 1 addition & 2 deletions internal/helpers/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package helpers
import (
"net/url"
"os"
"path"
"path/filepath"
"strings"
"unicode"
Expand Down Expand Up @@ -33,7 +32,7 @@ func URLJoin(baseURL, queryPath string) (string, error) {
if err != nil {
return "", err
}
u.Path = path.Join(u.Path, queryPath)
u = u.JoinPath(queryPath)
return u.String(), nil
}

Expand Down
12 changes: 7 additions & 5 deletions internal/noderpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ func NewWaitNodeRPC(baseURL string, opts ...NodeOption) *NodeRPC {
return node
}

func (rpc *NodeRPC) checkStatusCode(r io.Reader, statusCode int, checkStatusCode bool) error {
func (rpc *NodeRPC) checkStatusCode(r io.Reader, statusCode int, checkStatusCode bool, uri string) error {
switch {
case statusCode == http.StatusOK:
return nil
case statusCode == http.StatusNotFound:
return errors.Errorf("%s: not found", uri)
case statusCode > http.StatusInternalServerError:
return NewNodeUnavailiableError(rpc.baseURL, statusCode)
case checkStatusCode:
Expand All @@ -107,7 +109,7 @@ func (rpc *NodeRPC) checkStatusCode(r io.Reader, statusCode int, checkStatusCode
}

func (rpc *NodeRPC) parseResponse(r io.Reader, statusCode int, checkStatusCode bool, uri string, response interface{}) error {
if err := rpc.checkStatusCode(r, statusCode, checkStatusCode); err != nil {
if err := rpc.checkStatusCode(r, statusCode, checkStatusCode, uri); err != nil {
return fmt.Errorf("%w (%s): %w", ErrNodeRPCError, uri, err)
}

Expand Down Expand Up @@ -173,7 +175,7 @@ func (rpc *NodeRPC) get(ctx context.Context, uri string, response interface{}) e
return err
}

return rpc.parseResponse(buffer, resp.StatusCode, true, uri, response)
return rpc.parseResponse(buffer, resp.StatusCode, true, resp.Request.URL.String(), response)
}

func (rpc *NodeRPC) getRaw(ctx context.Context, uri string) ([]byte, error) {
Expand All @@ -196,7 +198,7 @@ func (rpc *NodeRPC) getRaw(ctx context.Context, uri string) ([]byte, error) {
}
defer resp.Body.Close()

if err := rpc.checkStatusCode(resp.Body, resp.StatusCode, true); err != nil {
if err := rpc.checkStatusCode(resp.Body, resp.StatusCode, true, uri); err != nil {
return nil, fmt.Errorf("%w (%s): %w", ErrNodeRPCError, uri, err)
}
return io.ReadAll(resp.Body)
Expand Down Expand Up @@ -227,7 +229,7 @@ func (rpc *NodeRPC) post(ctx context.Context, uri string, data interface{}, chec
return err
}

return rpc.parseResponse(buffer, resp.StatusCode, checkStatusCode, uri, response)
return rpc.parseResponse(buffer, resp.StatusCode, checkStatusCode, resp.Request.URL.String(), response)
}

// Block - returns block
Expand Down
2 changes: 1 addition & 1 deletion internal/noderpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestNodeRPC_checkStatusCode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rpc := new(NodeRPC)
err := rpc.checkStatusCode(tt.r, tt.statusCode, tt.checkStatusCode)
err := rpc.checkStatusCode(tt.r, tt.statusCode, tt.checkStatusCode, "")
require.Equal(t, tt.wantErr, err != nil)
if err != nil {
require.ErrorContains(t, err, tt.errString)
Expand Down

0 comments on commit ae8eca6

Please sign in to comment.