Skip to content

Commit

Permalink
Merge pull request #152 from bro-n-bro/143-crawler-refactoring
Browse files Browse the repository at this point in the history
143 crawler refactoring
  • Loading branch information
malekvictor authored Mar 6, 2024
2 parents 31dcaf1 + fc939ab commit 918a5f6
Show file tree
Hide file tree
Showing 22 changed files with 638 additions and 302 deletions.
10 changes: 9 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Application settings
START_TIMEOUT=20s # Start application timeout duration
STOP_TIMEOUT=20s # Stop application timeout duration
MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm # Modules for processing
MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm,raw # Modules for processing
CHAIN_PREFIX=cosmos # Prefix of indexing chain
DEFAULT_DEMON=uatom # Default demon of chain coins
PARSE_AVATAR_URL=false # Parse avatar url for validator from keychain. It will decrease index performance!!!
Expand Down Expand Up @@ -31,6 +31,7 @@ START_HEIGHT=13071519 # Start block height
STOP_HEIGHT=0 # Stop block height
PROCESS_ERROR_BLOCKS_INTERVAL=1m # Interval to reprocess error blocks again
PROCESS_GENESIS=true # Parse 0 height of genesis
MAX_MESSAGE_MAX_BYTES=5242880 # Max message size in bytes (5MB)

# Mongo settings
MONGO_CRAWLER_URI=mongodb://localhost:27018/spacebox # Database connection url
Expand All @@ -42,3 +43,10 @@ MAX_CONNECTING=100
# Debug
LOG_LEVEL=info # Level of logging
RECOVERY_MODE=false # Detect panic without stop application. It will decrease index performance!!!

# Health checker
HEALTHCHECK_ENABLED=true
HEALTHCHECK_FATAL_ON_CHECK=true
HEALTHCHECK_MAX_LAST_BLOCK_LAG=1m
HEALTHCHECK_INTERVAL=10m
HEALTHCHECK_START_DELAY=1m
16 changes: 16 additions & 0 deletions adapter/storage/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
"github.com/bro-n-bro/spacebox-crawler/types"
Expand Down Expand Up @@ -116,6 +117,21 @@ func (s *Storage) GetAllBlocks(ctx context.Context) (blocks []*model.Block, err
return blocks, err
}

func (s *Storage) GetLatestBlock(ctx context.Context) (*model.Block, error) {
var block model.Block

err := s.blocksCollection.FindOne(
ctx,
bson.D{},
options.FindOne().SetSort(bson.D{{Key: "_id", Value: -1}}),
).Decode(&block)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, types.ErrBlockNotFound
}

return &block, err
}

func (s *Storage) setErrorStatusForProcessing(ctx context.Context) error {
filter := bson.D{{Key: "status", Value: model.StatusProcessing}}
update := bson.D{{Key: "$set", Value: bson.D{
Expand Down
19 changes: 19 additions & 0 deletions client/rpc/block_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package rpc

import (
"context"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
)

func (c *Client) GetBlockResults(ctx context.Context, height int64) (*coretypes.ResultBlockResults, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()

result, err := c.RPCClient.BlockResults(ctx, &height)
if err != nil {
return nil, err
}

return result, nil
}
62 changes: 3 additions & 59 deletions delivery/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,9 @@ func (b *Broker) Start(ctx context.Context) error {
return errors.Wrap(err, MsgErrCreateAdminClient)
}

// get enabled topics based on enabled modules
topics := b.getCurrentTopics(b.modules)
kafkaTopics := make([]kafka.TopicSpecification, len(topics))
kafkaTopics := make([]kafka.TopicSpecification, len(allTopics))
// kafkaPartitions := make([]kafka.PartitionsSpecification, len(topics))
for i, topic := range topics {
for i, topic := range allTopics {
kafkaTopics[i] = kafka.TopicSpecification{
Topic: topic,
NumPartitions: b.cfg.PartitionsCount,
Expand All @@ -98,7 +96,7 @@ func (b *Broker) Start(ctx context.Context) error {
// create a producer connection
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": b.cfg.ServerURL,
"message.max.bytes": 5 << 20, // 5 MB
"message.max.bytes": b.cfg.MaxMessageBytes,
})
if err != nil {
b.log.Error().Err(err).Msg(MsgErrCreateProducer)
Expand Down Expand Up @@ -178,60 +176,6 @@ func (b *Broker) produce(topic Topic, data []byte) error {
return nil
}

// getCurrentTopics returns the list of topics based on enabled modules.
// nolint:gocyclo
func (b *Broker) getCurrentTopics(modules []string) []string {
topics := make([]string, 0)

for _, m := range modules {
switch m {
case "auth":
topics = append(topics, authTopics.ToStringSlice()...)
case "bank":
topics = append(topics, bankTopics.ToStringSlice()...)
case "gov":
topics = append(topics, govTopics.ToStringSlice()...)
case "mint":
topics = append(topics, mintTopics.ToStringSlice()...)
case "staking":
topics = append(topics, stakingTopics.ToStringSlice()...)
case "distribution":
topics = append(topics, distributionTopics.ToStringSlice()...)
case "core":
topics = append(topics, coreTopics.ToStringSlice()...)
case "authz":
topics = append(topics, authzTopics.ToStringSlice()...)
case "feegrant":
topics = append(topics, feegrantTopics.ToStringSlice()...)
case "slashing":
topics = append(topics, slashingTopics.ToStringSlice()...)
case "ibc":
topics = append(topics, ibcTopics.ToStringSlice()...)
case "liquidity":
topics = append(topics, liquidityTopics.ToStringSlice()...)
case "graph":
topics = append(topics, graphTopics.ToStringSlice()...)
case "bandwidth":
topics = append(topics, bandwidthTopics.ToStringSlice()...)
case "dmn":
topics = append(topics, dmnTopics.ToStringSlice()...)
case "grid":
topics = append(topics, gridTopics.ToStringSlice()...)
case "rank":
topics = append(topics, rankTopics.ToStringSlice()...)
case "resources":
topics = append(topics, resourcesTopics.ToStringSlice()...)
case "wasm":
topics = append(topics, wasmTopics.ToStringSlice()...)
default:
b.log.Warn().Str("name", m).Msg("unknown module in config")
continue
}
}

return removeDuplicates(topics)
}

func WithValidatorCache(valCache cache[string, int64]) func(b *Broker) {
return func(b *Broker) {
b.cache.validator = valCache
Expand Down
1 change: 1 addition & 0 deletions delivery/broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type (
Config struct {
ServerURL string `env:"BROKER_SERVER"`
PartitionsCount int `env:"PARTITIONS_COUNT" envDefault:"1"`
MaxMessageBytes int `env:"MAX_MESSAGE_MAX_BYTES" envDefault:"5242880"` // 5MB
Enabled bool `env:"BROKER_ENABLED"`
}
)
21 changes: 21 additions & 0 deletions delivery/broker/raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package broker

import (
"context"
)

func (b *Broker) PublishRawBlock(_ context.Context, block interface{}) error {
return b.marshalAndProduce(RawBlock, block)
}

func (b *Broker) PublishRawTransaction(_ context.Context, tx interface{}) error {
return b.marshalAndProduce(RawTransaction, tx)
}

func (b *Broker) PublishRawBlockResults(_ context.Context, br interface{}) error {
return b.marshalAndProduce(RawBlockResults, br)
}

func (b *Broker) PublishRawGenesis(_ context.Context, g interface{}) error {
return b.marshalAndProduce(RawGenesis, g)
}
66 changes: 42 additions & 24 deletions delivery/broker/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,61 @@ package broker

var (
Account Topic = newTopic("account")
AuthzGrant Topic = newTopic("authz_grant")
AcknowledgementMessage Topic = newTopic("acknowledgement_message")
AccountBalance Topic = newTopic("account_balance")
AcknowledgementMessage Topic = newTopic("acknowledgement_message")
AnnualProvision Topic = newTopic("annual_provision")
Block Topic = newTopic("block")
AuthzGrant Topic = newTopic("authz_grant")
BandwidthParams Topic = newTopic("bandwidth_params")
Block Topic = newTopic("block")
CancelUnbondingDelegationMessage Topic = newTopic("cancel_unbonding_delegation_message")
CommunityPool Topic = newTopic("community_pool")
CyberlinkMessage Topic = newTopic("cyberlink_message")
Cyberlink Topic = newTopic("cyberlink")
CreateRouteMessage Topic = newTopic("create_route_message")
CreateValidatorMessage Topic = newTopic("create_validator_message")
DistributionCommission Topic = newTopic("distribution_commission")
DistributionReward Topic = newTopic("distribution_reward")
DistributionParams Topic = newTopic("distribution_params")
DelegationReward Topic = newTopic("delegation_reward")
DelegationRewardMessage Topic = newTopic("delegation_reward_message")
Cyberlink Topic = newTopic("cyberlink")
CyberlinkMessage Topic = newTopic("cyberlink_message")
DMNParams Topic = newTopic("dmn_params")
Delegation Topic = newTopic("delegation")
DelegationMessage Topic = newTopic("delegation_message")
DelegationReward Topic = newTopic("delegation_reward")
DelegationRewardMessage Topic = newTopic("delegation_reward_message")
DeleteRouteMessage Topic = newTopic("delete_route_message")
EditRouteNameMessage Topic = newTopic("edit_route_name_message")
EditRouteMessage Topic = newTopic("edit_route_message")
CreateRouteMessage Topic = newTopic("create_route_message")
DenomTrace Topic = newTopic("denom_trace")
DMNParams Topic = newTopic("dmn_params")
DistributionCommission Topic = newTopic("distribution_commission")
DistributionParams Topic = newTopic("distribution_params")
DistributionReward Topic = newTopic("distribution_reward")
EditRouteMessage Topic = newTopic("edit_route_message")
EditRouteNameMessage Topic = newTopic("edit_route_name_message")
EditValidatorMessage Topic = newTopic("edit_validator_message")
ExecMessage Topic = newTopic("exec_message")
FeeAllowance Topic = newTopic("fee_allowance")
GovParams Topic = newTopic("gov_params")
GrantMessage Topic = newTopic("grant_message")
GrantAllowanceMessage Topic = newTopic("grant_allowance_message")
GrantMessage Topic = newTopic("grant_message")
GridParams Topic = newTopic("grid_params")
HandleValidatorSignature Topic = newTopic("handle_validator_signature")
InvestmintMessage Topic = newTopic("investmint_message")
LiquidityPool Topic = newTopic("liquidity_pool")
Message Topic = newTopic("message")
MintParams Topic = newTopic("mint_params")
MultiSendMessage Topic = newTopic("multisend_message")
Particle Topic = newTopic("particle")
Proposal Topic = newTopic("proposal")
ProposalVoteMessage Topic = newTopic("proposal_vote_message")
ProposalTallyResult Topic = newTopic("proposal_tally_result")
ProposalDeposit Topic = newTopic("proposal_deposit")
ProposalDepositMessage Topic = newTopic("proposal_deposit_message")
ProposalTallyResult Topic = newTopic("proposal_tally_result")
ProposalVoteMessage Topic = newTopic("proposal_vote_message")
ProposerReward Topic = newTopic("proposer_reward")
RevokeAllowanceMessage Topic = newTopic("revoke_allowance_message")
RankParams Topic = newTopic("rank_params")
Route Topic = newTopic("route")
InvestmintMessage Topic = newTopic("investmint_message")
RawBlock Topic = newTopic("raw_block")
RawBlockResults Topic = newTopic("raw_block_results")
RawGenesis Topic = newTopic("raw_genesis")
RawTransaction Topic = newTopic("raw_transaction")
ReceivePacketMessage Topic = newTopic("receive_packet_message")
Redelegation Topic = newTopic("redelegation")
RedelegationMessage Topic = newTopic("redelegation_message")
RevokeAllowanceMessage Topic = newTopic("revoke_allowance_message")
RevokeMessage Topic = newTopic("revoke_message")
ReceivePacketMessage Topic = newTopic("receive_packet_message")
Route Topic = newTopic("route")
SendMessage Topic = newTopic("send_message")
SetWithdrawAddressMessage Topic = newTopic("set_withdraw_address_message")
SlashingParams Topic = newTopic("slashing_params")
Expand All @@ -67,11 +71,11 @@ var (
UnbondingDelegationMessage Topic = newTopic("unbonding_delegation_message")
UnjailMessage Topic = newTopic("unjail_message")
Validator Topic = newTopic("validator")
ValidatorInfo Topic = newTopic("validator_info")
ValidatorStatus Topic = newTopic("validator_status")
ValidatorDescription Topic = newTopic("validator_description")
ValidatorCommission Topic = newTopic("validator_commission")
ValidatorDescription Topic = newTopic("validator_description")
ValidatorInfo Topic = newTopic("validator_info")
ValidatorPreCommit Topic = newTopic("validator_precommit")
ValidatorStatus Topic = newTopic("validator_status")
ValidatorVotingPower Topic = newTopic("validator_voting_power")
VoteWeightedMessage Topic = newTopic("vote_weighted_message")
WithdrawValidatorCommissionMessage Topic = newTopic("withdraw_validator_commission_message")
Expand Down Expand Up @@ -120,6 +124,20 @@ var (
resourcesTopics = Topics{InvestmintMessage}

wasmTopics = Topics{Cyberlink, Particle}

rawTopics = Topics{RawBlock, RawTransaction, RawBlockResults, RawGenesis}

// allTopics is the list of all topics.
allTopics = func(tcs []Topics) []string {
stringTopics := make([]string, 0)
for _, t := range tcs {
stringTopics = append(stringTopics, t.ToStringSlice()...)
}
return removeDuplicates(stringTopics)
}([]Topics{
authTopics, bankTopics, distributionTopics, govTopics, mintTopics, stakingTopics, coreTopics, authzTopics,
feegrantTopics, slashingTopics, ibcTopics, liquidityTopics, graphTopics, bandwidthTopics, dmnTopics,
gridTopics, rankTopics, resourcesTopics, wasmTopics, rawTopics})
)

type (
Expand Down
Loading

0 comments on commit 918a5f6

Please sign in to comment.