Skip to content

Commit

Permalink
Disable Standalone Mode after migration to the Distributed Mode (pars…
Browse files Browse the repository at this point in the history
…eablehq#707)

* disable standalone when in distributed mode

impl standalone/distributed check
add migration function - Eshan Chatterjee
update StorageMetadata struct
add StandaloneWithDistributed error in MetadataError
impl ToString for Mode

* fix: double print of error message

* fix: logic to handle server mode if EnvChange None

If the server was restarted with the same staging and in standalone mode
the app was not blocking that case

* Refactor determine_environment function
  • Loading branch information
Eshanatnight committed Apr 1, 2024
1 parent 3d11844 commit 6aba4a5
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 28 deletions.
4 changes: 4 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,10 @@ pub mod error {
fn from(value: MetadataError) -> Self {
match value {
MetadataError::StreamMetaNotFound(s) => StreamError::StreamNotFound(s),
MetadataError::StandaloneWithDistributed(s) => StreamError::Custom {
msg: s,
status: StatusCode::INTERNAL_SERVER_ERROR,
},
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ pub mod error {
pub enum MetadataError {
#[error("Metadata for stream {0} not found. Please create the stream and try again")]
StreamMetaNotFound(String),
#[error("Metadata Error: {0}")]
StandaloneWithDistributed(String),
}

#[derive(Debug, thiserror::Error)]
Expand Down
9 changes: 9 additions & 0 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
let metadata = metadata_migration::v2_v3(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
Some("v3") => {
let mdata = metadata_migration::update_v3(storage_metadata);
put_remote_metadata(&*object_store, &mdata).await?;
}
_ => (),
}
}
Expand All @@ -75,13 +79,18 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
let metadata = metadata_migration::v2_v3(staging_metadata);
put_staging_metadata(config, &metadata)?;
}
Some("v3") => {
let mdata = metadata_migration::update_v3(staging_metadata);
put_staging_metadata(config, &mdata)?;
}
_ => (),
}
}

Ok(())
}

/// run the migration for all streams
pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
let storage = config.storage().get_object_store();
let streams = storage.list_streams().await?;
Expand Down
75 changes: 63 additions & 12 deletions server/src/migration/metadata_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,54 @@
*/

use rand::distributions::DistString;
use serde_json::{Map, Value};
use serde_json::{Map, Value as JsonValue};

pub fn v1_v3(mut storage_metadata: serde_json::Value) -> Value {
use crate::option::CONFIG;

/*
v1
{
"version": "v1",
"mode": "drive"
"user": string,
"staging": "string",
"storage": "string",
"deployment_id": "string"
"stream": string,
"default_role": null
}
*/
pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
*metadata.get_mut("version").unwrap() = Value::String("v3".to_string());
*metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string());
metadata.remove("user");
metadata.remove("stream");
metadata.insert("users".to_string(), Value::Array(vec![]));
metadata.insert("streams".to_string(), Value::Array(vec![]));
metadata.insert("roles".to_string(), Value::Array(vec![]));
metadata.insert("users".to_string(), JsonValue::Array(vec![]));
metadata.insert("streams".to_string(), JsonValue::Array(vec![]));
metadata.insert("roles".to_string(), JsonValue::Array(vec![]));
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
storage_metadata
}

pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
/*
v2
{
"version": "v2",
"users": [
{
"role": ["privilege1", "privilege2", ...]
},
...
]
...
}
*/
pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
*metadata.get_mut("version").unwrap() = Value::String("v3".to_string());
*metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string());
let users = metadata
.get_mut("users")
.expect("users field is present")
Expand All @@ -46,7 +78,7 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
// user is an object
let user = user.as_object_mut().unwrap();
// take out privileges
let Value::Array(privileges) = user.remove("role").expect("role exists for v2") else {
let JsonValue::Array(privileges) = user.remove("role").expect("role exists for v2") else {
panic!("privileges is an arrray")
};

Expand All @@ -55,15 +87,34 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
if !privileges.is_empty() {
let role_name =
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
privileges_map.push((role_name.clone(), Value::Array(privileges)));
roles.push(Value::from(role_name));
privileges_map.push((role_name.clone(), JsonValue::Array(privileges)));
roles.push(JsonValue::from(role_name));
}
user.insert("roles".to_string(), roles.into());
}

metadata.insert(
"roles".to_string(),
Value::Object(Map::from_iter(privileges_map)),
JsonValue::Object(Map::from_iter(privileges_map)),
);
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
storage_metadata
}

// maybe rename
pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
let sm = metadata.get("server_mode");

if sm.is_none() {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
}

storage_metadata
}
19 changes: 17 additions & 2 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,26 @@ pub enum Mode {
impl Mode {
pub fn to_str(&self) -> &str {
match self {
Mode::Query => "Query Server",
Mode::Ingest => "Ingest Server",
Mode::Query => "Query",
Mode::Ingest => "Ingest",
Mode::All => "All",
}
}

pub fn from_string(mode: &str) -> Result<Self, String> {
match mode {
"Query" => Ok(Mode::Query),
"Ingest" => Ok(Mode::Ingest),
"All" => Ok(Mode::All),
x => Err(format!("Invalid mode: {}", x)),
}
}
}

impl ToString for Mode {
fn to_string(&self) -> String {
self.to_str().to_string()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
Expand Down
77 changes: 63 additions & 14 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use once_cell::sync::OnceCell;
use std::io;

use crate::{
metadata::error::stream_info::MetadataError,
option::{Mode, CONFIG, JOIN_COMMUNITY},
rbac::{role::model::DefaultPrivilege, user::User},
storage::ObjectStorageError,
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct StorageMetadata {
pub deployment_id: uid::Uid,
pub users: Vec<User>,
pub streams: Vec<String>,
pub server_mode: String,
#[serde(default)]
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
#[serde(default)]
Expand All @@ -69,6 +71,7 @@ impl StorageMetadata {
staging: CONFIG.staging_dir().to_path_buf(),
storage: CONFIG.storage().get_endpoint(),
deployment_id: uid::gen(),
server_mode: CONFIG.parseable.mode.to_string(),
users: Vec::new(),
streams: Vec::new(),
roles: HashMap::default(),
Expand Down Expand Up @@ -100,18 +103,8 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
let storage = CONFIG.storage().get_object_store();
let remote_metadata = storage.get_metadata().await?;

let check = match (staging_metadata, remote_metadata) {
(Some(staging), Some(remote)) => {
if staging.deployment_id == remote.deployment_id {
EnvChange::None(remote)
} else {
EnvChange::NewRemote
}
}
(None, Some(remote)) => EnvChange::NewStaging(remote),
(Some(_), None) => EnvChange::NewRemote,
(None, None) => EnvChange::CreateBoth,
};
// Env Change needs to be updated
let check = determine_environment(staging_metadata, remote_metadata);

// flags for if metadata needs to be synced
let mut overwrite_staging = false;
Expand All @@ -121,6 +114,12 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
EnvChange::None(metadata) => {
// overwrite staging anyways so that it matches remote in case of any divergence
overwrite_staging = true;
if CONFIG.parseable.mode == Mode::All {
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
}
Ok(metadata)
},
EnvChange::NewRemote => {
Expand All @@ -134,8 +133,22 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
// overwrite remote in all and query mode
// because staging dir has changed.
match CONFIG.parseable.mode {
Mode::All | Mode::Query => overwrite_remote = true,
_ => {
Mode::All => {
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
overwrite_remote = true;
},
Mode::Query => {
overwrite_remote = true;
metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.staging = CONFIG.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.staging = CONFIG.staging_dir().to_path_buf();
},
}
Expand Down Expand Up @@ -173,6 +186,32 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
Ok(metadata)
}

fn determine_environment(
staging_metadata: Option<StorageMetadata>,
remote_metadata: Option<StorageMetadata>,
) -> EnvChange {
match (staging_metadata, remote_metadata) {
(Some(staging), Some(remote)) => {
// if both staging and remote have same deployment id
if staging.deployment_id == remote.deployment_id {
EnvChange::None(remote)
} else if Mode::from_string(&remote.server_mode).unwrap() == Mode::All
&& (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest)
{
// if you are switching to distributed mode from standalone mode
// it will create a new staging rather than a new remote
EnvChange::NewStaging(remote)
} else {
// it is a new remote
EnvChange::NewRemote
}
}
(None, Some(remote)) => EnvChange::NewStaging(remote),
(Some(_), None) => EnvChange::NewRemote,
(None, None) => EnvChange::CreateBoth,
}
}

// variant contain remote metadata
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnvChange {
Expand All @@ -187,6 +226,16 @@ pub enum EnvChange {
CreateBoth,
}

fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataError> {
// mode::all -> mode::query | mode::ingest allowed
// but mode::query | mode::ingest -> mode::all not allowed
if remote_server_mode == Mode::Query {
return Err(MetadataError::StandaloneWithDistributed("Starting Standalone Mode is not permitted when Distributed Mode is enabled. Please restart the server with Distributed Mode enabled.".to_string()));
}

Ok(())
}

pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
let bytes = match fs::read(path) {
Expand Down

0 comments on commit 6aba4a5

Please sign in to comment.