Skip to content

Commit 56005f1

Browse files
committed
INTERNAL: Not to cancel operations when node is removed from ZK.
1 parent ba93538 commit 56005f1

File tree

10 files changed

+110
-0
lines changed

10 files changed

+110
-0
lines changed

src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class ArcusKetamaNodeLocator extends SpyObject implements NodeLocator {
4242

4343
private final TreeMap<Long, SortedSet<MemcachedNode>> ketamaNodes;
4444
private final Collection<MemcachedNode> allNodes;
45+
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<MemcachedNode>();
4546

4647
/* ENABLE_MIGRATION if */
4748
private TreeMap<Long, SortedSet<MemcachedNode>> ketamaAlterNodes;
@@ -226,6 +227,10 @@ public void update(Collection<MemcachedNode> toAttach,
226227
for (MemcachedNode node : toDelete) {
227228
allNodes.remove(node);
228229
removeHash(node);
230+
if (node.hasOp()) {
231+
delayedClosingNodes.add(node);
232+
continue;
233+
}
229234
try {
230235
node.closeChannel();
231236
} catch (IOException e) {
@@ -244,6 +249,14 @@ public void update(Collection<MemcachedNode> toAttach,
244249
}
245250
}
246251

252+
public Collection<MemcachedNode> getDelayedClosingNodes() {
253+
return Collections.unmodifiableCollection(delayedClosingNodes);
254+
}
255+
256+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
257+
delayedClosingNodes.removeAll(closedNodes);
258+
}
259+
247260
private Long getKetamaHashPoint(byte[] digest, int h) {
248261
return ((long) (digest[3 + h * 4] & 0xFF) << 24)
249262
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
@@ -440,6 +453,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
440453
for (MemcachedNode node : toDelete) {
441454
alterNodes.remove(node);
442455
removeHashOfAlter(node);
456+
if (node.hasOp()) {
457+
delayedClosingNodes.add(node);
458+
continue;
459+
}
443460
try {
444461
node.closeChannel();
445462
} catch (IOException e) {

src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class ArcusReplKetamaNodeLocator extends SpyObject implements NodeLocator
4343
private final TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaGroups;
4444
private final HashMap<String, MemcachedReplicaGroup> allGroups;
4545
private final Collection<MemcachedNode> allNodes;
46+
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<MemcachedNode>();
4647

4748
/* ENABLE_MIGRATION if */
4849
private TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaAlterGroups;
@@ -267,6 +268,10 @@ public void update(Collection<MemcachedNode> toAttach,
267268
for (MemcachedNode node : toDelete) {
268269
allNodes.remove(node);
269270
removeNodeFromGroup(node);
271+
if (node.hasOp()) {
272+
delayedClosingNodes.add(node);
273+
continue;
274+
}
270275
try {
271276
node.closeChannel();
272277
} catch (IOException e) {
@@ -303,6 +308,14 @@ public void update(Collection<MemcachedNode> toAttach,
303308
}
304309
}
305310

311+
public Collection<MemcachedNode> getDelayedClosingNodes() {
312+
return Collections.unmodifiableCollection(delayedClosingNodes);
313+
}
314+
315+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
316+
delayedClosingNodes.removeAll(closedNodes);
317+
}
318+
306319
public void switchoverReplGroup(MemcachedReplicaGroup group) {
307320
lock.lock();
308321
group.changeRole();
@@ -573,6 +586,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
573586
removeHashOfAlter(mrg);
574587
}
575588
}
589+
if (node.hasOp()) {
590+
delayedClosingNodes.add(node);
591+
continue;
592+
}
576593
try {
577594
node.closeChannel();
578595
} catch (IOException e) {

src/main/java/net/spy/memcached/ArrayModNodeLocator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Arrays;
2222
import java.util.Collection;
23+
import java.util.HashSet;
2324
import java.util.Iterator;
2425
import java.util.List;
2526

@@ -76,6 +77,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
7677
throw new UnsupportedOperationException("update not supported");
7778
}
7879

80+
public Collection<MemcachedNode> getDelayedClosingNodes() {
81+
return new HashSet<MemcachedNode>();
82+
}
83+
84+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
85+
// do NOT throw UnsupportedOperationException here for test codes.
86+
}
87+
7988
/* ENABLE_MIGRATION if */
8089
public Collection<MemcachedNode> getAlterAll() {
8190
return new ArrayList<MemcachedNode>();

src/main/java/net/spy/memcached/KetamaNodeLocator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Collection;
2222
import java.util.Collections;
23+
import java.util.HashSet;
2324
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Map;
@@ -148,6 +149,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
148149
throw new UnsupportedOperationException("update not supported");
149150
}
150151

152+
public Collection<MemcachedNode> getDelayedClosingNodes() {
153+
return new HashSet<MemcachedNode>();
154+
}
155+
156+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
157+
// do NOT throw UnsupportedOperationException here for test codes.
158+
}
159+
151160
public SortedMap<Long, MemcachedNode> getKetamaNodes() {
152161
return Collections.unmodifiableSortedMap(ketamaNodes);
153162
}

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,9 @@ public void handleIO() throws IOException {
304304
}
305305
/* ENABLE_REPLICATION end */
306306

307+
// Deal with the memcached nodes that removed from ZK but has operation in queue.
308+
handleDelayedClosingNodes();
309+
307310
// Deal with the memcached server group that's been added by CacheManager.
308311
handleCacheNodesChange();
309312

@@ -330,6 +333,11 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
330333
}
331334
/* ENABLE_MIGRATION end */
332335

336+
if (node.isActive()) {
337+
// if memcached node is removed from ZK but still can serve operations, do NOT cancel it.
338+
continue;
339+
}
340+
333341
// removing node is not related to failure mode.
334342
// so, cancel operations regardless of failure mode.
335343
String cause = "node removed.";
@@ -680,6 +688,27 @@ public void complete() {
680688
addOperation(node, op);
681689
}
682690

691+
// Handle the memcached nodes that removed from ZK but has operation in queue.
692+
void handleDelayedClosingNodes() {
693+
Collection<MemcachedNode> nodes = locator.getDelayedClosingNodes();
694+
Collection<MemcachedNode> closedNodes = new HashSet<MemcachedNode>();
695+
696+
for (MemcachedNode node : nodes) {
697+
if (node.hasOp()) {
698+
continue;
699+
}
700+
try {
701+
node.closeChannel();
702+
} catch (IOException e) {
703+
getLogger().error("Failed to closeChannel the node : " + node);
704+
} finally {
705+
closedNodes.add(node);
706+
}
707+
}
708+
709+
locator.updateDelayedClosingNodes(closedNodes);
710+
}
711+
683712
// Handle the memcached server group that's been added by CacheManager.
684713
void handleCacheNodesChange() throws IOException {
685714
/* ENABLE_MIGRATION if */

src/main/java/net/spy/memcached/MemcachedNode.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ public interface MemcachedNode {
109109
*/
110110
boolean hasWriteOp();
111111

112+
/**
113+
* True if any operation is in operation queue.
114+
*/
115+
boolean hasOp();
116+
112117
/**
113118
* Add an operation to the queue. Authentication operations should
114119
* never be added to the queue, but this is not checked.

src/main/java/net/spy/memcached/MemcachedNodeROImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ public boolean hasWriteOp() {
125125
return root.hasReadOp();
126126
}
127127

128+
public boolean hasOp() {
129+
return root.hasOp();
130+
}
131+
128132
public boolean isActive() {
129133
return root.isActive();
130134
}

src/main/java/net/spy/memcached/NodeLocator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@ public interface NodeLocator {
5858
*/
5959
void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode> toDelete);
6060

61+
/**
62+
* Get all memcached nodes that removed from ZK but has operation in queue.
63+
* Note that this feature is only available in ArcusKetamaNodeLocator.
64+
*/
65+
Collection<MemcachedNode> getDelayedClosingNodes();
66+
67+
/**
68+
* Update all memcached nodes that removed from ZK but has operation in queue.
69+
* Note that this feature is only available in ArcusKetamaNodeLocator.
70+
*/
71+
void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes);
72+
6173
/* ENABLE_MIGRATION if */
6274
/**
6375
* Get all alter memcached nodes.

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ public final boolean hasWriteOp() {
343343
return !(optimizedOp == null && writeQ.isEmpty());
344344
}
345345

346+
public final boolean hasOp() {
347+
return hasReadOp() || hasWriteOp() || !inputQueue.isEmpty();
348+
}
349+
346350
public final void addOpToInputQ(Operation op) {
347351
op.setHandlingNode(this);
348352
op.initialize();

src/test/java/net/spy/memcached/MockMemcachedNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public boolean hasWriteOp() {
103103
return false;
104104
}
105105

106+
public boolean hasOp() {
107+
return false;
108+
}
109+
106110
public void addOpToInputQ(Operation op) {
107111
// noop
108112
}

0 commit comments

Comments
 (0)