-
Notifications
You must be signed in to change notification settings - Fork 46
Add twisted support #18 #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ab76809
55779c5
df73979
38f7211
b9112b2
66c1eab
faa0f25
6ee365e
d5f99a8
94b04a7
fb2141c
20649e9
9a77cf2
9925af5
a8b67f0
24e117c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
venv | ||
p2venv | ||
*.pyc | ||
dist | ||
ldclient_py.egg-info | ||
build/ | ||
test.py | ||
.idea | ||
*.iml | ||
test.py |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,15 @@ | ||
dependencies: | ||
pre: | ||
- pyenv shell 2.6.8; $(pyenv which pip) install --upgrade pip | ||
- pyenv shell 2.7.10; $(pyenv which pip) install --upgrade pip | ||
- pyenv shell 3.3.3; $(pyenv which pip) install --upgrade pip | ||
- pyenv shell 2.6.8; $(pyenv which pip) install -r test-requirements.txt | ||
- pyenv shell 2.7.10; $(pyenv which pip) install -r test-requirements.txt | ||
- pyenv shell 3.3.3; $(pyenv which pip) install -r test-requirements.txt | ||
- pyenv shell 2.6.8; $(pyenv which python) setup.py install | ||
- pyenv shell 2.7.10; $(pyenv which pip) install -r twisted-requirements.txt | ||
- pyenv shell 3.3.3; $(pyenv which pip) install -r twisted-requirements.txt | ||
- pyenv shell 2.7.10; $(pyenv which python) setup.py install | ||
- pyenv shell 3.3.3; $(pyenv which python) setup.py install | ||
|
||
test: | ||
override: | ||
- pyenv shell 2.6.8; $(pyenv which py.test) testing | ||
- pyenv shell 3.3.3; $(pyenv which py.test) testing | ||
- pyenv shell 2.7.10; $(pyenv which py.test) testing | ||
- pyenv shell 3.3.3; $(pyenv which py.test) --ignore=testing/test_sse_twisted.py -s testing |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from __future__ import print_function | ||
from ldclient.twisted import TwistedLDClient | ||
from twisted.internet import task, defer | ||
|
||
@defer.inlineCallbacks | ||
def main(reactor): | ||
apiKey = 'whatever' | ||
client = TwistedLDClient(apiKey) | ||
user = { | ||
u'key': u'xyz', | ||
u'custom': { | ||
u'bizzle': u'def' | ||
} | ||
} | ||
val = yield client.toggle('foo', user) | ||
yield client.flush() | ||
print("Value: {}".format(val)) | ||
|
||
if __name__ == '__main__': | ||
task.react(main) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,10 +55,14 @@ def __init__(self, | |
capacity = 10000, | ||
stream_uri = 'https://stream.launchdarkly.com', | ||
stream = False, | ||
verify = True): | ||
verify = True, | ||
stream_processor_class = None, | ||
feature_store_class = None): | ||
self._base_uri = base_uri.rstrip('\\') | ||
self._stream_uri = stream_uri.rstrip('\\') | ||
self._stream = stream | ||
self._stream_processor_class = StreamProcessor if not stream_processor_class else stream_processor_class | ||
self._feature_store_class = InMemoryFeatureStore if not feature_store_class else feature_store_class | ||
self._connect = connect_timeout | ||
self._read = read_timeout | ||
self._upload_limit = upload_limit | ||
|
@@ -135,34 +139,45 @@ def __init__(self, api_key, config): | |
self.daemon = True | ||
self._api_key = api_key | ||
self._config = config | ||
self._store = InMemoryFeatureStore() | ||
self._store = config._feature_store_class() | ||
self._running = False | ||
|
||
def run(self): | ||
log.debug("Starting stream processor") | ||
self._running = True | ||
hdrs = _stream_headers(self._api_key) | ||
uri = self._config._stream_uri + "/" | ||
messages = SSEClient(uri, verify = self._config._verify, headers = hdrs) | ||
for msg in messages: | ||
payload = json.loads(msg.data) | ||
if msg.event == 'put/features': | ||
self._store.init(payload) | ||
elif msg.event == 'patch/features': | ||
key = payload['path'][1:] | ||
feature = payload['data'] | ||
self._store.upsert(key, feature) | ||
elif msg.event == 'delete/features': | ||
key = payload['path'][1:] | ||
version = payload['version'] | ||
self._store.delete(key, version) | ||
else: | ||
log.warning('Unhandled event in stream processor: ' + msg.event) | ||
if not self._running: | ||
break | ||
self.process_message(self._store, msg) | ||
|
||
def initialized(self): | ||
return self._store.initialized() | ||
|
||
def get_feature(self, key): | ||
return self._store.get(key) | ||
|
||
def stop(self): | ||
self._running = False | ||
|
||
@staticmethod | ||
def process_message(store, msg): | ||
payload = json.loads(msg.data) | ||
if msg.event == 'put': | ||
store.init(payload) | ||
elif msg.event == 'patch': | ||
key = payload['path'][1:] | ||
feature = payload['data'] | ||
store.upsert(key, feature) | ||
elif msg.event == 'delete': | ||
key = payload['path'][1:] | ||
version = payload['version'] | ||
store.delete(key, version) | ||
else: | ||
log.warning('Unhandled event in stream processor: ' + msg.event) | ||
|
||
class Consumer(Thread): | ||
def __init__(self, queue, api_key, config): | ||
Thread.__init__(self) | ||
|
@@ -251,8 +266,9 @@ def __init__(self, api_key, config = None): | |
self._consumer = None | ||
self._offline = False | ||
self._lock = Lock() | ||
self._stream_processor = None | ||
if self._config._stream: | ||
self._stream_processor = StreamProcessor(api_key, config) | ||
self._stream_processor = config._stream_processor_class(api_key, config) | ||
self._stream_processor.start() | ||
|
||
def _check_consumer(self): | ||
|
@@ -261,9 +277,11 @@ def _check_consumer(self): | |
self._consumer = Consumer(self._queue, self._api_key, self._config) | ||
self._consumer.start() | ||
|
||
def _stop_consumer(self): | ||
def _stop_consumers(self): | ||
if self._consumer and self._consumer.is_alive(): | ||
self._consumer.stop() | ||
if self._stream_processor and self._stream_processor.is_alive(): | ||
self._stream_processor.stop() | ||
|
||
def _send(self, event): | ||
if self._offline: | ||
|
@@ -283,7 +301,7 @@ def identify(self, user): | |
|
||
def set_offline(self): | ||
self._offline = True | ||
self._stop_consumer() | ||
self._stop_consumers() | ||
|
||
def set_online(self): | ||
self._offline = False | ||
|
@@ -339,8 +357,11 @@ def _toggle(self, key, user, default): | |
def _headers(api_key): | ||
return {'Authorization': 'api_key ' + api_key, 'User-Agent': 'PythonClient/' + __version__, 'Content-Type': "application/json"} | ||
|
||
def _stream_headers(api_key): | ||
return {'Authorization': 'api_key ' + api_key, 'User-Agent': 'PythonClient/' + __version__, 'Accept': "text/event-stream"} | ||
def _stream_headers(api_key, client="PythonClient"): | ||
return {'Authorization': 'api_key ' + api_key, | ||
'User-Agent': 'PythonClient/' + __version__, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to set the |
||
'Cache-Control': 'no-cache', | ||
'Accept': "text/event-stream"} | ||
|
||
def _param_for_user(feature, user): | ||
if 'key' in user and user['key']: | ||
|
@@ -420,5 +441,4 @@ def _evaluate(feature, user): | |
total += float(variation['weight']) / 100.0 | ||
if param < total: | ||
return variation['value'] | ||
|
||
return None |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
from __future__ import absolute_import | ||
from functools import partial | ||
|
||
import json | ||
from queue import Empty | ||
import errno | ||
from cachecontrol import CacheControl | ||
from ldclient import LDClient, _headers, log, _evaluate, _stream_headers, StreamProcessor, Config | ||
from ldclient.twisted_sse import TwistedSSEClient | ||
from requests.packages.urllib3.exceptions import ProtocolError | ||
from twisted.internet import task, defer | ||
import txrequests | ||
|
||
|
||
class TwistedLDClient(LDClient): | ||
def __init__(self, api_key, config=None): | ||
if config is None: | ||
config = TwistedConfig.default() | ||
super(TwistedLDClient, self).__init__(api_key, config) | ||
self._session = CacheControl(txrequests.Session()) | ||
|
||
def _check_consumer(self): | ||
if not self._consumer or not self._consumer.is_alive(): | ||
self._consumer = TwistedConsumer(self._session, self._queue, self._api_key, self._config) | ||
self._consumer.start() | ||
|
||
def flush(self): | ||
if self._offline: | ||
return defer.succeed(True) | ||
self._check_consumer() | ||
return self._consumer.flush() | ||
|
||
def toggle(self, key, user, default=False): | ||
@defer.inlineCallbacks | ||
def run(should_retry): | ||
# noinspection PyBroadException | ||
try: | ||
if self._offline: | ||
defer.returnValue(default) | ||
val = yield self._toggle(key, user, default) | ||
self._send({'kind': 'feature', 'key': key, 'user': user, 'value': val}) | ||
defer.returnValue(val) | ||
except ProtocolError as e: | ||
inner = e.args[1] | ||
if inner.errno == errno.ECONNRESET and should_retry: | ||
log.warning('ProtocolError exception caught while getting flag. Retrying.') | ||
d = yield run(False) | ||
defer.returnValue(d) | ||
else: | ||
log.exception('Unhandled exception. Returning default value for flag.') | ||
defer.returnValue(default) | ||
except Exception: | ||
log.exception('Unhandled exception. Returning default value for flag.') | ||
defer.returnValue(default) | ||
|
||
return run(True) | ||
|
||
@defer.inlineCallbacks | ||
def _toggle(self, key, user, default): | ||
if self._config._stream and self._stream_processor.initialized(): | ||
feature = self._stream_processor.get_feature(key) | ||
else: | ||
hdrs = _headers(self._api_key) | ||
uri = self._config._base_uri + '/api/eval/features/' + key | ||
r = yield self._session.get(uri, headers=hdrs, timeout=(self._config._connect, self._config._read)) | ||
r.raise_for_status() | ||
feature = r.json() | ||
val = _evaluate(feature, user) | ||
if val is None: | ||
val = default | ||
defer.returnValue(val) | ||
|
||
|
||
class TwistedConfig(Config): | ||
def __init__(self, *args, **kwargs): | ||
super(TwistedConfig, self).__init__(*args, **kwargs) | ||
self._stream_processor_class = TwistedStreamProcessor | ||
|
||
|
||
class TwistedStreamProcessor(object): | ||
|
||
def __init__(self, api_key, config): | ||
self._store = config._feature_store_class() | ||
self.sse_client = TwistedSSEClient(config._stream_uri + "/", headers=_stream_headers(api_key, | ||
"PythonTwistedClient"), | ||
verify=config._verify, | ||
on_event=partial(StreamProcessor.process_message, self._store)) | ||
self.running = False | ||
|
||
def start(self): | ||
self.sse_client.start() | ||
self.running = True | ||
|
||
def stop(self): | ||
self.sse_client.stop() | ||
|
||
def get_feature(self, key): | ||
return self._store.get(key) | ||
|
||
def initialized(self): | ||
return self._store.initialized() | ||
|
||
def is_alive(self): | ||
return self.running | ||
|
||
|
||
class TwistedConsumer(object): | ||
def __init__(self, session, queue, api_key, config): | ||
self._queue = queue | ||
""" @type: queue.Queue """ | ||
self._session = session | ||
""" :type: txrequests.Session """ | ||
|
||
self._api_key = api_key | ||
self._config = config | ||
""" :type: Deferred """ | ||
self._looping_call = None | ||
""" :type: LoopingCall""" | ||
|
||
def start(self): | ||
self._flushed = defer.Deferred() | ||
self._looping_call = task.LoopingCall(self._consume) | ||
self._looping_call.start(5) | ||
|
||
def stop(self): | ||
self._looping_call.stop() | ||
|
||
def is_alive(self): | ||
return self._looping_call is not None and self._looping_call.running | ||
|
||
def flush(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If flush is called on an empty queue, it will block until the next time I also think This is important for our integration tests-- our test harness connects to the same Pubnub channel as the dev console and checks for the presence of events to verify that the client is sending them properly. For the SDKs that implement flush as I've described it, the harness runs in about 5 minutes. Without flush, the twisted tests have been running for 30 minutes so far since we're waiting up to 5 seconds for each event to appear. On the plus side the integration tests are passing 👍 |
||
return self._consume() | ||
|
||
def _consume(self): | ||
items = [] | ||
try: | ||
while True: | ||
items.append(self._queue.get_nowait()) | ||
except Empty: | ||
pass | ||
|
||
if items: | ||
return self.send_batch(items) | ||
|
||
@defer.inlineCallbacks | ||
def send_batch(self, events): | ||
@defer.inlineCallbacks | ||
def do_send(should_retry): | ||
# noinspection PyBroadException | ||
try: | ||
if isinstance(events, dict): | ||
body = [events] | ||
else: | ||
body = events | ||
hdrs = _headers(self._api_key) | ||
uri = self._config._base_uri + '/api/events/bulk' | ||
r = yield self._session.post(uri, headers=hdrs, timeout=(self._config._connect, self._config._read), | ||
data=json.dumps(body)) | ||
r.raise_for_status() | ||
except ProtocolError as e: | ||
inner = e.args[1] | ||
if inner.errno == errno.ECONNRESET and should_retry: | ||
log.warning('ProtocolError exception caught while sending events. Retrying.') | ||
yield do_send(False) | ||
else: | ||
log.exception('Unhandled exception in event consumer. Analytics events were not processed.') | ||
except: | ||
log.exception('Unhandled exception in event consumer. Analytics events were not processed.') | ||
try: | ||
yield do_send(True) | ||
finally: | ||
for _ in events: | ||
self._queue.task_done() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're making one minor change to the streaming SDKs-- rather than subscribe to the root resource
https://stream.launchdarkly.com/
, we're going to subscribe tohttps:/stream.launchdarkly.com/features
.We're doing this because we're going to start adding other things to
stream.ld.com
that are not relevant to the SDKs, and the root resource is intended to subscribe you to everything.Besides changing the URI here, you'll also need to change the event names (e.g.
put/features
becomesput
). Here's the change we made to ruby, as a reference:launchdarkly/ruby-server-sdk@e2aeae5
I was going to do this myself, but since your PR is still out I'd either have to fork your fork, or make the change on master and make you deal with the conflict. I figure just having you do it is easiest.