diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 63dc2c22c3..13a202d617 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -1231,6 +1231,7 @@ impl DependenciesBuilder { self.get_sqlite_connection().await?, self.get_sqlite_connection_cardano_transaction_pool() .await?, + self.get_signed_entity_lock().await?, self.get_logger()?, )); diff --git a/mithril-aggregator/src/services/upkeep.rs b/mithril-aggregator/src/services/upkeep.rs index 4a849545b2..60b89d351d 100644 --- a/mithril-aggregator/src/services/upkeep.rs +++ b/mithril-aggregator/src/services/upkeep.rs @@ -11,6 +11,7 @@ use anyhow::Context; use async_trait::async_trait; use slog::{info, Logger}; +use mithril_common::signed_entity_type_lock::SignedEntityTypeLock; use mithril_common::StdResult; use mithril_persistence::sqlite::{ SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool, @@ -31,6 +32,7 @@ pub trait UpkeepService: Send + Sync { pub struct AggregatorUpkeepService { main_db_connection: Arc, cardano_tx_connection_pool: Arc, + signed_entity_type_lock: Arc, logger: Logger, } @@ -39,16 +41,26 @@ impl AggregatorUpkeepService { pub fn new( main_db_connection: Arc, cardano_tx_connection_pool: Arc, + signed_entity_type_lock: Arc, logger: Logger, ) -> Self { Self { main_db_connection, cardano_tx_connection_pool, + signed_entity_type_lock, logger, } } async fn upkeep_all_databases(&self) -> StdResult<()> { + if self.signed_entity_type_lock.has_locked_entities().await { + info!( + self.logger, + "UpkeepService::Some entities are locked - Skipping database upkeep" + ); + return Ok(()); + } + let main_db_connection = self.main_db_connection.clone(); let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone(); let db_upkeep_logger = self.logger.clone(); @@ -99,9 +111,13 @@ impl UpkeepService for AggregatorUpkeepService { #[cfg(test)] mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants; use mithril_common::test_utils::TempDir; - use crate::database::test_helper::{cardano_tx_db_file_connection, main_db_file_connection}; + use crate::database::test_helper::{ + cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection, + main_db_file_connection, + }; use crate::test_tools::TestLogger; use super::*; @@ -125,6 +141,7 @@ mod tests { Arc::new(SqliteConnectionPool::build_from_connection( cardano_tx_connection, )), + Arc::new(SignedEntityTypeLock::default()), TestLogger::file(&log_path), ); @@ -148,4 +165,42 @@ mod tests { "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup" ); } + + #[tokio::test] + async fn test_doesnt_cleanup_db_if_any_entity_is_locked() { + let log_path = TempDir::create( + "aggregator_upkeep", + "test_doesnt_cleanup_db_if_any_entity_is_locked", + ) + .join("upkeep.log"); + + let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default()); + let service = AggregatorUpkeepService::new( + Arc::new(main_db_connection().unwrap()), + Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()), + signed_entity_type_lock.clone(), + TestLogger::file(&log_path), + ); + + // Separate block to ensure the log is flushed after run + { + signed_entity_type_lock + .lock(SignedEntityTypeDiscriminants::CardanoTransactions) + .await; + service.run().await.expect("Upkeep service failed"); + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + + assert_eq!( + logs.matches(SqliteCleaningTask::Vacuum.log_message()) + .count(), + 0, + ); + assert_eq!( + logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message()) + .count(), + 0, + ); + } } diff --git a/mithril-common/src/signed_entity_type_lock.rs b/mithril-common/src/signed_entity_type_lock.rs index 640aef844e..d22500f1c3 100644 --- a/mithril-common/src/signed_entity_type_lock.rs +++ b/mithril-common/src/signed_entity_type_lock.rs @@ -45,6 +45,12 @@ impl SignedEntityTypeLock { locked_entities.remove(&entity_type.into()); } + /// Check if there are any locked signed entities. + pub async fn has_locked_entities(&self) -> bool { + let locked_entities = self.locked_entities.read().await; + !locked_entities.is_empty() + } + /// List only the unlocked signed entities in the given list. pub async fn filter_unlocked_entries + Clone>( &self, @@ -189,4 +195,16 @@ mod tests { vec![SignedEntityTypeDiscriminants::CardanoTransactions] ); } + + #[tokio::test] + async fn has_locked_entities() { + let signed_entity_type_lock = SignedEntityTypeLock::new(); + + assert!(!signed_entity_type_lock.has_locked_entities().await); + + signed_entity_type_lock + .lock(SignedEntityTypeDiscriminants::MithrilStakeDistribution) + .await; + assert!(signed_entity_type_lock.has_locked_entities().await); + } } diff --git a/mithril-signer/src/database/test_helper.rs b/mithril-signer/src/database/test_helper.rs index 57aa617549..89b2063622 100644 --- a/mithril-signer/src/database/test_helper.rs +++ b/mithril-signer/src/database/test_helper.rs @@ -3,6 +3,12 @@ use std::path::Path; use mithril_common::StdResult; use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection}; +/// In-memory sqlite database without foreign key support with migrations applied +pub fn main_db_connection() -> StdResult { + let builder = ConnectionBuilder::open_memory(); + build_main_db_connection(builder) +} + /// File sqlite database without foreign key support with migrations applied and WAL activated pub fn main_db_file_connection(db_path: &Path) -> StdResult { let builder = ConnectionBuilder::open_file(db_path) diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs index f2c0996f69..7f36e2854d 100644 --- a/mithril-signer/src/runtime/signer_services.rs +++ b/mithril-signer/src/runtime/signer_services.rs @@ -336,6 +336,7 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> { let upkeep_service = Arc::new(SignerUpkeepService::new( sqlite_connection.clone(), sqlite_connection_cardano_transaction_pool, + signed_entity_type_lock.clone(), slog_scope::logger(), )); diff --git a/mithril-signer/src/upkeep_service.rs b/mithril-signer/src/upkeep_service.rs index 0a0f9d7b41..fad60d3c5c 100644 --- a/mithril-signer/src/upkeep_service.rs +++ b/mithril-signer/src/upkeep_service.rs @@ -11,6 +11,7 @@ use anyhow::Context; use async_trait::async_trait; use slog::{info, Logger}; +use mithril_common::signed_entity_type_lock::SignedEntityTypeLock; use mithril_common::StdResult; use mithril_persistence::sqlite::{ SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool, @@ -31,6 +32,7 @@ pub trait UpkeepService: Send + Sync { pub struct SignerUpkeepService { main_db_connection: Arc, cardano_tx_connection_pool: Arc, + signed_entity_type_lock: Arc, logger: Logger, } @@ -39,16 +41,26 @@ impl SignerUpkeepService { pub fn new( main_db_connection: Arc, cardano_tx_connection_pool: Arc, + signed_entity_type_lock: Arc, logger: Logger, ) -> Self { Self { main_db_connection, cardano_tx_connection_pool, + signed_entity_type_lock, logger, } } async fn upkeep_all_databases(&self) -> StdResult<()> { + if self.signed_entity_type_lock.has_locked_entities().await { + info!( + self.logger, + "UpkeepService::Some entities are locked - Skipping database upkeep" + ); + return Ok(()); + } + let main_db_connection = self.main_db_connection.clone(); let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone(); let db_upkeep_logger = self.logger.clone(); @@ -99,9 +111,13 @@ impl UpkeepService for SignerUpkeepService { #[cfg(test)] mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants; use mithril_common::test_utils::TempDir; - use crate::database::test_helper::{cardano_tx_db_file_connection, main_db_file_connection}; + use crate::database::test_helper::{ + cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection, + main_db_file_connection, + }; use crate::test_tools::TestLogger; use super::*; @@ -125,6 +141,7 @@ mod tests { Arc::new(SqliteConnectionPool::build_from_connection( cardano_tx_connection, )), + Arc::new(SignedEntityTypeLock::default()), TestLogger::file(&log_path), ); @@ -148,4 +165,42 @@ mod tests { "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup" ); } + + #[tokio::test] + async fn test_doesnt_cleanup_db_if_any_entity_is_locked() { + let log_path = TempDir::create( + "signer_upkeep", + "test_doesnt_cleanup_db_if_any_entity_is_locked", + ) + .join("upkeep.log"); + + let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default()); + let service = SignerUpkeepService::new( + Arc::new(main_db_connection().unwrap()), + Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()), + signed_entity_type_lock.clone(), + TestLogger::file(&log_path), + ); + + // Separate block to ensure the log is flushed after run + { + signed_entity_type_lock + .lock(SignedEntityTypeDiscriminants::CardanoTransactions) + .await; + service.run().await.expect("Upkeep service failed"); + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + + assert_eq!( + logs.matches(SqliteCleaningTask::Vacuum.log_message()) + .count(), + 0, + ); + assert_eq!( + logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message()) + .count(), + 0, + ); + } } diff --git a/mithril-signer/tests/test_extensions/state_machine_tester.rs b/mithril-signer/tests/test_extensions/state_machine_tester.rs index a39f1e1d50..b657c96d56 100644 --- a/mithril-signer/tests/test_extensions/state_machine_tester.rs +++ b/mithril-signer/tests/test_extensions/state_machine_tester.rs @@ -213,6 +213,7 @@ impl StateMachineTester { let upkeep_service = Arc::new(SignerUpkeepService::new( sqlite_connection.clone(), sqlite_connection_cardano_transaction_pool, + signed_entity_type_lock.clone(), slog_scope::logger(), ));