Skip to content

Commit

Permalink
Optimization: change data fields to bytes (#215)
Browse files Browse the repository at this point in the history
* Optimization: change data fields to bytes

* Fix: tests

* Use brotli for compressing message data

* Decode pub key for reducing message size

* Descrease profile sample rate

* Rebased
  • Loading branch information
aopoltorzhicky authored Jun 23, 2024
1 parent 8770c25 commit 2c5e233
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 139 deletions.
2 changes: 1 addition & 1 deletion cmd/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func initSentry(e *echo.Echo, db postgres.Storage, dsn, environment string) erro
Environment: environment,
EnableTracing: true,
TracesSampleRate: 1.0,
ProfilesSampleRate: 1.0,
ProfilesSampleRate: 0.25,
Release: os.Getenv("TAG"),
IgnoreTransactions: []string{
"GET /v1/ws",
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
cosmossdk.io/errors v1.0.0
cosmossdk.io/math v1.1.2
github.com/MarceloPetrucio/go-scalar-api-reference v0.0.0-20240521013641-ce5d2efe0e06
github.com/andybalholm/brotli v1.0.5
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
Expand Down Expand Up @@ -35,14 +36,15 @@ require (
github.com/labstack/echo/v4 v4.12.0
github.com/lib/pq v1.10.9
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.31.0
github.com/rs/zerolog v1.32.0
github.com/shopspring/decimal v1.3.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
github.com/tendermint/tendermint v0.34.29
github.com/uptrace/bun v1.1.17
github.com/uptrace/bun/dialect/pgdialect v1.1.17
github.com/uptrace/bun/driver/pgdriver v1.1.17
github.com/vmihailenco/msgpack/v5 v5.4.1
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
Expand All @@ -67,7 +69,6 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.13.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go v1.44.122 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
Expand Down Expand Up @@ -173,7 +174,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/labstack/gommon v0.4.2 // indirect
Expand Down Expand Up @@ -235,7 +236,6 @@ require (
github.com/ulikunitz/xz v0.5.11 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zondax/hid v0.9.2 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.12.1 h1:NhWgum1efX1x58daOBGCFWcxtEhOhXKKl1HAPQUp03Q=
Expand Down Expand Up @@ -947,8 +948,8 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj
github.com/rs/cors v1.8.3 h1:O+qNyWn7Z+F9M0ILBHgMVPuB1xTOucVd5gtaYyXBpRo=
github.com/rs/cors v1.8.3/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down
37 changes: 7 additions & 30 deletions internal/storage/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ import (
"time"

pkgTypes "github.com/celenium-io/celestia-indexer/pkg/types"
jsoniter "github.com/json-iterator/go"

"github.com/celenium-io/celestia-indexer/internal/storage/types"
"github.com/dipdup-net/indexer-sdk/pkg/storage"
"github.com/uptrace/bun"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

type EventFilter struct {
Limit int
Offset int
Expand All @@ -35,36 +32,16 @@ type IEvent interface {
type Event struct {
bun.BaseModel `bun:"event" comment:"Table with celestia events."`

Id uint64 `bun:"id,pk,notnull,autoincrement" comment:"Unique internal id"`
Height pkgTypes.Level `bun:"height,notnull" comment:"The number (height) of this block" stats:"func:min max,filterable"`
Time time.Time `bun:"time,pk,notnull" comment:"The time of block" stats:"func:min max,filterable"`
Position int64 `bun:"position" comment:"Position in transaction"`
Type types.EventType `bun:",type:event_type" comment:"Event type" stats:"filterable"`
TxId *uint64 `bun:"tx_id" comment:"Transaction id"`
Data map[string]any `bun:"data,type:jsonb,nullzero" comment:"Event data"`
Id uint64 `bun:"id,pk,notnull,autoincrement" comment:"Unique internal id"`
Height pkgTypes.Level `bun:"height,notnull" comment:"The number (height) of this block" stats:"func:min max,filterable"`
Time time.Time `bun:"time,pk,notnull" comment:"The time of block" stats:"func:min max,filterable"`
Position int64 `bun:"position" comment:"Position in transaction"`
Type types.EventType `bun:",type:event_type" comment:"Event type" stats:"filterable"`
TxId *uint64 `bun:"tx_id" comment:"Transaction id"`
Data map[string]any `bun:"data,msgpack,type:bytea,nullzero" comment:"Event data"`
}

// TableName -
func (Event) TableName() string {
return "event"
}

func (e Event) Columns() []string {
return []string{
"height", "time", "position", "type",
"tx_id", "data",
}
}

func (e Event) Flat() []any {
data := []any{
e.Height, e.Time, e.Position, e.Type, e.TxId, nil,
}
if len(e.Data) > 0 {
raw, err := json.MarshalToString(e.Data)
if err == nil {
data[5] = raw
}
}
return data
}
16 changes: 8 additions & 8 deletions internal/storage/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ type IMessage interface {
type Message struct {
bun.BaseModel `bun:"message" comment:"Table with celestia messages."`

Id uint64 `bun:"id,pk,notnull,autoincrement" comment:"Unique internal id"`
Height pkgTypes.Level `bun:",notnull" comment:"The number (height) of this block" stats:"func:min max,filterable"`
Time time.Time `bun:"time,pk,notnull" comment:"The time of block" stats:"func:min max,filterable"`
Position int64 `bun:"position" comment:"Position in transaction"`
Type types.MsgType `bun:",type:msg_type" comment:"Message type" stats:"filterable"`
TxId uint64 `bun:"tx_id" comment:"Parent transaction id"`
Size int `bun:"size" comment:"Message size in bytes"`
Data map[string]any `bun:"data,type:jsonb,nullzero" comment:"Message data"`
Id uint64 `bun:"id,pk,notnull,autoincrement" comment:"Unique internal id"`
Height pkgTypes.Level `bun:",notnull" comment:"The number (height) of this block" stats:"func:min max,filterable"`
Time time.Time `bun:"time,pk,notnull" comment:"The time of block" stats:"func:min max,filterable"`
Position int64 `bun:"position" comment:"Position in transaction"`
Type types.MsgType `bun:",type:msg_type" comment:"Message type" stats:"filterable"`
TxId uint64 `bun:"tx_id" comment:"Parent transaction id"`
Size int `bun:"size" comment:"Message size in bytes"`
Data types.PackedBytes `bun:"data,type:bytea,nullzero" comment:"Message data"`

Namespace []Namespace `bun:"m2m:namespace_message,join:Message=Namespace"`
Addresses []AddressWithType `bun:"-"`
Expand Down
39 changes: 0 additions & 39 deletions internal/storage/mock/staking_log.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions internal/storage/postgres/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,6 @@ func createIndices(ctx context.Context, conn *database.Bun) error {
}

// StakingLog
if _, err := tx.NewCreateIndex().
IfNotExists().
Model((*storage.StakingLog)(nil)).
Index("staking_log_address_id_idx").
Column("address_id").
Where("address_id is not null").
Exec(ctx); err != nil {
return err
}
if _, err := tx.NewCreateIndex().
IfNotExists().
Model((*storage.StakingLog)(nil)).
Expand Down
16 changes: 0 additions & 16 deletions internal/storage/postgres/staking_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package postgres

import (
"context"

"github.com/celenium-io/celestia-indexer/internal/storage"
"github.com/dipdup-net/go-lib/database"
"github.com/dipdup-net/indexer-sdk/pkg/storage/postgres"
Expand All @@ -22,17 +20,3 @@ func NewStakingLog(db *database.Bun) *StakingLog {
Table: postgres.NewTable[*storage.StakingLog](db),
}
}

func (d *StakingLog) ByValidator(ctx context.Context, id uint64, limit, offset int) (logs []storage.StakingLog, err error) {
query := d.DB().NewSelect().
Model(&logs).
Where("validator_id = ?", id)

query = limitScope(query, limit)
if offset > 0 {
query = query.Offset(offset)
}

err = query.Scan(ctx)
return
}
11 changes: 4 additions & 7 deletions internal/storage/postgres/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import (
"time"

"github.com/celenium-io/celestia-indexer/pkg/types"
jsoniter "github.com/json-iterator/go"
"github.com/lib/pq"
"github.com/shopspring/decimal"
"github.com/uptrace/bun"
"github.com/vmihailenco/msgpack/v5"

models "github.com/celenium-io/celestia-indexer/internal/storage"
"github.com/dipdup-net/indexer-sdk/pkg/storage"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

type Transaction struct {
storage.Transaction
}
Expand Down Expand Up @@ -161,11 +159,10 @@ func (tx Transaction) SaveEvents(ctx context.Context, events ...models.Event) er
}

for i := range events {
var s *string
var s []byte
if len(events[i].Data) > 0 {
raw, err := json.MarshalToString(events[i].Data)
if err == nil {
s = &raw
if raw, err := msgpack.Marshal(events[i].Data); err == nil {
s = raw
}
}

Expand Down
3 changes: 0 additions & 3 deletions internal/storage/staking_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package storage

import (
"context"
"time"

"github.com/celenium-io/celestia-indexer/internal/storage/types"
Expand All @@ -17,8 +16,6 @@ import (
//go:generate mockgen -source=$GOFILE -destination=mock/$GOFILE -package=mock -typed
type IStakingLog interface {
storage.Table[*StakingLog]

ByValidator(ctx context.Context, validatorId uint64, limit, offset int) ([]StakingLog, error)
}

// Delegation -
Expand Down
53 changes: 53 additions & 0 deletions internal/storage/types/packed_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package types

import (
"bytes"
"database/sql"
"database/sql/driver"

"github.com/andybalholm/brotli"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

type PackedBytes map[string]any

var _ sql.Scanner = (*PackedBytes)(nil)

func (pb *PackedBytes) Scan(src interface{}) error {
if src == nil {
return nil
}
b, ok := src.([]byte)
if !ok {
return errors.Errorf("invalid packed bytes type: %T", src)
}

result := bytes.NewBuffer(b)
return json.NewDecoder(brotli.NewReader(result)).Decode(pb)
}

var _ driver.Valuer = (*PackedBytes)(nil)

func (pb PackedBytes) Value() (driver.Value, error) {
return pb.ToBytes()
}

func (pb PackedBytes) ToBytes() ([]byte, error) {
b, err := json.Marshal(pb)
if err != nil {
return nil, err
}
result := bytes.NewBuffer(nil)
writer := brotli.NewWriterLevel(result, brotli.BestSpeed)

if _, err := writer.Write(b); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return result.Bytes(), nil
}
7 changes: 6 additions & 1 deletion pkg/indexer/decode/handle/test/staking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,19 @@ func TestDecodeMsg_SuccessOnMsgCreateValidator(t *testing.T) {
},
}

data := structs.Map(m)
data["Pubkey"] = map[string]any{
"key": pk.PubKey().Bytes(),
"type": "ed25519",
}
msgExpected := storage.Message{
Id: 0,
Height: blob.Height,
Time: now,
Position: 0,
Type: storageTypes.MsgCreateValidator,
TxId: 0,
Data: structs.Map(m),
Data: data,
Size: 201,
Namespace: nil,
Addresses: addressesExpected,
Expand Down
Loading

0 comments on commit 2c5e233

Please sign in to comment.