Skip to content

Commit ce95a3a

Browse files
committed
REFACTOR: methods in MemcachedConnectio invoked with updateReplConnection.
1 parent 8b9d693 commit ce95a3a

File tree

4 files changed

+133
-64
lines changed

4 files changed

+133
-64
lines changed

src/main/java/net/spy/memcached/ArcusReplNodeAddress.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020

2121
import java.net.InetSocketAddress;
2222
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.HashMap;
2426
import java.util.HashSet;
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Set;
30+
import java.util.stream.Collectors;
2831

2932
import net.spy.memcached.compat.log.Logger;
3033
import net.spy.memcached.compat.log.LoggerFactory;
@@ -168,6 +171,50 @@ public static boolean validateGroup(Map.Entry<String, List<ArcusReplNodeAddress>
168171
return true;
169172
}
170173

174+
public static Set<String> findChangedGroups(List<InetSocketAddress> update,
175+
Collection<MemcachedNode> olds) {
176+
Set<String> changedGroupSet = new HashSet<>();
177+
Map<String, InetSocketAddress> addrMap = update.stream()
178+
.collect(Collectors.toMap(InetSocketAddress::toString, addr -> addr));
179+
180+
for (MemcachedNode node : olds) {
181+
if (addrMap.remove(node.getSocketAddress().toString()) == null) {
182+
changedGroupSet.add(node.getReplicaGroup().getGroupName());
183+
}
184+
}
185+
186+
addrMap.values().stream()
187+
.map(addr -> ((ArcusReplNodeAddress) addr).getGroupName())
188+
.forEach(changedGroupSet::add);
189+
190+
return changedGroupSet;
191+
}
192+
193+
public static List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
194+
Set<String> changedGroups) {
195+
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
196+
for (InetSocketAddress addr : addrs) {
197+
if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) {
198+
changedGroupAddrs.add(addr);
199+
}
200+
}
201+
return changedGroupAddrs;
202+
}
203+
204+
public static Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
205+
return nodes.stream()
206+
.map(node -> (ArcusReplNodeAddress) node.getSocketAddress())
207+
.collect(Collectors.toSet());
208+
}
209+
210+
public static Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
211+
List<ArcusReplNodeAddress> groupAddrs) {
212+
if (groupAddrs.size() <= 1) {
213+
return Collections.emptySet();
214+
}
215+
return new HashSet<>(groupAddrs.subList(1, groupAddrs.size()));
216+
}
217+
171218
public boolean isSameAddress(ArcusReplNodeAddress addr) {
172219
return this.getIPPort().equals(addr.getIPPort());
173220
}

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 7 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.nio.channels.SocketChannel;
3333
import java.util.ArrayList;
3434
import java.util.Collection;
35-
import java.util.Collections;
3635
import java.util.HashMap;
3736
import java.util.HashSet;
3837
import java.util.Iterator;
@@ -367,38 +366,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
367366
}
368367

369368
/* ENABLE_REPLICATION if */
370-
private Set<String> findChangedGroups(List<InetSocketAddress> addrs,
371-
Collection<MemcachedNode> nodes) {
372-
Map<String, InetSocketAddress> addrMap = new HashMap<>();
373-
for (InetSocketAddress each : addrs) {
374-
addrMap.put(each.toString(), each);
375-
}
376-
377-
Set<String> changedGroupSet = new HashSet<>();
378-
for (MemcachedNode node : nodes) {
379-
String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString();
380-
if (addrMap.remove(nodeAddr) == null) { // removed node
381-
changedGroupSet.add(node.getReplicaGroup().getGroupName());
382-
}
383-
}
384-
for (String addr : addrMap.keySet()) { // newly added node
385-
ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr);
386-
changedGroupSet.add(a.getGroupName());
387-
}
388-
return changedGroupSet;
389-
}
390-
391-
private List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
392-
Set<String> changedGroups) {
393-
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
394-
for (InetSocketAddress addr : addrs) {
395-
if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) {
396-
changedGroupAddrs.add(addr);
397-
}
398-
}
399-
return changedGroupAddrs;
400-
}
401-
402369
private void updateReplConnections(List<InetSocketAddress> addrs) throws IOException {
403370
List<MemcachedNode> attachNodes = new ArrayList<>();
404371
List<MemcachedNode> removeNodes = new ArrayList<>();
@@ -416,10 +383,11 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
416383
* we find out the changed groups with the comparison of previous and current znode list,
417384
* and update the state of groups based on them.
418385
*/
419-
Set<String> changedGroups = findChangedGroups(addrs, locator.getAll());
386+
Set<String> changedGroups = ArcusReplNodeAddress.findChangedGroups(addrs, locator.getAll());
420387

421388
Map<String, List<ArcusReplNodeAddress>> newAllGroups =
422-
ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups));
389+
ArcusReplNodeAddress.makeGroupAddrsList(
390+
ArcusReplNodeAddress.findAddrsOfChangedGroups(addrs, changedGroups));
423391

424392
// remove invalidated groups in changedGroups
425393
for (Map.Entry<String, List<ArcusReplNodeAddress>> entry : newAllGroups.entrySet()) {
@@ -467,8 +435,10 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
467435
assert oldMasterAddr != null : "invalid old rgroup";
468436
assert newMasterAddr != null : "invalid new rgroup";
469437

470-
Set<ArcusReplNodeAddress> oldSlaveAddrs = getAddrsFromNodes(oldSlaveNodes);
471-
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);
438+
Set<ArcusReplNodeAddress> oldSlaveAddrs
439+
= ArcusReplNodeAddress.getAddrsFromNodes(oldSlaveNodes);
440+
Set<ArcusReplNodeAddress> newSlaveAddrs
441+
= ArcusReplNodeAddress.getSlaveAddrsFromGroupAddrs(newGroupAddrs);
472442

473443
if (oldMasterAddr.isSameAddress(newMasterAddr)) {
474444
// add newly added slave node
@@ -560,30 +530,6 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
560530
// Remove the unavailable nodes.
561531
handleNodesToRemove(removeNodes);
562532
}
563-
564-
private Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
565-
Set<ArcusReplNodeAddress> addrs = Collections.emptySet();
566-
if (!nodes.isEmpty()) {
567-
addrs = new HashSet<>((int) (nodes.size() / .75f) + 1);
568-
for (MemcachedNode node : nodes) {
569-
addrs.add((ArcusReplNodeAddress) node.getSocketAddress());
570-
}
571-
}
572-
return addrs;
573-
}
574-
575-
private Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
576-
List<ArcusReplNodeAddress> groupAddrs) {
577-
Set<ArcusReplNodeAddress> slaveAddrs = Collections.emptySet();
578-
int groupSize = groupAddrs.size();
579-
if (groupSize > 1) {
580-
slaveAddrs = new HashSet<>((int) ((groupSize - 1) / .75f) + 1);
581-
for (int i = 1; i < groupSize; i++) {
582-
slaveAddrs.add(groupAddrs.get(i));
583-
}
584-
}
585-
return slaveAddrs;
586-
}
587533
/* ENABLE_REPLICATION end */
588534

589535
/* ENABLE_REPLICATION if */
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package net.spy.memcached;
2+
3+
4+
import java.net.InetSocketAddress;
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.Set;
8+
import java.util.stream.Collectors;
9+
10+
import org.junit.jupiter.api.Assertions;
11+
import org.junit.jupiter.api.Test;
12+
13+
14+
class ArcusReplNodeAddressTest {
15+
16+
@Test
17+
void findChangedGroupsTest() {
18+
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
19+
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
20+
List<MemcachedNode> old = new ArrayList<>();
21+
setReplGroup(g0, old);
22+
setReplGroup(g1, old);
23+
24+
List<InetSocketAddress> update = new ArrayList<>(g0);
25+
26+
Set<String> changedGroups = ArcusReplNodeAddress.findChangedGroups(update, old);
27+
Assertions.assertEquals(1, changedGroups.size());
28+
Assertions.assertTrue(changedGroups.contains("g1"));
29+
}
30+
31+
@Test
32+
void findAddrsOfChangedGroupsTest() {
33+
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
34+
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
35+
List<MemcachedNode> old = new ArrayList<>();
36+
setReplGroup(g0, old);
37+
setReplGroup(g1, old);
38+
39+
List<InetSocketAddress> update = new ArrayList<>();
40+
update.addAll(g0.subList(0, 2));
41+
update.addAll(g1.subList(0, 2));
42+
43+
Set<String> changedGroups = ArcusReplNodeAddress.findChangedGroups(update, old);
44+
List<InetSocketAddress> result
45+
= ArcusReplNodeAddress.findAddrsOfChangedGroups(update, changedGroups);
46+
47+
Assertions.assertEquals(4, result.size());
48+
Assertions.assertTrue(result.contains(g0.get(0)));
49+
Assertions.assertTrue(result.contains(g0.get(1)));
50+
Assertions.assertTrue(result.contains(g1.get(0)));
51+
Assertions.assertTrue(result.contains(g1.get(1)));
52+
}
53+
54+
private void setReplGroup(List<ArcusReplNodeAddress> group, List<MemcachedNode> old) {
55+
List<MockMemcachedNode> collect = group.stream()
56+
.map(MockMemcachedNode::new)
57+
.collect(Collectors.toList());
58+
MemcachedReplicaGroupImpl impl = null;
59+
for (MockMemcachedNode node : collect) {
60+
if (impl == null) {
61+
impl = new MemcachedReplicaGroupImpl(node);
62+
} else {
63+
node.setReplicaGroup(impl);
64+
}
65+
}
66+
old.addAll(collect);
67+
}
68+
69+
private List<ArcusReplNodeAddress> createReplList(String group, String ip) {
70+
List<ArcusReplNodeAddress> replList = new ArrayList<>();
71+
replList.add(ArcusReplNodeAddress.create(group, true, ip + ":" + 11211));
72+
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 1)));
73+
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 2)));
74+
return replList;
75+
}
76+
}

src/test/java/net/spy/memcached/MockMemcachedNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
public class MockMemcachedNode implements MemcachedNode {
3131
private final InetSocketAddress socketAddress;
32+
private MemcachedReplicaGroup memcachedReplicaGroup;
3233

3334
public SocketAddress getSocketAddress() {
3435
return socketAddress;
@@ -260,13 +261,12 @@ public String getOpQueueStatus() {
260261

261262
@Override
262263
public void setReplicaGroup(MemcachedReplicaGroup g) {
263-
// noop
264+
this.memcachedReplicaGroup = g;
264265
}
265266

266267
@Override
267268
public MemcachedReplicaGroup getReplicaGroup() {
268-
// noop
269-
return null;
269+
return memcachedReplicaGroup;
270270
}
271271

272272
@Override

0 commit comments

Comments
 (0)