Skip to content

Commit 6f56026

Browse files
committed
more work
1 parent 7626c38 commit 6f56026

File tree

7 files changed

+285
-61
lines changed

7 files changed

+285
-61
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from setuptools import setup, find_packages
22
setup(name='txgossip',
3-
version='0.0',
3+
version='0.1',
44
description='simple cluster communication protocol',
55
author='Johan Rydberg',
66
author_email='[email protected]',

txgossip/gossip.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ def _gossip(self):
144144
self._gossip_with_peer(random.choice(dead_peers))
145145

146146
for state in self._states.values():
147-
state.check_suspected()
147+
if state.name != self.name:
148+
state.check_suspected()
148149

149150
def _gossip_with_peer(self, peer):
150151
"""Send a gossip message to C{peer}."""
@@ -212,9 +213,12 @@ def get(self):
212213
def __getitem__(self, key):
213214
return self.state[key]
214215

215-
def __setitem__(self, key, value):
216+
def set(self, key, value):
216217
self.state[key] = value
217218

219+
def __setitem__(self, key, value):
220+
self.set(key, value)
221+
218222
def __contains__(self, key):
219223
return key in self.state
220224

txgossip/recipies.py

Lines changed: 53 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
# Copyright (C) 2011 Johan Rydberg
2-
# Copyright (C) 2010 Bob Potter
32
#
43
# Permission is hereby granted, free of charge, to any person
54
# obtaining a copy of this software and associated documentation files
@@ -46,31 +45,31 @@ class LeaderElectionMixin:
4645
VOTE_KEY = 'leader:vote'
4746
LEADER_KEY = 'leader:leader'
4847

49-
def __init__(self, clock):
48+
def __init__(self, clock, vote_delay=5):
5049
self.clock = clock
5150
self._election_timeout = None
5251
self._gossiper = None
52+
self.is_leader = None
53+
self.vote_delay = vote_delay
5354

5455
def make_connection(self, gossiper):
5556
self._gossiper = gossiper
57+
self.start_election()
5658

5759
def _check_consensus(self, key):
5860
"""Check if all peers have the same value for C{key}.
5961
6062
Return the value if they all have the same value, otherwise
6163
return C{None}.
6264
"""
63-
try:
64-
correct = self._gossiper[key]
65-
for peer in self._gossiper.live_peers:
66-
value = peer[key]
67-
if value != correct:
68-
return None
69-
except KeyError:
70-
# One peer did not even have the key.
71-
return None
72-
else:
73-
return correct
65+
correct = self._gossiper.get(key)
66+
for peer in self._gossiper.live_peers:
67+
if not key in peer.keys():
68+
return None
69+
value = peer.get(key)
70+
if value != correct:
71+
return None
72+
return correct
7473

7574
def value_changed(self, peer, key, value):
7675
"""Inform about a change of a key-value pair.
@@ -85,7 +84,7 @@ def value_changed(self, peer, key, value):
8584
if key == self.VOTE_KEY:
8685
leader = self._check_consensus(self.VOTE_KEY)
8786
if leader:
88-
self._gossiper[self.LEADER_KEY] = leader
87+
self._gossiper.set(self.LEADER_KEY, leader)
8988
elif key == self.LEADER_KEY:
9089
leader = self._check_consensus(self.LEADER_KEY)
9190
if leader:
@@ -102,19 +101,13 @@ def _vote(self):
102101

103102
# Check if we're one of the peers that would like to become
104103
# master.
105-
vote, currp = None, None
106-
try:
107-
curr = self._gossiper[self.PRIO_KEY]
108-
if curr is not None:
109-
vote = self._gossiper
110-
except KeyError:
111-
pass
104+
vote = None
105+
curr = self._gossiper.get(self.PRIO_KEY)
106+
if curr is not None:
107+
vote = self._gossiper
112108

113109
for peer in self._gossiper.live_peers:
114-
try:
115-
prio = peer[self.PRIO_KEY]
116-
except KeyError:
117-
continue
110+
prio = peer.get(self.PRIO_KEY)
118111
if prio is None:
119112
# This peer did not want to become leader.
120113
continue
@@ -126,15 +119,9 @@ def _vote(self):
126119
vote = peer
127120
elif prio == curr:
128121
# We need to break the tie.
129-
if hash(peer) > hash(vote):
122+
if hash(peer.name) > hash(vote.name):
130123
vote = peer
131-
132-
# See if there's a need to change our vote, or if we'll stand
133-
# by our last vote.
134-
if self.VOTE_KEY in self._gossiper:
135-
if self._gossiper[self.VOTE_KEY] == vote.name:
136-
return
137-
self._gossiper[self.VOTE_KEY] = vote.name
124+
self._gossiper.set(self.VOTE_KEY, vote.name)
138125

139126
def start_election(self):
140127
"""Start an election.
@@ -155,6 +142,7 @@ def leader_elected(self, is_leader, leader):
155142
@param is_leader: C{True} if this peer is the leader.
156143
@param leader: The address of the peer that is the leader.
157144
"""
145+
self.is_leader = is_leader
158146

159147
def peer_dead(self, peer):
160148
"""A peer is dead."""
@@ -186,26 +174,39 @@ def __init__(self, clock, storage, ignore_keys=[]):
186174
def make_connection(self, gossiper):
187175
self._gossiper = gossiper
188176

189-
def value_changed(self, peer, key, timestamp_value):
190-
"""A peer has changed its value."""
191-
if key == '__heartbeat__':
192-
# We do not care about updates to the heartbeat value.
193-
return
194-
timestamp, value = timestamp_value
177+
def persist_key_value(self, key, timestamped_value):
178+
self._storage[key] = timestamped_value
179+
if hasattr(self._storage, 'sync'):
180+
self._storage.sync()
181+
182+
def replicate_key_value(self, peer, key, timestamped_value):
183+
timestamp, value = timestamped_value
195184
if key in self._storage:
196185
current_timestamp, current_value = self._storage[key]
197186
if timestamp <= current_timestamp:
198187
return
199-
self._storage[key] = (timestamp, value)
188+
self._gossiper.set(key, timestamped_value)
189+
190+
def value_changed(self, peer, key, timestamp_value):
191+
"""A peer has changed its value."""
192+
if key == '__heartbeat__' or key in self._ignore_keys:
193+
return
194+
if peer.name == self._gossiper.name:
195+
self.persist_key_value(key, timestamp_value)
196+
else:
197+
self.replicate_key_value(peer, key, timestamp_value)
198+
199+
def set(self, key, value):
200+
self._gossiper.set(key, [self.clock.seconds(), value])
200201

201202
def __setitem__(self, key, value):
202-
self._gossiper[key] = (self.clock.seconds(), value)
203+
self.set(key, value)
203204

204205
def __getitem__(self, key):
205-
return self._storage[key][1]
206+
return self._gossiper.get(key)[1]
206207

207208
def get(self, key, default=None):
208-
if key in self._storage:
209+
if key in self.keys():
209210
return self[key]
210211
return default
211212

@@ -218,19 +219,16 @@ def keys(self, pattern=None):
218219
return [key for key in keys
219220
if fnmatch.fnmatch(key, pattern)]
220221

222+
def load_from(self, storage):
223+
for key in storage:
224+
if key not in self._ignore_keys:
225+
self._gossiper.set(key, storage[key])
226+
221227
def __contains__(self, key):
222228
return key in self.keys()
223229

224-
def timestamp_for_key(self, key):
225-
return self._storage[key][0]
226-
227-
def synchronize_keys_with_peer(self, peer):
228-
"""Synchronize keys with C{peer}.
230+
def peer_dead(self, peer):
231+
"""A peer is dead."""
229232

230-
Will iterate through all the keys that C{peer} has and see if
231-
there's some values that are newer than ours.
232-
"""
233-
for key, value in peer.items():
234-
if key in self._ignore_keys:
235-
continue
236-
self.value_changed(peer, key, value)
233+
def peer_alive(self, peer):
234+
"""A peer is alive."""

txgossip/state.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,17 @@ def __contains__(self, key):
6767
def __setitem__(self, key, value):
6868
self.update_local(key, value)
6969

70+
def set(self, key, value):
71+
self.update_local(key, value)
72+
7073
def __getitem__(self, key):
7174
return self.attrs[key][0]
7275

76+
def get(self, key, default=None):
77+
if key in self.attrs:
78+
return self.attrs[key][0]
79+
return default
80+
7381
def has_key(self, key):
7482
return key in self.attrs
7583

@@ -82,7 +90,7 @@ def items(self):
8290

8391
def set_key(self, k, v, n):
8492
self.attrs[k] = (v, n)
85-
self.participant.value_changed(self.name, str(k), v)
93+
self.participant.value_changed(self, str(k), v)
8694

8795
def beat_that_heart(self):
8896
self.heart_beat_version += 1
@@ -111,9 +119,9 @@ def check_suspected(self):
111119
def mark_alive(self):
112120
alive, self.alive = self.alive, True
113121
if not alive:
114-
self.participant.peer_alive(self.name)
122+
self.participant.peer_alive(self)
115123

116124
def mark_dead(self):
117125
if self.alive:
118126
self.alive = False
119-
self.participant.peer_dead(self.name)
127+
self.participant.peer_dead(self)

txgossip/test/__init__.py

Whitespace-only changes.

txgossip/test/test_keystore.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright (C) 2011 Johan Rydberg
2+
#
3+
# Permission is hereby granted, free of charge, to any person
4+
# obtaining a copy of this software and associated documentation files
5+
# (the "Software"), to deal in the Software without restriction,
6+
# including without limitation the rights to use, copy, modify, merge,
7+
# publish, distribute, sublicense, and/or sell copies of the Software,
8+
# and to permit persons to whom the Software is furnished to do so,
9+
# subject to the following conditions:
10+
#
11+
# The above copyright notice and this permission notice shall be
12+
# included in all copies or substantial portions of the Software.
13+
#
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
18+
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19+
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20+
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
23+
from mockito import mock, when, verify
24+
25+
from twisted.trial import unittest
26+
from twisted.internet import task
27+
28+
from txgossip.recipies import KeyStoreMixin
29+
30+
31+
class KeyStoreTestCase(unittest.TestCase):
32+
"""Test cases for the key-value store mixin."""
33+
34+
def setUp(self):
35+
self.clock = task.Clock()
36+
self.storage = {}
37+
self.keystore = KeyStoreMixin(self.clock, self.storage)
38+
self.gossiper = mock()
39+
self.gossiper.name = 'self'
40+
self.keystore.make_connection(self.gossiper)
41+
42+
def test_replicate_when_remote_peer_changed_value(self):
43+
self.keystore.value_changed('a', 'k', (0, 'value'))
44+
verify(self.gossiper).set('k', (0, 'value'))
45+
self.assertNotIn('k', self.storage)
46+
47+
def test_ignore_replication_when_remote_peer_has_old_value(self):
48+
self.storage['k'] = (1, 'value')
49+
self.keystore.value_changed('a', 'k', (0, 'value'))
50+
verify(self.gossiper, times=0).set('k', (0, 'value'))
51+
52+
def test_persist_value_when_set_on_local_peer(self):
53+
self.keystore.value_changed('self', 'k', (0, 'value'))
54+
self.assertIn('k', self.storage)
55+
self.assertEquals(self.storage['k'], (0, 'value'))
56+
57+
def test_keys_returns_all_keys_in_gossiper(self):
58+
when(self.gossiper).keys().thenReturn(['a', 'b'])
59+
self.assertEquals(self.keystore.keys(), ['a', 'b'])
60+
61+
def test_keys_can_be_filtered(self):
62+
when(self.gossiper).keys().thenReturn(['ab', 'ba'])
63+
self.assertEquals(self.keystore.keys('b*'), ['ba'])
64+
65+
def test_contains_use_keys_from_gossiper(self):
66+
when(self.gossiper).keys().thenReturn(['a', 'b'])
67+
self.assertIn('a', self.keystore)
68+
69+
def test_get_results_default_value_if_value_not_present(self):
70+
when(self.gossiper).keys().thenReturn([])
71+
self.assertEquals(self.keystore.get('a', '!'), '!')
72+
73+
def test_get_returns_value_from_gossip_state(self):
74+
when(self.gossiper).keys().thenReturn(['a'])
75+
when(self.gossiper).get('a').thenReturn((0, '!'))
76+
self.assertEquals(self.keystore.get('a'), '!')

0 commit comments

Comments
 (0)