Skip to content

Commit

Permalink
Pick latest available lsn. Add more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Oct 31, 2023
1 parent b5f60f7 commit 1ecb908
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async fn build_timeline_info_common(
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
let guard = timeline.last_received_wal.lock().unwrap();
let guard = timeline.last_received_wal.read().unwrap();
if let Some(info) = guard.as_ref() {
(
Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
Expand Down
27 changes: 24 additions & 3 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use metrics::{
use once_cell::sync::Lazy;
use strum::VariantNames;
use strum_macros::{EnumVariantNames, IntoStaticStr};
use utils::id::{TenantId, TimelineId};
use utils::id::{TenantId, TimelineId, RegionId};

/// Prometheus histogram buckets (in seconds) for operations in the critical
/// path. In other words, operations that directly affect that latency of user
Expand Down Expand Up @@ -256,7 +256,7 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
"Last record LSN grouped by timeline",
&["tenant_id", "timeline_id"]
&["tenant_id", "timeline_id", "timeline_region"]
)
.expect("failed to define a metric")
});
Expand Down Expand Up @@ -633,6 +633,25 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});

pub static LAST_RECEIVED_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_received_lsn",
"Last received LSN grouped by tenant, timeline and region",
&["tenant_id", "timeline_id", "timeline_region"]
)
.expect("failed to define a metric")
});

pub static LSN_RECEIVE_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_lsn_receive_delay_seconds",
"Estiamted delay between sending and receiving a WAL record",
&["tenant_id", "timeline_id", "timeline_region"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});

// remote storage metrics

/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
Expand Down Expand Up @@ -970,10 +989,12 @@ impl TimelineMetrics {
pub fn new(
tenant_id: &TenantId,
timeline_id: &TimelineId,
region_id: &RegionId,
evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder,
) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let region_id = region_id.to_string();
let flush_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
let compact_time_histo =
Expand All @@ -992,7 +1013,7 @@ impl TimelineMetrics {
let garbage_collect_histo =
StorageTimeMetrics::new(StorageTimeOperation::Gc, &tenant_id, &timeline_id);
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.get_metric_with_label_values(&[&tenant_id, &timeline_id, &region_id])
.unwrap();
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
Expand Down
17 changes: 15 additions & 2 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,21 @@ impl PageServerHandler {
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, Lsn(0), true, &latest_gc_cutoff_lsn, ctx).await?;
let lsn = {
let last_received_lsn = timeline
.last_received_wal
.read()
.unwrap()
.as_ref()
.map(|wal| wal.last_received_msg_lsn);
if let Some(lsn) = last_received_lsn {
lsn
} else {
// No WAL has been received yet. Fall back to getting the latest processed lsn
Self::wait_or_get_last_lsn(timeline, Lsn(0), true, &latest_gc_cutoff_lsn, ctx)
.await?
}
};

Ok(PagestreamBeMessage::GetLatestLsn(
PagestreamGetLatestLsnResponse { lsn },
Expand Down
5 changes: 3 additions & 2 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub struct Timeline {
/// Information about the last processed message by the WAL receiver,
/// or None if WAL receiver has not received anything for this timeline
/// yet.
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
pub last_received_wal: RwLock<Option<WalReceiverInfo>>,
pub walreceiver: Mutex<Option<WalReceiver>>,

/// Relation size cache
Expand Down Expand Up @@ -1431,6 +1431,7 @@ impl Timeline {
metrics: TimelineMetrics::new(
&tenant_id,
&timeline_id,
&metadata.region_id(),
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
evictions_low_residence_duration_metric_threshold,
Expand Down Expand Up @@ -1466,7 +1467,7 @@ impl Timeline {
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,

last_received_wal: Mutex::new(None),
last_received_wal: RwLock::new(None),
rel_size_cache: RwLock::new(HashMap::new()),

download_all_remote_layers_task_info: RwLock::new(None),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
metrics::{LAST_RECEIVED_LSN, LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, LSN_RECEIVE_DELAY},
task_mgr,
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
Expand Down Expand Up @@ -118,6 +118,16 @@ pub(super) async fn handle_walreceiver_connection(
debug_assert_current_span_has_tenant_and_timeline_id();

WALRECEIVER_STARTED_CONNECTIONS.inc();
let last_received_lsn = LAST_RECEIVED_LSN.with_label_values(&[
&timeline.tenant_id.to_string(),
&timeline.timeline_id.to_string(),
&timeline.region_id.to_string(),
]);
let lsn_receive_delay = LSN_RECEIVE_DELAY.with_label_values(&[
&timeline.tenant_id.to_string(),
&timeline.timeline_id.to_string(),
&timeline.region_id.to_string(),
]);

// Connect to the database in replication mode.
info!("connecting to {wal_source_connconf:?}");
Expand Down Expand Up @@ -271,6 +281,10 @@ pub(super) async fn handle_walreceiver_connection(
// fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
match &replication_message {
ReplicationMessage::XLogData(xlog_data) => {
last_received_lsn.set(xlog_data.wal_end() as i64);
if let Ok(duration) = SystemTime::now().duration_since(xlog_data.timestamp()) {
lsn_receive_delay.observe(duration.as_secs_f64());
}
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
Expand Down Expand Up @@ -391,7 +405,7 @@ pub(super) async fn handle_walreceiver_connection(
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
};
*timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
*timeline.last_received_wal.write().unwrap() = Some(last_received_wal);

// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
Expand Down

0 comments on commit 1ecb908

Please sign in to comment.