Skip to content

Commit

Permalink
Merge pull request #85 from bro-n-bro/68-add-swaps-table
Browse files Browse the repository at this point in the history
68/70 add swaps and liquidity pool table
  • Loading branch information
malekvictor authored Jul 31, 2023
2 parents 691bfb7 + d788e50 commit f68b90e
Show file tree
Hide file tree
Showing 19 changed files with 8,750 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Application settings
START_TIMEOUT=20s # Start application timeout duration
STOP_TIMEOUT=20s # Stop application timeout duration
MODULES=bank,core,auth,authz,distribution,gov,mint,staking,slashing,feegrant,ibc # Modules for processing
MODULES=bank,core,auth,authz,distribution,gov,mint,staking,slashing,feegrant,ibc,liquidity # Modules for processing
CHAIN_PREFIX=cosmos # Prefix of indexing chain
PARSE_AVATAR_URL=false # Parse avatar url for validator from keychain. It will decrease index performance!!!

Expand Down
4 changes: 4 additions & 0 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

liquiditytypes "github.com/bro-n-bro/spacebox-crawler/types/liquidity"
)

type Client struct {
Expand All @@ -34,6 +36,7 @@ type Client struct {
AuthzQueryClient authztypes.QueryClient
FeegrantQueryClient feegranttypes.QueryClient
IbcTransferQueryClient ibctransfertypes.QueryClient
LiquidityQueryClient liquiditytypes.QueryClient
conn *grpc.ClientConn
cfg Config
}
Expand Down Expand Up @@ -88,6 +91,7 @@ func (c *Client) Start(ctx context.Context) error {
c.AuthzQueryClient = authztypes.NewQueryClient(grpcConn)
c.FeegrantQueryClient = feegranttypes.NewQueryClient(grpcConn)
c.IbcTransferQueryClient = ibctransfertypes.NewQueryClient(grpcConn)
c.LiquidityQueryClient = liquiditytypes.NewQueryClient(grpcConn)

c.conn = grpcConn

Expand Down
2 changes: 2 additions & 0 deletions delivery/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ func (b *Broker) getCurrentTopics(modules []string) []string {
topics = append(topics, slashingTopics.ToStringSlice()...)
case "ibc":
topics = append(topics, ibcTopics.ToStringSlice()...)
case "liquidity":
topics = append(topics, liquidityTopics.ToStringSlice()...)
default:
b.log.Warn().Msgf("unknown module in config: %v", m)
continue
Expand Down
19 changes: 19 additions & 0 deletions delivery/broker/liquidity_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package broker

import (
"context"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"

"github.com/bro-n-bro/spacebox/broker/model"
)

func (b *Broker) PublishLiquidityPool(_ context.Context, v model.LiquidityPool) error {
data, err := jsoniter.Marshal(v)
if err != nil {
return errors.Wrap(err, MsgErrJSONMarshalFail)
}

return b.produce(LiquidityPool, data)
}
19 changes: 19 additions & 0 deletions delivery/broker/swap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package broker

import (
"context"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"

"github.com/bro-n-bro/spacebox/broker/model"
)

func (b *Broker) PublishSwap(ctx context.Context, swap model.Swap) error {
data, err := jsoniter.Marshal(swap)
if err != nil {
return errors.Wrap(err, MsgErrJSONMarshalFail)
}

return b.produce(Swap, data)
}
4 changes: 4 additions & 0 deletions delivery/broker/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
GrantMessage Topic = newTopic("grant_message")
GrantAllowanceMessage Topic = newTopic("grant_allowance_message")
HandleValidatorSignature Topic = newTopic("handle_validator_signature")
LiquidityPool Topic = newTopic("liquidity_pool")
Message Topic = newTopic("message")
MintParams Topic = newTopic("mint_params")
MultiSendMessage Topic = newTopic("multisend_message")
Expand All @@ -45,6 +46,7 @@ var (
StakingPool Topic = newTopic("staking_pool")
SubmitProposalMessage Topic = newTopic("submit_proposal_message")
Supply Topic = newTopic("supply")
Swap Topic = newTopic("swap")
Transaction Topic = newTopic("transaction")
TransferMessage Topic = newTopic("transfer_message")
UnbondingDelegation Topic = newTopic("unbonding_delegation")
Expand Down Expand Up @@ -85,6 +87,8 @@ var (
slashingTopics = Topics{UnjailMessage, HandleValidatorSignature}

ibcTopics = Topics{TransferMessage, AcknowledgementMessage, ReceivePacketMessage, DenomTrace}

liquidityTopics = Topics{Swap, LiquidityPool}
)

type (
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ replace (
)

require (
github.com/bro-n-bro/spacebox v0.1.3
github.com/bro-n-bro/spacebox v0.1.7
github.com/caarlos0/env/v6 v6.10.1
github.com/cometbft/cometbft v0.37.1
github.com/confluentinc/confluent-kafka-go v1.9.2
Expand All @@ -33,7 +33,9 @@ require (
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
golang.org/x/sync v0.1.0
golang.org/x/time v0.1.0
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
google.golang.org/grpc v1.55.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -180,11 +182,9 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.110.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.6 // indirect
pgregory.net/rapid v0.5.5 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 h1:41iFGWnSlI2gVpmOtVTJZNodLdLQLn/KsJqFvXwnd/s=
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bro-n-bro/spacebox v0.1.3 h1:WcnqdgSdJCmYIxyq9qc9XeyFYXduF8SmWpizCBWPgJw=
github.com/bro-n-bro/spacebox v0.1.3/go.mod h1:HtLDKfstS5gaocXKym0e1LLygmxLFHgn4UxBbDzTyLg=
github.com/bro-n-bro/spacebox v0.1.7 h1:f5THMZMWZAKmMr02BSVchaHP0TaMvbTCfKTiAhU2mU4=
github.com/bro-n-bro/spacebox v0.1.7/go.mod h1:HtLDKfstS5gaocXKym0e1LLygmxLFHgn4UxBbDzTyLg=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/btcutil v1.1.2 h1:XLMbX8JQEiwMcYft2EGi8zPUkoa0abKIU6/BJSRsjzQ=
Expand Down
4 changes: 4 additions & 0 deletions internal/rep/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,8 @@ type Broker interface {
PublishAcknowledgementMessage(context.Context, model.AcknowledgementMessage) error
PublishReceivePacketMessage(context.Context, model.RecvPacketMessage) error
PublishDenomTrace(context.Context, model.DenomTrace) error

// liquidity
PublishSwap(context.Context, model.Swap) error
PublishLiquidityPool(context.Context, model.LiquidityPool) error
}
12 changes: 12 additions & 0 deletions modules/liquidity/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package liquidity

import (
"context"

"github.com/bro-n-bro/spacebox/broker/model"
)

type broker interface {
PublishSwap(context.Context, model.Swap) error
PublishLiquidityPool(context.Context, model.LiquidityPool) error
}
148 changes: 148 additions & 0 deletions modules/liquidity/end_blocker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package liquidity

import (
"context"
"encoding/base64"
"strconv"

"github.com/pkg/errors"

"github.com/bro-n-bro/spacebox-crawler/modules/utils"
"github.com/bro-n-bro/spacebox-crawler/types"
"github.com/bro-n-bro/spacebox-crawler/types/liquidity"
"github.com/bro-n-bro/spacebox/broker/model"
)

//nolint:lll
var (
base64KeyPoolID = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValuePoolID))
base64KeyBatchIndex = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueBatchIndex))
base64KeyMsgIndex = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueMsgIndex))
base64KeySwapRequester = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueSwapRequester))
base64KeyOfferCoinDenom = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueOfferCoinDenom))
base64KeyOfferCoinAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueOfferCoinAmount))
base64KeyExchangedOfferCoinAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueExchangedOfferCoinAmount))
base64KeyDemandCoinDenom = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueDemandCoinDenom))
base64KeyOrderPrice = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueOrderPrice))
base64KeySwapPrice = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueSwapPrice))
base64KeyTransactedCoinAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueTransactedCoinAmount))
base64KeyRemainingOfferCoinAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueRemainingOfferCoinAmount))
base64KeyExchangedDemandCoinAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueExchangedDemandCoinAmount))
base64KeyOfferCoinFeeAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueOfferCoinFeeAmount))
base64KeyExchangedCoinFeeAmount = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueExchangedCoinFeeAmount))
base64KeyOrderExpiryHeight = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueOrderExpiryHeight))
base64KeySuccess = base64.StdEncoding.EncodeToString([]byte(liquidity.AttributeValueSuccess))
)

//nolint:gocyclo
func (m *Module) HandleEndBlocker(ctx context.Context, eventsMap types.BlockerEvents, height int64) error {
events, ok := eventsMap[liquidity.EventTypeSwapTransacted]
if !ok {
return nil
}

var err error
for _, event := range events {
if len(event.Attributes) < 16 {
m.log.Warn().Str("handler", "HandleEndBlocker").Msg("not enough attributes in event")
continue
}

var (
msgIndex, batchIndex, poolID uint32
swapRequester, offerCoinDenom, demandCoinDenom string
success bool
orderExpiryHeight int64
exchangedCoinFeeAmount, orderPrice, swapPrice, offerCoinAmount, exchangedDemandCoinAmount,
transactedCoinAmount, offerCoinFeeAmount, remainingOfferCoinAmount float64
)

for _, attr := range event.Attributes {
// try to decode value if needed
switch attr.Key {
case base64KeyPoolID, base64KeyBatchIndex, base64KeyMsgIndex, base64KeySwapRequester,
base64KeyOfferCoinDenom, base64KeyExchangedOfferCoinAmount, base64KeyDemandCoinDenom,
base64KeyOrderPrice, base64KeySwapPrice, base64KeyTransactedCoinAmount,
base64KeyRemainingOfferCoinAmount, base64KeyExchangedDemandCoinAmount,
base64KeyOfferCoinFeeAmount, base64KeyExchangedCoinFeeAmount, base64KeyOrderExpiryHeight,
base64KeySuccess, base64KeyOfferCoinAmount:

attr.Value, err = utils.DecodeToString(attr.Value)
if err != nil {
return err
}
}

switch attr.Key {
case liquidity.AttributeValuePoolID, base64KeyPoolID:
var id uint64
id, err = strconv.ParseUint(attr.Value, 10, 32)
poolID = uint32(id)
case liquidity.AttributeValueBatchIndex, base64KeyBatchIndex:
var index uint64
index, err = strconv.ParseUint(attr.Value, 10, 32)
batchIndex = uint32(index)
case liquidity.AttributeValueMsgIndex, base64KeyMsgIndex:
var index uint64
index, err = strconv.ParseUint(attr.Value, 10, 32)
msgIndex = uint32(index)
case liquidity.AttributeValueSwapRequester, base64KeySwapRequester:
swapRequester = attr.Value
// case liquidity.AttributeValueSwapTypeId:
case liquidity.AttributeValueOfferCoinDenom, base64KeyOfferCoinDenom:
offerCoinDenom = attr.Value
case liquidity.AttributeValueOfferCoinAmount, base64KeyOfferCoinAmount:
offerCoinAmount, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueDemandCoinDenom, base64KeyDemandCoinDenom:
demandCoinDenom = attr.Value
case liquidity.AttributeValueOrderPrice, base64KeyOrderPrice:
orderPrice, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueSwapPrice, base64KeySwapPrice:
swapPrice, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueTransactedCoinAmount, base64KeyTransactedCoinAmount:
transactedCoinAmount, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueRemainingOfferCoinAmount, base64KeyRemainingOfferCoinAmount:
remainingOfferCoinAmount, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueExchangedDemandCoinAmount, base64KeyExchangedDemandCoinAmount:
exchangedDemandCoinAmount, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueOfferCoinFeeAmount, base64KeyOfferCoinFeeAmount:
offerCoinFeeAmount, err = strconv.ParseFloat(attr.Value, 64)
case liquidity.AttributeValueExchangedCoinFeeAmount, base64KeyExchangedCoinFeeAmount:
exchangedCoinFeeAmount, err = strconv.ParseFloat(attr.Value, 64)
// case liquidity.AttributeValueReservedOfferCoinFeeAmount:
case liquidity.AttributeValueOrderExpiryHeight, base64KeyOrderExpiryHeight:
orderExpiryHeight, err = strconv.ParseInt(attr.Value, 10, 64)
case liquidity.AttributeValueSuccess, base64KeySuccess:
success = attr.Value == liquidity.Success
}

if err != nil {
return errors.Wrap(err, "liquidity: failed to parse event attributes")
}
}

if err = m.broker.PublishSwap(ctx, model.Swap{
Height: height,
MsgIndex: msgIndex,
BatchIndex: batchIndex,
PoolID: poolID,
SwapRequester: swapRequester,
OfferCoinDenom: offerCoinDenom,
OfferCoinAmount: offerCoinAmount,
DemandCoinDenom: demandCoinDenom,
ExchangedDemandCoinAmount: exchangedDemandCoinAmount,
TransactedCoinAmount: transactedCoinAmount,
RemainingOfferCoinAmount: remainingOfferCoinAmount,
OfferCoinFeeAmount: offerCoinFeeAmount,
OrderExpiryHeight: orderExpiryHeight,
ExchangedCoinFeeAmount: exchangedCoinFeeAmount,
OrderPrice: orderPrice,
SwapPrice: swapPrice,
Success: success,
}); err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit f68b90e

Please sign in to comment.