Skip to content

Commit

Permalink
add custom raw module
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Neznaykin committed Jan 10, 2024
1 parent 76e3ce4 commit d0d6418
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Application settings
START_TIMEOUT=20s # Start application timeout duration
STOP_TIMEOUT=20s # Stop application timeout duration
MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm # Modules for processing
MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm,raw # Modules for processing
CHAIN_PREFIX=cosmos # Prefix of indexing chain
DEFAULT_DEMON=uatom # Default demon of chain coins
PARSE_AVATAR_URL=false # Parse avatar url for validator from keychain. It will decrease index performance!!!
Expand Down
19 changes: 19 additions & 0 deletions client/rpc/block_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package rpc

import (
"context"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
)

func (c *Client) GetBlockResults(ctx context.Context, height int64) (*coretypes.ResultBlockResults, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()

result, err := c.RPCClient.BlockResults(ctx, &height)
if err != nil {
return nil, err
}

return result, nil
}
2 changes: 2 additions & 0 deletions delivery/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ func (b *Broker) getCurrentTopics(modules []string) []string {
topics = append(topics, resourcesTopics.ToStringSlice()...)
case "wasm":
topics = append(topics, wasmTopics.ToStringSlice()...)
case "raw":
topics = append(topics, rawTopics.ToStringSlice()...)
default:
b.log.Warn().Str("name", m).Msg("unknown module in config")
continue
Expand Down
17 changes: 17 additions & 0 deletions delivery/broker/raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package broker

import (
"context"
)

func (b *Broker) PublishRawBlock(_ context.Context, block interface{}) error {
return b.marshalAndProduce(Account, block)
}

func (b *Broker) PublishRawTransaction(_ context.Context, tx interface{}) error {
return b.marshalAndProduce(Account, tx)
}

func (b *Broker) PublishRawBlockResults(_ context.Context, br interface{}) error {
return b.marshalAndProduce(Account, br)
}
53 changes: 29 additions & 24 deletions delivery/broker/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,60 @@ package broker

var (
Account Topic = newTopic("account")
AuthzGrant Topic = newTopic("authz_grant")
AcknowledgementMessage Topic = newTopic("acknowledgement_message")
AccountBalance Topic = newTopic("account_balance")
AcknowledgementMessage Topic = newTopic("acknowledgement_message")
AnnualProvision Topic = newTopic("annual_provision")
Block Topic = newTopic("block")
AuthzGrant Topic = newTopic("authz_grant")
BandwidthParams Topic = newTopic("bandwidth_params")
Block Topic = newTopic("block")
CancelUnbondingDelegationMessage Topic = newTopic("cancel_unbonding_delegation_message")
CommunityPool Topic = newTopic("community_pool")
CyberlinkMessage Topic = newTopic("cyberlink_message")
Cyberlink Topic = newTopic("cyberlink")
CreateRouteMessage Topic = newTopic("create_route_message")
CreateValidatorMessage Topic = newTopic("create_validator_message")
DistributionCommission Topic = newTopic("distribution_commission")
DistributionReward Topic = newTopic("distribution_reward")
DistributionParams Topic = newTopic("distribution_params")
DelegationReward Topic = newTopic("delegation_reward")
DelegationRewardMessage Topic = newTopic("delegation_reward_message")
Cyberlink Topic = newTopic("cyberlink")
CyberlinkMessage Topic = newTopic("cyberlink_message")
DMNParams Topic = newTopic("dmn_params")
Delegation Topic = newTopic("delegation")
DelegationMessage Topic = newTopic("delegation_message")
DelegationReward Topic = newTopic("delegation_reward")
DelegationRewardMessage Topic = newTopic("delegation_reward_message")
DeleteRouteMessage Topic = newTopic("delete_route_message")
EditRouteNameMessage Topic = newTopic("edit_route_name_message")
EditRouteMessage Topic = newTopic("edit_route_message")
CreateRouteMessage Topic = newTopic("create_route_message")
DenomTrace Topic = newTopic("denom_trace")
DMNParams Topic = newTopic("dmn_params")
DistributionCommission Topic = newTopic("distribution_commission")
DistributionParams Topic = newTopic("distribution_params")
DistributionReward Topic = newTopic("distribution_reward")
EditRouteMessage Topic = newTopic("edit_route_message")
EditRouteNameMessage Topic = newTopic("edit_route_name_message")
EditValidatorMessage Topic = newTopic("edit_validator_message")
ExecMessage Topic = newTopic("exec_message")
FeeAllowance Topic = newTopic("fee_allowance")
GovParams Topic = newTopic("gov_params")
GrantMessage Topic = newTopic("grant_message")
GrantAllowanceMessage Topic = newTopic("grant_allowance_message")
GrantMessage Topic = newTopic("grant_message")
GridParams Topic = newTopic("grid_params")
HandleValidatorSignature Topic = newTopic("handle_validator_signature")
InvestmintMessage Topic = newTopic("investmint_message")
LiquidityPool Topic = newTopic("liquidity_pool")
Message Topic = newTopic("message")
MintParams Topic = newTopic("mint_params")
MultiSendMessage Topic = newTopic("multisend_message")
Particle Topic = newTopic("particle")
Proposal Topic = newTopic("proposal")
ProposalVoteMessage Topic = newTopic("proposal_vote_message")
ProposalTallyResult Topic = newTopic("proposal_tally_result")
ProposalDeposit Topic = newTopic("proposal_deposit")
ProposalDepositMessage Topic = newTopic("proposal_deposit_message")
ProposalTallyResult Topic = newTopic("proposal_tally_result")
ProposalVoteMessage Topic = newTopic("proposal_vote_message")
ProposerReward Topic = newTopic("proposer_reward")
RevokeAllowanceMessage Topic = newTopic("revoke_allowance_message")
RankParams Topic = newTopic("rank_params")
Route Topic = newTopic("route")
InvestmintMessage Topic = newTopic("investmint_message")
RawBlock Topic = newTopic("raw_block")
RawBlockResults Topic = newTopic("raw_block_results")
RawTransaction Topic = newTopic("raw_transaction")
ReceivePacketMessage Topic = newTopic("receive_packet_message")
Redelegation Topic = newTopic("redelegation")
RedelegationMessage Topic = newTopic("redelegation_message")
RevokeAllowanceMessage Topic = newTopic("revoke_allowance_message")
RevokeMessage Topic = newTopic("revoke_message")
ReceivePacketMessage Topic = newTopic("receive_packet_message")
Route Topic = newTopic("route")
SendMessage Topic = newTopic("send_message")
SetWithdrawAddressMessage Topic = newTopic("set_withdraw_address_message")
SlashingParams Topic = newTopic("slashing_params")
Expand All @@ -67,11 +70,11 @@ var (
UnbondingDelegationMessage Topic = newTopic("unbonding_delegation_message")
UnjailMessage Topic = newTopic("unjail_message")
Validator Topic = newTopic("validator")
ValidatorInfo Topic = newTopic("validator_info")
ValidatorStatus Topic = newTopic("validator_status")
ValidatorDescription Topic = newTopic("validator_description")
ValidatorCommission Topic = newTopic("validator_commission")
ValidatorDescription Topic = newTopic("validator_description")
ValidatorInfo Topic = newTopic("validator_info")
ValidatorPreCommit Topic = newTopic("validator_precommit")
ValidatorStatus Topic = newTopic("validator_status")
ValidatorVotingPower Topic = newTopic("validator_voting_power")
VoteWeightedMessage Topic = newTopic("vote_weighted_message")
WithdrawValidatorCommissionMessage Topic = newTopic("withdraw_validator_commission_message")
Expand Down Expand Up @@ -120,6 +123,8 @@ var (
resourcesTopics = Topics{InvestmintMessage}

wasmTopics = Topics{Cyberlink, Particle}

rawTopics = Topics{RawBlock, RawTransaction, RawBlockResults}
)

type (
Expand Down
5 changes: 5 additions & 0 deletions internal/rep/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ type Broker interface {

// resources module
PublishInvestmintMessage(context.Context, model.InvestmintMessage) error

// raw
PublishRawBlock(ctx context.Context, b interface{}) error
PublishRawTransaction(ctx context.Context, tx interface{}) error
PublishRawBlockResults(ctx context.Context, br interface{}) error
}
4 changes: 4 additions & 0 deletions modules/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
liquidityModule "github.com/bro-n-bro/spacebox-crawler/modules/liquidity"
mintModule "github.com/bro-n-bro/spacebox-crawler/modules/mint"
rankModule "github.com/bro-n-bro/spacebox-crawler/modules/rank"
rawModule "github.com/bro-n-bro/spacebox-crawler/modules/raw"
resourcesModule "github.com/bro-n-bro/spacebox-crawler/modules/resources"
slashingModule "github.com/bro-n-bro/spacebox-crawler/modules/slashing"
stakingModule "github.com/bro-n-bro/spacebox-crawler/modules/staking"
Expand Down Expand Up @@ -115,6 +116,9 @@ func BuildModules(
case wasmModule.ModuleName:
wasm := wasmModule.New(brk, cdc)
mods.Add(wasm)
case rawModule.ModuleName:
raw := rawModule.New(brk, rpc, tbm)
mods.Add(raw)
default:
log.Warn().Str("name", mod).Msg("unknown module")
continue
Expand Down
47 changes: 47 additions & 0 deletions modules/raw/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package raw

import (
"context"
"encoding/json"
"fmt"

jsoniter "github.com/json-iterator/go"

"github.com/bro-n-bro/spacebox-crawler/types"
)

func (m *Module) HandleBlock(ctx context.Context, block *types.Block) error {
rawBlock := struct {
Hash string `json:"hash"`
ProposerAddress string `json:"proposer_address"`
Block json.RawMessage `json:"block"`
TotalGas uint64 `json:"total_gas"`
NumTxs uint16 `json:"num_txs"`
}{
TotalGas: block.TotalGas,
Hash: block.Hash,
ProposerAddress: block.ProposerAddress,
NumTxs: uint16(block.TxNum),
}

var err error
rawBlock.Block, err = jsoniter.Marshal(block.Raw().Block)
if err != nil {
return fmt.Errorf("failed to marshal block: %w", err)
}

if err = m.broker.PublishRawBlock(ctx, rawBlock); err != nil {
return fmt.Errorf("failed to publish raw block: %w", err)
}

return m.publishBlockResults(ctx, block.Height)
}

func (m *Module) publishBlockResults(ctx context.Context, height int64) error {
rawBR, err := m.rpcClient.GetBlockResults(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block results: %w", err)
}

return m.broker.PublishRawBlockResults(ctx, rawBR)
}
9 changes: 9 additions & 0 deletions modules/raw/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package raw

import "context"

type broker interface {
PublishRawBlock(ctx context.Context, b interface{}) error
PublishRawTransaction(ctx context.Context, tx interface{}) error
PublishRawBlockResults(ctx context.Context, br interface{}) error
}
37 changes: 37 additions & 0 deletions modules/raw/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package raw

import (
"github.com/rs/zerolog"

rpcClient "github.com/bro-n-bro/spacebox-crawler/client/rpc"
"github.com/bro-n-bro/spacebox-crawler/modules/utils"
tb "github.com/bro-n-bro/spacebox-crawler/pkg/mapper/to_broker"
"github.com/bro-n-bro/spacebox-crawler/types"
)

const (
ModuleName = "raw"
)

var (
_ types.Module = &Module{}
_ types.BlockHandler = &Module{}
)

type Module struct {
log *zerolog.Logger
rpcClient *rpcClient.Client
broker broker
tbM tb.ToBroker
}

func New(b broker, cli *rpcClient.Client, tbM tb.ToBroker) *Module {
return &Module{
log: utils.NewModuleLogger(ModuleName),
broker: b,
rpcClient: cli,
tbM: tbM,
}
}

func (m *Module) Name() string { return ModuleName }
46 changes: 46 additions & 0 deletions modules/raw/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package raw

import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"

"github.com/cosmos/gogoproto/jsonpb"

"github.com/bro-n-bro/spacebox-crawler/types"
)

var (
bufPool = &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

marshler = &jsonpb.Marshaler{
EmitDefaults: false, // Set to false if you don't want to include default values in the JSON output
}
)

func (m *Module) HandleTx(ctx context.Context, tx *types.Tx) error {
rawTx := struct {
Signer string `json:"signer"`
TxResponse json.RawMessage `json:"tx_response"`
}{
Signer: tx.Signer,
}

b := bufPool.Get().(*bytes.Buffer) //nolint:forcetypeassert
b.Reset()
defer bufPool.Put(b)

if err := marshler.Marshal(b, tx.TxResponse); err != nil {
return fmt.Errorf("failed to marshal tx response: %w", err)
}

rawTx.TxResponse = append(rawTx.TxResponse, b.Bytes()...)

return m.broker.PublishRawTransaction(ctx, rawTx)
}
6 changes: 6 additions & 0 deletions types/cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type (
}

Block struct {
rb *cometbftcoretypes.ResultBlock

Timestamp time.Time
Hash string
ProposerAddress string
Expand Down Expand Up @@ -93,6 +95,8 @@ func NewBlockFromTmBlock(blk *cometbftcoretypes.ResultBlock, totalGas uint64) *B
res.ValidatorPreCommits = NewValidatorPreCommitsFromTmSignatures(blk.Block.LastCommit.Signatures)
}

res.rb = blk

return res
}

Expand Down Expand Up @@ -214,3 +218,5 @@ func (tx Tx) FindAttributeByKey(event sdk.StringEvent, attrKey string) (string,
func (tx Tx) Successful() bool {
return tx.TxResponse.Code == 0
}

func (b Block) Raw() *cometbftcoretypes.ResultBlock { return b.rb }

0 comments on commit d0d6418

Please sign in to comment.