Skip to content

Commit

Permalink
feat: Add cluster info api end point (parseablehq#699)
Browse files Browse the repository at this point in the history
* feat: add endpoint on Query Server to fetch the cluster info

on hitting {url}/api/v1/cluster/info
it would return a json response with the information on the cluster

Return JSON looks like
```
[
    {
        domain_name: "<domain_name>:<port>",
        reachable: bool,
        error: string error message,
        status: string
    },
    .
    .
    .

]

* feat: add auth for /cluster/info endpoint

1. Add Action ListCluster
2. Authorize /cluster/info for ListCluster

* fix: get_objects fetch the proper files

When in S3 Mode, get_objects was fetching all the files 
that contained the pattern "ingester".
  • Loading branch information
Eshanatnight committed Apr 1, 2024
1 parent 8b4b77d commit ae61abd
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 17 deletions.
4 changes: 4 additions & 0 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ pub(crate) fn cross_origin_config() -> Cors {
Cors::default().block_on_origin_mismatch(false)
}
}

pub fn base_path_without_preceding_slash() -> String {
base_path().trim_start_matches('/').to_string()
}
91 changes: 81 additions & 10 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
*/

use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{
base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION,
};
use crate::rbac::role::Action;
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
use actix_web::http::header;
use actix_web::web;
use actix_web::web::ServiceConfig;
use actix_web::{web, Responder};
use actix_web::{App, HttpServer};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -135,11 +139,22 @@ impl QueryServer {
.service(Server::get_user_webscope())
.service(Server::get_llm_webscope())
.service(Server::get_oauth_webscope(oidc_client))
.service(Server::get_user_role_webscope()),
.service(Server::get_user_role_webscope())
.service(Self::get_cluster_info_web_scope()),
)
.service(Server::get_generated());
}

fn get_cluster_info_web_scope() -> actix_web::Scope {
web::scope("/cluster").service(
web::resource("/info").route(
web::get()
.to(Self::get_cluster_info)
.authorize(Action::ListCluster),
),
)
}

// update the .query.json file and return the new IngesterMetadataArr
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
let store = CONFIG.storage().get_object_store();
Expand All @@ -163,7 +178,7 @@ impl QueryServer {
}

pub async fn check_liveness(domain_name: &str) -> bool {
let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();

let reqw = reqwest::Client::new()
.get(uri)
Expand All @@ -174,6 +189,38 @@ impl QueryServer {
reqw.is_ok()
}

async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
})?;

let mut infos = vec![];

for ingester in ingester_infos {
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
.expect("should always be a valid url");

let reqw = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

infos.push(ClusterInfo::new(
&ingester.domain_name,
reqw.is_ok(),
reqw.as_ref().err().map(|e| e.to_string()),
reqw.ok().map(|r| r.status().to_string()),
));
}

Ok(actix_web::HttpResponse::Ok().json(infos))
}

/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;
Expand Down Expand Up @@ -254,8 +301,8 @@ impl QueryServer {
for ingester in ingester_infos.iter() {
let url = format!(
"{}{}/logstream/{}",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand All @@ -272,8 +319,8 @@ impl QueryServer {
for ingester in ingester_infos {
let url = format!(
"{}{}/logstream/{}",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand Down Expand Up @@ -306,8 +353,8 @@ impl QueryServer {
for ingester in ingester_infos {
let url = format!(
"{}{}/logstream/{}/stats",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand Down Expand Up @@ -615,3 +662,27 @@ impl StorageStats {
}
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
struct ClusterInfo {
domain_name: String,
reachable: bool,
error: Option<String>, // error message if the ingester is not reachable
status: Option<String>, // status message if the ingester is reachable
}

impl ClusterInfo {
fn new(
domain_name: &str,
reachable: bool,
error: Option<String>,
status: Option<String>,
) -> Self {
Self {
domain_name: domain_name.to_string(),
reachable,
error,
status,
}
}
}
3 changes: 3 additions & 0 deletions server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub enum Action {
ListRole,
GetAbout,
QueryLLM,
ListCluster,
All,
}

Expand Down Expand Up @@ -110,6 +111,7 @@ impl RoleBuilder {
| Action::PutAlert
| Action::GetAlert
| Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
Action::ListCluster => Permission::Unit(action),
};
perms.push(perm);
}
Expand Down Expand Up @@ -220,6 +222,7 @@ pub mod model {
Action::GetAlert,
Action::GetAbout,
Action::QueryLLM,
Action::ListCluster,
],
stream: None,
tag: None,
Expand Down
12 changes: 10 additions & 2 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,23 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {

#[inline(always)]
fn schema_path(stream_name: &str) -> RelativePathBuf {
RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])
match CONFIG.parseable.mode {
Mode::Ingest => {
let (ip, port) = get_address();
let file_name = format!(".ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME);

RelativePathBuf::from_iter([stream_name, &file_name])
}
Mode::All | Mode::Query => RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]),
}
}

#[inline(always)]
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
match &CONFIG.parseable.mode {
Mode::Ingest => {
let (ip, port) = get_address();
let file_name = format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
let file_name = format!(".ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
RelativePathBuf::from_iter([stream_name, &file_name])
}
Mode::Query | Mode::All => {
Expand Down
6 changes: 1 addition & 5 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,7 @@ impl ObjectStorage for S3 {
let mut res = vec![];

while let Some(meta) = list_stream.next().await.transpose()? {
let ingester_file = meta
.location
.filename()
.unwrap_or_default()
.contains("ingester");
let ingester_file = meta.location.filename().unwrap().starts_with("ingester");

if !ingester_file {
continue;
Expand Down

0 comments on commit ae61abd

Please sign in to comment.