Skip to content

Commit

Permalink
feat: limitless plugin (#219)
Browse files Browse the repository at this point in the history
Co-authored-by: sergiyv-bitquill <[email protected]>
  • Loading branch information
sergiyvamz and sergiyv-improving authored Nov 8, 2024
1 parent c565c9c commit b7fda8f
Show file tree
Hide file tree
Showing 54 changed files with 1,203 additions and 94 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,4 @@ build/**/*
/benchmark/*

*/.DS_Store
/.run/All unit tests.run.xml
2 changes: 1 addition & 1 deletion common/lib/abstract_connection_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export abstract class AbstractConnectionPlugin implements ConnectionPlugin {
return false;
}

getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo | undefined {
getHostInfoByStrategy(role: HostRole, strategy: string, hosts?: HostInfo[]): HostInfo | undefined {
throw new Error("getHostInfoByStrategy is not supported by this plugin.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ export class AwsSecretsManagerPluginFactory extends ConnectionPluginFactory {
return new AwsSecretsManagerPluginFactory.awsSecretsManagerPlugin.AwsSecretsManagerPlugin(pluginService, new Map(properties));
} catch (error: any) {
if (error.code === "MODULE_NOT_FOUND") {
logger.error(error);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "AwsSecretsManagerPlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "AwsSecretsManagerPlugin"));
}
throw error;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export class IamAuthenticationPluginFactory extends ConnectionPluginFactory {
}
return new IamAuthenticationPluginFactory.iamAuthenticationPlugin.IamAuthenticationPlugin(pluginService);
} catch (error: any) {
logger.error(error);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "IamAuthenticationPlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "IamAuthenticationPlugin"));
}
}
}
2 changes: 1 addition & 1 deletion common/lib/connection_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ export interface ConnectionPlugin {

acceptsStrategy(role: HostRole, strategy: string): boolean;

getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo | undefined;
getHostInfoByStrategy(role: HostRole, strategy: string, hosts?: HostInfo[]): HostInfo | undefined;
}
2 changes: 2 additions & 0 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { AuroraConnectionTrackerPluginFactory } from "./plugins/connection_track
import { ConnectionProviderManager } from "./connection_provider_manager";
import { DeveloperConnectionPluginFactory } from "./plugins/dev/developer_connection_plugin_factory";
import { ConnectionPluginFactory } from "./plugin_factory";
import { LimitlessConnectionPluginFactory } from "./plugins/limitless/limitless_connection_plugin_factory";

/*
Type alias used for plugin factory sorting. It holds a reference to a plugin
Expand All @@ -56,6 +57,7 @@ export class ConnectionPluginChainBuilder {
["readWriteSplitting", { factory: ReadWriteSplittingPluginFactory, weight: 600 }],
["failover", { factory: FailoverPluginFactory, weight: 700 }],
["efm", { factory: HostMonitoringPluginFactory, weight: 800 }],
["limitless", { factory: LimitlessConnectionPluginFactory, weight: 950 }],
["iam", { factory: IamAuthenticationPluginFactory, weight: 1000 }],
["secretsManager", { factory: AwsSecretsManagerPluginFactory, weight: 1100 }],
["federatedAuth", { factory: FederatedAuthPluginFactory, weight: 1200 }],
Expand Down
11 changes: 10 additions & 1 deletion common/lib/database_dialect/database_dialect_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { RdsUtils } from "../utils/rds_utils";
import { logger } from "../../logutils";
import { CacheMap } from "../utils/cache_map";
import { ClientWrapper } from "../client_wrapper";
import { RdsUrlType } from "../utils/rds_url_type";

export class DatabaseDialectManager implements DatabaseDialectProvider {
/**
Expand Down Expand Up @@ -116,6 +117,14 @@ export class DatabaseDialectManager implements DatabaseDialectProvider {

if (this.dbType === DatabaseType.POSTGRES) {
const type = this.rdsHelper.identifyRdsType(host);
if (type === RdsUrlType.RDS_AURORA_LIMITLESS_DB_SHARD_GROUP) {
this.canUpdate = false;
this.dialectCode = DatabaseDialectCodes.AURORA_PG;
this.dialect = <DatabaseDialect>this.knownDialectsByCode.get(DatabaseDialectCodes.AURORA_PG);
this.logCurrentDialect();
return this.dialect;
}

if (type.isRdsCluster) {
this.canUpdate = true;
this.dialectCode = DatabaseDialectCodes.AURORA_PG;
Expand Down Expand Up @@ -178,6 +187,6 @@ export class DatabaseDialectManager implements DatabaseDialectProvider {
}

logCurrentDialect() {
logger.info(`Current dialect: ${this.dialectCode}, ${this.dialect.getDialectName()}, canUpdate: ${this.canUpdate}`);
logger.debug(`Current dialect: ${this.dialectCode}, ${this.dialect.getDialectName()}, canUpdate: ${this.canUpdate}`);
}
}
19 changes: 19 additions & 0 deletions common/lib/database_dialect/limitless_database_dialect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

export interface LimitlessDatabaseDialect {
getLimitlessRoutersQuery(): string;
}
4 changes: 2 additions & 2 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { HostRole } from "./host_role";
import { HostInfo } from "./host_info";
import { HostSelector } from "./host_selector";
import { RandomHostSelector } from "./random_host_selector";
import { AwsWrapperError } from "./utils/errors";
import { UnsupportedStrategyError } from "./utils/errors";
import { WrapperProperties } from "./wrapper_property";
import { Messages } from "./utils/messages";
import { RdsUtils } from "./utils/rds_utils";
Expand Down Expand Up @@ -106,7 +106,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
const acceptedStrategy = DriverConnectionProvider.acceptedStrategies.get(strategy);
if (!acceptedStrategy) {
throw new AwsWrapperError(Messages.get("ConnectionProvider.unsupportedHostSelectorStrategy", strategy, "DriverConnectionProvider"));
throw new UnsupportedStrategyError(Messages.get("ConnectionProvider.unsupportedHostSelectorStrategy", strategy, "DriverConnectionProvider"));
}
return acceptedStrategy.getHost(hosts, role, props);
}
Expand Down
38 changes: 38 additions & 0 deletions common/lib/highest_weight_host_selector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { HostSelector } from "./host_selector";
import { HostInfo } from "./host_info";
import { HostRole } from "./host_role";
import { AwsWrapperError } from "./utils/errors";
import { HostAvailability } from "./host_availability/host_availability";
import { Messages } from "./utils/messages";

export class HighestWeightHostSelector implements HostSelector {
static STRATEGY_NAME = "highestWeight";

getHost(hosts: HostInfo[], role: HostRole, props?: Map<string, any>): HostInfo {
const eligibleHosts: HostInfo[] = hosts
.filter((host: HostInfo) => host.role === role && host.availability === HostAvailability.AVAILABLE)
.sort((hostA: HostInfo, hostB: HostInfo) => (hostA.weight > hostB.weight ? -1 : hostA.weight < hostB.weight ? 1 : 0));

if (eligibleHosts.length === 0) {
throw new AwsWrapperError(Messages.get("HostSelector.noHostsMatchingRole", role));
}

return eligibleHosts[0];
}
}
14 changes: 12 additions & 2 deletions common/lib/host_info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,26 @@ export class HostInfo {
}

get url() {
let url = this.isPortSpecified() ? `${this.host}:${this.port}` : this.host;
let url = this.hostAndPort;
if (!url.endsWith("/")) {
url += "/";
}

return url;
}

get hostAndPort() {
return this.isPortSpecified() ? `${this.host}:${this.port}` : this.host;
}

equals(other: HostInfo): boolean {
return this.port === other.port && this.availability === other.availability && this.role === other.role && this.weight === other.weight;
return (
this.host === other.host &&
this.port === other.port &&
this.availability === other.availability &&
this.role === other.role &&
this.weight === other.weight
);
}

getAvailability(): HostAvailability {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,8 @@ export class ConnectionStringHostListProvider implements StaticHostListProvider
getHostProviderType(): string {
return this.constructor.name;
}

getClusterId(): string {
throw new AwsWrapperError("ConnectionStringHostListProvider does not support getClusterId.");
}
}
2 changes: 2 additions & 0 deletions common/lib/host_list_provider/host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ export interface HostListProvider {
createHost(host: string, isWriter: boolean, weight: number, lastUpdateTime: number, port?: number): HostInfo;

getHostProviderType(): string;

getClusterId(): string;
}
21 changes: 13 additions & 8 deletions common/lib/host_list_provider/rds_host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
// identification
this.clusterId = this.initialHost.url;
} else if (this.rdsUrlType.isRds) {
const clusterSuggestedResult: ClusterSuggestedResult | null = this.getSuggestedClusterId(this.initialHost.host);
const clusterSuggestedResult: ClusterSuggestedResult | null = this.getSuggestedClusterId(this.initialHost.hostAndPort);
if (clusterSuggestedResult && clusterSuggestedResult.clusterId) {
this.clusterId = clusterSuggestedResult.clusterId;
this.isPrimaryClusterId = clusterSuggestedResult.isPrimaryClusterId;
Expand Down Expand Up @@ -215,17 +215,17 @@ export class RdsHostListProvider implements DynamicHostListProvider {
}
}

private getSuggestedClusterId(host: string): ClusterSuggestedResult | null {
private getSuggestedClusterId(hostAndPort: string): ClusterSuggestedResult | null {
for (const [key, hosts] of RdsHostListProvider.topologyCache.getEntries()) {
const isPrimaryCluster: boolean = RdsHostListProvider.primaryClusterIdCache.get(key, false, this.suggestedClusterIdRefreshRateNano) ?? false;
if (key === host) {
return new ClusterSuggestedResult(host, isPrimaryCluster);
if (key === hostAndPort) {
return new ClusterSuggestedResult(hostAndPort, isPrimaryCluster);
}

if (hosts) {
for (const hostInfo of hosts) {
if (hostInfo.host === host) {
logger.debug(Messages.get("RdsHostListProvider.suggestedClusterId", key, host));
if (hostInfo.hostAndPort === hostAndPort) {
logger.debug(Messages.get("RdsHostListProvider.suggestedClusterId", key, hostAndPort));
return new ClusterSuggestedResult(key, isPrimaryCluster);
}
}
Expand Down Expand Up @@ -296,11 +296,11 @@ export class RdsHostListProvider implements DynamicHostListProvider {
const writerCount: number = writers.length;
if (writerCount === 0) {
hosts = [];
} else if (writers[0]) {
} else if (writerCount === 1) {
hosts.push(writers[0]);
} else {
const sortedWriters: HostInfo[] = writers.sort((a, b) => {
return a.lastUpdateTime - b.lastUpdateTime;
return b.lastUpdateTime - a.lastUpdateTime; // reverse order
});

hosts.push(sortedWriters[0]);
Expand Down Expand Up @@ -383,6 +383,11 @@ export class RdsHostListProvider implements DynamicHostListProvider {
getHostProviderType(): string {
return this.constructor.name;
}

getClusterId(): string {
this.init();
return this.clusterId;
}
}

export class FetchTopologyResult {
Expand Down
4 changes: 2 additions & 2 deletions common/lib/plugin_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,15 @@ export class PluginManager {
return false;
}

getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo {
getHostInfoByStrategy(role: HostRole, strategy: string, hosts?: HostInfo[]): HostInfo {
for (const plugin of this._plugins) {
const pluginSubscribedMethods = plugin.getSubscribedMethods();
const isSubscribed =
pluginSubscribedMethods.has(PluginManager.ALL_METHODS) || pluginSubscribedMethods.has(PluginManager.GET_HOST_INFO_BY_STRATEGY_METHOD);

if (isSubscribed) {
try {
const host = plugin.getHostInfoByStrategy(role, strategy);
const host = plugin.getHostInfoByStrategy(role, strategy, hosts);
if (host) {
return host;
}
Expand Down
4 changes: 2 additions & 2 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this._initialConnectionHostInfo = initialConnectionHostInfo;
}

getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo | undefined {
getHostInfoByStrategy(role: HostRole, strategy: string, hosts?: HostInfo[]): HostInfo | undefined {
const pluginManager = this.pluginServiceManagerContainer.pluginManager;
return pluginManager?.getHostInfoByStrategy(role, strategy);
return pluginManager?.getHostInfoByStrategy(role, strategy, hosts);
}

getCurrentHostInfo(): HostInfo | null {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export class AuroraInitialConnectionStrategyFactory extends ConnectionPluginFact
}
return new AuroraInitialConnectionStrategyFactory.auroraInitialConnectionStrategyPlugin.AuroraInitialConnectionStrategyPlugin(pluginService);
} catch (error: any) {
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "AuroraInitialConnectionStrategyPlugin"));
throw new AwsWrapperError(
Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "AuroraInitialConnectionStrategyPlugin")
);
}
}
}
3 changes: 1 addition & 2 deletions common/lib/plugins/connect_time_plugin_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export class ConnectTimePluginFactory extends ConnectionPluginFactory {
}
return new ConnectTimePluginFactory.connectTimePlugin.ConnectTimePlugin();
} catch (error: any) {
logger.error(error.message);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "ConnectTimePlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "ConnectTimePlugin"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export class AuroraConnectionTrackerPluginFactory extends ConnectionPluginFactor
}
return new AuroraConnectionTrackerPluginFactory.auroraConnectionTrackerPlugin.AuroraConnectionTrackerPlugin(pluginService);
} catch (error: any) {
logger.error(error.message);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "AuroraConnectionTrackerPlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "AuroraConnectionTrackerPlugin"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class OpenedConnectionTracker {

const instanceEndpoint = [...aliases]
.filter((x) => OpenedConnectionTracker.rdsUtils.isRdsInstance(OpenedConnectionTracker.rdsUtils.removePort(x)))
.reduce((max, s) => (s > max ? s : max));
.reduce((max, s) => (s > max ? s : max), "");

if (!instanceEndpoint) {
logger.debug(Messages.get("OpenedConnectionTracker.unableToPopulateOpenedConnectionQueue", hostInfo.host));
Expand Down
7 changes: 5 additions & 2 deletions common/lib/plugins/default_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,15 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
return this.connectionProviderManager.acceptsStrategy(role, strategy);
}

override getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo {
override getHostInfoByStrategy(role: HostRole, strategy: string, hosts?: HostInfo[]): HostInfo {
if (role === HostRole.UNKNOWN) {
throw new AwsWrapperError(Messages.get("DefaultConnectionPlugin.unknownRoleRequested"));
}

const hosts = this.pluginService.getHosts();
if (!hosts) {
hosts = this.pluginService.getHosts();
}

if (hosts.length < 1) {
throw new AwsWrapperError(Messages.get("DefaultConnectionPlugin.noHostsAvailable"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ export class DeveloperConnectionPluginFactory extends ConnectionPluginFactory {
}
return new DeveloperConnectionPluginFactory.developerPlugin.DeveloperConnectionPlugin(pluginService, properties, new RdsUtils());
} catch (error: any) {
logger.error(error);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "DeveloperConnectionPlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "DeveloperConnectionPlugin"));
}
}
}
3 changes: 1 addition & 2 deletions common/lib/plugins/efm/host_monitoring_plugin_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ export class HostMonitoringPluginFactory extends ConnectionPluginFactory {
}
return new HostMonitoringPluginFactory.hostMonitoringPlugin.HostMonitoringConnectionPlugin(pluginService, properties, new RdsUtils());
} catch (error: any) {
logger.error(error.message);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "HostMonitoringPlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "HostMonitoringPlugin"));
}
}
}
3 changes: 1 addition & 2 deletions common/lib/plugins/execute_time_plugin_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export class ExecuteTimePluginFactory extends ConnectionPluginFactory {
}
return new ExecuteTimePluginFactory.executeTimePlugin.ExecuteTimePlugin();
} catch (error: any) {
logger.error(error.message);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "ExecuteTimePlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "ExecuteTimePlugin"));
}
}
}
3 changes: 1 addition & 2 deletions common/lib/plugins/failover/failover_plugin_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ export class FailoverPluginFactory extends ConnectionPluginFactory {
}
return new FailoverPluginFactory.failoverPlugin.FailoverPlugin(pluginService, properties, new RdsUtils());
} catch (error: any) {
logger.error(error.message);
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", "FailoverPlugin"));
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "FailoverPlugin"));
}
}
}
Loading

0 comments on commit b7fda8f

Please sign in to comment.