Skip to content

Commit 8ae6ba9

Browse files
committed
Fix distribution of masters.
1 parent 6238f65 commit 8ae6ba9

File tree

3 files changed

+82
-24
lines changed

3 files changed

+82
-24
lines changed

ruskit/distribute.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ def __init__(self, node, tag, host_index, master=None):
1414
self.master = master
1515
self.slaves = []
1616

17+
@classmethod
18+
def divide_by_host(cls, nodes, host_num):
19+
nodes_per_host = [[] for _ in xrange(host_num)]
20+
for n in nodes:
21+
nodes_per_host[n.host_index].append(n)
22+
return nodes_per_host
23+
1724
def __getattr__(self, attr):
1825
return getattr(self.node, attr)
1926

ruskit/failover.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
from copy import copy
12
import logging
23
from itertools import izip_longest
34

45
import redis
56

67
from .cluster import Cluster, ClusterNode
7-
from .distribute import gen_distribution
8+
from .distribute import gen_distribution, NodeWrapper
89

910

1011
logger = logging.getLogger(__name__)
@@ -16,13 +17,43 @@ def __init__(self, cluster, new_nodes):
1617
self.cluster = cluster
1718
self.new_nodes = new_nodes
1819
self.result = None
20+
self.dis = gen_distribution(self.cluster.nodes, new_nodes)
1921

2022
def get_distribution(self):
21-
return gen_distribution(self.cluster.nodes, self.new_nodes)
23+
hosts = map(copy, self.dis['hosts'])
24+
masters = self.dis['masters']
25+
slaves = self.dis['slaves']
26+
result = self.peek_result()
27+
plan = result['plan']
28+
deleted_masters = [p['master'] for p in plan]
29+
new_slaves = []
30+
new_masters = []
31+
for p in plan:
32+
m = copy(p['master'])
33+
s = copy(p['slave'])
34+
# After failover, their roles will change
35+
m.slaves = []
36+
m.master = s
37+
s.slaves.append(m)
38+
s.master = None
39+
new_masters.append(s)
40+
new_slaves.append(m)
41+
new_slaves = NodeWrapper.divide_by_host(new_slaves, len(hosts))
42+
new_masters = NodeWrapper.divide_by_host(new_masters, len(hosts))
43+
44+
masters = [list(set(m) - set(deleted_masters)) for m in masters]
45+
masters = [o + n for o, n in zip(masters, new_masters)]
46+
slaves = [o + n for o, n in zip(slaves, new_slaves)]
47+
return {
48+
'hosts': hosts,
49+
'masters': masters,
50+
'slaves': slaves,
51+
'frees': NodeWrapper.divide_by_host([], len(hosts)),
52+
}
2253

2354
def peek_result(self):
2455
if self.result:
25-
return result
56+
return self.result
2657
return self.gen_plan(self.new_nodes)
2758

2859
def move_masters_to_new_hosts(self, fast_mode=False):
@@ -64,24 +95,35 @@ def promote_new_masters(self, plan):
6495

6596
def gen_plan(self, new_nodes):
6697
plan = []
67-
dis = gen_distribution(self.cluster.nodes, new_nodes)
68-
masters = dis['masters']
69-
frees = filter(None, dis['frees'])
70-
hosts_len = len(dis['hosts'])
98+
masters = map(copy, self.dis['masters'])
99+
frees = filter(None, map(copy, self.dis['frees']))
100+
hosts_len = len(self.dis['hosts'])
71101
masters_sum = len(sum(masters, []))
72102
aver = (masters_sum + hosts_len - 1) / hosts_len
73-
failover_masters = sum([h[aver:] for h in masters], [])
74-
# merge [[a,b], [c], [d,e,f]] to [a,c,e,b,e,f] for example
103+
new_hosts_num = len(set(n.host_index for n in sum(frees, [])))
104+
num_in_old_hosts = masters_sum / \
105+
hosts_len * (hosts_len - new_hosts_num) + masters_sum % hosts_len
106+
moved_masters = masters_sum - num_in_old_hosts
107+
# merge [[a,b], [c], [d,e,f]] to [a,c,d,b,e,f] for example
75108
frees = filter(None, sum(map(list, izip_longest(*frees)), []))
76-
while len(failover_masters) > 0 and len(frees) > 0:
77-
m = failover_masters.pop()
78-
f = frees.pop()
109+
while moved_masters > 0 and len(frees) > 0:
110+
m = self.select_master(masters)
111+
f = frees.pop(0)
112+
f.master = m
113+
m.slaves.append(f)
79114
plan.append({
80115
'slave': f,
81116
'master': m,
82117
})
118+
moved_masters -= 1
83119
self.result = {
84120
'plan': plan,
85121
'frees': frees,
86122
}
87123
return self.result
124+
125+
def select_master(self, masters):
126+
nums = map(len, masters)
127+
m = max(nums)
128+
i = nums.index(m)
129+
return masters[i].pop()

tests/test_move_nodes.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@ def __init__(self):
1515
def dummy_gen_distribution_for_move_masters(nodes, new_nodes):
1616
hosts = ['host1', 'host2', 'host3', 'host4']
1717
masters = [
18-
['m1', 'm2', 'm3'],
19-
['m4', 'm5', 'm6', 'm10', 'm11'],
20-
['m7', 'm8', 'm9'],
18+
[NodeWrapper(i, 'm{}'.format(i), 0) for i in range(1, 4)],
19+
[NodeWrapper(i, 'm{}'.format(i), 1) for i in range(4, 9)],
20+
[NodeWrapper(i, 'm{}'.format(i), 2) for i in range(9, 11)],
2121
[],
2222
]
2323
slaves = [
24-
['s1'],
25-
['s2'],
26-
['s3'],
24+
[NodeWrapper(None, 's1', 1)],
25+
[NodeWrapper(None, 's2', 2)],
26+
[NodeWrapper(None, 's3', 3)],
27+
[],
2728
]
28-
frees = [[], [], [], ['f1', 'f2']]
29+
frees = [[], [], [], [NodeWrapper(i, 'f{}'.format(i), 3) for i in range(1, 3)]]
2930
return {
3031
'hosts': hosts,
3132
'masters': masters,
@@ -92,12 +93,20 @@ def test_move_master(self):
9293
plan = result['plan']
9394
p1 = plan[0]
9495
p2 = plan[1]
95-
if p1['master'] != 'm11':
96+
if p1['master'].tag != 'm8':
9697
p1, p2 = p2, p1
97-
self.assertEqual(p1['master'], 'm11')
98-
self.assertEqual(p1['slave'], 'f2')
99-
self.assertEqual(p2['master'], 'm10')
100-
self.assertEqual(p2['slave'], 'f1')
98+
slaves = ('f1', 'f2')
99+
self.assertEqual(p1['master'].tag, 'm8')
100+
self.assertTrue(p1['slave'].tag in slaves)
101+
self.assertEqual(p2['master'].tag, 'm7')
102+
self.assertTrue(p2['slave'].tag in slaves)
103+
104+
dis = manager.get_distribution()
105+
masters = dis['masters']
106+
masters_num = list(set(map(len, masters)))
107+
self.assertTrue(len(masters_num) <= 2)
108+
if len(masters_num) == 2:
109+
self.assertTrue(abs(masters_num[0] - masters_num[1]) <= 1)
101110

102111
@patch('ruskit.distribute.gen_distribution',
103112
MoveSlavesMockData.dummy_gen_distribution)

0 commit comments

Comments
 (0)