From 2420a8b5951f153835e56740caa1685eec8b9745 Mon Sep 17 00:00:00 2001 From: Leo <52782564+LeoGuo621@users.noreply.github.com> Date: Thu, 1 Jun 2023 09:44:05 +0800 Subject: [PATCH] dynamic config max_subscription_clients (#3144) * expand max_subscription_clients 5 times * add dynamic config * improve code * modify code * fix ut * fix ut --- app/config/config.go | 25 ++++++++++++++++++- app/rpc/tests/rpc_test.go | 2 ++ cmd/client/flags.go | 4 +++ .../config/dynamic_config_okchain.go | 21 +++++++++++++--- libs/tendermint/lite/proxy/query_test.go | 9 +++++++ libs/tendermint/rpc/client/event_test.go | 12 +++++++++ libs/tendermint/rpc/core/events.go | 5 ++-- libs/tendermint/rpc/core/mempool.go | 5 ++-- libs/tendermint/rpc/grpc/grpc_test.go | 12 ++++++++- 9 files changed, 85 insertions(+), 10 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 838c97bfd3..99e1035704 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -124,6 +124,8 @@ type OecConfig struct { commitGapOffset int64 // enable mempool sim gu factor enableMempoolSimGuFactor bool + + maxSubscriptionClients int } const ( @@ -162,6 +164,7 @@ const ( FlagDebugGcInterval = "debug.gc-interval" FlagCommitGapOffset = "commit-gap-offset" FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor" + FlagMaxSubscriptionClients = "max-subscription-clients" ) var ( @@ -312,6 +315,7 @@ func (c *OecConfig) loadFromConfig() { c.SetGcInterval(viper.GetInt(FlagDebugGcInterval)) c.SetIavlAcNoBatch(viper.GetBool(tmiavl.FlagIavlCommitAsyncNoBatch)) c.SetEnableMempoolSimGuFactor(viper.GetBool(FlagEnableMempoolSimGuFactor)) + c.SetMaxSubscriptionClients(viper.GetInt(FlagMaxSubscriptionClients)) } func resolveNodeKeyWhitelist(plain string) []string { @@ -385,7 +389,8 @@ func (c *OecConfig) format() string { enable-analyzer: %v iavl-commit-async-no-batch: %v enable-mempool-sim-gu-factor: %v - active-view-change: %v`, system.ChainName, + active-view-change: %v + max_subscription_clients: %v`, system.ChainName, c.GetMempoolRecheck(), c.GetMempoolForceRecheckGap(), c.GetMempoolSize(), @@ -416,6 +421,7 @@ func (c *OecConfig) format() string { c.GetIavlAcNoBatch(), c.GetEnableMempoolSimGuFactor(), c.GetActiveVC(), + c.GetMaxSubscriptionClients(), ) } @@ -670,6 +676,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetEnableMempoolSimGuFactor(r) + case FlagMaxSubscriptionClients: + r, err := strconv.Atoi(v) + if err != nil { + return + } + c.SetMaxSubscriptionClients(r) } } @@ -1095,3 +1107,14 @@ func (c *OecConfig) SetEnableMempoolSimGuFactor(v bool) { func (c *OecConfig) GetEnableMempoolSimGuFactor() bool { return c.enableMempoolSimGuFactor } + +func (c *OecConfig) SetMaxSubscriptionClients(v int) { + if v < 0 { + v = 0 + } + c.maxSubscriptionClients = v +} + +func (c *OecConfig) GetMaxSubscriptionClients() int { + return c.maxSubscriptionClients +} diff --git a/app/rpc/tests/rpc_test.go b/app/rpc/tests/rpc_test.go index 4028b2f6d5..16d6ecf08d 100644 --- a/app/rpc/tests/rpc_test.go +++ b/app/rpc/tests/rpc_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/okex/exchain/app/config" "github.com/okex/exchain/app/crypto/ethsecp256k1" "github.com/okex/exchain/app/rpc/backend" cosmos_context "github.com/okex/exchain/libs/cosmos-sdk/client/context" @@ -149,6 +150,7 @@ func (suite *RPCTestSuite) SetupTest() { viper.Set(flags.FlagKeyringBackend, "test") viper.Set(rpc.FlagPersonalAPI, true) + viper.Set(config.FlagMaxSubscriptionClients, 100) senderPv := suite.chain.SenderAccountPVBZ() genesisAcc = suite.chain.SenderAccount().GetAddress() diff --git a/cmd/client/flags.go b/cmd/client/flags.go index 7c892c466d..6f96b5bdeb 100644 --- a/cmd/client/flags.go +++ b/cmd/client/flags.go @@ -143,5 +143,9 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().Int(backend.FlagLogsLimit, 0, "Maximum number of logs returned when calling eth_getLogs") cmd.Flags().Int(backend.FlagLogsTimeout, 60, "Maximum query duration when calling eth_getLogs") cmd.Flags().Int(websockets.FlagSubscribeLimit, 15, "Maximum subscription on a websocket connection") + + // flags for tendermint rpc + cmd.Flags().Int(config.FlagMaxSubscriptionClients, 100, "Maximum number of unique clientIDs that Tendermint RPC server can /subscribe or /broadcast_tx_commit") + wasm.AddModuleInitFlags(cmd) } diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index 34843f27a4..d402c6e7ed 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -35,6 +35,7 @@ type IDynamicConfig interface { GetDynamicGpMaxGasUsed() int64 GetGasLimitBuffer() uint64 GetEnableMempoolSimGuFactor() bool + GetMaxSubscriptionClients() int } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -44,10 +45,11 @@ func SetDynamicConfig(c IDynamicConfig) { } type MockDynamicConfig struct { - enableDeleteMinGPTx bool - dynamicGpMode int - dynamicGpMaxTxNum int64 - dynamicGpMaxGasUsed int64 + enableDeleteMinGPTx bool + dynamicGpMode int + dynamicGpMaxTxNum int64 + dynamicGpMaxGasUsed int64 + maxSubscriptionClients int } func (d MockDynamicConfig) GetMempoolRecheck() bool { @@ -195,3 +197,14 @@ func (d MockDynamicConfig) GetGasLimitBuffer() uint64 { func (d MockDynamicConfig) GetEnableMempoolSimGuFactor() bool { return false } + +func (d MockDynamicConfig) GetMaxSubscriptionClients() int { + return d.maxSubscriptionClients +} + +func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) { + if value < 0 { + return + } + d.maxSubscriptionClients = value +} diff --git a/libs/tendermint/lite/proxy/query_test.go b/libs/tendermint/lite/proxy/query_test.go index 77de7e03c3..40b84ff1d4 100644 --- a/libs/tendermint/lite/proxy/query_test.go +++ b/libs/tendermint/lite/proxy/query_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/okex/exchain/libs/tendermint/abci/example/kvstore" + cfg "github.com/okex/exchain/libs/tendermint/config" "github.com/okex/exchain/libs/tendermint/crypto/merkle" "github.com/okex/exchain/libs/tendermint/lite" certclient "github.com/okex/exchain/libs/tendermint/lite/client" @@ -125,6 +126,7 @@ func _TestAppProofs(t *testing.T) { } func TestTxProofs(t *testing.T) { + setMocConfig(100) assert, require := assert.New(t), require.New(t) cl := rpclocal.New(node) @@ -162,3 +164,10 @@ func TestTxProofs(t *testing.T) { require.Nil(err, "%#v", err) require.Equal(res.Proof.RootHash, commit.Header.DataHash) } + +func setMocConfig(clientNum int) { + moc := cfg.MockDynamicConfig{} + moc.SetMaxSubscriptionClients(100) + + cfg.SetDynamicConfig(moc) +} diff --git a/libs/tendermint/rpc/client/event_test.go b/libs/tendermint/rpc/client/event_test.go index e36a106839..e628d88d87 100644 --- a/libs/tendermint/rpc/client/event_test.go +++ b/libs/tendermint/rpc/client/event_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/okex/exchain/libs/tendermint/abci/types" + cfg "github.com/okex/exchain/libs/tendermint/config" tmrand "github.com/okex/exchain/libs/tendermint/libs/rand" "github.com/okex/exchain/libs/tendermint/rpc/client" ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types" @@ -27,6 +28,7 @@ func MakeTxKV() ([]byte, []byte, []byte) { } func TestHeaderEvents(t *testing.T) { + setMocConfig(100) for i, c := range GetClients() { i, c := i, c // capture params t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { @@ -49,6 +51,7 @@ func TestHeaderEvents(t *testing.T) { } func TestBlockEvents(t *testing.T) { + setMocConfig(100) for i, c := range GetClients() { i, c := i, c // capture params t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { @@ -97,6 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "a func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") } func testTxEventsSent(t *testing.T, broadcastMethod string) { + setMocConfig(100) for i, c := range GetClients() { i, c := i, c // capture params t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { @@ -150,6 +154,7 @@ func TestClientsResubscribe(t *testing.T) { } func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) { + setMocConfig(100) c := getHTTPClient() // on Subscribe @@ -166,3 +171,10 @@ func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) { err = c.UnsubscribeAll(context.Background(), "TestHeaderEvents") assert.Error(t, err) } + +func setMocConfig(clientNum int) { + moc := cfg.MockDynamicConfig{} + moc.SetMaxSubscriptionClients(100) + + cfg.SetDynamicConfig(moc) +} diff --git a/libs/tendermint/rpc/core/events.go b/libs/tendermint/rpc/core/events.go index f32da80ace..fdf97aa801 100644 --- a/libs/tendermint/rpc/core/events.go +++ b/libs/tendermint/rpc/core/events.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" + "github.com/okex/exchain/libs/tendermint/config" tmpubsub "github.com/okex/exchain/libs/tendermint/libs/pubsub" tmquery "github.com/okex/exchain/libs/tendermint/libs/pubsub/query" ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types" @@ -22,8 +23,8 @@ const ( func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { addr := ctx.RemoteAddr() - if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) + if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients()) } else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient { return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) } diff --git a/libs/tendermint/rpc/core/mempool.go b/libs/tendermint/rpc/core/mempool.go index e4a77c7819..cd26774c4b 100644 --- a/libs/tendermint/rpc/core/mempool.go +++ b/libs/tendermint/rpc/core/mempool.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" abci "github.com/okex/exchain/libs/tendermint/abci/types" + "github.com/okex/exchain/libs/tendermint/config" mempl "github.com/okex/exchain/libs/tendermint/mempool" ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types" rpctypes "github.com/okex/exchain/libs/tendermint/rpc/jsonrpc/types" @@ -61,8 +62,8 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := ctx.RemoteAddr() - if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) + if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients()) } else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient { return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) } diff --git a/libs/tendermint/rpc/grpc/grpc_test.go b/libs/tendermint/rpc/grpc/grpc_test.go index ec5ae22dc6..6abecf0279 100644 --- a/libs/tendermint/rpc/grpc/grpc_test.go +++ b/libs/tendermint/rpc/grpc/grpc_test.go @@ -5,10 +5,12 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/okex/exchain/libs/tendermint/abci/example/kvstore" + cfg "github.com/okex/exchain/libs/tendermint/config" core_grpc "github.com/okex/exchain/libs/tendermint/rpc/grpc" rpctest "github.com/okex/exchain/libs/tendermint/rpc/test" - "github.com/stretchr/testify/require" ) func TestMain(m *testing.M) { @@ -24,6 +26,7 @@ func TestMain(m *testing.M) { } func TestBroadcastTx(t *testing.T) { + setMocConfig(100) res, err := rpctest.GetGRPCClient().BroadcastTx( context.Background(), &core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")}, @@ -32,3 +35,10 @@ func TestBroadcastTx(t *testing.T) { require.EqualValues(t, 0, res.CheckTx.Code) require.EqualValues(t, 0, res.DeliverTx.Code) } + +func setMocConfig(clientNum int) { + moc := cfg.MockDynamicConfig{} + moc.SetMaxSubscriptionClients(100) + + cfg.SetDynamicConfig(moc) +}