diff --git a/Akka.Hosting.Benchmark.Tests/Akka.Hosting.Benchmark.Tests.csproj b/Akka.Hosting.Benchmark.Tests/Akka.Hosting.Benchmark.Tests.csproj new file mode 100644 index 00000000..96a77277 --- /dev/null +++ b/Akka.Hosting.Benchmark.Tests/Akka.Hosting.Benchmark.Tests.csproj @@ -0,0 +1,23 @@ + + + + Exe + net8.0 + enable + enable + false + + + + + + + + + + + + + + + diff --git a/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/ClusterDeliveryBenchmark.cs b/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/ClusterDeliveryBenchmark.cs new file mode 100644 index 00000000..aff47d9b --- /dev/null +++ b/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/ClusterDeliveryBenchmark.cs @@ -0,0 +1,125 @@ +using Akka.Actor; +using Akka.Cluster.Hosting; +using Akka.Cluster.Sharding.Delivery; +using Akka.Hosting.Benchmark.Tests.Configurations; +using Akka.Persistence.Delivery; +using Akka.Persistence.Hosting; +using Akka.Remote.Hosting; +using Akka.Util; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Akka.Hosting.Benchmark.Tests.Cluster.Delivery; + +[Config(typeof(MacroBenchmarkConfig))] +public class ClusterDeliveryBenchmark +{ + private IHost? _host; + private IActorRef? _producer; + private IActorRef? _region; + private IActorRef? _controller; + private IActorRef? _aggregator; + + [Params(3000, 6000)] + public int MessageCount; + + [GlobalSetup] + public void GlobalSetup() + { + _host = Host.CreateDefaultBuilder() + .ConfigureServices((context, services) => + { + services.AddAkka("BenchmarkSystem", (builder, provider) => + { + builder + .WithRemoting() + .WithClustering() + .WithShardRegion( + typeName: "TestConsumer", + entityPropsFactory: (system, _, resolver) => id => ShardingConsumerController.Create( + c => resolver.Props(id, c), + ShardingConsumerController.Settings.Create(system)), + messageExtractor: new MessageExtractor(), + shardOptions: new ShardOptions()) + .WithActors((system, registry, resolver) => + { + var actor = system.ActorOf(Props.Create(() => new AggregateActor(MessageCount))); + registry.Register(actor); + }); + }); + }) + .Build(); + + _host.StartAsync().GetAwaiter().GetResult(); + + var system = _host.Services.GetRequiredService(); + var registry = _host.Services.GetRequiredService(); + + // Join cluster + var tcs = new TaskCompletionSource(); + var cluster = Akka.Cluster.Cluster.Get(system); + cluster.RegisterOnMemberUp(() => + { + tcs.SetResult(); + }); + cluster.Join(cluster.SelfAddress); + tcs.Task.WaitAsync(TimeSpan.FromSeconds(3)).GetAwaiter().GetResult(); + + // Register the sharding region for later use + var region = registry.Get(); + _region = region; + + // Get the test completed aggregator + _aggregator = registry.Get(); + + // Create the ShardingProducerController + _controller = system.ActorOf( + ShardingProducerController.Create( + producerId: "test-producer", + shardRegion: region, + durableQueue: Option.None, + settings: ShardingProducerController.Settings.Create(system) + ), + "producerController" + ); + + // Create the producer actor + _producer = system.ActorOf(Props.Create(() => new ProducerActor(_controller)), "producer"); + } + + [GlobalCleanup] + public void Teardown() + { + if (_host is not null) + { + var sys = _host.Services.GetRequiredService(); + if(_aggregator is not null) + sys.Stop(_aggregator); + if(_producer is not null) + sys.Stop(_producer); + + _host.StopAsync().GetAwaiter().GetResult(); + } + + _aggregator = null; + _producer = null; + _region = null; + _controller = null; + } + + [IterationSetup] + public void IterationSetup() + { + _aggregator.Ask(Reset.Instance).GetAwaiter().GetResult(); + } + + [Benchmark] + public async Task ClusterShardingDeliveryMessageThroughputBenchmark() + { + foreach (var i in Enumerable.Range(1, MessageCount)) + _producer.Tell(i); + + await _aggregator.Ask(GetCompleted.Instance); + } +} \ No newline at end of file diff --git a/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/ClusterDeliveryDurableQueueBenchmark.cs b/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/ClusterDeliveryDurableQueueBenchmark.cs new file mode 100644 index 00000000..3d869595 --- /dev/null +++ b/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/ClusterDeliveryDurableQueueBenchmark.cs @@ -0,0 +1,132 @@ +using Akka.Actor; +using Akka.Cluster.Hosting; +using Akka.Cluster.Sharding.Delivery; +using Akka.Hosting.Benchmark.Tests.Configurations; +using Akka.Persistence.Delivery; +using Akka.Persistence.Hosting; +using Akka.Remote.Hosting; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Akka.Hosting.Benchmark.Tests.Cluster.Delivery; + +[Config(typeof(MacroBenchmarkConfig))] +public class ClusterDeliveryDurableQueueBenchmark +{ + private IHost? _host; + private IActorRef? _producer; + private IActorRef? _region; + private IActorRef? _controller; + private IActorRef? _aggregator; + + [Params(3000, 6000)] + public int MessageCount; + + [GlobalSetup] + public void GlobalSetup() + { + _host = Host.CreateDefaultBuilder() + .ConfigureServices((context, services) => + { + services.AddAkka("BenchmarkSystem", (builder, provider) => + { + builder + .WithRemoting() + .WithClustering() + .WithShardRegion( + typeName: "TestConsumer", + entityPropsFactory: (system, _, resolver) => id => ShardingConsumerController.Create( + c => resolver.Props(id, c), + ShardingConsumerController.Settings.Create(system)), + messageExtractor: new MessageExtractor(), + shardOptions: new ShardOptions()) + .WithInMemoryJournal() + .WithInMemorySnapshotStore() + .WithActors((system, registry, resolver) => + { + var actor = system.ActorOf(Props.Create(() => new AggregateActor(MessageCount))); + registry.Register(actor); + }); + }); + }) + .Build(); + + _host.StartAsync().GetAwaiter().GetResult(); + + var system = _host.Services.GetRequiredService(); + var registry = _host.Services.GetRequiredService(); + + // Join cluster + var tcs = new TaskCompletionSource(); + var cluster = Akka.Cluster.Cluster.Get(system); + cluster.RegisterOnMemberUp(() => + { + tcs.SetResult(); + }); + cluster.Join(cluster.SelfAddress); + tcs.Task.WaitAsync(TimeSpan.FromSeconds(3)).GetAwaiter().GetResult(); + + // Register the sharding region for later use + var region = registry.Get(); + _region = region; + + // Get the test completed aggregator + _aggregator = registry.Get(); + + // Create a durable queue props + var durableQueueProps = EventSourcedProducerQueue.Create( + persistentId: "eventSourced-durableQueue", + system + ); + + // Create the ShardingProducerController + _controller = system.ActorOf( + ShardingProducerController.Create( + producerId: "test-producer", + shardRegion: region, + durableQueue: durableQueueProps, + settings: ShardingProducerController.Settings.Create(system) + ), + "producerController" + ); + + // Create the producer actor + _producer = system.ActorOf(Props.Create(() => new ProducerActor(_controller)), "producer"); + } + + [GlobalCleanup] + public void Teardown() + { + if (_host is not null) + { + var sys = _host.Services.GetRequiredService(); + if(_aggregator is not null) + sys.Stop(_aggregator); + if(_producer is not null) + sys.Stop(_producer); + + _host.StopAsync().GetAwaiter().GetResult(); + } + + _aggregator = null; + _producer = null; + _region = null; + _controller = null; + } + + [IterationSetup] + public void IterationSetup() + { + _aggregator.Ask(Reset.Instance).GetAwaiter().GetResult(); + } + + [Benchmark] + public async Task ClusterShardingDeliveryDurableQueueMessageThroughputBenchmark() + { + foreach (var i in Enumerable.Range(1, MessageCount)) + _producer.Tell(i); + + await _aggregator.Ask(GetCompleted.Instance); + } +} \ No newline at end of file diff --git a/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/Shared.cs b/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/Shared.cs new file mode 100644 index 00000000..dd2b3501 --- /dev/null +++ b/Akka.Hosting.Benchmark.Tests/Cluster/Delivery/Shared.cs @@ -0,0 +1,162 @@ +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Cluster.Sharding.Delivery; +using Akka.Delivery; +using Akka.Event; + +namespace Akka.Hosting.Benchmark.Tests.Cluster.Delivery; + +#region Messages + +internal record Job(int Payload); + +internal class GetCompleted +{ + public static readonly GetCompleted Instance = new(); + private GetCompleted() { } +} + +internal class Completed +{ + public static readonly Completed Instance = new(); + private Completed() { } +} + +internal class Reset +{ + public static readonly Reset Instance = new(); + private Reset() { } +} + +#endregion + +#region Classes + +// The entity actor +internal class TestConsumerEntity : ReceiveActor +{ + private readonly IActorRef _aggregator; + private readonly string _entityId; + private readonly IActorRef _consumerController; + private readonly ILoggingAdapter _log; + + public TestConsumerEntity(string entityId, IActorRef consumerController, IRequiredActor reqAggregator) + { + _entityId = entityId; + _consumerController = consumerController; + _aggregator = reqAggregator.ActorRef; + _log = Context.GetLogger(); + + Receive>(delivery => + { + _aggregator.Tell(Done.Instance); + delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance); + }); + } + + protected override void PreStart() + { + _consumerController.Tell(new ConsumerController.Start(Self)); + } +} + +// Message extractor for sharding +internal class MessageExtractor() : HashCodeMessageExtractor(10) +{ + public override string EntityId(object message) => + message is Job cmd ? (cmd.Payload % 10).ToString() : string.Empty; +} + +// The producer actor +internal class ProducerActor : ReceiveActor, IWithStash +{ + private readonly IActorRef _producerController; + private IActorRef _sendNext = ActorRefs.Nobody; + private readonly ILoggingAdapter _log; + + public ProducerActor(IActorRef producerController) + { + _log = Context.GetLogger(); + _producerController = producerController; + Become(Idle); + } + + public IStash Stash { get; set; } = null!; + + protected override void PreStart() + { + _producerController.Tell(new ShardingProducerController.Start(Self)); + } + + private void Idle() + { + Receive>(next => + { + _sendNext = next.SendNextTo; + Stash.Unstash(); + Become(Active); + }); + + Receive(_ => + { + Stash.Stash(); + }); + } + + private void Active() + { + Receive(msg => + { + Become(Idle); + _sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg))); + }); + Receive(msg => + { + _log.Info(">>>> Sending {0}", msg.Payload); + }); + + Receive>(next => + { + _sendNext = next.SendNextTo; + }); + } +} + +internal class AggregateActor : ReceiveActor +{ + private IActorRef? _reportTo; + private readonly int _totalMessageCount; + private int _messageCount; + + public AggregateActor(int messageCount) + { + _totalMessageCount = messageCount; + Receive(_ => + { + _messageCount++; + if (_messageCount < _totalMessageCount || _reportTo == null) + return; + + _reportTo.Tell(Completed.Instance); + }); + Receive(_ => + { + _messageCount = 0; + Sender.Tell(Done.Instance); + }); + Receive(_ => + { + if (_messageCount >= _totalMessageCount) + { + Sender.Tell(Completed.Instance); + return; + } + + _reportTo = Sender; + }); + } +} + +internal class TestRegion; + +#endregion diff --git a/Akka.Hosting.Benchmark.Tests/Configurations/Configs.cs b/Akka.Hosting.Benchmark.Tests/Configurations/Configs.cs new file mode 100644 index 00000000..f831ba90 --- /dev/null +++ b/Akka.Hosting.Benchmark.Tests/Configurations/Configs.cs @@ -0,0 +1,106 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System.Reflection; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Engines; +using BenchmarkDotNet.Exporters; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Loggers; +using BenchmarkDotNet.Reports; +using BenchmarkDotNet.Running; + +namespace Akka.Hosting.Benchmark.Tests.Configurations +{ + public class RequestsPerSecondColumn : IColumn + { + public string Id => nameof(RequestsPerSecondColumn); + public string ColumnName => "Req/sec"; + + public bool IsDefault(Summary summary, BenchmarkCase benchmarkCase) => false; + public string GetValue(Summary summary, BenchmarkCase benchmarkCase) => GetValue(summary, benchmarkCase, null); + public bool IsAvailable(Summary summary) => true; + public bool AlwaysShow => true; + public ColumnCategory Category => ColumnCategory.Custom; + public int PriorityInCategory => -1; + public bool IsNumeric => true; + public UnitType UnitType => UnitType.Dimensionless; + public string Legend => "Requests per Second"; + + public string GetValue(Summary summary, BenchmarkCase benchmarkCase, SummaryStyle? style) + { + var benchmarkAttribute = benchmarkCase.Descriptor.WorkloadMethod.GetCustomAttribute(); + var totalOperations = benchmarkAttribute?.OperationsPerInvoke ?? 1; + + if (!summary.HasReport(benchmarkCase)) + return ""; + + var report = summary[benchmarkCase]; + var statistics = report?.ResultStatistics; + if(statistics is null) + return ""; + + var nsPerOperation = statistics.Mean; + var operationsPerSecond = 1 / (nsPerOperation / 1e9); + + return operationsPerSecond.ToString("N2"); // or format as you like + + } + } + + + /// + /// Basic BenchmarkDotNet configuration used for microbenchmarks. + /// + public class MicroBenchmarkConfig : ManualConfig + { + public MicroBenchmarkConfig() + { + AddDiagnoser(MemoryDiagnoser.Default); + AddExporter(MarkdownExporter.GitHub); + AddLogger(ConsoleLogger.Default); + } + } + + /// + /// BenchmarkDotNet configuration used for monitored jobs (not for microbenchmarks). + /// + public class MonitoringConfig : ManualConfig + { + public MonitoringConfig() + { + AddExporter(MarkdownExporter.GitHub); + AddColumn(new RequestsPerSecondColumn()); + } + } + + public class MacroBenchmarkConfig : ManualConfig + { + public MacroBenchmarkConfig() + { + AddExporter(MarkdownExporter.GitHub); + AddColumn(new RequestsPerSecondColumn()); + AddColumn(new CategoriesColumn()); + AddLogger(ConsoleLogger.Default); + + int processorCount = Environment.ProcessorCount; + IntPtr affinityMask = (IntPtr)((1 << processorCount) - 1); + + AddJob(Job.LongRun + .WithGcMode(new GcMode { Server = true, Concurrent = true }) + .WithWarmupCount(3) // Reduced from 25 + .WithIterationCount(10) // Reduced from 50 + .RunOncePerIteration() + .WithStrategy(RunStrategy.Monitoring) + // .WithAffinity(affinityMask) // Optional + ); + } + } +} diff --git a/Akka.Hosting.Benchmark.Tests/Program.cs b/Akka.Hosting.Benchmark.Tests/Program.cs new file mode 100644 index 00000000..6dedaf57 --- /dev/null +++ b/Akka.Hosting.Benchmark.Tests/Program.cs @@ -0,0 +1,4 @@ +using System.Reflection; +using BenchmarkDotNet.Running; + +BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args); \ No newline at end of file diff --git a/Akka.Hosting.sln b/Akka.Hosting.sln index e6ad9bf7..2d9cb26a 100644 --- a/Akka.Hosting.sln +++ b/Akka.Hosting.sln @@ -44,6 +44,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.Hosting.Tests", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Hosting.LoggingDemo", "src\Examples\Akka.Hosting.LoggingDemo\Akka.Hosting.LoggingDemo.csproj", "{298D7727-FDC6-49B2-9030-CC7F0E09B0B8}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{9852A2E3-80A8-4290-A59A-622399C7A8E8}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Hosting.Benchmark.Tests", "Akka.Hosting.Benchmark.Tests\Akka.Hosting.Benchmark.Tests.csproj", "{E47F4790-0AC9-4DA8-BB91-0894AD18C9B5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -110,6 +114,10 @@ Global {298D7727-FDC6-49B2-9030-CC7F0E09B0B8}.Debug|Any CPU.Build.0 = Debug|Any CPU {298D7727-FDC6-49B2-9030-CC7F0E09B0B8}.Release|Any CPU.ActiveCfg = Release|Any CPU {298D7727-FDC6-49B2-9030-CC7F0E09B0B8}.Release|Any CPU.Build.0 = Release|Any CPU + {E47F4790-0AC9-4DA8-BB91-0894AD18C9B5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E47F4790-0AC9-4DA8-BB91-0894AD18C9B5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E47F4790-0AC9-4DA8-BB91-0894AD18C9B5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E47F4790-0AC9-4DA8-BB91-0894AD18C9B5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -118,6 +126,7 @@ Global {5F6A7BE8-6906-46CE-BA1C-72EA11EFA33B} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} {4F79325B-9EE7-4501-800F-7A1F8DFBCC80} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} {298D7727-FDC6-49B2-9030-CC7F0E09B0B8} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} + {E47F4790-0AC9-4DA8-BB91-0894AD18C9B5} = {9852A2E3-80A8-4290-A59A-622399C7A8E8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B99E6BB8-642A-4A68-86DF-69567CBA700A}