diff --git a/core/lib/mempool/src/mempool_store.rs b/core/lib/mempool/src/mempool_store.rs index 334a4783a76..47c6395cd7a 100644 --- a/core/lib/mempool/src/mempool_store.rs +++ b/core/lib/mempool/src/mempool_store.rs @@ -1,4 +1,4 @@ -use std::collections::{hash_map, BTreeSet, HashMap, HashSet}; +use std::collections::{hash_map, BTreeSet, HashMap}; use zksync_types::{ l1::L1Tx, l2::L2Tx, Address, ExecuteTransactionCommon, Nonce, PriorityOpId, Transaction, @@ -221,22 +221,53 @@ impl MempoolStore { } fn gc(&mut self) -> Vec
{ - if self.size >= self.capacity { - let index: HashSet<_> = self + if self.size > self.capacity { + let mut transactions = std::mem::take(&mut self.l2_transactions_per_account); + let mut possibly_kept: Vec<_> = self .l2_priority_queue .iter() - .map(|pointer| pointer.account) + .rev() + .filter_map(|pointer| { + transactions + .remove(&pointer.account) + .map(|txs| (pointer.account, txs)) + }) .collect(); - let transactions = std::mem::take(&mut self.l2_transactions_per_account); - let (kept, drained) = transactions + let mut number_of_accounts_kept = possibly_kept + .iter() + .scan(0, |sum, (_, txs)| { + *sum += txs.len(); + (*sum <= self.capacity as usize).then_some(()) + }) + .count(); + if number_of_accounts_kept == 0 { + tracing::warn!("mempool capacity is too low to handle txs from single account, consider increasing capacity"); + // Keep at least one entry, otherwise mempool won't return any new L2 tx to process. + number_of_accounts_kept = 1; + } + let (kept, drained) = { + let mut drained: Vec<_> = transactions.into_keys().collect(); + let also_drained = possibly_kept + .split_off(number_of_accounts_kept) + .into_iter() + .map(|(address, _)| address); + drained.extend(also_drained); + + (possibly_kept, drained) + }; + + let l2_priority_queue = std::mem::take(&mut self.l2_priority_queue); + self.l2_priority_queue = l2_priority_queue .into_iter() - .partition(|(address, _)| index.contains(address)); - self.l2_transactions_per_account = kept; + .rev() + .take(number_of_accounts_kept) + .collect(); + self.l2_transactions_per_account = kept.into_iter().collect(); self.size = self .l2_transactions_per_account .iter() - .fold(0, |agg, (_, tnxs)| agg + tnxs.len() as u64); - return drained.into_keys().collect(); + .fold(0, |agg, (_, txs)| agg + txs.len() as u64); + return drained; } vec![] } diff --git a/core/lib/mempool/src/tests.rs b/core/lib/mempool/src/tests.rs index 96ef600984f..ed107fc409c 100644 --- a/core/lib/mempool/src/tests.rs +++ b/core/lib/mempool/src/tests.rs @@ -321,32 +321,26 @@ fn stashed_accounts() { #[test] fn mempool_capacity() { - let mut mempool = MempoolStore::new(PriorityOpId(0), 5); + let mut mempool = MempoolStore::new(PriorityOpId(0), 4); let account0 = Address::random(); let account1 = Address::random(); let account2 = Address::random(); + let account3 = Address::random(); let transactions = vec![ gen_l2_tx(account0, Nonce(0)), gen_l2_tx(account0, Nonce(1)), gen_l2_tx(account0, Nonce(2)), - gen_l2_tx(account1, Nonce(1)), - gen_l2_tx(account2, Nonce(1)), + gen_l2_tx_with_timestamp(account1, Nonce(0), unix_timestamp_ms() + 1), + gen_l2_tx_with_timestamp(account2, Nonce(0), unix_timestamp_ms() + 2), + gen_l2_tx(account3, Nonce(1)), ]; mempool.insert(transactions, HashMap::new()); - // the mempool is full. Accounts with non-sequential nonces got stashed + // Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score got stashed assert_eq!( HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), - HashSet::<_>::from_iter(vec![account1, account2]), - ); - // verify that existing good-to-go transactions and new ones got picked - mempool.insert( - vec![gen_l2_tx_with_timestamp( - account1, - Nonce(0), - unix_timestamp_ms() + 1, - )], - HashMap::new(), + HashSet::<_>::from_iter(vec![account2, account3]), ); + // verify that good-to-go transactions are kept. for _ in 0..3 { assert_eq!( mempool @@ -363,6 +357,37 @@ fn mempool_capacity() { .initiator_account(), account1 ); + assert!(!mempool.has_next(&L2TxFilter::default())); +} + +#[test] +fn mempool_does_not_purge_all_accounts() { + let mut mempool = MempoolStore::new(PriorityOpId(0), 1); + let account0 = Address::random(); + let account1 = Address::random(); + let transactions = vec![ + gen_l2_tx(account0, Nonce(0)), + gen_l2_tx(account0, Nonce(1)), + gen_l2_tx(account1, Nonce(1)), + ]; + mempool.insert(transactions, HashMap::new()); + // Mempool is full. Account 1 has tx with non-sequential nonce so it should be purged. + // Txs from account 0 have sequential nonces but their number is greater than capacity; they should be kept. + assert_eq!( + HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), + HashSet::<_>::from_iter(vec![account1]), + ); + // verify that good-to-go transactions are kept. + for _ in 0..2 { + assert_eq!( + mempool + .next_transaction(&L2TxFilter::default()) + .unwrap() + .initiator_account(), + account0 + ); + } + assert!(!mempool.has_next(&L2TxFilter::default())); } fn gen_l2_tx(address: Address, nonce: Nonce) -> Transaction { diff --git a/core/node/state_keeper/src/mempool_actor.rs b/core/node/state_keeper/src/mempool_actor.rs index dbe1e4cb977..d19a769e21b 100644 --- a/core/node/state_keeper/src/mempool_actor.rs +++ b/core/node/state_keeper/src/mempool_actor.rs @@ -89,12 +89,30 @@ impl MempoolFetcher { .await .context("failed getting pending protocol version")?; - let l2_tx_filter = l2_tx_filter( - self.batch_fee_input_provider.as_ref(), - protocol_version.into(), - ) - .await - .context("failed creating L2 transaction filter")?; + let l2_tx_filter = if let Some(unsealed_batch) = storage + .blocks_dal() + .get_unsealed_l1_batch() + .await + .context("failed getting unsealed batch")? + { + let (base_fee, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata( + unsealed_batch.fee_input, + protocol_version.into(), + ); + + L2TxFilter { + fee_input: unsealed_batch.fee_input, + fee_per_gas: base_fee, + gas_per_pubdata: gas_per_pubdata as u32, + } + } else { + l2_tx_filter( + self.batch_fee_input_provider.as_ref(), + protocol_version.into(), + ) + .await + .context("failed creating L2 transaction filter")? + }; let transactions = storage .transactions_dal()