Skip to content

Commit

Permalink
fix: reader failover wait for completed batch
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill committed Feb 1, 2025
1 parent 132fd5d commit 04725a9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 21 deletions.
44 changes: 25 additions & 19 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,9 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
return result;
}
} catch (error) {
if (error instanceof AggregateError && error.message.includes("All promises were rejected")) {
// ignore and try the next batch
} else {
// Failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
throw error;
}
// Failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
throw error;
}

await sleep(1000);
Expand All @@ -184,7 +180,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
throw new AwsWrapperError("Connection attempt task timed out.");
})
.catch((error) => {
if (error instanceof InternalQueryTimeoutError || (error instanceof AggregateError && error.message.includes("All promises were rejected"))) {
if (error instanceof InternalQueryTimeoutError) {
// ignore so the next task batch can be attempted
return ClusterAwareReaderFailoverHandler.FAILED_READER_FAILOVER_RESULT;
}
Expand All @@ -207,7 +203,26 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
tasks.push(secondTask.call());
}

return await Promise.any(tasks);
return await Promise.any(tasks).catch((error: AggregateError) => {
let errors: string = "";
for (const e of error.errors) {
// Propagate errors that are not caused by network errors.
if (!this.pluginService.isNetworkError(error)) {
errors += `\n\t${e} - ${e.message}`;
}
}
if (errors) {
const awsWrapperError = new AwsWrapperError(
Messages.get(
"ClusterAwareReaderFailoverHandler.batchFailed",
`[${hosts[i].hostId}${numTasks === 2 ? `, ${hosts[i + 1].hostId}` : ``}]`,
`[\n${errors}\n]`
)
);
return new ReaderFailoverResult(null, null, false, awsWrapperError);
}
return new ReaderFailoverResult(null, null, false, undefined);
});
}

getReaderHostsByPriority(hosts: HostInfo[]): HostInfo[] {
Expand Down Expand Up @@ -321,18 +336,9 @@ class ConnectionAttemptTask {
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId);
return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId);
}
await this.pluginService.abortTargetClient(this.targetClient);
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
throw new AwsWrapperError(Messages.get("ClusterAwareReaderFailoverHandler.selectedTaskChosen", this.newHost.host));
} catch (error) {
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE);
if (error instanceof Error) {
// Propagate errors that are not caused by network errors.
if (!this.pluginService.isNetworkError(error)) {
return new ReaderFailoverResult(null, null, false, error, this.taskId);
}

return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
}
throw error;
} finally {
await this.performFinalCleanup();
Expand Down
2 changes: 2 additions & 0 deletions common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
"ClusterAwareReaderFailoverHandler.attemptingReaderConnection": "Trying to connect to reader: '%s', with properties '%s'",
"ClusterAwareReaderFailoverHandler.successfulReaderConnection": "Connected to reader: '%s'",
"ClusterAwareReaderFailoverHandler.failedReaderConnection": "Failed to connect to reader: '%s'",
"ClusterAwareReaderFailoverHandler.batchFailed": "Reader connections for hosts [%s] failed with the following errors: %s",
"ClusterAwareReaderFailoverHandler.selectedTaskChosen": "Selected task has already been chosen. Abort client for host: %s",
"Utils.topology": "Topology: %s",
"RdsHostListProvider.incorrectDialect": "Dialect needs to be a topology aware dialect.",
"RdsHostListProvider.suggestedClusterId": "ClusterId '%s' is suggested for url '%s'.",
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/reader_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect";
import { PgClientWrapper } from "../../common/lib/pg_client_wrapper";
import { sleep } from "../../common/lib/utils/utils";

const host1 = new HostInfo("writer", 1234, HostRole.WRITER);
const host2 = new HostInfo("reader1", 1234, HostRole.READER);
Expand Down Expand Up @@ -85,6 +86,49 @@ describe("reader failover handler", () => {
expect(result.newHost).toBe(hosts[successHostIndex]);
}, 30000);

it("test failover - batch failure", async () => {
const expectedClients: ClientWrapper[] = [];
const hosts = [...defaultHosts];
const successHostIndex = 1;

when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER);

for (let i = 0; i < hosts.length; i++) {
if (i === successHostIndex) {
const client = new PgClientWrapper({}, hosts[i], properties);
expectedClients.push(client);
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
return client;
});
} else {
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
throw new AwsWrapperError("Rejecting test");
});
when(mockPluginService.isNetworkError(anything())).thenReturn(true);
expectedClients.push(undefined);
}
}
const mockPluginServiceInstance = instance(mockPluginService);

const target = new ClusterAwareReaderFailoverHandler(
mockPluginServiceInstance,
properties,
ClusterAwareReaderFailoverHandler.DEFAULT_FAILOVER_TIMEOUT,
ClusterAwareReaderFailoverHandler.DEFAULT_READER_CONNECT_TIMEOUT,
false
);
const result = await target.getConnectionFromHostGroup(hosts);
expect(result.isConnected).toBe(true);
expect(result.client).toBe(expectedClients[successHostIndex]);
expect(result.newHost).toBe(hosts[successHostIndex]);
verify(mockPluginService.forceConnect(hosts[successHostIndex], anything())).once();
verify(mockPluginService.abortTargetClient(undefined)).once();
verify(mockPluginService.abortTargetClient(expectedClients[successHostIndex])).never();
}, 30000);

it("test failover timeout", async () => {
// original host list: [active writer, active reader, current connection (reader), active
// reader, down reader, active reader]
Expand Down
15 changes: 13 additions & 2 deletions tests/unit/writer_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { WriterFailoverResult } from "../../common/lib/plugins/failover/writer_f
import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";
import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect";

const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });

Expand All @@ -46,10 +47,20 @@ const mockPluginService = mock(PluginService);
const mockReaderFailover = mock(ClusterAwareReaderFailoverHandler);

const mockTargetClient = { client: 123 };
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(
mockTargetClient,
builder.withHost("host").build(),
new Map<string, any>(),
new MySQL2DriverDialect()
);

const mockTargetClientB = { client: 456 };
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(
mockTargetClientB,
builder.withHost("host").build(),
new Map<string, any>(),
new MySQL2DriverDialect()
);

describe("writer failover handler", () => {
beforeEach(() => {
Expand Down

0 comments on commit 04725a9

Please sign in to comment.