diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index 3f7db8a3cca..c1d66a78a0b 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -76,6 +76,89 @@ use std::sync::Arc;
 /// }
 /// ```
 ///
+/// Implement a simple token bucket for rate limiting.
+///
+/// Many applications and systems have constraints on the rate at which certain
+/// operations should occur. Exceeding this rate can result in suboptimal
+/// performance or even errors.
+///
+/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
+/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
+/// arrive at the same time.
+///
+/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
+/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
+/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
+/// wait for new tokens to be added.
+///
+/// Unlike the example that limits how many requests can be handled at the same time, we do not add
+/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
+///
+/// Note that this implementation is suboptimal when the duration is small, because it consumes a
+/// lot of CPU constantly looping and sleeping.
+///
+/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
+/// ```
+/// use std::sync::Arc;
+/// use tokio::sync::{AcquireError, Semaphore};
+/// use tokio::time::{interval, Duration};
+///
+/// struct TokenBucket {
+///     sem: Arc<Semaphore>,
+///     jh: tokio::task::JoinHandle<()>,
+/// }
+///
+/// impl TokenBucket {
+///     fn new(duration: Duration, capacity: usize) -> Self {
+///         let sem = Arc::new(Semaphore::new(capacity));
+///
+///         // refills the tokens at the end of each interval
+///         let jh = tokio::spawn({
+///             let sem = sem.clone();
+///             let mut interval = interval(duration);
+///             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+///
+///             async move {
+///                 loop {
+///                     interval.tick().await;
+///
+///                     if sem.available_permits() < capacity {
+///                         sem.add_permits(1);
+///                     }
+///                 }
+///             }
+///         });
+///
+///         Self { jh, sem }
+///     }
+///
+///     async fn acquire(&self) -> Result<(), AcquireError> {
+///         self.sem.acquire().await.map(|p| p.forget())
+///     }
+///
+///     async fn close(self) {
+///         self.sem.close();
+///         self.jh.abort();
+///         let _ = self.jh.await;
+///     }
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let capacity = 5; // operations per second
+///     let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
+///     let bucket = TokenBucket::new(update_interval, capacity);
+///
+///     for _ in 0..5 {
+///         bucket.acquire().await.unwrap();
+///
+///         // do the operation
+///     }
+///
+///     bucket.close().await;
+/// }
+/// ```
+///
 /// Limit the number of incoming requests being handled at the same time.
 ///
 /// Similar to limiting the number of simultaneously opened files, network handles
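A minimal follow-up sketch, not part of the patch above: the added docs note that the example "is suboptimal when the duration is small, because it consumes a lot of CPU constantly looping and sleeping." One way to reduce the wakeup frequency, assuming the same Semaphore-as-bucket approach, is to refill several tokens per tick at a longer interval; the average rate is then `tokens_per_tick / refill_interval` instead of one token per `duration`. The helper `spawn_coarse_refill` and its parameters are hypothetical, introduced only for illustration.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{interval, Duration, MissedTickBehavior};

/// Hypothetical variant of the refill task from the example above: tick less
/// often and add up to `tokens_per_tick` permits at once, never letting the
/// bucket exceed `capacity`. Must be called from within a Tokio runtime.
fn spawn_coarse_refill(
    sem: Arc<Semaphore>,
    capacity: usize,
    tokens_per_tick: usize,
    refill_interval: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = interval(refill_interval);
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            interval.tick().await;

            // Top the bucket up, but never past its capacity. Since permits are
            // only ever added here and consumed by `acquire` + `forget`, the
            // available count stays at or below `capacity`.
            let missing = capacity - sem.available_permits();
            sem.add_permits(missing.min(tokens_per_tick));
        }
    })
}
```

This trades smoothness for efficiency: refilling, say, 5 tokens every second instead of 1 token every 200 ms keeps the same average rate but delivers tokens in bursts, which may or may not be acceptable for a given workload.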