Skip to content

Commit

Permalink
fix: internal pool connection hanging (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq authored Oct 24, 2024
1 parent 14f20f4 commit 737571e
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 35 deletions.
2 changes: 1 addition & 1 deletion common/lib/aws_pool_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface AwsPoolClient {

end(poolClient: any): Promise<any>;

releaseResources(): void;
releaseResources(): Promise<void>;

getIdleCount(): number;

Expand Down
2 changes: 0 additions & 2 deletions common/lib/aws_pool_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ export class AwsPoolConfig {
readonly maxConnections?: number | undefined;
readonly minConnections?: number | undefined;
readonly idleTimeoutMillis?: number | undefined;
readonly connectionTimeoutMillis?: number | undefined;
readonly allowExitOnIdle?: boolean | undefined;
readonly maxIdleConnections?: number | undefined;

constructor(props?: any) {
this.maxConnections = props.maxConnections ?? 10;
this.idleTimeoutMillis = props.idleTimeoutMillis ?? 60000;
this.connectionTimeoutMillis = props.connectionTimeoutMillis ?? 60000;
this.maxIdleConnections = props.maxIdleConnections ?? 10;
this.allowExitOnIdle = props.allowExitOnIdle ?? false;
this.minConnections = props.minConnections ?? 0;
Expand Down
10 changes: 4 additions & 6 deletions common/lib/internal_pooled_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
}

public async releaseResources() {
this.internalPool?.releaseResources();
for (const [key, val] of this.databasePools.entries) {
val.item.releaseResources();
}
this.databasePools.clear();
}

Expand All @@ -151,7 +153,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide

getHostUrlSet(): Set<string> {
const hostUrls: Set<string> = new Set<string>();
for (const [key, val] of this.databasePools.entries) {
for (const [key, _val] of this.databasePools.entries) {
hostUrls.add(key.getUrl());
}
return hostUrls;
Expand Down Expand Up @@ -186,8 +188,4 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
setDatabasePools(connectionPools: SlidingExpirationCache<PoolKey, any>): void {
this.databasePools = connectionPools;
}

getTargetName(): string {
return this.constructor.name;
}
}
4 changes: 2 additions & 2 deletions mysql/lib/dialect/mysql_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ export class MySQLDatabaseDialect implements DatabaseDialect {
return finalPoolConfig;
}

end(clientWrapper: ClientWrapper): Promise<void> {
return clientWrapper.client.end();
async end(clientWrapper: ClientWrapper): Promise<void> {
return await clientWrapper.client.end();
}
}
4 changes: 3 additions & 1 deletion mysql/lib/mysql_error_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ export class MySQLErrorHandler implements ErrorHandler {
e.message.includes("Connection lost: The server closed the connection.") ||
e.message.includes("Query inactivity timeout") ||
e.message.includes("Can't add new command when connection is in closed state") ||
e.message.includes(Messages.get("ClientUtils.queryTaskTimeout"))
e.message.includes(Messages.get("ClientUtils.queryTaskTimeout")) ||
// Pooled connection network errors
e.message.includes("connect ETIMEDOUT")
);
}
}
5 changes: 2 additions & 3 deletions mysql/lib/mysql_pool_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
limitations under the License.
*/

import { ClientWrapper } from "../../common/lib/client_wrapper";
import { createPool, PoolOptions, Pool } from "mysql2/promise";
import { createPool, PoolOptions } from "mysql2/promise";
import { AwsPoolClient } from "../../common/lib/aws_pool_client";
import { Messages } from "../../common/lib/utils/messages";
import { AwsWrapperError } from "../../common/lib/utils/errors";
Expand Down Expand Up @@ -47,7 +46,7 @@ export class AwsMysqlPoolClient implements AwsPoolClient {
return this.targetPool.pool._allConnections.length;
}

async releaseResources() {
async releaseResources(): Promise<void> {
await this.targetPool.end();
}
}
5 changes: 2 additions & 3 deletions pg/lib/dialect/pg_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,11 @@ export class PgDatabaseDialect implements DatabaseDialect {
finalPoolConfig.max = poolConfig?.maxConnections;
finalPoolConfig.min = poolConfig?.minConnections;
finalPoolConfig.idleTimeoutMillis = poolConfig?.idleTimeoutMillis;
finalPoolConfig.connectionTimeoutMillis = poolConfig?.connectionTimeoutMillis;
finalPoolConfig.allowExitOnIdle = poolConfig?.allowExitOnIdle;
return finalPoolConfig;
}

end(clientWrapper: ClientWrapper): Promise<void> {
return clientWrapper.client.end();
async end(clientWrapper: ClientWrapper): Promise<void> {
return await clientWrapper.client.end();
}
}
3 changes: 2 additions & 1 deletion pg/lib/pg_error_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export class PgErrorHandler implements ErrorHandler {
return (
e.message.includes("Connection terminated unexpectedly") ||
e.message.includes("Client has encountered a connection error and is not queryable") ||
e.message.includes("Query read timeout")
e.message.includes("Query read timeout") ||
e.message.includes("Connection terminated due to connection timeout")
);
}
}
8 changes: 2 additions & 6 deletions pg/lib/pg_pool_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
limitations under the License.
*/

import { ClientWrapper } from "../../common/lib/client_wrapper";
import { Pool, PoolClient, PoolConfig } from "pg";
import { AwsPoolClient } from "../../common/lib/aws_pool_client";
import { Messages } from "../../common/lib/utils/messages";
Expand All @@ -41,10 +40,7 @@ export class AwsPgPoolClient implements AwsPoolClient {
}

async end(poolClient: any) {
if (poolClient == undefined) {
return;
}
await poolClient.release(true);
await poolClient?.release(true);
}

getIdleCount(): number {
Expand All @@ -55,7 +51,7 @@ export class AwsPgPoolClient implements AwsPoolClient {
return this.targetPool.totalCount;
}

async releaseResources() {
async releaseResources(): Promise<void> {
await this.targetPool.end();
}
}
13 changes: 7 additions & 6 deletions tests/integration/container/tests/read_write_splitting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let initClientFunc: (props: any) => any;
let client: any;
let secondaryClient: any;
let auroraTestUtility: AuroraTestUtility;
let provider: any;
let provider: InternalPooledConnectionProvider | null;

async function initDefaultConfig(host: string, port: number, connectToProxy: boolean): Promise<any> {
let config: any = {
Expand Down Expand Up @@ -100,7 +100,6 @@ describe("aurora read write splitting", () => {
if (client !== null) {
try {
await client.end();
await client.releaseResources();
} catch (error) {
// pass
}
Expand Down Expand Up @@ -524,9 +523,7 @@ describe("aurora read write splitting", () => {
client = initClientFunc(config);
secondaryClient = initClientFunc(config);

const provider = new InternalPooledConnectionProvider(
new AwsPoolConfig({ minConnections: 0, maxConnections: 10, maxIdleConnections: 10, connectionTimeoutMillis: 10000 })
);
const provider = new InternalPooledConnectionProvider(new AwsPoolConfig({ minConnections: 0, maxConnections: 10, maxIdleConnections: 10 }));

ConnectionProviderManager.setConnectionProvider(provider);

Expand Down Expand Up @@ -567,7 +564,11 @@ describe("aurora read write splitting", () => {
logger.debug(error.stack);
});

provider = new InternalPooledConnectionProvider();
provider = new InternalPooledConnectionProvider({
minConnections: 0,
maxConnections: 10,
maxIdleConnections: 10
});
ConnectionProviderManager.setConnectionProvider(provider);

await client.connect();
Expand Down
6 changes: 2 additions & 4 deletions tests/unit/internal_pool_connection_provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ describe("reader write splitting test", () => {
when(mockDialect.getAwsPoolClient(anything())).thenReturn(mockAwsPoolClient);
const config = {
maxConnection: 10,
idleTimeoutMillis: 10000,
connectionTimeoutMillis: 10000
idleTimeoutMillis: 60000
};
when(mockDialect.preparePoolClientProperties(anything(), anything())).thenReturn(config);
const poolConfig: AwsPoolConfig = new AwsPoolConfig(config);
Expand Down Expand Up @@ -149,8 +148,7 @@ describe("reader write splitting test", () => {
when(mockDialect.getAwsPoolClient(anything())).thenReturn(mockAwsPoolClient);
const config = {
maxConnection: 10,
idleTimeoutMillis: 10000,
connectionTimeoutMillis: 10000
idleTimeoutMillis: 60000
};
const myKeyFunc: InternalPoolMapping = {
getKey: (hostInfo: HostInfo, props: Map<string, any>) => {
Expand Down

0 comments on commit 737571e

Please sign in to comment.