Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: issues with custom domains during failover #1265

Merged
merged 4 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import software.amazon.jdbc.Driver;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.PropertyDefinition;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.ConnectionUrlParser;
import software.amazon.jdbc.util.Messages;
Expand Down Expand Up @@ -81,7 +82,11 @@ public class DialectManager implements DialectProvider {
private Dialect dialect = null;
private String dialectCode;

private PluginService pluginService;
private final PluginService pluginService;

static {
PropertyDefinition.registerPluginProperties(DialectManager.class);
}

public DialectManager(PluginService pluginService) {
this.pluginService = pluginService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostRole;
Expand All @@ -43,17 +45,24 @@ public class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl

private static final Logger LOGGER = Logger.getLogger(AuroraConnectionTrackerPlugin.class.getName());

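// Keep checking for topology changes for 3 minutes after the last failover.
// NOTE: despite the _MS suffix, this value is expressed in nanoseconds; it is added to System.nanoTime().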
private static final long TOPOLOGY_CHANGES_EXPECTED_TIME_MS = TimeUnit.MINUTES.toNanos(3);
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved

static final String METHOD_ABORT = "Connection.abort";
static final String METHOD_CLOSE = "Connection.close";
private static final Set<String> subscribedMethods =
Collections.unmodifiableSet(new HashSet<String>() {
{
addAll(SubscribedMethodHelper.NETWORK_BOUND_METHODS);
add(METHOD_CLOSE);
add(METHOD_ABORT);
add("connect");
add("notifyNodeListChanged");
}
});

private static final AtomicLong hostListRefreshThresholdTimeNano = new AtomicLong(0);
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved

private final PluginService pluginService;
private final RdsUtils rdsHelper;
private final OpenedConnectionTracker tracker;
Expand Down Expand Up @@ -87,7 +96,7 @@ public Connection connect(final String driverProtocol, final HostSpec hostSpec,

if (conn != null) {
final RdsUrlType type = this.rdsHelper.identifyRdsType(hostSpec.getHost());
if (type.isRdsCluster()) {
if (type.isRdsCluster() || type == RdsUrlType.OTHER) {
hostSpec.resetAliases();
this.pluginService.fillAliases(conn, hostSpec);
}
Expand All @@ -106,23 +115,50 @@ public <T, E extends Exception> T execute(final Class<T> resultClass, final Clas
this.rememberWriter();

try {
if (!methodName.equals(METHOD_CLOSE) && !methodName.equals(METHOD_ABORT)) {
long localHostListRefreshThresholdTimeNano = hostListRefreshThresholdTimeNano.get();
boolean needRefreshHostLists = false;
if (localHostListRefreshThresholdTimeNano > 0) {
if (localHostListRefreshThresholdTimeNano < System.nanoTime()) {
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved
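// The time specified in hostListRefreshThresholdTimeNano isn't yet reached.
// Need to continue to refresh host list.
// NOTE(review): the enclosing condition (threshold < System.nanoTime()) is true only AFTER the threshold
// has passed, which contradicts this comment; the comparison looks inverted - confirm.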
needRefreshHostLists = true;
} else {
// The time specified in hostListRefreshThresholdTimeNano is reached, and we can stop further refreshes
// of host list. If hostListRefreshThresholdTimeNano has changed while this thread processes the code,
// we can't override a new value in hostListRefreshThresholdTimeNano.
hostListRefreshThresholdTimeNano.compareAndSet(localHostListRefreshThresholdTimeNano, 0);
}
}
if (this.needUpdateCurrentWriter || needRefreshHostLists) {
// Calling this method may effectively close/abort a current connection
this.checkWriterChanged(needRefreshHostLists);
}
}
final T result = jdbcMethodFunc.call();
if ((methodName.equals(METHOD_CLOSE) || methodName.equals(METHOD_ABORT))) {
tracker.invalidateCurrentConnection(currentHostSpec, this.pluginService.getCurrentConnection());
} else if (this.needUpdateCurrentWriter) {
this.checkWriterChanged();
tracker.removeConnectionTracking(currentHostSpec, this.pluginService.getCurrentConnection());
}
return result;

} catch (final Exception e) {
if (e instanceof FailoverSQLException) {
this.checkWriterChanged();
hostListRefreshThresholdTimeNano.set(System.nanoTime() + TOPOLOGY_CHANGES_EXPECTED_TIME_MS);
// Calling this method may effectively close/abort a current connection
this.checkWriterChanged(true);
}
throw e;
}
}

private void checkWriterChanged() {
private void checkWriterChanged(boolean needRefreshHostLists) {
if (needRefreshHostLists) {
try {
this.pluginService.refreshHostList();
} catch (SQLException ex) {
// do nothing
}
}
final HostSpec hostSpecAfterFailover = this.getWriter(this.pluginService.getAllHosts());

if (this.currentWriter == null) {
Expand All @@ -135,6 +171,7 @@ private void checkWriterChanged() {
tracker.logOpenedConnections();
this.currentWriter = hostSpecAfterFailover;
this.needUpdateCurrentWriter = false;
hostListRefreshThresholdTimeNano.set(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,16 +69,47 @@
"1000",
"Time between each retry of opening a connection.");

public static final AwsWrapperProperty VERIFY_OPENED_CONNECTION_TYPE =
new AwsWrapperProperty(
"verifyOpenedConnectionType",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add this property to UsingTheAuroraInitialConnectionStrategyPlugin.md?

null,
"Force to verify an opened connection to be either a writer or a reader.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Force to verify an opened connection to be either a writer or a reader.");
"Specifies whether the role for opened connection should be verified. Set to 'writer' to verify that the connection opened is to a writer, and 'reader' to verify that the connection opened is to a reader.");


/**
 * Role that an opened connection can be verified against, as configured through the
 * {@code verifyOpenedConnectionType} connection property.
 */
private enum VerifyOpenedConnectionType {
  WRITER,
  READER;

  // Lookup table from the lower-case property value to the matching constant.
  private static final Map<String, VerifyOpenedConnectionType> nameToValue = new HashMap<>();

  static {
    nameToValue.put("writer", WRITER);
    nameToValue.put("reader", READER);
  }

  /**
   * Resolves a property value to a verification type, ignoring case.
   *
   * @param value the configured property value; may be {@code null}
   * @return the matching constant, or {@code null} when {@code value} is {@code null} or is not
   *     a recognized name
   */
  public static VerifyOpenedConnectionType fromValue(String value) {
    if (value == null) {
      return null;
    }
    // Use Locale.ROOT so the lookup does not depend on the JVM default locale
    // (e.g. under a Turkish locale "WRITER".toLowerCase() yields a dotless 'i' and would not match).
    return nameToValue.get(value.toLowerCase(java.util.Locale.ROOT));
  }
}

private final PluginService pluginService;
private HostListProviderService hostListProviderService;
private final RdsUtils rdsUtils = new RdsUtils();

private VerifyOpenedConnectionType verifyOpenedConnectionType = null;

Check warning on line 102 in wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Unused assignment

Variable `verifyOpenedConnectionType` initializer `null` is redundant

sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved

static {
PropertyDefinition.registerPluginProperties(AuroraInitialConnectionStrategyPlugin.class);
}

public AuroraInitialConnectionStrategyPlugin(final PluginService pluginService, final Properties properties) {
this.pluginService = pluginService;
this.verifyOpenedConnectionType =
VerifyOpenedConnectionType.fromValue(VERIFY_OPENED_CONNECTION_TYPE.getString(properties));
}

@Override
Expand Down Expand Up @@ -110,12 +143,8 @@

final RdsUrlType type = this.rdsUtils.identifyRdsType(hostSpec.getHost());

if (!type.isRdsCluster()) {
// It's not a cluster endpoint. Continue with a normal workflow.
return connectFunc.call();
}

if (type == RdsUrlType.RDS_WRITER_CLUSTER) {
if (type == RdsUrlType.RDS_WRITER_CLUSTER
|| isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.WRITER) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need an extra set of brackets, otherwise we will still verify when using a writer cluster and it is not an initial connection

Suggested change
|| isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.WRITER) {
|| (isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.WRITER)) {

Connection writerCandidateConn = this.getVerifiedWriterConnection(props, isInitialConnection, connectFunc);
if (writerCandidateConn == null) {
// Can't get writer connection. Continue with a normal workflow.
Expand All @@ -124,7 +153,8 @@
return writerCandidateConn;
}

if (type == RdsUrlType.RDS_READER_CLUSTER) {
if (type == RdsUrlType.RDS_READER_CLUSTER
|| isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.READER) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|| isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.READER) {
|| (isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.READER)) {

Connection readerCandidateConn = this.getVerifiedReaderConnection(props, isInitialConnection, connectFunc);
if (readerCandidateConn == null) {
// Can't get a reader connection. Continue with a normal workflow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
Expand All @@ -35,6 +36,7 @@
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.StringUtils;
import software.amazon.jdbc.util.SynchronousExecutor;
import software.amazon.jdbc.util.telemetry.TelemetryContext;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
import software.amazon.jdbc.util.telemetry.TelemetryTraceLevel;
Expand All @@ -50,16 +52,17 @@ public class OpenedConnectionTracker {
invalidateThread.setDaemon(true);
return invalidateThread;
});
private static final ExecutorService abortConnectionExecutorService =
Executors.newCachedThreadPool(
r -> {
final Thread abortThread = new Thread(r);
abortThread.setDaemon(true);
return abortThread;
});
private static final Executor abortConnectionExecutor = new SynchronousExecutor();
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger LOGGER = Logger.getLogger(OpenedConnectionTracker.class.getName());
private static final RdsUtils rdsUtils = new RdsUtils();

private static final Set<String> safeToCheckClosedClasses = new HashSet<>(Arrays.asList(
"HikariProxyConnection",
"org.postgresql.jdbc.PgConnection",
"com.mysql.cj.jdbc.ConnectionImpl",
"org.mariadb.jdbc.Connection"));

private final PluginService pluginService;

public OpenedConnectionTracker(final PluginService pluginService) {
Expand All @@ -72,6 +75,7 @@ public void populateOpenedConnectionQueue(final HostSpec hostSpec, final Connect
// Check if the connection was established using an instance endpoint
if (rdsUtils.isRdsInstance(hostSpec.getHost())) {
trackConnection(hostSpec.getHostAndPort(), conn);
logOpenedConnections();
return;
}

Expand All @@ -80,14 +84,17 @@ public void populateOpenedConnectionQueue(final HostSpec hostSpec, final Connect
.max(String::compareToIgnoreCase)
.orElse(null);

if (instanceEndpoint == null) {
LOGGER.finest(
Messages.get("OpenedConnectionTracker.unableToPopulateOpenedConnectionQueue",
new Object[] {hostSpec.getHost()}));
if (instanceEndpoint != null) {
trackConnection(instanceEndpoint, conn);
logOpenedConnections();
return;
}

trackConnection(instanceEndpoint, conn);
// It seems there's no RDS instance host found. It might be a custom domain name. Let's track by all aliases
for (String alias : aliases) {
trackConnection(alias, conn);
}
logOpenedConnections();
}

/**
Expand All @@ -100,28 +107,27 @@ public void invalidateAllConnections(final HostSpec hostSpec) {
invalidateAllConnections(hostSpec.getAliases().toArray(new String[] {}));
}

public void invalidateAllConnections(final String... node) {
public void invalidateAllConnections(final String... keys) {
TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory();
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
TELEMETRY_INVALIDATE_CONNECTIONS, TelemetryTraceLevel.NESTED);

try {
final Optional<String> instanceEndpoint = Arrays.stream(node)
.filter(x -> rdsUtils.isRdsInstance(rdsUtils.removePort(x)))
.findFirst();
if (!instanceEndpoint.isPresent()) {
return;
for (String key : keys) {
try {
final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(key);
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved
logConnectionQueue(key, connectionQueue);
invalidateConnections(connectionQueue);
} catch (Exception ex) {
// ignore and continue
}
}
final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(instanceEndpoint.get());
logConnectionQueue(instanceEndpoint.get(), connectionQueue);
invalidateConnections(openedConnections.get(instanceEndpoint.get()));

} finally {
telemetryContext.closeContext();
}
}

public void invalidateCurrentConnection(final HostSpec hostSpec, final Connection connection) {
public void removeConnectionTracking(final HostSpec hostSpec, final Connection connection) {
final String host = rdsUtils.isRdsInstance(hostSpec.getHost())
? hostSpec.asAlias()
: hostSpec.getAliases().stream()
Expand All @@ -134,8 +140,11 @@ public void invalidateCurrentConnection(final HostSpec hostSpec, final Connectio
}

final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(host);
logConnectionQueue(host, connectionQueue);
connectionQueue.removeIf(connectionWeakReference -> Objects.equals(connectionWeakReference.get(), connection));
if (connectionQueue != null) {
logConnectionQueue(host, connectionQueue);
connectionQueue.removeIf(
connectionWeakReference -> Objects.equals(connectionWeakReference.get(), connection));
}
}

private void trackConnection(final String instanceEndpoint, final Connection connection) {
Expand All @@ -144,10 +153,12 @@ private void trackConnection(final String instanceEndpoint, final Connection con
instanceEndpoint,
(k) -> new ConcurrentLinkedQueue<>());
connectionQueue.add(new WeakReference<>(connection));
logOpenedConnections();
}

private void invalidateConnections(final Queue<WeakReference<Connection>> connectionQueue) {
if (connectionQueue == null || connectionQueue.isEmpty()) {
return;
}
invalidateConnectionsExecutorService.submit(() -> {
WeakReference<Connection> connReference;
while ((connReference = connectionQueue.poll()) != null) {
Expand All @@ -157,7 +168,7 @@ private void invalidateConnections(final Queue<WeakReference<Connection>> connec
}

try {
conn.abort(abortConnectionExecutorService);
conn.abort(abortConnectionExecutor);
} catch (final SQLException e) {
// swallow this exception, current connection should be useless anyway.
}
Expand Down Expand Up @@ -204,7 +215,10 @@ public void pruneNullConnections() {
if (conn == null) {
return true;
}
if (conn.getClass().getSimpleName().equals("HikariProxyConnection")) {
// The following classes do not check connection validity by calling a DB server
// so it's safe to check whether connection is already closed.
if (safeToCheckClosedClasses.contains(conn.getClass().getSimpleName())
|| safeToCheckClosedClasses.contains(conn.getClass().getName())) {
try {
return conn.isClosed();
} catch (SQLException ex) {
Expand Down
Loading
Loading