diff --git a/.github/workflows/build_and_push.yml b/.github/workflows/build_and_push.yml index b7c5758..dcd41b7 100644 --- a/.github/workflows/build_and_push.yml +++ b/.github/workflows/build_and_push.yml @@ -21,10 +21,10 @@ jobs: - name: Build specify tag run: | echo 'build image: bronbro/spacebox-crawler:${{github.ref_name}}' - docker build -t bronbro/spacebox-crawler:${{github.ref_name}} --target=app . + docker build -t bronbro/spacebox-crawler:${{github.ref_name}} --target=app --build-arg version=${{github.ref_name}} . - name: Build latest tag if: startsWith(github.ref, 'refs/tags/v') - run: docker build -t bronbro/spacebox-crawler:latest --target=app . + run: docker build -t bronbro/spacebox-crawler:latest --target=app --build-arg version=latest . - name: push specify tag to registry run: docker push bronbro/spacebox-crawler:${{github.ref_name}} - name: push latest tag to registry diff --git a/Dockerfile b/Dockerfile index 1902705..0360b1a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,7 @@ FROM golang:1.20.5-alpine as builder +ARG version + ENV CGO_ENABLED=1 RUN apk update && apk add --no-cache make git build-base musl-dev librdkafka librdkafka-dev @@ -9,7 +11,7 @@ COPY . ./ RUN echo "build binary" && \ export PATH=$PATH:/usr/local/go/bin && \ go mod download && \ - go build -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ + go build -ldflags="-X 'main.Version=$version'" -tags musl /go/src/github.com/spacebox-crawler/cmd/main.go && \ mkdir -p /spacebox-crawler && \ mv main /spacebox-crawler/main && \ rm -Rf /usr/local/go/src diff --git a/client/grpc/client.go b/client/grpc/client.go index bed97f1..e10ecd7 100644 --- a/client/grpc/client.go +++ b/client/grpc/client.go @@ -18,6 +18,7 @@ import ( stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" ibctransfertypes "github.com/cosmos/ibc-go/v7/modules/apps/transfer/types" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -40,11 +41,14 @@ type Client struct { IbcTransferQueryClient ibctransfertypes.QueryClient LiquidityQueryClient liquiditytypes.QueryClient conn *grpc.ClientConn + log *zerolog.Logger cfg Config } -func New(cfg Config) *Client { - return &Client{cfg: cfg} +func New(cfg Config, l zerolog.Logger) *Client { + l = l.With().Str("cmp", "grpc-client").Logger() + + return &Client{cfg: cfg, log: &l} } func (c *Client) Start(ctx context.Context) error { diff --git a/client/grpc/tx.go b/client/grpc/tx.go index da4f5f4..c5cf021 100644 --- a/client/grpc/tx.go +++ b/client/grpc/tx.go @@ -3,7 +3,6 @@ package grpc import ( "context" "encoding/hex" - "log" cometbfttypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/types/tx" @@ -12,13 +11,13 @@ import ( // Txs queries for all the transactions in a block. Transactions are returned // in sdk.TxResponse format which internally contains a sdk.Tx. An error is // returned if any query fails. -func (c *Client) Txs(ctx context.Context, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) { +func (c *Client) Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) { txResponses := make([]*tx.GetTxResponse, 0, len(txs)) for _, tmTx := range txs { respPb, err := c.TxService.GetTx(ctx, &tx.GetTxRequest{Hash: hex.EncodeToString(tmTx.Hash())}) if err != nil { - log.Println("GetTx error:", err) + c.log.Error().Err(err).Int64("height", height).Msg("GetTx error") continue } diff --git a/cmd/main.go b/cmd/main.go index 2d59f39..f80950b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,6 +18,9 @@ const ( EnvFile = "ENV_FILE" ) +// Version provided by ldflags +var Version = "develop" + func main() { // try to get .env file from Environments fileName, ok := os.LookupEnv(EnvFile) @@ -52,7 +55,7 @@ func main() { Logger() // create an application - a := app.New(cfg, logger) + a := app.New(cfg, Version, logger) // run service if err := executor.Run(a); err != nil { diff --git a/internal/app/app.go b/internal/app/app.go index 940d890..ec33c4f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -39,6 +39,8 @@ import ( interchainprovider "github.com/cosmos/interchain-security/v3/x/ccv/provider" interchaintypes "github.com/cosmos/interchain-security/v3/x/ccv/types" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog" "github.com/bro-n-bro/spacebox-crawler/adapter/storage" @@ -71,9 +73,10 @@ var ( type ( App struct { - log *zerolog.Logger - cmps []cmp - cfg Config + log *zerolog.Logger + version string + cmps []cmp + cfg Config } cmp struct { Service rep.Lifecycle @@ -81,19 +84,20 @@ type ( } ) -func New(cfg Config, l zerolog.Logger) *App { - l = l.With().Str("cmp", "app").Logger() +func New(cfg Config, version string, l zerolog.Logger) *App { + l = l.With().Str("version", version).Str("cmp", "app").Logger() return &App{ - log: &l, - cfg: cfg, + log: &l, + cfg: cfg, + version: version, } } func (a *App) Start(ctx context.Context) error { a.log.Info().Msg("starting app") - grpcCli := grpcClient.New(a.cfg.GRPCConfig) + grpcCli := grpcClient.New(a.cfg.GRPCConfig, *a.log) rpcCli := rpcClient.New(a.cfg.RPCConfig) // TODO: use redis @@ -144,6 +148,13 @@ func (a *App) Start(ctx context.Context) error { valStatusCache.Patch(cache.WithMetrics[string, int64]("validators_status")) tallyCache.Patch(cache.WithMetrics[uint64, int64]("tally")) accCache.Patch(cache.WithMetrics[string, int64]("account")) + + promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "spacebox_crawler", + Name: "version", + Help: "Crawler version", + ConstLabels: prometheus.Labels{"version": a.version}, + }).Inc() } b := broker.New(a.cfg.BrokerConfig, a.cfg.Modules, *a.log, diff --git a/internal/rep/client.go b/internal/rep/client.go index c1514a5..a6122ed 100644 --- a/internal/rep/client.go +++ b/internal/rep/client.go @@ -15,7 +15,7 @@ type ( Block(ctx context.Context, height int64) (*cometbftcoretypes.ResultBlock, error) Validators(ctx context.Context, height int64) (*cometbftcoretypes.ResultValidators, error) - Txs(ctx context.Context, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) + Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) ([]*tx.GetTxResponse, error) } RPCClient interface { diff --git a/pkg/worker/process.go b/pkg/worker/process.go index dbd5e93..1b02b5b 100644 --- a/pkg/worker/process.go +++ b/pkg/worker/process.go @@ -18,6 +18,7 @@ import ( const ( keyHeight = "height" + keyTxHash = "tx_hash" keyModule = "module" ) @@ -167,7 +168,7 @@ func (w *Worker) processHeight(ctx context.Context, workerIndex int, height int6 _txsDur := time.Now() - txsRes, err := w.grpcClient.Txs(ctx, block.Block.Data.Txs) + txsRes, err := w.grpcClient.Txs(ctx, height, block.Block.Data.Txs) if err != nil { w.log.Error().Err(err).Msgf("get txs error: %v", err) w.setErrorStatusWithLogging(ctx, height, err.Error()) @@ -296,6 +297,21 @@ func (w *Worker) processMessages(ctx context.Context, txs []*types.Tx) error { } func (w *Worker) processMessage(ctx context.Context, msg *codec.Any, tx *types.Tx, msgIndex int) error { + if msg == nil { + w.log.Warn().Int64(keyHeight, tx.Height).Str(keyTxHash, tx.TxHash).Msg("can't process nil message") + + if err := w.storage.InsertErrorMessage(ctx, w.tsM.NewErrorMessage(tx.Height, "message is nil")); err != nil { + w.log.Error(). + Err(err). + Int64(keyHeight, tx.Height). + Msgf("fail to insert error_message: %v", err) + + return err + } + + return nil + } + stdMsg, err := w.unpackMessage(ctx, tx.Height, msg) if err != nil { return err diff --git a/pkg/worker/utils.go b/pkg/worker/utils.go index f80bb52..59dcd89 100644 --- a/pkg/worker/utils.go +++ b/pkg/worker/utils.go @@ -62,13 +62,13 @@ func (w *Worker) unpackMessage(ctx context.Context, height int64, msg *codec.Any } if strings.HasPrefix(err.Error(), "no concrete type registered for type URL") { - w.log.Warn().Err(err).Msgf("error while unpacking message: %s", err) + w.log.Warn().Err(err).Int64(keyHeight, height).Msgf("error while unpacking message: %s", err) if err = w.storage.InsertErrorMessage(ctx, w.tsM.NewErrorMessage(height, err.Error())); err != nil { w.log.Error(). Err(err). Int64(keyHeight, height). - Msgf("Fail to insert error_message: %v", err) + Msgf("fail to insert error_message: %v", err) return nil, err } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6be40f1..2a8ec4c 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -81,6 +81,17 @@ func (w *Worker) Start(_ context.Context) error { Help: "Duration of parsed blockchain objects", }, []string{"type"}), } + + var val float64 + if w.cfg.RecoveryMode { + val = 1 + } + + promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "spacebox_crawler", + Name: "worker_recovery_mode", + Help: "Is worker recovery mode enabled", + }).Set(val) } ctx, cancel := context.WithCancel(context.Background()) @@ -188,7 +199,3 @@ func (w *Worker) Stop(_ context.Context) error { return nil } - -func (w *Worker) GetProcessMessagesFn() func(ctx context.Context, txs []*types.Tx) error { - return w.processMessages -}