Skip to content

Commit

Permalink
update files in distributed mode to use hash
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Apr 16, 2024
1 parent 7e708bb commit bd647f9
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 62 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
27 changes: 20 additions & 7 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,27 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
if check_liveness(&domain_name).await {
return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
}

let url = Url::parse(&domain_name).unwrap();
let ingestor_meta_filename = ingestor_metadata_path(
url.host_str().unwrap().to_owned(),
url.port().unwrap().to_string(),
)
.to_string();
let object_store = CONFIG.storage().get_object_store();

let ingestor_metadatas = object_store
.get_objects(
Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
Box::new(|file_name| file_name.starts_with("ingestor")),
)
.await?;

let ingestor_metadata = ingestor_metadatas
.iter()
.map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
.collect_vec();

let ingestor_metadata = ingestor_metadata
.iter()
.filter(|elem| elem.domain_name == domain_name)
.collect_vec();

let ingestor_meta_filename =
ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string();
let msg = match object_store
.try_delete_ingestor_meta(ingestor_meta_filename)
.await
Expand Down
38 changes: 14 additions & 24 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::rbac::role::Action;
use crate::storage;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::object_storage::parseable_json_path;
use crate::storage::staging;
use crate::storage::ObjectStorageError;
use crate::sync;

Expand All @@ -36,24 +37,25 @@ use super::ssl_acceptor::get_ssl_acceptor;
use super::IngestorMetadata;
use super::OpenIdClient;
use super::ParseableServer;
use super::DEFAULT_VERSION;

use crate::utils::get_address;
use actix_web::body::MessageBody;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use base64::Engine;
use itertools::Itertools;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use url::Url;

use crate::{
handlers::http::{base_path, cross_origin_config},
option::CONFIG,
};

/// Process-wide ingestor metadata, built lazily on first access.
///
/// The value comes from `staging::get_ingestor_info()`.
///
/// # Panics
///
/// The first access panics with "dir is readable and writeable" when
/// `staging::get_ingestor_info()` does not yield a value.
pub static INGESTOR_META: Lazy<IngestorMetadata> = Lazy::new(|| {
    let loaded = staging::get_ingestor_info();
    loaded.expect("dir is readable and writeable")
});

/// Unit type for the ingest server; it carries no fields of its own.
///
/// `ParseableServer` is implemented for this type further down in the file.
#[derive(Default)]
pub struct IngestServer;

Expand Down Expand Up @@ -102,6 +104,7 @@ impl ParseableServer for IngestServer {
/// implement the init method will just invoke the initialize method
async fn init(&self) -> anyhow::Result<()> {
self.validate()?;

// check for querier state. Is it there, or was it there in the past
self.check_querier_state().await?;
// to get the .parseable.json file in staging
Expand Down Expand Up @@ -181,37 +184,23 @@ impl IngestServer {
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();

let sock = get_address();
let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string());
// find the meta file in staging if not generate new metadata
let resource = INGESTOR_META.clone();
// use the id that was generated/found in the staging and
// generate the path for the object store
let path = ingestor_metadata_path(None);

if store.get_object(&path).await.is_ok() {
println!("ingestor metadata already exists");
log::info!("ingestor metadata already exists");
return Ok(());
};

let scheme = CONFIG.parseable.get_scheme();
let resource = IngestorMetadata::new(
sock.port().to_string(),
CONFIG
.parseable
.domain_address
.clone()
.unwrap_or_else(|| {
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
})
.to_string(),
DEFAULT_VERSION.to_string(),
store.get_bucket_name(),
&CONFIG.parseable.username,
&CONFIG.parseable.password,
);

let resource = serde_json::to_string(&resource)
.unwrap()
.try_into_bytes()
.unwrap();

store.put_object(&path, resource).await?;
store.put_object(&path, resource.clone()).await?;

Ok(())
}
Expand Down Expand Up @@ -270,6 +259,7 @@ impl IngestServer {
}

async fn initialize(&self) -> anyhow::Result<()> {
// ! Undefined and Untested behaviour
if let Some(cache_manager) = LocalCacheManager::global() {
cache_manager
.validate(CONFIG.parseable.local_cache_size)
Expand Down
13 changes: 11 additions & 2 deletions server/src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct IngestorMetadata {
pub domain_name: String,
pub bucket_name: String,
pub token: String,
pub ingestor_id: String,
}

impl IngestorMetadata {
Expand All @@ -71,6 +72,7 @@ impl IngestorMetadata {
bucket_name: String,
username: &str,
password: &str,
ingestor_id: String,
) -> Self {
let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));

Expand All @@ -82,8 +84,13 @@ impl IngestorMetadata {
version,
bucket_name,
token,
ingestor_id,
}
}

/// Returns an owned copy of this ingestor's `ingestor_id`.
///
/// A new `String` is allocated on every call. The `ingestor_id` field is
/// public, so callers that only need to borrow it can read the field directly.
pub fn get_ingestor_id(&self) -> String {
    String::from(self.ingestor_id.as_str())
}
}

#[cfg(test)]
Expand All @@ -102,9 +109,10 @@ mod test {
"somebucket".to_string(),
"admin",
"admin",
"ingestor_id".to_string(),
);

let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#).unwrap();
let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap();

assert_eq!(rhs, lhs);
}
Expand All @@ -118,13 +126,14 @@ mod test {
"somebucket".to_string(),
"admin",
"admin",
"ingestor_id".to_string(),
);

let lhs = serde_json::to_string(&im)
.unwrap()
.try_into_bytes()
.unwrap();
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#
.try_into_bytes()
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ impl QueryServer {
.service(Server::get_llm_webscope())
.service(Server::get_oauth_webscope(oidc_client))
.service(Server::get_user_role_webscope())
.service(Self::get_cluster_info_web_scope()),
.service(Self::get_cluster_web_scope()),
)
.service(Server::get_generated());
}

fn get_cluster_info_web_scope() -> actix_web::Scope {
fn get_cluster_web_scope() -> actix_web::Scope {
web::scope("/cluster")
.service(
// GET "/cluster/info" ==> Get info of the cluster
Expand Down
27 changes: 16 additions & 11 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::{
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};

use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::option::Mode;
use crate::utils::get_address;
use crate::{
Expand Down Expand Up @@ -257,7 +258,7 @@ pub trait ObjectStorage: Sync + 'static {
let stream_metadata = match self.get_object(&stream_json_path(stream_name)).await {
Ok(data) => data,
Err(_) => {
// ! this is hard coded for now
// get the base stream metadata
let bytes = self
.get_object(&RelativePathBuf::from_iter([
stream_name,
Expand Down Expand Up @@ -538,11 +539,9 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
fn schema_path(stream_name: &str) -> RelativePathBuf {
match CONFIG.parseable.mode {
Mode::Ingest => {
let addr = get_address();
let file_name = format!(
".ingestor.{}.{}{}",
addr.ip(),
addr.port(),
".ingestor.{}{}",
INGESTOR_META.ingestor_id.clone(),
SCHEMA_FILE_NAME
);

Expand All @@ -558,11 +557,9 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
match &CONFIG.parseable.mode {
Mode::Ingest => {
let addr = get_address();
let file_name = format!(
".ingestor.{}.{}{}",
addr.ip(),
addr.port(),
".ingestor.{}{}",
INGESTOR_META.get_ingestor_id(),
STREAM_METADATA_FILE_NAME
);
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
Expand All @@ -581,6 +578,7 @@ pub fn parseable_json_path() -> RelativePathBuf {
RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, PARSEABLE_METADATA_FILE_NAME])
}

/// TODO: Needs to be updated for distributed mode
#[inline(always)]
fn alert_json_path(stream_name: &str) -> RelativePathBuf {
RelativePathBuf::from_iter([stream_name, ALERT_FILE_NAME])
Expand All @@ -594,9 +592,16 @@ fn manifest_path(prefix: &str) -> RelativePathBuf {
}

#[inline(always)]
pub fn ingestor_metadata_path(ip: String, port: String) -> RelativePathBuf {
pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf {
if let Some(id) = id {
return RelativePathBuf::from_iter([
PARSEABLE_ROOT_DIRECTORY,
&format!("ingestor.{}.json", id),
]);
}

RelativePathBuf::from_iter([
PARSEABLE_ROOT_DIRECTORY,
&format!("ingestor.{}.{}.json", ip, port),
&format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()),
])
}
Loading

0 comments on commit bd647f9

Please sign in to comment.