Skip to content

Commit

Permalink
Java: remove synchronized parts
Browse files Browse the repository at this point in the history
  • Loading branch information
dkropachev committed Nov 14, 2024
1 parent 3b89842 commit 600d06e
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import java.util.ArrayList;
import java.util.Scanner;
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
Expand All @@ -25,11 +25,11 @@
* to one of these nodes.
*/
public class AlternatorLiveNodes extends Thread {
private String alternatorScheme;
private int alternatorPort;
private final String alternatorScheme;
private final int alternatorPort;

private List<String> liveNodes;
private int nextLiveNodeIndex;
private final CopyOnWriteArrayList<String> liveNodes;
private final AtomicInteger nextLiveNodeIndex;

private static Logger logger = Logger.getLogger(AlternatorLiveNodes.class.getName());

Expand All @@ -51,9 +51,9 @@ public void run() {
// they also start the thread after creating the object.
private AlternatorLiveNodes(String alternatorScheme, List<String> liveNodes, int alternatorPort) {
this.alternatorScheme = alternatorScheme;
this.liveNodes = liveNodes;
this.liveNodes = new CopyOnWriteArrayList<>(liveNodes);
this.alternatorPort = alternatorPort;
this.nextLiveNodeIndex = 0;
this.nextLiveNodeIndex = new AtomicInteger(0);
}

public static AlternatorLiveNodes create(String scheme, List<String> hosts, int port) {
Expand All @@ -69,10 +69,17 @@ public static AlternatorLiveNodes create(URI uri) {
return create(uri.getScheme(), Arrays.asList(uri.getHost()), uri.getPort());
}

public synchronized String nextNode() {
String node = liveNodes.get(nextLiveNodeIndex);
public String nextNode() {
String node;
// There is a small chance that between liveNodes.size and liveNodes.get execution is interrupted
// and liveNodes.size is being changed, for that purpose it is iterating until it is not happening.
while (true) {
try {
node = liveNodes.get(nextLiveNodeIndex.addAndGet(1) % liveNodes.size());
break;
} catch (RuntimeException ignored) {}
}
logger.log(Level.FINE, "Using node " + node);
nextLiveNodeIndex = (nextLiveNodeIndex + 1) % liveNodes.size();
return node;
}

Expand Down Expand Up @@ -128,9 +135,23 @@ private void updateLiveNodes() {
logger.log(Level.FINE, "Request failed: " + url, e);
}
if (!newHosts.isEmpty()) {
synchronized(this) {
this.liveNodes = newHosts;
this.nextLiveNodeIndex = 0;
ArrayList<String> toDelete = new ArrayList<>();
ArrayList<String> toAdd = new ArrayList<>();
liveNodes.forEach(node -> {
if(!newHosts.contains(node)) {
toDelete.add(node);
}
});
newHosts.forEach(host -> {
if (!liveNodes.contains(host)) {
toAdd.add(host);
}
});
if (!toAdd.isEmpty()) {
liveNodes.addAll(toAdd);
}
if (!toDelete.isEmpty()) {
liveNodes.removeAll(toDelete);
}
logger.log(Level.FINE, "Updated hosts to " + this.liveNodes.toString());
}
Expand Down

0 comments on commit 600d06e

Please sign in to comment.