Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export COMMIT=$(shell git rev-parse HEAD)
export GO111MODULE = on

# process build tags
build_tags = netgo
build_tags = netgo rocksdb

ifeq ($(LEDGER_ENABLED),true)
ifeq ($(OS),Windows_NT)
Expand Down
102 changes: 94 additions & 8 deletions protocol/app/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package app

import (
"sync"
"sync/atomic"
"time"

sending "github.com/dydxprotocol/v4-chain/protocol/x/sending/types"

errorsmod "cosmossdk.io/errors"
"cosmossdk.io/store/cachemulti"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -185,14 +186,21 @@ type lockingAnteHandler struct {
incrementSequence ante.IncrementSequenceDecorator
replayProtection customante.ReplayProtectionDecorator
sigVerification accountplusante.CircuitBreakerDecorator
consumeTxSizeGas ante.ConsumeTxSizeGasDecorator
consumeTxSizeGas sdk.AnteDecorator
deductFee ante.DeductFeeDecorator
clobRateLimit clobante.ClobRateLimitDecorator
clob clobante.ClobDecorator
marketUpdates customante.ValidateMarketUpdateDecorator
}

func (h *lockingAnteHandler) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
// Count Check/ReCheck occurrences for per-block visibility.
if ctx.IsReCheckTx() {
atomic.AddUint64(&recheckTxCounter, 1)
} else if ctx.IsCheckTx() {
atomic.AddUint64(&checkTxCounter, 1)
}

ctx = log.AddPersistentTagsToLogger(ctx,
log.Callback, lib.TxMode(ctx),
log.BlockHeight, ctx.BlockHeight()+1,
Expand All @@ -212,6 +220,8 @@ func (h *lockingAnteHandler) clobAnteHandle(ctx sdk.Context, tx sdk.Tx, simulate
newCtx sdk.Context,
err error,
) {
tStart := time.Now()
var lockAcquireMs, globalLockWaitMs, writeMs int64
// These ante decorators access state but only state that is mutated during `deliverTx`. The Cosmos SDK
// is responsible for linearizing the reads and writes during `deliverTx`.
if ctx, err = h.freeInfiniteGasDecorator.AnteHandle(ctx, tx, simulate, noOpAnteHandle); err != nil {
Expand Down Expand Up @@ -249,10 +259,22 @@ func (h *lockingAnteHandler) clobAnteHandle(ctx sdk.Context, tx sdk.Tx, simulate
return ctx, err
}

cacheMs = ctx.MultiStore().(cachemulti.Store).CacheMultiStoreWithLocking(map[storetypes.StoreKey][][]byte{
lms, ok := ctx.MultiStore().(interface {
CacheMultiStoreWithLocking(map[storetypes.StoreKey][][]byte) storetypes.CacheMultiStore
})
if !ok {
return ctx, errorsmod.Wrap(sdkerrors.ErrLogic, "MultiStore does not support CacheMultiStoreWithLocking")
}
t0 := time.Now()
cacheMs = lms.CacheMultiStoreWithLocking(map[storetypes.StoreKey][][]byte{
h.authStoreKey: signers,
})
defer cacheMs.(storetypes.LockingStore).Unlock()
lockAcquireMs = time.Since(t0).Milliseconds()
ls, ok := cacheMs.(storetypes.LockingStore)
if !ok {
return ctx, errorsmod.Wrap(sdkerrors.ErrLogic, "CacheMultiStore does not implement LockingStore")
}
defer ls.Unlock()
ctx = ctx.WithMultiStore(cacheMs)
}

Expand Down Expand Up @@ -288,8 +310,15 @@ func (h *lockingAnteHandler) clobAnteHandle(ctx sdk.Context, tx sdk.Tx, simulate
// We now acquire the global ante handler since the clob decorator is not thread safe and performs
// several reads and writes across many stores.
if !simulate && (ctx.IsCheckTx() || ctx.IsReCheckTx()) {
g0 := time.Now()
h.globalLock.Lock()
defer h.globalLock.Unlock()
g1 := time.Now()
globalLockWaitMs = g1.Sub(g0).Milliseconds()
defer func() {
gReleaseStart := time.Now()
h.globalLock.Unlock()
_ = gReleaseStart // release is effectively instantaneous; hold time approximated below.
}()
}

if ctx, err = h.clobRateLimit.AnteHandle(ctx, tx, simulate, noOpAnteHandle); err != nil {
Expand All @@ -302,7 +331,26 @@ func (h *lockingAnteHandler) clobAnteHandle(ctx sdk.Context, tx sdk.Tx, simulate
// During non-simulated `checkTx` we must write the store since we own branching and writing.
// During `deliverTx` and simulation the Cosmos SDK is responsible for branching and writing.
if err == nil && !simulate && (ctx.IsCheckTx() || ctx.IsReCheckTx()) {
w0 := time.Now()
cacheMs.Write()
writeMs = time.Since(w0).Milliseconds()
}

if ctx.IsReCheckTx() {
totalMs := time.Since(tStart).Milliseconds()
if totalMs >= recheckSlowThresholdMs || lockAcquireMs >= recheckSlowThresholdMs || writeMs >= recheckSlowThresholdMs || globalLockWaitMs >= recheckSlowThresholdMs {
log.InfoLog(ctx, "PIERRICK: ReCheckTx slow (clob)",
"total_ms", totalMs,
"lock_acquire_ms", lockAcquireMs,
"global_lock_wait_ms", globalLockWaitMs,
"write_ms", writeMs,
)
}
// Aggregate duration stats.
recordTxDuration(true, uint64(totalMs))
} else if ctx.IsCheckTx() {
// Only aggregate non-recheck durations explicitly.
recordTxDuration(false, uint64(time.Since(tStart).Milliseconds()))
}

return ctx, err
Expand Down Expand Up @@ -359,6 +407,8 @@ func (h *lockingAnteHandler) otherMsgAnteHandle(ctx sdk.Context, tx sdk.Tx, simu
newCtx sdk.Context,
err error,
) {
tStart := time.Now()
var lockAcquireMs, globalLockWaitMs, writeMs int64
// During `deliverTx` we hold an exclusive lock on `app.mtx` and have a context with a branched state store
// allowing us to not have to perform any further locking or state store branching.
//
Expand Down Expand Up @@ -407,14 +457,33 @@ func (h *lockingAnteHandler) otherMsgAnteHandle(ctx sdk.Context, tx sdk.Tx, simu
return ctx, err
}

cacheMs = ctx.MultiStore().(cachemulti.Store).CacheMultiStoreWithLocking(map[storetypes.StoreKey][][]byte{
lms, ok := ctx.MultiStore().(interface {
CacheMultiStoreWithLocking(map[storetypes.StoreKey][][]byte) storetypes.CacheMultiStore
})
if !ok {
return ctx, errorsmod.Wrap(sdkerrors.ErrLogic, "MultiStore does not support CacheMultiStoreWithLocking")
}
t0 := time.Now()
cacheMs = lms.CacheMultiStoreWithLocking(map[storetypes.StoreKey][][]byte{
h.authStoreKey: signers,
})
defer cacheMs.(storetypes.LockingStore).Unlock()
lockAcquireMs = time.Since(t0).Milliseconds()
ls, ok := cacheMs.(storetypes.LockingStore)
if !ok {
return ctx, errorsmod.Wrap(sdkerrors.ErrLogic, "CacheMultiStore does not implement LockingStore")
}
defer ls.Unlock()
ctx = ctx.WithMultiStore(cacheMs)

g0 := time.Now()
h.globalLock.Lock()
defer h.globalLock.Unlock()
g1 := time.Now()
globalLockWaitMs = g1.Sub(g0).Milliseconds()
defer func() {
gReleaseStart := time.Now()
h.globalLock.Unlock()
_ = gReleaseStart
}()
}

if ctx, err = h.consumeTxSizeGas.AnteHandle(ctx, tx, simulate, noOpAnteHandle); err != nil {
Expand Down Expand Up @@ -447,7 +516,24 @@ func (h *lockingAnteHandler) otherMsgAnteHandle(ctx sdk.Context, tx sdk.Tx, simu
// During non-simulated `checkTx` we must write the store since we own branching and writing.
// During `deliverTx` and simulation the Cosmos SDK is responsible for branching and writing.
if err == nil && !simulate && (ctx.IsCheckTx() || ctx.IsReCheckTx()) {
w0 := time.Now()
cacheMs.Write()
writeMs = time.Since(w0).Milliseconds()
}

if ctx.IsReCheckTx() {
totalMs := time.Since(tStart).Milliseconds()
if totalMs >= recheckSlowThresholdMs || lockAcquireMs >= recheckSlowThresholdMs || writeMs >= recheckSlowThresholdMs || globalLockWaitMs >= recheckSlowThresholdMs {
log.InfoLog(ctx, "PIERRICK: ReCheckTx slow (other)",
"total_ms", totalMs,
"lock_acquire_ms", lockAcquireMs,
"global_lock_wait_ms", globalLockWaitMs,
"write_ms", writeMs,
)
}
recordTxDuration(true, uint64(totalMs))
} else if ctx.IsCheckTx() {
recordTxDuration(false, uint64(time.Since(tStart).Milliseconds()))
}

return ctx, err
Expand Down
Loading