Skip to content

Commit

Permalink
Implement watch/unwatch elected_as_leader for redis driver
Browse files Browse the repository at this point in the history
Change-Id: Idd5143801cfe0e095f31c04e9268887605268b62
  • Loading branch information
Joshua Harlow committed Jun 1, 2015
1 parent ba5740a commit ec519f4
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions tooz/drivers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ def __init__(self, member_id, parsed_url, options):
namespace = options.get('namespace', self.DEFAULT_NAMESPACE)
self._namespace = self._to_binary(namespace)
self._group_prefix = self._namespace + b"_group"
self._leader_prefix = self._namespace + b"_leader"
self._beat_prefix = self._namespace + b"_beats"
self._groups = self._namespace + b"_groups"
self._client = None
Expand Down Expand Up @@ -385,6 +384,10 @@ def _encode_member_id(self, member_id):
def _decode_member_id(self, member_id):
return self._to_binary(member_id)

def _encode_group_leader(self, group_id):
group_id = self._to_binary(group_id)
return b"leader_of_" + group_id

def _encode_group_id(self, group_id, apply_namespace=True):
group_id = self._to_binary(group_id)
if not apply_namespace:
Expand Down Expand Up @@ -649,13 +652,27 @@ def watch_leave_group(self, group_id, callback):
def unwatch_leave_group(self, group_id, callback):
return super(RedisDriver, self).unwatch_leave_group(group_id, callback)

@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
def watch_elected_as_leader(self, group_id, callback):
return super(RedisDriver, self).watch_elected_as_leader(
group_id, callback)

def unwatch_elected_as_leader(self, group_id, callback):
return super(RedisDriver, self).unwatch_elected_as_leader(
group_id, callback)

def _get_leader_lock(self, group_id):
name = self._encode_group_leader(group_id)
return self.get_lock(name)

@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
def _run_leadership(self, watch):
for group_id, hooks in six.iteritems(self._hooks_elected_leader):
if watch.expired():
return
leader_lock = self._get_leader_lock(group_id)
if leader_lock.acquire(blocking=False):
# We got the lock
hooks.run(coordination.LeaderElected(group_id,
self._member_id))

def run_watchers(self, timeout=None):
w = timeutils.StopWatch(duration=timeout)
Expand Down Expand Up @@ -693,6 +710,7 @@ def run_watchers(self, timeout=None):
coordination.MemberLeftGroup(group_id,
member_id)))
self._group_members[group_id] = group_members
self._run_leadership(w)
return result


Expand Down

0 comments on commit ec519f4

Please sign in to comment.