Skip to content

Commit

Permalink
Add progress bars for main phases
Browse files Browse the repository at this point in the history
Add progress bars for each of the main phases of an experiment -
Setup, Populate and Run. For Populate and Run we require the total
number of records / queries to show a useful end progress so far.

For the Populate phase we include the current rate of upsert. For the
Run phase we include the current latency and recall(*) values, these
require additional metrics of how many records have been upserted so
far (note that we generally upsert in batches, so the existing number
of Population requests is not sufficient.

(*) For recall we cannot calculate the current metric (last 10s), as
    we only have a single histogram accumulating the results - instead
    this shows the overall recall so far. There's an improvement
    raised to fix this (#108).
  • Loading branch information
daverigby committed Jun 17, 2024
1 parent e50cf87 commit 7490de6
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 76 deletions.
11 changes: 11 additions & 0 deletions vsb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import logging
from pathlib import Path

import rich.console
import rich.progress
import rich.live

logger = logging.getLogger("vsb")
"""Logger for VSB. Messages will be written to the log file and console."""


log_dir: Path = None
"""Directory where logs will be written to. Set in main()"""

console: rich.console.Console = None

progress: rich.progress.Progress = None
live: rich.live.Live = None
12 changes: 10 additions & 2 deletions vsb/databases/pgvector/pgvector.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import logging

import numpy as np
import pgvector.psycopg
import psycopg
from psycopg.types.json import Jsonb

import vsb
from .filter_util import FilterUtil
from ..base import DB, Namespace
from ...vsb_types import Record, DistanceMetric, RecordList, SearchRequest
Expand Down Expand Up @@ -110,6 +109,9 @@ def initialize_population(self):

def finalize_population(self, record_count: int):
# Create index.
create_index_id = vsb.progress.add_task(
f" Create pgvector index ({self.index_type})", total=None
)
sql = (
f"CREATE INDEX IF NOT EXISTS {self.table}_embedding_idx ON "
f"{self.table} USING {self.index_type} (embedding "
Expand All @@ -119,6 +121,12 @@ def finalize_population(self, record_count: int):
case "ivfflat":
sql += f" WITH (lists = {self.ivfflat_lists})"
self.conn.execute(sql)
vsb.progress.update(
create_index_id,
description=f" ✔ pgvector index ({self.index_type}) created",
total=1,
completed=1,
)

@staticmethod
def _get_distance_func(metric: DistanceMetric) -> str:
Expand Down
4 changes: 4 additions & 0 deletions vsb/databases/pinecone/pinecone.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

import vsb
from vsb import logger
from pinecone import PineconeException
from pinecone.grpc import PineconeGRPC, GRPCIndex
Expand Down Expand Up @@ -107,12 +108,15 @@ def initialize_population(self):
def finalize_population(self, record_count: int):
"""Wait until all records are visible in the index"""
logger.debug(f"PineconeDB: Waiting for record count to reach {record_count}")
finalize_id = vsb.progress.add_task("- Finalize population", total=record_count)
while True:
index_count = self.index.describe_index_stats()["total_vector_count"]
vsb.progress.update(finalize_id, completed=index_count)
if index_count >= record_count:
logger.debug(
f"PineconeDB: Index vector count reached {index_count}, "
f"finalize is complete"
)
vsb.progress.update(finalize_id, description=" ✔ Finalize population")
break
time.sleep(1)
8 changes: 8 additions & 0 deletions vsb/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from locust.exception import StopUser

import vsb
from vsb.cmdline_args import add_vsb_cmdline_args
from vsb.databases import Database
from vsb.workloads import Workload
Expand Down Expand Up @@ -53,6 +54,13 @@ def on_locust_init(environment, **_kwargs):
environment, iter(range(num_users)), phase
)

if isinstance(environment.runner, WorkerRunner):
# In distributed mode, we only want to log problems to the console,
# (a) because things get noisy if we log the same info from multiple
# workers, and (b) because logs from non-master will corrupt the
# progress bar display.
vsb.logger.setLevel(logging.ERROR)


def setup_runner(env):
options = env.parsed_options
Expand Down
162 changes: 162 additions & 0 deletions vsb/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
Logging support for VSB.
Includes setup code for logging and rich console output for progress bars etc.
Log messages from VSB should use the module-level `vsb.logger` for the actual
logger object - e.g.
from vsb import logger
logger.info("This is an info message")
"""

import io
import logging
import os
from datetime import datetime
from pathlib import Path
import rich.console
import rich.live
from rich.logging import RichHandler
from rich.progress import Progress
import rich.table
import vsb

logger = logging.getLogger("vsb")

progress_greenlet = None


class ExtraInfoColumn(rich.progress.ProgressColumn):
"""A custom rich.progress column which renders an extra_info field
of a task if the field is present, otherwise shows nothing.
extra_info field can include rich markup (e.g. [progress.description] etc).
"""

def render(self, task: rich.progress.Task) -> rich.text.Text:
# Check if the task has the extra_info field
if "extra_info" in task.fields:
return rich.text.Text.from_markup(task.fields["extra_info"])
return rich.text.Text()


class ProgressIOWrapper(io.IOBase):
"""A wrapper around a file-like object which updates a progress bar as data is
written to the file.
"""

def __init__(self, dest, total, progress, indent=0, *args, **kwargs):
"""Create a new ProgressIOWrapper object.
:param dest: The destination file-like object to write to.
:param total: The total number of bytes expected to be written (used to
percentage complete). Pass None if unknown - this won't show
a percentage complete, but will otherwise track progress.
:param progress: The Progress object to add a progress bar to.
:param indent: The number of spaces to indent the progress bar label
"""
self.path = dest
self.file = dest.open("wb")
self.task_id = progress.add_task(
(" " * indent) + dest.parent.name + "/" + dest.name, total=total
)
self.progress = progress
super().__init__(*args, **kwargs)

def write(self, b):
# Write data to the base object
bytes_written = self.file.write(b)
# Update the progress bar with the amount written
self.progress.update(self.task_id, advance=bytes_written)
return bytes_written

def flush(self):
return self.file.flush()

def close(self):
return self.file.close()

# Implement other necessary methods to fully comply with IO[bytes] interface
def seek(self, offset, whence=io.SEEK_SET):
return self.file.seek(offset, whence)

def tell(self):
return self.file.tell()

def read(self, n=-1):
return self.file.read(n)

def readable(self):
return self.file.readable()

def writable(self):
return self.file.writable()

def seekable(self):
return self.file.seekable()


def setup_logging(log_base: Path, level: str) -> Path:
level = level.upper()
# Setup the default logger to log to a file under
# <log_base>/<timestamp>/vsb.log,
# returning the directory created.
log_path = log_base / datetime.now().isoformat(timespec="seconds")
log_path.mkdir(parents=True)

file_handler = logging.FileHandler(log_path / "vsb.log")
file_handler.setLevel(level)
file_formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
file_handler.setFormatter(file_formatter)

# Configure the root logger to use the file handler
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.addHandler(file_handler)

# Setup a rich text console for log messages and progress bars etc.
# Set a fixed width of 300 to disable wrapping if not in a terminal (for CV
# so log lines we are checking for don't get wrapped).
width = None if os.getenv("TERM") else 300
vsb.console = rich.console.Console(width=width)
# Setup the specific logger for "vsb" to also log to stdout using RichHandler
rich_handler = RichHandler(
console=vsb.console,
log_time_format="%Y-%m-%dT%H:%M:%S%z",
omit_repeated_times=False,
show_path=False,
)
rich_handler.setLevel(level)
vsb.logger.setLevel(level)
vsb.logger.addHandler(rich_handler)

# And always logs errors to stdout (via RichHandler)
error_handler = RichHandler()
error_handler.setLevel(logging.ERROR)
root_logger.addHandler(error_handler)

return log_path


def make_progressbar() -> rich.progress.Progress:
"""Create a Progress object for use in displaying progress bars.
To display the progress of one or more tasks, call add_task() on the returned
object, then call update() or advance() to advance progress -
progress = make_progressbar()
task_id = progress.add_task("Task description", total=100)
progress.update(task_id, advance=1)
"""
progress = rich.progress.Progress(
rich.progress.TextColumn("[progress.description]{task.description}"),
rich.progress.MofNCompleteColumn(),
rich.progress.BarColumn(),
rich.progress.TaskProgressColumn(),
"[progress.elapsed]elapsed:",
rich.progress.TimeElapsedColumn(),
"[progress.remaining]remaining:",
rich.progress.TimeRemainingColumn(compact=True),
ExtraInfoColumn(),
console=vsb.console,
)
progress.start()
return progress
55 changes: 8 additions & 47 deletions vsb/main.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,14 @@
#!/usr/bin/env python3
import logging
import os

import sys
from datetime import datetime
from pathlib import Path

import configargparse
import locust.main
from rich.console import Console
from rich.logging import RichHandler

import vsb.metrics_tracker
import vsb
from vsb.cmdline_args import add_vsb_cmdline_args, validate_parsed_args


def setup_logging(log_base: Path, level: str) -> Path:
level = level.upper()
# Setup the default logger to log to a file under
# <log_base>/<timestamp>/vsb.log,
# returning the directory created.
log_path = log_base / datetime.now().isoformat(timespec="seconds")
log_path.mkdir(parents=True)

file_handler = logging.FileHandler(log_path / "vsb.log")
file_handler.setLevel(level)
file_formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
file_handler.setFormatter(file_formatter)

# Configure the root logger to use the file handler
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.addHandler(file_handler)

# Setup the specific logger for "vsb" to also log to stdout using RichHandler
# set fixed width of 300 to disable wrapping if not in a terminal (for CV
# so log lines we are checking for don't get wrapped).
width = None if os.getenv("TERM") else 300
rich_handler = RichHandler(
console=Console(width=width),
log_time_format="%Y-%m-%dT%H:%M:%S%z",
omit_repeated_times=False,
show_path=False,
)
rich_handler.setLevel(level)
vsb.logger.setLevel(level)
vsb.logger.addHandler(rich_handler)

# And always logs errors to stdout (via RichHandler)
error_handler = RichHandler()
error_handler.setLevel(logging.ERROR)
root_logger.addHandler(error_handler)

return log_path
from vsb.logging import logger, setup_logging


def main():
Expand Down Expand Up @@ -81,7 +38,11 @@ def main():

log_base = Path("reports") / args.database
vsb.log_dir = setup_logging(log_base=log_base, level=args.loglevel)
vsb.logger.info(f"Writing benchmark results to '{vsb.log_dir}'")
logger.info(
f"Vector Search Bench: Starting experiment with backend='{args.database}' and "
f"workload='{args.workload}'."
)
logger.info(f"Writing benchmark results to '{vsb.log_dir}'")

# If we got here then args are valid - pass them on to locusts' main(),
# appending the location of our locustfile and --headless to start
Expand Down
Loading

0 comments on commit 7490de6

Please sign in to comment.