Skip to content

Commit

Permalink
Merge pull request #45 from dkropachev/dk/java-validate-hosts
Browse files Browse the repository at this point in the history
Java: validate initial hosts, configuration and nodes loaded from API
  • Loading branch information
dkropachev authored Nov 19, 2024
2 parents 6d02241 + cd255cc commit a2fbd8f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ public class AlternatorEndpointProvider implements DynamoDbEndpointProvider {

public AlternatorEndpointProvider(URI seedURI) {
futureCache = new ConcurrentHashMap<>();
liveNodes = AlternatorLiveNodes.create(seedURI);
liveNodes = new AlternatorLiveNodes(seedURI);
try {
liveNodes.validate();
} catch (AlternatorLiveNodes.ValidationError e) {
throw new RuntimeException(e);
}
liveNodes.start();
}

@Override
Expand Down
171 changes: 112 additions & 59 deletions java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.scylladb.alternator;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.HttpURLConnection;
Expand All @@ -26,69 +27,115 @@ public class AlternatorLiveNodes extends Thread {
private final String alternatorScheme;
private final int alternatorPort;

private final AtomicReference<List<String>> liveNodes;
private final AtomicReference<List<URI>> liveNodes;
private final List<String> initialNodes;
private final AtomicInteger nextLiveNodeIndex;

private static Logger logger = Logger.getLogger(AlternatorLiveNodes.class.getName());

@Override
public void run() {
logger.log(Level.INFO, "AlternatorLiveNodes thread starting");
for (;;) {
updateLiveNodes();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.log(Level.INFO, "AlternatorLiveNodes thread interrupted and stopping");
return;
logger.log(Level.INFO, "AlternatorLiveNodes thread started");
try {
for (; ; ) {
try {
updateLiveNodes();
} catch (IOException e) {
logger.log(Level.SEVERE, "AlternatorLiveNodes failed to sync nodes list", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.log(Level.INFO, "AlternatorLiveNodes thread interrupted and stopping");
return;
}
}
} finally {
logger.log(Level.INFO, "AlternatorLiveNodes thread stopped");
}
}

// A private constructor. Use the create() functions below instead, as
// they also start the thread after creating the object.
private AlternatorLiveNodes(String alternatorScheme, List<String> liveNodes, int alternatorPort) {
public AlternatorLiveNodes(String alternatorScheme, List<String> liveNodes, int alternatorPort) {
this.alternatorScheme = alternatorScheme;
this.liveNodes = new AtomicReference<>(new ArrayList<>(liveNodes));
this.initialNodes = new ArrayList<>(liveNodes);
this.liveNodes = new AtomicReference<>();
this.alternatorPort = alternatorPort;
this.nextLiveNodeIndex = new AtomicInteger(0);
}

public static AlternatorLiveNodes create(String scheme, List<String> hosts, int port) {
AlternatorLiveNodes ret = new AlternatorLiveNodes(scheme, hosts, port);
// setDaemon(true) allows the program to exit even if the thread is
// is still running.
ret.setDaemon(true);
ret.start();
// Make sure
return ret;
public AlternatorLiveNodes(URI uri) {
this(uri.getScheme(), Collections.singletonList(uri.getHost()), uri.getPort());
}
public static AlternatorLiveNodes create(URI uri) {
return create(uri.getScheme(), Arrays.asList(uri.getHost()), uri.getPort());

@Override
public void start() {
List<URI> nodes = new ArrayList<>();
for (String liveNode : initialNodes) {
try {
nodes.add(this.hostToURI(liveNode));
} catch (URISyntaxException | MalformedURLException e) {
// Should not happen, initialLiveNodes should be validated at this point
throw new RuntimeException(e);
}
}
this.liveNodes.set(nodes);

// setDaemon(true) allows the program to exit even if the thread is still running.
this.setDaemon(true);
super.start();
}

public String nextNode() {
List<String> nodes = liveNodes.get();
String node = nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size()));
logger.log(Level.FINE, "Using node " + node);
return node;
public void validate() throws ValidationError {
this.validateConfig();
for (String liveNode : initialNodes) {
try {
this.hostToURI(liveNode);
} catch (MalformedURLException | URISyntaxException e) {
throw new ValidationError(String.format("failed to validate initial node %s", liveNode), e);
}
}
}

public URI nextAsURI() {
public static class ValidationError extends Exception {
public ValidationError(String message, Throwable cause) {
super(message, cause);
}
}

private void validateConfig() throws ValidationError {
try {
return new URI(alternatorScheme, null, nextNode(), alternatorPort, "", null, null);
} catch (URISyntaxException e) {
logger.log(Level.WARNING, "nextAsURI", e);
return null;
this.hostToURI("1.1.1.1");
} catch (MalformedURLException | URISyntaxException e) {
throw new ValidationError("failed to validate configuration", e);
}
}

private URI hostToURI(String host) throws URISyntaxException, MalformedURLException {
return hostToURI(host, null, null);
}

private URI hostToURI(String host, String path, String query) throws URISyntaxException, MalformedURLException {
URI uri = new URI(alternatorScheme, null, host, alternatorPort, path, query, null);
// Make sure that URI to URL conversion works
uri.toURL();
return uri;
}

public URI nextAsURI() {
List<URI> nodes = liveNodes.get();
if (nodes.isEmpty()) {
throw new IllegalStateException("No live nodes available");
}
return nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size()));
}

public URI nextAsURI(String file) {
public URI nextAsURI(String file, String query) {
try {
return new URI(alternatorScheme, "", nextNode(), alternatorPort, file, "", "");
URI uri = this.nextAsURI();
return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), file, query, null);
} catch (URISyntaxException e) {
logger.log(Level.WARNING, "nextAsURI", e);
return null;
// Should never happen, nextAsURI content is already validated
throw new RuntimeException(e);
}
}

Expand All @@ -99,29 +146,35 @@ private static String streamToString(java.io.InputStream is) {
return s.hasNext() ? s.next() : "";
}

private void updateLiveNodes() {
List<String> newHosts = new ArrayList<>();
URI uri = nextAsURI("/localnodes");
private void updateLiveNodes() throws IOException {
List<URI> newHosts = new ArrayList<>();
URI uri = nextAsURI("/localnodes", null);
// Note that despite this being called HttpURLConnection, it actually
// supports HTTPS as well.
HttpURLConnection conn;
try {
// Note that despite this being called HttpURLConnection, it actually
// supports HTTPS as well.
HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
conn.setRequestMethod("GET");
int responseCode = conn.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
String response = streamToString(conn.getInputStream());
// response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"]
response = response.trim();
response = response.substring(1, response.length() - 1);
String[] list = response.split(",");
for (String host : list) {
host = host.trim();
host = host.substring(1, host.length() - 1);
newHosts.add(host);
conn = (HttpURLConnection) uri.toURL().openConnection();
} catch (MalformedURLException e){
// Should never happen, uri is already validated at this point
throw new RuntimeException(e);
}
conn.setRequestMethod("GET");
int responseCode = conn.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
String response = streamToString(conn.getInputStream());
// response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"]
response = response.trim();
response = response.substring(1, response.length() - 1);
String[] list = response.split(",");
for (String host : list) {
host = host.trim();
host = host.substring(1, host.length() - 1);
try {
newHosts.add(this.hostToURI(host));
} catch (URISyntaxException | MalformedURLException e) {
logger.log(Level.WARNING, "Invalid host: " + host, e);
}
}
} catch (IOException e) {
logger.log(Level.FINE, "Request failed: " + uri, e);
}
if (!newHosts.isEmpty()) {
liveNodes.set(newHosts);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.scylladb.alternator;

import com.scylladb.alternator.AlternatorLiveNodes;

import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.Request;

Expand All @@ -13,9 +11,16 @@
*/
public class AlternatorRequestHandler extends RequestHandler2 {
AlternatorLiveNodes liveNodes;
public AlternatorRequestHandler(URI seedURI) {
liveNodes = AlternatorLiveNodes.create(seedURI);
public AlternatorRequestHandler(URI seedURI){
liveNodes = new AlternatorLiveNodes(seedURI);
try {
liveNodes.validate();
} catch (AlternatorLiveNodes.ValidationError e) {
throw new RuntimeException(e);
}
liveNodes.start();
}

@Override
public void beforeRequest(Request<?> request) {
request.setEndpoint(liveNodes.nextAsURI());
Expand Down
1 change: 0 additions & 1 deletion java/src/test/java/com/scylladb/alternator/test/Demo1.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.SDKGlobalConfiguration;

import com.amazonaws.auth.BasicAWSCredentials;
Expand Down
4 changes: 1 addition & 3 deletions java/src/test/java/com/scylladb/alternator/test/Demo3.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.internal.http.loader.DefaultSdkAsyncHttpClientBuilder;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
Expand Down Expand Up @@ -95,7 +93,7 @@ public static void main(String[] args) throws MalformedURLException {

if (endpoint != null) {
URI uri = URI.create(endpoint);
AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(uri);
AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(uri);


if (trustSSL != null && trustSSL.booleanValue()) {
Expand Down

0 comments on commit a2fbd8f

Please sign in to comment.