Skip to content

Commit

Permalink
feat: 1h sum fields for telem writes/queries
Browse files Browse the repository at this point in the history
closes: #25713
  • Loading branch information
praveen-influx committed Jan 10, 2025
1 parent 0da0785 commit a99e68f
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 19 deletions.
13 changes: 11 additions & 2 deletions influxdb3_telemetry/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,34 @@ impl EventsBucket {
/// Write-side statistics collected in the current one-minute events bucket.
///
/// The `Stats` fields track min/max/avg within the minute, while the
/// `total_*` fields carry running sums that are later folded into the
/// hourly `*_sum_1h` telemetry fields.
#[derive(Debug, Default)]
pub(crate) struct PerMinuteWrites {
    /// Min/max/avg of lines per write sample within this minute.
    pub lines: Stats<u64>,
    /// Sum of all lines written in this bucket.
    pub total_lines: u64,
    /// Min/max/avg of write payload size (bytes) per sample.
    pub size_bytes: Stats<u64>,
    /// Sum of all payload bytes written in this bucket.
    pub total_size_bytes: u64,
}

impl PerMinuteWrites {
    /// Records one write event in the current one-minute bucket.
    ///
    /// Feeds `num_lines` and `size_bytes` into the min/max/avg stats and
    /// bumps the running totals. Returns `None` when a `Stats::update`
    /// fails (e.g. numeric-cast overflow inside the stats machinery).
    pub fn add_sample(&mut self, num_lines: usize, size_bytes: usize) -> Option<()> {
        let new_num_lines = num_lines as u64;
        let new_size_bytes = size_bytes as u64;

        // Update both stats before touching the totals so an early `None`
        // return cannot leave the running totals half-applied.
        self.lines.update(new_num_lines)?;
        self.size_bytes.update(new_size_bytes)?;

        self.total_lines += new_num_lines;
        self.total_size_bytes += new_size_bytes;
        Some(())
    }
}

/// Read-side statistics collected in the current one-minute events bucket.
#[derive(Debug, Default)]
pub(crate) struct PerMinuteReads {
    /// Min/max/avg of query counts within this minute.
    pub num_queries: Stats<u64>,
    /// Sum of all queries seen in this bucket; feeds the hourly sum.
    pub total_num_queries: u64,
}

impl PerMinuteReads {
    /// Records a single query event: bumps the running total and feeds a
    /// one-count sample into the per-minute stats.
    pub fn add_sample(&mut self) -> Option<()> {
        self.total_num_queries += 1;
        self.num_queries.update(1);
        Some(())
    }
}
Expand Down
16 changes: 15 additions & 1 deletion influxdb3_telemetry/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,55 @@ use crate::{
/// Write-side metrics aggregated across the hourly reporting window.
#[derive(Debug, Default)]
pub(crate) struct Writes {
    /// Rolling min/max/avg of lines per write, folded in once a minute.
    pub lines: RollingStats<u64>,
    /// Running sum of lines written; reported as `write_lines_sum_1h`.
    pub total_lines: u64,
    /// Rolling min/max/avg of write payload sizes in bytes.
    pub size_bytes: RollingStats<u64>,
    /// Running sum of payload bytes; reported as `write_mb_sum_1h`.
    pub total_size_bytes: u64,
    // num_writes is just Stats and not RollingStats as we don't
    // aggregate num_writes at the per minute interval, it can
    // just be taken from the events bucket.
    pub num_writes: Stats<u64>,
    /// Running sum of write requests; reported as `write_requests_sum_1h`.
    pub total_num_writes: u64,
}

impl Writes {
    /// Folds a one-minute events bucket into the hourly write metrics:
    /// the rolling stats take a new sample, and the `total_*` running
    /// sums accumulate the bucket's totals.
    pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
        let num_writes = events_bucket.num_writes as u64;
        // Rolling min/max/avg aggregated across the window.
        self.lines.update(&events_bucket.writes.lines);
        self.size_bytes.update(&events_bucket.writes.size_bytes);
        self.num_writes.update(num_writes);
        // Running sums reported as the `*_sum_1h` payload fields.
        self.total_lines += events_bucket.writes.total_lines;
        self.total_size_bytes += events_bucket.writes.total_size_bytes;
        self.total_num_writes += num_writes;
        Some(())
    }

    /// Clears all stats and totals; called when the hourly window rolls over.
    pub fn reset(&mut self) {
        self.lines.reset();
        self.size_bytes.reset();
        self.num_writes.reset();
        self.total_lines = 0;
        self.total_size_bytes = 0;
        self.total_num_writes = 0;
    }
}

/// Query-side metrics aggregated across the hourly reporting window.
#[derive(Debug, Default)]
pub(crate) struct Queries {
    // We don't aggregate the num_queries at 1 min intervals
    pub num_queries: Stats<u64>,
    /// Running sum of queries; reported as `query_requests_sum_1h`.
    pub total_num_queries: u64,
}

impl Queries {
    /// Folds a one-minute events bucket into the hourly query aggregates.
    pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
        self.total_num_queries += events_bucket.queries.total_num_queries;
        self.num_queries.update(events_bucket.num_queries as u64);
        Some(())
    }

    /// Clears the stats and the running total for a fresh hourly window.
    pub fn reset(&mut self) {
        self.total_num_queries = 0;
        self.num_queries.reset();
    }
}

Expand Down
2 changes: 1 addition & 1 deletion influxdb3_telemetry/src/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn sample_all_metrics(sampler: &mut CpuAndMemorySampler, store: &Arc<TelemetrySt
} else {
debug!("Cannot get cpu/mem usage stats for this process");
}
store.rollup_events();
store.rollup_events_1m();
}

#[cfg(test)]
Expand Down
10 changes: 9 additions & 1 deletion influxdb3_telemetry/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,22 @@ pub(crate) struct TelemetryPayload {
pub write_requests_min_1m: u64,
pub write_requests_max_1m: u64,
pub write_requests_avg_1m: u64,
pub write_requests_sum_1h: u64,

pub write_lines_min_1m: u64,
pub write_lines_max_1m: u64,
pub write_lines_avg_1m: u64,
pub write_lines_sum_1h: u64,

pub write_mb_min_1m: u64,
pub write_mb_max_1m: u64,
pub write_mb_avg_1m: u64,
pub write_mb_sum_1h: u64,
// reads
pub query_requests_min_1m: u64,
pub query_requests_max_1m: u64,
pub query_requests_avg_1m: u64,
pub query_requests_sum_1h: u64,
// parquet files
pub parquet_file_count: u64,
pub parquet_file_size_mb: f64,
Expand Down Expand Up @@ -108,7 +112,7 @@ async fn send_telemetry(store: &Arc<TelemetryStore>, telem_sender: &mut Telemetr
}
// if we tried sending and failed, we currently still reset the
// metrics, it is ok to miss few samples
store.reset_metrics();
store.reset_metrics_1h();
}

#[cfg(test)]
Expand Down Expand Up @@ -183,6 +187,10 @@ mod tests {
parquet_file_size_mb: 100.0,
parquet_row_count: 100,
uptime_secs: 100,
write_requests_sum_1h: 200,
write_lines_sum_1h: 200,
write_mb_sum_1h: 200,
query_requests_sum_1h: 200,
}
}
}
55 changes: 41 additions & 14 deletions influxdb3_telemetry/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl TelemetryStore {
// num::cast probably has had overflow. Best to
// reset all metrics to start again
warn!("cpu/memory could not be added, resetting metrics");
inner_store.reset_metrics();
inner_store.reset_metrics_1h();
});
}

Expand All @@ -108,14 +108,14 @@ impl TelemetryStore {
inner_store.per_minute_events_bucket.update_num_queries();
}

pub(crate) fn rollup_events(&self) {
pub(crate) fn rollup_events_1m(&self) {
let mut inner_store = self.inner.lock();
inner_store.rollup_reads_and_writes();
inner_store.rollup_reads_and_writes_1m();
}

pub(crate) fn reset_metrics(&self) {
pub(crate) fn reset_metrics_1h(&self) {
let mut inner_store = self.inner.lock();
inner_store.reset_metrics();
inner_store.reset_metrics_1h();
}

pub(crate) fn snapshot(&self) -> TelemetryPayload {
Expand Down Expand Up @@ -225,30 +225,36 @@ impl TelemetryStoreInner {
parquet_file_count: 0,
parquet_file_size_mb: 0.0,
parquet_row_count: 0,

// sums over hour
write_requests_sum_1h: self.writes.total_num_writes,
write_lines_sum_1h: self.writes.total_lines,
write_mb_sum_1h: to_mega_bytes(self.writes.total_size_bytes),
query_requests_sum_1h: self.reads.total_num_queries,
}
}

pub(crate) fn rollup_reads_and_writes(&mut self) {
pub(crate) fn rollup_reads_and_writes_1m(&mut self) {
debug!(
events_summary = ?self.per_minute_events_bucket,
"Rolling up writes/reads"
);
self.rollup_writes();
self.rollup_reads();
self.rollup_writes_1m();
self.rollup_reads_1m();
self.per_minute_events_bucket.reset();
}

fn rollup_writes(&mut self) {
fn rollup_writes_1m(&mut self) {
let events_summary = &self.per_minute_events_bucket;
self.writes.add_sample(events_summary);
}

fn rollup_reads(&mut self) {
fn rollup_reads_1m(&mut self) {
let events_summary = &self.per_minute_events_bucket;
self.reads.add_sample(events_summary);
}

fn reset_metrics(&mut self) {
fn reset_metrics_1h(&mut self) {
self.cpu.reset();
self.memory.reset();
self.writes.reset();
Expand Down Expand Up @@ -363,7 +369,7 @@ mod tests {
store.update_num_queries();

// now rollup reads/writes
store.rollup_events();
store.rollup_events_1m();
let snapshot = store.snapshot();
info!(
snapshot = ?snapshot,
Expand All @@ -373,14 +379,17 @@ mod tests {
assert_eq!(1, snapshot.write_lines_min_1m);
assert_eq!(120, snapshot.write_lines_max_1m);
assert_eq!(56, snapshot.write_lines_avg_1m);
assert_eq!(222, snapshot.write_lines_sum_1h);

assert_eq!(0, snapshot.write_mb_min_1m);
assert_eq!(0, snapshot.write_mb_max_1m);
assert_eq!(0, snapshot.write_mb_avg_1m);
assert_eq!(0, snapshot.write_mb_sum_1h);

assert_eq!(3, snapshot.query_requests_min_1m);
assert_eq!(3, snapshot.query_requests_max_1m);
assert_eq!(3, snapshot.query_requests_avg_1m);
assert_eq!(3, snapshot.query_requests_sum_1h);

// add more writes after rollup
store.add_write_metrics(100, 101_024_000);
Expand All @@ -392,7 +401,7 @@ mod tests {
store.update_num_queries();
store.update_num_queries();

store.rollup_events();
store.rollup_events_1m();
let snapshot = store.snapshot();
info!(
snapshot = ?snapshot,
Expand All @@ -401,17 +410,20 @@ mod tests {
assert_eq!(1, snapshot.write_lines_min_1m);
assert_eq!(120, snapshot.write_lines_max_1m);
assert_eq!(56, snapshot.write_lines_avg_1m);
assert_eq!(444, snapshot.write_lines_sum_1h);

assert_eq!(0, snapshot.write_mb_min_1m);
assert_eq!(200, snapshot.write_mb_max_1m);
assert_eq!(50, snapshot.write_mb_avg_1m);
assert_eq!(401, snapshot.write_mb_sum_1h);

assert_eq!(2, snapshot.query_requests_min_1m);
assert_eq!(3, snapshot.query_requests_max_1m);
assert_eq!(3, snapshot.query_requests_avg_1m);
assert_eq!(5, snapshot.query_requests_sum_1h);

// reset
store.reset_metrics();
store.reset_metrics_1h();
// check snapshot 3
let snapshot = store.snapshot();
info!(snapshot = ?snapshot, "sample snapshot 3");
Expand All @@ -421,6 +433,21 @@ mod tests {
assert_eq!(0, snapshot.memory_used_mb_min_1m);
assert_eq!(0, snapshot.memory_used_mb_max_1m);
assert_eq!(0, snapshot.memory_used_mb_avg_1m);

assert_eq!(0, snapshot.write_lines_min_1m);
assert_eq!(0, snapshot.write_lines_max_1m);
assert_eq!(0, snapshot.write_lines_avg_1m);
assert_eq!(0, snapshot.write_lines_sum_1h);

assert_eq!(0, snapshot.write_mb_min_1m);
assert_eq!(0, snapshot.write_mb_max_1m);
assert_eq!(0, snapshot.write_mb_avg_1m);
assert_eq!(0, snapshot.write_mb_sum_1h);

assert_eq!(0, snapshot.query_requests_min_1m);
assert_eq!(0, snapshot.query_requests_max_1m);
assert_eq!(0, snapshot.query_requests_avg_1m);
assert_eq!(0, snapshot.query_requests_sum_1h);
}

#[test]
Expand Down

0 comments on commit a99e68f

Please sign in to comment.