Add twisted support #18 #19

Merged
merged 16 commits on Sep 14, 2015
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,6 +1,9 @@
venv
p2venv
*.pyc
dist
ldclient_py.egg-info
build/
test.py
.idea
*.iml
test.py
3 changes: 2 additions & 1 deletion README.md
@@ -32,10 +32,11 @@ Development information (for developing this module itself)

pip install -r requirements.txt
pip install -r test-requirements.txt
pip install -r twisted-requirements.txt

2. Run tests:

$ py.test
$ py.test testing


Learn more
12 changes: 7 additions & 5 deletions circle.yml
@@ -1,13 +1,15 @@
dependencies:
pre:
- pyenv shell 2.6.8; $(pyenv which pip) install --upgrade pip
- pyenv shell 2.7.10; $(pyenv which pip) install --upgrade pip
- pyenv shell 3.3.3; $(pyenv which pip) install --upgrade pip
- pyenv shell 2.6.8; $(pyenv which pip) install -r test-requirements.txt
- pyenv shell 2.7.10; $(pyenv which pip) install -r test-requirements.txt
- pyenv shell 3.3.3; $(pyenv which pip) install -r test-requirements.txt
- pyenv shell 2.6.8; $(pyenv which python) setup.py install
- pyenv shell 2.7.10; $(pyenv which pip) install -r twisted-requirements.txt
- pyenv shell 3.3.3; $(pyenv which pip) install -r twisted-requirements.txt
- pyenv shell 2.7.10; $(pyenv which python) setup.py install
- pyenv shell 3.3.3; $(pyenv which python) setup.py install

test:
override:
- pyenv shell 2.6.8; $(pyenv which py.test) testing
- pyenv shell 3.3.3; $(pyenv which py.test) testing
- pyenv shell 2.7.10; $(pyenv which py.test) testing
- pyenv shell 3.3.3; $(pyenv which py.test) --ignore=testing/test_sse_twisted.py -s testing
20 changes: 20 additions & 0 deletions demo/demo_twisted.py
@@ -0,0 +1,20 @@
from __future__ import print_function
from ldclient.twisted import TwistedLDClient
from twisted.internet import task, defer

@defer.inlineCallbacks
def main(reactor):
apiKey = 'whatever'
client = TwistedLDClient(apiKey)
user = {
u'key': u'xyz',
u'custom': {
u'bizzle': u'def'
}
}
val = yield client.toggle('foo', user)
yield client.flush()
print("Value: {}".format(val))

if __name__ == '__main__':
task.react(main)
62 changes: 41 additions & 21 deletions ldclient/__init__.py
@@ -55,10 +55,14 @@ def __init__(self,
capacity = 10000,
stream_uri = 'https://stream.launchdarkly.com',
stream = False,
verify = True):
verify = True,
stream_processor_class = None,
feature_store_class = None):
self._base_uri = base_uri.rstrip('/')
self._stream_uri = stream_uri.rstrip('/')
self._stream = stream
self._stream_processor_class = StreamProcessor if not stream_processor_class else stream_processor_class
self._feature_store_class = InMemoryFeatureStore if not feature_store_class else feature_store_class
self._connect = connect_timeout
self._read = read_timeout
self._upload_limit = upload_limit
@@ -135,34 +139,45 @@ def __init__(self, api_key, config):
self.daemon = True
self._api_key = api_key
self._config = config
self._store = InMemoryFeatureStore()
self._store = config._feature_store_class()
self._running = False

def run(self):
log.debug("Starting stream processor")
self._running = True
hdrs = _stream_headers(self._api_key)
uri = self._config._stream_uri + "/"
Contributor:

We're making one minor change to the streaming SDKs-- rather than subscribe to the root resource https://stream.launchdarkly.com/, we're going to subscribe to https://stream.launchdarkly.com/features.

We're doing this because we're going to start adding other things to stream.ld.com that are not relevant to the SDKs, and the root resource is intended to subscribe you to everything.

Besides changing the URI here, you'll also need to change the event names (e.g. put/features becomes put). Here's the change we made to ruby, as a reference:

launchdarkly/ruby-server-sdk@e2aeae5

I was going to do this myself, but since your PR is still out I'd either have to fork your fork, or make the change on master and make you deal with the conflict. I figure just having you do it is easiest.

messages = SSEClient(uri, verify = self._config._verify, headers = hdrs)
for msg in messages:
payload = json.loads(msg.data)
if msg.event == 'put/features':
self._store.init(payload)
elif msg.event == 'patch/features':
key = payload['path'][1:]
feature = payload['data']
self._store.upsert(key, feature)
elif msg.event == 'delete/features':
key = payload['path'][1:]
version = payload['version']
self._store.delete(key, version)
else:
log.warning('Unhandled event in stream processor: ' + msg.event)
if not self._running:
break
self.process_message(self._store, msg)

def initialized(self):
return self._store.initialized()

def get_feature(self, key):
return self._store.get(key)

def stop(self):
self._running = False

@staticmethod
def process_message(store, msg):
payload = json.loads(msg.data)
if msg.event == 'put':
store.init(payload)
elif msg.event == 'patch':
key = payload['path'][1:]
feature = payload['data']
store.upsert(key, feature)
elif msg.event == 'delete':
key = payload['path'][1:]
version = payload['version']
store.delete(key, version)
else:
log.warning('Unhandled event in stream processor: ' + msg.event)
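The renamed events in `process_message` can be checked in isolation. A minimal, self-contained sketch of the same dispatch logic, where `StubStore` and `Message` are hypothetical stand-ins for the feature store and SSE message:

```python
import json
from collections import namedtuple

# Hypothetical stand-ins mirroring the interfaces process_message expects.
Message = namedtuple('Message', ['event', 'data'])

class StubStore:
    def __init__(self):
        self.features = {}

    def init(self, payload):
        self.features = dict(payload)

    def upsert(self, key, feature):
        self.features[key] = feature

    def delete(self, key, version):
        self.features.pop(key, None)

def process_message(store, msg):
    # Same dispatch as StreamProcessor.process_message, using the new
    # event names ('put', not 'put/features').
    payload = json.loads(msg.data)
    if msg.event == 'put':
        store.init(payload)
    elif msg.event == 'patch':
        store.upsert(payload['path'][1:], payload['data'])
    elif msg.event == 'delete':
        store.delete(payload['path'][1:], payload['version'])

store = StubStore()
process_message(store, Message('put', json.dumps({'foo': {'version': 1}})))
process_message(store, Message('patch', json.dumps({'path': '/bar', 'data': {'version': 1}})))
process_message(store, Message('delete', json.dumps({'path': '/foo', 'version': 2})))
```

After the three messages, the store holds only `bar`: the `put` initializes, the `patch` upserts by the path's trailing key, and the `delete` removes.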

class Consumer(Thread):
def __init__(self, queue, api_key, config):
Thread.__init__(self)
@@ -251,8 +266,9 @@ def __init__(self, api_key, config = None):
self._consumer = None
self._offline = False
self._lock = Lock()
self._stream_processor = None
if self._config._stream:
self._stream_processor = StreamProcessor(api_key, config)
self._stream_processor = config._stream_processor_class(api_key, config)
self._stream_processor.start()

def _check_consumer(self):
@@ -261,9 +277,11 @@ def _check_consumer(self):
self._consumer = Consumer(self._queue, self._api_key, self._config)
self._consumer.start()

def _stop_consumer(self):
def _stop_consumers(self):
if self._consumer and self._consumer.is_alive():
self._consumer.stop()
if self._stream_processor and self._stream_processor.is_alive():
self._stream_processor.stop()

def _send(self, event):
if self._offline:
@@ -283,7 +301,7 @@ def identify(self, user):

def set_offline(self):
self._offline = True
self._stop_consumer()
self._stop_consumers()

def set_online(self):
self._offline = False
@@ -339,8 +357,11 @@ def _toggle(self, key, user, default):
def _headers(api_key):
return {'Authorization': 'api_key ' + api_key, 'User-Agent': 'PythonClient/' + __version__, 'Content-Type': "application/json"}

def _stream_headers(api_key):
return {'Authorization': 'api_key ' + api_key, 'User-Agent': 'PythonClient/' + __version__, 'Accept': "text/event-stream"}
def _stream_headers(api_key, client="PythonClient"):
return {'Authorization': 'api_key ' + api_key,
'User-Agent': client + '/' + __version__,
Contributor:

It would be nice to set the User-Agent to TwistedPythonClient since the codebases are so different

'Cache-Control': 'no-cache',
'Accept': "text/event-stream"}
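Per the review comment, threading the `client` parameter into the `User-Agent` lets the Twisted code identify itself. A quick sketch of that variant (the `version` string is a stand-in for `__version__`):

```python
def _stream_headers(api_key, client="PythonClient", version="1.0.0"):
    # Mirrors the helper above; client flows into the User-Agent so that
    # e.g. the Twisted processor can report a distinct identity.
    return {'Authorization': 'api_key ' + api_key,
            'User-Agent': client + '/' + version,
            'Cache-Control': 'no-cache',
            'Accept': 'text/event-stream'}

hdrs = _stream_headers('whatever', client="PythonTwistedClient")
```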

def _param_for_user(feature, user):
if 'key' in user and user['key']:
@@ -420,5 +441,4 @@ def _evaluate(feature, user):
total += float(variation['weight']) / 100.0
if param < total:
return variation['value']

return None
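The new `stream_processor_class` / `feature_store_class` hooks are what let `ldclient.twisted` plug in its own machinery without subclassing the whole client. A self-contained sketch of the default-selection pattern (class bodies here are stand-ins, not the real implementations):

```python
# Stand-ins for the SDK's defaults.
class InMemoryFeatureStore:
    pass

class StreamProcessor:
    pass

class Config:
    # None falls back to the in-memory defaults, matching the
    # "X if not y else y" pattern in the diff above.
    def __init__(self, stream_processor_class=None, feature_store_class=None):
        self._stream_processor_class = stream_processor_class or StreamProcessor
        self._feature_store_class = feature_store_class or InMemoryFeatureStore

class RedisFeatureStore(InMemoryFeatureStore):  # hypothetical custom store
    pass

default = Config()
custom = Config(feature_store_class=RedisFeatureStore)
```

This is how `TwistedConfig` swaps in `TwistedStreamProcessor` while leaving everything else untouched.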
173 changes: 173 additions & 0 deletions ldclient/twisted.py
@@ -0,0 +1,173 @@
from __future__ import absolute_import
from functools import partial

import json
from queue import Empty
import errno
from cachecontrol import CacheControl
from ldclient import LDClient, _headers, log, _evaluate, _stream_headers, StreamProcessor, Config
from ldclient.twisted_sse import TwistedSSEClient
from requests.packages.urllib3.exceptions import ProtocolError
from twisted.internet import task, defer
import txrequests


class TwistedLDClient(LDClient):
def __init__(self, api_key, config=None):
if config is None:
config = TwistedConfig.default()
super(TwistedLDClient, self).__init__(api_key, config)
self._session = CacheControl(txrequests.Session())

def _check_consumer(self):
if not self._consumer or not self._consumer.is_alive():
self._consumer = TwistedConsumer(self._session, self._queue, self._api_key, self._config)
self._consumer.start()

def flush(self):
if self._offline:
return defer.succeed(True)
self._check_consumer()
return self._consumer.flush()

def toggle(self, key, user, default=False):
@defer.inlineCallbacks
def run(should_retry):
# noinspection PyBroadException
try:
if self._offline:
defer.returnValue(default)
val = yield self._toggle(key, user, default)
self._send({'kind': 'feature', 'key': key, 'user': user, 'value': val})
defer.returnValue(val)
except ProtocolError as e:
inner = e.args[1]
if inner.errno == errno.ECONNRESET and should_retry:
log.warning('ProtocolError exception caught while getting flag. Retrying.')
d = yield run(False)
defer.returnValue(d)
else:
log.exception('Unhandled exception. Returning default value for flag.')
defer.returnValue(default)
except Exception:
log.exception('Unhandled exception. Returning default value for flag.')
defer.returnValue(default)

return run(True)

@defer.inlineCallbacks
def _toggle(self, key, user, default):
if self._config._stream and self._stream_processor.initialized():
feature = self._stream_processor.get_feature(key)
else:
hdrs = _headers(self._api_key)
uri = self._config._base_uri + '/api/eval/features/' + key
r = yield self._session.get(uri, headers=hdrs, timeout=(self._config._connect, self._config._read))
r.raise_for_status()
feature = r.json()
val = _evaluate(feature, user)
if val is None:
val = default
defer.returnValue(val)


class TwistedConfig(Config):
def __init__(self, *args, **kwargs):
super(TwistedConfig, self).__init__(*args, **kwargs)
self._stream_processor_class = TwistedStreamProcessor


class TwistedStreamProcessor(object):

def __init__(self, api_key, config):
self._store = config._feature_store_class()
self.sse_client = TwistedSSEClient(config._stream_uri + "/", headers=_stream_headers(api_key,
"PythonTwistedClient"),
verify=config._verify,
on_event=partial(StreamProcessor.process_message, self._store))
self.running = False

def start(self):
self.sse_client.start()
self.running = True

def stop(self):
self.sse_client.stop()

def get_feature(self, key):
return self._store.get(key)

def initialized(self):
return self._store.initialized()

def is_alive(self):
return self.running


class TwistedConsumer(object):
def __init__(self, session, queue, api_key, config):
self._queue = queue
""" @type: queue.Queue """
self._session = session
""" :type: txrequests.Session """

self._api_key = api_key
self._config = config
""" :type: Deferred """
self._looping_call = None
""" :type: LoopingCall"""

def start(self):
self._flushed = defer.Deferred()
self._looping_call = task.LoopingCall(self._consume)
self._looping_call.start(5)

def stop(self):
self._looping_call.stop()

def is_alive(self):
return self._looping_call is not None and self._looping_call.running

def flush(self):
Contributor:

If flush is called on an empty queue, it will block until the next time _consume has actual work to do. This could be longer than 5 seconds if the queue is empty.

I also think flush should force an immediate request with the current queue contents, rather than merely acting as a synchronization point indicating that the next _consume cycle has completed.

This is important for our integration tests-- our test harness connects to the same Pubnub channel as the dev console and checks for the presence of events to verify that the client is sending them properly. For the SDKs that implement flush as I've described it, the harness runs in about 5 minutes. Without flush, the twisted tests have been running for 30 minutes so far since we're waiting up to 5 seconds for each event to appear.

On the plus side the integration tests are passing 👍

return self._consume()

def _consume(self):
items = []
try:
while True:
items.append(self._queue.get_nowait())
except Empty:
pass

if items:
return self.send_batch(items)

@defer.inlineCallbacks
def send_batch(self, events):
@defer.inlineCallbacks
def do_send(should_retry):
# noinspection PyBroadException
try:
if isinstance(events, dict):
body = [events]
else:
body = events
hdrs = _headers(self._api_key)
uri = self._config._base_uri + '/api/events/bulk'
r = yield self._session.post(uri, headers=hdrs, timeout=(self._config._connect, self._config._read),
data=json.dumps(body))
r.raise_for_status()
except ProtocolError as e:
inner = e.args[1]
if inner.errno == errno.ECONNRESET and should_retry:
log.warning('ProtocolError exception caught while sending events. Retrying.')
yield do_send(False)
else:
log.exception('Unhandled exception in event consumer. Analytics events were not processed.')
except:
log.exception('Unhandled exception in event consumer. Analytics events were not processed.')
try:
yield do_send(True)
finally:
for _ in events:
self._queue.task_done()
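One way to get the immediate flush the reviewer describes is to drain and send synchronously inside `flush()`, rather than waiting up to 5 seconds for the next `LoopingCall` tick. A stdlib-only sketch under that assumption (`send_batch` is a caller-supplied stand-in for `TwistedConsumer.send_batch`):

```python
import queue

class ImmediateFlushConsumer:
    """Sketch: flush() drains the queue and sends right away, so an empty
    queue returns immediately instead of blocking on the next cycle."""

    def __init__(self, send_batch):
        self._queue = queue.Queue()
        self._send_batch = send_batch  # stand-in for the real batch sender

    def send(self, event):
        self._queue.put(event)

    def flush(self):
        items = []
        try:
            while True:
                items.append(self._queue.get_nowait())
        except queue.Empty:
            pass
        if items:
            self._send_batch(items)  # fires now, not on the next tick
        return items

sent = []
consumer = ImmediateFlushConsumer(sent.extend)
consumer.send({'kind': 'feature', 'key': 'foo'})
consumer.flush()
```

With this shape, the integration-test harness sees each event as soon as `flush()` returns, which is the behavior the reviewer's 5-minute-vs-30-minute comparison is about.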