diff --git a/delivery/broker/validator_status.go b/delivery/broker/validator_status.go index f975231..a0239df 100644 --- a/delivery/broker/validator_status.go +++ b/delivery/broker/validator_status.go @@ -10,7 +10,7 @@ import ( ) func (b *Broker) PublishValidatorStatus(ctx context.Context, status model.ValidatorStatus) error { - if !checkCache(status.ValidatorAddress, status.Height, b.cache.valStatus) { + if !checkCache(status.ConsensusAddress, status.Height, b.cache.valStatus) { return nil } diff --git a/internal/app/app.go b/internal/app/app.go index 334566a..9bc2dab 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -155,7 +155,7 @@ func (a *App) Start(ctx context.Context) error { } tallyCache.SetCompareFn(lessInt64) - modules := modules.BuildModules(b, a.log, grpcCli, *tb, cdc, a.cfg.Modules, parser, a.cfg.ParseAvatarURL, tallyCache) + modules := modules.BuildModules(b, a.log, grpcCli, *tb, cdc, a.cfg.Modules, parser, tallyCache) ts := ts.NewToStorage() w := worker.New(a.cfg.WorkerConfig, *a.log, b, rpcCli, grpcCli, modules, s, cdc, *tb, *ts) server := server.New(a.cfg.Server, s, *a.log) diff --git a/internal/app/config.go b/internal/app/config.go index b853ae5..6cdec12 100644 --- a/internal/app/config.go +++ b/internal/app/config.go @@ -12,16 +12,15 @@ import ( ) type Config struct { - ChainPrefix string `env:"CHAIN_PREFIX"` - LogLevel string `env:"LOG_LEVEL" envDefault:"info"` - Server server.Config - Modules []string `env:"MODULES" required:"true"` - GRPCConfig grpc.Config - RPCConfig rpc.Config - BrokerConfig broker.Config - StorageConfig storage.Config - WorkerConfig worker.Config - StartTimeout time.Duration `env:"START_TIMEOUT"` - StopTimeout time.Duration `env:"STOP_TIMEOUT"` - ParseAvatarURL bool `env:"PARSE_AVATAR_URL" envDefault:"false"` + ChainPrefix string `env:"CHAIN_PREFIX"` + LogLevel string `env:"LOG_LEVEL" envDefault:"info"` + Server server.Config + Modules []string `env:"MODULES" required:"true"` + GRPCConfig grpc.Config + RPCConfig rpc.Config + BrokerConfig broker.Config + StorageConfig storage.Config + WorkerConfig worker.Config + StartTimeout time.Duration `env:"START_TIMEOUT"` + StopTimeout time.Duration `env:"STOP_TIMEOUT"` } diff --git a/modules/distribution/begin_blocker.go b/modules/distribution/begin_blocker.go index 5ef4f17..9aa5ee3 100644 --- a/modules/distribution/begin_blocker.go +++ b/modules/distribution/begin_blocker.go @@ -58,9 +58,9 @@ func (m *Module) parseProposerRewardEvent(ctx context.Context, eventsMap types.B } var ( - coin model.Coin - validator string - err error + coin model.Coin + operatorAddress string + err error ) for _, event := range events { @@ -72,7 +72,7 @@ func (m *Module) parseProposerRewardEvent(ctx context.Context, eventsMap types.B continue } - validator, coin, err = m.parseAttributes(event) + operatorAddress, coin, err = m.parseAttributes(event) if err != nil { m.log.Error(). Err(err). @@ -83,9 +83,9 @@ func (m *Module) parseProposerRewardEvent(ctx context.Context, eventsMap types.B } if err = m.broker.PublishProposerReward(ctx, model.ProposerReward{ - Height: height, - Validator: validator, - Reward: coin, + Height: height, + OperatorAddress: operatorAddress, + Reward: coin, }); err != nil { m.log.Error(). Err(err). @@ -107,9 +107,9 @@ func (m *Module) parseCommissionEvent(ctx context.Context, eventsMap types.Block } var ( - validator string - coin model.Coin - err error + operatorAddress string + coin model.Coin + err error ) for _, event := range events { @@ -121,7 +121,7 @@ func (m *Module) parseCommissionEvent(ctx context.Context, eventsMap types.Block continue } - validator, coin, err = m.parseAttributes(event) + operatorAddress, coin, err = m.parseAttributes(event) if err != nil { m.log.Error(). Err(err). @@ -132,9 +132,9 @@ func (m *Module) parseCommissionEvent(ctx context.Context, eventsMap types.Block } if err = m.broker.PublishDistributionCommission(ctx, model.DistributionCommission{ - Height: height, - Validator: validator, - Amount: coin, + Height: height, + OperatorAddress: operatorAddress, + Amount: coin, }); err != nil { m.log.Error(). Err(err). @@ -156,9 +156,9 @@ func (m *Module) parseRewardsEvent(ctx context.Context, eventsMap types.BlockerE } var ( - validator string - coin model.Coin - err error + operatorAddress string + coin model.Coin + err error ) for _, event := range events { @@ -170,7 +170,7 @@ func (m *Module) parseRewardsEvent(ctx context.Context, eventsMap types.BlockerE continue } - validator, coin, err = m.parseAttributes(event) + operatorAddress, coin, err = m.parseAttributes(event) if err != nil { m.log.Error(). Err(err). @@ -181,9 +181,9 @@ func (m *Module) parseRewardsEvent(ctx context.Context, eventsMap types.BlockerE } if err = m.broker.PublishDistributionReward(ctx, model.DistributionReward{ - Height: height, - Validator: validator, - Amount: coin, + Height: height, + OperatorAddress: operatorAddress, + Amount: coin, }); err != nil { m.log.Error(). Err(err). diff --git a/modules/distribution/message.go b/modules/distribution/message.go index 2b9df04..fefc227 100644 --- a/modules/distribution/message.go +++ b/modules/distribution/message.go @@ -46,7 +46,7 @@ func (m *Module) HandleMessage(ctx context.Context, index int, cosmosMsg sdk.Msg Coins: m.tbM.MapCoins(coin), Height: tx.Height, DelegatorAddress: msg.DelegatorAddress, - ValidatorAddress: msg.ValidatorAddress, + OperatorAddress: msg.ValidatorAddress, TxHash: tx.TxHash, MsgIndex: int64(index), }) @@ -86,7 +86,7 @@ func (m *Module) HandleMessage(ctx context.Context, index int, cosmosMsg sdk.Msg TxHash: tx.TxHash, MsgIndex: int64(index), WithdrawCommission: m.tbM.MapCoins(coins), - ValidatorAddress: msg.ValidatorAddress, + OperatorAddress: msg.ValidatorAddress, }) } diff --git a/modules/module.go b/modules/module.go index cffb182..703e084 100644 --- a/modules/module.go +++ b/modules/module.go @@ -23,7 +23,7 @@ import ( ) func BuildModules(b rep.Broker, log *zerolog.Logger, cli *grpcClient.Client, tbMapper tb.ToBroker, - cdc codec.Codec, modules []string, addressesParser coreModule.MessageAddressesParser, parseAvatarURL bool, + cdc codec.Codec, modules []string, addressesParser coreModule.MessageAddressesParser, tallyCache govModule.TallyCache[uint64, int64]) []types.Module { res := make([]types.Module, 0) @@ -49,7 +49,7 @@ func BuildModules(b rep.Broker, log *zerolog.Logger, cli *grpcClient.Client, tbM res = append(res, mintModule.New(b, cli, tbMapper)) case "staking": log.Info().Msg("staking module registered") - res = append(res, stakingModule.New(b, cli, tbMapper, cdc, modules, parseAvatarURL)) + res = append(res, stakingModule.New(b, cli, tbMapper, cdc, modules)) case "distribution": log.Info().Msg("distribution module registered") res = append(res, distributionModule.New(b, cli, tbMapper, cdc)) diff --git a/modules/slashing/begin_blocker.go b/modules/slashing/begin_blocker.go index bf60373..cc96524 100644 --- a/modules/slashing/begin_blocker.go +++ b/modules/slashing/begin_blocker.go @@ -38,7 +38,7 @@ func (m *Module) handleSlashEvent(ctx context.Context, eventsMap types.BlockerEv return nil } - var address, power, reason, jailed string + var operatorAddress, power, reason, jailed string for _, e := range events { if len(e.Attributes) < 4 { m.log.Warn(). @@ -62,7 +62,7 @@ func (m *Module) handleSlashEvent(ctx context.Context, eventsMap types.BlockerEv switch attr.Key { case slashingtypes.AttributeKeyAddress, base64KeyAddress: // required - address = attr.Value + operatorAddress = attr.Value case slashingtypes.AttributeKeyPower, base64KeyPower: // required power = attr.Value case slashingtypes.AttributeKeyReason, base64KeyReason: // required @@ -111,12 +111,12 @@ func (m *Module) handleSlashEvent(ctx context.Context, eventsMap types.BlockerEv } if err := m.broker.PublishHandleValidatorSignature(ctx, model.HandleValidatorSignature{ - Address: address, - Power: power, - Reason: reason, - Jailed: jailed, - Burned: burned, - Height: height, + OperatorAddress: operatorAddress, + Power: power, + Reason: reason, + Jailed: jailed, + Burned: burned, + Height: height, }); err != nil { return err } diff --git a/modules/slashing/message.go b/modules/slashing/message.go index 3e78dc1..2ff5a32 100644 --- a/modules/slashing/message.go +++ b/modules/slashing/message.go @@ -25,9 +25,9 @@ func (m *Module) HandleMessage(ctx context.Context, index int, cosmosMsg sdk.Msg func (m *Module) handleMsgUnjail(ctx context.Context, tx *types.Tx, index int, msg *slashtypes.MsgUnjail) error { return m.broker.PublishUnjailMessage(ctx, model.UnjailMessage{ - Height: tx.Height, - Hash: tx.TxHash, - Index: int64(index), - ValidatorAddr: msg.ValidatorAddr, + Height: tx.Height, + Hash: tx.TxHash, + Index: int64(index), + OperatorAddress: msg.ValidatorAddr, }) } diff --git a/modules/staking/genesis.go b/modules/staking/genesis.go index 33a2399..0dd472e 100644 --- a/modules/staking/genesis.go +++ b/modules/staking/genesis.go @@ -59,7 +59,7 @@ func (m *Module) HandleGenesis( } // Publish the description - if err := m.publishValidatorDescriptions(ctx, genState.Validators, doc.InitialHeight, m.parseAvatarURL); err != nil { + if err := m.publishValidatorDescriptions(ctx, genState.Validators, doc.InitialHeight); err != nil { return fmt.Errorf("error while storing staking genesis validator descriptions: %w", err) } @@ -206,11 +206,11 @@ func (m *Module) publishUnbondingDelegations(ctx context.Context, doc *cometbftt coin = types.NewCoinFromCdk(sdk.NewCoin(genState.Params.BondDenom, entry.InitialBalance)) // TODO: test it if err := m.broker.PublishUnbondingDelegation(ctx, model.UnbondingDelegation{ - Height: doc.InitialHeight, - DelegatorAddress: ud.DelegatorAddress, - ValidatorAddress: validator.OperatorAddress, - Coin: m.tbM.MapCoin(coin), - CompletionTimestamp: entry.CompletionTime, + Height: doc.InitialHeight, + DelegatorAddress: ud.DelegatorAddress, + OperatorAddress: validator.OperatorAddress, + Coin: m.tbM.MapCoin(coin), + CompletionTime: entry.CompletionTime, }); err != nil { return err } diff --git a/modules/staking/message.go b/modules/staking/message.go index 4de67a3..20279b7 100644 --- a/modules/staking/message.go +++ b/modules/staking/message.go @@ -93,11 +93,6 @@ func (m *Module) handleMsgCreateValidator( return err } - var avatarURL string - if m.parseAvatarURL { - avatarURL = m.getAvatarURL(msg.ValidatorAddress, msg.Description.Identity, height) - } - if err = m.broker.PublishValidatorDescription(ctx, model.ValidatorDescription{ OperatorAddress: msg.ValidatorAddress, Moniker: msg.Description.Moniker, @@ -105,7 +100,6 @@ func (m *Module) handleMsgCreateValidator( Website: msg.Description.Website, SecurityContact: msg.Description.SecurityContact, Details: msg.Description.Details, - AvatarURL: avatarURL, Height: height, }); err != nil { return err @@ -217,11 +211,11 @@ func (m *Module) handleMsgUndelegate(ctx context.Context, tx *types.Tx, index in } // TODO: test it if err = m.broker.PublishUnbondingDelegation(ctx, model.UnbondingDelegation{ - Height: tx.Height, - DelegatorAddress: msg.DelegatorAddress, - ValidatorAddress: msg.ValidatorAddress, - Coin: m.tbM.MapCoin(types.NewCoinFromCdk(msg.Amount)), - CompletionTimestamp: completionTime, + Height: tx.Height, + DelegatorAddress: msg.DelegatorAddress, + OperatorAddress: msg.ValidatorAddress, + Coin: m.tbM.MapCoin(types.NewCoinFromCdk(msg.Amount)), + CompletionTime: completionTime, }); err != nil { return err } @@ -229,11 +223,11 @@ func (m *Module) handleMsgUndelegate(ctx context.Context, tx *types.Tx, index in // TODO: test it if err = m.broker.PublishUnbondingDelegationMessage(ctx, model.UnbondingDelegationMessage{ UnbondingDelegation: model.UnbondingDelegation{ - Height: tx.Height, - DelegatorAddress: msg.DelegatorAddress, - ValidatorAddress: msg.ValidatorAddress, - Coin: m.tbM.MapCoin(types.NewCoinFromCdk(msg.Amount)), - CompletionTimestamp: completionTime, + Height: tx.Height, + DelegatorAddress: msg.DelegatorAddress, + OperatorAddress: msg.ValidatorAddress, + Coin: m.tbM.MapCoin(types.NewCoinFromCdk(msg.Amount)), + CompletionTime: completionTime, }, TxHash: tx.TxHash, MsgIndex: int64(index), @@ -320,11 +314,6 @@ func (m *Module) handleEditValidator( return err } - var avatarURL string - if m.parseAvatarURL { - avatarURL = m.getAvatarURL(msg.ValidatorAddress, msg.Description.Identity, height) - } - if err := m.broker.PublishValidatorDescription(ctx, model.ValidatorDescription{ OperatorAddress: msg.ValidatorAddress, Moniker: msg.Description.Moniker, @@ -332,7 +321,6 @@ func (m *Module) handleEditValidator( Website: msg.Description.Website, SecurityContact: msg.Description.SecurityContact, Details: msg.Description.Details, - AvatarURL: avatarURL, Height: height, }); err != nil { return err diff --git a/modules/staking/module.go b/modules/staking/module.go index 09f5832..62d4183 100644 --- a/modules/staking/module.go +++ b/modules/staking/module.go @@ -2,7 +2,6 @@ package staking import ( "os" - "sync" "github.com/cosmos/cosmos-sdk/codec" "github.com/rs/zerolog" @@ -31,13 +30,11 @@ type ( broker broker tbM tb.ToBroker cdc codec.Codec - avatarURLCache sync.Map enabledModules []string // xxx fixme - parseAvatarURL bool } ) -func New(b broker, cli *grpcClient.Client, tbM tb.ToBroker, cdc codec.Codec, modules []string, parseAvatarURL bool) *Module { +func New(b broker, cli *grpcClient.Client, tbM tb.ToBroker, cdc codec.Codec, modules []string) *Module { l := zerolog.New(os.Stderr).Output(zerolog.ConsoleWriter{Out: os.Stderr}).With().Timestamp(). Str("module", moduleName).Logger() @@ -48,8 +45,6 @@ func New(b broker, cli *grpcClient.Client, tbM tb.ToBroker, cdc codec.Codec, mod tbM: tbM, cdc: cdc, enabledModules: modules, - avatarURLCache: sync.Map{}, - parseAvatarURL: parseAvatarURL, } } diff --git a/modules/staking/validators.go b/modules/staking/validators.go index 93c04a6..1f90cc7 100644 --- a/modules/staking/validators.go +++ b/modules/staking/validators.go @@ -9,7 +9,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" - "github.com/bro-n-bro/spacebox-crawler/pkg/keybase" "github.com/bro-n-bro/spacebox-crawler/types" "github.com/bro-n-bro/spacebox/broker/model" ) @@ -18,11 +17,6 @@ const ( defaultLimit = 150 ) -type avatarURLCache struct { - Identity string - AvatarURL string -} - // HandleValidators handles validators for each block height. func (m *Module) HandleValidators(ctx context.Context, tmValidators *cometbftcoretypes.ResultValidators) error { vals, validators, err := GetValidators(ctx, tmValidators.BlockHeight, m.client.StakingQueryClient, m.cdc) @@ -34,7 +28,7 @@ func (m *Module) HandleValidators(ctx context.Context, tmValidators *cometbftcor return err } - if err = m.publishValidatorDescriptions(ctx, vals, tmValidators.BlockHeight, m.parseAvatarURL); err != nil { + if err = m.publishValidatorDescriptions(ctx, vals, tmValidators.BlockHeight); err != nil { return err } @@ -46,7 +40,7 @@ func (m *Module) HandleValidators(ctx context.Context, tmValidators *cometbftcor if err = m.broker.PublishValidatorStatus(ctx, model.ValidatorStatus{ Height: tmValidators.BlockHeight, - ValidatorAddress: consAddr.String(), + ConsensusAddress: consAddr.String(), Status: int64(val.GetStatus()), Jailed: val.IsJailed(), }); err != nil { @@ -109,24 +103,10 @@ func (m *Module) PublishValidatorsData(ctx context.Context, sVals []types.Stakin } // asyncPublishValidatorDescriptions process validator descriptions and publish them to the broker. -func (m *Module) publishValidatorDescriptions( - ctx context.Context, - vals stakingtypes.Validators, - height int64, - parseAvatarURL bool, -) error { - +func (m *Module) publishValidatorDescriptions(ctx context.Context, vals stakingtypes.Validators, height int64) error { for _, val := range vals { - if parseAvatarURL { - go func(val stakingtypes.Validator) { - ctx = context.Background() - avatarURL := m.getAvatarURL(val.OperatorAddress, val.Description.Identity, height) - _ = m.publishValidatorDescription(ctx, val, avatarURL, height) - }(val) - } else { - if err := m.publishValidatorDescription(ctx, val, "", height); err != nil { - return err - } + if err := m.publishValidatorDescription(ctx, val, height); err != nil { + return err } } @@ -139,7 +119,6 @@ func (m *Module) publishValidatorDescriptions( func (m *Module) publishValidatorDescription( ctx context.Context, val stakingtypes.Validator, - avatarURL string, height int64, ) error { @@ -150,7 +129,6 @@ func (m *Module) publishValidatorDescription( Website: val.Description.Website, SecurityContact: val.Description.SecurityContact, Details: val.Description.Details, - AvatarURL: avatarURL, Height: height, }); err != nil { m.log.Error().Err(err). @@ -163,41 +141,3 @@ func (m *Module) publishValidatorDescription( return nil } - -func (m *Module) getAvatarURL(operatorAddress, identity string, height int64) string { - var ( - avatarURL string - cacheItem avatarURLCache - err error - ctx = context.Background() - ) - - cacheVal, ok := m.avatarURLCache.Load(operatorAddress) - if ok { - cacheItem, ok = cacheVal.(avatarURLCache) - } - - // not exists or value is not equal to the current one - if !ok || cacheItem.Identity != identity { - // get avatar url from the keybase API - avatarURL, err = keybase.GetAvatarURL(ctx, identity) - if err != nil { - m.log.Error(). - Err(err). - Str("operator_address", operatorAddress). - Str("identity", identity). - Int64("height", height). - Msg("failed to get avatar url") - } else { - // update the cache - m.avatarURLCache.Store(operatorAddress, avatarURLCache{ - Identity: identity, - AvatarURL: avatarURL, - }) - } - } else { // can get from cache - avatarURL = cacheItem.AvatarURL - } - - return avatarURL -} diff --git a/pkg/worker/config.go b/pkg/worker/config.go index ba4a5f1..1c44960 100644 --- a/pkg/worker/config.go +++ b/pkg/worker/config.go @@ -6,7 +6,7 @@ type Config struct { ProcessErrorBlocksInterval time.Duration `env:"PROCESS_ERROR_BLOCKS_INTERVAL" envDefault:"1m"` ProcessNewBlocks bool `env:"SUBSCRIBE_NEW_BLOCKS"` // FIXME: or use ws enabled??? ProcessErrorBlocks bool `env:"PROCESS_ERROR_BLOCKS" envDefault:"true"` - MetricsEnabled bool `env:"WORKER_METRICS_ENABLED" envDefault:"false"` + MetricsEnabled bool `env:"METRICS_ENABLED" envDefault:"false"` RecoveryMode bool `env:"RECOVERY_MODE" envDefault:"false"` ProcessGenesis bool `env:"PROCESS_GENESIS" envDefault:"true"` WorkersCount int `env:"WORKERS_COUNT" envDefault:"1"`