Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add cluster info api end point #699

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ pub(crate) fn cross_origin_config() -> Cors {
Cors::default().block_on_origin_mismatch(false)
}
}

pub fn base_path_without_preceding_slash() -> String {
base_path().trim_start_matches('/').to_string()
}
91 changes: 81 additions & 10 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
*/

use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{
base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION,
};
use crate::rbac::role::Action;
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
use actix_web::http::header;
use actix_web::web;
use actix_web::web::ServiceConfig;
use actix_web::{web, Responder};
use actix_web::{App, HttpServer};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -135,11 +139,22 @@ impl QueryServer {
.service(Server::get_user_webscope())
.service(Server::get_llm_webscope())
.service(Server::get_oauth_webscope(oidc_client))
.service(Server::get_user_role_webscope()),
.service(Server::get_user_role_webscope())
.service(Self::get_cluster_info_web_scope()),
)
.service(Server::get_generated());
}

fn get_cluster_info_web_scope() -> actix_web::Scope {
web::scope("/cluster").service(
web::resource("/info").route(
web::get()
.to(Self::get_cluster_info)
.authorize(Action::ListCluster),
),
)
}

// update the .query.json file and return the new IngesterMetadataArr
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
let store = CONFIG.storage().get_object_store();
Expand All @@ -163,7 +178,7 @@ impl QueryServer {
}

pub async fn check_liveness(domain_name: &str) -> bool {
let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();

let reqw = reqwest::Client::new()
.get(uri)
Expand All @@ -174,6 +189,38 @@ impl QueryServer {
reqw.is_ok()
}

async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
})?;

let mut infos = vec![];

for ingester in ingester_infos {
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
.expect("should always be a valid url");

let reqw = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

infos.push(ClusterInfo::new(
&ingester.domain_name,
reqw.is_ok(),
reqw.as_ref().err().map(|e| e.to_string()),
reqw.ok().map(|r| r.status().to_string()),
));
}

Ok(actix_web::HttpResponse::Ok().json(infos))
}

/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;
Expand Down Expand Up @@ -254,8 +301,8 @@ impl QueryServer {
for ingester in ingester_infos.iter() {
let url = format!(
"{}{}/logstream/{}",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand All @@ -272,8 +319,8 @@ impl QueryServer {
for ingester in ingester_infos {
let url = format!(
"{}{}/logstream/{}",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand Down Expand Up @@ -306,8 +353,8 @@ impl QueryServer {
for ingester in ingester_infos {
let url = format!(
"{}{}/logstream/{}/stats",
ingester.domain_name.to_string().trim_end_matches('/'),
base_path(),
ingester.domain_name,
base_path_without_preceding_slash(),
stream_name
);

Expand Down Expand Up @@ -615,3 +662,27 @@ impl StorageStats {
}
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
struct ClusterInfo {
domain_name: String,
reachable: bool,
error: Option<String>, // error message if the ingester is not reachable
status: Option<String>, // status message if the ingester is reachable
}

impl ClusterInfo {
fn new(
domain_name: &str,
reachable: bool,
error: Option<String>,
status: Option<String>,
) -> Self {
Self {
domain_name: domain_name.to_string(),
reachable,
error,
status,
}
}
}
3 changes: 3 additions & 0 deletions server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum Action {
ListRole,
GetAbout,
QueryLLM,
ListCluster,
All,
}

Expand Down Expand Up @@ -108,6 +109,7 @@ impl RoleBuilder {
| Action::PutAlert
| Action::GetAlert
| Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
Action::ListCluster => Permission::Unit(action),
};
perms.push(perm);
}
Expand Down Expand Up @@ -215,6 +217,7 @@ pub mod model {
Action::GetAlert,
Action::GetAbout,
Action::QueryLLM,
Action::ListCluster,
],
stream: None,
tag: None,
Expand Down
12 changes: 10 additions & 2 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,23 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {

#[inline(always)]
fn schema_path(stream_name: &str) -> RelativePathBuf {
RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])
match CONFIG.parseable.mode {
Mode::Ingest => {
let (ip, port) = get_address();
let file_name = format!(".ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME);

RelativePathBuf::from_iter([stream_name, &file_name])
}
Mode::All | Mode::Query => RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]),
}
}

#[inline(always)]
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
match &CONFIG.parseable.mode {
Mode::Ingest => {
let (ip, port) = get_address();
let file_name = format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
let file_name = format!(".ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
RelativePathBuf::from_iter([stream_name, &file_name])
}
Mode::Query | Mode::All => {
Expand Down
6 changes: 1 addition & 5 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,7 @@ impl ObjectStorage for S3 {
let mut res = vec![];

while let Some(meta) = list_stream.next().await.transpose()? {
let ingester_file = meta
.location
.filename()
.unwrap_or_default()
.contains("ingester");
let ingester_file = meta.location.filename().unwrap().starts_with("ingester");

if !ingester_file {
continue;
Expand Down
Loading