diff --git a/common/lib/plugins/failover/reader_failover_handler.ts b/common/lib/plugins/failover/reader_failover_handler.ts index 398c90e4..821a177c 100644 --- a/common/lib/plugins/failover/reader_failover_handler.ts +++ b/common/lib/plugins/failover/reader_failover_handler.ts @@ -152,13 +152,9 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler return result; } } catch (error) { - if (error instanceof AggregateError && error.message.includes("All promises were rejected")) { - // ignore and try the next batch - } else { - // Failover has failed. - this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED); - throw error; - } + // Failover has failed. + this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED); + throw error; } await sleep(1000); @@ -184,7 +180,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler throw new AwsWrapperError("Connection attempt task timed out."); }) .catch((error) => { - if (error instanceof InternalQueryTimeoutError || (error instanceof AggregateError && error.message.includes("All promises were rejected"))) { + if (error instanceof InternalQueryTimeoutError) { // ignore so the next task batch can be attempted return ClusterAwareReaderFailoverHandler.FAILED_READER_FAILOVER_RESULT; } @@ -207,7 +203,26 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler tasks.push(secondTask.call()); } - return await Promise.any(tasks); + return await Promise.any(tasks).catch((error: AggregateError) => { + let errors: string = ""; + for (const e of error.errors) { + // Propagate errors that are not caused by network errors. + if (!this.pluginService.isNetworkError(error)) { + errors += `\n\t${e} - ${e.message}`; + } + } + if (errors) { + const awsWrapperError = new AwsWrapperError( + Messages.get( + "ClusterAwareReaderFailoverHandler.batchFailed", + `[${hosts[i].hostId}${numTasks === 2 ? `, ${hosts[i + 1].hostId}` : ``}]`, + `[\n${errors}\n]` + ) + ); + return new ReaderFailoverResult(null, null, false, awsWrapperError); + } + return new ReaderFailoverResult(null, null, false, undefined); + }); } getReaderHostsByPriority(hosts: HostInfo[]): HostInfo[] { @@ -321,18 +336,9 @@ class ConnectionAttemptTask { this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId); return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId); } - await this.pluginService.abortTargetClient(this.targetClient); - return new ReaderFailoverResult(null, null, false, undefined, this.taskId); + throw new AwsWrapperError(Messages.get("ClusterAwareReaderFailoverHandler.selectedTaskChosen", this.newHost.host)); } catch (error) { this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE); - if (error instanceof Error) { - // Propagate errors that are not caused by network errors. - if (!this.pluginService.isNetworkError(error)) { - return new ReaderFailoverResult(null, null, false, error, this.taskId); - } - - return new ReaderFailoverResult(null, null, false, undefined, this.taskId); - } throw error; } finally { await this.performFinalCleanup(); diff --git a/common/lib/utils/locales/en.json b/common/lib/utils/locales/en.json index 344b65eb..e9bc25b4 100644 --- a/common/lib/utils/locales/en.json +++ b/common/lib/utils/locales/en.json @@ -25,6 +25,8 @@ "ClusterAwareReaderFailoverHandler.attemptingReaderConnection": "Trying to connect to reader: '%s', with properties '%s'", "ClusterAwareReaderFailoverHandler.successfulReaderConnection": "Connected to reader: '%s'", "ClusterAwareReaderFailoverHandler.failedReaderConnection": "Failed to connect to reader: '%s'", + "ClusterAwareReaderFailoverHandler.batchFailed": "Reader connections for hosts [%s] failed with the following errors: %s", + "ClusterAwareReaderFailoverHandler.selectedTaskChosen": "Selected task has already been chosen. Abort client for host: %s", "Utils.topology": "Topology: %s", "RdsHostListProvider.incorrectDialect": "Dialect needs to be a topology aware dialect.", "RdsHostListProvider.suggestedClusterId": "ClusterId '%s' is suggested for url '%s'.", diff --git a/tests/unit/reader_failover_handler.test.ts b/tests/unit/reader_failover_handler.test.ts index 19e72641..774d7c78 100644 --- a/tests/unit/reader_failover_handler.test.ts +++ b/tests/unit/reader_failover_handler.test.ts @@ -25,6 +25,7 @@ import { ClientWrapper } from "../../common/lib/client_wrapper"; import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect"; import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect"; import { PgClientWrapper } from "../../common/lib/pg_client_wrapper"; +import { sleep } from "../../common/lib/utils/utils"; const host1 = new HostInfo("writer", 1234, HostRole.WRITER); const host2 = new HostInfo("reader1", 1234, HostRole.READER); @@ -85,6 +86,49 @@ describe("reader failover handler", () => { expect(result.newHost).toBe(hosts[successHostIndex]); }, 30000); + it("test failover - batch failure", async () => { + const expectedClients: ClientWrapper[] = []; + const hosts = [...defaultHosts]; + const successHostIndex = 1; + + when(mockPluginService.getHosts()).thenReturn(hosts); + when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER); + + for (let i = 0; i < hosts.length; i++) { + if (i === successHostIndex) { + const client = new PgClientWrapper({}, hosts[i], properties); + expectedClients.push(client); + when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => { + await sleep(100); + return client; + }); + } else { + when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => { + await sleep(100); + throw new AwsWrapperError("Rejecting test"); + }); + when(mockPluginService.isNetworkError(anything())).thenReturn(true); + expectedClients.push(undefined); + } + } + const mockPluginServiceInstance = instance(mockPluginService); + + const target = new ClusterAwareReaderFailoverHandler( + mockPluginServiceInstance, + properties, + ClusterAwareReaderFailoverHandler.DEFAULT_FAILOVER_TIMEOUT, + ClusterAwareReaderFailoverHandler.DEFAULT_READER_CONNECT_TIMEOUT, + false + ); + const result = await target.getConnectionFromHostGroup(hosts); + expect(result.isConnected).toBe(true); + expect(result.client).toBe(expectedClients[successHostIndex]); + expect(result.newHost).toBe(hosts[successHostIndex]); + verify(mockPluginService.forceConnect(hosts[successHostIndex], anything())).once(); + verify(mockPluginService.abortTargetClient(undefined)).once(); + verify(mockPluginService.abortTargetClient(expectedClients[successHostIndex])).never(); + }, 30000); + it("test failover timeout", async () => { // original host list: [active writer, active reader, current connection (reader), active // reader, down reader, active reader] diff --git a/tests/unit/writer_failover_handler.test.ts b/tests/unit/writer_failover_handler.test.ts index 54bbe12e..9543dd90 100644 --- a/tests/unit/writer_failover_handler.test.ts +++ b/tests/unit/writer_failover_handler.test.ts @@ -28,6 +28,7 @@ import { WriterFailoverResult } from "../../common/lib/plugins/failover/writer_f import { ClientWrapper } from "../../common/lib/client_wrapper"; import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect"; import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper"; +import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect"; const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() }); @@ -46,10 +47,20 @@ const mockPluginService = mock(PluginService); const mockReaderFailover = mock(ClusterAwareReaderFailoverHandler); const mockTargetClient = { client: 123 }; -const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map()); +const mockClientWrapper: ClientWrapper = new MySQLClientWrapper( + mockTargetClient, + builder.withHost("host").build(), + new Map(), + new MySQL2DriverDialect() +); const mockTargetClientB = { client: 456 }; -const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map()); +const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper( + mockTargetClientB, + builder.withHost("host").build(), + new Map(), + new MySQL2DriverDialect() +); describe("writer failover handler", () => { beforeEach(() => {