Skip to content

Commit

Permalink
feat: add endpoint on Query Server to fetch the cluster info
Browse files Browse the repository at this point in the history
on hitting {url}/api/v1/cluster/info
it would return a json response with the information on the cluster

Return JSON looks like
```
[
    {
        domain_name: "<domain_name>:<port>",
        reachable: bool,
        error: string error message,
        status: string
    },
    .
    .
    .
}
  • Loading branch information
Eshanatnight committed Mar 11, 2024
1 parent b663831 commit d06370c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 11 deletions.
4 changes: 4 additions & 0 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ pub(crate) fn cross_origin_config() -> Cors {
Cors::default().block_on_origin_mismatch(false)
}
}

pub fn base_path_without_preceding_slash() -> String {
base_path().trim_start_matches('/').to_string()
}
84 changes: 73 additions & 11 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*/

use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
use crate::handlers::http::{base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION};
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
use actix_web::http::header;
use actix_web::web;
use actix_web::web::ServiceConfig;
use actix_web::{web, Responder};
use actix_web::{App, HttpServer};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -135,11 +135,17 @@ impl QueryServer {
.service(Server::get_user_webscope())
.service(Server::get_llm_webscope())
.service(Server::get_oauth_webscope(oidc_client))
.service(Server::get_user_role_webscope()),
)
.service(Server::get_user_role_webscope())
.service(Self::get_cluster_info_web_scope()),
)
.service(Server::get_generated());
}

fn get_cluster_info_web_scope() -> actix_web::Scope {
web::scope("/cluster")
.service(web::resource("/info").route(web::get().to(Self::get_cluster_info)))
}

// update the .query.json file and return the new IngesterMetadataArr
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
let store = CONFIG.storage().get_object_store();
Expand All @@ -163,7 +169,7 @@ impl QueryServer {
}

pub async fn check_liveness(domain_name: &str) -> bool {
let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();

let reqw = reqwest::Client::new()
.get(uri)
Expand All @@ -174,6 +180,38 @@ impl QueryServer {
reqw.is_ok()
}

async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
})?;

let mut infos = vec![];

for ingester in ingester_infos {
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
.expect("should always be a valid url");

let reqw = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

infos.push(ClusterInfo::new(
&ingester.domain_name,
reqw.is_ok(),
reqw.as_ref().err().map(|e| e.to_string()),
reqw.ok().map(|r| r.status().to_string()),
));
}

Ok(actix_web::HttpResponse::Ok().json(infos))
}

/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;
Expand Down Expand Up @@ -254,8 +292,8 @@ impl QueryServer {
for ingester in ingester_infos.iter() {
let url = format!(
"{}{}/logstream/{}",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand All @@ -272,8 +310,8 @@ impl QueryServer {
for ingester in ingester_infos {
let url = format!(
"{}{}/logstream/{}",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand Down Expand Up @@ -306,8 +344,8 @@ impl QueryServer {
for ingester in ingester_infos {
let url = format!(
"{}{}/logstream/{}/stats",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand Down Expand Up @@ -615,3 +653,27 @@ impl StorageStats {
}
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
struct ClusterInfo {
domain_name: String,
reachable: bool,
error: Option<String>, // error message if the ingester is not reachable
status: Option<String>, // status message if the ingester is reachable
}

impl ClusterInfo {
fn new(
domain_name: &str,
reachable: bool,
error: Option<String>,
status: Option<String>,
) -> Self {
Self {
domain_name: domain_name.to_string(),
reachable,
error,
status,
}
}
}

0 comments on commit d06370c

Please sign in to comment.