Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Use Caching in Priority Nonce Mempool for Tx Look ups (backport #520) #523

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion block/base/lane.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func NewBaseLane(

lane.LaneMempool = NewMempool(
DefaultTxPriority(),
lane.cfg.TxEncoder,
lane.cfg.SignerExtractor,
lane.cfg.MaxTxs,
)
Expand Down
40 changes: 4 additions & 36 deletions block/base/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"

signer_extraction "github.com/skip-mev/block-sdk/adapters/signer_extraction_adapter"
"github.com/skip-mev/block-sdk/block/utils"
)

type (
Expand All @@ -20,7 +19,7 @@
// transactions.
Mempool[C comparable] struct {
// index defines an index of transactions.
index sdkmempool.Mempool
index MempoolInterface

// signerExtractor defines the signer extraction adapter that allows us to
// extract the signer from a transaction.
Expand All @@ -31,19 +30,11 @@
// of two transactions. The index utilizes this struct to order transactions
// in the mempool.
txPriority TxPriority[C]

// txEncoder defines the sdk.Tx encoder that allows us to encode transactions
// to bytes.
txEncoder sdk.TxEncoder

// txCache is a map of all transactions in the mempool. It is used
// to quickly check if a transaction is already in the mempool.
txCache map[string]struct{}
}
)

// NewMempool returns a new Mempool.
func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, extractor signer_extraction.Adapter, maxTx int) *Mempool[C] {
func NewMempool[C comparable](txPriority TxPriority[C], extractor signer_extraction.Adapter, maxTx int) *Mempool[C] {
return &Mempool[C]{
index: NewPriorityMempool(
PriorityNonceMempoolConfig[C]{
Expand All @@ -54,8 +45,6 @@
),
extractor: extractor,
txPriority: txPriority,
txEncoder: txEncoder,
txCache: make(map[string]struct{}),
}
}

Expand All @@ -67,17 +56,9 @@
// Insert inserts a transaction into the mempool.
func (cm *Mempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
if err := cm.index.Insert(ctx, tx); err != nil {
return fmt.Errorf("failed to insert tx into auction index: %w", err)
return fmt.Errorf("failed to insert tx into mempool: %w", err)

Check warning on line 59 in block/base/mempool.go

View check run for this annotation

Codecov / codecov/patch

block/base/mempool.go#L59

Added line #L59 was not covered by tests
}

hash, err := utils.GetTxHash(cm.txEncoder, tx)
if err != nil {
cm.Remove(tx)
return err
}

cm.txCache[hash] = struct{}{}

return nil
}

Expand All @@ -87,13 +68,6 @@
return fmt.Errorf("failed to remove transaction from the mempool: %w", err)
}

hash, err := utils.GetTxHash(cm.txEncoder, tx)
if err != nil {
return fmt.Errorf("failed to get tx hash string: %w", err)
}

delete(cm.txCache, hash)

return nil
}

Expand All @@ -112,13 +86,7 @@

// Contains returns true if the transaction is contained in the mempool.
func (cm *Mempool[C]) Contains(tx sdk.Tx) bool {
hash, err := utils.GetTxHash(cm.txEncoder, tx)
if err != nil {
return false
}

_, ok := cm.txCache[hash]
return ok
return cm.index.Contains(tx)

Check warning on line 89 in block/base/mempool.go

View check run for this annotation

Codecov / codecov/patch

block/base/mempool.go#L89

Added line #L89 was not covered by tests
}

// Compare determines the relative priority of two transactions belonging in the same lane.
Expand Down
39 changes: 37 additions & 2 deletions block/base/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,49 @@ type txGen struct {
amount sdk.Coin
}

var (
numAccounts = 10
numTxsPerAcct = 100
)

func BenchmarkContains(b *testing.B) {
acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), numAccounts)
txc := testutils.CreateTestEncodingConfig().TxConfig

mp := base.NewMempool(
base.DefaultTxPriority(),
signerextraction.NewDefaultAdapter(),
1000,
)

txs := make([]sdk.Tx, numAccounts*numTxsPerAcct)
for i := 0; i < numAccounts; i++ {
for j := 0; j < numTxsPerAcct; j++ {
tx, err := testutils.CreateTx(txc, acct[i], uint64(j), 0, nil, sdk.NewCoin("stake", sdkmath.NewInt(1)))
require.NoError(b, err)
err = mp.Insert(sdk.Context{}, tx)
require.NoError(b, err)
txs[i*numTxsPerAcct+j] = tx
}
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, tx := range txs {
found := mp.Contains(tx)
if !found {
b.Fatalf("tx not found in mempool")
}
}
}
}

func TestMempoolComparison(t *testing.T) {
acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), 2)
txc := testutils.CreateTestEncodingConfig().TxConfig
ctx := testutils.CreateBaseSDKContext(t)
mp := base.NewMempool(
base.DefaultTxPriority(),
txc.TxEncoder(),
signerextraction.NewDefaultAdapter(),
1000,
)
Expand Down Expand Up @@ -102,7 +138,6 @@ func TestMempoolSelect(t *testing.T) {
se := signerextraction.NewDefaultAdapter()
mp := base.NewMempool(
base.DefaultTxPriority(),
txc.TxEncoder(),
se,
1000,
)
Expand Down
1 change: 0 additions & 1 deletion block/base/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func WithMempoolConfigs[C comparable](cfg LaneConfig, txPriority TxPriority[C])
return func(l *BaseLane) {
l.LaneMempool = NewMempool(
txPriority,
cfg.TxEncoder,
cfg.SignerExtractor,
cfg.MaxTxs,
)
Expand Down
28 changes: 27 additions & 1 deletion block/base/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,19 @@
)

var (
_ sdkmempool.Mempool = (*PriorityNonceMempool[int64])(nil)
_ MempoolInterface = (*PriorityNonceMempool[int64])(nil)
_ sdkmempool.Iterator = (*PriorityNonceIterator[int64])(nil)
)

type (
// MempoolInterface defines the interface a mempool should implement.
MempoolInterface interface {
sdkmempool.Mempool

// Contains returns true if the transaction is in the mempool.
Contains(tx sdk.Tx) bool
}

// PriorityNonceMempoolConfig defines the configuration used to configure the
// PriorityNonceMempool.
PriorityNonceMempoolConfig[C comparable] struct {
Expand Down Expand Up @@ -462,6 +470,24 @@
return nil
}

// Contains returns true if the transaction is in the mempool.
func (mp *PriorityNonceMempool[C]) Contains(tx sdk.Tx) bool {
signers, err := mp.signerExtractor.GetSigners(tx)
if err != nil {
return false

Check warning on line 477 in block/base/priority_nonce.go

View check run for this annotation

Codecov / codecov/patch

block/base/priority_nonce.go#L474-L477

Added lines #L474 - L477 were not covered by tests
}
if len(signers) == 0 {
return false

Check warning on line 480 in block/base/priority_nonce.go

View check run for this annotation

Codecov / codecov/patch

block/base/priority_nonce.go#L479-L480

Added lines #L479 - L480 were not covered by tests
}

sig := signers[0]
nonce := sig.Sequence
sender := sig.Signer.String()

Check warning on line 485 in block/base/priority_nonce.go

View check run for this annotation

Codecov / codecov/patch

block/base/priority_nonce.go#L483-L485

Added lines #L483 - L485 were not covered by tests

_, ok := mp.scores[txMeta[C]{nonce: nonce, sender: sender}]
return ok

Check warning on line 488 in block/base/priority_nonce.go

View check run for this annotation

Codecov / codecov/patch

block/base/priority_nonce.go#L487-L488

Added lines #L487 - L488 were not covered by tests
}

func IsEmpty[C comparable](mempool sdkmempool.Mempool) error {
mp := mempool.(*PriorityNonceMempool[C])
if mp.priorityIndex.Len() != 0 {
Expand Down
8 changes: 4 additions & 4 deletions lanes/base/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *BaseTestSuite) TestCompareTxPriority() {
}

func (s *BaseTestSuite) TestInsert() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

s.Run("should be able to insert a transaction", func() {
tx, err := testutils.CreateRandomTx(
Expand Down Expand Up @@ -180,7 +180,7 @@ func (s *BaseTestSuite) TestInsert() {
}

func (s *BaseTestSuite) TestRemove() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

s.Run("should be able to remove a transaction", func() {
tx, err := testutils.CreateRandomTx(
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *BaseTestSuite) TestRemove() {

func (s *BaseTestSuite) TestSelect() {
s.Run("should be able to select transactions in the correct order", func() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

tx1, err := testutils.CreateRandomTx(
s.encodingConfig.TxConfig,
Expand Down Expand Up @@ -261,7 +261,7 @@ func (s *BaseTestSuite) TestSelect() {
})

s.Run("should be able to select a single transaction", func() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

tx1, err := testutils.CreateRandomTx(
s.encodingConfig.TxConfig,
Expand Down
Loading