Merge PR: paralleled-tx handle race (#2389)
* first

* delete mu in base

* check race

* use reusableGasMeter

Co-authored-by: Zhong Qiu <[email protected]>
Co-authored-by: cwbhhjl <[email protected]>
Co-authored-by: xiangjianmeng <[email protected]>
4 people authored Aug 25, 2022
1 parent 94087fc commit 9ad6773
Showing 5 changed files with 108 additions and 41 deletions.
7 changes: 7 additions & 0 deletions app/ante/GasLimitDecorator.go
@@ -26,9 +26,16 @@ type GasLimitDecorator struct {
func (g GasLimitDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler) (sdk.Context, error) {
pinAnte(ctx.AnteTracer(), "GasLimitDecorator")

currentGasMeter := ctx.GasMeter() // avoid race
infGasMeter := sdk.GetReusableInfiniteGasMeter()
ctx.SetGasMeter(infGasMeter)
if tx.GetGas() > g.evm.GetParams(ctx).MaxGasLimitPerTx {
ctx.SetGasMeter(currentGasMeter)
sdk.ReturnInfiniteGasMeter(infGasMeter)
return ctx, sdkerrors.Wrapf(sdkerrors.ErrTxTooLarge, "too large gas limit, it must be less than %d", g.evm.GetParams(ctx).MaxGasLimitPerTx)
}

ctx.SetGasMeter(currentGasMeter)
sdk.ReturnInfiniteGasMeter(infGasMeter)
return next(ctx, tx, simulate)
}
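
The decorator above sets the current gas meter aside, installs a pooled infinite gas meter while it reads `MaxGasLimitPerTx` from the EVM params, and then restores the original meter and returns the pooled one on both the error path and the success path. Below is a minimal, self-contained sketch of that swap/restore pattern; `gasMeter`, `meterPool`, `ctxLike`, and `readParamsGasFree` are simplified stand-ins invented for illustration, not the exchain SDK API.

```go
package main

import (
	"fmt"
	"sync"
)

// gasMeter is a simplified stand-in for the SDK's GasMeter interface.
type gasMeter interface {
	ConsumeGas(amount uint64, descriptor string)
	GasConsumed() uint64
}

// infiniteGasMeter counts gas and never panics on overflow.
type infiniteGasMeter struct{ consumed uint64 }

func (m *infiniteGasMeter) ConsumeGas(amount uint64, _ string) { m.consumed += amount }
func (m *infiniteGasMeter) GasConsumed() uint64                { return m.consumed }

// meterPool plays the role of GetReusableInfiniteGasMeter/ReturnInfiniteGasMeter:
// throwaway meters are recycled instead of allocated per call.
var meterPool = sync.Pool{New: func() interface{} { return &infiniteGasMeter{} }}

func getReusableInfiniteGasMeter() *infiniteGasMeter {
	m := meterPool.Get().(*infiniteGasMeter)
	m.consumed = 0
	return m
}

func returnInfiniteGasMeter(m *infiniteGasMeter) { meterPool.Put(m) }

// ctxLike is a minimal context that only carries the gas meter the handler mutates.
type ctxLike struct{ gm gasMeter }

func (c *ctxLike) GasMeter() gasMeter      { return c.gm }
func (c *ctxLike) SetGasMeter(gm gasMeter) { c.gm = gm }

// readParamsGasFree shows the swap/restore shape used by the decorator:
// keep a handle to the installed meter, let a pooled infinite meter absorb the
// gas charged by the read, then restore and return the pooled meter.
func readParamsGasFree(ctx *ctxLike, readParams func()) {
	current := ctx.GasMeter()
	inf := getReusableInfiniteGasMeter()
	ctx.SetGasMeter(inf)

	readParams() // gas charged here lands on the throwaway meter

	ctx.SetGasMeter(current)
	returnInfiniteGasMeter(inf)
}

func main() {
	shared := &infiniteGasMeter{}
	ctx := &ctxLike{gm: shared}
	readParamsGasFree(ctx, func() { ctx.GasMeter().ConsumeGas(100, "params read") })
	fmt.Println("gas charged to the original meter:", shared.GasConsumed()) // 0
}
```

The pool mirrors `GetReusableInfiniteGasMeter`/`ReturnInfiniteGasMeter` in the diff: the temporary meter is recycled rather than allocated per transaction, and the gas charged during the params read stays off the meter that was originally installed in the context.
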
12 changes: 11 additions & 1 deletion libs/cosmos-sdk/baseapp/baseapp_mode_deliverInAsync.go
@@ -1,6 +1,8 @@
package baseapp

import sdk "github.com/okex/exchain/libs/cosmos-sdk/types"
import (
sdk "github.com/okex/exchain/libs/cosmos-sdk/types"
)

func (m *modeHandlerDeliverInAsync) handleDeferRefund(info *runTxInfo) {
app := m.app
@@ -46,3 +48,11 @@ func (m *modeHandlerDeliverInAsync) handleRunMsg(info *runTxInfo) (err error) {

return
}

// ====================================================
// 2. handleGasConsumed
func (m *modeHandlerDeliverInAsync) handleGasConsumed(info *runTxInfo) (err error) {
m.app.parallelTxManage.blockGasMeterMu.Lock()
defer m.app.parallelTxManage.blockGasMeterMu.Unlock()
return m.modeHandlerBase.handleGasConsumed(info)
}
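
In async-deliver mode, `handleGasConsumed` now acquires `parallelTxManage.blockGasMeterMu` before delegating to the embedded base handler, serializing updates to the shared block gas meter across concurrently delivered transactions. The sketch below shows the same embed-and-wrap locking shape in a self-contained form; `blockGasMeter`, `baseHandler`, and `asyncHandler` are hypothetical stand-ins, not the baseapp types.

```go
package main

import (
	"fmt"
	"sync"
)

// blockGasMeter is a simplified stand-in for the shared block gas meter.
type blockGasMeter struct{ consumed uint64 }

func (b *blockGasMeter) ConsumeGas(amount uint64) { b.consumed += amount }

// baseHandler plays the role of modeHandlerBase: it touches the shared meter
// without any locking of its own.
type baseHandler struct{ meter *blockGasMeter }

func (h *baseHandler) handleGasConsumed(gas uint64) { h.meter.ConsumeGas(gas) }

// asyncHandler plays the role of modeHandlerDeliverInAsync: it embeds the base
// handler and wraps the call with the manager's mutex, the same shape as the
// override added in this commit.
type asyncHandler struct {
	baseHandler
	blockGasMeterMu *sync.Mutex
}

func (h *asyncHandler) handleGasConsumed(gas uint64) {
	h.blockGasMeterMu.Lock()
	defer h.blockGasMeterMu.Unlock()
	h.baseHandler.handleGasConsumed(gas)
}

func main() {
	var mu sync.Mutex
	meter := &blockGasMeter{}
	h := &asyncHandler{baseHandler: baseHandler{meter: meter}, blockGasMeterMu: &mu}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // concurrent deliverTx goroutines
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.handleGasConsumed(10)
		}()
	}
	wg.Wait()
	fmt.Println("block gas consumed:", meter.consumed) // always 1000 with the lock in place
}
```

Run under `go run -race` and the counter stays correct; dropping the lock in `asyncHandler.handleGasConsumed` reintroduces the kind of unsynchronized update this commit guards against.
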
118 changes: 79 additions & 39 deletions libs/cosmos-sdk/baseapp/baseapp_parallel.go
@@ -113,7 +113,7 @@ func (app *BaseApp) calGroup() {
Union(tx.from, tx.to)
} else {
para.haveCosmosTxInBlock = true
app.parallelTxManage.txReps[index] = &executeResult{paraMsg: &sdk.ParaMsg{}}
app.parallelTxManage.txResultCollector.putResult(index, &executeResult{paraMsg: &sdk.ParaMsg{}})
}
}

@@ -204,7 +204,6 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
pm.workgroup.isReady = true
app.parallelTxManage.workgroup.Start()

txReps := pm.txReps
deliverTxs := make([]*abci.ResponseDeliverTx, pm.txSize)

asyncCb := func(execRes *executeResult) {
@@ -222,16 +221,16 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
if receiveTxIndex < txIndex {
return
}
txReps[receiveTxIndex] = execRes
pm.txResultCollector.putResult(receiveTxIndex, execRes)

if pm.workgroup.isFailed(pm.workgroup.runningStats(receiveTxIndex)) {
txReps[receiveTxIndex] = nil
pm.txResultCollector.putResult(receiveTxIndex, nil)
// reRun already failed tx
pm.workgroup.AddTask(receiveTxIndex)
} else {
if nextTx, ok := pm.nextTxInGroup[receiveTxIndex]; ok {
if !pm.workgroup.isRunning(nextTx) {
txReps[nextTx] = nil
pm.txResultCollector.putResult(nextTx, nil)
// run next tx in this group
pm.workgroup.AddTask(nextTx)
}
@@ -243,9 +242,11 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
return
}

for txReps[txIndex] != nil {
res := txReps[txIndex]

for true {
res := pm.txResultCollector.getTxResult(txIndex)
if res == nil {
break
}
if pm.newIsConflict(res) || overFlow(currentGas, res.resp.GasUsed, maxGas) {
rerunIdx++

@@ -254,26 +255,27 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
app.fixFeeCollector()
}
res = app.deliverTxWithCache(txIndex)
txReps[txIndex] = res
pm.txResultCollector.putResult(txIndex, res)

if nextTx, ok := app.parallelTxManage.nextTxInGroup[txIndex]; ok {
if !pm.workgroup.isRunning(nextTx) {
txReps[nextTx] = nil
pm.txResultCollector.putResult(nextTx, nil)
pm.workgroup.AddTask(nextTx)
}
}

}
if txReps[txIndex].paraMsg.AnteErr != nil {
if pm.txResultCollector.getTxResult(txIndex).paraMsg.AnteErr != nil {
res.ms = nil
}

txRs := res.resp
deliverTxs[txIndex] = &txRs

pm.blockGasMeterMu.Lock()
// Note : don't take care of the case of ErrorGasOverflow
app.deliverState.ctx.BlockGasMeter().ConsumeGas(sdk.Gas(res.resp.GasUsed), "unexpected error")

pm.blockGasMeterMu.Unlock()
// merge tx
pm.SetCurrentIndex(txIndex, res)

@@ -285,7 +287,7 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx {
signal <- 0
return
}
if txReps[txIndex] == nil && !pm.workgroup.isRunning(txIndex) {
if pm.txResultCollector.getTxResult(txIndex) == nil && !pm.workgroup.isRunning(txIndex) {
pm.workgroup.AddTask(txIndex)
}
}
@@ -331,12 +333,12 @@ func (app *BaseApp) endParallelTxs() [][]byte {
watchers := make([]sdk.IWatcher, app.parallelTxManage.txSize)
txs := make([]sdk.Tx, app.parallelTxManage.txSize)
for index := 0; index < app.parallelTxManage.txSize; index++ {
paraM := app.parallelTxManage.txReps[index].paraMsg
logIndex[index] = paraM.LogIndex
errs[index] = paraM.AnteErr
hasEnterEvmTx[index] = paraM.HasRunEvmTx
resp[index] = app.parallelTxManage.txReps[index].resp
watchers[index] = app.parallelTxManage.txReps[index].watcher
txRes := app.parallelTxManage.txResultCollector.getTxResult(index)
logIndex[index] = txRes.paraMsg.LogIndex
errs[index] = txRes.paraMsg.AnteErr
hasEnterEvmTx[index] = txRes.paraMsg.HasRunEvmTx
resp[index] = txRes.resp
watchers[index] = txRes.watcher
txs[index] = app.parallelTxManage.extraTxsInfo[index].stdTx
}
app.watcherCollector(watchers...)
@@ -552,13 +554,57 @@ func (c *conflictCheck) clear() {
}
}

type txResultCollector struct {
mu sync.RWMutex
txReps []*executeResult
}

func newExecResult() *txResultCollector {
return &txResultCollector{
mu: sync.RWMutex{},
txReps: make([]*executeResult, 0),
}
}

func (e *txResultCollector) clear() {
e.mu.Lock()
e.txReps = nil
e.mu.Unlock()
}

func (e *txResultCollector) init(txSize int) {
txRepsCap := cap(e.txReps)
if e.txReps == nil || txRepsCap < txSize {
e.txReps = make([]*executeResult, txSize)
} else if txRepsCap >= txSize {
e.txReps = e.txReps[0:txSize:txRepsCap]
// https://github.com/golang/go/issues/5373
for i := range e.txReps {
e.txReps[i] = nil
}
}
}

func (e *txResultCollector) putResult(index int, txResult *executeResult) {
e.mu.Lock()
e.txReps[index] = txResult
e.mu.Unlock()
}

func (e *txResultCollector) getTxResult(index int) *executeResult {
e.mu.RLock()
defer e.mu.RUnlock()
return e.txReps[index]
}

type parallelTxManager struct {
blockGasMeterMu sync.Mutex
haveCosmosTxInBlock bool
isAsyncDeliverTx bool
workgroup *asyncWorkGroup

extraTxsInfo []*extraDataForTx
txReps []*executeResult
extraTxsInfo []*extraDataForTx
txResultCollector *txResultCollector

groupList map[int][]int
nextTxInGroup map[int]int
@@ -580,16 +626,18 @@ type parallelTxManager struct {
func newParallelTxManager() *parallelTxManager {
isAsync := sm.DeliverTxsExecMode(viper.GetInt(sm.FlagDeliverTxsExecMode)) == sm.DeliverTxsExecModeParallel
return &parallelTxManager{
blockGasMeterMu: sync.Mutex{},
isAsyncDeliverTx: isAsync,
workgroup: newAsyncWorkGroup(),

groupList: make(map[int][]int),
nextTxInGroup: make(map[int]int),
preTxInGroup: make(map[int]int),

cc: newConflictCheck(),
currIndex: -1,
currTxFee: sdk.Coins{},
txResultCollector: newExecResult(),
cc: newConflictCheck(),
currIndex: -1,
currTxFee: sdk.Coins{},

blockMultiStores: newCacheMultiStoreList(),
chainMultiStores: newCacheMultiStoreList(),
@@ -615,7 +663,7 @@ func (f *parallelTxManager) addBlockCacheToChainCache() {
if shouldCleanChainCache(f.blockHeight) {
f.chainMultiStores.Clear()
} else {
jobChan := make(chan types.CacheMultiStore, f.blockMultiStores.stores.Len())
jobChan := make(chan types.CacheMultiStore, f.blockMultiStores.Len())
for index := 0; index < maxGoroutineNumberInParaTx; index++ {
go func(ch chan types.CacheMultiStore) {
for j := range ch {
@@ -664,7 +712,7 @@ func (f *parallelTxManager) clear() {
}

f.extraTxsInfo = nil
f.txReps = nil
f.txResultCollector.clear()

for key := range f.groupList {
delete(f.groupList, key)
@@ -683,16 +731,8 @@ func (f *parallelTxManager) clear() {

func (f *parallelTxManager) init() {
txSize := f.txSize
txRepsCap := cap(f.txReps)
if f.txReps == nil || txRepsCap < txSize {
f.txReps = make([]*executeResult, txSize)
} else if txRepsCap >= txSize {
f.txReps = f.txReps[0:txSize:txRepsCap]
// https://github.com/golang/go/issues/5373
for i := range f.txReps {
f.txReps[i] = nil
}
}

f.txResultCollector.init(txSize)

txsInfoCap := cap(f.extraTxsInfo)
if f.extraTxsInfo == nil || txsInfoCap < txSize {
Expand Down Expand Up @@ -724,7 +764,7 @@ func (f *parallelTxManager) getTxResult(index int) sdk.CacheMultiStore {
preIndexInGroup, ok := f.preTxInGroup[index]
if ok && preIndexInGroup > f.currIndex {
// get parent tx ms
preResp := f.txReps[preIndexInGroup]
preResp := f.txResultCollector.getTxResult(preIndexInGroup)

if preResp != nil && preResp.paraMsg.AnteErr == nil {
if preResp.ms == nil {
@@ -745,7 +785,7 @@ func (f *parallelTxManager) getTxResult(index int) sdk.CacheMultiStore {
// mark failed if running
f.workgroup.markFailed(f.workgroup.runningStats(next))
} else {
f.txReps[next] = nil
f.txResultCollector.putResult(next, nil)
}
}

@@ -775,5 +815,5 @@ func (f *parallelTxManager) SetCurrentIndex(txIndex int, res *executeResult) {
if res.paraMsg.AnteErr != nil {
return
}
f.currTxFee = f.currTxFee.Add(f.extraTxsInfo[txIndex].fee.Sub(f.txReps[txIndex].paraMsg.RefundFee)...)
f.currTxFee = f.currTxFee.Add(f.extraTxsInfo[txIndex].fee.Sub(f.txResultCollector.getTxResult(txIndex).paraMsg.RefundFee)...)
}
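
Most of the edits in this file replace direct indexing of the `txReps` slice with the new `txResultCollector`, whose `sync.RWMutex` lets worker goroutines publish results through `putResult` while the scheduling loop reads them through `getTxResult`, and whose `init` reuses the slice's backing array between blocks. Here is a trimmed, self-contained version of that collector with a placeholder `executeResult`, exercised from several goroutines; it mirrors the committed type but is only a sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// executeResult is a placeholder for the real struct; only identity matters here.
type executeResult struct{ gasUsed int64 }

// txResultCollector mirrors the type added in baseapp_parallel.go: per-tx
// results guarded by an RWMutex so workers and the scheduler never race.
type txResultCollector struct {
	mu     sync.RWMutex
	txReps []*executeResult
}

// init runs before the workers start (single goroutine), so it does not lock;
// it reuses the backing array when large enough, as in the committed init().
func (e *txResultCollector) init(txSize int) {
	if cap(e.txReps) < txSize {
		e.txReps = make([]*executeResult, txSize)
		return
	}
	e.txReps = e.txReps[:txSize]
	for i := range e.txReps {
		e.txReps[i] = nil
	}
}

func (e *txResultCollector) putResult(index int, res *executeResult) {
	e.mu.Lock()
	e.txReps[index] = res
	e.mu.Unlock()
}

func (e *txResultCollector) getTxResult(index int) *executeResult {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.txReps[index]
}

func main() {
	c := &txResultCollector{}
	c.init(8)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // workers publish results concurrently
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			c.putResult(idx, &executeResult{gasUsed: int64(idx) * 21000})
		}(i)
	}
	wg.Wait()

	for i := 0; i < 8; i++ { // the scheduling loop reads them back
		fmt.Println(i, c.getTxResult(i).gasUsed)
	}
}
```
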
10 changes: 10 additions & 0 deletions libs/cosmos-sdk/baseapp/baseapp_synclist.go
@@ -17,6 +17,14 @@ func newCacheMultiStoreList() *cacheMultiStoreList {
}
}

func (c *cacheMultiStoreList) Len() int {
c.mtx.Lock()

defer c.mtx.Unlock()
return c.stores.Len()

}

func (c *cacheMultiStoreList) PushStores(stores map[int]types.CacheMultiStore) {
c.mtx.Lock()
for _, v := range stores {
@@ -32,9 +40,11 @@ func (c *cacheMultiStoreList) PushStore(store types.CacheMultiStore) {
}

func (c *cacheMultiStoreList) Range(cb func(c types.CacheMultiStore)) {
c.mtx.Lock()
for i := c.stores.Front(); i != nil; i = i.Next() {
cb(i.Value.(types.CacheMultiStore))
}
c.mtx.Unlock()
}

func (c *cacheMultiStoreList) GetStoreWithParent(parent types.CacheMultiStore) types.CacheMultiStore {
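
`cacheMultiStoreList` is backed by `container/list`, which is not safe for concurrent use, so this commit takes the mutex inside `Range` and adds a locked `Len` helper (used above in `addBlockCacheToChainCache` instead of reading `stores` directly). A compact stand-alone sketch of the same guarded-list shape, using a hypothetical `storeList` of ints in place of `CacheMultiStore` values:

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// storeList is a trimmed stand-in for cacheMultiStoreList: a container/list
// shared between goroutines, with Push, Len, and Range all taking one mutex.
type storeList struct {
	mtx    sync.Mutex
	stores *list.List
}

func newStoreList() *storeList { return &storeList{stores: list.New()} }

func (c *storeList) Push(v int) {
	c.mtx.Lock()
	c.stores.PushBack(v)
	c.mtx.Unlock()
}

func (c *storeList) Len() int {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	return c.stores.Len()
}

func (c *storeList) Range(cb func(v int)) {
	c.mtx.Lock() // iteration must not interleave with a concurrent Push
	for e := c.stores.Front(); e != nil; e = e.Next() {
		cb(e.Value.(int))
	}
	c.mtx.Unlock()
}

func main() {
	l := newStoreList()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(n int) { defer wg.Done(); l.Push(n) }(i)
	}
	wg.Wait()

	sum := 0
	l.Range(func(v int) { sum += v })
	fmt.Println("len:", l.Len(), "sum:", sum) // len: 50 sum: 1225
}
```
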
2 changes: 1 addition & 1 deletion x/evm/types/cache.go
@@ -89,8 +89,8 @@ func (c *Cache) UpdateBlockedContractMethod(bcl BlockedContractList, isCheckTx b
for i, _ := range bcl {
c.blockedContractMethodsCache[string(bcl[i].Address)] = bcl[i]
}
c.blockedMutex.Unlock()
c.needBlockedUpdate = false
c.blockedMutex.Unlock()
}

func SetEvmParamsNeedUpdate() {
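
The cache.go change is a reorder rather than new logic: `c.needBlockedUpdate = false` now executes before `c.blockedMutex.Unlock()`, so the flag and the map it describes change inside the same critical section and the flag write is no longer an unsynchronized access racing with readers that hold the mutex. A small stand-alone illustration of that invariant; `blockedCache` is a trimmed, hypothetical stand-in for `x/evm/types.Cache`.

```go
package main

import (
	"fmt"
	"sync"
)

// blockedCache is a simplified stand-in: a guarded map plus a "needs update"
// flag that must change together with the map.
type blockedCache struct {
	mu                sync.RWMutex
	blockedMethods    map[string]string
	needBlockedUpdate bool
}

// update mirrors the fixed ordering: the flag is cleared while the lock is
// still held, so no reader can observe the refreshed map paired with a stale flag.
func (c *blockedCache) update(methods map[string]string) {
	c.mu.Lock()
	c.blockedMethods = make(map[string]string, len(methods))
	for k, v := range methods {
		c.blockedMethods[k] = v
	}
	c.needBlockedUpdate = false // inside the critical section, as in the commit
	c.mu.Unlock()
}

func (c *blockedCache) snapshot() (int, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.blockedMethods), c.needBlockedUpdate
}

func main() {
	c := &blockedCache{needBlockedUpdate: true}
	c.update(map[string]string{"0xabc": "transfer(address,uint256)"})
	n, dirty := c.snapshot()
	fmt.Println("methods:", n, "needs update:", dirty) // methods: 1 needs update: false
}
```
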
