diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 9ded563..f7a9615 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -20,4 +20,4 @@ jobs: pip install pylint - name: Analysing the code with pylint run: | - pylint -d C0103 $(git ls-files '*.py') \ No newline at end of file + pylint -d C0103 $(git ls-files '*.py' | grep -v 'hunter/') \ No newline at end of file diff --git a/README.md b/README.md index b85a259..4ee669c 100644 --- a/README.md +++ b/README.md @@ -61,8 +61,10 @@ Orion provides flexibility in configuring its behavior by allowing users to set For enhanced troubleshooting and debugging, Orion supports the ```--debug``` flag, enabling the generation of detailed debug logs. +The ```--hunter-analyze``` command activates Orion's regression detection for perf-scale CPT runs: it feeds the metrics collected from run metadata to the vendored hunter change-point detection library. + Additionally, users can specify a custom path for the output CSV file using the ```--output``` flag, providing control over the location where the generated CSV will be stored. diff --git a/hunter/__init__.py b/hunter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hunter/analysis.py b/hunter/analysis.py new file mode 100644 index 0000000..deb4773 --- /dev/null +++ b/hunter/analysis.py @@ -0,0 +1,241 @@ +from dataclasses import dataclass +from typing import Iterable, List, Reversible + +import numpy as np +from scipy.stats import ttest_ind_from_stats +from signal_processing_algorithms.e_divisive import EDivisive +from signal_processing_algorithms.e_divisive.base import SignificanceTester +from signal_processing_algorithms.e_divisive.calculators import cext_calculator +from signal_processing_algorithms.e_divisive.change_points import EDivisiveChangePoint +from signal_processing_algorithms.e_divisive.significance_test import ( + QHatPermutationsSignificanceTester, +) + + +@dataclass +class ComparativeStats: + """ + Keeps statistics of two series of data and the probability both series + have the same distribution. + """ + + mean_1: float + mean_2: float + std_1: float + std_2: float + pvalue: float + + def forward_rel_change(self): + """Relative change from left to right""" + return self.mean_2 / self.mean_1 - 1.0 + + def backward_rel_change(self): + """Relative change from right to left""" + return self.mean_1 / self.mean_2 - 1.0 + + def change_magnitude(self): + """Maximum of the absolute values of forward_rel_change and backward_rel_change""" + return max(abs(self.forward_rel_change()), abs(self.backward_rel_change())) + + +@dataclass +class ChangePoint: + index: int + stats: ComparativeStats
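To make the semantics of `ComparativeStats` concrete, here is a small self-contained sketch of the same computation, using only numpy and scipy (which analysis.py already imports). The sample values are invented, and the `alternative` keyword of `ttest_ind_from_stats` assumes scipy >= 1.6:

```python
import numpy as np
from scipy.stats import ttest_ind_from_stats

# Hypothetical latency samples before and after a suspected change point:
left = np.array([10.1, 10.3, 9.9, 10.0, 10.2])
right = np.array([11.9, 12.1, 12.0, 11.8, 12.2])

mean_1, mean_2 = np.mean(left), np.mean(right)
std_1, std_2 = np.std(left), np.std(right)

# Two-sided t-test from summary statistics, mirroring TTestSignificanceTester below:
_, pvalue = ttest_ind_from_stats(
    mean_1, std_1, len(left), mean_2, std_2, len(right), alternative="two-sided"
)

forward = mean_2 / mean_1 - 1.0   # forward_rel_change(): ~ +18.8%
backward = mean_1 / mean_2 - 1.0  # backward_rel_change(): ~ -15.8%
magnitude = max(abs(forward), abs(backward))
print(f"pvalue={pvalue:.2e} forward={forward:+.1%} magnitude={magnitude:.1%}")
```

A small p-value means the two windows are unlikely to share one distribution; `change_magnitude()` is what `merge()` later compares against its `min_magnitude` filter.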
+ + +class ExtendedSignificanceTester(SignificanceTester): + """ + Extends SignificanceTester with the ability to expose the means and deviations + of both sides of the split and the pvalue (strength) of the split. + """ + + pvalue: float + + def change_point(self, index: int, series: np.ndarray, windows: Iterable[int]) -> ChangePoint: + """ + Computes properties of the change point if the change point gets + inserted at the given index into the series array. + """ + ... + + def compare(self, left: np.ndarray, right: np.ndarray) -> ComparativeStats: + """ + Compares two sets of points for similarity / difference. + Computes basic stats and the probability that both sets come from the same distribution. + """ + ... + + @staticmethod + def find_window(candidate: int, window_endpoints: Reversible[int]) -> (int, int): + start: int = next((x for x in reversed(window_endpoints) if x < candidate), None) + end: int = next((x for x in window_endpoints if x > candidate), None) + return start, end + + def is_significant( + self, candidate: EDivisiveChangePoint, series: np.ndarray, windows: Iterable[int] + ) -> bool: + try: + cp = self.change_point(candidate.index, series, windows) + return cp.stats.pvalue <= self.pvalue + except ValueError: + return False + + +class TTestSignificanceTester(ExtendedSignificanceTester): + """ + Uses a two-sided Student's t-test to decide if a candidate change point + splits the series into pieces that are significantly different from each other. + This test is good if the data between the change points has a normal distribution. + It works well even with tiny numbers of points (<10). + """ + + def __init__(self, pvalue: float): + self.pvalue = pvalue + + def change_point( + self, index: int, series: np.ndarray, window_endpoints: Reversible[int] + ) -> ChangePoint: + + (start, end) = self.find_window(index, window_endpoints) + left = series[start:index] + right = series[index:end] + stats = self.compare(left, right) + return ChangePoint(index, stats) + + def compare(self, left: np.ndarray, right: np.ndarray) -> ComparativeStats: + if len(left) == 0 or len(right) == 0: + raise ValueError("Cannot compare an empty window") + + mean_l = np.mean(left) + mean_r = np.mean(right) + std_l = np.std(left) if len(left) >= 2 else 0.0 + std_r = np.std(right) if len(right) >= 2 else 0.0 + + if len(left) + len(right) > 2: + (_, p) = ttest_ind_from_stats( + mean_l, std_l, len(left), mean_r, std_r, len(right), alternative="two-sided" + ) + else: + p = 1.0 + return ComparativeStats(mean_l, mean_r, std_l, std_r, p) + + +def fill_missing(data: List[float]): + """ + Forward-fills None occurrences with the nearest previous non-None value. + Initial None values are back-filled with the nearest future non-None value. + """ + prev = None + for i in range(len(data)): + if data[i] is None and prev is not None: + data[i] = prev + prev = data[i] + + prev = None + for i in reversed(range(len(data))): + if data[i] is None and prev is not None: + data[i] = prev + prev = data[i] + + +def merge( + change_points: List[ChangePoint], series: np.array, max_pvalue: float, min_magnitude: float +) -> List[ChangePoint]: + """ + Removes weak change points recursively going bottom-up + until we get only high-quality change points + that meet the P-value and relative-change criteria. + + Parameters: + :param max_pvalue: maximum accepted pvalue + :param min_magnitude: minimum accepted relative change + """ + + tester = TTestSignificanceTester(max_pvalue) + while change_points: + + # Select the change point with the weakest (highest) P-value. + # If all points have acceptable P-values, select the change point with + # the smallest relative change instead: + weakest_cp = max(change_points, key=lambda c: c.stats.pvalue) + if weakest_cp.stats.pvalue < max_pvalue: + weakest_cp = min(change_points, key=lambda c: c.stats.change_magnitude()) + if weakest_cp.stats.change_magnitude() > min_magnitude: + return change_points + + # Remove the point from the list + weakest_cp_index = change_points.index(weakest_cp) + del change_points[weakest_cp_index] + + # We can't continue yet, because by removing a change_point + # the adjacent change points changed their properties.
+ # Recompute the adjacent change point stats: + window_endpoints = [0] + [cp.index for cp in change_points] + [len(series)] + + def recompute(index: int): + if index < 0 or index >= len(change_points): + return + cp = change_points[index] + change_points[index] = tester.change_point(cp.index, series, window_endpoints) + + recompute(weakest_cp_index) + recompute(weakest_cp_index + 1) + + return change_points + + +def split(series: np.array, window_len: int = 30, max_pvalue: float = 0.001) -> List[ChangePoint]: + """ + Finds change points by splitting the series top-down. + + Internally it uses the EDivisive algorithm from the signal-processing-algorithms + package that recursively splits the series in a way to maximize some measure of + dissimilarity (denoted qhat) between the chunks. + Splitting happens as long as the dissimilarity is statistically significant. + + Unfortunately this algorithm has a few downsides: + - the complexity is O(n^2), where n is the length of the series + - if there are too many change points and too much data, the change points in the middle + of the series may be missed + + This function tries to address these issues by invoking EDivisive on smaller + chunks (windows) of the input data instead of the full series and then merging the results. + Each window should be large enough to contain enough points to detect a change-point. + Consecutive windows overlap so that we won't miss changes happening between them. + """ + assert window_len >= 2, "Window length must be at least 2" + start = 0 + step = int(window_len / 2) + indexes = [] + tester = TTestSignificanceTester(max_pvalue) + while start < len(series): + end = min(start + window_len, len(series)) + calculator = cext_calculator + algo = EDivisive(seed=None, calculator=calculator, significance_tester=tester) + pts = algo.get_change_points(series[start:end]) + new_indexes = [p.index + start for p in pts] + new_indexes.sort() + last_new_change_point_index = next(iter(new_indexes[-1:]), 0) + start = max(last_new_change_point_index, start + step) + indexes += new_indexes + + window_endpoints = [0] + indexes + [len(series)] + return [tester.change_point(i, series, window_endpoints) for i in indexes] + + +def compute_change_points_orig(series: np.array, max_pvalue: float = 0.001) -> List[ChangePoint]: + calculator = cext_calculator + tester = QHatPermutationsSignificanceTester(calculator, pvalue=max_pvalue, permutations=100) + algo = EDivisive(seed=None, calculator=calculator, significance_tester=tester) + pts = algo.get_change_points(series) + indexes = [p.index for p in pts] + window_endpoints = [0] + indexes + [len(series)] + return [tester.change_point(i, series, window_endpoints) for i in indexes] + + +def compute_change_points( + series: np.array, window_len: int = 50, max_pvalue: float = 0.001, min_magnitude: float = 0.05 +) -> List[ChangePoint]: + change_points = split(series, window_len, max_pvalue * 10) + return merge(change_points, series, max_pvalue, min_magnitude) diff --git a/hunter/attributes.py b/hunter/attributes.py new file mode 100644 index 0000000..c69165a --- /dev/null +++ b/hunter/attributes.py @@ -0,0 +1,45 @@ +from datetime import datetime +from typing import Dict + +from hunter.util import format_timestamp + + +def form_hyperlink_html_str(display_text: str, url: str) -> str: + # Markup assumed: renders the link as a bulleted HTML list item. + return f'<li><a href="{url}" target="_blank">{display_text}</a></li>'
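Stepping back to hunter/analysis.py for a moment, here is a usage sketch of its public entry point, `compute_change_points`. The synthetic series is invented, and the import path assumes the module is vendored as `hunter.analysis` exactly as in this patch:

```python
import numpy as np
from hunter.analysis import compute_change_points

# 60 points around 100, then a step down to ~90 -- one obvious change point.
rng = np.random.default_rng(42)
series = np.concatenate([rng.normal(100, 1.0, 60), rng.normal(90, 1.0, 60)])

for cp in compute_change_points(series, window_len=50, max_pvalue=0.001, min_magnitude=0.05):
    s = cp.stats
    print(f"index={cp.index}: mean {s.mean_1:.1f} -> {s.mean_2:.1f}, "
          f"p={s.pvalue:.2e}, magnitude={s.change_magnitude():.1%}")
```

The detector should report a single change point near index 60: `split()` over-generates candidates on overlapping windows (note it is called with a 10x laxer p-value), and `merge()` then prunes every candidate that fails the final p-value or magnitude criteria.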
+ + +def form_created_msg_html_str() -> str: + formatted_time = format_timestamp(int(datetime.now().timestamp()), False) + # Markup assumed: a short "created by" footer paragraph. + return f"<p>Created by Hunter: {formatted_time}</p>"
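These two helpers are composed by `get_back_links`, defined next. A quick sketch of the intended use, with purely hypothetical attribute values:

```python
from hunter.attributes import get_back_links

attributes = {
    "test_url": "https://fallout.example.com/tests/my-test",
    "run_url": "https://fallout.example.com/tests/my-test/runs/123",
    "commit": "abc1234",
}
# Prints hyperlink entries for the test, the test run and the Git commit,
# followed by the "Created by Hunter" footer:
print(get_back_links(attributes))
```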
    " + + +def get_back_links(attributes: Dict[str, str]) -> str: + """ + This method is responsible for providing an HTML string corresponding to Fallout and GitHub + links associated to the attributes of a Fallout-based test run. + + - If no GitHub commit or branch data is provided in the attributes dict, no hyperlink data + associated to GitHub project repository is provided in the returned HTML. + """ + + # grabbing test runner related data (e.g. Fallout) + html_str = "" + if attributes.get("test_url"): + html_str = form_hyperlink_html_str(display_text="Test", url=attributes.get("test_url")) + + if attributes.get("run_url"): + html_str = form_hyperlink_html_str(display_text="Test run", url=attributes.get("run_url")) + + # grabbing Github project repository related data + # TODO: Will we be responsible for handling versioning from repositories aside from bdp? + repo_url = attributes.get("repo_url", "http://github.com/riptano/bdp") + if attributes.get("commit"): + html_str += form_hyperlink_html_str( + display_text="Git commit", url=f"{repo_url}/commit/{attributes.get('commit')}" + ) + elif attributes.get("branch"): + html_str += form_hyperlink_html_str( + display_text="Git branch", url=f"{repo_url}/tree/{attributes.get('branch')}" + ) + html_str += form_created_msg_html_str() + return html_str diff --git a/hunter/config.py b/hunter/config.py new file mode 100644 index 0000000..323acde --- /dev/null +++ b/hunter/config.py @@ -0,0 +1,150 @@ +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional + +from expandvars import expandvars +from ruamel.yaml import YAML + +from hunter.grafana import GrafanaConfig +from hunter.graphite import GraphiteConfig +from hunter.slack import SlackConfig +from hunter.test_config import TestConfig, create_test_config +from hunter.util import merge_dict_list + + +@dataclass +class Config: + graphite: Optional[GraphiteConfig] + grafana: Optional[GrafanaConfig] + tests: Dict[str, TestConfig] + test_groups: Dict[str, List[TestConfig]] + slack: SlackConfig + + +@dataclass +class ConfigError(Exception): + message: str + + +def load_templates(config: Dict) -> Dict[str, Dict]: + templates = config.get("templates", {}) + if not isinstance(templates, Dict): + raise ConfigError("Property `templates` is not a dictionary") + return templates + + +def load_tests(config: Dict, templates: Dict) -> Dict[str, TestConfig]: + tests = config.get("tests", {}) + if not isinstance(tests, Dict): + raise ConfigError("Property `tests` is not a dictionary") + + result = {} + for (test_name, test_config) in tests.items(): + template_names = test_config.get("inherit", []) + if not isinstance(template_names, List): + template_names = [templates] + try: + template_list = [templates[name] for name in template_names] + except KeyError as e: + raise ConfigError(f"Template {e.args[0]} referenced in test {test_name} not found") + test_config = merge_dict_list(template_list + [test_config]) + result[test_name] = create_test_config(test_name, test_config) + + return result + + +def load_test_groups(config: Dict, tests: Dict[str, TestConfig]) -> Dict[str, List[TestConfig]]: + groups = config.get("test_groups", {}) + if not isinstance(groups, Dict): + raise ConfigError("Property `test_groups` is not a dictionary") + + result = {} + for (group_name, test_names) in groups.items(): + test_list = [] + if not isinstance(test_names, List): + raise ConfigError(f"Test group {group_name} must be a list") + for test_name in test_names: + test_config = 
+ + +def load_test_groups(config: Dict, tests: Dict[str, TestConfig]) -> Dict[str, List[TestConfig]]: + groups = config.get("test_groups", {}) + if not isinstance(groups, Dict): + raise ConfigError("Property `test_groups` is not a dictionary") + + result = {} + for (group_name, test_names) in groups.items(): + test_list = [] + if not isinstance(test_names, List): + raise ConfigError(f"Test group {group_name} must be a list") + for test_name in test_names: + test_config = tests.get(test_name) + if test_config is None: + raise ConfigError(f"Test {test_name} referenced by group {group_name} not found.") + test_list.append(test_config) + + result[group_name] = test_list + + return result + + +def load_config_from(config_file: Path) -> Config: + """Loads config from the specified location""" + try: + content = expandvars(config_file.read_text()) + yaml = YAML(typ="safe") + config = yaml.load(content) + # If Grafana is not explicitly configured in the yaml file, default to the same + # host as the Graphite server, at port 3000: + graphite_config = None + grafana_config = None + if "graphite" in config: + if "url" not in config["graphite"]: + raise ValueError("graphite.url") + graphite_config = GraphiteConfig(url=config["graphite"]["url"]) + if config.get("grafana") is None: + config["grafana"] = {} + config["grafana"]["url"] = f"{config['graphite']['url'].strip('/')}:3000/" + config["grafana"]["user"] = os.environ.get("GRAFANA_USER", "admin") + config["grafana"]["password"] = os.environ.get("GRAFANA_PASSWORD", "admin") + grafana_config = GrafanaConfig( + url=config["grafana"]["url"], + user=config["grafana"]["user"], + password=config["grafana"]["password"], + ) + + slack_config = None + if config.get("slack") is not None: + if not config["slack"]["token"]: + raise ValueError("slack.token") + slack_config = SlackConfig( + bot_token=config["slack"]["token"], + ) + + templates = load_templates(config) + tests = load_tests(config, templates) + groups = load_test_groups(config, tests) + + return Config( + graphite=graphite_config, + grafana=grafana_config, + slack=slack_config, + tests=tests, + test_groups=groups, + ) + + except FileNotFoundError as e: + raise ConfigError(f"Configuration file not found: {e.filename}") + except KeyError as e: + raise ConfigError(f"Configuration key not found: {e.args[0]}") + except ValueError as e: + raise ConfigError(f"Value for configuration key not found: {e.args[0]}") + + +def load_config() -> Config: + """Loads config from one of the default locations""" + + env_config_path = os.environ.get("HUNTER_CONFIG") + if env_config_path: + return load_config_from(Path(env_config_path).absolute()) + + paths = [ + Path().home() / ".hunter/hunter.yaml", + Path().home() / ".hunter/conf.yaml", + Path(os.path.realpath(__file__)).parent / "resources/hunter.yaml", + ] + + for p in paths: + if p.exists(): + return load_config_from(p) + + raise ConfigError(f"No configuration file found. 
Checked $HUNTER_CONFIG and searched: {paths}") diff --git a/hunter/csv_options.py b/hunter/csv_options.py new file mode 100644 index 0000000..c383347 --- /dev/null +++ b/hunter/csv_options.py @@ -0,0 +1,18 @@ +import enum +from dataclasses import dataclass + + +@dataclass +class CsvOptions: + delimiter: str + quote_char: str + + def __init__(self): + self.delimiter = "," + self.quote_char = '"' + + +class CsvColumnType(enum.Enum): + Numeric = 1 + DateTime = 2 + Str = 3 diff --git a/hunter/data_selector.py b/hunter/data_selector.py new file mode 100644 index 0000000..ca6c508 --- /dev/null +++ b/hunter/data_selector.py @@ -0,0 +1,38 @@ +import sys +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import List, Optional + +import pytz + + +@dataclass +class DataSelector: + branch: Optional[str] + metrics: Optional[List[str]] + attributes: Optional[List[str]] + last_n_points: int + since_commit: Optional[str] + since_version: Optional[str] + since_time: datetime + until_commit: Optional[str] + until_version: Optional[str] + until_time: datetime + + def __init__(self): + self.branch = None + self.metrics = None + self.attributes = None + self.last_n_points = sys.maxsize + self.since_commit = None + self.since_version = None + self.since_time = datetime.now(tz=pytz.UTC) - timedelta(days=365) + self.until_commit = None + self.until_version = None + self.until_time = datetime.now(tz=pytz.UTC) + + def get_selection_description(self): + attributes = "\n".join( + [f"{a}: {v}" for a, v in self.__dict__.items() if not a.startswith("__") and v] + ) + return f"Data Selection\n{attributes}" diff --git a/hunter/grafana.py b/hunter/grafana.py new file mode 100644 index 0000000..a643c9a --- /dev/null +++ b/hunter/grafana.py @@ -0,0 +1,106 @@ +from dataclasses import asdict, dataclass +from datetime import datetime +from typing import List, Optional + +import requests +from pytz import UTC +from requests.exceptions import HTTPError + + +@dataclass +class GrafanaConfig: + url: str + user: str + password: str + + +@dataclass +class GrafanaError(Exception): + message: str + + +@dataclass +class Annotation: + id: Optional[int] + time: datetime + text: str + tags: List[str] + + +class Grafana: + url: str + __user: str + __password: str + + def __init__(self, grafana_conf: GrafanaConfig): + self.url = grafana_conf.url + self.__user = grafana_conf.user + self.__password = grafana_conf.password + + def fetch_annotations( + self, start: Optional[datetime], end: Optional[datetime], tags: List[str] = None + ) -> List[Annotation]: + """ + Reference: + - https://grafana.com/docs/grafana/latest/http_api/annotations/#find-annotations + """ + url = f"{self.url}api/annotations" + query_parameters = {} + if start is not None: + query_parameters["from"] = int(start.timestamp() * 1000) + if end is not None: + query_parameters["to"] = int(end.timestamp() * 1000) + if tags is not None: + query_parameters["tags"] = tags + try: + response = requests.get( + url=url, params=query_parameters, auth=(self.__user, self.__password) + ) + response.raise_for_status() + json = response.json() + annotations = [] + for annotation_json in json: + annotation = Annotation( + id=annotation_json["id"], + time=datetime.fromtimestamp(float(annotation_json["time"]) / 1000, tz=UTC), + text=annotation_json["text"], + tags=annotation_json["tags"], + ) + annotations.append(annotation) + + return annotations + + except KeyError as err: + raise GrafanaError(f"Missing field {err.args[0]}") + except HTTPError as err: + 
raise GrafanaError(str(err)) + + def delete_annotations(self, *ids: int): + """ + Reference: + - https://grafana.com/docs/grafana/latest/http_api/annotations/#delete-annotation-by-id + """ + url = f"{self.url}api/annotations" + for annotation_id in ids: + annotation_url = f"{url}/{annotation_id}" + try: + response = requests.delete(url=annotation_url, auth=(self.__user, self.__password)) + response.raise_for_status() + except HTTPError as err: + raise GrafanaError(str(err)) + + def create_annotations(self, *annotations: Annotation): + """ + Reference: + - https://grafana.com/docs/grafana/latest/http_api/annotations/#create-annotation + """ + try: + url = f"{self.url}api/annotations" + for annotation in annotations: + data = asdict(annotation) + data["time"] = int(annotation.time.timestamp() * 1000) + del data["id"] + response = requests.post(url=url, data=data, auth=(self.__user, self.__password)) + response.raise_for_status() + except HTTPError as err: + raise GrafanaError(str(err)) diff --git a/hunter/graphite.py b/hunter/graphite.py new file mode 100644 index 0000000..5d3a9a2 --- /dev/null +++ b/hunter/graphite.py @@ -0,0 +1,240 @@ +import ast +import json +import urllib.request +from dataclasses import dataclass +from datetime import datetime +from logging import info +from typing import Dict, Iterable, List, Optional + +from hunter.data_selector import DataSelector +from hunter.util import parse_datetime + + +@dataclass +class GraphiteConfig: + url: str + + +@dataclass +class DataPoint: + time: int + value: float + + +@dataclass +class TimeSeries: + path: str + points: List[DataPoint] + + +def decode_graphite_datapoints(series: Dict[str, List[List[float]]]) -> List[DataPoint]: + + points = series["datapoints"] + return [DataPoint(int(p[1]), p[0]) for p in points if p[0] is not None] + + +def to_graphite_time(time: datetime, default: str) -> str: + """ + Note that millisecond-level precision matters when trying to fetch events in a given time + range, hence opting for this over time.strftime("%H:%M_%Y%m%d") + """ + if time is not None: + return str(int(time.timestamp())) + else: + return default + + +@dataclass +class GraphiteError(IOError): + message: str + + +@dataclass +class GraphiteEvent: + test_owner: str + test_name: str + run_id: str + status: str + start_time: datetime + pub_time: datetime + end_time: datetime + version: Optional[str] + branch: Optional[str] + commit: Optional[str] + + def __init__( + self, + pub_time: int, + test_owner: str, + test_name: str, + run_id: str, + status: str, + start_time: int, + end_time: int, + version: Optional[str], + branch: Optional[str], + commit: Optional[str], + ): + self.test_owner = test_owner + self.test_name = test_name + self.run_id = run_id + self.status = status + self.start_time = parse_datetime(str(start_time)) + self.pub_time = parse_datetime(str(pub_time)) + self.end_time = parse_datetime(str(end_time)) + if len(version) == 0 or version == "null": + self.version = None + else: + self.version = version + if len(branch) == 0 or branch == "null": + self.branch = None + else: + self.branch = branch + if len(commit) == 0 or commit == "null": + self.commit = None + else: + self.commit = commit + + +def compress_target_paths(paths: List[str]) -> List[str]: + """Uses the alternative brace syntax to reduce the total length of the query""" + result = [] + prefix_map = {} + for p in paths: + components = p.rsplit(".", 1) + if len(components) == 1: + result.append(p) + continue + + prefix = components[0] + suffix = components[1] + if prefix not in prefix_map: + prefix_map[prefix] = [suffix] + else: + prefix_map[prefix].append(suffix) + + for prefix, suffixes in prefix_map.items(): + if len(suffixes) > 1: + result.append(prefix + ".{" + ",".join(suffixes) + "}") + else: + result.append(prefix + "." + suffixes[0]) + + return result
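A quick illustration of what `compress_target_paths` does to a render query (the metric paths are made up):

```python
from hunter.graphite import compress_target_paths

paths = [
    "perf.mytest.throughput",
    "perf.mytest.p99",
    "events.count",
]
print(compress_target_paths(paths))
# -> ['perf.mytest.{throughput,p99}', 'events.count']
```

Grouping sibling metrics under one `{a,b}` target keeps the request URL short, which matters because the Graphite client below caps request URLs at 4094 characters.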
+ + +class Graphite: + __url: str + __url_limit: int # max URL length used when requesting metrics from Graphite + + def __init__(self, conf: GraphiteConfig): + self.__url = conf.url + self.__url_limit = 4094 + + def fetch_events( + self, + tags: Iterable[str], + from_time: Optional[datetime] = None, + until_time: Optional[datetime] = None, + ) -> List[GraphiteEvent]: + """ + Returns 'Performance Test' events that match all of + the following criteria: + - all tags passed in match + - published between given from_time and until_time (both bounds inclusive) + + References: + - Graphite events REST API: https://graphite.readthedocs.io/en/stable/events.html + - Haxx: https://github.com/riptano/haxx/pull/588 + """ + try: + from_time = to_graphite_time(from_time, "-365d") + until_time = to_graphite_time(until_time, "now") + tags_str = "+".join(tags) + + url = ( + f"{self.__url}events/get_data" + f"?tags={tags_str}" + f"&from={from_time}" + f"&until={until_time}" + f"&set=intersection" + ) + data_str = urllib.request.urlopen(url).read() + data_as_json = json.loads(data_str) + return [ + GraphiteEvent(event.get("when"), **ast.literal_eval(event.get("data"))) + for event in data_as_json + if event.get("what") == "Performance Test" + ] + + except IOError as e: + raise GraphiteError(f"Failed to fetch Graphite events: {str(e)}") + + def fetch_events_with_matching_time_option( + self, tags: Iterable[str], commit: Optional[str], version: Optional[str] + ) -> List[GraphiteEvent]: + events = [] + if commit is not None: + events = list(filter(lambda e: e.commit == commit, self.fetch_events(tags))) + elif version is not None: + tags = [*tags, version] + events = self.fetch_events(tags) + return events + + def fetch_data(self, target_paths: List[str], selector: DataSelector) -> List[TimeSeries]: + """ + Connects to the Graphite server and downloads the timeseries given by target_paths, + restricted to the time range given by the selector.
+ """ + try: + info("Fetching data from Graphite...") + result = [] + + from_time = to_graphite_time(selector.since_time, "-365d") + until_time = to_graphite_time(selector.until_time, "now") + target_paths = compress_target_paths(target_paths) + targets = "" + for path in target_paths: + targets += f"target={path}&" + targets = targets.strip("&") + + url = ( + f"{self.__url}render" + f"?{targets}" + f"&format=json" + f"&from={from_time}" + f"&until={until_time}" + ) + + data_str = urllib.request.urlopen(url).read() + data_as_json = json.loads(data_str) + + for s in data_as_json: + series = TimeSeries(path=s["target"], points=decode_graphite_datapoints(s)) + result.append(series) + + return result + + except IOError as err: + raise GraphiteError(f"Failed to fetch data from Graphite: {str(err)}") + + def fetch_metric_paths(self, prefix: str, paths: Optional[List[str]] = None) -> List[str]: + """ + Provided a valid Graphite metric prefix, this method will retrieve all corresponding metric paths + Reference: + - https://graphite-api.readthedocs.io/en/latest/api.html + """ + if paths is None: + paths = [] + try: + url = f"{self.__url}metrics/find?query={prefix}" + data_str = urllib.request.urlopen(url).read() + data_as_json = json.loads(data_str) + for result in data_as_json: + curr_path = result["id"] + if result["leaf"]: + paths.append(curr_path) + else: + paths = self.fetch_metric_paths(f"{curr_path}.*", paths) + return sorted(paths) + except IOError as err: + raise GraphiteError(f"Failed to fetch metric path from Graphite: {str(err)}") diff --git a/hunter/importer.py b/hunter/importer.py new file mode 100644 index 0000000..f11392c --- /dev/null +++ b/hunter/importer.py @@ -0,0 +1,469 @@ +import csv +from collections import OrderedDict +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional + +from hunter.config import Config +from hunter.data_selector import DataSelector +from hunter.graphite import DataPoint, Graphite, GraphiteError +from hunter.series import Metric, Series +from hunter.test_config import ( + CsvMetric, + CsvTestConfig, + GraphiteTestConfig, + HistoStatTestConfig, + TestConfig, +) +from hunter.util import ( + DateFormatError, + format_timestamp, + merge_sorted, + parse_datetime, + resolution, + round, +) + + +@dataclass +class DataImportError(IOError): + message: str + + +class Importer: + """ + An Importer is responsible for importing performance metric data + metadata + from some specified data source, and creating an appropriate PerformanceLog object + from this imported data. + """ + + def fetch_data(self, test: TestConfig, selector: DataSelector = DataSelector()) -> Series: + raise NotImplementedError + + def fetch_all_metric_names(self, test: TestConfig) -> List[str]: + raise NotImplementedError + + +class GraphiteImporter(Importer): + graphite: Graphite + + def __init__(self, graphite: Graphite): + self.graphite = graphite + + def fetch_data(self, test: TestConfig, selector: DataSelector = DataSelector()) -> Series: + """ + Loads test data from graphite. + Converts raw timeseries data into a columnar format, + where each metric is represented by a list of floats. All metrics + have aligned indexes - that is values["foo"][3] applies to the + the same time point as values["bar"][3]. The time points are extracted + to a separate column. 
+ """ + if not isinstance(test, GraphiteTestConfig): + raise ValueError("Expected GraphiteTestConfig") + + try: + attributes = test.tags.copy() + if selector.branch: + attributes += [selector.branch] + + # if the user has specified since_ and/or until_, + # we need to attempt to extract a timestamp from appropriate Graphite events, and + # update selector.since_time and selector.until_time, respectively + since_events = self.graphite.fetch_events_with_matching_time_option( + attributes, selector.since_commit, selector.since_version + ) + if len(since_events) > 0: + # since timestamps of metrics get rounded down, in order to include these, we need to + # - round down the event's pub_time + # - subtract a small amount of time (Graphite does not appear to include the left-hand + # endpoint for a time range) + rounded_time = round( + int(since_events[-1].pub_time.timestamp()), + resolution([int(since_events[-1].pub_time.timestamp())]), + ) + selector.since_time = parse_datetime(str(rounded_time)) - timedelta(milliseconds=1) + + until_events = self.graphite.fetch_events_with_matching_time_option( + attributes, selector.until_commit, selector.until_version + ) + if len(until_events) > 0: + selector.until_time = until_events[0].pub_time + + if selector.since_time.timestamp() > selector.until_time.timestamp(): + raise DataImportError( + f"Invalid time range: [" + f"{format_timestamp(int(selector.since_time.timestamp()))}, " + f"{format_timestamp(int(selector.until_time.timestamp()))}]" + ) + + metrics = test.metrics.values() + if selector.metrics is not None: + metrics = [m for m in metrics if m.name in selector.metrics] + path_to_metric = {test.get_path(selector.branch, m.name): m for m in metrics} + targets = [test.get_path(selector.branch, m.name) for m in metrics] + + graphite_result = self.graphite.fetch_data(targets, selector) + if not graphite_result: + raise DataImportError(f"No timeseries found in Graphite for test {test.name}.") + + times = [[x.time for x in series.points] for series in graphite_result] + time: List[int] = merge_sorted(times)[-selector.last_n_points :] + + def column(series: List[DataPoint]) -> List[float]: + value_by_time = dict([(x.time, x.value) for x in series]) + return [value_by_time.get(t) for t in time] + + # Keep order of the keys in the result values the same as order of metrics + values = OrderedDict() + for m in metrics: + values[m.name] = [] + for ts in graphite_result: + values[path_to_metric[ts.path].name] = column(ts.points) + for m in metrics: + if len(values[m.name]) == 0: + del values[m.name] + metrics = [m for m in metrics if m.name in values.keys()] + + events = self.graphite.fetch_events( + attributes, selector.since_time, selector.until_time + ) + time_resolution = resolution(time) + events_by_time = {} + for e in events: + events_by_time[round(int(e.pub_time.timestamp()), time_resolution)] = e + + run_ids = [] + commits = [] + versions = [] + branches = [] + for t in time: + event = events_by_time.get(t) + run_ids.append(event.run_id if event is not None else None) + commits.append(event.commit if event is not None else None) + versions.append(event.version if event is not None else None) + branches.append(event.branch if event is not None else None) + + attributes = { + "run": run_ids, + "branch": branches, + "version": versions, + "commit": commits, + } + if selector.attributes is not None: + attributes = {a: attributes[a] for a in selector.attributes} + + metrics = {m.name: Metric(m.direction, m.scale) for m in metrics} + return Series( + 
test.name, + branch=selector.branch, + time=time, + metrics=metrics, + data=values, + attributes=attributes, + ) + + except GraphiteError as e: + raise DataImportError(f"Failed to import test {test.name}: {e.message}") + + def fetch_all_metric_names(self, test_conf: GraphiteTestConfig) -> List[str]: + return [m for m in test_conf.metrics.keys()] + + +class CsvImporter(Importer): + @staticmethod + def check_row_len(headers, row): + if len(row) < len(headers): + raise DataImportError( + "Number of values in the row does not match " + "number of columns in the table header: " + str(row) + ) + + @staticmethod + def check_has_column(column: str, headers: List[str]): + if column not in headers: + raise DataImportError("Column not found: " + column) + + @staticmethod + def __selected_metrics( + defined_metrics: Dict[str, CsvMetric], selected_metrics: Optional[List[str]] + ) -> Dict[str, CsvMetric]: + + if selected_metrics is not None: + return {name: defined_metrics[name] for name in selected_metrics} + else: + return defined_metrics + + def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series: + + if not isinstance(test_conf, CsvTestConfig): + raise ValueError("Expected CsvTestConfig") + + if selector.branch: + raise ValueError("CSV tests don't support branching yet") + + since_time = selector.since_time + until_time = selector.until_time + file = Path(test_conf.file) + + if since_time.timestamp() > until_time.timestamp(): + raise DataImportError( + f"Invalid time range: [" + f"{format_timestamp(int(since_time.timestamp()))}, " + f"{format_timestamp(int(until_time.timestamp()))}]" + ) + + try: + with open(file, newline="") as csv_file: + reader = csv.reader( + csv_file, + delimiter=test_conf.csv_options.delimiter, + quotechar=test_conf.csv_options.quote_char, + ) + + headers: List[str] = next(reader, None) + metrics = self.__selected_metrics(test_conf.metrics, selector.metrics) + + # Decide which columns to fetch into which components of the result: + try: + time_index: int = headers.index(test_conf.time_column) + attr_indexes: List[int] = [headers.index(c) for c in test_conf.attributes] + metric_names = [m.name for m in metrics.values()] + metric_columns = [m.column for m in metrics.values()] + metric_indexes: List[int] = [headers.index(c) for c in metric_columns] + except ValueError as err: + raise DataImportError(f"Column not found {err.args[0]}") + + if time_index in attr_indexes: + attr_indexes.remove(time_index) + if time_index in metric_indexes: + metric_indexes.remove(time_index) + + # Initialize empty lists to store the data and metadata: + time: List[int] = [] + data: Dict[str, List[float]] = {} + for n in metric_names: + data[n] = [] + attributes: Dict[str, List[str]] = {} + for i in attr_indexes: + attributes[headers[i]] = [] + + # Append the lists with data from each row: + for row in reader: + self.check_row_len(headers, row) + + # Filter by time: + ts: datetime = self.__convert_time(row[time_index]) + if since_time is not None and ts < since_time: + continue + if until_time is not None and ts >= until_time: + continue + time.append(int(ts.timestamp())) + + # Read metric values. 
Note we can still fail on conversion to float, + # because the user is free to override the column selection and thus + # they may select a column that contains non-numeric data: + for (name, i) in zip(metric_names, metric_indexes): + try: + data[name].append(float(row[i])) + except ValueError as err: + raise DataImportError( + "Could not convert value in column " + + headers[i] + + ": " + + err.args[0] + ) + + # Attributes are just copied as-is, with no conversion: + for i in attr_indexes: + attributes[headers[i]].append(row[i]) + + # Convert metrics to series.Metrics + metrics = {m.name: Metric(m.direction, m.scale) for m in metrics.values()} + + # Leave last n points: + time = time[-selector.last_n_points :] + tmp = data + data = {} + for k, v in tmp.items(): + data[k] = v[-selector.last_n_points :] + tmp = attributes + attributes = {} + for k, v in tmp.items(): + attributes[k] = v[-selector.last_n_points :] + + return Series( + test_conf.name, + branch=None, + time=time, + metrics=metrics, + data=data, + attributes=attributes, + ) + + except FileNotFoundError: + raise DataImportError(f"Input file not found: {file}") + + @staticmethod + def __convert_time(time: str): + try: + return parse_datetime(time) + except DateFormatError as err: + raise DataImportError(err.message) + + def fetch_all_metric_names(self, test_conf: CsvTestConfig) -> List[str]: + return [m for m in test_conf.metrics.keys()] + + +class HistoStatImporter(Importer): + + __TAG_METRICS = { + "count": {"direction": 1, "scale": "1", "col": 3}, + "min": {"direction": -1, "scale": "1.0e-6", "col": 4}, + "p25": {"direction": -1, "scale": "1.0e-6", "col": 5}, + "p50": {"direction": -1, "scale": "1.0e-6", "col": 6}, + "p75": {"direction": -1, "scale": "1.0e-6", "col": 7}, + "p90": {"direction": -1, "scale": "1.0e-6", "col": 8}, + "p95": {"direction": -1, "scale": "1.0e-6", "col": 9}, + "p98": {"direction": -1, "scale": "1.0e-6", "col": 10}, + "p99": {"direction": -1, "scale": "1.0e-6", "col": 11}, + "p999": {"direction": -1, "scale": "1.0e-6", "col": 12}, + "p9999": {"direction": -1, "scale": "1.0e-6", "col": 13}, + "max": {"direction": -1, "scale": "1.0e-6", "col": 14}, + } + + @contextmanager + def __csv_reader(self, test: HistoStatTestConfig): + with open(Path(test.file), newline="") as csv_file: + yield csv.reader(csv_file) + + @staticmethod + def __parse_tag(tag: str): + return tag.split("=")[1] + + def __get_tags(self, test: HistoStatTestConfig) -> List[str]: + tags = set() + with self.__csv_reader(test) as reader: + for row in reader: + if row[0].startswith("#"): + continue + tag = self.__parse_tag(row[0]) + if tag in tags: + break + tags.add(tag) + return list(tags) + + @staticmethod + def __metric_from_components(tag, tag_metric): + return f"{tag}.{tag_metric}" + + @staticmethod + def __convert_floating_point_millisecond(fpm: str) -> int: # to epoch seconds + return int(float(fpm) * 1000) // 1000 + + def fetch_data( + self, test: HistoStatTestConfig, selector: DataSelector = DataSelector() + ) -> Series: + def selected(metric_name): + return metric_name in selector.metrics if selector.metrics is not None else True + + metrics = {} + tag_count = 0 + for tag in self.__get_tags(test): + tag_count += 1 + for tag_metric, attrs in self.__TAG_METRICS.items(): + if selected(self.__metric_from_components(tag, tag_metric)): + metrics[self.__metric_from_components(tag, tag_metric)] = Metric( + attrs["direction"], attrs["scale"] + ) + + data = {k: [] for k in metrics.keys()} + time = [] + with self.__csv_reader(test) as reader: + 
start_time = None + for row in reader: + if not row[0].startswith("#"): + break + if "StartTime" in row[0]: + parts = row[0].split(" ") + start_time = self.__convert_floating_point_millisecond(parts[1]) + + if not start_time: + raise DataImportError("No Start Time specified in HistoStat CSV comment") + + # After the loop above, row already holds the first non-comment row. Parse it now. + tag_interval = 0 + while row: + if tag_interval % tag_count == 0: + # Introduces a slight inaccuracy - each tag can report its interval start time + # with some millisecond difference. Choosing a single tag interval allows us + # to maintain the 'indexed by a single time variable' contract required by + # Series, but the time reported for almost all metrics will be _slightly_ off. + time.append(self.__convert_floating_point_millisecond(row[1]) + start_time) + tag_interval += 1 + tag = self.__parse_tag(row[0]) + for tag_metric, attrs in self.__TAG_METRICS.items(): + if selected(self.__metric_from_components(tag, tag_metric)): + data[self.__metric_from_components(tag, tag_metric)].append( + float(row[attrs["col"]]) + ) + try: + row = next(reader) + except StopIteration: + row = None + + # Leave last n points: + time = time[-selector.last_n_points :] + tmp = data + data = {} + for k, v in tmp.items(): + data[k] = v[-selector.last_n_points :] + + return Series(test.name, None, time, metrics, data, dict()) + + def fetch_all_metric_names(self, test: HistoStatTestConfig) -> List[str]: + metric_names = [] + for tag in self.__get_tags(test): + for tag_metric in self.__TAG_METRICS.keys(): + metric_names.append(self.__metric_from_components(tag, tag_metric)) + return metric_names + + +class Importers: + __config: Config + __csv_importer: Optional[CsvImporter] + __graphite_importer: Optional[GraphiteImporter] + __histostat_importer: Optional[HistoStatImporter] + + def __init__(self, config: Config): + self.__config = config + self.__csv_importer = None + self.__graphite_importer = None + self.__histostat_importer = None + + def csv_importer(self) -> CsvImporter: + if self.__csv_importer is None: + self.__csv_importer = CsvImporter() + return self.__csv_importer + + def graphite_importer(self) -> GraphiteImporter: + if self.__graphite_importer is None: + self.__graphite_importer = GraphiteImporter(Graphite(self.__config.graphite)) + return self.__graphite_importer + + def histostat_importer(self) -> HistoStatImporter: + if self.__histostat_importer is None: + self.__histostat_importer = HistoStatImporter() + return self.__histostat_importer + + def get(self, test: TestConfig) -> Importer: + if isinstance(test, CsvTestConfig): + return self.csv_importer() + elif isinstance(test, GraphiteTestConfig): + return self.graphite_importer() + elif isinstance(test, HistoStatTestConfig): + return self.histostat_importer() + else: + raise ValueError(f"Unsupported test type {type(test)}")
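The `Importers` facade above is the sole entry point the CLI uses to pull data. A minimal sketch of wiring it up (the test name is hypothetical and a valid configuration is assumed to be present):

```python
from hunter.config import load_config
from hunter.importer import Importers

conf = load_config()
importers = Importers(conf)

test = conf.tests["my-test"]        # hypothetical test name
importer = importers.get(test)      # dispatches on the TestConfig subtype
series = importer.fetch_data(test)  # -> hunter.series.Series
print(importer.fetch_all_metric_names(test))
```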
diff --git a/hunter/main.py b/hunter/main.py new file mode 100644 index 0000000..256dc5b --- /dev/null +++ b/hunter/main.py @@ -0,0 +1,649 @@ +import argparse +import copy +import logging +import sys +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, List, Optional + +import pytz +from slack_sdk import WebClient + +from hunter import config +from hunter.attributes import get_back_links +from hunter.config import Config, ConfigError +from hunter.data_selector import DataSelector +from hunter.grafana import Annotation, Grafana, GrafanaError +from hunter.graphite import GraphiteError +from hunter.importer import DataImportError, Importers +from hunter.report import Report, ReportType +from hunter.series import AnalysisOptions, AnalyzedSeries, compare +from hunter.slack import NotificationError, SlackNotifier +from hunter.test_config import GraphiteTestConfig, TestConfig, TestConfigError +from hunter.util import DateFormatError, interpolate, parse_datetime + + +@dataclass +class HunterError(Exception): + message: str + + +class Hunter: + __conf: Config + __importers: Importers + __grafana: Optional[Grafana] + __slack: Optional[SlackNotifier] + + def __init__(self, conf: Config): + self.__conf = conf + self.__importers = Importers(conf) + self.__grafana = None + self.__slack = self.__maybe_create_slack_notifier() + + def list_tests(self, group_names: Optional[List[str]]): + if group_names is not None: + test_names = [] + for group_name in group_names: + group = self.__conf.test_groups.get(group_name) + if group is None: + raise HunterError(f"Test group not found: {group_name}") + test_names += (t.name for t in group) + else: + test_names = self.__conf.tests + + for test_name in sorted(test_names): + print(test_name) + + def list_test_groups(self): + for group_name in sorted(self.__conf.test_groups): + print(group_name) + + def get_test(self, test_name: str) -> TestConfig: + test = self.__conf.tests.get(test_name) + if test is None: + raise HunterError(f"Test not found: {test_name}") + return test + + def get_tests(self, *names: str) -> List[TestConfig]: + tests = [] + for name in names: + group = self.__conf.test_groups.get(name) + if group is not None: + tests += group + else: + test = self.__conf.tests.get(name) + if test is not None: + tests.append(test) + else: + raise HunterError(f"Test or group not found: {name}") + return tests + + def list_metrics(self, test: TestConfig): + importer = self.__importers.get(test) + for metric_name in importer.fetch_all_metric_names(test): + print(metric_name) + + def analyze( + self, + test: TestConfig, + selector: DataSelector, + options: AnalysisOptions, + report_type: ReportType, + ) -> AnalyzedSeries: + importer = self.__importers.get(test) + series = importer.fetch_data(test, selector) + analyzed_series = series.analyze(options) + change_points = analyzed_series.change_points_by_time + report = Report(series, change_points) + produced_report = report.produce_report(test.name, report_type) + print(produced_report) + return analyzed_series
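The same `analyze` flow that backs the CLI can be driven programmatically. A minimal sketch, assuming a valid configuration and a hypothetical test name:

```python
from hunter import config
from hunter.data_selector import DataSelector
from hunter.main import Hunter
from hunter.report import ReportType
from hunter.series import AnalysisOptions

conf = config.load_config()
hunter = Hunter(conf)
test = hunter.get_test("my-test")  # hypothetical test name

# Prints the annotated log report and returns the AnalyzedSeries:
analyzed = hunter.analyze(test, DataSelector(), AnalysisOptions(), ReportType.LOG)
```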
"GRAPHITE_PATH_COMPONENTS": path.split("."), + "GRAPHITE_PREFIX": [test.prefix], + "GRAPHITE_PREFIX_COMPONENTS": test.prefix.split("."), + } + + tmp_tags_to_create = [] + for t in tags_to_create: + tmp_tags_to_create += interpolate(t, substitutions) + tags_to_create = tmp_tags_to_create + + old_annotations = [a for a in old_annotations_for_test if metric_tag in a.tags] + old_annotation_times = set((a.time for a in old_annotations if a.tags)) + + target_annotations = [] + for cp in change_points: + attributes = series.attributes_at(cp.index) + annotation_text = get_back_links(attributes) + target_annotations.append( + Annotation( + id=None, + time=datetime.fromtimestamp(cp.time, tz=pytz.UTC), + text=annotation_text, + tags=tags_to_create, + ) + ) + target_annotation_times = set((a.time for a in target_annotations)) + + to_delete = [a for a in old_annotations if a.time not in target_annotation_times] + if to_delete: + logging.info( + f"Removing {len(to_delete)} annotations " + f"for test {test.name} and metric {metric_name}..." + ) + grafana.delete_annotations(*(a.id for a in to_delete)) + + to_create = [a for a in target_annotations if a.time not in old_annotation_times] + if to_create: + logging.info( + f"Creating {len(to_create)} annotations " + f"for test {test.name} and metric {metric_name}..." + ) + grafana.create_annotations(*to_create) + created_count += len(to_create) + + if created_count == 0: + logging.info("All annotations up-to-date. No new annotations needed.") + else: + logging.info(f"Created {created_count} annotations.") + + def remove_grafana_annotations(self, test: Optional[TestConfig], force: bool): + """Removes all Hunter annotations (optionally for a given test) in Grafana""" + grafana = self.__get_grafana() + if test: + logging.info(f"Fetching Grafana annotations for test {test.name}...") + else: + logging.info("Fetching Grafana annotations...") + tags_to_query = {"hunter", "change-point"} + if test: + tags_to_query.add(f"test: {test.name}") + annotations = grafana.fetch_annotations(None, None, list(tags_to_query)) + if not annotations: + logging.info("No annotations found.") + return + if not force: + print( + f"Are you sure to remove {len(annotations)} annotations from {grafana.url}? [y/N]" + ) + decision = input().strip() + if decision.lower() != "y" and decision.lower() != "yes": + return + logging.info(f"Removing {len(annotations)} annotations...") + grafana.delete_annotations(*(a.id for a in annotations)) + + def regressions( + self, test: TestConfig, selector: DataSelector, options: AnalysisOptions + ) -> bool: + importer = self.__importers.get(test) + + # Even if user is interested only in performance difference since some point X, + # we really need to fetch some earlier points than X. + # Otherwise, if performance went down very early after X, e.g. at X + 1, we'd have + # insufficient number of data points to compute the baseline performance. + # Instead of using `since-` selector, we're fetching everything from the + # beginning and then we find the baseline performance around the time pointed by + # the original selector. 
+ since_version = selector.since_version + since_commit = selector.since_commit + since_time = selector.since_time + baseline_selector = copy.deepcopy(selector) + baseline_selector.last_n_points = sys.maxsize + baseline_selector.branch = None + baseline_selector.since_version = None + baseline_selector.since_commit = None + baseline_selector.since_time = since_time - timedelta(days=30) + baseline_series = importer.fetch_data(test, baseline_selector) + + if since_version: + baseline_index = baseline_series.find_by_attribute("version", since_version) + if not baseline_index: + raise HunterError(f"No runs of test {test.name} with version {since_version}") + baseline_index = max(baseline_index) + elif since_commit: + baseline_index = baseline_series.find_by_attribute("commit", since_commit) + if not baseline_index: + raise HunterError(f"No runs of test {test.name} with commit {since_commit}") + baseline_index = max(baseline_index) + else: + baseline_index = baseline_series.find_first_not_earlier_than(since_time) + + baseline_series = baseline_series.analyze(options=options) + + if selector.branch: + target_series = importer.fetch_data(test, selector).analyze(options=options) + else: + target_series = baseline_series + + cmp = compare(baseline_series, baseline_index, target_series, target_series.len()) + regressions = [] + for metric_name, stats in cmp.stats.items(): + direction = baseline_series.metric(metric_name).direction + m1 = stats.mean_1 + m2 = stats.mean_2 + change_percent = stats.forward_rel_change() * 100.0 + if m2 * direction < m1 * direction and stats.pvalue < options.max_pvalue: + regressions.append( + " {:16}: {:#8.3g} --> {:#8.3g} ({:+6.1f}%)".format( + metric_name, m1, m2, change_percent + ) + ) + + if regressions: + print(f"{test.name}:") + for r in regressions: + print(r) + else: + print(f"{test.name}: OK") + return len(regressions) > 0 + + def __maybe_create_slack_notifier(self): + if not self.__conf.slack: + return None + return SlackNotifier(WebClient(token=self.__conf.slack.bot_token)) + + def notify_slack( + self, + test_change_points: Dict[str, AnalyzedSeries], + selector: DataSelector, + channels: List[str], + since: datetime, + ): + if not self.__slack: + logging.error( + "Slack definition is missing from the configuration, cannot send notification" + ) + return + self.__slack.notify(test_change_points, selector=selector, channels=channels, since=since) + + def validate(self): + valid = True + unique_metrics = set() + for name, test in self.__conf.tests.items(): + logging.info("Checking {}".format(name)) + test_metrics = test.fully_qualified_metric_names() + for test_metric in test_metrics: + if test_metric not in unique_metrics: + unique_metrics.add(test_metric) + else: + valid = False + logging.error(f"Found duplicated metric: {test_metric}") + try: + importer = self.__importers.get(test) + series = importer.fetch_data(test) + for metric, metric_data in series.data.items(): + if not metric_data: + logging.warning(f"Test's metric does not have data: {name} {metric}") + except Exception as err: + logging.error(f"Invalid test definition: {name}\n{repr(err)}\n") + valid = False + logging.info(f"Validation finished: {'VALID' if valid else 'INVALID'}") + if not valid: + exit(1) + + +def setup_data_selector_parser(parser: argparse.ArgumentParser): + parser.add_argument( + "--branch", metavar="STRING", dest="branch", help="name of the branch", nargs="?" 
+ ) + parser.add_argument( + "--metrics", + metavar="LIST", + dest="metrics", + help="a comma-separated list of metrics to analyze", + ) + parser.add_argument( + "--attrs", + metavar="LIST", + dest="attributes", + help="a comma-separated list of attribute names associated with the runs " + "(e.g. commit, branch, version); " + "if not specified, it will be automatically filled based on available information", + ) + since_group = parser.add_mutually_exclusive_group() + since_group.add_argument( + "--since-commit", + metavar="STRING", + dest="since_commit", + help="the commit at the start of the time span to analyze", + ) + since_group.add_argument( + "--since-version", + metavar="STRING", + dest="since_version", + help="the version at the start of the time span to analyze", + ) + since_group.add_argument( + "--since", + metavar="DATE", + dest="since_time", + help="the start of the time span to analyze; " + "accepts ISO and human-readable dates like '10 weeks ago'", + ) + until_group = parser.add_mutually_exclusive_group() + until_group.add_argument( + "--until-commit", + metavar="STRING", + dest="until_commit", + help="the commit at the end of the time span to analyze", + ) + until_group.add_argument( + "--until-version", + metavar="STRING", + dest="until_version", + help="the version at the end of the time span to analyze", + ) + until_group.add_argument( + "--until", + metavar="DATE", + dest="until_time", + help="the end of the time span to analyze; same syntax as --since", + ) + parser.add_argument( + "--last", + type=int, + metavar="COUNT", + dest="last_n_points", + help="the number of data points to take from the end of the series", + ) + + +def data_selector_from_args(args: argparse.Namespace) -> DataSelector: + data_selector = DataSelector() + if args.branch: + data_selector.branch = args.branch + if args.metrics is not None: + data_selector.metrics = list(args.metrics.split(",")) + if args.attributes is not None: + data_selector.attributes = list(args.attributes.split(",")) + if args.since_commit is not None: + data_selector.since_commit = args.since_commit + if args.since_version is not None: + data_selector.since_version = args.since_version + if args.since_time is not None: + data_selector.since_time = parse_datetime(args.since_time) + if args.until_commit is not None: + data_selector.until_commit = args.until_commit + if args.until_version is not None: + data_selector.until_version = args.until_version + if args.until_time is not None: + data_selector.until_time = parse_datetime(args.until_time) + if args.last_n_points is not None: + data_selector.last_n_points = args.last_n_points + return data_selector + + +def setup_analysis_options_parser(parser: argparse.ArgumentParser): + parser.add_argument( + "-P", + "--p-value", + dest="pvalue", + type=float, + default=0.001, + help="maximum accepted P-value of a change-point; " + "P denotes the probability that the change-point has " + "been found by a random coincidence, rather than a real " + "difference between the data distributions", + ) + parser.add_argument( + "-M", + "--magnitude", + dest="magnitude", + type=float, + default=0.0, + help="minimum accepted magnitude of a change-point " + "computed as abs(new_mean / old_mean - 1.0); use it " + "to filter out negligibly small changes (e.g. < 0.01)", + ) + parser.add_argument( + "--window", + default=50, + type=int, + dest="window", + help="the number of data points analyzed at once; " + "the window size affects the discriminative " + "power of the change point detection algorithm; " 
"large windows are less susceptible to noise; " + "however, a very large window may cause dismissing short regressions " + "as noise so it is best to keep it short enough to include not more " + "than a few change points (optimally at most 1)", + ) + parser.add_argument( + "--orig-edivisive", + type=bool, + default=False, + dest="orig_edivisive", + help="use the original edivisive algorithm with no windowing " + "and weak change points analysis improvements", + ) + + +def analysis_options_from_args(args: argparse.Namespace) -> AnalysisOptions: + conf = AnalysisOptions() + if args.pvalue is not None: + conf.max_pvalue = args.pvalue + if args.magnitude is not None: + conf.min_magnitude = args.magnitude + if args.window is not None: + conf.window_len = args.window + if args.orig_edivisive is not None: + conf.orig_edivisive = args.orig_edivisive + return conf + + +def main(): + logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) + + parser = argparse.ArgumentParser(description="Hunts performance regressions in Fallout results") + + subparsers = parser.add_subparsers(dest="command") + list_tests_parser = subparsers.add_parser("list-tests", help="list available tests") + list_tests_parser.add_argument("group", help="name of the group of the tests", nargs="*") + + list_metrics_parser = subparsers.add_parser( + "list-metrics", help="list available metrics for a test" + ) + list_metrics_parser.add_argument("test", help="name of the test") + + subparsers.add_parser("list-groups", help="list available groups of tests") + + analyze_parser = subparsers.add_parser( + "analyze", + help="analyze performance test results", + formatter_class=argparse.RawTextHelpFormatter, + ) + analyze_parser.add_argument("tests", help="name of the test or group of the tests", nargs="+") + analyze_parser.add_argument( + "--update-grafana", + help="Update Grafana dashboards with appropriate annotations of change points", + action="store_true", + ) + analyze_parser.add_argument( + "--notify-slack", + help="Send notification containing a summary of change points to given Slack channels", + nargs="+", + ) + analyze_parser.add_argument( + "--cph-report-since", + help="Sets a limit on the date range of the Change Point History reported to Slack. 
Same syntax as --since.", + metavar="DATE", + dest="cph_report_since", + ) + analyze_parser.add_argument( + "--output", + help="Output format for the generated report.", + choices=list(ReportType), + dest="report_type", + default=ReportType.LOG, + type=ReportType, + ) + setup_data_selector_parser(analyze_parser) + setup_analysis_options_parser(analyze_parser) + + regressions_parser = subparsers.add_parser("regressions", help="find performance regressions") + regressions_parser.add_argument( + "tests", help="name of the test or group of the tests", nargs="+" + ) + setup_data_selector_parser(regressions_parser) + setup_analysis_options_parser(regressions_parser) + + remove_annotations_parser = subparsers.add_parser("remove-annotations") + remove_annotations_parser.add_argument( + "tests", help="name of the test or test group", nargs="*" + ) + remove_annotations_parser.add_argument( + "--force", help="don't ask questions, just do it", dest="force", action="store_true" + ) + + subparsers.add_parser( + "validate", help="validates the tests and metrics defined in the configuration" + ) + + try: + args = parser.parse_args() + conf = config.load_config() + hunter = Hunter(conf) + + if args.command == "list-groups": + hunter.list_test_groups() + + if args.command == "list-tests": + group_names = args.group if args.group else None + hunter.list_tests(group_names) + + if args.command == "list-metrics": + test = hunter.get_test(args.test) + hunter.list_metrics(test) + + if args.command == "analyze": + update_grafana_flag = args.update_grafana + slack_notification_channels = args.notify_slack + slack_cph_since = parse_datetime(args.cph_report_since) + data_selector = data_selector_from_args(args) + options = analysis_options_from_args(args) + report_type = args.report_type + tests = hunter.get_tests(*args.tests) + tests_analyzed_series = {test.name: None for test in tests} + for test in tests: + try: + analyzed_series = hunter.analyze( + test, selector=data_selector, options=options, report_type=report_type + ) + if update_grafana_flag: + if not isinstance(test, GraphiteTestConfig): + raise GrafanaError("Not a Graphite test") + hunter.update_grafana_annotations(test, analyzed_series) + if slack_notification_channels: + tests_analyzed_series[test.name] = analyzed_series + except DataImportError as err: + logging.error(err.message) + except GrafanaError as err: + logging.error( + f"Failed to update grafana dashboards for {test.name}: {err.message}" + ) + if slack_notification_channels: + hunter.notify_slack( + tests_analyzed_series, + selector=data_selector, + channels=slack_notification_channels, + since=slack_cph_since, + ) + + if args.command == "regressions": + data_selector = data_selector_from_args(args) + options = analysis_options_from_args(args) + tests = hunter.get_tests(*args.tests) + regressing_test_count = 0 + errors = 0 + for test in tests: + try: + regressions = hunter.regressions(test, selector=data_selector, options=options) + if regressions: + regressing_test_count += 1 + except HunterError as err: + logging.error(err.message) + errors += 1 + except DataImportError as err: + logging.error(err.message) + errors += 1 + if regressing_test_count == 0: + print("No regressions found!") + elif regressing_test_count == 1: + print("Regressions in 1 test found") + else: + print(f"Regressions in {regressing_test_count} tests found") + if errors > 0: + print("Some tests were skipped due to import / analyze errors. 
Consult error log.") + + if args.command == "remove-annotations": + if args.tests: + tests = hunter.get_tests(*args.tests) + for test in tests: + hunter.remove_grafana_annotations(test, args.force) + else: + hunter.remove_grafana_annotations(None, args.force) + + if args.command == "validate": + hunter.validate() + + if args.command is None: + parser.print_usage() + + except ConfigError as err: + logging.error(err.message) + exit(1) + except TestConfigError as err: + logging.error(err.message) + exit(1) + except GraphiteError as err: + logging.error(err.message) + exit(1) + except GrafanaError as err: + logging.error(err.message) + exit(1) + except DataImportError as err: + logging.error(err.message) + exit(1) + except HunterError as err: + logging.error(err.message) + exit(1) + except DateFormatError as err: + logging.error(err.message) + exit(1) + except NotificationError as err: + logging.error(err.message) + exit(1) + + +if __name__ == "__main__": + main() diff --git a/hunter/report.py b/hunter/report.py new file mode 100644 index 0000000..f4838c7 --- /dev/null +++ b/hunter/report.py @@ -0,0 +1,85 @@ +from collections import OrderedDict +from enum import Enum, unique +from typing import List + +from tabulate import tabulate + +from hunter.series import ChangePointGroup, Series +from hunter.util import format_timestamp, insert_multiple, remove_common_prefix + + +@unique +class ReportType(Enum): + LOG = "log" + JSON = "json" + + def __str__(self): + return self.value + + +class Report: + __series: Series + __change_points: List[ChangePointGroup] + + def __init__(self, series: Series, change_points: List[ChangePointGroup]): + self.__series = series + self.__change_points = change_points + + @staticmethod + def __column_widths(log: List[str]) -> List[int]: + return [len(c) for c in log[1].split(None)] + + def produce_report(self, test_name: str, report_type: ReportType): + if report_type == ReportType.LOG: + return self.__format_log_annotated(test_name) + elif report_type == ReportType.JSON: + return self.__format_json(test_name) + else: + from hunter.main import HunterError + + raise HunterError(f"Unknown report type: {report_type}") + + def __format_log(self) -> str: + time_column = [format_timestamp(ts) for ts in self.__series.time] + table = {"time": time_column, **self.__series.attributes, **self.__series.data} + metrics = list(self.__series.data.keys()) + headers = list( + OrderedDict.fromkeys( + ["time", *self.__series.attributes, *remove_common_prefix(metrics)] + ) + ) + return tabulate(table, headers=headers) + + def __format_log_annotated(self, test_name: str) -> str: + """Returns test log with change points marked as horizontal lines""" + lines = self.__format_log().split("\n") + col_widths = self.__column_widths(lines) + indexes = [cp.index for cp in self.__change_points] + separators = [] + columns = list( + OrderedDict.fromkeys(["time", *self.__series.attributes, *self.__series.data]) + ) + for cp in self.__change_points: + separator = "" + info = "" + for col_index, col_name in enumerate(columns): + col_width = col_widths[col_index] + change = [c for c in cp.changes if c.metric == col_name] + if change: + change = change[0] + change_percent = change.forward_change_percent() + separator += "·" * col_width + " " + info += f"{change_percent:+.1f}%".rjust(col_width) + " " + else: + separator += " " * (col_width + 2) + info += " " * (col_width + 2) + + separators.append(f"{separator}\n{info}\n{separator}") + + lines = lines[:2] + insert_multiple(lines[2:], separators, indexes) + 
return "\n".join(lines)
+
+    def __format_json(self, test_name: str) -> str:
+        import json
+
+        return json.dumps({test_name: [cpg.to_json() for cpg in self.__change_points]})
diff --git a/hunter/series.py b/hunter/series.py
new file mode 100644
index 0000000..30bc2a2
--- /dev/null
+++ b/hunter/series.py
@@ -0,0 +1,300 @@
+import logging
+from dataclasses import dataclass
+from datetime import datetime
+from itertools import groupby
+from typing import Dict, Iterable, List, Optional
+
+import numpy as np
+
+from hunter.analysis import (
+    ComparativeStats,
+    TTestSignificanceTester,
+    compute_change_points,
+    compute_change_points_orig,
+    fill_missing,
+)
+
+
+@dataclass
+class AnalysisOptions:
+    window_len: int
+    max_pvalue: float
+    min_magnitude: float
+    orig_edivisive: bool
+
+    def __init__(self):
+        self.window_len = 50
+        self.max_pvalue = 0.001
+        self.min_magnitude = 0.0
+        self.orig_edivisive = False
+
+
+@dataclass
+class Metric:
+    direction: int
+    scale: float
+    unit: str
+
+    def __init__(self, direction: int = 1, scale: float = 1.0, unit: str = ""):
+        self.direction = direction
+        self.scale = scale
+        self.unit = unit
+
+
+@dataclass
+class ChangePoint:
+    """A change-point for a single metric"""
+
+    metric: str
+    index: int
+    time: int
+    stats: ComparativeStats
+
+    def forward_change_percent(self) -> float:
+        return self.stats.forward_rel_change() * 100.0
+
+    def backward_change_percent(self) -> float:
+        return self.stats.backward_rel_change() * 100.0
+
+    def magnitude(self):
+        return self.stats.change_magnitude()
+
+    def to_json(self):
+        return {
+            "metric": self.metric,
+            "forward_change_percent": f"{self.forward_change_percent():.0f}",
+        }
+
+
+@dataclass
+class ChangePointGroup:
+    """A group of change points on multiple metrics, at the same time"""
+
+    index: int
+    time: int
+    prev_time: int
+    attributes: Dict[str, str]
+    prev_attributes: Dict[str, str]
+    changes: List[ChangePoint]
+
+    def to_json(self):
+        return {"time": self.time, "changes": [cp.to_json() for cp in self.changes]}
+
+
+class Series:
+    """
+    Stores values of interesting metrics of all runs of
+    a test, indexed by a single time variable.
+    Provides utilities to analyze data, e.g. find change points. 
+ """ + + test_name: str + branch: Optional[str] + time: List[int] + metrics: Dict[str, Metric] + attributes: Dict[str, List[str]] + data: Dict[str, List[float]] + + def __init__( + self, + test_name: str, + branch: Optional[str], + time: List[int], + metrics: Dict[str, Metric], + data: Dict[str, List[float]], + attributes: Dict[str, List[str]], + ): + self.test_name = test_name + self.branch = branch + self.time = time + self.metrics = metrics + self.attributes = attributes if attributes else {} + self.data = data + assert all(len(x) == len(time) for x in data.values()) + assert all(len(x) == len(time) for x in attributes.values()) + + def attributes_at(self, index: int) -> Dict[str, str]: + result = {} + for (k, v) in self.attributes.items(): + result[k] = v[index] + return result + + def find_first_not_earlier_than(self, time: datetime) -> Optional[int]: + timestamp = time.timestamp() + for i, t in enumerate(self.time): + if t >= timestamp: + return i + return None + + def find_by_attribute(self, name: str, value: str) -> List[int]: + """Returns the indexes of data points with given attribute value""" + result = [] + for i in range(len(self.time)): + if self.attributes_at(i).get(name) == value: + result.append(i) + return result + + def analyze(self, options: AnalysisOptions = AnalysisOptions()) -> "AnalyzedSeries": + logging.info(f"Computing change points for test {self.test_name}...") + return AnalyzedSeries(self, options) + + +class AnalyzedSeries: + """ + Time series data with computed change points. + """ + + __series: Series + options: AnalysisOptions + change_points: Dict[str, List[ChangePoint]] + change_points_by_time: List[ChangePointGroup] + + def __init__(self, series: Series, options: AnalysisOptions): + self.__series = series + self.options = options + self.change_points = self.__compute_change_points(series, options) + self.change_points_by_time = self.__group_change_points_by_time(series, self.change_points) + + @staticmethod + def __compute_change_points( + series: Series, options: AnalysisOptions + ) -> Dict[str, List[ChangePoint]]: + result = {} + for metric in series.data.keys(): + values = series.data[metric].copy() + fill_missing(values) + if options.orig_edivisive: + change_points = compute_change_points_orig( + values, + max_pvalue=options.max_pvalue, + ) + else: + change_points = compute_change_points( + values, + window_len=options.window_len, + max_pvalue=options.max_pvalue, + min_magnitude=options.min_magnitude, + ) + result[metric] = [] + for c in change_points: + result[metric].append( + ChangePoint( + index=c.index, time=series.time[c.index], metric=metric, stats=c.stats + ) + ) + return result + + @staticmethod + def __group_change_points_by_time( + series: Series, change_points: Dict[str, List[ChangePoint]] + ) -> List[ChangePointGroup]: + changes: List[ChangePoint] = [] + for metric in change_points.keys(): + changes += change_points[metric] + + changes.sort(key=lambda c: c.index) + points = [] + for k, g in groupby(changes, key=lambda c: c.index): + cp = ChangePointGroup( + index=k, + time=series.time[k], + prev_time=series.time[k - 1], + attributes=series.attributes_at(k), + prev_attributes=series.attributes_at(k - 1), + changes=list(g), + ) + points.append(cp) + + return points + + def get_stable_range(self, metric: str, index: int) -> (int, int): + """ + Returns a range of indexes (A, B) such that: + - A is the nearest change point index of the `metric` before or equal given `index`, + or 0 if not found + - B is the nearest change point index of the 
`metric` after the given `index`,
+          or len(self.time()) if not found
+
+        It follows that there are no change points between A and B.
+        """
+        begin = 0
+        for cp in self.change_points[metric]:
+            if cp.index > index:
+                break
+            begin = cp.index
+
+        end = len(self.time())
+        for cp in reversed(self.change_points[metric]):
+            if cp.index <= index:
+                break
+            end = cp.index
+
+        return begin, end
+
+    def test_name(self) -> str:
+        return self.__series.test_name
+
+    def branch_name(self) -> Optional[str]:
+        return self.__series.branch
+
+    def len(self) -> int:
+        return len(self.__series.time)
+
+    def time(self) -> List[int]:
+        return self.__series.time
+
+    def data(self, metric: str) -> List[float]:
+        return self.__series.data[metric]
+
+    def attributes(self) -> Iterable[str]:
+        return self.__series.attributes.keys()
+
+    def attributes_at(self, index: int) -> Dict[str, str]:
+        return self.__series.attributes_at(index)
+
+    def attribute_values(self, attribute: str) -> List[str]:
+        return self.__series.attributes[attribute]
+
+    def metric_names(self) -> Iterable[str]:
+        return self.__series.metrics.keys()
+
+    def metric(self, name: str) -> Metric:
+        return self.__series.metrics[name]
+
+
+@dataclass
+class SeriesComparison:
+    series_1: AnalyzedSeries
+    series_2: AnalyzedSeries
+    index_1: int
+    index_2: int
+    stats: Dict[str, ComparativeStats]  # keys: metric name
+
+
+def compare(
+    series_1: AnalyzedSeries,
+    index_1: Optional[int],
+    series_2: AnalyzedSeries,
+    index_2: Optional[int],
+) -> SeriesComparison:
+
+    # if index not specified, we want to take the most recent performance
+    index_1 = index_1 if index_1 is not None else len(series_1.time())
+    index_2 = index_2 if index_2 is not None else len(series_2.time())
+    metrics = filter(lambda m: m in series_2.metric_names(), series_1.metric_names())
+
+    tester = TTestSignificanceTester(series_1.options.max_pvalue)
+    stats = {}
+
+    for metric in metrics:
+        data_1 = series_1.data(metric)
+        (begin_1, end_1) = series_1.get_stable_range(metric, index_1)
+        data_1 = [x for x in data_1[begin_1:end_1] if x is not None]
+
+        data_2 = series_2.data(metric)
+        (begin_2, end_2) = series_2.get_stable_range(metric, index_2)
+        data_2 = [x for x in data_2[begin_2:end_2] if x is not None]
+
+        stats[metric] = tester.compare(np.array(data_1), np.array(data_2))
+
+    return SeriesComparison(series_1, series_2, index_1, index_2, stats)
diff --git a/hunter/slack.py b/hunter/slack.py
new file mode 100644
index 0000000..649e4a0
--- /dev/null
+++ b/hunter/slack.py
@@ -0,0 +1,229 @@
+from dataclasses import dataclass
+from datetime import datetime
+from math import isinf
+from typing import Dict, List
+
+from pytz import UTC
+from slack_sdk import WebClient
+
+from hunter.data_selector import DataSelector
+from hunter.series import AnalyzedSeries, ChangePointGroup
+
+
+@dataclass
+class NotificationError(Exception):
+    message: str
+
+
+@dataclass
+class SlackConfig:
+    bot_token: str
+
+
+class SlackNotification:
+    tests_with_insufficient_data: List[str]
+    test_analyzed_series: Dict[str, AnalyzedSeries]
+    since: datetime
+
+    def __init__(
+        self,
+        test_analyzed_series: Dict[str, AnalyzedSeries],
+        data_selection_description: str = None,
+        since: datetime = None,
+    ):
+        self.data_selection_description = data_selection_description
+        self.since = since
+        self.tests_with_insufficient_data = []
+        self.test_analyzed_series = dict()
+        for test, series in test_analyzed_series.items():
+            if series:
+                self.test_analyzed_series[test] = series
+            else:
+                self.tests_with_insufficient_data.append(test)
+
+    
def __init_insufficient_data_dispatch(self): + dispatch = [ + self.__text_block( + "header", + "plain_text", + "Hunter found insufficient data for the following tests :warning:", + ) + ] + if self.data_selection_description: + dispatch.append(self.__data_selection_block()) + return dispatch + + def __init_report_dispatch(self): + dispatch = [self.__header()] + if self.data_selection_description: + dispatch.append(self.__data_selection_block()) + if self.since: + dispatch.append(self.__report_selection_block()) + return dispatch + + def __minimum_dispatch_length(self): + min = 1 # header + if self.data_selection_description: + min += 1 + if self.since: + min += 1 + return min + + # A Slack message can only contain 50 blocks so + # large summaries must be split across messages. + def create_dispatches(self) -> List[List[object]]: + dispatches = [] + cur = self.__init_insufficient_data_dispatch() + for test_name in self.tests_with_insufficient_data: + if len(cur) == 50: + dispatches.append(cur) + cur = self.__init_insufficient_data_dispatch() + cur.append(self.__plain_text_section_block(test_name)) + + if len(cur) > self.__minimum_dispatch_length(): + dispatches.append(cur) + + dates_change_points = {} + for test_name, analyzed_series in self.test_analyzed_series.items(): + for group in analyzed_series.change_points_by_time: + cpg_time = datetime.fromtimestamp(group.time, tz=UTC) + if self.since and cpg_time < self.since: + continue + date_str = self.__datetime_to_str(cpg_time) + if date_str not in dates_change_points: + dates_change_points[date_str] = {} + dates_change_points[date_str][test_name] = group + + cur = self.__init_report_dispatch() + for date in sorted(dates_change_points): + add = [ + self.__block("divider"), + self.__title_block(date), + ] + self.__dates_change_points_summary(dates_change_points[date]) + + if not len(cur) + len(add) < 50: + dispatches.append(cur) + cur = self.__init_report_dispatch() + + cur = cur + add + + if len(cur) > self.__minimum_dispatch_length(): + dispatches.append(cur) + + return dispatches + + @staticmethod + def __datetime_to_str(date: datetime): + return str(date.strftime("%Y-%m-%d %H:%M:%S")) + + @staticmethod + def __block(block_type: str, content: Dict = None): + block = {"type": block_type} + if content: + block.update(content) + return block + + @classmethod + def __text_block(cls, type, text_type, text): + return cls.__block( + type, + content={ + "text": { + "type": text_type, + "text": text, + } + }, + ) + + @classmethod + def __fields_section(cls, fields_text): + def field_block(text): + return {"type": "mrkdwn", "text": text} + + return cls.__block("section", content={"fields": [field_block(t) for t in fields_text]}) + + @classmethod + def __plain_text_section_block(cls, text): + return cls.__text_block("section", "plain_text", text) + + def __header(self): + header_text = ( + "Hunter has detected change points" + if self.test_analyzed_series + else "Hunter did not detect any change points" + ) + return self.__text_block("header", "plain_text", header_text) + + def __data_selection_block(self): + return self.__plain_text_section_block(self.data_selection_description) + + def __report_selection_block(self): + return self.__fields_section(["Report Since", self.__datetime_to_str(self.since)]) + + @classmethod + def __title_block(cls, name): + return cls.__text_block("section", "mrkdwn", f"*{name}*") + + def __dates_change_points_summary(self, test_changes: Dict[str, ChangePointGroup]): + fields = [] + for test_name, group in 
test_changes.items():
+            fields.append(f"*{test_name}*")
+            summary = ""
+            for change in group.changes:
+                change_percent = change.forward_change_percent()
+                change_emoji = self.__get_change_emoji(test_name, change)
+                if isinf(change_percent):
+                    report_percent = change_percent
+                # Avoid rounding small change percentages to zero
+                elif -5 < change_percent < 5:
+                    report_percent = f"{change_percent:.1f}"
+                else:
+                    report_percent = round(change_percent)
+                summary += f"{change_emoji} *{change.metric}*: {report_percent}%\n"
+            fields.append(summary)
+
+        sections = []
+        i = 0
+        while i < len(fields):
+            section_fields = []
+            while len(section_fields) < 10 and i < len(fields):
+                section_fields.append(fields[i])
+                i += 1
+            sections.append(self.__fields_section(section_fields))
+
+        return sections
+
+    def __get_change_emoji(self, test_name, change):
+        metric_direction = self.test_analyzed_series[test_name].metric(change.metric).direction
+        improvement = metric_direction * change.forward_change_percent()
+        if improvement >= 0:
+            return ":large_blue_circle:"
+        else:
+            return ":red_circle:"
+
+
+class SlackNotifier:
+    __client: WebClient
+
+    def __init__(self, client: WebClient):
+        self.__client = client
+
+    def notify(
+        self,
+        test_analyzed_series: Dict[str, AnalyzedSeries],
+        selector: DataSelector,
+        channels: List[str],
+        since: datetime,
+    ):
+        dispatches = SlackNotification(
+            test_analyzed_series,
+            data_selection_description=selector.get_selection_description(),
+            since=since,
+        ).create_dispatches()
+        if len(dispatches) > 3:
+            raise NotificationError(
+                "Change point summary would produce too many Slack notifications"
+            )
+        for channel in channels:
+            for blocks in dispatches:
+                self.__client.chat_postMessage(channel=channel, blocks=blocks)
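Since a single Slack message can carry at most 50 blocks, `create_dispatches` batches large summaries. A small sketch with synthetic input (test names are made up; it relies only on the class defined above):

```python
# Illustrative only, not part of this patch.
from hunter.slack import SlackNotification

# 120 tests without analyzable data force the summary to be split
# into several dispatches of at most 50 blocks each.
notification = SlackNotification(
    test_analyzed_series={f"test-{i}": None for i in range(120)},
    data_selection_description="branch: main",
)
dispatches = notification.create_dispatches()
assert all(len(blocks) <= 50 for blocks in dispatches)
```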
diff --git a/hunter/test_config.py b/hunter/test_config.py
new file mode 100644
index 0000000..46a721b
--- /dev/null
+++ b/hunter/test_config.py
@@ -0,0 +1,228 @@
+import os.path
+from dataclasses import dataclass
+from typing import Dict, List, Optional
+
+from hunter.csv_options import CsvOptions
+from hunter.util import interpolate
+
+
+@dataclass
+class TestConfig:
+    name: str
+
+    def fully_qualified_metric_names(self):
+        raise NotImplementedError
+
+
+@dataclass
+class TestConfigError(Exception):
+    message: str
+
+
+@dataclass
+class CsvMetric:
+    name: str
+    direction: int
+    scale: float
+    column: str
+
+
+@dataclass
+class CsvTestConfig(TestConfig):
+    file: str
+    csv_options: CsvOptions
+    time_column: str
+    metrics: Dict[str, CsvMetric]
+    attributes: List[str]
+
+    def __init__(
+        self,
+        name: str,
+        file: str,
+        csv_options: CsvOptions = CsvOptions(),
+        time_column: str = "time",
+        metrics: List[CsvMetric] = None,
+        attributes: List[str] = None,
+    ):
+        self.name = name
+        self.file = file
+        self.csv_options = csv_options
+        self.time_column = time_column
+        self.metrics = {m.name: m for m in metrics} if metrics else {}
+        self.attributes = attributes if attributes else []
+
+    def fully_qualified_metric_names(self) -> List[str]:
+        return list(self.metrics.keys())
+
+
+@dataclass
+class GraphiteMetric:
+    name: str
+    direction: int
+    scale: float
+    suffix: str
+    annotate: List[str]  # tags appended to Grafana annotations
+
+
+@dataclass
+class GraphiteTestConfig(TestConfig):
+    prefix: str  # location of the performance data for the main branch
+    branch_prefix: Optional[str]  # location of the performance data for the feature branch
+    metrics: Dict[str, GraphiteMetric]  # collection of metrics to fetch
+    tags: List[str]  # tags to query graphite events for this test
+    annotate: List[str]  # annotation tags
+
+    def __init__(
+        self,
+        name: str,
+        prefix: str,
+        branch_prefix: Optional[str],
+        metrics: List[GraphiteMetric],
+        tags: List[str],
+        annotate: List[str],
+    ):
+        self.name = name
+        self.prefix = prefix
+        self.branch_prefix = branch_prefix
+        self.metrics = {m.name: m for m in metrics}
+        self.tags = tags
+        self.annotate = annotate
+
+    def get_path(self, branch_name: Optional[str], metric_name: str) -> str:
+        metric = self.metrics.get(metric_name)
+        substitutions = {"BRANCH": [branch_name if branch_name else "main"]}
+        if branch_name and self.branch_prefix:
+            return interpolate(self.branch_prefix, substitutions)[0] + "." + metric.suffix
+        elif branch_name:
+            branch_var_name = "%{BRANCH}"
+            if branch_var_name not in self.prefix:
+                raise TestConfigError(
+                    f"Test {self.name} does not support branching. "
+                    f"Please set the `branch_prefix` property or use {branch_var_name} "
+                    f"in the `prefix`."
+                )
+            interpolated = interpolate(self.prefix, substitutions)
+            return interpolated[0] + "." + metric.suffix
+        else:
+            return self.prefix + "." + metric.suffix
+
+    def fully_qualified_metric_names(self):
+        return [f"{self.prefix}.{m.suffix}" for _, m in self.metrics.items()]
+
+
+@dataclass
+class HistoStatTestConfig(TestConfig):
+    name: str
+    file: str
+
+    def fully_qualified_metric_names(self):
+        from hunter.importer import HistoStatImporter
+
+        return HistoStatImporter().fetch_all_metric_names(self)
+
+
+def create_test_config(name: str, config: Dict) -> TestConfig:
+    """
+    Loads properties of a test from a dictionary read from hunter's config file.
+    This dictionary must have the `type` property to determine the type of the test.
+    Other properties depend on the type.
+    Currently supported test types are `csv`, `graphite` and `histostat`. 
+ """ + test_type = config.get("type") + if test_type == "csv": + return create_csv_test_config(name, config) + elif test_type == "graphite": + return create_graphite_test_config(name, config) + elif test_type == "histostat": + return create_histostat_test_config(name, config) + elif test_type is None: + raise TestConfigError(f"Test type not set for test {name}") + else: + raise TestConfigError(f"Unknown test type {test_type} for test {name}") + + +def create_csv_test_config(test_name: str, test_info: Dict) -> CsvTestConfig: + csv_options = CsvOptions() + try: + file = test_info["file"] + except KeyError as e: + raise TestConfigError(f"Configuration key not found in test {test_name}: {e.args[0]}") + time_column = test_info.get("time_column", "time") + metrics_info = test_info.get("metrics") + metrics = [] + if isinstance(metrics_info, List): + for name in metrics_info: + metrics.append(CsvMetric(name, 1, 1.0, name)) + elif isinstance(metrics_info, Dict): + for (metric_name, metric_conf) in metrics_info.items(): + metrics.append( + CsvMetric( + name=metric_name, + column=metric_conf.get("column", metric_name), + direction=int(metric_conf.get("direction", "1")), + scale=float(metric_conf.get("scale", "1")), + ) + ) + else: + raise TestConfigError(f"Metrics of the test {test_name} must be a list or dictionary") + + attributes = test_info.get("attributes", []) + if not isinstance(attributes, List): + raise TestConfigError(f"Attributes of the test {test_name} must be a list") + + if test_info.get("csv_options"): + csv_options.delimiter = test_info["csv_options"].get("delimiter", ",") + csv_options.quote_char = test_info["csv_options"].get("quote_char", '"') + return CsvTestConfig( + test_name, + file, + csv_options=csv_options, + time_column=time_column, + metrics=metrics, + attributes=test_info.get("attributes"), + ) + + +def create_graphite_test_config(name: str, test_info: Dict) -> GraphiteTestConfig: + try: + metrics_info = test_info["metrics"] + if not isinstance(metrics_info, Dict): + raise TestConfigError(f"Test {name} metrics field is not a dictionary.") + except KeyError as e: + raise TestConfigError(f"Configuration key not found in test {name}: {e.args[0]}") + + metrics = [] + try: + for (metric_name, metric_conf) in metrics_info.items(): + metrics.append( + GraphiteMetric( + name=metric_name, + suffix=metric_conf["suffix"], + direction=int(metric_conf.get("direction", "1")), + scale=float(metric_conf.get("scale", "1")), + annotate=metric_conf.get("annotate", []), + ) + ) + except KeyError as e: + raise TestConfigError(f"Configuration key not found in {name}.metrics: {e.args[0]}") + + return GraphiteTestConfig( + name, + prefix=test_info["prefix"], + branch_prefix=test_info.get("branch_prefix"), + tags=test_info.get("tags", []), + annotate=test_info.get("annotate", []), + metrics=metrics, + ) + + +def create_histostat_test_config(name: str, test_info: Dict) -> HistoStatTestConfig: + try: + file = test_info["file"] + except KeyError as e: + raise TestConfigError(f"Configuration key not found in test {name}: {e.args[0]}") + if not os.path.exists(file): + raise TestConfigError( + f"Configuration referenced histostat file which does not exist: {file}" + ) + return HistoStatTestConfig(name, file) diff --git a/hunter/util.py b/hunter/util.py new file mode 100644 index 0000000..bd4a3ec --- /dev/null +++ b/hunter/util.py @@ -0,0 +1,232 @@ +import math +import re +import sys +from collections import OrderedDict, deque +from dataclasses import dataclass +from datetime import datetime +from 
+from functools import reduce
+from itertools import islice
+from typing import Dict, List, Optional, Set, TypeVar
+
+import dateparser
+from pytz import UTC
+
+
+def resolution(time: List[int]) -> int:
+    """
+    Graphite has a finite time resolution and the timestamps are rounded
+    to e.g. full days. This function tries to automatically detect the
+    level of rounding needed by inspecting the minimum time distance between the
+    data points.
+    """
+    res = 24 * 3600
+    if len(time) < 2:
+        return res
+    for (a, b) in sliding_window(time, 2):
+        if b - a > 0:
+            res = min(res, b - a)
+    for t in time:
+        res = math.gcd(res, t)
+    return res
+
+
+def round(x: int, divisor: int) -> int:
+    """Round x down to the nearest multiple of divisor"""
+    return int(x / divisor) * divisor
+
+
+def remove_prefix(text: str, prefix: str) -> str:
+    """
+    Strips the prefix from a string. If the string doesn't start with the given
+    prefix, returns the original string unchanged.
+    """
+    if text.startswith(prefix):
+        return text[len(prefix) :]
+    return text
+
+
+T = TypeVar("T")
+
+
+def merge_sorted(lists: List[List[T]]) -> List[T]:
+    """
+    Merges multiple sorted lists into a sorted list that contains
+    only distinct items from the source lists.
+    The current implementation uses sorting, so it is not very efficient for
+    very large lists.
+
+    Example:
+      - input: [[0, 1, 2, 4, 5], [0, 1, 2, 3, 5]]
+      - output: [0, 1, 2, 3, 4, 5]
+    """
+    output = set()
+    for list_ in lists:
+        for item in list_:
+            output.add(item)
+
+    output = list(output)
+    output.sort()
+    return output
+
+
+def remove_common_prefix(names: List[str], sep: str = ".") -> List[str]:
+    """Removes the longest common prefix (as split by `sep`) from all names"""
+
+    if len(names) == 0:
+        return names
+
+    split_names = [name.split(sep) for name in names]
+    min_len = min(len(components) for components in split_names)
+
+    def are_same(index: int) -> bool:
+        return all(c[index] == split_names[0][index] for c in split_names)
+
+    prefix_len = 0
+    while prefix_len + 1 < min_len and are_same(prefix_len):
+        prefix_len += 1
+
+    return [sep.join(components[prefix_len:]) for components in split_names]
+
+
+def eprint(*args, **kwargs):
+    """Prints to standard error"""
+    print(*args, file=sys.stderr, **kwargs)
+
+
+def format_timestamp(ts: int, millisecond_resolution: Optional[bool] = True) -> str:
+    if millisecond_resolution:
+        return datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d %H:%M:%S %z")
+    else:
+        return datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def insert_multiple(col: List[T], new_items: List[T], positions: List[int]) -> List[T]:
+    """Inserts the new items into the collection at the given positions"""
+    result = []
+    positions = set(positions)
+    new_items_iter = iter(new_items)
+    for i, x in enumerate(col):
+        if i in positions:
+            result.append(next(new_items_iter))
+        result.append(x)
+    return result
+
+
+@dataclass
+class DateFormatError(ValueError):
+    message: str
+
+
+def parse_datetime(date: Optional[str]) -> Optional[datetime]:
+    """
+    Converts a human-readable string into a datetime object.
+    Accepts many formats and many languages, see the dateparser package.
+    Raises DateFormatError if the input string format hasn't been recognized.
+    """
+    if date is None:
+        return None
+    parsed: datetime = dateparser.parse(date, settings={"RETURN_AS_TIMEZONE_AWARE": True})
+    if parsed is None:
+        raise DateFormatError(f"Invalid datetime value: {date}")
+    return parsed
+
+
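A few doctest-style checks of the helpers above; the expected values follow from the implementations, so this is a sketch worth running rather than an authoritative spec:

```python
# Illustrative only, not part of this patch.
from hunter.util import DateFormatError, merge_sorted, parse_datetime, remove_common_prefix

assert merge_sorted([[0, 1, 2, 4, 5], [0, 1, 2, 3, 5]]) == [0, 1, 2, 3, 4, 5]
assert remove_common_prefix(["perf.node1.p99", "perf.node1.p50"]) == ["p99", "p50"]
try:
    parse_datetime("definitely not a date")
except DateFormatError as err:
    print(err.message)  # Invalid datetime value: definitely not a date
```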
+def sliding_window(iterable, size):
+    """
+    Returns an iterator which represents a sliding window over the given
+    collection. `size` denotes the size of the window. If the collection length
+    is less than the size, no items are yielded.
+    """
+    iterable = iter(iterable)
+    window = deque(islice(iterable, size), maxlen=size)
+    for item in iterable:
+        yield tuple(window)
+        window.append(item)
+    if len(window) == size:
+        # the guard prevents yielding an incomplete window
+        # when the iterable holds fewer than `size` items
+        yield tuple(window)
+
+
+def is_float(value) -> bool:
+    """Returns true if value can be converted to a float"""
+    try:
+        float(value)
+        return True
+    except ValueError:
+        return False
+
+
+def is_datetime(value) -> bool:
+    """Returns true if value can be parsed as a date"""
+    try:
+        parse_datetime(value)
+        return True
+    except DateFormatError:
+        return False
+
+
+def merge_dicts(d1: Dict, d2: Dict) -> OrderedDict:
+    """
+    Returns a sum of two dictionaries, summing them left-to-right.
+    Lists and sets under the same key are added.
+    Dicts with the same key are merged recursively.
+    Simple values with the same key are overwritten (right dictionary wins).
+    Maintains the order of the keys.
+    """
+    result = OrderedDict(d1)
+    for k in d2.keys():
+        v1 = d1.get(k)
+        v2 = d2.get(k)
+        if v2 is None:
+            result[k] = v1
+        elif v1 is None:
+            result[k] = v2
+        elif isinstance(v1, Dict) and isinstance(v2, Dict):
+            result[k] = merge_dicts(v1, v2)
+        elif isinstance(v1, List) and isinstance(v2, List):
+            result[k] = v1 + v2
+        elif isinstance(v1, Set) and isinstance(v2, Set):
+            result[k] = v1 | v2
+        else:
+            result[k] = v2
+
+    return result
+
+
+def merge_dict_list(dicts: List[Dict]) -> Dict:
+    """
+    Returns a sum of dictionaries, summing them left-to-right.
+    Lists and sets under the same key are added.
+    Dicts with the same key are merged recursively.
+    Simple values with the same key are overwritten (rightmost dictionary wins).
+    """
+    return reduce(merge_dicts, dicts, {})
+
+
+def interpolate(s: str, vars: Dict[str, List[str]]) -> List[str]:
+    """
+    Replaces all occurrences of %{VARIABLE} with the respective variable values looked up in the
+    vars dictionary. A variable is allowed to have more than one value assigned –
+    in this case one result string is returned per each combination of variable values. 
+ + Example: + s = "name:%{NAME}" + vars = { "NAME": ["foo", "bar"] } + result = ["name:foo", "name:bar"] + """ + match = re.search("%{(\\w+)}", s) + if match: + var_name = match.group(1) + values = vars[var_name] + start, end = match.span(0) + before = s[0:start] + after = s[end:] + result = [] + remaining = interpolate(after, vars) + for suffix in remaining: + for v in values: + result.append(before + v + suffix) + return result + else: + return [s] diff --git a/orion.py b/orion.py index f40b19a..341d9c5 100644 --- a/orion.py +++ b/orion.py @@ -8,9 +8,10 @@ import os import click -import yaml import pandas as pd + from fmatch.matcher import Matcher +from utils.orion_funcs import run_hunter_analyze, get_metadata,set_logging, load_config @click.group() @@ -25,7 +26,8 @@ def cli(): @click.option("--config", default="config.yaml", help="Path to the configuration file") @click.option("--output", default="output.csv", help="Path to save the output csv file") @click.option("--debug", is_flag=True, help="log level ") -def orion(config, debug, output): +@click.option("--hunter-analyze",is_flag=True, help="run hunter analyze") +def orion(config, debug, output,hunter_analyze): """Orion is the cli tool to detect regressions over the runs Args: @@ -36,25 +38,22 @@ def orion(config, debug, output): level = logging.DEBUG if debug else logging.INFO logger = logging.getLogger("Orion") logger = set_logging(level, logger) + data = load_config(config,logger) + ES_URL=None + + if "ES_SERVER" in data.keys(): + ES_URL = data['ES_SERVER'] + else: + if 'ES_SERVER' in os.environ: + ES_URL=os.environ.get("ES_SERVER") + else: + logger.error("ES_SERVER environment variable/config variable not set") + sys.exit(1) - if "ES_SERVER" not in os.environ: - logger.error("ES_SERVER environment variable not set") - sys.exit(1) - - try: - with open(config, "r", encoding="utf-8") as file: - data = yaml.safe_load(file) - logger.debug("The %s file has successfully loaded", config) - except FileNotFoundError as e: - logger.error("Config file not found: %s", e) - sys.exit(1) - except Exception as e: # pylint: disable=broad-exception-caught - logger.error("An error occurred: %s", e) - sys.exit(1) for test in data["tests"]: metadata = get_metadata(test) logger.info("The test %s has started", test["name"]) - match = Matcher(index="perf_scale_ci", level=level) + match = Matcher(index="perf_scale_ci", level=level, ES_URL=ES_URL) uuids = match.get_uuid_by_metadata(metadata) if len(uuids) == 0: print("No UUID present for given metadata") @@ -105,55 +104,10 @@ def orion(config, debug, output): lambda left, right: pd.merge(left, right, on="uuid", how="inner"), dataframe_list, ) - match.save_results(merged_df, csv_file_path=output) - - -def get_metadata(test): - """Gets metadata of the run from each test - - Args: - test (dict): test dictionary + match.save_results(merged_df, csv_file_path=output.split(".")[0]+"-"+test['name']+".csv") - Returns: - dict: dictionary of the metadata - """ - metadata_columns = [ - "platform", - "masterNodesType", - "masterNodesCount", - "workerNodesType", - "workerNodesCount", - "benchmark", - "ocpVersion", - "networkType", - "encrypted", - "fips", - "ipsec", - ] - metadata = {key: test[key] for key in metadata_columns if key in test} - metadata["ocpVersion"] = str(metadata["ocpVersion"]) - return metadata - - -def set_logging(level, logger): - """sets log level and format - - Args: - level (_type_): level of the log - logger (_type_): logger object - - Returns: - logging.Logger: a formatted and level set logger 
- """ - logger.setLevel(level) - handler = logging.StreamHandler(sys.stdout) - handler.setLevel(level) - formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger + if hunter_analyze: + run_hunter_analyze(merged_df,test) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index b98bd22..e592666 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,44 @@ +astroid==3.0.2 certifi==2023.11.17 +charset-normalizer==3.3.2 click==8.1.7 +dateparser==1.2.0 +DateTime==5.4 +dill==0.3.7 elastic-transport==8.11.0 elasticsearch==8.11.1 elasticsearch7==7.13.0 -fmatch==0.0.2 +expandvars==0.12.0 +fmatch==0.0.3 +gevent==23.9.1 +greenlet==3.0.3 +idna==3.6 +isort==5.13.2 +mccabe==0.7.0 +more-itertools==8.14.0 numpy==1.26.3 +orion @ file:///Users/sboyapal/Documents/Redhat/orion pandas==2.1.4 +platformdirs==4.1.0 +pylint==3.0.3 python-dateutil==2.8.2 pytz==2023.3.post1 PyYAML==6.0.1 +regex==2023.12.25 +requests==2.31.0 +ruamel.yaml==0.18.5 +ruamel.yaml.clib==0.2.8 +scipy==1.12.0 +signal-processing-algorithms==1.3.5 six==1.16.0 +slack_sdk==3.26.2 +structlog==19.2.0 +tabulate==0.9.0 +tomlkit==0.12.3 +typed-ast==1.5.5 +typing-extensions==3.10.0.2 tzdata==2023.4 +tzlocal==5.2 urllib3==1.26.18 +zope.event==5.0 +zope.interface==6.1 diff --git a/setup.py b/setup.py index be07410..52fdafc 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ """ setup.py for orion cli tool """ -from setuptools import setup +from setuptools import setup, find_packages setup( name='orion', @@ -17,6 +17,8 @@ 'orion = orion:orion', ], }, + packages=find_packages(), + package_data={'utils': ['utils.py'],'hunter': ['*.py']}, classifiers=[ 'Programming Language :: Python :: 3', 'License :: OSI Approved :: MIT License', diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/orion_funcs.py b/utils/orion_funcs.py new file mode 100644 index 0000000..ca64cbd --- /dev/null +++ b/utils/orion_funcs.py @@ -0,0 +1,115 @@ +# pylint: disable=cyclic-import +""" +module for all utility functions orion uses +""" +# pylint: disable = import-error + +import logging +import sys + +import yaml +import pandas as pd + +from hunter.report import Report, ReportType +from hunter.series import Metric, Series + + +def run_hunter_analyze(merged_df,test): + """Start hunter analyze function + + Args: + merged_df (Dataframe): merged dataframe of all the metrics + test (dict): test dictionary with the each test information + """ + merged_df["timestamp"] = pd.to_datetime(merged_df["timestamp"]) + merged_df["timestamp"] = merged_df["timestamp"].astype(int) // 10**9 + metrics = {column: Metric(1, 1.0) + for column in merged_df.columns + if column not in ["uuid","timestamp"]} + data = {column: merged_df[column] + for column in merged_df.columns + if column not in ["uuid","timestamp"]} + attributes={column: merged_df[column] for column in merged_df.columns if column in ["uuid"]} + series=Series( + test_name=test["name"], + branch=None, + time=list(merged_df["timestamp"]), + metrics=metrics, + data=data, + attributes=attributes + ) + change_points=series.analyze().change_points_by_time + report=Report(series,change_points) + output = report.produce_report(test_name="test",report_type=ReportType.LOG) + print(output) + + +def get_metadata(test): + """Gets metadata of the run from each test + + Args: + test (dict): test dictionary + + Returns: + dict: dictionary of the metadata + 
""" + metadata_columns = [ + "platform", + "masterNodesType", + "masterNodesCount", + "workerNodesType", + "workerNodesCount", + "benchmark", + "ocpVersion", + "networkType", + "encrypted", + "fips", + "ipsec", + "infraNodesCount" + ] + metadata = {key: test[key] for key in metadata_columns if key in test} + metadata["ocpVersion"] = str(metadata["ocpVersion"]) + return metadata + + +def set_logging(level, logger): + """sets log level and format + + Args: + level (_type_): level of the log + logger (_type_): logger object + + Returns: + logging.Logger: a formatted and level set logger + """ + logger.setLevel(level) + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(level) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + +def load_config(config,logger): + """Loads config file + + Args: + config (str): path to config file + logger (Logger): logger + + Returns: + dict: dictionary of the config file + """ + try: + with open(config, "r", encoding="utf-8") as file: + data = yaml.safe_load(file) + logger.debug("The %s file has successfully loaded", config) + except FileNotFoundError as e: + logger.error("Config file not found: %s", e) + sys.exit(1) + except Exception as e: # pylint: disable=broad-exception-caught + logger.error("An error occurred: %s", e) + sys.exit(1) + return data