Skip to content

Commit 5d3038b

Browse files
committed
load: Add "weight" load variant
Adds a `weight` load variant, which weights an inner load. This is useful in circumstances where it is desireable to artificially inflate or deflate load. One such example is canary deployments, where it might be preferable for a canary to accept less load than its non-canary counterparts. This change is adapted from the weight implementation that used to exist within tower but was removed (see a496fbf) and an associated unmerged PR (#282).
1 parent c5632a2 commit 5d3038b

File tree

5 files changed

+263
-4
lines changed

5 files changed

+263
-4
lines changed

tower/examples/tower-balance.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tower::balance as lb;
1616
use tower::discover::{Change, Discover};
1717
use tower::limit::concurrency::ConcurrencyLimit;
1818
use tower::load;
19+
use tower::load::weight::{HasWeight, Weight};
1920
use tower::util::ServiceExt;
2021
use tower_service::Service;
2122

@@ -35,6 +36,7 @@ static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
3536
Duration::from_millis(500),
3637
Duration::from_millis(1000),
3738
];
39+
static ENDPOINT_WEIGHTS: [f64; 10] = [1.0, 1.0, 0.0, 0.01, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
3840

3941
struct Summary {
4042
latencies: Histogram<u64>,
@@ -55,6 +57,11 @@ async fn main() {
5557
print!("{}ms, ", l);
5658
}
5759
println!("]");
60+
print!("ENDPOINT_WEIGHTS=[");
61+
for weight in &ENDPOINT_WEIGHTS {
62+
print!("{weight}, ")
63+
}
64+
println!("]");
5865

5966
let decay = Duration::from_secs(10);
6067
let d = gen_disco();
@@ -66,17 +73,42 @@ async fn main() {
6673
));
6774
run("P2C+PeakEWMA...", pe).await;
6875

76+
let d = gen_disco();
77+
let pe = lb::p2c::Balance::new(load::WeightedDiscover::new(load::PeakEwmaDiscover::new(
78+
d,
79+
DEFAULT_RTT,
80+
decay,
81+
load::CompleteOnResponse::default(),
82+
)));
83+
run("P2C+PeakEWMA+Weighted...", pe).await;
84+
6985
let d = gen_disco();
7086
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
7187
d,
7288
load::CompleteOnResponse::default(),
7389
));
7490
run("P2C+LeastLoaded...", ll).await;
91+
92+
let d = gen_disco();
93+
let ll = lb::p2c::Balance::new(load::WeightedDiscover::new(
94+
load::PendingRequestsDiscover::new(d, load::CompleteOnResponse::default()),
95+
));
96+
run("P2C+LeastLoaded+Weighted...", ll).await;
7597
}
7698

7799
type Error = Box<dyn std::error::Error + Send + Sync>;
78100

79-
type Key = usize;
101+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
102+
struct Key {
103+
instance: usize,
104+
weight: Weight,
105+
}
106+
107+
impl HasWeight for Key {
108+
fn weight(&self) -> Weight {
109+
self.weight
110+
}
111+
}
80112

81113
pin_project! {
82114
struct Disco<S> {
@@ -117,8 +149,9 @@ fn gen_disco() -> impl Discover<
117149
Disco::new(
118150
MAX_ENDPOINT_LATENCIES
119151
.iter()
152+
.zip(ENDPOINT_WEIGHTS)
120153
.enumerate()
121-
.map(|(instance, latency)| {
154+
.map(|(instance, (latency, weight))| {
122155
let svc = tower::service_fn(move |_| {
123156
let start = Instant::now();
124157

@@ -133,7 +166,12 @@ fn gen_disco() -> impl Discover<
133166
}
134167
});
135168

136-
(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
169+
let key = Key {
170+
instance,
171+
weight: Weight::from(weight),
172+
};
173+
174+
(key, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
137175
})
138176
.collect(),
139177
)

tower/src/load/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//! - [`Constant`] — Always returns the same constant load value for a service.
77
//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests.
88
//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service.
9+
//! - [`Weight`] - Adds a weighting to an inner Load.
910
//!
1011
//! In general, you will want to use one of these when using the types in [`tower::balance`] which
1112
//! balance services depending on their load. Which load metric to use depends on your exact
@@ -63,6 +64,7 @@ pub mod completion;
6364
mod constant;
6465
pub mod peak_ewma;
6566
pub mod pending_requests;
67+
pub mod weight;
6668

6769
pub use self::{
6870
completion::{CompleteOnResponse, TrackCompletion},
@@ -72,7 +74,10 @@ pub use self::{
7274
};
7375

7476
#[cfg(feature = "discover")]
75-
pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover};
77+
pub use self::{
78+
peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover,
79+
weight::WeightedDiscover,
80+
};
7681

7782
/// Types that implement this trait can give an estimate of how loaded they are.
7883
///

tower/src/load/peak_ewma.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use pin_project_lite::pin_project;
1010
use std::pin::Pin;
1111

1212
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
13+
use super::weight::Weight;
1314
use super::Load;
1415
use std::task::{Context, Poll};
1516
use std::{
17+
ops,
1618
sync::{Arc, Mutex},
1719
time::Duration,
1820
};
@@ -69,6 +71,14 @@ pin_project! {
6971
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
7072
pub struct Cost(f64);
7173

74+
impl ops::Div<Weight> for Cost {
75+
type Output = f64;
76+
77+
fn div(self, weight: Weight) -> f64 {
78+
self.0 / weight
79+
}
80+
}
81+
7282
/// Tracks an in-flight request and updates the RTT-estimate on Drop.
7383
#[derive(Debug)]
7484
pub struct Handle {

tower/src/load/pending_requests.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ use crate::discover::{Change, Discover};
66
use futures_core::{ready, Stream};
77
#[cfg(feature = "discover")]
88
use pin_project_lite::pin_project;
9+
use std::ops;
910
#[cfg(feature = "discover")]
1011
use std::pin::Pin;
1112

1213
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
14+
use super::weight::Weight;
1315
use super::Load;
1416
use std::sync::Arc;
1517
use std::task::{Context, Poll};
@@ -43,6 +45,14 @@ pin_project! {
4345
#[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)]
4446
pub struct Count(usize);
4547

48+
impl ops::Div<Weight> for Count {
49+
type Output = f64;
50+
51+
fn div(self, weight: Weight) -> f64 {
52+
self.0 / weight
53+
}
54+
}
55+
4656
/// Tracks an in-flight request by reference count.
4757
#[derive(Debug)]
4858
pub struct Handle(RefCount);

tower/src/load/weight.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
//! A [`Load`] implementation which implements weighting on top of an inner [`Load`].
2+
//!
3+
//! This can be useful in such cases as canary deployments, where it is desirable for a
4+
//! particular service to receive less than its fair share of load than other services.
5+
6+
#[cfg(feature = "discover")]
7+
use crate::discover::{Change, Discover};
8+
#[cfg(feature = "discover")]
9+
use futures_core::Stream;
10+
#[cfg(feature = "discover")]
11+
use pin_project_lite::pin_project;
12+
#[cfg(feature = "discover")]
13+
use std::pin::Pin;
14+
15+
use futures_core::ready;
16+
use std::ops;
17+
use std::task::{Context, Poll};
18+
use tower_service::Service;
19+
20+
use super::Load;
21+
22+
/// A weight on [0.0, ∞].
23+
///
24+
/// Lesser-weighted nodes receive less traffic than heavier-weighted nodes.
25+
///
26+
/// This is represented internally as an integer, rather than a float, so that it can implement
27+
/// `Hash` and `Eq`.
28+
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
29+
pub struct Weight(u32);
30+
31+
impl Weight {
32+
/// Minimum Weight
33+
pub const MIN: Weight = Weight(0);
34+
/// Unit of Weight - what 1.0_f64 corresponds to
35+
pub const UNIT: Weight = Weight(10_000);
36+
/// Maximum Weight
37+
pub const MAX: Weight = Weight(u32::MAX);
38+
}
39+
40+
impl Default for Weight {
41+
fn default() -> Self {
42+
Weight::UNIT
43+
}
44+
}
45+
46+
impl From<f64> for Weight {
47+
fn from(w: f64) -> Self {
48+
if w < 0.0 || w == f64::NAN {
49+
Self::MIN
50+
} else if w == f64::INFINITY {
51+
Self::MAX
52+
} else {
53+
Weight((w * (Weight::UNIT.0 as f64)).round() as u32)
54+
}
55+
}
56+
}
57+
58+
impl Into<f64> for Weight {
59+
fn into(self) -> f64 {
60+
(self.0 as f64) / (Weight::UNIT.0 as f64)
61+
}
62+
}
63+
64+
impl ops::Div<Weight> for f64 {
65+
type Output = f64;
66+
67+
fn div(self, w: Weight) -> f64 {
68+
if w == Weight::MIN {
69+
f64::INFINITY
70+
} else {
71+
let w: f64 = w.into();
72+
self / w
73+
}
74+
}
75+
}
76+
77+
impl ops::Div<Weight> for usize {
78+
type Output = f64;
79+
80+
fn div(self, w: Weight) -> f64 {
81+
(self as f64) / w
82+
}
83+
}
84+
85+
/// Measures the load of the underlying service by weighting that service's load by a constant
86+
/// weighting factor.
87+
#[derive(Clone, Debug, PartialEq, PartialOrd)]
88+
pub struct Weighted<S> {
89+
inner: S,
90+
weight: Weight,
91+
}
92+
93+
impl<S> Weighted<S> {
94+
/// Wraps an `S`-typed service so that its load is weighted by the given weight.
95+
pub fn new<W: Into<Weight>>(inner: S, w: W) -> Self {
96+
let weight = w.into();
97+
Self { inner, weight }
98+
}
99+
}
100+
101+
impl<S> Load for Weighted<S>
102+
where
103+
S: Load,
104+
S::Metric: ops::Div<Weight>,
105+
<S::Metric as ops::Div<Weight>>::Output: PartialOrd,
106+
{
107+
type Metric = <S::Metric as ops::Div<Weight>>::Output;
108+
109+
fn load(&self) -> Self::Metric {
110+
self.inner.load() / self.weight
111+
}
112+
}
113+
114+
impl<R, S: Service<R>> Service<R> for Weighted<S> {
115+
type Response = S::Response;
116+
type Error = S::Error;
117+
type Future = S::Future;
118+
119+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120+
self.inner.poll_ready(cx)
121+
}
122+
123+
fn call(&mut self, req: R) -> Self::Future {
124+
self.inner.call(req)
125+
}
126+
}
127+
128+
#[cfg(feature = "discover")]
129+
pin_project! {
130+
/// Wraps a `D`-typed stream of discovered services with [`Weighted`].
131+
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
132+
#[derive(Debug)]
133+
pub struct WeightedDiscover<D>{
134+
#[pin]
135+
discover: D,
136+
}
137+
}
138+
139+
impl<D> WeightedDiscover<D> {
140+
/// Wraps a [`Discover`], wrapping all of its services with [`Weighted`].
141+
pub fn new(discover: D) -> Self {
142+
Self { discover }
143+
}
144+
}
145+
146+
/// Allows [`tower::Discover::Key`] to expose a weight, so that they can be included in a discover
147+
/// stream
148+
pub trait HasWeight {
149+
/// Returns the [`Weight`]
150+
fn weight(&self) -> Weight;
151+
}
152+
153+
impl<T: HasWeight> From<T> for Weighted<T> {
154+
fn from(inner: T) -> Self {
155+
let weight = inner.weight();
156+
Self { inner, weight }
157+
}
158+
}
159+
160+
impl<T> HasWeight for Weighted<T> {
161+
fn weight(&self) -> Weight {
162+
self.weight
163+
}
164+
}
165+
166+
#[cfg(feature = "discover")]
167+
impl<D> Stream for WeightedDiscover<D>
168+
where
169+
D: Discover,
170+
D::Key: HasWeight,
171+
{
172+
type Item = Result<Change<D::Key, Weighted<D::Service>>, D::Error>;
173+
174+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175+
use self::Change::*;
176+
177+
let this = self.project();
178+
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
179+
None => return Poll::Ready(None),
180+
Some(Insert(k, svc)) => {
181+
let w = k.weight();
182+
Insert(k, Weighted::new(svc, w))
183+
}
184+
Some(Remove(k)) => Remove(k),
185+
};
186+
187+
Poll::Ready(Some(Ok(change)))
188+
}
189+
}
190+
191+
#[test]
192+
fn div_min() {
193+
assert_eq!(10.0 / Weight::MIN, f64::INFINITY);
194+
assert_eq!(10 / Weight::MIN, f64::INFINITY);
195+
assert_eq!(0 / Weight::MIN, f64::INFINITY);
196+
}

0 commit comments

Comments
 (0)