From 53270c01e975a4fba56b61975a66eee24649e9bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 26 Dec 2023 10:14:00 -0100 Subject: [PATCH 1/2] Hide GetConn This makes the future implementation private. This is a braking change, but: * It is unlikely anyone was using GetConn directly instead of just awaiting it. * This opens the way for replacing the manual implementation with an async fn. --- src/conn/pool/futures/get_conn.rs | 17 +++++++++++------ src/conn/pool/futures/mod.rs | 4 ++-- src/conn/pool/mod.rs | 8 ++++---- src/lib.rs | 2 +- tests/exports.rs | 2 +- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index b89f9bc6..0b5672b7 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -58,20 +58,25 @@ impl fmt::Debug for GetConnInner { /// This future will take connection from a pool and resolve to [`Conn`]. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct GetConn { - pub(crate) queue_id: QueueId, - pub(crate) pool: Option, - pub(crate) inner: GetConnInner, +struct GetConn { + queue_id: QueueId, + pool: Option, + inner: GetConnInner, reset_upon_returning_to_a_pool: bool, #[cfg(feature = "tracing")] span: Arc, } +pub(crate) async fn get_conn(pool: Pool) -> Result { + let reset_connection = pool.opts.pool_opts().reset_connection(); + GetConn::new(pool, reset_connection).await +} + impl GetConn { - pub(crate) fn new(pool: &Pool, reset_upon_returning_to_a_pool: bool) -> GetConn { + fn new(pool: Pool, reset_upon_returning_to_a_pool: bool) -> GetConn { GetConn { queue_id: QueueId::next(), - pool: Some(pool.clone()), + pool: Some(pool), inner: GetConnInner::New, reset_upon_returning_to_a_pool, #[cfg(feature = "tracing")] diff --git a/src/conn/pool/futures/mod.rs b/src/conn/pool/futures/mod.rs index 00842994..6cf18080 100644 --- a/src/conn/pool/futures/mod.rs +++ b/src/conn/pool/futures/mod.rs @@ -6,8 +6,8 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +pub use self::disconnect_pool::DisconnectPool; +pub(crate) use self::get_conn::get_conn; pub(super) use self::get_conn::GetConnInner; -pub use self::{disconnect_pool::DisconnectPool, get_conn::GetConn}; - mod disconnect_pool; mod get_conn; diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 465ea8e4..a222da8a 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -15,6 +15,7 @@ use std::{ cmp::Reverse, collections::VecDeque, convert::TryFrom, + future::Future, hash::{Hash, Hasher}, str::FromStr, sync::{atomic, Arc, Mutex}, @@ -239,9 +240,8 @@ impl Pool { } /// Async function that resolves to `Conn`. - pub fn get_conn(&self) -> GetConn { - let reset_connection = self.opts.pool_opts().reset_connection(); - GetConn::new(self, reset_connection) + pub fn get_conn(&self) -> impl Future> { + get_conn(self.clone()) } /// Starts a new transaction. @@ -253,7 +253,7 @@ impl Pool { /// Async function that disconnects this pool from the server and resolves to `()`. /// /// **Note:** This Future won't resolve until all active connections, taken from it, - /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. + /// are dropped or disonnected. Also all pending and new `get_conn()`'s will resolve to error. pub fn disconnect(self) -> DisconnectPool { DisconnectPool::new(self) } diff --git a/src/lib.rs b/src/lib.rs index af44f67f..e642287b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -542,7 +542,7 @@ pub use self::queryable::stmt::Statement; /// Futures used in this crate pub mod futures { - pub use crate::conn::pool::futures::{DisconnectPool, GetConn}; + pub use crate::conn::pool::futures::DisconnectPool; } /// Traits used in this crate diff --git a/tests/exports.rs b/tests/exports.rs index 6f9feef8..fb1fcd32 100644 --- a/tests/exports.rs +++ b/tests/exports.rs @@ -1,7 +1,7 @@ #[allow(unused_imports)] use mysql_async::{ consts, from_row, from_row_opt, from_value, from_value_opt, - futures::{DisconnectPool, GetConn}, + futures::DisconnectPool, params, prelude::{ BatchQuery, FromRow, FromValue, GlobalHandler, Protocol, Query, Queryable, StatementLike, From ad0d425f0cc402e3553d7c4c7a71b58c7e458332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 26 Dec 2023 12:49:15 -0100 Subject: [PATCH 2/2] Re-implement get_conn as a plain async fn Avoiding the manual implementation of Future makes this code quite a bit simpler IMHO. --- src/conn/pool/futures/get_conn.rs | 199 ++++++++++++------------------ 1 file changed, 82 insertions(+), 117 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 0b5672b7..e40b707e 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -6,19 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -use std::{ - fmt, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use futures_core::ready; -#[cfg(feature = "tracing")] -use { - std::sync::Arc, - tracing::{debug_span, Span}, -}; +use std::{fmt, future::poll_fn, task::Context}; use crate::{ conn::{ @@ -55,35 +43,29 @@ impl fmt::Debug for GetConnInner { } } -/// This future will take connection from a pool and resolve to [`Conn`]. #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct GetConn { +struct GetConnState { queue_id: QueueId, pool: Option, inner: GetConnInner, - reset_upon_returning_to_a_pool: bool, - #[cfg(feature = "tracing")] - span: Arc, -} - -pub(crate) async fn get_conn(pool: Pool) -> Result { - let reset_connection = pool.opts.pool_opts().reset_connection(); - GetConn::new(pool, reset_connection).await } -impl GetConn { - fn new(pool: Pool, reset_upon_returning_to_a_pool: bool) -> GetConn { - GetConn { - queue_id: QueueId::next(), - pool: Some(pool), - inner: GetConnInner::New, - reset_upon_returning_to_a_pool, - #[cfg(feature = "tracing")] - span: Arc::new(debug_span!("mysql_async::get_conn")), +impl Drop for GetConnState { + fn drop(&mut self) { + // We drop a connection before it can be resolved, a.k.a. cancelling it. + // Make sure we maintain the necessary invariants towards the pool. + if let Some(pool) = self.pool.take() { + // Remove the waker from the pool's waitlist in case this task was + // woken by another waker, like from tokio::time::timeout. + pool.unqueue(self.queue_id); + if let GetConnInner::Connecting(..) = self.inner { + pool.cancel_connection(); + } } } +} +impl GetConnState { fn pool_mut(&mut self) -> &mut Pool { self.pool .as_mut() @@ -97,95 +79,78 @@ impl GetConn { } } -// this manual implementation of Future may seem stupid, but we sort -// of need it to get the dropping behavior we want. -impl Future for GetConn { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - #[cfg(feature = "tracing")] - let span = self.span.clone(); - #[cfg(feature = "tracing")] - let _span_guard = span.enter(); - loop { - match self.inner { - GetConnInner::New => { - let queue_id = self.queue_id; - let next = ready!(self.pool_mut().poll_new_conn(cx, queue_id))?; - match next { - GetConnInner::Connecting(conn_fut) => { - self.inner = GetConnInner::Connecting(conn_fut); - } - GetConnInner::Checking(conn_fut) => { - self.inner = GetConnInner::Checking(conn_fut); - } - GetConnInner::Done => unreachable!( - "Pool::poll_new_conn never gives out already-consumed GetConns" - ), - GetConnInner::New => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") - } +/// This future will take connection from a pool and resolve to [`Conn`]. +#[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip_all))] +pub(crate) async fn get_conn(pool: Pool) -> Result { + let reset_upon_returning_to_a_pool = pool.opts.pool_opts().reset_connection(); + let queue_id = QueueId::next(); + let mut state = GetConnState { + queue_id, + pool: Some(pool), + inner: GetConnInner::New, + }; + + loop { + match state.inner { + GetConnInner::New => { + let pool = state.pool_mut(); + let poll_new = |cx: &mut Context<'_>| pool.poll_new_conn(cx, queue_id); + let next = poll_fn(poll_new).await?; + match next { + GetConnInner::Connecting(conn_fut) => { + state.inner = GetConnInner::Connecting(conn_fut); } - } - GetConnInner::Done => { - unreachable!("GetConn::poll polled after returning Async::Ready"); - } - GetConnInner::Connecting(ref mut f) => { - let result = ready!(Pin::new(f).poll(cx)); - let pool = self.pool_take(); - - self.inner = GetConnInner::Done; - - return match result { - Ok(mut c) => { - c.inner.pool = Some(pool); - c.inner.reset_upon_returning_to_a_pool = - self.reset_upon_returning_to_a_pool; - Poll::Ready(Ok(c)) - } - Err(e) => { - pool.cancel_connection(); - Poll::Ready(Err(e)) - } - }; - } - GetConnInner::Checking(ref mut f) => { - let result = ready!(Pin::new(f).poll(cx)); - match result { - Ok(mut c) => { - self.inner = GetConnInner::Done; - - let pool = self.pool_take(); - c.inner.pool = Some(pool); - c.inner.reset_upon_returning_to_a_pool = - self.reset_upon_returning_to_a_pool; - return Poll::Ready(Ok(c)); - } - Err(_) => { - // Idling connection is broken. We'll drop it and try again. - self.inner = GetConnInner::New; - - let pool = self.pool_mut(); - pool.cancel_connection(); - continue; - } + GetConnInner::Checking(conn_fut) => { + state.inner = GetConnInner::Checking(conn_fut); + } + GetConnInner::Done => unreachable!( + "Pool::poll_new_conn never gives out already-consumed GetConns" + ), + GetConnInner::New => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") } } } - } - } -} + GetConnInner::Done => { + unreachable!("GetConn::poll polled after returning Async::Ready"); + } + GetConnInner::Connecting(ref mut f) => { + let result = f.await; + let pool = state.pool_take(); + state.inner = GetConnInner::Done; + + return match result { + Ok(mut c) => { + c.inner.pool = Some(pool); + c.inner.reset_upon_returning_to_a_pool = reset_upon_returning_to_a_pool; + Ok(c) + } + Err(e) => { + pool.cancel_connection(); + Err(e) + } + }; + } + GetConnInner::Checking(ref mut f) => { + let result = f.await; + match result { + Ok(mut c) => { + state.inner = GetConnInner::Done; + + let pool = state.pool_take(); + c.inner.pool = Some(pool); + c.inner.reset_upon_returning_to_a_pool = reset_upon_returning_to_a_pool; + return Ok(c); + } + Err(_) => { + // Idling connection is broken. We'll drop it and try again. + state.inner = GetConnInner::New; -impl Drop for GetConn { - fn drop(&mut self) { - // We drop a connection before it can be resolved, a.k.a. cancelling it. - // Make sure we maintain the necessary invariants towards the pool. - if let Some(pool) = self.pool.take() { - // Remove the waker from the pool's waitlist in case this task was - // woken by another waker, like from tokio::time::timeout. - pool.unqueue(self.queue_id); - if let GetConnInner::Connecting(..) = self.inner { - pool.cancel_connection(); + let pool = state.pool_mut(); + pool.cancel_connection(); + continue; + } + } } } }