Skip to content

Commit 9e8655f

Browse files
committed
feat(pool): add a Singleton pool type
1 parent b9dc3d2 commit 9e8655f

File tree

6 files changed

+662
-0
lines changed

6 files changed

+662
-0
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4242
http-body-util = "0.1.0"
4343
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4444
tokio-test = "0.4"
45+
tower-test = "0.4"
4546
pretty_env_logger = "0.5"
4647

4748
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
@@ -60,6 +61,7 @@ default = []
6061
full = [
6162
"client",
6263
"client-legacy",
64+
"client-pool",
6365
"client-proxy",
6466
"client-proxy-system",
6567
"server",
@@ -74,6 +76,7 @@ full = [
7476

7577
client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
7678
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
79+
client-pool = []
7780
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
7881
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
7982

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
#[cfg(feature = "client-legacy")]
55
pub mod legacy;
66

7+
#[cfg(feature = "client-pool")]
8+
pub mod pool;
9+
710
#[cfg(feature = "client-proxy")]
811
pub mod proxy;

src/client/pool/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
//! Composable pool services
2+
3+
pub mod singleton;

src/client/pool/singleton.rs

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
//! Singleton pools
2+
//!
3+
//! The singleton pool combines a MakeService that should only produce a single
4+
//! active connection. It can bundle all concurrent calls to it, so that only
5+
//! one connection is made. All calls to the singleton will return a clone of
6+
//! the inner service once established.
7+
//!
8+
//! This fits the HTTP/2 case well.
9+
10+
use std::sync::{Arc, Mutex};
11+
use std::task::{self, Poll};
12+
13+
use tokio::sync::oneshot;
14+
use tower_service::Service;
15+
16+
use self::internal::{DitchGuard, SingletonError, SingletonFuture};
17+
18+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
19+
20+
/// A singleton pool over an inner service.
21+
///
22+
/// The singleton wraps an inner service maker, bundling all calls to ensure
23+
/// only one service is created. Once made, it returns clones of the made
24+
/// service.
25+
#[derive(Debug)]
26+
pub struct Singleton<M, Dst>
27+
where
28+
M: Service<Dst>,
29+
{
30+
mk_svc: M,
31+
state: Arc<Mutex<State<M::Response>>>,
32+
}
33+
34+
#[derive(Debug)]
35+
enum State<S> {
36+
Empty,
37+
Making(Vec<oneshot::Sender<S>>),
38+
Made(S),
39+
}
40+
41+
impl<M, Target> Singleton<M, Target>
42+
where
43+
M: Service<Target>,
44+
M::Response: Clone,
45+
{
46+
/// Create a new singleton pool over an inner make service.
47+
pub fn new(mk_svc: M) -> Self {
48+
Singleton {
49+
mk_svc,
50+
state: Arc::new(Mutex::new(State::Empty)),
51+
}
52+
}
53+
54+
// pub fn clear? cancel?
55+
56+
/// Retains the inner made service if specified by the predicate.
57+
pub fn retain<F>(&mut self, mut predicate: F)
58+
where
59+
F: FnMut(&mut M::Response) -> bool,
60+
{
61+
let mut locked = self.state.lock().unwrap();
62+
match *locked {
63+
State::Empty => {}
64+
State::Making(..) => {}
65+
State::Made(ref mut svc) => {
66+
if !predicate(svc) {
67+
*locked = State::Empty;
68+
}
69+
}
70+
}
71+
}
72+
73+
/// Returns whether this singleton pool is empty.
74+
///
75+
/// If this pool has created a shared instance, or is currently in the
76+
/// process of creating one, this returns false.
77+
pub fn is_empty(&self) -> bool {
78+
matches!(*self.state.lock().unwrap(), State::Empty)
79+
}
80+
}
81+
82+
impl<M, Target> Service<Target> for Singleton<M, Target>
83+
where
84+
M: Service<Target>,
85+
M::Response: Clone,
86+
M::Error: Into<BoxError>,
87+
{
88+
type Response = M::Response;
89+
type Error = SingletonError;
90+
type Future = SingletonFuture<M::Future, M::Response>;
91+
92+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
93+
if let State::Empty = *self.state.lock().unwrap() {
94+
return self
95+
.mk_svc
96+
.poll_ready(cx)
97+
.map_err(|e| SingletonError(e.into()));
98+
}
99+
Poll::Ready(Ok(()))
100+
}
101+
102+
fn call(&mut self, dst: Target) -> Self::Future {
103+
let mut locked = self.state.lock().unwrap();
104+
match *locked {
105+
State::Empty => {
106+
let fut = self.mk_svc.call(dst);
107+
*locked = State::Making(Vec::new());
108+
SingletonFuture::Driving {
109+
future: fut,
110+
singleton: DitchGuard(Arc::downgrade(&self.state)),
111+
}
112+
}
113+
State::Making(ref mut waiters) => {
114+
let (tx, rx) = oneshot::channel();
115+
waiters.push(tx);
116+
SingletonFuture::Waiting { rx }
117+
}
118+
State::Made(ref svc) => SingletonFuture::Made {
119+
svc: Some(svc.clone()),
120+
},
121+
}
122+
}
123+
}
124+
125+
impl<M, Target> Clone for Singleton<M, Target>
126+
where
127+
M: Service<Target> + Clone,
128+
{
129+
fn clone(&self) -> Self {
130+
Self {
131+
mk_svc: self.mk_svc.clone(),
132+
state: self.state.clone(),
133+
}
134+
}
135+
}
136+
137+
// Holds some "pub" items that otherwise shouldn't be public.
138+
mod internal {
139+
use std::future::Future;
140+
use std::pin::Pin;
141+
use std::sync::{Mutex, Weak};
142+
use std::task::{self, Poll};
143+
144+
use futures_core::ready;
145+
use pin_project_lite::pin_project;
146+
use tokio::sync::oneshot;
147+
148+
use super::{BoxError, State};
149+
150+
pin_project! {
151+
#[project = SingletonFutureProj]
152+
pub enum SingletonFuture<F, S> {
153+
Driving {
154+
#[pin]
155+
future: F,
156+
singleton: DitchGuard<S>,
157+
},
158+
Waiting {
159+
rx: oneshot::Receiver<S>,
160+
},
161+
Made {
162+
svc: Option<S>,
163+
},
164+
}
165+
}
166+
167+
// XXX: pub because of the enum SingletonFuture
168+
pub struct DitchGuard<S>(pub(super) Weak<Mutex<State<S>>>);
169+
170+
impl<F, S, E> Future for SingletonFuture<F, S>
171+
where
172+
F: Future<Output = Result<S, E>>,
173+
E: Into<BoxError>,
174+
S: Clone,
175+
{
176+
type Output = Result<S, SingletonError>;
177+
178+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
179+
match self.project() {
180+
SingletonFutureProj::Driving { future, singleton } => {
181+
match ready!(future.poll(cx)) {
182+
Ok(svc) => {
183+
if let Some(state) = singleton.0.upgrade() {
184+
let mut locked = state.lock().unwrap();
185+
singleton.0 = Weak::new();
186+
match std::mem::replace(&mut *locked, State::Made(svc.clone())) {
187+
State::Making(waiters) => {
188+
for tx in waiters {
189+
let _ = tx.send(svc.clone());
190+
}
191+
}
192+
State::Empty | State::Made(_) => {
193+
// shouldn't happen!
194+
unreachable!()
195+
}
196+
}
197+
}
198+
Poll::Ready(Ok(svc))
199+
}
200+
Err(e) => {
201+
if let Some(state) = singleton.0.upgrade() {
202+
let mut locked = state.lock().unwrap();
203+
singleton.0 = Weak::new();
204+
*locked = State::Empty;
205+
}
206+
Poll::Ready(Err(SingletonError(e.into())))
207+
}
208+
}
209+
}
210+
SingletonFutureProj::Waiting { rx } => match ready!(Pin::new(rx).poll(cx)) {
211+
Ok(svc) => Poll::Ready(Ok(svc)),
212+
Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))),
213+
},
214+
SingletonFutureProj::Made { svc } => Poll::Ready(Ok(svc.take().unwrap())),
215+
}
216+
}
217+
}
218+
219+
impl<S> Drop for DitchGuard<S> {
220+
fn drop(&mut self) {
221+
if let Some(state) = self.0.upgrade() {
222+
if let Ok(mut locked) = state.lock() {
223+
*locked = State::Empty;
224+
}
225+
}
226+
}
227+
}
228+
229+
// An opaque error type. By not exposing the type, nor being specifically
230+
// Box<dyn Error>, we can _change_ the type once we no longer need the Canceled
231+
// error type. This will be possible with the refactor to baton passing.
232+
#[derive(Debug)]
233+
pub struct SingletonError(pub(super) BoxError);
234+
235+
impl std::fmt::Display for SingletonError {
236+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237+
f.write_str("singleton connection error")
238+
}
239+
}
240+
241+
impl std::error::Error for SingletonError {
242+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
243+
Some(&*self.0)
244+
}
245+
}
246+
247+
#[derive(Debug)]
248+
struct Canceled;
249+
250+
impl std::fmt::Display for Canceled {
251+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252+
f.write_str("singleton connection canceled")
253+
}
254+
}
255+
256+
impl std::error::Error for Canceled {}
257+
}
258+
259+
#[cfg(test)]
260+
mod tests {
261+
use std::future::Future;
262+
use std::pin::Pin;
263+
use std::task::Poll;
264+
265+
use tower_service::Service;
266+
267+
use super::Singleton;
268+
269+
#[tokio::test]
270+
async fn first_call_drives_subsequent_wait() {
271+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
272+
273+
let mut singleton = Singleton::new(mock_svc);
274+
275+
handle.allow(1);
276+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
277+
.await
278+
.unwrap();
279+
// First call: should go into Driving
280+
let fut1 = singleton.call(());
281+
// Second call: should go into Waiting
282+
let fut2 = singleton.call(());
283+
284+
// Expect exactly one request to the inner service
285+
let ((), send_response) = handle.next_request().await.unwrap();
286+
send_response.send_response("svc");
287+
288+
// Both futures should resolve to the same value
289+
assert_eq!(fut1.await.unwrap(), "svc");
290+
assert_eq!(fut2.await.unwrap(), "svc");
291+
}
292+
293+
#[tokio::test]
294+
async fn made_state_returns_immediately() {
295+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
296+
let mut singleton = Singleton::new(mock_svc);
297+
298+
handle.allow(1);
299+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
300+
.await
301+
.unwrap();
302+
// Drive first call to completion
303+
let fut1 = singleton.call(());
304+
let ((), send_response) = handle.next_request().await.unwrap();
305+
send_response.send_response("svc");
306+
assert_eq!(fut1.await.unwrap(), "svc");
307+
308+
// Second call should not hit inner service
309+
let res = singleton.call(()).await.unwrap();
310+
assert_eq!(res, "svc");
311+
}
312+
313+
#[tokio::test]
314+
async fn cancel_waiter_does_not_affect_others() {
315+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
316+
let mut singleton = Singleton::new(mock_svc);
317+
318+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
319+
.await
320+
.unwrap();
321+
let fut1 = singleton.call(());
322+
let fut2 = singleton.call(());
323+
drop(fut2); // cancel one waiter
324+
325+
let ((), send_response) = handle.next_request().await.unwrap();
326+
send_response.send_response("svc");
327+
328+
assert_eq!(fut1.await.unwrap(), "svc");
329+
}
330+
331+
// TODO: this should be able to be improved with a cooperative baton refactor
332+
#[tokio::test]
333+
async fn cancel_driver_cancels_all() {
334+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
335+
let mut singleton = Singleton::new(mock_svc);
336+
337+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
338+
.await
339+
.unwrap();
340+
let mut fut1 = singleton.call(());
341+
let fut2 = singleton.call(());
342+
343+
// poll driver just once, and then drop
344+
crate::common::future::poll_fn(move |cx| {
345+
let _ = Pin::new(&mut fut1).poll(cx);
346+
Poll::Ready(())
347+
})
348+
.await;
349+
350+
let ((), send_response) = handle.next_request().await.unwrap();
351+
send_response.send_response("svc");
352+
353+
assert_eq!(
354+
fut2.await.unwrap_err().0.to_string(),
355+
"singleton connection canceled"
356+
);
357+
}
358+
}

0 commit comments

Comments
 (0)