Skip to content

Commit

Permalink
Merge branch '143-crawler-refactoring' into raw-module-with-neutron
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Neznaykin committed Feb 3, 2024
2 parents 8b7e59c + c739a23 commit 84ce1ef
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 13 deletions.
7 changes: 7 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ MAX_CONNECTING=100
# Debug
LOG_LEVEL=info # Level of logging
RECOVERY_MODE=false # Detect panic without stop application. It will decrease index performance!!!

# Health checker
HEALTHCHECK_ENABLED=true
HEALTHCHECK_FATAL_ON_CHECK=true
HEALTHCHECK_MAX_LAST_BLOCK_LAG=1m
HEALTHCHECK_INTERVAL=10m
HEALTHCHECK_START_DELAY=1m
16 changes: 16 additions & 0 deletions adapter/storage/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/types"
Expand Down Expand Up @@ -116,6 +117,21 @@ func (s *Storage) GetAllBlocks(ctx context.Context) (blocks []*model.Block, err
return blocks, err
}

func (s *Storage) GetLatestBlock(ctx context.Context) (*model.Block, error) {
var block model.Block

err := s.blocksCollection.FindOne(
ctx,
bson.D{},
options.FindOne().SetSort(bson.D{{Key: "_id", Value: -1}}),
).Decode(&block)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, types.ErrBlockNotFound
}

return &block, err
}

func (s *Storage) setErrorStatusForProcessing(ctx context.Context) error {
filter := bson.D{{Key: "status", Value: model.StatusProcessing}}
update := bson.D{{Key: "$set", Value: bson.D{
Expand Down
24 changes: 24 additions & 0 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
blocksdktypes "github.com/skip-mev/block-sdk/x/auction/types"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage"
"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
grpcClient "github.com/bro-n-bro/spacebox-crawler/client/grpc"
rpcClient "github.com/bro-n-bro/spacebox-crawler/client/rpc"
"github.com/bro-n-bro/spacebox-crawler/delivery/broker"
Expand All @@ -67,6 +68,7 @@ import (
"github.com/bro-n-bro/spacebox-crawler/modules"
"github.com/bro-n-bro/spacebox-crawler/modules/core"
"github.com/bro-n-bro/spacebox-crawler/pkg/cache"
healthchecker "github.com/bro-n-bro/spacebox-crawler/pkg/health_checker"
tb "github.com/bro-n-bro/spacebox-crawler/pkg/mapper/to_broker"
ts "github.com/bro-n-bro/spacebox-crawler/pkg/mapper/to_storage"
"github.com/bro-n-bro/spacebox-crawler/pkg/worker"
Expand Down Expand Up @@ -196,6 +198,7 @@ func (a *App) Start(ctx context.Context) error {
tos = ts.NewToStorage()
wrk = worker.New(a.cfg.WorkerConfig, *a.log, brk, rpcCli, grpcCli, mds, sto, cod, *tbr, *tos)
srv = server.New(a.cfg.Server, sto, *a.log)
hc = healthchecker.New(*a.log, checkLastBlockDiff(a.cfg.HealthcheckConfig.MaxBlockLag, sto), a.cfg.HealthcheckConfig) //nolint:lll
)

MakeSDKConfig(a.cfg, sdk.GetConfig())
Expand All @@ -207,6 +210,7 @@ func (a *App) Start(ctx context.Context) error {
cmp{brk, "broker"},
cmp{wrk, "worker"},
cmp{srv, "server"},
cmp{hc, "health_checker"},
)

okCh, errCh := make(chan struct{}), make(chan error)
Expand Down Expand Up @@ -367,3 +371,23 @@ func MakeSDKConfig(cfg Config, sdkConfig *sdk.Config) {
prefix+sdk.PrefixValidator+sdk.PrefixConsensus+sdk.PrefixPublic,
)
}

// checkLastBlockDiff checks whether the block was created no later than maxDiff.
func checkLastBlockDiff(maxDiff time.Duration, storage interface {
GetLatestBlock(ctx context.Context) (*model.Block, error)
}) func(context.Context, *zerolog.Logger) bool {

return func(ctx context.Context, log *zerolog.Logger) bool {
lastBlock, err := storage.GetLatestBlock(ctx)
if err != nil {
log.Error().Err(err).Msg("cannot get latest block")
return true
}

if lastBlock == nil {
return true
}

return time.Since(lastBlock.Created) <= maxDiff
}
}
28 changes: 15 additions & 13 deletions internal/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ import (
"github.com/bro-n-bro/spacebox-crawler/client/rpc"
"github.com/bro-n-bro/spacebox-crawler/delivery/broker"
"github.com/bro-n-bro/spacebox-crawler/delivery/server"
healthchecker "github.com/bro-n-bro/spacebox-crawler/pkg/health_checker"
"github.com/bro-n-bro/spacebox-crawler/pkg/worker"
)

type Config struct {
ChainPrefix string `env:"CHAIN_PREFIX"`
DefaultDenom string `env:"DEFAULT_DENOM" envDefault:"uatom"`
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
Server server.Config
Modules []string `env:"MODULES" required:"true"`
GRPCConfig grpc.Config
RPCConfig rpc.Config
BrokerConfig broker.Config
StorageConfig storage.Config
WorkerConfig worker.Config
StartTimeout time.Duration `env:"START_TIMEOUT"`
StopTimeout time.Duration `env:"STOP_TIMEOUT"`
MetricsEnabled bool `env:"METRICS_ENABLED" envDefault:"false"`
ChainPrefix string `env:"CHAIN_PREFIX"`
DefaultDenom string `env:"DEFAULT_DENOM" envDefault:"uatom"`
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
Server server.Config
Modules []string `env:"MODULES" required:"true"`
GRPCConfig grpc.Config
RPCConfig rpc.Config
BrokerConfig broker.Config
StorageConfig storage.Config
WorkerConfig worker.Config
HealthcheckConfig healthchecker.Config
StartTimeout time.Duration `env:"START_TIMEOUT"`
StopTimeout time.Duration `env:"STOP_TIMEOUT"`
MetricsEnabled bool `env:"METRICS_ENABLED" envDefault:"false"`
}
11 changes: 11 additions & 0 deletions pkg/health_checker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package healthchecker

import "time"

type Config struct {
MaxBlockLag time.Duration `env:"HEALTHCHECK_MAX_LAST_BLOCK_LAG" envDefault:"5m"`
Interval time.Duration `env:"HEALTHCHECK_INTERVAL" envDefault:"1m"`
StartDelay time.Duration `env:"HEALTHCHECK_START_DELAY"`
Enabled bool `env:"HEALTHCHECK_ENABLED" envDefault:"false"`
FatalOnCheck bool `env:"HEALTHCHECK_FATAL_ON_CHECK" envDefault:"true"`
}
91 changes: 91 additions & 0 deletions pkg/health_checker/health_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package healthchecker

import (
"context"
"time"

"github.com/rs/zerolog"
)

const defaultInterval = time.Minute

type (
CheckFn func(ctx context.Context, log *zerolog.Logger) bool

Checker struct {
log *zerolog.Logger
cancel context.CancelFunc
isHealth CheckFn
cfg Config
}
)

func New(l zerolog.Logger, checkFn CheckFn, cfg Config) *Checker {
l = l.With().Str("cmp", "healthchecker").Logger()

if cfg.Interval == 0 {
cfg.Interval = defaultInterval
}

return &Checker{
log: &l,
isHealth: checkFn,
cfg: cfg,
}
}

func (c *Checker) Start(_ context.Context) error {
if !c.cfg.Enabled {
return nil
}

go c.run()
return nil
}

func (c *Checker) Stop(_ context.Context) error {
if !c.cfg.Enabled {
return nil
}

c.cancel()
return nil
}

func (c *Checker) run() {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

if c.cfg.StartDelay > 0 {
<-time.Tick(c.cfg.StartDelay) //nolint:staticcheck
}

ticker := time.NewTicker(c.cfg.Interval)
defer ticker.Stop()

c.log.Debug().Msg("checker started")

for {
select {
case <-ctx.Done():
c.log.Info().Msg("checker stopped")
return
case <-ticker.C:
c.log.Info().Msg("checking health")

func() {
ctx2, cancel2 := context.WithTimeout(ctx, c.cfg.Interval/2)
defer cancel2()

if !c.isHealth(ctx2, c.log) {
if c.cfg.FatalOnCheck {
c.log.Fatal().Msg("service is not healthy")
return
}

c.log.Warn().Msg("service is not healthy")
}
}()
}
}
}

0 comments on commit 84ce1ef

Please sign in to comment.