Skip to content

Commit

Permalink
Add class representations for dataset and engine
Browse files Browse the repository at this point in the history
  • Loading branch information
kacperlukawski committed Jul 13, 2022
1 parent 89f2e7c commit ebb9ccd
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 90 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,4 @@ load::time = 0.0052424
different datasets? For simpler cases that may work, but there might be some
specific problems that won't be possible for each dataset.
4. How do we handle engine errors?
5. Dataset should be also represented by a class instance:
- that will give a possibility to not assume the filenames in scenario
- it will be easier to deal with paths

The dataset should also have a file-based config, like engine.
5. The dataset should also have a file-based config, like engine.
5 changes: 5 additions & 0 deletions benchmark/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pathlib import Path

# Base directory point to the main directory of the project, so all the data
# loaded from files can refer to it as a root directory
BASE_DIRECTORY = Path(__file__).parent.parent
36 changes: 32 additions & 4 deletions benchmark/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import abc
import tempfile
from pathlib import Path
from typing import Text, Union
from typing import Text, Union, Generator

PathLike = Union[Text, Path]
from benchmark.engine import Engine
from benchmark.types import PathLike


class Container(abc.ABC):
Expand All @@ -12,13 +13,40 @@ class Container(abc.ABC):
server or client of the engine.
"""

def __init__(self):
self.volumes = []

def mount(self, source: PathLike, target: PathLike):
"""
Add provided source path as a target in the container to be mounted
when .run method is called.
:param source:
:param target:
:return:
"""
self.volumes.append(f"{source}:{target}")

def run(self):
"""
Start the container using the backend
:return:
"""
...

def remove(self):
"""
Stop and remove the container using backend
:return:
"""
...

def logs(self) -> Generator[Union[Text, bytes], None, None]:
"""
Iterate through all the logs produced by the container
:return:
"""
...

def is_ready(self) -> bool:
"""
A healthcheck, making sure the container is properly set up.
Expand Down Expand Up @@ -63,8 +91,8 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.temp_dir.__exit__(exc_type, exc_val, exc_tb)

def initialize_server(self, engine: Text) -> Server:
def initialize_server(self, engine: Engine) -> Server:
...

def initialize_client(self, engine: Text) -> Client:
def initialize_client(self, engine: Engine) -> Client:
...
95 changes: 30 additions & 65 deletions benchmark/backend/docker.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,37 @@
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Text, Union, Optional, Dict, List
from typing import Text, Union, Optional, List, Generator

from benchmark.backend import Backend, PathLike, Server, Client, Container
from benchmark import BASE_DIRECTORY
from benchmark.backend import Backend, Server, Client, Container
from docker.models import containers

import logging
import docker

from benchmark.engine import Engine, ContainerConf
from benchmark.types import PathLike

logger = logging.getLogger(__name__)


@dataclass
class DockerContainerConf:
engine: Text
image: Optional[Text] = None
dockerfile: Optional[Text] = None
environment: Optional[Dict[Text, Union[Text, int, bool]]] = None
main: Optional[Text] = None
hostname: Optional[Text] = None

@classmethod
def from_file(
cls, path: Text, engine: Text, container: Text = "server"
) -> "DockerContainerConf":
with open(path, "r") as fp:
conf = json.load(fp)
return DockerContainerConf(engine=engine, **conf[container])

def dockerfile_path(self, root_dir: Path) -> Path:
"""
Calculates the absolute path to the directory containing the dockerfile,
using given root directory as a base.
:param root_dir:
:return:
"""
return root_dir / "engine" / self.engine


class DockerContainer(Container):
def __init__(
self,
container_conf: DockerContainerConf,
container_conf: ContainerConf,
docker_backend: "DockerBackend",
):
super().__init__()
self.container_conf = container_conf
self.docker_backend = docker_backend
self.container: containers.Container = None
self.volumes = []

def mount(self, source: PathLike, target: PathLike):
self.volumes.append(f"{source}:{target}")
self._docker_backend = docker_backend
self._docker_container: containers.Container = None

def run(self):
# Build the dockerfile if it was provided as a container image. This is
# typically done for the clients, as they may require some custom setup
if self.container_conf.dockerfile is not None:
dockerfile_path = self.container_conf.dockerfile_path(
self.docker_backend.root_dir
self._docker_backend.root_dir
)
image, logs = self.docker_backend.docker_client.images.build(
image, logs = self._docker_backend.docker_client.images.build(
path=str(dockerfile_path),
dockerfile=self.container_conf.dockerfile,
)
Expand All @@ -75,23 +45,30 @@ def run(self):
# Create the container either using the image or dockerfile, if that was
# provided. The dockerfile has a preference over the image name.
logger.debug("Running a container using image %s", self.container_conf.image)
self.container = self.docker_backend.docker_client.containers.run(
self._docker_container = self._docker_backend.docker_client.containers.run(
self.container_conf.image,
detach=True,
volumes=self.volumes,
environment=self.container_conf.environment,
hostname=self.container_conf.hostname,
network=self.docker_backend.network.name,
network=self._docker_backend.network.name,
)

# TODO: remove the image on exit

def logs(self):
for log_entry in self.container.logs(stream=True, follow=True):
def remove(self):
# Sometimes the container has been created but not launched, so the
# underlying Docker container won't be created
if self._docker_container is not None:
self._docker_container.stop()
self._docker_container.remove()

def logs(self) -> Generator[Union[Text, bytes], None, None]:
for log_entry in self._docker_container.logs(stream=True, follow=True):
yield log_entry

def is_ready(self) -> bool:
# TODO: implement the healthcheck
# TODO: implement the healthcheck, but probably on engine level
return True


Expand All @@ -102,7 +79,7 @@ class DockerServer(Server, DockerContainer):
class DockerClient(Client, DockerContainer):
def load_data(self, filename: Text):
command = f"{self.container_conf.main} load {filename}"
_, generator = self.container.exec_run(command, stream=True)
_, generator = self._docker_container.exec_run(command, stream=True)
return generator


Expand All @@ -116,7 +93,7 @@ class DockerBackend(Backend):

def __init__(
self,
root_dir: Union[PathLike],
root_dir: Union[PathLike] = BASE_DIRECTORY,
docker_client: Optional[docker.DockerClient] = None,
):
super().__init__(root_dir)
Expand All @@ -137,32 +114,20 @@ def __exit__(self, exc_type, exc_val, exc_tb):
# Kill all the containers on the context manager exit, so there are no
# orphaned containers once the benchmark is finished
for container in self.containers:
container.container.kill()

# Remove the data volume as well, so there won't be any volume left
# self.data_volume.remove()
container.remove()

# Finally get rid of the network as well
self.network.remove()

def initialize_server(self, engine: Text) -> Server:
server_conf = DockerContainerConf.from_file(
self.root_dir / "engine" / engine / "config.json",
engine=engine,
container="server",
)
def initialize_server(self, engine: Engine) -> Server:
server_conf = engine.get_config("server")
logger.info("Initializing %s server: %s", engine, server_conf)
server = DockerServer(server_conf, self)
self.containers.append(server)
return server

def initialize_client(self, engine: Text) -> Client:
# TODO: Create a docker volume so the data is available on client instances
client_conf = DockerContainerConf.from_file(
self.root_dir / "engine" / engine / "config.json",
engine=engine,
container="client",
)
def initialize_client(self, engine: Engine) -> Client:
client_conf = engine.get_config("client")
logger.info("Initializing %s client: %s", engine, client_conf)
client = DockerClient(client_conf, self)
self.containers.append(client)
Expand Down
18 changes: 18 additions & 0 deletions benchmark/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pathlib import Path
from typing import Text

from benchmark import BASE_DIRECTORY


class Dataset:

@classmethod
def from_name(cls, name: Text) -> "Dataset":
# TODO: load dataset info from given path
return Dataset(name)

def __init__(self, name: Text):
self.name = name

def path(self) -> Path:
return BASE_DIRECTORY / "dataset" / self.name
46 changes: 46 additions & 0 deletions benchmark/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Text, Optional, Dict, Union

from benchmark import BASE_DIRECTORY


@dataclass
class ContainerConf:
engine: Text
image: Optional[Text] = None
dockerfile: Optional[Text] = None
environment: Optional[Dict[Text, Union[Text, int, bool]]] = None
main: Optional[Text] = None
hostname: Optional[Text] = None

def dockerfile_path(self, root_dir: Path) -> Path:
"""
Calculates the absolute path to the directory containing the dockerfile,
using given root directory as a base.
:param root_dir:
:return:
"""
return BASE_DIRECTORY / "engine" / self.engine


class Engine:
"""
An abstraction over vector database engine.
"""

@classmethod
def from_name(cls, name: Text) -> "Engine":
container_configs = {}
with open(BASE_DIRECTORY / "engine" / name / "config.json", "r") as fp:
config = json.load(fp)
for container_name, conf in config.items():
container_configs[container_name] = ContainerConf(engine=name, **conf)
return Engine(container_configs)

def __init__(self, container_configs: Dict[Text, ContainerConf]):
self.container_configs = container_configs

def get_config(self, container_name: Text) -> ContainerConf:
return self.container_configs.get(container_name)
20 changes: 14 additions & 6 deletions benchmark/scenario/__init__.py → benchmark/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from collections import defaultdict
from typing import Text, Union, Dict

from benchmark.backend import Backend
from benchmark.dataset import Dataset
from benchmark.engine import Engine

ScenarioReport = Dict[Text, Dict[Text, float]]


Expand All @@ -13,21 +17,26 @@ class Scenario:
"""

@classmethod
def from_string(cls, scenario: Text, backend: "Backend") -> "Scenario":
def load_class(cls, scenario: Text) -> "Scenario":
package_name, class_name = scenario.rsplit(".", maxsplit=1)
module = importlib.import_module(package_name)
clazz = getattr(module, class_name)
scenario = clazz(backend)
scenario = clazz()
return scenario

def __init__(self, backend: "Backend"):
self.backend = backend
def __init__(self):
self._kpis = defaultdict(lambda: defaultdict(list))

def execute(self, engine: Text, dataset: Text):
def execute(self, backend: Backend, engine: Engine, dataset: Dataset):
...

def collect_kpis(self, output: Union[Text, bytes]):
"""
Iterate through the output lines, extract the logged KPIs info and
combine them into the format of ScenarioReport.
:param output:
:return:
"""
if isinstance(output, bytes):
output = output.decode("utf-8")
results = re.findall(
Expand All @@ -39,4 +48,3 @@ def collect_kpis(self, output: Union[Text, bytes]):
def process_results(self) -> ScenarioReport:
# TODO: need to think about better structure
return self._kpis

4 changes: 4 additions & 0 deletions benchmark/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from pathlib import Path
from typing import Text, Union

PathLike = Union[Text, Path]
12 changes: 7 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
from pathlib import Path

from benchmark.backend.docker import DockerBackend
from benchmark.dataset import Dataset
from benchmark.engine import Engine
from parser import parser
from benchmark.scenario import Scenario

Expand All @@ -13,11 +14,12 @@
# scenario decides how many instances of each type it requires.

args = parser.parse_args()
current_dir = Path(__file__).parent
with DockerBackend(current_dir) as backend:
with DockerBackend() as backend:
try:
scenario = Scenario.from_string(args.scenario, backend)
results = scenario.execute(args.engine, args.dataset)
engine = Engine.from_name(args.engine)
dataset = Dataset.from_name(args.dataset)
scenario = Scenario.load_class(args.scenario)
results = scenario.execute(backend, engine, dataset)

# Iterate and display all the metrics
# TODO: make the KPI metrics more configurable
Expand Down
Loading

0 comments on commit ebb9ccd

Please sign in to comment.