From 3072933230a97972a57e39be30a2131ff1107966 Mon Sep 17 00:00:00 2001
From: Dave Rigby
Date: Thu, 13 Jun 2024 16:48:41 +0100
Subject: [PATCH 1/3] Logging: Use vsb.logger

Use vsb.logger instead of the global 'logging' methods, as we only show
messages from the vsb logger on stdout (everything still goes to the
vsb.log file).
---
 vsb/users.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/vsb/users.py b/vsb/users.py
index 888e371..848acd1 100644
--- a/vsb/users.py
+++ b/vsb/users.py
@@ -1,4 +1,3 @@
-import logging
 import time
 import traceback
 from enum import Enum, auto

From e50cf87868f2e4b5af6a8ec382aed1fefbd3bf6e Mon Sep 17 00:00:00 2001
From: Dave Rigby
Date: Fri, 14 Jun 2024 14:18:17 +0100
Subject: [PATCH 2/3] Add VectorWorkload.request_count property

Add a new VectorWorkload.request_count property to all workloads, which
specifies how many requests are in the query dataset. This will be used
to show a progress bar for the Run phase - the MasterRunner needs to
know the total number of requests for the progress bar's denominator.
---
 vsb/workloads/base.py                    |  8 ++++++++
 vsb/workloads/mnist/mnist.py             |  8 ++++++++
 vsb/workloads/nq_768_tasb/nq_768_tasb.py | 10 +++++++++-
 vsb/workloads/yfcc/yfcc.py               |  8 ++++++++
 4 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/vsb/workloads/base.py b/vsb/workloads/base.py
index 1d676db..b9a46d9 100644
--- a/vsb/workloads/base.py
+++ b/vsb/workloads/base.py
@@ -38,6 +38,14 @@ def record_count(self) -> int:
         """
         raise NotImplementedError

+    @property
+    @abstractmethod
+    def request_count(self) -> int:
+        """
+        The number of requests in the Run phase of the test.
+        """
+        raise NotImplementedError
+
     @abstractmethod
     def get_sample_record(self) -> Record:
         """
diff --git a/vsb/workloads/mnist/mnist.py b/vsb/workloads/mnist/mnist.py
index cb9e6b5..dfc863d 100644
--- a/vsb/workloads/mnist/mnist.py
+++ b/vsb/workloads/mnist/mnist.py
@@ -22,6 +22,10 @@ def __init__(self, name: str, cache_dir: str):
     def record_count(self) -> int:
         return 60000

+    @property
+    def request_count(self) -> int:
+        return 10_000
+

 class MnistTest(MnistBase):
     """Reduced, "test" variant of mnist; with 1% of the full dataset (600
@@ -33,3 +37,7 @@ def __init__(self, name: str, cache_dir: str):
     @property
     def record_count(self) -> int:
         return 600
+
+    @property
+    def request_count(self) -> int:
+        return 20
diff --git a/vsb/workloads/nq_768_tasb/nq_768_tasb.py b/vsb/workloads/nq_768_tasb/nq_768_tasb.py
index b1e9ea9..ca747ba 100644
--- a/vsb/workloads/nq_768_tasb/nq_768_tasb.py
+++ b/vsb/workloads/nq_768_tasb/nq_768_tasb.py
@@ -20,7 +20,11 @@ def __init__(self, name: str, cache_dir: str):

     @property
     def record_count(self) -> int:
-        return 2680893
+        return 2_680_893
+
+    @property
+    def request_count(self) -> int:
+        return 3_452


 class Nq768TasbTest(Nq768TasbBase):
@@ -34,3 +38,7 @@ def __init__(self, name: str, cache_dir: str):
     @property
     def record_count(self) -> int:
         return 26809
+
+    @property
+    def request_count(self) -> int:
+        return 35
diff --git a/vsb/workloads/yfcc/yfcc.py b/vsb/workloads/yfcc/yfcc.py
index 3568e21..54d76e5 100644
--- a/vsb/workloads/yfcc/yfcc.py
+++ b/vsb/workloads/yfcc/yfcc.py
@@ -24,6 +24,10 @@ def __init__(self, name: str, cache_dir: str):
     def record_count(self) -> int:
         return 10_000_000

+    @property
+    def request_count(self) -> int:
+        return 100_000
+

 class YFCCTest(YFCCBase):
     """Reduced, "test" variant of YFCC; with ~0.1% of the full dataset / 0.5%
@@ -41,3 +45,7 @@ def __init__(self, name: str, cache_dir: str):
     @property
     def record_count(self) -> int:
         return 10_000
+
+    @property
+    def request_count(self) -> int:
+        return 500
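For illustration only (not part of the patch series), a consumer of the new
property can size its progress bars from the two counts. MnistTest is one of
the workloads above; the name and cache_dir values here are made up, and
constructing a workload may download its dataset:

    from vsb.workloads.mnist.mnist import MnistTest

    workload = MnistTest(name="mnist-test", cache_dir="/tmp/vsb-cache")
    populate_total = workload.record_count   # Populate bar denominator (600)
    run_total = workload.request_count       # Run bar denominator (20)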
From 619e2c0481bed8056d6b8511f522d76058c5e60f Mon Sep 17 00:00:00 2001
From: Dave Rigby
Date: Fri, 14 Jun 2024 18:21:58 +0100
Subject: [PATCH 3/3] Add progress bars for main phases

Add progress bars for each of the main phases of an experiment - Setup,
Populate and Run. For Populate and Run we need the total number of
records / queries in the workload, so the progress bar has a
denominator to show completion against.

For the Populate phase we include the current upsert rate. For the Run
phase we include the current latency and recall(*) values. The upsert
rate requires an additional metric counting how many records have been
upserted so far - we generally upsert in batches, so the existing count
of Populate requests is not sufficient.

(*) For recall we cannot calculate the current (last 10s) value, as we
only have a single histogram accumulating the results - instead we show
the overall recall so far. There's an improvement raised to fix this
(#108).
---
 vsb/__init__.py                    |  16 +++
 vsb/databases/pgvector/pgvector.py |  15 ++-
 vsb/databases/pinecone/pinecone.py |  10 ++
 vsb/locustfile.py                  |   8 ++
 vsb/logging.py                     | 165 +++++++++++++++++++++++++++++
 vsb/main.py                        |  55 ++--------
 vsb/metrics_tracker.py             |  75 +++++++++----
 vsb/users.py                       | 100 ++++++++++++++++-
 vsb/vsb_types.py                   |   3 +
 vsb/workloads/dataset.py           |  14 ++-
 10 files changed, 385 insertions(+), 76 deletions(-)
 create mode 100644 vsb/logging.py

diff --git a/vsb/__init__.py b/vsb/__init__.py
index a9b9824..2810fca 100644
--- a/vsb/__init__.py
+++ b/vsb/__init__.py
@@ -1,7 +1,23 @@
 import logging
 from pathlib import Path

+import rich.console
+import rich.progress
+import rich.live
+
 logger = logging.getLogger("vsb")
+"""Logger for VSB. Messages will be written to the log file and console."""
+
 log_dir: Path = None
"""Directory where logs will be written to. Set in main()"""
+
+console: rich.console.Console = None
+
+progress: rich.progress.Progress = None
+"""
+Progress bar for the current task. Only created for non-Worker processes
+(i.e. LocalRunner if non-distributed; Master if distributed).
+"""
+
+live: rich.live.Live = None
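These globals are intended to be accessed via the package (import vsb;
vsb.progress) rather than imported by value, so reassignments - such as the
load shape recreating vsb.progress each phase - are visible everywhere. A
minimal sketch of the guard-then-update pattern the rest of this patch
follows (slow_step is an invented example):

    import vsb

    def slow_step():
        # vsb.progress is None on Worker processes and before a phase
        # starts, so every call site guards on it first.
        task_id = vsb.progress.add_task("  My slow step", total=100) if vsb.progress else None
        for _ in range(100):
            pass  # ... one unit of real work here ...
            if vsb.progress:
                vsb.progress.update(task_id, advance=1)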
diff --git a/vsb/databases/pgvector/pgvector.py b/vsb/databases/pgvector/pgvector.py
index 1255906..6e93bd6 100644
--- a/vsb/databases/pgvector/pgvector.py
+++ b/vsb/databases/pgvector/pgvector.py
@@ -1,10 +1,9 @@
-import logging
-
 import numpy as np
 import pgvector.psycopg
 import psycopg
 from psycopg.types.json import Jsonb

+import vsb
 from .filter_util import FilterUtil
 from ..base import DB, Namespace
 from ...vsb_types import Record, DistanceMetric, RecordList, SearchRequest
@@ -110,6 +109,11 @@ def initialize_population(self):

     def finalize_population(self, record_count: int):
         # Create index.
+        if vsb.progress:
+            create_index_id = vsb.progress.add_task(
+                f" Create pgvector index ({self.index_type})", total=None
+            )
         sql = (
             f"CREATE INDEX IF NOT EXISTS {self.table}_embedding_idx ON "
             f"{self.table} USING {self.index_type} (embedding "
@@ -119,6 +123,13 @@ def finalize_population(self, record_count: int):
             case "ivfflat":
                 sql += f" WITH (lists = {self.ivfflat_lists})"
         self.conn.execute(sql)
+        if vsb.progress:
+            vsb.progress.update(
+                create_index_id,
+                description=f" ✔ pgvector index ({self.index_type}) created",
+                total=1,
+                completed=1,
+            )

     @staticmethod
     def _get_distance_func(metric: DistanceMetric) -> str:
diff --git a/vsb/databases/pinecone/pinecone.py b/vsb/databases/pinecone/pinecone.py
index bbbfad1..f6ad457 100644
--- a/vsb/databases/pinecone/pinecone.py
+++ b/vsb/databases/pinecone/pinecone.py
@@ -1,5 +1,6 @@
 import logging

+import vsb
 from vsb import logger
 from pinecone import PineconeException
 from pinecone.grpc import PineconeGRPC, GRPCIndex
@@ -107,8 +108,14 @@ def initialize_population(self):
     def finalize_population(self, record_count: int):
         """Wait until all records are visible in the index"""
         logger.debug(f"PineconeDB: Waiting for record count to reach {record_count}")
+        if vsb.progress:
+            finalize_id = vsb.progress.add_task(
+                "- Finalize population", total=record_count
+            )
         while True:
             index_count = self.index.describe_index_stats()["total_vector_count"]
+            if vsb.progress:
+                vsb.progress.update(finalize_id, completed=index_count)
             if index_count >= record_count:
                 logger.debug(
                     f"PineconeDB: Index vector count reached {index_count}, "
@@ -116,3 +123,6 @@ def finalize_population(self, record_count: int):
                 )
                 break
             time.sleep(1)
+
+        if vsb.progress:
+            vsb.progress.update(finalize_id, description=" ✔ Finalize population")
diff --git a/vsb/locustfile.py b/vsb/locustfile.py
index fc5ad52..b30dcee 100644
--- a/vsb/locustfile.py
+++ b/vsb/locustfile.py
@@ -11,6 +11,7 @@

 from locust.exception import StopUser

+import vsb
 from vsb.cmdline_args import add_vsb_cmdline_args
 from vsb.databases import Database
 from vsb.workloads import Workload
@@ -53,6 +54,13 @@ def on_locust_init(environment, **_kwargs):
             environment, iter(range(num_users)), phase
         )

+    if isinstance(environment.runner, WorkerRunner):
+        # In distributed mode, we only want to log problems to the console,
+        # (a) because things get noisy if we log the same info from multiple
+        # workers, and (b) because logs from non-master will corrupt the
+        # progress bar display.
+        vsb.logger.setLevel(logging.ERROR)
+

 def setup_runner(env):
     options = env.parsed_options
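Other database backends can adopt the same convention as PineconeDB above; a
hedged sketch of a finalize_population method for a hypothetical backend,
whose _visible_record_count() helper is invented for illustration:

    import time
    import vsb

    def finalize_population(self, record_count: int):
        task_id = None
        if vsb.progress:
            task_id = vsb.progress.add_task("- Finalize population", total=record_count)
        # Poll until all upserted records are visible to queries.
        while (count := self._visible_record_count()) < record_count:
            if vsb.progress:
                vsb.progress.update(task_id, completed=count)
            time.sleep(1)
        if vsb.progress:
            vsb.progress.update(task_id, description=" ✔ Finalize population",
                                completed=record_count)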
diff --git a/vsb/logging.py b/vsb/logging.py
new file mode 100644
index 0000000..c47d54d
--- /dev/null
+++ b/vsb/logging.py
@@ -0,0 +1,165 @@
+"""
+Logging support for VSB.
+Includes setup code for logging and rich console output for progress bars etc.
+
+Log messages from VSB should use the module-level `vsb.logger` for the actual
+logger object - e.g.
+
+    from vsb import logger
+
+    logger.info("This is an info message")
+"""
+
+import io
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+
+import rich.console
+import rich.live
+import rich.progress
+import rich.table
+import rich.text
+from rich.logging import RichHandler
+from rich.progress import Progress
+
+import vsb
+
+logger = logging.getLogger("vsb")
+
+progress_greenlet = None
+
+
+class ExtraInfoColumn(rich.progress.ProgressColumn):
+    """A custom rich.progress column which renders the extra_info field of a
+    task if the field is present, otherwise shows nothing. The extra_info
+    field can include rich markup (e.g. [progress.description] etc).
+    """
+
+    def render(self, task: rich.progress.Task) -> rich.text.Text:
+        # Check if the task has the extra_info field
+        if "extra_info" in task.fields:
+            return rich.text.Text.from_markup(task.fields["extra_info"])
+        return rich.text.Text()
+
+
+class ProgressIOWrapper(io.IOBase):
+    """A wrapper around a file-like object which updates a progress bar as
+    data is written to the file.
+    """
+
+    def __init__(self, dest, total, progress, indent=0, *args, **kwargs):
+        """Create a new ProgressIOWrapper object.
+        :param dest: The destination file-like object to write to.
+        :param total: The total number of bytes expected to be written (used
+                      to show percentage complete). Pass None if unknown -
+                      this won't show a percentage complete, but will
+                      otherwise track progress.
+        :param progress: The Progress object to add a progress bar to. If
+                         None then no progress bar will be shown.
+        :param indent: The number of spaces to indent the progress bar label.
+        """
+        self.path = dest
+        self.file = dest.open("wb")
+        self.progress = progress
+        if self.progress:
+            self.task_id = progress.add_task(
+                (" " * indent) + dest.parent.name + "/" + dest.name, total=total
+            )
+        super().__init__(*args, **kwargs)
+
+    def write(self, b):
+        # Write data to the base object
+        bytes_written = self.file.write(b)
+        if self.progress:
+            # Update the progress bar with the amount written
+            self.progress.update(self.task_id, advance=bytes_written)
+        return bytes_written
+
+    def flush(self):
+        return self.file.flush()
+
+    def close(self):
+        return self.file.close()
+
+    # Implement other necessary methods to fully comply with IO[bytes] interface
+    def seek(self, offset, whence=io.SEEK_SET):
+        return self.file.seek(offset, whence)
+
+    def tell(self):
+        return self.file.tell()
+
+    def read(self, n=-1):
+        return self.file.read(n)
+
+    def readable(self):
+        return self.file.readable()
+
+    def writable(self):
+        return self.file.writable()
+
+    def seekable(self):
+        return self.file.seekable()
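+
+# Example usage of ProgressIOWrapper (illustrative sketch only; paths are
+# hypothetical, and vsb.progress may be None, which the wrapper tolerates):
+#
+#     src = Path("input.bin")
+#     dest = Path("/tmp/out/input.bin")
+#     dest.parent.mkdir(parents=True, exist_ok=True)
+#     writer = ProgressIOWrapper(dest=dest, total=src.stat().st_size,
+#                                progress=vsb.progress, indent=2)
+#     with src.open("rb") as f:
+#         while chunk := f.read(1 << 20):  # 1 MiB chunks
+#             writer.write(chunk)
+#     writer.close()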
+
+
+def setup_logging(log_base: Path, level: str) -> Path:
+    level = level.upper()
+    # Setup the default logger to log to a file under
+    # <log_base>/<timestamp>/vsb.log,
+    # returning the directory created.
+    log_path = log_base / datetime.now().isoformat(timespec="seconds")
+    log_path.mkdir(parents=True)
+
+    file_handler = logging.FileHandler(log_path / "vsb.log")
+    file_handler.setLevel(level)
+    file_formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
+    file_handler.setFormatter(file_formatter)
+
+    # Configure the root logger to use the file handler
+    root_logger = logging.getLogger()
+    root_logger.setLevel(level)
+    root_logger.addHandler(file_handler)
+
+    # Setup a rich text console for log messages and progress bars etc.
+    # Set a fixed width of 300 to disable wrapping if not in a terminal (for
+    # CI, so log lines we are checking for don't get wrapped).
+    width = None if os.getenv("TERM") else 300
+    vsb.console = rich.console.Console(width=width)
+    # Setup the specific logger for "vsb" to also log to stdout using RichHandler
+    rich_handler = RichHandler(
+        console=vsb.console,
+        log_time_format="%Y-%m-%dT%H:%M:%S%z",
+        omit_repeated_times=False,
+        show_path=False,
+    )
+    rich_handler.setLevel(level)
+    vsb.logger.setLevel(level)
+    vsb.logger.addHandler(rich_handler)
+
+    # And always log errors to stdout (via RichHandler)
+    error_handler = RichHandler()
+    error_handler.setLevel(logging.ERROR)
+    root_logger.addHandler(error_handler)
+
+    return log_path
+
+
+def make_progressbar() -> rich.progress.Progress:
+    """Create a Progress object for use in displaying progress bars.
+    To display the progress of one or more tasks, call add_task() on the
+    returned object, then call update() or advance() to advance progress -
+
+        progress = make_progressbar()
+        task_id = progress.add_task("Task description", total=100)
+        progress.update(task_id, advance=1)
+    """
+    progress = rich.progress.Progress(
+        rich.progress.TextColumn("[progress.description]{task.description}"),
+        rich.progress.MofNCompleteColumn(),
+        rich.progress.BarColumn(),
+        rich.progress.TaskProgressColumn(),
+        "[progress.elapsed]elapsed:",
+        rich.progress.TimeElapsedColumn(),
+        "[progress.remaining]remaining:",
+        rich.progress.TimeRemainingColumn(compact=True),
+        ExtraInfoColumn(),
+        console=vsb.console,
+    )
+    progress.start()
+    return progress
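Taken together, a process bootstraps the console pieces roughly as follows -
a sketch mirroring main() and the load shape below, with an example log_base
path:

    from pathlib import Path
    import vsb
    from vsb.logging import setup_logging, make_progressbar

    vsb.log_dir = setup_logging(log_base=Path("reports/example"), level="INFO")
    vsb.progress = make_progressbar()
    task = vsb.progress.add_task("Demo task", total=3)
    for _ in range(3):
        vsb.progress.advance(task)
    vsb.progress.stop()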
diff --git a/vsb/main.py b/vsb/main.py
index 310c059..2aeb94e 100755
--- a/vsb/main.py
+++ b/vsb/main.py
@@ -1,57 +1,14 @@
 #!/usr/bin/env python3
-import logging
-import os
+
 import sys
-from datetime import datetime
 from pathlib import Path

 import configargparse
 import locust.main
-from rich.console import Console
-from rich.logging import RichHandler

-import vsb.metrics_tracker
+import vsb
 from vsb.cmdline_args import add_vsb_cmdline_args, validate_parsed_args
-
-
-def setup_logging(log_base: Path, level: str) -> Path:
-    level = level.upper()
-    # Setup the default logger to log to a file under
-    # <log_base>/<timestamp>/vsb.log,
-    # returning the directory created.
-    log_path = log_base / datetime.now().isoformat(timespec="seconds")
-    log_path.mkdir(parents=True)
-
-    file_handler = logging.FileHandler(log_path / "vsb.log")
-    file_handler.setLevel(level)
-    file_formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
-    file_handler.setFormatter(file_formatter)
-
-    # Configure the root logger to use the file handler
-    root_logger = logging.getLogger()
-    root_logger.setLevel(level)
-    root_logger.addHandler(file_handler)
-
-    # Setup the specific logger for "vsb" to also log to stdout using RichHandler
-    # set fixed width of 300 to disable wrapping if not in a terminal (for CV
-    # so log lines we are checking for don't get wrapped).
-    width = None if os.getenv("TERM") else 300
-    rich_handler = RichHandler(
-        console=Console(width=width),
-        log_time_format="%Y-%m-%dT%H:%M:%S%z",
-        omit_repeated_times=False,
-        show_path=False,
-    )
-    rich_handler.setLevel(level)
-    vsb.logger.setLevel(level)
-    vsb.logger.addHandler(rich_handler)
-
-    # And always logs errors to stdout (via RichHandler)
-    error_handler = RichHandler()
-    error_handler.setLevel(logging.ERROR)
-    root_logger.addHandler(error_handler)
-
-    return log_path
+from vsb.logging import logger, setup_logging


 def main():
@@ -81,7 +38,11 @@ def main():

     log_base = Path("reports") / args.database
     vsb.log_dir = setup_logging(log_base=log_base, level=args.loglevel)
-    vsb.logger.info(f"Writing benchmark results to '{vsb.log_dir}'")
+    logger.info(
+        f"Vector Search Bench: Starting experiment with backend='{args.database}' and "
+        f"workload='{args.workload}'."
+    )
+    logger.info(f"Writing benchmark results to '{vsb.log_dir}'")

     # If we got here then args are valid - pass them on to locusts' main(),
     # appending the location of our locustfile and --headless to start
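The metrics_tracker changes below add plain int counters alongside the
existing HdrHistogram metrics. To illustrate the split (values here are made
up):

    from vsb import metrics_tracker

    # Plain counters just accumulate ints...
    metrics_tracker.update_counter("Populate", "records", 200)
    metrics_tracker.update_counter("Populate", "records", 300)
    assert metrics_tracker.calculated_metrics["Populate"]["records"] == 500

    # ...while get_metric_percentile() only answers for histogram-backed
    # metrics, returning None for counters and unknown metrics.
    assert metrics_tracker.get_metric_percentile("Populate", "records", 50) is None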
diff --git a/vsb/metrics_tracker.py b/vsb/metrics_tracker.py
index 7a4f3a2..5562d6e 100644
--- a/vsb/metrics_tracker.py
+++ b/vsb/metrics_tracker.py
@@ -12,6 +12,7 @@

 import json
 import time
+from collections import defaultdict

 from hdrh.histogram import HdrHistogram
 from locust import events
@@ -23,9 +24,9 @@
 # Calculated custom metrics for each performed operation.
 # Nested dict, where top-level is the request_type (Populate, Search, ...), under
-# that there is an instance of HdrHistogram for each metric name (
+# that there is an instance of HdrHistogram or a plain int for each metric name (
 # recall, ...)
-calculated_metrics: dict[str, dict[str, HdrHistogram]] = {}
+calculated_metrics: dict[str, dict[str, HdrHistogram | int]] = {}

 # Start and end times for each phase of the workload.
 phases: dict[str, str] = {}
@@ -47,6 +48,27 @@ def get_histogram(request_type: str, metric: str) -> HdrHistogram:
     return req_type_metrics.setdefault(metric, HdrHistogram(1, 100_000, 3))


+def update_counter(request_type: str, metric: str, value: int) -> None:
+    """
+    Update the counter for the given metric for the given request type,
+    creating a counter starting from zero if the request type and/or metric
+    is not already recorded.
+    """
+    req_type_metrics = calculated_metrics.setdefault(request_type, defaultdict(int))
+    req_type_metrics[metric] += value
+
+
+def get_metric_percentile(request_type: str, metric: str, percentile: float) -> float:
+    """
+    Get the value of the given percentile for the given metric for the given
+    request type, or None if the metric is not recorded.
+    """
+    if req_type_metrics := calculated_metrics.get(request_type, None):
+        if isinstance(value := req_type_metrics.get(metric, None), HdrHistogram):
+            return value.get_value_at_percentile(percentile) / HDR_SCALE_FACTOR
+    return None
+
+
 def get_stats_json(stats: RequestStats) -> str:
     """
     Serialise locust's standard stats, then merge in our custom metrics.
@@ -54,17 +76,20 @@ def get_stats_json(stats: RequestStats) -> str:
     serialized = stats.serialize_stats()
     for s in serialized:
         if custom := calculated_metrics.get(s["method"], None):
-            for metric, hist in custom.items():
-                info = {
-                    "min": hist.get_min_value() / HDR_SCALE_FACTOR,
-                    "max": hist.get_max_value() / HDR_SCALE_FACTOR,
-                    "mean": hist.get_mean_value() / HDR_SCALE_FACTOR,
-                    "percentiles": {},
-                }
-                for p in [1, 5, 25, 50, 90, 99, 99.9, 99.99]:
-                    info["percentiles"][p] = (
-                        hist.get_value_at_percentile(p) / HDR_SCALE_FACTOR
-                    )
+            for metric, value in custom.items():
+                if isinstance(value, HdrHistogram):
+                    info = {
+                        "min": value.get_min_value() / HDR_SCALE_FACTOR,
+                        "max": value.get_max_value() / HDR_SCALE_FACTOR,
+                        "mean": value.get_mean_value() / HDR_SCALE_FACTOR,
+                        "percentiles": {},
+                    }
+                    for p in [1, 5, 25, 50, 75, 90, 99, 99.9, 99.99]:
+                        info["percentiles"][p] = (
+                            value.get_value_at_percentile(p) / HDR_SCALE_FACTOR
+                        )
+                else:
+                    info = value
                 s[metric] = info
     return json.dumps(serialized, indent=4)
@@ -78,6 +103,9 @@ def on_request(request_type, name, response_time, response_length, **kwargs):
     if req_metrics := kwargs.get("metrics", None):
         for k, v in req_metrics.items():
             get_histogram(request_type, k).record_value(v * HDR_SCALE_FACTOR)
+    if req_counters := kwargs.get("counters", None):
+        for k, v in req_counters.items():
+            update_counter(request_type, k, v)


 @events.report_to_master.add_listener
@@ -97,8 +125,11 @@ def on_report_to_master(client_id, data: dict()):
         # the master instance.
         serialized[req_type] = {}
         hist: HdrHistogram
-        for metric, hist in metrics.items():
-            serialized[req_type][metric] = hist.encode()
+        for name, value in metrics.items():
+            if isinstance(value, HdrHistogram):
+                serialized[req_type][name] = value.encode()
+            else:
+                serialized[req_type][name] = value
     data["metrics"] = serialized
     calculated_metrics.clear()
@@ -110,16 +141,18 @@ def on_worker_report(client_id, data: dict()):
     from a worker. Here we add our metrics to the master's aggregated stats
     dict.
     """
-    logger.debug(f"metrics.on_worker_report(): data:{data}")
     for req_type, metrics in data["metrics"].items():
-        # Decode the base64 string back to an HdrHistogram, then add to the master's
-        # stats.
-        for metric_name, base64_histo in metrics.items():
-            get_histogram(req_type, metric_name).decode_and_add(base64_histo)
+        for metric_name, value in metrics.items():
+            if isinstance(value, bytes):
+                # Decode the base64 string back to an HdrHistogram, then add to
+                # the master's stats.
+                get_histogram(req_type, metric_name).decode_and_add(value)
+            else:
+                update_counter(req_type, metric_name, value)


 @events.quitting.add_listener
-def on_quitting(environment):
+def print_metrics_on_quitting(environment):
     # Emit stats once on the master (if running in distributed mode) or
     # once on the LocalRunner (if running in single-process mode).
     if not isinstance(environment.runner, WorkerRunner):
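Callers opt in to the new counters by passing an extra kwarg when firing
Locust's request event, as PopulateUser does in the next file. A condensed
sketch, as it would appear inside a User method (values invented):

    self.environment.events.request.fire(
        request_type="Populate",
        name="my-workload",
        response_time=elapsed_ms,
        response_length=0,
        counters={"records": 200},  # accumulated via update_counter() above
    )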
diff --git a/vsb/users.py b/vsb/users.py
index 848acd1..ae33d99 100644
--- a/vsb/users.py
+++ b/vsb/users.py
@@ -2,9 +2,13 @@
 import traceback
 from enum import Enum, auto

+import rich.progress
 from locust import User, task, LoadTestShape
 from locust.exception import StopUser
+import locust.stats

+import vsb
+import vsb.logging
 from vsb import metrics, metrics_tracker
 from vsb.databases import DB
 from vsb.vsb_types import RecordList, SearchRequest
@@ -39,7 +43,13 @@ def setup(self):
             case self.State.Active:
                 self.database.initialize_population()
                 self.environment.runner.send_message(
-                    "update_progress", {"user": 0, "phase": "setup"}
+                    "update_progress",
+                    {
+                        "user": 0,
+                        "phase": "setup",
+                        "record_count": self.environment.workload.record_count,
+                        "request_count": self.environment.workload.request_count,
+                    },
                 )
                 self.state = self.State.Done
             case self.State.Done:
@@ -108,6 +118,7 @@ def do_load(self):
                     name=self.workload.name,
                     response_time=elapsed_ms,
                     response_length=0,
+                    counters={"records": len(vectors)},
                 )
             except StopIteration:
                 logger.debug(f"User id:{self.user_id} completed Populate phase")
@@ -230,6 +241,8 @@ class Phase(Enum):
     def __init__(self):
         super().__init__()
         self.phase = LoadShape.Phase.Init
+        self.record_count: int = None
+        self.progress_task_id: rich.progress.TaskID = None
         self.num_users = 0
         self.skip_populate = False
         self.completed_users = {"populate": set(), "run": set()}
@@ -247,15 +260,14 @@ def tick(self):
                 # self.runner is not initialised until after __init__(), so we must
                 # lazily register our message handler and other information from
                 # self.runner on the first tick() call.
-                self.runner.environment.runner.register_message(
-                    "update_progress", self.on_update_progress
-                )
+                self.runner.register_message("update_progress", self.on_update_progress)
                 parsed_opts = self.runner.environment.parsed_options
                 self.num_users = parsed_opts.num_users
                 self.skip_populate = parsed_opts.skip_populate
                 self._transition_phase(LoadShape.Phase.Setup)
                 return self.tick()
             case LoadShape.Phase.Setup:
+                vsb.progress.update(self.progress_task_id, total=1)
                 return 1, 1, [SetupUser]
             case LoadShape.Phase.TransitionFromSetup:
                 if self.get_current_user_count() == 0:
@@ -269,6 +281,7 @@ def tick(self):
                     return self.tick()
                 return 0, self.num_users, []
             case LoadShape.Phase.Populate:
+                self._update_progress_bar()
                 return self.num_users, self.num_users, [PopulateUser]
             case LoadShape.Phase.TransitionToRun:
                 if self.get_current_user_count() == 0:
@@ -278,6 +291,7 @@ def tick(self):
                     return self.tick()
                 return 0, self.num_users, []
             case LoadShape.Phase.Run:
+                self._update_progress_bar()
                 return self.num_users, self.num_users, [RunUser]
             case LoadShape.Phase.Done:
                 return None
@@ -291,11 +305,24 @@ def _transition_phase(self, new: Phase):
             LoadShape.Phase.Populate,
             LoadShape.Phase.Run,
         ]
+        if vsb.progress is not None:
+            self._update_progress_bar(mark_completed=True)
+            vsb.progress.update(
+                self.progress_task_id, description=f"✔ {self.phase.name} complete"
+            )
+            vsb.progress.stop()
+            vsb.progress = None
         if self.phase in tracked_phases:
             metrics_tracker.record_phase_end(self.phase.name)
         self.phase = new
         if self.phase in tracked_phases:
             metrics_tracker.record_phase_start(self.phase.name)
+        # Started a new phase - create a progress object for it (which will
+        # display progress bars for each task on screen)
+        vsb.progress = vsb.logging.make_progressbar()
+        self.progress_task_id = vsb.progress.add_task(
+            f"Performing {self.phase.name} phase", total=None
+        )

     def on_update_progress(self, msg, **kwargs):
         # Fired when VSBLoadShape (running on the master) receives an
@@ -306,8 +333,12 @@ def on_update_progress(self, msg, **kwargs):
         match self.phase:
             case LoadShape.Phase.Setup:
                 assert msg.data["phase"] == "setup"
+                self.record_count = msg.data["record_count"]
+                self.request_count = msg.data["request_count"]
                 logger.debug(
-                    f"VSBLoadShape.update_progress() - SetupUser completed - "
+                    f"VSBLoadShape.update_progress() - SetupUser completed with "
+                    f"record_count={self.record_count}, request_count="
+                    f"{self.request_count} - "
                     f"moving to TransitionFromSetup phase"
                 )
                 self._transition_phase(LoadShape.Phase.TransitionFromSetup)
@@ -347,3 +378,62 @@ def on_update_progress(self, msg, **kwargs):
                 logger.error(
                     f"VSBLoadShape.update_progress() - Unexpected progress update in Done phase!"
                 )
+
+    def _update_progress_bar(self, mark_completed: bool = False):
+        """Update the phase progress bar for the current phase."""
+        match self.phase:
+            case LoadShape.Phase.Setup:
+                vsb.progress.update(
+                    self.progress_task_id, total=1, completed=1 if mark_completed else 0
+                )
+            case LoadShape.Phase.Populate:
+                completed = vsb.metrics_tracker.calculated_metrics.get(
+                    "Populate", {}
+                ).get("records", 0)
+
+                env = self.runner.environment
+                stats: locust.stats.StatsEntry = env.stats.get(
+                    env.parsed_options.workload, "Populate"
+                )
+                duration = time.time() - stats.start_time
+                rps_str = "records/sec: {:.1f}".format(completed / duration)
+                vsb.progress.update(
+                    self.progress_task_id,
+                    completed=completed,
+                    total=self.record_count,
+                    extra_info=rps_str,
+                )
+            case LoadShape.Phase.Run:
+                # TODO: When we add additional request types other than Search,
+                # we need to expand this to include them.
+                env = self.runner.environment
+                stats = env.stats.get(env.parsed_options.workload, "Search")
+
+                latency_str = ", ".join(
+                    [
+                        f"p{p}={stats.get_current_response_time_percentile(p/100.0) or '...'}ms"
+                        for p in [5, 95]
+                    ]
+                )
+
+                def get_recall_pct(p):
+                    return vsb.metrics_tracker.get_metric_percentile(
+                        "Search", "recall", p
+                    )
+
+                recall_str = ", ".join([f"p{p}={get_recall_pct(p)}" for p in [50, 5]])
+
+                last_n = locust.stats.CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW
+                metrics_str = (
+                    f"last {last_n}s metrics: [magenta]latency: {latency_str}[/magenta]"
+                    + " | "
+                    + f"[magenta]recall: {recall_str}[/magenta]"
+                )
+
+                vsb.progress.update(
+                    self.progress_task_id,
+                    completed=stats.num_requests,
+                    total=self.request_count,
+                    extra_info=metrics_str,
+                )
diff --git a/vsb/vsb_types.py b/vsb/vsb_types.py
index 7b6a06c..2ad6470 100644
--- a/vsb/vsb_types.py
+++ b/vsb/vsb_types.py
@@ -16,6 +16,9 @@ class Record(BaseModel):
 class RecordList(RootModel):
     root: list[Record]

+    def __len__(self):
+        return len(self.root)
+
     def __iter__(self):
         return iter(self.root)
diff --git a/vsb/workloads/dataset.py b/vsb/workloads/dataset.py
index a785cbf..3179dd5 100644
--- a/vsb/workloads/dataset.py
+++ b/vsb/workloads/dataset.py
@@ -10,7 +10,9 @@
 import pyarrow.dataset as ds
 from pyarrow.parquet import ParquetDataset, ParquetFile

+import vsb
 from vsb import logger
+from vsb.logging import ProgressIOWrapper


 class Dataset:
@@ -268,13 +270,23 @@ def should_download(blob):
             f"Parquet dataset: downloading {len(to_download)} files belonging to "
             f"dataset '{self.name}'"
         )
+        if vsb.progress:
+            download_task = vsb.progress.add_task(
+                " Downloading dataset files", total=len(to_download)
+            )
         for blob in to_download:
             logger.debug(
                 f"Dataset file '{blob.name}' not found in cache - will be downloaded"
             )
            dest_path = self.cache / blob.name
             dest_path.parent.mkdir(parents=True, exist_ok=True)
-            blob.download_to_filename(self.cache / blob.name)
+            blob.download_to_file(
+                ProgressIOWrapper(
+                    dest=dest_path, progress=vsb.progress, total=blob.size, indent=2
+                )
+            )
+            if vsb.progress:
+                vsb.progress.update(download_task, advance=1)

     def _load_parquet_dataset(self, kind, limit=0):
         parquet_files = [f for f in (self.cache / self.name).glob(kind + "/*.parquet")]
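For reference, the download loop above relies on google-cloud-storage's
Blob.download_to_file() accepting any file-like object. blob.size is
populated on blobs returned by list_blobs() (as used when building
to_download); on a freshly-constructed blob it is None, in which case the
wrapper falls back to an indeterminate bar. A condensed, hypothetical
example with an invented bucket name:

    from pathlib import Path
    from google.cloud.storage import Client
    import vsb
    from vsb.logging import ProgressIOWrapper

    bucket = Client.create_anonymous_client().bucket("example-bucket")
    for blob in bucket.list_blobs(prefix="mnist/"):
        dest = Path("/tmp/cache") / blob.name
        dest.parent.mkdir(parents=True, exist_ok=True)
        blob.download_to_file(
            ProgressIOWrapper(dest=dest, progress=vsb.progress,
                              total=blob.size, indent=2)
        )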