Skip to content

Commit

Permalink
Merge branch 'dev' into yxq/add-commit-mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
KamiD authored Jun 1, 2023
2 parents a2167d7 + 2420a8b commit e6188c6
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 10 deletions.
25 changes: 24 additions & 1 deletion app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type OecConfig struct {
commitGapOffset int64
// enable mempool sim gu factor
enableMempoolSimGuFactor bool

maxSubscriptionClients int
}

const (
Expand Down Expand Up @@ -171,6 +173,7 @@ const (
FlagDebugGcInterval = "debug.gc-interval"
FlagCommitGapOffset = "commit-gap-offset"
FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor"
FlagMaxSubscriptionClients = "max-subscription-clients"
)

var (
Expand Down Expand Up @@ -324,6 +327,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetGcInterval(viper.GetInt(FlagDebugGcInterval))
c.SetIavlAcNoBatch(viper.GetBool(tmiavl.FlagIavlCommitAsyncNoBatch))
c.SetEnableMempoolSimGuFactor(viper.GetBool(FlagEnableMempoolSimGuFactor))
c.SetMaxSubscriptionClients(viper.GetInt(FlagMaxSubscriptionClients))
}

func resolveNodeKeyWhitelist(plain string) []string {
Expand Down Expand Up @@ -397,7 +401,8 @@ func (c *OecConfig) format() string {
enable-analyzer: %v
iavl-commit-async-no-batch: %v
enable-mempool-sim-gu-factor: %v
active-view-change: %v`, system.ChainName,
active-view-change: %v
max_subscription_clients: %v`, system.ChainName,
c.GetMempoolRecheck(),
c.GetMempoolForceRecheckGap(),
c.GetMempoolSize(),
Expand Down Expand Up @@ -428,6 +433,7 @@ func (c *OecConfig) format() string {
c.GetIavlAcNoBatch(),
c.GetEnableMempoolSimGuFactor(),
c.GetActiveVC(),
c.GetMaxSubscriptionClients(),
)
}

Expand Down Expand Up @@ -700,6 +706,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetEnableMempoolSimGuFactor(r)
case FlagMaxSubscriptionClients:
r, err := strconv.Atoi(v)
if err != nil {
return
}
c.SetMaxSubscriptionClients(r)
}

}
Expand Down Expand Up @@ -1149,3 +1161,14 @@ func (c *OecConfig) SetEnableMempoolSimGuFactor(v bool) {
func (c *OecConfig) GetEnableMempoolSimGuFactor() bool {
return c.enableMempoolSimGuFactor
}

func (c *OecConfig) SetMaxSubscriptionClients(v int) {
if v < 0 {
v = 0
}
c.maxSubscriptionClients = v
}

func (c *OecConfig) GetMaxSubscriptionClients() int {
return c.maxSubscriptionClients
}
2 changes: 2 additions & 0 deletions app/rpc/tests/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/okex/exchain/app/config"
"github.com/okex/exchain/app/crypto/ethsecp256k1"
"github.com/okex/exchain/app/rpc/backend"
cosmos_context "github.com/okex/exchain/libs/cosmos-sdk/client/context"
Expand Down Expand Up @@ -149,6 +150,7 @@ func (suite *RPCTestSuite) SetupTest() {
viper.Set(flags.FlagKeyringBackend, "test")

viper.Set(rpc.FlagPersonalAPI, true)
viper.Set(config.FlagMaxSubscriptionClients, 100)

senderPv := suite.chain.SenderAccountPVBZ()
genesisAcc = suite.chain.SenderAccount().GetAddress()
Expand Down
4 changes: 4 additions & 0 deletions cmd/client/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,9 @@ func RegisterAppFlag(cmd *cobra.Command) {
cmd.Flags().Int(backend.FlagLogsLimit, 0, "Maximum number of logs returned when calling eth_getLogs")
cmd.Flags().Int(backend.FlagLogsTimeout, 60, "Maximum query duration when calling eth_getLogs")
cmd.Flags().Int(websockets.FlagSubscribeLimit, 15, "Maximum subscription on a websocket connection")

// flags for tendermint rpc
cmd.Flags().Int(config.FlagMaxSubscriptionClients, 100, "Maximum number of unique clientIDs that Tendermint RPC server can /subscribe or /broadcast_tx_commit")

wasm.AddModuleInitFlags(cmd)
}
21 changes: 17 additions & 4 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type IDynamicConfig interface {
GetDynamicGpMaxGasUsed() int64
GetGasLimitBuffer() uint64
GetEnableMempoolSimGuFactor() bool
GetMaxSubscriptionClients() int
}

var DynamicConfig IDynamicConfig = MockDynamicConfig{}
Expand All @@ -47,10 +48,11 @@ func SetDynamicConfig(c IDynamicConfig) {
}

type MockDynamicConfig struct {
enableDeleteMinGPTx bool
dynamicGpMode int
dynamicGpMaxTxNum int64
dynamicGpMaxGasUsed int64
enableDeleteMinGPTx bool
dynamicGpMode int
dynamicGpMaxTxNum int64
dynamicGpMaxGasUsed int64
maxSubscriptionClients int
}

func (d MockDynamicConfig) GetMempoolRecheck() bool {
Expand Down Expand Up @@ -210,3 +212,14 @@ func (d MockDynamicConfig) GetGasLimitBuffer() uint64 {
func (d MockDynamicConfig) GetEnableMempoolSimGuFactor() bool {
return false
}

func (d MockDynamicConfig) GetMaxSubscriptionClients() int {
return d.maxSubscriptionClients
}

func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) {
if value < 0 {
return
}
d.maxSubscriptionClients = value
}
9 changes: 9 additions & 0 deletions libs/tendermint/lite/proxy/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/okex/exchain/libs/tendermint/abci/example/kvstore"
cfg "github.com/okex/exchain/libs/tendermint/config"
"github.com/okex/exchain/libs/tendermint/crypto/merkle"
"github.com/okex/exchain/libs/tendermint/lite"
certclient "github.com/okex/exchain/libs/tendermint/lite/client"
Expand Down Expand Up @@ -125,6 +126,7 @@ func _TestAppProofs(t *testing.T) {
}

func TestTxProofs(t *testing.T) {
setMocConfig(100)
assert, require := assert.New(t), require.New(t)

cl := rpclocal.New(node)
Expand Down Expand Up @@ -162,3 +164,10 @@ func TestTxProofs(t *testing.T) {
require.Nil(err, "%#v", err)
require.Equal(res.Proof.RootHash, commit.Header.DataHash)
}

func setMocConfig(clientNum int) {
moc := cfg.MockDynamicConfig{}
moc.SetMaxSubscriptionClients(100)

cfg.SetDynamicConfig(moc)
}
12 changes: 12 additions & 0 deletions libs/tendermint/rpc/client/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

abci "github.com/okex/exchain/libs/tendermint/abci/types"
cfg "github.com/okex/exchain/libs/tendermint/config"
tmrand "github.com/okex/exchain/libs/tendermint/libs/rand"
"github.com/okex/exchain/libs/tendermint/rpc/client"
ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types"
Expand All @@ -27,6 +28,7 @@ func MakeTxKV() ([]byte, []byte, []byte) {
}

func TestHeaderEvents(t *testing.T) {
setMocConfig(100)
for i, c := range GetClients() {
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
Expand All @@ -49,6 +51,7 @@ func TestHeaderEvents(t *testing.T) {
}

func TestBlockEvents(t *testing.T) {
setMocConfig(100)
for i, c := range GetClients() {
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
Expand Down Expand Up @@ -97,6 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "a
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }

func testTxEventsSent(t *testing.T, broadcastMethod string) {
setMocConfig(100)
for i, c := range GetClients() {
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
Expand Down Expand Up @@ -150,6 +154,7 @@ func TestClientsResubscribe(t *testing.T) {
}

func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
setMocConfig(100)
c := getHTTPClient()

// on Subscribe
Expand All @@ -166,3 +171,10 @@ func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
err = c.UnsubscribeAll(context.Background(), "TestHeaderEvents")
assert.Error(t, err)
}

func setMocConfig(clientNum int) {
moc := cfg.MockDynamicConfig{}
moc.SetMaxSubscriptionClients(100)

cfg.SetDynamicConfig(moc)
}
5 changes: 3 additions & 2 deletions libs/tendermint/rpc/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"

"github.com/okex/exchain/libs/tendermint/config"
tmpubsub "github.com/okex/exchain/libs/tendermint/libs/pubsub"
tmquery "github.com/okex/exchain/libs/tendermint/libs/pubsub/query"
ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types"
Expand All @@ -22,8 +23,8 @@ const (
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients())
} else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
}
Expand Down
5 changes: 3 additions & 2 deletions libs/tendermint/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"

abci "github.com/okex/exchain/libs/tendermint/abci/types"
"github.com/okex/exchain/libs/tendermint/config"
mempl "github.com/okex/exchain/libs/tendermint/mempool"
ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types"
rpctypes "github.com/okex/exchain/libs/tendermint/rpc/jsonrpc/types"
Expand Down Expand Up @@ -61,8 +62,8 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients())
} else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
}
Expand Down
12 changes: 11 additions & 1 deletion libs/tendermint/rpc/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/okex/exchain/libs/tendermint/abci/example/kvstore"
cfg "github.com/okex/exchain/libs/tendermint/config"
core_grpc "github.com/okex/exchain/libs/tendermint/rpc/grpc"
rpctest "github.com/okex/exchain/libs/tendermint/rpc/test"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
Expand All @@ -24,6 +26,7 @@ func TestMain(m *testing.M) {
}

func TestBroadcastTx(t *testing.T) {
setMocConfig(100)
res, err := rpctest.GetGRPCClient().BroadcastTx(
context.Background(),
&core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")},
Expand All @@ -32,3 +35,10 @@ func TestBroadcastTx(t *testing.T) {
require.EqualValues(t, 0, res.CheckTx.Code)
require.EqualValues(t, 0, res.DeliverTx.Code)
}

func setMocConfig(clientNum int) {
moc := cfg.MockDynamicConfig{}
moc.SetMaxSubscriptionClients(100)

cfg.SetDynamicConfig(moc)
}

0 comments on commit e6188c6

Please sign in to comment.