From d0d6418986a8c9ec4adc3b6428ca1c143ad4d852 Mon Sep 17 00:00:00 2001 From: Victor Neznaykin Date: Wed, 10 Jan 2024 11:59:42 +0400 Subject: [PATCH] add custom raw module --- .env | 2 +- client/rpc/block_results.go | 19 +++++++++++++ delivery/broker/broker.go | 2 ++ delivery/broker/raw.go | 17 ++++++++++++ delivery/broker/topics.go | 53 ++++++++++++++++++++----------------- internal/rep/broker.go | 5 ++++ modules/module.go | 4 +++ modules/raw/block.go | 47 ++++++++++++++++++++++++++++++++ modules/raw/broker.go | 9 +++++++ modules/raw/module.go | 37 ++++++++++++++++++++++++++ modules/raw/transaction.go | 46 ++++++++++++++++++++++++++++++++ types/cosmos.go | 6 +++++ 12 files changed, 222 insertions(+), 25 deletions(-) create mode 100644 client/rpc/block_results.go create mode 100644 delivery/broker/raw.go create mode 100644 modules/raw/block.go create mode 100644 modules/raw/broker.go create mode 100644 modules/raw/module.go create mode 100644 modules/raw/transaction.go diff --git a/.env b/.env index 5b6ba0d..1d0292a 100644 --- a/.env +++ b/.env @@ -1,7 +1,7 @@ # Application settings START_TIMEOUT=20s # Start application timeout duration STOP_TIMEOUT=20s # Stop application timeout duration -MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm # Modules for processing +MODULES=auth,authz,bandwidth,bank,core,distribution,dmn,feegrant,gov,graph,grid,ibc,liquidity,mint,rank,slashing,staking,wasm,raw # Modules for processing CHAIN_PREFIX=cosmos # Prefix of indexing chain DEFAULT_DEMON=uatom # Default demon of chain coins PARSE_AVATAR_URL=false # Parse avatar url for validator from keychain. It will decrease index performance!!! diff --git a/client/rpc/block_results.go b/client/rpc/block_results.go new file mode 100644 index 0000000..d14cba3 --- /dev/null +++ b/client/rpc/block_results.go @@ -0,0 +1,19 @@ +package rpc + +import ( + "context" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" +) + +func (c *Client) GetBlockResults(ctx context.Context, height int64) (*coretypes.ResultBlockResults, error) { + ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) + defer cancel() + + result, err := c.RPCClient.BlockResults(ctx, &height) + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/delivery/broker/broker.go b/delivery/broker/broker.go index 5b683f4..850a1e7 100644 --- a/delivery/broker/broker.go +++ b/delivery/broker/broker.go @@ -223,6 +223,8 @@ func (b *Broker) getCurrentTopics(modules []string) []string { topics = append(topics, resourcesTopics.ToStringSlice()...) case "wasm": topics = append(topics, wasmTopics.ToStringSlice()...) + case "raw": + topics = append(topics, rawTopics.ToStringSlice()...) default: b.log.Warn().Str("name", m).Msg("unknown module in config") continue diff --git a/delivery/broker/raw.go b/delivery/broker/raw.go new file mode 100644 index 0000000..885ae64 --- /dev/null +++ b/delivery/broker/raw.go @@ -0,0 +1,17 @@ +package broker + +import ( + "context" +) + +func (b *Broker) PublishRawBlock(_ context.Context, block interface{}) error { + return b.marshalAndProduce(Account, block) +} + +func (b *Broker) PublishRawTransaction(_ context.Context, tx interface{}) error { + return b.marshalAndProduce(Account, tx) +} + +func (b *Broker) PublishRawBlockResults(_ context.Context, br interface{}) error { + return b.marshalAndProduce(Account, br) +} diff --git a/delivery/broker/topics.go b/delivery/broker/topics.go index 469bff5..3798743 100644 --- a/delivery/broker/topics.go +++ b/delivery/broker/topics.go @@ -2,57 +2,60 @@ package broker var ( Account Topic = newTopic("account") - AuthzGrant Topic = newTopic("authz_grant") - AcknowledgementMessage Topic = newTopic("acknowledgement_message") AccountBalance Topic = newTopic("account_balance") + AcknowledgementMessage Topic = newTopic("acknowledgement_message") AnnualProvision Topic = newTopic("annual_provision") - Block Topic = newTopic("block") + AuthzGrant Topic = newTopic("authz_grant") BandwidthParams Topic = newTopic("bandwidth_params") + Block Topic = newTopic("block") CancelUnbondingDelegationMessage Topic = newTopic("cancel_unbonding_delegation_message") CommunityPool Topic = newTopic("community_pool") - CyberlinkMessage Topic = newTopic("cyberlink_message") - Cyberlink Topic = newTopic("cyberlink") + CreateRouteMessage Topic = newTopic("create_route_message") CreateValidatorMessage Topic = newTopic("create_validator_message") - DistributionCommission Topic = newTopic("distribution_commission") - DistributionReward Topic = newTopic("distribution_reward") - DistributionParams Topic = newTopic("distribution_params") - DelegationReward Topic = newTopic("delegation_reward") - DelegationRewardMessage Topic = newTopic("delegation_reward_message") + Cyberlink Topic = newTopic("cyberlink") + CyberlinkMessage Topic = newTopic("cyberlink_message") + DMNParams Topic = newTopic("dmn_params") Delegation Topic = newTopic("delegation") DelegationMessage Topic = newTopic("delegation_message") + DelegationReward Topic = newTopic("delegation_reward") + DelegationRewardMessage Topic = newTopic("delegation_reward_message") DeleteRouteMessage Topic = newTopic("delete_route_message") - EditRouteNameMessage Topic = newTopic("edit_route_name_message") - EditRouteMessage Topic = newTopic("edit_route_message") - CreateRouteMessage Topic = newTopic("create_route_message") DenomTrace Topic = newTopic("denom_trace") - DMNParams Topic = newTopic("dmn_params") + DistributionCommission Topic = newTopic("distribution_commission") + DistributionParams Topic = newTopic("distribution_params") + DistributionReward Topic = newTopic("distribution_reward") + EditRouteMessage Topic = newTopic("edit_route_message") + EditRouteNameMessage Topic = newTopic("edit_route_name_message") EditValidatorMessage Topic = newTopic("edit_validator_message") ExecMessage Topic = newTopic("exec_message") FeeAllowance Topic = newTopic("fee_allowance") GovParams Topic = newTopic("gov_params") - GrantMessage Topic = newTopic("grant_message") GrantAllowanceMessage Topic = newTopic("grant_allowance_message") + GrantMessage Topic = newTopic("grant_message") GridParams Topic = newTopic("grid_params") HandleValidatorSignature Topic = newTopic("handle_validator_signature") + InvestmintMessage Topic = newTopic("investmint_message") LiquidityPool Topic = newTopic("liquidity_pool") Message Topic = newTopic("message") MintParams Topic = newTopic("mint_params") MultiSendMessage Topic = newTopic("multisend_message") Particle Topic = newTopic("particle") Proposal Topic = newTopic("proposal") - ProposalVoteMessage Topic = newTopic("proposal_vote_message") - ProposalTallyResult Topic = newTopic("proposal_tally_result") ProposalDeposit Topic = newTopic("proposal_deposit") ProposalDepositMessage Topic = newTopic("proposal_deposit_message") + ProposalTallyResult Topic = newTopic("proposal_tally_result") + ProposalVoteMessage Topic = newTopic("proposal_vote_message") ProposerReward Topic = newTopic("proposer_reward") - RevokeAllowanceMessage Topic = newTopic("revoke_allowance_message") RankParams Topic = newTopic("rank_params") - Route Topic = newTopic("route") - InvestmintMessage Topic = newTopic("investmint_message") + RawBlock Topic = newTopic("raw_block") + RawBlockResults Topic = newTopic("raw_block_results") + RawTransaction Topic = newTopic("raw_transaction") + ReceivePacketMessage Topic = newTopic("receive_packet_message") Redelegation Topic = newTopic("redelegation") RedelegationMessage Topic = newTopic("redelegation_message") + RevokeAllowanceMessage Topic = newTopic("revoke_allowance_message") RevokeMessage Topic = newTopic("revoke_message") - ReceivePacketMessage Topic = newTopic("receive_packet_message") + Route Topic = newTopic("route") SendMessage Topic = newTopic("send_message") SetWithdrawAddressMessage Topic = newTopic("set_withdraw_address_message") SlashingParams Topic = newTopic("slashing_params") @@ -67,11 +70,11 @@ var ( UnbondingDelegationMessage Topic = newTopic("unbonding_delegation_message") UnjailMessage Topic = newTopic("unjail_message") Validator Topic = newTopic("validator") - ValidatorInfo Topic = newTopic("validator_info") - ValidatorStatus Topic = newTopic("validator_status") - ValidatorDescription Topic = newTopic("validator_description") ValidatorCommission Topic = newTopic("validator_commission") + ValidatorDescription Topic = newTopic("validator_description") + ValidatorInfo Topic = newTopic("validator_info") ValidatorPreCommit Topic = newTopic("validator_precommit") + ValidatorStatus Topic = newTopic("validator_status") ValidatorVotingPower Topic = newTopic("validator_voting_power") VoteWeightedMessage Topic = newTopic("vote_weighted_message") WithdrawValidatorCommissionMessage Topic = newTopic("withdraw_validator_commission_message") @@ -120,6 +123,8 @@ var ( resourcesTopics = Topics{InvestmintMessage} wasmTopics = Topics{Cyberlink, Particle} + + rawTopics = Topics{RawBlock, RawTransaction, RawBlockResults} ) type ( diff --git a/internal/rep/broker.go b/internal/rep/broker.go index da8ee8c..fc411b4 100644 --- a/internal/rep/broker.go +++ b/internal/rep/broker.go @@ -117,4 +117,9 @@ type Broker interface { // resources module PublishInvestmintMessage(context.Context, model.InvestmintMessage) error + + // raw + PublishRawBlock(ctx context.Context, b interface{}) error + PublishRawTransaction(ctx context.Context, tx interface{}) error + PublishRawBlockResults(ctx context.Context, br interface{}) error } diff --git a/modules/module.go b/modules/module.go index dd46236..53a006b 100644 --- a/modules/module.go +++ b/modules/module.go @@ -22,6 +22,7 @@ import ( liquidityModule "github.com/bro-n-bro/spacebox-crawler/modules/liquidity" mintModule "github.com/bro-n-bro/spacebox-crawler/modules/mint" rankModule "github.com/bro-n-bro/spacebox-crawler/modules/rank" + rawModule "github.com/bro-n-bro/spacebox-crawler/modules/raw" resourcesModule "github.com/bro-n-bro/spacebox-crawler/modules/resources" slashingModule "github.com/bro-n-bro/spacebox-crawler/modules/slashing" stakingModule "github.com/bro-n-bro/spacebox-crawler/modules/staking" @@ -115,6 +116,9 @@ func BuildModules( case wasmModule.ModuleName: wasm := wasmModule.New(brk, cdc) mods.Add(wasm) + case rawModule.ModuleName: + raw := rawModule.New(brk, rpc, tbm) + mods.Add(raw) default: log.Warn().Str("name", mod).Msg("unknown module") continue diff --git a/modules/raw/block.go b/modules/raw/block.go new file mode 100644 index 0000000..ba7b59f --- /dev/null +++ b/modules/raw/block.go @@ -0,0 +1,47 @@ +package raw + +import ( + "context" + "encoding/json" + "fmt" + + jsoniter "github.com/json-iterator/go" + + "github.com/bro-n-bro/spacebox-crawler/types" +) + +func (m *Module) HandleBlock(ctx context.Context, block *types.Block) error { + rawBlock := struct { + Hash string `json:"hash"` + ProposerAddress string `json:"proposer_address"` + Block json.RawMessage `json:"block"` + TotalGas uint64 `json:"total_gas"` + NumTxs uint16 `json:"num_txs"` + }{ + TotalGas: block.TotalGas, + Hash: block.Hash, + ProposerAddress: block.ProposerAddress, + NumTxs: uint16(block.TxNum), + } + + var err error + rawBlock.Block, err = jsoniter.Marshal(block.Raw().Block) + if err != nil { + return fmt.Errorf("failed to marshal block: %w", err) + } + + if err = m.broker.PublishRawBlock(ctx, rawBlock); err != nil { + return fmt.Errorf("failed to publish raw block: %w", err) + } + + return m.publishBlockResults(ctx, block.Height) +} + +func (m *Module) publishBlockResults(ctx context.Context, height int64) error { + rawBR, err := m.rpcClient.GetBlockResults(ctx, height) + if err != nil { + return fmt.Errorf("failed to get block results: %w", err) + } + + return m.broker.PublishRawBlockResults(ctx, rawBR) +} diff --git a/modules/raw/broker.go b/modules/raw/broker.go new file mode 100644 index 0000000..2ca2ec3 --- /dev/null +++ b/modules/raw/broker.go @@ -0,0 +1,9 @@ +package raw + +import "context" + +type broker interface { + PublishRawBlock(ctx context.Context, b interface{}) error + PublishRawTransaction(ctx context.Context, tx interface{}) error + PublishRawBlockResults(ctx context.Context, br interface{}) error +} diff --git a/modules/raw/module.go b/modules/raw/module.go new file mode 100644 index 0000000..2efe476 --- /dev/null +++ b/modules/raw/module.go @@ -0,0 +1,37 @@ +package raw + +import ( + "github.com/rs/zerolog" + + rpcClient "github.com/bro-n-bro/spacebox-crawler/client/rpc" + "github.com/bro-n-bro/spacebox-crawler/modules/utils" + tb "github.com/bro-n-bro/spacebox-crawler/pkg/mapper/to_broker" + "github.com/bro-n-bro/spacebox-crawler/types" +) + +const ( + ModuleName = "raw" +) + +var ( + _ types.Module = &Module{} + _ types.BlockHandler = &Module{} +) + +type Module struct { + log *zerolog.Logger + rpcClient *rpcClient.Client + broker broker + tbM tb.ToBroker +} + +func New(b broker, cli *rpcClient.Client, tbM tb.ToBroker) *Module { + return &Module{ + log: utils.NewModuleLogger(ModuleName), + broker: b, + rpcClient: cli, + tbM: tbM, + } +} + +func (m *Module) Name() string { return ModuleName } diff --git a/modules/raw/transaction.go b/modules/raw/transaction.go new file mode 100644 index 0000000..ad0ce9c --- /dev/null +++ b/modules/raw/transaction.go @@ -0,0 +1,46 @@ +package raw + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/cosmos/gogoproto/jsonpb" + + "github.com/bro-n-bro/spacebox-crawler/types" +) + +var ( + bufPool = &sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + } + + marshler = &jsonpb.Marshaler{ + EmitDefaults: false, // Set to false if you don't want to include default values in the JSON output + } +) + +func (m *Module) HandleTx(ctx context.Context, tx *types.Tx) error { + rawTx := struct { + Signer string `json:"signer"` + TxResponse json.RawMessage `json:"tx_response"` + }{ + Signer: tx.Signer, + } + + b := bufPool.Get().(*bytes.Buffer) //nolint:forcetypeassert + b.Reset() + defer bufPool.Put(b) + + if err := marshler.Marshal(b, tx.TxResponse); err != nil { + return fmt.Errorf("failed to marshal tx response: %w", err) + } + + rawTx.TxResponse = append(rawTx.TxResponse, b.Bytes()...) + + return m.broker.PublishRawTransaction(ctx, rawTx) +} diff --git a/types/cosmos.go b/types/cosmos.go index 4fa2ff2..3679707 100644 --- a/types/cosmos.go +++ b/types/cosmos.go @@ -35,6 +35,8 @@ type ( } Block struct { + rb *cometbftcoretypes.ResultBlock + Timestamp time.Time Hash string ProposerAddress string @@ -93,6 +95,8 @@ func NewBlockFromTmBlock(blk *cometbftcoretypes.ResultBlock, totalGas uint64) *B res.ValidatorPreCommits = NewValidatorPreCommitsFromTmSignatures(blk.Block.LastCommit.Signatures) } + res.rb = blk + return res } @@ -214,3 +218,5 @@ func (tx Tx) FindAttributeByKey(event sdk.StringEvent, attrKey string) (string, func (tx Tx) Successful() bool { return tx.TxResponse.Code == 0 } + +func (b Block) Raw() *cometbftcoretypes.ResultBlock { return b.rb }