diff --git a/vsb/databases/__init__.py b/vsb/databases/__init__.py
new file mode 100644
index 0000000..e47ac98
--- /dev/null
+++ b/vsb/databases/__init__.py
@@ -0,0 +1,19 @@
+from enum import Enum
+from .base import DB
+
+
+class Database(Enum):
+    """Set of supported database backends; the value is the string used to
+    specify via --database="""
+    Pinecone = "pinecone"
+    PGVector = "pgvector"
+
+    def build(self) -> DB:
+        """Construct an instance of DB based on the value of the enum."""
+        match self:
+            case Database.Pinecone:
+                from .pinecone.pinecone import PineconeDB
+                return PineconeDB()
+            case Database.PGVector:
+                from .pgvector.pgvector import PGVectorDB
+                return PGVectorDB()
diff --git a/vsb/databases/base.py b/vsb/databases/base.py
new file mode 100644
index 0000000..370d3e3
--- /dev/null
+++ b/vsb/databases/base.py
@@ -0,0 +1,45 @@
+from abc import ABC, abstractmethod
+from enum import Enum, auto
+
+
+class Request(Enum):
+    Upsert = auto()
+    Search = auto()
+
+
+class Index(ABC):
+    """Abstract class which represents an index of one or more vector records.
+    Specific implementations should subclass this and implement all abstract
+    methods.
+    Instances of this (derived) class are typically created via the
+    corresponding (concrete) DB create_index() method.
+    """
+
+    @abstractmethod
+    def upsert(self, ident, vector, metadata):
+        raise NotImplementedError
+
+    @abstractmethod
+    def search(self, query_vector):
+        raise NotImplementedError
+
+    def do_request(self, request):
+        print(f"Got request: {request}")
+        match request.operation:
+            case Request.Upsert:
+                self.upsert(request.id, request.vector, request.metadata)
+                return
+            case Request.Search:
+                response = self.search(request.q_vector)
+                # Record timing, calculate Recall etc.
+
+
+class DB(ABC):
+    """Abstract class which represents a database which can store vector
+    records in one or more Indexes. Specific vector DB implementations should
+    subclass this and implement all abstract methods.
+    """
+
+    @abstractmethod
+    def create_index(self, tenant: str) -> Index:
+        raise NotImplementedError
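Note: do_request() above reads request.operation, request.id, request.vector,
request.metadata and request.q_vector, but no such request record type is
defined anywhere in this change. A minimal sketch of the shape it appears to
assume (field names inferred from those attribute accesses; hypothetical, not
part of the diff):

from dataclasses import dataclass
from typing import Optional

from databases.base import Request  # the operation enum defined above


@dataclass
class RequestRecord:  # hypothetical name; not defined in this change
    operation: Request                      # Request.Upsert or Request.Search
    id: Optional[str] = None                # Upsert only
    vector: Optional[list[float]] = None    # Upsert only
    metadata: Optional[dict] = None         # Upsert only
    q_vector: Optional[list[float]] = None  # Search only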
+ """ + + @abstractmethod + def create_index(self, tenant: str) -> Index: + raise NotImplementedError diff --git a/vsb/databases/pgvector/pgvector.py b/vsb/databases/pgvector/pgvector.py new file mode 100644 index 0000000..e64b49c --- /dev/null +++ b/vsb/databases/pgvector/pgvector.py @@ -0,0 +1,12 @@ +from ..vectordb import VectorDB + + +class PGVectorDB(VectorDB): + def __init__(self): + print("PGVectorDB::__init__") + + def upsert(self, ident, vector, metadata): + pass + + def search(self, query_vector): + pass diff --git a/vsb/databases/pinecone/pinecone.py b/vsb/databases/pinecone/pinecone.py new file mode 100644 index 0000000..095cc73 --- /dev/null +++ b/vsb/databases/pinecone/pinecone.py @@ -0,0 +1,21 @@ +from ..base import DB, Index + + +class PineconeIndex(Index): + def __init__(self, tenent: str): + pass + + def upsert(self, ident, vector, metadata): + pass + + def search(self, query_vector): + pass + + +class PineconeDB(DB): + def __init__(self): + print("PineconeDB::__init__") + + def create_index(self, tenant: str) -> Index: + return PineconeIndex() + diff --git a/vsb/vsb.py b/vsb/vsb.py new file mode 100755 index 0000000..c807586 --- /dev/null +++ b/vsb/vsb.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 + +from enum import Enum, auto +from locust import User, events, task, TaskSet +from locust.env import Environment +from locust.exception import StopUser +from locust.log import setup_logging +from locust.runners import WorkerRunner +from locust.stats import stats_history, stats_printer + +from databases import Database +from workloads import Workload +import argparse +import gevent +import logging +import sys + + +setup_logging("INFO") + + +class VectorSearchUser(User): + class Phase(Enum): + Load = auto() + Run = auto() + + """Represents a single user (aka client) performing requests against + a particular Backend.""" + def __init__(self, environment): + super().__init__(environment) + self.count = 0 + self.database = Database(environment.options.database).build() + self.workload = Workload(environment.options.workload).build() + self.phase = VectorSearchUser.Phase.Load + + @task + def request(self): + match self.phase: + case VectorSearchUser.Phase.Load: + self.do_load() + case VectorSearchUser.Phase.Run: + self.do_run() + + def do_load(self): + if batch := self.workload.next_record_batch(): + print(f"Batch: {batch}") + print(f"Loading batch of size:", len(batch)) + self.database.upsert(batch) + else: + # No more data to load, advance to Run phase. 
diff --git a/vsb/workloads/__init__.py b/vsb/workloads/__init__.py
new file mode 100644
index 0000000..4ab9632
--- /dev/null
+++ b/vsb/workloads/__init__.py
@@ -0,0 +1,17 @@
+from enum import Enum
+from .base import VectorWorkload
+
+
+class Workload(Enum):
+    """Set of supported workloads; the value is the string used to
+    specify via --workload=
+    """
+
+    MNIST = "mnist"
+
+    def build(self) -> VectorWorkload:
+        """Construct an instance of VectorWorkload based on the value of the enum."""
+        match self:
+            case Workload.MNIST:
+                from .mnist.mnist import MNIST
+                return MNIST()
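Adding another workload later should only require a new enum member plus a
matching case arm in build(); e.g. a hypothetical GloVe entry (illustrative
only, not part of this change):

class Workload(Enum):
    MNIST = "mnist"
    Glove = "glove"  # hypothetical new workload

    def build(self) -> VectorWorkload:
        match self:
            case Workload.MNIST:
                from .mnist.mnist import MNIST
                return MNIST()
            case Workload.Glove:
                from .glove.glove import Glove  # hypothetical module
                return Glove()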
+ """ + raise NotImplementedError diff --git a/dataset.py b/vsb/workloads/dataset.py similarity index 100% rename from dataset.py rename to vsb/workloads/dataset.py diff --git a/vsb/workloads/mnist/mnist.py b/vsb/workloads/mnist/mnist.py new file mode 100644 index 0000000..e9323b9 --- /dev/null +++ b/vsb/workloads/mnist/mnist.py @@ -0,0 +1,23 @@ +from ..base import VectorWorkload +from ..dataset import Dataset + + +class MNIST(VectorWorkload): + def __init__(self): + print("MNIST::__init__") + self.dataset = Dataset(name="mnist") + self.dataset.load_documents() + print(self.dataset.documents) + self.records = Dataset.split_dataframe(self.dataset.documents, 100) + print(self.records) + self.operation_count = 0 + self.operation_limit = 10 + + def next_record_batch(self): + print("MNIST::next_record_batch") + + def execute_next_request(self, db) ->bool: + print("MNIST::execute_next_request") + self.operation_count += 1 + return self.operation_count < self.operation_limit +