Skip to content

Commit

Permalink
improve mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
perekopskiy committed Oct 16, 2024
1 parent 0edd796 commit bce5a67
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 30 deletions.
51 changes: 41 additions & 10 deletions core/lib/mempool/src/mempool_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{hash_map, BTreeSet, HashMap, HashSet};
use std::collections::{hash_map, BTreeSet, HashMap};

use zksync_types::{
l1::L1Tx, l2::L2Tx, Address, ExecuteTransactionCommon, Nonce, PriorityOpId, Transaction,
Expand Down Expand Up @@ -221,22 +221,53 @@ impl MempoolStore {
}

fn gc(&mut self) -> Vec<Address> {
if self.size >= self.capacity {
let index: HashSet<_> = self
if self.size > self.capacity {
let mut transactions = std::mem::take(&mut self.l2_transactions_per_account);
let mut possibly_kept: Vec<_> = self
.l2_priority_queue
.iter()
.map(|pointer| pointer.account)
.rev()
.filter_map(|pointer| {
transactions
.remove(&pointer.account)
.map(|txs| (pointer.account, txs))
})
.collect();
let transactions = std::mem::take(&mut self.l2_transactions_per_account);
let (kept, drained) = transactions
let mut number_of_accounts_kept = possibly_kept
.iter()
.scan(0, |sum, (_, txs)| {
*sum += txs.len();
(*sum <= self.capacity as usize).then_some(())
})
.count();
if number_of_accounts_kept == 0 {
tracing::warn!("mempool capacity is too low to handle txs from single account, consider increasing capacity");
// Keep at least one entry, otherwise mempool won't return any new L2 tx to process.
number_of_accounts_kept = 1;
}
let (kept, drained) = {
let mut drained: Vec<_> = transactions.into_keys().collect();
let also_drained = possibly_kept
.split_off(number_of_accounts_kept)
.into_iter()
.map(|(address, _)| address);
drained.extend(also_drained);

(possibly_kept, drained)
};

let l2_priority_queue = std::mem::take(&mut self.l2_priority_queue);
self.l2_priority_queue = l2_priority_queue
.into_iter()
.partition(|(address, _)| index.contains(address));
self.l2_transactions_per_account = kept;
.rev()
.take(number_of_accounts_kept)
.collect();
self.l2_transactions_per_account = kept.into_iter().collect();
self.size = self
.l2_transactions_per_account
.iter()
.fold(0, |agg, (_, tnxs)| agg + tnxs.len() as u64);
return drained.into_keys().collect();
.fold(0, |agg, (_, txs)| agg + txs.len() as u64);
return drained;
}
vec![]
}
Expand Down
53 changes: 39 additions & 14 deletions core/lib/mempool/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,32 +321,26 @@ fn stashed_accounts() {

#[test]
fn mempool_capacity() {
let mut mempool = MempoolStore::new(PriorityOpId(0), 5);
let mut mempool = MempoolStore::new(PriorityOpId(0), 4);
let account0 = Address::random();
let account1 = Address::random();
let account2 = Address::random();
let account3 = Address::random();
let transactions = vec![
gen_l2_tx(account0, Nonce(0)),
gen_l2_tx(account0, Nonce(1)),
gen_l2_tx(account0, Nonce(2)),
gen_l2_tx(account1, Nonce(1)),
gen_l2_tx(account2, Nonce(1)),
gen_l2_tx_with_timestamp(account1, Nonce(0), unix_timestamp_ms() + 1),
gen_l2_tx_with_timestamp(account2, Nonce(0), unix_timestamp_ms() + 2),
gen_l2_tx(account3, Nonce(1)),
];
mempool.insert(transactions, HashMap::new());
// the mempool is full. Accounts with non-sequential nonces got stashed
// Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score got stashed
assert_eq!(
HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts),
HashSet::<_>::from_iter(vec![account1, account2]),
);
// verify that existing good-to-go transactions and new ones got picked
mempool.insert(
vec![gen_l2_tx_with_timestamp(
account1,
Nonce(0),
unix_timestamp_ms() + 1,
)],
HashMap::new(),
HashSet::<_>::from_iter(vec![account2, account3]),
);
// verify that good-to-go transactions are kept.
for _ in 0..3 {
assert_eq!(
mempool
Expand All @@ -363,6 +357,37 @@ fn mempool_capacity() {
.initiator_account(),
account1
);
assert!(!mempool.has_next(&L2TxFilter::default()));
}

#[test]
fn mempool_does_not_purge_all_accounts() {
let mut mempool = MempoolStore::new(PriorityOpId(0), 1);
let account0 = Address::random();
let account1 = Address::random();
let transactions = vec![
gen_l2_tx(account0, Nonce(0)),
gen_l2_tx(account0, Nonce(1)),
gen_l2_tx(account1, Nonce(1)),
];
mempool.insert(transactions, HashMap::new());
// Mempool is full. Account 1 has tx with non-sequential nonce so it should be purged.
// Txs from account 0 have sequential nonces but their number is greater than capacity; they should be kept.
assert_eq!(
HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts),
HashSet::<_>::from_iter(vec![account1]),
);
// verify that good-to-go transactions are kept.
for _ in 0..2 {
assert_eq!(
mempool
.next_transaction(&L2TxFilter::default())
.unwrap()
.initiator_account(),
account0
);
}
assert!(!mempool.has_next(&L2TxFilter::default()));
}

fn gen_l2_tx(address: Address, nonce: Nonce) -> Transaction {
Expand Down
30 changes: 24 additions & 6 deletions core/node/state_keeper/src/mempool_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,30 @@ impl MempoolFetcher {
.await
.context("failed getting pending protocol version")?;

let l2_tx_filter = l2_tx_filter(
self.batch_fee_input_provider.as_ref(),
protocol_version.into(),
)
.await
.context("failed creating L2 transaction filter")?;
let l2_tx_filter = if let Some(unsealed_batch) = storage
.blocks_dal()
.get_unsealed_l1_batch()
.await
.context("failed getting unsealed batch")?
{
let (base_fee, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata(
unsealed_batch.fee_input,
protocol_version.into(),
);

L2TxFilter {
fee_input: unsealed_batch.fee_input,
fee_per_gas: base_fee,
gas_per_pubdata: gas_per_pubdata as u32,
}
} else {
l2_tx_filter(
self.batch_fee_input_provider.as_ref(),
protocol_version.into(),
)
.await
.context("failed creating L2 transaction filter")?
};

let transactions = storage
.transactions_dal()
Expand Down

0 comments on commit bce5a67

Please sign in to comment.