Skip to content

Commit 22e43d3

Browse files
committed
Adding TokenBucket implementation
1 parent e47fc48 commit 22e43d3

15 files changed

+767
-0
lines changed

TwentyTwenty.BaseLine.sln

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{B24C04AA-739
77
EndProject
88
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TwentyTwenty.BaseLine", "src\TwentyTwenty.BaseLine\TwentyTwenty.BaseLine.csproj", "{949D9C42-4EC9-4F6C-AFE7-F0F70DDB56AB}"
99
EndProject
10+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{7A8BC93C-05C5-4EEE-944F-364AED32DE46}"
11+
EndProject
12+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TwentyTwenty.BaseLine.Tests", "test\TwentyTwenty.BaseLine.Tests\TwentyTwenty.BaseLine.Tests.csproj", "{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}"
13+
EndProject
1014
Global
1115
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1216
Debug|Any CPU = Debug|Any CPU
@@ -32,8 +36,21 @@ Global
3236
{949D9C42-4EC9-4F6C-AFE7-F0F70DDB56AB}.Release|x64.Build.0 = Release|Any CPU
3337
{949D9C42-4EC9-4F6C-AFE7-F0F70DDB56AB}.Release|x86.ActiveCfg = Release|Any CPU
3438
{949D9C42-4EC9-4F6C-AFE7-F0F70DDB56AB}.Release|x86.Build.0 = Release|Any CPU
39+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
40+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Debug|Any CPU.Build.0 = Debug|Any CPU
41+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Debug|x64.ActiveCfg = Debug|Any CPU
42+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Debug|x64.Build.0 = Debug|Any CPU
43+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Debug|x86.ActiveCfg = Debug|Any CPU
44+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Debug|x86.Build.0 = Debug|Any CPU
45+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Release|Any CPU.ActiveCfg = Release|Any CPU
46+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Release|Any CPU.Build.0 = Release|Any CPU
47+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Release|x64.ActiveCfg = Release|Any CPU
48+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Release|x64.Build.0 = Release|Any CPU
49+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Release|x86.ActiveCfg = Release|Any CPU
50+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8}.Release|x86.Build.0 = Release|Any CPU
3551
EndGlobalSection
3652
GlobalSection(NestedProjects) = preSolution
3753
{949D9C42-4EC9-4F6C-AFE7-F0F70DDB56AB} = {B24C04AA-739A-4AAB-B60E-6FA5BB07355F}
54+
{83EA4B8B-C5EF-456B-A4A2-CF9DB07B81D8} = {7A8BC93C-05C5-4EEE-944F-364AED32DE46}
3855
EndGlobalSection
3956
EndGlobal
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using System;
2+
3+
namespace TwentyTwenty.BaseLine.RateLimiting
4+
{
5+
/// <summary>
6+
/// A token bucket refill strategy that will provide N tokens for a token bucket to consume every T units of time.
7+
/// The tokens are refilled in bursts rather than at a fixed rate. This refill strategy will never allow more than
8+
/// N tokens to be consumed during a window of time T.
9+
/// </summary>
10+
public class FixedIntervalRefillStrategy : IRefillStrategy
11+
{
12+
private readonly Ticker _ticker;
13+
private readonly long _numTokens;
14+
private readonly long _periodInTicks;
15+
private long _nextRefillTime;
16+
private readonly object _syncRoot = new object();
17+
18+
/// <summary>Create a FixedIntervalRefillStrategy.</summary>
19+
/// <param name="ticker">A ticker to use to measure time.</param>
20+
/// <param name="numTokens">The number of tokens to add to the bucket every interval.</param>
21+
/// <param name="period">How often to refill the bucket.</param>
22+
public FixedIntervalRefillStrategy(Ticker ticker, long numTokens, TimeSpan period)
23+
{
24+
_ticker = ticker;
25+
_numTokens = numTokens;
26+
_periodInTicks = period.Ticks;
27+
_nextRefillTime = -1;
28+
}
29+
30+
public long Refill()
31+
{
32+
lock (_syncRoot)
33+
{
34+
var now = _ticker.Read();
35+
if (now < _nextRefillTime)
36+
{
37+
return 0;
38+
}
39+
var refillAmount = Math.Max((now - _nextRefillTime) / _periodInTicks, 1);
40+
_nextRefillTime += _periodInTicks * refillAmount;
41+
return _numTokens * refillAmount;
42+
}
43+
}
44+
}
45+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace TwentyTwenty.BaseLine.RateLimiting
2+
{
3+
/// <summary>
4+
/// Encapsulation of a refilling strategy for a token bucket.
5+
/// </summary>
6+
public interface IRefillStrategy
7+
{
8+
/// <summary>Returns the number of tokens to add to the token bucket.</summary>
9+
/// <returns>The number of tokens to add to the token bucket.</returns>
10+
long Refill();
11+
}
12+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace TwentyTwenty.BaseLine.RateLimiting
2+
{
3+
/// <summary>
4+
/// Encapsulation of a strategy for relinquishing control of the CPU.
5+
/// </summary>
6+
public interface ISleepStrategy
7+
{
8+
/// <summary>
9+
/// Sleep for a short period of time to allow other threads and system processes to execute.
10+
/// </summary>
11+
void Sleep();
12+
}
13+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System.Threading.Tasks;
2+
3+
namespace TwentyTwenty.BaseLine.RateLimiting
4+
{
5+
/// <summary>
6+
/// A token bucket is used for rate limiting access to a portion of code.
7+
///
8+
/// See <a href="http://en.wikipedia.org/wiki/Token_bucket">Token Bucket on Wikipedia</a>
9+
/// See <a href="http://en.wikipedia.org/wiki/Leaky_bucket">Leaky Bucket on Wikipedia</a>
10+
/// </summary>
11+
public interface ITokenBucket
12+
{
13+
/// <summary>
14+
/// Attempt to consume a single token from the bucket. If it was consumed then <code>true</code>
15+
/// is returned, otherwise <code>false</code> is returned.
16+
/// </summary>
17+
/// <returns><code>true</code> if the tokens were consumed, <code>false</code> otherwise.</returns>
18+
bool TryConsume();
19+
20+
/// <summary>
21+
/// Attempt to consume a specified number of tokens from the bucket. If the tokens were consumed then <code>true</code>
22+
/// is returned, otherwise <code>false</code> is returned.
23+
/// </summary>
24+
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
25+
/// <returns><code>true</code> if the tokens were consumed, <code>false</code> otherwise.</returns>
26+
bool TryConsume(long numTokens);
27+
28+
/// <summary>
29+
/// Consume a single token from the bucket. If no token is currently available then this method will block until a
30+
/// token becomes available.
31+
/// </summary>
32+
void Consume();
33+
34+
/// <summary>
35+
/// Consume a single token from the bucket asynchronously. This method does not block
36+
/// <returns>A task that returns once a token has been consumed</returns>
37+
/// </summary>
38+
Task ConsumeAsync();
39+
40+
/// <summary>
41+
/// Consumes multiple tokens from the bucket. If enough tokens are not currently available then this method will block
42+
/// </summary>
43+
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
44+
void Consume(long numTokens);
45+
46+
/// <summary>
47+
/// Consume multiple tokens from the bucket asynchronously. This method does not block
48+
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
49+
/// <returns>A task that returns once the requested tokens have been consumed</returns>
50+
/// </summary>
51+
Task ConsumeAsync(long numTokens);
52+
}
53+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
3+
namespace TwentyTwenty.BaseLine.RateLimiting
4+
{
5+
public abstract class Ticker
6+
{
7+
private class SystemTicker : Ticker
8+
{
9+
public override long Read()
10+
{
11+
return DateTime.Now.Ticks;
12+
}
13+
}
14+
15+
private static readonly Ticker SystemTickerInstance = new SystemTicker();
16+
17+
public abstract long Read();
18+
19+
public static Ticker Default()
20+
{
21+
return SystemTickerInstance;
22+
}
23+
}
24+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace TwentyTwenty.BaseLine.RateLimiting
5+
{
6+
/// <summary>
7+
/// A token bucket implementation that is of a leaky bucket in the sense that it has a finite capacity and any added
8+
/// tokens that would exceed this capacity will "overflow" out of the bucket and are lost forever.
9+
///
10+
/// In this implementation the rules for refilling the bucket are encapsulated in a provided <see cref="IRefillStrategy"/>
11+
/// instance. Prior to attempting to consume any tokens the refill strategy will be consulted to see how many tokens
12+
/// should be added to the bucket.
13+
///
14+
/// In addition in this implementation the method of yielding CPU control is encapsulated in the provided
15+
/// <see cref="ISleepStrategy"/> instance. For high performance applications where tokens are being refilled incredibly quickly
16+
/// and an accurate bucket implementation is required, it may be useful to never yield control of the CPU and to instead
17+
/// busy wait. This strategy allows the caller to make this decision for themselves instead of the library forcing a
18+
/// decision.
19+
/// </summary>
20+
internal class TokenBucket : ITokenBucket
21+
{
22+
private readonly long _capacity;
23+
private readonly IRefillStrategy _refillStrategy;
24+
private readonly ISleepStrategy _sleepStrategy;
25+
private long _size;
26+
private readonly object _syncRoot = new object();
27+
28+
public TokenBucket(long capacity, IRefillStrategy refillStrategy, ISleepStrategy sleepStrategy)
29+
{
30+
_capacity = capacity;
31+
_refillStrategy = refillStrategy;
32+
_sleepStrategy = sleepStrategy;
33+
_size = 0;
34+
}
35+
36+
/// <summary>
37+
/// Attempt to consume a single token from the bucket. If it was consumed then <code>true</code>
38+
/// is returned, otherwise <code>false</code> is returned.
39+
/// </summary>
40+
/// <returns><code>true</code> if the tokens were consumed, <code>false</code> otherwise.</returns>
41+
public bool TryConsume()
42+
{
43+
return TryConsume(1);
44+
}
45+
46+
/// <summary>
47+
/// Attempt to consume a specified number of tokens from the bucket. If the tokens were consumed then <code>true</code>
48+
/// is returned, otherwise <code>false</code> is returned.
49+
/// </summary>
50+
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
51+
/// <returns><code>true</code> if the tokens were consumed, <code>false</code> otherwise.</returns>
52+
public bool TryConsume(long numTokens)
53+
{
54+
if (numTokens <= 0)
55+
throw new ArgumentOutOfRangeException("numTokens", "Number of tokens to consume must be positive");
56+
if (numTokens > _capacity)
57+
throw new ArgumentOutOfRangeException("numTokens", "Number of tokens to consume must be less than the capacity of the bucket.");
58+
59+
lock (_syncRoot)
60+
{
61+
// Give the refill strategy a chance to add tokens if it needs to, but beware of overflow
62+
var newTokens = Math.Min(_capacity, Math.Max(0, _refillStrategy.Refill()));
63+
_size = Math.Max(0, Math.Min(_size + newTokens, _capacity));
64+
65+
if (numTokens > _size) return false;
66+
67+
// Now try to consume some tokens
68+
_size -= numTokens;
69+
return true;
70+
}
71+
}
72+
73+
/// <summary>
74+
/// Consume a single token from the bucket. If no token is currently available then this method will block until a
75+
/// token becomes available.
76+
/// </summary>
77+
public void Consume()
78+
{
79+
Consume(1);
80+
}
81+
82+
/// <summary>
83+
/// Consume a single token from the bucket asynchronously. This method does not block
84+
/// <returns>A task that returns once a token has been consumed</returns>
85+
/// </summary>
86+
public Task ConsumeAsync()
87+
{
88+
return ConsumeAsync(1);
89+
}
90+
91+
/// <summary>
92+
/// Consumes multiple tokens from the bucket. If enough tokens are not currently available then this method will block
93+
/// </summary>
94+
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
95+
public void Consume(long numTokens)
96+
{
97+
while (true) {
98+
if (TryConsume(numTokens)) {
99+
break;
100+
}
101+
102+
_sleepStrategy.Sleep();
103+
}
104+
}
105+
106+
/// <summary>
107+
/// Consume multiple tokens from the bucket asynchronously. This method does not block
108+
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
109+
/// <returns>A task that returns once the requested tokens have been consumed</returns>
110+
/// </summary>
111+
public Task ConsumeAsync(long numTokens)
112+
{
113+
return Task.Factory.StartNew(() => Consume(numTokens));
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)