From afe78b72201f800ab8f7bf18719f6cfd6497fc21 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 10 Sep 2024 15:36:36 +0300 Subject: [PATCH] Fix connection starvation during snapshot recovery --- .../external_node_strategy.rs | 8 +- .../src/external_node/snapshot_recovery.rs | 78 ++++++++++++++++++- 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs b/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs index 317f0b197d8..bdd69214de9 100644 --- a/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs +++ b/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs @@ -76,16 +76,18 @@ impl WiringLayer for ExternalNodeInitStrategyLayer { }); let snapshot_recovery = match self.snapshot_recovery_config { Some(recovery_config) => { + // Add a connection for checking whether the storage is initialized. let recovery_pool = input .master_pool - .get_custom(self.max_postgres_concurrency.get() as u32) + .get_custom(self.max_postgres_concurrency.get() as u32 + 1) .await?; - let recovery = Arc::new(ExternalNodeSnapshotRecovery { + let recovery: Arc = Arc::new(ExternalNodeSnapshotRecovery { client: client.clone(), pool: recovery_pool, + max_concurrency: self.max_postgres_concurrency, recovery_config, app_health, - }) as Arc; + }); Some(recovery) } None => None, diff --git a/core/node/node_storage_init/src/external_node/snapshot_recovery.rs b/core/node/node_storage_init/src/external_node/snapshot_recovery.rs index d9ba60a1bcb..9bc065b939c 100644 --- a/core/node/node_storage_init/src/external_node/snapshot_recovery.rs +++ b/core/node/node_storage_init/src/external_node/snapshot_recovery.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Instant}; +use std::{num::NonZeroUsize, sync::Arc, time::Instant}; use anyhow::Context as _; use tokio::sync::watch; @@ -17,6 +17,7 @@ use crate::{InitializeStorage, SnapshotRecoveryConfig}; pub struct ExternalNodeSnapshotRecovery { pub client: Box>, pub pool: ConnectionPool, + pub max_concurrency: NonZeroUsize, pub recovery_config: SnapshotRecoveryConfig, pub app_health: Arc, } @@ -24,8 +25,17 @@ pub struct ExternalNodeSnapshotRecovery { #[async_trait::async_trait] impl InitializeStorage for ExternalNodeSnapshotRecovery { async fn initialize_storage(&self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { - let pool = self.pool.clone(); tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk"); + + let pool_size = self.pool.max_size() as usize; + if pool_size < self.max_concurrency.get() + 1 { + tracing::error!( + "Connection pool has insufficient number of connections ({pool_size} vs concurrency {} + 1 connection for checks). \ + This will likely lead to pool starvation during recovery.", + self.max_concurrency + ); + } + let object_store_config = self.recovery_config.object_store_config.clone().context( "Snapshot object store must be presented if snapshot recovery is activated", @@ -34,10 +44,13 @@ impl InitializeStorage for ExternalNodeSnapshotRecovery { .create_store() .await?; - let config = SnapshotsApplierConfig::default(); + let config = SnapshotsApplierConfig { + max_concurrency: self.max_concurrency, + ..SnapshotsApplierConfig::default() + }; let mut snapshots_applier_task = SnapshotsApplierTask::new( config, - pool, + self.pool.clone(), Box::new(self.client.clone().for_component("snapshot_recovery")), object_store, ); @@ -80,3 +93,60 @@ impl InitializeStorage for ExternalNodeSnapshotRecovery { Ok(completed) } } + +#[cfg(test)] +mod tests { + use std::future; + + use zksync_types::{ + tokens::{TokenInfo, TokenMetadata}, + Address, L2BlockNumber, + }; + use zksync_web3_decl::client::MockClient; + + use super::*; + + #[tokio::test] + async fn recovery_does_not_starve_pool_connections() { + let pool = ConnectionPool::constrained_test_pool(5).await; + let app_health = Arc::new(AppHealthCheck::new(None, None)); + let client = MockClient::builder(L2::default()) + .method("en_syncTokens", |_number: Option| { + Ok(vec![TokenInfo { + l1_address: Address::repeat_byte(1), + l2_address: Address::repeat_byte(2), + metadata: TokenMetadata { + name: "test".to_string(), + symbol: "TEST".to_string(), + decimals: 18, + }, + }]) + }) + .build(); + let recovery = ExternalNodeSnapshotRecovery { + client: Box::new(client), + pool, + max_concurrency: NonZeroUsize::new(4).unwrap(), + recovery_config: SnapshotRecoveryConfig { + snapshot_l1_batch_override: None, + drop_storage_key_preimages: false, + object_store_config: None, + }, + app_health, + }; + + // Emulate recovery by indefinitely holding onto `max_concurrency` connections. In practice, + // the snapshot applier will release connections eventually, but it may require more time than the connection + // acquisition timeout configured for the DB pool. + for _ in 0..recovery.max_concurrency.get() { + let connection = recovery.pool.connection().await.unwrap(); + tokio::spawn(async move { + future::pending::<()>().await; + drop(connection); + }); + } + + // The only token reported by the mock client isn't recovered + assert!(!recovery.is_initialized().await.unwrap()); + } +}