diff --git a/server/src/analytics.rs b/server/src/analytics.rs
index e10311469..f4e0988d8 100644
--- a/server/src/analytics.rs
+++ b/server/src/analytics.rs
@@ -18,12 +18,16 @@
  */
 
 use crate::about::{current, platform};
-use crate::option::CONFIG;
+use crate::handlers::http::cluster::utils::check_liveness;
+use crate::handlers::http::{base_path_without_preceding_slash, cluster};
+use crate::option::{Mode, CONFIG};
 use crate::storage;
 use crate::{metadata, stats};
+use actix_web::{web, HttpRequest, Responder};
 use chrono::{DateTime, Utc};
 use clokwerk::{AsyncScheduler, Interval};
+use http::header;
 use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -56,14 +60,21 @@ pub struct Report {
     cpu_count: usize,
     memory_total_bytes: u64,
     platform: String,
-    mode: String,
+    storage_mode: String,
+    server_mode: String,
     version: String,
     commit_hash: String,
+    active_ingesters: u64,
+    inactive_ingesters: u64,
+    stream_count: usize,
+    total_events_count: u64,
+    total_json_bytes: u64,
+    total_parquet_bytes: u64,
     metrics: HashMap<String, Value>,
 }
 
 impl Report {
-    pub fn new() -> Self {
+    pub async fn new() -> Self {
         let mut upt: f64 = 0.0;
         if let Ok(uptime) = uptime_lib::get() {
             upt = uptime.as_secs_f64();
         }
@@ -80,6 +91,7 @@ impl Report {
             cpu_count = info.cpus().len();
             mem_total = info.total_memory();
         }
+        let ingester_metrics = fetch_ingesters_metrics().await;
 
         Self {
             deployment_id: storage::StorageMetadata::global().deployment_id,
@@ -90,10 +102,17 @@ impl Report {
             cpu_count,
             memory_total_bytes: mem_total,
             platform: platform().to_string(),
-            mode: CONFIG.get_storage_mode_string().to_string(),
+            storage_mode: CONFIG.get_storage_mode_string().to_string(),
+            server_mode: CONFIG.parseable.mode.to_string(),
             version: current().released_version.to_string(),
             commit_hash: current().commit_hash,
-            metrics: build_metrics(),
+            active_ingesters: ingester_metrics.0,
+            inactive_ingesters: ingester_metrics.1,
+            stream_count: ingester_metrics.2,
+            total_events_count: ingester_metrics.3,
+            total_json_bytes: ingester_metrics.4,
+            total_parquet_bytes: ingester_metrics.5,
+            metrics: build_metrics().await,
         }
     }
 
@@ -103,6 +122,12 @@ impl Report {
     }
 }
 
+/// Build the node metrics served by the ingester's /analytics endpoint
+pub async fn get_analytics(_: HttpRequest) -> impl Responder {
+    let json = NodeMetrics::build();
+    web::Json(json)
+}
+
 fn total_streams() -> usize {
     metadata::STREAM_INFO.list_streams().len()
 }
@@ -123,25 +148,65 @@ fn total_event_stats() -> (u64, u64, u64) {
     (total_events, total_json_bytes, total_parquet_bytes)
 }
 
-fn build_metrics() -> HashMap<String, Value> {
+async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) {
+    let event_stats = total_event_stats();
+    let mut node_metrics =
+        NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
+
+    let mut vec = vec![];
+    let mut active_ingesters = 0u64;
+    let mut offline_ingesters = 0u64;
+    if CONFIG.parseable.mode == Mode::Query {
+        // send analytics for ingest servers
+
+        // ingester infos should be valid here; if not, something is wrong
+        let ingester_infos = cluster::get_ingester_info().await.unwrap();
+
+        for im in ingester_infos {
+            if !check_liveness(&im.domain_name).await {
+                offline_ingesters += 1;
+                continue;
+            }
+
+            let uri = url::Url::parse(&format!(
+                "{}{}/analytics",
+                im.domain_name,
+                base_path_without_preceding_slash()
+            ))
+            .expect("Should be a valid URL");
+
+            let resp = reqwest::Client::new()
+                .get(uri)
+                .header(header::AUTHORIZATION, im.token.clone())
+                .header(header::CONTENT_TYPE, "application/json")
+                .send()
+                .await
+                .unwrap(); // should respond
+
+            let data =
+                serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
+            vec.push(data);
+            active_ingesters += 1;
+        }
+
+        node_metrics.accumulate(&mut vec);
+    }
+
+    (
+        active_ingesters,
+        offline_ingesters,
+        node_metrics.stream_count,
+        node_metrics.total_events_count,
+        node_metrics.total_json_bytes,
+        node_metrics.total_parquet_bytes,
+    )
+}
+
+async fn build_metrics() -> HashMap<String, Value> {
     // sysinfo refreshed in previous function
     // so no need to refresh again
     let sys = SYS_INFO.lock().unwrap();
     let mut metrics = HashMap::new();
 
-    metrics.insert("stream_count".to_string(), total_streams().into());
-
-    // total_event_stats returns event count, json bytes, parquet bytes in that order
-    metrics.insert(
-        "total_events_count".to_string(),
-        total_event_stats().0.into(),
-    );
-    metrics.insert("total_json_bytes".to_string(), total_event_stats().1.into());
-    metrics.insert(
-        "total_parquet_bytes".to_string(),
-        total_event_stats().2.into(),
-    );
-
     metrics.insert("memory_in_use_bytes".to_string(), sys.used_memory().into());
     metrics.insert("memory_free_bytes".to_string(), sys.free_memory().into());
@@ -162,7 +227,7 @@ pub fn init_analytics_scheduler() {
     scheduler
         .every(ANALYTICS_SEND_INTERVAL_SECONDS)
         .run(move || async {
-            Report::new().send().await;
+            Report::new().await.send().await;
         });
 
     tokio::spawn(async move {
@@ -172,3 +237,45 @@
         }
     });
 }
+
+#[derive(Serialize, Deserialize, Default, Debug)]
+struct NodeMetrics {
+    stream_count: usize,
+    total_events_count: u64,
+    total_json_bytes: u64,
+    total_parquet_bytes: u64,
+}
+
+impl NodeMetrics {
+    fn build() -> Self {
+        let event_stats = total_event_stats();
+        Self {
+            stream_count: total_streams(),
+            total_events_count: event_stats.0,
+            total_json_bytes: event_stats.1,
+            total_parquet_bytes: event_stats.2,
+        }
+    }
+
+    fn new(
+        stream_count: usize,
+        total_events_count: u64,
+        total_json_bytes: u64,
+        total_parquet_bytes: u64,
+    ) -> Self {
+        Self {
+            stream_count,
+            total_events_count,
+            total_json_bytes,
+            total_parquet_bytes,
+        }
+    }
+
+    fn accumulate(&mut self, other: &mut [NodeMetrics]) {
+        other.iter().for_each(|nm| {
+            self.total_events_count += nm.total_events_count;
+            self.total_json_bytes += nm.total_json_bytes;
+            self.total_parquet_bytes += nm.total_parquet_bytes;
+        });
+    }
+}
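Editorial note, not part of the patch: NodeMetrics::accumulate sums the event and byte counters across nodes but leaves stream_count at the local value, presumably because streams are shared cluster-wide and summing per-node counts would double-count them. A minimal test sketch of that behavior (hypothetical; it would sit in a #[cfg(test)] module at the bottom of analytics.rs, where the private NodeMetrics type and its methods are in scope):

#[cfg(test)]
mod tests {
    use super::NodeMetrics;

    #[test]
    fn accumulate_sums_counters_but_not_stream_count() {
        // local node: 4 streams, 100 events, 1_000 JSON bytes, 500 parquet bytes
        let mut local = NodeMetrics::new(4, 100, 1_000, 500);
        let mut remote = vec![
            NodeMetrics::new(4, 50, 600, 300),
            NodeMetrics::new(4, 25, 400, 200),
        ];

        local.accumulate(&mut remote);

        // event and byte counters are summed across all nodes
        assert_eq!(local.total_events_count, 175);
        assert_eq!(local.total_json_bytes, 2_000);
        assert_eq!(local.total_parquet_bytes, 1_000);
        // stream_count keeps the local value; it is deliberately not summed
        assert_eq!(local.stream_count, 4);
    }
}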
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 08665160c..3dc952301 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -138,12 +138,24 @@ impl IngestServer {
             .service(Server::get_query_factory())
             .service(Server::get_ingest_factory())
             .service(Self::logstream_api())
-            .service(Server::get_about_factory()),
+            .service(Server::get_about_factory())
+            .service(Self::analytics_factory()),
         )
         .service(Server::get_liveness_factory())
         .service(Server::get_readiness_factory());
     }
 
+    fn analytics_factory() -> Scope {
+        web::scope("/analytics").service(
+            // GET "/analytics" ==> Get analytics data
+            web::resource("").route(
+                web::get()
+                    .to(analytics::get_analytics)
+                    .authorize(Action::GetAnalytics),
+            ),
+        )
+    }
+
     fn logstream_api() -> Scope {
         web::scope("/logstream")
             .service(
@@ -298,11 +310,6 @@ impl IngestServer {
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync();
 
-        // all internal data structures populated now.
-        // start the analytics scheduler if enabled
-        if CONFIG.parseable.send_analytics {
-            analytics::init_analytics_scheduler();
-        }
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
         tokio::pin!(app);
         loop {
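For context, the /analytics route registered above is what fetch_ingesters_metrics in analytics.rs calls on each live ingester. A standalone sketch of that same call with errors propagated rather than unwrapped (hypothetical helper, not part of the patch; it assumes placement in analytics.rs so the private NodeMetrics type is in scope, a domain_name carrying a trailing slash as in the patch, and reqwest's json feature):

// Hypothetical alternative to the unwrap()-based fetch in the patch:
// retrieve one ingester's NodeMetrics, propagating network and HTTP errors.
async fn fetch_node_analytics(
    domain_name: &str,
    token: &str,
) -> Result<NodeMetrics, reqwest::Error> {
    let uri = format!(
        "{}{}/analytics",
        domain_name,
        base_path_without_preceding_slash()
    );
    reqwest::Client::new()
        .get(uri)
        .header(header::AUTHORIZATION, token)
        .send()
        .await?
        // surface 401/403 rejections from the GetAnalytics authorization check
        .error_for_status()?
        .json::<NodeMetrics>()
        .await
}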
diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs
index 47b34ad41..6636c925c 100644
--- a/server/src/rbac/role.rs
+++ b/server/src/rbac/role.rs
@@ -49,6 +49,7 @@ pub enum Action {
     ListClusterMetrics,
     DeleteIngester,
     All,
+    GetAnalytics,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -102,7 +103,11 @@ impl RoleBuilder {
             | Action::CreateStream
             | Action::DeleteStream
             | Action::GetStream
-            | Action::ListStream => Permission::Unit(action),
+            | Action::ListStream
+            | Action::ListCluster
+            | Action::ListClusterMetrics
+            | Action::DeleteIngester
+            | Action::GetAnalytics => Permission::Unit(action),
             Action::Ingest
             | Action::GetSchema
             | Action::GetStats
@@ -113,9 +118,6 @@
             | Action::PutAlert
             | Action::GetAlert
             | Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
-            Action::ListCluster => Permission::Unit(action),
-            Action::ListClusterMetrics => Permission::Unit(action),
-            Action::DeleteIngester => Permission::Unit(action),
        };
        perms.push(perm);
    }