diff --git a/Cargo.toml b/Cargo.toml index 69b16ac9..36a444a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ once_cell = "1.7.2" pem = "3.0" percent-encoding = "2.1.0" pin-project = "1.0.2" -serde = "1" +serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" socket2 = "0.5.2" thiserror = "1.0.4" @@ -78,6 +78,7 @@ rand = "0.8.0" [features] default = [ + "metrics", "flate2/zlib", "mysql_common/bigdecimal", "mysql_common/rust_decimal", @@ -95,6 +96,7 @@ default-rustls = [ "derive", "rustls-tls", ] +metrics = [] minimal = ["flate2/zlib"] native-tls-tls = ["native-tls", "tokio-native-tls"] rustls-tls = [ diff --git a/src/buffer_pool.rs b/src/buffer_pool.rs index 03b1a4cc..b26851c0 100644 --- a/src/buffer_pool.rs +++ b/src/buffer_pool.rs @@ -6,6 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +use crate::metrics::BufferPoolMetrics; use crossbeam::queue::ArrayQueue; use std::{mem::replace, ops::Deref, sync::Arc}; @@ -14,6 +15,7 @@ pub struct BufferPool { buffer_size_cap: usize, buffer_init_cap: usize, pool: ArrayQueue>, + metrics: BufferPoolMetrics, } impl BufferPool { @@ -37,14 +39,21 @@ impl BufferPool { pool: ArrayQueue::new(pool_cap), buffer_size_cap, buffer_init_cap, + metrics: Default::default(), } } pub fn get(self: &Arc) -> PooledBuf { - let buf = self - .pool - .pop() - .unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap)); + let buf = match self.pool.pop() { + Some(buf) => { + self.metrics.reuses.incr(); + buf + } + None => { + self.metrics.creations.incr(); + Vec::with_capacity(self.buffer_init_cap) + } + }; debug_assert_eq!(buf.len(), 0); PooledBuf(buf, self.clone()) } @@ -64,7 +73,15 @@ impl BufferPool { buf.shrink_to(self.buffer_size_cap); // ArrayQueue will make sure to drop the buffer if capacity is exceeded - let _ = self.pool.push(buf); + match self.pool.push(buf) { + Ok(()) => self.metrics.returns.incr(), + Err(_buf) => self.metrics.discards.incr(), + }; + } + + #[cfg(feature = "metrics")] + pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics { + self.metrics.clone() } } diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 356c5658..4d1450ed 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -39,6 +39,7 @@ use crate::{ consts::{CapabilityFlags, Command, StatusFlags}, error::*, io::Stream, + metrics::ConnMetrics, opts::Opts, queryable::{ query_result::{QueryResult, ResultSetMeta}, @@ -59,6 +60,8 @@ const DEFAULT_WAIT_TIMEOUT: usize = 28800; /// Helper that asynchronously disconnects the givent connection on the default tokio executor. fn disconnect(mut conn: Conn) { + conn.metrics().disconnects.incr(); + let disconnected = conn.inner.disconnected; // Mark conn as disconnected. @@ -119,6 +122,7 @@ struct ConnInner { /// One-time connection-level infile handler. infile_handler: Option> + Send + Sync + 'static>>>, + conn_metrics: Arc, } impl fmt::Debug for ConnInner { @@ -140,6 +144,7 @@ impl fmt::Debug for ConnInner { impl ConnInner { /// Constructs an empty connection. fn empty(opts: Opts) -> ConnInner { + let conn_metrics: Arc = Default::default(); ConnInner { capabilities: opts.get_capabilities(), status: StatusFlags::empty(), @@ -154,7 +159,7 @@ impl ConnInner { tx_status: TxStatus::None, last_io: Instant::now(), wait_timeout: Duration::from_secs(0), - stmt_cache: StmtCache::new(opts.stmt_cache_size()), + stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()), socket: opts.socket().map(Into::into), opts, nonce: Vec::default(), @@ -164,6 +169,7 @@ impl ConnInner { server_key: None, infile_handler: None, reset_upon_returning_to_a_pool: false, + conn_metrics, } } @@ -175,6 +181,18 @@ impl ConnInner { .as_mut() .ok_or_else(|| DriverError::ConnectionClosed.into()) } + + fn set_pool(&mut self, pool: Option) { + let conn_metrics = if let Some(ref pool) = pool { + Arc::clone(&pool.inner.metrics.conn) + } else { + Default::default() + }; + self.conn_metrics = Arc::clone(&conn_metrics); + self.stmt_cache.conn_metrics = conn_metrics; + + self.pool = pool; + } } /// MySql server connection. @@ -926,6 +944,8 @@ impl Conn { conn.run_init_commands().await?; conn.run_setup_commands().await?; + conn.metrics().connects.incr(); + Ok(conn) } .boxed() @@ -1162,6 +1182,10 @@ impl Conn { self.inner.stmt_cache.clear(); self.inner.infile_handler = None; self.run_setup_commands().await?; + // self.inner.set_pool(pool); + + // TODO: clear some metrics? + Ok(()) } @@ -1276,6 +1300,10 @@ impl Conn { Ok(BinlogStream::new(self)) } + + pub(crate) fn metrics(&self) -> &ConnMetrics { + &self.inner.conn_metrics + } } #[cfg(test)] diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 8b21e685..476beeb7 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -142,9 +142,10 @@ impl Future for GetConn { return match result { Ok(mut c) => { - c.inner.pool = Some(pool); + c.inner.set_pool(Some(pool)); c.inner.reset_upon_returning_to_a_pool = self.reset_upon_returning_to_a_pool; + c.metrics().connects.incr(); Poll::Ready(Ok(c)) } Err(e) => { @@ -160,7 +161,8 @@ impl Future for GetConn { self.inner = GetConnInner::Done; let pool = self.pool_take(); - c.inner.pool = Some(pool); + pool.inner.metrics.reuses.incr(); + c.inner.set_pool(Some(pool)); c.inner.reset_upon_returning_to_a_pool = self.reset_upon_returning_to_a_pool; return Poll::Ready(Ok(c)); diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 9fc29e71..9d332343 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -25,6 +25,7 @@ use std::{ use crate::{ conn::{pool::futures::*, Conn}, error::*, + metrics::PoolMetrics, opts::{Opts, PoolOpts}, queryable::transaction::{Transaction, TxOpts}, }; @@ -121,6 +122,10 @@ impl Waitlist { self.queue.remove(&tmp); } + fn len(&self) -> usize { + self.queue.len() + } + fn is_empty(&self) -> bool { self.queue.is_empty() } @@ -177,6 +182,7 @@ pub struct Inner { close: atomic::AtomicBool, closed: atomic::AtomicBool, exchange: Mutex, + pub(crate) metrics: PoolMetrics, } /// Asynchronous pool of MySql connections. @@ -190,7 +196,7 @@ pub struct Inner { #[derive(Debug, Clone)] pub struct Pool { opts: Opts, - inner: Arc, + pub(super) inner: Arc, drop: mpsc::UnboundedSender>, } @@ -219,6 +225,7 @@ impl Pool { exist: 0, recycler: Some((rx, pool_opts)), }), + metrics: Default::default(), }), drop: tx, } @@ -232,6 +239,7 @@ impl Pool { /// Async function that resolves to `Conn`. pub fn get_conn(&self) -> GetConn { + self.inner.metrics.gets.incr(); let reset_connection = self.opts.pool_opts().reset_connection(); GetConn::new(self, reset_connection) } @@ -250,6 +258,11 @@ impl Pool { DisconnectPool::new(self) } + #[cfg(feature = "metrics")] + pub fn snapshot_metrics(&self) -> PoolMetrics { + self.inner.metrics.clone() + } + /// A way to return connection taken from a pool. fn return_conn(&mut self, conn: Conn) { // NOTE: we're not in async context here, so we can't block or return NotReady @@ -258,6 +271,8 @@ impl Pool { } fn send_to_recycler(&self, conn: Conn) { + self.inner.metrics.recycler.recycles.incr(); + if let Err(conn) = self.drop.send(Some(conn)) { let conn = conn.0.unwrap(); @@ -354,6 +369,19 @@ impl Pool { let mut exchange = self.inner.exchange.lock().unwrap(); exchange.waiting.remove(queue_id); } + + /// Returns the number of + /// - open connections, + /// - idling connections in the pool and + /// - tasks waiting for a connection. + pub fn queue_stats(&self) -> (usize, usize, usize) { + let exchange = self.inner.exchange.lock().unwrap(); + ( + exchange.exist, + exchange.available.len(), + exchange.waiting.len(), + ) + } } impl Drop for Conn { diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 1ea855c0..c2744b23 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -67,8 +67,10 @@ impl Future for Recycler { let mut exchange = $self.inner.exchange.lock().unwrap(); if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() { drop(exchange); + $self.inner.metrics.recycler.discards.incr(); $self.discard.push($conn.close_conn().boxed()); } else { + $self.inner.metrics.recycler.recycled_returnals.incr(); exchange.available.push_back($conn.into()); if let Some(w) = exchange.waiting.pop() { w.wake(); @@ -80,11 +82,14 @@ impl Future for Recycler { macro_rules! conn_decision { ($self:ident, $conn:ident) => { if $conn.inner.stream.is_none() || $conn.inner.disconnected { + $self.inner.metrics.recycler.discards.incr(); // drop unestablished connection $self.discard.push(futures_util::future::ok(()).boxed()); } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() { + $self.inner.metrics.recycler.cleans.incr(); $self.cleaning.push($conn.cleanup_for_pool().boxed()); } else if $conn.expired() || close { + $self.inner.metrics.recycler.discards.incr(); $self.discard.push($conn.close_conn().boxed()); } else if $conn.inner.reset_upon_returning_to_a_pool { $self.reset.push($conn.reset_for_pool().boxed()); diff --git a/src/conn/routines/change_user.rs b/src/conn/routines/change_user.rs index 28b51d4e..453ecac6 100644 --- a/src/conn/routines/change_user.rs +++ b/src/conn/routines/change_user.rs @@ -17,6 +17,8 @@ pub struct ChangeUser; impl Routine<()> for ChangeUser { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.change_user.incr(); + #[cfg(feature = "tracing")] let span = debug_span!( "mysql_async::change_user", diff --git a/src/conn/routines/exec.rs b/src/conn/routines/exec.rs index 262a90c9..41b27670 100644 --- a/src/conn/routines/exec.rs +++ b/src/conn/routines/exec.rs @@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> { impl Routine<()> for ExecRoutine<'_> { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.execs.incr(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::exec", diff --git a/src/conn/routines/next_set.rs b/src/conn/routines/next_set.rs index ecb2784a..dcb8ce04 100644 --- a/src/conn/routines/next_set.rs +++ b/src/conn/routines/next_set.rs @@ -24,6 +24,8 @@ where P: Protocol, { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.next_sets.incr(); + #[cfg(feature = "tracing")] let span = debug_span!( "mysql_async::next_set", diff --git a/src/conn/routines/ping.rs b/src/conn/routines/ping.rs index 5f9d017e..bbf9011a 100644 --- a/src/conn/routines/ping.rs +++ b/src/conn/routines/ping.rs @@ -14,6 +14,8 @@ pub struct PingRoutine; impl Routine<()> for PingRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.pings.incr(); + #[cfg(feature = "tracing")] let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id()); diff --git a/src/conn/routines/prepare.rs b/src/conn/routines/prepare.rs index 33970e58..205d1ac8 100644 --- a/src/conn/routines/prepare.rs +++ b/src/conn/routines/prepare.rs @@ -26,6 +26,8 @@ impl PrepareRoutine { impl Routine> for PrepareRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result>> { + conn.metrics().routines.prepares.incr(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::prepare", diff --git a/src/conn/routines/query.rs b/src/conn/routines/query.rs index a82864f6..280a971d 100644 --- a/src/conn/routines/query.rs +++ b/src/conn/routines/query.rs @@ -29,6 +29,8 @@ impl<'a, L: TracingLevel> QueryRoutine<'a, L> { impl Routine<()> for QueryRoutine<'_, L> { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.queries.incr(); + #[cfg(feature = "tracing")] let span = create_span!( L::LEVEL, diff --git a/src/conn/routines/reset.rs b/src/conn/routines/reset.rs index b52beb8d..76a5d29c 100644 --- a/src/conn/routines/reset.rs +++ b/src/conn/routines/reset.rs @@ -14,6 +14,8 @@ pub struct ResetRoutine; impl Routine<()> for ResetRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.resets.incr(); + #[cfg(feature = "tracing")] let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id()); diff --git a/src/conn/stmt_cache.rs b/src/conn/stmt_cache.rs index 595836ac..f581a7c7 100644 --- a/src/conn/stmt_cache.rs +++ b/src/conn/stmt_cache.rs @@ -16,7 +16,10 @@ use std::{ sync::Arc, }; -use crate::queryable::stmt::StmtInner; +use crate::{ + metrics::{ConnMetrics, StmtCacheMetrics}, + queryable::stmt::StmtInner, +}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct QueryString(pub Arc<[u8]>); @@ -43,14 +46,17 @@ pub struct StmtCache { cap: usize, cache: LruCache, query_map: HashMap>, + + pub(super) conn_metrics: Arc, } impl StmtCache { - pub fn new(cap: usize) -> Self { + pub fn new(cap: usize, conn_metrics: Arc) -> Self { Self { cap, cache: LruCache::unbounded(), query_map: Default::default(), + conn_metrics, } } @@ -63,13 +69,21 @@ impl StmtCache { { let id = self.query_map.get(query).cloned(); match id { - Some(id) => self.cache.get(&id), - None => None, + Some(id) => { + self.metrics().hits.incr(); + self.cache.get(&id) + } + None => { + self.metrics().misses.incr(); + None + } } } pub fn put(&mut self, query: Arc<[u8]>, stmt: Arc) -> Option> { if self.cap == 0 { + self.metrics().drops.incr(); + // drops return None; } @@ -77,9 +91,11 @@ impl StmtCache { self.query_map.insert(query.clone(), stmt.id()); self.cache.put(stmt.id(), Entry { stmt, query }); + self.metrics().additions.incr(); if self.cache.len() > self.cap { if let Some((_, entry)) = self.cache.pop_lru() { + self.metrics().evictions.incr(); self.query_map.remove(&*entry.query.0.as_ref()); return Some(entry.stmt); } @@ -91,14 +107,20 @@ impl StmtCache { pub fn clear(&mut self) { self.query_map.clear(); self.cache.clear(); + self.metrics().resets.incr(); } pub fn remove(&mut self, id: u32) { if let Some(entry) = self.cache.pop(&id) { self.query_map.remove::<[u8]>(entry.query.borrow()); + self.metrics().removals.incr(); } } + fn metrics(&self) -> &StmtCacheMetrics { + &self.conn_metrics.stmt_cache + } + #[cfg(test)] pub fn iter(&self) -> impl Iterator { self.cache.iter() diff --git a/src/lib.rs b/src/lib.rs index 9c5ff836..168aa43e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -440,6 +440,7 @@ mod connection_like; mod error; mod io; mod local_infile_handler; +mod metrics; mod opts; mod query; mod queryable; @@ -536,6 +537,10 @@ pub mod futures { pub use crate::conn::pool::futures::{DisconnectPool, GetConn}; } +#[cfg(feature = "metrics")] +#[doc(inline)] +pub use metrics::{BufferPoolMetrics, PoolMetrics}; + /// Traits used in this crate pub mod prelude { #[doc(inline)] diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 00000000..d93062ef --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,148 @@ +use serde::Serialize; +#[cfg(feature = "metrics")] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +#[derive(Debug, Default, Serialize)] +pub struct PoolMetrics { + pub gets: Counter, + pub reuses: Counter, + pub direct_returnals: Counter, + pub discards: Counter, + + pub(crate) recycler: RecyclerMetrics, + pub(crate) conn: Arc, +} + +// Manually implemented Clone to clone ConnMetrics, not the Arc. +impl Clone for PoolMetrics { + fn clone(&self) -> Self { + PoolMetrics { + gets: self.gets.clone(), + reuses: self.reuses.clone(), + direct_returnals: self.direct_returnals.clone(), + discards: self.discards.clone(), + recycler: self.recycler.clone(), + conn: Arc::new(self.conn.as_ref().clone()), + } + } +} + +impl PoolMetrics { + #[inline] + pub fn recycler(&self) -> &RecyclerMetrics { + &self.recycler + } + + #[inline] + pub fn connections(&self) -> &ConnMetrics { + &self.conn + } +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct RecyclerMetrics { + pub recycles: Counter, + pub discards: Counter, + pub cleans: Counter, + pub recycled_returnals: Counter, +} + +#[derive(Clone, Debug, Default, Serialize)] +pub struct ConnMetrics { + pub connects: Counter, + pub reuses: Counter, + pub disconnects: Counter, + + pub(crate) routines: RoutineMetrics, + pub(crate) stmt_cache: StmtCacheMetrics, +} + +impl ConnMetrics { + #[inline] + pub fn routines(&self) -> &RoutineMetrics { + &self.routines + } + + #[inline] + pub fn stmt_cache(&self) -> &StmtCacheMetrics { + &self.stmt_cache + } +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct RoutineMetrics { + pub change_user: Counter, + pub queries: Counter, + pub prepares: Counter, + pub execs: Counter, + pub pings: Counter, + pub resets: Counter, + pub next_sets: Counter, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct StmtCacheMetrics { + pub additions: Counter, + pub drops: Counter, + pub removals: Counter, + pub evictions: Counter, + pub resets: Counter, + pub hits: Counter, + pub misses: Counter, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[non_exhaustive] +pub struct BufferPoolMetrics { + pub creations: Counter, + pub reuses: Counter, + pub returns: Counter, + pub discards: Counter, +} + +#[cfg(feature = "metrics")] +impl BufferPoolMetrics { + #[inline] + pub fn snapshot_global() -> Self { + crate::BUFFER_POOL.snapshot_metrics() + } +} + +#[cfg(feature = "metrics")] +#[derive(Debug, Default, Serialize)] +pub struct Counter(AtomicUsize); + +#[cfg(not(feature = "metrics"))] +#[derive(Clone, Debug, Default, Serialize)] +pub struct Counter; + +impl Counter { + #[cfg(feature = "metrics")] + #[inline] + pub fn value(&self) -> usize { + self.0.load(Ordering::Relaxed) + } + + #[inline] + pub(crate) fn incr(&self) { + self.incr_by(1) + } + + #[allow(unused_variables)] + #[inline] + pub(crate) fn incr_by(&self, count: usize) { + #[cfg(feature = "metrics")] + self.0.fetch_add(count, Ordering::Relaxed); + } +} + +#[cfg(feature = "metrics")] +impl Clone for Counter { + fn clone(&self) -> Self { + Counter(AtomicUsize::new(self.0.load(Ordering::Relaxed))) + } +}