@@ -13,51 +13,63 @@ public class MySQLFixture : IDisposable
13
13
private const string _username = "root" ;
14
14
private const string _password = "root" ;
15
15
16
- private readonly MySqlConnection _connection ;
17
-
18
16
public IReplicationClient Client { get ; private set ; }
19
17
18
+ private SemaphoreSlim _semaphore = new SemaphoreSlim ( 1 , 1 ) ;
19
+
20
20
public static MySQLFixture Instance { get ; } = new MySQLFixture ( ) ;
21
21
22
22
private MySQLFixture ( )
23
23
{
24
- _connection = new MySqlConnection ( $ "Server={ _host } ;Database=garden;Uid={ _username } ;Pwd={ _password } ;") ;
25
24
Client = new ReplicationClient ( ) ;
26
25
ConnectAsync ( ) . Wait ( ) ;
27
26
}
28
27
29
28
private async Task ConnectAsync ( )
30
29
{
31
- await _connection . OpenAsync ( ) ;
32
30
await Client . ConnectAsync ( _host , _username , _password , 1 ) ;
33
31
}
34
32
33
+ private MySqlConnection GetConnection ( )
34
+ {
35
+ var connection = new MySqlConnection ( $ "Server={ _host } ;Database=garden;Uid={ _username } ;Pwd={ _password } ;") ;
36
+ connection . OpenAsync ( ) . Wait ( ) ;
37
+ return connection ;
38
+ }
39
+
35
40
public MySqlCommand CreateCommand ( )
36
41
{
37
- return _connection . CreateCommand ( ) ;
42
+ return GetConnection ( ) . CreateCommand ( ) ;
38
43
}
39
44
40
45
public async Task < TLogEvent > ReceiveAsync < TLogEvent > ( CancellationToken cancellationToken = default )
41
46
where TLogEvent : LogEvent
42
47
{
43
- while ( ! cancellationToken . IsCancellationRequested )
44
- {
45
- var logEvent = await Client . ReceiveAsync ( ) ;
48
+ await _semaphore . WaitAsync ( cancellationToken ) ;
46
49
47
- if ( logEvent is TLogEvent requiredLogEvent )
50
+ try
51
+ {
52
+ while ( ! cancellationToken . IsCancellationRequested )
48
53
{
49
- return requiredLogEvent ;
54
+ var logEvent = await Client . ReceiveAsync ( ) ;
55
+
56
+ if ( logEvent is TLogEvent requiredLogEvent )
57
+ {
58
+ return requiredLogEvent ;
59
+ }
50
60
}
51
61
}
62
+ finally
63
+ {
64
+ _semaphore . Release ( ) ;
65
+ }
52
66
53
67
return default ;
54
68
}
55
69
56
70
public void Dispose ( )
57
71
{
58
72
Client ? . CloseAsync ( ) . AsTask ( ) . Wait ( ) ;
59
- _connection ? . CloseAsync ( ) . Wait ( ) ;
60
- _connection ? . Dispose ( ) ;
61
73
}
62
74
}
63
75
}
0 commit comments