Skip to content

Commit

Permalink
Allows parallel actions execution. Crucial for slow actions
Browse files Browse the repository at this point in the history
  • Loading branch information
Boychenko committed Dec 5, 2019
1 parent 81808bd commit 80338cf
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 11 deletions.
40 changes: 40 additions & 0 deletions RateLimiter.Tests/Sample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,45 @@ public async Task Test100Thread()

await Task.WhenAll(tasks.ToArray());
}

[Fact]
public async Task ParallelSlowActionPerformance()
{
var stopWatch = Stopwatch.StartNew();
var limiter = TimeLimiter.GetFromMaxCountByInterval(100, TimeSpan.FromMinutes(1));
Func<Task> func = async () => await Task.Delay(300);

await Task.WhenAll(
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func));
stopWatch.Stop();

stopWatch.Elapsed.TotalSeconds.Should().BeLessThan(1);
}

[Fact]
public async Task ParallelSlowActionPerformaceLimited()
{
var stopWatch = Stopwatch.StartNew();
var limiter = TimeLimiter.GetFromMaxCountByInterval(3, TimeSpan.FromSeconds(1));
Func<Task> func = async () => await Task.Delay(300);

await Task.WhenAll(
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func));
stopWatch.Stop();

stopWatch.Elapsed.TotalSeconds.Should().BeLessThan(3);
}
}
}
63 changes: 52 additions & 11 deletions RateLimiter/CountByIntervalAwaitableConstraint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@ public class CountByIntervalAwaitableConstraint : IAwaitableConstraint
/// <summary>
/// List of the last time stamps
/// </summary>
public IReadOnlyList<DateTime> TimeStamps => _TimeStamps.ToList();
public IReadOnlyList<DateTime?> TimeStamps => _TimeStamps.ToList();

/// <summary>
/// Stack of the last time stamps
/// </summary>
protected LimitedSizeStack<DateTime> _TimeStamps { get; }
protected LimitedSizeStack<DateTime?> _TimeStamps { get; }

private int _Count { get; }
private TimeSpan _TimeSpan { get; }
private SemaphoreSlim _Semaphore { get; } = new SemaphoreSlim(1, 1);

private SemaphoreSlim _CapacitySemaphore { get; }
private SemaphoreSlim _ListSemaphore { get; } = new SemaphoreSlim(1, 1);
private SemaphoreSlim _DelaySemaphore { get; } = new SemaphoreSlim(1, 1);
private ITime _Time { get; }

/// <summary>
Expand All @@ -46,8 +49,9 @@ internal CountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime

_Count = count;
_TimeSpan = timeSpan;
_TimeStamps = new LimitedSizeStack<DateTime>(_Count);
_TimeStamps = new LimitedSizeStack<DateTime?>(_Count);
_Time = time;
_CapacitySemaphore = new SemaphoreSlim(_Count, _Count);
}

/// <summary>
Expand All @@ -61,31 +65,63 @@ internal CountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime
/// </returns>
public async Task<IDisposable> WaitForReadiness(CancellationToken cancellationToken)
{
await _Semaphore.WaitAsync(cancellationToken);
await _CapacitySemaphore.WaitAsync(cancellationToken);
try
{
await _DelaySemaphore.WaitAsync(cancellationToken);
}
catch
{
_CapacitySemaphore.Release();
throw;
}
try
{
await _ListSemaphore.WaitAsync(cancellationToken);
}
catch
{
_DelaySemaphore.Release();
_CapacitySemaphore.Release();
throw;
}
var count = 0;
var now = _Time.GetNow();
var target = now - _TimeSpan;
LinkedListNode<DateTime> element = _TimeStamps.First, last = null;
while ((element != null) && (element.Value > target))
LinkedListNode<DateTime?> element = _TimeStamps.First, last = null;
while ((element != null) && (element.Value == null || element.Value > target))
{
last = element;
element = element.Next;
count++;
}

if (count < _Count)
{
_TimeStamps.Push(null);
_DelaySemaphore.Release();
_ListSemaphore.Release();
return new DisposeAction(OnEnded);
}


Debug.Assert(element == null);
Debug.Assert(last != null);
var timeToWait = last.Value.Add(_TimeSpan) - now;
Debug.Assert(last.Value != null);
var timeToWait = last.Value.Value.Add(_TimeSpan) - now;
_ListSemaphore.Release();
try
{
await _Time.GetDelay(timeToWait, cancellationToken);
await _ListSemaphore.WaitAsync(cancellationToken);
_TimeStamps.Push(null); //also removes the oldest timestamp
_ListSemaphore.Release();
_DelaySemaphore.Release();
}
catch (Exception)
{
_Semaphore.Release();
_DelaySemaphore.Release();
_CapacitySemaphore.Release();
throw;
}

Expand All @@ -103,10 +139,15 @@ public IAwaitableConstraint Clone()

private void OnEnded()
{
_ListSemaphore.Wait(); //always small amount of time
var now = _Time.GetNow();
_TimeStamps.Push(now);
if (!_TimeStamps.ReplaceLast(null, now))
{
_TimeStamps.Push(now);
}
OnEnded(now);
_Semaphore.Release();
_ListSemaphore.Release();
_CapacitySemaphore.Release();
}

/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions RateLimiter/LimitedSizeStack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,19 @@ public void Push(T item)
if (Count > _MaxSize)
RemoveLast();
}

/// <summary>
/// Removes the last occurrence of the specified value from the System.Collections.Generic.LinkedList`1.
/// </summary>
/// <param name="currentValue">The current value.</param>
/// <param name="newValue">The new value.</param>
public bool ReplaceLast(T currentValue, T newValue)
{
LinkedListNode<T> node = FindLast(currentValue);
if (node == null)
return false;
node.Value = newValue;
return true;
}
}
}

0 comments on commit 80338cf

Please sign in to comment.