From 80338cf22f5a75f93b163b98844bb7e957324b42 Mon Sep 17 00:00:00 2001 From: Alexander Boychenko Date: Thu, 5 Dec 2019 12:24:31 +0700 Subject: [PATCH] Allows parallel actions execution. Crucial for slow actions --- RateLimiter.Tests/Sample.cs | 40 ++++++++++++ .../CountByIntervalAwaitableConstraint.cs | 63 +++++++++++++++---- RateLimiter/LimitedSizeStack.cs | 14 +++++ 3 files changed, 106 insertions(+), 11 deletions(-) diff --git a/RateLimiter.Tests/Sample.cs b/RateLimiter.Tests/Sample.cs index 52ed75b..18b13f2 100644 --- a/RateLimiter.Tests/Sample.cs +++ b/RateLimiter.Tests/Sample.cs @@ -159,5 +159,45 @@ public async Task Test100Thread() await Task.WhenAll(tasks.ToArray()); } + + [Fact] + public async Task ParallelSlowActionPerformance() + { + var stopWatch = Stopwatch.StartNew(); + var limiter = TimeLimiter.GetFromMaxCountByInterval(100, TimeSpan.FromMinutes(1)); + Func func = async () => await Task.Delay(300); + + await Task.WhenAll( + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func)); + stopWatch.Stop(); + + stopWatch.Elapsed.TotalSeconds.Should().BeLessThan(1); + } + + [Fact] + public async Task ParallelSlowActionPerformaceLimited() + { + var stopWatch = Stopwatch.StartNew(); + var limiter = TimeLimiter.GetFromMaxCountByInterval(3, TimeSpan.FromSeconds(1)); + Func func = async () => await Task.Delay(300); + + await Task.WhenAll( + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func), + limiter.Enqueue(func)); + stopWatch.Stop(); + + stopWatch.Elapsed.TotalSeconds.Should().BeLessThan(3); + } } } diff --git a/RateLimiter/CountByIntervalAwaitableConstraint.cs b/RateLimiter/CountByIntervalAwaitableConstraint.cs index 460be4f..690c3bd 100644 --- a/RateLimiter/CountByIntervalAwaitableConstraint.cs +++ b/RateLimiter/CountByIntervalAwaitableConstraint.cs @@ -15,16 +15,19 @@ public class CountByIntervalAwaitableConstraint : IAwaitableConstraint /// /// List of the last time stamps /// - public IReadOnlyList TimeStamps => _TimeStamps.ToList(); + public IReadOnlyList TimeStamps => _TimeStamps.ToList(); /// /// Stack of the last time stamps /// - protected LimitedSizeStack _TimeStamps { get; } + protected LimitedSizeStack _TimeStamps { get; } private int _Count { get; } private TimeSpan _TimeSpan { get; } - private SemaphoreSlim _Semaphore { get; } = new SemaphoreSlim(1, 1); + + private SemaphoreSlim _CapacitySemaphore { get; } + private SemaphoreSlim _ListSemaphore { get; } = new SemaphoreSlim(1, 1); + private SemaphoreSlim _DelaySemaphore { get; } = new SemaphoreSlim(1, 1); private ITime _Time { get; } /// @@ -46,8 +49,9 @@ internal CountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime _Count = count; _TimeSpan = timeSpan; - _TimeStamps = new LimitedSizeStack(_Count); + _TimeStamps = new LimitedSizeStack(_Count); _Time = time; + _CapacitySemaphore = new SemaphoreSlim(_Count, _Count); } /// @@ -61,12 +65,31 @@ internal CountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime /// public async Task WaitForReadiness(CancellationToken cancellationToken) { - await _Semaphore.WaitAsync(cancellationToken); + await _CapacitySemaphore.WaitAsync(cancellationToken); + try + { + await _DelaySemaphore.WaitAsync(cancellationToken); + } + catch + { + _CapacitySemaphore.Release(); + throw; + } + try + { + await _ListSemaphore.WaitAsync(cancellationToken); + } + catch + { + _DelaySemaphore.Release(); + _CapacitySemaphore.Release(); + throw; + } var count = 0; var now = _Time.GetNow(); var target = now - _TimeSpan; - LinkedListNode element = _TimeStamps.First, last = null; - while ((element != null) && (element.Value > target)) + LinkedListNode element = _TimeStamps.First, last = null; + while ((element != null) && (element.Value == null || element.Value > target)) { last = element; element = element.Next; @@ -74,18 +97,31 @@ public async Task WaitForReadiness(CancellationToken cancellationTo } if (count < _Count) + { + _TimeStamps.Push(null); + _DelaySemaphore.Release(); + _ListSemaphore.Release(); return new DisposeAction(OnEnded); + } + Debug.Assert(element == null); Debug.Assert(last != null); - var timeToWait = last.Value.Add(_TimeSpan) - now; + Debug.Assert(last.Value != null); + var timeToWait = last.Value.Value.Add(_TimeSpan) - now; + _ListSemaphore.Release(); try { await _Time.GetDelay(timeToWait, cancellationToken); + await _ListSemaphore.WaitAsync(cancellationToken); + _TimeStamps.Push(null); //also removes the oldest timestamp + _ListSemaphore.Release(); + _DelaySemaphore.Release(); } catch (Exception) { - _Semaphore.Release(); + _DelaySemaphore.Release(); + _CapacitySemaphore.Release(); throw; } @@ -103,10 +139,15 @@ public IAwaitableConstraint Clone() private void OnEnded() { + _ListSemaphore.Wait(); //always small amount of time var now = _Time.GetNow(); - _TimeStamps.Push(now); + if (!_TimeStamps.ReplaceLast(null, now)) + { + _TimeStamps.Push(now); + } OnEnded(now); - _Semaphore.Release(); + _ListSemaphore.Release(); + _CapacitySemaphore.Release(); } /// diff --git a/RateLimiter/LimitedSizeStack.cs b/RateLimiter/LimitedSizeStack.cs index bf1fe86..eab31ca 100644 --- a/RateLimiter/LimitedSizeStack.cs +++ b/RateLimiter/LimitedSizeStack.cs @@ -31,5 +31,19 @@ public void Push(T item) if (Count > _MaxSize) RemoveLast(); } + + /// + /// Removes the last occurrence of the specified value from the System.Collections.Generic.LinkedList`1. + /// + /// The current value. + /// The new value. + public bool ReplaceLast(T currentValue, T newValue) + { + LinkedListNode node = FindLast(currentValue); + if (node == null) + return false; + node.Value = newValue; + return true; + } } }