Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 1h sum fields for telem writes/queries #25785

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions influxdb3_telemetry/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,34 @@ impl EventsBucket {
/// Write-side counters gathered over the current one-minute window.
///
/// The `Stats` fields carry the min/max/avg style aggregates that are
/// rolled up each minute, while the `total_*` fields are plain running
/// sums that accumulate until the hourly reset.
#[derive(Debug, Default)]
pub(crate) struct PerMinuteWrites {
    // Per-minute min/max/avg of lines written per request.
    pub lines: Stats<u64>,
    // Running sum of lines written (feeds the 1h sum fields).
    pub total_lines: u64,
    // Per-minute min/max/avg of payload size per request, in bytes.
    pub size_bytes: Stats<u64>,
    // Running sum of bytes written (feeds the 1h sum fields).
    pub total_size_bytes: u64,
}

impl PerMinuteWrites {
    /// Fold one write event (its line count and payload size) into the
    /// per-minute stats and the running hourly totals.
    ///
    /// Returns `None` when `Stats::update` signals a failure (it returns
    /// `Option<()>`), mirroring its contract to the caller.
    pub fn add_sample(&mut self, num_lines: usize, size_bytes: usize) -> Option<()> {
        let new_num_lines = num_lines as u64;
        let new_size_bytes = size_bytes as u64;

        // Run both fallible stat updates before touching the totals so a
        // failed update cannot leave a partially-applied sample behind.
        // (Previously `lines.update`'s Option result was silently dropped
        // while `size_bytes.update` used `?` — now both propagate.)
        self.lines.update(new_num_lines)?;
        self.size_bytes.update(new_size_bytes)?;

        self.total_lines += new_num_lines;
        self.total_size_bytes += new_size_bytes;
        Some(())
    }
}

/// Query-side counters gathered over the current one-minute window.
#[derive(Debug, Default)]
pub(crate) struct PerMinuteReads {
    // Per-minute min/max/avg of queries observed.
    pub num_queries: Stats<u64>,
    // Running sum of queries (feeds the 1h sum fields).
    pub total_num_queries: u64,
}

impl PerMinuteReads {
    /// Record a single query event: bump the per-minute stats by one and
    /// advance the running hourly total.
    pub fn add_sample(&mut self) -> Option<()> {
        const ONE_QUERY: u64 = 1;
        self.num_queries.update(ONE_QUERY);
        self.total_num_queries += ONE_QUERY;
        Some(())
    }
}
Expand Down
16 changes: 15 additions & 1 deletion influxdb3_telemetry/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,55 @@ use crate::{
/// Hour-scale write metrics built by rolling up the per-minute buckets.
#[derive(Debug, Default)]
pub(crate) struct Writes {
    // Rolling per-minute aggregate of lines written.
    pub lines: RollingStats<u64>,
    // Sum of lines over the hour, reported as `write_lines_sum_1h`.
    pub total_lines: u64,
    // Rolling per-minute aggregate of bytes written.
    pub size_bytes: RollingStats<u64>,
    // Sum of bytes over the hour, reported (as MB) in `write_mb_sum_1h`.
    pub total_size_bytes: u64,
    // num_writes is just Stats and not RollingStats as we don't
    // aggregate num_writes at the per minute interval, it can
    // just be taken from the events bucket.
    pub num_writes: Stats<u64>,
    // Sum of write requests over the hour (`write_requests_sum_1h`).
    pub total_num_writes: u64,
}

impl Writes {
    /// Roll one minute's worth of write events from `events_bucket` into
    /// the hourly aggregates and running sums.
    ///
    /// Returns `Some(())` unconditionally today; the `Option` return keeps
    /// the signature aligned with the other `add_sample` methods.
    pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
        // Removed a stale duplicate `self.num_writes.update(events_bucket.num_writes as u64)`
        // left over from a diff — num_writes must be sampled exactly once.
        let num_writes = events_bucket.num_writes as u64;
        self.lines.update(&events_bucket.writes.lines);
        self.size_bytes.update(&events_bucket.writes.size_bytes);
        self.num_writes.update(num_writes);
        self.total_lines += events_bucket.writes.total_lines;
        self.total_size_bytes += events_bucket.writes.total_size_bytes;
        self.total_num_writes += num_writes;
        Some(())
    }

    /// Clear all hourly write metrics; called after a telemetry payload
    /// has been (or was attempted to be) sent.
    pub fn reset(&mut self) {
        self.lines.reset();
        self.size_bytes.reset();
        self.num_writes.reset();
        self.total_lines = 0;
        self.total_size_bytes = 0;
        self.total_num_writes = 0;
    }
}

/// Hour-scale query metrics built from the per-minute buckets.
#[derive(Debug, Default)]
pub(crate) struct Queries {
    // We don't aggregate the num_queries at 1 min intervals
    pub num_queries: Stats<u64>,
    // Sum of queries over the hour (`query_requests_sum_1h`).
    pub total_num_queries: u64,
}

impl Queries {
    /// Fold one minute's query activity from `events_bucket` into the
    /// hourly stats and running sum.
    pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
        let queries_this_minute = events_bucket.num_queries as u64;
        self.num_queries.update(queries_this_minute);
        self.total_num_queries += events_bucket.queries.total_num_queries;
        Some(())
    }

    /// Clear the hourly query metrics after a telemetry send attempt.
    pub fn reset(&mut self) {
        self.num_queries.reset();
        self.total_num_queries = 0;
    }
}

Expand Down
2 changes: 1 addition & 1 deletion influxdb3_telemetry/src/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn sample_all_metrics(sampler: &mut CpuAndMemorySampler, store: &Arc<TelemetrySt
} else {
debug!("Cannot get cpu/mem usage stats for this process");
}
store.rollup_events();
store.rollup_events_1m();
}

#[cfg(test)]
Expand Down
10 changes: 9 additions & 1 deletion influxdb3_telemetry/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,22 @@ pub(crate) struct TelemetryPayload {
pub write_requests_min_1m: u64,
pub write_requests_max_1m: u64,
pub write_requests_avg_1m: u64,
pub write_requests_sum_1h: u64,

pub write_lines_min_1m: u64,
pub write_lines_max_1m: u64,
pub write_lines_avg_1m: u64,
pub write_lines_sum_1h: u64,

pub write_mb_min_1m: u64,
pub write_mb_max_1m: u64,
pub write_mb_avg_1m: u64,
pub write_mb_sum_1h: u64,
// reads
pub query_requests_min_1m: u64,
pub query_requests_max_1m: u64,
pub query_requests_avg_1m: u64,
pub query_requests_sum_1h: u64,
// parquet files
pub parquet_file_count: u64,
pub parquet_file_size_mb: f64,
Expand Down Expand Up @@ -108,7 +112,7 @@ async fn send_telemetry(store: &Arc<TelemetryStore>, telem_sender: &mut Telemetr
}
// if we tried sending and failed, we currently still reset the
// metrics, it is ok to miss few samples
store.reset_metrics();
store.reset_metrics_1h();
}

#[cfg(test)]
Expand Down Expand Up @@ -183,6 +187,10 @@ mod tests {
parquet_file_size_mb: 100.0,
parquet_row_count: 100,
uptime_secs: 100,
write_requests_sum_1h: 200,
write_lines_sum_1h: 200,
write_mb_sum_1h: 200,
query_requests_sum_1h: 200,
}
}
}
55 changes: 41 additions & 14 deletions influxdb3_telemetry/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl TelemetryStore {
// num::cast probably has had overflow. Best to
// reset all metrics to start again
warn!("cpu/memory could not be added, resetting metrics");
inner_store.reset_metrics();
inner_store.reset_metrics_1h();
});
}

Expand All @@ -108,14 +108,14 @@ impl TelemetryStore {
inner_store.per_minute_events_bucket.update_num_queries();
}

pub(crate) fn rollup_events(&self) {
pub(crate) fn rollup_events_1m(&self) {
let mut inner_store = self.inner.lock();
inner_store.rollup_reads_and_writes();
inner_store.rollup_reads_and_writes_1m();
}

pub(crate) fn reset_metrics(&self) {
pub(crate) fn reset_metrics_1h(&self) {
let mut inner_store = self.inner.lock();
inner_store.reset_metrics();
inner_store.reset_metrics_1h();
}

pub(crate) fn snapshot(&self) -> TelemetryPayload {
Expand Down Expand Up @@ -225,30 +225,36 @@ impl TelemetryStoreInner {
parquet_file_count: 0,
parquet_file_size_mb: 0.0,
parquet_row_count: 0,

// sums over hour
write_requests_sum_1h: self.writes.total_num_writes,
write_lines_sum_1h: self.writes.total_lines,
write_mb_sum_1h: to_mega_bytes(self.writes.total_size_bytes),
query_requests_sum_1h: self.reads.total_num_queries,
}
}

pub(crate) fn rollup_reads_and_writes(&mut self) {
pub(crate) fn rollup_reads_and_writes_1m(&mut self) {
debug!(
events_summary = ?self.per_minute_events_bucket,
"Rolling up writes/reads"
);
self.rollup_writes();
self.rollup_reads();
self.rollup_writes_1m();
self.rollup_reads_1m();
self.per_minute_events_bucket.reset();
}

fn rollup_writes(&mut self) {
fn rollup_writes_1m(&mut self) {
let events_summary = &self.per_minute_events_bucket;
self.writes.add_sample(events_summary);
}

fn rollup_reads(&mut self) {
fn rollup_reads_1m(&mut self) {
let events_summary = &self.per_minute_events_bucket;
self.reads.add_sample(events_summary);
}

fn reset_metrics(&mut self) {
fn reset_metrics_1h(&mut self) {
self.cpu.reset();
self.memory.reset();
self.writes.reset();
Expand Down Expand Up @@ -363,7 +369,7 @@ mod tests {
store.update_num_queries();

// now rollup reads/writes
store.rollup_events();
store.rollup_events_1m();
let snapshot = store.snapshot();
info!(
snapshot = ?snapshot,
Expand All @@ -373,14 +379,17 @@ mod tests {
assert_eq!(1, snapshot.write_lines_min_1m);
assert_eq!(120, snapshot.write_lines_max_1m);
assert_eq!(56, snapshot.write_lines_avg_1m);
assert_eq!(222, snapshot.write_lines_sum_1h);

assert_eq!(0, snapshot.write_mb_min_1m);
assert_eq!(0, snapshot.write_mb_max_1m);
assert_eq!(0, snapshot.write_mb_avg_1m);
assert_eq!(0, snapshot.write_mb_sum_1h);

assert_eq!(3, snapshot.query_requests_min_1m);
assert_eq!(3, snapshot.query_requests_max_1m);
assert_eq!(3, snapshot.query_requests_avg_1m);
assert_eq!(3, snapshot.query_requests_sum_1h);

// add more writes after rollup
store.add_write_metrics(100, 101_024_000);
Expand All @@ -392,7 +401,7 @@ mod tests {
store.update_num_queries();
store.update_num_queries();

store.rollup_events();
store.rollup_events_1m();
let snapshot = store.snapshot();
info!(
snapshot = ?snapshot,
Expand All @@ -401,17 +410,20 @@ mod tests {
assert_eq!(1, snapshot.write_lines_min_1m);
assert_eq!(120, snapshot.write_lines_max_1m);
assert_eq!(56, snapshot.write_lines_avg_1m);
assert_eq!(444, snapshot.write_lines_sum_1h);

assert_eq!(0, snapshot.write_mb_min_1m);
assert_eq!(200, snapshot.write_mb_max_1m);
assert_eq!(50, snapshot.write_mb_avg_1m);
assert_eq!(401, snapshot.write_mb_sum_1h);

assert_eq!(2, snapshot.query_requests_min_1m);
assert_eq!(3, snapshot.query_requests_max_1m);
assert_eq!(3, snapshot.query_requests_avg_1m);
assert_eq!(5, snapshot.query_requests_sum_1h);

// reset
store.reset_metrics();
store.reset_metrics_1h();
// check snapshot 3
let snapshot = store.snapshot();
info!(snapshot = ?snapshot, "sample snapshot 3");
Expand All @@ -421,6 +433,21 @@ mod tests {
assert_eq!(0, snapshot.memory_used_mb_min_1m);
assert_eq!(0, snapshot.memory_used_mb_max_1m);
assert_eq!(0, snapshot.memory_used_mb_avg_1m);

assert_eq!(0, snapshot.write_lines_min_1m);
assert_eq!(0, snapshot.write_lines_max_1m);
assert_eq!(0, snapshot.write_lines_avg_1m);
assert_eq!(0, snapshot.write_lines_sum_1h);

assert_eq!(0, snapshot.write_mb_min_1m);
assert_eq!(0, snapshot.write_mb_max_1m);
assert_eq!(0, snapshot.write_mb_avg_1m);
assert_eq!(0, snapshot.write_mb_sum_1h);

assert_eq!(0, snapshot.query_requests_min_1m);
assert_eq!(0, snapshot.query_requests_max_1m);
assert_eq!(0, snapshot.query_requests_avg_1m);
assert_eq!(0, snapshot.query_requests_sum_1h);
}

#[test]
Expand Down
Loading