Skip to content

Commit

Permalink
parallel execution optimization for AnteErr tx & fix e2c smb bug (#3188)
Browse files Browse the repository at this point in the history
* fix smb bug caused by e2c before venus6

* fix bug

* optimize code

* Update baseapp.go

fix typo

---------

Co-authored-by: KamiD <[email protected]>
  • Loading branch information
LeoGuo621 and KamiD authored Jul 11, 2023
1 parent 21d245c commit 9483c69
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 70 deletions.
25 changes: 17 additions & 8 deletions app/app_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,17 @@ func getTxFeeHandler() sdk.GetTxFeeHandler {

// getTxFeeAndFromHandler get tx fee and from
func getTxFeeAndFromHandler(ek appante.EVMKeeper) sdk.GetTxFeeAndFromHandler {
return func(ctx sdk.Context, tx sdk.Tx) (fee sdk.Coins, isEvm bool, isE2C bool, from string, to string, err error, supportPara bool) {
return func(ctx sdk.Context, tx sdk.Tx) (fee sdk.Coins, isEvm bool, needUpdateTXCounter bool, from string, to string, err error, supportPara bool) {
if evmTx, ok := tx.(*evmtypes.MsgEthereumTx); ok {
isEvm = true
supportPara = true
if appante.IsE2CTx(ek, &ctx, evmTx) {
isE2C = true
if tmtypes.HigherThanVenus6(ctx.BlockHeight()) {
needUpdateTXCounter = true
}
// E2C will include cosmos Msg in the Payload.
// Sometimes, this Msg do not support parallel execution.
if !isParaSupportedE2CMsg(evmTx.Data.Payload) {
if !tmtypes.HigherThanVenus6(ctx.BlockHeight()) || !isParaSupportedE2CMsg(evmTx.Data.Payload) {
supportPara = false
}
}
Expand All @@ -143,11 +145,18 @@ func getTxFeeAndFromHandler(ek appante.EVMKeeper) sdk.GetTxFeeAndFromHandler {
}
} else if feeTx, ok := tx.(authante.FeeTx); ok {
fee = feeTx.GetFee()
if stdTx, ok := tx.(*auth.StdTx); ok && len(stdTx.Msgs) == 1 { // only support one message
if msg, ok := stdTx.Msgs[0].(interface{ CalFromAndToForPara() (string, string) }); ok {
from, to = msg.CalFromAndToForPara()
if tmtypes.HigherThanVenus6(ctx.BlockHeight()) {
supportPara = true
if tx.GetType() == sdk.StdTxType {
if tmtypes.HigherThanEarth(ctx.BlockHeight()) {
needUpdateTXCounter = true
}
txMsgs := tx.GetMsgs()
// only support one message
if len(txMsgs) == 1 {
if msg, ok := txMsgs[0].(interface{ CalFromAndToForPara() (string, string) }); ok {
from, to = msg.CalFromAndToForPara()
if tmtypes.HigherThanVenus6(ctx.BlockHeight()) {
supportPara = true
}
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion libs/cosmos-sdk/baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,10 +697,16 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context
ctx.SetGasMeter(sdk.NewInfiniteGasMeter())
}
if app.parallelTxManage.isAsyncDeliverTx && mode == runTxModeDeliverInAsync {
app.parallelTxManage.txByteMpCMIndexLock.RLock()
ctx.SetParaMsg(&sdk.ParaMsg{
HaveCosmosTxInBlock: app.parallelTxManage.haveCosmosTxInBlock,
// Concurrency security issues need to be considered here,
// and there is a small probability that NeedUpdateTXCounter() will be wrong
// due to concurrent reading and writing of pm.txIndexMpUpdateTXCounter (slice),
// but such tx will be rerun, so this case can be ignored.
NeedUpdateTXCounter: app.parallelTxManage.NeedUpdateTXCounter(),
CosmosIndexInBlock: app.parallelTxManage.txByteMpCosmosIndex[string(txBytes)],
})
app.parallelTxManage.txByteMpCMIndexLock.RUnlock()
ctx.SetTxBytes(txBytes)
ctx.ResetWatcher()
}
Expand Down
126 changes: 78 additions & 48 deletions libs/cosmos-sdk/baseapp/baseapp_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ var (
)

type extraDataForTx struct {
supportPara bool
fee sdk.Coins
isEvm bool
isE2C bool
from string
to string
stdTx sdk.Tx
decodeErr error
supportPara bool
fee sdk.Coins
isEvm bool
needUpdateTXCounter bool
from string
to string
stdTx sdk.Tx
decodeErr error
}

type txWithIndex struct {
Expand Down Expand Up @@ -71,15 +71,15 @@ func (app *BaseApp) getExtraDataByTxs(txs [][]byte) {
app.blockDataCache.SetTx(txBytes, tx)
}

coin, isEvm, isE2C, s, toAddr, _, supportPara := app.getTxFeeAndFromHandler(app.getContextForTx(runTxModeDeliver, txBytes), tx)
coin, isEvm, needUpdateTXCounter, s, toAddr, _, supportPara := app.getTxFeeAndFromHandler(app.getContextForTx(runTxModeDeliver, txBytes), tx)
para.extraTxsInfo[index] = &extraDataForTx{
supportPara: supportPara,
fee: coin,
isEvm: isEvm,
isE2C: isE2C,
from: s,
to: toAddr,
stdTx: tx,
supportPara: supportPara,
fee: coin,
isEvm: isEvm,
needUpdateTXCounter: needUpdateTXCounter,
from: s,
to: toAddr,
stdTx: tx,
}
wg.Done()
}
Expand Down Expand Up @@ -129,49 +129,45 @@ func Union(x string, yString string) {
// calGroup cal group by txs
func (app *BaseApp) calGroup() {

para := app.parallelTxManage
pm := app.parallelTxManage

rootAddr = make(map[string]string, 0)
para.cosmosTxIndexInBlock = 0
for index, tx := range para.extraTxsInfo {
pm.cosmosTxIndexInBlock = 0
for index, tx := range pm.extraTxsInfo {
if tx.supportPara { //evmTx & wasmTx
Union(tx.from, tx.to)
} else {
para.haveCosmosTxInBlock = true
app.parallelTxManage.putResult(index, &executeResult{paraMsg: &sdk.ParaMsg{}, msIsNil: true})
}

if (!tx.isEvm && tx.supportPara) || tx.isE2C {
// means wasm or e2c tx
para.haveCosmosTxInBlock = true
if tx.needUpdateTXCounter {
pm.txIndexMpUpdateTXCounter[index] = true
pm.txByteMpCosmosIndex[string(pm.txs[index])] = pm.cosmosTxIndexInBlock
pm.cosmosTxIndexInBlock++
}

if !tx.isEvm || tx.isE2C {
para.txByteMpCosmosIndex[string(para.txs[index])] = para.cosmosTxIndexInBlock
para.cosmosTxIndexInBlock++
}
}

addrToID := make(map[string]int, 0)

for index, txInfo := range para.extraTxsInfo {
for index, txInfo := range pm.extraTxsInfo {
if !txInfo.supportPara {
continue
}
rootAddr := Find(txInfo.from)
id, exist := addrToID[rootAddr]
if !exist {
id = len(para.groupList)
id = len(pm.groupList)
addrToID[rootAddr] = id

}
para.groupList[id] = append(para.groupList[id], index)
para.txIndexWithGroup[index] = id
pm.groupList[id] = append(pm.groupList[id], index)
pm.txIndexWithGroup[index] = id
}

groupSize := len(para.groupList)
groupSize := len(pm.groupList)
for groupIndex := 0; groupIndex < groupSize; groupIndex++ {
list := para.groupList[groupIndex]
list := pm.groupList[groupIndex]
for index := 0; index < len(list); index++ {
if index+1 <= len(list)-1 {
app.parallelTxManage.nextTxInGroup[list[index]] = list[index+1]
Expand Down Expand Up @@ -247,7 +243,7 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
break
}
isReRun := false
if pm.isConflict(res) || overFlow(currentGas, res.resp.GasUsed, maxGas) {
if pm.isConflict(res) || overFlow(currentGas, res.resp.GasUsed, maxGas) || pm.haveAnteErrTx {
rerunIdx++
isReRun = true
// conflict rerun tx
Expand All @@ -256,8 +252,10 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
}
res = app.deliverTxWithCache(pm.upComingTxIndex)
}

if res.paraMsg.AnteErr != nil {
res.msIsNil = true
pm.handleAnteErrTx(res.paraMsg.NeedUpdateTXCounter)
}

pm.deliverTxs[pm.upComingTxIndex] = &res.resp
Expand All @@ -268,7 +266,7 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
app.deliverState.ctx.BlockGasMeter().ConsumeGas(sdk.Gas(res.resp.GasUsed), "unexpected error")
pm.blockGasMeterMu.Unlock()

pm.SetCurrentIndex(pm.upComingTxIndex, res)
pm.SetCurrentIndexRes(pm.upComingTxIndex, res)

if !res.msIsNil {
pm.currTxFee = pm.currTxFee.Add(pm.extraTxsInfo[pm.upComingTxIndex].fee.Sub(pm.finalResult[pm.upComingTxIndex].paraMsg.RefundFee)...)
Expand Down Expand Up @@ -320,7 +318,7 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
ctx, _ := app.cacheTxContext(app.getContextForTx(runTxModeDeliver, []byte{}), []byte{})
ctx.SetMultiStore(app.parallelTxManage.cms)

if app.parallelTxManage.haveCosmosTxInBlock {
if app.parallelTxManage.NeedUpdateTXCounter() {
app.updateCosmosTxCount(ctx, app.parallelTxManage.cosmosTxIndexInBlock-1)
}

Expand Down Expand Up @@ -454,16 +452,19 @@ func newExecuteResult(r abci.ResponseDeliverTx, ms sdk.CacheMultiStore, counter
}

type parallelTxManager struct {
blockHeight int64
groupTasks []*groupTask
blockGasMeterMu sync.Mutex
haveCosmosTxInBlock bool
isAsyncDeliverTx bool
txs [][]byte
txSize int
alreadyEnd bool
cosmosTxIndexInBlock int
txByteMpCosmosIndex map[string]int
blockHeight int64
groupTasks []*groupTask
blockGasMeterMu sync.Mutex
isAsyncDeliverTx bool
txs [][]byte
txSize int
alreadyEnd bool

cosmosTxIndexInBlock int
txByteMpCMIndexLock sync.RWMutex
txByteMpCosmosIndex map[string]int
txIndexMpUpdateTXCounter []bool
haveAnteErrTx bool

resultCh chan int
resultCb func(data int)
Expand Down Expand Up @@ -634,6 +635,8 @@ func newParallelTxManager() *parallelTxManager {
txIndexWithGroup: make(map[int]int),
resultCh: make(chan int, maxTxResultInChan),

txByteMpCMIndexLock: sync.RWMutex{},

blockMpCache: newCacheRWSetList(),
chainMpCache: newCacheRWSetList(),
blockMultiStores: newCacheMultiStoreList(),
Expand Down Expand Up @@ -764,7 +767,6 @@ func (pm *parallelTxManager) init(txs [][]byte, blockHeight int64, deliverStateM
txSize := len(txs)
pm.blockHeight = blockHeight
pm.groupTasks = make([]*groupTask, 0)
pm.haveCosmosTxInBlock = false
pm.isAsyncDeliverTx = true
pm.txs = txs
pm.txSize = txSize
Expand All @@ -788,6 +790,8 @@ func (pm *parallelTxManager) init(txs [][]byte, blockHeight int64, deliverStateM
pm.txByteMpCosmosIndex = make(map[string]int, 0)
pm.nextTxInGroup = make(map[int]int)

pm.haveAnteErrTx = false
pm.txIndexMpUpdateTXCounter = make([]bool, txSize)
pm.extraTxsInfo = make([]*extraDataForTx, txSize)
pm.txReps = make([]*executeResult, txSize)
pm.finalResult = make([]*executeResult, txSize)
Expand Down Expand Up @@ -815,7 +819,7 @@ func (pm *parallelTxManager) getParentMsByTxIndex(txIndex int) (sdk.CacheMultiSt
return ms, useCurrent
}

func (pm *parallelTxManager) SetCurrentIndex(txIndex int, res *executeResult) {
func (pm *parallelTxManager) SetCurrentIndexRes(txIndex int, res *executeResult) {
if res.msIsNil {
return
}
Expand All @@ -837,3 +841,29 @@ func (pm *parallelTxManager) SetCurrentIndex(txIndex int, res *executeResult) {
}

}

func (pm *parallelTxManager) NeedUpdateTXCounter() bool {
res := false
for _, v := range pm.txIndexMpUpdateTXCounter {
res = res || v
}
return res
}

// When an AnteErr tx is encountered, this tx will be discarded,
// and the cosmosIndex of the remaining tx needs to be corrected.
func (pm *parallelTxManager) handleAnteErrTx(needUpdateTXCounter bool) {
pm.haveAnteErrTx = true
pm.txIndexMpUpdateTXCounter[pm.upComingTxIndex] = false

if needUpdateTXCounter {
pm.cosmosTxIndexInBlock--
pm.txByteMpCMIndexLock.Lock()
for index, tx := range pm.txs {
if _, ok := pm.txByteMpCosmosIndex[string(tx)]; ok && index > pm.upComingTxIndex {
pm.txByteMpCosmosIndex[string(tx)]--
}
}
pm.txByteMpCMIndexLock.Unlock()
}
}
11 changes: 6 additions & 5 deletions libs/cosmos-sdk/types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package types

import (
"context"
"github.com/ethereum/go-ethereum/core/vm"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/vm"
"github.com/gogo/protobuf/proto"
"github.com/okex/exchain/libs/system/trace"
abci "github.com/okex/exchain/libs/tendermint/abci/types"
"github.com/okex/exchain/libs/tendermint/libs/log"

"github.com/okex/exchain/libs/cosmos-sdk/store/gaskv"
stypes "github.com/okex/exchain/libs/cosmos-sdk/store/types"
"github.com/okex/exchain/libs/system/trace"
abci "github.com/okex/exchain/libs/tendermint/abci/types"
"github.com/okex/exchain/libs/tendermint/libs/log"
)

/*
Expand Down Expand Up @@ -92,7 +92,8 @@ func (c *Context) IsDeliverWithSerial() bool {
}

func (c *Context) UseParamCache() bool {
return c.isDeliverWithSerial || (c.paraMsg != nil && !c.paraMsg.HaveCosmosTxInBlock) || c.checkTx
// NeedUpdateTXCounter of E2C tx also is true.
return c.isDeliverWithSerial || (c.paraMsg != nil && !c.paraMsg.NeedUpdateTXCounter) || c.checkTx
}

func (c *Context) IsCheckTx() bool { return c.checkTx }
Expand Down
2 changes: 1 addition & 1 deletion libs/cosmos-sdk/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewDB(name, dir string) (db dbm.DB, err error) {

type ParaMsg struct {
UseCurrentState bool
HaveCosmosTxInBlock bool
NeedUpdateTXCounter bool
AnteErr error
RefundFee Coins
LogIndex int
Expand Down
14 changes: 7 additions & 7 deletions x/wasm/keeper/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package keeper
import (
"encoding/binary"

types2 "github.com/okex/exchain/libs/tendermint/types"

sdk "github.com/okex/exchain/libs/cosmos-sdk/types"

tmtypes "github.com/okex/exchain/libs/tendermint/types"
"github.com/okex/exchain/x/wasm/types"
)

Expand All @@ -30,7 +28,7 @@ func NewCountTXDecorator(storeKey sdk.StoreKey) *CountTXDecorator {
// The ante handler passes the counter value via sdk.Context upstream. See `types.TXCounter(ctx)` to read the value.
// Simulations don't get a tx counter value assigned.
func (a CountTXDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler) (sdk.Context, error) {
if simulate || !types2.HigherThanEarth(ctx.BlockHeight()) {
if simulate || !tmtypes.HigherThanEarth(ctx.BlockHeight()) {
return next(ctx, tx, simulate)
}
currentGasmeter := ctx.GasMeter()
Expand Down Expand Up @@ -108,7 +106,9 @@ func (d LimitSimulationGasDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simu
}

func UpdateTxCount(ctx sdk.Context, storeKey sdk.StoreKey, txCount int) {
store := ctx.KVStore(storeKey)
currentHeight := ctx.BlockHeight()
store.Set(types.TXCounterPrefix, encodeHeightCounter(currentHeight, uint32(txCount+1)))
if tmtypes.HigherThanEarth(ctx.BlockHeight()) {
store := ctx.KVStore(storeKey)
currentHeight := ctx.BlockHeight()
store.Set(types.TXCounterPrefix, encodeHeightCounter(currentHeight, uint32(txCount+1)))
}
}

0 comments on commit 9483c69

Please sign in to comment.