Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows parallel actions execution. Crucial for slow actions #19

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions RateLimiter.Tests/Sample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,45 @@ public async Task Test100Thread()

await Task.WhenAll(tasks.ToArray());
}

[Fact]
public async Task ParallelSlowActionPerformance()
{
var stopWatch = Stopwatch.StartNew();
var limiter = TimeLimiter.GetFromMaxCountByInterval(100, TimeSpan.FromMinutes(1));
Func<Task> func = async () => await Task.Delay(300);

await Task.WhenAll(
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func));
stopWatch.Stop();

stopWatch.Elapsed.TotalSeconds.Should().BeLessThan(1);
}

[Fact]
public async Task ParallelSlowActionPerformaceLimited()
{
var stopWatch = Stopwatch.StartNew();
var limiter = TimeLimiter.GetFromMaxCountByInterval(3, TimeSpan.FromSeconds(1));
Func<Task> func = async () => await Task.Delay(300);

await Task.WhenAll(
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func),
limiter.Enqueue(func));
stopWatch.Stop();

stopWatch.Elapsed.TotalSeconds.Should().BeLessThan(3);
}
}
}
63 changes: 52 additions & 11 deletions RateLimiter/CountByIntervalAwaitableConstraint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@ public class CountByIntervalAwaitableConstraint : IAwaitableConstraint
/// <summary>
/// List of the last time stamps
/// </summary>
public IReadOnlyList<DateTime> TimeStamps => _TimeStamps.ToList();
public IReadOnlyList<DateTime?> TimeStamps => _TimeStamps.ToList();

/// <summary>
/// Stack of the last time stamps
/// </summary>
protected LimitedSizeStack<DateTime> _TimeStamps { get; }
protected LimitedSizeStack<DateTime?> _TimeStamps { get; }

private int _Count { get; }
private TimeSpan _TimeSpan { get; }
private SemaphoreSlim _Semaphore { get; } = new SemaphoreSlim(1, 1);

private SemaphoreSlim _CapacitySemaphore { get; }
private SemaphoreSlim _ListSemaphore { get; } = new SemaphoreSlim(1, 1);
private SemaphoreSlim _DelaySemaphore { get; } = new SemaphoreSlim(1, 1);
private ITime _Time { get; }

/// <summary>
Expand All @@ -46,8 +49,9 @@ internal CountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime

_Count = count;
_TimeSpan = timeSpan;
_TimeStamps = new LimitedSizeStack<DateTime>(_Count);
_TimeStamps = new LimitedSizeStack<DateTime?>(_Count);
_Time = time;
_CapacitySemaphore = new SemaphoreSlim(_Count, _Count);
}

/// <summary>
Expand All @@ -61,31 +65,63 @@ internal CountByIntervalAwaitableConstraint(int count, TimeSpan timeSpan, ITime
/// </returns>
public async Task<IDisposable> WaitForReadiness(CancellationToken cancellationToken)
{
await _Semaphore.WaitAsync(cancellationToken);
await _CapacitySemaphore.WaitAsync(cancellationToken);
try
{
await _DelaySemaphore.WaitAsync(cancellationToken);
}
catch
{
_CapacitySemaphore.Release();
throw;
}
try
{
await _ListSemaphore.WaitAsync(cancellationToken);
}
catch
{
_DelaySemaphore.Release();
_CapacitySemaphore.Release();
throw;
}
var count = 0;
var now = _Time.GetNow();
var target = now - _TimeSpan;
LinkedListNode<DateTime> element = _TimeStamps.First, last = null;
while ((element != null) && (element.Value > target))
LinkedListNode<DateTime?> element = _TimeStamps.First, last = null;
while ((element != null) && (element.Value == null || element.Value > target))
{
last = element;
element = element.Next;
count++;
}

if (count < _Count)
{
_TimeStamps.Push(null);
_DelaySemaphore.Release();
_ListSemaphore.Release();
return new DisposeAction(OnEnded);
}


Debug.Assert(element == null);
Debug.Assert(last != null);
var timeToWait = last.Value.Add(_TimeSpan) - now;
Debug.Assert(last.Value != null);
var timeToWait = last.Value.Value.Add(_TimeSpan) - now;
_ListSemaphore.Release();
try
{
await _Time.GetDelay(timeToWait, cancellationToken);
await _ListSemaphore.WaitAsync(cancellationToken);
_TimeStamps.Push(null); //also removes the oldest timestamp
_ListSemaphore.Release();
_DelaySemaphore.Release();
}
catch (Exception)
{
_Semaphore.Release();
_DelaySemaphore.Release();
_CapacitySemaphore.Release();
throw;
}

Expand All @@ -103,10 +139,15 @@ public IAwaitableConstraint Clone()

private void OnEnded()
{
_ListSemaphore.Wait(); //always small amount of time
var now = _Time.GetNow();
_TimeStamps.Push(now);
if (!_TimeStamps.ReplaceLast(null, now))
{
_TimeStamps.Push(now);
}
OnEnded(now);
_Semaphore.Release();
_ListSemaphore.Release();
_CapacitySemaphore.Release();
}

/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions RateLimiter/LimitedSizeStack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,19 @@ public void Push(T item)
if (Count > _MaxSize)
RemoveLast();
}

/// <summary>
/// Removes the last occurrence of the specified value from the System.Collections.Generic.LinkedList`1.
/// </summary>
/// <param name="currentValue">The current value.</param>
/// <param name="newValue">The new value.</param>
public bool ReplaceLast(T currentValue, T newValue)
{
LinkedListNode<T> node = FindLast(currentValue);
if (node == null)
return false;
node.Value = newValue;
return true;
}
}
}