Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add progress bars for main phases #109

Merged
merged 3 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions vsb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import logging
from pathlib import Path

import rich.console
import rich.progress
import rich.live

logger = logging.getLogger("vsb")
"""Logger for VSB. Messages will be written to the log file and console."""


log_dir: Path = None
"""Directory where logs will be written to. Set in main()"""

console: rich.console.Console = None

progress: rich.progress.Progress = None
"""
Progress bar for the current task. Only created for non-Worker processes
(i.e. LocalRunner if non-distributed; Master if distributed).
"""

live: rich.live.Live = None
15 changes: 13 additions & 2 deletions vsb/databases/pgvector/pgvector.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import logging

import numpy as np
import pgvector.psycopg
import psycopg
from psycopg.types.json import Jsonb

import vsb
from .filter_util import FilterUtil
from ..base import DB, Namespace
from ...vsb_types import Record, DistanceMetric, RecordList, SearchRequest
Expand Down Expand Up @@ -110,6 +109,11 @@ def initialize_population(self):

def finalize_population(self, record_count: int):
# Create index.

if vsb.progress:
create_index_id = vsb.progress.add_task(
f" Create pgvector index ({self.index_type})", total=None
)
sql = (
f"CREATE INDEX IF NOT EXISTS {self.table}_embedding_idx ON "
f"{self.table} USING {self.index_type} (embedding "
Expand All @@ -119,6 +123,13 @@ def finalize_population(self, record_count: int):
case "ivfflat":
sql += f" WITH (lists = {self.ivfflat_lists})"
self.conn.execute(sql)
if vsb.progress:
vsb.progress.update(
create_index_id,
description=f" ✔ pgvector index ({self.index_type}) created",
total=1,
completed=1,
)

@staticmethod
def _get_distance_func(metric: DistanceMetric) -> str:
Expand Down
10 changes: 10 additions & 0 deletions vsb/databases/pinecone/pinecone.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

import vsb
from vsb import logger
from pinecone import PineconeException
from pinecone.grpc import PineconeGRPC, GRPCIndex
Expand Down Expand Up @@ -107,12 +108,21 @@ def initialize_population(self):
def finalize_population(self, record_count: int):
"""Wait until all records are visible in the index"""
logger.debug(f"PineconeDB: Waiting for record count to reach {record_count}")
if vsb.progress:
finalize_id = vsb.progress.add_task(
"- Finalize population", total=record_count
)
while True:
index_count = self.index.describe_index_stats()["total_vector_count"]
if vsb.progress:
vsb.progress.update(finalize_id, completed=index_count)
if index_count >= record_count:
logger.debug(
f"PineconeDB: Index vector count reached {index_count}, "
f"finalize is complete"
)
break
time.sleep(1)

if vsb.progress:
vsb.progress.update(finalize_id, description=" ✔ Finalize population")
8 changes: 8 additions & 0 deletions vsb/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from locust.exception import StopUser

import vsb
from vsb.cmdline_args import add_vsb_cmdline_args
from vsb.databases import Database
from vsb.workloads import Workload
Expand Down Expand Up @@ -53,6 +54,13 @@ def on_locust_init(environment, **_kwargs):
environment, iter(range(num_users)), phase
)

if isinstance(environment.runner, WorkerRunner):
# In distributed mode, we only want to log problems to the console,
# (a) because things get noisy if we log the same info from multiple
# workers, and (b) because logs from non-master will corrupt the
# progress bar display.
vsb.logger.setLevel(logging.ERROR)


def setup_runner(env):
options = env.parsed_options
Expand Down
165 changes: 165 additions & 0 deletions vsb/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""
Logging support for VSB.
Includes setup code for logging and rich console output for progress bars etc.

Log messages from VSB should use the module-level `vsb.logger` for the actual
logger object - e.g.

from vsb import logger

logger.info("This is an info message")
"""

import io
import logging
import os
from datetime import datetime
from pathlib import Path
import rich.console
import rich.live
from rich.logging import RichHandler
from rich.progress import Progress
import rich.table
import vsb

logger = logging.getLogger("vsb")

progress_greenlet = None


class ExtraInfoColumn(rich.progress.ProgressColumn):
"""A custom rich.progress column which renders an extra_info field
of a task if the field is present, otherwise shows nothing.
extra_info field can include rich markup (e.g. [progress.description] etc).
"""

def render(self, task: rich.progress.Task) -> rich.text.Text:
# Check if the task has the extra_info field
if "extra_info" in task.fields:
return rich.text.Text.from_markup(task.fields["extra_info"])
return rich.text.Text()


class ProgressIOWrapper(io.IOBase):
"""A wrapper around a file-like object which updates a progress bar as data is
written to the file.
"""

def __init__(self, dest, total, progress, indent=0, *args, **kwargs):
"""Create a new ProgressIOWrapper object.
:param dest: The destination file-like object to write to.
:param total: The total number of bytes expected to be written (used to
percentage complete). Pass None if unknown - this won't show
a percentage complete, but will otherwise track progress.
:param progress: The Progress object to add a progress bar to. If None
then no progress bar will be shown.
:param indent: The number of spaces to indent the progress bar label
"""
self.path = dest
self.file = dest.open("wb")
self.progress = progress
if self.progress:
self.task_id = progress.add_task(
(" " * indent) + dest.parent.name + "/" + dest.name, total=total
)
super().__init__(*args, **kwargs)

def write(self, b):
# Write data to the base object
bytes_written = self.file.write(b)
if self.progress:
# Update the progress bar with the amount written
self.progress.update(self.task_id, advance=bytes_written)
return bytes_written

def flush(self):
return self.file.flush()

def close(self):
return self.file.close()

# Implement other necessary methods to fully comply with IO[bytes] interface
def seek(self, offset, whence=io.SEEK_SET):
return self.file.seek(offset, whence)

def tell(self):
return self.file.tell()

def read(self, n=-1):
return self.file.read(n)

def readable(self):
return self.file.readable()

def writable(self):
return self.file.writable()

def seekable(self):
return self.file.seekable()


def setup_logging(log_base: Path, level: str) -> Path:
level = level.upper()
# Setup the default logger to log to a file under
# <log_base>/<timestamp>/vsb.log,
# returning the directory created.
log_path = log_base / datetime.now().isoformat(timespec="seconds")
log_path.mkdir(parents=True)

file_handler = logging.FileHandler(log_path / "vsb.log")
file_handler.setLevel(level)
file_formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
file_handler.setFormatter(file_formatter)

# Configure the root logger to use the file handler
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.addHandler(file_handler)

# Setup a rich text console for log messages and progress bars etc.
# Set a fixed width of 300 to disable wrapping if not in a terminal (for CV
# so log lines we are checking for don't get wrapped).
width = None if os.getenv("TERM") else 300
vsb.console = rich.console.Console(width=width)
# Setup the specific logger for "vsb" to also log to stdout using RichHandler
rich_handler = RichHandler(
console=vsb.console,
log_time_format="%Y-%m-%dT%H:%M:%S%z",
omit_repeated_times=False,
show_path=False,
)
rich_handler.setLevel(level)
vsb.logger.setLevel(level)
vsb.logger.addHandler(rich_handler)

# And always logs errors to stdout (via RichHandler)
error_handler = RichHandler()
error_handler.setLevel(logging.ERROR)
root_logger.addHandler(error_handler)

return log_path


def make_progressbar() -> rich.progress.Progress:
"""Create a Progress object for use in displaying progress bars.
To display the progress of one or more tasks, call add_task() on the returned
object, then call update() or advance() to advance progress -

progress = make_progressbar()
task_id = progress.add_task("Task description", total=100)
progress.update(task_id, advance=1)
"""
progress = rich.progress.Progress(
rich.progress.TextColumn("[progress.description]{task.description}"),
rich.progress.MofNCompleteColumn(),
rich.progress.BarColumn(),
rich.progress.TaskProgressColumn(),
"[progress.elapsed]elapsed:",
rich.progress.TimeElapsedColumn(),
"[progress.remaining]remaining:",
rich.progress.TimeRemainingColumn(compact=True),
ExtraInfoColumn(),
console=vsb.console,
)
progress.start()
return progress
55 changes: 8 additions & 47 deletions vsb/main.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,14 @@
#!/usr/bin/env python3
import logging
import os

import sys
from datetime import datetime
from pathlib import Path

import configargparse
import locust.main
from rich.console import Console
from rich.logging import RichHandler

import vsb.metrics_tracker
import vsb
from vsb.cmdline_args import add_vsb_cmdline_args, validate_parsed_args


def setup_logging(log_base: Path, level: str) -> Path:
level = level.upper()
# Setup the default logger to log to a file under
# <log_base>/<timestamp>/vsb.log,
# returning the directory created.
log_path = log_base / datetime.now().isoformat(timespec="seconds")
log_path.mkdir(parents=True)

file_handler = logging.FileHandler(log_path / "vsb.log")
file_handler.setLevel(level)
file_formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
file_handler.setFormatter(file_formatter)

# Configure the root logger to use the file handler
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.addHandler(file_handler)

# Setup the specific logger for "vsb" to also log to stdout using RichHandler
# set fixed width of 300 to disable wrapping if not in a terminal (for CV
# so log lines we are checking for don't get wrapped).
width = None if os.getenv("TERM") else 300
rich_handler = RichHandler(
console=Console(width=width),
log_time_format="%Y-%m-%dT%H:%M:%S%z",
omit_repeated_times=False,
show_path=False,
)
rich_handler.setLevel(level)
vsb.logger.setLevel(level)
vsb.logger.addHandler(rich_handler)

# And always logs errors to stdout (via RichHandler)
error_handler = RichHandler()
error_handler.setLevel(logging.ERROR)
root_logger.addHandler(error_handler)

return log_path
from vsb.logging import logger, setup_logging


def main():
Expand Down Expand Up @@ -81,7 +38,11 @@ def main():

log_base = Path("reports") / args.database
vsb.log_dir = setup_logging(log_base=log_base, level=args.loglevel)
vsb.logger.info(f"Writing benchmark results to '{vsb.log_dir}'")
logger.info(
f"Vector Search Bench: Starting experiment with backend='{args.database}' and "
f"workload='{args.workload}'."
)
logger.info(f"Writing benchmark results to '{vsb.log_dir}'")

# If we got here then args are valid - pass them on to locusts' main(),
# appending the location of our locustfile and --headless to start
Expand Down
Loading