Skip to content

Commit

Permalink
fix: reader failover wait for completed batch
Browse files: browse the repository at this point in the history
  • Loading branch information
crystall-bitquill committed Jan 31, 2025
1 parent 132fd5d commit ef40f06
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 13 deletions.
28 changes: 17 additions & 11 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,21 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
tasks.push(secondTask.call());
}

return await Promise.any(tasks);
return await Promise.any(tasks).catch((error: AggregateError) => {
let errors = "\n[";
for (const e of error.errors) {
errors += `\n\t${e} - ${e.message}`;
}
errors += "\n]";
const awsWrapperError = new AwsWrapperError(
Messages.get(
"ClusterAwareReaderFailoverHandler.batchFailed",
`[${hosts[i].hostId}${numTasks === 2 ? `, ${hosts[i + 1].hostId}` : ``}]`,
errors
)
);
return new ReaderFailoverResult(null, null, false, awsWrapperError);
});
}

getReaderHostsByPriority(hosts: HostInfo[]): HostInfo[] {
Expand Down Expand Up @@ -321,18 +335,9 @@ class ConnectionAttemptTask {
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId);
return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId);
}
await this.pluginService.abortTargetClient(this.targetClient);
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
throw new AwsWrapperError("Selected task has been chosen. Abort client for host: " + this.newHost.host);
} catch (error) {
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE);
if (error instanceof Error) {
// Propagate errors that are not caused by network errors.
if (!this.pluginService.isNetworkError(error)) {
return new ReaderFailoverResult(null, null, false, error, this.taskId);
}

return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
}
throw error;
} finally {
await this.performFinalCleanup();
Expand All @@ -342,6 +347,7 @@ class ConnectionAttemptTask {
/**
 * Cleans up after a connection attempt.
 *
 * If the task recorded as selected for this failover run is not this task, the
 * target client held by this task is aborted and the recorded selection for the
 * run is overwritten with -1. If this task is the recorded selection, nothing
 * is done and its client is left untouched.
 */
async performFinalCleanup() {
if (this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) !== this.taskId) {
// This task is not the recorded selection, so its client is discarded.
await this.pluginService.abortTargetClient(this.targetClient);
// NOTE(review): -1 looks like the "no task selected" sentinel, presumably reset so a
// later batch in the same failover run can select a task - confirm against the task handler.
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, -1);
}
}
}
1 change: 1 addition & 0 deletions common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"ClusterAwareReaderFailoverHandler.attemptingReaderConnection": "Trying to connect to reader: '%s', with properties '%s'",
"ClusterAwareReaderFailoverHandler.successfulReaderConnection": "Connected to reader: '%s'",
"ClusterAwareReaderFailoverHandler.failedReaderConnection": "Failed to connect to reader: '%s'",
"ClusterAwareReaderFailoverHandler.batchFailed": "Reader connections for hosts [%s] failed with the following errors: %s",
"Utils.topology": "Topology: %s",
"RdsHostListProvider.incorrectDialect": "Dialect needs to be a topology aware dialect.",
"RdsHostListProvider.suggestedClusterId": "ClusterId '%s' is suggested for url '%s'.",
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/reader_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect";
import { PgClientWrapper } from "../../common/lib/pg_client_wrapper";
import { sleep } from "../../common/lib/utils/utils";

const host1 = new HostInfo("writer", 1234, HostRole.WRITER);
const host2 = new HostInfo("reader1", 1234, HostRole.READER);
Expand Down Expand Up @@ -85,6 +86,52 @@ describe("reader failover handler", () => {
expect(result.newHost).toBe(hosts[successHostIndex]);
}, 30000);

// Exercises getConnectionFromHostGroup when some connection attempts reject:
// hosts at index 0 and 2 reject after a short delay, every other host resolves
// with its own client wrapper. The test then checks which host is reported as
// connected and which clients were aborted.
it("test failover - batch failure", async () => {
// expectedClients[i] is the client stubbed for hosts[i], or undefined for a rejecting host.
const expectedClients: ClientWrapper[] = [];
const hosts = [...defaultHosts];
// Index of the host whose connection the handler is expected to return.
const successHostIndex = 4;

when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER);

for (let i = 0; i < hosts.length; i++) {
if (i !== 0 && i !== 2) {
// Succeeding host: forceConnect resolves with a dedicated client after 100 ms.
const client = new PgClientWrapper({}, hosts[i], properties);
expectedClients.push(client);
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
return client;
});
} else {
// Failing host (index 0 or 2): forceConnect rejects after 100 ms.
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
throw new AwsWrapperError("Rejecting test");
});
// Every error is reported to the handler as a network error.
when(mockPluginService.isNetworkError(anything())).thenReturn(true);
// Placeholder keeps expectedClients index-aligned with hosts.
expectedClients.push(undefined);
}
}
const mockPluginServiceInstance = instance(mockPluginService);

const target = new ClusterAwareReaderFailoverHandler(
mockPluginServiceInstance,
properties,
ClusterAwareReaderFailoverHandler.DEFAULT_FAILOVER_TIMEOUT,
ClusterAwareReaderFailoverHandler.DEFAULT_READER_CONNECT_TIMEOUT,
false
);
const result = await target.getConnectionFromHostGroup(hosts);
expect(result.isConnected).toBe(true);
expect(result.client).toBe(expectedClients[successHostIndex]);
expect(result.newHost).toBe(hosts[successHostIndex]);
// Each succeeding host other than the expected winner must have had its client aborted exactly once.
// NOTE(review): the loop bound excludes the last host from this check - confirm that is intentional.
for (let i = 0; i < hosts.length - 1; i++) {
if (i !== 0 && i !== 2 && i !== successHostIndex) {
verify(mockPluginService.abortTargetClient(expectedClients[i])).once();
}
}
// The two rejecting hosts never produced a client, so abort is expected with undefined twice.
verify(mockPluginService.abortTargetClient(undefined)).twice();
}, 30000);

it("test failover timeout", async () => {
// original host list: [active writer, active reader, current connection (reader), active
// reader, down reader, active reader]
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/writer_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { WriterFailoverResult } from "../../common/lib/plugins/failover/writer_f
import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";
import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect";

const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });

Expand All @@ -46,10 +47,10 @@ const mockPluginService = mock(PluginService);
const mockReaderFailover = mock(ClusterAwareReaderFailoverHandler);

const mockTargetClient = { client: 123 };
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>(), new MySQL2DriverDialect());

const mockTargetClientB = { client: 456 };
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>(), new MySQL2DriverDialect());

describe("writer failover handler", () => {
beforeEach(() => {
Expand Down

0 comments on commit ef40f06

Please sign in to comment.