Commit 5fc46ab: updates
parmesant committed Sep 26, 2024 (1 parent: 89ed670)
Showing 22 changed files with 206 additions and 258 deletions.
52 changes: 6 additions & 46 deletions server/src/handlers/http/cluster/mod.rs
@@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta";

const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

pub async fn sync_cache_with_ingestors(
url: &str,
ingestor: IngestorMetadata,
body: bool,
) -> Result<(), StreamError> {
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let request_body: Bytes = Bytes::from(body.to_string());
let client = reqwest::Client::new();
let resp = client
.put(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
.body(request_body)
.send()
.await
.map_err(|err| {
// log the error and return a custom error
log::error!(
"Fatal: failed to set cache: {}\n Error: {:?}",
ingestor.domain_name,
err
);
StreamError::Network(err)
})?;

// if the response is not successful, log the error and return a custom error
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
log::error!(
"failed to set cache: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.text().await
);
}

Ok(())
}

// forward the create/update stream request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
@@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors(
continue;
}
let url = format!(
"{}{}/logstream/{}",
"{}{}/logstream/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
@@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors(
continue;
}
let url = format!(
"{}{}/user/{}/role",
"{}{}/user/{}/role/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
@@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
continue;
}
let url = format!(
"{}{}/user/{}",
"{}{}/user/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
@@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors(
continue;
}
let url = format!(
"{}{}/user/{}",
"{}{}/user/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
@@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
continue;
}
let url = format!(
"{}{}/user/{}/generate-new-password",
"{}{}/user/{}/generate-new-password/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
@@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors(
continue;
}
let url = format!(
"{}{}/role/{}",
"{}{}/role/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
name
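Note: every forwarding endpoint in this module now targets a dedicated `/sync` route on the ingestors (logstream, user, role, password reset), separating cluster-internal sync traffic from the public API paths. A minimal sketch of the forwarding call, assuming reqwest; the helper name and arguments are illustrative, not from this commit:

```rust
use reqwest::header;

// Hedged sketch of the PUT forwarding used throughout this module: the
// request targets the ingestor's dedicated `/sync` route and carries the
// ingestor's auth token.
async fn forward_put(url: &str, token: &str, body: String) -> reqwest::Result<()> {
    let resp = reqwest::Client::new()
        .put(url)
        .header(header::CONTENT_TYPE, "application/json")
        .header(header::AUTHORIZATION, token)
        .body(body)
        .send()
        .await?;
    // The real handlers log non-2xx responses and continue; this sketch
    // surfaces them as errors instead.
    resp.error_for_status()?;
    Ok(())
}
```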
7 changes: 2 additions & 5 deletions server/src/handlers/http/ingest.rs
@@ -18,17 +18,15 @@

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::otel;
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
use super::otel;
use crate::event::{
self,
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::{
LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY,
};
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{self, STREAM_INFO};
@@ -141,7 +139,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
Ok(HttpResponse::Ok().finish())
}


// Handler for POST /api/v1/logstream/{logstream}
// only ingests events into the specified logstream
// fails if the logstream does not exist
111 changes: 12 additions & 99 deletions server/src/handlers/http/logstream.rs
@@ -19,18 +19,15 @@
use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
INTERNAL_STREAM_NAME,
};
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
use super::ingest::create_stream_if_not_exists;
use super::modal::utils::logstream_utils::create_update_stream;
use crate::alerts::Alerts;
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::hottier::HotTierManager;
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::{Mode, CONFIG};
use crate::option::CONFIG;
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::StreamType;
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
@@ -47,7 +44,6 @@ use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
use http::{HeaderName, HeaderValue};
use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
@@ -59,7 +55,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

let objectstore = CONFIG.storage().get_object_store();

objectstore.delete_stream(&stream_name).await?;
@@ -85,7 +81,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {

for ingestor in ingestor_metadata {
let url = format!(
"{}{}/logstream/{}",
"{}{}/logstream/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
@@ -290,13 +286,8 @@ pub async fn put_retention(
pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

match CONFIG.parseable.mode {
Mode::Ingest | Mode::All => {
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
}
_ => {}
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}

let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
@@ -310,61 +301,11 @@ pub async fn put_enable_cache(
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let storage = CONFIG.storage().get_object_store();

match CONFIG.parseable.mode {
Mode::Query => {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::from(err)
})?;
for ingestor in ingestor_metadata {
let url = format!(
"{}{}/logstream/{}/cache",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);

super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
}
}
Mode::Ingest => {
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let check = storage
.list_streams()
.await?
.iter()
.map(|stream| stream.name.clone())
.contains(&stream_name);

if !check {
log::error!("Stream {} not found", stream_name.clone());
return Err(StreamError::StreamNotFound(stream_name.clone()));
}
metadata::STREAM_INFO
.upsert_stream_info(
&*storage,
LogStream {
name: stream_name.clone().to_owned(),
},
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
}
Mode::All => {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
}
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
let enable_cache = body.into_inner();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
@@ -614,34 +555,6 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
Ok((web::Json(stream_info), StatusCode::OK))
}

pub async fn put_stream_hot_tier(
_req: HttpRequest,
_body: web::Json<serde_json::Value>,
) -> Result<(), StreamError> {
Err(StreamError::Custom {
msg: "Hot tier can only be enabled in query mode".to_string(),
status: StatusCode::BAD_REQUEST,
})
}

pub async fn get_stream_hot_tier(
_req: HttpRequest
) -> Result<(), StreamError> {
Err(StreamError::Custom {
msg: "Hot tier can only be enabled in query mode".to_string(),
status: StatusCode::BAD_REQUEST,
})
}

pub async fn delete_stream_hot_tier(
_req: HttpRequest
) -> Result<(), StreamError> {
Err(StreamError::Custom {
msg: "Hot tier can only be enabled in query mode".to_string(),
status: StatusCode::BAD_REQUEST,
})
}

pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
if let Ok(stream_exists) =
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
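Note: the query-mode-only cache branches and the hot-tier stubs are dropped from this file, and stream deletion keeps fanning out to each ingestor, now via the `/logstream/{stream}/sync` route. A standalone sketch of that fan-out, with a hypothetical `Ingestor` struct standing in for the crate's `IngestorMetadata`:

```rust
use reqwest::header;

// Hypothetical stand-in for IngestorMetadata; only the fields the loop needs.
struct Ingestor {
    domain_name: String,
    token: String,
}

// Sketch of the delete fan-out; the real handler also checks liveness first
// and maps failures into StreamError variants.
async fn sync_delete(base: &str, stream: &str, ingestors: &[Ingestor]) -> reqwest::Result<()> {
    let client = reqwest::Client::new();
    for ing in ingestors {
        let url = format!("{}{}/logstream/{}/sync", ing.domain_name, base, stream);
        client
            .delete(&url)
            .header(header::AUTHORIZATION, ing.token.as_str())
            .send()
            .await?
            .error_for_status()?;
    }
    Ok(())
}
```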
16 changes: 12 additions & 4 deletions server/src/handlers/http/modal/ingest/ingester_logstream.rs
@@ -3,7 +3,16 @@ use bytes::Bytes;
use http::StatusCode;
use itertools::Itertools;

use crate::{event, handlers::http::{logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream}, metadata::{self, STREAM_INFO}, option::CONFIG, stats, storage::LogStream};
use crate::{
event,
handlers::http::{
logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream,
},
metadata::{self, STREAM_INFO},
option::CONFIG,
stats,
storage::LogStream,
};

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -35,7 +44,6 @@ pub async fn put_enable_cache(
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let storage = CONFIG.storage().get_object_store();


if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
@@ -61,7 +69,7 @@
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;

let enable_cache = body.into_inner();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.cache_enabled = enable_cache;
@@ -85,4 +93,4 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea

let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
Ok((web::Json(cache_enabled), StatusCode::OK))
}
}
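Note: the ingestor-side handler keeps the existence check against object storage, using Itertools::contains over the listed stream names before upserting stream info. A tiny self-contained illustration of that check (the helper name is hypothetical):

```rust
use itertools::Itertools;

// Minimal illustration of the Itertools::contains pattern the handler runs
// against the stream names returned by object storage.
fn stream_listed(listed: &[String], target: &str) -> bool {
    listed.iter().map(|s| s.as_str()).contains(&target)
}

fn main() {
    let listed = vec!["app-logs".to_string(), "audit".to_string()];
    assert!(stream_listed(&listed, "audit"));
    assert!(!stream_listed(&listed, "missing"));
}
```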
14 changes: 10 additions & 4 deletions server/src/handlers/http/modal/ingest/ingester_rbac.rs
@@ -3,8 +3,14 @@ use std::collections::HashSet;
use actix_web::{web, Responder};
use tokio::sync::Mutex;

use crate::{handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError}, rbac::{user::{self, User as ParseableUser}, Users}, storage};

use crate::{
handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
rbac::{
user::{self, User as ParseableUser},
Users,
},
storage,
};

// async aware lock for updating storage metadata and user map atomically
static UPDATE_LOCK: Mutex<()> = Mutex::const_new(());
@@ -87,7 +93,7 @@ pub async fn post_gen_password(username: web::Path<String>) -> Result<impl Respo
let username = username.into_inner();
let mut new_hash = String::default();
let mut metadata = get_metadata().await?;

let _ = storage::put_staging_metadata(&metadata);
if let Some(user) = metadata
.users
Expand All @@ -105,4 +111,4 @@ pub async fn post_gen_password(username: web::Path<String>) -> Result<impl Respo
Users.change_password_hash(&username, &new_hash);

Ok("Updated")
}
}
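Note: post_gen_password now persists staging metadata (storage::put_staging_metadata) before mutating the in-memory user map, all under the const-initialized UPDATE_LOCK. A small sketch of that async-aware lock pattern, assuming tokio; the function body is illustrative:

```rust
use tokio::sync::Mutex;

// Const-initialized async lock, as in the handler: serializes concurrent
// metadata updates without blocking the executor thread.
static UPDATE_LOCK: Mutex<()> = Mutex::const_new(());

async fn update_metadata_atomically() {
    let _guard = UPDATE_LOCK.lock().await;
    // 1. persist the updated metadata to staging storage
    // 2. only then mutate the in-memory map
    // `_guard` drops here, releasing the lock.
}
```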
12 changes: 7 additions & 5 deletions server/src/handlers/http/modal/ingest/ingester_role.rs
@@ -1,8 +1,11 @@
use actix_web::{web, HttpResponse, Responder};
use bytes::Bytes;

use crate::{handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, rbac::{map::mut_roles, role::model::DefaultPrivilege}, storage};

use crate::{
handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError},
rbac::{map::mut_roles, role::model::DefaultPrivilege},
storage,
};

// Handler for PUT /api/v1/role/{name}
// Creates a new role or update existing one
@@ -11,10 +14,9 @@ pub async fn put(name: web::Path<String>, body: Bytes) -> Result<impl Responder,
let privileges = serde_json::from_slice::<Vec<DefaultPrivilege>>(&body)?;
let mut metadata = get_metadata().await?;
metadata.roles.insert(name.clone(), privileges.clone());

let _ = storage::put_staging_metadata(&metadata);
mut_roles().insert(name.clone(), privileges.clone());


Ok(HttpResponse::Ok().finish())
}
}
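Note: the role PUT handler follows the same ordering: insert into the fetched metadata, write it to staging, then update mut_roles(). The body is parsed with serde_json; below is a hypothetical mini-model of that deserialization step (the real type is DefaultPrivilege in rbac::role::model, whose variants are not shown in this diff):

```rust
use serde::Deserialize;

// Hypothetical stand-in for DefaultPrivilege, only to show the shape of the
// `serde_json::from_slice::<Vec<_>>` call in `put`.
#[derive(Debug, Deserialize)]
#[serde(tag = "privilege", rename_all = "lowercase")]
enum Privilege {
    Admin,
    Editor,
}

fn parse_privileges(body: &[u8]) -> serde_json::Result<Vec<Privilege>> {
    serde_json::from_slice(body)
}

fn main() {
    let body = br#"[{"privilege": "admin"}, {"privilege": "editor"}]"#;
    let parsed = parse_privileges(body).expect("valid role body");
    println!("{parsed:?}");
}
```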