Skip to content

Commit

Permalink
refactor(meta): extract code for connecting sql meta store
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 28, 2024
1 parent 521f674 commit f2d63d4
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 90 deletions.
73 changes: 23 additions & 50 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::controller::IN_MEMORY_STORE;
use risingwave_meta::manager::{MetadataManager, META_NODE_ID};
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
Expand Down Expand Up @@ -128,47 +127,19 @@ pub async fn rpc_serve(
init_session_config: SessionConfig,
shutdown: CancellationToken,
) -> MetaResult<()> {
match meta_store_backend {
let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;

let election_client = match meta_store_backend {
MetaStoreBackend::Mem => {
let dummy_election_client = Arc::new(DummyElectionClient::new(
// Use a dummy election client.
Arc::new(DummyElectionClient::new(
address_info.advertise_addr.clone(),
));
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?;
rpc_serve_with_store(
SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()),
dummy_election_client,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
init_session_config,
shutdown,
)
.await
))
}
MetaStoreBackend::Sql { endpoint, config } => {
let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));

if is_sqlite {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
options.max_connections(1);
}

let conn = sea_orm::Database::connect(options).await?;
let meta_store_sql = SqlMetaStore::new(conn, endpoint);

MetaStoreBackend::Sql { .. } => {
// Init election client.
let id = address_info.advertise_addr.clone();
let conn = meta_store_sql.conn.clone();
let conn = meta_store_impl.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
DbBackend::Postgres => {
Expand All @@ -180,20 +151,22 @@ pub async fn rpc_serve(
};
election_client.init().await?;

rpc_serve_with_store(
meta_store_sql,
election_client,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
init_session_config,
shutdown,
)
.await
election_client
}
}
};

rpc_serve_with_store(
meta_store_impl,
election_client,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
init_session_config,
shutdown,
)
.await
}

/// Bootstraps the follower or leader service based on the election status.
Expand Down
34 changes: 5 additions & 29 deletions src/meta/src/backup_restore/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use sea_orm::DbBackend;

use crate::backup_restore::RestoreOpts;
use crate::controller::{SqlMetaStore, IN_MEMORY_STORE};
use crate::controller::SqlMetaStore;
use crate::MetaStoreBackend;

// Code is copied from src/meta/src/rpc/server.rs. TODO #6482: extract method.
Expand Down Expand Up @@ -53,32 +51,10 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
config: MetaStoreConfig::default(),
},
};
match meta_store_backend {
MetaStoreBackend::Mem => {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Ok(SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()))
}
MetaStoreBackend::Sql { endpoint, config } => {
let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
1
} else {
config.max_connections
};
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(max_connection)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
let conn = sea_orm::Database::connect(options)
.await
.map_err(|e| BackupError::MetaStorage(e.into()))?;
Ok(SqlMetaStore::new(conn, endpoint))
}
}

SqlMetaStore::connect(meta_store_backend)
.await
.map_err(|e| BackupError::MetaStorage(e.into()))
}

pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult<MetaSnapshotStorageRef> {
Expand Down
45 changes: 35 additions & 10 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{anyhow, Context};
use risingwave_common::hash::VnodeCount;
Expand All @@ -31,9 +32,9 @@ use risingwave_pb::catalog::{
PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
PbView,
};
use sea_orm::{DatabaseConnection, ModelTrait};
use sea_orm::{DatabaseConnection, DbBackend, ModelTrait};

use crate::{MetaError, MetaResult};
use crate::{MetaError, MetaResult, MetaStoreBackend};

pub mod catalog;
pub mod cluster;
Expand Down Expand Up @@ -66,18 +67,42 @@ pub struct SqlMetaStore {
pub const IN_MEMORY_STORE: &str = "sqlite::memory:";

impl SqlMetaStore {
pub fn new(conn: DatabaseConnection, endpoint: String) -> Self {
Self { conn, endpoint }
pub async fn connect(backend: MetaStoreBackend) -> Result<Self, sea_orm::DbErr> {
Ok(match backend {
MetaStoreBackend::Mem => {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?;
Self {
conn,
endpoint: IN_MEMORY_STORE.to_owned(),
}
}
MetaStoreBackend::Sql { endpoint, config } => {
let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));

if is_sqlite {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
options.max_connections(1);
}

let conn = sea_orm::Database::connect(options).await?;
Self { conn, endpoint }
}
})
}

#[cfg(any(test, feature = "test"))]
pub async fn for_test() -> Self {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Migrator::up(&conn, None).await.unwrap();
Self {
conn,
endpoint: IN_MEMORY_STORE.to_string(),
}
let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
Migrator::up(&this.conn, None).await.unwrap();
this
}

/// Check whether the cluster, which uses SQL as the backend, is a new cluster.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use rpc::{ElectionClient, ElectionMember};

use crate::manager::MetaOpts;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum MetaStoreBackend {
Mem,
Sql {
Expand Down

0 comments on commit f2d63d4

Please sign in to comment.