Skip to content

Commit b367f4e

Browse files
authored
Implement the CustomEndpointPlugin (#1122)
1 parent ed3bbe7 commit b367f4e

File tree

45 files changed

+2064
-157
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2064
-157
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ All notable changes to this project will be documented in this file.
33

44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/#semantic-versioning-200).
55

6+
### :magic_wand: Added
7+
- Custom Endpoint Plugin. See [UsingTheCustomEndpointPlugin.md](https://github.com/aws/aws-advanced-jdbc-wrapper/blob/main/docs/using-the-jdbc-driver/using-plugins/UsingTheCustomEndpointPlugin.md).
8+
69
### :bug: Fixed
710
- Use the cluster URL as the default cluster ID ([PR #1131](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1131)).
811
- Fix logic in SlidingExpirationCache and SlidingExpirationCacheWithCleanupThread ([PR #1142](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1142)).
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package software.amazon.jdbc;
18+
19+
import java.util.Collections;
20+
import java.util.Set;
21+
import org.checkerframework.checker.nullness.qual.Nullable;
22+
import software.amazon.jdbc.util.Utils;
23+
24+
/**
25+
* Represents the allowed and blocked hosts for connections.
26+
*/
27+
public class AllowedAndBlockedHosts {
28+
@Nullable private final Set<String> allowedHostIds;
29+
@Nullable private final Set<String> blockedHostIds;
30+
31+
/**
32+
* Constructs an AllowedAndBlockedHosts instance with the specified allowed and blocked host IDs.
33+
*
34+
* @param allowedHostIds The set of allowed host IDs for connections. If null or empty, all host IDs that are not in
35+
* {@code blockedHostIds} are allowed.
36+
* @param blockedHostIds The set of blocked host IDs for connections. If null or empty, all host IDs in
37+
* {@code allowedHostIds} are allowed. If {@code allowedHostIds} is also null or empty, there
38+
* are no restrictions on which hosts are allowed.
39+
*/
40+
public AllowedAndBlockedHosts(@Nullable Set<String> allowedHostIds, @Nullable Set<String> blockedHostIds) {
41+
this.allowedHostIds = Utils.isNullOrEmpty(allowedHostIds) ? null : Collections.unmodifiableSet(allowedHostIds);
42+
this.blockedHostIds = Utils.isNullOrEmpty(blockedHostIds) ? null : Collections.unmodifiableSet(blockedHostIds);
43+
}
44+
45+
/**
46+
* Returns the set of allowed host IDs for connections. If null or empty, all host IDs that are not in
47+
* {@code blockedHostIds} are allowed.
48+
*
49+
* @return the set of allowed host IDs for connections.
50+
*/
51+
@Nullable
52+
public Set<String> getAllowedHostIds() {
53+
return this.allowedHostIds;
54+
}
55+
56+
/**
57+
* Returns the set of blocked host IDs for connections. If null or empty, all host IDs in {@code allowedHostIds} are
58+
* allowed. If {@code allowedHostIds} is also null or empty, there are no restrictions on which hosts are allowed.
59+
*
60+
* @return the set of blocked host IDs for connections.
61+
*/
62+
@Nullable
63+
public Set<String> getBlockedHostIds() {
64+
return this.blockedHostIds;
65+
}
66+
}

wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginChainBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import software.amazon.jdbc.plugin.DriverMetaDataConnectionPluginFactory;
3636
import software.amazon.jdbc.plugin.ExecutionTimeConnectionPluginFactory;
3737
import software.amazon.jdbc.plugin.LogQueryConnectionPluginFactory;
38+
import software.amazon.jdbc.plugin.customendpoint.CustomEndpointPluginFactory;
3839
import software.amazon.jdbc.plugin.dev.DeveloperConnectionPluginFactory;
3940
import software.amazon.jdbc.plugin.efm.HostMonitoringConnectionPluginFactory;
4041
import software.amazon.jdbc.plugin.failover.FailoverConnectionPluginFactory;
@@ -63,6 +64,7 @@ public class ConnectionPluginChainBuilder {
6364
put("executionTime", ExecutionTimeConnectionPluginFactory.class);
6465
put("logQuery", LogQueryConnectionPluginFactory.class);
6566
put("dataCache", DataCacheConnectionPluginFactory.class);
67+
put("customEndpoint", CustomEndpointPluginFactory.class);
6668
put("efm", HostMonitoringConnectionPluginFactory.class);
6769
put("efm2", software.amazon.jdbc.plugin.efm2.HostMonitoringConnectionPluginFactory.class);
6870
put("failover", FailoverConnectionPluginFactory.class);
@@ -93,6 +95,7 @@ public class ConnectionPluginChainBuilder {
9395
{
9496
put(DriverMetaDataConnectionPluginFactory.class, 100);
9597
put(DataCacheConnectionPluginFactory.class, 200);
98+
put(CustomEndpointPluginFactory.class, 380);
9699
put(AuroraInitialConnectionStrategyPluginFactory.class, 390);
97100
put(AuroraConnectionTrackerPluginFactory.class, 400);
98101
put(AuroraStaleDnsPluginFactory.class, 500);

wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import software.amazon.jdbc.plugin.DefaultConnectionPlugin;
3838
import software.amazon.jdbc.plugin.ExecutionTimeConnectionPlugin;
3939
import software.amazon.jdbc.plugin.LogQueryConnectionPlugin;
40+
import software.amazon.jdbc.plugin.customendpoint.CustomEndpointPlugin;
4041
import software.amazon.jdbc.plugin.efm.HostMonitoringConnectionPlugin;
4142
import software.amazon.jdbc.plugin.failover.FailoverConnectionPlugin;
4243
import software.amazon.jdbc.plugin.federatedauth.FederatedAuthPlugin;
@@ -50,6 +51,7 @@
5051
import software.amazon.jdbc.util.AsynchronousMethodsHelper;
5152
import software.amazon.jdbc.util.Messages;
5253
import software.amazon.jdbc.util.SqlMethodAnalyzer;
54+
import software.amazon.jdbc.util.Utils;
5355
import software.amazon.jdbc.util.WrapperUtils;
5456
import software.amazon.jdbc.util.telemetry.TelemetryContext;
5557
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
@@ -85,6 +87,7 @@ public class ConnectionPluginManager implements CanReleaseResources, Wrapper {
8587
put(FastestResponseStrategyPlugin.class, "plugin:fastestResponseStrategy");
8688
put(DefaultConnectionPlugin.class, "plugin:targetDriver");
8789
put(AuroraInitialConnectionStrategyPlugin.class, "plugin:initialConnection");
90+
put(CustomEndpointPlugin.class, "plugin:customEndpoint");
8891
}
8992
};
9093

@@ -493,7 +496,7 @@ public HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, Strin
493496

494497
if (isSubscribed) {
495498
try {
496-
final HostSpec host = hosts == null || hosts.isEmpty()
499+
final HostSpec host = Utils.isNullOrEmpty(hosts)
497500
? plugin.getHostSpecByStrategy(role, strategy)
498501
: plugin.getHostSpecByStrategy(hosts, role, strategy);
499502

wrapper/src/main/java/software/amazon/jdbc/PluginService.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,31 @@ EnumSet<NodeChangeOptions> setCurrentConnection(
6262
@Nullable ConnectionPlugin skipNotificationForThisPlugin)
6363
throws SQLException;
6464

65+
/**
66+
* Get host information for all hosts in the cluster.
67+
*
68+
* @return host information for all hosts in the cluster.
69+
*/
70+
List<HostSpec> getAllHosts();
71+
72+
/**
73+
* Get host information for allowed hosts in the cluster. Certain hosts in the cluster may be disallowed, and these
74+
* hosts will not be returned by this function. For example, if a custom endpoint is being used, hosts outside the
75+
* custom endpoint will not be returned.
76+
*
77+
* @return host information for allowed hosts in the cluster.
78+
*/
6579
List<HostSpec> getHosts();
6680

6781
HostSpec getInitialConnectionHostSpec();
6882

83+
/**
84+
* Set the collection of hosts that should be allowed and/or blocked for connections.
85+
*
86+
* @param allowedAndBlockedHosts An object defining the allowed and blocked sets of hosts.
87+
*/
88+
void setAllowedAndBlockedHosts(AllowedAndBlockedHosts allowedAndBlockedHosts);
89+
6990
/**
7091
* Returns a boolean indicating if the available {@link ConnectionProvider} or
7192
* {@link ConnectionPlugin} instances support the selection of a host with the requested role and

wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Set;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicReference;
3435
import java.util.concurrent.locks.ReentrantLock;
3536
import java.util.logging.Logger;
3637
import java.util.stream.Collectors;
@@ -52,6 +53,7 @@
5253
import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect;
5354
import software.amazon.jdbc.util.CacheMap;
5455
import software.amazon.jdbc.util.Messages;
56+
import software.amazon.jdbc.util.Utils;
5557
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
5658

5759
public class PluginServiceImpl implements PluginService, CanReleaseResources,
@@ -66,7 +68,8 @@ public class PluginServiceImpl implements PluginService, CanReleaseResources,
6668
private final String originalUrl;
6769
private final String driverProtocol;
6870
protected volatile HostListProvider hostListProvider;
69-
protected List<HostSpec> hosts = new ArrayList<>();
71+
protected List<HostSpec> allHosts = new ArrayList<>();
72+
protected AtomicReference<AllowedAndBlockedHosts> allowedAndBlockedHosts = new AtomicReference<>();
7073
protected Connection currentConnection;
7174
protected HostSpec currentHostSpec;
7275
protected HostSpec initialConnectionHostSpec;
@@ -162,10 +165,20 @@ public HostSpec getCurrentHostSpec() {
162165
this.currentHostSpec = this.initialConnectionHostSpec;
163166

164167
if (this.currentHostSpec == null) {
165-
if (this.getHosts().isEmpty()) {
168+
if (this.getAllHosts().isEmpty()) {
166169
throw new RuntimeException(Messages.get("PluginServiceImpl.hostListEmpty"));
167170
}
168-
this.currentHostSpec = this.getWriter(this.getHosts());
171+
172+
this.currentHostSpec = this.getWriter(this.getAllHosts());
173+
if (!this.getHosts().contains(this.currentHostSpec)) {
174+
throw new RuntimeException(
175+
Messages.get("PluginServiceImpl.currentHostNotAllowed",
176+
new Object[] {
177+
currentHostSpec == null ? "<null>" : currentHostSpec.getHost(),
178+
Utils.logTopology(this.getHosts(), "")})
179+
);
180+
}
181+
169182
if (this.currentHostSpec == null) {
170183
this.currentHostSpec = this.getHosts().get(0);
171184
}
@@ -187,6 +200,11 @@ public HostSpec getInitialConnectionHostSpec() {
187200
return this.initialConnectionHostSpec;
188201
}
189202

203+
@Override
204+
public void setAllowedAndBlockedHosts(AllowedAndBlockedHosts allowedAndBlockedHosts) {
205+
this.allowedAndBlockedHosts.set(allowedAndBlockedHosts);
206+
}
207+
190208
@Override
191209
public boolean acceptsStrategy(HostRole role, String strategy) throws SQLException {
192210
return this.pluginManager.acceptsStrategy(role, strategy);
@@ -364,9 +382,35 @@ protected EnumSet<NodeChangeOptions> compare(
364382
return changes;
365383
}
366384

385+
@Override
386+
public List<HostSpec> getAllHosts() {
387+
return this.allHosts;
388+
}
389+
367390
@Override
368391
public List<HostSpec> getHosts() {
369-
return this.hosts;
392+
AllowedAndBlockedHosts hostPermissions = this.allowedAndBlockedHosts.get();
393+
if (hostPermissions == null) {
394+
return this.allHosts;
395+
}
396+
397+
List<HostSpec> hosts = this.allHosts;
398+
Set<String> allowedHostIds = hostPermissions.getAllowedHostIds();
399+
Set<String> blockedHostIds = hostPermissions.getBlockedHostIds();
400+
401+
if (!Utils.isNullOrEmpty(allowedHostIds)) {
402+
hosts = hosts.stream()
403+
.filter((hostSpec -> allowedHostIds.contains(hostSpec.getHostId())))
404+
.collect(Collectors.toList());
405+
}
406+
407+
if (!Utils.isNullOrEmpty(blockedHostIds)) {
408+
hosts = hosts.stream()
409+
.filter((hostSpec -> !blockedHostIds.contains(hostSpec.getHostId())))
410+
.collect(Collectors.toList());
411+
}
412+
413+
return hosts;
370414
}
371415

372416
@Override
@@ -376,7 +420,7 @@ public void setAvailability(final @NonNull Set<String> hostAliases, final @NonNu
376420
return;
377421
}
378422

379-
final List<HostSpec> hostsToChange = this.getHosts().stream()
423+
final List<HostSpec> hostsToChange = this.getAllHosts().stream()
380424
.filter((host) -> hostAliases.contains(host.asAlias())
381425
|| host.getAliases().stream().anyMatch(hostAliases::contains))
382426
.distinct()
@@ -427,18 +471,18 @@ public HostListProvider getHostListProvider() {
427471
@Override
428472
public void refreshHostList() throws SQLException {
429473
final List<HostSpec> updatedHostList = this.getHostListProvider().refresh();
430-
if (!Objects.equals(updatedHostList, this.hosts)) {
474+
if (!Objects.equals(updatedHostList, this.allHosts)) {
431475
updateHostAvailability(updatedHostList);
432-
setNodeList(this.hosts, updatedHostList);
476+
setNodeList(this.allHosts, updatedHostList);
433477
}
434478
}
435479

436480
@Override
437481
public void refreshHostList(final Connection connection) throws SQLException {
438482
final List<HostSpec> updatedHostList = this.getHostListProvider().refresh(connection);
439-
if (!Objects.equals(updatedHostList, this.hosts)) {
483+
if (!Objects.equals(updatedHostList, this.allHosts)) {
440484
updateHostAvailability(updatedHostList);
441-
setNodeList(this.hosts, updatedHostList);
485+
setNodeList(this.allHosts, updatedHostList);
442486
}
443487
}
444488

@@ -447,7 +491,7 @@ public void forceRefreshHostList() throws SQLException {
447491
final List<HostSpec> updatedHostList = this.getHostListProvider().forceRefresh();
448492
if (updatedHostList != null) {
449493
updateHostAvailability(updatedHostList);
450-
setNodeList(this.hosts, updatedHostList);
494+
setNodeList(this.allHosts, updatedHostList);
451495
}
452496
}
453497

@@ -456,7 +500,7 @@ public void forceRefreshHostList(final Connection connection) throws SQLExceptio
456500
final List<HostSpec> updatedHostList = this.getHostListProvider().forceRefresh(connection);
457501
if (updatedHostList != null) {
458502
updateHostAvailability(updatedHostList);
459-
setNodeList(this.hosts, updatedHostList);
503+
setNodeList(this.allHosts, updatedHostList);
460504
}
461505
}
462506

@@ -476,7 +520,7 @@ public boolean forceRefreshHostList(final boolean shouldVerifyWriter, final long
476520
((BlockingHostListProvider) hostListProvider).forceRefresh(shouldVerifyWriter, timeoutMs);
477521
if (updatedHostList != null) {
478522
updateHostAvailability(updatedHostList);
479-
setNodeList(this.hosts, updatedHostList);
523+
setNodeList(this.allHosts, updatedHostList);
480524
return true;
481525
}
482526
} catch (TimeoutException ex) {
@@ -520,7 +564,7 @@ void setNodeList(@Nullable final List<HostSpec> oldHosts,
520564
}
521565

522566
if (!changes.isEmpty()) {
523-
this.hosts = newHosts != null ? newHosts : new ArrayList<>();
567+
this.allHosts = newHosts != null ? newHosts : new ArrayList<>();
524568
this.pluginManager.notifyNodeListChanged(changes);
525569
}
526570
}

wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsHostListProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ public List<HostSpec> refresh(final Connection connection) throws SQLException {
525525
: this.hostListProviderService.getCurrentConnection();
526526

527527
final FetchTopologyResult results = getTopology(currentConnection, false);
528-
LOGGER.finest(() -> Utils.logTopology(results.hosts, results.isCachedData ? "[From cache] " : ""));
528+
LOGGER.finest(() -> Utils.logTopology(results.hosts, results.isCachedData ? "[From cache] Topology:" : null));
529529

530530
this.hostList = results.hosts;
531531
return Collections.unmodifiableList(hostList);

wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraConnectionTrackerPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public <T, E extends Exception> T execute(final Class<T> resultClass, final Clas
136136
}
137137

138138
private void checkWriterChanged() {
139-
final HostSpec hostSpecAfterFailover = this.getWriter(this.pluginService.getHosts());
139+
final HostSpec hostSpecAfterFailover = this.getWriter(this.pluginService.getAllHosts());
140140

141141
if (this.currentWriter == null) {
142142
this.currentWriter = hostSpecAfterFailover;
@@ -153,7 +153,7 @@ private void checkWriterChanged() {
153153

154154
private void rememberWriter() {
155155
if (this.currentWriter == null || this.needUpdateCurrentWriter) {
156-
this.currentWriter = this.getWriter(this.pluginService.getHosts());
156+
this.currentWriter = this.getWriter(this.pluginService.getAllHosts());
157157
this.needUpdateCurrentWriter = false;
158158
}
159159
}

wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ private void delay(final long delayMs) {
351351
}
352352

353353
private HostSpec getWriter() {
354-
for (final HostSpec host : this.pluginService.getHosts()) {
354+
for (final HostSpec host : this.pluginService.getAllHosts()) {
355355
if (host.getRole() == HostRole.WRITER) {
356356
return host;
357357
}
@@ -380,12 +380,12 @@ private HostSpec getReader(final Properties props) throws SQLException {
380380
}
381381

382382
private boolean hasNoReaders() {
383-
if (this.pluginService.getHosts().isEmpty()) {
383+
if (this.pluginService.getAllHosts().isEmpty()) {
384384
// Topology inconclusive/corrupted.
385385
return false;
386386
}
387387

388-
for (HostSpec hostSpec : this.pluginService.getHosts()) {
388+
for (HostSpec hostSpec : this.pluginService.getAllHosts()) {
389389
if (hostSpec.getRole() == HostRole.WRITER) {
390390
continue;
391391
}

0 commit comments

Comments
 (0)