Skip to content

Commit

Permalink
fix: connection issues (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq authored Nov 8, 2024
1 parent 58ccf39 commit 69eb723
Show file tree
Hide file tree
Showing 33 changed files with 366 additions and 449 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ jobs:
run-integration-tests:
name: Run Integration Tests with Default
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
dbEngine: ["mysql", "postgres"]

steps:
- name: Clone repository
Expand Down Expand Up @@ -57,7 +61,7 @@ jobs:
- name: Run Integration Tests
run: |
./gradlew --no-parallel --no-daemon test-aurora --info
./gradlew --no-parallel --no-daemon test-aurora-${{ matrix.dbEngine }} --info
env:
RDS_CLUSTER_DOMAIN: ${{ secrets.DB_CONN_SUFFIX }}
RDS_DB_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
Expand Down Expand Up @@ -86,6 +90,6 @@ jobs:
if: always()
uses: actions/upload-artifact@v4
with:
name: integration-report-default
name: integration-report-default-${{ matrix.dbEngine }}
path: ./tests/integration/container/reports
retention-days: 5
8 changes: 6 additions & 2 deletions .github/workflows/integration_tests_latest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ jobs:
run-integration-tests:
name: Run Integration Tests with Latest
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
dbEngine: ["mysql", "postgres"]

steps:
- name: Clone repository
Expand Down Expand Up @@ -60,7 +64,7 @@ jobs:
- name: Run Integration Tests
run: |
./gradlew --no-parallel --no-daemon test-aurora --info
./gradlew --no-parallel --no-daemon test-aurora-${{ matrix.dbEngine }} --info
env:
RDS_CLUSTER_DOMAIN: ${{ secrets.DB_CONN_SUFFIX }}
RDS_DB_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
Expand Down Expand Up @@ -89,6 +93,6 @@ jobs:
if: always()
uses: actions/upload-artifact@v4
with:
name: integration-report-latest
name: integration-report-latest-${{ matrix.dbEngine }}
path: ./tests/integration/container/reports
retention-days: 5
58 changes: 0 additions & 58 deletions THIRD-PARTY-LICENSES
Original file line number Diff line number Diff line change
Expand Up @@ -11980,64 +11980,6 @@ IN THE SOFTWARE.

-----------

The following npm packages may be included in this product:

- [email protected]
- [email protected]
- [email protected]

These packages each contain the following license and notice below:

Copyright jQuery Foundation and other contributors <https://jquery.org/>

Based on Underscore.js, copyright Jeremy Ashkenas,
DocumentCloud and Investigative Reporters & Editors <http://underscorejs.org/>

This software consists of voluntary contributions made by many
individuals. For exact contribution history, see the revision history
available at https://github.com/lodash/lodash

The following license applies to all parts of this software except as
documented below:

====

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

====

Copyright and related rights for sample code are waived via CC0. Sample
code is defined as all source code displayed within the prose of the
documentation.

CC0: http://creativecommons.org/publicdomain/zero/1.0/

====

Files located in the node_modules and vendor directories are externally
maintained libraries used by this software which have their own
licenses; we recommend you read them, as their terms may differ from the
terms above.

-----------

The following npm package may be included in this product:

- [email protected]
Expand Down
15 changes: 5 additions & 10 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { AwsWrapperError } from "./utils/errors";
import { WrapperProperties } from "./wrapper_property";
import { Messages } from "./utils/messages";
import { RdsUtils } from "./utils/rds_utils";
import { HostInfoBuilder } from "./host_info_builder";
import { promisify } from "util";
import { lookup } from "dns";
import { PluginService } from "./plugin_service";
Expand Down Expand Up @@ -51,17 +50,13 @@ export class DriverConnectionProvider implements ConnectionProvider {
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper> {
let resultTargetClient;
const resultProps = new Map(props);
let connectionHostInfo: HostInfo;

resultProps.set(WrapperProperties.HOST.name, hostInfo.host);
if (hostInfo.isPortSpecified()) {
resultProps.set(WrapperProperties.PORT.name, hostInfo.port);
}
const driverDialect: DriverDialect = pluginService.getDriverDialect();
try {
const targetClient: any = await driverDialect.connect(hostInfo, props);
connectionHostInfo = new HostInfoBuilder({
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
})
.copyFrom(hostInfo)
.build();
resultTargetClient = targetClient;
resultTargetClient = await driverDialect.connect(hostInfo, resultProps);
} catch (e: any) {
if (!WrapperProperties.ENABLE_GREEN_HOST_REPLACEMENT.get(props)) {
throw e;
Expand Down
28 changes: 28 additions & 0 deletions common/lib/error_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,35 @@
limitations under the License.
*/

import { ClientWrapper } from "./client_wrapper";

/**
 * Contract for classifying errors (login vs. network) and for tracking errors that a
 * client emits through its event emitter rather than through a rejected call.
 */
export interface ErrorHandler {
/**
 * Reports whether the given error is classified as a login error.
 * @param e the error to classify.
 */
isLoginError(e: Error): boolean;

/**
 * Reports whether the given error is classified as a network error.
 * @param e the error to classify.
 */
isNetworkError(e: Error): boolean;

/**
 * Checks whether an unexpected error has been emitted and, if so, whether that error is a login error.
 */
hasLoginError(): boolean;

/**
 * Checks whether an unexpected error has been emitted and, if so, whether that error is a network error.
 */
hasNetworkError(): boolean;

/**
 * Returns the tracked unexpected error, or null.
 * NOTE(review): presumably this is the latest error captured by the listener installed through
 * attachErrorListener, and null means nothing has been captured; confirm against the implementations.
 */
getUnexpectedError(): Error | null;

/**
 * Attach an error event listener to the event emitter object in the ClientWrapper.
 * The listener will track the latest error emitted to be handled in the future.
 * @param clientWrapper a wrapper containing the target community client. The type permits undefined;
 * how implementations treat an undefined wrapper is not visible here.
 */
attachErrorListener(clientWrapper: ClientWrapper | undefined): void;

/**
 * Attach a No-Op error listener that ignores any error emitted.
 * @param clientWrapper a wrapper containing the target community client. The type permits undefined;
 * how implementations treat an undefined wrapper is not visible here.
 */
attachNoOpErrorListener(clientWrapper: ClientWrapper | undefined): void;
}
38 changes: 30 additions & 8 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this._isInTransaction = inTransaction;
}

/**
 * Asks the current client's error handler whether `e` is a login error.
 * @param e the error to classify.
 * @returns the handler's verdict, unchanged.
 */
isLoginError(e: Error): boolean {
  const currentHandler = this.getCurrentClient().errorHandler;
  return currentHandler.isLoginError(e);
}

/**
 * Asks the current client's error handler whether `e` is a network error.
 * @param e the error to classify.
 * @returns the handler's verdict, unchanged.
 */
isNetworkError(e: Error): boolean {
  const currentHandler = this.getCurrentClient().errorHandler;
  return currentHandler.isNetworkError(e);
}

/**
 * Returns the stored host list provider.
 * @returns `_hostListProvider` when it is truthy, otherwise null.
 */
getHostListProvider(): HostListProvider | null {
  if (!this._hostListProvider) {
    return null;
  }
  return this._hostListProvider;
}
Expand Down Expand Up @@ -479,4 +471,34 @@ export class PluginService implements ErrorHandler, HostListProviderService {
/**
 * Returns the telemetry factory exposed by the plugin manager held in
 * `pluginServiceManagerContainer`.
 *
 * The non-null assertion is compile-time only: if the container has no plugin
 * manager, the call fails at runtime.
 */
getTelemetryFactory(): TelemetryFactory {
  const manager = this.pluginServiceManagerContainer.pluginManager!;
  return manager.getTelemetryFactory();
}

/* Error Handler interface implementation */

/**
 * Delegates login-error classification of `e` to the current client's error handler.
 * @param e the error to classify.
 * @returns whatever the current client's error handler reports for `e`.
 */
isLoginError(e: Error): boolean {
  const { errorHandler } = this.getCurrentClient();
  return errorHandler.isLoginError(e);
}

/**
 * Delegates network-error classification of `e` to the current client's error handler.
 * @param e the error to classify.
 * @returns whatever the current client's error handler reports for `e`.
 */
isNetworkError(e: Error): boolean {
  const { errorHandler } = this.getCurrentClient();
  return errorHandler.isNetworkError(e);
}

/**
 * Forwards to `hasLoginError()` on the current client's error handler.
 * @returns the handler's result, unchanged.
 */
hasLoginError(): boolean {
  const { errorHandler } = this.getCurrentClient();
  return errorHandler.hasLoginError();
}

/**
 * Forwards to `hasNetworkError()` on the current client's error handler.
 * @returns the handler's result, unchanged.
 */
hasNetworkError(): boolean {
  const { errorHandler } = this.getCurrentClient();
  return errorHandler.hasNetworkError();
}

/**
 * Forwards to `getUnexpectedError()` on the current client's error handler.
 * @returns the error (or null) reported by that handler, unchanged.
 */
getUnexpectedError(): Error | null {
  const { errorHandler } = this.getCurrentClient();
  return errorHandler.getUnexpectedError();
}

/**
 * Asks the current client's error handler to attach its error listener to `clientWrapper`.
 * Note that the handler is taken from the current client, not from `clientWrapper`.
 * @param clientWrapper the wrapper passed through to the handler; may be undefined.
 */
attachErrorListener(clientWrapper: ClientWrapper | undefined): void {
  const { errorHandler } = this.getCurrentClient();
  errorHandler.attachErrorListener(clientWrapper);
}

/**
 * Asks the current client's error handler to attach its no-op error listener to `clientWrapper`.
 * Note that the handler is taken from the current client, not from `clientWrapper`.
 * @param clientWrapper the wrapper passed through to the handler; may be undefined.
 */
attachNoOpErrorListener(clientWrapper: ClientWrapper | undefined): void {
  const { errorHandler } = this.getCurrentClient();
  errorHandler.attachNoOpErrorListener(clientWrapper);
}
}
4 changes: 2 additions & 2 deletions common/lib/plugins/efm/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class MonitorImpl implements Monitor {

startMonitoring(context: MonitorConnectionContext): void {
if (this.stopped) {
logger.warning(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host));
logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host));
}

const currentTimeNanos: number = this.getCurrentTimeNano();
Expand All @@ -105,7 +105,7 @@ export class MonitorImpl implements Monitor {

stopMonitoring(context: MonitorConnectionContext): void {
if (context == null) {
logger.warning(Messages.get("MonitorImpl.contextNullWarning"));
logger.warn(Messages.get("MonitorImpl.contextNullWarning"));
return;
}

Expand Down
8 changes: 7 additions & 1 deletion common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
this.failoverMode = this._rdsUrlType === RdsUrlType.RDS_READER_CLUSTER ? FailoverMode.READER_OR_WRITER : FailoverMode.STRICT_WRITER;
}

logger.debug(Messages.get("Failover.parameterValue", "failoverMode", this.failoverMode.toString()));
logger.debug(Messages.get("Failover.parameterValue", "failoverMode", FailoverMode[this.failoverMode]));
}

override notifyConnectionChanged(changes: Set<HostChangeOptions>): Promise<OldConnectionSuggestionAction> {
Expand Down Expand Up @@ -294,6 +294,12 @@ export class FailoverPlugin extends AbstractConnectionPlugin {

override async execute<T>(methodName: string, methodFunc: () => Promise<T>): Promise<T> {
try {
// Verify that no unexpected errors were emitted while the connection was idle.
if (this.pluginService.hasNetworkError()) {
// Throw the unexpected error directly to be handled.
throw this.pluginService.getUnexpectedError();
}

if (!this.enableFailoverSetting || this.canDirectExecute(methodName)) {
return await methodFunc();
}
Expand Down
7 changes: 7 additions & 0 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface ReaderFailoverHandler {
}

export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler {
private static readonly FAILOVER_FAILED = -3;
static readonly FAILED_READER_FAILOVER_RESULT = new ReaderFailoverResult(null, null, false);
static readonly DEFAULT_FAILOVER_TIMEOUT = 60000; // 60 sec
static readonly DEFAULT_READER_CONNECT_TIMEOUT = 30000; // 30 sec
Expand Down Expand Up @@ -164,13 +165,17 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
if (error instanceof AggregateError && error.message.includes("All promises were rejected")) {
// ignore and try the next batch
} else {
// Failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
throw error;
}
}

await sleep(1000);
}

// Failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
return new ReaderFailoverResult(null, null, false);
}

Expand All @@ -193,6 +198,8 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
// ignore so the next task batch can be attempted
return ClusterAwareReaderFailoverHandler.FAILED_READER_FAILOVER_RESULT;
}
// Reader failover has failed.
this.taskHandler.setSelectedConnectionAttemptTask(failoverTaskId, ClusterAwareReaderFailoverHandler.FAILOVER_FAILED);
throw error;
})
.finally(() => {
Expand Down
11 changes: 8 additions & 3 deletions common/lib/plugins/failover/writer_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
const taskA = reconnectToWriterHandlerTask.call();
const taskB = waitForNewWriterHandlerTask.call();

let failed = false;
let selectedTask = "";
const singleTask: boolean = this.pluginService.getDialect().getFailoverRestrictions().includes(FailoverRestriction.DISABLE_TASK_A);
const failoverTaskPromise = singleTask ? taskB : Promise.any([taskA, taskB]);
Expand Down Expand Up @@ -136,17 +137,19 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
this.logTaskSuccess(result);
return result;
}
failed = true;
throw new AwsWrapperError("Connection attempt task timed out.");
})
.catch((error: any) => {
logger.info(Messages.get("ClusterAwareWriterFailoverHandler.failedToConnectToWriterInstance"));
failed = true;
if (JSON.stringify(error).includes("Connection attempt task timed out.")) {
return new WriterFailoverResult(false, false, [], "None", null);
}
throw error;
})
.finally(async () => {
await reconnectToWriterHandlerTask.cancel(selectedTask);
await reconnectToWriterHandlerTask.cancel(failed, selectedTask);
await waitForNewWriterHandlerTask.cancel(selectedTask);
clearTimeout(timeoutId);
});
Expand Down Expand Up @@ -182,6 +185,7 @@ class ReconnectToWriterHandlerTask {
currentClient: ClientWrapper | null = null;
endTime: number;
failoverCompleted: boolean = false;
failoverCompletedDueToError: boolean = false;
timeoutId: any = -1;

constructor(
Expand Down Expand Up @@ -261,16 +265,17 @@ class ReconnectToWriterHandlerTask {
logger.error(error.message);
return new WriterFailoverResult(false, false, [], ClusterAwareWriterFailoverHandler.RECONNECT_WRITER_TASK, null);
} finally {
if (this.currentClient && !success) {
if (this.currentClient && (this.failoverCompletedDueToError || !success)) {
await this.pluginService.abortTargetClient(this.currentClient);
}
logger.info(Messages.get("ClusterAwareWriterFailoverHandler.taskAFinished"));
}
}

async cancel(selectedTask?: string) {
async cancel(failed: boolean, selectedTask?: string) {
clearTimeout(this.timeoutId);
this.failoverCompleted = true;
this.failoverCompletedDueToError = failed;

// Task B was returned.
if (selectedTask && selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK) {
Expand Down
3 changes: 0 additions & 3 deletions common/lib/round_robin_host_selector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import { CacheMap } from "./utils/cache_map";
import { HostAvailability } from "./host_availability/host_availability";
import { Messages } from "./utils/messages";

import pkgLodash from "lodash";
const { isInteger } = pkgLodash;

export class RoundRobinHostSelector implements HostSelector {
static DEFAULT_ROUND_ROBIN_CACHE_EXPIRE_NANO = 10 * 60_000_000_000; // 10 minutes
static DEFAULT_WEIGHT = 1;
Expand Down
4 changes: 3 additions & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,7 @@
"DefaultTelemetryFactory.importFailure": "A tracing backend could not be found.",
"DefaultTelemetryFactory.missingTracingBackend": "A tracing backend could not be found.",
"DefaultTelemetryFactory.missingMetricsBackend": "A metrics backend could not be found.",
"InternalPooledConnectionProvider.pooledConnectionFailed": "Internal pooled connection failed with message: '%s'"
"InternalPooledConnectionProvider.pooledConnectionFailed": "Internal pooled connection failed with message: '%s'",
"ErrorHandler.NoOpListener": "[%s] NoOp error event listener caught error: '%s'",
"ErrorHandler.TrackerListener": "[%s] Tracker error event listener caught error: '%s'"
}
3 changes: 0 additions & 3 deletions common/lib/utils/telemetry/default_telemetry_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import { TelemetryContext } from "./telemetry_context";
import { NullTelemetryFactory } from "./null_telemetry_factory";
import { Messages } from "../messages";

import pkgLodash from "lodash";
const { toLower } = pkgLodash;

export class DefaultTelemetryFactory implements TelemetryFactory {
private readonly enableTelemetry: boolean;
private readonly telemetryTracesBackend: string;
Expand Down
Loading

0 comments on commit 69eb723

Please sign in to comment.