Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable Standalone Mode after migration to the Distributed Mode #707

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ pub mod error {
fn from(value: MetadataError) -> Self {
match value {
MetadataError::StreamMetaNotFound(s) => StreamError::StreamNotFound(s),
MetadataError::StandaloneWithDistributed(s) => StreamError::Custom {
msg: s,
status: StatusCode::INTERNAL_SERVER_ERROR,
},
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ pub mod error {
pub enum MetadataError {
#[error("Metadata for stream {0} not found. Please create the stream and try again")]
StreamMetaNotFound(String),
#[error("Metadata Error: {0}")]
StandaloneWithDistributed(String),
}

#[derive(Debug, thiserror::Error)]
Expand Down
9 changes: 9 additions & 0 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
let metadata = metadata_migration::v2_v3(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
Some("v3") => {
let mdata = metadata_migration::update_v3(storage_metadata);
put_remote_metadata(&*object_store, &mdata).await?;
}
_ => (),
}
}
Expand All @@ -75,13 +79,18 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
let metadata = metadata_migration::v2_v3(staging_metadata);
put_staging_metadata(config, &metadata)?;
}
Some("v3") => {
let mdata = metadata_migration::update_v3(staging_metadata);
put_staging_metadata(config, &mdata)?;
}
_ => (),
}
}

Ok(())
}

/// run the migration for all streams
pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
let storage = config.storage().get_object_store();
let streams = storage.list_streams().await?;
Expand Down
75 changes: 63 additions & 12 deletions server/src/migration/metadata_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,54 @@
*/

use rand::distributions::DistString;
use serde_json::{Map, Value};
use serde_json::{Map, Value as JsonValue};

pub fn v1_v3(mut storage_metadata: serde_json::Value) -> Value {
use crate::option::CONFIG;

/*
v1
{
"version": "v1",
"mode": "drive"
"user": string,
"staging": "string",
"storage": "string",
"deployment_id": "string"
"stream": string,
"default_role": null
}
*/
pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
*metadata.get_mut("version").unwrap() = Value::String("v3".to_string());
*metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string());
metadata.remove("user");
metadata.remove("stream");
metadata.insert("users".to_string(), Value::Array(vec![]));
metadata.insert("streams".to_string(), Value::Array(vec![]));
metadata.insert("roles".to_string(), Value::Array(vec![]));
metadata.insert("users".to_string(), JsonValue::Array(vec![]));
metadata.insert("streams".to_string(), JsonValue::Array(vec![]));
metadata.insert("roles".to_string(), JsonValue::Array(vec![]));
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
storage_metadata
}

pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
/*
v2
{
"version": "v2",
"users": [
{
"role": ["privilege1", "privilege2", ...]
},
...
]
...
}
*/
pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
*metadata.get_mut("version").unwrap() = Value::String("v3".to_string());
*metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string());
let users = metadata
.get_mut("users")
.expect("users field is present")
Expand All @@ -46,7 +78,7 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
// user is an object
let user = user.as_object_mut().unwrap();
// take out privileges
let Value::Array(privileges) = user.remove("role").expect("role exists for v2") else {
let JsonValue::Array(privileges) = user.remove("role").expect("role exists for v2") else {
panic!("privileges is an arrray")
};

Expand All @@ -55,15 +87,34 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
if !privileges.is_empty() {
let role_name =
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
privileges_map.push((role_name.clone(), Value::Array(privileges)));
roles.push(Value::from(role_name));
privileges_map.push((role_name.clone(), JsonValue::Array(privileges)));
roles.push(JsonValue::from(role_name));
}
user.insert("roles".to_string(), roles.into());
}

metadata.insert(
"roles".to_string(),
Value::Object(Map::from_iter(privileges_map)),
JsonValue::Object(Map::from_iter(privileges_map)),
);
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
storage_metadata
}

// maybe rename
pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
let sm = metadata.get("server_mode");

if sm.is_none() {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
}

storage_metadata
}
19 changes: 17 additions & 2 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,26 @@ pub enum Mode {
impl Mode {
pub fn to_str(&self) -> &str {
match self {
Mode::Query => "Query Server",
Mode::Ingest => "Ingest Server",
Mode::Query => "Query",
Mode::Ingest => "Ingest",
Mode::All => "All",
}
}

pub fn from_string(mode: &str) -> Result<Self, String> {
match mode {
"Query" => Ok(Mode::Query),
"Ingest" => Ok(Mode::Ingest),
"All" => Ok(Mode::All),
x => Err(format!("Invalid mode: {}", x)),
}
}
}

impl ToString for Mode {
fn to_string(&self) -> String {
self.to_str().to_string()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
Expand Down
77 changes: 63 additions & 14 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use once_cell::sync::OnceCell;
use std::io;

use crate::{
metadata::error::stream_info::MetadataError,
option::{Mode, CONFIG, JOIN_COMMUNITY},
rbac::{role::model::DefaultPrivilege, user::User},
storage::ObjectStorageError,
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct StorageMetadata {
pub deployment_id: uid::Uid,
pub users: Vec<User>,
pub streams: Vec<String>,
pub server_mode: String,
#[serde(default)]
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
#[serde(default)]
Expand All @@ -69,6 +71,7 @@ impl StorageMetadata {
staging: CONFIG.staging_dir().to_path_buf(),
storage: CONFIG.storage().get_endpoint(),
deployment_id: uid::gen(),
server_mode: CONFIG.parseable.mode.to_string(),
users: Vec::new(),
streams: Vec::new(),
roles: HashMap::default(),
Expand Down Expand Up @@ -100,18 +103,8 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
let storage = CONFIG.storage().get_object_store();
let remote_metadata = storage.get_metadata().await?;

let check = match (staging_metadata, remote_metadata) {
(Some(staging), Some(remote)) => {
if staging.deployment_id == remote.deployment_id {
EnvChange::None(remote)
} else {
EnvChange::NewRemote
}
}
(None, Some(remote)) => EnvChange::NewStaging(remote),
(Some(_), None) => EnvChange::NewRemote,
(None, None) => EnvChange::CreateBoth,
};
// Env Change needs to be updated
let check = determine_environment(staging_metadata, remote_metadata);

// flags for if metadata needs to be synced
let mut overwrite_staging = false;
Expand All @@ -121,6 +114,12 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
EnvChange::None(metadata) => {
// overwrite staging anyways so that it matches remote in case of any divergence
overwrite_staging = true;
if CONFIG.parseable.mode == Mode::All {
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
}
Ok(metadata)
},
EnvChange::NewRemote => {
Expand All @@ -134,8 +133,22 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
// overwrite remote in all and query mode
// because staging dir has changed.
match CONFIG.parseable.mode {
Mode::All | Mode::Query => overwrite_remote = true,
_ => {
Mode::All => {
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
overwrite_remote = true;
},
Mode::Query => {
overwrite_remote = true;
metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.staging = CONFIG.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.staging = CONFIG.staging_dir().to_path_buf();
},
}
Expand Down Expand Up @@ -173,6 +186,32 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
Ok(metadata)
}

fn determine_environment(
staging_metadata: Option<StorageMetadata>,
remote_metadata: Option<StorageMetadata>,
) -> EnvChange {
match (staging_metadata, remote_metadata) {
(Some(staging), Some(remote)) => {
// if both staging and remote have same deployment id
if staging.deployment_id == remote.deployment_id {
EnvChange::None(remote)
} else if Mode::from_string(&remote.server_mode).unwrap() == Mode::All
&& (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest)
{
// if you are switching to distributed mode from standalone mode
// it will create a new staging rather than a new remote
EnvChange::NewStaging(remote)
} else {
// it is a new remote
EnvChange::NewRemote
}
}
(None, Some(remote)) => EnvChange::NewStaging(remote),
(Some(_), None) => EnvChange::NewRemote,
(None, None) => EnvChange::CreateBoth,
}
}

// variant contain remote metadata
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnvChange {
Expand All @@ -187,6 +226,16 @@ pub enum EnvChange {
CreateBoth,
}

fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataError> {
// mode::all -> mode::query | mode::ingest allowed
// but mode::query | mode::ingest -> mode::all not allowed
if remote_server_mode == Mode::Query {
return Err(MetadataError::StandaloneWithDistributed("Starting Standalone Mode is not permitted when Distributed Mode is enabled. Please restart the server with Distributed Mode enabled.".to_string()));
}

Ok(())
}

pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
let bytes = match fs::read(path) {
Expand Down
Loading