Skip to content

Commit

Permalink
Add rack/dc aware load balancing
Browse files Browse the repository at this point in the history
scylladb/scylladb#12147 implements a feature for `/locanodes` to
filter out nodes by rack and/or datacenter.
Now we can make dynamodb client to target particular rack and/or datacenter.

This PR allows user to target dynamodb client to a particular rack/datacenter or rack+datacenter, via following API:
```
        AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, datacenter, rack);
```

Changes in auxiliary `AlternatorLiveNodes` API:
1. Constructor input argument is an single URI now
2. Validation is introduced
3. Start is moved out from the constructor
```
        liveNodes = new AlternatorLiveNodes(Collections.singletonList(seedURI), datacenter, rack);
        try {
            liveNodes.validate();
            liveNodes.checkIfRackAndDatacenterSetCorrectly();
        } catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) {
            throw new RuntimeException(e);
        }
        liveNodes.start();
```

1. Regular, old code: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri)` for `6.2.0` and `master`
2. Dc+Rack-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "dc1", "rack1")`  for `6.2.0` and `master`
3. Dc-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "dc1", "")`  for `6.2.0` and `master`
4. Rack-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "", "rack1")`  for `6.2.0` and `master`

All tests are done for following deployments:
1. MultiDc
2. MultiRack+MultiDc
3. SingleDc
  • Loading branch information
dkropachev committed Nov 27, 2024
1 parent a2fbd8f commit 68cb118
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.services.dynamodb.endpoints.DynamoDbEndpointParams;
Expand All @@ -17,16 +19,27 @@
public class AlternatorEndpointProvider implements DynamoDbEndpointProvider {
private final AlternatorLiveNodes liveNodes;
private final Map<URI, CompletableFuture<Endpoint>> futureCache;
private static Logger logger = Logger.getLogger(AlternatorEndpointProvider.class.getName());

public AlternatorEndpointProvider(URI seedURI) {
this(seedURI, "", "");
}

public AlternatorEndpointProvider(URI seedURI, String datacenter, String rack) {
futureCache = new ConcurrentHashMap<>();
liveNodes = new AlternatorLiveNodes(seedURI);
liveNodes = new AlternatorLiveNodes(seedURI, datacenter, rack);
try {
liveNodes.validate();
} catch (AlternatorLiveNodes.ValidationError e) {
liveNodes.checkIfRackAndDatacenterSetCorrectly();
if (!datacenter.isEmpty() || !rack.isEmpty()) {
if (!liveNodes.checkIfRackDatacenterFeatureIsSupported()) {
logger.log(Level.SEVERE, String.format("server %s does not support rack or datacenter filtering", seedURI));
}
}
} catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) {
throw new RuntimeException(e);
}
liveNodes.start();
}
liveNodes.start();
}

@Override
Expand Down
212 changes: 151 additions & 61 deletions java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.scylladb.alternator;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicInteger;
import java.net.URI;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.logging.Level;
Expand All @@ -26,10 +24,11 @@
public class AlternatorLiveNodes extends Thread {
private final String alternatorScheme;
private final int alternatorPort;

private final AtomicReference<List<URI>> liveNodes;
private final List<String> initialNodes;
private final List<URI> initialNodes;
private final AtomicInteger nextLiveNodeIndex;
private final String rack;
private final String datacenter;

private static Logger logger = Logger.getLogger(AlternatorLiveNodes.class.getName());

Expand All @@ -55,67 +54,73 @@ public void run() {
}
}

public AlternatorLiveNodes(String alternatorScheme, List<String> liveNodes, int alternatorPort) {
this.alternatorScheme = alternatorScheme;
this.initialNodes = new ArrayList<>(liveNodes);
this.liveNodes = new AtomicReference<>();
this.alternatorPort = alternatorPort;
this.nextLiveNodeIndex = new AtomicInteger(0);
public AlternatorLiveNodes(URI liveNode, String datacenter, String rack) {
this(Collections.singletonList(liveNode), liveNode.getScheme(), liveNode.getPort(), datacenter, rack);
}

public AlternatorLiveNodes(URI uri) {
this(uri.getScheme(), Collections.singletonList(uri.getHost()), uri.getPort());
public AlternatorLiveNodes(List<URI> liveNodes, String scheme, int port, String datacenter, String rack) {
if (liveNodes == null || liveNodes.isEmpty()) {
throw new RuntimeException("liveNodes cannot be null or empty");
}
this.alternatorScheme = scheme;
this.initialNodes = liveNodes;
this.liveNodes = new AtomicReference<>();
this.alternatorPort = port;
this.nextLiveNodeIndex = new AtomicInteger(0);
this.rack = rack;
this.datacenter = datacenter;
}

@Override
public void start() {
List<URI> nodes = new ArrayList<>();
for (String liveNode : initialNodes) {
try {
nodes.add(this.hostToURI(liveNode));
} catch (URISyntaxException | MalformedURLException e) {
// Should not happen, initialLiveNodes should be validated at this point
throw new RuntimeException(e);
}
try {
this.validate();
} catch (ValidationError e) {
throw new RuntimeException(e);
}
this.liveNodes.set(nodes);
liveNodes.set(initialNodes);

// setDaemon(true) allows the program to exit even if the thread is still running.
this.setDaemon(true);
super.start();
}

public void validateURI(URI uri) throws ValidationError {
try {
uri.toURL();
} catch (MalformedURLException e) {
throw new ValidationError("Invalid URI: " + uri, e);
}
}

public void validate() throws ValidationError {
this.validateConfig();
for (String liveNode : initialNodes) {
try {
this.hostToURI(liveNode);
} catch (MalformedURLException | URISyntaxException e) {
throw new ValidationError(String.format("failed to validate initial node %s", liveNode), e);
}
for (URI liveNode : initialNodes) {
this.validateURI(liveNode);
}
}

public static class ValidationError extends Exception {
public ValidationError(String message) {
super(message);
}

public ValidationError(String message, Throwable cause) {
super(message, cause);
}
}

private void validateConfig() throws ValidationError {
try {
// Make sure that `alternatorScheme` and `alternatorPort` are correct values
this.hostToURI("1.1.1.1");
} catch (MalformedURLException | URISyntaxException e) {
throw new ValidationError("failed to validate configuration", e);
}
}

private URI hostToURI(String host) throws URISyntaxException, MalformedURLException {
return hostToURI(host, null, null);
}

private URI hostToURI(String host, String path, String query) throws URISyntaxException, MalformedURLException {
URI uri = new URI(alternatorScheme, null, host, alternatorPort, path, query, null);
URI uri = new URI(alternatorScheme, null, host, alternatorPort, null, null, null);
// Make sure that URI to URL conversion works
uri.toURL();
return uri;
Expand All @@ -129,10 +134,10 @@ public URI nextAsURI() {
return nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size()));
}

public URI nextAsURI(String file, String query) {
public URI nextAsURI(String path, String query) {
try {
URI uri = this.nextAsURI();
return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), file, query, null);
return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), path, query, null);
} catch (URISyntaxException e) {
// Should never happen, nextAsURI content is already validated
throw new RuntimeException(e);
Expand All @@ -147,38 +152,123 @@ private static String streamToString(java.io.InputStream is) {
}

private void updateLiveNodes() throws IOException {
List<URI> newHosts = new ArrayList<>();
URI uri = nextAsURI("/localnodes", null);
List<URI> newHosts = getNodes(nextAsLocalNodesURI());
if (!newHosts.isEmpty()) {
liveNodes.set(newHosts);
logger.log(Level.FINE, "Updated hosts to " + liveNodes);
}
}

private List<URI> getNodes(URI uri) throws IOException {
// Note that despite this being called HttpURLConnection, it actually
// supports HTTPS as well.
HttpURLConnection conn;
conn = (HttpURLConnection) uri.toURL().openConnection();
try {
conn = (HttpURLConnection) uri.toURL().openConnection();
} catch (MalformedURLException e){
// Should never happen, uri is already validated at this point
conn.setRequestMethod("GET");
} catch (ProtocolException e) {
// It can happen only of conn is already connected or "GET" is not a valid method
// Both cases not true, os it should happen
throw new RuntimeException(e);
}
conn.setRequestMethod("GET");
int responseCode = conn.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
String response = streamToString(conn.getInputStream());
// response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"]
response = response.trim();
response = response.substring(1, response.length() - 1);
String[] list = response.split(",");
for (String host : list) {
host = host.trim();
host = host.substring(1, host.length() - 1);
try {
newHosts.add(this.hostToURI(host));
} catch (URISyntaxException | MalformedURLException e) {
logger.log(Level.WARNING, "Invalid host: " + host, e);
}
if (responseCode != HttpURLConnection.HTTP_OK) {
return Collections.emptyList();
}
String response = streamToString(conn.getInputStream());
// response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"]
response = response.trim();
response = response.substring(1, response.length() - 1);
String[] list = response.split(",");
List<URI> newHosts = new ArrayList<>();
for (String host : list) {
if (host.isEmpty()){
continue;
}
host = host.trim();
host = host.substring(1, host.length() - 1);
try {
newHosts.add(this.hostToURI(host));
} catch (URISyntaxException | MalformedURLException e) {
logger.log(Level.WARNING, "Invalid host: " + host, e);
}
}
if (!newHosts.isEmpty()) {
liveNodes.set(newHosts);
logger.log(Level.FINE, "Updated hosts to " + liveNodes);
return newHosts;
}

private URI nextAsLocalNodesURI() {
if (this.rack.isEmpty() && this.datacenter.isEmpty()) {
return nextAsURI("/localnodes", null);
}
String query = "";
if (!this.rack.isEmpty()) {
query = "rack=" + this.rack;
}
if (!this.datacenter.isEmpty()) {
if (query.isEmpty()) {
query = "dc=" + this.datacenter;
} else {
query += "&dc=" + this.datacenter;
}
}
return nextAsURI("/localnodes", query);
}

public static class FailedToCheck extends Exception {
public FailedToCheck(String message, Throwable cause) {
super(message, cause);
}

public FailedToCheck(String message) {
super(message);
}
}

/**
* Checks if server returns non-empty node list for given datacenter/rack.
* throws {@link FailedToCheck} if it fails to reach server and {@link ValidationError} if list is empty
* otherwise do not throw
*
**/
public void checkIfRackAndDatacenterSetCorrectly() throws FailedToCheck, ValidationError {
if (this.rack.isEmpty() && this.datacenter.isEmpty()) {
return;
}
try {
List<URI> nodes = getNodes(nextAsLocalNodesURI());
if (nodes.isEmpty()) {
throw new ValidationError("node returned empty list, datacenter or rack are set incorrectly");
}
} catch (IOException e) {
throw new FailedToCheck("failed to read list of nodes from the node", e);
}
}

/**
* Returns true if remote node supports /localnodes?rack=<>&dc=<datacenter>.
* If it can't conclude by any reason it throws {@link FailedToCheck}
*/
public Boolean checkIfRackDatacenterFeatureIsSupported() throws FailedToCheck {
URI uri = nextAsURI("/localnodes", null);
URI fakeRackUrl;
try {
fakeRackUrl = new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getQuery(), "rack=fakeRack", "");
} catch (URISyntaxException e) {
// Should not ever happen
throw new FailedToCheck("Invalid URI: " + uri, e);
}
try {
List<URI> hostsWithFakeRack = getNodes(fakeRackUrl);
List<URI> hostsWithoutRack = getNodes(uri);
if (hostsWithoutRack.isEmpty()) {
// This should not normally happen.
// If list of nodes is empty, it is impossible to conclude if it supports rack/datacenter filtering or not.
throw new FailedToCheck(String.format("host %s returned empty list", uri));
}
// When rack filtering is not supported server returns same nodes.
return hostsWithFakeRack.size() != hostsWithoutRack.size();
} catch (IOException e) {
throw new FailedToCheck("failed to read list of nodes from the node", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,34 @@
import com.amazonaws.Request;

import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;

/* AlternatorRequestHandler is RequestHandler2 implementation for AWS SDK
* for Java v1. It tells the SDK to replace the endpoint in the request,
* whatever it was, with the next Alternator node.
*/
public class AlternatorRequestHandler extends RequestHandler2 {

private static Logger logger = Logger.getLogger(AlternatorRequestHandler.class.getName());

AlternatorLiveNodes liveNodes;
public AlternatorRequestHandler(URI seedURI){
liveNodes = new AlternatorLiveNodes(seedURI);

public AlternatorRequestHandler(URI seedURI) {
this(seedURI, "", "");
}

public AlternatorRequestHandler(URI seedURI, String datacenter, String rack) {
liveNodes = new AlternatorLiveNodes(seedURI, datacenter, rack);
try {
liveNodes.validate();
} catch (AlternatorLiveNodes.ValidationError e) {
liveNodes.checkIfRackAndDatacenterSetCorrectly();
if (!datacenter.isEmpty() || !rack.isEmpty()) {
if (!liveNodes.checkIfRackDatacenterFeatureIsSupported()) {
logger.log(Level.SEVERE, String.format("server %s does not support rack or datacenter filtering", seedURI));
}
}
} catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) {
throw new RuntimeException(e);
}
liveNodes.start();
Expand Down
Loading

0 comments on commit 68cb118

Please sign in to comment.