From 949ae96ed6c3e85812291e8f6e9ba299b0872809 Mon Sep 17 00:00:00 2001 From: Ahmed Sagdati <37515857+segfault-magnet@users.noreply.github.com> Date: Tue, 19 Nov 2024 10:16:04 +0100 Subject: [PATCH] chore: add tests for the state listener (#143) --- Cargo.lock | 1 + packages/services/Cargo.toml | 1 + packages/services/src/lib.rs | 31 ++- packages/services/src/state_committer.rs | 36 ++- packages/services/src/state_listener.rs | 320 ++++++++++++++++++++++- 5 files changed, 360 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b241273..1e1d9790 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5632,6 +5632,7 @@ dependencies = [ "services", "storage", "tai64", + "test-case", "thiserror", "tokio", "tokio-util", diff --git a/packages/services/Cargo.toml b/packages/services/Cargo.toml index f972dedc..3863131b 100644 --- a/packages/services/Cargo.toml +++ b/packages/services/Cargo.toml @@ -28,6 +28,7 @@ tracing = { workspace = true } trait-variant = { workspace = true } [dev-dependencies] +test-case = { workspace = true } clock = { workspace = true, features = ["test-helpers"] } delegate = { workspace = true } eth = { workspace = true, features = ["test-helpers"] } diff --git a/packages/services/src/lib.rs b/packages/services/src/lib.rs index 418b58c9..36c22bcf 100644 --- a/packages/services/src/lib.rs +++ b/packages/services/src/lib.rs @@ -79,7 +79,7 @@ pub(crate) mod test_utils { use ports::{ l1::FragmentEncoder, storage::Storage, - types::{CollectNonEmpty, CompressedFuelBlock, DateTime, Fragment, NonEmpty, Utc}, + types::{CollectNonEmpty, CompressedFuelBlock, DateTime, Fragment, L1Tx, NonEmpty, Utc}, }; use rand::RngCore; use storage::{DbWithProcess, PostgresProcess}; @@ -187,13 +187,13 @@ pub(crate) mod test_utils { } pub fn expects_state_submissions( - expectations: impl IntoIterator>, [u8; 32])>, + expectations: impl IntoIterator>, ports::types::L1Tx)>, ) -> ports::l1::MockApi { let mut sequence = Sequence::new(); let mut l1_mock = ports::l1::MockApi::new(); - for (fragment, tx_id) in expectations { + for (fragment, tx) in expectations { l1_mock .expect_submit_state_fragments() .withf(move |data, _previous_tx| { @@ -207,10 +207,7 @@ pub(crate) mod test_utils { .return_once(move |fragments, _previous_tx| { Box::pin(async move { Ok(( - ports::types::L1Tx { - hash: tx_id, - ..Default::default() - }, + tx, FragmentsSubmitted { num_fragments: min(fragments.len(), 6).try_into().unwrap(), }, @@ -427,7 +424,13 @@ pub(crate) mod test_utils { impl Setup { pub async fn send_fragments(&self, eth_tx: [u8; 32]) { StateCommitter::new( - mocks::l1::expects_state_submissions(vec![(None, eth_tx)]), + mocks::l1::expects_state_submissions(vec![( + None, + L1Tx { + hash: eth_tx, + ..Default::default() + }, + )]), mocks::fuel::latest_height_is(0), self.db(), crate::StateCommitterConfig::default(), @@ -458,8 +461,14 @@ pub(crate) mod test_utils { let clock = TestClock::default(); clock.set_time(finalization_time); - let tx = [1; 32]; - let l1_mock = mocks::l1::expects_state_submissions(vec![(None, tx)]); + let tx_hash = [1; 32]; + let l1_mock = mocks::l1::expects_state_submissions(vec![( + None, + L1Tx { + hash: tx_hash, + ..Default::default() + }, + )]); let fuel_mock = mocks::fuel::latest_height_is(0); let mut committer = StateCommitter::new( l1_mock, @@ -470,7 +479,7 @@ pub(crate) mod test_utils { ); committer.run().await.unwrap(); - let l1_mock = mocks::l1::txs_finished(0, 0, [(tx, TxStatus::Success)]); + let l1_mock = mocks::l1::txs_finished(0, 0, [(tx_hash, TxStatus::Success)]); StateListener::new( l1_mock, diff --git a/packages/services/src/state_committer.rs b/packages/services/src/state_committer.rs index 6db03476..2946e28c 100644 --- a/packages/services/src/state_committer.rs +++ b/packages/services/src/state_committer.rs @@ -262,10 +262,12 @@ mod tests { let fragments = setup.insert_fragments(0, 4).await; - let tx_hash = [0; 32]; let l1_mock_submit = test_utils::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), - tx_hash, + L1Tx { + hash: [0; 32], + ..Default::default() + }, )]); let fuel_mock = test_utils::mocks::fuel::latest_height_is(0); @@ -301,7 +303,10 @@ mod tests { let tx_hash = [1; 32]; let l1_mock_submit = test_utils::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), - tx_hash, + L1Tx { + hash: tx_hash, + ..Default::default() + }, )]); let fuel_mock = test_utils::mocks::fuel::latest_height_is(0); @@ -375,7 +380,10 @@ mod tests { let tx_hash = [3; 32]; let l1_mock_submit = test_utils::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments).unwrap()), - tx_hash, + L1Tx { + hash: tx_hash, + ..Default::default() + }, )]); let fuel_mock = test_utils::mocks::fuel::latest_height_is(0); @@ -414,7 +422,10 @@ mod tests { let tx_hash = [4; 32]; let l1_mock_submit = test_utils::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments_to_submit).unwrap()), - tx_hash, + L1Tx { + hash: tx_hash, + ..Default::default() + }, )]); let fuel_mock = test_utils::mocks::fuel::latest_height_is(1); @@ -453,7 +464,10 @@ mod tests { let tx_hash = [5; 32]; let l1_mock_submit = test_utils::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), - tx_hash, + L1Tx { + hash: tx_hash, + ..Default::default() + }, )]); let fuel_mock = test_utils::mocks::fuel::latest_height_is(0); @@ -495,11 +509,17 @@ mod tests { let l1_mock_submit = test_utils::mocks::l1::expects_state_submissions([ ( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), - tx_hash_1, + L1Tx { + hash: tx_hash_1, + ..Default::default() + }, ), ( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), - tx_hash_2, + L1Tx { + hash: tx_hash_2, + ..Default::default() + }, ), ]); diff --git a/packages/services/src/state_listener.rs b/packages/services/src/state_listener.rs index 70cf0434..c45123f9 100644 --- a/packages/services/src/state_listener.rs +++ b/packages/services/src/state_listener.rs @@ -45,7 +45,7 @@ where Db: Storage, C: Clock, { - async fn check_non_finalized_txs(&mut self, non_finalized_txs: Vec) -> crate::Result<()> { + async fn check_non_finalized_txs(&self, non_finalized_txs: Vec) -> crate::Result<()> { let current_block_number: u64 = self.l1_adapter.get_block_number().await?.into(); // we need to accumulate all the changes and then update the db atomically @@ -64,7 +64,7 @@ where // not included in block - check what happened to the tx match (tx.state, self.l1_adapter.is_squeezed_out(tx.hash).await?) { - (TransactionState::Pending, true) => { + (TransactionState::Pending | TransactionState::IncludedInBlock, true) => { // not in the mempool anymore set it to failed selective_change.push((tx.hash, tx.nonce, TransactionState::Failed)); @@ -195,16 +195,24 @@ impl Metrics { #[cfg(test)] mod tests { + use std::time::Duration; + use clock::TestClock; + use mockall::predicate::eq; + use ports::types::{L1Height, TransactionResponse, Utc}; + use test_case::test_case; use super::*; - use crate::test_utils::{ - self, - mocks::{self, l1::TxStatus}, + use crate::{ + test_utils::{ + self, + mocks::{self, l1::TxStatus}, + }, + Result, StateCommitter, }; #[tokio::test] - async fn state_listener_will_update_tx_state_if_finalized() -> crate::Result<()> { + async fn successful_finalized_tx() -> Result<()> { // given let setup = test_utils::Setup::init().await; @@ -252,7 +260,7 @@ mod tests { } #[tokio::test] - async fn state_listener_will_update_tx_from_pending_to_included() -> crate::Result<()> { + async fn successful_tx_in_block_not_finalized() -> Result<()> { // given let setup = test_utils::Setup::init().await; @@ -297,7 +305,7 @@ mod tests { } #[tokio::test] - async fn state_listener_from_pending_to_included_to_finalized_tx() -> crate::Result<()> { + async fn from_pending_to_included_to_success_finalized_tx() -> Result<()> { // given let setup = test_utils::Setup::init().await; @@ -365,7 +373,7 @@ mod tests { } #[tokio::test] - async fn state_listener_from_pending_to_included_to_pending() -> crate::Result<()> { + async fn reorg_threw_out_tx_from_block_into_pool() -> Result<()> { // given let setup = test_utils::Setup::init().await; @@ -427,7 +435,65 @@ mod tests { } #[tokio::test] - async fn state_listener_will_update_tx_state_if_failed() -> crate::Result<()> { + async fn reorg_threw_out_tx_from_block_into_pool_and_got_squeezed_out() -> Result<()> { + // given + let setup = test_utils::Setup::init().await; + + let _ = setup.insert_fragments(0, 1).await; + + let tx_hash = [0; 32]; + setup.send_fragments(tx_hash).await; + + let mut mock = ports::l1::MockApi::new(); + + mock.expect_get_transaction_response() + .once() + .with(eq(tx_hash)) + .return_once(|_| Box::pin(async { Ok(Some(TransactionResponse::new(1, true))) })); + mock.expect_get_block_number() + .returning(|| Box::pin(async { Ok(L1Height::from(1u32)) })); + + let mut listener = StateListener::new( + mock, + setup.db(), + 5, + TestClock::default(), + IntGauge::new("test", "test").unwrap(), + ); + listener.run().await?; + + let mut l1 = ports::l1::MockApi::new(); + l1.expect_get_block_number() + .returning(|| Box::pin(async { Ok(5.into()) })); + l1.expect_get_transaction_response() + .once() + .with(eq(tx_hash)) + .return_once(|_| Box::pin(async { Ok(None) })); + l1.expect_is_squeezed_out() + .once() + .with(eq(tx_hash)) + .return_once(|_| Box::pin(async { Ok(true) })); + let mut listener = StateListener::new( + l1, + setup.db(), + 5, + TestClock::default(), + IntGauge::new("test", "test").unwrap(), + ); + + // when + listener.run().await?; + + // then + let db = setup.db(); + assert!(!db.has_nonfinalized_txs().await?); + assert!(!db.has_pending_txs().await?); + + Ok(()) + } + + #[tokio::test] + async fn tx_failed() -> Result<()> { // given let setup = test_utils::Setup::init().await; @@ -472,4 +538,238 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn fine_to_have_nothing_to_check() -> Result<()> { + // given + let setup = test_utils::Setup::init().await; + + let mut listener = StateListener::new( + ports::l1::MockApi::new(), + setup.db(), + 5, + TestClock::default(), + IntGauge::new("test", "test").unwrap(), + ); + + // when + let res = listener.run().await; + + // then + assert!(res.is_ok()); + Ok(()) + } + + #[tokio::test] + async fn a_pending_tx_got_squeezed_out() -> Result<()> { + // given + let setup = test_utils::Setup::init().await; + let _ = setup.insert_fragments(0, 1).await; + + let tx_hash = [0; 32]; + setup.send_fragments(tx_hash).await; + let mut l1 = ports::l1::MockApi::new(); + l1.expect_get_block_number() + .returning(|| Box::pin(async { Ok(5.into()) })); + + l1.expect_get_transaction_response() + .with(eq(tx_hash)) + .once() + .return_once(|_| Box::pin(async { Ok(None) })); + + l1.expect_is_squeezed_out() + .with(eq(tx_hash)) + .once() + .return_once(|_| Box::pin(async { Ok(true) })); + + let mut sut = StateListener::new( + l1, + setup.db(), + 5, + TestClock::default(), + IntGauge::new("test", "test").unwrap(), + ); + + // when + sut.run().await?; + + // then + assert!(!setup.db().has_pending_txs().await?); + + Ok(()) + } + + #[tokio::test] + async fn block_inclusion_of_replacement_leaves_no_pending_txs() -> Result<()> { + // given + let setup = test_utils::Setup::init().await; + + // Insert multiple fragments with the same nonce + let _ = setup.insert_fragments(0, 1).await; + + let start_time = Utc::now(); + let orig_tx_hash = [0; 32]; + let orig_tx = L1Tx { + hash: orig_tx_hash, + created_at: Some(start_time), + ..Default::default() + }; + + let replacement_tx_time = start_time + Duration::from_secs(1); + let replacement_tx_hash = [1; 32]; + let replacement_tx = L1Tx { + hash: replacement_tx_hash, + created_at: Some(replacement_tx_time), + ..Default::default() + }; + + let clock = TestClock::default(); + let mut committer = StateCommitter::new( + mocks::l1::expects_state_submissions(vec![(None, orig_tx), (None, replacement_tx)]), + mocks::fuel::latest_height_is(0), + setup.db(), + crate::StateCommitterConfig { + gas_bump_timeout: Duration::ZERO, + ..Default::default() + }, + clock.clone(), + ); + + // Orig tx + committer.run().await?; + + // Replacement + clock.set_time(replacement_tx_time); + committer.run().await?; + + assert_eq!(setup.db().get_pending_txs().await.unwrap().len(), 2); + + let current_height = 10u64; + let mut l1 = ports::l1::MockApi::new(); + l1.expect_get_block_number() + .returning(move || Box::pin(async move { Ok(current_height.try_into().unwrap()) })); + + l1.expect_get_transaction_response() + .with(eq(orig_tx_hash)) + .returning(|_| Box::pin(async { Ok(None) })); + l1.expect_is_squeezed_out() + .with(eq(orig_tx_hash)) + .returning(|_| Box::pin(async { Ok(true) })); + l1.expect_get_transaction_response() + .with(eq(replacement_tx_hash)) + .once() + .return_once(move |_| { + Box::pin(async move { Ok(Some(TransactionResponse::new(current_height, true))) }) + }); + + let mut listener = StateListener::new( + l1, + setup.db(), + 10, + clock, + IntGauge::new("test", "test").unwrap(), + ); + + // when + listener.run().await?; + + // then + let db = setup.db(); + assert!(!db.has_pending_txs().await?); + assert!(db.has_nonfinalized_txs().await?); + + Ok(()) + } + + #[test_case(true ; "replacement tx succeeds")] + #[test_case(false ; "replacement tx fails")] + #[tokio::test] + async fn finalized_replacement_tx_will_leave_no_pending_tx( + replacement_tx_succeeded: bool, + ) -> Result<()> { + // given + let setup = test_utils::Setup::init().await; + + // Insert multiple fragments with the same nonce + let _ = setup.insert_fragments(0, 1).await; + + let start_time = Utc::now(); + let orig_tx_hash = [0; 32]; + let orig_tx = L1Tx { + hash: orig_tx_hash, + created_at: Some(start_time), + ..Default::default() + }; + + let replacement_tx_time = start_time + Duration::from_secs(1); + let replacement_tx_hash = [1; 32]; + let replacement_tx = L1Tx { + hash: replacement_tx_hash, + created_at: Some(replacement_tx_time), + ..Default::default() + }; + + let clock = TestClock::default(); + let mut committer = StateCommitter::new( + mocks::l1::expects_state_submissions(vec![(None, orig_tx), (None, replacement_tx)]), + mocks::fuel::latest_height_is(0), + setup.db(), + crate::StateCommitterConfig { + gas_bump_timeout: Duration::ZERO, + ..Default::default() + }, + clock.clone(), + ); + + // Orig tx + committer.run().await?; + + // Replacement + clock.set_time(replacement_tx_time); + committer.run().await?; + + assert_eq!(setup.db().get_pending_txs().await.unwrap().len(), 2); + + let blocks_to_finalize = 1u64; + let current_height = 10u64; + let mut l1 = ports::l1::MockApi::new(); + l1.expect_get_block_number() + .returning(move || Box::pin(async move { Ok(current_height.try_into().unwrap()) })); + + l1.expect_get_transaction_response() + .with(eq(orig_tx_hash)) + .returning(|_| Box::pin(async { Ok(None) })); + l1.expect_is_squeezed_out() + .with(eq(orig_tx_hash)) + .returning(|_| Box::pin(async { Ok(true) })); + l1.expect_get_transaction_response() + .with(eq(replacement_tx_hash)) + .once() + .return_once(move |_| { + Box::pin(async move { + Ok(Some(TransactionResponse::new( + current_height - blocks_to_finalize, + replacement_tx_succeeded, + ))) + }) + }); + + let mut listener = StateListener::new( + l1, + setup.db(), + blocks_to_finalize, + clock, + IntGauge::new("test", "test").unwrap(), + ); + + // when + listener.run().await?; + + // then + let db = setup.db(); + assert!(!db.has_pending_txs().await?); + assert!(!db.has_nonfinalized_txs().await?); + + Ok(()) + } }