diff --git a/Cargo.toml b/Cargo.toml index 0af37c0e..5f66c086 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ pem = "1.0.1" percent-encoding = "2.1.0" pin-project = "1.0.2" priority-queue = "1" -serde = "1" +serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" socket2 = "0.4.2" thiserror = "1.0.4" @@ -92,6 +92,7 @@ default-rustls = [ "mysql_common/frunk", "rustls-tls", ] +metrics = [] minimal = ["flate2/zlib"] native-tls-tls = ["native-tls", "tokio-native-tls"] rustls-tls = [ diff --git a/src/buffer_pool.rs b/src/buffer_pool.rs index 03b1a4cc..bb4efaf3 100644 --- a/src/buffer_pool.rs +++ b/src/buffer_pool.rs @@ -6,6 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +use crate::metrics::BufferPoolMetrics; use crossbeam::queue::ArrayQueue; use std::{mem::replace, ops::Deref, sync::Arc}; @@ -14,6 +15,7 @@ pub struct BufferPool { buffer_size_cap: usize, buffer_init_cap: usize, pool: ArrayQueue>, + metrics: BufferPoolMetrics, } impl BufferPool { @@ -37,14 +39,21 @@ impl BufferPool { pool: ArrayQueue::new(pool_cap), buffer_size_cap, buffer_init_cap, + metrics: Default::default(), } } pub fn get(self: &Arc) -> PooledBuf { - let buf = self - .pool - .pop() - .unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap)); + let buf = match self.pool.pop() { + Some(buf) => { + self.metrics.reuses.inc(); + buf + } + None => { + self.metrics.creations.inc(); + Vec::with_capacity(self.buffer_init_cap) + } + }; debug_assert_eq!(buf.len(), 0); PooledBuf(buf, self.clone()) } @@ -64,7 +73,15 @@ impl BufferPool { buf.shrink_to(self.buffer_size_cap); // ArrayQueue will make sure to drop the buffer if capacity is exceeded - let _ = self.pool.push(buf); + match self.pool.push(buf) { + Ok(()) => self.metrics.returns.inc(), + Err(_buf) => self.metrics.discards.inc(), + }; + } + + #[cfg(feature = "metrics")] + pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics { + self.metrics.clone() } } diff --git a/src/conn/mod.rs b/src/conn/mod.rs index b4ead418..bea3b8bf 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -38,6 +38,7 @@ use crate::{ consts::{CapabilityFlags, Command, StatusFlags}, error::*, io::Stream, + metrics::ConnMetrics, opts::Opts, queryable::{ query_result::{QueryResult, ResultSetMeta}, @@ -56,6 +57,8 @@ pub mod stmt_cache; /// Helper that asynchronously disconnects the givent connection on the default tokio executor. fn disconnect(mut conn: Conn) { + conn.metrics().disconnects.inc(); + let disconnected = conn.inner.disconnected; // Mark conn as disconnected. @@ -114,6 +117,7 @@ struct ConnInner { /// One-time connection-level infile handler. infile_handler: Option> + Send + Sync + 'static>>>, + conn_metrics: Arc, } impl fmt::Debug for ConnInner { @@ -133,6 +137,7 @@ impl fmt::Debug for ConnInner { impl ConnInner { /// Constructs an empty connection. fn empty(opts: Opts) -> ConnInner { + let conn_metrics: Arc = Default::default(); ConnInner { capabilities: opts.get_capabilities(), status: StatusFlags::empty(), @@ -147,7 +152,7 @@ impl ConnInner { tx_status: TxStatus::None, last_io: Instant::now(), wait_timeout: Duration::from_secs(0), - stmt_cache: StmtCache::new(opts.stmt_cache_size()), + stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()), socket: opts.socket().map(Into::into), opts, nonce: Vec::default(), @@ -155,6 +160,7 @@ impl ConnInner { auth_switched: false, disconnected: false, infile_handler: None, + conn_metrics, } } @@ -166,6 +172,18 @@ impl ConnInner { .as_mut() .ok_or_else(|| DriverError::ConnectionClosed.into()) } + + fn set_pool(&mut self, pool: Option) { + let conn_metrics = if let Some(ref pool) = pool { + Arc::clone(&pool.inner.metrics.conn) + } else { + Default::default() + }; + self.conn_metrics = Arc::clone(&conn_metrics); + self.stmt_cache.conn_metrics = conn_metrics; + + self.pool = pool; + } } /// MySql server connection. @@ -854,6 +872,8 @@ impl Conn { conn.read_wait_timeout().await?; conn.run_init_commands().await?; + conn.metrics().connects.inc(); + Ok(conn) } .boxed() @@ -957,7 +977,10 @@ impl Conn { self.inner.stmt_cache.clear(); self.inner.infile_handler = None; - self.inner.pool = pool; + self.inner.set_pool(pool); + + // TODO: clear some metrics? + Ok(()) } @@ -1062,6 +1085,10 @@ impl Conn { Ok(BinlogStream::new(self)) } + + pub(crate) fn metrics(&self) -> &ConnMetrics { + &self.inner.conn_metrics + } } #[cfg(test)] diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 73e8a999..164496ab 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -140,7 +140,8 @@ impl Future for GetConn { return match result { Ok(mut c) => { - c.inner.pool = Some(pool); + c.inner.set_pool(Some(pool)); + c.metrics().connects.inc(); Poll::Ready(Ok(c)) } Err(e) => { @@ -156,7 +157,7 @@ impl Future for GetConn { self.inner = GetConnInner::Done; let pool = self.pool_take(); - checked_conn.inner.pool = Some(pool); + checked_conn.inner.set_pool(Some(pool)); return Poll::Ready(Ok(checked_conn)); } Err(_) => { diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 9fa107be..c368fd36 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -25,6 +25,7 @@ use std::{ use crate::{ conn::{pool::futures::*, Conn}, error::*, + metrics::PoolMetrics, opts::{Opts, PoolOpts}, queryable::transaction::{Transaction, TxOpts, TxStatus}, }; @@ -177,6 +178,7 @@ pub struct Inner { close: atomic::AtomicBool, closed: atomic::AtomicBool, exchange: Mutex, + pub(crate) metrics: PoolMetrics, } /// Asynchronous pool of MySql connections. @@ -190,7 +192,7 @@ pub struct Inner { #[derive(Debug, Clone)] pub struct Pool { opts: Opts, - inner: Arc, + pub(super) inner: Arc, drop: mpsc::UnboundedSender>, } @@ -219,6 +221,7 @@ impl Pool { exist: 0, recycler: Some((rx, pool_opts)), }), + metrics: Default::default(), }), drop: tx, } @@ -232,6 +235,7 @@ impl Pool { /// Async function that resolves to `Conn`. pub fn get_conn(&self) -> GetConn { + self.inner.metrics.gets.inc(); GetConn::new(self) } @@ -249,6 +253,11 @@ impl Pool { DisconnectPool::new(self) } + #[cfg(feature = "metrics")] + pub fn snapshot_metrics(&self) -> PoolMetrics { + self.inner.metrics.clone() + } + /// A way to return connection taken from a pool. fn return_conn(&mut self, conn: Conn) { // NOTE: we're not in async context here, so we can't block or return NotReady @@ -264,11 +273,14 @@ impl Pool { { let mut exchange = self.inner.exchange.lock().unwrap(); if exchange.available.len() < self.opts.pool_opts().active_bound() { + self.inner.metrics.returns.inc(); exchange.available.push_back(conn.into()); if let Some(w) = exchange.waiting.pop() { w.wake(); } return; + } else { + self.inner.metrics.discards.inc(); } } @@ -276,6 +288,8 @@ impl Pool { } fn send_to_recycler(&self, conn: Conn) { + self.inner.metrics.recycler.recycles.inc(); + if let Err(conn) = self.drop.send(Some(conn)) { let conn = conn.0.unwrap(); diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 2a704dbc..7fcd21a3 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -63,18 +63,23 @@ impl Future for Recycler { macro_rules! conn_decision { ($self:ident, $conn:ident) => { if $conn.inner.stream.is_none() || $conn.inner.disconnected { + self.inner.metrics.recycler.discards.inc(); // drop unestablished connection $self.discard.push(futures_util::future::ok(()).boxed()); } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() { + self.inner.metrics.recycler.cleans.inc(); $self.cleaning.push($conn.cleanup_for_pool().boxed()); } else if $conn.expired() || close { + self.inner.metrics.recycler.discards.inc(); $self.discard.push($conn.close_conn().boxed()); } else { let mut exchange = $self.inner.exchange.lock().unwrap(); if exchange.available.len() >= $self.pool_opts.active_bound() { drop(exchange); + self.inner.metrics.recycler.discards.inc(); $self.discard.push($conn.close_conn().boxed()); } else { + self.inner.metrics.recycler.returns.inc(); exchange.available.push_back($conn.into()); if let Some(w) = exchange.waiting.pop() { w.wake(); diff --git a/src/conn/routines/exec.rs b/src/conn/routines/exec.rs index 81e7e61b..538e79b1 100644 --- a/src/conn/routines/exec.rs +++ b/src/conn/routines/exec.rs @@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> { impl Routine<()> for ExecRoutine<'_> { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.execs.inc(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::exec", diff --git a/src/conn/routines/next_set.rs b/src/conn/routines/next_set.rs index 2f381cff..0e037f65 100644 --- a/src/conn/routines/next_set.rs +++ b/src/conn/routines/next_set.rs @@ -24,6 +24,8 @@ where P: Protocol, { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.next_sets.inc(); + #[cfg(feature = "tracing")] let span = debug_span!( "mysql_async::next_set", diff --git a/src/conn/routines/ping.rs b/src/conn/routines/ping.rs index e6d7910f..47805fe0 100644 --- a/src/conn/routines/ping.rs +++ b/src/conn/routines/ping.rs @@ -14,6 +14,8 @@ pub struct PingRoutine; impl Routine<()> for PingRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.pings.inc(); + #[cfg(feature = "tracing")] let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id()); diff --git a/src/conn/routines/prepare.rs b/src/conn/routines/prepare.rs index 3e41bf93..5f648f53 100644 --- a/src/conn/routines/prepare.rs +++ b/src/conn/routines/prepare.rs @@ -26,6 +26,8 @@ impl PrepareRoutine { impl Routine> for PrepareRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result>> { + conn.metrics().routines.prepares.inc(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::prepare", diff --git a/src/conn/routines/query.rs b/src/conn/routines/query.rs index 775a2644..87f14058 100644 --- a/src/conn/routines/query.rs +++ b/src/conn/routines/query.rs @@ -22,6 +22,8 @@ impl<'a> QueryRoutine<'a> { impl Routine<()> for QueryRoutine<'_> { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.queries.inc(); + #[cfg(feature = "tracing")] let span = info_span!( "mysql_async::query", diff --git a/src/conn/routines/reset.rs b/src/conn/routines/reset.rs index f48e9ef3..0210526c 100644 --- a/src/conn/routines/reset.rs +++ b/src/conn/routines/reset.rs @@ -14,6 +14,8 @@ pub struct ResetRoutine; impl Routine<()> for ResetRoutine { fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> { + conn.metrics().routines.resets.inc(); + #[cfg(feature = "tracing")] let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id()); diff --git a/src/conn/stmt_cache.rs b/src/conn/stmt_cache.rs index 595836ac..10b1bf61 100644 --- a/src/conn/stmt_cache.rs +++ b/src/conn/stmt_cache.rs @@ -16,7 +16,10 @@ use std::{ sync::Arc, }; -use crate::queryable::stmt::StmtInner; +use crate::{ + metrics::{ConnMetrics, StmtCacheMetrics}, + queryable::stmt::StmtInner, +}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct QueryString(pub Arc<[u8]>); @@ -43,14 +46,17 @@ pub struct StmtCache { cap: usize, cache: LruCache, query_map: HashMap>, + + pub(super) conn_metrics: Arc, } impl StmtCache { - pub fn new(cap: usize) -> Self { + pub fn new(cap: usize, conn_metrics: Arc) -> Self { Self { cap, cache: LruCache::unbounded(), query_map: Default::default(), + conn_metrics, } } @@ -63,13 +69,20 @@ impl StmtCache { { let id = self.query_map.get(query).cloned(); match id { - Some(id) => self.cache.get(&id), - None => None, + Some(id) => { + self.metrics().hits.inc(); + self.cache.get(&id) + } + None => { + self.metrics().misses.inc(); + None + } } } pub fn put(&mut self, query: Arc<[u8]>, stmt: Arc) -> Option> { if self.cap == 0 { + // drops return None; } @@ -77,9 +90,11 @@ impl StmtCache { self.query_map.insert(query.clone(), stmt.id()); self.cache.put(stmt.id(), Entry { stmt, query }); + self.metrics().additions.inc(); if self.cache.len() > self.cap { if let Some((_, entry)) = self.cache.pop_lru() { + self.metrics().evictions.inc(); self.query_map.remove(&*entry.query.0.as_ref()); return Some(entry.stmt); } @@ -91,14 +106,20 @@ impl StmtCache { pub fn clear(&mut self) { self.query_map.clear(); self.cache.clear(); + self.metrics().resets.inc(); } pub fn remove(&mut self, id: u32) { if let Some(entry) = self.cache.pop(&id) { self.query_map.remove::<[u8]>(entry.query.borrow()); + self.metrics().removals.inc(); } } + fn metrics(&self) -> &StmtCacheMetrics { + &self.conn_metrics.stmt_cache + } + #[cfg(test)] pub fn iter(&self) -> impl Iterator { self.cache.iter() diff --git a/src/lib.rs b/src/lib.rs index 2e0a6e09..ba9debb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -432,6 +432,7 @@ mod connection_like; mod error; mod io; mod local_infile_handler; +pub mod metrics; mod opts; mod query; mod queryable; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 00000000..15c22c7d --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,96 @@ +use serde::Serialize; +#[cfg(feature = "metrics")] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +#[derive(Clone, Debug, Default, Serialize)] +pub struct PoolMetrics { + pub gets: AtomicCounter, + pub returns: AtomicCounter, + pub discards: AtomicCounter, + + pub recycler: RecyclerMetrics, + pub conn: Arc, +} + +#[derive(Clone, Debug, Default, Serialize)] +pub struct RecyclerMetrics { + pub recycles: AtomicCounter, + pub discards: AtomicCounter, + pub cleans: AtomicCounter, + pub returns: AtomicCounter, +} + +#[derive(Clone, Debug, Default, Serialize)] +pub struct ConnMetrics { + pub connects: AtomicCounter, + pub reuses: AtomicCounter, + pub disconnects: AtomicCounter, + + pub routines: RoutinesMetrics, + pub stmt_cache: StmtCacheMetrics, +} + +#[derive(Clone, Debug, Default, Serialize)] +pub struct RoutinesMetrics { + pub queries: AtomicCounter, + pub prepares: AtomicCounter, + pub execs: AtomicCounter, + pub pings: AtomicCounter, + pub resets: AtomicCounter, + pub next_sets: AtomicCounter, +} + +#[derive(Clone, Debug, Default, Serialize)] +pub struct StmtCacheMetrics { + pub additions: AtomicCounter, + pub removals: AtomicCounter, + pub evictions: AtomicCounter, + pub resets: AtomicCounter, + pub hits: AtomicCounter, + pub misses: AtomicCounter, +} + +#[derive(Clone, Debug, Default, Serialize)] +pub struct BufferPoolMetrics { + pub creations: AtomicCounter, + pub reuses: AtomicCounter, + pub returns: AtomicCounter, + pub discards: AtomicCounter, +} + +#[cfg(feature = "metrics")] +impl BufferPoolMetrics { + pub fn snapshot_global() -> Self { + crate::BUFFER_POOL.snapshot_metrics() + } +} + +#[cfg(feature = "metrics")] +#[derive(Debug, Default, Serialize)] +pub struct AtomicCounter(AtomicUsize); + +#[cfg(not(feature = "metrics"))] +#[derive(Clone, Debug, Default, Serialize)] +pub struct AtomicCounter; + +impl AtomicCounter { + #[cfg(feature = "metrics")] + #[inline] + pub fn value(&self) -> usize { + self.0.load(Ordering::Relaxed) + } + + #[inline] + pub(crate) fn inc(&self) { + #[cfg(feature = "metrics")] + self.0.fetch_add(1, Ordering::Relaxed); + } +} + +#[cfg(feature = "metrics")] +impl Clone for AtomicCounter { + fn clone(&self) -> Self { + AtomicCounter(AtomicUsize::new(self.0.load(Ordering::Relaxed))) + } +}