Skip to content

Commit e5d967c

Browse files
dkropachevnyh
authored andcommitted
Add rack/dc aware load balancing
scylladb/scylladb#12147 implements a feature for `/locanodes` to filter out nodes by rack and/or datacenter. Now we can make dynamodb client to target particular rack and/or datacenter. This PR allows user to target dynamodb client to a particular rack/datacenter or rack+datacenter, via following API: ``` AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, datacenter, rack); ``` Changes in auxiliary `AlternatorLiveNodes` API: 1. Constructor input argument is an single URI now 2. Validation is introduced 3. Start is moved out from the constructor ``` liveNodes = new AlternatorLiveNodes(Collections.singletonList(seedURI), datacenter, rack); try { liveNodes.validate(); liveNodes.checkIfRackAndDatacenterSetCorrectly(); } catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) { throw new RuntimeException(e); } liveNodes.start(); ``` 1. Regular, old code: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri)` for `6.2.0` and `master` 2. Dc+Rack-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "dc1", "rack1")` for `6.2.0` and `master` 3. Dc-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "dc1", "")` for `6.2.0` and `master` 4. Rack-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "", "rack1")` for `6.2.0` and `master` All tests are done for following deployments: 1. MultiDc 2. MultiRack+MultiDc 3. SingleDc Closes #40
1 parent a2fbd8f commit e5d967c

File tree

6 files changed

+301
-86
lines changed

6 files changed

+301
-86
lines changed

java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import java.util.Map;
55
import java.util.concurrent.CompletableFuture;
66
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.logging.Level;
8+
import java.util.logging.Logger;
79

810
import software.amazon.awssdk.endpoints.Endpoint;
911
import software.amazon.awssdk.services.dynamodb.endpoints.DynamoDbEndpointParams;
@@ -17,16 +19,27 @@
1719
public class AlternatorEndpointProvider implements DynamoDbEndpointProvider {
1820
private final AlternatorLiveNodes liveNodes;
1921
private final Map<URI, CompletableFuture<Endpoint>> futureCache;
22+
private static Logger logger = Logger.getLogger(AlternatorEndpointProvider.class.getName());
2023

2124
public AlternatorEndpointProvider(URI seedURI) {
25+
this(seedURI, "", "");
26+
}
27+
28+
public AlternatorEndpointProvider(URI seedURI, String datacenter, String rack) {
2229
futureCache = new ConcurrentHashMap<>();
23-
liveNodes = new AlternatorLiveNodes(seedURI);
30+
liveNodes = new AlternatorLiveNodes(seedURI, datacenter, rack);
2431
try {
2532
liveNodes.validate();
26-
} catch (AlternatorLiveNodes.ValidationError e) {
33+
liveNodes.checkIfRackAndDatacenterSetCorrectly();
34+
if (!datacenter.isEmpty() || !rack.isEmpty()) {
35+
if (!liveNodes.checkIfRackDatacenterFeatureIsSupported()) {
36+
logger.log(Level.SEVERE, String.format("server %s does not support rack or datacenter filtering", seedURI));
37+
}
38+
}
39+
} catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) {
2740
throw new RuntimeException(e);
28-
}
29-
liveNodes.start();
41+
}
42+
liveNodes.start();
3043
}
3144

3245
@Override
Lines changed: 151 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package com.scylladb.alternator;
22

33
import java.io.IOException;
4-
import java.net.MalformedURLException;
5-
import java.util.Collections;
6-
import java.util.List;
7-
import java.util.ArrayList;
8-
import java.util.Scanner;
9-
import java.util.concurrent.atomic.AtomicInteger;
104
import java.net.URI;
5+
import java.net.MalformedURLException;
116
import java.net.URISyntaxException;
127
import java.net.HttpURLConnection;
8+
import java.net.ProtocolException;
9+
import java.util.*;
10+
import java.util.concurrent.atomic.AtomicInteger;
1311
import java.util.concurrent.atomic.AtomicReference;
1412
import java.util.logging.Logger;
1513
import java.util.logging.Level;
@@ -26,10 +24,11 @@
2624
public class AlternatorLiveNodes extends Thread {
2725
private final String alternatorScheme;
2826
private final int alternatorPort;
29-
3027
private final AtomicReference<List<URI>> liveNodes;
31-
private final List<String> initialNodes;
28+
private final List<URI> initialNodes;
3229
private final AtomicInteger nextLiveNodeIndex;
30+
private final String rack;
31+
private final String datacenter;
3332

3433
private static Logger logger = Logger.getLogger(AlternatorLiveNodes.class.getName());
3534

@@ -55,67 +54,73 @@ public void run() {
5554
}
5655
}
5756

58-
public AlternatorLiveNodes(String alternatorScheme, List<String> liveNodes, int alternatorPort) {
59-
this.alternatorScheme = alternatorScheme;
60-
this.initialNodes = new ArrayList<>(liveNodes);
61-
this.liveNodes = new AtomicReference<>();
62-
this.alternatorPort = alternatorPort;
63-
this.nextLiveNodeIndex = new AtomicInteger(0);
57+
public AlternatorLiveNodes(URI liveNode, String datacenter, String rack) {
58+
this(Collections.singletonList(liveNode), liveNode.getScheme(), liveNode.getPort(), datacenter, rack);
6459
}
6560

66-
public AlternatorLiveNodes(URI uri) {
67-
this(uri.getScheme(), Collections.singletonList(uri.getHost()), uri.getPort());
61+
public AlternatorLiveNodes(List<URI> liveNodes, String scheme, int port, String datacenter, String rack) {
62+
if (liveNodes == null || liveNodes.isEmpty()) {
63+
throw new RuntimeException("liveNodes cannot be null or empty");
64+
}
65+
this.alternatorScheme = scheme;
66+
this.initialNodes = liveNodes;
67+
this.liveNodes = new AtomicReference<>();
68+
this.alternatorPort = port;
69+
this.nextLiveNodeIndex = new AtomicInteger(0);
70+
this.rack = rack;
71+
this.datacenter = datacenter;
6872
}
6973

7074
@Override
7175
public void start() {
72-
List<URI> nodes = new ArrayList<>();
73-
for (String liveNode : initialNodes) {
74-
try {
75-
nodes.add(this.hostToURI(liveNode));
76-
} catch (URISyntaxException | MalformedURLException e) {
77-
// Should not happen, initialLiveNodes should be validated at this point
78-
throw new RuntimeException(e);
79-
}
76+
try {
77+
this.validate();
78+
} catch (ValidationError e) {
79+
throw new RuntimeException(e);
8080
}
81-
this.liveNodes.set(nodes);
81+
liveNodes.set(initialNodes);
8282

8383
// setDaemon(true) allows the program to exit even if the thread is still running.
8484
this.setDaemon(true);
8585
super.start();
8686
}
8787

88+
public void validateURI(URI uri) throws ValidationError {
89+
try {
90+
uri.toURL();
91+
} catch (MalformedURLException e) {
92+
throw new ValidationError("Invalid URI: " + uri, e);
93+
}
94+
}
95+
8896
public void validate() throws ValidationError {
8997
this.validateConfig();
90-
for (String liveNode : initialNodes) {
91-
try {
92-
this.hostToURI(liveNode);
93-
} catch (MalformedURLException | URISyntaxException e) {
94-
throw new ValidationError(String.format("failed to validate initial node %s", liveNode), e);
95-
}
98+
for (URI liveNode : initialNodes) {
99+
this.validateURI(liveNode);
96100
}
97101
}
98102

99103
public static class ValidationError extends Exception {
104+
public ValidationError(String message) {
105+
super(message);
106+
}
107+
100108
public ValidationError(String message, Throwable cause) {
101109
super(message, cause);
102110
}
103111
}
104112

105113
private void validateConfig() throws ValidationError {
106114
try {
115+
// Make sure that `alternatorScheme` and `alternatorPort` are correct values
107116
this.hostToURI("1.1.1.1");
108117
} catch (MalformedURLException | URISyntaxException e) {
109118
throw new ValidationError("failed to validate configuration", e);
110119
}
111120
}
112121

113122
private URI hostToURI(String host) throws URISyntaxException, MalformedURLException {
114-
return hostToURI(host, null, null);
115-
}
116-
117-
private URI hostToURI(String host, String path, String query) throws URISyntaxException, MalformedURLException {
118-
URI uri = new URI(alternatorScheme, null, host, alternatorPort, path, query, null);
123+
URI uri = new URI(alternatorScheme, null, host, alternatorPort, null, null, null);
119124
// Make sure that URI to URL conversion works
120125
uri.toURL();
121126
return uri;
@@ -129,10 +134,10 @@ public URI nextAsURI() {
129134
return nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size()));
130135
}
131136

132-
public URI nextAsURI(String file, String query) {
137+
public URI nextAsURI(String path, String query) {
133138
try {
134139
URI uri = this.nextAsURI();
135-
return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), file, query, null);
140+
return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), path, query, null);
136141
} catch (URISyntaxException e) {
137142
// Should never happen, nextAsURI content is already validated
138143
throw new RuntimeException(e);
@@ -147,38 +152,123 @@ private static String streamToString(java.io.InputStream is) {
147152
}
148153

149154
private void updateLiveNodes() throws IOException {
150-
List<URI> newHosts = new ArrayList<>();
151-
URI uri = nextAsURI("/localnodes", null);
155+
List<URI> newHosts = getNodes(nextAsLocalNodesURI());
156+
if (!newHosts.isEmpty()) {
157+
liveNodes.set(newHosts);
158+
logger.log(Level.FINE, "Updated hosts to " + liveNodes);
159+
}
160+
}
161+
162+
private List<URI> getNodes(URI uri) throws IOException {
152163
// Note that despite this being called HttpURLConnection, it actually
153164
// supports HTTPS as well.
154165
HttpURLConnection conn;
166+
conn = (HttpURLConnection) uri.toURL().openConnection();
155167
try {
156-
conn = (HttpURLConnection) uri.toURL().openConnection();
157-
} catch (MalformedURLException e){
158-
// Should never happen, uri is already validated at this point
168+
conn.setRequestMethod("GET");
169+
} catch (ProtocolException e) {
170+
// It can happen only of conn is already connected or "GET" is not a valid method
171+
// Both cases not true, os it should happen
159172
throw new RuntimeException(e);
160173
}
161-
conn.setRequestMethod("GET");
162174
int responseCode = conn.getResponseCode();
163-
if (responseCode == HttpURLConnection.HTTP_OK) {
164-
String response = streamToString(conn.getInputStream());
165-
// response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"]
166-
response = response.trim();
167-
response = response.substring(1, response.length() - 1);
168-
String[] list = response.split(",");
169-
for (String host : list) {
170-
host = host.trim();
171-
host = host.substring(1, host.length() - 1);
172-
try {
173-
newHosts.add(this.hostToURI(host));
174-
} catch (URISyntaxException | MalformedURLException e) {
175-
logger.log(Level.WARNING, "Invalid host: " + host, e);
176-
}
175+
if (responseCode != HttpURLConnection.HTTP_OK) {
176+
return Collections.emptyList();
177+
}
178+
String response = streamToString(conn.getInputStream());
179+
// response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"]
180+
response = response.trim();
181+
response = response.substring(1, response.length() - 1);
182+
String[] list = response.split(",");
183+
List<URI> newHosts = new ArrayList<>();
184+
for (String host : list) {
185+
if (host.isEmpty()){
186+
continue;
187+
}
188+
host = host.trim();
189+
host = host.substring(1, host.length() - 1);
190+
try {
191+
newHosts.add(this.hostToURI(host));
192+
} catch (URISyntaxException | MalformedURLException e) {
193+
logger.log(Level.WARNING, "Invalid host: " + host, e);
177194
}
178195
}
179-
if (!newHosts.isEmpty()) {
180-
liveNodes.set(newHosts);
181-
logger.log(Level.FINE, "Updated hosts to " + liveNodes);
196+
return newHosts;
197+
}
198+
199+
private URI nextAsLocalNodesURI() {
200+
if (this.rack.isEmpty() && this.datacenter.isEmpty()) {
201+
return nextAsURI("/localnodes", null);
202+
}
203+
String query = "";
204+
if (!this.rack.isEmpty()) {
205+
query = "rack=" + this.rack;
206+
}
207+
if (!this.datacenter.isEmpty()) {
208+
if (query.isEmpty()) {
209+
query = "dc=" + this.datacenter;
210+
} else {
211+
query += "&dc=" + this.datacenter;
212+
}
213+
}
214+
return nextAsURI("/localnodes", query);
215+
}
216+
217+
public static class FailedToCheck extends Exception {
218+
public FailedToCheck(String message, Throwable cause) {
219+
super(message, cause);
220+
}
221+
222+
public FailedToCheck(String message) {
223+
super(message);
224+
}
225+
}
226+
227+
/**
228+
* Checks if server returns non-empty node list for given datacenter/rack.
229+
* throws {@link FailedToCheck} if it fails to reach server and {@link ValidationError} if list is empty
230+
* otherwise do not throw
231+
*
232+
**/
233+
public void checkIfRackAndDatacenterSetCorrectly() throws FailedToCheck, ValidationError {
234+
if (this.rack.isEmpty() && this.datacenter.isEmpty()) {
235+
return;
236+
}
237+
try {
238+
List<URI> nodes = getNodes(nextAsLocalNodesURI());
239+
if (nodes.isEmpty()) {
240+
throw new ValidationError("node returned empty list, datacenter or rack are set incorrectly");
241+
}
242+
} catch (IOException e) {
243+
throw new FailedToCheck("failed to read list of nodes from the node", e);
244+
}
245+
}
246+
247+
/**
248+
* Returns true if remote node supports /localnodes?rack=<>&dc=<datacenter>.
249+
* If it can't conclude by any reason it throws {@link FailedToCheck}
250+
*/
251+
public Boolean checkIfRackDatacenterFeatureIsSupported() throws FailedToCheck {
252+
URI uri = nextAsURI("/localnodes", null);
253+
URI fakeRackUrl;
254+
try {
255+
fakeRackUrl = new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getQuery(), "rack=fakeRack", "");
256+
} catch (URISyntaxException e) {
257+
// Should not ever happen
258+
throw new FailedToCheck("Invalid URI: " + uri, e);
259+
}
260+
try {
261+
List<URI> hostsWithFakeRack = getNodes(fakeRackUrl);
262+
List<URI> hostsWithoutRack = getNodes(uri);
263+
if (hostsWithoutRack.isEmpty()) {
264+
// This should not normally happen.
265+
// If list of nodes is empty, it is impossible to conclude if it supports rack/datacenter filtering or not.
266+
throw new FailedToCheck(String.format("host %s returned empty list", uri));
267+
}
268+
// When rack filtering is not supported server returns same nodes.
269+
return hostsWithFakeRack.size() != hostsWithoutRack.size();
270+
} catch (IOException e) {
271+
throw new FailedToCheck("failed to read list of nodes from the node", e);
182272
}
183273
}
184274
}

java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,34 @@
44
import com.amazonaws.Request;
55

66
import java.net.URI;
7+
import java.util.logging.Level;
8+
import java.util.logging.Logger;
79

810
/* AlternatorRequestHandler is RequestHandler2 implementation for AWS SDK
911
* for Java v1. It tells the SDK to replace the endpoint in the request,
1012
* whatever it was, with the next Alternator node.
1113
*/
1214
public class AlternatorRequestHandler extends RequestHandler2 {
15+
16+
private static Logger logger = Logger.getLogger(AlternatorRequestHandler.class.getName());
17+
1318
AlternatorLiveNodes liveNodes;
14-
public AlternatorRequestHandler(URI seedURI){
15-
liveNodes = new AlternatorLiveNodes(seedURI);
19+
20+
public AlternatorRequestHandler(URI seedURI) {
21+
this(seedURI, "", "");
22+
}
23+
24+
public AlternatorRequestHandler(URI seedURI, String datacenter, String rack) {
25+
liveNodes = new AlternatorLiveNodes(seedURI, datacenter, rack);
1626
try {
1727
liveNodes.validate();
18-
} catch (AlternatorLiveNodes.ValidationError e) {
28+
liveNodes.checkIfRackAndDatacenterSetCorrectly();
29+
if (!datacenter.isEmpty() || !rack.isEmpty()) {
30+
if (!liveNodes.checkIfRackDatacenterFeatureIsSupported()) {
31+
logger.log(Level.SEVERE, String.format("server %s does not support rack or datacenter filtering", seedURI));
32+
}
33+
}
34+
} catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) {
1935
throw new RuntimeException(e);
2036
}
2137
liveNodes.start();

0 commit comments

Comments
 (0)