diff --git a/Cargo.toml b/Cargo.toml index bd55f046..1eaa0a4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ pem = "1.0.1" percent-encoding = "2.1.0" pin-project = "1.0.2" priority-queue = "1" -serde = "1" +serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" socket2 = "0.4.2" thiserror = "1.0.4" @@ -96,6 +96,7 @@ default-rustls = [ "derive", "rustls-tls", ] +metrics = [] minimal = ["flate2/zlib"] native-tls-tls = ["native-tls", "tokio-native-tls"] rustls-tls = [ diff --git a/src/buffer_pool.rs b/src/buffer_pool.rs index 03b1a4cc..b26851c0 100644 --- a/src/buffer_pool.rs +++ b/src/buffer_pool.rs @@ -6,6 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +use crate::metrics::BufferPoolMetrics; use crossbeam::queue::ArrayQueue; use std::{mem::replace, ops::Deref, sync::Arc}; @@ -14,6 +15,7 @@ pub struct BufferPool { buffer_size_cap: usize, buffer_init_cap: usize, pool: ArrayQueue<Vec<u8>>, + metrics: BufferPoolMetrics, } impl BufferPool { @@ -37,14 +39,21 @@ impl BufferPool { pool: ArrayQueue::new(pool_cap), buffer_size_cap, buffer_init_cap, + metrics: Default::default(), } } pub fn get(self: &Arc<Self>) -> PooledBuf { - let buf = self - .pool - .pop() - .unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap)); + let buf = match self.pool.pop() { + Some(buf) => { + self.metrics.reuses.incr(); + buf + } + None => { + self.metrics.creations.incr(); + Vec::with_capacity(self.buffer_init_cap) + } + }; debug_assert_eq!(buf.len(), 0); PooledBuf(buf, self.clone()) } @@ -64,7 +73,15 @@ impl BufferPool { buf.shrink_to(self.buffer_size_cap); // ArrayQueue will make sure to drop the buffer if capacity is exceeded - let _ = self.pool.push(buf); + match self.pool.push(buf) { + Ok(()) => self.metrics.returns.incr(), + Err(_buf) => self.metrics.discards.incr(), + }; + } + + #[cfg(feature = "metrics")] + pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics { + 
self.metrics.clone() } } diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 4f0500f1..b020cae4 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -38,6 +38,7 @@ use crate::{ consts::{CapabilityFlags, Command, StatusFlags}, error::*, io::Stream, + metrics::ConnMetrics, opts::Opts, queryable::{ query_result::{QueryResult, ResultSetMeta}, @@ -56,6 +57,8 @@ pub mod stmt_cache; /// Helper that asynchronously disconnects the givent connection on the default tokio executor. fn disconnect(mut conn: Conn) { + conn.metrics().disconnects.incr(); + let disconnected = conn.inner.disconnected; // Mark conn as disconnected. @@ -116,6 +119,7 @@ struct ConnInner { /// One-time connection-level infile handler. infile_handler: Option<Pin<Box<dyn Future<Output = crate::Result<InfileData>> + Send + Sync + 'static>>>, + conn_metrics: Arc<ConnMetrics>, } impl fmt::Debug for ConnInner { @@ -137,6 +141,7 @@ impl fmt::Debug for ConnInner { impl ConnInner { /// Constructs an empty connection. fn empty(opts: Opts) -> ConnInner { + let conn_metrics: Arc<ConnMetrics> = Default::default(); ConnInner { capabilities: opts.get_capabilities(), status: StatusFlags::empty(), @@ -151,7 +156,7 @@ impl ConnInner { tx_status: TxStatus::None, last_io: Instant::now(), wait_timeout: Duration::from_secs(0), - stmt_cache: StmtCache::new(opts.stmt_cache_size()), + stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()), socket: opts.socket().map(Into::into), opts, nonce: Vec::default(), @@ -161,6 +166,7 @@ impl ConnInner { server_key: None, infile_handler: None, reset_upon_returning_to_a_pool: false, + conn_metrics, } } @@ -172,6 +178,18 @@ impl ConnInner { .as_mut() .ok_or_else(|| DriverError::ConnectionClosed.into()) } + + fn set_pool(&mut self, pool: Option<Pool>) { + let conn_metrics = if let Some(ref pool) = pool { + Arc::clone(&pool.inner.metrics.conn) + } else { + Default::default() + }; + self.conn_metrics = Arc::clone(&conn_metrics); + self.stmt_cache.conn_metrics = conn_metrics; + + self.pool = pool; + } } /// MySql server connection. 
@@ -907,6 +925,8 @@ impl Conn { conn.read_wait_timeout().await?; conn.run_init_commands().await?; + conn.metrics().connects.incr(); + Ok(conn) } .boxed() @@ -1045,6 +1065,10 @@ impl Conn { self.routine(routines::ChangeUser).await?; self.inner.stmt_cache.clear(); self.inner.infile_handler = None; + // NOTE: the pool association is intentionally left unchanged here. + + // TODO(review): decide whether connection metrics should be reset on COM_CHANGE_USER. + Ok(()) } @@ -1159,6 +1183,10 @@ Ok(BinlogStream::new(self)) } + + pub(crate) fn metrics(&self) -> &ConnMetrics { + &self.inner.conn_metrics + } } #[cfg(test)] diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 8b21e685..476beeb7 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -142,9 +142,10 @@ impl Future for GetConn { return match result { Ok(mut c) => { - c.inner.pool = Some(pool); + c.inner.set_pool(Some(pool)); c.inner.reset_upon_returning_to_a_pool = self.reset_upon_returning_to_a_pool; + c.metrics().connects.incr(); Poll::Ready(Ok(c)) } Err(e) => { @@ -160,7 +161,8 @@ self.inner = GetConnInner::Done; let pool = self.pool_take(); - c.inner.pool = Some(pool); + pool.inner.metrics.reuses.incr(); + c.inner.set_pool(Some(pool)); c.inner.reset_upon_returning_to_a_pool = self.reset_upon_returning_to_a_pool; return Poll::Ready(Ok(c)); diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 84182e8e..505ebc9b 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -25,6 +25,7 @@ use std::{ use crate::{ conn::{pool::futures::*, Conn}, error::*, + metrics::PoolMetrics, opts::{Opts, PoolOpts}, queryable::transaction::{Transaction, TxOpts, TxStatus}, }; @@ -177,6 +178,7 @@ pub struct Inner { close: atomic::AtomicBool, closed: atomic::AtomicBool, exchange: Mutex<Exchange>, + pub(crate) metrics: PoolMetrics, } /// Asynchronous pool of MySql connections. 
@@ -190,7 +192,7 @@ pub struct Inner { #[derive(Debug, Clone)] pub struct Pool { opts: Opts, - inner: Arc<Inner>, + pub(super) inner: Arc<Inner>, drop: mpsc::UnboundedSender<Option<Conn>>, } @@ -219,6 +221,7 @@ impl Pool { exist: 0, recycler: Some((rx, pool_opts)), }), + metrics: Default::default(), }), drop: tx, } @@ -232,6 +235,7 @@ impl Pool { /// Async function that resolves to `Conn`. pub fn get_conn(&self) -> GetConn { + self.inner.metrics.gets.incr(); GetConn::new(self, true) } @@ -249,6 +253,11 @@ impl Pool { DisconnectPool::new(self) } + #[cfg(feature = "metrics")] + pub fn snapshot_metrics(&self) -> PoolMetrics { + self.inner.metrics.clone() + } + /// A way to return connection taken from a pool. fn return_conn(&mut self, conn: Conn) { // NOTE: we're not in async context here, so we can't block or return NotReady { let mut exchange = self.inner.exchange.lock().unwrap(); if exchange.available.len() < self.opts.pool_opts().active_bound() { + self.inner.metrics.direct_returnals.incr(); exchange.available.push_back(conn.into()); if let Some(w) = exchange.waiting.pop() { w.wake(); } return; + } else { + self.inner.metrics.discards.incr(); } } @@ -276,6 +288,8 @@ } fn send_to_recycler(&self, conn: Conn) { + self.inner.metrics.recycler.recycles.incr(); + if let Err(conn) = self.drop.send(Some(conn)) { let conn = conn.0.unwrap(); diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 5a705868..a7dcd7d4 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -67,8 +67,10 @@ impl Future for Recycler { let mut exchange = $self.inner.exchange.lock().unwrap(); if exchange.available.len() >= $self.pool_opts.active_bound() { drop(exchange); + $self.inner.metrics.recycler.discards.incr(); $self.discard.push($conn.close_conn().boxed()); } else { + $self.inner.metrics.recycler.recycled_returnals.incr(); exchange.available.push_back($conn.into()); if let Some(w) = exchange.waiting.pop() { w.wake(); } @@ -80,11 +82,14 @@ 
impl Future for Recycler { macro_rules! conn_decision { ($self:ident, $conn:ident) => { if $conn.inner.stream.is_none() || $conn.inner.disconnected { + $self.inner.metrics.recycler.discards.incr(); // drop unestablished connection $self.discard.push(futures_util::future::ok(()).boxed()); } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() { + $self.inner.metrics.recycler.cleans.incr(); $self.cleaning.push($conn.cleanup_for_pool().boxed()); } else if $conn.expired() || close { + $self.inner.metrics.recycler.discards.incr(); $self.discard.push($conn.close_conn().boxed()); } else if $conn.inner.reset_upon_returning_to_a_pool { $self.reset.push($conn.reset_for_pool().boxed()); diff --git a/src/conn/routines/exec.rs b/src/conn/routines/exec.rs index 262a90c9..41b27670 100644 --- a/src/conn/routines/exec.rs +++ b/src/conn/routines/exec.rs @@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> { impl Routine<()> for ExecRoutine<'_> { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.execs.incr(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::exec", diff --git a/src/conn/routines/next_set.rs b/src/conn/routines/next_set.rs index ecb2784a..dcb8ce04 100644 --- a/src/conn/routines/next_set.rs +++ b/src/conn/routines/next_set.rs @@ -24,6 +24,8 @@ where P: Protocol, { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.next_sets.incr(); + #[cfg(feature = "tracing")] let span = debug_span!( "mysql_async::next_set", diff --git a/src/conn/routines/ping.rs b/src/conn/routines/ping.rs index 5f9d017e..bbf9011a 100644 --- a/src/conn/routines/ping.rs +++ b/src/conn/routines/ping.rs @@ -14,6 +14,8 @@ pub struct PingRoutine; impl Routine<()> for PingRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.pings.incr(); + #[cfg(feature = "tracing")] let span = 
debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id()); diff --git a/src/conn/routines/prepare.rs b/src/conn/routines/prepare.rs index 33970e58..205d1ac8 100644 --- a/src/conn/routines/prepare.rs +++ b/src/conn/routines/prepare.rs @@ -26,6 +26,8 @@ impl PrepareRoutine { impl Routine<Arc<StmtInner>> for PrepareRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<Arc<StmtInner>>> { + conn.metrics().routines.prepares.incr(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::prepare", diff --git a/src/conn/routines/query.rs b/src/conn/routines/query.rs index a82864f6..280a971d 100644 --- a/src/conn/routines/query.rs +++ b/src/conn/routines/query.rs @@ -29,6 +29,8 @@ impl<'a, L: TracingLevel> QueryRoutine<'a, L> { impl<L: TracingLevel> Routine<()> for QueryRoutine<'_, L> { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.queries.incr(); + #[cfg(feature = "tracing")] let span = create_span!( L::LEVEL, diff --git a/src/conn/routines/reset.rs b/src/conn/routines/reset.rs index b52beb8d..76a5d29c 100644 --- a/src/conn/routines/reset.rs +++ b/src/conn/routines/reset.rs @@ -14,6 +14,8 @@ pub struct ResetRoutine; impl Routine<()> for ResetRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.resets.incr(); + #[cfg(feature = "tracing")] let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id()); diff --git a/src/conn/stmt_cache.rs b/src/conn/stmt_cache.rs index 595836ac..f581a7c7 100644 --- a/src/conn/stmt_cache.rs +++ b/src/conn/stmt_cache.rs @@ -16,7 +16,10 @@ use std::{ sync::Arc, }; -use crate::queryable::stmt::StmtInner; +use crate::{ + metrics::{ConnMetrics, StmtCacheMetrics}, + queryable::stmt::StmtInner, +}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct QueryString(pub Arc<[u8]>); @@ -43,14 +46,17 @@ pub struct StmtCache { cap: usize, cache: LruCache<u32, Entry>, query_map: HashMap<QueryString, u32>, + 
+ pub(super) conn_metrics: Arc<ConnMetrics>, } impl StmtCache { - pub fn new(cap: usize) -> Self { + pub fn new(cap: usize, conn_metrics: Arc<ConnMetrics>) -> Self { Self { cap, cache: LruCache::unbounded(), query_map: Default::default(), + conn_metrics, } } @@ -63,13 +69,21 @@ { let id = self.query_map.get(query).cloned(); match id { - Some(id) => self.cache.get(&id), - None => None, + Some(id) => { + self.metrics().hits.incr(); + self.cache.get(&id) + } + None => { + self.metrics().misses.incr(); + None + } } } pub fn put(&mut self, query: Arc<[u8]>, stmt: Arc<StmtInner>) -> Option<Arc<StmtInner>> { if self.cap == 0 { + self.metrics().drops.incr(); + + // drops return None; } @@ -77,9 +91,11 @@ self.query_map.insert(query.clone(), stmt.id()); self.cache.put(stmt.id(), Entry { stmt, query }); + self.metrics().additions.incr(); if self.cache.len() > self.cap { if let Some((_, entry)) = self.cache.pop_lru() { + self.metrics().evictions.incr(); self.query_map.remove(&*entry.query.0.as_ref()); return Some(entry.stmt); } @@ -91,14 +107,20 @@ pub fn clear(&mut self) { self.query_map.clear(); self.cache.clear(); + self.metrics().resets.incr(); } pub fn remove(&mut self, id: u32) { if let Some(entry) = self.cache.pop(&id) { self.query_map.remove::<[u8]>(entry.query.borrow()); + self.metrics().removals.incr(); } } + fn metrics(&self) -> &StmtCacheMetrics { + &self.conn_metrics.stmt_cache + } + #[cfg(test)] pub fn iter(&self) -> impl Iterator { self.cache.iter() diff --git a/src/lib.rs b/src/lib.rs index 5d6d78b7..dd03c9bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -438,6 +438,7 @@ mod connection_like; mod error; mod io; mod local_infile_handler; +pub mod metrics; mod opts; mod query; mod queryable; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 00000000..4d186182 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,109 @@ +use serde::Serialize; +#[cfg(feature = "metrics")] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + 
+#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct PoolMetrics { + pub gets: Counter, + pub reuses: Counter, + pub direct_returnals: Counter, + pub discards: Counter, + + pub recycler: RecyclerMetrics, + pub conn: Arc<ConnMetrics>, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct RecyclerMetrics { + pub recycles: Counter, + pub discards: Counter, + pub cleans: Counter, + pub recycled_returnals: Counter, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct ConnMetrics { + pub connects: Counter, + pub reuses: Counter, + pub disconnects: Counter, + + pub routines: RoutinesMetrics, + pub stmt_cache: StmtCacheMetrics, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct RoutinesMetrics { + pub queries: Counter, + pub prepares: Counter, + pub execs: Counter, + pub pings: Counter, + pub resets: Counter, + pub next_sets: Counter, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct StmtCacheMetrics { + pub additions: Counter, + pub drops: Counter, + pub removals: Counter, + pub evictions: Counter, + pub resets: Counter, + pub hits: Counter, + pub misses: Counter, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct BufferPoolMetrics { + pub creations: Counter, + pub reuses: Counter, + pub returns: Counter, + pub discards: Counter, +} + +#[cfg(feature = "metrics")] +impl BufferPoolMetrics { + pub fn snapshot_global() -> Self { + crate::BUFFER_POOL.snapshot_metrics() + } +} + +#[cfg(feature = "metrics")] +#[derive(Debug, Default, Serialize)] +pub struct Counter(AtomicUsize); + +#[cfg(not(feature = "metrics"))] +#[derive(Clone, Debug, Default, Serialize)] +pub struct Counter; + +impl Counter { + #[cfg(feature = "metrics")] + #[inline] + pub fn value(&self) -> usize { + self.0.load(Ordering::Relaxed) + } + + #[inline] + pub(crate) fn incr(&self) { + self.incr_by(1) + } + + #[inline] + pub(crate) fn incr_by(&self, count: 
usize) { + #[cfg(feature = "metrics")] + self.0.fetch_add(count, Ordering::Relaxed); + } +} + +#[cfg(feature = "metrics")] +impl Clone for Counter { + fn clone(&self) -> Self { + Counter(AtomicUsize::new(self.0.load(Ordering::Relaxed))) + } +}