From 07ba72a7ec7abc96e129417d3acac54e7e63917b Mon Sep 17 00:00:00 2001 From: Aref Date: Sun, 24 Nov 2019 16:46:08 +0300 Subject: [PATCH] Allows parallel actions call limits the number of parallel calls per interval, regardless of execution time of each call. --- .../CallCountByIntervalAwaitableConstraint.cs | 121 ++++++++++++++++++ RateLimiter/TimeLimiter.cs | 10 ++ 2 files changed, 131 insertions(+) create mode 100644 RateLimiter/CallCountByIntervalAwaitableConstraint.cs diff --git a/RateLimiter/CallCountByIntervalAwaitableConstraint.cs b/RateLimiter/CallCountByIntervalAwaitableConstraint.cs new file mode 100644 index 0000000..2bf2b0b --- /dev/null +++ b/RateLimiter/CallCountByIntervalAwaitableConstraint.cs @@ -0,0 +1,121 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace RateLimiter +{ + /// + /// Provide an awaitable constraint based on number of times per duration + /// + public class CallCountByIntervalAwaitableConstraint : IAwaitableConstraint + { + /// + /// List of the last time stamps + /// + public IReadOnlyList TimeStamps => _TimeStamps.ToList(); + + /// + /// Stack of the last time stamps + /// + protected LimitedSizeStack _TimeStamps { get; } + + private int _Count { get; } + private TimeSpan _TimeSpan { get; } + private SemaphoreSlim _Semaphore { get; } = new SemaphoreSlim(1, 1); + private ITime _Time { get; } + + /// + /// Constructs a new AwaitableConstraint based on number of times per duration + /// + /// + /// + public CallCountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan) : this(count, timeSpan, TimeSystem.StandardTime) + { + } + + internal CallCountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime time) + { + if (count <= 0) + throw new ArgumentException("count should be strictly positive", nameof(count)); + + if (timeSpan.TotalMilliseconds <= 0) + throw new ArgumentException("timeSpan should be strictly positive", nameof(timeSpan)); + + _Count = count; + _TimeSpan = timeSpan; + _TimeStamps = new LimitedSizeStack(_Count); + _Time = time; + } + + /// + /// returns a task that will complete once the constraint is fulfilled + /// + /// + /// Cancel the wait + /// + /// + /// A disposable that should be disposed upon task completion + /// + public async Task WaitForReadiness(CancellationToken cancellationToken) + { + try + { + + await _Semaphore.WaitAsync(cancellationToken); + var count = 0; + var now = _Time.GetNow(); + var target = now - _TimeSpan; + LinkedListNode element = _TimeStamps.First, last = null; + while ((element != null) && (element.Value > target)) + { + last = element; + element = element.Next; + count++; + } + + if (count >= _Count) + { + var timeToWait = last.Value.Add(_TimeSpan) - now; + now = now.Add(timeToWait); + Debug.Assert(element == null); + Debug.Assert(last != null); + await _Time.GetDelay(timeToWait, cancellationToken); + } + _TimeStamps.Push(now); + _Semaphore.Release(); + + return new DisposeAction(OnEnded); + } + catch (Exception) + { + _Semaphore.Release(); + throw; + } + } + + /// + /// Clone CallCountByIntervalAwaitableConstraint + /// + /// + public IAwaitableConstraint Clone() + { + return new CallCountByIntervalAwaitableConstraint(_Count, _TimeSpan, _Time); + } + + private void OnEnded() + { + OnEnded(TimeSystem.StandardTime.GetNow()); + } + + /// + /// Called when action has been executed + /// + /// + protected virtual void OnEnded(DateTime now) + { + } + } +} diff --git a/RateLimiter/TimeLimiter.cs b/RateLimiter/TimeLimiter.cs index e36b8e9..13f176b 100644 --- a/RateLimiter/TimeLimiter.cs +++ b/RateLimiter/TimeLimiter.cs @@ -164,6 +164,16 @@ public static TimeLimiter GetFromMaxCountByInterval(int maxCount, TimeSpan timeS return new TimeLimiter(new CountByIntervalAwaitableConstraint(maxCount, timeSpan)); } + /// + /// Returns a TimeLimiter based on a maximum number of parallel calls + /// + /// + /// + /// + public static TimeLimiter GetFromMaxCallCountByInterval(int maxCount, TimeSpan timeSpan) + { + return new TimeLimiter(new CallCountByIntervalAwaitableConstraint(maxCount, timeSpan)); + } /// /// Create that will save state using action passed through parameter. ///