1+ using System ;
2+ using System . Diagnostics ;
3+ using System . Linq ;
4+ using System . Threading ;
5+ using System . Threading . Tasks ;
6+ using Microsoft . Extensions . DependencyInjection ;
7+ using NUnit . Framework ;
8+ using Rebus . Bus ;
9+ using Rebus . Config ;
10+ using Rebus . Handlers ;
11+ using Rebus . Logging ;
12+ using Rebus . Routing . TypeBased ;
13+ using Rebus . Tests . Contracts ;
14+ using Rebus . Transport . InMem ;
15+ #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
16+
17+ namespace Rebus . ServiceProvider . Tests . Examples ;
18+
19+ [ TestFixture ]
20+ public class InMemoryPerformanceCheck : FixtureBase
21+ {
22+ Microsoft . Extensions . DependencyInjection . ServiceProvider provider ;
23+
24+ protected override void SetUp ( )
25+ {
26+ base . SetUp ( ) ;
27+
28+ var services = new ServiceCollection ( ) ;
29+
30+ services . AddRebus ( configure =>
31+ configure . Transport ( t => t . UseInMemoryTransport ( new ( ) , "test-queue" ) )
32+ . Routing ( r => r . TypeBased ( ) . Map < PerformanceData > ( "test-queue" ) ) // Map all messages in the assembly of Program to the queue
33+ . Logging ( l => l . ColoredConsole ( LogLevel . Warn ) )
34+ ) ;
35+
36+ services . AddRebusHandler < PerformanceConsumer > ( ) ;
37+
38+ services . AddSingleton < PerformanceProducer > ( ) ;
39+
40+ provider = services . BuildServiceProvider ( ) ;
41+ provider . StartHostedServices ( ) ;
42+ }
43+
44+ protected override void TearDown ( )
45+ {
46+ provider . Dispose ( ) ;
47+ base . TearDown ( ) ;
48+ }
49+
50+ [ Test ]
51+ [ Repeat ( 10 ) ]
52+ public async Task RunTheTest ( )
53+ {
54+ var producer = provider . GetRequiredService < PerformanceProducer > ( ) ;
55+
56+ var elapsedMilliseconds = await producer . TestPerformance ( ) ;
57+
58+ Console . WriteLine ( $ "Perf test execution time: { elapsedMilliseconds } ms") ;
59+ }
60+
61+ public static class PerformanceCounter
62+ {
63+ public static int Counter = 10000 ;
64+ }
65+
66+ public class PerformanceProducer ( IBus bus )
67+ {
68+ public static readonly AutoResetEvent AutoResetEvent = new ( initialState : false ) ;
69+
70+ public async Task < long > TestPerformance ( )
71+ {
72+ var stopwatch = Stopwatch . StartNew ( ) ;
73+
74+ var messages = Enumerable . Range ( 0 , PerformanceCounter . Counter )
75+ . Select ( n => new PerformanceData ( n ) ) ;
76+
77+ foreach ( var message in messages )
78+ {
79+ await bus . Send ( message ) ;
80+ }
81+
82+ AutoResetEvent . WaitOne ( ) ; // Wait for all messages to be processed
83+
84+ return stopwatch . ElapsedMilliseconds ;
85+ }
86+ }
87+
88+ public class PerformanceConsumer : IHandleMessages < PerformanceData >
89+ {
90+ static int counter ;
91+
92+ public async Task Handle ( PerformanceData message )
93+ {
94+ if ( counter != message . Index )
95+ {
96+ Console . WriteLine ( $ "Expected message index { counter } , but got { message . Index } ") ;
97+ }
98+
99+ var result = Interlocked . Increment ( ref counter ) ;
100+
101+ if ( result == PerformanceCounter . Counter )
102+ {
103+ Interlocked . Exchange ( ref counter , 0 ) ; // Reset counter for next performance test
104+ PerformanceProducer . AutoResetEvent . Set ( ) ; // Signal that all messages have been processed
105+ }
106+ }
107+ }
108+
109+ public record PerformanceData ( int Index ) ;
110+ }
0 commit comments