Skip to content

Commit

Permalink
Hot tier standalone (#943)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilsinhaparseable authored Oct 8, 2024
1 parent 2a2d64f commit 57d50a1
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 113 deletions.
106 changes: 105 additions & 1 deletion server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::ingest::create_stream_if_not_exists;
use super::modal::utils::logstream_utils::create_update_stream;
use crate::alerts::Alerts;
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::HotTierManager;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::CONFIG;
Expand Down Expand Up @@ -512,6 +512,110 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
Ok((web::Json(stream_info), StatusCode::OK))
}

pub async fn put_stream_hot_tier(
req: HttpRequest,
body: web::Json<serde_json::Value>,
) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
msg: "Hot tier can not be updated for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

let body = body.into_inner();
let mut hottier: StreamHotTier = match serde_json::from_value(body) {
Ok(hottier) => hottier,
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
};

validator::hot_tier(&hottier.size.to_string())?;

STREAM_INFO.set_hot_tier(&stream_name, true)?;
if let Some(hot_tier_manager) = HotTierManager::global() {
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some(existing_hot_tier_used_size.to_string());
hottier.available_size = Some(hottier.size.clone());
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = Some(true);
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;
}

Ok((
format!("hot tier set for stream {stream_name}"),
StatusCode::OK,
))
}

pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
msg: format!("hot tier not initialised for stream {}", stream_name),
status: (StatusCode::BAD_REQUEST),
})
}
}

pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
msg: "Hot tier can not be deleted for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.delete_hot_tier(&stream_name).await?;
}
Ok((
format!("hot tier deleted for stream {stream_name}"),
StatusCode::OK,
))
}

pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
if let Ok(stream_exists) =
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
Expand Down
107 changes: 1 addition & 106 deletions server/src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ use crate::{
logstream::{error::StreamError, get_stats_date},
modal::utils::logstream_utils::create_update_stream,
},
hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION},
hottier::HotTierManager,
metadata::{self, STREAM_INFO},
option::CONFIG,
stats::{self, Stats},
storage::{StorageDir, StreamType},
validator,
};

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Expand Down Expand Up @@ -218,107 +217,3 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea
let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
Ok((web::Json(cache_enabled), StatusCode::OK))
}

pub async fn put_stream_hot_tier(
req: HttpRequest,
body: web::Json<serde_json::Value>,
) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
msg: "Hot tier can not be updated for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

let body = body.into_inner();
let mut hottier: StreamHotTier = match serde_json::from_value(body) {
Ok(hottier) => hottier,
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
};

validator::hot_tier(&hottier.size.to_string())?;

STREAM_INFO.set_hot_tier(&stream_name, true)?;
if let Some(hot_tier_manager) = HotTierManager::global() {
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some(existing_hot_tier_used_size.to_string());
hottier.available_size = Some(hottier.size.clone());
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = Some(true);
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;
}

Ok((
format!("hot tier set for stream {stream_name}"),
StatusCode::OK,
))
}

pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
msg: format!("hot tier not initialised for stream {}", stream_name),
status: (StatusCode::BAD_REQUEST),
})
}
}

pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
msg: "Hot tier can not be deleted for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.delete_hot_tier(&stream_name).await?;
}
Ok((
format!("hot tier deleted for stream {stream_name}"),
StatusCode::OK,
))
}
6 changes: 3 additions & 3 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,17 @@ impl QueryServer {
// PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream
.route(
web::put()
.to(querier_logstream::put_stream_hot_tier)
.to(logstream::put_stream_hot_tier)
.authorize_for_stream(Action::PutHotTierEnabled),
)
.route(
web::get()
.to(querier_logstream::get_stream_hot_tier)
.to(logstream::get_stream_hot_tier)
.authorize_for_stream(Action::GetHotTierEnabled),
)
.route(
web::delete()
.to(querier_logstream::delete_stream_hot_tier)
.to(logstream::delete_stream_hot_tier)
.authorize_for_stream(Action::DeleteHotTierEnabled),
),
),
Expand Down
5 changes: 5 additions & 0 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::handlers::http::users::dashboards;
use crate::handlers::http::users::filters;
use crate::handlers::http::API_BASE_PATH;
use crate::handlers::http::API_VERSION;
use crate::hottier::HotTierManager;
use crate::localcache::LocalCacheManager;
use crate::metrics;
use crate::migration;
Expand Down Expand Up @@ -554,6 +555,10 @@ impl Server {

storage::retention::load_retention_from_global();

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.download_from_s3()?;
};

let (localsync_handler, mut localsync_outbox, localsync_inbox) =
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
Expand Down
8 changes: 5 additions & 3 deletions server/src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ impl HotTierManager {
pub fn global() -> Option<&'static HotTierManager> {
static INSTANCE: OnceCell<HotTierManager> = OnceCell::new();

let hot_tier_path = CONFIG.parseable.hot_tier_storage_path.as_ref()?;

let hot_tier_path = &CONFIG.parseable.hot_tier_storage_path;
if hot_tier_path.is_none() {
return None;
}
Some(INSTANCE.get_or_init(|| {
let hot_tier_path = hot_tier_path.clone();
let hot_tier_path = hot_tier_path.as_ref().unwrap().clone();
std::fs::create_dir_all(&hot_tier_path).unwrap();
HotTierManager {
filesystem: LocalFileSystem::new(),
Expand Down
9 changes: 9 additions & 0 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ Cloud Native, log analytics platform for modern applications."#,
.exit()
}

if cli.hot_tier_storage_path.is_some() {
create_parseable_cli_command()
.error(
ErrorKind::ValueValidation,
"Cannot use hot tier with local-store subcommand.",
)
.exit()
}

Config {
parseable: cli,
storage: Arc::new(storage),
Expand Down

0 comments on commit 57d50a1

Please sign in to comment.