Skip to content

Commit 8edc422

Browse files
committed
resolve most of the comments
1 parent 0771182 commit 8edc422

File tree

1 file changed

+18
-17
lines changed

1 file changed

+18
-17
lines changed

tokio/src/sync/semaphore.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,34 @@ use std::sync::Arc;
109109
/// performance or even errors.
110110
///
111111
/// The provided example uses a `TokenBucket`, implemented using a semaphore, that
112-
/// limits operations to a specific rate. The token bucket will be refilled gradually.
113-
/// When the rate is exceeded, the `acquire` method will await until a token is available.
112+
/// limits operations to a specific rate although token buckets allow short bursts that are faster
113+
/// than the allowed rate. The token bucket will be refilled after each interval. When the rate is exceeded,
114+
/// the `acquire` method will await until a token is available. Note that this implementation is suboptimal
115+
/// when the rate is large, because it consumes a lot of cpu constantly looping and sleeping.
114116
/// ```
115117
/// use std::sync::Arc;
116-
/// use tokio::sync::{AcquireError, Semaphore, SemaphorePermit};
117-
/// use tokio::time::{sleep, Duration};
118+
/// use tokio::sync::{AcquireError, Semaphore};
119+
/// use tokio::time::{interval, Duration};
118120
///
119121
/// struct TokenBucket {
120122
/// sem: Arc<Semaphore>,
121123
/// jh: tokio::task::JoinHandle<()>,
122124
/// }
123125
///
124126
/// impl TokenBucket {
125-
/// fn new(rate: usize) -> Self {
127+
/// fn new(duration: Duration) -> Self {
128+
/// let rate = (1.0 / duration.as_secs_f32()) as usize;
126129
/// let sem = Arc::new(Semaphore::new(rate));
127130
///
128-
/// // refills the permits each 1/rate seconds.
131+
/// // refills the tokens at the end of each interval
129132
/// let jh = tokio::spawn({
130133
/// let sem = sem.clone();
134+
/// let mut interval = interval(duration);
135+
/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
131136
///
132137
/// async move {
133-
/// let time_slice = 1.0 / (rate as f32);
134-
///
135138
/// loop {
136-
/// sleep(Duration::from_secs_f32(time_slice)).await;
139+
/// interval.tick().await;
137140
///
138141
/// let cap = rate - sem.available_permits();
139142
/// sem.add_permits(std::cmp::min(cap, 1));
@@ -144,8 +147,8 @@ use std::sync::Arc;
144147
/// Self { jh, sem }
145148
/// }
146149
///
147-
/// async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
148-
/// self.sem.acquire().await
150+
/// async fn acquire(&self) -> Result<(), AcquireError> {
151+
/// self.sem.acquire().await.map(|p| p.forget())
149152
/// }
150153
///
151154
/// async fn close(self) {
@@ -158,14 +161,12 @@ use std::sync::Arc;
158161
///
159162
/// #[tokio::main]
160163
/// async fn main() {
161-
/// let bucket = TokenBucket::new(1);
164+
/// let ops = 1.0; // operation per second
165+
/// let update_interval = 1.0 / ops;
166+
/// let bucket = TokenBucket::new(Duration::from_secs_f32(update_interval));
162167
///
163168
/// for _ in 0..5 {
164-
/// bucket
165-
/// .acquire()
166-
/// .await
167-
/// .map(|permit| permit.forget())
168-
/// .unwrap();
169+
/// bucket.acquire().await.unwrap();
169170
///
170171
/// // do the operation
171172
/// }

0 commit comments

Comments
 (0)