diff --git a/README.md b/README.md
index 1c810631..9ccee3fc 100644
--- a/README.md
+++ b/README.md
@@ -131,8 +131,4 @@ load::time = 0.0052424
    different datasets? For simpler cases that may work, but there might be
    some specific problems that won't be possible for each dataset.
 4. How do we handle engine errors?
-5. Dataset should be also represented by a class instance:
-   - that will give a possibility to not assume the filenames in scenario
-   - it will be easier to deal with paths
-   - The dataset should also have a file-based config, like engine.
+5. The dataset should also have a file-based config, like the engine.
diff --git a/benchmark/__init__.py b/benchmark/__init__.py
index e69de29b..7877fef4 100644
--- a/benchmark/__init__.py
+++ b/benchmark/__init__.py
@@ -0,0 +1,5 @@
+from pathlib import Path
+
+# The base directory points to the main directory of the project, so all the
+# data loaded from files can refer to it as a root directory
+BASE_DIRECTORY = Path(__file__).parent.parent
diff --git a/benchmark/backend/__init__.py b/benchmark/backend/__init__.py
index 439a98c3..56646a8f 100644
--- a/benchmark/backend/__init__.py
+++ b/benchmark/backend/__init__.py
@@ -1,9 +1,10 @@
 import abc
 import tempfile
 from pathlib import Path
-from typing import Text, Union
+from typing import Text, Union, Generator
 
-PathLike = Union[Text, Path]
+from benchmark.engine import Engine
+from benchmark.types import PathLike
 
 
 class Container(abc.ABC):
@@ -12,6 +13,19 @@ class Container(abc.ABC):
     server or client of the engine.
     """
 
+    def __init__(self):
+        self.volumes = []
+
+    def mount(self, source: PathLike, target: PathLike):
+        """
+        Mount the provided source path at the given target path inside the
+        container when the .run method is called.
+        :param source:
+        :param target:
+        :return:
+        """
+        self.volumes.append(f"{source}:{target}")
+
     def run(self):
         """
         Start the container using the backend
@@ -19,6 +33,20 @@ def run(self):
         """
         ...
 
+    def remove(self):
+        """
+        Stop and remove the container using the backend
+        :return:
+        """
+        ...
+
+    def logs(self) -> Generator[Union[Text, bytes], None, None]:
+        """
+        Iterate through all the logs produced by the container
+        :return:
+        """
+        ...
+
     def is_ready(self) -> bool:
         """
         A healthcheck, making sure the container is properly set up.
@@ -63,8 +91,8 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.temp_dir.__exit__(exc_type, exc_val, exc_tb)
 
-    def initialize_server(self, engine: Text) -> Server:
+    def initialize_server(self, engine: Engine) -> Server:
         ...
 
-    def initialize_client(self, engine: Text) -> Client:
+    def initialize_client(self, engine: Engine) -> Client:
         ...
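The `Container` and `Backend` abstractions above are what the scenarios program against: a backend creates server and client containers for a given engine, and each container exposes `mount`, `run`, `logs`, `is_ready` and `remove`. A minimal sketch of how the pieces introduced in this diff are meant to be wired together (the engine and dataset names are hypothetical and would need a matching `engine/<name>/config.json` and `dataset/<name>/` directory):

```python
from benchmark.backend.docker import DockerBackend
from benchmark.dataset import Dataset
from benchmark.engine import Engine
from benchmark.scenario import Scenario

# Hypothetical engine and dataset names; the config and data files must exist.
engine = Engine.from_name("my-engine")
dataset = Dataset.from_name("my-dataset")
scenario = Scenario.load_class("scenario.load.MeasureLoadTimeSingleClient")

# The backend is a context manager, so all containers and the network it
# created are cleaned up once the block exits.
with DockerBackend() as backend:
    results = scenario.execute(backend, engine, dataset)
```

This mirrors what `main.py` does further down; the scenario itself decides how many server and client containers it needs.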
diff --git a/benchmark/backend/docker.py b/benchmark/backend/docker.py
index 549f8e11..ff37bafa 100644
--- a/benchmark/backend/docker.py
+++ b/benchmark/backend/docker.py
@@ -1,67 +1,37 @@
-import json
-from dataclasses import dataclass
-from pathlib import Path
-from typing import Text, Union, Optional, Dict, List
+from typing import Text, Union, Optional, List, Generator
 
-from benchmark.backend import Backend, PathLike, Server, Client, Container
+from benchmark import BASE_DIRECTORY
+from benchmark.backend import Backend, Server, Client, Container
 from docker.models import containers
 
 import logging
 import docker
+from benchmark.engine import Engine, ContainerConf
+from benchmark.types import PathLike
 
 logger = logging.getLogger(__name__)
 
 
-@dataclass
-class DockerContainerConf:
-    engine: Text
-    image: Optional[Text] = None
-    dockerfile: Optional[Text] = None
-    environment: Optional[Dict[Text, Union[Text, int, bool]]] = None
-    main: Optional[Text] = None
-    hostname: Optional[Text] = None
-
-    @classmethod
-    def from_file(
-        cls, path: Text, engine: Text, container: Text = "server"
-    ) -> "DockerContainerConf":
-        with open(path, "r") as fp:
-            conf = json.load(fp)
-            return DockerContainerConf(engine=engine, **conf[container])
-
-    def dockerfile_path(self, root_dir: Path) -> Path:
-        """
-        Calculates the absolute path to the directory containing the dockerfile,
-        using given root directory as a base.
-        :param root_dir:
-        :return:
-        """
-        return root_dir / "engine" / self.engine
-
-
 class DockerContainer(Container):
     def __init__(
         self,
-        container_conf: DockerContainerConf,
+        container_conf: ContainerConf,
        docker_backend: "DockerBackend",
     ):
+        super().__init__()
         self.container_conf = container_conf
-        self.docker_backend = docker_backend
-        self.container: containers.Container = None
-        self.volumes = []
-
-    def mount(self, source: PathLike, target: PathLike):
-        self.volumes.append(f"{source}:{target}")
+        self._docker_backend = docker_backend
+        self._docker_container: containers.Container = None
 
     def run(self):
         # Build the dockerfile if it was provided as a container image. This is
         # typically done for the clients, as they may require some custom setup
         if self.container_conf.dockerfile is not None:
             dockerfile_path = self.container_conf.dockerfile_path(
-                self.docker_backend.root_dir
+                self._docker_backend.root_dir
             )
-            image, logs = self.docker_backend.docker_client.images.build(
+            image, logs = self._docker_backend.docker_client.images.build(
                 path=str(dockerfile_path),
                 dockerfile=self.container_conf.dockerfile,
             )
@@ -75,23 +45,30 @@ def run(self):
         # Create the container either using the image or dockerfile, if that was
         # provided. The dockerfile has a preference over the image name.
logger.debug("Running a container using image %s", self.container_conf.image) - self.container = self.docker_backend.docker_client.containers.run( + self._docker_container = self._docker_backend.docker_client.containers.run( self.container_conf.image, detach=True, volumes=self.volumes, environment=self.container_conf.environment, hostname=self.container_conf.hostname, - network=self.docker_backend.network.name, + network=self._docker_backend.network.name, ) # TODO: remove the image on exit - def logs(self): - for log_entry in self.container.logs(stream=True, follow=True): + def remove(self): + # Sometimes the container has been created but not launched, so the + # underlying Docker container won't be created + if self._docker_container is not None: + self._docker_container.stop() + self._docker_container.remove() + + def logs(self) -> Generator[Union[Text, bytes], None, None]: + for log_entry in self._docker_container.logs(stream=True, follow=True): yield log_entry def is_ready(self) -> bool: - # TODO: implement the healthcheck + # TODO: implement the healthcheck, but probably on engine level return True @@ -102,7 +79,7 @@ class DockerServer(Server, DockerContainer): class DockerClient(Client, DockerContainer): def load_data(self, filename: Text): command = f"{self.container_conf.main} load {filename}" - _, generator = self.container.exec_run(command, stream=True) + _, generator = self._docker_container.exec_run(command, stream=True) return generator @@ -116,7 +93,7 @@ class DockerBackend(Backend): def __init__( self, - root_dir: Union[PathLike], + root_dir: Union[PathLike] = BASE_DIRECTORY, docker_client: Optional[docker.DockerClient] = None, ): super().__init__(root_dir) @@ -137,32 +114,20 @@ def __exit__(self, exc_type, exc_val, exc_tb): # Kill all the containers on the context manager exit, so there are no # orphaned containers once the benchmark is finished for container in self.containers: - container.container.kill() - - # Remove the data volume as well, so there won't be any volume left - # self.data_volume.remove() + container.remove() # Finally get rid of the network as well self.network.remove() - def initialize_server(self, engine: Text) -> Server: - server_conf = DockerContainerConf.from_file( - self.root_dir / "engine" / engine / "config.json", - engine=engine, - container="server", - ) + def initialize_server(self, engine: Engine) -> Server: + server_conf = engine.get_config("server") logger.info("Initializing %s server: %s", engine, server_conf) server = DockerServer(server_conf, self) self.containers.append(server) return server - def initialize_client(self, engine: Text) -> Client: - # TODO: Create a docker volume so the data is available on client instances - client_conf = DockerContainerConf.from_file( - self.root_dir / "engine" / engine / "config.json", - engine=engine, - container="client", - ) + def initialize_client(self, engine: Engine) -> Client: + client_conf = engine.get_config("client") logger.info("Initializing %s client: %s", engine, client_conf) client = DockerClient(client_conf, self) self.containers.append(client) diff --git a/benchmark/dataset.py b/benchmark/dataset.py new file mode 100644 index 00000000..6f7ac2a0 --- /dev/null +++ b/benchmark/dataset.py @@ -0,0 +1,18 @@ +from pathlib import Path +from typing import Text + +from benchmark import BASE_DIRECTORY + + +class Dataset: + + @classmethod + def from_name(cls, name: Text) -> "Dataset": + # TODO: load dataset info from given path + return Dataset(name) + + def __init__(self, name: Text): + self.name 
= name + + def path(self) -> Path: + return BASE_DIRECTORY / "dataset" / self.name diff --git a/benchmark/engine.py b/benchmark/engine.py new file mode 100644 index 00000000..c78b5698 --- /dev/null +++ b/benchmark/engine.py @@ -0,0 +1,46 @@ +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Text, Optional, Dict, Union + +from benchmark import BASE_DIRECTORY + + +@dataclass +class ContainerConf: + engine: Text + image: Optional[Text] = None + dockerfile: Optional[Text] = None + environment: Optional[Dict[Text, Union[Text, int, bool]]] = None + main: Optional[Text] = None + hostname: Optional[Text] = None + + def dockerfile_path(self, root_dir: Path) -> Path: + """ + Calculates the absolute path to the directory containing the dockerfile, + using given root directory as a base. + :param root_dir: + :return: + """ + return BASE_DIRECTORY / "engine" / self.engine + + +class Engine: + """ + An abstraction over vector database engine. + """ + + @classmethod + def from_name(cls, name: Text) -> "Engine": + container_configs = {} + with open(BASE_DIRECTORY / "engine" / name / "config.json", "r") as fp: + config = json.load(fp) + for container_name, conf in config.items(): + container_configs[container_name] = ContainerConf(engine=name, **conf) + return Engine(container_configs) + + def __init__(self, container_configs: Dict[Text, ContainerConf]): + self.container_configs = container_configs + + def get_config(self, container_name: Text) -> ContainerConf: + return self.container_configs.get(container_name) diff --git a/benchmark/scenario/__init__.py b/benchmark/scenario.py similarity index 68% rename from benchmark/scenario/__init__.py rename to benchmark/scenario.py index 32f6fa02..5076cb84 100644 --- a/benchmark/scenario/__init__.py +++ b/benchmark/scenario.py @@ -3,6 +3,10 @@ from collections import defaultdict from typing import Text, Union, Dict +from benchmark.backend import Backend +from benchmark.dataset import Dataset +from benchmark.engine import Engine + ScenarioReport = Dict[Text, Dict[Text, float]] @@ -13,21 +17,26 @@ class Scenario: """ @classmethod - def from_string(cls, scenario: Text, backend: "Backend") -> "Scenario": + def load_class(cls, scenario: Text) -> "Scenario": package_name, class_name = scenario.rsplit(".", maxsplit=1) module = importlib.import_module(package_name) clazz = getattr(module, class_name) - scenario = clazz(backend) + scenario = clazz() return scenario - def __init__(self, backend: "Backend"): - self.backend = backend + def __init__(self): self._kpis = defaultdict(lambda: defaultdict(list)) - def execute(self, engine: Text, dataset: Text): + def execute(self, backend: Backend, engine: Engine, dataset: Dataset): ... def collect_kpis(self, output: Union[Text, bytes]): + """ + Iterate through the output lines, extract the logged KPIs info and + combine them into the format of ScenarioReport. 
+        :param output:
+        :return:
+        """
         if isinstance(output, bytes):
             output = output.decode("utf-8")
         results = re.findall(
@@ -39,4 +48,3 @@ def collect_kpis(self, output: Union[Text, bytes]):
     def process_results(self) -> ScenarioReport:
         # TODO: need to think about better structure
         return self._kpis
-
diff --git a/benchmark/types.py b/benchmark/types.py
new file mode 100644
index 00000000..c9607963
--- /dev/null
+++ b/benchmark/types.py
@@ -0,0 +1,4 @@
+from pathlib import Path
+from typing import Text, Union
+
+PathLike = Union[Text, Path]
diff --git a/main.py b/main.py
index b6429805..dcbcad6d 100644
--- a/main.py
+++ b/main.py
@@ -1,7 +1,8 @@
 import logging
-from pathlib import Path
 
 from benchmark.backend.docker import DockerBackend
+from benchmark.dataset import Dataset
+from benchmark.engine import Engine
 from parser import parser
 from benchmark.scenario import Scenario
 
@@ -13,11 +14,12 @@
 # scenario decides how many instances of each type it requires.
 args = parser.parse_args()
 
-current_dir = Path(__file__).parent
-with DockerBackend(current_dir) as backend:
+with DockerBackend() as backend:
     try:
-        scenario = Scenario.from_string(args.scenario, backend)
-        results = scenario.execute(args.engine, args.dataset)
+        engine = Engine.from_name(args.engine)
+        dataset = Dataset.from_name(args.dataset)
+        scenario = Scenario.load_class(args.scenario)
+        results = scenario.execute(backend, engine, dataset)
 
         # Iterate and display all the metrics
         # TODO: make the KPI metrics more configurable
diff --git a/scenario/load.py b/scenario/load.py
index c307c3de..1a98b6e7 100644
--- a/scenario/load.py
+++ b/scenario/load.py
@@ -1,10 +1,13 @@
 import logging
 import time
-from typing import Text
 
+from benchmark.backend import Backend
+from benchmark.dataset import Dataset
+from benchmark.engine import Engine
 from benchmark.scenario import Scenario, ScenarioReport
 
 logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
 
 
 class MeasureLoadTimeSingleClient(Scenario):
@@ -15,17 +18,17 @@ class MeasureLoadTimeSingleClient(Scenario):
 
     WAIT_TIME_SEC = 5.0
 
-    def execute(self, engine: Text, dataset: Text):
+    def execute(self, backend: Backend, engine: Engine, dataset: Dataset):
         # Initialize the server first, so the client can communicate with it
-        server = self.backend.initialize_server(engine)
+        server = backend.initialize_server(engine)
         server.run()
         while not server.is_ready():
             time.sleep(self.WAIT_TIME_SEC)
         logger.debug("Initialized %s server", server)
 
         # Now create a single client instance
-        client = self.backend.initialize_client(engine)
-        client.mount(self.backend.root_dir / "dataset" / dataset, "/dataset")
+        client = backend.initialize_client(engine)
+        client.mount(dataset.path(), "/dataset")
         client.run()
         logger.debug("Initialized %s client", client)
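For reference, `Engine.from_name` expects every engine to ship a `config.json` under `engine/<name>/`, with one top-level key per container (the Docker backend asks for `server` and `client`) whose fields map onto `ContainerConf`. A sketch of that structure, written as the dictionary `json.load` would return; all image names, hostnames and commands here are made up for illustration:

```python
# Hypothetical contents of engine/my-engine/config.json after json.load().
config = {
    "server": {
        "image": "my-engine-server:latest",  # prebuilt image, started as-is
        "hostname": "my_engine_server",      # name reachable on the backend network
        "environment": {"DEBUG": True},      # passed to the container environment
    },
    "client": {
        "dockerfile": "Dockerfile",          # built from engine/my-engine/ before running
        "main": "python cmd.py",             # DockerClient runs "{main} load <filename>"
    },
}
```

The keys allowed in each entry are exactly the optional `ContainerConf` fields (`image`, `dockerfile`, `environment`, `main`, `hostname`); the `engine` field is filled in automatically from the directory name.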