Skip to content

Commit

Permalink
fix: failover not properly disposing connections
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq committed Nov 7, 2024
1 parent b0ca288 commit 5c025f8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 41 deletions.
10 changes: 7 additions & 3 deletions common/lib/plugins/failover/writer_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
const taskA = reconnectToWriterHandlerTask.call();
const taskB = waitForNewWriterHandlerTask.call();

let failed = false;
let selectedTask = "";
const singleTask: boolean = this.pluginService.getDialect().getFailoverRestrictions().includes(FailoverRestriction.DISABLE_TASK_A);
const failoverTaskPromise = singleTask ? taskB : Promise.any([taskA, taskB]);
Expand Down Expand Up @@ -143,10 +144,11 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
if (JSON.stringify(error).includes("Connection attempt task timed out.")) {
return new WriterFailoverResult(false, false, [], "None", null);
}
failed = true;
throw error;
})
.finally(async () => {
await reconnectToWriterHandlerTask.cancel(selectedTask);
await reconnectToWriterHandlerTask.cancel(failed, selectedTask);
await waitForNewWriterHandlerTask.cancel(selectedTask);
clearTimeout(timeoutId);
});
Expand Down Expand Up @@ -182,6 +184,7 @@ class ReconnectToWriterHandlerTask {
currentClient: ClientWrapper | null = null;
endTime: number;
failoverCompleted: boolean = false;
failoverCompletedDueToError: boolean = false;
timeoutId: any = -1;

constructor(
Expand Down Expand Up @@ -261,16 +264,17 @@ class ReconnectToWriterHandlerTask {
logger.error(error.message);
return new WriterFailoverResult(false, false, [], ClusterAwareWriterFailoverHandler.RECONNECT_WRITER_TASK, null);
} finally {
if (this.currentClient && !success) {
if (this.currentClient && (this.failoverCompletedDueToError || !success)) {
await this.pluginService.abortTargetClient(this.currentClient);
}
logger.info(Messages.get("ClusterAwareWriterFailoverHandler.taskAFinished"));
}
}

async cancel(selectedTask?: string) {
async cancel(failed: boolean, selectedTask?: string) {
clearTimeout(this.timeoutId);
this.failoverCompleted = true;
this.failoverCompletedDueToError = failed;

// Task B was returned.
if (selectedTask && selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK) {
Expand Down
78 changes: 40 additions & 38 deletions tests/integration/container/tests/read_write_splitting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
import { TestEnvironment } from "./utils/test_environment";
import { DriverHelper } from "./utils/driver_helper";
import { AuroraTestUtility } from "./utils/aurora_test_utility";
import { AwsWrapperError, FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError } from "../../../../common/lib/utils/errors";
import {
AwsWrapperError,
FailoverFailedError,
FailoverSuccessError,
TransactionResolutionUnknownError
} from "../../../../common/lib/utils/errors";
import { DatabaseEngine } from "./utils/database_engine";
import { QueryResult } from "pg";
import { ProxyHelper } from "./utils/proxy_helper";
Expand All @@ -29,7 +34,6 @@ import { AwsPoolConfig } from "../../../../common/lib/aws_pool_config";
import { ConnectionProviderManager } from "../../../../common/lib/connection_provider_manager";
import { InternalPoolMapping } from "../../../../common/lib/utils/internal_pool_mapping";
import { HostInfo } from "../../../../common/lib/host_info";
import { sleep } from "../../../../common/lib/utils/utils";

const itIf =
!features.includes(TestEnvironmentFeatures.PERFORMANCE) && features.includes(TestEnvironmentFeatures.IAM) && instanceCount >= 2 ? it : it.skip;
Expand Down Expand Up @@ -520,6 +524,40 @@ describe("aurora read write splitting", () => {
1000000
);

itIf(
"test pooled connection failover failed",
async () => {
const config = await initConfigWithFailover(env.proxyDatabaseInfo.writerInstanceEndpoint, env.proxyDatabaseInfo.instanceEndpointPort, true);
config["failoverTimeoutMs"] = 1000;

client = initClientFunc(config);

provider = new InternalPooledConnectionProvider({
minConnections: 0,
maxConnections: 10,
maxIdleConnections: 10
});
ConnectionProviderManager.setConnectionProvider(provider);

await client.connect();
const initialWriterId = await auroraTestUtility.queryInstanceId(client);

// Kill all instances
await ProxyHelper.disableAllConnectivity(env.engine);
await expect(async () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverFailedError);
await ProxyHelper.enableAllConnectivity();
await client.connect();
await TestEnvironment.verifyClusterStatus();

const newWriterId = await auroraTestUtility.queryInstanceId(client);
expect(newWriterId).toBe(initialWriterId);
await client.end();
},
1000000
);

itIf(
"test pooled connection failover in transaction",
async () => {
Expand Down Expand Up @@ -638,40 +676,4 @@ describe("aurora read write splitting", () => {
},
1000000
);
itIf(
"test pooled connection failover failed",
async () => {
const config = await initConfigWithFailover(env.proxyDatabaseInfo.writerInstanceEndpoint, env.proxyDatabaseInfo.instanceEndpointPort, true);
config["failoverTimeoutMs"] = 1000;

client = initClientFunc(config);

provider = new InternalPooledConnectionProvider({
minConnections: 0,
maxConnections: 10,
maxIdleConnections: 10
});
ConnectionProviderManager.setConnectionProvider(provider);

await client.connect();
const initialWriterId = await auroraTestUtility.queryInstanceId(client);

// Kill all instances
await ProxyHelper.disableAllConnectivity(env.engine);
await expect(async () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverFailedError);
await ProxyHelper.enableAllConnectivity();
await client.connect();
await TestEnvironment.verifyClusterStatus();

const newWriterId = await auroraTestUtility.queryInstanceId(client);
expect(newWriterId).toBe(initialWriterId);
await client.end();

// Sleep for a minute to ensure all ongoing promises have ended.
await sleep(60000);
},
1000000
);
});

0 comments on commit 5c025f8

Please sign in to comment.