Skip to content

Commit 650d33a

Browse files
committed
LLD implementations of requester and processor
* Add RedisLDDRequester to pull features from LDD-populated redis * Add ExpiringDict from master to add caching to RedisLDDRequester * Add 'events' config parameter to control whether events are sent to LD * Add new 'redis' optional deps for sync redis RedisLDDRequester
1 parent dd2516a commit 650d33a

10 files changed

+287
-7
lines changed

NOTICE.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
This product includes software (ExpiringDict) developed by
2+
Mailgun (https://github.com/mailgun/expiringdict).

ldclient/client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,23 @@ def __init__(self,
3232
stream=False,
3333
verify=True,
3434
defaults=None,
35+
events=True,
3536
stream_processor_class=None,
3637
feature_store_class=None,
3738
feature_requester_class=None,
3839
consumer_class=None):
40+
"""
41+
42+
:param stream_processor_class: A factory for a StreamProcessor implementation taking the api key, config,
43+
and FeatureStore implementation
44+
:type stream_processor_class: (str, Config, FeatureStore) -> StreamProcessor
45+
:param feature_store_class: A factory for a FeatureStore implementation
46+
:type feature_store_class: () -> FeatureStore
47+
:param feature_requester_class: A factory for a FeatureRequester implementation taking the api key and config
48+
:type feature_requester_class: (str, Config) -> FeatureRequester
49+
:param consumer_class: A factory for an EventConsumer implementation taking the event queue, api key, and config
50+
:type consumer_class: (queue.Queue, str, Config) -> EventConsumer
51+
"""
3952
if defaults is None:
4053
defaults = {}
4154

@@ -53,6 +66,7 @@ def __init__(self,
5366
self.capacity = capacity
5467
self.verify = verify
5568
self.defaults = defaults
69+
self.events = events
5670

5771
def get_default(self, key, default):
5872
return default if key not in self.defaults else self.defaults[key]
@@ -164,7 +178,7 @@ def _stop_consumers(self):
164178
self._stream_processor.stop()
165179

166180
def _send(self, event):
167-
if self._offline:
181+
if self._offline or not self._config.events:
168182
return
169183
self._check_consumer()
170184
event['creationDate'] = int(time.time() * 1000)

ldclient/expiringdict.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
'''
2+
Dictionary with auto-expiring values for caching purposes.
3+
4+
Expiration happens on any access, object is locked during cleanup from expired
5+
values. Can not store more than max_len elements - the oldest will be deleted.
6+
7+
>>> ExpiringDict(max_len=100, max_age_seconds=10)
8+
9+
The values stored in the following way:
10+
{
11+
key1: (value1, created_time1),
12+
key2: (value2, created_time2)
13+
}
14+
15+
NOTE: iteration over dict and also keys() do not remove expired values!
16+
17+
Copied from https://github.com/mailgun/expiringdict/commit/d17d071721dd12af6829819885a74497492d7fb7 under the APLv2
18+
'''
19+
20+
import time
21+
from threading import RLock
22+
23+
try:
24+
from collections import OrderedDict
25+
except ImportError:
26+
# Python < 2.7
27+
from ordereddict import OrderedDict
28+
29+
30+
class ExpiringDict(OrderedDict):
31+
def __init__(self, max_len, max_age_seconds):
32+
assert max_age_seconds >= 0
33+
assert max_len >= 1
34+
35+
OrderedDict.__init__(self)
36+
self.max_len = max_len
37+
self.max_age = max_age_seconds
38+
self.lock = RLock()
39+
40+
def __contains__(self, key):
41+
""" Return True if the dict has a key, else return False. """
42+
try:
43+
with self.lock:
44+
item = OrderedDict.__getitem__(self, key)
45+
if time.time() - item[1] < self.max_age:
46+
return True
47+
else:
48+
del self[key]
49+
except KeyError:
50+
pass
51+
return False
52+
53+
def __getitem__(self, key, with_age=False):
54+
""" Return the item of the dict.
55+
56+
Raises a KeyError if key is not in the map.
57+
"""
58+
with self.lock:
59+
item = OrderedDict.__getitem__(self, key)
60+
item_age = time.time() - item[1]
61+
if item_age < self.max_age:
62+
if with_age:
63+
return item[0], item_age
64+
else:
65+
return item[0]
66+
else:
67+
del self[key]
68+
raise KeyError(key)
69+
70+
def __setitem__(self, key, value):
71+
""" Set d[key] to value. """
72+
with self.lock:
73+
if len(self) == self.max_len:
74+
self.popitem(last=False)
75+
OrderedDict.__setitem__(self, key, (value, time.time()))
76+
77+
def pop(self, key, default=None):
78+
""" Get item from the dict and remove it.
79+
80+
Return default if expired or does not exist. Never raise KeyError.
81+
"""
82+
with self.lock:
83+
try:
84+
item = OrderedDict.__getitem__(self, key)
85+
del self[key]
86+
return item[0]
87+
except KeyError:
88+
return default
89+
90+
def ttl(self, key):
91+
""" Return TTL of the `key` (in seconds).
92+
93+
Returns None for non-existent or expired keys.
94+
"""
95+
key_value, key_age = self.get(key, with_age=True)
96+
if key_age:
97+
key_ttl = self.max_age - key_age
98+
if key_ttl > 0:
99+
return key_ttl
100+
return None
101+
102+
def get(self, key, default=None, with_age=False):
103+
" Return the value for key if key is in the dictionary, else default. "
104+
try:
105+
return self.__getitem__(key, with_age)
106+
except KeyError:
107+
if with_age:
108+
return default, None
109+
else:
110+
return default
111+
112+
def items(self):
113+
""" Return a copy of the dictionary's list of (key, value) pairs. """
114+
r = []
115+
for key in self:
116+
try:
117+
r.append((key, self[key]))
118+
except KeyError:
119+
pass
120+
return r
121+
122+
def values(self):
123+
""" Return a copy of the dictionary's list of values.
124+
See the note for dict.items(). """
125+
r = []
126+
for key in self:
127+
try:
128+
r.append(self[key])
129+
except KeyError:
130+
pass
131+
return r
132+
133+
def fromkeys(self):
134+
" Create a new dictionary with keys from seq and values set to value. "
135+
raise NotImplementedError()
136+
137+
def iteritems(self):
138+
""" Return an iterator over the dictionary's (key, value) pairs. """
139+
raise NotImplementedError()
140+
141+
def itervalues(self):
142+
""" Return an iterator over the dictionary's values. """
143+
raise NotImplementedError()
144+
145+
def viewitems(self):
146+
" Return a new view of the dictionary's items ((key, value) pairs). "
147+
raise NotImplementedError()
148+
149+
def viewkeys(self):
150+
""" Return a new view of the dictionary's keys. """
151+
raise NotImplementedError()
152+
153+
def viewvalues(self):
154+
""" Return a new view of the dictionary's values. """
155+
raise NotImplementedError()

ldclient/redis_requester.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import json
2+
from ldclient.expiringdict import ExpiringDict
3+
from ldclient.interfaces import FeatureRequester
4+
import redis
5+
6+
7+
# noinspection PyUnusedLocal
8+
def create_redis_ldd_requester(api_key, config, store, **kwargs):
9+
return RedisLDDRequester(config, **kwargs)
10+
11+
12+
class RedisLDDRequester(FeatureRequester):
13+
"""
14+
Requests features from redis, usually stored via the LaunchDarkly Daemon (LDD). Recommended to be combined
15+
with the ExpiringInMemoryFeatureStore
16+
"""
17+
def __init__(self, config,
18+
expiration=15,
19+
redis_host='localhost',
20+
redis_port=6379,
21+
redis_prefix='launchdarkly'):
22+
"""
23+
:type config: Config
24+
"""
25+
self._redis_host = redis_host
26+
self._redis_port = redis_port
27+
self._features_key = "{}:features".format(redis_prefix)
28+
self._cache = ExpiringDict(max_len=config.capacity, max_age_seconds=expiration)
29+
self._pool = None
30+
31+
def _get_connection(self):
32+
if self._pool is None:
33+
self._pool = redis.ConnectionPool(host=self._redis_host, port=self._redis_port)
34+
return redis.Redis(connection_pool=self._pool)
35+
36+
def get(self, key, callback):
37+
cached = self._cache.get(key)
38+
if cached is not None:
39+
return cached
40+
else:
41+
rd = self._get_connection()
42+
raw = rd.hget(self._features_key, key)
43+
if raw:
44+
val = json.loads(raw)
45+
else:
46+
val = None
47+
self._cache[key] = val
48+
return val

ldclient/twisted_impls.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import txrequests
1616

1717

18-
class TwistedFeatureRequester(FeatureRequester):
18+
class TwistedHttpFeatureRequester(FeatureRequester):
1919

2020
def __init__(self, api_key, config):
2121
self._api_key = api_key
@@ -64,7 +64,7 @@ def __init__(self, *args, **kwargs):
6464
super(TwistedConfig, self).__init__(*args, **kwargs)
6565
self.stream_processor_class = TwistedStreamProcessor
6666
self.consumer_class = TwistedEventConsumer
67-
self.feature_requester_class = TwistedFeatureRequester
67+
self.feature_requester_class = TwistedHttpFeatureRequester
6868

6969

7070
class TwistedStreamProcessor(StreamProcessor):
@@ -108,10 +108,8 @@ def __init__(self, queue, api_key, config):
108108

109109
self._looping_call = None
110110
""" :type: LoopingCall"""
111-
self._flushed = None
112111

113112
def start(self):
114-
self._flushed = defer.Deferred()
115113
self._looping_call = task.LoopingCall(self._consume)
116114
self._looping_call.start(5)
117115

ldclient/twisted_redis.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import json
2+
from ldclient.interfaces import StreamProcessor
3+
from twisted.internet import task, defer, protocol, reactor
4+
from txredis.client import RedisClient
5+
6+
7+
# noinspection PyUnusedLocal
8+
def create_redis_ldd_processor(api_key, config, store, **kwargs):
9+
return TwistedRedisLDDStreamProcessor(store, **kwargs)
10+
11+
12+
class TwistedRedisLDDStreamProcessor(StreamProcessor):
13+
def __init__(self, store, update_delay=15, redis_host='localhost',
14+
redis_port=6379,
15+
redis_prefix='launchdarkly'):
16+
self._running = False
17+
18+
if update_delay == 0:
19+
update_delay = .5
20+
self._update_delay = update_delay
21+
22+
self._store = store
23+
""" :type: ldclient.interfaces.FeatureStore """
24+
25+
self._features_key = "{}:features".format(redis_prefix)
26+
self._redis_host = redis_host
27+
self._redis_port = redis_port
28+
self._looping_call = None
29+
30+
def start(self):
31+
self._running = True
32+
self._looping_call = task.LoopingCall(self._refresh)
33+
self._looping_call.start(self._update_delay)
34+
35+
def stop(self):
36+
self._looping_call.stop()
37+
38+
def is_alive(self):
39+
return self._looping_call is not None and self._looping_call.running
40+
41+
def _get_connection(self):
42+
client_creator = protocol.ClientCreator(reactor, RedisClient)
43+
return client_creator.connectTCP(self._redis_host, self._redis_port)
44+
45+
@defer.inlineCallbacks
46+
def _refresh(self):
47+
redis = yield self._get_connection()
48+
""" :type: RedisClient """
49+
result = yield redis.hgetall(self._features_key)
50+
data = json.loads(result)
51+
self._store.init(data)

redis-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
redis>=2.10

setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1())
1313
test_reqs = parse_requirements('test-requirements.txt', session=uuid.uuid1())
1414
twisted_reqs = parse_requirements('twisted-requirements.txt', session=uuid.uuid1())
15+
redis_reqs = parse_requirements('redis-requirements.txt', session=uuid.uuid1())
1516

1617
# reqs is a list of requirement
1718
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
1819
reqs = [str(ir.req) for ir in install_reqs]
1920
testreqs = [str(ir.req) for ir in test_reqs]
2021
txreqs = [str(ir.req) for ir in twisted_reqs]
22+
redisreqs = [str(ir.req) for ir in redis_reqs]
2123

2224

2325
class PyTest(Command):
@@ -47,7 +49,8 @@ def run(self):
4749
'Programming Language :: Python :: 2 :: Only',
4850
],
4951
extras_require={
50-
"twisted": txreqs
52+
"twisted": txreqs,
53+
"redis": redisreqs
5154
},
5255
tests_require=testreqs,
5356
cmdclass = {'test': PyTest},

testing/test_integration_twisted.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ def test_sse_reconnect(server, stream):
6363
yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim"))
6464

6565

66+
@pytest.inlineCallbacks
67+
def test_toggle_redis_background(server):
68+
server.add_feature("foo", feature("foo", "jim")['foo'])
69+
client = LDClient("apikey", TwistedConfig(base_uri=server.url, ))
70+
yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim"))
71+
72+
6673
def feature(key, val):
6774
return {
6875
key: {"name": "Feature {}".format(key), "key": key, "kind": "flag", "salt": "Zm9v", "on": val,

twisted-requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
txrequests>=0.9
2-
pyOpenSSL>=0.14
2+
pyOpenSSL>=0.14
3+
txredis>=2.3

0 commit comments

Comments
 (0)