1
1
# encoding: utf-8
2
+
3
+ import os
4
+ import time
5
+ import threading
2
6
import socket
3
7
import os
4
8
from tornado import gen
5
- from tornado .ioloop import IOLoop
6
9
from tormysql .pool import ConnectionNotFoundError
7
10
from pymysql import OperationalError
8
11
from tornado .testing import gen_test
9
12
from tormysql import Connection , ConnectionPool
10
- from maproxy .proxyserver import ProxyServer
11
- from maproxy .session import SessionFactory
13
+ import sevent
12
14
from tests import BaseTestCase
13
15
16
+ class Request (object ):
17
+ def __init__ (self , conn , host , port ):
18
+ self .conn = conn
19
+ self .pconn = sevent .tcp .Socket ()
20
+ self .buffer = None
21
+ self .connected = False
22
+
23
+ self .conn .on ("data" , self .on_data )
24
+ self .conn .on ("close" , self .on_close )
25
+
26
+ self .pconn .on ("connect" , self .on_pconnect )
27
+ self .pconn .on ("data" , self .on_pdata )
28
+ self .pconn .on ("close" , self .on_pclose )
29
+
30
+ self .pconn .connect ((host , int (port )))
31
+
32
+ def on_data (self , conn , data ):
33
+ if self .connected :
34
+ self .pconn .write (data )
35
+ else :
36
+ self .buffer = data
37
+
38
+ def on_pdata (self , conn , data ):
39
+ self .conn .write (data )
40
+
41
+ def on_close (self , conn ):
42
+ self .pconn .end ()
43
+ try :
44
+ TestThroughProxy .proxys .remove (self )
45
+ except :
46
+ pass
47
+
48
+ def on_pclose (self , conn ):
49
+ self .conn .end ()
50
+ try :
51
+ TestThroughProxy .proxys .remove (self )
52
+ except :
53
+ pass
54
+
55
+ def on_pconnect (self , conn ):
56
+ self .connected = True
57
+ if self .buffer :
58
+ self .pconn .write (self .buffer )
59
+ self .buffer = None
14
60
15
61
class TestThroughProxy (BaseTestCase ):
62
+ proxys = []
63
+
16
64
def setUp (self ):
17
65
super (BaseTestCase , self ).setUp ()
18
66
self .PARAMS = dict (self .PARAMS )
19
67
20
68
s = socket .socket ()
21
69
s .bind (('127.0.0.1' , 0 ))
22
-
23
- self .host , self .port = self .PARAMS ['host' ], self .PARAMS ['port' ]
24
70
_ , self .pport = s .getsockname ()
25
71
s .close ()
26
72
73
+ self .host , self .port = self .PARAMS ['host' ], self .PARAMS ['port' ]
74
+
27
75
def init_proxy (self ):
28
- self .proxy = ProxyServer (
29
- self .host ,
30
- self .port ,
31
- session_factory = SessionFactory (),
32
- )
76
+ def on_connect (server , conn ):
77
+ TestThroughProxy .proxys .append (Request (conn , self .host , self .port ))
78
+
79
+ self .proxy_server = sevent .tcp .Server ()
80
+ self .proxy_server .on ("connection" , on_connect )
81
+ self .proxy_server .listen (("0.0.0.0" , self .pport ))
82
+
33
83
self .PARAMS ['port' ] = self .pport
34
84
self .PARAMS ['host' ] = '127.0.0.1'
35
85
36
- self .proxy .listen (self .pport )
37
-
38
86
def _close_proxy_sessions (self ):
39
- for sock in self .proxy .SessionsList :
40
- try :
41
- sock .c2p_stream .close ()
42
- except :
43
- pass
87
+ for request in TestThroughProxy .proxys :
88
+ request .conn .end ()
44
89
45
90
def tearDown (self ):
46
91
try :
47
- self .proxy .stop ()
92
+ for request in TestThroughProxy .proxys :
93
+ request .conn .end ()
94
+ self .proxy_server .close ()
48
95
except :
49
96
pass
50
97
super (BaseTestCase , self ).tearDown ()
51
98
52
99
@gen .coroutine
53
100
def _execute_test_connection_closing (self ):
54
101
self .init_proxy ()
102
+
55
103
connection = yield Connection (** self .PARAMS )
56
104
cursor = connection .cursor ()
57
105
self ._close_proxy_sessions ()
@@ -62,14 +110,17 @@ def _execute_test_connection_closing(self):
62
110
pass
63
111
else :
64
112
raise AssertionError ("Unexpected normal situation" )
65
- self .proxy .stop ()
113
+
114
+ self .proxy_server .close ()
66
115
67
116
@gen .coroutine
68
117
def _execute_test_connection_closed (self ):
69
118
self .init_proxy ()
119
+
70
120
conn = yield Connection (** self .PARAMS )
71
121
yield conn .close ()
72
- self .proxy .stop ()
122
+
123
+ self .proxy_server .close ()
73
124
74
125
try :
75
126
yield Connection (** self .PARAMS )
@@ -81,6 +132,7 @@ def _execute_test_connection_closed(self):
81
132
@gen .coroutine
82
133
def _execute_test_remote_closing (self ):
83
134
self .init_proxy ()
135
+
84
136
pool = ConnectionPool (
85
137
max_connections = int (os .getenv ("MYSQL_POOL" , "5" )),
86
138
idle_seconds = 7200 ,
@@ -90,7 +142,9 @@ def _execute_test_remote_closing(self):
90
142
try :
91
143
conn = yield pool .Connection ()
92
144
yield conn .do_close ()
93
- self .proxy .stop ()
145
+
146
+ self .proxy_server .close ()
147
+
94
148
yield pool .Connection ()
95
149
except OperationalError :
96
150
pass
@@ -102,6 +156,7 @@ def _execute_test_remote_closing(self):
102
156
@gen .coroutine
103
157
def _execute_test_pool_closing (self ):
104
158
self .init_proxy ()
159
+
105
160
pool = ConnectionPool (
106
161
max_connections = int (os .getenv ("MYSQL_POOL" , "5" )),
107
162
idle_seconds = 7200 ,
@@ -118,10 +173,19 @@ def _execute_test_pool_closing(self):
118
173
raise AssertionError ("Unexpected normal situation" )
119
174
finally :
120
175
yield pool .close ()
121
- self .proxy .stop ()
176
+
177
+ self .proxy_server .close ()
122
178
123
179
@gen_test
124
180
def test (self ):
181
+ loop = sevent .instance ()
182
+ def run ():
183
+ loop .start ()
184
+
185
+ self .proxy_thread = threading .Thread (target = run )
186
+ self .proxy_thread .setDaemon (True )
187
+ self .proxy_thread .start ()
188
+
125
189
yield self ._execute_test_connection_closing ()
126
190
yield self ._execute_test_connection_closed ()
127
191
yield self ._execute_test_remote_closing ()
0 commit comments