Skip to content

Commit

Permalink
Merge pull request #132 from bro-n-bro/131-sometime-crawler-fails-wit…
Browse files Browse the repository at this point in the history
…h-panic

131 fix nil messages processing
  • Loading branch information
malekvictor authored Oct 22, 2023
2 parents 82eac15 + 1b58ed6 commit 10c1eee
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 25 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_and_push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ jobs:
- name: Build specify tag
run: |
echo 'build image: bronbro/spacebox-crawler:${{github.ref_name}}'
docker build -t bronbro/spacebox-crawler:${{github.ref_name}} --target=app .
docker build -t bronbro/spacebox-crawler:${{github.ref_name}} --target=app --build-arg version=${{github.ref_name}} .
- name: Build latest tag
if: startsWith(github.ref, 'refs/tags/v')
run: docker build -t bronbro/spacebox-crawler:latest --target=app .
run: docker build -t bronbro/spacebox-crawler:latest --target=app --build-arg version=latest .
- name: push specify tag to registry
run: docker push bronbro/spacebox-crawler:${{github.ref_name}}
- name: push latest tag to registry
Expand Down
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:1.20.5-alpine as builder

ARG version

ENV CGO_ENABLED=1

RUN apk update && apk add --no-cache make git build-base musl-dev librdkafka librdkafka-dev
Expand All @@ -9,7 +11,7 @@ COPY . ./
RUN echo "build binary" && \
export PATH=$PATH:/usr/local/go/bin && \
go mod download && \
go build -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \
go build -ldflags="-X 'main.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \
mkdir -p /spacebox-crawler && \
mv main /spacebox-crawler/main && \
rm -Rf /usr/local/go/src
Expand Down
8 changes: 6 additions & 2 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
ibctransfertypes "github.com/cosmos/ibc-go/v7/modules/apps/transfer/types"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -40,11 +41,14 @@ type Client struct {
IbcTransferQueryClient ibctransfertypes.QueryClient
LiquidityQueryClient liquiditytypes.QueryClient
conn *grpc.ClientConn
log *zerolog.Logger
cfg Config
}

func New(cfg Config) *Client {
return &Client{cfg: cfg}
func New(cfg Config, l zerolog.Logger) *Client {
l = l.With().Str("cmp", "grpc-client").Logger()

return &Client{cfg: cfg, log: &l}
}

func (c *Client) Start(ctx context.Context) error {
Expand Down
5 changes: 2 additions & 3 deletions client/grpc/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc
import (
"context"
"encoding/hex"
"log"

cometbfttypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/types/tx"
Expand All @@ -12,13 +11,13 @@ import (
// Txs queries for all the transactions in a block. Transactions are returned
// in sdk.TxResponse format which internally contains a sdk.Tx. An error is
// returned if any query fails.
func (c *Client) Txs(ctx context.Context, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) {
func (c *Client) Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) {
txResponses := make([]*tx.GetTxResponse, 0, len(txs))

for _, tmTx := range txs {
respPb, err := c.TxService.GetTx(ctx, &tx.GetTxRequest{Hash: hex.EncodeToString(tmTx.Hash())})
if err != nil {
log.Println("GetTx error:", err)
c.log.Error().Err(err).Int64("height", height).Msg("GetTx error")
continue
}

Expand Down
5 changes: 4 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (
EnvFile = "ENV_FILE"
)

// Version provided by ldflags
var Version = "develop"

func main() {
// try to get .env file from Environments
fileName, ok := os.LookupEnv(EnvFile)
Expand Down Expand Up @@ -52,7 +55,7 @@ func main() {
Logger()

// create an application
a := app.New(cfg, logger)
a := app.New(cfg, Version, logger)

// run service
if err := executor.Run(a); err != nil {
Expand Down
27 changes: 19 additions & 8 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
interchainprovider "github.com/cosmos/interchain-security/v3/x/ccv/provider"
interchaintypes "github.com/cosmos/interchain-security/v3/x/ccv/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage"
Expand Down Expand Up @@ -71,29 +73,31 @@ var (

type (
App struct {
log *zerolog.Logger
cmps []cmp
cfg Config
log *zerolog.Logger
version string
cmps []cmp
cfg Config
}
cmp struct {
Service rep.Lifecycle
Name string
}
)

func New(cfg Config, l zerolog.Logger) *App {
l = l.With().Str("cmp", "app").Logger()
func New(cfg Config, version string, l zerolog.Logger) *App {
l = l.With().Str("version", version).Str("cmp", "app").Logger()

return &App{
log: &l,
cfg: cfg,
log: &l,
cfg: cfg,
version: version,
}
}

func (a *App) Start(ctx context.Context) error {
a.log.Info().Msg("starting app")

grpcCli := grpcClient.New(a.cfg.GRPCConfig)
grpcCli := grpcClient.New(a.cfg.GRPCConfig, *a.log)
rpcCli := rpcClient.New(a.cfg.RPCConfig)

// TODO: use redis
Expand Down Expand Up @@ -144,6 +148,13 @@ func (a *App) Start(ctx context.Context) error {
valStatusCache.Patch(cache.WithMetrics[string, int64]("validators_status"))
tallyCache.Patch(cache.WithMetrics[uint64, int64]("tally"))
accCache.Patch(cache.WithMetrics[string, int64]("account"))

promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "spacebox_crawler",
Name: "version",
Help: "Crawler version",
ConstLabels: prometheus.Labels{"version": a.version},
}).Inc()
}

b := broker.New(a.cfg.BrokerConfig, a.cfg.Modules, *a.log,
Expand Down
2 changes: 1 addition & 1 deletion internal/rep/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type (
Block(ctx context.Context, height int64) (*cometbftcoretypes.ResultBlock, error)
Validators(ctx context.Context, height int64) (*cometbftcoretypes.ResultValidators, error)

Txs(ctx context.Context, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error)
Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error)
}

RPCClient interface {
Expand Down
18 changes: 17 additions & 1 deletion pkg/worker/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

const (
keyHeight = "height"
keyTxHash = "tx_hash"
keyModule = "module"
)

Expand Down Expand Up @@ -167,7 +168,7 @@ func (w *Worker) processHeight(ctx context.Context, workerIndex int, height int6

_txsDur := time.Now()

txsRes, err := w.grpcClient.Txs(ctx, block.Block.Data.Txs)
txsRes, err := w.grpcClient.Txs(ctx, height, block.Block.Data.Txs)
if err != nil {
w.log.Error().Err(err).Msgf("get txs error: %v", err)
w.setErrorStatusWithLogging(ctx, height, err.Error())
Expand Down Expand Up @@ -296,6 +297,21 @@ func (w *Worker) processMessages(ctx context.Context, txs []*types.Tx) error {
}

func (w *Worker) processMessage(ctx context.Context, msg *codec.Any, tx *types.Tx, msgIndex int) error {
if msg == nil {
w.log.Warn().Int64(keyHeight, tx.Height).Str(keyTxHash, tx.TxHash).Msg("can't process nil message")

if err := w.storage.InsertErrorMessage(ctx, w.tsM.NewErrorMessage(tx.Height, "message is nil")); err != nil {
w.log.Error().
Err(err).
Int64(keyHeight, tx.Height).
Msgf("fail to insert error_message: %v", err)

return err
}

return nil
}

stdMsg, err := w.unpackMessage(ctx, tx.Height, msg)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/worker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ func (w *Worker) unpackMessage(ctx context.Context, height int64, msg *codec.Any
}

if strings.HasPrefix(err.Error(), "no concrete type registered for type URL") {
w.log.Warn().Err(err).Msgf("error while unpacking message: %s", err)
w.log.Warn().Err(err).Int64(keyHeight, height).Msgf("error while unpacking message: %s", err)

if err = w.storage.InsertErrorMessage(ctx, w.tsM.NewErrorMessage(height, err.Error())); err != nil {
w.log.Error().
Err(err).
Int64(keyHeight, height).
Msgf("Fail to insert error_message: %v", err)
Msgf("fail to insert error_message: %v", err)
return nil, err
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (w *Worker) Start(_ context.Context) error {
Help: "Duration of parsed blockchain objects",
}, []string{"type"}),
}

var val float64
if w.cfg.RecoveryMode {
val = 1
}

promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "spacebox_crawler",
Name: "worker_recovery_mode",
Help: "Is worker recovery mode enabled",
}).Set(val)
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -188,7 +199,3 @@ func (w *Worker) Stop(_ context.Context) error {

return nil
}

func (w *Worker) GetProcessMessagesFn() func(ctx context.Context, txs []*types.Tx) error {
return w.processMessages
}

0 comments on commit 10c1eee

Please sign in to comment.