diff --git a/common/lib/internal_pooled_connection_provider.ts b/common/lib/internal_pooled_connection_provider.ts index f4f9678f..0de26740 100644 --- a/common/lib/internal_pooled_connection_provider.ts +++ b/common/lib/internal_pooled_connection_provider.ts @@ -43,10 +43,10 @@ import { logger } from "../logutils"; export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources { static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30 * 60_000_000_000); // 30 minutes - protected static databasePools: SlidingExpirationCache = new SlidingExpirationCache( + protected static databasePools: SlidingExpirationCache = new SlidingExpirationCache( InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS, - (pool: any) => pool.getActiveCount() === 0, - (pool: any) => pool.end() + (pool: AwsPoolClient) => pool.getActiveCount() === 0, + (pool: AwsPoolClient) => pool.end() ); private static readonly acceptedStrategies: Map = new Map([ @@ -122,16 +122,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide } async releaseResources() { - for (const [_key, value] of InternalPooledConnectionProvider.databasePools.entries) { - if (value.item) { - await value.item.releaseResources(); - } - } - InternalPooledConnectionProvider.clearDatabasePools(); - } - - static clearDatabasePools() { - InternalPooledConnectionProvider.databasePools.clear(); + await InternalPooledConnectionProvider.databasePools.clear(); } getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo { diff --git a/common/lib/plugins/efm/monitor_service.ts b/common/lib/plugins/efm/monitor_service.ts index ce480ca9..3e1aadb6 100644 --- a/common/lib/plugins/efm/monitor_service.ts +++ b/common/lib/plugins/efm/monitor_service.ts @@ -46,7 +46,9 @@ export class MonitorServiceImpl implements MonitorService { protected static readonly monitors: SlidingExpirationCache = new SlidingExpirationCache( MonitorServiceImpl.CACHE_CLEANUP_NANOS, undefined, - () => {} + async (monitor: Monitor) => { + await monitor.releaseResources(); + } ); private readonly pluginService: PluginService; private cachedMonitorHostKeys: Set | undefined; @@ -107,7 +109,7 @@ export class MonitorServiceImpl implements MonitorService { } stopMonitoringForAllConnections(hostKeys: Set) { - let monitor; + let monitor: Monitor; for (const hostKey of hostKeys) { monitor = MonitorServiceImpl.monitors.get(hostKey); if (monitor) { @@ -118,8 +120,8 @@ export class MonitorServiceImpl implements MonitorService { } async getMonitor(hostKeys: Set, hostInfo: HostInfo, properties: Map): Promise { - let monitor; - let anyHostKey; + let monitor: Monitor; + let anyHostKey: string; for (const hostKey of hostKeys) { monitor = MonitorServiceImpl.monitors.get(hostKey); anyHostKey = hostKey; @@ -158,16 +160,13 @@ export class MonitorServiceImpl implements MonitorService { } async releaseResources() { - for (const [key, monitor] of MonitorServiceImpl.monitors.entries) { - if (monitor.item) { - await monitor.item.releaseResources(); - } - } + await MonitorServiceImpl.monitors.clear(); this.cachedMonitorHostKeys = undefined; this.cachedMonitorRef = undefined; } - static clearMonitors() { - MonitorServiceImpl.monitors.clear(); + // Used for performance testing. + static async clearMonitors() { + await MonitorServiceImpl.monitors.clear(); } } diff --git a/common/lib/plugins/limitless/limitless_router_service.ts b/common/lib/plugins/limitless/limitless_router_service.ts index 85ddfa9f..a3770b70 100644 --- a/common/lib/plugins/limitless/limitless_router_service.ts +++ b/common/lib/plugins/limitless/limitless_router_service.ts @@ -254,7 +254,7 @@ export class LimitlessRouterServiceImpl implements LimitlessRouterService { } } - static clearMonitors() { - LimitlessRouterServiceImpl.monitors.clear(); + static async clearMonitors() { + await LimitlessRouterServiceImpl.monitors.clear(); } } diff --git a/common/lib/plugins/strategy/fastest_response/host_response_time_service.ts b/common/lib/plugins/strategy/fastest_response/host_response_time_service.ts index ea59087d..627a69b8 100644 --- a/common/lib/plugins/strategy/fastest_response/host_response_time_service.ts +++ b/common/lib/plugins/strategy/fastest_response/host_response_time_service.ts @@ -45,7 +45,7 @@ export class HostResponseTimeServiceImpl implements HostResponseTimeService { readonly intervalMs: number; protected hosts: HostInfo[]; private readonly telemetryFactory: TelemetryFactory; - protected static monitoringHosts: SlidingExpirationCache = new SlidingExpirationCache( + protected static monitoringHosts: SlidingExpirationCache = new SlidingExpirationCache( HostResponseTimeServiceImpl.CACHE_CLEANUP_NANOS, undefined, async (monitor: HostResponseTimeMonitor) => { diff --git a/common/lib/utils/sliding_expiration_cache.ts b/common/lib/utils/sliding_expiration_cache.ts index cc7d324b..e8ccf01b 100644 --- a/common/lib/utils/sliding_expiration_cache.ts +++ b/common/lib/utils/sliding_expiration_cache.ts @@ -39,11 +39,11 @@ class CacheItem { export class SlidingExpirationCache { private _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes private readonly _shouldDisposeFunc?: (item: V) => boolean; - private readonly _itemDisposalFunc?: (item: V) => void; + private readonly _itemDisposalFunc?: (item: V) => Promise; map: Map> = new Map>(); private _cleanupTimeNanos: bigint; - constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, itemDisposalFunc?: (item: V) => void) { + constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, itemDisposalFunc?: (item: V) => Promise) { this._cleanupIntervalNanos = cleanupIntervalNanos; this._shouldDisposeFunc = shouldDisposeFunc; this._itemDisposalFunc = itemDisposalFunc; @@ -116,7 +116,7 @@ export class SlidingExpirationCache { return cacheItem; }); - if (item != undefined && item != null && this._itemDisposalFunc != null) { + if (item != undefined && this._itemDisposalFunc != null) { this._itemDisposalFunc(item); } } @@ -128,10 +128,10 @@ export class SlidingExpirationCache { return getTimeInNanos() > cacheItem.expirationTimeNs; } - clear(): void { + async clear(): Promise { for (const [key, val] of this.map.entries()) { if (val !== undefined && this._itemDisposalFunc !== undefined) { - this._itemDisposalFunc(val.item); + await this._itemDisposalFunc(val.item); } } this.map.clear(); diff --git a/tests/integration/container/tests/performance.test.ts b/tests/integration/container/tests/performance.test.ts index 6144111e..e2d66679 100644 --- a/tests/integration/container/tests/performance.test.ts +++ b/tests/integration/container/tests/performance.test.ts @@ -219,7 +219,7 @@ async function doMeasurePerformance(sleepDelayMillis: number, repeatTimes: numbe try { await ProxyHelper.enableAllConnectivity(); await client.end(); - MonitorServiceImpl.clearMonitors(); + await MonitorServiceImpl.clearMonitors(); } catch (error: any) { // ignore } diff --git a/tests/unit/internal_pool_connection_provider.test.ts b/tests/unit/internal_pool_connection_provider.test.ts index ce486ef9..04195da1 100644 --- a/tests/unit/internal_pool_connection_provider.test.ts +++ b/tests/unit/internal_pool_connection_provider.test.ts @@ -26,7 +26,6 @@ import { HostListProvider } from "../../common/lib/host_list_provider/host_list_ import { WrapperProperties } from "../../common/lib/wrapper_property"; import { InternalPooledConnectionProvider } from "../../common/lib/internal_pooled_connection_provider"; import { AwsPoolConfig } from "../../common/lib/aws_pool_config"; -import { ConnectionProviderManager } from "../../common/lib/connection_provider_manager"; import { RdsUtils } from "../../common/lib/utils/rds_utils"; import { PoolKey } from "../../common/lib/utils/pool_key"; import { InternalPoolMapping } from "../../common/lib/utils/internal_pool_mapping"; @@ -52,6 +51,7 @@ const readerHost1Connection = builder.withHost(readerUrl1Connection).withPort(54 const readerHost2Connection = builder.withHost(readerUrl2Connection).withPort(5432).withRole(HostRole.READER).build(); const writerHostNoConnection = builder.withHost(writerUrlNoConnections).withPort(5432).withRole(HostRole.WRITER).build(); const testHostsList = [writerHostNoConnection, readerHost1Connection, readerHost2Connection]; +let providerSpy = null; function getTestPoolMap() { const target: SlidingExpirationCache = new SlidingExpirationCache(BigInt(10000000)); @@ -102,7 +102,7 @@ describe("internal pool connection provider test", () => { props.clear(); }); - afterEach(() => { + afterEach(async () => { reset(mockReaderClient); reset(mockAwsMySQLClient); reset(mockHostInfo); @@ -119,7 +119,7 @@ describe("internal pool connection provider test", () => { reset(mockRdsUtils); reset(mockHostListProvider); - InternalPooledConnectionProvider.clearDatabasePools(); + await providerSpy.releaseResources(); }); it("test connect with default mapping", async () => { @@ -142,7 +142,7 @@ describe("internal pool connection provider test", () => { const poolConfig: AwsPoolConfig = new AwsPoolConfig(config); const provider = spy(new InternalPooledConnectionProvider(poolConfig)); - const providerSpy = instance(provider); + providerSpy = instance(provider); when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper); await providerSpy.connect(hostInfo, mockPluginServiceInstance, props); @@ -181,7 +181,7 @@ describe("internal pool connection provider test", () => { const poolConfig: AwsPoolConfig = new AwsPoolConfig(config); const provider = spy(new InternalPooledConnectionProvider(poolConfig, myKeyFunc)); - const providerSpy = instance(provider); + providerSpy = instance(provider); when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper); await providerSpy.connect(hostInfo, mockPluginServiceInstance, props); @@ -196,7 +196,7 @@ describe("internal pool connection provider test", () => { it("test random strategy", async () => { const provider = spy(new InternalPooledConnectionProvider(mockPoolConfig)); - const providerSpy = instance(provider); + providerSpy = instance(provider); providerSpy.setDatabasePools(getTestPoolMap()); const selectedHost = providerSpy.getHostInfoByStrategy(testHostsList, HostRole.READER, "random", props); expect(selectedHost.host === readerUrl1Connection || selectedHost.host === readerUrl2Connection).toBeTruthy(); @@ -204,7 +204,7 @@ describe("internal pool connection provider test", () => { it("test least connection strategy", async () => { const provider = spy(new InternalPooledConnectionProvider(mockPoolConfig)); - const providerSpy = instance(provider); + providerSpy = instance(provider); providerSpy.setDatabasePools(getTestPoolMap()); when(internalPoolWithOneConnection.getActiveCount()).thenReturn(1); @@ -227,7 +227,7 @@ describe("internal pool connection provider test", () => { when(mockDriverDialect.getAwsPoolClient(anything())).thenThrow(new Error("testError")); const provider = spy(new InternalPooledConnectionProvider(poolConfig)); - const providerSpy = instance(provider); + providerSpy = instance(provider); when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper); await expect(providerSpy.connect(hostInfo, mockPluginServiceInstance, props)).rejects.toThrow("testError"); diff --git a/tests/unit/sliding_expiration_cache.test.ts b/tests/unit/sliding_expiration_cache.test.ts index 38bf7356..03d1003c 100644 --- a/tests/unit/sliding_expiration_cache.test.ts +++ b/tests/unit/sliding_expiration_cache.test.ts @@ -25,7 +25,7 @@ class DisposableItem { this.disposed = false; } - dispose() { + async dispose() { this.disposed = true; } } @@ -52,7 +52,7 @@ describe("test_sliding_expiration_cache", () => { const target = new SlidingExpirationCache( BigInt(50_000_000), (item: DisposableItem) => item.shouldDispose, - (item) => item.dispose() + async (item) => item.dispose() ); const itemToRemove = new DisposableItem(true); let result = target.computeIfAbsent("itemToRemove", () => itemToRemove, BigInt(15_000_000_000)); @@ -89,7 +89,7 @@ describe("test_sliding_expiration_cache", () => { const target = new SlidingExpirationCache( BigInt(50_000_000), (item: DisposableItem) => item.shouldDispose, - (item) => item.dispose() + async (item) => item.dispose() ); const item1 = new DisposableItem(false); const item2 = new DisposableItem(false); @@ -101,7 +101,7 @@ describe("test_sliding_expiration_cache", () => { expect(target.get(1)).toEqual(item1); expect(target.get(2)).toEqual(item2); - target.clear(); + await target.clear(); expect(target.size).toEqual(0); expect(target.get(1)).toEqual(undefined);