Skip to content

Commit

Permalink
Merge pull request #164 from bro-n-bro/162-clean-up-and-v2-release-pr…
Browse files Browse the repository at this point in the history
…eparation

migrate crawler to v2 version
  • Loading branch information
malekvictor authored Jun 7, 2024
2 parents c996bb7 + 17f30f3 commit 90b0d96
Show file tree
Hide file tree
Showing 214 changed files with 409 additions and 8,895 deletions.
5 changes: 1 addition & 4 deletions .env
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# Application settings
START_TIMEOUT=20s # Start application timeout duration
STOP_TIMEOUT=20s # Stop application timeout duration
MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm,raw # Modules for processing
CHAIN_PREFIX=cosmos # Prefix of indexing chain
DEFAULT_DEMON=uatom # Default demon of chain coins
PARSE_AVATAR_URL=false # Parse avatar url for validator from keychain. It will decrease index performance!!!

# Server settings
SERVER_PORT=2112
Expand All @@ -16,12 +13,12 @@ GRPC_URL=http://127.0.0.1:8090 # GRPC API
GRPC_SECURE_CONNECTION=false # GRPC secure connection
GRPC_TIMEOUT=15s # GRPC requests timeout
RPC_TIMEOUT=15s # RPC requests timeout
WS_ENABLED=true # Websocket enabled

# Broker settings
BROKER_SERVER=localhost:9092 # Broker address
PARTITIONS_COUNT=1
BROKER_ENABLED=true # Publish messages to broker
BATCH_PRODUCER=false # Enable batch producer (increase performance but experimental feature)

# Worker settings
WORKERS_COUNT=8 # Count of block processing processes
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: 1.21.4
go-version: 1.22.4

- name: lint
run: make lint
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21.4-alpine as builder
FROM golang:1.22.4-alpine as builder

ARG version

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ race: dep ## Run data race detector

install-linter: ## Install golangci-lint
@mkdir -p bin
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b bin v1.55.2
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b bin v1.59.0
@bin/golangci-lint --version

lint: install-linter ## Lint the files
Expand Down
4 changes: 2 additions & 2 deletions adapter/storage/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/types"
"github.com/bro-n-bro/spacebox-crawler/v2/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/v2/types"
)

func (s *Storage) GetBlockByHeight(ctx context.Context, height int64) (*model.Block, error) {
Expand Down
2 changes: 1 addition & 1 deletion adapter/storage/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"go.mongodb.org/mongo-driver/bson"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/v2/adapter/storage/model"
)

func (s *Storage) InsertErrorMessage(ctx context.Context, message model.Message) error {
Expand Down
2 changes: 1 addition & 1 deletion adapter/storage/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"go.mongodb.org/mongo-driver/bson"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/v2/adapter/storage/model"
)

func (s *Storage) InsertErrorTx(ctx context.Context, tx model.Tx) error {
Expand Down
21 changes: 0 additions & 21 deletions client/grpc/bank.go

This file was deleted.

83 changes: 20 additions & 63 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,15 @@ import (

"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
"github.com/cosmos/cosmos-sdk/types/tx"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
authztypes "github.com/cosmos/cosmos-sdk/x/authz"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
distributiontypes "github.com/cosmos/cosmos-sdk/x/distribution/types"
feegranttypes "github.com/cosmos/cosmos-sdk/x/feegrant"
govtypes "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
minttypes "github.com/cosmos/cosmos-sdk/x/mint/types"
slashingtypes "github.com/cosmos/cosmos-sdk/x/slashing/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
ibctransfertypes "github.com/cosmos/ibc-go/v7/modules/apps/transfer/types"
bandwidthtypes "github.com/cybercongress/go-cyber/x/bandwidth/types"
dmntypes "github.com/cybercongress/go-cyber/x/dmn/types"
graphtypes "github.com/cybercongress/go-cyber/x/graph/types"
gridtypes "github.com/cybercongress/go-cyber/x/grid/types"
ranktypes "github.com/cybercongress/go-cyber/x/rank/types"
resourcestypes "github.com/cybercongress/go-cyber/x/resources/types"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/timeout"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
liquiditytypes "github.com/bro-n-bro/spacebox-crawler/types/liquidity"
"github.com/bro-n-bro/spacebox-crawler/v2/adapter/storage/model"
)

type (
Expand All @@ -40,29 +24,13 @@ type (
}

Client struct {
SlashingQueryClient slashingtypes.QueryClient
TmsService tmservice.ServiceClient
TxService tx.ServiceClient
BankQueryClient banktypes.QueryClient
AuthQueryClient authtypes.QueryClient
GovQueryClient govtypes.QueryClient
MintQueryClient minttypes.QueryClient
StakingQueryClient stakingtypes.QueryClient
DistributionQueryClient distributiontypes.QueryClient
AuthzQueryClient authztypes.QueryClient
FeegrantQueryClient feegranttypes.QueryClient
IbcTransferQueryClient ibctransfertypes.QueryClient
LiquidityQueryClient liquiditytypes.QueryClient
GraphQueryClient graphtypes.QueryClient
BandwidthQueryClient bandwidthtypes.QueryClient
DMNQueryClient dmntypes.QueryClient
GridQueryClient gridtypes.QueryClient
RankQueryClient ranktypes.QueryClient
ResourcesQueryClient resourcestypes.QueryClient
conn *grpc.ClientConn
log *zerolog.Logger
storage storage
cfg Config
TmsService tmservice.ServiceClient
TxService tx.ServiceClient

conn *grpc.ClientConn
log *zerolog.Logger
storage storage
cfg Config
}
)

Expand All @@ -79,16 +47,20 @@ func (c *Client) Start(ctx context.Context) error {
options := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(timeout.UnaryClientInterceptor(c.cfg.Timeout)), // request timeout
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.cfg.MaxReceiveMessageSize)),
}

if c.cfg.MetricsEnabled {
cm := grpcprom.NewClientMetrics(
grpcprom.WithClientHandlingTimeHistogram())

prometheus.MustRegister(cm)

options = append(
options,
grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithChainUnaryInterceptor(cm.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(cm.StreamClientInterceptor()),
)

grpc_prometheus.EnableClientHandlingTimeHistogram()
}

// Add required secure grpc option based on config parameter
Expand All @@ -110,27 +82,12 @@ func (c *Client) Start(ctx context.Context) error {

c.TmsService = tmservice.NewServiceClient(grpcConn)
c.TxService = tx.NewServiceClient(grpcConn)
c.BankQueryClient = banktypes.NewQueryClient(grpcConn)
c.GovQueryClient = govtypes.NewQueryClient(grpcConn)
c.MintQueryClient = minttypes.NewQueryClient(grpcConn)
c.SlashingQueryClient = slashingtypes.NewQueryClient(grpcConn)
c.StakingQueryClient = stakingtypes.NewQueryClient(grpcConn)
c.DistributionQueryClient = distributiontypes.NewQueryClient(grpcConn)
c.AuthzQueryClient = authztypes.NewQueryClient(grpcConn)
c.FeegrantQueryClient = feegranttypes.NewQueryClient(grpcConn)
c.IbcTransferQueryClient = ibctransfertypes.NewQueryClient(grpcConn)
c.LiquidityQueryClient = liquiditytypes.NewQueryClient(grpcConn)
c.AuthQueryClient = authtypes.NewQueryClient(grpcConn)
c.GraphQueryClient = graphtypes.NewQueryClient(grpcConn)
c.BandwidthQueryClient = bandwidthtypes.NewQueryClient(grpcConn)
c.DMNQueryClient = dmntypes.NewQueryClient(grpcConn)
c.GridQueryClient = gridtypes.NewQueryClient(grpcConn)
c.RankQueryClient = ranktypes.NewQueryClient(grpcConn)
c.ResourcesQueryClient = resourcestypes.NewQueryClient(grpcConn)

c.conn = grpcConn

return nil
}

func (c *Client) Stop(_ context.Context) error { return c.conn.Close() }

func (c *Client) Conn() *grpc.ClientConn { return c.conn }
9 changes: 5 additions & 4 deletions client/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import "time"

type (
Config struct {
Host string `env:"GRPC_URL" envDefault:"http://localhost:9090"`
SecureConnection bool `env:"GRPC_SECURE_CONNECTION" envDefault:"false"`
MetricsEnabled bool `env:"METRICS_ENABLED" envDefault:"false"`
Timeout time.Duration `env:"GRPC_TIMEOUT" envDefault:"15s"`
Host string `env:"GRPC_URL" envDefault:"http://localhost:9090"`
SecureConnection bool `env:"GRPC_SECURE_CONNECTION" envDefault:"false"`
MetricsEnabled bool `env:"METRICS_ENABLED" envDefault:"false"`
MaxReceiveMessageSize int `env:"GRPC_MAX_RECEIVE_MESSAGE_SIZE_BYTES" envDefault:"5242880"` // 5MB
Timeout time.Duration `env:"GRPC_TIMEOUT" envDefault:"15s"`
}
)
2 changes: 1 addition & 1 deletion client/grpc/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
cometbfttypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/types/tx"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/v2/adapter/storage/model"
)

// Txs queries for all the transactions in a block. Transactions are returned
Expand Down
18 changes: 0 additions & 18 deletions client/grpc/utils.go

This file was deleted.

2 changes: 1 addition & 1 deletion client/rpc/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package rpc
import (
"context"

"github.com/bro-n-bro/spacebox-crawler/types"
"github.com/bro-n-bro/spacebox-crawler/v2/types"
)

// GetBlockEvents returns begin block and end block events.
Expand Down
54 changes: 25 additions & 29 deletions client/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package rpc

import (
"context"
"net/http"

cometbftHttp "github.com/cometbft/cometbft/rpc/client/http"
jsonrpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type Client struct {
Expand All @@ -20,43 +22,37 @@ func New(cfg Config) *Client {
return &Client{cfg: cfg}
}

func (c *Client) Start(ctx context.Context) error {
// FIXME: does not work without websocket connection
var rpcCli *cometbftHttp.HTTP
if c.cfg.WSEnabled {
var err error
rpcCli, err = cometbftHttp.NewWithTimeout(c.cfg.Host, "/websocket", uint(c.cfg.Timeout.Seconds()))
if err != nil {
return err
}

if err = rpcCli.Start(); err != nil {
return err
}
} else {
var err error
rpcCli, err = cometbftHttp.NewWithTimeout(c.cfg.Host, "", uint(c.cfg.Timeout.Seconds()))
if err != nil {
return err
}
if err = rpcCli.Start(); err != nil {
return err
}
func (c *Client) Start(_ context.Context) error {
httpClient, err := jsonrpcclient.DefaultHTTPClient(c.cfg.Host)
if err != nil {
return err
}

c.RPCClient = rpcCli
httpClient.Timeout = c.cfg.Timeout

if c.cfg.MetricsEnabled {
httpClient.Transport = promhttp.InstrumentRoundTripperInFlight(inFlightGauge,
promhttp.InstrumentRoundTripperCounter(counter,
promhttp.InstrumentRoundTripperDuration(histVec, http.DefaultTransport)),
)
}

c.RPCClient, err = cometbftHttp.NewWithClient(c.cfg.Host, "/websocket", httpClient)
if err != nil {
return err
}

if err = c.RPCClient.Start(); err != nil {
return err
}

return nil
}

func (c *Client) Stop(_ context.Context) error {
if c.cfg.WSEnabled {
if err := c.RPCClient.Stop(); err != nil {
return err
}
if err := c.RPCClient.Stop(); err != nil {
return err
}

return nil
}

func (c *Client) WsEnabled() bool { return c.cfg.WSEnabled }
6 changes: 3 additions & 3 deletions client/rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package rpc
import "time"

type Config struct {
Host string `env:"RPC_URL" envDefault:"http://localhost:26657"`
WSEnabled bool `env:"WS_ENABLED" envDefault:"true"`
Timeout time.Duration `env:"RPC_TIMEOUT" envDefault:"15s"`
Host string `env:"RPC_URL" envDefault:"http://localhost:26657"`
MetricsEnabled bool `env:"METRICS_ENABLED" envDefault:"false"`
Timeout time.Duration `env:"RPC_TIMEOUT" envDefault:"15s"`
}
30 changes: 30 additions & 0 deletions client/rpc/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package rpc

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
inFlightGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "client_in_flight_requests",
Help: "A gauge of in-flight requests for the wrapped client.",
})

counter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "client_api_requests_total",
Help: "A counter for requests from the wrapped client.",
},
[]string{"code", "method"},
)

histVec = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_seconds",
Help: "A histogram of request latencies.",
Buckets: prometheus.DefBuckets,
},
[]string{"method"},
)
)
Loading

0 comments on commit 90b0d96

Please sign in to comment.