Skip to content

Commit 4688bc6

Browse files
fix: reader failover wait for completed batch
1 parent 2aa927f commit 4688bc6

File tree

4 files changed

+84
-21
lines changed

4 files changed

+84
-21
lines changed

common/lib/plugins/failover/reader_failover_handler.ts

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,9 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
152152
return result;
153153
}
154154
} catch (error) {
155-
if (error instanceof AggregateError && error.message.includes("All promises were rejected")) {
156-
// ignore and try the next batch
157-
} else {
158-
// Failover has failed.
159-
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
160-
throw error;
161-
}
155+
// Failover has failed.
156+
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
157+
throw error;
162158
}
163159

164160
await sleep(1000);
@@ -184,7 +180,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
184180
throw new AwsWrapperError("Connection attempt task timed out.");
185181
})
186182
.catch((error) => {
187-
if (error instanceof InternalQueryTimeoutError || (error instanceof AggregateError && error.message.includes("All promises were rejected"))) {
183+
if (error instanceof InternalQueryTimeoutError) {
188184
// ignore so the next task batch can be attempted
189185
return ClusterAwareReaderFailoverHandler.FAILED_READER_FAILOVER_RESULT;
190186
}
@@ -207,7 +203,26 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
207203
tasks.push(secondTask.call());
208204
}
209205

210-
return await Promise.any(tasks);
206+
// Resolve with the first connection attempt in this batch that succeeds.
// Promise.any rejects (with an AggregateError) only after EVERY task in the batch has
// rejected, so the handler below runs once the whole batch has completed and failed.
return await Promise.any(tasks).catch((error: AggregateError) => {
  // Gather the rejection reasons that should be surfaced to the caller.
  // Network errors are dropped so that the next batch of hosts can be attempted.
  const reportedErrors: string[] = [];
  for (const e of error.errors) {
    // Classify each individual rejection reason; the wrapping AggregateError itself
    // says nothing about whether a given task failed because of the network.
    if (!this.pluginService.isNetworkError(e)) {
      reportedErrors.push(`\n\t${e} - ${e.message}`);
    }
  }

  if (reportedErrors.length > 0) {
    // The "batchFailed" message template already encloses the host list in brackets,
    // so only the comma-separated host ids are passed here.
    const batchHostIds = `${hosts[i].hostId}${numTasks === 2 ? `, ${hosts[i + 1].hostId}` : ""}`;
    const awsWrapperError = new AwsWrapperError(
      Messages.get("ClusterAwareReaderFailoverHandler.batchFailed", batchHostIds, `[\n${reportedErrors.join("")}\n]`)
    );
    return new ReaderFailoverResult(null, null, false, awsWrapperError);
  }

  // Every task failed with a network error: report a plain unsuccessful result without an error.
  return new ReaderFailoverResult(null, null, false, undefined);
});
211226
}
212227

213228
getReaderHostsByPriority(hosts: HostInfo[]): HostInfo[] {
@@ -321,18 +336,9 @@ class ConnectionAttemptTask {
321336
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId);
322337
return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId);
323338
}
324-
await this.pluginService.abortTargetClient(this.targetClient);
325-
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
339+
throw new AwsWrapperError(Messages.get("ClusterAwareReaderFailoverHandler.selectedTaskChosen", this.newHost.host));
326340
} catch (error) {
327341
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE);
328-
if (error instanceof Error) {
329-
// Propagate errors that are not caused by network errors.
330-
if (!this.pluginService.isNetworkError(error)) {
331-
return new ReaderFailoverResult(null, null, false, error, this.taskId);
332-
}
333-
334-
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
335-
}
336342
throw error;
337343
} finally {
338344
await this.performFinalCleanup();

common/lib/utils/locales/en.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
"ClusterAwareReaderFailoverHandler.attemptingReaderConnection": "Trying to connect to reader: '%s', with properties '%s'",
2626
"ClusterAwareReaderFailoverHandler.successfulReaderConnection": "Connected to reader: '%s'",
2727
"ClusterAwareReaderFailoverHandler.failedReaderConnection": "Failed to connect to reader: '%s'",
28+
"ClusterAwareReaderFailoverHandler.batchFailed": "Reader connections for hosts [%s] failed with the following errors: %s",
29+
"ClusterAwareReaderFailoverHandler.selectedTaskChosen": "Selected task has already been chosen. Abort client for host: %s",
2830
"Utils.topology": "Topology: %s",
2931
"RdsHostListProvider.incorrectDialect": "Dialect needs to be a topology aware dialect.",
3032
"RdsHostListProvider.suggestedClusterId": "ClusterId '%s' is suggested for url '%s'.",

tests/unit/reader_failover_handler.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { ClientWrapper } from "../../common/lib/client_wrapper";
2525
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
2626
import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect";
2727
import { PgClientWrapper } from "../../common/lib/pg_client_wrapper";
28+
import { sleep } from "../../common/lib/utils/utils";
2829

2930
const host1 = new HostInfo("writer", 1234, HostRole.WRITER);
3031
const host2 = new HostInfo("reader1", 1234, HostRole.READER);
@@ -85,6 +86,49 @@ describe("reader failover handler", () => {
8586
expect(result.newHost).toBe(hosts[successHostIndex]);
8687
}, 30000);
8788

89+
// Verifies that when one task of a batch rejects, failover still resolves with the
// host whose connection attempt succeeds, and that the successful client is not aborted.
it("test failover - batch failure", async () => {
  // Indexed in parallel with `hosts`; hosts whose connection attempt is rejected have no client.
  const expectedClients: (ClientWrapper | undefined)[] = [];
  const hosts = [...defaultHosts];
  const successHostIndex = 1;

  when(mockPluginService.getHosts()).thenReturn(hosts);
  // NOTE(review): the `await` looks like it is only there to unwrap the Promise type of the
  // stubbed call for the type checker — confirm whether `thenResolve` was intended instead.
  when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER);
  // Every rejection in this test is classified as a network error. The stub does not
  // depend on the host, so it is registered once rather than on each loop iteration.
  when(mockPluginService.isNetworkError(anything())).thenReturn(true);

  for (let i = 0; i < hosts.length; i++) {
    if (i === successHostIndex) {
      const client = new PgClientWrapper({}, hosts[i], properties);
      expectedClients.push(client);
      when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
        // Small delay so the connection attempt settles asynchronously.
        await sleep(100);
        return client;
      });
    } else {
      when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
        await sleep(100);
        throw new AwsWrapperError("Rejecting test");
      });
      expectedClients.push(undefined);
    }
  }
  const mockPluginServiceInstance = instance(mockPluginService);

  const target = new ClusterAwareReaderFailoverHandler(
    mockPluginServiceInstance,
    properties,
    ClusterAwareReaderFailoverHandler.DEFAULT_FAILOVER_TIMEOUT,
    ClusterAwareReaderFailoverHandler.DEFAULT_READER_CONNECT_TIMEOUT,
    false
  );
  const result = await target.getConnectionFromHostGroup(hosts);

  // The successful host is the one selected.
  expect(result.isConnected).toBe(true);
  expect(result.client).toBe(expectedClients[successHostIndex]);
  expect(result.newHost).toBe(hosts[successHostIndex]);
  verify(mockPluginService.forceConnect(hosts[successHostIndex], anything())).once();
  // Exactly one abort is expected with an undefined client; the successful client must never be aborted.
  verify(mockPluginService.abortTargetClient(undefined)).once();
  verify(mockPluginService.abortTargetClient(expectedClients[successHostIndex])).never();
}, 30000);
131+
88132
it("test failover timeout", async () => {
89133
// original host list: [active writer, active reader, current connection (reader), active
90134
// reader, down reader, active reader]

tests/unit/writer_failover_handler.test.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { WriterFailoverResult } from "../../common/lib/plugins/failover/writer_f
2828
import { ClientWrapper } from "../../common/lib/client_wrapper";
2929
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
3030
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";
31+
import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect";
3132

3233
const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });
3334

@@ -46,10 +47,20 @@ const mockPluginService = mock(PluginService);
4647
const mockReaderFailover = mock(ClusterAwareReaderFailoverHandler);
4748

4849
const mockTargetClient = { client: 123 };
49-
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>());
50+
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(
51+
mockTargetClient,
52+
builder.withHost("host").build(),
53+
new Map<string, any>(),
54+
new MySQL2DriverDialect()
55+
);
5056

5157
const mockTargetClientB = { client: 456 };
52-
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>());
58+
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(
59+
mockTargetClientB,
60+
builder.withHost("host").build(),
61+
new Map<string, any>(),
62+
new MySQL2DriverDialect()
63+
);
5364

5465
describe("writer failover handler", () => {
5566
beforeEach(() => {

0 commit comments

Comments
 (0)