1818 */
1919
2020use crate :: about:: { current, platform} ;
21- use crate :: option:: CONFIG ;
21+ use crate :: handlers:: http:: cluster:: utils:: check_liveness;
22+ use crate :: handlers:: http:: { base_path_without_preceding_slash, cluster} ;
23+ use crate :: option:: { Mode , CONFIG } ;
2224use crate :: storage;
2325use crate :: { metadata, stats} ;
2426
27+ use actix_web:: { web, HttpRequest , Responder } ;
2528use chrono:: { DateTime , Utc } ;
2629use clokwerk:: { AsyncScheduler , Interval } ;
30+ use http:: header;
2731use once_cell:: sync:: Lazy ;
2832use serde:: { Deserialize , Serialize } ;
2933use serde_json:: Value ;
@@ -56,14 +60,21 @@ pub struct Report {
5660 cpu_count : usize ,
5761 memory_total_bytes : u64 ,
5862 platform : String ,
59- mode : String ,
63+ storage_mode : String ,
64+ server_mode : String ,
6065 version : String ,
6166 commit_hash : String ,
67+ active_ingesters : u64 ,
68+ inactive_ingesters : u64 ,
69+ stream_count : usize ,
70+ total_events_count : u64 ,
71+ total_json_bytes : u64 ,
72+ total_parquet_bytes : u64 ,
6273 metrics : HashMap < String , Value > ,
6374}
6475
6576impl Report {
66- pub fn new ( ) -> Self {
77+ pub async fn new ( ) -> Self {
6778 let mut upt: f64 = 0.0 ;
6879 if let Ok ( uptime) = uptime_lib:: get ( ) {
6980 upt = uptime. as_secs_f64 ( ) ;
@@ -80,6 +91,7 @@ impl Report {
8091 cpu_count = info. cpus ( ) . len ( ) ;
8192 mem_total = info. total_memory ( ) ;
8293 }
94+ let ingester_metrics = fetch_ingesters_metrics ( ) . await ;
8395
8496 Self {
8597 deployment_id : storage:: StorageMetadata :: global ( ) . deployment_id ,
@@ -90,10 +102,17 @@ impl Report {
90102 cpu_count,
91103 memory_total_bytes : mem_total,
92104 platform : platform ( ) . to_string ( ) ,
93- mode : CONFIG . get_storage_mode_string ( ) . to_string ( ) ,
105+ storage_mode : CONFIG . get_storage_mode_string ( ) . to_string ( ) ,
106+ server_mode : CONFIG . parseable . mode . to_string ( ) ,
94107 version : current ( ) . released_version . to_string ( ) ,
95108 commit_hash : current ( ) . commit_hash ,
96- metrics : build_metrics ( ) ,
109+ active_ingesters : ingester_metrics. 0 ,
110+ inactive_ingesters : ingester_metrics. 1 ,
111+ stream_count : ingester_metrics. 2 ,
112+ total_events_count : ingester_metrics. 3 ,
113+ total_json_bytes : ingester_metrics. 4 ,
114+ total_parquet_bytes : ingester_metrics. 5 ,
115+ metrics : build_metrics ( ) . await ,
97116 }
98117 }
99118
@@ -103,6 +122,12 @@ impl Report {
103122 }
104123}
105124
125+ /// build the node metrics for the node ingester endpoint
126+ pub async fn get_analytics ( _: HttpRequest ) -> impl Responder {
127+ let json = NodeMetrics :: build ( ) ;
128+ web:: Json ( json)
129+ }
130+
106131fn total_streams ( ) -> usize {
107132 metadata:: STREAM_INFO . list_streams ( ) . len ( )
108133}
@@ -123,25 +148,65 @@ fn total_event_stats() -> (u64, u64, u64) {
123148 ( total_events, total_json_bytes, total_parquet_bytes)
124149}
125150
126- fn build_metrics ( ) -> HashMap < String , Value > {
151+ async fn fetch_ingesters_metrics ( ) -> ( u64 , u64 , usize , u64 , u64 , u64 ) {
152+ let event_stats = total_event_stats ( ) ;
153+ let mut node_metrics =
154+ NodeMetrics :: new ( total_streams ( ) , event_stats. 0 , event_stats. 1 , event_stats. 2 ) ;
155+
156+ let mut vec = vec ! [ ] ;
157+ let mut active_ingesters = 0u64 ;
158+ let mut offline_ingesters = 0u64 ;
159+ if CONFIG . parseable . mode == Mode :: Query {
160+ // send analytics for ingest servers
161+
162+ // ingester infos should be valid here, if not some thing is wrong
163+ let ingester_infos = cluster:: get_ingester_info ( ) . await . unwrap ( ) ;
164+
165+ for im in ingester_infos {
166+ if !check_liveness ( & im. domain_name ) . await {
167+ offline_ingesters += 1 ;
168+ continue ;
169+ }
170+
171+ let uri = url:: Url :: parse ( & format ! (
172+ "{}{}/analytics" ,
173+ im. domain_name,
174+ base_path_without_preceding_slash( )
175+ ) )
176+ . expect ( "Should be a valid URL" ) ;
177+
178+ let resp = reqwest:: Client :: new ( )
179+ . get ( uri)
180+ . header ( header:: AUTHORIZATION , im. token . clone ( ) )
181+ . header ( header:: CONTENT_TYPE , "application/json" )
182+ . send ( )
183+ . await
184+ . unwrap ( ) ; // should respond
185+
186+ let data = serde_json:: from_slice :: < NodeMetrics > ( & resp. bytes ( ) . await . unwrap ( ) ) . unwrap ( ) ;
187+ vec. push ( data) ;
188+ active_ingesters += 1 ;
189+ }
190+
191+ node_metrics. accumulate ( & mut vec) ;
192+ }
193+
194+ (
195+ active_ingesters,
196+ offline_ingesters,
197+ node_metrics. stream_count ,
198+ node_metrics. total_events_count ,
199+ node_metrics. total_json_bytes ,
200+ node_metrics. total_parquet_bytes ,
201+ )
202+ }
203+
204+ async fn build_metrics ( ) -> HashMap < String , Value > {
127205 // sysinfo refreshed in previous function
128206 // so no need to refresh again
129207 let sys = SYS_INFO . lock ( ) . unwrap ( ) ;
130208
131209 let mut metrics = HashMap :: new ( ) ;
132- metrics. insert ( "stream_count" . to_string ( ) , total_streams ( ) . into ( ) ) ;
133-
134- // total_event_stats returns event count, json bytes, parquet bytes in that order
135- metrics. insert (
136- "total_events_count" . to_string ( ) ,
137- total_event_stats ( ) . 0 . into ( ) ,
138- ) ;
139- metrics. insert ( "total_json_bytes" . to_string ( ) , total_event_stats ( ) . 1 . into ( ) ) ;
140- metrics. insert (
141- "total_parquet_bytes" . to_string ( ) ,
142- total_event_stats ( ) . 2 . into ( ) ,
143- ) ;
144-
145210 metrics. insert ( "memory_in_use_bytes" . to_string ( ) , sys. used_memory ( ) . into ( ) ) ;
146211 metrics. insert ( "memory_free_bytes" . to_string ( ) , sys. free_memory ( ) . into ( ) ) ;
147212
@@ -162,7 +227,7 @@ pub fn init_analytics_scheduler() {
162227 scheduler
163228 . every ( ANALYTICS_SEND_INTERVAL_SECONDS )
164229 . run ( move || async {
165- Report :: new ( ) . send ( ) . await ;
230+ Report :: new ( ) . await . send ( ) . await ;
166231 } ) ;
167232
168233 tokio:: spawn ( async move {
@@ -172,3 +237,45 @@ pub fn init_analytics_scheduler() {
172237 }
173238 } ) ;
174239}
240+
241+ #[ derive( Serialize , Deserialize , Default , Debug ) ]
242+ struct NodeMetrics {
243+ stream_count : usize ,
244+ total_events_count : u64 ,
245+ total_json_bytes : u64 ,
246+ total_parquet_bytes : u64 ,
247+ }
248+
249+ impl NodeMetrics {
250+ fn build ( ) -> Self {
251+ let event_stats = total_event_stats ( ) ;
252+ Self {
253+ stream_count : total_streams ( ) ,
254+ total_events_count : event_stats. 0 ,
255+ total_json_bytes : event_stats. 1 ,
256+ total_parquet_bytes : event_stats. 2 ,
257+ }
258+ }
259+
260+ fn new (
261+ stream_count : usize ,
262+ total_events_count : u64 ,
263+ total_json_bytes : u64 ,
264+ total_parquet_bytes : u64 ,
265+ ) -> Self {
266+ Self {
267+ stream_count,
268+ total_events_count,
269+ total_json_bytes,
270+ total_parquet_bytes,
271+ }
272+ }
273+
274+ fn accumulate ( & mut self , other : & mut [ NodeMetrics ] ) {
275+ other. iter ( ) . for_each ( |nm| {
276+ self . total_events_count += nm. total_events_count ;
277+ self . total_json_bytes += nm. total_json_bytes ;
278+ self . total_parquet_bytes += nm. total_parquet_bytes ;
279+ } ) ;
280+ }
281+ }
0 commit comments