
[OCC] Add performance improvements to locking and validation #350

Open · wants to merge 39 commits into base: occ-main

Commits
6bc6c90
Add occ todos / comments (#317)
udpatil Sep 13, 2023
b66d23e
Multiversion Item Implementation and Tests (#318)
udpatil Sep 26, 2023
0048776
[occ] Add incarnation field (#321)
udpatil Sep 29, 2023
5d8941c
[occ] Implement basic multiversion store (#322)
udpatil Oct 6, 2023
dac5f7b
[occ] Add concurrency worker configuration (#324)
stevenlanders Oct 9, 2023
94bb98f
[occ] Occ multiversion store (#326)
udpatil Oct 10, 2023
5f89416
[occ] Add batch tx delivery interface (#327)
stevenlanders Oct 10, 2023
571d00a
[occ] MVKV store implementation and tests (#323)
udpatil Oct 10, 2023
9886602
[occ] Add validation function for transaction state to multiversionst…
udpatil Oct 13, 2023
293ac79
[occ] Add basic worker task and scheduler shell (#328)
stevenlanders Oct 17, 2023
dfb2260
[occ] Implement iterator for mvkv (#329)
udpatil Oct 17, 2023
663716a
fix dependency (#334)
udpatil Oct 17, 2023
b34d61c
[occ] Iterateset tracking and validation implementation (#337)
udpatil Oct 19, 2023
0aebbc9
[occ] Add scheduler logic for validation (#336)
stevenlanders Oct 19, 2023
096041b
[occ] Fix situation where no stores causes a panic (#338)
stevenlanders Oct 20, 2023
0b9193c
Add occ flag check to context (#340)
stevenlanders Oct 23, 2023
27484e4
[occ] Add struct field and helpers for estimate prefills (#341)
udpatil Oct 24, 2023
95ddc84
Fix map access panic (#343)
stevenlanders Oct 30, 2023
be4a4ae
Gen estimates writeset (#344)
udpatil Nov 3, 2023
931e2f6
[OCC] Add trace spans to scheduler (#347)
stevenlanders Nov 6, 2023
eac8657
[occ] Fix parent store readset validation (#348)
udpatil Nov 10, 2023
4111eeb
update lock to read lock on read
stevenlanders Nov 13, 2023
5bd9359
make validates parallel
stevenlanders Nov 13, 2023
09eaa06
add narrower lock
stevenlanders Nov 13, 2023
8818539
fix panic
stevenlanders Nov 14, 2023
1505649
make assertion dynamic
stevenlanders Nov 14, 2023
e321cf5
add more parallelization
stevenlanders Nov 14, 2023
b64efe1
add more parallelization
stevenlanders Nov 14, 2023
0d53d94
fix nested span
stevenlanders Nov 14, 2023
2752c9b
fix apphash
stevenlanders Nov 14, 2023
ce568cd
update spans
stevenlanders Nov 14, 2023
ac01db9
back the mapCacheBackend with a sync.Map
stevenlanders Nov 15, 2023
dc8e018
Revert "back the mapCacheBackend with a sync.Map"
stevenlanders Nov 15, 2023
ec87828
try early validate
stevenlanders Nov 15, 2023
9830c6c
add validation for previous tx dependency
stevenlanders Nov 15, 2023
2859512
fix span for validate
stevenlanders Nov 15, 2023
4205725
avoid execute race
stevenlanders Nov 15, 2023
e2a259a
cleanup
stevenlanders Nov 15, 2023
961ef16
fix data race
stevenlanders Nov 15, 2023
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -6,6 +6,7 @@ on:
push:
branches:
- main
- occ-main # TODO: remove after occ work is done

permissions:
contents: read
24 changes: 23 additions & 1 deletion baseapp/abci.go
@@ -12,6 +12,8 @@ import (
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/tasks"

"github.com/armon/go-metrics"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
@@ -234,11 +236,31 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
}, nil
}

// DeliverTxBatch executes multiple txs
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx)
// The prefill is effectively a no-op if the tx metadata is empty

// process all txs; this also initializes the MVS if prefill estimates were disabled
txRes, err := scheduler.ProcessAll(ctx, req.TxEntries)
if err != nil {
// TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
for _, tx := range txRes {
responses = append(responses, &sdk.DeliverTxResult{Response: tx})
}
return sdk.DeliverTxBatchResponse{Results: responses}
}

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain relevant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
// TODO: (occ) this is the function called from sei-chain to perform execution of a transaction.
// We'd likely replace this with an execution task that is scheduled by the OCC scheduler
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
25 changes: 24 additions & 1 deletion baseapp/baseapp.go
@@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/armon/go-metrics"
"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/utils/tracing"
"github.com/gogo/protobuf/proto"
sdbm "github.com/sei-protocol/sei-tm-db/backends"
@@ -60,7 +61,8 @@ const (
FlagArchivalArweaveIndexDBFullPath = "archival-arweave-index-db-full-path"
FlagArchivalArweaveNodeURL = "archival-arweave-node-url"

FlagChainID = "chain-id"
FlagConcurrencyWorkers = "concurrency-workers"
)

var (
@@ -168,6 +170,8 @@ type BaseApp struct { //nolint: maligned
TmConfig *tmcfg.Config

TracingInfo *tracing.Info

concurrencyWorkers int
}

type appStore struct {
@@ -294,6 +298,16 @@ func NewBaseApp(
app.cms.(*rootmulti.Store).SetOrphanConfig(app.orphanConfig)
}

// if no option has overridden it already, initialize from the flag value
// this avoids forcing every implementation to pass an option, but allows it
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = cast.ToInt(appOpts.Get(FlagConcurrencyWorkers))
}
// fall back to the config default if still 0
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = config.DefaultConcurrencyWorkers
}

return app
}

@@ -307,6 +321,11 @@ func (app *BaseApp) AppVersion() uint64 {
return app.appVersion
}

// ConcurrencyWorkers returns the number of concurrent workers for the BaseApp.
func (app *BaseApp) ConcurrencyWorkers() int {
return app.concurrencyWorkers
}

// Version returns the application's version string.
func (app *BaseApp) Version() string {
return app.version
@@ -821,6 +840,7 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
// TODO: (occ) This is an example of where we wrap the multistore with a cache multistore, and then return a modified context using that multistore
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
@@ -974,6 +994,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
storeAccessOpEvents := msCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[acltypes.ANTE_MSG_INDEX]

// TODO: (occ) This is an example of where we do our current validation. Note that this validation operates on the declared dependencies for a TX / antehandler + the utilized dependencies, whereas the validation
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
@@ -1118,6 +1139,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
storeAccessOpEvents := msgMsCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[i]
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
// TODO: (occ) This is where we are currently validating our per-message dependencies,
// whereas with the OCC approach validation will be done holistically based on the mvkv
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
ctx.Logger().Info((fmt.Sprintf("eventMsgName=%s Missing Access Operation:%s ", eventMsgName, op.String())))
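The defaulting chain added to NewBaseApp above (explicit option first, then the `concurrency-workers` flag value, then the config default) can be sketched standalone. `resolveWorkers` is an illustrative helper, not part of the SDK; only the precedence mirrors the diff:

```go
package main

import "fmt"

// defaultConcurrencyWorkers mirrors config.DefaultConcurrencyWorkers from the diff.
const defaultConcurrencyWorkers = 10

// resolveWorkers applies the same precedence NewBaseApp uses:
// an explicit SetConcurrencyWorkers option wins, then the flag value,
// and finally the config default when both are zero.
func resolveWorkers(optionValue, flagValue int) int {
	workers := optionValue
	if workers == 0 {
		workers = flagValue
	}
	if workers == 0 {
		workers = defaultConcurrencyWorkers
	}
	return workers
}

func main() {
	fmt.Println(resolveWorkers(0, 0))  // no option, no flag: config default (10)
	fmt.Println(resolveWorkers(0, 20)) // flag only: 20
	fmt.Println(resolveWorkers(5, 20)) // explicit option wins: 5
}
```

This is why an app that never calls SetConcurrencyWorkers still gets a sane worker count without every implementation having to pass an option.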
145 changes: 145 additions & 0 deletions baseapp/deliver_tx_batch_test.go
@@ -0,0 +1,145 @@
package baseapp

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func anteHandler(capKey sdk.StoreKey, storeKey []byte) sdk.AnteHandler {
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
store := ctx.KVStore(capKey)
txTest := tx.(txTest)

if txTest.FailOnAnte {
return ctx, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}

val := getIntFromStore(store, storeKey)
setIntOnStore(store, storeKey, val+1)

ctx.EventManager().EmitEvents(
counterEvent("ante-val", val+1),
)

return ctx, nil
}
}

func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) {
ctx = ctx.WithEventManager(sdk.NewEventManager())
res := &sdk.Result{}

// Use the tx index from the context as a unique ID for this transaction
txIndex := ctx.TxIndex()

// Use the unique ID to get a specific key for this transaction
sharedKey := []byte("shared")
txKey := []byte(fmt.Sprintf("tx-%d", txIndex))

// Get the store, increment the per-tx and shared counters, and emit an event
store := ctx.KVStore(capKey)

// increment per-tx key (no conflict)
val := getIntFromStore(store, txKey)
setIntOnStore(store, txKey, val+1)

// increment shared key
sharedVal := getIntFromStore(store, sharedKey)
setIntOnStore(store, sharedKey, sharedVal+1)

// Emit an event with the incremented value and the unique ID
ctx.EventManager().EmitEvent(
sdk.NewEvent(sdk.EventTypeMessage,
sdk.NewAttribute("shared-val", fmt.Sprintf("%d", sharedVal+1)),
sdk.NewAttribute("tx-val", fmt.Sprintf("%d", val+1)),
sdk.NewAttribute("tx-id", fmt.Sprintf("%d", txIndex)),
),
)

res.Events = ctx.EventManager().Events().ToABCIEvents()
return res, nil
}
}

func requireAttribute(t *testing.T, evts []abci.Event, name string, val string) {
for _, evt := range evts {
for _, att := range evt.Attributes {
if string(att.Key) == name {
require.Equal(t, val, string(att.Value))
return
}
}
}
require.Fail(t, fmt.Sprintf("attribute %s with value %s not found", name, val))
}

func TestDeliverTxBatch(t *testing.T) {
// test increments in the ante
anteKey := []byte("ante-key")

anteOpt := func(bapp *BaseApp) {
bapp.SetAnteHandler(anteHandler(capKey1, anteKey))
}

// test increments in the handler
routerOpt := func(bapp *BaseApp) {
r := sdk.NewRoute(routeMsgCounter, handlerKVStore(capKey1))
bapp.Router().AddRoute(r)
}

app := setupBaseApp(t, anteOpt, routerOpt)
app.InitChain(context.Background(), &abci.RequestInitChain{})

// Create same codec used in txDecoder
codec := codec.NewLegacyAmino()
registerTestCodec(codec)

nBlocks := 3
txPerHeight := 5

for blockN := 0; blockN < nBlocks; blockN++ {
header := tmproto.Header{Height: int64(blockN) + 1}
app.setDeliverState(header)
app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())
app.BeginBlock(app.deliverState.ctx, abci.RequestBeginBlock{Header: header})

var requests []*sdk.DeliverTxEntry
for i := 0; i < txPerHeight; i++ {
counter := int64(blockN*txPerHeight + i)
tx := newTxCounter(counter, counter)

txBytes, err := codec.Marshal(tx)
require.NoError(t, err)
requests = append(requests, &sdk.DeliverTxEntry{
Request: abci.RequestDeliverTx{Tx: txBytes},
})
}

responses := app.DeliverTxBatch(app.deliverState.ctx, sdk.DeliverTxBatchRequest{TxEntries: requests})
require.Len(t, responses.Results, txPerHeight)

for idx, deliverTxRes := range responses.Results {
res := deliverTxRes.Response
require.Equal(t, abci.CodeTypeOK, res.Code)
requireAttribute(t, res.Events, "tx-id", fmt.Sprintf("%d", idx))
requireAttribute(t, res.Events, "tx-val", fmt.Sprintf("%d", blockN+1))
requireAttribute(t, res.Events, "shared-val", fmt.Sprintf("%d", blockN*txPerHeight+idx+1))
}

app.EndBlock(app.deliverState.ctx, abci.RequestEndBlock{})
require.Empty(t, app.deliverState.ctx.MultiStore().GetEvents())
app.SetDeliverStateToCommit()
app.Commit(context.Background())
}
}
11 changes: 11 additions & 0 deletions baseapp/options.go
@@ -87,6 +87,10 @@ func SetSnapshotInterval(interval uint64) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotInterval(interval) }
}

// SetConcurrencyWorkers sets the number of workers for concurrent transaction execution.
func SetConcurrencyWorkers(workers int) func(*BaseApp) {
return func(app *BaseApp) { app.SetConcurrencyWorkers(workers) }
}

// SetSnapshotKeepRecent sets the recent snapshots to keep.
func SetSnapshotKeepRecent(keepRecent uint32) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotKeepRecent(keepRecent) }
@@ -295,6 +299,13 @@ func (app *BaseApp) SetSnapshotInterval(snapshotInterval uint64) {
app.snapshotInterval = snapshotInterval
}

// SetConcurrencyWorkers sets the number of concurrency workers; panics if the app is sealed.
func (app *BaseApp) SetConcurrencyWorkers(workers int) {
if app.sealed {
panic("SetConcurrencyWorkers() on sealed BaseApp")
}
app.concurrencyWorkers = workers
}

// SetSnapshotKeepRecent sets the number of recent snapshots to keep.
func (app *BaseApp) SetSnapshotKeepRecent(snapshotKeepRecent uint32) {
if app.sealed {
4 changes: 4 additions & 0 deletions proto/cosmos/accesscontrol/constants.proto
@@ -130,7 +130,11 @@ enum ResourceType {
KV_DEX_SHORT_ORDER_COUNT = 92; // child of KV_DEX

KV_BANK_DEFERRED = 93; // child of KV
reserved 94;
KV_BANK_DEFERRED_MODULE_TX_INDEX = 95; // child of KV_BANK_DEFERRED

KV_DEX_MEM_CONTRACTS_TO_PROCESS = 96; // child of KV_DEX_MEM
KV_DEX_MEM_DOWNSTREAM_CONTRACTS = 97; // child of KV_DEX_MEM
}

enum WasmMessageSubtype {
9 changes: 9 additions & 0 deletions server/config/config.go
@@ -21,6 +21,9 @@ const (

// DefaultGRPCWebAddress defines the default address to bind the gRPC-web server to.
DefaultGRPCWebAddress = "0.0.0.0:9091"

// DefaultConcurrencyWorkers defines the default number of workers to use for concurrent transaction execution
DefaultConcurrencyWorkers = 10
)

// BaseConfig defines the server's basic configuration
@@ -88,6 +91,10 @@ type BaseConfig struct {
SeparateOrphanVersionsToKeep int64 `mapstructure:"separate-orphan-versions-to-keep"`
NumOrphanPerFile int `mapstructure:"num-orphan-per-file"`
OrphanDirectory string `mapstructure:"orphan-dir"`

// ConcurrencyWorkers defines the number of workers to use for concurrent
// transaction execution. A value of -1 means unlimited workers. Default value is 10.
ConcurrencyWorkers int `mapstructure:"concurrency-workers"`
}

// APIConfig defines the API listener configuration.
@@ -236,6 +243,7 @@ func DefaultConfig() *Config {
IAVLDisableFastNode: true,
CompactionInterval: 0,
NoVersioning: false,
ConcurrencyWorkers: DefaultConcurrencyWorkers,
},
Telemetry: telemetry.Config{
Enabled: false,
@@ -310,6 +318,7 @@ func GetConfig(v *viper.Viper) (Config, error) {
SeparateOrphanVersionsToKeep: v.GetInt64("separate-orphan-versions-to-keep"),
NumOrphanPerFile: v.GetInt("num-orphan-per-file"),
OrphanDirectory: v.GetString("orphan-dir"),
ConcurrencyWorkers: v.GetInt("concurrency-workers"),
},
Telemetry: telemetry.Config{
ServiceName: v.GetString("telemetry.service-name"),
5 changes: 5 additions & 0 deletions server/config/config_test.go
@@ -23,3 +23,8 @@ func TestSetSnapshotDirectory(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, "", cfg.StateSync.SnapshotDirectory)
}

func TestSetConcurrencyWorkers(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, DefaultConcurrencyWorkers, cfg.ConcurrencyWorkers)
}
3 changes: 3 additions & 0 deletions server/config/toml.go
@@ -101,6 +101,9 @@ num-orphan-per-file = {{ .BaseConfig.NumOrphanPerFile }}
# if separate-orphan-storage is true, where to store orphan data
orphan-dir = "{{ .BaseConfig.OrphanDirectory }}"

# concurrency-workers defines how many workers to run for concurrent transaction execution
# concurrency-workers = {{ .BaseConfig.ConcurrencyWorkers }}

###############################################################################
### Telemetry Configuration ###
###############################################################################
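With the template addition above, an operator overriding the setting in app.toml would write roughly the following (the value 20 is illustrative; the default is 10 and, per the config comment, -1 means unlimited):

```toml
# concurrency-workers defines how many workers to run for concurrent transaction execution
concurrency-workers = 20
```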
8 changes: 8 additions & 0 deletions server/mock/store.go
@@ -229,3 +229,11 @@ func (kv kvStore) ReverseSubspaceIterator(prefix []byte) sdk.Iterator {
func NewCommitMultiStore() sdk.CommitMultiStore {
return multiStore{kv: make(map[sdk.StoreKey]kvStore)}
}

func (ms multiStore) SetKVStores(handler func(key store.StoreKey, s sdk.KVStore) store.CacheWrap) store.MultiStore {
panic("not implemented")
}

func (ms multiStore) StoreKeys() []sdk.StoreKey {
panic("not implemented")
}
2 changes: 2 additions & 0 deletions server/start.go
@@ -70,6 +70,7 @@ const (
FlagSeparateOrphanVersionsToKeep = "separate-orphan-versions-to-keep"
FlagNumOrphanPerFile = "num-orphan-per-file"
FlagOrphanDirectory = "orphan-dir"
FlagConcurrencyWorkers = "concurrency-workers"

// state sync-related flags
FlagStateSyncSnapshotInterval = "state-sync.snapshot-interval"
@@ -252,6 +253,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
cmd.Flags().Int64(FlagSeparateOrphanVersionsToKeep, 2, "Number of versions to keep if storing orphans separately")
cmd.Flags().Int(FlagNumOrphanPerFile, 100000, "Number of orphans to store on each file if storing orphans separately")
cmd.Flags().String(FlagOrphanDirectory, path.Join(defaultNodeHome, "orphans"), "Directory to store orphan files if storing orphans separately")
cmd.Flags().Int(FlagConcurrencyWorkers, config.DefaultConcurrencyWorkers, "Number of workers to process concurrent transactions")

cmd.Flags().Bool(flagGRPCOnly, false, "Start the node in gRPC query only mode (no Tendermint process is started)")
cmd.Flags().Bool(flagGRPCEnable, true, "Define if the gRPC server should be enabled")