From af4598c0673d4a29a9c2f1d4782ea4ebc9e7f62d Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Tue, 9 Jul 2024 15:16:28 +0800 Subject: [PATCH] Use Apollo config change listener --- config/apolloconfig/apollo.go | 6 +---- config/apolloconfig/apollo_test.go | 1 - config/apolloconfig/listener.go | 35 +++++++++++++++++----------- config/apolloconfig/listener_test.go | 16 +++++++------ server/iprestriction/client.go | 6 +++++ server/tokenlogoinfo/Client.go | 6 +++++ utils/innerchainIdmanager.go | 29 ++++++++++++++++++++++- utils/messagebridge/processor.go | 28 ++++++++++++++++++++++ utils/messagebridge/usdclxly.go | 22 +++++++++++++++++ utils/messagebridge/wsteth.go | 22 +++++++++++++++++ 10 files changed, 144 insertions(+), 27 deletions(-) diff --git a/config/apolloconfig/apollo.go b/config/apolloconfig/apollo.go index 26ebd05c..4a79249a 100644 --- a/config/apolloconfig/apollo.go +++ b/config/apolloconfig/apollo.go @@ -13,9 +13,7 @@ import ( ) var ( - enabled = false - disableEntryDebugLog = false - defaultClient *agollo.Client + defaultClient *agollo.Client textUnmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem() ) @@ -31,7 +29,6 @@ func GetClient() *agollo.Client { // Init initializes the connection to the Apollo server // This should not be called if Apollo config is disabled for the service func Init(c Config) error { - enabled = true SetLogger() cfg := &agolloConfig.AppConfig{ AppID: c.AppID, @@ -52,7 +49,6 @@ func Init(c Config) error { client.AddChangeListener(GetDefaultListener()) defaultClient = client - disableEntryDebugLog = c.DisableEntryDebugLog return nil } diff --git a/config/apolloconfig/apollo_test.go b/config/apolloconfig/apollo_test.go index e3c1303a..2498fbec 100644 --- a/config/apolloconfig/apollo_test.go +++ b/config/apolloconfig/apollo_test.go @@ -52,7 +52,6 @@ func TestLoad(t *testing.T) { }, } - enabled = true getString = func(key string) (string, error) { s, ok := resultMapping[key] if !ok { diff --git a/config/apolloconfig/listener.go b/config/apolloconfig/listener.go index ff091d42..f1925e64 100644 --- a/config/apolloconfig/listener.go +++ b/config/apolloconfig/listener.go @@ -37,10 +37,6 @@ func (l *ConfigChangeListener) OnChange(event *storage.ChangeEvent) { getLogger().Debugf("ConfigChangeListener#OnChange received: %v", toJson(event)) for key, change := range event.Changes { - // Only handle ADDED and MODIFIED type - if change.ChangeType == storage.DELETED { - continue - } for _, handler := range l.changeHandlers[key] { handler.handle(change, key) } @@ -63,9 +59,10 @@ func (l *ConfigChangeListener) RegisterHandler(key string, opts ...handlerOpt) { // changeHandler contains the information for handling the config change for one config, in a specific context type changeHandler struct { - obj any - callbackFn func(key string, change *storage.ConfigChange) - locker sync.Locker + obj any + beforeFn func(key string, change *storage.ConfigChange) + afterFn func(key string, change *storage.ConfigChange, obj any) + locker sync.Locker } func (h *changeHandler) handle(change *storage.ConfigChange, key string) { @@ -74,15 +71,20 @@ func (h *changeHandler) handle(change *storage.ConfigChange, key string) { defer h.locker.Unlock() } - if h.obj != nil { + if h.beforeFn != nil { + h.beforeFn(key, change) + } + + if h.obj != nil && change.ChangeType != storage.DELETED { + // Only update the object if change is ADDED or MODIFIED err := decodeStringToObject(change.NewValue.(string), h.obj) if err != nil { getLogger().WithFields("key", key).Errorf("changeHandler#handle decodeStringToObject error: %v", err) } } - if h.callbackFn != nil { - h.callbackFn(key, change) + if h.afterFn != nil { + h.afterFn(key, change, h.obj) } } @@ -97,10 +99,17 @@ func withConfigObj[T any](obj *T) handlerOpt { } } -// WithCallbackFn assigns a callback function that will be called when a config key is changed -func WithCallbackFn(callbackFn func(string, *storage.ConfigChange)) handlerOpt { +// WithBeforeFn assigns a function to be called before the config object is updated +func WithBeforeFn(beforeFn func(string, *storage.ConfigChange)) handlerOpt { + return func(handler *changeHandler) { + handler.beforeFn = beforeFn + } +} + +// WithAfterFn assigns a function to be called after the config object is updated +func WithAfterFn(afterFn func(string, *storage.ConfigChange, any)) handlerOpt { return func(handler *changeHandler) { - handler.callbackFn = callbackFn + handler.afterFn = afterFn } } diff --git a/config/apolloconfig/listener_test.go b/config/apolloconfig/listener_test.go index 970e11b4..5c774ef9 100644 --- a/config/apolloconfig/listener_test.go +++ b/config/apolloconfig/listener_test.go @@ -28,7 +28,6 @@ func TestConfigChangeListener(t *testing.T) { "sub": `{"C":0.55, "E": "e1"}`, } - enabled = true getString = func(key string) (string, error) { s, ok := configMapping[key] if !ok { @@ -47,7 +46,10 @@ func TestConfigChangeListener(t *testing.T) { } cnt := make(map[string]int) - callback := func(key string, _ *storage.ConfigChange) { + before := func(key string, _ *storage.ConfigChange) { + cnt[key]++ + } + after := func(key string, _ *storage.ConfigChange, _ any) { cnt[key]++ } @@ -58,11 +60,11 @@ func TestConfigChangeListener(t *testing.T) { var stringField = s.A mutex := &sync.Mutex{} - RegisterChangeHandler("stringField", &stringField, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("stringField", &s.A, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("sub", &s.B, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("e", &s.B.E, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("mp", &s.B.D, WithCallbackFn(callback), WithLocker(mutex)) + RegisterChangeHandler("stringField", &stringField, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("stringField", &s.A, WithBeforeFn(before), WithLocker(mutex)) + RegisterChangeHandler("sub", &s.B, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("e", &s.B.E, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("mp", &s.B.D, WithBeforeFn(before), WithLocker(mutex)) listener := GetDefaultListener() listener.OnChange(&storage.ChangeEvent{ diff --git a/server/iprestriction/client.go b/server/iprestriction/client.go index aee82759..adcb87b5 100644 --- a/server/iprestriction/client.go +++ b/server/iprestriction/client.go @@ -6,8 +6,10 @@ import ( "net/http" "net/url" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/0xPolygonHermez/zkevm-bridge-service/nacos" + "github.com/apolloconfig/agollo/v4/storage" ) type Client struct { @@ -36,6 +38,10 @@ func InitClient(c Config) { Timeout: c.Timeout.Duration, }, } + apolloconfig.RegisterChangeHandler( + "IPRestriction", + &client.cfg, + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange, any) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func GetClient() *Client { diff --git a/server/tokenlogoinfo/Client.go b/server/tokenlogoinfo/Client.go index 2b7e4abd..4ef3eaa8 100644 --- a/server/tokenlogoinfo/Client.go +++ b/server/tokenlogoinfo/Client.go @@ -10,9 +10,11 @@ import ( "strconv" "strings" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/models/tokenlogo" "github.com/0xPolygonHermez/zkevm-bridge-service/nacos" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/apolloconfig/agollo/v4/storage" ) const ( @@ -48,6 +50,10 @@ func InitClient(c Config) { Timeout: c.Timeout.Duration, }, } + apolloconfig.RegisterChangeHandler( + "TokenLogoServiceConfig", + &client.cfg, + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func (c *Client) GetTokenLogoInfos(tokenAddArr []*tokenlogo.QueryLogoParam) (map[string]tokenlogo.LogoInfo, error) { diff --git a/utils/innerchainIdmanager.go b/utils/innerchainIdmanager.go index ce18e2ad..b3296852 100644 --- a/utils/innerchainIdmanager.go +++ b/utils/innerchainIdmanager.go @@ -1,13 +1,34 @@ package utils import ( + "sync" + + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" ) -var standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64 +var ( + standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64 + chainIDMapperLock = &sync.RWMutex{} +) func InnitOkInnerChainIdMapper(cfg businessconfig.Config) { + initOkInnerChainIdMapper(cfg) + + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, _ *storage.ConfigChange, c any) { + initOkInnerChainIdMapper(*c.(*businessconfig.Config)) + })) +} + +func initOkInnerChainIdMapper(cfg businessconfig.Config) { + chainIDMapperLock.Lock() + defer chainIDMapperLock.Unlock() + standardIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds)) innerIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds)) if cfg.StandardChainIds == nil { @@ -22,6 +43,9 @@ func InnitOkInnerChainIdMapper(cfg businessconfig.Config) { } func GetStandardChainIdByInnerId(innerChainId uint64) uint64 { + chainIDMapperLock.RLock() + defer chainIDMapperLock.RUnlock() + chainId, found := innerIdKeyMapper[innerChainId] if !found { return innerChainId @@ -30,6 +54,9 @@ func GetStandardChainIdByInnerId(innerChainId uint64) uint64 { } func GetInnerChainIdByStandardId(chainId uint64) uint64 { + chainIDMapperLock.RLock() + defer chainIDMapperLock.RUnlock() + innerChainId, found := standardIdKeyMapper[chainId] if !found { return chainId diff --git a/utils/messagebridge/processor.go b/utils/messagebridge/processor.go index 237cdd67..9429467b 100644 --- a/utils/messagebridge/processor.go +++ b/utils/messagebridge/processor.go @@ -2,6 +2,7 @@ package messagebridge import ( "math/big" + "sync" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/utils" @@ -18,12 +19,14 @@ const ( var ( emptyAddress = common.Address{} processorMap = make(map[ProcessorType]*Processor) + mutexMap = make(map[ProcessorType]*sync.RWMutex) ) // Processor hosts the processing functions for an LxLy bridge using the message bridge feature // Each Processor object should be used for one type of bridged token only // Current supported tokens: USDC, wstETH type Processor struct { + pType ProcessorType contractToTokenMapping map[common.Address]common.Address // DecodeMetadata decodes the metadata of the message bridge, returns the actual destination address and bridged amount DecodeMetadataFn func(metadata []byte) (common.Address, *big.Int) @@ -31,6 +34,10 @@ type Processor struct { // GetContractAddressList returns the list of contract addresses that need to be processed through this struct func (u *Processor) GetContractAddressList() []common.Address { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + result := make([]common.Address, 0) for addr := range u.contractToTokenMapping { result = append(result, addr) @@ -40,6 +47,10 @@ func (u *Processor) GetContractAddressList() []common.Address { // GetTokenAddressList returns the list of original token addresses func (u *Processor) GetTokenAddressList() []common.Address { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + result := make([]common.Address, 0) for _, addr := range u.contractToTokenMapping { result = append(result, addr) @@ -49,6 +60,10 @@ func (u *Processor) GetTokenAddressList() []common.Address { // CheckContractAddress returns true if the input address is in the contract address list of this bridge func (u *Processor) CheckContractAddress(address common.Address) bool { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + if _, ok := u.contractToTokenMapping[address]; ok { return true } @@ -57,6 +72,10 @@ func (u *Processor) CheckContractAddress(address common.Address) bool { // GetTokenFromContract return the token address from the bridge contract address, for displaying func (u *Processor) GetTokenFromContract(contractAddress common.Address) (common.Address, bool) { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + if token, ok := u.contractToTokenMapping[contractAddress]; ok { return token, true } @@ -131,3 +150,12 @@ func ReplaceDepositInfo(deposit *etherman.Deposit, overwriteOrigNetworkID bool) } processor.ReplaceDepositInfo(deposit, overwriteOrigNetworkID) } + +func getMutex(tp ProcessorType) *sync.RWMutex { + mutex := mutexMap[tp] + if mutex == nil { + mutex = &sync.RWMutex{} + mutexMap[tp] = mutex + } + return mutex +} diff --git a/utils/messagebridge/usdclxly.go b/utils/messagebridge/usdclxly.go index 92c0ebf5..c1f86e0f 100644 --- a/utils/messagebridge/usdclxly.go +++ b/utils/messagebridge/usdclxly.go @@ -3,11 +3,33 @@ package messagebridge import ( "math/big" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" "github.com/ethereum/go-ethereum/common" ) func InitUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) { + initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses) + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) { + cfg := c.(*businessconfig.Config) + if change.ChangeType == storage.DELETED || len(cfg.USDCContractAddresses) == 0 || len(cfg.USDCContractAddresses) == 0 { + delete(processorMap, USDC) + return + } + initUSDCLxLyProcessor(cfg.USDCContractAddresses, cfg.USDCTokenAddresses) + })) +} + +func initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) { + mutex := getMutex(USDC) + mutex.Lock() + defer mutex.Unlock() + log.Debugf("USDCLxLyMapping: contracts[%v] tokens[%v]", usdcContractAddresses, usdcTokenAddresses) if len(usdcContractAddresses) != len(usdcTokenAddresses) { log.Errorf("InitUSDCLxLyProcessor: contract addresses (%v) and token addresses (%v) have different length", len(usdcContractAddresses), len(usdcTokenAddresses)) diff --git a/utils/messagebridge/wsteth.go b/utils/messagebridge/wsteth.go index 7d8f27af..059ecbee 100644 --- a/utils/messagebridge/wsteth.go +++ b/utils/messagebridge/wsteth.go @@ -3,11 +3,33 @@ package messagebridge import ( "math/big" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" "github.com/ethereum/go-ethereum/common" ) func InitWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) { + initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses) + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) { + cfg := c.(*businessconfig.Config) + if change.ChangeType == storage.DELETED || len(cfg.WstETHContractAddresses) == 0 || len(cfg.WstETHTokenAddresses) == 0 { + delete(processorMap, WstETH) + return + } + initUSDCLxLyProcessor(cfg.WstETHContractAddresses, cfg.WstETHTokenAddresses) + })) +} + +func initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) { + mutex := getMutex(WstETH) + mutex.Lock() + defer mutex.Unlock() + log.Debugf("WstETHMapping: contracts[%v] tokens[%v]", wstETHContractAddresses, wstETHTokenAddresses) if len(wstETHContractAddresses) != len(wstETHTokenAddresses) { log.Errorf("InitWstETHProcessor: contract addresses (%v) and token addresses (%v) have different length", len(wstETHContractAddresses), len(wstETHTokenAddresses))