diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 62b13997..a13721c7 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -245,7 +245,6 @@ def __init__(self, member_id, parsed_url, options): namespace = options.get('namespace', self.DEFAULT_NAMESPACE) self._namespace = self._to_binary(namespace) self._group_prefix = self._namespace + b"_group" - self._leader_prefix = self._namespace + b"_leader" self._beat_prefix = self._namespace + b"_beats" self._groups = self._namespace + b"_groups" self._client = None @@ -385,6 +384,10 @@ def _encode_member_id(self, member_id): def _decode_member_id(self, member_id): return self._to_binary(member_id) + def _encode_group_leader(self, group_id): + group_id = self._to_binary(group_id) + return b"leader_of_" + group_id + def _encode_group_id(self, group_id, apply_namespace=True): group_id = self._to_binary(group_id) if not apply_namespace: @@ -649,13 +652,27 @@ def watch_leave_group(self, group_id, callback): def unwatch_leave_group(self, group_id, callback): return super(RedisDriver, self).unwatch_leave_group(group_id, callback) - @staticmethod - def watch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented + def watch_elected_as_leader(self, group_id, callback): + return super(RedisDriver, self).watch_elected_as_leader( + group_id, callback) + + def unwatch_elected_as_leader(self, group_id, callback): + return super(RedisDriver, self).unwatch_elected_as_leader( + group_id, callback) + + def _get_leader_lock(self, group_id): + name = self._encode_group_leader(group_id) + return self.get_lock(name) - @staticmethod - def unwatch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented + def _run_leadership(self, watch): + for group_id, hooks in six.iteritems(self._hooks_elected_leader): + if watch.expired(): + return + leader_lock = self._get_leader_lock(group_id) + if leader_lock.acquire(blocking=False): + # We got the lock + hooks.run(coordination.LeaderElected(group_id, + self._member_id)) def run_watchers(self, timeout=None): w = timeutils.StopWatch(duration=timeout) @@ -693,6 +710,7 @@ def run_watchers(self, timeout=None): coordination.MemberLeftGroup(group_id, member_id))) self._group_members[group_id] = group_members + self._run_leadership(w) return result