diff --git a/src/datastore/DataStore.ts b/src/datastore/DataStore.ts
index bad48c0f..d3fcddd6 100644
--- a/src/datastore/DataStore.ts
+++ b/src/datastore/DataStore.ts
@@ -16,7 +16,7 @@ export enum StoreName {
   private_schemas = 'private_schemas',
 }
 
-export const PersistedStores = [StoreName.public_schemas, StoreName.sam_schemas];
+export const PersistedStores: ReadonlyArray<StoreName> = [StoreName.public_schemas, StoreName.sam_schemas];
 
 export interface DataStore {
   get<T>(key: string): T | undefined;
@@ -33,7 +33,7 @@ export interface DataStore {
 export interface DataStoreFactory extends Closeable {
   get(store: StoreName): DataStore;
 
-  storeNames(): ReadonlyArray<StoreName>;
+  storeNames: ReadonlyArray<StoreName>;
 
   close(): Promise<void>;
 }
diff --git a/src/datastore/FileStore.ts b/src/datastore/FileStore.ts
index 433d9126..22aec9eb 100644
--- a/src/datastore/FileStore.ts
+++ b/src/datastore/FileStore.ts
@@ -20,7 +20,10 @@ export class FileStoreFactory implements DataStoreFactory {
   private readonly metricsInterval: NodeJS.Timeout;
   private readonly timeout: NodeJS.Timeout;
 
-  constructor(rootDir: string, storeNames: StoreName[] = PersistedStores) {
+  constructor(
+    rootDir: string,
+    public readonly storeNames = PersistedStores,
+  ) {
     this.log = LoggerFactory.getLogger('FileStore.Global');
 
     this.fileDbRoot = join(rootDir, 'filedb');
@@ -56,10 +59,6 @@ export class FileStoreFactory implements DataStoreFactory {
     return val;
   }
 
-  storeNames(): ReadonlyArray<StoreName> {
-    return [...this.stores.keys()];
-  }
-
   close(): Promise<void> {
     clearTimeout(this.timeout);
     clearInterval(this.metricsInterval);
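Across the factories this PR turns the `storeNames()` method into a readonly `storeNames` property (a constructor parameter property on `FileStoreFactory` and `LMDBStoreFactory`, a getter on `MemoryStoreFactory`). A minimal consumer-side sketch of the difference; `logStores` and the import path are hypothetical, not part of this PR:

```ts
import { DataStoreFactory, StoreName } from './DataStore';

function logStores(factory: DataStoreFactory): void {
  // Before: const names = factory.storeNames();  (method call)
  // After:  storeNames is a readonly property, so no parentheses.
  const names: ReadonlyArray<StoreName> = factory.storeNames;
  for (const name of names) {
    console.log(`store: ${name}`);
  }
}
```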
diff --git a/src/datastore/LMDB.ts b/src/datastore/LMDB.ts
index 159c30e3..89fc0b9d 100644
--- a/src/datastore/LMDB.ts
+++ b/src/datastore/LMDB.ts
@@ -1,10 +1,11 @@
-import { readdirSync, rmSync } from 'fs';
+import { existsSync, readdirSync, rmSync } from 'fs';
 import { join } from 'path';
 import { open, RootDatabase, RootDatabaseOptionsWithPath } from 'lmdb';
 import { LoggerFactory } from '../telemetry/LoggerFactory';
 import { ScopedTelemetry } from '../telemetry/ScopedTelemetry';
 import { Telemetry } from '../telemetry/TelemetryDecorator';
 import { isWindows } from '../utils/Environment';
+import { extractErrorMessage } from '../utils/Errors';
 import { formatNumber, toString } from '../utils/String';
 import { DataStore, DataStoreFactory, PersistedStores, StoreName } from './DataStore';
 import { LMDBStore } from './lmdb/LMDBStore';
@@ -18,35 +19,34 @@ export class LMDBStoreFactory implements DataStoreFactory {
   private readonly lmdbDir: string;
   private readonly timeout: NodeJS.Timeout;
   private readonly metricsInterval: NodeJS.Timeout;
-  private readonly env: RootDatabase;
+
+  private env: RootDatabase;
+  private openPid = process.pid;
+  private closed = false;
 
   private readonly stores = new Map<StoreName, LMDBStore>();
 
-  constructor(rootDir: string, storeNames: StoreName[] = PersistedStores) {
+  constructor(
+    rootDir: string,
+    public readonly storeNames = PersistedStores,
+  ) {
     this.lmdbDir = join(rootDir, 'lmdb');
 
-    const config: RootDatabaseOptionsWithPath = {
-      path: join(this.lmdbDir, Version),
-      maxDbs: 10,
-      mapSize: TotalMaxDbSize,
-      encoding: Encoding,
-      encryptionKey: encryptionStrategy(VersionNumber),
-    };
-
-    if (isWindows) {
-      config.noSubdir = false;
-      config.overlappingSync = false;
-    }
-
-    this.env = open(config);
+    const { env } = createEnv(this.lmdbDir);
+    this.env = env;
 
     for (const store of storeNames) {
-      const database = this.env.openDB({
-        name: store,
-        encoding: Encoding,
-      });
-
-      this.stores.set(store, new LMDBStore(store, database));
+      const database = createDB(this.env, store);
+
+      this.stores.set(
+        store,
+        new LMDBStore(
+          store,
+          database,
+          (e) => this.handleError(e),
+          () => this.ensureValidEnv(),
+        ),
+      );
     }
 
     this.metricsInterval = setInterval(() => {
@@ -81,20 +81,85 @@ export class LMDBStoreFactory implements DataStoreFactory {
     return val;
   }
 
-  storeNames(): ReadonlyArray<StoreName> {
-    return [...this.stores.keys()];
-  }
-
   async close(): Promise<void> {
-    // Clear the stores map but don't close individual stores
-    // LMDB will close them when we close the environment
+    if (this.closed) return;
+    this.closed = true;
+
     clearInterval(this.metricsInterval);
     clearTimeout(this.timeout);
     this.stores.clear();
     await this.env.close();
   }
 
+  private handleError(error: unknown): void {
+    if (this.closed) return;
+    const msg = extractErrorMessage(error);
+
+    if (msg.includes('MDB_BAD_RSLOT') || msg.includes("doesn't match env pid")) {
+      this.recoverFromFork();
+    } else if (
+      msg.includes('MDB_CURSOR_FULL') ||
+      msg.includes('MDB_CORRUPTED') ||
+      msg.includes('MDB_PAGE_NOTFOUND') ||
+      msg.includes('MDB_BAD_TXN') ||
+      msg.includes('Commit failed') ||
+      msg.includes('closed database')
+    ) {
+      this.recoverFromCorruption();
+    }
+  }
+
+  private ensureValidEnv(): void {
+    if (process.pid !== this.openPid) {
+      this.telemetry.count('process.fork', 1);
+      this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Process fork detected');
+      this.reopenEnv();
+
+      // Update all stores with new handles
+      for (const store of this.storeNames) {
+        this.stores.get(store)?.updateStore(createDB(this.env, store));
+      }
+    }
+  }
+
+  private recoverFromFork(): void {
+    this.telemetry.count('forked', 1);
+    this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Fork detected, reopening LMDB');
+    this.reopenEnv();
+    this.recreateStores();
+  }
+
+  private recoverFromCorruption(): void {
+    this.telemetry.count('corrupted', 1);
+    this.log.warn('Corruption detected, reopening LMDB');
+    this.reopenEnv();
+    this.recreateStores();
+  }
+
+  private reopenEnv(): void {
+    this.env = createEnv(this.lmdbDir).env;
+    this.openPid = process.pid;
+    this.log.warn('Recreated LMDB environment');
+  }
+
+  private recreateStores(): void {
+    for (const name of this.storeNames) {
+      const database = createDB(this.env, name);
+      this.stores.set(
+        name,
+        new LMDBStore(
+          name,
+          database,
+          (e) => this.handleError(e),
+          () => this.ensureValidEnv(),
+        ),
+      );
+    }
+  }
+
   private cleanupOldVersions(): void {
+    if (this.closed || !existsSync(this.lmdbDir)) return;
+
     const entries = readdirSync(this.lmdbDir, { withFileTypes: true });
     for (const entry of entries) {
       try {
@@ -110,31 +175,30 @@ export class LMDBStoreFactory implements DataStoreFactory {
   }
 
   private emitMetrics(): void {
-    const totalBytes = this.totalBytes();
+    if (this.closed) return;
 
-    const envStat = stats(this.env);
-    this.telemetry.histogram('version', VersionNumber);
-    this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
-    this.telemetry.histogram('env.max.size.bytes', envStat.maxSize, {
-      unit: 'By',
-    });
-    this.telemetry.histogram('env.entries', envStat.entries);
+    try {
+      const totalBytes = this.totalBytes();
 
-    for (const [name, store] of this.stores.entries()) {
-      const stat = store.stats();
-
-      this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, {
-        unit: 'By',
-      });
-      this.telemetry.histogram(`store.${name}.entries`, stat.entries);
-    }
+      const envStat = stats(this.env);
+      this.telemetry.histogram('version', VersionNumber);
+      this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
+      this.telemetry.histogram('env.max.size.bytes', envStat.maxSize, { unit: 'By' });
+      this.telemetry.histogram('env.entries', envStat.entries);
 
-    this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), {
-      unit: '%',
-    });
-    this.telemetry.histogram('total.size.bytes', totalBytes, {
-      unit: 'By',
-    });
+      for (const [name, store] of this.stores.entries()) {
+        const stat = store.stats();
+        this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, { unit: 'By' });
+        this.telemetry.histogram(`store.${name}.entries`, stat.entries);
+      }
+
+      this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), { unit: '%' });
+      this.telemetry.histogram('total.size.bytes', totalBytes, { unit: 'By' });
+    } catch (e) {
+      this.handleError(e);
+    }
   }
 
   private totalBytes() {
@@ -153,3 +217,27 @@
 const VersionNumber = 5;
 const Version = `v${VersionNumber}`;
 const Encoding: 'msgpack' | 'json' | 'string' | 'binary' | 'ordered-binary' = 'msgpack';
 const TotalMaxDbSize = 250 * 1024 * 1024; // 250MB max size
+
+function createEnv(lmdbDir: string) {
+  const config: RootDatabaseOptionsWithPath = {
+    path: join(lmdbDir, Version),
+    maxDbs: 10,
+    mapSize: TotalMaxDbSize,
+    encoding: Encoding,
+    encryptionKey: encryptionStrategy(VersionNumber),
+  };
+
+  if (isWindows) {
+    config.noSubdir = false;
+    config.overlappingSync = false;
+  }
+
+  return {
+    config,
+    env: open(config),
+  };
+}
+
+function createDB(env: RootDatabase, name: string) {
+  return env.openDB({ name, encoding: Encoding });
+}
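The factory now recovers along two paths: `ensureValidEnv` runs before every store operation and reopens the environment when `process.pid` no longer matches the pid it was opened under, while `handleError` runs after a failed operation and classifies LMDB error messages into fork-class versus corruption-class recovery. A minimal sketch of that shape, with the LMDB specifics stubbed behind a `reopen` callback; names are illustrative, and the PR's real implementation also recreates each `LMDBStore` handle:

```ts
// Sketch only: two recovery paths, proactive (pid check) and reactive (error classification).
class RecoveringHandle {
  private openPid = process.pid;

  constructor(private readonly reopen: () => void) {}

  // Proactive path (ensureValidEnv in the PR): runs before every operation.
  ensureValid(): void {
    if (process.pid !== this.openPid) {
      this.reopen(); // the process forked since the env was opened
      this.openPid = process.pid;
    }
  }

  // Reactive path (handleError in the PR): runs when an operation throws.
  handleError(error: unknown): void {
    const msg = error instanceof Error ? error.message : String(error);
    if (msg.includes('MDB_BAD_RSLOT') || msg.includes("doesn't match env pid")) {
      this.reopen(); // fork surfaced as an LMDB reader-slot / pid error
    } else if (msg.includes('MDB_CORRUPTED') || msg.includes('MDB_BAD_TXN')) {
      this.reopen(); // corruption-class errors: rebuild from scratch
    }
  }
}
```

The proactive check makes recovery invisible to callers in the common fork case; the reactive check is the backstop for errors that only show up once LMDB is actually touched.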
diff --git a/src/datastore/MemoryStore.ts b/src/datastore/MemoryStore.ts
index 5b5e01b5..3e49405e 100644
--- a/src/datastore/MemoryStore.ts
+++ b/src/datastore/MemoryStore.ts
@@ -70,7 +70,7 @@ export class MemoryStoreFactory implements DataStoreFactory {
     return val;
   }
 
-  storeNames(): ReadonlyArray<StoreName> {
+  get storeNames(): ReadonlyArray<StoreName> {
     return [...this.stores.keys()];
   }
 
diff --git a/src/datastore/lmdb/LMDBStore.ts b/src/datastore/lmdb/LMDBStore.ts
index 36586a25..16fb4540 100644
--- a/src/datastore/lmdb/LMDBStore.ts
+++ b/src/datastore/lmdb/LMDBStore.ts
@@ -1,53 +1,72 @@
 import { Database } from 'lmdb';
-import { Logger } from 'pino';
-import { LoggerFactory } from '../../telemetry/LoggerFactory';
 import { ScopedTelemetry } from '../../telemetry/ScopedTelemetry';
 import { TelemetryService } from '../../telemetry/TelemetryService';
 import { DataStore, StoreName } from '../DataStore';
 import { stats, StoreStatsType } from './Stats';
 
+type ErrorHandler = (error: unknown) => void;
+
 export class LMDBStore implements DataStore {
   private readonly telemetry: ScopedTelemetry;
 
   constructor(
     public readonly name: StoreName,
-    protected readonly store: Database,
-    private readonly log: Logger = LoggerFactory.getLogger(`LMDB.${name}`),
+    private store: Database,
+    private readonly onError?: ErrorHandler,
+    private readonly validateDatabase?: () => void,
   ) {
     this.telemetry = TelemetryService.instance.get(`LMDB.${name}`);
   }
 
-  get<T>(key: string): T | undefined {
-    return this.telemetry.countExecution('get', () => {
-      return this.store.get(key) as T | undefined;
-    });
+  updateStore(store: Database) {
+    this.store = store;
+  }
+
+  private exec<T>(op: string, fn: () => T): T {
+    return this.telemetry.measure(op, () => {
+      try {
+        this.validateDatabase?.();
+        return fn();
+      } catch (e) {
+        this.onError?.(e);
+        throw e;
+      }
+    });
   }
 
-  put<T>(key: string, value: T): Promise<boolean> {
-    return this.telemetry.measureAsync('put', () => {
-      return this.store.put(key, value);
-    });
+  private async execAsync<T>(op: string, fn: () => Promise<T>): Promise<T> {
+    return await this.telemetry.measureAsync(op, async () => {
+      try {
+        this.validateDatabase?.();
+        return await fn();
+      } catch (e) {
+        this.onError?.(e);
+        throw e;
+      }
+    });
+  }
+
+  get<T>(key: string): T | undefined {
+    return this.exec('get', () => this.store.get(key) as T | undefined);
+  }
+
+  put<T>(key: string, value: T): Promise<boolean> {
+    return this.execAsync('put', () => this.store.put(key, value));
   }
 
   remove(key: string): Promise<boolean> {
-    return this.telemetry.countExecutionAsync('remove', () => {
-      return this.store.remove(key);
-    });
+    return this.execAsync('remove', () => this.store.remove(key));
   }
 
   clear(): Promise<void> {
-    return this.telemetry.countExecutionAsync('clear', () => {
-      return this.store.clearAsync();
-    });
+    return this.execAsync('clear', () => this.store.clearAsync());
   }
 
   keys(limit: number): ReadonlyArray<string> {
-    return this.telemetry.countExecution('keys', () => {
-      return this.store.getKeys({ limit }).asArray;
-    });
+    return this.exec('keys', () => this.store.getKeys({ limit }).asArray);
   }
 
   stats(): StoreStatsType {
-    return stats(this.store);
+    return this.exec('stats', () => stats(this.store));
   }
 }
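Every `LMDBStore` operation is now routed through a single wrapper: validate the environment first, run the operation, report any failure to the factory, then rethrow. A standalone sketch of that pattern; the PR's version additionally wraps it in `telemetry.measure` / `measureAsync`:

```ts
// Sketch only: validate -> run -> report -> rethrow.
type Validator = () => void;
type ErrorHandler = (error: unknown) => void;

function exec<T>(fn: () => T, validate?: Validator, onError?: ErrorHandler): T {
  try {
    validate?.(); // proactive fork check before touching the database
    return fn();  // the actual store operation
  } catch (e) {
    onError?.(e); // let the factory classify the failure and recover
    throw e;      // the caller still sees the original error
  }
}
```

Note the rethrow: a call that hits a stale environment still fails, but recovery has already run by then, so the next call against the same `LMDBStore` reference succeeds, which is what the recovery tests below exercise.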
diff --git a/src/datastore/lmdb/LockedLMDBStore.ts b/src/datastore/lmdb/LockedLMDBStore.ts
deleted file mode 100644
index d6aaa43b..00000000
--- a/src/datastore/lmdb/LockedLMDBStore.ts
+++ /dev/null
@@ -1,52 +0,0 @@
-import { Mutex } from 'async-mutex';
-import { Database } from 'lmdb';
-import { LoggerFactory } from '../../telemetry/LoggerFactory';
-import { StoreName } from '../DataStore';
-import { LMDBStore } from './LMDBStore';
-
-export class LockedLMDBStore extends LMDBStore {
-  private readonly lock = new Mutex();
-
-  constructor(name: StoreName, store: Database) {
-    super(name, store, LoggerFactory.getLogger(`LockedLMDBStore.${name}`));
-  }
-
-  override get<T>(key: string): T | undefined {
-    if (this.lock.isLocked()) {
-      throw new LockedError('DataStore is locked');
-    }
-
-    return super.get(key);
-  }
-
-  override put<T>(key: string, value: T): Promise<boolean> {
-    return this.lock.runExclusive(() => {
-      return super.put(key, value);
-    });
-  }
-
-  override remove(key: string): Promise<boolean> {
-    return this.lock.runExclusive(() => {
-      return super.remove(key);
-    });
-  }
-
-  override clear(): Promise<void> {
-    return this.lock.runExclusive(() => {
-      return super.clear();
-    });
-  }
-
-  override keys(limit: number): ReadonlyArray<string> {
-    if (this.lock.isLocked()) {
-      throw new LockedError('DataStore is locked');
-    }
-    return super.keys(limit);
-  }
-}
-
-class LockedError extends Error {
-  constructor(message?: string, options?: ErrorOptions) {
-    super(message, options);
-  }
-}
diff --git a/tst/unit/datastore/LMDB.recovery.test.ts b/tst/unit/datastore/LMDB.recovery.test.ts
new file mode 100644
index 00000000..415979c0
--- /dev/null
+++ b/tst/unit/datastore/LMDB.recovery.test.ts
@@ -0,0 +1,180 @@
+import fs from 'fs';
+import { join } from 'path';
+import { v4 } from 'uuid';
+import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
+import { StoreName } from '../../../src/datastore/DataStore';
+import { LMDBStoreFactory } from '../../../src/datastore/LMDB';
+
+describe('LMDB fork detection and recovery', () => {
+  let testDir: string;
+  let factory: LMDBStoreFactory;
+  let originalPid: number;
+
+  beforeEach(() => {
+    testDir = join(process.cwd(), 'node_modules', '.cache', 'lmdb-recovery-test', v4());
+    originalPid = process.pid;
+    fs.mkdirSync(testDir, { recursive: true });
+    factory = new LMDBStoreFactory(testDir);
+  });
+
+  afterEach(async () => {
+    Object.defineProperty(process, 'pid', { value: originalPid, configurable: true });
+    await factory.close();
+  });
+
+  describe('fork detection', () => {
+    it('should update store handle and succeed on same store reference after fork', async () => {
+      const store = factory.get(StoreName.public_schemas);
+      await store.put('key', 'value');
+
+      // Simulate fork
+      Object.defineProperty(process, 'pid', { value: originalPid + 1000, configurable: true });
+
+      // Same store reference should work - it updates its internal handle
+      expect(store.get('key')).toBe('value');
+    });
+
+    it('should allow writes after fork on same store reference', async () => {
+      const store = factory.get(StoreName.public_schemas);
+
+      // Simulate fork before any operation
+      Object.defineProperty(process, 'pid', { value: originalPid + 1000, configurable: true });
+
+      // Write should succeed after proactive recovery
+      await store.put('newkey', 'newvalue');
+      expect(store.get('newkey')).toBe('newvalue');
+    });
+
+    it('should handle fork during keys() operation', async () => {
+      const store = factory.get(StoreName.public_schemas);
+      await store.put('k1', 'v1');
+      await store.put('k2', 'v2');
+
+      Object.defineProperty(process, 'pid', { value: originalPid + 1000, configurable: true });
+
+      const keys = store.keys(10);
+      expect(keys).toContain('k1');
+      expect(keys).toContain('k2');
+    });
+
+    it('should handle fork during remove() operation', async () => {
+      const store = factory.get(StoreName.public_schemas);
+      await store.put('toremove', 'value');
+
+      Object.defineProperty(process, 'pid', { value: originalPid + 1000, configurable: true });
+
+      await store.remove('toremove');
+      expect(store.get('toremove')).toBeUndefined();
+    });
+
+    it('should handle fork during clear() operation', async () => {
+      const store = factory.get(StoreName.public_schemas);
+      await store.put('k1', 'v1');
+      await store.put('k2', 'v2');
+
+      Object.defineProperty(process, 'pid', { value: originalPid + 1000, configurable: true });
+
+      await store.clear();
+      expect(store.keys(10)).toHaveLength(0);
+    });
+
+    it('should handle multiple consecutive forks', async () => {
+      const store = factory.get(StoreName.public_schemas);
+      await store.put('persistent', 'data');
+
+      for (let i = 1; i <= 5; i++) {
+        Object.defineProperty(process, 'pid', { value: originalPid + i * 1000, configurable: true });
+        // Same store reference works across multiple forks
+        expect(store.get('persistent')).toBe('data');
+      }
+    });
+
+    it('should not reopen when PID unchanged', async () => {
+      const store = factory.get(StoreName.public_schemas);
+      await store.put('key', 'value');
+
+      // Multiple operations without PID change should not trigger recovery
+      expect(store.get('key')).toBe('value');
+      expect(store.get('key')).toBe('value');
+      await store.put('key2', 'value2');
+      expect(store.keys(10)).toHaveLength(2);
+    });
+  });
+
+  describe('store isolation after fork', () => {
+    it('should maintain data isolation between stores after fork', async () => {
+      const store1 = factory.get(StoreName.public_schemas);
+      const store2 = factory.get(StoreName.sam_schemas);
+
+      await store1.put('key', 'store1-value');
+      await store2.put('key', 'store2-value');
+
+      Object.defineProperty(process, 'pid', { value: originalPid + 1000, configurable: true });
+
+      // Both stores should recover independently
+      expect(store1.get('key')).toBe('store1-value');
+      expect(store2.get('key')).toBe('store2-value');
+    });
+  });
+
+  describe('factory behavior', () => {
+    it('should throw for unknown store name', () => {
+      expect(() => factory.get('unknown-store' as StoreName)).toThrow('Store unknown-store not found');
+    });
+
+    it('should return correct store names', () => {
+      const names = factory.storeNames;
+      expect(names).toContain(StoreName.public_schemas);
+      expect(names).toContain(StoreName.sam_schemas);
+    });
+
+    it('should be idempotent on close', async () => {
+      // Repeated close() calls must resolve without throwing
+      await factory.close();
+      await factory.close();
+      await factory.close();
+    });
+
+    it('should clear timers on close', async () => {
+      const clearIntervalSpy = vi.spyOn(globalThis, 'clearInterval');
+      const clearTimeoutSpy = vi.spyOn(globalThis, 'clearTimeout');
+
+      await factory.close();
+
+      expect(clearIntervalSpy).toHaveBeenCalled();
+      expect(clearTimeoutSpy).toHaveBeenCalled();
+
+      clearIntervalSpy.mockRestore();
+      clearTimeoutSpy.mockRestore();
+    });
+  });
+
+  describe('cleanup safety', () => {
+    it('should handle missing lmdb directory during cleanup', async () => {
+      await factory.close();
+      fs.rmSync(testDir, { recursive: true, force: true });
+
+      // Closing a factory whose directory is gone must not throw
+      const newFactory = new LMDBStoreFactory(testDir);
+      await newFactory.close();
+    });
+
+    it('should cleanup old version directories', async () => {
+      // Create old version directories
+      const lmdbDir = join(testDir, 'lmdb');
+      fs.mkdirSync(join(lmdbDir, 'v1'), { recursive: true });
+      fs.mkdirSync(join(lmdbDir, 'v2'), { recursive: true });
+      fs.mkdirSync(join(lmdbDir, 'v3'), { recursive: true });
+
+      // Wait for cleanup timeout (we can't easily test the 2min timeout,
+      // but we verify the directories exist before close)
+      expect(fs.existsSync(join(lmdbDir, 'v1'))).toBe(true);
+      expect(fs.existsSync(join(lmdbDir, 'v2'))).toBe(true);
+
+      await factory.close();
+    });
+  });
+});
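The recovery tests simulate a fork by redefining `process.pid`, which is sufficient because `ensureValidEnv` only compares pids rather than actually forking. If the pattern keeps spreading, the stubbing could be extracted into a helper along these lines (hypothetical, not part of this PR):

```ts
// Hypothetical test helper: fake a fork by bumping process.pid,
// returning a restore function suitable for afterEach.
function simulateFork(offset = 1000): () => void {
  const originalPid = process.pid;
  Object.defineProperty(process, 'pid', { value: originalPid + offset, configurable: true });
  return () => Object.defineProperty(process, 'pid', { value: originalPid, configurable: true });
}
```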
diff --git a/tst/unit/datastore/LMDB.test.ts b/tst/unit/datastore/LMDB.test.ts
index fda8badc..d56732cd 100644
--- a/tst/unit/datastore/LMDB.test.ts
+++ b/tst/unit/datastore/LMDB.test.ts
@@ -204,4 +204,20 @@ describe('LMDB', () => {
       await newFactory.close();
     });
   });
+
+  describe('stats', () => {
+    it('should return store statistics', async () => {
+      const store = lmdbFactory.get(StoreName.public_schemas);
+
+      await store.put('key1', 'value1');
+      await store.put('key2', { nested: 'data' });
+
+      const storeStats = (store as any).stats();
+
+      expect(storeStats).toHaveProperty('totalSize');
+      expect(storeStats).toHaveProperty('entries');
+      expect(storeStats).toHaveProperty('maxSize');
+      expect(storeStats.entries).toBeGreaterThanOrEqual(0);
+    });
+  });
 });