From 102203949bb43b81cd12477720ec23d5ae68a813 Mon Sep 17 00:00:00 2001 From: Victor Neznaykin Date: Fri, 20 Oct 2023 22:11:45 +0400 Subject: [PATCH 1/5] fix processing nil messages add more metrics --- .github/workflows/build_and_push.yml | 4 ++-- Dockerfile | 4 +++- client/grpc/client.go | 8 ++++++-- client/grpc/tx.go | 5 ++--- internal/app/app.go | 16 ++++++++++++++-- internal/rep/client.go | 2 +- pkg/worker/process.go | 18 +++++++++++++++++- pkg/worker/utils.go | 4 ++-- pkg/worker/worker.go | 15 +++++++++++---- 9 files changed, 58 insertions(+), 18 deletions(-) diff --git a/.github/workflows/build_and_push.yml b/.github/workflows/build_and_push.yml index b7c5758..dcd41b7 100644 --- a/.github/workflows/build_and_push.yml +++ b/.github/workflows/build_and_push.yml @@ -21,10 +21,10 @@ jobs: - name: Build specify tag run: | echo 'build image: bronbro/spacebox-crawler:${{github.ref_name}}' - docker build -t bronbro/spacebox-crawler:${{github.ref_name}} --target=app . + docker build -t bronbro/spacebox-crawler:${{github.ref_name}} --target=app --build-arg version=${{github.ref_name}} . - name: Build latest tag if: startsWith(github.ref, 'refs/tags/v') - run: docker build -t bronbro/spacebox-crawler:latest --target=app . + run: docker build -t bronbro/spacebox-crawler:latest --target=app --build-arg version=latest . - name: push specify tag to registry run: docker push bronbro/spacebox-crawler:${{github.ref_name}} - name: push latest tag to registry diff --git a/Dockerfile b/Dockerfile index 1902705..5820b53 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,7 @@ FROM golang:1.20.5-alpine as builder +ARG version + ENV CGO_ENABLED=1 RUN apk update && apk add --no-cache make git build-base musl-dev librdkafka librdkafka-dev @@ -9,7 +11,7 @@ COPY . ./ RUN echo "build binary" && \ export PATH=$PATH:/usr/local/go/bin && \ go mod download && \ - go build -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ + go build ldflags="-X 'internal/app.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ mkdir -p /spacebox-crawler && \ mv main /spacebox-crawler/main && \ rm -Rf /usr/local/go/src diff --git a/client/grpc/client.go b/client/grpc/client.go index bed97f1..e10ecd7 100644 --- a/client/grpc/client.go +++ b/client/grpc/client.go @@ -18,6 +18,7 @@ import ( stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" ibctransfertypes "github.com/cosmos/ibc-go/v7/modules/apps/transfer/types" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -40,11 +41,14 @@ type Client struct { IbcTransferQueryClient ibctransfertypes.QueryClient LiquidityQueryClient liquiditytypes.QueryClient conn *grpc.ClientConn + log *zerolog.Logger cfg Config } -func New(cfg Config) *Client { - return &Client{cfg: cfg} +func New(cfg Config, l zerolog.Logger) *Client { + l = l.With().Str("cmp", "grpc-client").Logger() + + return &Client{cfg: cfg, log: &l} } func (c *Client) Start(ctx context.Context) error { diff --git a/client/grpc/tx.go b/client/grpc/tx.go index da4f5f4..c5cf021 100644 --- a/client/grpc/tx.go +++ b/client/grpc/tx.go @@ -3,7 +3,6 @@ package grpc import ( "context" "encoding/hex" - "log" cometbfttypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/types/tx" @@ -12,13 +11,13 @@ import ( // Txs queries for all the transactions in a block. Transactions are returned // in sdk.TxResponse format which internally contains a sdk.Tx. An error is // returned if any query fails. -func (c *Client) Txs(ctx context.Context, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) { +func (c *Client) Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) { txResponses := make([]*tx.GetTxResponse, 0, len(txs)) for _, tmTx := range txs { respPb, err := c.TxService.GetTx(ctx, &tx.GetTxRequest{Hash: hex.EncodeToString(tmTx.Hash())}) if err != nil { - log.Println("GetTx error:", err) + c.log.Error().Err(err).Int64("height", height).Msg("GetTx error") continue } diff --git a/internal/app/app.go b/internal/app/app.go index 940d890..25c2a97 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -39,6 +39,8 @@ import ( interchainprovider "github.com/cosmos/interchain-security/v3/x/ccv/provider" interchaintypes "github.com/cosmos/interchain-security/v3/x/ccv/types" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog" "github.com/bro-n-bro/spacebox-crawler/adapter/storage" @@ -62,6 +64,9 @@ const ( ) var ( + // Version provided by ldflags + Version = "develop" + ErrStartTimeout = errors.New("start timeout") ErrShutdownTimeout = errors.New("shutdown timeout") @@ -82,7 +87,7 @@ type ( ) func New(cfg Config, l zerolog.Logger) *App { - l = l.With().Str("cmp", "app").Logger() + l = l.With().Str("version", Version).Str("cmp", "app").Logger() return &App{ log: &l, @@ -93,7 +98,7 @@ func New(cfg Config, l zerolog.Logger) *App { func (a *App) Start(ctx context.Context) error { a.log.Info().Msg("starting app") - grpcCli := grpcClient.New(a.cfg.GRPCConfig) + grpcCli := grpcClient.New(a.cfg.GRPCConfig, *a.log) rpcCli := rpcClient.New(a.cfg.RPCConfig) // TODO: use redis @@ -144,6 +149,13 @@ func (a *App) Start(ctx context.Context) error { valStatusCache.Patch(cache.WithMetrics[string, int64]("validators_status")) tallyCache.Patch(cache.WithMetrics[uint64, int64]("tally")) accCache.Patch(cache.WithMetrics[string, int64]("account")) + + promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "spacebox_crawler", + Name: "version", + Help: "Crawler version", + ConstLabels: prometheus.Labels{"version": Version}, + }).Inc() } b := broker.New(a.cfg.BrokerConfig, a.cfg.Modules, *a.log, diff --git a/internal/rep/client.go b/internal/rep/client.go index c1514a5..a6122ed 100644 --- a/internal/rep/client.go +++ b/internal/rep/client.go @@ -15,7 +15,7 @@ type ( Block(ctx context.Context, height int64) (*cometbftcoretypes.ResultBlock, error) Validators(ctx context.Context, height int64) (*cometbftcoretypes.ResultValidators, error) - Txs(ctx context.Context, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) + Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) } RPCClient interface { diff --git a/pkg/worker/process.go b/pkg/worker/process.go index dbd5e93..1b02b5b 100644 --- a/pkg/worker/process.go +++ b/pkg/worker/process.go @@ -18,6 +18,7 @@ import ( const ( keyHeight = "height" + keyTxHash = "tx_hash" keyModule = "module" ) @@ -167,7 +168,7 @@ func (w *Worker) processHeight(ctx context.Context, workerIndex int, height int6 _txsDur := time.Now() - txsRes, err := w.grpcClient.Txs(ctx, block.Block.Data.Txs) + txsRes, err := w.grpcClient.Txs(ctx, height, block.Block.Data.Txs) if err != nil { w.log.Error().Err(err).Msgf("get txs error: %v", err) w.setErrorStatusWithLogging(ctx, height, err.Error()) @@ -296,6 +297,21 @@ func (w *Worker) processMessages(ctx context.Context, txs []*types.Tx) error { } func (w *Worker) processMessage(ctx context.Context, msg *codec.Any, tx *types.Tx, msgIndex int) error { + if msg == nil { + w.log.Warn().Int64(keyHeight, tx.Height).Str(keyTxHash, tx.TxHash).Msg("can't process nil message") + + if err := w.storage.InsertErrorMessage(ctx, w.tsM.NewErrorMessage(tx.Height, "message is nil")); err != nil { + w.log.Error(). + Err(err). + Int64(keyHeight, tx.Height). + Msgf("fail to insert error_message: %v", err) + + return err + } + + return nil + } + stdMsg, err := w.unpackMessage(ctx, tx.Height, msg) if err != nil { return err diff --git a/pkg/worker/utils.go b/pkg/worker/utils.go index f80bb52..59dcd89 100644 --- a/pkg/worker/utils.go +++ b/pkg/worker/utils.go @@ -62,13 +62,13 @@ func (w *Worker) unpackMessage(ctx context.Context, height int64, msg *codec.Any } if strings.HasPrefix(err.Error(), "no concrete type registered for type URL") { - w.log.Warn().Err(err).Msgf("error while unpacking message: %s", err) + w.log.Warn().Err(err).Int64(keyHeight, height).Msgf("error while unpacking message: %s", err) if err = w.storage.InsertErrorMessage(ctx, w.tsM.NewErrorMessage(height, err.Error())); err != nil { w.log.Error(). Err(err). Int64(keyHeight, height). - Msgf("Fail to insert error_message: %v", err) + Msgf("fail to insert error_message: %v", err) return nil, err } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6be40f1..2a8ec4c 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -81,6 +81,17 @@ func (w *Worker) Start(_ context.Context) error { Help: "Duration of parsed blockchain objects", }, []string{"type"}), } + + var val float64 + if w.cfg.RecoveryMode { + val = 1 + } + + promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "spacebox_crawler", + Name: "worker_recovery_mode", + Help: "Is worker recovery mode enabled", + }).Set(val) } ctx, cancel := context.WithCancel(context.Background()) @@ -188,7 +199,3 @@ func (w *Worker) Stop(_ context.Context) error { return nil } - -func (w *Worker) GetProcessMessagesFn() func(ctx context.Context, txs []*types.Tx) error { - return w.processMessages -} From 975be489addd4e43d5cb3b7584d9e3f506644943 Mon Sep 17 00:00:00 2001 From: Victor Neznaykin Date: Fri, 20 Oct 2023 22:25:14 +0400 Subject: [PATCH 2/5] fix dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 5820b53..d9f7ae1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ COPY . ./ RUN echo "build binary" && \ export PATH=$PATH:/usr/local/go/bin && \ go mod download && \ - go build ldflags="-X 'internal/app.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ + go build ldflags="-X 'internal/app.go.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ mkdir -p /spacebox-crawler && \ mv main /spacebox-crawler/main && \ rm -Rf /usr/local/go/src From f22afe8fa30a62fc3699abea18b64d08c298a50d Mon Sep 17 00:00:00 2001 From: Victor Neznaykin Date: Fri, 20 Oct 2023 22:34:39 +0400 Subject: [PATCH 3/5] fix dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index d9f7ae1..88472e7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ COPY . ./ RUN echo "build binary" && \ export PATH=$PATH:/usr/local/go/bin && \ go mod download && \ - go build ldflags="-X 'internal/app.go.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ + go build -ldflags="-X 'internal/app.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ mkdir -p /spacebox-crawler && \ mv main /spacebox-crawler/main && \ rm -Rf /usr/local/go/src From 0f3b0d8e8a5705c421b18d792c7378630249adb2 Mon Sep 17 00:00:00 2001 From: Victor Neznaykin Date: Sun, 22 Oct 2023 23:31:20 +0400 Subject: [PATCH 4/5] fix ldflags --- Dockerfile | 2 +- cmd/main.go | 5 ++++- internal/app/app.go | 21 ++++++++++----------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Dockerfile b/Dockerfile index 88472e7..0360b1a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ COPY . ./ RUN echo "build binary" && \ export PATH=$PATH:/usr/local/go/bin && \ go mod download && \ - go build -ldflags="-X 'internal/app.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ + go build -ldflags="-X 'main.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ mkdir -p /spacebox-crawler && \ mv main /spacebox-crawler/main && \ rm -Rf /usr/local/go/src diff --git a/cmd/main.go b/cmd/main.go index 2d59f39..caa5b40 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,9 @@ import ( executor "github.com/bro-n-bro/spacebox-crawler/pkg/app" ) +// Version provided by ldflags +var Version = "develop" + const ( DefaultEnvFile = ".env" EnvFile = "ENV_FILE" @@ -52,7 +55,7 @@ func main() { Logger() // create an application - a := app.New(cfg, logger) + a := app.New(cfg, Version, logger) // run service if err := executor.Run(a); err != nil { diff --git a/internal/app/app.go b/internal/app/app.go index 25c2a97..a8f7f4f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -64,9 +64,6 @@ const ( ) var ( - // Version provided by ldflags - Version = "develop" - ErrStartTimeout = errors.New("start timeout") ErrShutdownTimeout = errors.New("shutdown timeout") @@ -76,9 +73,10 @@ var ( type ( App struct { - log *zerolog.Logger - cmps []cmp - cfg Config + log *zerolog.Logger + cmps []cmp + cfg Config + version string } cmp struct { Service rep.Lifecycle @@ -86,12 +84,13 @@ type ( } ) -func New(cfg Config, l zerolog.Logger) *App { - l = l.With().Str("version", Version).Str("cmp", "app").Logger() +func New(cfg Config, version string, l zerolog.Logger) *App { + l = l.With().Str("version", version).Str("cmp", "app").Logger() return &App{ - log: &l, - cfg: cfg, + log: &l, + cfg: cfg, + version: version, } } @@ -154,7 +153,7 @@ func (a *App) Start(ctx context.Context) error { Namespace: "spacebox_crawler", Name: "version", Help: "Crawler version", - ConstLabels: prometheus.Labels{"version": Version}, + ConstLabels: prometheus.Labels{"version": a.version}, }).Inc() } From 1b58ed6dc087d77091651e641d876c2c349794bd Mon Sep 17 00:00:00 2001 From: Victor Neznaykin Date: Sun, 22 Oct 2023 23:36:20 +0400 Subject: [PATCH 5/5] fix by linter --- cmd/main.go | 6 +++--- internal/app/app.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index caa5b40..f80950b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,14 +13,14 @@ import ( executor "github.com/bro-n-bro/spacebox-crawler/pkg/app" ) -// Version provided by ldflags -var Version = "develop" - const ( DefaultEnvFile = ".env" EnvFile = "ENV_FILE" ) +// Version provided by ldflags +var Version = "develop" + func main() { // try to get .env file from Environments fileName, ok := os.LookupEnv(EnvFile) diff --git a/internal/app/app.go b/internal/app/app.go index a8f7f4f..ec33c4f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -74,9 +74,9 @@ var ( type ( App struct { log *zerolog.Logger + version string cmps []cmp cfg Config - version string } cmp struct { Service rep.Lifecycle