diff --git a/influxdb3_telemetry/src/bucket.rs b/influxdb3_telemetry/src/bucket.rs
index a886709b07c..d734e05e0f4 100644
--- a/influxdb3_telemetry/src/bucket.rs
+++ b/influxdb3_telemetry/src/bucket.rs
@@ -42,13 +42,20 @@ impl EventsBucket {
 #[derive(Debug, Default)]
 pub(crate) struct PerMinuteWrites {
     pub lines: Stats,
+    pub total_lines: u64,
     pub size_bytes: Stats,
+    pub total_size_bytes: u64,
 }
 
 impl PerMinuteWrites {
     pub fn add_sample(&mut self, num_lines: usize, size_bytes: usize) -> Option<()> {
-        self.lines.update(num_lines as u64);
-        self.size_bytes.update(size_bytes as u64)?;
+        let new_num_lines = num_lines as u64;
+        self.lines.update(new_num_lines);
+        self.total_lines += new_num_lines;
+
+        let new_size_bytes = size_bytes as u64;
+        self.size_bytes.update(new_size_bytes)?;
+        self.total_size_bytes += new_size_bytes;
         Some(())
     }
 }
@@ -56,11 +63,13 @@ impl PerMinuteWrites {
 #[derive(Debug, Default)]
 pub(crate) struct PerMinuteReads {
     pub num_queries: Stats,
+    pub total_num_queries: u64,
 }
 
 impl PerMinuteReads {
     pub fn add_sample(&mut self) -> Option<()> {
         self.num_queries.update(1);
+        self.total_num_queries += 1;
         Some(())
     }
 }
diff --git a/influxdb3_telemetry/src/metrics.rs b/influxdb3_telemetry/src/metrics.rs
index 6fd324c0234..bd667a0fd7b 100644
--- a/influxdb3_telemetry/src/metrics.rs
+++ b/influxdb3_telemetry/src/metrics.rs
@@ -6,24 +6,35 @@ use crate::{
 #[derive(Debug, Default)]
 pub(crate) struct Writes {
     pub lines: RollingStats,
+    pub total_lines: u64,
     pub size_bytes: RollingStats,
+    pub total_size_bytes: u64,
     // num_writes is just Stats and not RollingStats as we don't
     // aggregate num_writes at the per minute interval, it can
     // just be taken from the events bucket.
     pub num_writes: Stats,
+    pub total_num_writes: u64,
 }
 
 impl Writes {
     pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
+        let num_writes = events_bucket.num_writes as u64;
         self.lines.update(&events_bucket.writes.lines);
         self.size_bytes.update(&events_bucket.writes.size_bytes);
-        self.num_writes.update(events_bucket.num_writes as u64);
+        self.num_writes.update(num_writes);
+        self.total_lines += events_bucket.writes.total_lines;
+        self.total_size_bytes += events_bucket.writes.total_size_bytes;
+        self.total_num_writes += num_writes;
 
         Some(())
     }
 
     pub fn reset(&mut self) {
         self.lines.reset();
         self.size_bytes.reset();
+        self.num_writes.reset();
+        self.total_lines = 0;
+        self.total_size_bytes = 0;
+        self.total_num_writes = 0;
     }
 }
@@ -31,16 +42,19 @@ impl Writes {
 pub(crate) struct Queries {
     // We don't aggregate the num_queries at 1 min intervals
     pub num_queries: Stats,
+    pub total_num_queries: u64,
 }
 
 impl Queries {
     pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
         self.num_queries.update(events_bucket.num_queries as u64);
+        self.total_num_queries += events_bucket.queries.total_num_queries;
 
         Some(())
     }
 
     pub fn reset(&mut self) {
         self.num_queries.reset();
+        self.total_num_queries = 0;
     }
 }
diff --git a/influxdb3_telemetry/src/sampler.rs b/influxdb3_telemetry/src/sampler.rs
index 6600415274a..2f3b68f8a93 100644
--- a/influxdb3_telemetry/src/sampler.rs
+++ b/influxdb3_telemetry/src/sampler.rs
@@ -103,7 +103,7 @@ fn sample_all_metrics(sampler: &mut CpuAndMemorySampler, store: &Arc, telem_sender: &mut Telemetr
     }
     // if we tried sending and failed, we currently still reset the
     // metrics, it is ok to miss few samples
-    store.reset_metrics();
+    store.reset_metrics_1h();
 }
 
 #[cfg(test)]
@@ -183,6 +187,10 @@ mod tests {
             parquet_file_size_mb: 100.0,
             parquet_row_count: 100,
             uptime_secs: 100,
+            write_requests_sum_1h: 200,
+            write_lines_sum_1h: 200,
+            write_mb_sum_1h: 200,
+            query_requests_sum_1h: 200,
         }
     }
 }
diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs
index d266e1d61ca..ff23c68e44a 100644
--- a/influxdb3_telemetry/src/store.rs
+++ b/influxdb3_telemetry/src/store.rs
@@ -92,7 +92,7 @@ impl TelemetryStore {
                 // num::cast probably has had overflow. Best to
                 // reset all metrics to start again
                 warn!("cpu/memory could not be added, resetting metrics");
-                inner_store.reset_metrics();
+                inner_store.reset_metrics_1h();
             });
     }
 
@@ -108,14 +108,14 @@ impl TelemetryStore {
         inner_store.per_minute_events_bucket.update_num_queries();
     }
 
-    pub(crate) fn rollup_events(&self) {
+    pub(crate) fn rollup_events_1m(&self) {
         let mut inner_store = self.inner.lock();
-        inner_store.rollup_reads_and_writes();
+        inner_store.rollup_reads_and_writes_1m();
     }
 
-    pub(crate) fn reset_metrics(&self) {
+    pub(crate) fn reset_metrics_1h(&self) {
         let mut inner_store = self.inner.lock();
-        inner_store.reset_metrics();
+        inner_store.reset_metrics_1h();
     }
 
     pub(crate) fn snapshot(&self) -> TelemetryPayload {
@@ -225,30 +225,36 @@ impl TelemetryStoreInner {
             parquet_file_count: 0,
             parquet_file_size_mb: 0.0,
             parquet_row_count: 0,
+
+            // sums over hour
+            write_requests_sum_1h: self.writes.total_num_writes,
+            write_lines_sum_1h: self.writes.total_lines,
+            write_mb_sum_1h: to_mega_bytes(self.writes.total_size_bytes),
+            query_requests_sum_1h: self.reads.total_num_queries,
         }
     }
 
-    pub(crate) fn rollup_reads_and_writes(&mut self) {
+    pub(crate) fn rollup_reads_and_writes_1m(&mut self) {
         debug!(
             events_summary = ?self.per_minute_events_bucket,
             "Rolling up writes/reads"
         );
-        self.rollup_writes();
-        self.rollup_reads();
+        self.rollup_writes_1m();
+        self.rollup_reads_1m();
         self.per_minute_events_bucket.reset();
     }
 
-    fn rollup_writes(&mut self) {
+    fn rollup_writes_1m(&mut self) {
         let events_summary = &self.per_minute_events_bucket;
         self.writes.add_sample(events_summary);
     }
 
-    fn rollup_reads(&mut self) {
+    fn rollup_reads_1m(&mut self) {
         let events_summary = &self.per_minute_events_bucket;
         self.reads.add_sample(events_summary);
     }
 
-    fn reset_metrics(&mut self) {
+    fn reset_metrics_1h(&mut self) {
         self.cpu.reset();
         self.memory.reset();
         self.writes.reset();
@@ -363,7 +369,7 @@ mod tests {
         store.update_num_queries();
 
         // now rollup reads/writes
-        store.rollup_events();
+        store.rollup_events_1m();
         let snapshot = store.snapshot();
         info!(
             snapshot = ?snapshot,
@@ -373,14 +379,17 @@
         assert_eq!(1, snapshot.write_lines_min_1m);
         assert_eq!(120, snapshot.write_lines_max_1m);
         assert_eq!(56, snapshot.write_lines_avg_1m);
+        assert_eq!(222, snapshot.write_lines_sum_1h);
 
         assert_eq!(0, snapshot.write_mb_min_1m);
         assert_eq!(0, snapshot.write_mb_max_1m);
         assert_eq!(0, snapshot.write_mb_avg_1m);
+        assert_eq!(0, snapshot.write_mb_sum_1h);
 
         assert_eq!(3, snapshot.query_requests_min_1m);
         assert_eq!(3, snapshot.query_requests_max_1m);
         assert_eq!(3, snapshot.query_requests_avg_1m);
+        assert_eq!(3, snapshot.query_requests_sum_1h);
 
         // add more writes after rollup
         store.add_write_metrics(100, 101_024_000);
@@ -392,7 +401,7 @@
         store.update_num_queries();
         store.update_num_queries();
 
-        store.rollup_events();
+        store.rollup_events_1m();
         let snapshot = store.snapshot();
         info!(
             snapshot = ?snapshot,
@@ -401,17 +410,20 @@
         assert_eq!(1, snapshot.write_lines_min_1m);
         assert_eq!(120, snapshot.write_lines_max_1m);
         assert_eq!(56, snapshot.write_lines_avg_1m);
+        assert_eq!(444, snapshot.write_lines_sum_1h);
 
         assert_eq!(0, snapshot.write_mb_min_1m);
         assert_eq!(200, snapshot.write_mb_max_1m);
         assert_eq!(50, snapshot.write_mb_avg_1m);
+        assert_eq!(401, snapshot.write_mb_sum_1h);
 
         assert_eq!(2, snapshot.query_requests_min_1m);
         assert_eq!(3, snapshot.query_requests_max_1m);
         assert_eq!(3, snapshot.query_requests_avg_1m);
+        assert_eq!(5, snapshot.query_requests_sum_1h);
 
         // reset
-        store.reset_metrics();
+        store.reset_metrics_1h();
         // check snapshot 3
         let snapshot = store.snapshot();
         info!(snapshot = ?snapshot, "sample snapshot 3");
@@ -421,6 +433,21 @@
         assert_eq!(0, snapshot.memory_used_mb_min_1m);
         assert_eq!(0, snapshot.memory_used_mb_max_1m);
         assert_eq!(0, snapshot.memory_used_mb_avg_1m);
+
+        assert_eq!(0, snapshot.write_lines_min_1m);
+        assert_eq!(0, snapshot.write_lines_max_1m);
+        assert_eq!(0, snapshot.write_lines_avg_1m);
+        assert_eq!(0, snapshot.write_lines_sum_1h);
+
+        assert_eq!(0, snapshot.write_mb_min_1m);
+        assert_eq!(0, snapshot.write_mb_max_1m);
+        assert_eq!(0, snapshot.write_mb_avg_1m);
+        assert_eq!(0, snapshot.write_mb_sum_1h);
+
+        assert_eq!(0, snapshot.query_requests_min_1m);
+        assert_eq!(0, snapshot.query_requests_max_1m);
+        assert_eq!(0, snapshot.query_requests_avg_1m);
+        assert_eq!(0, snapshot.query_requests_sum_1h);
     }
 
     #[test]