Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/datastore/DataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export enum StoreName {
private_schemas = 'private_schemas',
}

export const PersistedStores = [StoreName.public_schemas, StoreName.sam_schemas];
export const PersistedStores: ReadonlyArray<StoreName> = [StoreName.public_schemas, StoreName.sam_schemas];

export interface DataStore {
get<T>(key: string): T | undefined;
Expand All @@ -33,7 +33,7 @@ export interface DataStore {
export interface DataStoreFactory extends Closeable {
get(store: StoreName): DataStore;

storeNames(): ReadonlyArray<string>;
storeNames: ReadonlyArray<string>;

close(): Promise<void>;
}
Expand Down
9 changes: 4 additions & 5 deletions src/datastore/FileStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ export class FileStoreFactory implements DataStoreFactory {
private readonly metricsInterval: NodeJS.Timeout;
private readonly timeout: NodeJS.Timeout;

constructor(rootDir: string, storeNames: StoreName[] = PersistedStores) {
constructor(
rootDir: string,
public readonly storeNames = PersistedStores,
) {
this.log = LoggerFactory.getLogger('FileStore.Global');

this.fileDbRoot = join(rootDir, 'filedb');
Expand Down Expand Up @@ -56,10 +59,6 @@ export class FileStoreFactory implements DataStoreFactory {
return val;
}

storeNames(): ReadonlyArray<string> {
return [...this.stores.keys()];
}

close(): Promise<void> {
clearTimeout(this.timeout);
clearInterval(this.metricsInterval);
Expand Down
186 changes: 137 additions & 49 deletions src/datastore/LMDB.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { readdirSync, rmSync } from 'fs';
import { existsSync, readdirSync, rmSync } from 'fs';
import { join } from 'path';
import { open, RootDatabase, RootDatabaseOptionsWithPath } from 'lmdb';
import { LoggerFactory } from '../telemetry/LoggerFactory';
import { ScopedTelemetry } from '../telemetry/ScopedTelemetry';
import { Telemetry } from '../telemetry/TelemetryDecorator';
import { isWindows } from '../utils/Environment';
import { extractErrorMessage } from '../utils/Errors';
import { formatNumber, toString } from '../utils/String';
import { DataStore, DataStoreFactory, PersistedStores, StoreName } from './DataStore';
import { LMDBStore } from './lmdb/LMDBStore';
Expand All @@ -18,35 +19,34 @@ export class LMDBStoreFactory implements DataStoreFactory {
private readonly lmdbDir: string;
private readonly timeout: NodeJS.Timeout;
private readonly metricsInterval: NodeJS.Timeout;
private readonly env: RootDatabase;

private env: RootDatabase;
private openPid = process.pid;
private closed = false;

private readonly stores = new Map<StoreName, LMDBStore>();

constructor(rootDir: string, storeNames: StoreName[] = PersistedStores) {
constructor(
rootDir: string,
public readonly storeNames = PersistedStores,
) {
this.lmdbDir = join(rootDir, 'lmdb');

const config: RootDatabaseOptionsWithPath = {
path: join(this.lmdbDir, Version),
maxDbs: 10,
mapSize: TotalMaxDbSize,
encoding: Encoding,
encryptionKey: encryptionStrategy(VersionNumber),
};

if (isWindows) {
config.noSubdir = false;
config.overlappingSync = false;
}

this.env = open(config);
const { env, config } = createEnv(this.lmdbDir);
this.env = env;

for (const store of storeNames) {
const database = this.env.openDB<unknown, string>({
name: store,
encoding: Encoding,
});

this.stores.set(store, new LMDBStore(store, database));
const database = createDB(this.env, store);

this.stores.set(
store,
new LMDBStore(
store,
database,
(e) => this.handleError(e),
() => this.ensureValidEnv(),
),
);
}

this.metricsInterval = setInterval(() => {
Expand Down Expand Up @@ -81,20 +81,85 @@ export class LMDBStoreFactory implements DataStoreFactory {
return val;
}

storeNames(): ReadonlyArray<string> {
return [...this.stores.keys()];
}

async close(): Promise<void> {
  // Idempotent shutdown: later calls are no-ops once teardown has begun.
  if (this.closed) return;
  this.closed = true;

  clearTimeout(this.timeout);
  clearInterval(this.metricsInterval);

  // Drop store handles without closing them one by one; closing the
  // environment below tears down every database it owns.
  this.stores.clear();

  await this.env.close();
}

/**
 * Classify an LMDB failure by its message and trigger the matching
 * recovery path. No-op once the factory has been closed.
 */
private handleError(error: unknown): void {
  if (this.closed) return;

  const msg = extractErrorMessage(error);

  const forkMarkers = ['MDB_BAD_RSLOT', "doesn't match env pid"];
  const corruptionMarkers = [
    'MDB_CURSOR_FULL',
    'MDB_CORRUPTED',
    'MDB_PAGE_NOTFOUND',
    'MDB_BAD_TXN',
    'Commit failed',
    'closed database',
  ];

  if (forkMarkers.some((marker) => msg.includes(marker))) {
    this.recoverFromFork();
  } else if (corruptionMarkers.some((marker) => msg.includes(marker))) {
    this.recoverFromCorruption();
  }
}

private ensureValidEnv(): void {
if (process.pid !== this.openPid) {
this.telemetry.count('process.fork', 1);
this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Process fork detected');
this.reopenEnv();

// Update all stores with new handles
for (const store of this.storeNames) {
this.stores.get(store)?.updateStore(createDB(this.env, store));
}
}
}

// Recover after a process fork: reopen the environment under the current
// pid and rebuild all store entries against it.
private recoverFromFork(): void {
  this.telemetry.count('forked', 1);
  this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Fork detected, reopening LMDB');
  this.reopenEnv();
  this.recreateStores();
}

// Recover from a corrupted or unexpectedly closed environment by reopening
// it and rebuilding all store entries.
private recoverFromCorruption(): void {
  this.telemetry.count('corrupted', 1);
  this.log.warn('Corruption detected, reopening LMDB');
  this.reopenEnv();
  this.recreateStores();
}

// Replace the current environment with a freshly opened one and record the
// owning pid (consulted by ensureValidEnv to detect forks).
// NOTE(review): the previous env handle is not closed here — presumably it
// is already unusable after a fork/corruption; confirm this cannot leak.
private reopenEnv(): void {
  this.env = createEnv(this.lmdbDir).env;
  this.openPid = process.pid;
  this.log.warn('Recreated LMDB environment');
}

/**
 * Rebuild every store entry against the (freshly reopened) environment,
 * wiring each new LMDBStore to this factory's error handler and validator.
 *
 * Uses the shared createDB helper — same options as the constructor and
 * ensureValidEnv — instead of duplicating the openDB call inline.
 *
 * NOTE(review): this replaces the map entries with new LMDBStore instances;
 * callers that kept a reference from get() still hold the old instance —
 * confirm whether updateStore (as in ensureValidEnv) was intended here.
 */
private recreateStores(): void {
  for (const name of this.storeNames) {
    const database = createDB(this.env, name);
    this.stores.set(
      name,
      new LMDBStore(
        name,
        database,
        (e) => this.handleError(e),
        () => this.ensureValidEnv(),
      ),
    );
  }
}

private cleanupOldVersions(): void {
if (this.closed || !existsSync(this.lmdbDir)) return;

const entries = readdirSync(this.lmdbDir, { withFileTypes: true });
for (const entry of entries) {
try {
Expand All @@ -110,31 +175,30 @@ export class LMDBStoreFactory implements DataStoreFactory {
}

private emitMetrics(): void {
const totalBytes = this.totalBytes();
if (this.closed) return;

const envStat = stats(this.env);
this.telemetry.histogram('version', VersionNumber);
this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
this.telemetry.histogram('env.max.size.bytes', envStat.maxSize, {
unit: 'By',
});
this.telemetry.histogram('env.entries', envStat.entries);
try {
const totalBytes = this.totalBytes();

for (const [name, store] of this.stores.entries()) {
const stat = store.stats();

this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, {
const envStat = stats(this.env);
this.telemetry.histogram('version', VersionNumber);
this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
this.telemetry.histogram('env.max.size.bytes', envStat.maxSize, {
unit: 'By',
});
this.telemetry.histogram(`store.${name}.entries`, stat.entries);
}
this.telemetry.histogram('env.entries', envStat.entries);

this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), {
unit: '%',
});
this.telemetry.histogram('total.size.bytes', totalBytes, {
unit: 'By',
});
for (const [name, store] of this.stores.entries()) {
const stat = store.stats();
this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, { unit: 'By' });
this.telemetry.histogram(`store.${name}.entries`, stat.entries);
}

this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), { unit: '%' });
this.telemetry.histogram('total.size.bytes', totalBytes, { unit: 'By' });
} catch (e) {
this.handleError(e);
}
}

private totalBytes() {
Expand All @@ -153,3 +217,27 @@ const VersionNumber = 5;
// Versioned directory name component for the env path (derived from VersionNumber).
const Version = `v${VersionNumber}`;
// Value encoding shared by the root environment and every openDB call in this file.
const Encoding: 'msgpack' | 'json' | 'string' | 'binary' | 'ordered-binary' = 'msgpack';
const TotalMaxDbSize = 250 * 1024 * 1024; // 250MB max size

/**
 * Open the versioned LMDB root environment under `lmdbDir` and return it
 * together with the options it was opened with.
 */
function createEnv(lmdbDir: string) {
  // Extra options applied only on Windows — TODO confirm against lmdb docs
  // why noSubdir/overlappingSync must be forced off there.
  const platformOverrides: Partial<RootDatabaseOptionsWithPath> = isWindows
    ? { noSubdir: false, overlappingSync: false }
    : {};

  const config: RootDatabaseOptionsWithPath = {
    path: join(lmdbDir, Version),
    maxDbs: 10,
    mapSize: TotalMaxDbSize,
    encoding: Encoding,
    encryptionKey: encryptionStrategy(VersionNumber),
    ...platformOverrides,
  };

  return { config, env: open(config) };
}

// Open (or create) the named database inside `env` using the shared value
// encoding; single source of truth for per-store openDB options.
function createDB(env: RootDatabase, name: string) {
  return env.openDB<unknown, string>({ name, encoding: Encoding });
}
2 changes: 1 addition & 1 deletion src/datastore/MemoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class MemoryStoreFactory implements DataStoreFactory {
return val;
}

storeNames(): ReadonlyArray<string> {
/** Names of the stores currently held by this factory. */
get storeNames(): ReadonlyArray<string> {
  return Array.from(this.stores.keys());
}

Expand Down
59 changes: 39 additions & 20 deletions src/datastore/lmdb/LMDBStore.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,72 @@
import { Database } from 'lmdb';
import { Logger } from 'pino';
import { LoggerFactory } from '../../telemetry/LoggerFactory';
import { ScopedTelemetry } from '../../telemetry/ScopedTelemetry';
import { TelemetryService } from '../../telemetry/TelemetryService';
import { DataStore, StoreName } from '../DataStore';
import { stats, StoreStatsType } from './Stats';

type ErrorHandler = (error: unknown) => void;

export class LMDBStore implements DataStore {
private readonly telemetry: ScopedTelemetry;

constructor(
public readonly name: StoreName,
protected readonly store: Database<unknown, string>,
private readonly log: Logger = LoggerFactory.getLogger(`LMDB.${name}`),
private store: Database<unknown, string>,
private readonly onError?: ErrorHandler,
private readonly validateDatabase?: () => void,
) {
this.telemetry = TelemetryService.instance.get(`LMDB.${name}`);
}

get<T>(key: string): T | undefined {
return this.telemetry.countExecution('get', () => {
return this.store.get(key) as T | undefined;
// Swap in a new database handle (called by the factory after it reopens
// the LMDB environment, e.g. on fork detection).
updateStore(store: Database<unknown, string>) {
  this.store = store;
}

/**
 * Run a synchronous store operation under telemetry: validate the database
 * first, and on failure notify the error handler before rethrowing.
 */
private exec<T>(op: string, fn: () => T): T {
  const guarded = (): T => {
    try {
      this.validateDatabase?.();
      return fn();
    } catch (err) {
      this.onError?.(err);
      throw err;
    }
  };

  return this.telemetry.measure(op, guarded);
}

put<T>(key: string, value: T): Promise<boolean> {
return this.telemetry.measureAsync('put', () => {
return this.store.put(key, value);
/**
 * Async counterpart of exec: validate the database, run the operation under
 * telemetry, and route any rejection to the error handler before rethrowing.
 */
private async execAsync<T>(op: string, fn: () => Promise<T>): Promise<T> {
  const guarded = async (): Promise<T> => {
    try {
      this.validateDatabase?.();
      return await fn();
    } catch (err) {
      this.onError?.(err);
      throw err;
    }
  };

  return this.telemetry.measureAsync(op, guarded);
}

/** Read a single value by key; undefined when the key is absent. */
get<T>(key: string): T | undefined {
  return this.exec('get', () => {
    const value = this.store.get(key);
    return value as T | undefined;
  });
}

/** Write (insert or overwrite) a single value under the given key. */
put<T>(key: string, value: T): Promise<boolean> {
  const write = () => this.store.put(key, value);
  return this.execAsync('put', write);
}

remove(key: string): Promise<boolean> {
return this.telemetry.countExecutionAsync('remove', () => {
return this.store.remove(key);
});
return this.execAsync('remove', () => this.store.remove(key));
}

clear(): Promise<void> {
return this.telemetry.countExecutionAsync('clear', () => {
return this.store.clearAsync();
});
return this.execAsync('clear', () => this.store.clearAsync());
}

keys(limit: number): ReadonlyArray<string> {
return this.telemetry.countExecution('keys', () => {
return this.store.getKeys({ limit }).asArray;
});
return this.exec('keys', () => this.store.getKeys({ limit }).asArray);
}

stats(): StoreStatsType {
return stats(this.store);
return this.exec('stats', () => stats(this.store));
}
}
Loading
Loading