Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): extract code for connecting sql meta store #19603

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);

#[serde_as]
/// This is the Session Config of RisingWave.
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
Expand Down Expand Up @@ -306,7 +309,7 @@ pub struct SessionConfig {
/// Bypass checks on cluster limits
///
/// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
#[parameter(default = false)]
#[parameter(default = BYPASS_CLUSTER_LIMITS)]
bypass_cluster_limits: bool,

/// The maximum number of parallelism a streaming query can use. Defaults to 256.
Expand Down
4 changes: 0 additions & 4 deletions src/config/ci-sim.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,3 @@ max_concurrent_creating_streaming_jobs = 0

[meta]
meta_leader_lease_secs = 10

[meta.developer]
meta_actor_cnt_per_worker_parallelism_soft_limit = 65536
meta_actor_cnt_per_worker_parallelism_hard_limit = 65536
73 changes: 23 additions & 50 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::controller::IN_MEMORY_STORE;
use risingwave_meta::manager::{MetadataManager, META_NODE_ID};
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
Expand Down Expand Up @@ -128,47 +127,19 @@ pub async fn rpc_serve(
init_session_config: SessionConfig,
shutdown: CancellationToken,
) -> MetaResult<()> {
match meta_store_backend {
let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;

let election_client = match meta_store_backend {
MetaStoreBackend::Mem => {
let dummy_election_client = Arc::new(DummyElectionClient::new(
// Use a dummy election client.
Arc::new(DummyElectionClient::new(
address_info.advertise_addr.clone(),
));
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?;
rpc_serve_with_store(
SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()),
dummy_election_client,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
init_session_config,
shutdown,
)
.await
))
}
MetaStoreBackend::Sql { endpoint, config } => {
let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));

if is_sqlite {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
options.max_connections(1);
}

let conn = sea_orm::Database::connect(options).await?;
let meta_store_sql = SqlMetaStore::new(conn, endpoint);

MetaStoreBackend::Sql { .. } => {
// Init election client.
let id = address_info.advertise_addr.clone();
let conn = meta_store_sql.conn.clone();
let conn = meta_store_impl.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
DbBackend::Postgres => {
Expand All @@ -180,20 +151,22 @@ pub async fn rpc_serve(
};
election_client.init().await?;

rpc_serve_with_store(
meta_store_sql,
election_client,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
init_session_config,
shutdown,
)
.await
election_client
}
}
};

rpc_serve_with_store(
meta_store_impl,
election_client,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
init_session_config,
shutdown,
)
.await
}

/// Bootstraps the follower or leader service based on the election status.
Expand Down
34 changes: 5 additions & 29 deletions src/meta/src/backup_restore/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use sea_orm::DbBackend;

use crate::backup_restore::RestoreOpts;
use crate::controller::{SqlMetaStore, IN_MEMORY_STORE};
use crate::controller::SqlMetaStore;
use crate::MetaStoreBackend;

// Code is copied from src/meta/src/rpc/server.rs. TODO #6482: extract method.
Expand Down Expand Up @@ -53,32 +51,10 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
config: MetaStoreConfig::default(),
},
};
match meta_store_backend {
MetaStoreBackend::Mem => {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Ok(SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()))
}
MetaStoreBackend::Sql { endpoint, config } => {
let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
1
} else {
config.max_connections
};
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(max_connection)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
let conn = sea_orm::Database::connect(options)
.await
.map_err(|e| BackupError::MetaStorage(e.into()))?;
Ok(SqlMetaStore::new(conn, endpoint))
}
}

SqlMetaStore::connect(meta_store_backend)
.await
.map_err(|e| BackupError::MetaStorage(e.into()))
}

pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult<MetaSnapshotStorageRef> {
Expand Down
46 changes: 36 additions & 10 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{anyhow, Context};
use risingwave_common::hash::VnodeCount;
Expand All @@ -31,9 +32,9 @@ use risingwave_pb::catalog::{
PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
PbView,
};
use sea_orm::{DatabaseConnection, ModelTrait};
use sea_orm::{DatabaseConnection, DbBackend, ModelTrait};

use crate::{MetaError, MetaResult};
use crate::{MetaError, MetaResult, MetaStoreBackend};

pub mod catalog;
pub mod cluster;
Expand Down Expand Up @@ -66,18 +67,43 @@ pub struct SqlMetaStore {
pub const IN_MEMORY_STORE: &str = "sqlite::memory:";

impl SqlMetaStore {
pub fn new(conn: DatabaseConnection, endpoint: String) -> Self {
Self { conn, endpoint }
/// Connect to the SQL meta store based on the given configuration.
pub async fn connect(backend: MetaStoreBackend) -> Result<Self, sea_orm::DbErr> {
Ok(match backend {
MetaStoreBackend::Mem => {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?;
Self {
conn,
endpoint: IN_MEMORY_STORE.to_owned(),
}
}
MetaStoreBackend::Sql { endpoint, config } => {
let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));

if is_sqlite {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
options.max_connections(1);
}

let conn = sea_orm::Database::connect(options).await?;
Self { conn, endpoint }
}
})
}

#[cfg(any(test, feature = "test"))]
pub async fn for_test() -> Self {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Migrator::up(&conn, None).await.unwrap();
Self {
conn,
endpoint: IN_MEMORY_STORE.to_string(),
}
let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
Migrator::up(&this.conn, None).await.unwrap();
this
}

/// Check whether the cluster, which uses SQL as the backend, is a new cluster.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use rpc::{ElectionClient, ElectionMember};

use crate::manager::MetaOpts;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum MetaStoreBackend {
Mem,
Sql {
Expand Down