From 74bf3c5394d348ab1097727650fc47b85b301d6a Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 27 May 2022 16:27:54 +0200 Subject: [PATCH] PoC Mailbox optimizations (#1618) * Add LockingUnboundedMailboxQueue.cs --- benchmarks/InprocessBenchmark/Program.cs | 2 +- benchmarks/PropsBenchmark/Program.cs | 3 +- benchmarks/SpawnBenchmark/Program.cs | 10 ++-- .../Mailbox/LockingUnboundedMailboxQueue.cs | 56 +++++++++++++++++++ src/Proto.Actor/Mailbox/Mailbox.cs | 2 +- .../Mailbox/UnboundedMailboxQueue.cs | 1 - 6 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 src/Proto.Actor/Mailbox/LockingUnboundedMailboxQueue.cs diff --git a/benchmarks/InprocessBenchmark/Program.cs b/benchmarks/InprocessBenchmark/Program.cs index 2680bc899a..ffccb2c885 100644 --- a/benchmarks/InprocessBenchmark/Program.cs +++ b/benchmarks/InprocessBenchmark/Program.cs @@ -149,4 +149,4 @@ private void SendBatch(IContext context) _messageCount -= _batchSize; } -} \ No newline at end of file +} diff --git a/benchmarks/PropsBenchmark/Program.cs b/benchmarks/PropsBenchmark/Program.cs index 23854dab11..d70b2e2a8e 100644 --- a/benchmarks/PropsBenchmark/Program.cs +++ b/benchmarks/PropsBenchmark/Program.cs @@ -3,9 +3,10 @@ using PropsBenchmark; using Proto; +using Proto.Mailbox; var system = new ActorSystem(); -var props = Props.FromFunc(ctx => Task.CompletedTask); +var props = Props.FromFunc(ctx => Task.CompletedTask).WithMailbox(() => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new LockingUnboundedMailboxQueue(4))); Console.WriteLine("Starting"); for (var i = 0; i < 1_000_000; i++) diff --git a/benchmarks/SpawnBenchmark/Program.cs b/benchmarks/SpawnBenchmark/Program.cs index 79627607e0..99345f2e01 100644 --- a/benchmarks/SpawnBenchmark/Program.cs +++ b/benchmarks/SpawnBenchmark/Program.cs @@ -8,6 +8,7 @@ using System.Runtime; using System.Threading.Tasks; using Proto; +using Proto.Mailbox; namespace SpawnBenchmark; @@ -43,7 +44,7 @@ public Task ReceiveAsync(IContext context) for (var i = 0; i < r.Div; i++) { - var child = _system.Root.Spawn(Props(_system)); + var child = _system.Root.Spawn(Props); context.Request(child, new Request { Num = r.Num + i * (r.Size / r.Div), @@ -71,9 +72,8 @@ public Task ReceiveAsync(IContext context) } } - private static MyActor ProduceActor(ActorSystem system) => new(system); - - public static Props Props(ActorSystem system) => Proto.Props.FromProducer(() => ProduceActor(system)); + public static readonly Props Props = Props.FromProducer(s => new MyActor(s)).WithMailbox(() => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new LockingUnboundedMailboxQueue(4))); + } class Program @@ -87,7 +87,7 @@ private static void Main() { Console.WriteLine($"Is Server GC {GCSettings.IsServerGC}"); - var pid = context.Spawn(MyActor.Props(system)); + var pid = context.Spawn(MyActor.Props); var sw = Stopwatch.StartNew(); var t = context.RequestAsync(pid, new Request { diff --git a/src/Proto.Actor/Mailbox/LockingUnboundedMailboxQueue.cs b/src/Proto.Actor/Mailbox/LockingUnboundedMailboxQueue.cs new file mode 100644 index 0000000000..85e838fb4f --- /dev/null +++ b/src/Proto.Actor/Mailbox/LockingUnboundedMailboxQueue.cs @@ -0,0 +1,56 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Proto.Mailbox; + +public class LockingUnboundedMailboxQueue : IMailboxQueue +{ + private readonly object _lock = new(); + private readonly Queue _queue; + private long _count; + + public LockingUnboundedMailboxQueue(int initialCapacity) + { + _queue = new Queue(initialCapacity); + } + + public bool HasMessages => Length > 0; + public int Length { + get { + Interlocked.Read(ref _count); + return (int)_count; + } + } + + public void Push(object message) + { + lock (_lock) + { + _queue.Enqueue(message); + Interlocked.Increment(ref _count); + } + } + + public object? Pop() + { + if (!HasMessages) + { + return null; + } + + lock (_lock) + { + if (_queue.TryDequeue(out var msg)) + { + Interlocked.Decrement(ref _count); + } + return msg; + } + } +} \ No newline at end of file diff --git a/src/Proto.Actor/Mailbox/Mailbox.cs b/src/Proto.Actor/Mailbox/Mailbox.cs index b3394ef912..5dcb2d732f 100644 --- a/src/Proto.Actor/Mailbox/Mailbox.cs +++ b/src/Proto.Actor/Mailbox/Mailbox.cs @@ -322,4 +322,4 @@ public interface IMailboxStatistics /// This method is invoked when all messages in the mailbox have been received. /// void MailboxEmpty(); -} \ No newline at end of file +} diff --git a/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs b/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs index 72cc192fa4..09a16cfefc 100644 --- a/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs +++ b/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs @@ -7,7 +7,6 @@ namespace Proto.Mailbox; - public class UnboundedMailboxQueue : IMailboxQueue { private readonly ConcurrentQueue _messages = new();