diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index fa81c9693..26af06fa9 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -590,6 +590,10 @@ pub mod error { fn from(value: MetadataError) -> Self { match value { MetadataError::StreamMetaNotFound(s) => StreamError::StreamNotFound(s), + MetadataError::StandaloneWithDistributed(s) => StreamError::Custom { + msg: s, + status: StatusCode::INTERNAL_SERVER_ERROR, + }, } } } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 5dffbee51..64085e02c 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -288,6 +288,8 @@ pub mod error { pub enum MetadataError { #[error("Metadata for stream {0} not found. Please create the stream and try again")] StreamMetaNotFound(String), + #[error("Metadata Error: {0}")] + StandaloneWithDistributed(String), } #[derive(Debug, thiserror::Error)] diff --git a/server/src/migration.rs b/server/src/migration.rs index e5958fd29..40e7c1340 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -60,6 +60,10 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { let metadata = metadata_migration::v2_v3(storage_metadata); put_remote_metadata(&*object_store, &metadata).await?; } + Some("v3") => { + let mdata = metadata_migration::update_v3(storage_metadata); + put_remote_metadata(&*object_store, &mdata).await?; + } _ => (), } } @@ -75,6 +79,10 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { let metadata = metadata_migration::v2_v3(staging_metadata); put_staging_metadata(config, &metadata)?; } + Some("v3") => { + let mdata = metadata_migration::update_v3(staging_metadata); + put_staging_metadata(config, &mdata)?; + } _ => (), } } @@ -82,6 +90,7 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { Ok(()) } +/// run the migration for all streams pub async fn run_migration(config: &Config) -> anyhow::Result<()> { let storage = config.storage().get_object_store(); let streams = storage.list_streams().await?; diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 36507a28f..814bf2547 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -17,22 +17,54 @@ */ use rand::distributions::DistString; -use serde_json::{Map, Value}; +use serde_json::{Map, Value as JsonValue}; -pub fn v1_v3(mut storage_metadata: serde_json::Value) -> Value { +use crate::option::CONFIG; + +/* +v1 +{ + "version": "v1", + "mode": "drive" + "user": string, + "staging": "string", + "storage": "string", + "deployment_id": "string" + "stream": string, + "default_role": null +} +*/ +pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue { let metadata = storage_metadata.as_object_mut().unwrap(); - *metadata.get_mut("version").unwrap() = Value::String("v3".to_string()); + *metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string()); metadata.remove("user"); metadata.remove("stream"); - metadata.insert("users".to_string(), Value::Array(vec![])); - metadata.insert("streams".to_string(), Value::Array(vec![])); - metadata.insert("roles".to_string(), Value::Array(vec![])); + metadata.insert("users".to_string(), JsonValue::Array(vec![])); + metadata.insert("streams".to_string(), JsonValue::Array(vec![])); + metadata.insert("roles".to_string(), JsonValue::Array(vec![])); + metadata.insert( + "server_mode".to_string(), + JsonValue::String(CONFIG.parseable.mode.to_string()), + ); storage_metadata } -pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value { +/* +v2 +{ + "version": "v2", + "users": [ + { + "role": ["privilege1", "privilege2", ...] + }, + ... + ] + ... +} +*/ +pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue { let metadata = storage_metadata.as_object_mut().unwrap(); - *metadata.get_mut("version").unwrap() = Value::String("v3".to_string()); + *metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string()); let users = metadata .get_mut("users") .expect("users field is present") @@ -46,7 +78,7 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value { // user is an object let user = user.as_object_mut().unwrap(); // take out privileges - let Value::Array(privileges) = user.remove("role").expect("role exists for v2") else { + let JsonValue::Array(privileges) = user.remove("role").expect("role exists for v2") else { panic!("privileges is an arrray") }; @@ -55,15 +87,34 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value { if !privileges.is_empty() { let role_name = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8); - privileges_map.push((role_name.clone(), Value::Array(privileges))); - roles.push(Value::from(role_name)); + privileges_map.push((role_name.clone(), JsonValue::Array(privileges))); + roles.push(JsonValue::from(role_name)); } user.insert("roles".to_string(), roles.into()); } metadata.insert( "roles".to_string(), - Value::Object(Map::from_iter(privileges_map)), + JsonValue::Object(Map::from_iter(privileges_map)), + ); + metadata.insert( + "server_mode".to_string(), + JsonValue::String(CONFIG.parseable.mode.to_string()), ); storage_metadata } + +// maybe rename +pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue { + let metadata = storage_metadata.as_object_mut().unwrap(); + let sm = metadata.get("server_mode"); + + if sm.is_none() { + metadata.insert( + "server_mode".to_string(), + JsonValue::String(CONFIG.parseable.mode.to_string()), + ); + } + + storage_metadata +} diff --git a/server/src/option.rs b/server/src/option.rs index e24e5e7e5..0607c826c 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -201,11 +201,26 @@ pub enum Mode { impl Mode { pub fn to_str(&self) -> &str { match self { - Mode::Query => "Query Server", - Mode::Ingest => "Ingest Server", + Mode::Query => "Query", + Mode::Ingest => "Ingest", Mode::All => "All", } } + + pub fn from_string(mode: &str) -> Result { + match mode { + "Query" => Ok(Mode::Query), + "Ingest" => Ok(Mode::Ingest), + "All" => Ok(Mode::All), + x => Err(format!("Invalid mode: {}", x)), + } + } +} + +impl ToString for Mode { + fn to_string(&self) -> String { + self.to_str().to_string() + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 3c051ac62..4ad410e34 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -26,6 +26,7 @@ use once_cell::sync::OnceCell; use std::io; use crate::{ + metadata::error::stream_info::MetadataError, option::{Mode, CONFIG, JOIN_COMMUNITY}, rbac::{role::model::DefaultPrivilege, user::User}, storage::ObjectStorageError, @@ -55,6 +56,7 @@ pub struct StorageMetadata { pub deployment_id: uid::Uid, pub users: Vec, pub streams: Vec, + pub server_mode: String, #[serde(default)] pub roles: HashMap>, #[serde(default)] @@ -69,6 +71,7 @@ impl StorageMetadata { staging: CONFIG.staging_dir().to_path_buf(), storage: CONFIG.storage().get_endpoint(), deployment_id: uid::gen(), + server_mode: CONFIG.parseable.mode.to_string(), users: Vec::new(), streams: Vec::new(), roles: HashMap::default(), @@ -100,18 +103,8 @@ pub async fn resolve_parseable_metadata() -> Result { - if staging.deployment_id == remote.deployment_id { - EnvChange::None(remote) - } else { - EnvChange::NewRemote - } - } - (None, Some(remote)) => EnvChange::NewStaging(remote), - (Some(_), None) => EnvChange::NewRemote, - (None, None) => EnvChange::CreateBoth, - }; + // Env Change needs to be updated + let check = determine_environment(staging_metadata, remote_metadata); // flags for if metadata needs to be synced let mut overwrite_staging = false; @@ -121,6 +114,12 @@ pub async fn resolve_parseable_metadata() -> Result { // overwrite staging anyways so that it matches remote in case of any divergence overwrite_staging = true; + if CONFIG.parseable.mode == Mode::All { + standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here")) + .map_err(|err| { + ObjectStorageError::Custom(err.to_string()) + })?; + } Ok(metadata) }, EnvChange::NewRemote => { @@ -134,8 +133,22 @@ pub async fn resolve_parseable_metadata() -> Result overwrite_remote = true, - _ => { + Mode::All => { + standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here")) + .map_err(|err| { + ObjectStorageError::Custom(err.to_string()) + })?; + overwrite_remote = true; + }, + Mode::Query => { + overwrite_remote = true; + metadata.server_mode = CONFIG.parseable.mode.to_string(); + metadata.staging = CONFIG.staging_dir().to_path_buf(); + }, + Mode::Ingest => { + // if ingest server is started fetch the metadata from remote + // update the server mode for local metadata + metadata.server_mode = CONFIG.parseable.mode.to_string(); metadata.staging = CONFIG.staging_dir().to_path_buf(); }, } @@ -173,6 +186,32 @@ pub async fn resolve_parseable_metadata() -> Result, + remote_metadata: Option, +) -> EnvChange { + match (staging_metadata, remote_metadata) { + (Some(staging), Some(remote)) => { + // if both staging and remote have same deployment id + if staging.deployment_id == remote.deployment_id { + EnvChange::None(remote) + } else if Mode::from_string(&remote.server_mode).unwrap() == Mode::All + && (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest) + { + // if you are switching to distributed mode from standalone mode + // it will create a new staging rather than a new remote + EnvChange::NewStaging(remote) + } else { + // it is a new remote + EnvChange::NewRemote + } + } + (None, Some(remote)) => EnvChange::NewStaging(remote), + (Some(_), None) => EnvChange::NewRemote, + (None, None) => EnvChange::CreateBoth, + } +} + // variant contain remote metadata #[derive(Debug, Clone, PartialEq, Eq)] pub enum EnvChange { @@ -187,6 +226,16 @@ pub enum EnvChange { CreateBoth, } +fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataError> { + // mode::all -> mode::query | mode::ingest allowed + // but mode::query | mode::ingest -> mode::all not allowed + if remote_server_mode == Mode::Query { + return Err(MetadataError::StandaloneWithDistributed("Starting Standalone Mode is not permitted when Distributed Mode is enabled. Please restart the server with Distributed Mode enabled.".to_string())); + } + + Ok(()) +} + pub fn get_staging_metadata() -> io::Result> { let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let bytes = match fs::read(path) {