Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance counters gated by metrics feature #231

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pem = "3.0"
percent-encoding = "2.1.0"
pin-project = "1.0.2"
rand = "0.8.5"
serde = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
socket2 = "0.5.2"
thiserror = "1.0.4"
Expand Down Expand Up @@ -95,6 +95,7 @@ default-rustls = [
"derive",
"rustls-tls",
]
metrics = []
minimal = ["flate2/zlib"]
native-tls-tls = ["native-tls", "tokio-native-tls"]
rustls-tls = [
Expand Down
27 changes: 22 additions & 5 deletions src/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

use crate::metrics::BufferPoolMetrics;
use crossbeam::queue::ArrayQueue;
use std::{mem::replace, ops::Deref, sync::Arc};

Expand All @@ -14,6 +15,7 @@ pub struct BufferPool {
buffer_size_cap: usize,
buffer_init_cap: usize,
pool: ArrayQueue<Vec<u8>>,
metrics: BufferPoolMetrics,
}

impl BufferPool {
Expand All @@ -37,14 +39,21 @@ impl BufferPool {
pool: ArrayQueue::new(pool_cap),
buffer_size_cap,
buffer_init_cap,
metrics: Default::default(),
}
}

pub fn get(self: &Arc<Self>) -> PooledBuf {
let buf = self
.pool
.pop()
.unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap));
let buf = match self.pool.pop() {
Some(buf) => {
self.metrics.reuses.incr();
buf
}
None => {
self.metrics.creations.incr();
Vec::with_capacity(self.buffer_init_cap)
}
};
debug_assert_eq!(buf.len(), 0);
PooledBuf(buf, self.clone())
}
Expand All @@ -64,7 +73,15 @@ impl BufferPool {
buf.shrink_to(self.buffer_size_cap);

// ArrayQueue will make sure to drop the buffer if capacity is exceeded
let _ = self.pool.push(buf);
match self.pool.push(buf) {
Ok(()) => self.metrics.returns.incr(),
Err(_buf) => self.metrics.discards.incr(),
};
}

#[cfg(feature = "metrics")]
pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics {
self.metrics.clone()
}
}

Expand Down
30 changes: 29 additions & 1 deletion src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
consts::{CapabilityFlags, Command, StatusFlags},
error::*,
io::Stream,
metrics::ConnMetrics,
opts::Opts,
queryable::{
query_result::{QueryResult, ResultSetMeta},
Expand All @@ -59,6 +60,8 @@ const DEFAULT_WAIT_TIMEOUT: usize = 28800;

/// Helper that asynchronously disconnects the givent connection on the default tokio executor.
fn disconnect(mut conn: Conn) {
conn.metrics().disconnects.incr();

let disconnected = conn.inner.disconnected;

// Mark conn as disconnected.
Expand Down Expand Up @@ -120,6 +123,7 @@ struct ConnInner {
/// One-time connection-level infile handler.
infile_handler:
Option<Pin<Box<dyn Future<Output = crate::Result<InfileData>> + Send + Sync + 'static>>>,
conn_metrics: Arc<ConnMetrics>,
}

impl fmt::Debug for ConnInner {
Expand All @@ -142,6 +146,7 @@ impl ConnInner {
/// Constructs an empty connection.
fn empty(opts: Opts) -> ConnInner {
let ttl_deadline = opts.pool_opts().new_connection_ttl_deadline();
let conn_metrics: Arc<ConnMetrics> = Default::default();
ConnInner {
capabilities: opts.get_capabilities(),
status: StatusFlags::empty(),
Expand All @@ -156,7 +161,7 @@ impl ConnInner {
tx_status: TxStatus::None,
last_io: Instant::now(),
wait_timeout: Duration::from_secs(0),
stmt_cache: StmtCache::new(opts.stmt_cache_size()),
stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()),
socket: opts.socket().map(Into::into),
opts,
ttl_deadline,
Expand All @@ -167,6 +172,7 @@ impl ConnInner {
server_key: None,
infile_handler: None,
reset_upon_returning_to_a_pool: false,
conn_metrics,
}
}

Expand All @@ -178,6 +184,18 @@ impl ConnInner {
.as_mut()
.ok_or_else(|| DriverError::ConnectionClosed.into())
}

fn set_pool(&mut self, pool: Option<Pool>) {
let conn_metrics = if let Some(ref pool) = pool {
Arc::clone(&pool.inner.metrics.conn)
} else {
Default::default()
};
self.conn_metrics = Arc::clone(&conn_metrics);
self.stmt_cache.conn_metrics = conn_metrics;

self.pool = pool;
}
}

/// MySql server connection.
Expand Down Expand Up @@ -929,6 +947,8 @@ impl Conn {
conn.run_init_commands().await?;
conn.run_setup_commands().await?;

conn.metrics().connects.incr();

Ok(conn)
}
.boxed()
Expand Down Expand Up @@ -1170,6 +1190,10 @@ impl Conn {
self.inner.stmt_cache.clear();
self.inner.infile_handler = None;
self.run_setup_commands().await?;
// self.inner.set_pool(pool);

// TODO: clear some metrics?

Ok(())
}

Expand Down Expand Up @@ -1284,6 +1308,10 @@ impl Conn {

Ok(BinlogStream::new(self))
}

pub(crate) fn metrics(&self) -> &ConnMetrics {
&self.inner.conn_metrics
}
}

#[cfg(test)]
Expand Down
6 changes: 4 additions & 2 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ impl Future for GetConn {

return match result {
Ok(mut c) => {
c.inner.pool = Some(pool);
c.inner.set_pool(Some(pool));
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
c.metrics().connects.incr();
Poll::Ready(Ok(c))
}
Err(e) => {
Expand All @@ -160,7 +161,8 @@ impl Future for GetConn {
self.inner = GetConnInner::Done;

let pool = self.pool_take();
c.inner.pool = Some(pool);
pool.inner.metrics.reuses.incr();
c.inner.set_pool(Some(pool));
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
return Poll::Ready(Ok(c));
Expand Down
30 changes: 29 additions & 1 deletion src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
use crate::{
conn::{pool::futures::*, Conn},
error::*,
metrics::PoolMetrics,
opts::{Opts, PoolOpts},
queryable::transaction::{Transaction, TxOpts},
};
Expand Down Expand Up @@ -133,6 +134,10 @@ impl Waitlist {
self.queue.remove(&tmp);
}

fn len(&self) -> usize {
self.queue.len()
}

fn is_empty(&self) -> bool {
self.queue.is_empty()
}
Expand Down Expand Up @@ -189,6 +194,7 @@ pub struct Inner {
close: atomic::AtomicBool,
closed: atomic::AtomicBool,
exchange: Mutex<Exchange>,
pub(crate) metrics: PoolMetrics,
}

/// Asynchronous pool of MySql connections.
Expand All @@ -202,7 +208,7 @@ pub struct Inner {
#[derive(Debug, Clone)]
pub struct Pool {
opts: Opts,
inner: Arc<Inner>,
pub(super) inner: Arc<Inner>,
drop: mpsc::UnboundedSender<Option<Conn>>,
}

Expand Down Expand Up @@ -231,6 +237,7 @@ impl Pool {
exist: 0,
recycler: Some((rx, pool_opts)),
}),
metrics: Default::default(),
}),
drop: tx,
}
Expand All @@ -244,6 +251,7 @@ impl Pool {

/// Async function that resolves to `Conn`.
pub fn get_conn(&self) -> GetConn {
self.inner.metrics.gets.incr();
let reset_connection = self.opts.pool_opts().reset_connection();
GetConn::new(self, reset_connection)
}
Expand All @@ -262,6 +270,11 @@ impl Pool {
DisconnectPool::new(self)
}

#[cfg(feature = "metrics")]
pub fn snapshot_metrics(&self) -> PoolMetrics {
self.inner.metrics.clone()
}

/// A way to return connection taken from a pool.
fn return_conn(&mut self, conn: Conn) {
// NOTE: we're not in async context here, so we can't block or return NotReady
Expand All @@ -270,6 +283,8 @@ impl Pool {
}

fn send_to_recycler(&self, conn: Conn) {
self.inner.metrics.recycler.recycles.incr();

if let Err(conn) = self.drop.send(Some(conn)) {
let conn = conn.0.unwrap();

Expand Down Expand Up @@ -366,6 +381,19 @@ impl Pool {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.waiting.remove(queue_id);
}

/// Returns the number of
/// - open connections,
/// - idling connections in the pool and
/// - tasks waiting for a connection.
pub fn queue_stats(&self) -> (usize, usize, usize) {
let exchange = self.inner.exchange.lock().unwrap();
(
exchange.exist,
exchange.available.len(),
exchange.waiting.len(),
)
}
}

impl Drop for Conn {
Expand Down
5 changes: 5 additions & 0 deletions src/conn/pool/recycler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ impl Future for Recycler {
let mut exchange = $self.inner.exchange.lock().unwrap();
if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
drop(exchange);
$self.inner.metrics.recycler.discards.incr();
$self.discard.push($conn.close_conn().boxed());
} else {
$self.inner.metrics.recycler.recycled_returnals.incr();
exchange.available.push_back($conn.into());
if let Some(w) = exchange.waiting.pop() {
w.wake();
Expand All @@ -80,11 +82,14 @@ impl Future for Recycler {
macro_rules! conn_decision {
($self:ident, $conn:ident) => {
if $conn.inner.stream.is_none() || $conn.inner.disconnected {
$self.inner.metrics.recycler.discards.incr();
// drop unestablished connection
$self.discard.push(futures_util::future::ok(()).boxed());
} else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
$self.inner.metrics.recycler.cleans.incr();
$self.cleaning.push($conn.cleanup_for_pool().boxed());
} else if $conn.expired() || close {
$self.inner.metrics.recycler.discards.incr();
$self.discard.push($conn.close_conn().boxed());
} else if $conn.inner.reset_upon_returning_to_a_pool {
$self.reset.push($conn.reset_for_pool().boxed());
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/change_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct ChangeUser;

impl Routine<()> for ChangeUser {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.change_user.incr();

#[cfg(feature = "tracing")]
let span = debug_span!(
"mysql_async::change_user",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> {

impl Routine<()> for ExecRoutine<'_> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.execs.incr();

#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::exec",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/next_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ where
P: Protocol,
{
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.next_sets.incr();

#[cfg(feature = "tracing")]
let span = debug_span!(
"mysql_async::next_set",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct PingRoutine;

impl Routine<()> for PingRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.pings.incr();

#[cfg(feature = "tracing")]
let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id());

Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ impl PrepareRoutine {

impl Routine<Arc<StmtInner>> for PrepareRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<Arc<StmtInner>>> {
conn.metrics().routines.prepares.incr();

#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::prepare",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ impl<'a, L: TracingLevel> QueryRoutine<'a, L> {

impl<L: TracingLevel> Routine<()> for QueryRoutine<'_, L> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.queries.incr();

#[cfg(feature = "tracing")]
let span = create_span!(
L::LEVEL,
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct ResetRoutine;

impl Routine<()> for ResetRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.resets.incr();

#[cfg(feature = "tracing")]
let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id());

Expand Down
Loading
Loading