Skip to content

Commit

Permalink
Add metrics to measure wal receive on pageserver
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 6, 2023
1 parent 87d8ee4 commit 33a9179
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 37 deletions.
65 changes: 44 additions & 21 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,35 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});

/// Gauge vector exported as `pageserver_last_receive_lsn`.
///
/// Labelled by `tenant_id`, `timeline_id` and `timeline_region`. The
/// per-timeline child is held in `TimelineMetrics::last_receive_gauge` and is
/// set to `wal_end()` of each `XLogData` replication message.
///
/// # Panics
///
/// The initializer panics if registering the metric fails.
static LAST_RECEIVE_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
    let registered = register_int_gauge_vec!(
        "pageserver_last_receive_lsn",
        "Last received LSN grouped by timeline",
        &["tenant_id", "timeline_id", "timeline_region"]
    );
    registered.expect("failed to define a metric")
});

/// Histogram vector exported as `pageserver_wal_receive_time_seconds`.
///
/// Labelled by `tenant_id`, `timeline_id` and `timeline_region`, bucketed with
/// `CRITICAL_OP_BUCKETS`. The per-timeline child
/// (`TimelineMetrics::wal_receive_time`) observes, in seconds, the time elapsed
/// between an `XLogData` message's `timestamp()` and `SystemTime::now()` on
/// receipt; nothing is observed when that subtraction fails.
///
/// # Panics
///
/// The initializer panics if registering the metric fails.
static WAL_RECEIVE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    let registered = register_histogram_vec!(
        "pageserver_wal_receive_time_seconds",
        "Estimated delay between sending and receiving a WAL record",
        &["tenant_id", "timeline_id", "timeline_region"],
        CRITICAL_OP_BUCKETS.into(),
    );
    registered.expect("failed to define a metric")
});

/// Histogram vector exported as `pageserver_wal_replication_msg_records_total`.
///
/// Labelled by `tenant_id`, `timeline_id` and `timeline_region`. The
/// per-timeline child (`TimelineMetrics::wal_replication_msg_records`) observes
/// how many WAL records were decoded from a single replication message.
///
/// NOTE(review): the `_total` suffix is conventionally used for counters in
/// Prometheus naming; it is kept here because renaming would change the
/// exported series names — confirm whether dashboards depend on it.
///
/// # Panics
///
/// The initializer panics if registering the metric fails.
static WAL_REPLICATION_MSG_RECORDS: Lazy<HistogramVec> = Lazy::new(|| {
    // Upper bounds of the record-count buckets.
    let record_count_buckets =
        vec![1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 750.0, 1000.0, 1500.0, 2000.0];
    let registered = register_histogram_vec!(
        "pageserver_wal_replication_msg_records_total",
        "Number of records in a WAL replication message",
        &["tenant_id", "timeline_id", "timeline_region"],
        record_count_buckets,
    );
    registered.expect("failed to define a metric")
});

static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_resident_physical_size",
Expand Down Expand Up @@ -633,27 +662,6 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});

// Remotexact

/// Gauge vector exported as `pageserver_last_received_lsn`.
///
/// Labelled by `tenant_id`, `timeline_id` and `timeline_region`. The
/// walreceiver connection handler resolves a child with `with_label_values`
/// and sets it to `wal_end()` of each `XLogData` replication message.
///
/// # Panics
///
/// The initializer panics if registering the metric fails.
pub static LAST_RECEIVED_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
    let registered = register_int_gauge_vec!(
        "pageserver_last_received_lsn",
        "Last received LSN grouped by tenant, timeline and region",
        &["tenant_id", "timeline_id", "timeline_region"]
    );
    registered.expect("failed to define a metric")
});

/// Histogram vector exported as `pageserver_lsn_receive_delay_seconds`.
///
/// Labelled by `tenant_id`, `timeline_id` and `timeline_region`, bucketed with
/// `CRITICAL_OP_BUCKETS`. The walreceiver connection handler observes, in
/// seconds, the time elapsed between an `XLogData` message's `timestamp()` and
/// `SystemTime::now()` on receipt.
///
/// # Panics
///
/// The initializer panics if registering the metric fails.
pub static LSN_RECEIVE_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "pageserver_lsn_receive_delay_seconds",
        // Help text is exported on the metrics endpoint; spelling fixed
        // ("Estiamted" -> "Estimated").
        "Estimated delay between sending and receiving a WAL record",
        &["tenant_id", "timeline_id", "timeline_region"],
        CRITICAL_OP_BUCKETS.into(),
    )
    .expect("failed to define a metric")
});

// remote storage metrics

/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
Expand Down Expand Up @@ -978,6 +986,9 @@ pub struct TimelineMetrics {
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub last_receive_gauge: IntGauge,
pub wal_receive_time: Histogram,
pub wal_replication_msg_records: Histogram,
pub resident_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
Expand Down Expand Up @@ -1017,6 +1028,15 @@ impl TimelineMetrics {
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id, &region_id])
.unwrap();
let last_receive_gauge = LAST_RECEIVE_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id, &region_id])
.unwrap();
let wal_receive_time = WAL_RECEIVE_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id, &region_id])
.unwrap();
let wal_replication_msg_records = WAL_REPLICATION_MSG_RECORDS
.get_metric_with_label_values(&[&tenant_id, &timeline_id, &region_id])
.unwrap();
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
Expand Down Expand Up @@ -1046,6 +1066,9 @@ impl TimelineMetrics {
garbage_collect_histo,
load_layer_map_histo,
last_record_gauge,
last_receive_gauge,
wal_receive_time,
wal_replication_msg_records,
resident_physical_size_gauge,
current_logical_size_gauge,
num_persistent_files_created,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{
LAST_RECEIVED_LSN, LIVE_CONNECTIONS_COUNT, LSN_RECEIVE_DELAY,
WALRECEIVER_STARTED_CONNECTIONS,
},
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
task_mgr,
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
Expand Down Expand Up @@ -124,16 +121,6 @@ pub(super) async fn handle_walreceiver_connection(
debug_assert_current_span_has_tenant_and_timeline_id();

WALRECEIVER_STARTED_CONNECTIONS.inc();
let last_received_lsn = LAST_RECEIVED_LSN.with_label_values(&[
&timeline.tenant_id.to_string(),
&timeline.timeline_id.to_string(),
&timeline.region_id.to_string(),
]);
let lsn_receive_delay = LSN_RECEIVE_DELAY.with_label_values(&[
&timeline.tenant_id.to_string(),
&timeline.timeline_id.to_string(),
&timeline.region_id.to_string(),
]);

// Connect to the database in replication mode.
info!("connecting to {wal_source_connconf:?}");
Expand Down Expand Up @@ -287,10 +274,18 @@ pub(super) async fn handle_walreceiver_connection(
// fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
match &replication_message {
ReplicationMessage::XLogData(xlog_data) => {
last_received_lsn.set(xlog_data.wal_end() as i64);
timeline
.metrics
.last_receive_gauge
.set(xlog_data.wal_end() as i64);

if let Ok(duration) = SystemTime::now().duration_since(xlog_data.timestamp()) {
lsn_receive_delay.observe(duration.as_secs_f64());
timeline
.metrics
.wal_receive_time
.observe(duration.as_secs_f64());
}

connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
Expand Down Expand Up @@ -327,6 +322,7 @@ pub(super) async fn handle_walreceiver_connection(
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(startlsn);
let mut uncommitted_records = 0;
let mut num_records = 0;
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
Expand Down Expand Up @@ -357,12 +353,19 @@ pub(super) async fn handle_walreceiver_connection(
modification.commit().await?;
uncommitted_records = 0;
}

num_records += 1;
}

if uncommitted_records > 0 {
trace!("batch commit {} ingested WAL records", uncommitted_records);
modification.commit().await?;
}

timeline
.metrics
.wal_replication_msg_records
.observe(num_records as f64);
}

if !caught_up && endlsn >= end_of_wal {
Expand Down

0 comments on commit 33a9179

Please sign in to comment.