From cc259d58d183886116a4e5c04942b0a2751a44fb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 10 Jan 2025 19:42:27 +0530 Subject: [PATCH] refactor and expose ingest server util (#1091) --- src/handlers/http/modal/ingest_server.rs | 165 +++++++++++------------ src/storage/staging.rs | 4 +- 2 files changed, 82 insertions(+), 87 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index dce52a17b..37057966e 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -45,7 +45,6 @@ use crate::{handlers::http::base_path, option::CONFIG}; use actix_web::web; use actix_web::web::resource; use actix_web::Scope; -use anyhow::anyhow; use async_trait::async_trait; use base64::Engine; use bytes::Bytes; @@ -85,14 +84,14 @@ impl ParseableServer for IngestServer { // parseable can't use local storage for persistence when running a distributed setup if CONFIG.get_storage_mode_string() == "Local drive" { return Err(anyhow::Error::msg( - "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", - )); + "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", + )); } // check for querier state. Is it there, or was it there in the past - let parseable_json = self.check_querier_state().await?; + let parseable_json = check_querier_state().await?; // to get the .parseable.json file in staging - self.validate_credentials().await?; + validate_credentials().await?; Ok(parseable_json) } @@ -112,7 +111,7 @@ impl ParseableServer for IngestServer { tokio::spawn(airplane::server()); // set the ingestor metadata - self.set_ingestor_metadata().await?; + set_ingestor_metadata().await?; // Ingestors shouldn't have to deal with OpenId auth flow let app = self.start(prometheus, None); @@ -278,96 +277,92 @@ impl IngestServer { ), ) } +} + +// create the ingestor metadata and put the .ingestor.json file in the object store +pub async fn set_ingestor_metadata() -> anyhow::Result<()> { + let storage_ingestor_metadata = migrate_ingester_metadata().await?; + let store = CONFIG.storage().get_object_store(); + + // find the meta file in staging if not generate new metadata + let resource = INGESTOR_META.clone(); + // use the id that was generated/found in the staging and + // generate the path for the object store + let path = ingestor_metadata_path(None); + + // we are considering that we can always get from object store + if let Some(mut store_data) = storage_ingestor_metadata { + if store_data.domain_name != INGESTOR_META.domain_name { + store_data + .domain_name + .clone_from(&INGESTOR_META.domain_name); + store_data.port.clone_from(&INGESTOR_META.port); - // create the ingestor metadata and put the .ingestor.json file in the object store - async fn set_ingestor_metadata(&self) -> anyhow::Result<()> { - let storage_ingestor_metadata = migrate_ingester_metadata().await?; - let store = CONFIG.storage().get_object_store(); - - // find the meta file in staging if not generate new metadata - let resource = INGESTOR_META.clone(); - // use the id that was generated/found in the staging and - // generate the path for the object store - let path = ingestor_metadata_path(None); - - // we are considering that we can always get from object store - if storage_ingestor_metadata.is_some() { - let mut store_data = storage_ingestor_metadata.unwrap(); - - if store_data.domain_name != INGESTOR_META.domain_name { - store_data - .domain_name - .clone_from(&INGESTOR_META.domain_name); - store_data.port.clone_from(&INGESTOR_META.port); - - let resource = Bytes::from(serde_json::to_vec(&store_data)?); - - // if pushing to object store fails propagate the error - return store - .put_object(&path, resource) - .await - .map_err(|err| anyhow!(err)); - } - } else { - let resource = Bytes::from(serde_json::to_vec(&resource)?); + let resource = Bytes::from(serde_json::to_vec(&store_data)?); + // if pushing to object store fails propagate the error store.put_object(&path, resource).await?; } + } else { + let resource = Bytes::from(serde_json::to_vec(&resource)?); - Ok(()) + store.put_object(&path, resource).await?; } - // check for querier state. Is it there, or was it there in the past - // this should happen before the set the ingestor metadata - async fn check_querier_state(&self) -> anyhow::Result, ObjectStorageError> { - // how do we check for querier state? - // based on the work flow of the system, the querier will always need to start first - // i.e the querier will create the `.parseable.json` file - - let store = CONFIG.storage().get_object_store(); - let path = parseable_json_path(); + Ok(()) +} - let parseable_json = store.get_object(&path).await; - match parseable_json { - Ok(_) => Ok(Some(parseable_json.unwrap())), - Err(_) => Err(ObjectStorageError::Custom( +// check for querier state. Is it there, or was it there in the past +// this should happen before the set the ingestor metadata +async fn check_querier_state() -> anyhow::Result, ObjectStorageError> { + // how do we check for querier state? + // based on the work flow of the system, the querier will always need to start first + // i.e the querier will create the `.parseable.json` file + let parseable_json = CONFIG + .storage() + .get_object_store() + .get_object(&parseable_json_path()) + .await + .map_err(|_| { + ObjectStorageError::Custom( "Query Server has not been started yet. Please start the querier server first." .to_string(), - )), - } - } - - async fn validate_credentials(&self) -> anyhow::Result<()> { - // check if your creds match with others - let store = CONFIG.storage().get_object_store(); - let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); - let ingestor_metadata = store - .get_objects( - Some(&base_path), - Box::new(|file_name| file_name.starts_with("ingestor")), ) - .await?; - if !ingestor_metadata.is_empty() { - let ingestor_metadata_value: Value = - serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json"); - let check = ingestor_metadata_value - .as_object() - .and_then(|meta| meta.get("token")) - .and_then(|token| token.as_str()) - .unwrap(); - - let token = base64::prelude::BASE64_STANDARD.encode(format!( - "{}:{}", - CONFIG.parseable.username, CONFIG.parseable.password - )); - - let token = format!("Basic {}", token); - - if check != token { - return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again.")); - } - } + })?; + + Ok(Some(parseable_json)) +} - Ok(()) +async fn validate_credentials() -> anyhow::Result<()> { + // check if your creds match with others + let store = CONFIG.storage().get_object_store(); + let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); + let ingestor_metadata = store + .get_objects( + Some(&base_path), + Box::new(|file_name| file_name.starts_with("ingestor")), + ) + .await?; + if !ingestor_metadata.is_empty() { + let ingestor_metadata_value: Value = + serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json"); + let check = ingestor_metadata_value + .as_object() + .and_then(|meta| meta.get("token")) + .and_then(|token| token.as_str()) + .unwrap(); + + let token = base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.parseable.username, CONFIG.parseable.password + )); + + let token = format!("Basic {}", token); + + if check != token { + return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again.")); + } } + + Ok(()) } diff --git a/src/storage/staging.rs b/src/storage/staging.rs index e93aa4bfe..9646949c5 100644 --- a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -129,7 +129,7 @@ impl StorageDir { let paths = dir .flatten() .map(|file| file.path()) - .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows"))) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); @@ -199,7 +199,7 @@ impl StorageDir { dir.flatten() .map(|file| file.path()) - .filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet"))) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet"))) .collect() }