From 33a917959231d3ac4bfa235d749b3648815a1c2a Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Mon, 6 Nov 2023 18:00:15 -0500 Subject: [PATCH] Add metrics to measure wal receive on pageserver --- pageserver/src/metrics.rs | 65 +++++++++++++------ .../walreceiver/walreceiver_connection.rs | 35 +++++----- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 0a8a82e0e6a1..2b96b057b170 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -261,6 +261,35 @@ static LAST_RECORD_LSN: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +static LAST_RECEIVE_LSN: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "pageserver_last_receive_lsn", + "Last received LSN grouped by timeline", + &["tenant_id", "timeline_id", "timeline_region"] + ) + .expect("failed to define a metric") +}); + +static WAL_RECEIVE_TIME: Lazy = Lazy::new(|| { + register_histogram_vec!( + "pageserver_wal_receive_time_seconds", + "Estimated delay between sending and receiving a WAL record", + &["tenant_id", "timeline_id", "timeline_region"], + CRITICAL_OP_BUCKETS.into(), + ) + .expect("failed to define a metric") +}); + +static WAL_REPLICATION_MSG_RECORDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + "pageserver_wal_replication_msg_records_total", + "Number of records in a WAL replication message", + &["tenant_id", "timeline_id", "timeline_region"], + vec![1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 750.0, 1000.0, 1500.0, 2000.0], + ) + .expect("failed to define a metric") +}); + static RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_resident_physical_size", @@ -633,27 +662,6 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -// Remotexact - -pub static LAST_RECEIVED_LSN: Lazy = Lazy::new(|| { - register_int_gauge_vec!( - "pageserver_last_received_lsn", - "Last received LSN grouped by tenant, timeline and region", - &["tenant_id", "timeline_id", "timeline_region"] - ) - .expect("failed to define a metric") -}); - -pub static LSN_RECEIVE_DELAY: Lazy = Lazy::new(|| { - register_histogram_vec!( - "pageserver_lsn_receive_delay_seconds", - "Estiamted delay between sending and receiving a WAL record", - &["tenant_id", "timeline_id", "timeline_region"], - CRITICAL_OP_BUCKETS.into(), - ) - .expect("failed to define a metric") -}); - // remote storage metrics /// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`]. @@ -978,6 +986,9 @@ pub struct TimelineMetrics { pub load_layer_map_histo: StorageTimeMetrics, pub garbage_collect_histo: StorageTimeMetrics, pub last_record_gauge: IntGauge, + pub last_receive_gauge: IntGauge, + pub wal_receive_time: Histogram, + pub wal_replication_msg_records: Histogram, pub resident_physical_size_gauge: UIntGauge, /// copy of LayeredTimeline.current_logical_size pub current_logical_size_gauge: UIntGauge, @@ -1017,6 +1028,15 @@ impl TimelineMetrics { let last_record_gauge = LAST_RECORD_LSN .get_metric_with_label_values(&[&tenant_id, &timeline_id, ®ion_id]) .unwrap(); + let last_receive_gauge = LAST_RECEIVE_LSN + .get_metric_with_label_values(&[&tenant_id, &timeline_id, ®ion_id]) + .unwrap(); + let wal_receive_time = WAL_RECEIVE_TIME + .get_metric_with_label_values(&[&tenant_id, &timeline_id, ®ion_id]) + .unwrap(); + let wal_replication_msg_records = WAL_REPLICATION_MSG_RECORDS + .get_metric_with_label_values(&[&tenant_id, &timeline_id, ®ion_id]) + .unwrap(); let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); @@ -1046,6 +1066,9 @@ impl TimelineMetrics { garbage_collect_histo, load_layer_map_histo, last_record_gauge, + last_receive_gauge, + wal_receive_time, + wal_replication_msg_records, resident_physical_size_gauge, current_logical_size_gauge, num_persistent_files_created, diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index bffa4ac7900e..b3c8c6e0e1c6 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -27,10 +27,7 @@ use tracing::{debug, error, info, trace, warn, Instrument}; use super::TaskStateUpdate; use crate::{ context::RequestContext, - metrics::{ - LAST_RECEIVED_LSN, LIVE_CONNECTIONS_COUNT, LSN_RECEIVE_DELAY, - WALRECEIVER_STARTED_CONNECTIONS, - }, + metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS}, task_mgr, task_mgr::TaskKind, task_mgr::WALRECEIVER_RUNTIME, @@ -124,16 +121,6 @@ pub(super) async fn handle_walreceiver_connection( debug_assert_current_span_has_tenant_and_timeline_id(); WALRECEIVER_STARTED_CONNECTIONS.inc(); - let last_received_lsn = LAST_RECEIVED_LSN.with_label_values(&[ - &timeline.tenant_id.to_string(), - &timeline.timeline_id.to_string(), - &timeline.region_id.to_string(), - ]); - let lsn_receive_delay = LSN_RECEIVE_DELAY.with_label_values(&[ - &timeline.tenant_id.to_string(), - &timeline.timeline_id.to_string(), - &timeline.region_id.to_string(), - ]); // Connect to the database in replication mode. info!("connecting to {wal_source_connconf:?}"); @@ -287,10 +274,18 @@ pub(super) async fn handle_walreceiver_connection( // fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper. match &replication_message { ReplicationMessage::XLogData(xlog_data) => { - last_received_lsn.set(xlog_data.wal_end() as i64); + timeline + .metrics + .last_receive_gauge + .set(xlog_data.wal_end() as i64); + if let Ok(duration) = SystemTime::now().duration_since(xlog_data.timestamp()) { - lsn_receive_delay.observe(duration.as_secs_f64()); + timeline + .metrics + .wal_receive_time + .observe(duration.as_secs_f64()); } + connection_status.latest_connection_update = now; connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end())); connection_status.streaming_lsn = Some(Lsn::from( @@ -327,6 +322,7 @@ pub(super) async fn handle_walreceiver_connection( let mut decoded = DecodedWALRecord::default(); let mut modification = timeline.begin_modification(startlsn); let mut uncommitted_records = 0; + let mut num_records = 0; while let Some((lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are @@ -357,12 +353,19 @@ pub(super) async fn handle_walreceiver_connection( modification.commit().await?; uncommitted_records = 0; } + + num_records += 1; } if uncommitted_records > 0 { trace!("batch commit {} ingested WAL records", uncommitted_records); modification.commit().await?; } + + timeline + .metrics + .wal_replication_msg_records + .observe(num_records as f64); } if !caught_up && endlsn >= end_of_wal {