Skip to content

Commit 40bf635

Browse files
committed
REFACTOR: methods in MemcachedConnectio invoked with updateReplConnection.
1 parent 8b9d693 commit 40bf635

File tree

4 files changed

+134
-64
lines changed

4 files changed

+134
-64
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 7 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.nio.channels.SocketChannel;
3333
import java.util.ArrayList;
3434
import java.util.Collection;
35-
import java.util.Collections;
3635
import java.util.HashMap;
3736
import java.util.HashSet;
3837
import java.util.Iterator;
@@ -367,38 +366,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
367366
}
368367

369368
/* ENABLE_REPLICATION if */
370-
private Set<String> findChangedGroups(List<InetSocketAddress> addrs,
371-
Collection<MemcachedNode> nodes) {
372-
Map<String, InetSocketAddress> addrMap = new HashMap<>();
373-
for (InetSocketAddress each : addrs) {
374-
addrMap.put(each.toString(), each);
375-
}
376-
377-
Set<String> changedGroupSet = new HashSet<>();
378-
for (MemcachedNode node : nodes) {
379-
String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString();
380-
if (addrMap.remove(nodeAddr) == null) { // removed node
381-
changedGroupSet.add(node.getReplicaGroup().getGroupName());
382-
}
383-
}
384-
for (String addr : addrMap.keySet()) { // newly added node
385-
ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr);
386-
changedGroupSet.add(a.getGroupName());
387-
}
388-
return changedGroupSet;
389-
}
390-
391-
private List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
392-
Set<String> changedGroups) {
393-
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
394-
for (InetSocketAddress addr : addrs) {
395-
if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) {
396-
changedGroupAddrs.add(addr);
397-
}
398-
}
399-
return changedGroupAddrs;
400-
}
401-
402369
private void updateReplConnections(List<InetSocketAddress> addrs) throws IOException {
403370
List<MemcachedNode> attachNodes = new ArrayList<>();
404371
List<MemcachedNode> removeNodes = new ArrayList<>();
@@ -416,10 +383,11 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
416383
* we find out the changed groups with the comparison of previous and current znode list,
417384
* and update the state of groups based on them.
418385
*/
419-
Set<String> changedGroups = findChangedGroups(addrs, locator.getAll());
386+
Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(addrs, locator.getAll());
420387

421388
Map<String, List<ArcusReplNodeAddress>> newAllGroups =
422-
ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups));
389+
ArcusReplNodeAddress.makeGroupAddrsList(
390+
MemcachedReplicaGroup.findAddrsOfChangedGroups(addrs, changedGroups));
423391

424392
// remove invalidated groups in changedGroups
425393
for (Map.Entry<String, List<ArcusReplNodeAddress>> entry : newAllGroups.entrySet()) {
@@ -467,8 +435,10 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
467435
assert oldMasterAddr != null : "invalid old rgroup";
468436
assert newMasterAddr != null : "invalid new rgroup";
469437

470-
Set<ArcusReplNodeAddress> oldSlaveAddrs = getAddrsFromNodes(oldSlaveNodes);
471-
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);
438+
Set<ArcusReplNodeAddress> oldSlaveAddrs
439+
= MemcachedReplicaGroup.getAddrsFromNodes(oldSlaveNodes);
440+
Set<ArcusReplNodeAddress> newSlaveAddrs
441+
= MemcachedReplicaGroup.getSlaveAddrsFromGroupAddrs(newGroupAddrs);
472442

473443
if (oldMasterAddr.isSameAddress(newMasterAddr)) {
474444
// add newly added slave node
@@ -560,30 +530,6 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
560530
// Remove the unavailable nodes.
561531
handleNodesToRemove(removeNodes);
562532
}
563-
564-
private Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
565-
Set<ArcusReplNodeAddress> addrs = Collections.emptySet();
566-
if (!nodes.isEmpty()) {
567-
addrs = new HashSet<>((int) (nodes.size() / .75f) + 1);
568-
for (MemcachedNode node : nodes) {
569-
addrs.add((ArcusReplNodeAddress) node.getSocketAddress());
570-
}
571-
}
572-
return addrs;
573-
}
574-
575-
private Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
576-
List<ArcusReplNodeAddress> groupAddrs) {
577-
Set<ArcusReplNodeAddress> slaveAddrs = Collections.emptySet();
578-
int groupSize = groupAddrs.size();
579-
if (groupSize > 1) {
580-
slaveAddrs = new HashSet<>((int) ((groupSize - 1) / .75f) + 1);
581-
for (int i = 1; i < groupSize; i++) {
582-
slaveAddrs.add(groupAddrs.get(i));
583-
}
584-
}
585-
return slaveAddrs;
586-
}
587533
/* ENABLE_REPLICATION end */
588534

589535
/* ENABLE_REPLICATION if */

src/main/java/net/spy/memcached/MemcachedReplicaGroup.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,15 @@
1818
/* ENABLE_REPLICATION if */
1919
package net.spy.memcached;
2020

21+
import java.net.InetSocketAddress;
2122
import java.util.ArrayList;
23+
import java.util.Collection;
2224
import java.util.Collections;
25+
import java.util.HashSet;
2326
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.stream.Collectors;
2430

2531
import net.spy.memcached.compat.SpyObject;
2632

@@ -190,5 +196,47 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() {
190196
public static String getGroupNameFromNode(final MemcachedNode node) {
191197
return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName();
192198
}
199+
200+
static Set<String> findChangedGroups(List<InetSocketAddress> update,
201+
Collection<MemcachedNode> olds) {
202+
Set<String> changedGroupSet = new HashSet<>();
203+
Map<String, InetSocketAddress> addrMap = update.stream()
204+
.collect(Collectors.toMap(InetSocketAddress::toString, addr -> addr));
205+
206+
for (MemcachedNode node : olds) {
207+
if (addrMap.remove(node.getSocketAddress().toString()) == null) {
208+
changedGroupSet.add(node.getReplicaGroup().getGroupName());
209+
}
210+
}
211+
212+
addrMap.values().stream()
213+
.map(addr -> ((ArcusReplNodeAddress) addr).getGroupName())
214+
.forEach(changedGroupSet::add);
215+
216+
return changedGroupSet;
217+
}
218+
219+
static List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
220+
Set<String> changedGroups) {
221+
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
222+
addrs.stream()
223+
.filter(addr -> changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName()))
224+
.forEach(changedGroupAddrs::add);
225+
return changedGroupAddrs;
226+
}
227+
228+
static Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
229+
return nodes.stream()
230+
.map(node -> (ArcusReplNodeAddress) node.getSocketAddress())
231+
.collect(Collectors.toSet());
232+
}
233+
234+
static Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
235+
List<ArcusReplNodeAddress> groupAddrs) {
236+
if (groupAddrs.size() <= 1) {
237+
return Collections.emptySet();
238+
}
239+
return new HashSet<>(groupAddrs.subList(1, groupAddrs.size()));
240+
}
193241
}
194242
/* ENABLE_REPLICATION end */
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package net.spy.memcached;
2+
3+
4+
import java.net.InetSocketAddress;
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.Set;
8+
import java.util.stream.Collectors;
9+
10+
import org.junit.jupiter.api.Assertions;
11+
import org.junit.jupiter.api.Test;
12+
13+
14+
class ArcusReplNodeAddressTest {
15+
16+
@Test
17+
void findChangedGroupsTest() {
18+
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
19+
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
20+
List<MemcachedNode> old = new ArrayList<>();
21+
setReplGroup(g0, old);
22+
setReplGroup(g1, old);
23+
24+
List<InetSocketAddress> update = new ArrayList<>(g0);
25+
26+
Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(update, old);
27+
Assertions.assertEquals(1, changedGroups.size());
28+
Assertions.assertTrue(changedGroups.contains("g1"));
29+
}
30+
31+
@Test
32+
void findAddrsOfChangedGroupsTest() {
33+
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
34+
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
35+
List<MemcachedNode> old = new ArrayList<>();
36+
setReplGroup(g0, old);
37+
setReplGroup(g1, old);
38+
39+
List<InetSocketAddress> update = new ArrayList<>();
40+
update.addAll(g0.subList(0, 2));
41+
update.addAll(g1.subList(0, 2));
42+
43+
Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(update, old);
44+
List<InetSocketAddress> result
45+
= MemcachedReplicaGroup.findAddrsOfChangedGroups(update, changedGroups);
46+
47+
Assertions.assertEquals(4, result.size());
48+
Assertions.assertTrue(result.contains(g0.get(0)));
49+
Assertions.assertTrue(result.contains(g0.get(1)));
50+
Assertions.assertTrue(result.contains(g1.get(0)));
51+
Assertions.assertTrue(result.contains(g1.get(1)));
52+
}
53+
54+
private void setReplGroup(List<ArcusReplNodeAddress> group, List<MemcachedNode> old) {
55+
List<MockMemcachedNode> collect = group.stream()
56+
.map(MockMemcachedNode::new)
57+
.collect(Collectors.toList());
58+
MemcachedReplicaGroupImpl impl = null;
59+
for (MockMemcachedNode node : collect) {
60+
if (impl == null) {
61+
impl = new MemcachedReplicaGroupImpl(node);
62+
} else {
63+
node.setReplicaGroup(impl);
64+
}
65+
}
66+
old.addAll(collect);
67+
}
68+
69+
private List<ArcusReplNodeAddress> createReplList(String group, String ip) {
70+
List<ArcusReplNodeAddress> replList = new ArrayList<>();
71+
replList.add(ArcusReplNodeAddress.create(group, true, ip + ":" + 11211));
72+
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 1)));
73+
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 2)));
74+
return replList;
75+
}
76+
}

src/test/java/net/spy/memcached/MockMemcachedNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
public class MockMemcachedNode implements MemcachedNode {
3131
private final InetSocketAddress socketAddress;
32+
private MemcachedReplicaGroup memcachedReplicaGroup;
3233

3334
public SocketAddress getSocketAddress() {
3435
return socketAddress;
@@ -260,13 +261,12 @@ public String getOpQueueStatus() {
260261

261262
@Override
262263
public void setReplicaGroup(MemcachedReplicaGroup g) {
263-
// noop
264+
this.memcachedReplicaGroup = g;
264265
}
265266

266267
@Override
267268
public MemcachedReplicaGroup getReplicaGroup() {
268-
// noop
269-
return null;
269+
return memcachedReplicaGroup;
270270
}
271271

272272
@Override

0 commit comments

Comments
 (0)