Skip to content

Commit

Permalink
Skip database upkeep if any signed entity type is locked
Browse files Browse the repository at this point in the history
Since that would probably mean that some writing are currently occuring
on the database (like if the cardano transactions preloader is running).
  • Loading branch information
Alenar committed Jun 28, 2024
1 parent 503e9b3 commit bcb059a
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 2 deletions.
1 change: 1 addition & 0 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,7 @@ impl DependenciesBuilder {
self.get_sqlite_connection().await?,
self.get_sqlite_connection_cardano_transaction_pool()
.await?,
self.get_signed_entity_lock().await?,
self.get_logger()?,
));

Expand Down
57 changes: 56 additions & 1 deletion mithril-aggregator/src/services/upkeep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use anyhow::Context;
use async_trait::async_trait;
use slog::{info, Logger};

use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{
SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
Expand All @@ -31,6 +32,7 @@ pub trait UpkeepService: Send + Sync {
pub struct AggregatorUpkeepService {
main_db_connection: Arc<SqliteConnection>,
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
logger: Logger,
}

Expand All @@ -39,16 +41,26 @@ impl AggregatorUpkeepService {
pub fn new(
main_db_connection: Arc<SqliteConnection>,
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
logger: Logger,
) -> Self {
Self {
main_db_connection,
cardano_tx_connection_pool,
signed_entity_type_lock,
logger,
}
}

async fn upkeep_all_databases(&self) -> StdResult<()> {
if self.signed_entity_type_lock.has_locked_entities().await {
info!(
self.logger,
"UpkeepService::Some entities are locked - Skipping database upkeep"
);
return Ok(());
}

let main_db_connection = self.main_db_connection.clone();
let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
let db_upkeep_logger = self.logger.clone();
Expand Down Expand Up @@ -99,9 +111,13 @@ impl UpkeepService for AggregatorUpkeepService {

#[cfg(test)]
mod tests {
use mithril_common::entities::SignedEntityTypeDiscriminants;
use mithril_common::test_utils::TempDir;

use crate::database::test_helper::{cardano_tx_db_file_connection, main_db_file_connection};
use crate::database::test_helper::{
cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
main_db_file_connection,
};
use crate::test_tools::TestLogger;

use super::*;
Expand All @@ -125,6 +141,7 @@ mod tests {
Arc::new(SqliteConnectionPool::build_from_connection(
cardano_tx_connection,
)),
Arc::new(SignedEntityTypeLock::default()),
TestLogger::file(&log_path),
);

Expand All @@ -148,4 +165,42 @@ mod tests {
"Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
);
}

#[tokio::test]
async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
let log_path = TempDir::create(
"aggregator_upkeep",
"test_doesnt_cleanup_db_if_any_entity_is_locked",
)
.join("upkeep.log");

let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
let service = AggregatorUpkeepService::new(
Arc::new(main_db_connection().unwrap()),
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
signed_entity_type_lock.clone(),
TestLogger::file(&log_path),
);

// Separate block to ensure the log is flushed after run
{
signed_entity_type_lock
.lock(SignedEntityTypeDiscriminants::CardanoTransactions)
.await;
service.run().await.expect("Upkeep service failed");
}

let logs = std::fs::read_to_string(&log_path).unwrap();

assert_eq!(
logs.matches(SqliteCleaningTask::Vacuum.log_message())
.count(),
0,
);
assert_eq!(
logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
.count(),
0,
);
}
}
18 changes: 18 additions & 0 deletions mithril-common/src/signed_entity_type_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ impl SignedEntityTypeLock {
locked_entities.remove(&entity_type.into());
}

/// Check if there are any locked signed entities.
pub async fn has_locked_entities(&self) -> bool {
let locked_entities = self.locked_entities.read().await;
!locked_entities.is_empty()
}

/// List only the unlocked signed entities in the given list.
pub async fn filter_unlocked_entries<T: Into<SignedEntityTypeDiscriminants> + Clone>(
&self,
Expand Down Expand Up @@ -189,4 +195,16 @@ mod tests {
vec![SignedEntityTypeDiscriminants::CardanoTransactions]
);
}

#[tokio::test]
async fn has_locked_entities() {
let signed_entity_type_lock = SignedEntityTypeLock::new();

assert!(!signed_entity_type_lock.has_locked_entities().await);

signed_entity_type_lock
.lock(SignedEntityTypeDiscriminants::MithrilStakeDistribution)
.await;
assert!(signed_entity_type_lock.has_locked_entities().await);
}
}
6 changes: 6 additions & 0 deletions mithril-signer/src/database/test_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use std::path::Path;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection};

/// In-memory sqlite database without foreign key support with migrations applied
pub fn main_db_connection() -> StdResult<SqliteConnection> {
let builder = ConnectionBuilder::open_memory();
build_main_db_connection(builder)
}

/// File sqlite database without foreign key support with migrations applied and WAL activated
pub fn main_db_file_connection(db_path: &Path) -> StdResult<SqliteConnection> {
let builder = ConnectionBuilder::open_file(db_path)
Expand Down
1 change: 1 addition & 0 deletions mithril-signer/src/runtime/signer_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> {
let upkeep_service = Arc::new(SignerUpkeepService::new(
sqlite_connection.clone(),
sqlite_connection_cardano_transaction_pool,
signed_entity_type_lock.clone(),
slog_scope::logger(),
));

Expand Down
57 changes: 56 additions & 1 deletion mithril-signer/src/upkeep_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use anyhow::Context;
use async_trait::async_trait;
use slog::{info, Logger};

use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{
SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
Expand All @@ -31,6 +32,7 @@ pub trait UpkeepService: Send + Sync {
pub struct SignerUpkeepService {
main_db_connection: Arc<SqliteConnection>,
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
logger: Logger,
}

Expand All @@ -39,16 +41,26 @@ impl SignerUpkeepService {
pub fn new(
main_db_connection: Arc<SqliteConnection>,
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
logger: Logger,
) -> Self {
Self {
main_db_connection,
cardano_tx_connection_pool,
signed_entity_type_lock,
logger,
}
}

async fn upkeep_all_databases(&self) -> StdResult<()> {
if self.signed_entity_type_lock.has_locked_entities().await {
info!(
self.logger,
"UpkeepService::Some entities are locked - Skipping database upkeep"
);
return Ok(());
}

let main_db_connection = self.main_db_connection.clone();
let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
let db_upkeep_logger = self.logger.clone();
Expand Down Expand Up @@ -99,9 +111,13 @@ impl UpkeepService for SignerUpkeepService {

#[cfg(test)]
mod tests {
use mithril_common::entities::SignedEntityTypeDiscriminants;
use mithril_common::test_utils::TempDir;

use crate::database::test_helper::{cardano_tx_db_file_connection, main_db_file_connection};
use crate::database::test_helper::{
cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
main_db_file_connection,
};
use crate::test_tools::TestLogger;

use super::*;
Expand All @@ -125,6 +141,7 @@ mod tests {
Arc::new(SqliteConnectionPool::build_from_connection(
cardano_tx_connection,
)),
Arc::new(SignedEntityTypeLock::default()),
TestLogger::file(&log_path),
);

Expand All @@ -148,4 +165,42 @@ mod tests {
"Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
);
}

#[tokio::test]
async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
let log_path = TempDir::create(
"signer_upkeep",
"test_doesnt_cleanup_db_if_any_entity_is_locked",
)
.join("upkeep.log");

let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
let service = SignerUpkeepService::new(
Arc::new(main_db_connection().unwrap()),
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
signed_entity_type_lock.clone(),
TestLogger::file(&log_path),
);

// Separate block to ensure the log is flushed after run
{
signed_entity_type_lock
.lock(SignedEntityTypeDiscriminants::CardanoTransactions)
.await;
service.run().await.expect("Upkeep service failed");
}

let logs = std::fs::read_to_string(&log_path).unwrap();

assert_eq!(
logs.matches(SqliteCleaningTask::Vacuum.log_message())
.count(),
0,
);
assert_eq!(
logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
.count(),
0,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ impl StateMachineTester {
let upkeep_service = Arc::new(SignerUpkeepService::new(
sqlite_connection.clone(),
sqlite_connection_cardano_transaction_pool,
signed_entity_type_lock.clone(),
slog_scope::logger(),
));

Expand Down

0 comments on commit bcb059a

Please sign in to comment.