Skip to content

Commit

Permalink
INTERNAL: Not to cancel operations when node is removed from ZK but s…
Browse files Browse the repository at this point in the history
…till alive.
  • Loading branch information
uhm0311 committed May 8, 2024
1 parent ba93538 commit d520ae8
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 5 deletions.
17 changes: 17 additions & 0 deletions src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ArcusKetamaNodeLocator extends SpyObject implements NodeLocator {

private final TreeMap<Long, SortedSet<MemcachedNode>> ketamaNodes;
private final Collection<MemcachedNode> allNodes;
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<MemcachedNode>();

/* ENABLE_MIGRATION if */
private TreeMap<Long, SortedSet<MemcachedNode>> ketamaAlterNodes;
Expand Down Expand Up @@ -226,6 +227,10 @@ public void update(Collection<MemcachedNode> toAttach,
for (MemcachedNode node : toDelete) {
allNodes.remove(node);
removeHash(node);
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand All @@ -244,6 +249,14 @@ public void update(Collection<MemcachedNode> toAttach,
}
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return Collections.unmodifiableCollection(delayedClosingNodes);
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
delayedClosingNodes.removeAll(closedNodes);
}

private Long getKetamaHashPoint(byte[] digest, int h) {
return ((long) (digest[3 + h * 4] & 0xFF) << 24)
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
Expand Down Expand Up @@ -440,6 +453,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
for (MemcachedNode node : toDelete) {
alterNodes.remove(node);
removeHashOfAlter(node);
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ArcusReplKetamaNodeLocator extends SpyObject implements NodeLocator
private final TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaGroups;
private final HashMap<String, MemcachedReplicaGroup> allGroups;
private final Collection<MemcachedNode> allNodes;
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<MemcachedNode>();

/* ENABLE_MIGRATION if */
private TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaAlterGroups;
Expand Down Expand Up @@ -267,6 +268,10 @@ public void update(Collection<MemcachedNode> toAttach,
for (MemcachedNode node : toDelete) {
allNodes.remove(node);
removeNodeFromGroup(node);
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand Down Expand Up @@ -303,6 +308,14 @@ public void update(Collection<MemcachedNode> toAttach,
}
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return Collections.unmodifiableCollection(delayedClosingNodes);
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
delayedClosingNodes.removeAll(closedNodes);
}

public void switchoverReplGroup(MemcachedReplicaGroup group) {
lock.lock();
group.changeRole();
Expand Down Expand Up @@ -573,6 +586,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
removeHashOfAlter(mrg);
}
}
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/net/spy/memcached/ArrayModNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

Expand Down Expand Up @@ -76,6 +77,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
throw new UnsupportedOperationException("update not supported");
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return new HashSet<MemcachedNode>();
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
// do NOT throw UnsupportedOperationException here for test codes.
}

/* ENABLE_MIGRATION if */
public Collection<MemcachedNode> getAlterAll() {
return new ArrayList<MemcachedNode>();
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/net/spy/memcached/KetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -148,6 +149,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
throw new UnsupportedOperationException("update not supported");
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return new HashSet<MemcachedNode>();
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
// do NOT throw UnsupportedOperationException here for test codes.
}

public SortedMap<Long, MemcachedNode> getKetamaNodes() {
return Collections.unmodifiableSortedMap(ketamaNodes);
}
Expand Down
55 changes: 51 additions & 4 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ public void handleIO() throws IOException {
}
/* ENABLE_REPLICATION end */

// Deal with the memcached nodes that removed from ZK but has operation in queue.
handleDelayedClosingNodes();

// Deal with the memcached server group that's been added by CacheManager.
handleCacheNodesChange();

Expand All @@ -330,12 +333,18 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
}
/* ENABLE_MIGRATION end */

if (node.isActive()) {
// if a memcached node is removed from ZK but can still serve operations, do NOT cancel it.
// operations that remain in operation queue will be processed until connection is lost.
// once all remaining operations are processed, client will close connection.
// if connection is lost before remaining operations are processed,
// all of them will be canceled after connection is lost.
continue;
}

// removing node is not related to failure mode.
// so, cancel operations regardless of failure mode.
String cause = "node removed.";
cancelOperations(node.destroyReadQueue(false), cause);
cancelOperations(node.destroyWriteQueue(false), cause);
cancelOperations(node.destroyInputQueue(), cause);
cancelAllOperations(node, "node removed.");
}
}

Expand Down Expand Up @@ -680,6 +689,38 @@ public void complete() {
addOperation(node, op);
}

// Handle the memcached nodes that removed from ZK but has operation in queue.
void handleDelayedClosingNodes() {
Collection<MemcachedNode> closingNodes = locator.getDelayedClosingNodes();
if (closingNodes.isEmpty()) {
return;
}

Collection<MemcachedNode> closedNodes = new HashSet<MemcachedNode>();
for (MemcachedNode node : closingNodes) {
boolean isActive = node.isActive();
boolean hasOp = node.hasOp();

if (isActive && !hasOp) {
try {
node.closeChannel();
} catch (IOException e) {
getLogger().error("Failed to closeChannel the node : " + node);
}
} else if (!isActive && hasOp) {
cancelAllOperations(node, "connection lost after node removed.");
} else {
continue;
}

closedNodes.add(node);
}

if (!closedNodes.isEmpty()) {
locator.updateDelayedClosingNodes(closedNodes);
}
}

// Handle the memcached server group that's been added by CacheManager.
void handleCacheNodesChange() throws IOException {
/* ENABLE_MIGRATION if */
Expand Down Expand Up @@ -1225,6 +1266,12 @@ private void cancelOperations(Collection<Operation> ops, String cause) {
}
}

private void cancelAllOperations(MemcachedNode node, String cause) {
cancelOperations(node.destroyReadQueue(false), cause);
cancelOperations(node.destroyWriteQueue(false), cause);
cancelOperations(node.destroyInputQueue(), cause);
}

private void redistributeOperations(Collection<Operation> ops, String cause) {
for (Operation op : ops) {
if (op instanceof KeyedOperation) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public interface MemcachedNode {
*/
boolean hasWriteOp();

/**
* True if any operation is in operation queue.
*/
boolean hasOp();

/**
* Add an operation to the queue. Authentication operations should
* never be added to the queue, but this is not checked.
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedNodeROImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public boolean hasWriteOp() {
return root.hasReadOp();
}

public boolean hasOp() {
return root.hasOp();
}

public boolean isActive() {
return root.isActive();
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/net/spy/memcached/NodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ public interface NodeLocator {
*/
void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode> toDelete);

/**
* Get all memcached nodes that removed from ZK but has operation in queue.
* Note that this feature is only available in ArcusKetamaNodeLocator.
*/
Collection<MemcachedNode> getDelayedClosingNodes();

/**
* Update all memcached nodes that removed from ZK but has operation in queue.
* Note that this feature is only available in ArcusKetamaNodeLocator.
*/
void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes);

/* ENABLE_MIGRATION if */
/**
* Get all alter memcached nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ public final boolean hasWriteOp() {
return !(optimizedOp == null && writeQ.isEmpty());
}

public final boolean hasOp() {
return hasReadOp() || hasWriteOp() || !inputQueue.isEmpty();
}

public final void addOpToInputQ(Operation op) {
op.setHandlingNode(this);
op.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void testReadOnliness() throws Exception {
Set<String> acceptable = new HashSet<String>(Arrays.asList(
"toString", "getSocketAddress", "getBytesRemainingToWrite",
"getReconnectCount", "getSelectionOps", "getNodeName", "hasReadOp",
"hasWriteOp", "isActive", "isFirstConnecting"));
"hasWriteOp", "hasOp", "isActive", "isFirstConnecting"));

for (Method meth : MemcachedNode.class.getMethods()) {
if (acceptable.contains(meth.getName())) {
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/net/spy/memcached/MockMemcachedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ public boolean hasWriteOp() {
return false;
}

public boolean hasOp() {
return false;
}

public void addOpToInputQ(Operation op) {
// noop
}
Expand Down

0 comments on commit d520ae8

Please sign in to comment.