Skip to content

fix: reader failover wait for complete batch #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
this._properties = properties;
this.pluginService = pluginService;
this._rdsHelper = rdsHelper;

this.initSettings();

this._readerFailoverHandler = readerFailoverHandler
? readerFailoverHandler
: new ClusterAwareReaderFailoverHandler(
Expand All @@ -120,7 +123,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
this.failoverClusterTopologyRefreshRateMsSetting,
this.failoverWriterReconnectIntervalMsSetting
);
this.initSettings();
this._staleDnsHelper = new StaleDnsHelper(this.pluginService);

const telemetryFactory = this.pluginService.getTelemetryFactory();
Expand Down
44 changes: 25 additions & 19 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,9 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
return result;
}
} catch (error) {
if (error instanceof AggregateError && error.message.includes("All promises were rejected")) {
// ignore and try the next batch
} else {
// Failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
throw error;
}
// Failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
throw error;
}

await sleep(1000);
Expand All @@ -184,7 +180,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
throw new AwsWrapperError("Connection attempt task timed out.");
})
.catch((error) => {
if (error instanceof InternalQueryTimeoutError || (error instanceof AggregateError && error.message.includes("All promises were rejected"))) {
if (error instanceof InternalQueryTimeoutError) {
// ignore so the next task batch can be attempted
return ClusterAwareReaderFailoverHandler.FAILED_READER_FAILOVER_RESULT;
}
Expand All @@ -207,7 +203,26 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
tasks.push(secondTask.call());
}

return await Promise.any(tasks);
return await Promise.any(tasks).catch((error: AggregateError) => {
let errors: string = "";
for (const e of error.errors) {
// Propagate errors that are not caused by network errors.
if (!this.pluginService.isNetworkError(e)) {
errors += `\n\t${e} - ${e.message}`;
}
}
if (errors) {
const awsWrapperError = new AwsWrapperError(
Messages.get(
"ClusterAwareReaderFailoverHandler.batchFailed",
`[${hosts[i].hostId}${numTasks === 2 ? `, ${hosts[i + 1].hostId}` : ``}]`,
`[\n${errors}\n]`
Comment on lines +216 to +219
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious whether we need to build this custom message, or whether we could just use the aggregate error's message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we return a ReaderFailoverResult with an error here, it will be sent back to the user, so I'd prefer that they see an AwsWrapperError with some additional context rather than an AggregateError.

)
);
return new ReaderFailoverResult(null, null, false, awsWrapperError);
}
return new ReaderFailoverResult(null, null, false, undefined);
});
}

getReaderHostsByPriority(hosts: HostInfo[]): HostInfo[] {
Expand Down Expand Up @@ -321,18 +336,9 @@ class ConnectionAttemptTask {
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId);
return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId);
}
await this.pluginService.abortTargetClient(this.targetClient);
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
throw new AwsWrapperError(Messages.get("ClusterAwareReaderFailoverHandler.selectedTaskChosen", this.newHost.host));
} catch (error) {
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE);
if (error instanceof Error) {
// Propagate errors that are not caused by network errors.
if (!this.pluginService.isNetworkError(error)) {
return new ReaderFailoverResult(null, null, false, error, this.taskId);
}

return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
}
throw error;
} finally {
await this.performFinalCleanup();
Expand Down
2 changes: 2 additions & 0 deletions common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
"ClusterAwareReaderFailoverHandler.attemptingReaderConnection": "Trying to connect to reader: '%s', with properties '%s'",
"ClusterAwareReaderFailoverHandler.successfulReaderConnection": "Connected to reader: '%s'",
"ClusterAwareReaderFailoverHandler.failedReaderConnection": "Failed to connect to reader: '%s'",
"ClusterAwareReaderFailoverHandler.batchFailed": "Reader connections for hosts [%s] failed with the following errors: %s",
"ClusterAwareReaderFailoverHandler.selectedTaskChosen": "Selected task has already been chosen. Abort client for host: %s",
"Utils.topology": "Topology: %s",
"RdsHostListProvider.incorrectDialect": "Dialect needs to be a topology aware dialect.",
"RdsHostListProvider.suggestedClusterId": "ClusterId '%s' is suggested for url '%s'.",
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/reader_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect";
import { PgClientWrapper } from "../../common/lib/pg_client_wrapper";
import { sleep } from "../../common/lib/utils/utils";

const host1 = new HostInfo("writer", 1234, HostRole.WRITER);
const host2 = new HostInfo("reader1", 1234, HostRole.READER);
Expand Down Expand Up @@ -85,6 +86,49 @@ describe("reader failover handler", () => {
expect(result.newHost).toBe(hosts[successHostIndex]);
}, 30000);

// Exercises getConnectionFromHostGroup() when exactly one host (index 1) can be
// connected to and every other host's forceConnect() rejects. The test passes
// only if the handler still reports a successful connection to that one host.
it("test failover - batch failure", async () => {
// One entry per host, index-aligned with `hosts`; failing hosts get `undefined`.
const expectedClients: ClientWrapper[] = [];
const hosts = [...defaultHosts];
const successHostIndex = 1;

when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER);

for (let i = 0; i < hosts.length; i++) {
if (i === successHostIndex) {
// The single host whose connection attempt resolves with a client.
const client = new PgClientWrapper({}, hosts[i], properties);
expectedClients.push(client);
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
// Resolve only after a short delay (sleep(100), presumably milliseconds).
await sleep(100);
return client;
});
} else {
// Every other host rejects after the same delay.
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
throw new AwsWrapperError("Rejecting test");
});
// Stub isNetworkError() to report true for any error passed to it.
when(mockPluginService.isNetworkError(anything())).thenReturn(true);
// Placeholder so that expectedClients[i] keeps lining up with hosts[i].
expectedClients.push(undefined);
}
}
const mockPluginServiceInstance = instance(mockPluginService);

// NOTE(review): the meaning of the trailing `false` argument is not visible here —
// confirm against the ClusterAwareReaderFailoverHandler constructor.
const target = new ClusterAwareReaderFailoverHandler(
mockPluginServiceInstance,
properties,
ClusterAwareReaderFailoverHandler.DEFAULT_FAILOVER_TIMEOUT,
ClusterAwareReaderFailoverHandler.DEFAULT_READER_CONNECT_TIMEOUT,
false
);
const result = await target.getConnectionFromHostGroup(hosts);
// The result must point at the one host that was stubbed to succeed.
expect(result.isConnected).toBe(true);
expect(result.client).toBe(expectedClients[successHostIndex]);
expect(result.newHost).toBe(hosts[successHostIndex]);
// The successful host is connected to exactly once.
verify(mockPluginService.forceConnect(hosts[successHostIndex], anything())).once();
// abortTargetClient() is called exactly once with `undefined` and never with the winning client.
verify(mockPluginService.abortTargetClient(undefined)).once();
verify(mockPluginService.abortTargetClient(expectedClients[successHostIndex])).never();
// Second argument to it(): presumably the per-test timeout in milliseconds.
}, 30000);

it("test failover timeout", async () => {
// original host list: [active writer, active reader, current connection (reader), active
// reader, down reader, active reader]
Expand Down
15 changes: 13 additions & 2 deletions tests/unit/writer_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { WriterFailoverResult } from "../../common/lib/plugins/failover/writer_f
import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";
import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect";

const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });

Expand All @@ -46,10 +47,20 @@ const mockPluginService = mock(PluginService);
const mockReaderFailover = mock(ClusterAwareReaderFailoverHandler);

const mockTargetClient = { client: 123 };
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(
mockTargetClient,
builder.withHost("host").build(),
new Map<string, any>(),
new MySQL2DriverDialect()
);

const mockTargetClientB = { client: 456 };
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(
mockTargetClientB,
builder.withHost("host").build(),
new Map<string, any>(),
new MySQL2DriverDialect()
);

describe("writer failover handler", () => {
beforeEach(() => {
Expand Down