Skip to content

Commit

Permalink
add saving err txs to mongo storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Neznaykin committed Nov 29, 2023
1 parent e8dce60 commit ba8bb09
Showing 7 changed files with 58 additions and 9 deletions.
10 changes: 10 additions & 0 deletions adapter/storage/model/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package model

import "time"

type Tx struct {
Created time.Time
ErrorMessage string `bson:"error_message"`
Hash string `bson:"hash"`
Height int64 `bson:"height"`
}
2 changes: 2 additions & 0 deletions adapter/storage/storage.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ type Storage struct {
cli *mongo.Client
blocksCollection *mongo.Collection
messagesCollection *mongo.Collection
txCollection *mongo.Collection

cfg Config
}
@@ -61,6 +62,7 @@ func (s *Storage) Start(ctx context.Context) error {
blocksCollection := s.cli.Database("spacebox").Collection("blocks")
s.blocksCollection = blocksCollection
s.messagesCollection = s.cli.Database("spacebox").Collection("error_messages")
s.txCollection = s.cli.Database("spacebox").Collection("error_txs")

mod := mongo.IndexModel{
Keys: bson.M{"height": 1}, // index in ascending order or -1 for descending order
21 changes: 21 additions & 0 deletions adapter/storage/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package storage

import (
"context"

"go.mongodb.org/mongo-driver/bson"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
)

func (s *Storage) InsertErrorTx(ctx context.Context, tx model.Tx) error {
if _, err := s.txCollection.InsertOne(ctx, tx); err != nil {
return err
}

return nil
}

func (s *Storage) CountErrorTxs(ctx context.Context) (int64, error) {
return s.txCollection.CountDocuments(ctx, bson.D{})
}
10 changes: 8 additions & 2 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
@@ -30,10 +30,15 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
liquiditytypes "github.com/bro-n-bro/spacebox-crawler/types/liquidity"
)

type (
storage interface {
InsertErrorTx(ctx context.Context, tx model.Tx) error
}

Client struct {
SlashingQueryClient slashingtypes.QueryClient
TmsService tmservice.ServiceClient
@@ -56,14 +61,15 @@ type (
ResourcesQueryClient resourcestypes.QueryClient
conn *grpc.ClientConn
log *zerolog.Logger
storage storage
cfg Config
}
)

func New(cfg Config, l zerolog.Logger) *Client {
func New(cfg Config, l zerolog.Logger, st storage) *Client {
l = l.With().Str("cmp", "grpc-client").Logger()

return &Client{cfg: cfg, log: &l}
return &Client{cfg: cfg, log: &l, storage: st}
}

func (c *Client) Start(ctx context.Context) error {
16 changes: 14 additions & 2 deletions client/grpc/tx.go
Original file line number Diff line number Diff line change
@@ -3,9 +3,12 @@ package grpc
import (
"context"
"encoding/hex"
"time"

cometbfttypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/types/tx"

"github.com/bro-n-bro/spacebox-crawler/adapter/storage/model"
)

// Txs queries for all the transactions in a block. Transactions are returned
@@ -15,9 +18,18 @@ func (c *Client) Txs(ctx context.Context, height int64, txs cometbfttypes.Txs) (
txResponses := make([]*tx.GetTxResponse, 0, len(txs))

for _, tmTx := range txs {
respPb, err := c.TxService.GetTx(ctx, &tx.GetTxRequest{Hash: hex.EncodeToString(tmTx.Hash())})
hash := hex.EncodeToString(tmTx.Hash())

respPb, err := c.TxService.GetTx(ctx, &tx.GetTxRequest{Hash: hash})
if err != nil {
c.log.Error().Err(err).Int64("height", height).Msg("GetTx error")
_ = c.storage.InsertErrorTx(ctx, model.Tx{
Created: time.Now(),
ErrorMessage: err.Error(),
Hash: hash,
Height: height,
})

c.log.Warn().Err(err).Int64("height", height).Msg("GetTx error")
continue
}

7 changes: 2 additions & 5 deletions internal/app/app.go
Original file line number Diff line number Diff line change
@@ -103,11 +103,6 @@ func New(cfg Config, version string, l zerolog.Logger) *App {
func (a *App) Start(ctx context.Context) error {
a.log.Info().Msg("starting app")

var (
grpcCli = grpcClient.New(a.cfg.GRPCConfig, *a.log)
rpcCli = rpcClient.New(a.cfg.RPCConfig)
)

// TODO: use redis
valCache, err := cache.New[string, int64](defaultCacheSize, cache.WithCompareFunc[string, int64](lessInt64))
if err != nil {
@@ -174,6 +169,8 @@ func (a *App) Start(ctx context.Context) error {
var (
cod, amn = MakeEncodingConfig()
sto = storage.New(a.cfg.StorageConfig, *a.log)
rpcCli = rpcClient.New(a.cfg.RPCConfig)
grpcCli = grpcClient.New(a.cfg.GRPCConfig, *a.log, sto)
tbr = tb.NewToBroker(cod, amn.LegacyAmino)
par = core.JoinMessageParsers(core.CosmosMessageAddressesParser)

1 change: 1 addition & 0 deletions internal/rep/storage.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ type Storage interface {
UpdateStatus(ctx context.Context, height int64, status model.Status) error
GetErrorBlockHeights(ctx context.Context) ([]int64, error)

InsertErrorTx(ctx context.Context, message model.Tx) error
InsertErrorMessage(ctx context.Context, message model.Message) error

Ping(ctx context.Context) error

0 comments on commit ba8bb09

Please sign in to comment.