From f2d63d4bc17a40fc245f94bd563213175bd37a51 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 28 Nov 2024 14:00:24 +0800 Subject: [PATCH] refactor(meta): extract code for connecting sql meta store Signed-off-by: Bugen Zhao --- src/meta/node/src/server.rs | 73 +++++++++------------------- src/meta/src/backup_restore/utils.rs | 34 ++----------- src/meta/src/controller/mod.rs | 45 +++++++++++++---- src/meta/src/lib.rs | 2 +- 4 files changed, 64 insertions(+), 90 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 77a2aecc030a..c87096a6aa3c 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -28,7 +28,6 @@ use risingwave_common_service::{MetricsManager, TracingExtractLayer}; use risingwave_meta::barrier::GlobalBarrierManager; use risingwave_meta::controller::catalog::CatalogController; use risingwave_meta::controller::cluster::ClusterController; -use risingwave_meta::controller::IN_MEMORY_STORE; use risingwave_meta::manager::{MetadataManager, META_NODE_ID}; use risingwave_meta::rpc::election::dummy::DummyElectionClient; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; @@ -128,47 +127,19 @@ pub async fn rpc_serve( init_session_config: SessionConfig, shutdown: CancellationToken, ) -> MetaResult<()> { - match meta_store_backend { + let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?; + + let election_client = match meta_store_backend { MetaStoreBackend::Mem => { - let dummy_election_client = Arc::new(DummyElectionClient::new( + // Use a dummy election client. + Arc::new(DummyElectionClient::new( address_info.advertise_addr.clone(), - )); - let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?; - rpc_serve_with_store( - SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()), - dummy_election_client, - address_info, - max_cluster_heartbeat_interval, - lease_interval_secs, - opts, - init_system_params, - init_session_config, - shutdown, - ) - .await + )) } - MetaStoreBackend::Sql { endpoint, config } => { - let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint); - let mut options = sea_orm::ConnectOptions::new(endpoint.clone()); - options - .max_connections(config.max_connections) - .min_connections(config.min_connections) - .connect_timeout(Duration::from_secs(config.connection_timeout_sec)) - .idle_timeout(Duration::from_secs(config.idle_timeout_sec)) - .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec)); - - if is_sqlite { - // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, - // here we forcibly specify the number of connections as 1. - options.max_connections(1); - } - - let conn = sea_orm::Database::connect(options).await?; - let meta_store_sql = SqlMetaStore::new(conn, endpoint); - + MetaStoreBackend::Sql { .. } => { // Init election client. let id = address_info.advertise_addr.clone(); - let conn = meta_store_sql.conn.clone(); + let conn = meta_store_impl.conn.clone(); let election_client: ElectionClientRef = match conn.get_database_backend() { DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)), DbBackend::Postgres => { @@ -180,20 +151,22 @@ pub async fn rpc_serve( }; election_client.init().await?; - rpc_serve_with_store( - meta_store_sql, - election_client, - address_info, - max_cluster_heartbeat_interval, - lease_interval_secs, - opts, - init_system_params, - init_session_config, - shutdown, - ) - .await + election_client } - } + }; + + rpc_serve_with_store( + meta_store_impl, + election_client, + address_info, + max_cluster_heartbeat_interval, + lease_interval_secs, + opts, + init_system_params, + init_session_config, + shutdown, + ) + .await } /// Bootstraps the follower or leader service based on the election status. diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 6dcb5dac8e01..fc2cbd33a131 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -13,17 +13,15 @@ // limitations under the License. use std::sync::Arc; -use std::time::Duration; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig}; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use sea_orm::DbBackend; use crate::backup_restore::RestoreOpts; -use crate::controller::{SqlMetaStore, IN_MEMORY_STORE}; +use crate::controller::SqlMetaStore; use crate::MetaStoreBackend; // Code is copied from src/meta/src/rpc/server.rs. TODO #6482: extract method. @@ -53,32 +51,10 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult { config: MetaStoreConfig::default(), }, }; - match meta_store_backend { - MetaStoreBackend::Mem => { - let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap(); - Ok(SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string())) - } - MetaStoreBackend::Sql { endpoint, config } => { - let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) { - // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, - // here we forcibly specify the number of connections as 1. - 1 - } else { - config.max_connections - }; - let mut options = sea_orm::ConnectOptions::new(endpoint.clone()); - options - .max_connections(max_connection) - .min_connections(config.min_connections) - .connect_timeout(Duration::from_secs(config.connection_timeout_sec)) - .idle_timeout(Duration::from_secs(config.idle_timeout_sec)) - .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec)); - let conn = sea_orm::Database::connect(options) - .await - .map_err(|e| BackupError::MetaStorage(e.into()))?; - Ok(SqlMetaStore::new(conn, endpoint)) - } - } + + SqlMetaStore::connect(meta_store_backend) + .await + .map_err(|e| BackupError::MetaStorage(e.into())) } pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult { diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 831a639eac41..677abac7cc3d 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; +use std::time::Duration; use anyhow::{anyhow, Context}; use risingwave_common::hash::VnodeCount; @@ -31,9 +32,9 @@ use risingwave_pb::catalog::{ PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable, PbView, }; -use sea_orm::{DatabaseConnection, ModelTrait}; +use sea_orm::{DatabaseConnection, DbBackend, ModelTrait}; -use crate::{MetaError, MetaResult}; +use crate::{MetaError, MetaResult, MetaStoreBackend}; pub mod catalog; pub mod cluster; @@ -66,18 +67,42 @@ pub struct SqlMetaStore { pub const IN_MEMORY_STORE: &str = "sqlite::memory:"; impl SqlMetaStore { - pub fn new(conn: DatabaseConnection, endpoint: String) -> Self { - Self { conn, endpoint } + pub async fn connect(backend: MetaStoreBackend) -> Result { + Ok(match backend { + MetaStoreBackend::Mem => { + let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?; + Self { + conn, + endpoint: IN_MEMORY_STORE.to_owned(), + } + } + MetaStoreBackend::Sql { endpoint, config } => { + let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint); + let mut options = sea_orm::ConnectOptions::new(endpoint.clone()); + options + .max_connections(config.max_connections) + .min_connections(config.min_connections) + .connect_timeout(Duration::from_secs(config.connection_timeout_sec)) + .idle_timeout(Duration::from_secs(config.idle_timeout_sec)) + .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec)); + + if is_sqlite { + // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, + // here we forcibly specify the number of connections as 1. + options.max_connections(1); + } + + let conn = sea_orm::Database::connect(options).await?; + Self { conn, endpoint } + } + }) } #[cfg(any(test, feature = "test"))] pub async fn for_test() -> Self { - let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap(); - Migrator::up(&conn, None).await.unwrap(); - Self { - conn, - endpoint: IN_MEMORY_STORE.to_string(), - } + let this = Self::connect(MetaStoreBackend::Mem).await.unwrap(); + Migrator::up(&this.conn, None).await.unwrap(); + this } /// Check whether the cluster, which uses SQL as the backend, is a new cluster. diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index aa870fa98acc..f496d3e0d25b 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -53,7 +53,7 @@ pub use rpc::{ElectionClient, ElectionMember}; use crate::manager::MetaOpts; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum MetaStoreBackend { Mem, Sql {