Skip to content

Commit

Permalink
fix: connection tracker not pruning closed connections (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq authored Nov 15, 2024
1 parent fb1632d commit 5fe6443
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ import { HostRole } from "../../host_role";
import { OpenedConnectionTracker } from "./opened_connection_tracker";

export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
static readonly METHOD_END: string = "end";
private static readonly subscribedMethods = new Set<string>(
["connect", "forceConnect", "notifyHostListChanged"].concat(SubscribedMethodHelper.NETWORK_BOUND_METHODS)
);
private static readonly subscribedMethods = new Set<string>(["notifyHostListChanged"].concat(SubscribedMethodHelper.NETWORK_BOUND_METHODS));

private readonly pluginService: PluginService;
private readonly rdsUtils: RdsUtils;
Expand Down Expand Up @@ -85,14 +82,11 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
}

override async execute<T>(methodName: string, methodFunc: () => Promise<T>, methodArgs: any[]): Promise<T> {
const currentHostInfo = this.pluginService.getCurrentHostInfo();
this.rememberWriter();

try {
const result = await methodFunc();
if (methodName === AuroraConnectionTrackerPlugin.METHOD_END) {
this.tracker.invalidateCurrentConnection(currentHostInfo, this.pluginService.getCurrentClient().targetClient!);
} else if (this.needUpdateCurrentWriter) {
if (this.needUpdateCurrentWriter) {
await this.checkWriterChanged();
}
return result;
Expand Down Expand Up @@ -131,7 +125,7 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
}

async notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): Promise<void> {
for (const [key, value] of changes.entries()) {
for (const [key, _] of changes.entries()) {
const hostChanges = changes.get(key);
if (hostChanges) {
if (hostChanges.has(HostChangeOptions.PROMOTED_TO_READER)) {
Expand Down
19 changes: 4 additions & 15 deletions common/lib/plugins/connection_tracker/opened_connection_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,6 @@ export class OpenedConnectionTracker {
}
}

invalidateCurrentConnection(hostInfo: HostInfo | null, client: ClientWrapper): void {
const host = OpenedConnectionTracker.rdsUtils.isRdsInstance(hostInfo!.host)
? hostInfo!.asAlias
: [...hostInfo!.aliases].filter((x) => OpenedConnectionTracker.rdsUtils.removePort(x)).at(0);

if (!host) {
return;
}

const connectionQueue = OpenedConnectionTracker.openedConnections.get(host);
this.logConnectionQueue(host, connectionQueue!);
connectionQueue!.filter((x) => x.deref() !== client);
}

private trackConnection(instanceEndpoint: string, client: ClientWrapper): void {
const connectionQueue = MapUtils.computeIfAbsent(
OpenedConnectionTracker.openedConnections,
Expand Down Expand Up @@ -136,7 +122,10 @@ export class OpenedConnectionTracker {

pruneNullConnections(): void {
for (const [key, queue] of OpenedConnectionTracker.openedConnections) {
queue.filter((connWeakRef: WeakRef<ClientWrapper>) => connWeakRef);
OpenedConnectionTracker.openedConnections.set(
key,
queue.filter((connWeakRef: WeakRef<ClientWrapper>) => connWeakRef.deref())
);
}
}
}
2 changes: 1 addition & 1 deletion common/lib/plugins/efm/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ export class MonitorImpl implements Monitor {

logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`);
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties);
logger.debug(`Successfully opened monitoring connection to ${this.hostInfo.url}`);
logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`);
return Promise.resolve(new ConnectionStatus(true, this.getCurrentTimeNano() - startNanos));
} catch (error: any) {
this.instanceInvalidCounter.inc();
Expand Down
1 change: 1 addition & 0 deletions common/lib/plugins/failover/writer_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
result.isNewHost
? "ClusterAwareWriterFailoverHandler.successfullyConnectedToNewWriterInstance"
: "ClusterAwareWriterFailoverHandler.successfullyReconnectedToWriterInstance",
result.client.id,
newWriterHost
)
);
Expand Down
4 changes: 2 additions & 2 deletions common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
"ClusterAwareWriterFailoverHandler.failoverCalledWithInvalidTopology": "Failover was called with an invalid (null or empty) topology.",
"ClusterAwareWriterFailoverHandler.failedToConnectToWriterInstance": "Failed to connect to the writer instance.",
"ClusterAwareWriterFailoverHandler.successfulConnectionInvalidTopology": "'%s' successfully established a connection but doesn't contain a valid topology.",
"ClusterAwareWriterFailoverHandler.successfullyConnectedToNewWriterInstance": "Successfully connected to the new writer instance: '%s'",
"ClusterAwareWriterFailoverHandler.successfullyReconnectedToWriterInstance": "Successfully re-connected to the current writer instance: '%s'",
"ClusterAwareWriterFailoverHandler.successfullyConnectedToNewWriterInstance": "Successfully connected to the new writer instance: '%s'- '%s'",
"ClusterAwareWriterFailoverHandler.successfullyReconnectedToWriterInstance": "Successfully re-connected to the current writer instance: '%s'- '%s'",
"ClusterAwareWriterFailoverHandler.taskAAttemptReconnectToWriterInstance": "[TaskA] Attempting to re-connect to the current writer instance: '%s', with properties '%s'",
"ClusterAwareWriterFailoverHandler.taskAEncounteredException": "[TaskA] encountered an exception: '%s'",
"ClusterAwareWriterFailoverHandler.taskAFinished": "[TaskA] Finished",
Expand Down
4 changes: 2 additions & 2 deletions common/lib/utils/subscribed_method_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/

export class SubscribedMethodHelper {
static readonly NETWORK_BOUND_METHODS: string[] = ["connect", "query", "rollback"];
static readonly METHODS_REQUIRING_UPDATED_TOPOLOGY: string[] = ["connect", "query"];
static readonly NETWORK_BOUND_METHODS: string[] = ["connect", "forceConnect", "query", "rollback"];
static readonly METHODS_REQUIRING_UPDATED_TOPOLOGY: string[] = ["connect", "forceConnect", "query"];
}
4 changes: 3 additions & 1 deletion mysql/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ export class AwsMySQLClient extends AwsClient {
this.properties,
"end",
() => {
return ClientUtils.queryWithTimeout(this.targetClient!.end(), this.properties);
const res = ClientUtils.queryWithTimeout(this.targetClient!.end(), this.properties);
this.targetClient = null;
return res;
},
null
);
Expand Down
4 changes: 3 additions & 1 deletion pg/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ export class AwsPGClient extends AwsClient {
this.properties,
"end",
() => {
return this.targetClient!.end();
const res = this.targetClient!.end();
this.targetClient = null;
return res;
},
null
);
Expand Down
11 changes: 0 additions & 11 deletions tests/unit/aurora_connection_tracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ describe("aurora connection tracker tests", () => {
const plugin = new AuroraConnectionTrackerPlugin(instance(mockPluginService), instance(mockRdsUtils), instance(mockTracker));

await expect(plugin.execute("query", mockSqlFunc, SQL_ARGS)).rejects.toThrow(expectedException);
verify(mockTracker.invalidateCurrentConnection(originalHost, mockClientInstance.targetClient!)).never();
verify(mockTracker.invalidateAllConnections(originalHost)).never();
});

Expand All @@ -111,16 +110,6 @@ describe("aurora connection tracker tests", () => {

await plugin.execute("query", mockSqlFunc, SQL_ARGS);
await expect(plugin.execute("query", mockSqlFunc, SQL_ARGS)).rejects.toThrow(expectedException);
verify(mockTracker.invalidateCurrentConnection(originalHost, mockClientInstance.targetClient!)).never();
verify(mockTracker.invalidateAllConnections(originalHost)).once();
});

it("test track new connections parameters", async () => {
const originalHost = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() }).withHost("host").build();
when(mockPluginService.getCurrentHostInfo()).thenReturn(originalHost);
when(mockPluginService.getHosts()).thenReturn([originalHost]);
const plugin = new AuroraConnectionTrackerPlugin(instance(mockPluginService), instance(mockRdsUtils), instance(mockTracker));
await plugin.execute(AuroraConnectionTrackerPlugin.METHOD_END, mockCloseOrAbortFunc, SQL_ARGS);
verify(mockTracker.invalidateCurrentConnection(originalHost, mockClientInstance.targetClient!)).called();
});
});

0 comments on commit 5fe6443

Please sign in to comment.