1
+ using System ;
2
+ using System . Diagnostics ;
3
+ using System . Threading ;
4
+ using System . Threading . Tasks ;
5
+ using Microsoft . Extensions . DependencyInjection ;
6
+ using NUnit . Framework ;
7
+ using Rebus . Bus ;
8
+ using Rebus . Config ;
9
+ using Rebus . Handlers ;
10
+ using Rebus . Logging ;
11
+ using Rebus . Routing . TypeBased ;
12
+ using Rebus . Tests . Contracts ;
13
+ using Rebus . Transport . InMem ;
14
+
15
+ namespace Rebus . ServiceProvider . Tests . Examples ;
16
+
17
+ [ TestFixture ]
18
+ public class InMemoryPerformanceCheck : FixtureBase
19
+ {
20
+ Microsoft . Extensions . DependencyInjection . ServiceProvider provider ;
21
+
22
+ protected override void SetUp ( )
23
+ {
24
+ base . SetUp ( ) ;
25
+
26
+ var services = new ServiceCollection ( ) ;
27
+
28
+ services . AddRebus ( configure =>
29
+ configure . Transport ( t => t . UseInMemoryTransport ( new ( ) , "test-queue" ) )
30
+ . Routing ( r => r . TypeBased ( ) . Map < PerformanceData > ( "test-queue" ) ) // Map all messages in the assembly of Program to the queue
31
+ . Logging ( l => l . ColoredConsole ( LogLevel . Warn ) )
32
+ ) ;
33
+
34
+ services . AddRebusHandler < PerformanceConsumer > ( ) ;
35
+
36
+ services . AddSingleton < PerformanceProducer > ( ) ;
37
+
38
+ provider = services . BuildServiceProvider ( ) ;
39
+ provider . StartHostedServices ( ) ;
40
+ }
41
+
42
+ protected override void TearDown ( )
43
+ {
44
+ provider . Dispose ( ) ;
45
+ base . TearDown ( ) ;
46
+ }
47
+
48
+ [ Test ]
49
+ [ Repeat ( 10 ) ]
50
+ public async Task RunTheTest ( )
51
+ {
52
+ var producer = provider . GetRequiredService < PerformanceProducer > ( ) ;
53
+
54
+ var elapsedMilliseconds = await producer . TestPerformance ( ) ;
55
+
56
+ Console . WriteLine ( $ "Perf test execution time: { elapsedMilliseconds } ms") ;
57
+ }
58
+
59
+ public static class PerformanceCounter
60
+ {
61
+ public static int Counter = 10000 ;
62
+ }
63
+
64
+ public class PerformanceProducer
65
+ {
66
+ public static AutoResetEvent AutoResetEvent { get ; } = new AutoResetEvent ( false ) ;
67
+
68
+ private readonly IBus _bus ;
69
+
70
+ public PerformanceProducer ( IBus bus )
71
+ {
72
+ _bus = bus ;
73
+ }
74
+
75
+ public async Task < long > TestPerformance ( )
76
+ {
77
+ Stopwatch stopwatch = new Stopwatch ( ) ;
78
+ stopwatch . Start ( ) ;
79
+
80
+ for ( int i = 0 ; i < PerformanceCounter . Counter ; i ++ )
81
+ {
82
+ await _bus . Send ( new PerformanceData ( i ) ) ;
83
+ }
84
+
85
+ AutoResetEvent . WaitOne ( ) ; // Wait for all messages to be processed
86
+ stopwatch . Stop ( ) ;
87
+ return stopwatch . ElapsedMilliseconds ;
88
+ }
89
+ }
90
+
91
+ public class PerformanceConsumer : IHandleMessages < PerformanceData >
92
+ {
93
+ static int counter = 0 ;
94
+
95
+ public async Task Handle ( PerformanceData message )
96
+ {
97
+ if ( counter != message . Index )
98
+ {
99
+ Console . WriteLine ( $ "Expected message index { counter } , but got { message . Index } ") ;
100
+ }
101
+
102
+ Interlocked . Increment ( ref counter ) ;
103
+
104
+ if ( counter == PerformanceCounter . Counter )
105
+ {
106
+ counter = 0 ; // Reset counter for next performance test
107
+ PerformanceProducer . AutoResetEvent . Set ( ) ; // Signal that all messages have been processed
108
+ }
109
+
110
+ await Task . CompletedTask ;
111
+ }
112
+ }
113
+
114
+ public record PerformanceData ( int Index ) ;
115
+ }
0 commit comments