From cd255ccc02ed59aa3bfd0480ecc2c5c14125862b Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 14 Nov 2024 21:45:08 -0400 Subject: [PATCH] Java: validate initial hosts, configuration and nodes loaded from API --- .../AlternatorEndpointProvider.java | 8 +- .../alternator/AlternatorLiveNodes.java | 171 ++++++++++++------ .../alternator/AlternatorRequestHandler.java | 13 +- .../com/scylladb/alternator/test/Demo1.java | 1 - .../com/scylladb/alternator/test/Demo3.java | 4 +- 5 files changed, 129 insertions(+), 68 deletions(-) diff --git a/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java b/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java index 4754e07..0fed7e9 100644 --- a/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java +++ b/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java @@ -20,7 +20,13 @@ public class AlternatorEndpointProvider implements DynamoDbEndpointProvider { public AlternatorEndpointProvider(URI seedURI) { futureCache = new ConcurrentHashMap<>(); - liveNodes = AlternatorLiveNodes.create(seedURI); + liveNodes = new AlternatorLiveNodes(seedURI); + try { + liveNodes.validate(); + } catch (AlternatorLiveNodes.ValidationError e) { + throw new RuntimeException(e); + } + liveNodes.start(); } @Override diff --git a/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java b/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java index 3ce99f1..df36a39 100644 --- a/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java +++ b/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java @@ -1,11 +1,12 @@ package com.scylladb.alternator; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.util.Scanner; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.HttpURLConnection; @@ -26,69 +27,115 @@ public class AlternatorLiveNodes extends Thread { private final String alternatorScheme; private final int alternatorPort; - private final AtomicReference> liveNodes; + private final AtomicReference> liveNodes; + private final List initialNodes; private final AtomicInteger nextLiveNodeIndex; private static Logger logger = Logger.getLogger(AlternatorLiveNodes.class.getName()); @Override public void run() { - logger.log(Level.INFO, "AlternatorLiveNodes thread starting"); - for (;;) { - updateLiveNodes(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.log(Level.INFO, "AlternatorLiveNodes thread interrupted and stopping"); - return; + logger.log(Level.INFO, "AlternatorLiveNodes thread started"); + try { + for (; ; ) { + try { + updateLiveNodes(); + } catch (IOException e) { + logger.log(Level.SEVERE, "AlternatorLiveNodes failed to sync nodes list", e); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.log(Level.INFO, "AlternatorLiveNodes thread interrupted and stopping"); + return; + } } + } finally { + logger.log(Level.INFO, "AlternatorLiveNodes thread stopped"); } } - // A private constructor. Use the create() functions below instead, as - // they also start the thread after creating the object. - private AlternatorLiveNodes(String alternatorScheme, List liveNodes, int alternatorPort) { + public AlternatorLiveNodes(String alternatorScheme, List liveNodes, int alternatorPort) { this.alternatorScheme = alternatorScheme; - this.liveNodes = new AtomicReference<>(new ArrayList<>(liveNodes)); + this.initialNodes = new ArrayList<>(liveNodes); + this.liveNodes = new AtomicReference<>(); this.alternatorPort = alternatorPort; this.nextLiveNodeIndex = new AtomicInteger(0); } - public static AlternatorLiveNodes create(String scheme, List hosts, int port) { - AlternatorLiveNodes ret = new AlternatorLiveNodes(scheme, hosts, port); - // setDaemon(true) allows the program to exit even if the thread is - // is still running. - ret.setDaemon(true); - ret.start(); - // Make sure - return ret; + public AlternatorLiveNodes(URI uri) { + this(uri.getScheme(), Collections.singletonList(uri.getHost()), uri.getPort()); } - public static AlternatorLiveNodes create(URI uri) { - return create(uri.getScheme(), Arrays.asList(uri.getHost()), uri.getPort()); + + @Override + public void start() { + List nodes = new ArrayList<>(); + for (String liveNode : initialNodes) { + try { + nodes.add(this.hostToURI(liveNode)); + } catch (URISyntaxException | MalformedURLException e) { + // Should not happen, initialLiveNodes should be validated at this point + throw new RuntimeException(e); + } + } + this.liveNodes.set(nodes); + + // setDaemon(true) allows the program to exit even if the thread is still running. + this.setDaemon(true); + super.start(); } - public String nextNode() { - List nodes = liveNodes.get(); - String node = nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size())); - logger.log(Level.FINE, "Using node " + node); - return node; + public void validate() throws ValidationError { + this.validateConfig(); + for (String liveNode : initialNodes) { + try { + this.hostToURI(liveNode); + } catch (MalformedURLException | URISyntaxException e) { + throw new ValidationError(String.format("failed to validate initial node %s", liveNode), e); + } + } } - public URI nextAsURI() { + public static class ValidationError extends Exception { + public ValidationError(String message, Throwable cause) { + super(message, cause); + } + } + + private void validateConfig() throws ValidationError { try { - return new URI(alternatorScheme, null, nextNode(), alternatorPort, "", null, null); - } catch (URISyntaxException e) { - logger.log(Level.WARNING, "nextAsURI", e); - return null; + this.hostToURI("1.1.1.1"); + } catch (MalformedURLException | URISyntaxException e) { + throw new ValidationError("failed to validate configuration", e); + } + } + + private URI hostToURI(String host) throws URISyntaxException, MalformedURLException { + return hostToURI(host, null, null); + } + + private URI hostToURI(String host, String path, String query) throws URISyntaxException, MalformedURLException { + URI uri = new URI(alternatorScheme, null, host, alternatorPort, path, query, null); + // Make sure that URI to URL conversion works + uri.toURL(); + return uri; + } + + public URI nextAsURI() { + List nodes = liveNodes.get(); + if (nodes.isEmpty()) { + throw new IllegalStateException("No live nodes available"); } + return nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size())); } - public URI nextAsURI(String file) { + public URI nextAsURI(String file, String query) { try { - return new URI(alternatorScheme, "", nextNode(), alternatorPort, file, "", ""); + URI uri = this.nextAsURI(); + return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), file, query, null); } catch (URISyntaxException e) { - logger.log(Level.WARNING, "nextAsURI", e); - return null; + // Should never happen, nextAsURI content is already validated + throw new RuntimeException(e); } } @@ -99,29 +146,35 @@ private static String streamToString(java.io.InputStream is) { return s.hasNext() ? s.next() : ""; } - private void updateLiveNodes() { - List newHosts = new ArrayList<>(); - URI uri = nextAsURI("/localnodes"); + private void updateLiveNodes() throws IOException { + List newHosts = new ArrayList<>(); + URI uri = nextAsURI("/localnodes", null); + // Note that despite this being called HttpURLConnection, it actually + // supports HTTPS as well. + HttpURLConnection conn; try { - // Note that despite this being called HttpURLConnection, it actually - // supports HTTPS as well. - HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection(); - conn.setRequestMethod("GET"); - int responseCode = conn.getResponseCode(); - if (responseCode == HttpURLConnection.HTTP_OK) { - String response = streamToString(conn.getInputStream()); - // response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"] - response = response.trim(); - response = response.substring(1, response.length() - 1); - String[] list = response.split(","); - for (String host : list) { - host = host.trim(); - host = host.substring(1, host.length() - 1); - newHosts.add(host); + conn = (HttpURLConnection) uri.toURL().openConnection(); + } catch (MalformedURLException e){ + // Should never happen, uri is already validated at this point + throw new RuntimeException(e); + } + conn.setRequestMethod("GET"); + int responseCode = conn.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + String response = streamToString(conn.getInputStream()); + // response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"] + response = response.trim(); + response = response.substring(1, response.length() - 1); + String[] list = response.split(","); + for (String host : list) { + host = host.trim(); + host = host.substring(1, host.length() - 1); + try { + newHosts.add(this.hostToURI(host)); + } catch (URISyntaxException | MalformedURLException e) { + logger.log(Level.WARNING, "Invalid host: " + host, e); } } - } catch (IOException e) { - logger.log(Level.FINE, "Request failed: " + uri, e); } if (!newHosts.isEmpty()) { liveNodes.set(newHosts); diff --git a/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java b/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java index 7bc029f..a762c2a 100644 --- a/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java +++ b/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java @@ -1,7 +1,5 @@ package com.scylladb.alternator; -import com.scylladb.alternator.AlternatorLiveNodes; - import com.amazonaws.handlers.RequestHandler2; import com.amazonaws.Request; @@ -13,9 +11,16 @@ */ public class AlternatorRequestHandler extends RequestHandler2 { AlternatorLiveNodes liveNodes; - public AlternatorRequestHandler(URI seedURI) { - liveNodes = AlternatorLiveNodes.create(seedURI); + public AlternatorRequestHandler(URI seedURI){ + liveNodes = new AlternatorLiveNodes(seedURI); + try { + liveNodes.validate(); + } catch (AlternatorLiveNodes.ValidationError e) { + throw new RuntimeException(e); + } + liveNodes.start(); } + @Override public void beforeRequest(Request request) { request.setEndpoint(liveNodes.nextAsURI()); diff --git a/java/src/test/java/com/scylladb/alternator/test/Demo1.java b/java/src/test/java/com/scylladb/alternator/test/Demo1.java index 4e61576..02ac439 100755 --- a/java/src/test/java/com/scylladb/alternator/test/Demo1.java +++ b/java/src/test/java/com/scylladb/alternator/test/Demo1.java @@ -14,7 +14,6 @@ import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.SDKGlobalConfiguration; import com.amazonaws.auth.BasicAWSCredentials; diff --git a/java/src/test/java/com/scylladb/alternator/test/Demo3.java b/java/src/test/java/com/scylladb/alternator/test/Demo3.java index 5e62cd9..6f1686b 100644 --- a/java/src/test/java/com/scylladb/alternator/test/Demo3.java +++ b/java/src/test/java/com/scylladb/alternator/test/Demo3.java @@ -26,9 +26,7 @@ import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.internal.http.loader.DefaultSdkAsyncHttpClientBuilder; -import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -95,7 +93,7 @@ public static void main(String[] args) throws MalformedURLException { if (endpoint != null) { URI uri = URI.create(endpoint); - AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(uri); + AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(uri); if (trustSSL != null && trustSSL.booleanValue()) {