Skip to content

Commit

Permalink
PoC Mailbox optimizations (#1618)
Browse files Browse the repository at this point in the history
* Add LockingUnboundedMailboxQueue.cs
  • Loading branch information
rogeralsing authored May 27, 2022
1 parent f47b751 commit 74bf3c5
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 9 deletions.
2 changes: 1 addition & 1 deletion benchmarks/InprocessBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,4 @@ private void SendBatch(IContext context)

_messageCount -= _batchSize;
}
}
}
3 changes: 2 additions & 1 deletion benchmarks/PropsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

using PropsBenchmark;
using Proto;
using Proto.Mailbox;

var system = new ActorSystem();
var props = Props.FromFunc(ctx => Task.CompletedTask);
var props = Props.FromFunc(ctx => Task.CompletedTask).WithMailbox(() => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new LockingUnboundedMailboxQueue(4)));

Console.WriteLine("Starting");
for (var i = 0; i < 1_000_000; i++)
Expand Down
10 changes: 5 additions & 5 deletions benchmarks/SpawnBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Runtime;
using System.Threading.Tasks;
using Proto;
using Proto.Mailbox;

namespace SpawnBenchmark;

Expand Down Expand Up @@ -43,7 +44,7 @@ public Task ReceiveAsync(IContext context)

for (var i = 0; i < r.Div; i++)
{
var child = _system.Root.Spawn(Props(_system));
var child = _system.Root.Spawn(Props);
context.Request(child, new Request
{
Num = r.Num + i * (r.Size / r.Div),
Expand Down Expand Up @@ -71,9 +72,8 @@ public Task ReceiveAsync(IContext context)
}
}

private static MyActor ProduceActor(ActorSystem system) => new(system);

public static Props Props(ActorSystem system) => Proto.Props.FromProducer(() => ProduceActor(system));
public static readonly Props Props = Props.FromProducer(s => new MyActor(s)).WithMailbox(() => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new LockingUnboundedMailboxQueue(4)));

}

class Program
Expand All @@ -87,7 +87,7 @@ private static void Main()
{
Console.WriteLine($"Is Server GC {GCSettings.IsServerGC}");

var pid = context.Spawn(MyActor.Props(system));
var pid = context.Spawn(MyActor.Props);
var sw = Stopwatch.StartNew();
var t = context.RequestAsync<long>(pid, new Request
{
Expand Down
56 changes: 56 additions & 0 deletions src/Proto.Actor/Mailbox/LockingUnboundedMailboxQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// -----------------------------------------------------------------------
// <copyright file = "LockingUnboundedMailboxQueue.cs" company = "Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Threading;

namespace Proto.Mailbox;

public class LockingUnboundedMailboxQueue : IMailboxQueue
{
private readonly object _lock = new();
private readonly Queue<object> _queue;
private long _count;

public LockingUnboundedMailboxQueue(int initialCapacity)
{
_queue = new Queue<object>(initialCapacity);
}

public bool HasMessages => Length > 0;
public int Length {
get {
Interlocked.Read(ref _count);
return (int)_count;
}
}

public void Push(object message)
{
lock (_lock)
{
_queue.Enqueue(message);
Interlocked.Increment(ref _count);
}
}

public object? Pop()
{
if (!HasMessages)
{
return null;
}

lock (_lock)
{
if (_queue.TryDequeue(out var msg))
{
Interlocked.Decrement(ref _count);
}
return msg;
}
}
}
2 changes: 1 addition & 1 deletion src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,4 @@ public interface IMailboxStatistics
/// This method is invoked when all messages in the mailbox have been received.
/// </summary>
void MailboxEmpty();
}
}
1 change: 0 additions & 1 deletion src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

namespace Proto.Mailbox;


public class UnboundedMailboxQueue : IMailboxQueue
{
private readonly ConcurrentQueue<object> _messages = new();
Expand Down

0 comments on commit 74bf3c5

Please sign in to comment.