From 82197f8ad4352a71d290cf62aa18939cca32fe7a Mon Sep 17 00:00:00 2001 From: NikAiyer Date: Mon, 24 Feb 2025 19:03:28 -0600 Subject: [PATCH] [MASTRA-1974] PG Vector Index Update (#1971) This PR does the following: - adds index rebuilding for ivfflat indexes in order to handle optimization - adds testing suite to compare different dimensions, sizes and k values to determine recall, latency and clustering - adds support for other pg index types --- .changeset/lemon-news-press.md | 6 + docs/src/pages/docs/reference/rag/pg.mdx | 192 ++++++++- stores/pg/README.md | 26 +- stores/pg/docker-compose.perf.yaml | 21 + stores/pg/package.json | 3 + stores/pg/src/vector/index.test.ts | 191 ++++++--- stores/pg/src/vector/index.ts | 165 ++++++-- stores/pg/src/vector/performance.helpers.ts | 286 ++++++++++++++ stores/pg/src/vector/types.ts | 16 + .../pg/src/vector/vector.performance.test.ts | 371 ++++++++++++++++++ stores/pg/vitest.config.ts | 1 + stores/pg/vitest.perf.config.ts | 8 + 12 files changed, 1194 insertions(+), 92 deletions(-) create mode 100644 .changeset/lemon-news-press.md create mode 100644 stores/pg/docker-compose.perf.yaml create mode 100644 stores/pg/src/vector/performance.helpers.ts create mode 100644 stores/pg/src/vector/types.ts create mode 100644 stores/pg/src/vector/vector.performance.test.ts create mode 100644 stores/pg/vitest.perf.config.ts diff --git a/.changeset/lemon-news-press.md b/.changeset/lemon-news-press.md new file mode 100644 index 0000000000..234028ac57 --- /dev/null +++ b/.changeset/lemon-news-press.md @@ -0,0 +1,6 @@ +--- +'@mastra/pg': patch +'docs': patch +--- + +Update PG vector to allow for multiple index types diff --git a/docs/src/pages/docs/reference/rag/pg.mdx b/docs/src/pages/docs/reference/rag/pg.mdx index 0f0543f759..abc5dd8e81 100644 --- a/docs/src/pages/docs/reference/rag/pg.mdx +++ b/docs/src/pages/docs/reference/rag/pg.mdx @@ -43,9 +43,111 @@ It provides robust vector similarity search capabilities within your existing Po 
defaultValue: "cosine", description: "Distance metric for similarity search", }, + { + name: "indexConfig", + type: "IndexConfig", + isOptional: true, + defaultValue: "{ type: 'ivfflat' }", + description: "Index configuration", + }, + { + name: "defineIndex", + type: "boolean", + isOptional: true, + defaultValue: "true", + description: "Whether to define the index", + }, + ]} +/> + +#### IndexConfig + + +#### Memory Requirements + +HNSW indexes require significant shared memory during construction. For 100K vectors: +- Small dimensions (64d): ~60MB with default settings +- Medium dimensions (256d): ~180MB with default settings +- Large dimensions (384d+): ~250MB+ with default settings + +Higher M values or efConstruction values will increase memory requirements significantly. Adjust your system's shared memory limits if needed. + ### upsert() @@ -138,10 +265,17 @@ Returns an array of index names as strings. Returns: ```typescript copy -interface IndexStats { +interface PGIndexStats { dimension: number; count: number; metric: "cosine" | "euclidean" | "dotproduct"; + type: "flat" | "hnsw" | "ivfflat"; + config: { + m?: number; + efConstruction?: number; + lists?: number; + probes?: number; + }; } ``` @@ -161,6 +295,56 @@ interface IndexStats { Closes the database connection pool. Should be called when done using the store. +### defineIndex() + + + +Defines or redefines an index with specified metric and configuration. Will drop any existing index before creating the new one. 
+ +```typescript copy +// Define HNSW index +await pgVector.defineIndex("my_vectors", "cosine", { + type: "hnsw", + hnsw: { + m: 8, + efConstruction: 32 + } +}); + +// Define IVF index +await pgVector.defineIndex("my_vectors", "cosine", { + type: "ivfflat", + ivf: { + lists: 100, + } +}); + +// Define flat index +await pgVector.defineIndex("my_vectors", "cosine", { + type: "flat" +}); +``` + ## Response Types Query results are returned in this format: @@ -188,5 +372,11 @@ try { } ``` +## Best Practices + +- Regularly evaluate your index configuration to ensure optimal performance. +- Adjust parameters like `lists` and `m` based on dataset size and query requirements. +- Rebuild indexes periodically to maintain efficiency, especially after significant data changes. + ### Related - [Metadata Filters](./metadata-filters) \ No newline at end of file diff --git a/stores/pg/README.md b/stores/pg/README.md index 1d77861302..670fb7a521 100644 --- a/stores/pg/README.md +++ b/stores/pg/README.md @@ -56,7 +56,7 @@ const store = new PostgresStore({ port: 5432, database: 'mastra', user: 'postgres', - password: 'postgres' + password: 'postgres', }); // Create a thread @@ -64,17 +64,19 @@ await store.saveThread({ id: 'thread-123', resourceId: 'resource-456', title: 'My Thread', - metadata: { key: 'value' } + metadata: { key: 'value' }, }); // Add messages to thread -await store.saveMessages([{ - id: 'msg-789', - threadId: 'thread-123', - role: 'user', - type: 'text', - content: [{ type: 'text', text: 'Hello' }] -}]); +await store.saveMessages([ + { + id: 'msg-789', + threadId: 'thread-123', + role: 'user', + type: 'text', + content: [{ type: 'text', text: 'Hello' }], + }, +]); // Query threads and messages const savedThread = await store.getThread('thread-123'); @@ -97,14 +99,18 @@ Connection pool settings: ## Features ### Vector Store Features + - Vector similarity search with cosine, euclidean, and dot product metrics - Advanced metadata filtering with MongoDB-like query syntax 
- Minimum score threshold for queries - Automatic UUID generation for vectors - Table management (create, list, describe, delete, truncate) -- Uses pgvector's IVFFLAT indexing with 100 lists +- Uses pgvector's IVFFLAT indexing with 100 lists by default +- Supports HNSW indexing with configurable parameters +- Supports flat indexing ### Storage Features + - Thread and message storage with JSON support - Atomic transactions for data consistency - Efficient batch operations diff --git a/stores/pg/docker-compose.perf.yaml b/stores/pg/docker-compose.perf.yaml new file mode 100644 index 0000000000..89d87a52ce --- /dev/null +++ b/stores/pg/docker-compose.perf.yaml @@ -0,0 +1,21 @@ +services: + db: + image: pgvector/pgvector:pg16 + container_name: 'pg-perf-test-db' + ports: + - '5435:5432' + environment: + POSTGRES_USER: ${POSTGRES_USER:-postgres} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} + POSTGRES_DB: ${POSTGRES_DB:-mastra} + shm_size: 1gb + command: + - "postgres" + - "-c" + - "shared_buffers=512MB" + - "-c" + - "maintenance_work_mem=1024MB" + - "-c" + - "work_mem=512MB" + tmpfs: + - /var/lib/postgresql/data \ No newline at end of file diff --git a/stores/pg/package.json b/stores/pg/package.json index 4e71b3bbaa..bdbc827f79 100644 --- a/stores/pg/package.json +++ b/stores/pg/package.json @@ -19,7 +19,10 @@ "build:watch": "pnpm build --watch", "pretest": "docker compose up -d && (for i in $(seq 1 30); do docker compose exec -T db pg_isready -U postgres && break || (sleep 1; [ $i -eq 30 ] && exit 1); done)", "test": "vitest run", + "pretest:perf": "docker compose -f docker-compose.perf.yaml up -d && (for i in $(seq 1 30); do docker compose -f docker-compose.perf.yaml exec -T db pg_isready -U postgres && break || (sleep 1; [ $i -eq 30 ] && exit 1); done)", + "test:perf": "NODE_OPTIONS='--max-old-space-size=16384' vitest run -c vitest.perf.config.ts", "posttest": "docker compose down -v", + "posttest:perf": "docker compose -f docker-compose.perf.yaml down -v", 
"pretest:watch": "docker compose up -d", "test:watch": "vitest watch", "posttest:watch": "docker compose down -v" diff --git a/stores/pg/src/vector/index.test.ts b/stores/pg/src/vector/index.test.ts index 6ba3055e60..033898f02e 100644 --- a/stores/pg/src/vector/index.test.ts +++ b/stores/pg/src/vector/index.test.ts @@ -42,6 +42,32 @@ describe('PgVector', () => { it('should throw error if dimension is invalid', async () => { await expect(vectorDB.createIndex(`testIndexNameFail`, 0)).rejects.toThrow(); }); + + it('should create index with flat type', async () => { + await vectorDB.createIndex(testIndexName2, 3, 'cosine', { type: 'flat' }); + const stats = await vectorDB.describeIndex(testIndexName2); + expect(stats.type).toBe('flat'); + }); + + it('should create index with hnsw type', async () => { + await vectorDB.createIndex(testIndexName2, 3, 'cosine', { + type: 'hnsw', + hnsw: { m: 16, efConstruction: 64 }, // Any reasonable values work + }); + const stats = await vectorDB.describeIndex(testIndexName2); + expect(stats.type).toBe('hnsw'); + expect(stats.config.m).toBe(16); + }); + + it('should create index with ivfflat type and lists', async () => { + await vectorDB.createIndex(testIndexName2, 3, 'cosine', { + type: 'ivfflat', + ivf: { lists: 100 }, + }); + const stats = await vectorDB.describeIndex(testIndexName2); + expect(stats.type).toBe('ivfflat'); + expect(stats.config.lists).toBe(100); + }); }); describe('listIndexes', () => { @@ -86,6 +112,10 @@ describe('PgVector', () => { const stats = await vectorDB.describeIndex(indexName); expect(stats).toEqual({ + type: 'ivfflat', + config: { + lists: 100, + }, dimension: 3, count: 2, metric: 'cosine', @@ -152,59 +182,61 @@ describe('PgVector', () => { }); describe('Basic Query Operations', () => { - const indexName = 'test_query_2'; - beforeAll(async () => { - try { + ['flat', 'hnsw', 'ivfflat'].forEach(indexType => { + const indexName = `test_query_2_${indexType}`; + beforeAll(async () => { + try { + await 
vectorDB.deleteIndex(indexName); + } catch (e) { + // Ignore if doesn't exist + } + await vectorDB.createIndex(indexName, 3); + }); + + beforeEach(async () => { + await vectorDB.truncateIndex(indexName); + const vectors = [ + [1, 0, 0], + [0.8, 0.2, 0], + [0, 1, 0], + ]; + const metadata = [ + { type: 'a', value: 1 }, + { type: 'b', value: 2 }, + { type: 'a', value: 3 }, + ]; + await vectorDB.upsert(indexName, vectors, metadata); + }); + + afterAll(async () => { await vectorDB.deleteIndex(indexName); - } catch (e) { - // Ignore if doesn't exist - } - await vectorDB.createIndex(indexName, 3); - }); - - beforeEach(async () => { - await vectorDB.truncateIndex(indexName); - const vectors = [ - [1, 0, 0], - [0.8, 0.2, 0], - [0, 1, 0], - ]; - const metadata = [ - { type: 'a', value: 1 }, - { type: 'b', value: 2 }, - { type: 'a', value: 3 }, - ]; - await vectorDB.upsert(indexName, vectors, metadata); - }); - - afterAll(async () => { - await vectorDB.deleteIndex(indexName); - }); + }); - it('should return closest vectors', async () => { - const results = await vectorDB.query(indexName, [1, 0, 0], 1); - expect(results).toHaveLength(1); - expect(results[0]?.vector).toBe(undefined); - expect(results[0]?.score).toBeCloseTo(1, 5); - }); + it('should return closest vectors', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 1); + expect(results).toHaveLength(1); + expect(results[0]?.vector).toBe(undefined); + expect(results[0]?.score).toBeCloseTo(1, 5); + }); - it('should return vector with result', async () => { - const results = await vectorDB.query(indexName, [1, 0, 0], 1, undefined, true); - expect(results).toHaveLength(1); - expect(results[0]?.vector).toStrictEqual([1, 0, 0]); - }); + it('should return vector with result', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 1, undefined, true); + expect(results).toHaveLength(1); + expect(results[0]?.vector).toStrictEqual([1, 0, 0]); + }); - it('should respect topK parameter', 
async () => { - const results = await vectorDB.query(indexName, [1, 0, 0], 2); - expect(results).toHaveLength(2); - }); + it('should respect topK parameter', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 2); + expect(results).toHaveLength(2); + }); - it('should handle filters correctly', async () => { - const results = await vectorDB.query(indexName, [1, 0, 0], 10, { type: 'a' }); + it('should handle filters correctly', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 10, { type: 'a' }); - expect(results).toHaveLength(1); - results.forEach(result => { - expect(result?.metadata?.type).toBe('a'); + expect(results).toHaveLength(1); + results.forEach(result => { + expect(result?.metadata?.type).toBe('a'); + }); }); }); }); @@ -1202,4 +1234,69 @@ describe('PgVector', () => { }); }); }); + + describe('Search Parameters', () => { + const indexName = 'test_search_params'; + const vectors = [ + [1, 0, 0], // Query vector will be closest to this + [0.8, 0.2, 0], // Second closest + [0, 1, 0], // Third (much further) + ]; + + describe('HNSW Parameters', () => { + beforeAll(async () => { + await vectorDB.createIndex(indexName, 3, 'cosine', { + type: 'hnsw', + hnsw: { m: 16, efConstruction: 64 }, + }); + await vectorDB.upsert(indexName, vectors); + }); + + afterAll(async () => { + await vectorDB.deleteIndex(indexName); + }); + + it('should use default ef value', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 2); + expect(results).toHaveLength(2); + expect(results[0]?.score).toBeCloseTo(1, 5); + expect(results[1]?.score).toBeGreaterThan(0.9); // Second vector should be close + }); + + it('should respect custom ef value', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 2, undefined, undefined, undefined, { ef: 100 }); + expect(results).toHaveLength(2); + expect(results[0]?.score).toBeCloseTo(1, 5); + expect(results[1]?.score).toBeGreaterThan(0.9); + }); + }); + + 
describe('IVF Parameters', () => { + beforeAll(async () => { + await vectorDB.createIndex(indexName, 3, 'cosine', { + type: 'ivfflat', + ivf: { lists: 2 }, // Small number for test data + }); + await vectorDB.upsert(indexName, vectors); + }); + + afterAll(async () => { + await vectorDB.deleteIndex(indexName); + }); + + it('should use default probe value', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 2); + expect(results).toHaveLength(2); + expect(results[0]?.score).toBeCloseTo(1, 5); + expect(results[1]?.score).toBeGreaterThan(0.9); + }); + + it('should respect custom probe value', async () => { + const results = await vectorDB.query(indexName, [1, 0, 0], 2, undefined, undefined, undefined, { probes: 2 }); + expect(results).toHaveLength(2); + expect(results[0]?.score).toBeCloseTo(1, 5); + expect(results[1]?.score).toBeGreaterThan(0.9); + }); + }); + }); }); diff --git a/stores/pg/src/vector/index.ts b/stores/pg/src/vector/index.ts index 0d1d57846b..f27ebd93fb 100644 --- a/stores/pg/src/vector/index.ts +++ b/stores/pg/src/vector/index.ts @@ -4,9 +4,21 @@ import pg from 'pg'; import { PGFilterTranslator } from './filter'; import { buildFilterQuery } from './sql-builder'; +import { type IndexConfig, type IndexType } from './types'; + +export interface PGIndexStats extends IndexStats { + type: IndexType; + config: { + m?: number; + efConstruction?: number; + lists?: number; + probes?: number; + }; +} export class PgVector extends MastraVector { private pool: pg.Pool; + private indexCache: Map = new Map(); constructor(connectionString: string) { super(); @@ -35,6 +47,13 @@ export class PgVector extends MastraVector { return translatedFilter; } + async getIndexInfo(indexName: string): Promise { + if (!this.indexCache.has(indexName)) { + this.indexCache.set(indexName, await this.describeIndex(indexName)); + } + return this.indexCache.get(indexName)!; + } + async query( indexName: string, queryVector: number[], @@ -42,14 +61,32 @@ export class 
PgVector extends MastraVector { filter?: Filter, includeVector: boolean = false, minScore: number = 0, // Optional minimum score threshold + options?: { + ef?: number; // For HNSW + probes?: number; // For IVF + }, ): Promise { const client = await this.pool.connect(); try { const vectorStr = `[${queryVector.join(',')}]`; - const translatedFilter = this.transformFilter(filter); const { sql: filterQuery, values: filterValues } = buildFilterQuery(translatedFilter, minScore); + // Get index type and configuration + const indexInfo = await this.getIndexInfo(indexName); + + // Set HNSW search parameter if applicable + if (indexInfo.type === 'hnsw') { + // Calculate ef and clamp between 1 and 1000 + const calculatedEf = options?.ef ?? Math.max(topK, (indexInfo?.config?.m ?? 16) * topK); + const searchEf = Math.min(1000, Math.max(1, calculatedEf)); + await client.query(`SET LOCAL hnsw.ef_search = ${searchEf}`); + } + + if (indexInfo.type === 'ivfflat' && options?.probes) { + await client.query(`SET LOCAL ivfflat.probes = ${options.probes}`); + } + const query = ` WITH vector_scores AS ( SELECT @@ -88,25 +125,23 @@ export class PgVector extends MastraVector { const client = await this.pool.connect(); try { await client.query('BEGIN'); - const vectorIds = ids || vectors.map(() => crypto.randomUUID()); for (let i = 0; i < vectors.length; i++) { const query = ` - INSERT INTO ${indexName} (vector_id, embedding, metadata) - VALUES ($1, $2::vector, $3::jsonb) - ON CONFLICT (vector_id) - DO UPDATE SET - embedding = $2::vector, - metadata = $3::jsonb - RETURNING embedding::text + INSERT INTO ${indexName} (vector_id, embedding, metadata) + VALUES ($1, $2::vector, $3::jsonb) + ON CONFLICT (vector_id) + DO UPDATE SET + embedding = $2::vector, + metadata = $3::jsonb + RETURNING embedding::text `; await client.query(query, [vectorIds[i], `[${vectors[i]?.join(',')}]`, JSON.stringify(metadata?.[i] || {})]); } await client.query('COMMIT'); - return vectorIds; } catch (error) { await 
client.query('ROLLBACK'); @@ -120,6 +155,8 @@ export class PgVector extends MastraVector { indexName: string, dimension: number, metric: 'cosine' | 'euclidean' | 'dotproduct' = 'cosine', + indexConfig: IndexConfig = {}, + defineIndex: boolean = true, ): Promise { const client = await this.pool.connect(); try { @@ -155,16 +192,9 @@ export class PgVector extends MastraVector { ); `); - // Create the index - const indexMethod = - metric === 'cosine' ? 'vector_cosine_ops' : metric === 'euclidean' ? 'vector_l2_ops' : 'vector_ip_ops'; - - await client.query(` - CREATE INDEX IF NOT EXISTS ${indexName}_vector_idx - ON public.${indexName} - USING ivfflat (embedding ${indexMethod}) - WITH (lists = 100); - `); + if (defineIndex) { + await this.defineIndex(indexName, metric, indexConfig); + } } catch (error: any) { console.error('Failed to create vector table:', error); throw error; @@ -173,6 +203,57 @@ export class PgVector extends MastraVector { } } + async defineIndex( + indexName: string, + metric: 'cosine' | 'euclidean' | 'dotproduct' = 'cosine', + indexConfig: IndexConfig, + ): Promise { + const client = await this.pool.connect(); + try { + await client.query(`DROP INDEX IF EXISTS ${indexName}_vector_idx`); + + if (indexConfig.type === 'flat') return; + + const metricOp = + metric === 'cosine' ? 'vector_cosine_ops' : metric === 'euclidean' ? 'vector_l2_ops' : 'vector_ip_ops'; + + let indexSQL: string; + if (indexConfig.type === 'hnsw') { + const m = indexConfig.hnsw?.m ?? 8; + const efConstruction = indexConfig.hnsw?.efConstruction ?? 
32; + + indexSQL = ` + CREATE INDEX ${indexName}_vector_idx + ON ${indexName} + USING hnsw (embedding ${metricOp}) + WITH ( + m = ${m}, + ef_construction = ${efConstruction} + ) + `; + } else { + let lists: number; + if (indexConfig.ivf?.lists) { + lists = indexConfig.ivf.lists; + } else { + const size = (await client.query(`SELECT COUNT(*) FROM ${indexName}`)).rows[0].count; + lists = Math.max(100, Math.min(4000, Math.floor(Math.sqrt(size) * 2))); + } + indexSQL = ` + CREATE INDEX ${indexName}_vector_idx + ON ${indexName} + USING ivfflat (embedding ${metricOp}) + WITH (lists = ${lists}); + `; + } + + await client.query(indexSQL); + this.indexCache.delete(indexName); + } finally { + client.release(); + } + } + async listIndexes(): Promise { const client = await this.pool.connect(); try { @@ -190,7 +271,7 @@ export class PgVector extends MastraVector { } } - async describeIndex(indexName: string): Promise { + async describeIndex(indexName: string): Promise { const client = await this.pool.connect(); try { // Get vector dimension @@ -208,9 +289,10 @@ export class PgVector extends MastraVector { `; // Get index metric type - const metricQuery = ` + const indexQuery = ` SELECT am.amname as index_method, + pg_get_indexdef(i.indexrelid) as index_def, opclass.opcname as operator_class FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid @@ -219,29 +301,44 @@ export class PgVector extends MastraVector { WHERE c.relname = '${indexName}_vector_idx'; `; - const [dimResult, countResult, metricResult] = await Promise.all([ + const [dimResult, countResult, indexResult] = await Promise.all([ client.query(dimensionQuery, [indexName]), client.query(countQuery), - client.query(metricQuery), + client.query(indexQuery), ]); + const { index_method, index_def, operator_class } = indexResult.rows[0] || { + index_method: 'flat', + index_def: '', + operator_class: 'cosine', + }; + // Convert pg_vector index method to our metric type - let metric: 'cosine' | 'euclidean' | 'dotproduct' = 
'cosine'; - if (metricResult.rows.length > 0) { - const operatorClass = metricResult.rows[0].operator_class; - if (operatorClass.includes('l2')) { - metric = 'euclidean'; - } else if (operatorClass.includes('ip')) { - metric = 'dotproduct'; - } else if (operatorClass.includes('cosine')) { - metric = 'cosine'; - } + const metric = operator_class.includes('l2') + ? 'euclidean' + : operator_class.includes('ip') + ? 'dotproduct' + : 'cosine'; + + // Parse index configuration + const config: { m?: number; efConstruction?: number; lists?: number } = {}; + + if (index_method === 'hnsw') { + const m = index_def.match(/m\s*=\s*'?(\d+)'?/)?.[1]; + const efConstruction = index_def.match(/ef_construction\s*=\s*'?(\d+)'?/)?.[1]; + if (m) config.m = parseInt(m); + if (efConstruction) config.efConstruction = parseInt(efConstruction); + } else if (index_method === 'ivfflat') { + const lists = index_def.match(/lists\s*=\s*'?(\d+)'?/)?.[1]; + if (lists) config.lists = parseInt(lists); } return { dimension: dimResult.rows[0].dimension, count: parseInt(countResult.rows[0].count), metric, + type: index_method as 'flat' | 'hnsw' | 'ivfflat', + config, }; } catch (e: any) { await client.query('ROLLBACK'); diff --git a/stores/pg/src/vector/performance.helpers.ts b/stores/pg/src/vector/performance.helpers.ts new file mode 100644 index 0000000000..2a298914fb --- /dev/null +++ b/stores/pg/src/vector/performance.helpers.ts @@ -0,0 +1,286 @@ +import { type IndexConfig, type IndexType } from './types'; + +import { PgVector } from '.'; + +export interface TestResult { + distribution: string; + dimension: number; + type: IndexType; + size: number; + k?: number; + metrics: { + recall?: number; + minRecall?: number; + maxRecall?: number; + latency?: { + p50: number; + p95: number; + lists?: number; + vectorsPerList?: number; + m?: number; + ef?: number; + }; + clustering?: { + numLists?: number; + avgVectorsPerList?: number; + recommendedLists?: number; + distribution?: string; + }; + }; +} + 
+export const generateRandomVectors = (count: number, dim: number) => { + return Array.from({ length: count }, () => { + return Array.from({ length: dim }, () => Math.random() * 2 - 1); + }); +}; + +export const generateClusteredVectors = (count: number, dim: number, numClusters: number = 10) => { + // Generate cluster centers + const centers = Array.from({ length: numClusters }, () => Array.from({ length: dim }, () => Math.random() * 2 - 1)); + + // Generate vectors around centers with varying spread + return Array.from({ length: count }, () => { + // Pick a random cluster, with some clusters being more popular + const centerIdx = Math.floor(Math.pow(Math.random(), 2) * numClusters); + const center = centers[centerIdx] as number[]; + + // Add noise, with some vectors being further from centers + const spread = Math.random() < 0.8 ? 0.1 : 0.5; // 80% close, 20% far + return center.map(c => c + (Math.random() * spread - spread / 2)); + }); +}; + +// Or even more extreme: +export const generateSkewedVectors = (count: number, dim: number) => { + // Create dense clusters with sparse regions + const vectors: number[][] = []; + + const denseCount = Math.floor(count * 0.6); + const sparseCount = count - denseCount; + + // Dense cluster (60% of vectors) + const denseCenter = Array.from({ length: dim }, () => Math.random() * 0.2); + for (let i = 0; i < denseCount; i++) { + vectors.push(denseCenter.map(c => c + (Math.random() * 0.1 - 0.05))); + } + + // Scattered vectors (40%) + for (let i = 0; i < sparseCount; i++) { + vectors.push(Array.from({ length: dim }, () => Math.random() * 2 - 1)); + } + + return vectors.sort(() => Math.random() - 0.5); // Shuffle +}; + +export const findNearestBruteForce = (query: number[], vectors: number[][], k: number) => { + const similarities = vectors.map((vector, idx) => { + const similarity = cosineSimilarity(query, vector); + return { idx, dist: similarity }; + }); + + const sorted = similarities.sort((a, b) => b.dist - a.dist); + return 
sorted.slice(0, k).map(x => x.idx); +}; + +export const calculateRecall = (actual: number[], expected: number[], k: number): number => { + let score = 0; + for (let i = 0; i < k; i++) { + if (actual[i] === expected[i]) { + score += 1; + } else if (expected.includes(actual[i] ?? 0)) { + score += 0.5; + } + } + return score / k; +}; + +export function cosineSimilarity(a: number[], b: number[]): number { + const dotProduct = a.reduce((sum, val, i) => sum + (val ?? 0) * (b[i] ?? 0), 0); + const normA = Math.sqrt(a.reduce((sum, val) => sum + val * val, 0)); + const normB = Math.sqrt(b.reduce((sum, val) => sum + val * val, 0)); + return dotProduct / (normA * normB); +} + +export const formatTable = (data: any[], columns: string[]) => { + const colWidths = columns.map(col => + Math.max( + col.length, + ...data.map(row => { + const value = row[col]; + return value === undefined || value === null ? '-'.length : value.toString().length; + }), + ), + ); + + const topBorder = '┌' + colWidths.map(w => '─'.repeat(w)).join('┬') + '┐'; + const headerSeparator = '├' + colWidths.map(w => '─'.repeat(w)).join('┼') + '┤'; + const bottomBorder = '└' + colWidths.map(w => '─'.repeat(w)).join('┴') + '┘'; + + const header = '│' + columns.map((col, i) => col.padEnd(colWidths[i] ?? 0)).join('│') + '│'; + const rows = data.map( + row => + '│' + + columns + .map((col, i) => { + const value = row[col]; + const displayValue = value === undefined || value === null ? '-' : value.toString(); + return displayValue.padEnd(colWidths[i]); + }) + .join('│') + + '│', + ); + + return [topBorder, header, headerSeparator, ...rows, bottomBorder].join('\n'); +}; + +export const groupBy = ( + array: T[], + key: K | ((item: T) => string), + reducer?: (group: T[]) => any, +): Record => { + const grouped = array.reduce( + (acc, item) => { + const value = typeof key === 'function' ? 
key(item) : item[key]; + if (!acc[value as any]) acc[value as any] = []; + acc[value as any]?.push(item); + return acc; + }, + {} as Record, + ); + + if (reducer) { + return Object.fromEntries(Object.entries(grouped).map(([key, group]) => [key, reducer(group)])); + } + + return grouped; +}; + +export const calculateTimeout = (dimension: number, size: number, k: number) => { + let timeout = 600000; + if (dimension >= 1024) timeout *= 3; + else if (dimension >= 384) timeout *= 1.5; + if (size >= 10000) timeout *= 2; + if (k >= 75) timeout *= 1.5; + return timeout * 5; +}; + +export const baseTestConfigs = { + smokeTests: [{ dimension: 384, size: 1_000, k: 10, queryCount: 10 }], + '64': [ + { dimension: 64, size: 100, k: 10, queryCount: 30 }, + { dimension: 64, size: 100, k: 25, queryCount: 30 }, + { dimension: 64, size: 100, k: 50, queryCount: 30 }, + { dimension: 64, size: 100, k: 100, queryCount: 30 }, + { dimension: 64, size: 1_000, k: 10, queryCount: 30 }, + { dimension: 64, size: 1_000, k: 25, queryCount: 30 }, + { dimension: 64, size: 1_000, k: 50, queryCount: 30 }, + { dimension: 64, size: 1_000, k: 100, queryCount: 30 }, + { dimension: 64, size: 10_000, k: 10, queryCount: 30 }, + { dimension: 64, size: 100_000, k: 10, queryCount: 30 }, + { dimension: 64, size: 100_000, k: 25, queryCount: 30 }, + { dimension: 64, size: 100_000, k: 50, queryCount: 30 }, + { dimension: 64, size: 100_000, k: 100, queryCount: 30 }, + { dimension: 64, size: 500_000, k: 10, queryCount: 30 }, + { dimension: 64, size: 1_000_000, k: 10, queryCount: 30 }, + ], + '384': [ + { dimension: 384, size: 100, k: 10, queryCount: 30 }, + { dimension: 384, size: 100, k: 25, queryCount: 30 }, + { dimension: 384, size: 100, k: 50, queryCount: 30 }, + { dimension: 384, size: 100, k: 100, queryCount: 30 }, + { dimension: 384, size: 1_000, k: 10, queryCount: 30 }, + { dimension: 384, size: 1_000, k: 25, queryCount: 30 }, + { dimension: 384, size: 1_000, k: 50, queryCount: 30 }, + { dimension: 384, 
size: 1_000, k: 100, queryCount: 30 }, + { dimension: 384, size: 10_000, k: 10, queryCount: 30 }, + { dimension: 384, size: 100_000, k: 10, queryCount: 30 }, + { dimension: 384, size: 100_000, k: 25, queryCount: 30 }, + { dimension: 384, size: 100_000, k: 50, queryCount: 30 }, + { dimension: 384, size: 100_000, k: 100, queryCount: 30 }, + { dimension: 384, size: 500_000, k: 10, queryCount: 30 }, + ], + '1024': [ + { dimension: 1024, size: 100, k: 10, queryCount: 30 }, + { dimension: 1024, size: 100, k: 25, queryCount: 30 }, + { dimension: 1024, size: 100, k: 50, queryCount: 30 }, + { dimension: 1024, size: 100, k: 100, queryCount: 30 }, + { dimension: 1024, size: 1_000, k: 10, queryCount: 30 }, + { dimension: 1024, size: 1_000, k: 25, queryCount: 30 }, + { dimension: 1024, size: 1_000, k: 50, queryCount: 30 }, + { dimension: 1024, size: 1_000, k: 100, queryCount: 30 }, + { dimension: 1024, size: 10_000, k: 10, queryCount: 30 }, + { dimension: 1024, size: 10_000, k: 25, queryCount: 30 }, + { dimension: 1024, size: 10_000, k: 50, queryCount: 30 }, + { dimension: 1024, size: 10_000, k: 100, queryCount: 30 }, + { dimension: 1024, size: 50_000, k: 10, queryCount: 30 }, + { dimension: 1024, size: 50_000, k: 25, queryCount: 30 }, + ], + stressTests: [ + // Maximum load + { dimension: 512, size: 1_000_000, k: 50, queryCount: 5 }, + + // Dense search + { dimension: 256, size: 1_000_000, k: 100, queryCount: 5 }, + + { dimension: 1024, size: 500_000, k: 50, queryCount: 5 }, + ], +}; + +export interface TestConfig { + dimension: number; + size: number; + k: number; + queryCount: number; +} + +export async function warmupQuery(vectorDB: PgVector, indexName: string, dimension: number, k: number) { + const warmupVector = generateRandomVectors(1, dimension)[0] as number[]; + await vectorDB.query(indexName, warmupVector, k); +} + +export async function measureLatency(fn: () => Promise): Promise<[number, T]> { + const start = performance.now(); + const result = await fn(); + const 
end = performance.now(); + return [end - start, result]; +} + +export const getListCount = (indexConfig: IndexConfig, size: number): number | undefined => { + if (indexConfig.type !== 'ivfflat') return undefined; + if (indexConfig.ivf?.lists) return indexConfig.ivf.lists; + return Math.max(100, Math.min(4000, Math.floor(Math.sqrt(size) * 2))); +}; + +export const getHNSWConfig = (indexConfig: IndexConfig): { m: number; efConstruction: number } => { + return { + m: indexConfig.hnsw?.m ?? 8, + efConstruction: indexConfig.hnsw?.efConstruction ?? 32, + }; +}; + +export function getSearchEf(k: number, m: number) { + return { + default: Math.max(k, m * k), // Default calculation + lower: Math.max(k, (m * k) / 2), // Lower quality, faster + higher: Math.max(k, m * k * 2), // Higher quality, slower + }; +} + +export function getIndexDescription({ + type, + hnsw, +}: { + type: IndexType; + hnsw: { m: number; efConstruction: number }; +}): string { + if (type === 'hnsw') { + return `HNSW(m=${hnsw.m},ef=${hnsw.efConstruction})`; + } + + if (type === 'ivfflat') { + return `IVF`; + } + + return 'Flat'; +} diff --git a/stores/pg/src/vector/types.ts b/stores/pg/src/vector/types.ts new file mode 100644 index 0000000000..18a3faf35d --- /dev/null +++ b/stores/pg/src/vector/types.ts @@ -0,0 +1,16 @@ +export type IndexType = 'ivfflat' | 'hnsw' | 'flat'; + +interface IVFConfig { + lists?: number; +} + +interface HNSWConfig { + m?: number; // Max number of connections (default: 16) + efConstruction?: number; // Build-time complexity (default: 64) +} + +export interface IndexConfig { + type?: IndexType; + ivf?: IVFConfig; + hnsw?: HNSWConfig; +} diff --git a/stores/pg/src/vector/vector.performance.test.ts b/stores/pg/src/vector/vector.performance.test.ts new file mode 100644 index 0000000000..383f601036 --- /dev/null +++ b/stores/pg/src/vector/vector.performance.test.ts @@ -0,0 +1,371 @@ +import pg from 'pg'; +import { describe, it, beforeAll, afterAll, beforeEach, afterEach } from 
'vitest'; + +import { + baseTestConfigs, + TestConfig, + TestResult, + calculateTimeout, + generateRandomVectors, + findNearestBruteForce, + calculateRecall, + formatTable, + groupBy, + measureLatency, + getListCount, + getSearchEf, + generateClusteredVectors, + generateSkewedVectors, + getHNSWConfig, + getIndexDescription, +} from './performance.helpers'; +import { IndexConfig, IndexType } from './types'; + +import { PgVector } from '.'; + +interface IndexTestConfig extends IndexConfig { + type: IndexType; + rebuild?: boolean; +} + +class PGPerformanceVector extends PgVector { + private perfPool: pg.Pool; + + constructor(connectionString: string) { + super(connectionString); + + const basePool = new pg.Pool({ + connectionString, + max: 20, // Maximum number of clients in the pool + idleTimeoutMillis: 30000, // Close idle connections after 30 seconds + connectionTimeoutMillis: 2000, // Fail fast if can't connect + }); + + this.perfPool = basePool; + } + + async bulkUpsert(indexName: string, vectors: number[][], metadata?: any[], ids?: string[]) { + const client = await this.perfPool.connect(); + try { + await client.query('BEGIN'); + const vectorIds = ids || vectors.map(() => crypto.randomUUID()); + + // Same query structure as upsert, just using unnest for bulk operation + const query = ` + INSERT INTO ${indexName} (vector_id, embedding, metadata) + SELECT * FROM unnest( + $1::text[], + $2::vector[], + $3::jsonb[] + ) + ON CONFLICT (vector_id) + DO UPDATE SET + embedding = EXCLUDED.embedding, + metadata = EXCLUDED.metadata + RETURNING embedding::text + `; + + // Same parameter structure as upsert, just as arrays + await client.query(query, [ + vectorIds, + vectors.map(v => `[${v.join(',')}]`), + (metadata || vectors.map(() => ({}))).map(m => JSON.stringify(m)), + ]); + await client.query('COMMIT'); + return vectorIds; + } catch (error) { + await client.query('ROLLBACK'); + throw error; + } finally { + client.release(); + } + } +} + +const warmupCache = new Map(); 
+async function smartWarmup( + vectorDB: PGPerformanceVector, + testIndexName: string, + indexType: string, + dimension: number, + k: number, +) { + const cacheKey = `${dimension}-${k}-${indexType}`; + if (!warmupCache.has(cacheKey)) { + console.log(`Warming up ${indexType} index for ${dimension}d vectors, k=${k}`); + const warmupVector = generateRandomVectors(1, dimension)[0] as number[]; + await vectorDB.query(testIndexName, warmupVector, k); + warmupCache.set(cacheKey, true); + } +} + +const connectionString = process.env.DB_URL || `postgresql://postgres:postgres@localhost:5435/mastra`; +describe('PostgreSQL Index Performance', () => { + let vectorDB: PGPerformanceVector; + const testIndexName = 'test_index_performance'; + const results: TestResult[] = []; + + const indexConfigs: IndexTestConfig[] = [ + { type: 'flat' }, // Test flat/linear search as baseline + { type: 'ivfflat', ivf: { lists: 100 } }, // Test IVF with fixed lists + { type: 'ivfflat', rebuild: true }, // Test IVF with calculated lists and rebuild + { type: 'hnsw' }, // Test HNSW with default parameters + { type: 'hnsw', hnsw: { m: 16, efConstruction: 64 } }, // Test HNSW with custom parameters + ]; + beforeAll(async () => { + // Initialize PGPerformanceVector + vectorDB = new PGPerformanceVector(connectionString); + }); + beforeEach(async () => { + await vectorDB.deleteIndex(testIndexName); + }); + + afterEach(async () => { + await vectorDB.deleteIndex(testIndexName); + }); + + afterAll(async () => { + await vectorDB.disconnect(); + analyzeResults(results); + }); + + // Combine all test configs + const allConfigs: TestConfig[] = [ + ...baseTestConfigs['64'], + ...baseTestConfigs['384'], + ...baseTestConfigs['1024'], + ...baseTestConfigs.smokeTests, + ...baseTestConfigs.stressTests, + ]; + + // For each index config + for (const indexConfig of indexConfigs) { + const indexType = indexConfig.type; + const rebuild = indexConfig.rebuild ?? 
false; + const hnswConfig = getHNSWConfig(indexConfig); + const indexDescription = getIndexDescription({ + type: indexType, + hnsw: hnswConfig, + }); + + describe(`Index: ${indexDescription}`, () => { + for (const testConfig of allConfigs) { + const timeout = calculateTimeout(testConfig.dimension, testConfig.size, testConfig.k); + const testDesc = `dim=${testConfig.dimension} size=${testConfig.size} k=${testConfig.k}`; + + for (const [distType, generator] of Object.entries(distributions)) { + it( + testDesc, + async () => { + const testVectors = generator(testConfig.size, testConfig.dimension); + const queryVectors = generator(testConfig.queryCount, testConfig.dimension); + + // Create index and insert vectors + const lists = getListCount(indexConfig, testConfig.size); + + await vectorDB.createIndex( + testIndexName, + testConfig.dimension, + 'cosine', + indexConfig, + indexType === 'ivfflat', + ); + + console.log( + `Batched bulk upserting ${testVectors.length} ${distType} vectors into index ${testIndexName}`, + ); + const batchSizes = splitIntoRandomBatches(testConfig.size, testConfig.dimension); + await batchedBulkUpsert(vectorDB, testIndexName, testVectors, batchSizes); + if (indexType === 'hnsw' || rebuild) { + console.log('rebuilding index'); + await vectorDB.defineIndex(testIndexName, 'cosine', indexConfig); + console.log('index rebuilt'); + } + await smartWarmup(vectorDB, testIndexName, indexType, testConfig.dimension, testConfig.k); + + // For HNSW, test different EF values + const efValues = indexType === 'hnsw' ? 
getSearchEf(testConfig.k, hnswConfig.m) : { default: undefined }; + + for (const [efType, ef] of Object.entries(efValues)) { + const recalls: number[] = []; + const latencies: number[] = []; + + for (const queryVector of queryVectors) { + const expectedNeighbors = findNearestBruteForce(queryVector, testVectors, testConfig.k); + + const [latency, actualResults] = await measureLatency(async () => + vectorDB.query( + testIndexName, + queryVector, + testConfig.k, + undefined, + false, + 0, + { ef }, // For HNSW + ), + ); + + const actualNeighbors = actualResults.map(r => r.metadata?.index); + const recall = calculateRecall(actualNeighbors, expectedNeighbors, testConfig.k); + recalls.push(recall); + latencies.push(latency); + } + + const sorted = [...latencies].sort((a, b) => a - b); + results.push({ + distribution: distType, + dimension: testConfig.dimension, + size: testConfig.size, + k: testConfig.k, + type: indexType, + metrics: { + recall: recalls.length > 0 ? recalls.reduce((a, b) => a + b, 0) / recalls.length : 0, + minRecall: Math.min(...recalls), + maxRecall: Math.max(...recalls), + latency: { + p50: sorted[Math.floor(sorted.length * 0.5)], + p95: sorted[Math.floor(sorted.length * 0.95)], + ...(indexType === 'ivfflat' && { + lists, + vectorsPerList: Math.round(testConfig.size / (lists || 1)), + }), + ...(indexType === 'hnsw' && { + m: hnswConfig.m, + efConstruction: hnswConfig.efConstruction, + ef, + efType, + }), + }, + ...(indexType === 'ivfflat' && { + clustering: { + numLists: lists, + avgVectorsPerList: testConfig.size / (lists || 1), + recommendedLists: Math.floor(Math.sqrt(testConfig.size)), + distribution: distType, + }, + }), + }, + }); + } + }, + timeout, + ); + } + } + }); + } +}); + +function analyzeResults(results: TestResult[]) { + const byType = groupBy(results, (r: TestResult) => r.type); + Object.entries(byType).forEach(([type, typeResults]) => { + console.log(`\n=== ${type.toUpperCase()} Index Analysis ===\n`); + + const byDimension = 
groupBy(typeResults, (r: TestResult) => r.dimension.toString()); + Object.entries(byDimension).forEach(([dim, dimResults]) => { + console.log(`\n--- Analysis for ${dim} dimensions ---\n`); + + // Combined Performance Analysis + const columns = ['Distribution', 'Dataset Size', 'K']; + if (type === 'hnsw') { + columns.push('M', 'EF Construction', 'EF', 'EF Type'); + } else if (type === 'ivfflat') { + columns.push('Lists', 'Vectors/List'); + } + columns.push('Min Recall', 'Avg Recall', 'Max Recall', 'P50 (ms)', 'P95 (ms)'); + + const performanceData = Object.values( + groupBy( + dimResults, + (r: any) => `${r.size}-${r.k}-${type === 'ivfflat' ? r.metrics.latency.lists : r.metrics.latency.m}`, + (results: any[]) => { + const sortedResults = [...results].sort( + (a, b) => + ['random', 'clustered', 'skewed', 'mixed'].indexOf(a.distribution) - + ['random', 'clustered', 'skewed', 'mixed'].indexOf(b.distribution), + ); + return sortedResults.map(result => ({ + Distribution: result.distribution, + 'Dataset Size': result.size, + K: result.k, + ...(type === 'ivfflat' + ? { + Lists: result.metrics.latency.lists, + 'Vectors/List': result.metrics.latency.vectorsPerList, + } + : {}), + ...(type === 'hnsw' + ? { + M: result.metrics.latency.m, + 'EF Construction': result.metrics.latency.efConstruction, + EF: result.metrics.latency.ef, + 'EF Type': result.metrics.latency.efType, + } + : {}), + 'Min Recall': result.metrics.minRecall.toFixed(3), + 'Avg Recall': result.metrics.recall.toFixed(3), + 'Max Recall': result.metrics.maxRecall.toFixed(3), + 'P50 (ms)': result.metrics.latency.p50.toFixed(2), + 'P95 (ms)': result.metrics.latency.p95.toFixed(2), + })); + }, + ), + ).flat(); + + console.log(formatTable(performanceData, columns)); + }); + }); +} + +function splitIntoRandomBatches(total: number, dimension: number): number[] { + const batches: number[] = []; + let remaining = total; + + const batchRange = dimension === 1024 ? 
5000 : 15000; + + while (remaining > 0) { + const batchSize = Math.min(remaining, batchRange + Math.floor(Math.random() * batchRange)); + batches.push(batchSize); + remaining -= batchSize; + } + + return batches; +} + +async function batchedBulkUpsert( + vectorDB: PGPerformanceVector, + testIndexName: string, + vectors: number[][], + batchSizes: number[], +) { + let offset = 0; + const vectorIds = vectors.map((_, idx) => `vec_${idx}`); + const metadata = vectors.map((_, idx) => ({ index: idx })); + + for (const size of batchSizes) { + const batch = vectors.slice(offset, offset + size); + const batchIds = vectorIds.slice(offset, offset + size); + const batchMetadata = metadata.slice(offset, offset + size); + await vectorDB.bulkUpsert(testIndexName, batch, batchMetadata, batchIds); + offset += size; + console.log(`${offset} of ${vectors.length} vectors upserted`); + } +} + +const distributions = { + random: generateRandomVectors, + clustered: generateClusteredVectors, + skewed: generateSkewedVectors, + mixed: (size: number, dimension: number) => { + const generators = [generateRandomVectors, generateClusteredVectors, generateSkewedVectors]; + const batchSizes = splitIntoRandomBatches(size, dimension); + + let vectors: number[][] = []; + for (const batchSize of batchSizes) { + const generator = generators[Math.floor(Math.random() * generators.length)]; + vectors = vectors.concat(generator(batchSize, dimension)); + } + + return vectors; + }, +}; diff --git a/stores/pg/vitest.config.ts b/stores/pg/vitest.config.ts index 46b9d5ef72..b293c3a193 100644 --- a/stores/pg/vitest.config.ts +++ b/stores/pg/vitest.config.ts @@ -4,6 +4,7 @@ export default defineConfig({ test: { environment: 'node', include: ['src/**/*.test.ts'], + exclude: ['src/**/*.performance.test.ts'], coverage: { reporter: ['text', 'json', 'html'], }, diff --git a/stores/pg/vitest.perf.config.ts b/stores/pg/vitest.perf.config.ts new file mode 100644 index 0000000000..a913e2eabc --- /dev/null +++ 
// vitest.perf.config.ts — standalone Vitest config for the performance suite.
// The default vitest.config.ts excludes src/**/*.performance.test.ts, so these
// long-running benchmarks only execute when run with this config explicitly.
import { defineConfig } from 'vitest/config';

export default defineConfig({
  test: {
    environment: 'node',
    include: ['src/**/*.performance.test.ts'],
  },
});