1+ from __future__ import absolute_import
2+ from functools import partial
3+
4+ import json
5+ from queue import Empty
6+ import errno
7+ from cachecontrol import CacheControl
8+ from ldclient import LDClient , _headers , log , _evaluate , _stream_headers , StreamProcessor , Config
9+ from ldclient .twisted_sse import TwistedSSEClient
10+ from requests .packages .urllib3 .exceptions import ProtocolError
11+ from twisted .internet import task , defer
12+ import txrequests
13+
14+
15+ class TwistedLDClient (LDClient ):
16+ def __init__ (self , api_key , config = None ):
17+ if config is None :
18+ config = TwistedConfig .default ()
19+ super (TwistedLDClient , self ).__init__ (api_key , config )
20+ self ._session = CacheControl (txrequests .Session ())
21+
22+ def _check_consumer (self ):
23+ if not self ._consumer or not self ._consumer .is_alive ():
24+ self ._consumer = TwistedConsumer (self ._session , self ._queue , self ._api_key , self ._config )
25+ self ._consumer .start ()
26+
27+ def flush (self ):
28+ if self ._offline :
29+ return defer .succeed (True )
30+ self ._check_consumer ()
31+ return self ._consumer .flush ()
32+
33+ def toggle (self , key , user , default = False ):
34+ @defer .inlineCallbacks
35+ def run (should_retry ):
36+ # noinspection PyBroadException
37+ try :
38+ if self ._offline :
39+ defer .returnValue (default )
40+ val = yield self ._toggle (key , user , default )
41+ self ._send ({'kind' : 'feature' , 'key' : key , 'user' : user , 'value' : val })
42+ defer .returnValue (val )
43+ except ProtocolError as e :
44+ inner = e .args [1 ]
45+ if inner .errno == errno .ECONNRESET and should_retry :
46+ log .warning ('ProtocolError exception caught while getting flag. Retrying.' )
47+ d = yield run (False )
48+ defer .returnValue (d )
49+ else :
50+ log .exception ('Unhandled exception. Returning default value for flag.' )
51+ defer .returnValue (default )
52+ except Exception :
53+ log .exception ('Unhandled exception. Returning default value for flag.' )
54+ defer .returnValue (default )
55+
56+ return run (True )
57+
58+ @defer .inlineCallbacks
59+ def _toggle (self , key , user , default ):
60+ if self ._config ._stream and self ._stream_processor .initialized ():
61+ feature = self ._stream_processor .get_feature (key )
62+ else :
63+ hdrs = _headers (self ._api_key )
64+ uri = self ._config ._base_uri + '/api/eval/features/' + key
65+ r = yield self ._session .get (uri , headers = hdrs , timeout = (self ._config ._connect , self ._config ._read ))
66+ r .raise_for_status ()
67+ feature = r .json ()
68+ val = _evaluate (feature , user )
69+ if val is None :
70+ val = default
71+ defer .returnValue (val )
72+
73+
74+ class TwistedConfig (Config ):
75+ def __init__ (self , * args , ** kwargs ):
76+ super (TwistedConfig , self ).__init__ (* args , ** kwargs )
77+ self ._stream_processor_class = TwistedStreamProcessor
78+
79+
80+ class TwistedStreamProcessor (object ):
81+
82+ def __init__ (self , api_key , config ):
83+ self ._store = config ._feature_store_class ()
84+ self .sse_client = TwistedSSEClient (config ._stream_uri + "/" , headers = _stream_headers (api_key ,
85+ "PythonTwistedClient" ),
86+ verify = config ._verify ,
87+ on_event = partial (StreamProcessor .process_message , self ._store ))
88+ self .running = False
89+
90+ def start (self ):
91+ self .sse_client .start ()
92+ self .running = True
93+
94+ def stop (self ):
95+ self .sse_client .stop ()
96+
97+ def get_feature (self , key ):
98+ return self ._store .get (key )
99+
100+ def initialized (self ):
101+ return self ._store .initialized ()
102+
103+ def is_alive (self ):
104+ return self .running
105+
106+
107+ class TwistedConsumer (object ):
108+ def __init__ (self , session , queue , api_key , config ):
109+ self ._queue = queue
110+ """ @type: queue.Queue """
111+ self ._session = session
112+ """ :type: txrequests.Session """
113+
114+ self ._api_key = api_key
115+ self ._config = config
116+ """ :type: Deferred """
117+ self ._looping_call = None
118+ """ :type: LoopingCall"""
119+
120+ def start (self ):
121+ self ._flushed = defer .Deferred ()
122+ self ._looping_call = task .LoopingCall (self ._consume )
123+ self ._looping_call .start (5 )
124+
125+ def stop (self ):
126+ self ._looping_call .stop ()
127+
128+ def is_alive (self ):
129+ return self ._looping_call is not None and self ._looping_call .running
130+
131+ def flush (self ):
132+ return self ._consume ()
133+
134+ def _consume (self ):
135+ items = []
136+ try :
137+ while True :
138+ items .append (self ._queue .get_nowait ())
139+ except Empty :
140+ pass
141+
142+ if items :
143+ return self .send_batch (items )
144+
145+ @defer .inlineCallbacks
146+ def send_batch (self , events ):
147+ @defer .inlineCallbacks
148+ def do_send (should_retry ):
149+ # noinspection PyBroadException
150+ try :
151+ if isinstance (events , dict ):
152+ body = [events ]
153+ else :
154+ body = events
155+ hdrs = _headers (self ._api_key )
156+ uri = self ._config ._base_uri + '/api/events/bulk'
157+ r = yield self ._session .post (uri , headers = hdrs , timeout = (self ._config ._connect , self ._config ._read ),
158+ data = json .dumps (body ))
159+ r .raise_for_status ()
160+ except ProtocolError as e :
161+ inner = e .args [1 ]
162+ if inner .errno == errno .ECONNRESET and should_retry :
163+ log .warning ('ProtocolError exception caught while sending events. Retrying.' )
164+ yield do_send (False )
165+ else :
166+ log .exception ('Unhandled exception in event consumer. Analytics events were not processed.' )
167+ except :
168+ log .exception ('Unhandled exception in event consumer. Analytics events were not processed.' )
169+ try :
170+ yield do_send (True )
171+ finally :
172+ for _ in events :
173+ self ._queue .task_done ()
0 commit comments