Skip to content

Commit

Permalink
refactor: make clear async in sliding expiration cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Jan 29, 2025
1 parent 132fd5d commit 0682fce
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 45 deletions.
17 changes: 4 additions & 13 deletions common/lib/internal_pooled_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import { logger } from "../logutils";
export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30 * 60_000_000_000); // 30 minutes
protected static databasePools: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
protected static databasePools: SlidingExpirationCache<string, AwsPoolClient> = new SlidingExpirationCache(
InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS,
(pool: any) => pool.getActiveCount() === 0,
(pool: any) => pool.end()
(pool: AwsPoolClient) => pool.getActiveCount() === 0,
(pool: AwsPoolClient) => pool.end()
);

private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
Expand Down Expand Up @@ -122,16 +122,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
}

async releaseResources() {
for (const [_key, value] of InternalPooledConnectionProvider.databasePools.entries) {
if (value.item) {
await value.item.releaseResources();
}
}
InternalPooledConnectionProvider.clearDatabasePools();
}

static clearDatabasePools() {
InternalPooledConnectionProvider.databasePools.clear();
await InternalPooledConnectionProvider.databasePools.clear();
}

getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
Expand Down
21 changes: 10 additions & 11 deletions common/lib/plugins/efm/monitor_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ export class MonitorServiceImpl implements MonitorService {
protected static readonly monitors: SlidingExpirationCache<string, Monitor> = new SlidingExpirationCache(
MonitorServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
() => {}
async (monitor: Monitor) => {
await monitor.releaseResources();
}
);
private readonly pluginService: PluginService;
private cachedMonitorHostKeys: Set<string> | undefined;
Expand Down Expand Up @@ -107,7 +109,7 @@ export class MonitorServiceImpl implements MonitorService {
}

stopMonitoringForAllConnections(hostKeys: Set<string>) {
let monitor;
let monitor: Monitor;
for (const hostKey of hostKeys) {
monitor = MonitorServiceImpl.monitors.get(hostKey);
if (monitor) {
Expand All @@ -118,8 +120,8 @@ export class MonitorServiceImpl implements MonitorService {
}

async getMonitor(hostKeys: Set<string>, hostInfo: HostInfo, properties: Map<string, any>): Promise<Monitor | null> {
let monitor;
let anyHostKey;
let monitor: Monitor;
let anyHostKey: string;
for (const hostKey of hostKeys) {
monitor = MonitorServiceImpl.monitors.get(hostKey);
anyHostKey = hostKey;
Expand Down Expand Up @@ -158,16 +160,13 @@ export class MonitorServiceImpl implements MonitorService {
}

async releaseResources() {
for (const [key, monitor] of MonitorServiceImpl.monitors.entries) {
if (monitor.item) {
await monitor.item.releaseResources();
}
}
await MonitorServiceImpl.monitors.clear();
this.cachedMonitorHostKeys = undefined;
this.cachedMonitorRef = undefined;
}

static clearMonitors() {
MonitorServiceImpl.monitors.clear();
// Used for performance testing.
static async clearMonitors() {
await MonitorServiceImpl.monitors.clear();
}
}
4 changes: 2 additions & 2 deletions common/lib/plugins/limitless/limitless_router_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ export class LimitlessRouterServiceImpl implements LimitlessRouterService {
}
}

static clearMonitors() {
LimitlessRouterServiceImpl.monitors.clear();
static async clearMonitors() {
await LimitlessRouterServiceImpl.monitors.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class HostResponseTimeServiceImpl implements HostResponseTimeService {
readonly intervalMs: number;
protected hosts: HostInfo[];
private readonly telemetryFactory: TelemetryFactory;
protected static monitoringHosts: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
protected static monitoringHosts: SlidingExpirationCache<string, HostResponseTimeMonitor> = new SlidingExpirationCache(
HostResponseTimeServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
async (monitor: HostResponseTimeMonitor) => {
Expand Down
10 changes: 5 additions & 5 deletions common/lib/utils/sliding_expiration_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ class CacheItem<V> {
export class SlidingExpirationCache<K, V> {
private _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
private readonly _shouldDisposeFunc?: (item: V) => boolean;
private readonly _itemDisposalFunc?: (item: V) => void;
private readonly _itemDisposalFunc?: (item: V) => Promise<void>;
map: Map<K, CacheItem<V>> = new Map<K, CacheItem<V>>();
private _cleanupTimeNanos: bigint;

constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, itemDisposalFunc?: (item: V) => void) {
constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, itemDisposalFunc?: (item: V) => Promise<void>) {
this._cleanupIntervalNanos = cleanupIntervalNanos;
this._shouldDisposeFunc = shouldDisposeFunc;
this._itemDisposalFunc = itemDisposalFunc;
Expand Down Expand Up @@ -116,7 +116,7 @@ export class SlidingExpirationCache<K, V> {
return cacheItem;
});

if (item != undefined && item != null && this._itemDisposalFunc != null) {
if (item != undefined && this._itemDisposalFunc != null) {
this._itemDisposalFunc(item);
}
}
Expand All @@ -128,10 +128,10 @@ export class SlidingExpirationCache<K, V> {
return getTimeInNanos() > cacheItem.expirationTimeNs;
}

clear(): void {
async clear(): Promise<void> {
for (const [key, val] of this.map.entries()) {
if (val !== undefined && this._itemDisposalFunc !== undefined) {
this._itemDisposalFunc(val.item);
await this._itemDisposalFunc(val.item);
}
}
this.map.clear();
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/container/tests/performance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async function doMeasurePerformance(sleepDelayMillis: number, repeatTimes: numbe
try {
await ProxyHelper.enableAllConnectivity();
await client.end();
MonitorServiceImpl.clearMonitors();
await MonitorServiceImpl.clearMonitors();
} catch (error: any) {
// ignore
}
Expand Down
16 changes: 8 additions & 8 deletions tests/unit/internal_pool_connection_provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { HostListProvider } from "../../common/lib/host_list_provider/host_list_
import { WrapperProperties } from "../../common/lib/wrapper_property";
import { InternalPooledConnectionProvider } from "../../common/lib/internal_pooled_connection_provider";
import { AwsPoolConfig } from "../../common/lib/aws_pool_config";
import { ConnectionProviderManager } from "../../common/lib/connection_provider_manager";
import { RdsUtils } from "../../common/lib/utils/rds_utils";
import { PoolKey } from "../../common/lib/utils/pool_key";
import { InternalPoolMapping } from "../../common/lib/utils/internal_pool_mapping";
Expand All @@ -52,6 +51,7 @@ const readerHost1Connection = builder.withHost(readerUrl1Connection).withPort(54
const readerHost2Connection = builder.withHost(readerUrl2Connection).withPort(5432).withRole(HostRole.READER).build();
const writerHostNoConnection = builder.withHost(writerUrlNoConnections).withPort(5432).withRole(HostRole.WRITER).build();
const testHostsList = [writerHostNoConnection, readerHost1Connection, readerHost2Connection];
let providerSpy = null;

function getTestPoolMap() {
const target: SlidingExpirationCache<string, any> = new SlidingExpirationCache(BigInt(10000000));
Expand Down Expand Up @@ -102,7 +102,7 @@ describe("internal pool connection provider test", () => {
props.clear();
});

afterEach(() => {
afterEach(async () => {
reset(mockReaderClient);
reset(mockAwsMySQLClient);
reset(mockHostInfo);
Expand All @@ -119,7 +119,7 @@ describe("internal pool connection provider test", () => {
reset(mockRdsUtils);
reset(mockHostListProvider);

InternalPooledConnectionProvider.clearDatabasePools();
await providerSpy.releaseResources();
});

it("test connect with default mapping", async () => {
Expand All @@ -142,7 +142,7 @@ describe("internal pool connection provider test", () => {
const poolConfig: AwsPoolConfig = new AwsPoolConfig(config);

const provider = spy(new InternalPooledConnectionProvider(poolConfig));
const providerSpy = instance(provider);
providerSpy = instance(provider);
when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper);

await providerSpy.connect(hostInfo, mockPluginServiceInstance, props);
Expand Down Expand Up @@ -181,7 +181,7 @@ describe("internal pool connection provider test", () => {
const poolConfig: AwsPoolConfig = new AwsPoolConfig(config);

const provider = spy(new InternalPooledConnectionProvider(poolConfig, myKeyFunc));
const providerSpy = instance(provider);
providerSpy = instance(provider);
when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper);

await providerSpy.connect(hostInfo, mockPluginServiceInstance, props);
Expand All @@ -196,15 +196,15 @@ describe("internal pool connection provider test", () => {

it("test random strategy", async () => {
const provider = spy(new InternalPooledConnectionProvider(mockPoolConfig));
const providerSpy = instance(provider);
providerSpy = instance(provider);
providerSpy.setDatabasePools(getTestPoolMap());
const selectedHost = providerSpy.getHostInfoByStrategy(testHostsList, HostRole.READER, "random", props);
expect(selectedHost.host === readerUrl1Connection || selectedHost.host === readerUrl2Connection).toBeTruthy();
});

it("test least connection strategy", async () => {
const provider = spy(new InternalPooledConnectionProvider(mockPoolConfig));
const providerSpy = instance(provider);
providerSpy = instance(provider);
providerSpy.setDatabasePools(getTestPoolMap());
when(internalPoolWithOneConnection.getActiveCount()).thenReturn(1);

Expand All @@ -227,7 +227,7 @@ describe("internal pool connection provider test", () => {
when(mockDriverDialect.getAwsPoolClient(anything())).thenThrow(new Error("testError"));

const provider = spy(new InternalPooledConnectionProvider(poolConfig));
const providerSpy = instance(provider);
providerSpy = instance(provider);
when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper);

await expect(providerSpy.connect(hostInfo, mockPluginServiceInstance, props)).rejects.toThrow("testError");
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/sliding_expiration_cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DisposableItem {
this.disposed = false;
}

dispose() {
async dispose() {
this.disposed = true;
}
}
Expand All @@ -52,7 +52,7 @@ describe("test_sliding_expiration_cache", () => {
const target = new SlidingExpirationCache(
BigInt(50_000_000),
(item: DisposableItem) => item.shouldDispose,
(item) => item.dispose()
async (item) => item.dispose()
);
const itemToRemove = new DisposableItem(true);
let result = target.computeIfAbsent("itemToRemove", () => itemToRemove, BigInt(15_000_000_000));
Expand Down Expand Up @@ -89,7 +89,7 @@ describe("test_sliding_expiration_cache", () => {
const target = new SlidingExpirationCache(
BigInt(50_000_000),
(item: DisposableItem) => item.shouldDispose,
(item) => item.dispose()
async (item) => item.dispose()
);
const item1 = new DisposableItem(false);
const item2 = new DisposableItem(false);
Expand All @@ -101,7 +101,7 @@ describe("test_sliding_expiration_cache", () => {
expect(target.get(1)).toEqual(item1);
expect(target.get(2)).toEqual(item2);

target.clear();
await target.clear();

expect(target.size).toEqual(0);
expect(target.get(1)).toEqual(undefined);
Expand Down

0 comments on commit 0682fce

Please sign in to comment.