Skip to content

Commit

Permalink
Add metrics for pool internals
Browse files Browse the repository at this point in the history
  • Loading branch information
w4 committed Nov 11, 2024
1 parent 129b8d8 commit 942b558
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tracing = { version = "0.1.37", default-features = false, features = [
], optional = true }
twox-hash = "2"
url = "2.1"
hdrhistogram = { version = "7.5", optional = true }

[dependencies.tokio-rustls]
version = "0.26"
Expand Down
2 changes: 2 additions & 0 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ struct ConnInner {
auth_plugin: AuthPlugin<'static>,
auth_switched: bool,
server_key: Option<Vec<u8>>,
active_since: Instant,
/// Connection is already disconnected.
pub(crate) disconnected: bool,
/// One-time connection-level infile handler.
Expand Down Expand Up @@ -169,6 +170,7 @@ impl ConnInner {
server_key: None,
infile_handler: None,
reset_upon_returning_to_a_pool: false,
active_since: Instant::now(),
}
}

Expand Down
127 changes: 127 additions & 0 deletions src/conn/pool/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::sync::atomic::AtomicUsize;

use serde::Serialize;

/// Counters and gauges describing the internal state of a connection pool, plus duration
/// histograms when the `hdrhistogram` feature is enabled.
#[derive(Default, Debug, Serialize)]
#[non_exhaustive]
pub struct Metrics {
/// Gauge of active connections to the database server; this includes both connections that
/// belong to the pool and connections currently owned by the application.
pub connection_count: AtomicUsize,
/// Gauge of active connections that currently belong to the pool.
pub connections_in_pool: AtomicUsize,
/// Gauge of GetConn requests that are currently waiting in the pool's queue.
pub active_wait_requests: AtomicUsize,
/// Counter of connections that failed to be created.
pub create_failed: AtomicUsize,
/// Counter of connections discarded due to pool constraints.
pub discarded_superfluous_connection: AtomicUsize,
/// Counter of connections discarded due to being closed upon return to the pool.
pub discarded_unestablished_connection: AtomicUsize,
/// Counter of connections that have been returned to the pool dirty and needed to be cleaned
/// (i.e. open transactions, pending queries, etc.).
pub dirty_connection_return: AtomicUsize,
/// Counter of connections that have been discarded as they were expired by the pool constraints.
pub discarded_expired_connection: AtomicUsize,
/// Counter of connections that have been reset.
pub resetting_connection: AtomicUsize,
/// Counter of connections that have been discarded as they returned an error during cleanup.
pub discarded_error_during_cleanup: AtomicUsize,
/// Counter of connections that have been returned to the pool.
pub connection_returned_to_pool: AtomicUsize,
/// Histogram of times connections have spent outside of the pool.
#[cfg(feature = "hdrhistogram")]
pub connection_active_duration: MetricsHistogram,
/// Histogram of times connections have spent inside of the pool.
#[cfg(feature = "hdrhistogram")]
pub connection_idle_duration: MetricsHistogram,
/// Histogram of times connections have spent being checked for health.
#[cfg(feature = "hdrhistogram")]
pub check_duration: MetricsHistogram,
/// Histogram of time spent waiting to connect to the server.
#[cfg(feature = "hdrhistogram")]
pub connect_duration: MetricsHistogram,
}

/// An `hdrhistogram::Histogram<u64>` wrapped in a [`std::sync::Mutex`], so that samples can be
/// recorded through a shared reference to the pool metrics.
#[cfg(feature = "hdrhistogram")]
#[derive(Debug)]
pub struct MetricsHistogram(std::sync::Mutex<hdrhistogram::Histogram<u64>>);

#[cfg(feature = "hdrhistogram")]
impl Default for MetricsHistogram {
    /// Creates an empty histogram that tracks values from 1 µs up to 30 s with two
    /// significant digits of precision.
    fn default() -> Self {
        // Samples are recorded in microseconds. The previous literal `30 * 1_00_000` was
        // mis-grouped and evaluated to 3_000_000 µs (3 s), so every longer duration was
        // clamped to 3 s by `saturating_record`.
        const MAX_TRACKED_MICROS: u64 = 30 * 1_000_000;

        let hdr = hdrhistogram::Histogram::new_with_bounds(1, MAX_TRACKED_MICROS, 2)
            .expect("histogram bounds are statically valid");
        Self(std::sync::Mutex::new(hdr))
    }
}

#[cfg(feature = "hdrhistogram")]
impl Serialize for MetricsHistogram {
    /// Serializes a summary of the histogram as a map with ten entries: sample count, min,
    /// max, mean, standard deviation and five upper quantiles.
    ///
    /// Panics if the histogram mutex is poisoned.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeMap;

        let histogram = self.0.lock().unwrap();

        // A 'qualified' metric name - for supporting serializers such as
        // serde_prometheus, the metric name is prepended to the key, producing
        // `response_time_count`, for example, rather than just `count`.
        macro_rules! qualified {
            ($value:expr) => {
                &MetricAlias("<|", $value)
            };
        }

        // A quantile of this histogram - supporting serializers ignore the map key
        // (such as `90%ile`) and add a dimension to the metric instead
        // (such as `quantile=0.9`).
        macro_rules! quantile {
            ($q:expr) => {
                &MetricAlias(
                    concat!("!|quantile=", $q),
                    histogram.value_at_quantile($q),
                )
            };
        }

        let mut map = serializer.serialize_map(Some(10))?;

        map.serialize_entry("samples", qualified!(histogram.len()))?;
        map.serialize_entry("min", qualified!(histogram.min()))?;
        map.serialize_entry("max", qualified!(histogram.max()))?;
        map.serialize_entry("mean", qualified!(histogram.mean()))?;
        map.serialize_entry("stdev", qualified!(histogram.stdev()))?;

        map.serialize_entry("90%ile", quantile!(0.9))?;
        map.serialize_entry("95%ile", quantile!(0.95))?;
        map.serialize_entry("99%ile", quantile!(0.99))?;
        map.serialize_entry("99.9%ile", quantile!(0.999))?;
        map.serialize_entry("99.99%ile", quantile!(0.9999))?;

        map.end()
    }
}

/// A stand-in for a newtype (e.g. `A(u64)`) whose "type name" is an arbitrary string rather
/// than a Rust identifier. The first field is passed to the serializer as the newtype name,
/// the second is the wrapped value. Serializers that understand the naming convention can use
/// it to attach key=value dimensions to a metric or to rewrite its key.
#[cfg(feature = "hdrhistogram")]
struct MetricAlias<T>(&'static str, T)
where
    T: Serialize;

#[cfg(feature = "hdrhistogram")]
impl<T> Serialize for MetricAlias<T>
where
    T: Serialize,
{
    /// Serializes the wrapped value as a newtype struct named after the alias string.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self(alias, value) = self;
        serializer.serialize_newtype_struct(*alias, value)
    }
}

#[cfg(feature = "hdrhistogram")]
impl std::ops::Deref for MetricsHistogram {
    type Target = std::sync::Mutex<hdrhistogram::Histogram<u64>>;

    /// Gives access to the inner mutex so callers can `lock()` the histogram directly.
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}
99 changes: 90 additions & 9 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use crate::{
queryable::transaction::{Transaction, TxOpts},
};

pub use metrics::Metrics;

mod recycler;
// this is a really unfortunate name for a module
pub mod futures;
mod metrics;
mod ttl_check_inerval;

/// Connection that is idling in the pool.
Expand Down Expand Up @@ -107,7 +110,7 @@ struct Waitlist {
}

impl Waitlist {
fn push(&mut self, waker: Waker, queue_id: QueueId) {
fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
// The documentation of Future::poll says:
// Note that on multiple calls to poll, only the Waker from
// the Context passed to the most recent call should be
Expand All @@ -120,7 +123,9 @@ impl Waitlist {
// This means we have to remove first to have the most recent
// waker in the queue.
self.remove(queue_id);
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
self.queue
.push(QueuedWaker { queue_id, waker }, queue_id)
.is_none()
}

fn pop(&mut self) -> Option<Waker> {
Expand All @@ -130,8 +135,8 @@ impl Waitlist {
}
}

fn remove(&mut self, id: QueueId) {
self.queue.remove(&id);
fn remove(&mut self, id: QueueId) -> bool {
self.queue.remove(&id).is_some()
}

fn peek_id(&mut self) -> Option<QueueId> {
Expand Down Expand Up @@ -181,6 +186,7 @@ impl Hash for QueuedWaker {
/// Connection pool data.
#[derive(Debug)]
pub struct Inner {
metrics: Arc<Metrics>,
close: atomic::AtomicBool,
closed: atomic::AtomicBool,
exchange: Mutex<Exchange>,
Expand Down Expand Up @@ -220,6 +226,10 @@ impl Pool {
inner: Arc::new(Inner {
close: false.into(),
closed: false.into(),
// A freshly created pool holds no connections, so every gauge starts at zero;
// `connection_count` is incremented as connections are actually created.
metrics: Arc::new(Metrics::default()),
exchange: Mutex::new(Exchange {
available: VecDeque::with_capacity(pool_opts.constraints().max()),
waiting: Waitlist::default(),
Expand All @@ -231,6 +241,11 @@ impl Pool {
}
}

/// Returns a shared handle to the metrics of this connection pool.
pub fn metrics(&self) -> Arc<Metrics> {
    Arc::clone(&self.inner.metrics)
}

/// Creates a new pool of connections.
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
let opts = Opts::from_str(url.as_ref())?;
Expand Down Expand Up @@ -288,6 +303,10 @@ impl Pool {
pub(super) fn cancel_connection(&self) {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.exist -= 1;
self.inner
.metrics
.create_failed
.fetch_add(1, atomic::Ordering::Relaxed);
// we just enabled the creation of a new connection!
if let Some(w) = exchange.waiting.pop() {
w.wake();
Expand Down Expand Up @@ -320,15 +339,44 @@ impl Pool {

// If we are not, just queue
if !highest {
exchange.waiting.push(cx.waker().clone(), queue_id);
if exchange.waiting.push(cx.waker().clone(), queue_id) {
self.inner
.metrics
.active_wait_requests
.fetch_add(1, atomic::Ordering::Relaxed);
}
return Poll::Pending;
}

while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
#[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
self.inner
.metrics
.connections_in_pool
.fetch_sub(1, atomic::Ordering::Relaxed);

if !conn.expired() {
#[cfg(feature = "hdrhistogram")]
self.inner
.metrics
.connection_idle_duration
.lock()
.unwrap()
.saturating_record(since.elapsed().as_micros() as u64);
#[cfg(feature = "hdrhistogram")]
let metrics = self.metrics();
conn.inner.active_since = Instant::now();
return Poll::Ready(Ok(GetConnInner::Checking(
async move {
conn.stream_mut()?.check().await?;
#[cfg(feature = "hdrhistogram")]
metrics
.check_duration
.lock()
.unwrap()
.saturating_record(
conn.inner.active_since.elapsed().as_micros() as u64
);
Ok(conn)
}
.boxed(),
Expand All @@ -344,19 +392,52 @@ impl Pool {
// we are allowed to make a new connection, so we will!
exchange.exist += 1;

self.inner
.metrics
.connection_count
.fetch_add(1, atomic::Ordering::Relaxed);

let opts = self.opts.clone();
#[cfg(feature = "hdrhistogram")]
let metrics = self.metrics();

return Poll::Ready(Ok(GetConnInner::Connecting(
Conn::new(self.opts.clone()).boxed(),
async move {
let conn = Conn::new(opts).await;
#[cfg(feature = "hdrhistogram")]
if let Ok(conn) = &conn {
metrics
.connect_duration
.lock()
.unwrap()
.saturating_record(
conn.inner.active_since.elapsed().as_micros() as u64
);
}
conn
}
.boxed(),
)));
}

// Polled, but no conn available? Back into the queue.
exchange.waiting.push(cx.waker().clone(), queue_id);
if exchange.waiting.push(cx.waker().clone(), queue_id) {
self.inner
.metrics
.active_wait_requests
.fetch_add(1, atomic::Ordering::Relaxed);
}
Poll::Pending
}

fn unqueue(&self, queue_id: QueueId) {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.waiting.remove(queue_id);
if exchange.waiting.remove(queue_id) {
self.inner
.metrics
.active_wait_requests
.fetch_sub(1, atomic::Ordering::Relaxed);
}
}
}

Expand Down
Loading

0 comments on commit 942b558

Please sign in to comment.