Skip to content

Commit

Permalink
Use Apollo config change listener
Browse files Browse the repository at this point in the history
  • Loading branch information
trunghai95 committed Jul 9, 2024
1 parent b85166a commit af4598c
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 27 deletions.
6 changes: 1 addition & 5 deletions config/apolloconfig/apollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
)

var (
enabled = false
disableEntryDebugLog = false
defaultClient *agollo.Client
defaultClient *agollo.Client

textUnmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem()
)
Expand All @@ -31,7 +29,6 @@ func GetClient() *agollo.Client {
// Init initializes the connection to the Apollo server
// This should not be called if Apollo config is disabled for the service
func Init(c Config) error {
enabled = true
SetLogger()
cfg := &agolloConfig.AppConfig{
AppID: c.AppID,
Expand All @@ -52,7 +49,6 @@ func Init(c Config) error {

client.AddChangeListener(GetDefaultListener())
defaultClient = client
disableEntryDebugLog = c.DisableEntryDebugLog
return nil
}

Expand Down
1 change: 0 additions & 1 deletion config/apolloconfig/apollo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func TestLoad(t *testing.T) {
},
}

enabled = true
getString = func(key string) (string, error) {
s, ok := resultMapping[key]
if !ok {
Expand Down
35 changes: 22 additions & 13 deletions config/apolloconfig/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ func (l *ConfigChangeListener) OnChange(event *storage.ChangeEvent) {
getLogger().Debugf("ConfigChangeListener#OnChange received: %v", toJson(event))

for key, change := range event.Changes {
// Only handle ADDED and MODIFIED type
if change.ChangeType == storage.DELETED {
continue
}
for _, handler := range l.changeHandlers[key] {
handler.handle(change, key)
}
Expand All @@ -63,9 +59,10 @@ func (l *ConfigChangeListener) RegisterHandler(key string, opts ...handlerOpt) {

// changeHandler contains the information for handling the config change for one config, in a specific context
type changeHandler struct {
obj any
callbackFn func(key string, change *storage.ConfigChange)
locker sync.Locker
obj any
beforeFn func(key string, change *storage.ConfigChange)
afterFn func(key string, change *storage.ConfigChange, obj any)
locker sync.Locker
}

func (h *changeHandler) handle(change *storage.ConfigChange, key string) {
Expand All @@ -74,15 +71,20 @@ func (h *changeHandler) handle(change *storage.ConfigChange, key string) {
defer h.locker.Unlock()
}

if h.obj != nil {
if h.beforeFn != nil {
h.beforeFn(key, change)
}

if h.obj != nil && change.ChangeType != storage.DELETED {
// Only update the object if change is ADDED or MODIFIED
err := decodeStringToObject(change.NewValue.(string), h.obj)
if err != nil {
getLogger().WithFields("key", key).Errorf("changeHandler#handle decodeStringToObject error: %v", err)
}
}

if h.callbackFn != nil {
h.callbackFn(key, change)
if h.afterFn != nil {
h.afterFn(key, change, h.obj)
}
}

Expand All @@ -97,10 +99,17 @@ func withConfigObj[T any](obj *T) handlerOpt {
}
}

// WithCallbackFn assigns a callback function that will be called when a config key is changed
func WithCallbackFn(callbackFn func(string, *storage.ConfigChange)) handlerOpt {
// WithBeforeFn assigns a function to be called before the config object is updated
func WithBeforeFn(beforeFn func(string, *storage.ConfigChange)) handlerOpt {
return func(handler *changeHandler) {
handler.beforeFn = beforeFn
}
}

// WithAfterFn assigns a function to be called after the config object is updated
func WithAfterFn(afterFn func(string, *storage.ConfigChange, any)) handlerOpt {
return func(handler *changeHandler) {
handler.callbackFn = callbackFn
handler.afterFn = afterFn
}
}

Expand Down
16 changes: 9 additions & 7 deletions config/apolloconfig/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func TestConfigChangeListener(t *testing.T) {
"sub": `{"C":0.55, "E": "e1"}`,
}

enabled = true
getString = func(key string) (string, error) {
s, ok := configMapping[key]
if !ok {
Expand All @@ -47,7 +46,10 @@ func TestConfigChangeListener(t *testing.T) {
}

cnt := make(map[string]int)
callback := func(key string, _ *storage.ConfigChange) {
before := func(key string, _ *storage.ConfigChange) {
cnt[key]++
}
after := func(key string, _ *storage.ConfigChange, _ any) {
cnt[key]++
}

Expand All @@ -58,11 +60,11 @@ func TestConfigChangeListener(t *testing.T) {

var stringField = s.A
mutex := &sync.Mutex{}
RegisterChangeHandler("stringField", &stringField, WithCallbackFn(callback), WithLocker(mutex))
RegisterChangeHandler("stringField", &s.A, WithCallbackFn(callback), WithLocker(mutex))
RegisterChangeHandler("sub", &s.B, WithCallbackFn(callback), WithLocker(mutex))
RegisterChangeHandler("e", &s.B.E, WithCallbackFn(callback), WithLocker(mutex))
RegisterChangeHandler("mp", &s.B.D, WithCallbackFn(callback), WithLocker(mutex))
RegisterChangeHandler("stringField", &stringField, WithAfterFn(after), WithLocker(mutex))
RegisterChangeHandler("stringField", &s.A, WithBeforeFn(before), WithLocker(mutex))
RegisterChangeHandler("sub", &s.B, WithAfterFn(after), WithLocker(mutex))
RegisterChangeHandler("e", &s.B.E, WithAfterFn(after), WithLocker(mutex))
RegisterChangeHandler("mp", &s.B.D, WithBeforeFn(before), WithLocker(mutex))

listener := GetDefaultListener()
listener.OnChange(&storage.ChangeEvent{
Expand Down
6 changes: 6 additions & 0 deletions server/iprestriction/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"net/http"
"net/url"

"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/log"
"github.com/0xPolygonHermez/zkevm-bridge-service/nacos"
"github.com/apolloconfig/agollo/v4/storage"
)

type Client struct {
Expand Down Expand Up @@ -36,6 +38,10 @@ func InitClient(c Config) {
Timeout: c.Timeout.Duration,
},
}
apolloconfig.RegisterChangeHandler(
"IPRestriction",
&client.cfg,
apolloconfig.WithAfterFn(func(string, *storage.ConfigChange, any) { client.httpClient.Timeout = client.cfg.Timeout.Duration }))
}

func GetClient() *Client {
Expand Down
6 changes: 6 additions & 0 deletions server/tokenlogoinfo/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"strconv"
"strings"

"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/models/tokenlogo"
"github.com/0xPolygonHermez/zkevm-bridge-service/nacos"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/apolloconfig/agollo/v4/storage"
)

const (
Expand Down Expand Up @@ -48,6 +50,10 @@ func InitClient(c Config) {
Timeout: c.Timeout.Duration,
},
}
apolloconfig.RegisterChangeHandler(
"TokenLogoServiceConfig",
&client.cfg,
apolloconfig.WithAfterFn(func(string, *storage.ConfigChange) { client.httpClient.Timeout = client.cfg.Timeout.Duration }))

Check failure on line 56 in server/tokenlogoinfo/Client.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, amd64)

cannot use func(string, *storage.ConfigChange) {…} (value of type func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange)) as func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange, any) value in argument to apolloconfig.WithAfterFn

Check failure on line 56 in server/tokenlogoinfo/Client.go

View workflow job for this annotation

GitHub Actions / lint

cannot use func(string, *storage.ConfigChange) {…} (value of type func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange)) as func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange, any) value in argument to apolloconfig.WithAfterFn (typecheck)

Check failure on line 56 in server/tokenlogoinfo/Client.go

View workflow job for this annotation

GitHub Actions / lint

cannot use func(string, *storage.ConfigChange) {…} (value of type func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange)) as func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange, any) value in argument to apolloconfig.WithAfterFn) (typecheck)

Check failure on line 56 in server/tokenlogoinfo/Client.go

View workflow job for this annotation

GitHub Actions / lint

cannot use func(string, *storage.ConfigChange) {…} (value of type func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange)) as func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange, any) value in argument to apolloconfig.WithAfterFn) (typecheck)

Check failure on line 56 in server/tokenlogoinfo/Client.go

View workflow job for this annotation

GitHub Actions / lint

cannot use func(string, *storage.ConfigChange) {…} (value of type func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange)) as func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange, any) value in argument to apolloconfig.WithAfterFn) (typecheck)

Check failure on line 56 in server/tokenlogoinfo/Client.go

View workflow job for this annotation

GitHub Actions / lint

cannot use func(string, *storage.ConfigChange) {…} (value of type func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange)) as func(string, *"github.com/apolloconfig/agollo/v4/storage".ConfigChange, any) value in argument to apolloconfig.WithAfterFn) (typecheck)
}

func (c *Client) GetTokenLogoInfos(tokenAddArr []*tokenlogo.QueryLogoParam) (map[string]tokenlogo.LogoInfo, error) {
Expand Down
29 changes: 28 additions & 1 deletion utils/innerchainIdmanager.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
package utils

import (
"sync"

"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/log"
"github.com/apolloconfig/agollo/v4/storage"
)

var standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64
var (
standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64
chainIDMapperLock = &sync.RWMutex{}
)

func InnitOkInnerChainIdMapper(cfg businessconfig.Config) {
initOkInnerChainIdMapper(cfg)

apolloconfig.RegisterChangeHandler(
"BusinessConfig",
&businessconfig.Config{},
apolloconfig.WithAfterFn(func(_ string, _ *storage.ConfigChange, c any) {
initOkInnerChainIdMapper(*c.(*businessconfig.Config))
}))
}

func initOkInnerChainIdMapper(cfg businessconfig.Config) {
chainIDMapperLock.Lock()
defer chainIDMapperLock.Unlock()

standardIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds))
innerIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds))
if cfg.StandardChainIds == nil {
Expand All @@ -22,6 +43,9 @@ func InnitOkInnerChainIdMapper(cfg businessconfig.Config) {
}

func GetStandardChainIdByInnerId(innerChainId uint64) uint64 {
chainIDMapperLock.RLock()
defer chainIDMapperLock.RUnlock()

chainId, found := innerIdKeyMapper[innerChainId]
if !found {
return innerChainId
Expand All @@ -30,6 +54,9 @@ func GetStandardChainIdByInnerId(innerChainId uint64) uint64 {
}

func GetInnerChainIdByStandardId(chainId uint64) uint64 {
chainIDMapperLock.RLock()
defer chainIDMapperLock.RUnlock()

innerChainId, found := standardIdKeyMapper[chainId]
if !found {
return chainId
Expand Down
28 changes: 28 additions & 0 deletions utils/messagebridge/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package messagebridge

import (
"math/big"
"sync"

"github.com/0xPolygonHermez/zkevm-bridge-service/etherman"
"github.com/0xPolygonHermez/zkevm-bridge-service/utils"
Expand All @@ -18,19 +19,25 @@ const (
var (
emptyAddress = common.Address{}
processorMap = make(map[ProcessorType]*Processor)
mutexMap = make(map[ProcessorType]*sync.RWMutex)
)

// Processor hosts the processing functions for an LxLy bridge using the message bridge feature
// Each Processor object should be used for one type of bridged token only
// Current supported tokens: USDC, wstETH
type Processor struct {
pType ProcessorType
contractToTokenMapping map[common.Address]common.Address
// DecodeMetadata decodes the metadata of the message bridge, returns the actual destination address and bridged amount
DecodeMetadataFn func(metadata []byte) (common.Address, *big.Int)
}

// GetContractAddressList returns the list of contract addresses that need to be processed through this struct
func (u *Processor) GetContractAddressList() []common.Address {
mutex := getMutex(u.pType)
mutex.RLock()
defer mutex.RUnlock()

result := make([]common.Address, 0)
for addr := range u.contractToTokenMapping {
result = append(result, addr)
Expand All @@ -40,6 +47,10 @@ func (u *Processor) GetContractAddressList() []common.Address {

// GetTokenAddressList returns the list of original token addresses
func (u *Processor) GetTokenAddressList() []common.Address {
mutex := getMutex(u.pType)
mutex.RLock()
defer mutex.RUnlock()

result := make([]common.Address, 0)
for _, addr := range u.contractToTokenMapping {
result = append(result, addr)
Expand All @@ -49,6 +60,10 @@ func (u *Processor) GetTokenAddressList() []common.Address {

// CheckContractAddress returns true if the input address is in the contract address list of this bridge
func (u *Processor) CheckContractAddress(address common.Address) bool {
mutex := getMutex(u.pType)
mutex.RLock()
defer mutex.RUnlock()

if _, ok := u.contractToTokenMapping[address]; ok {
return true
}
Expand All @@ -57,6 +72,10 @@ func (u *Processor) CheckContractAddress(address common.Address) bool {

// GetTokenFromContract return the token address from the bridge contract address, for displaying
func (u *Processor) GetTokenFromContract(contractAddress common.Address) (common.Address, bool) {
mutex := getMutex(u.pType)
mutex.RLock()
defer mutex.RUnlock()

if token, ok := u.contractToTokenMapping[contractAddress]; ok {
return token, true
}
Expand Down Expand Up @@ -131,3 +150,12 @@ func ReplaceDepositInfo(deposit *etherman.Deposit, overwriteOrigNetworkID bool)
}
processor.ReplaceDepositInfo(deposit, overwriteOrigNetworkID)
}

func getMutex(tp ProcessorType) *sync.RWMutex {
mutex := mutexMap[tp]
if mutex == nil {
mutex = &sync.RWMutex{}
mutexMap[tp] = mutex
}
return mutex
}
22 changes: 22 additions & 0 deletions utils/messagebridge/usdclxly.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,33 @@ package messagebridge
import (
"math/big"

"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/log"
"github.com/apolloconfig/agollo/v4/storage"
"github.com/ethereum/go-ethereum/common"
)

func InitUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) {
initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses)
apolloconfig.RegisterChangeHandler(
"BusinessConfig",
&businessconfig.Config{},
apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) {
cfg := c.(*businessconfig.Config)
if change.ChangeType == storage.DELETED || len(cfg.USDCContractAddresses) == 0 || len(cfg.USDCContractAddresses) == 0 {
delete(processorMap, USDC)
return
}
initUSDCLxLyProcessor(cfg.USDCContractAddresses, cfg.USDCTokenAddresses)
}))
}

func initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) {
mutex := getMutex(USDC)
mutex.Lock()
defer mutex.Unlock()

log.Debugf("USDCLxLyMapping: contracts[%v] tokens[%v]", usdcContractAddresses, usdcTokenAddresses)
if len(usdcContractAddresses) != len(usdcTokenAddresses) {
log.Errorf("InitUSDCLxLyProcessor: contract addresses (%v) and token addresses (%v) have different length", len(usdcContractAddresses), len(usdcTokenAddresses))
Expand Down
22 changes: 22 additions & 0 deletions utils/messagebridge/wsteth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,33 @@ package messagebridge
import (
"math/big"

"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/log"
"github.com/apolloconfig/agollo/v4/storage"
"github.com/ethereum/go-ethereum/common"
)

func InitWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) {
initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses)
apolloconfig.RegisterChangeHandler(
"BusinessConfig",
&businessconfig.Config{},
apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) {
cfg := c.(*businessconfig.Config)
if change.ChangeType == storage.DELETED || len(cfg.WstETHContractAddresses) == 0 || len(cfg.WstETHTokenAddresses) == 0 {
delete(processorMap, WstETH)
return
}
initUSDCLxLyProcessor(cfg.WstETHContractAddresses, cfg.WstETHTokenAddresses)
}))
}

func initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) {
mutex := getMutex(WstETH)
mutex.Lock()
defer mutex.Unlock()

log.Debugf("WstETHMapping: contracts[%v] tokens[%v]", wstETHContractAddresses, wstETHTokenAddresses)
if len(wstETHContractAddresses) != len(wstETHTokenAddresses) {
log.Errorf("InitWstETHProcessor: contract addresses (%v) and token addresses (%v) have different length", len(wstETHContractAddresses), len(wstETHTokenAddresses))
Expand Down

0 comments on commit af4598c

Please sign in to comment.