Skip to content

Add SubnetAddressTranslator #2013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,48 @@ public enum DefaultDriverOption implements DriverOption {
*
* <p>Value-type: boolean
*/
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san");
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"),
/**
* An address to always translate all node addresses to that same proxy hostname no matter what IP
* address a node has, but still using its native transport port.
*
* <p>Value-Type: {@link String}
*/
ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME("advanced.address-translator.advertised-hostname"),
/**
* A map of Cassandra node subnets (CIDR notations) to target addresses, for example (note quoted
* keys):
*
* <pre>
* advanced.address-translator.subnet-addresses {
* "100.64.0.0/15" = "cassandra.datacenter1.com:9042"
* "100.66.0.0/15" = "cassandra.datacenter2.com:9042"
* # IPv6 example:
* # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042"
* # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042"
* }
* </pre>
*
* Note: subnets must be represented as prefix blocks, see {@link
* inet.ipaddr.Address#isPrefixBlock()}.
*
* <p>Value type: {@link java.util.Map Map}&#60;{@link String},{@link String}&#62;
*/
ADDRESS_TRANSLATOR_SUBNET_ADDRESSES("advanced.address-translator.subnet-addresses"),
/**
* A default address to fallback to if Cassandra node IP isn't contained in any of the configured
* subnets.
*
* <p>Value-Type: {@link String}
*/
ADDRESS_TRANSLATOR_DEFAULT_ADDRESS("advanced.address-translator.default-address"),
/**
* Whether to resolve the addresses on initialization (if true) or on each node (re-)connection
* (if false). Defaults to false.
*
* <p>Value-Type: boolean
*/
ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES("advanced.address-translator.resolve-addresses");

private final String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@

import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.util.AddressUtils;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
Expand All @@ -41,7 +38,22 @@ public static Set<EndPoint> merge(

Set<EndPoint> result = Sets.newHashSet(programmaticContactPoints);
for (String spec : configContactPoints) {
for (InetSocketAddress address : extract(spec, resolve)) {

Set<InetSocketAddress> addresses = Collections.emptySet();
try {
addresses = AddressUtils.extract(spec, resolve);
} catch (RuntimeException e) {
LOG.warn("Ignoring invalid contact point {} ({})", spec, e.getMessage(), e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just continue here to next iteration of the outer for loop. You know addresses is the empty set at this point so there's no point iterating over it below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I look at this now it feels like we had to make this a bit more complicated because AddressUtils.extract() now throws exceptions in most cases rather than just logging errors and returning an empty set. Was there a particular reason for this change? It's not immediately clear the exceptions buy you much here.

Copy link
Contributor Author

@jahstreet jahstreet Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, #extract code was used only in this class and we logged errors together with reasons of these errors. Now the info about reasons is moved to util method, which is called from multiple places. In this class, I aimed to keep logging (as well as other functionality) as close to the origin as seemed possible to avoid opinionated refactoring, so I needed a way to get reasons of errors from the utility #extract to log them together with the context logs.
Happy to agree on the way it should look like and change accordingly.


if (addresses.size() > 1) {
LOG.info(
"Contact point {} resolves to multiple addresses, will use them all ({})",
spec,
addresses);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this log message offer us much useful information?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, it was there so I kept it as is.
As for me, this log is a good additional info when debugging failed to connect issues. Like, one could be surprised to see the client failed to connect logs where contact points do not match the configured ones.
What is your opinion on the need of it?


for (InetSocketAddress address : addresses) {
DefaultEndPoint endPoint = new DefaultEndPoint(address);
boolean wasNew = result.add(endPoint);
if (!wasNew) {
Expand All @@ -51,43 +63,4 @@ public static Set<EndPoint> merge(
}
return ImmutableSet.copyOf(result);
}

private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
int separator = spec.lastIndexOf(':');
if (separator < 0) {
LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec);
return Collections.emptySet();
}

String host = spec.substring(0, separator);
String portSpec = spec.substring(separator + 1);
int port;
try {
port = Integer.parseInt(portSpec);
} catch (NumberFormatException e) {
LOG.warn("Ignoring invalid contact point {} (expecting a number, got {})", spec, portSpec);
return Collections.emptySet();
}
if (!resolve) {
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
} else {
try {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
if (inetAddresses.length > 1) {
LOG.info(
"Contact point {} resolves to multiple addresses, will use them all ({})",
spec,
Arrays.deepToString(inetAddresses));
}
Set<InetSocketAddress> result = new HashSet<>();
for (InetAddress inetAddress : inetAddresses) {
result.add(new InetSocketAddress(inetAddress, port));
}
return result;
} catch (UnknownHostException e) {
LOG.warn("Ignoring invalid contact point {} (unknown host {})", spec, host);
return Collections.emptySet();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
*/
package com.datastax.oss.driver.internal.core.addresstranslation;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME;

import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetSocketAddress;
Expand All @@ -37,28 +38,13 @@ public class FixedHostNameAddressTranslator implements AddressTranslator {

private static final Logger LOG = LoggerFactory.getLogger(FixedHostNameAddressTranslator.class);

public static final String ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME =
"advanced.address-translator.advertised-hostname";

public static DriverOption ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION =
new DriverOption() {
@NonNull
@Override
public String getPath() {
return ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME;
}
};

private final String advertisedHostname;
private final String logPrefix;

public FixedHostNameAddressTranslator(@NonNull DriverContext context) {
logPrefix = context.getSessionName();
advertisedHostname =
context
.getConfig()
.getDefaultProfile()
.getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION);
context.getConfig().getDefaultProfile().getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package com.datastax.oss.driver.internal.core.addresstranslation;

import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

class Subnet {
private final byte[] subnet;
private final byte[] networkMask;
private final byte[] upper;
private final byte[] lower;

private Subnet(byte[] subnet, byte[] networkMask) {
this.subnet = subnet;
this.networkMask = networkMask;

byte[] upper = new byte[subnet.length];
byte[] lower = new byte[subnet.length];
for (int i = 0; i < subnet.length; i++) {
upper[i] = (byte) (subnet[i] | ~networkMask[i]);
lower[i] = (byte) (subnet[i] & networkMask[i]);
}
this.upper = upper;
this.lower = lower;
}

static Subnet parse(String subnetCIDR) throws UnknownHostException {
String[] parts = subnetCIDR.split("/");
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid subnet: " + subnetCIDR);
}

boolean isIPv6 = parts[0].contains(":");
byte[] subnet = InetAddress.getByName(parts[0]).getAddress();
if (isIPv4(subnet) && isIPv6) {
subnet = toIPv6(subnet);
}
int prefixLength = Integer.parseInt(parts[1]);
validatePrefixLength(subnet, prefixLength);

byte[] networkMask = toNetworkMask(subnet, prefixLength);
validateSubnetIsPrefixBlock(subnet, networkMask, subnetCIDR);
return new Subnet(subnet, networkMask);
}

private static byte[] toNetworkMask(byte[] subnet, int prefixLength) {
int fullBytes = prefixLength / 8;
int remainingBits = prefixLength % 8;
byte[] mask = new byte[subnet.length];
Arrays.fill(mask, 0, fullBytes, (byte) 0xFF);
if (remainingBits > 0) {
mask[fullBytes] = (byte) (0xFF << (8 - remainingBits));
}
return mask;
}

private static void validatePrefixLength(byte[] subnet, int prefixLength) {
int max_prefix_length = subnet.length * 8;
if (prefixLength < 0 || max_prefix_length < prefixLength) {
throw new IllegalArgumentException(
String.format(
"Prefix length %s must be within [0; %s]", prefixLength, max_prefix_length));
}
}

private static void validateSubnetIsPrefixBlock(
byte[] subnet, byte[] networkMask, String subnetCIDR) {
byte[] prefixBlock = toPrefixBlock(subnet, networkMask);
if (!Arrays.equals(subnet, prefixBlock)) {
throw new IllegalArgumentException(
String.format("Subnet %s must be represented as a network prefix block", subnetCIDR));
}
}

private static byte[] toPrefixBlock(byte[] subnet, byte[] networkMask) {
byte[] prefixBlock = new byte[subnet.length];
for (int i = 0; i < subnet.length; i++) {
prefixBlock[i] = (byte) (subnet[i] & networkMask[i]);
}
return prefixBlock;
}

@VisibleForTesting
byte[] getSubnet() {
return Arrays.copyOf(subnet, subnet.length);
}

@VisibleForTesting
byte[] getNetworkMask() {
return Arrays.copyOf(networkMask, networkMask.length);
}

byte[] getUpper() {
return Arrays.copyOf(upper, upper.length);
}

byte[] getLower() {
return Arrays.copyOf(lower, lower.length);
}

boolean isIPv4() {
return isIPv4(subnet);
}

boolean isIPv6() {
return isIPv6(subnet);
}

boolean contains(byte[] ip) {
if (isIPv4() && !isIPv4(ip)) {
return false;
}
if (isIPv6() && isIPv4(ip)) {
ip = toIPv6(ip);
}
if (subnet.length != ip.length) {
throw new IllegalArgumentException(
"IP version is unknown: " + Arrays.toString(toZeroBasedByteArray(ip)));
}
for (int i = 0; i < subnet.length; i++) {
if (subnet[i] != (byte) (ip[i] & networkMask[i])) {
return false;
}
}
return true;
}

private static boolean isIPv4(byte[] ip) {
return ip.length == 4;
}

private static boolean isIPv6(byte[] ip) {
return ip.length == 16;
}

private static byte[] toIPv6(byte[] ipv4) {
byte[] ipv6 = new byte[16];
ipv6[10] = (byte) 0xFF;
ipv6[11] = (byte) 0xFF;
System.arraycopy(ipv4, 0, ipv6, 12, 4);
return ipv6;
}

@Override
public String toString() {
return Arrays.toString(toZeroBasedByteArray(subnet));
}

private static int[] toZeroBasedByteArray(byte[] bytes) {
int[] res = new int[bytes.length];
for (int i = 0; i < bytes.length; i++) {
res[i] = bytes[i] & 0xFF;
}
return res;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.datastax.oss.driver.internal.core.addresstranslation;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;

class SubnetAddress {
private final Subnet subnet;
private final InetSocketAddress address;

SubnetAddress(String subnetCIDR, InetSocketAddress address) {
try {
this.subnet = Subnet.parse(subnetCIDR);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
this.address = address;
}

InetSocketAddress getAddress() {
return this.address;
}

boolean isOverlapping(SubnetAddress other) {
Subnet thisSubnet = this.subnet;
Subnet otherSubnet = other.subnet;
return thisSubnet.contains(otherSubnet.getLower())
|| thisSubnet.contains(otherSubnet.getUpper())
|| otherSubnet.contains(thisSubnet.getLower())
|| otherSubnet.contains(thisSubnet.getUpper());
}

boolean contains(InetSocketAddress address) {
return subnet.contains(address.getAddress().getAddress());
}

boolean isIPv4() {
return subnet.isIPv4();
}

boolean isIPv6() {
return subnet.isIPv6();
}

@Override
public String toString() {
return "SubnetAddress[subnet=" + subnet + ", address=" + address + "]";
}
}
Loading