diff --git a/hunter/__init__.py b/hunter/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/hunter/analysis.py b/hunter/analysis.py deleted file mode 100644 index deb4773..0000000 --- a/hunter/analysis.py +++ /dev/null @@ -1,241 +0,0 @@ -from dataclasses import dataclass -from typing import Iterable, List, Reversible - -import numpy as np -from scipy.stats import ttest_ind_from_stats -from signal_processing_algorithms.e_divisive import EDivisive -from signal_processing_algorithms.e_divisive.base import SignificanceTester -from signal_processing_algorithms.e_divisive.calculators import cext_calculator -from signal_processing_algorithms.e_divisive.change_points import EDivisiveChangePoint -from signal_processing_algorithms.e_divisive.significance_test import ( - QHatPermutationsSignificanceTester, -) - - -@dataclass -class ComparativeStats: - """ - Keeps statistics of two series of data and the probability both series - have the same distribution. - """ - - mean_1: float - mean_2: float - std_1: float - std_2: float - pvalue: float - - def forward_rel_change(self): - """Relative change from left to right""" - return self.mean_2 / self.mean_1 - 1.0 - - def backward_rel_change(self): - """Relative change from right to left""" - return self.mean_1 / self.mean_2 - 1.0 - - def change_magnitude(self): - """Maximum of absolutes of rel_change and rel_change_reversed""" - return max(abs(self.forward_rel_change()), abs(self.backward_rel_change())) - - -@dataclass -class ChangePoint: - index: int - stats: ComparativeStats - - -class ExtendedSignificanceTester(SignificanceTester): - """ - Adds capability of exposing the means and deviations of both sides of the split - and the pvalue (strength) of the split. - """ - - pvalue: float - - def change_point(self, index: int, series: np.ndarray, windows: Iterable[int]) -> ChangePoint: - """ - Computes properties of the change point if the change point gets - inserted at the given index into the series array. - """ - ... - - def compare(self, left: np.ndarray, right: np.ndarray) -> ComparativeStats: - """ - Compares two sets of points for similarity / difference. - Computes basic stats and probability both sets come from the same distribution/ - """ - ... - - @staticmethod - def find_window(candidate: int, window_endpoints: Reversible[int]) -> (int, int): - start: int = next((x for x in reversed(window_endpoints) if x < candidate), None) - end: int = next((x for x in window_endpoints if x > candidate), None) - return start, end - - def is_significant( - self, candidate: EDivisiveChangePoint, series: np.ndarray, windows: Iterable[int] - ) -> bool: - try: - cp = self.change_point(candidate.index, series, windows) - return cp.stats.pvalue <= self.pvalue - except ValueError: - return False - - -class TTestSignificanceTester(ExtendedSignificanceTester): - """ - Uses two-sided Student's T-test to decide if a candidate change point - splits the series into pieces that are significantly different from each other. - This test is good if the data between the change points have normal distribution. - It works well even with tiny numbers of points (<10). 
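    For example (an illustrative sketch using only names defined in this module):

        tester = TTestSignificanceTester(pvalue=0.001)
        stats = tester.compare(np.array([10.0, 10.1, 9.9]), np.array([12.0, 12.2, 11.8]))
        # a candidate split point is accepted when stats.pvalue <= tester.pvalue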
- """ - - def __init__(self, pvalue: float): - self.pvalue = pvalue - - def change_point( - self, index: int, series: np.ndarray, window_endpoints: Reversible[int] - ) -> ChangePoint: - - (start, end) = self.find_window(index, window_endpoints) - left = series[start:index] - right = series[index:end] - stats = self.compare(left, right) - return ChangePoint(index, stats) - - def compare(self, left: np.ndarray, right: np.ndarray) -> ComparativeStats: - if len(left) == 0 or len(right) == 0: - raise ValueError - - mean_l = np.mean(left) - mean_r = np.mean(right) - std_l = np.std(left) if len(left) >= 2 else 0.0 - std_r = np.std(right) if len(right) >= 2 else 0.0 - - if len(left) + len(right) > 2: - (_, p) = ttest_ind_from_stats( - mean_l, std_l, len(left), mean_r, std_r, len(right), alternative="two-sided" - ) - else: - p = 1.0 - return ComparativeStats(mean_l, mean_r, std_l, std_r, p) - - -def fill_missing(data: List[float]): - """ - Forward-fills None occurrences with nearest previous non-None values. - Initial None values are back-filled with the nearest future non-None value. - """ - prev = None - for i in range(len(data)): - if data[i] is None and prev is not None: - data[i] = prev - prev = data[i] - - prev = None - for i in reversed(range(len(data))): - if data[i] is None and prev is not None: - data[i] = prev - prev = data[i] - - -def merge( - change_points: List[ChangePoint], series: np.array, max_pvalue: float, min_magnitude: float -) -> List[ChangePoint]: - """ - Removes weak change points recursively going bottom-up - until we get only high-quality change points - that meet the P-value and rel_change criteria. - - Parameters: - :param max_pvalue: maximum accepted pvalue - :param min_magnitude: minimum accepted relative change - """ - - tester = TTestSignificanceTester(max_pvalue) - while change_points: - - # Select the change point with weakest unacceptable P-value - # If all points have acceptable P-values, select the change-point with - # the least relative change: - weakest_cp = max(change_points, key=lambda c: c.stats.pvalue) - if weakest_cp.stats.pvalue < max_pvalue: - weakest_cp = min(change_points, key=lambda c: c.stats.change_magnitude()) - if weakest_cp.stats.change_magnitude() > min_magnitude: - return change_points - - # Remove the point from the list - weakest_cp_index = change_points.index(weakest_cp) - del change_points[weakest_cp_index] - - # We can't continue yet, because by removing a change_point - # the adjacent change points changed their properties. - # Recompute the adjacent change point stats: - window_endpoints = [0] + [cp.index for cp in change_points] + [len(series)] - - def recompute(index: int): - if index < 0 or index >= len(change_points): - return - cp = change_points[index] - change_points[index] = tester.change_point(cp.index, series, window_endpoints) - - recompute(weakest_cp_index) - recompute(weakest_cp_index + 1) - - return change_points - - -def split(series: np.array, window_len: int = 30, max_pvalue: float = 0.001) -> List[ChangePoint]: - """ - Finds change points by splitting the series top-down. - - Internally it uses the EDivisive algorithm from mongodb-signal-processing - that recursively splits the series in a way to maximize some measure of - dissimilarity (denoted qhat) between the chunks. - Splitting happens as long as the dissimilarity is statistically significant. 
- - Unfortunately this algorithms has a few downsides: - - the complexity is O(n^2), where n is the length of the series - - if there are too many change points and too much data, the change points in the middle - of the series may be missed - - This function tries to address these issues by invoking EDivisive on smaller - chunks (windows) of the input data instead of the full series and then merging the results. - Each window should be large enough to contain enough points to detect a change-point. - Consecutive windows overlap so that we won't miss changes happening between them. - """ - assert "Window length must be at least 2", window_len >= 2 - start = 0 - step = int(window_len / 2) - indexes = [] - tester = TTestSignificanceTester(max_pvalue) - while start < len(series): - end = min(start + window_len, len(series)) - calculator = cext_calculator - algo = EDivisive(seed=None, calculator=calculator, significance_tester=tester) - pts = algo.get_change_points(series[start:end]) - new_indexes = [p.index + start for p in pts] - new_indexes.sort() - last_new_change_point_index = next(iter(new_indexes[-1:]), 0) - start = max(last_new_change_point_index, start + step) - indexes += new_indexes - - window_endpoints = [0] + indexes + [len(series)] - return [tester.change_point(i, series, window_endpoints) for i in indexes] - - -def compute_change_points_orig(series: np.array, max_pvalue: float = 0.001) -> List[ChangePoint]: - calculator = cext_calculator - tester = QHatPermutationsSignificanceTester(calculator, pvalue=max_pvalue, permutations=100) - algo = EDivisive(seed=None, calculator=calculator, significance_tester=tester) - pts = algo.get_change_points(series) - indexes = [p.index for p in pts] - window_endpoints = [0] + indexes + [len(series)] - return [tester.change_point(i, series, window_endpoints) for i in indexes] - - -def compute_change_points( - series: np.array, window_len: int = 50, max_pvalue: float = 0.001, min_magnitude: float = 0.05 -) -> List[ChangePoint]: - change_points = split(series, window_len, max_pvalue * 10) - return merge(change_points, series, max_pvalue, min_magnitude) diff --git a/hunter/attributes.py b/hunter/attributes.py deleted file mode 100644 index c69165a..0000000 --- a/hunter/attributes.py +++ /dev/null @@ -1,45 +0,0 @@ -from datetime import datetime -from typing import Dict - -from hunter.util import format_timestamp - - -def form_hyperlink_html_str(display_text: str, url: str) -> str: - return f'
<li><a href="{url}">{display_text}</a></li>' - - -def form_created_msg_html_str() -> str: - formatted_time = format_timestamp(int(datetime.now().timestamp()), False) - return f"
    Created by Hunter: {formatted_time}
    " - - -def get_back_links(attributes: Dict[str, str]) -> str: - """ - This method is responsible for providing an HTML string corresponding to Fallout and GitHub - links associated to the attributes of a Fallout-based test run. - - - If no GitHub commit or branch data is provided in the attributes dict, no hyperlink data - associated to GitHub project repository is provided in the returned HTML. - """ - - # grabbing test runner related data (e.g. Fallout) - html_str = "" - if attributes.get("test_url"): - html_str = form_hyperlink_html_str(display_text="Test", url=attributes.get("test_url")) - - if attributes.get("run_url"): - html_str = form_hyperlink_html_str(display_text="Test run", url=attributes.get("run_url")) - - # grabbing Github project repository related data - # TODO: Will we be responsible for handling versioning from repositories aside from bdp? - repo_url = attributes.get("repo_url", "http://github.com/riptano/bdp") - if attributes.get("commit"): - html_str += form_hyperlink_html_str( - display_text="Git commit", url=f"{repo_url}/commit/{attributes.get('commit')}" - ) - elif attributes.get("branch"): - html_str += form_hyperlink_html_str( - display_text="Git branch", url=f"{repo_url}/tree/{attributes.get('branch')}" - ) - html_str += form_created_msg_html_str() - return html_str diff --git a/hunter/config.py b/hunter/config.py deleted file mode 100644 index 323acde..0000000 --- a/hunter/config.py +++ /dev/null @@ -1,150 +0,0 @@ -import os -from dataclasses import dataclass -from pathlib import Path -from typing import Dict, List, Optional - -from expandvars import expandvars -from ruamel.yaml import YAML - -from hunter.grafana import GrafanaConfig -from hunter.graphite import GraphiteConfig -from hunter.slack import SlackConfig -from hunter.test_config import TestConfig, create_test_config -from hunter.util import merge_dict_list - - -@dataclass -class Config: - graphite: Optional[GraphiteConfig] - grafana: Optional[GrafanaConfig] - tests: Dict[str, TestConfig] - test_groups: Dict[str, List[TestConfig]] - slack: SlackConfig - - -@dataclass -class ConfigError(Exception): - message: str - - -def load_templates(config: Dict) -> Dict[str, Dict]: - templates = config.get("templates", {}) - if not isinstance(templates, Dict): - raise ConfigError("Property `templates` is not a dictionary") - return templates - - -def load_tests(config: Dict, templates: Dict) -> Dict[str, TestConfig]: - tests = config.get("tests", {}) - if not isinstance(tests, Dict): - raise ConfigError("Property `tests` is not a dictionary") - - result = {} - for (test_name, test_config) in tests.items(): - template_names = test_config.get("inherit", []) - if not isinstance(template_names, List): - template_names = [templates] - try: - template_list = [templates[name] for name in template_names] - except KeyError as e: - raise ConfigError(f"Template {e.args[0]} referenced in test {test_name} not found") - test_config = merge_dict_list(template_list + [test_config]) - result[test_name] = create_test_config(test_name, test_config) - - return result - - -def load_test_groups(config: Dict, tests: Dict[str, TestConfig]) -> Dict[str, List[TestConfig]]: - groups = config.get("test_groups", {}) - if not isinstance(groups, Dict): - raise ConfigError("Property `test_groups` is not a dictionary") - - result = {} - for (group_name, test_names) in groups.items(): - test_list = [] - if not isinstance(test_names, List): - raise ConfigError(f"Test group {group_name} must be a list") - for test_name in test_names: - test_config 
= tests.get(test_name) - if test_config is None: - raise ConfigError(f"Test {test_name} referenced by group {group_name} not found.") - test_list.append(test_config) - - result[group_name] = test_list - - return result - - -def load_config_from(config_file: Path) -> Config: - """Loads config from the specified location""" - try: - content = expandvars(config_file.read_text()) - yaml = YAML(typ="safe") - config = yaml.load(content) - """ - if Grafana configs not explicitly set in yaml file, default to same as Graphite - server at port 3000 - """ - graphite_config = None - grafana_config = None - if "graphite" in config: - if "url" not in config["graphite"]: - raise ValueError("graphite.url") - graphite_config = GraphiteConfig(url=config["graphite"]["url"]) - if config.get("grafana") is None: - config["grafana"] = {} - config["grafana"]["url"] = f"{config['graphite']['url'].strip('/')}:3000/" - config["grafana"]["user"] = os.environ.get("GRAFANA_USER", "admin") - config["grafana"]["password"] = os.environ.get("GRAFANA_PASSWORD", "admin") - grafana_config = GrafanaConfig( - url=config["grafana"]["url"], - user=config["grafana"]["user"], - password=config["grafana"]["password"], - ) - - slack_config = None - if config.get("slack") is not None: - if not config["slack"]["token"]: - raise ValueError("slack.token") - slack_config = SlackConfig( - bot_token=config["slack"]["token"], - ) - - templates = load_templates(config) - tests = load_tests(config, templates) - groups = load_test_groups(config, tests) - - return Config( - graphite=graphite_config, - grafana=grafana_config, - slack=slack_config, - tests=tests, - test_groups=groups, - ) - - except FileNotFoundError as e: - raise ConfigError(f"Configuration file not found: {e.filename}") - except KeyError as e: - raise ConfigError(f"Configuration key not found: {e.args[0]}") - except ValueError as e: - raise ConfigError(f"Value for configuration key not found: {e.args[0]}") - - -def load_config() -> Config: - """Loads config from one of the default locations""" - - env_config_path = os.environ.get("HUNTER_CONFIG") - if env_config_path: - return load_config_from(Path(env_config_path).absolute()) - - paths = [ - Path().home() / ".hunter/hunter.yaml", - Path().home() / ".hunter/conf.yaml", - Path(os.path.realpath(__file__)).parent / "resources/hunter.yaml", - ] - - for p in paths: - if p.exists(): - return load_config_from(p) - - raise ConfigError(f"No configuration file found. 
Checked $HUNTER_CONFIG and searched: {paths}") diff --git a/hunter/csv_options.py b/hunter/csv_options.py deleted file mode 100644 index c383347..0000000 --- a/hunter/csv_options.py +++ /dev/null @@ -1,18 +0,0 @@ -import enum -from dataclasses import dataclass - - -@dataclass -class CsvOptions: - delimiter: str - quote_char: str - - def __init__(self): - self.delimiter = "," - self.quote_char = '"' - - -class CsvColumnType(enum.Enum): - Numeric = 1 - DateTime = 2 - Str = 3 diff --git a/hunter/data_selector.py b/hunter/data_selector.py deleted file mode 100644 index ca6c508..0000000 --- a/hunter/data_selector.py +++ /dev/null @@ -1,38 +0,0 @@ -import sys -from dataclasses import dataclass -from datetime import datetime, timedelta -from typing import List, Optional - -import pytz - - -@dataclass -class DataSelector: - branch: Optional[str] - metrics: Optional[List[str]] - attributes: Optional[List[str]] - last_n_points: int - since_commit: Optional[str] - since_version: Optional[str] - since_time: datetime - until_commit: Optional[str] - until_version: Optional[str] - until_time: datetime - - def __init__(self): - self.branch = None - self.metrics = None - self.attributes = None - self.last_n_points = sys.maxsize - self.since_commit = None - self.since_version = None - self.since_time = datetime.now(tz=pytz.UTC) - timedelta(days=365) - self.until_commit = None - self.until_version = None - self.until_time = datetime.now(tz=pytz.UTC) - - def get_selection_description(self): - attributes = "\n".join( - [f"{a}: {v}" for a, v in self.__dict__.items() if not a.startswith("__") and v] - ) - return f"Data Selection\n{attributes}" diff --git a/hunter/grafana.py b/hunter/grafana.py deleted file mode 100644 index a643c9a..0000000 --- a/hunter/grafana.py +++ /dev/null @@ -1,106 +0,0 @@ -from dataclasses import asdict, dataclass -from datetime import datetime -from typing import List, Optional - -import requests -from pytz import UTC -from requests.exceptions import HTTPError - - -@dataclass -class GrafanaConfig: - url: str - user: str - password: str - - -@dataclass -class GrafanaError(Exception): - message: str - - -@dataclass -class Annotation: - id: Optional[int] - time: datetime - text: str - tags: List[str] - - -class Grafana: - url: str - __user: str - __password: str - - def __init__(self, grafana_conf: GrafanaConfig): - self.url = grafana_conf.url - self.__user = grafana_conf.user - self.__password = grafana_conf.password - - def fetch_annotations( - self, start: Optional[datetime], end: Optional[datetime], tags: List[str] = None - ) -> List[Annotation]: - """ - Reference: - - https://grafana.com/docs/grafana/latest/http_api/annotations/#find-annotations - """ - url = f"{self.url}api/annotations" - query_parameters = {} - if start is not None: - query_parameters["from"] = int(start.timestamp() * 1000) - if end is not None: - query_parameters["to"] = int(end.timestamp() * 1000) - if tags is not None: - query_parameters["tags"] = tags - try: - response = requests.get( - url=url, params=query_parameters, auth=(self.__user, self.__password) - ) - response.raise_for_status() - json = response.json() - annotations = [] - for annotation_json in json: - annotation = Annotation( - id=annotation_json["id"], - time=datetime.fromtimestamp(float(annotation_json["time"]) / 1000, tz=UTC), - text=annotation_json["text"], - tags=annotation_json["tags"], - ) - annotations.append(annotation) - - return annotations - - except KeyError as err: - raise GrafanaError(f"Missing field {err.args[0]}") - except HTTPError 
as err: - raise GrafanaError(str(err)) - - def delete_annotations(self, *ids: int): - """ - Reference: - - https://grafana.com/docs/grafana/latest/http_api/annotations/#delete-annotation-by-id - """ - url = f"{self.url}api/annotations" - for annotation_id in ids: - annotation_url = f"{url}/{annotation_id}" - try: - response = requests.delete(url=annotation_url, auth=(self.__user, self.__password)) - response.raise_for_status() - except HTTPError as err: - raise GrafanaError(str(err)) - - def create_annotations(self, *annotations: Annotation): - """ - Reference: - - https://grafana.com/docs/grafana/latest/http_api/annotations/#create-annotation - """ - try: - url = f"{self.url}api/annotations" - for annotation in annotations: - data = asdict(annotation) - data["time"] = int(annotation.time.timestamp() * 1000) - del data["id"] - response = requests.post(url=url, data=data, auth=(self.__user, self.__password)) - response.raise_for_status() - except HTTPError as err: - raise GrafanaError(str(err)) diff --git a/hunter/graphite.py b/hunter/graphite.py deleted file mode 100644 index 5d3a9a2..0000000 --- a/hunter/graphite.py +++ /dev/null @@ -1,240 +0,0 @@ -import ast -import json -import urllib.request -from dataclasses import dataclass -from datetime import datetime -from logging import info -from typing import Dict, Iterable, List, Optional - -from hunter.data_selector import DataSelector -from hunter.util import parse_datetime - - -@dataclass -class GraphiteConfig: - url: str - - -@dataclass -class DataPoint: - time: int - value: float - - -@dataclass -class TimeSeries: - path: str - points: List[DataPoint] - - -def decode_graphite_datapoints(series: Dict[str, List[List[float]]]) -> List[DataPoint]: - - points = series["datapoints"] - return [DataPoint(int(p[1]), p[0]) for p in points if p[0] is not None] - - -def to_graphite_time(time: datetime, default: str) -> str: - """ - Note that millissecond-level precision matters when trying to fetch events in a given time - range, hence opting for this over time.strftime("%H:%M_%Y%m%d") - """ - if time is not None: - return str(int(time.timestamp())) - else: - return default - - -@dataclass -class GraphiteError(IOError): - message: str - - -@dataclass -class GraphiteEvent: - test_owner: str - test_name: str - run_id: str - status: str - start_time: datetime - pub_time: datetime - end_time: datetime - version: Optional[str] - branch: Optional[str] - commit: Optional[str] - - def __init__( - self, - pub_time: int, - test_owner: str, - test_name: str, - run_id: str, - status: str, - start_time: int, - end_time: int, - version: Optional[str], - branch: Optional[str], - commit: Optional[str], - ): - self.test_owner = test_owner - self.test_name = test_name - self.run_id = run_id - self.status = status - self.start_time = parse_datetime(str(start_time)) - self.pub_time = parse_datetime(str(pub_time)) - self.end_time = parse_datetime(str(end_time)) - if len(version) == 0 or version == "null": - self.version = None - else: - self.version = version - if len(branch) == 0 or branch == "null": - self.branch = None - else: - self.branch = branch - if len(commit) == 0 or commit == "null": - self.commit = None - else: - self.commit = commit - - -def compress_target_paths(paths: List[str]) -> List[str]: - """Uses the alternative syntax to reduce the total length of the query""" - result = [] - prefix_map = {} - for p in paths: - components = p.rsplit(".", 1) - if len(components) == 1: - result.append(p) - continue - - prefix = components[0] - suffix = components[1] 
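        # group suffixes by their common prefix, e.g. ["a.b.p50", "a.b.p99", "c.count"]
        # yields prefix_map == {"a.b": ["p50", "p99"], "c": ["count"]} and a final
        # result of ["a.b.{p50,p99}", "c.count"], shortening the Graphite query URL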
- if prefix not in prefix_map: - prefix_map[prefix] = [suffix] - else: - prefix_map[prefix].append(suffix) - - for prefix, suffixes in prefix_map.items(): - if len(suffixes) > 1: - result.append(prefix + ".{" + ",".join(suffixes) + "}") - else: - result.append(prefix + "." + suffixes[0]) - - return result - - -class Graphite: - __url: str - __url_limit: int # max URL length used when requesting metrics from Graphite - - def __init__(self, conf: GraphiteConfig): - self.__url = conf.url - self.__url_limit = 4094 - - def fetch_events( - self, - tags: Iterable[str], - from_time: Optional[datetime] = None, - until_time: Optional[datetime] = None, - ) -> List[GraphiteEvent]: - """ - Returns 'Performance Test' events that match all of - the following criteria: - - all tags passed in match - - published between given from_time and until_time (both bounds inclusive) - - References: - - Graphite events REST API: https://graphite.readthedocs.io/en/stable/events.html - - Haxx: https://github.com/riptano/haxx/pull/588 - """ - try: - from_time = to_graphite_time(from_time, "-365d") - until_time = to_graphite_time(until_time, "now") - tags_str = "+".join(tags) - - url = ( - f"{self.__url}events/get_data" - f"?tags={tags_str}" - f"&from={from_time}" - f"&until={until_time}" - f"&set=intersection" - ) - data_str = urllib.request.urlopen(url).read() - data_as_json = json.loads(data_str) - return [ - GraphiteEvent(event.get("when"), **ast.literal_eval(event.get("data"))) - for event in data_as_json - if event.get("what") == "Performance Test" - ] - - except IOError as e: - raise GraphiteError(f"Failed to fetch Graphite events: {str(e)}") - - def fetch_events_with_matching_time_option( - self, tags: Iterable[str], commit: Optional[str], version: Optional[str] - ) -> List[GraphiteEvent]: - events = [] - if commit is not None: - events = list(filter(lambda e: e.commit == commit, self.fetch_events(tags))) - elif version is not None: - tags = [*tags, version] - events = self.fetch_events(tags) - return events - - def fetch_data(self, target_paths: List[str], selector: DataSelector) -> List[TimeSeries]: - """ - Connects to Graphite server and downloads interesting series with the - given prefix. The series to be downloaded are picked from SUFFIXES list. 
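        For example (illustrative), requesting the metric paths a.b.p50 and a.b.p99
        results in a single render call of the form:
        <graphite url>render?target=a.b.{p50,p99}&format=json&from=...&until=...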
- """ - try: - info("Fetching data from Graphite...") - result = [] - - from_time = to_graphite_time(selector.since_time, "-365d") - until_time = to_graphite_time(selector.until_time, "now") - target_paths = compress_target_paths(target_paths) - targets = "" - for path in target_paths: - targets += f"target={path}&" - targets = targets.strip("&") - - url = ( - f"{self.__url}render" - f"?{targets}" - f"&format=json" - f"&from={from_time}" - f"&until={until_time}" - ) - - data_str = urllib.request.urlopen(url).read() - data_as_json = json.loads(data_str) - - for s in data_as_json: - series = TimeSeries(path=s["target"], points=decode_graphite_datapoints(s)) - result.append(series) - - return result - - except IOError as err: - raise GraphiteError(f"Failed to fetch data from Graphite: {str(err)}") - - def fetch_metric_paths(self, prefix: str, paths: Optional[List[str]] = None) -> List[str]: - """ - Provided a valid Graphite metric prefix, this method will retrieve all corresponding metric paths - Reference: - - https://graphite-api.readthedocs.io/en/latest/api.html - """ - if paths is None: - paths = [] - try: - url = f"{self.__url}metrics/find?query={prefix}" - data_str = urllib.request.urlopen(url).read() - data_as_json = json.loads(data_str) - for result in data_as_json: - curr_path = result["id"] - if result["leaf"]: - paths.append(curr_path) - else: - paths = self.fetch_metric_paths(f"{curr_path}.*", paths) - return sorted(paths) - except IOError as err: - raise GraphiteError(f"Failed to fetch metric path from Graphite: {str(err)}") diff --git a/hunter/importer.py b/hunter/importer.py deleted file mode 100644 index f11392c..0000000 --- a/hunter/importer.py +++ /dev/null @@ -1,469 +0,0 @@ -import csv -from collections import OrderedDict -from contextlib import contextmanager -from dataclasses import dataclass -from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict, List, Optional - -from hunter.config import Config -from hunter.data_selector import DataSelector -from hunter.graphite import DataPoint, Graphite, GraphiteError -from hunter.series import Metric, Series -from hunter.test_config import ( - CsvMetric, - CsvTestConfig, - GraphiteTestConfig, - HistoStatTestConfig, - TestConfig, -) -from hunter.util import ( - DateFormatError, - format_timestamp, - merge_sorted, - parse_datetime, - resolution, - round, -) - - -@dataclass -class DataImportError(IOError): - message: str - - -class Importer: - """ - An Importer is responsible for importing performance metric data + metadata - from some specified data source, and creating an appropriate PerformanceLog object - from this imported data. - """ - - def fetch_data(self, test: TestConfig, selector: DataSelector = DataSelector()) -> Series: - raise NotImplementedError - - def fetch_all_metric_names(self, test: TestConfig) -> List[str]: - raise NotImplementedError - - -class GraphiteImporter(Importer): - graphite: Graphite - - def __init__(self, graphite: Graphite): - self.graphite = graphite - - def fetch_data(self, test: TestConfig, selector: DataSelector = DataSelector()) -> Series: - """ - Loads test data from graphite. - Converts raw timeseries data into a columnar format, - where each metric is represented by a list of floats. All metrics - have aligned indexes - that is values["foo"][3] applies to the - the same time point as values["bar"][3]. The time points are extracted - to a separate column. 
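        For example (illustrative), two series "foo" and "bar" fetched for the same test
        produce time == [t1, t2, t3] and values == {"foo": [f1, f2, f3], "bar": [b1, b2, b3]};
        a series with no sample at a given time point gets None at that index.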
- """ - if not isinstance(test, GraphiteTestConfig): - raise ValueError("Expected GraphiteTestConfig") - - try: - attributes = test.tags.copy() - if selector.branch: - attributes += [selector.branch] - - # if the user has specified since_ and/or until_, - # we need to attempt to extract a timestamp from appropriate Graphite events, and - # update selector.since_time and selector.until_time, respectively - since_events = self.graphite.fetch_events_with_matching_time_option( - attributes, selector.since_commit, selector.since_version - ) - if len(since_events) > 0: - # since timestamps of metrics get rounded down, in order to include these, we need to - # - round down the event's pub_time - # - subtract a small amount of time (Graphite does not appear to include the left-hand - # endpoint for a time range) - rounded_time = round( - int(since_events[-1].pub_time.timestamp()), - resolution([int(since_events[-1].pub_time.timestamp())]), - ) - selector.since_time = parse_datetime(str(rounded_time)) - timedelta(milliseconds=1) - - until_events = self.graphite.fetch_events_with_matching_time_option( - attributes, selector.until_commit, selector.until_version - ) - if len(until_events) > 0: - selector.until_time = until_events[0].pub_time - - if selector.since_time.timestamp() > selector.until_time.timestamp(): - raise DataImportError( - f"Invalid time range: [" - f"{format_timestamp(int(selector.since_time.timestamp()))}, " - f"{format_timestamp(int(selector.until_time.timestamp()))}]" - ) - - metrics = test.metrics.values() - if selector.metrics is not None: - metrics = [m for m in metrics if m.name in selector.metrics] - path_to_metric = {test.get_path(selector.branch, m.name): m for m in metrics} - targets = [test.get_path(selector.branch, m.name) for m in metrics] - - graphite_result = self.graphite.fetch_data(targets, selector) - if not graphite_result: - raise DataImportError(f"No timeseries found in Graphite for test {test.name}.") - - times = [[x.time for x in series.points] for series in graphite_result] - time: List[int] = merge_sorted(times)[-selector.last_n_points :] - - def column(series: List[DataPoint]) -> List[float]: - value_by_time = dict([(x.time, x.value) for x in series]) - return [value_by_time.get(t) for t in time] - - # Keep order of the keys in the result values the same as order of metrics - values = OrderedDict() - for m in metrics: - values[m.name] = [] - for ts in graphite_result: - values[path_to_metric[ts.path].name] = column(ts.points) - for m in metrics: - if len(values[m.name]) == 0: - del values[m.name] - metrics = [m for m in metrics if m.name in values.keys()] - - events = self.graphite.fetch_events( - attributes, selector.since_time, selector.until_time - ) - time_resolution = resolution(time) - events_by_time = {} - for e in events: - events_by_time[round(int(e.pub_time.timestamp()), time_resolution)] = e - - run_ids = [] - commits = [] - versions = [] - branches = [] - for t in time: - event = events_by_time.get(t) - run_ids.append(event.run_id if event is not None else None) - commits.append(event.commit if event is not None else None) - versions.append(event.version if event is not None else None) - branches.append(event.branch if event is not None else None) - - attributes = { - "run": run_ids, - "branch": branches, - "version": versions, - "commit": commits, - } - if selector.attributes is not None: - attributes = {a: attributes[a] for a in selector.attributes} - - metrics = {m.name: Metric(m.direction, m.scale) for m in metrics} - return Series( - 
test.name, - branch=selector.branch, - time=time, - metrics=metrics, - data=values, - attributes=attributes, - ) - - except GraphiteError as e: - raise DataImportError(f"Failed to import test {test.name}: {e.message}") - - def fetch_all_metric_names(self, test_conf: GraphiteTestConfig) -> List[str]: - return [m for m in test_conf.metrics.keys()] - - -class CsvImporter(Importer): - @staticmethod - def check_row_len(headers, row): - if len(row) < len(headers): - raise DataImportError( - "Number of values in the row does not match " - "number of columns in the table header: " + str(row) - ) - - @staticmethod - def check_has_column(column: str, headers: List[str]): - if column not in headers: - raise DataImportError("Column not found: " + column) - - @staticmethod - def __selected_metrics( - defined_metrics: Dict[str, CsvMetric], selected_metrics: Optional[List[str]] - ) -> Dict[str, CsvMetric]: - - if selected_metrics is not None: - return {name: defined_metrics[name] for name in selected_metrics} - else: - return defined_metrics - - def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series: - - if not isinstance(test_conf, CsvTestConfig): - raise ValueError("Expected CsvTestConfig") - - if selector.branch: - raise ValueError("CSV tests don't support branching yet") - - since_time = selector.since_time - until_time = selector.until_time - file = Path(test_conf.file) - - if since_time.timestamp() > until_time.timestamp(): - raise DataImportError( - f"Invalid time range: [" - f"{format_timestamp(int(since_time.timestamp()))}, " - f"{format_timestamp(int(until_time.timestamp()))}]" - ) - - try: - with open(file, newline="") as csv_file: - reader = csv.reader( - csv_file, - delimiter=test_conf.csv_options.delimiter, - quotechar=test_conf.csv_options.quote_char, - ) - - headers: List[str] = next(reader, None) - metrics = self.__selected_metrics(test_conf.metrics, selector.metrics) - - # Decide which columns to fetch into which components of the result: - try: - time_index: int = headers.index(test_conf.time_column) - attr_indexes: List[int] = [headers.index(c) for c in test_conf.attributes] - metric_names = [m.name for m in metrics.values()] - metric_columns = [m.column for m in metrics.values()] - metric_indexes: List[int] = [headers.index(c) for c in metric_columns] - except ValueError as err: - raise DataImportError(f"Column not found {err.args[0]}") - - if time_index in attr_indexes: - attr_indexes.remove(time_index) - if time_index in metric_indexes: - metric_indexes.remove(time_index) - - # Initialize empty lists to store the data and metadata: - time: List[int] = [] - data: Dict[str, List[float]] = {} - for n in metric_names: - data[n] = [] - attributes: Dict[str, List[str]] = {} - for i in attr_indexes: - attributes[headers[i]] = [] - - # Append the lists with data from each row: - for row in reader: - self.check_row_len(headers, row) - - # Filter by time: - ts: datetime = self.__convert_time(row[time_index]) - if since_time is not None and ts < since_time: - continue - if until_time is not None and ts >= until_time: - continue - time.append(int(ts.timestamp())) - - # Read metric values. 
Note we can still fail on conversion to float, - # because the user is free to override the column selection and thus - # they may select a column that contains non-numeric data: - for (name, i) in zip(metric_names, metric_indexes): - try: - data[name].append(float(row[i])) - except ValueError as err: - raise DataImportError( - "Could not convert value in column " - + headers[i] - + ": " - + err.args[0] - ) - - # Attributes are just copied as-is, with no conversion: - for i in attr_indexes: - attributes[headers[i]].append(row[i]) - - # Convert metrics to series.Metrics - metrics = {m.name: Metric(m.direction, m.scale) for m in metrics.values()} - - # Leave last n points: - time = time[-selector.last_n_points :] - tmp = data - data = {} - for k, v in tmp.items(): - data[k] = v[-selector.last_n_points :] - tmp = attributes - attributes = {} - for k, v in tmp.items(): - attributes[k] = v[-selector.last_n_points :] - - return Series( - test_conf.name, - branch=None, - time=time, - metrics=metrics, - data=data, - attributes=attributes, - ) - - except FileNotFoundError: - raise DataImportError(f"Input file not found: {file}") - - @staticmethod - def __convert_time(time: str): - try: - return parse_datetime(time) - except DateFormatError as err: - raise DataImportError(err.message) - - def fetch_all_metric_names(self, test_conf: CsvTestConfig) -> List[str]: - return [m for m in test_conf.metrics.keys()] - - -class HistoStatImporter(Importer): - - __TAG_METRICS = { - "count": {"direction": 1, "scale": "1", "col": 3}, - "min": {"direction": -1, "scale": "1.0e-6", "col": 4}, - "p25": {"direction": -1, "scale": "1.0e-6", "col": 5}, - "p50": {"direction": -1, "scale": "1.0e-6", "col": 6}, - "p75": {"direction": -1, "scale": "1.0e-6", "col": 7}, - "p90": {"direction": -1, "scale": "1.0e-6", "col": 8}, - "p95": {"direction": -1, "scale": "1.0e-6", "col": 9}, - "p98": {"direction": -1, "scale": "1.0e-6", "col": 10}, - "p99": {"direction": -1, "scale": "1.0e-6", "col": 11}, - "p999": {"direction": -1, "scale": "1.0e-6", "col": 12}, - "p9999": {"direction": -1, "scale": "1.0e-6", "col": 13}, - "max": {"direction": -1, "scale": "1.0e-6", "col": 14}, - } - - @contextmanager - def __csv_reader(self, test: HistoStatTestConfig): - with open(Path(test.file), newline="") as csv_file: - yield csv.reader(csv_file) - - @staticmethod - def __parse_tag(tag: str): - return tag.split("=")[1] - - def __get_tags(self, test: HistoStatTestConfig) -> List[str]: - tags = set() - with self.__csv_reader(test) as reader: - for row in reader: - if row[0].startswith("#"): - continue - tag = self.__parse_tag(row[0]) - if tag in tags: - break - tags.add(tag) - return list(tags) - - @staticmethod - def __metric_from_components(tag, tag_metric): - return f"{tag}.{tag_metric}" - - @staticmethod - def __convert_floating_point_millisecond(fpm: str) -> int: # to epoch seconds - return int(float(fpm) * 1000) // 1000 - - def fetch_data( - self, test: HistoStatTestConfig, selector: DataSelector = DataSelector() - ) -> Series: - def selected(metric_name): - return metric_name in selector.metrics if selector.metrics is not None else True - - metrics = {} - tag_count = 0 - for tag in self.__get_tags(test): - tag_count += 1 - for tag_metric, attrs in self.__TAG_METRICS.items(): - if selected(self.__metric_from_components(tag, tag_metric)): - metrics[self.__metric_from_components(tag, tag_metric)] = Metric( - attrs["direction"], attrs["scale"] - ) - - data = {k: [] for k in metrics.keys()} - time = [] - with self.__csv_reader(test) as reader: - 
start_time = None - for row in reader: - if not row[0].startswith("#"): - break - if "StartTime" in row[0]: - parts = row[0].split(" ") - start_time = self.__convert_floating_point_millisecond(parts[1]) - - if not start_time: - raise DataImportError("No Start Time specified in HistoStat CSV comment") - - # Last iteration of row is the first non-comment row. Parse it now. - tag_interval = 0 - while row: - if tag_interval % tag_count == 0: - # Introduces a slight inaccuracy - each tag can report its interval start time - # with some millisecond difference. Choosing a single tag interval allows us - # to maintain the 'indexed by a single time variable' contract required by - # Series, but the time reported for almost all metrics will be _slightly_ off. - time.append(self.__convert_floating_point_millisecond(row[1]) + start_time) - tag_interval += 1 - tag = self.__parse_tag(row[0]) - for tag_metric, attrs in self.__TAG_METRICS.items(): - if selected(self.__metric_from_components(tag, tag_metric)): - data[self.__metric_from_components(tag, tag_metric)].append( - float(row[attrs["col"]]) - ) - try: - row = next(reader) - except StopIteration: - row = None - - # Leave last n points: - time = time[-selector.last_n_points :] - tmp = data - data = {} - for k, v in tmp.items(): - data[k] = v[-selector.last_n_points :] - - return Series(test.name, None, time, metrics, data, dict()) - - def fetch_all_metric_names(self, test: HistoStatTestConfig) -> List[str]: - metric_names = [] - for tag in self.__get_tags(test): - for tag_metric in self.__TAG_METRICS.keys(): - metric_names.append(self.__metric_from_components(tag, tag_metric)) - return metric_names - - -class Importers: - __config: Config - __csv_importer: Optional[CsvImporter] - __graphite_importer: Optional[GraphiteImporter] - __histostat_importer: Optional[HistoStatImporter] - - def __init__(self, config: Config): - self.__config = config - self.__csv_importer = None - self.__graphite_importer = None - self.__histostat_importer = None - - def csv_importer(self) -> CsvImporter: - if self.__csv_importer is None: - self.__csv_importer = CsvImporter() - return self.__csv_importer - - def graphite_importer(self) -> GraphiteImporter: - if self.__graphite_importer is None: - self.__graphite_importer = GraphiteImporter(Graphite(self.__config.graphite)) - return self.__graphite_importer - - def histostat_importer(self) -> HistoStatImporter: - if self.__histostat_importer is None: - self.__histostat_importer = HistoStatImporter() - return self.__histostat_importer - - def get(self, test: TestConfig) -> Importer: - if isinstance(test, CsvTestConfig): - return self.csv_importer() - elif isinstance(test, GraphiteTestConfig): - return self.graphite_importer() - elif isinstance(test, HistoStatTestConfig): - return self.histostat_importer() - else: - raise ValueError(f"Unsupported test type {type(test)}") diff --git a/hunter/main.py b/hunter/main.py deleted file mode 100644 index 256dc5b..0000000 --- a/hunter/main.py +++ /dev/null @@ -1,649 +0,0 @@ -import argparse -import copy -import logging -import sys -from dataclasses import dataclass -from datetime import datetime, timedelta -from typing import Dict, List, Optional - -import pytz -from slack_sdk import WebClient - -from hunter import config -from hunter.attributes import get_back_links -from hunter.config import Config, ConfigError -from hunter.data_selector import DataSelector -from hunter.grafana import Annotation, Grafana, GrafanaError -from hunter.graphite import GraphiteError -from hunter.importer 
import DataImportError, Importers -from hunter.report import Report, ReportType -from hunter.series import AnalysisOptions, AnalyzedSeries, compare -from hunter.slack import NotificationError, SlackNotifier -from hunter.test_config import GraphiteTestConfig, TestConfig, TestConfigError -from hunter.util import DateFormatError, interpolate, parse_datetime - - -@dataclass -class HunterError(Exception): - message: str - - -class Hunter: - __conf: Config - __importers: Importers - __grafana: Optional[Grafana] - __slack: Optional[SlackNotifier] - - def __init__(self, conf: Config): - self.__conf = conf - self.__importers = Importers(conf) - self.__grafana = None - self.__slack = self.__maybe_create_slack_notifier() - - def list_tests(self, group_names: Optional[List[str]]): - if group_names is not None: - test_names = [] - for group_name in group_names: - group = self.__conf.test_groups.get(group_name) - if group is None: - raise HunterError(f"Test group not found: {group_name}") - test_names += (t.name for t in group) - else: - test_names = self.__conf.tests - - for test_name in sorted(test_names): - print(test_name) - - def list_test_groups(self): - for group_name in sorted(self.__conf.test_groups): - print(group_name) - - def get_test(self, test_name: str) -> TestConfig: - test = self.__conf.tests.get(test_name) - if test is None: - raise HunterError(f"Test not found {test_name}") - return test - - def get_tests(self, *names: str) -> List[TestConfig]: - tests = [] - for name in names: - group = self.__conf.test_groups.get(name) - if group is not None: - tests += group - else: - test = self.__conf.tests.get(name) - if test is not None: - tests.append(test) - else: - raise HunterError(f"Test or group not found: {name}") - return tests - - def list_metrics(self, test: TestConfig): - importer = self.__importers.get(test) - for metric_name in importer.fetch_all_metric_names(test): - print(metric_name) - - def analyze( - self, - test: TestConfig, - selector: DataSelector, - options: AnalysisOptions, - report_type: ReportType, - ) -> AnalyzedSeries: - importer = self.__importers.get(test) - series = importer.fetch_data(test, selector) - analyzed_series = series.analyze(options) - change_points = analyzed_series.change_points_by_time - report = Report(series, change_points) - produced_report = report.produce_report(test.name, report_type) - print(produced_report) - return analyzed_series - - def __get_grafana(self) -> Grafana: - if self.__grafana is None: - self.__grafana = Grafana(self.__conf.grafana) - return self.__grafana - - def update_grafana_annotations(self, test: GraphiteTestConfig, series: AnalyzedSeries): - grafana = self.__get_grafana() - begin = datetime.fromtimestamp(series.time()[0], tz=pytz.UTC) - end = datetime.fromtimestamp(series.time()[len(series.time()) - 1], tz=pytz.UTC) - - logging.info(f"Fetching Grafana annotations for test {test.name}...") - tags_to_query = ["hunter", "change-point", "test:" + test.name] - old_annotations_for_test = grafana.fetch_annotations(begin, end, list(tags_to_query)) - logging.info(f"Found {len(old_annotations_for_test)} annotations") - - created_count = 0 - for metric_name, change_points in series.change_points.items(): - path = test.get_path(series.branch_name(), metric_name) - metric_tag = f"metric:{metric_name}" - tags_to_create = ( - tags_to_query - + [metric_tag] - + test.tags - + test.annotate - + test.metrics[metric_name].annotate - ) - - substitutions = { - "TEST_NAME": test.name, - "METRIC_NAME": metric_name, - "GRAPHITE_PATH": [path], - 
"GRAPHITE_PATH_COMPONENTS": path.split("."), - "GRAPHITE_PREFIX": [test.prefix], - "GRAPHITE_PREFIX_COMPONENTS": test.prefix.split("."), - } - - tmp_tags_to_create = [] - for t in tags_to_create: - tmp_tags_to_create += interpolate(t, substitutions) - tags_to_create = tmp_tags_to_create - - old_annotations = [a for a in old_annotations_for_test if metric_tag in a.tags] - old_annotation_times = set((a.time for a in old_annotations if a.tags)) - - target_annotations = [] - for cp in change_points: - attributes = series.attributes_at(cp.index) - annotation_text = get_back_links(attributes) - target_annotations.append( - Annotation( - id=None, - time=datetime.fromtimestamp(cp.time, tz=pytz.UTC), - text=annotation_text, - tags=tags_to_create, - ) - ) - target_annotation_times = set((a.time for a in target_annotations)) - - to_delete = [a for a in old_annotations if a.time not in target_annotation_times] - if to_delete: - logging.info( - f"Removing {len(to_delete)} annotations " - f"for test {test.name} and metric {metric_name}..." - ) - grafana.delete_annotations(*(a.id for a in to_delete)) - - to_create = [a for a in target_annotations if a.time not in old_annotation_times] - if to_create: - logging.info( - f"Creating {len(to_create)} annotations " - f"for test {test.name} and metric {metric_name}..." - ) - grafana.create_annotations(*to_create) - created_count += len(to_create) - - if created_count == 0: - logging.info("All annotations up-to-date. No new annotations needed.") - else: - logging.info(f"Created {created_count} annotations.") - - def remove_grafana_annotations(self, test: Optional[TestConfig], force: bool): - """Removes all Hunter annotations (optionally for a given test) in Grafana""" - grafana = self.__get_grafana() - if test: - logging.info(f"Fetching Grafana annotations for test {test.name}...") - else: - logging.info("Fetching Grafana annotations...") - tags_to_query = {"hunter", "change-point"} - if test: - tags_to_query.add(f"test: {test.name}") - annotations = grafana.fetch_annotations(None, None, list(tags_to_query)) - if not annotations: - logging.info("No annotations found.") - return - if not force: - print( - f"Are you sure to remove {len(annotations)} annotations from {grafana.url}? [y/N]" - ) - decision = input().strip() - if decision.lower() != "y" and decision.lower() != "yes": - return - logging.info(f"Removing {len(annotations)} annotations...") - grafana.delete_annotations(*(a.id for a in annotations)) - - def regressions( - self, test: TestConfig, selector: DataSelector, options: AnalysisOptions - ) -> bool: - importer = self.__importers.get(test) - - # Even if user is interested only in performance difference since some point X, - # we really need to fetch some earlier points than X. - # Otherwise, if performance went down very early after X, e.g. at X + 1, we'd have - # insufficient number of data points to compute the baseline performance. - # Instead of using `since-` selector, we're fetching everything from the - # beginning and then we find the baseline performance around the time pointed by - # the original selector. 
- since_version = selector.since_version - since_commit = selector.since_commit - since_time = selector.since_time - baseline_selector = copy.deepcopy(selector) - baseline_selector.last_n_points = sys.maxsize - baseline_selector.branch = None - baseline_selector.since_version = None - baseline_selector.since_commit = None - baseline_selector.since_time = since_time - timedelta(days=30) - baseline_series = importer.fetch_data(test, baseline_selector) - - if since_version: - baseline_index = baseline_series.find_by_attribute("version", since_version) - if not baseline_index: - raise HunterError(f"No runs of test {test.name} with version {since_version}") - baseline_index = max(baseline_index) - elif since_commit: - baseline_index = baseline_series.find_by_attribute("commit", since_commit) - if not baseline_index: - raise HunterError(f"No runs of test {test.name} with commit {since_commit}") - baseline_index = max(baseline_index) - else: - baseline_index = baseline_series.find_first_not_earlier_than(since_time) - - baseline_series = baseline_series.analyze(options=options) - - if selector.branch: - target_series = importer.fetch_data(test, selector).analyze(options=options) - else: - target_series = baseline_series - - cmp = compare(baseline_series, baseline_index, target_series, target_series.len()) - regressions = [] - for metric_name, stats in cmp.stats.items(): - direction = baseline_series.metric(metric_name).direction - m1 = stats.mean_1 - m2 = stats.mean_2 - change_percent = stats.forward_rel_change() * 100.0 - if m2 * direction < m1 * direction and stats.pvalue < options.max_pvalue: - regressions.append( - " {:16}: {:#8.3g} --> {:#8.3g} ({:+6.1f}%)".format( - metric_name, m1, m2, change_percent - ) - ) - - if regressions: - print(f"{test.name}:") - for r in regressions: - print(r) - else: - print(f"{test.name}: OK") - return len(regressions) > 0 - - def __maybe_create_slack_notifier(self): - if not self.__conf.slack: - return None - return SlackNotifier(WebClient(token=self.__conf.slack.bot_token)) - - def notify_slack( - self, - test_change_points: Dict[str, AnalyzedSeries], - selector: DataSelector, - channels: List[str], - since: datetime, - ): - if not self.__slack: - logging.error( - "Slack definition is missing from the configuration, cannot send notification" - ) - return - self.__slack.notify(test_change_points, selector=selector, channels=channels, since=since) - - def validate(self): - valid = True - unique_metrics = set() - for name, test in self.__conf.tests.items(): - logging.info("Checking {}".format(name)) - test_metrics = test.fully_qualified_metric_names() - for test_metric in test_metrics: - if test_metric not in unique_metrics: - unique_metrics.add(test_metric) - else: - valid = False - logging.error(f"Found duplicated metric: {test_metric}") - try: - importer = self.__importers.get(test) - series = importer.fetch_data(test) - for metric, metric_data in series.data.items(): - if not metric_data: - logging.warning(f"Test's metric does not have data: {name} {metric}") - except Exception as err: - logging.error(f"Invalid test definition: {name}\n{repr(err)}\n") - valid = False - logging.info(f"Validation finished: {'VALID' if valid else 'INVALID'}") - if not valid: - exit(1) - - -def setup_data_selector_parser(parser: argparse.ArgumentParser): - parser.add_argument( - "--branch", metavar="STRING", dest="branch", help="name of the branch", nargs="?" 
- ) - parser.add_argument( - "--metrics", - metavar="LIST", - dest="metrics", - help="a comma-separated list of metrics to analyze", - ) - parser.add_argument( - "--attrs", - metavar="LIST", - dest="attributes", - help="a comma-separated list of attribute names associated with the runs " - "(e.g. commit, branch, version); " - "if not specified, it will be automatically filled based on available information", - ) - since_group = parser.add_mutually_exclusive_group() - since_group.add_argument( - "--since-commit", - metavar="STRING", - dest="since_commit", - help="the commit at the start of the time span to analyze", - ) - since_group.add_argument( - "--since-version", - metavar="STRING", - dest="since_version", - help="the version at the start of the time span to analyze", - ) - since_group.add_argument( - "--since", - metavar="DATE", - dest="since_time", - help="the start of the time span to analyze; " - "accepts ISO, and human-readable dates like '10 weeks ago'", - ) - until_group = parser.add_mutually_exclusive_group() - until_group.add_argument( - "--until-commit", - metavar="STRING", - dest="until_commit", - help="the commit at the end of the time span to analyze", - ) - until_group.add_argument( - "--until-version", - metavar="STRING", - dest="until_version", - help="the version at the end of the time span to analyze", - ) - until_group.add_argument( - "--until", - metavar="DATE", - dest="until_time", - help="the end of the time span to analyze; same syntax as --since", - ) - parser.add_argument( - "--last", - type=int, - metavar="COUNT", - dest="last_n_points", - help="the number of data points to take from the end of the series", - ) - - -def data_selector_from_args(args: argparse.Namespace) -> DataSelector: - data_selector = DataSelector() - if args.branch: - data_selector.branch = args.branch - if args.metrics is not None: - data_selector.metrics = list(args.metrics.split(",")) - if args.attributes is not None: - data_selector.attributes = list(args.attributes.split(",")) - if args.since_commit is not None: - data_selector.since_commit = args.since_commit - if args.since_version is not None: - data_selector.since_version = args.since_version - if args.since_time is not None: - data_selector.since_time = parse_datetime(args.since_time) - if args.until_commit is not None: - data_selector.until_commit = args.until_commit - if args.until_version is not None: - data_selector.until_version = args.until_version - if args.until_time is not None: - data_selector.until_time = parse_datetime(args.until_time) - if args.last_n_points is not None: - data_selector.last_n_points = args.last_n_points - return data_selector - - -def setup_analysis_options_parser(parser: argparse.ArgumentParser): - parser.add_argument( - "-P, --p-value", - dest="pvalue", - type=float, - default=0.001, - help="maximum accepted P-value of a change-point; " - "P denotes the probability that the change-point has " - "been found by a random coincidence, rather than a real " - "difference between the data distributions", - ) - parser.add_argument( - "-M", - "--magnitude", - dest="magnitude", - type=float, - default=0.0, - help="minimum accepted magnitude of a change-point " - "computed as abs(new_mean / old_mean - 1.0); use it " - "to filter out stupidly small changes like < 0.01", - ) - parser.add_argument( - "--window", - default=50, - type=int, - dest="window", - help="the number of data points analyzed at once; " - "the window size affects the discriminative " - "power of the change point detection algorithm; " - 
"large windows are less susceptible to noise; " - "however, a very large window may cause dismissing short regressions " - "as noise so it is best to keep it short enough to include not more " - "than a few change points (optimally at most 1)", - ) - parser.add_argument( - "--orig-edivisive", - type=bool, - default=False, - dest="orig_edivisive", - help="use the original edivisive algorithm with no windowing " - "and weak change points analysis improvements", - ) - - -def analysis_options_from_args(args: argparse.Namespace) -> AnalysisOptions: - conf = AnalysisOptions() - if args.pvalue is not None: - conf.max_pvalue = args.pvalue - if args.magnitude is not None: - conf.min_magnitude = args.magnitude - if args.window is not None: - conf.window_len = args.window - if args.orig_edivisive is not None: - conf.orig_edivisive = args.orig_edivisive - return conf - - -def main(): - logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) - - parser = argparse.ArgumentParser(description="Hunts performance regressions in Fallout results") - - subparsers = parser.add_subparsers(dest="command") - list_tests_parser = subparsers.add_parser("list-tests", help="list available tests") - list_tests_parser.add_argument("group", help="name of the group of the tests", nargs="*") - - list_metrics_parser = subparsers.add_parser( - "list-metrics", help="list available metrics for a test" - ) - list_metrics_parser.add_argument("test", help="name of the test") - - subparsers.add_parser("list-groups", help="list available groups of tests") - - analyze_parser = subparsers.add_parser( - "analyze", - help="analyze performance test results", - formatter_class=argparse.RawTextHelpFormatter, - ) - analyze_parser.add_argument("tests", help="name of the test or group of the tests", nargs="+") - analyze_parser.add_argument( - "--update-grafana", - help="Update Grafana dashboards with appropriate annotations of change points", - action="store_true", - ) - analyze_parser.add_argument( - "--notify-slack", - help="Send notification containing a summary of change points to given Slack channels", - nargs="+", - ) - analyze_parser.add_argument( - "--cph-report-since", - help="Sets a limit on the date range of the Change Point History reported to Slack. 
Same syntax as --since.", - metavar="DATE", - dest="cph_report_since", - ) - analyze_parser.add_argument( - "--output", - help="Output format for the generated report.", - choices=list(ReportType), - dest="report_type", - default=ReportType.LOG, - type=ReportType, - ) - setup_data_selector_parser(analyze_parser) - setup_analysis_options_parser(analyze_parser) - - regressions_parser = subparsers.add_parser("regressions", help="find performance regressions") - regressions_parser.add_argument( - "tests", help="name of the test or group of the tests", nargs="+" - ) - setup_data_selector_parser(regressions_parser) - setup_analysis_options_parser(regressions_parser) - - remove_annotations_parser = subparsers.add_parser("remove-annotations") - remove_annotations_parser.add_argument( - "tests", help="name of the test or test group", nargs="*" - ) - remove_annotations_parser.add_argument( - "--force", help="don't ask questions, just do it", dest="force", action="store_true" - ) - - subparsers.add_parser( - "validate", help="validates the tests and metrics defined in the configuration" - ) - - try: - args = parser.parse_args() - conf = config.load_config() - hunter = Hunter(conf) - - if args.command == "list-groups": - hunter.list_test_groups() - - if args.command == "list-tests": - group_names = args.group if args.group else None - hunter.list_tests(group_names) - - if args.command == "list-metrics": - test = hunter.get_test(args.test) - hunter.list_metrics(test) - - if args.command == "analyze": - update_grafana_flag = args.update_grafana - slack_notification_channels = args.notify_slack - slack_cph_since = parse_datetime(args.cph_report_since) - data_selector = data_selector_from_args(args) - options = analysis_options_from_args(args) - report_type = args.report_type - tests = hunter.get_tests(*args.tests) - tests_analyzed_series = {test.name: None for test in tests} - for test in tests: - try: - analyzed_series = hunter.analyze( - test, selector=data_selector, options=options, report_type=report_type - ) - if update_grafana_flag: - if not isinstance(test, GraphiteTestConfig): - raise GrafanaError("Not a Graphite test") - hunter.update_grafana_annotations(test, analyzed_series) - if slack_notification_channels: - tests_analyzed_series[test.name] = analyzed_series - except DataImportError as err: - logging.error(err.message) - except GrafanaError as err: - logging.error( - f"Failed to update grafana dashboards for {test.name}: {err.message}" - ) - if slack_notification_channels: - hunter.notify_slack( - tests_analyzed_series, - selector=data_selector, - channels=slack_notification_channels, - since=slack_cph_since, - ) - - if args.command == "regressions": - data_selector = data_selector_from_args(args) - options = analysis_options_from_args(args) - tests = hunter.get_tests(*args.tests) - regressing_test_count = 0 - errors = 0 - for test in tests: - try: - regressions = hunter.regressions(test, selector=data_selector, options=options) - if regressions: - regressing_test_count += 1 - except HunterError as err: - logging.error(err.message) - errors += 1 - except DataImportError as err: - logging.error(err.message) - errors += 1 - if regressing_test_count == 0: - print("No regressions found!") - elif regressing_test_count == 1: - print("Regressions in 1 test found") - else: - print(f"Regressions in {regressing_test_count} tests found") - if errors > 0: - print("Some tests were skipped due to import / analyze errors. 
Consult error log.") - - if args.command == "remove-annotations": - if args.tests: - tests = hunter.get_tests(*args.tests) - for test in tests: - hunter.remove_grafana_annotations(test, args.force) - else: - hunter.remove_grafana_annotations(None, args.force) - - if args.command == "validate": - hunter.validate() - - if args.command is None: - parser.print_usage() - - except ConfigError as err: - logging.error(err.message) - exit(1) - except TestConfigError as err: - logging.error(err.message) - exit(1) - except GraphiteError as err: - logging.error(err.message) - exit(1) - except GrafanaError as err: - logging.error(err.message) - exit(1) - except DataImportError as err: - logging.error(err.message) - exit(1) - except HunterError as err: - logging.error(err.message) - exit(1) - except DateFormatError as err: - logging.error(err.message) - exit(1) - except NotificationError as err: - logging.error(err.message) - exit(1) - - -if __name__ == "__main__": - main() diff --git a/hunter/report.py b/hunter/report.py deleted file mode 100644 index f4838c7..0000000 --- a/hunter/report.py +++ /dev/null @@ -1,85 +0,0 @@ -from collections import OrderedDict -from enum import Enum, unique -from typing import List - -from tabulate import tabulate - -from hunter.series import ChangePointGroup, Series -from hunter.util import format_timestamp, insert_multiple, remove_common_prefix - - -@unique -class ReportType(Enum): - LOG = "log" - JSON = "json" - - def __str__(self): - return self.value - - -class Report: - __series: Series - __change_points: List[ChangePointGroup] - - def __init__(self, series: Series, change_points: List[ChangePointGroup]): - self.__series = series - self.__change_points = change_points - - @staticmethod - def __column_widths(log: List[str]) -> List[int]: - return [len(c) for c in log[1].split(None)] - - def produce_report(self, test_name: str, report_type: ReportType): - if report_type == ReportType.LOG: - return self.__format_log_annotated(test_name) - elif report_type == ReportType.JSON: - return self.__format_json(test_name) - else: - from hunter.main import HunterError - - raise HunterError(f"Unknown report type: {report_type}") - - def __format_log(self) -> str: - time_column = [format_timestamp(ts) for ts in self.__series.time] - table = {"time": time_column, **self.__series.attributes, **self.__series.data} - metrics = list(self.__series.data.keys()) - headers = list( - OrderedDict.fromkeys( - ["time", *self.__series.attributes, *remove_common_prefix(metrics)] - ) - ) - return tabulate(table, headers=headers) - - def __format_log_annotated(self, test_name: str) -> str: - """Returns test log with change points marked as horizontal lines""" - lines = self.__format_log().split("\n") - col_widths = self.__column_widths(lines) - indexes = [cp.index for cp in self.__change_points] - separators = [] - columns = list( - OrderedDict.fromkeys(["time", *self.__series.attributes, *self.__series.data]) - ) - for cp in self.__change_points: - separator = "" - info = "" - for col_index, col_name in enumerate(columns): - col_width = col_widths[col_index] - change = [c for c in cp.changes if c.metric == col_name] - if change: - change = change[0] - change_percent = change.forward_change_percent() - separator += "·" * col_width + " " - info += f"{change_percent:+.1f}%".rjust(col_width) + " " - else: - separator += " " * (col_width + 2) - info += " " * (col_width + 2) - - separators.append(f"{separator}\n{info}\n{separator}") - - lines = lines[:2] + insert_multiple(lines[2:], separators, 
indexes) - return "\n".join(lines) - - def __format_json(self, test_name: str) -> str: - import json - - return json.dumps({test_name: [cpg.to_json() for cpg in self.__change_points]}) diff --git a/hunter/series.py b/hunter/series.py deleted file mode 100644 index 30bc2a2..0000000 --- a/hunter/series.py +++ /dev/null @@ -1,300 +0,0 @@ -import logging -from dataclasses import dataclass -from datetime import datetime -from itertools import groupby -from typing import Dict, Iterable, List, Optional - -import numpy as np - -from hunter.analysis import ( - ComparativeStats, - TTestSignificanceTester, - compute_change_points, - compute_change_points_orig, - fill_missing, -) - - -@dataclass -class AnalysisOptions: - window_len: int - max_pvalue: float - min_magnitude: float - orig_edivisive: bool - - def __init__(self): - self.window_len = 50 - self.max_pvalue = 0.001 - self.min_magnitude = 0.0 - self.orig_edivisive = False - - -@dataclass -class Metric: - direction: int - scale: float - unit: str - - def __init__(self, direction: int = 1, scale: float = 1.0, unit: str = ""): - self.direction = direction - self.scale = scale - self.unit = "" - - -@dataclass -class ChangePoint: - """A change-point for a single metric""" - - metric: str - index: int - time: int - stats: ComparativeStats - - def forward_change_percent(self) -> float: - return self.stats.forward_rel_change() * 100.0 - - def backward_change_percent(self) -> float: - return self.stats.backward_rel_change() * 100.0 - - def magnitude(self): - return self.stats.change_magnitude() - - def to_json(self): - return { - "metric": self.metric, - "forward_change_percent": f"{self.forward_change_percent():.0f}", - } - - -@dataclass -class ChangePointGroup: - """A group of change points on multiple metrics, at the same time""" - - index: int - time: int - prev_time: int - attributes: Dict[str, str] - prev_attributes: Dict[str, str] - changes: List[ChangePoint] - - def to_json(self): - return {"time": self.time, "changes": [cp.to_json() for cp in self.changes]} - - -class Series: - """ - Stores values of interesting metrics of all runs of - a fallout test indexed by a single time variable. - Provides utilities to analyze data e.g. find change points. 
- """ - - test_name: str - branch: Optional[str] - time: List[int] - metrics: Dict[str, Metric] - attributes: Dict[str, List[str]] - data: Dict[str, List[float]] - - def __init__( - self, - test_name: str, - branch: Optional[str], - time: List[int], - metrics: Dict[str, Metric], - data: Dict[str, List[float]], - attributes: Dict[str, List[str]], - ): - self.test_name = test_name - self.branch = branch - self.time = time - self.metrics = metrics - self.attributes = attributes if attributes else {} - self.data = data - assert all(len(x) == len(time) for x in data.values()) - assert all(len(x) == len(time) for x in attributes.values()) - - def attributes_at(self, index: int) -> Dict[str, str]: - result = {} - for (k, v) in self.attributes.items(): - result[k] = v[index] - return result - - def find_first_not_earlier_than(self, time: datetime) -> Optional[int]: - timestamp = time.timestamp() - for i, t in enumerate(self.time): - if t >= timestamp: - return i - return None - - def find_by_attribute(self, name: str, value: str) -> List[int]: - """Returns the indexes of data points with given attribute value""" - result = [] - for i in range(len(self.time)): - if self.attributes_at(i).get(name) == value: - result.append(i) - return result - - def analyze(self, options: AnalysisOptions = AnalysisOptions()) -> "AnalyzedSeries": - logging.info(f"Computing change points for test {self.test_name}...") - return AnalyzedSeries(self, options) - - -class AnalyzedSeries: - """ - Time series data with computed change points. - """ - - __series: Series - options: AnalysisOptions - change_points: Dict[str, List[ChangePoint]] - change_points_by_time: List[ChangePointGroup] - - def __init__(self, series: Series, options: AnalysisOptions): - self.__series = series - self.options = options - self.change_points = self.__compute_change_points(series, options) - self.change_points_by_time = self.__group_change_points_by_time(series, self.change_points) - - @staticmethod - def __compute_change_points( - series: Series, options: AnalysisOptions - ) -> Dict[str, List[ChangePoint]]: - result = {} - for metric in series.data.keys(): - values = series.data[metric].copy() - fill_missing(values) - if options.orig_edivisive: - change_points = compute_change_points_orig( - values, - max_pvalue=options.max_pvalue, - ) - else: - change_points = compute_change_points( - values, - window_len=options.window_len, - max_pvalue=options.max_pvalue, - min_magnitude=options.min_magnitude, - ) - result[metric] = [] - for c in change_points: - result[metric].append( - ChangePoint( - index=c.index, time=series.time[c.index], metric=metric, stats=c.stats - ) - ) - return result - - @staticmethod - def __group_change_points_by_time( - series: Series, change_points: Dict[str, List[ChangePoint]] - ) -> List[ChangePointGroup]: - changes: List[ChangePoint] = [] - for metric in change_points.keys(): - changes += change_points[metric] - - changes.sort(key=lambda c: c.index) - points = [] - for k, g in groupby(changes, key=lambda c: c.index): - cp = ChangePointGroup( - index=k, - time=series.time[k], - prev_time=series.time[k - 1], - attributes=series.attributes_at(k), - prev_attributes=series.attributes_at(k - 1), - changes=list(g), - ) - points.append(cp) - - return points - - def get_stable_range(self, metric: str, index: int) -> (int, int): - """ - Returns a range of indexes (A, B) such that: - - A is the nearest change point index of the `metric` before or equal given `index`, - or 0 if not found - - B is the nearest change point index of the 
`metric` after given `index, - or len(self.time) if not found - - It follows that there are no change points between A and B. - """ - begin = 0 - for cp in self.change_points[metric]: - if cp.index > index: - break - begin = cp.index - - end = len(self.time()) - for cp in reversed(self.change_points[metric]): - if cp.index <= index: - break - end = cp.index - - return begin, end - - def test_name(self) -> str: - return self.__series.test_name - - def branch_name(self) -> Optional[str]: - return self.__series.branch - - def len(self) -> int: - return len(self.__series.time) - - def time(self) -> List[int]: - return self.__series.time - - def data(self, metric: str) -> List[float]: - return self.__series.data[metric] - - def attributes(self) -> Iterable[str]: - return self.__series.attributes.keys() - - def attributes_at(self, index: int) -> Dict[str, str]: - return self.__series.attributes_at(index) - - def attribute_values(self, attribute: str) -> List[str]: - return self.__series.attributes[attribute] - - def metric_names(self) -> Iterable[str]: - return self.__series.metrics.keys() - - def metric(self, name: str) -> Metric: - return self.__series.metrics[name] - - -@dataclass -class SeriesComparison: - series_1: AnalyzedSeries - series_2: AnalyzedSeries - index_1: int - index_2: int - stats: Dict[str, ComparativeStats] # keys: metric name - - -def compare( - series_1: AnalyzedSeries, - index_1: Optional[int], - series_2: AnalyzedSeries, - index_2: Optional[int], -) -> SeriesComparison: - - # if index not specified, we want to take the most recent performance - index_1 = index_1 if index_1 is not None else len(series_1.time()) - index_2 = index_2 if index_2 is not None else len(series_2.time()) - metrics = filter(lambda m: m in series_2.metric_names(), series_1.metric_names()) - - tester = TTestSignificanceTester(series_1.options.max_pvalue) - stats = {} - - for metric in metrics: - data_1 = series_1.data(metric) - (begin_1, end_1) = series_1.get_stable_range(metric, index_1) - data_1 = [x for x in data_1[begin_1:end_1] if x is not None] - - data_2 = series_2.data(metric) - (begin_2, end_2) = series_2.get_stable_range(metric, index_2) - data_2 = [x for x in data_2[begin_2:end_2] if x is not None] - - stats[metric] = tester.compare(np.array(data_1), np.array(data_2)) - - return SeriesComparison(series_1, series_2, index_1, index_2, stats) diff --git a/hunter/slack.py b/hunter/slack.py deleted file mode 100644 index 649e4a0..0000000 --- a/hunter/slack.py +++ /dev/null @@ -1,229 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from math import isinf -from typing import Dict, List - -from pytz import UTC -from slack_sdk import WebClient - -from hunter.data_selector import DataSelector -from hunter.series import AnalyzedSeries, ChangePointGroup - - -@dataclass -class NotificationError(Exception): - message: str - - -@dataclass -class SlackConfig: - bot_token: str - - -class SlackNotification: - tests_with_insufficient_data: List[str] - test_analyzed_series: Dict[str, AnalyzedSeries] - since: datetime - - def __init__( - self, - test_analyzed_series: Dict[str, AnalyzedSeries], - data_selection_description: str = None, - since: datetime = None, - ): - self.data_selection_description = data_selection_description - self.since = since - self.tests_with_insufficient_data = [] - self.test_analyzed_series = dict() - for test, series in test_analyzed_series.items(): - if series: - self.test_analyzed_series[test] = series - else: - self.tests_with_insufficient_data.append(test) 
- - def __init_insufficient_data_dispatch(self): - dispatch = [ - self.__text_block( - "header", - "plain_text", - "Hunter found insufficient data for the following tests :warning:", - ) - ] - if self.data_selection_description: - dispatch.append(self.__data_selection_block()) - return dispatch - - def __init_report_dispatch(self): - dispatch = [self.__header()] - if self.data_selection_description: - dispatch.append(self.__data_selection_block()) - if self.since: - dispatch.append(self.__report_selection_block()) - return dispatch - - def __minimum_dispatch_length(self): - min = 1 # header - if self.data_selection_description: - min += 1 - if self.since: - min += 1 - return min - - # A Slack message can only contain 50 blocks so - # large summaries must be split across messages. - def create_dispatches(self) -> List[List[object]]: - dispatches = [] - cur = self.__init_insufficient_data_dispatch() - for test_name in self.tests_with_insufficient_data: - if len(cur) == 50: - dispatches.append(cur) - cur = self.__init_insufficient_data_dispatch() - cur.append(self.__plain_text_section_block(test_name)) - - if len(cur) > self.__minimum_dispatch_length(): - dispatches.append(cur) - - dates_change_points = {} - for test_name, analyzed_series in self.test_analyzed_series.items(): - for group in analyzed_series.change_points_by_time: - cpg_time = datetime.fromtimestamp(group.time, tz=UTC) - if self.since and cpg_time < self.since: - continue - date_str = self.__datetime_to_str(cpg_time) - if date_str not in dates_change_points: - dates_change_points[date_str] = {} - dates_change_points[date_str][test_name] = group - - cur = self.__init_report_dispatch() - for date in sorted(dates_change_points): - add = [ - self.__block("divider"), - self.__title_block(date), - ] + self.__dates_change_points_summary(dates_change_points[date]) - - if not len(cur) + len(add) < 50: - dispatches.append(cur) - cur = self.__init_report_dispatch() - - cur = cur + add - - if len(cur) > self.__minimum_dispatch_length(): - dispatches.append(cur) - - return dispatches - - @staticmethod - def __datetime_to_str(date: datetime): - return str(date.strftime("%Y-%m-%d %H:%M:%S")) - - @staticmethod - def __block(block_type: str, content: Dict = None): - block = {"type": block_type} - if content: - block.update(content) - return block - - @classmethod - def __text_block(cls, type, text_type, text): - return cls.__block( - type, - content={ - "text": { - "type": text_type, - "text": text, - } - }, - ) - - @classmethod - def __fields_section(cls, fields_text): - def field_block(text): - return {"type": "mrkdwn", "text": text} - - return cls.__block("section", content={"fields": [field_block(t) for t in fields_text]}) - - @classmethod - def __plain_text_section_block(cls, text): - return cls.__text_block("section", "plain_text", text) - - def __header(self): - header_text = ( - "Hunter has detected change points" - if self.test_analyzed_series - else "Hunter did not detect any change points" - ) - return self.__text_block("header", "plain_text", header_text) - - def __data_selection_block(self): - return self.__plain_text_section_block(self.data_selection_description) - - def __report_selection_block(self): - return self.__fields_section(["Report Since", self.__datetime_to_str(self.since)]) - - @classmethod - def __title_block(cls, name): - return cls.__text_block("section", "mrkdwn", f"*{name}*") - - def __dates_change_points_summary(self, test_changes: Dict[str, ChangePointGroup]): - fields = [] - for test_name, group in 
test_changes.items(): - fields.append(f"*{test_name}*") - summary = "" - for change in group.changes: - change_percent = change.forward_change_percent() - change_emoji = self.__get_change_emoji(test_name, change) - if isinf(change_percent): - report_percent = change_percent - # Avoid rounding decimal change points to zero - elif -5 < change_percent < 5: - report_percent = f"{change_percent:.1f}" - else: - report_percent = round(change_percent) - summary += f"{change_emoji} *{change.metric}*: {report_percent}%\n" - fields.append(summary) - - sections = [] - i = 0 - while i < len(fields): - section_fields = [] - while len(section_fields) < 10 and i < len(fields): - section_fields.append(fields[i]) - i += 1 - sections.append(self.__fields_section(section_fields)) - - return sections - - def __get_change_emoji(self, test_name, change): - metric_direction = self.test_analyzed_series[test_name].metric(change.metric).direction - regression = metric_direction * change.forward_change_percent() - if regression >= 0: - return ":large_blue_circle:" - else: - return ":red_circle:" - - -class SlackNotifier: - __client: WebClient - - def __init__(self, client: WebClient): - self.__client = client - - def notify( - self, - test_analyzed_series: Dict[str, AnalyzedSeries], - selector: DataSelector, - channels: List[str], - since: datetime, - ): - dispatches = SlackNotification( - test_analyzed_series, - data_selection_description=selector.get_selection_description(), - since=since, - ).create_dispatches() - if len(dispatches) > 3: - raise NotificationError( - "Change point summary would produce too many Slack notifications" - ) - for channel in channels: - for blocks in dispatches: - self.__client.chat_postMessage(channel=channel, blocks=blocks) diff --git a/hunter/test_config.py b/hunter/test_config.py deleted file mode 100644 index 46a721b..0000000 --- a/hunter/test_config.py +++ /dev/null @@ -1,228 +0,0 @@ -import os.path -from dataclasses import dataclass -from typing import Dict, List, Optional - -from hunter.csv_options import CsvOptions -from hunter.util import interpolate - - -@dataclass -class TestConfig: - name: str - - def fully_qualified_metric_names(self): - raise NotImplementedError - - -@dataclass -class TestConfigError(Exception): - message: str - - -@dataclass -class CsvMetric: - name: str - direction: int - scale: float - column: str - - -@dataclass -class CsvTestConfig(TestConfig): - file: str - csv_options: CsvOptions - time_column: str - metrics: Dict[str, CsvMetric] - attributes: List[str] - - def __init__( - self, - name: str, - file: str, - csv_options: CsvOptions = CsvOptions(), - time_column: str = "time", - metrics: List[CsvMetric] = None, - attributes: List[str] = None, - ): - self.name = name - self.file = file - self.csv_options = csv_options - self.time_column = time_column - self.metrics = {m.name: m for m in metrics} if metrics else {} - self.attributes = attributes if attributes else {} - - def fully_qualified_metric_names(self) -> List[str]: - return list(self.metrics.keys()) - - -@dataclass -class GraphiteMetric: - name: str - direction: int - scale: float - suffix: str - annotate: List[str] # tags appended to Grafana annotations - - -@dataclass -class GraphiteTestConfig(TestConfig): - prefix: str # location of the performance data for the main branch - branch_prefix: Optional[str] # location of the performance data for the feature branch - metrics: Dict[str, GraphiteMetric] # collection of metrics to fetch - tags: List[str] # tags to query graphite events for this test - 
annotate: List[str] # annotation tags - - def __init__( - self, - name: str, - prefix: str, - branch_prefix: Optional[str], - metrics: List[GraphiteMetric], - tags: List[str], - annotate: List[str], - ): - self.name = name - self.prefix = prefix - self.branch_prefix = branch_prefix - self.metrics = {m.name: m for m in metrics} - self.tags = tags - self.annotate = annotate - - def get_path(self, branch_name: Optional[str], metric_name: str) -> str: - metric = self.metrics.get(metric_name) - substitutions = {"BRANCH": [branch_name if branch_name else "main"]} - if branch_name and self.branch_prefix: - return interpolate(self.branch_prefix, substitutions)[0] + "." + metric.suffix - elif branch_name: - branch_var_name = "%{BRANCH}" - if branch_var_name not in self.prefix: - raise TestConfigError( - f"Test {self.name} does not support branching. " - f"Please set the `branch_prefix` property or use {branch_var_name} " - f"in the `prefix`." - ) - interpolated = interpolate(self.prefix, substitutions) - return interpolated[0] + "." + metric.suffix - else: - return self.prefix + "." + metric.suffix - - def fully_qualified_metric_names(self): - return [f"{self.prefix}.{m.suffix}" for _, m in self.metrics.items()] - - -@dataclass -class HistoStatTestConfig(TestConfig): - name: str - file: str - - def fully_qualified_metric_names(self): - from hunter.importer import HistoStatImporter - - return HistoStatImporter().fetch_all_metric_names(self) - - -def create_test_config(name: str, config: Dict) -> TestConfig: - """ - Loads properties of a test from a dictionary read from hunter's config file - This dictionary must have the `type` property to determine the type of the test. - Other properties depend on the type. - Currently supported test types are `fallout`, `graphite` and `csv`. 
- """ - test_type = config.get("type") - if test_type == "csv": - return create_csv_test_config(name, config) - elif test_type == "graphite": - return create_graphite_test_config(name, config) - elif test_type == "histostat": - return create_histostat_test_config(name, config) - elif test_type is None: - raise TestConfigError(f"Test type not set for test {name}") - else: - raise TestConfigError(f"Unknown test type {test_type} for test {name}") - - -def create_csv_test_config(test_name: str, test_info: Dict) -> CsvTestConfig: - csv_options = CsvOptions() - try: - file = test_info["file"] - except KeyError as e: - raise TestConfigError(f"Configuration key not found in test {test_name}: {e.args[0]}") - time_column = test_info.get("time_column", "time") - metrics_info = test_info.get("metrics") - metrics = [] - if isinstance(metrics_info, List): - for name in metrics_info: - metrics.append(CsvMetric(name, 1, 1.0, name)) - elif isinstance(metrics_info, Dict): - for (metric_name, metric_conf) in metrics_info.items(): - metrics.append( - CsvMetric( - name=metric_name, - column=metric_conf.get("column", metric_name), - direction=int(metric_conf.get("direction", "1")), - scale=float(metric_conf.get("scale", "1")), - ) - ) - else: - raise TestConfigError(f"Metrics of the test {test_name} must be a list or dictionary") - - attributes = test_info.get("attributes", []) - if not isinstance(attributes, List): - raise TestConfigError(f"Attributes of the test {test_name} must be a list") - - if test_info.get("csv_options"): - csv_options.delimiter = test_info["csv_options"].get("delimiter", ",") - csv_options.quote_char = test_info["csv_options"].get("quote_char", '"') - return CsvTestConfig( - test_name, - file, - csv_options=csv_options, - time_column=time_column, - metrics=metrics, - attributes=test_info.get("attributes"), - ) - - -def create_graphite_test_config(name: str, test_info: Dict) -> GraphiteTestConfig: - try: - metrics_info = test_info["metrics"] - if not isinstance(metrics_info, Dict): - raise TestConfigError(f"Test {name} metrics field is not a dictionary.") - except KeyError as e: - raise TestConfigError(f"Configuration key not found in test {name}: {e.args[0]}") - - metrics = [] - try: - for (metric_name, metric_conf) in metrics_info.items(): - metrics.append( - GraphiteMetric( - name=metric_name, - suffix=metric_conf["suffix"], - direction=int(metric_conf.get("direction", "1")), - scale=float(metric_conf.get("scale", "1")), - annotate=metric_conf.get("annotate", []), - ) - ) - except KeyError as e: - raise TestConfigError(f"Configuration key not found in {name}.metrics: {e.args[0]}") - - return GraphiteTestConfig( - name, - prefix=test_info["prefix"], - branch_prefix=test_info.get("branch_prefix"), - tags=test_info.get("tags", []), - annotate=test_info.get("annotate", []), - metrics=metrics, - ) - - -def create_histostat_test_config(name: str, test_info: Dict) -> HistoStatTestConfig: - try: - file = test_info["file"] - except KeyError as e: - raise TestConfigError(f"Configuration key not found in test {name}: {e.args[0]}") - if not os.path.exists(file): - raise TestConfigError( - f"Configuration referenced histostat file which does not exist: {file}" - ) - return HistoStatTestConfig(name, file) diff --git a/hunter/util.py b/hunter/util.py deleted file mode 100644 index bd4a3ec..0000000 --- a/hunter/util.py +++ /dev/null @@ -1,232 +0,0 @@ -import math -import re -import sys -from collections import OrderedDict, deque -from dataclasses import dataclass -from datetime import datetime -from 
functools import reduce -from itertools import islice -from typing import Dict, List, Optional, Set, TypeVar - -import dateparser -from pytz import UTC - - -def resolution(time: List[int]) -> int: - """ - Graphite has a finite time resolution and the timestamps are rounded - to e.g. full days. This function tries to automatically detect the - level of rounding needed by inspecting the minimum time distance between the - data points. - """ - res = 24 * 3600 - if len(time) < 2: - return res - for (a, b) in sliding_window(time, 2): - if b - a > 0: - res = min(res, b - a) - for t in time: - res = math.gcd(res, t) - return res - - -def round(x: int, divisor: int) -> int: - """Round x to the multiplicity of divisor not greater than x""" - return int(x / divisor) * divisor - - -def remove_prefix(text: str, prefix: str) -> str: - """ - Strips prefix of a string. If the string doesn't start with the given - prefix, returns the original string unchanged. - """ - if text.startswith(prefix): - return text[len(prefix) :] - return text - - -T = TypeVar("T") - - -def merge_sorted(lists: List[List[T]]) -> List[T]: - """ - Merges multiple sorted lists into a sorted list that contains - only distinct items from the source lists. - Current implementation uses sorting, so it is not very efficient for - very large lists. - - Example: - - input: [[0, 1, 2, 4, 5], [0, 1, 2, 3, 5]] - - output: [0, 1, 2, 3, 4, 5] - """ - output = set() - for list_ in lists: - for item in list_: - output.add(item) - - output = list(output) - output.sort() - return output - - -def remove_common_prefix(names: List[str], sep: str = ".") -> List[str]: - """""" - - if len(names) == 0: - return names - - split_names = [name.split(sep) for name in names] - min_len = min(len(components) for components in split_names) - - def are_same(index: int) -> bool: - return all(c[index] == split_names[0][index] for c in split_names) - - prefix_len = 0 - while prefix_len + 1 < min_len and are_same(prefix_len): - prefix_len += 1 - - return [sep.join(components[prefix_len:]) for components in split_names] - - -def eprint(*args, **kwargs): - """Prints to stdandard error""" - print(*args, file=sys.stderr, **kwargs) - - -def format_timestamp(ts: int, millisecond_resolution: Optional[bool] = True) -> str: - if millisecond_resolution: - return datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d %H:%M:%S %z") - else: - return datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d %H:%M:%S") - - -def insert_multiple(col: List[T], new_items: List[T], positions: List[int]) -> List[T]: - """Inserts an item into a collection at given positions""" - result = [] - positions = set(positions) - new_items_iter = iter(new_items) - for i, x in enumerate(col): - if i in positions: - result.append(next(new_items_iter)) - result.append(x) - return result - - -@dataclass -class DateFormatError(ValueError): - message: str - - -def parse_datetime(date: Optional[str]) -> Optional[datetime]: - """ - Converts a human-readable string into a datetime object. - Accepts many formats and many languages, see dateparser package. - Raises DataFormatError if the input string format hasn't been recognized. - """ - if date is None: - return None - parsed: datetime = dateparser.parse(date, settings={"RETURN_AS_TIMEZONE_AWARE": True}) - if parsed is None: - raise DateFormatError(f"Invalid datetime value: {date}") - return parsed - - -def sliding_window(iterable, size): - """ - Returns an iterator which represents a sliding window over the given - collection. 
`size` denotes the size of the window. If the collection length - is less than the size, no items are yielded. - """ - iterable = iter(iterable) - window = deque(islice(iterable, size), maxlen=size) - for item in iterable: - yield tuple(window) - window.append(item) - if len(window) == size: - # needed because if iterable was already empty before the `for`, - # then the window would be yielded twice. - yield tuple(window) - - -def is_float(value) -> bool: - """Returns true if value can be converted to a float""" - try: - float(value) - return True - except ValueError: - return False - - -def is_datetime(value) -> bool: - """Returns true if value can be parsed as a date""" - try: - parse_datetime(value) - return True - except DateFormatError: - return False - - -def merge_dicts(d1: Dict, d2: Dict) -> OrderedDict: - """ - Returns a sum of two dictionaries, summing them left-to-right. - Lists and sets under the same key are added. - Dicts with the same key are merged recursively. - Simple values with the same key are overwritten (right dictionary wins). - Maintains the order of the sets. - """ - result = OrderedDict(d1) - for k in d2.keys(): - v1 = d1.get(k) - v2 = d2.get(k) - if v2 is None: - result[k] = v1 - elif v1 is None: - result[k] = v2 - elif isinstance(v1, Dict) and isinstance(v2, Dict): - result[k] = merge_dicts(v1, v2) - elif isinstance(v1, List) and isinstance(v2, List): - result[k] = v1 + v2 - elif isinstance(v1, Set) and isinstance(v2, Set): - result[k] = v1 | v2 - else: - result[k] = v2 - - return result - - -def merge_dict_list(dicts: List[Dict]) -> Dict: - """ - Returns a sum of dictionaries, summing them left-to-right. - Lists and sets under the same key are added. - Dicts with the same key are merged recursively. - Simple values with the same key are overwritten (rightmost dictionary wins). - """ - return reduce(merge_dicts, dicts, {}) - - -def interpolate(s: str, vars: Dict[str, List[str]]) -> List[str]: - """ - Replaces all occurrences of %{VARIABLE} with respective variable values looked up in the - vars dictionary. A variable is allowed to have more than one value assigned – - in this case one result string is returned per each combination of variable values. 
- - Example: - s = "name:%{NAME}" - vars = { "NAME": ["foo", "bar"] } - result = ["name:foo", "name:bar"] - """ - match = re.search("%{(\\w+)}", s) - if match: - var_name = match.group(1) - values = vars[var_name] - start, end = match.span(0) - before = s[0:start] - after = s[end:] - result = [] - remaining = interpolate(after, vars) - for suffix in remaining: - for v in values: - result.append(before + v + suffix) - return result - else: - return [s] diff --git a/requirements.txt b/requirements.txt index 30163d2..5cd4978 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,40 +4,44 @@ charset-normalizer==3.3.2 click==8.1.7 dateparser==1.2.0 DateTime==5.4 +decorator==5.1.1 dill==0.3.7 elastic-transport==8.11.0 elasticsearch==8.11.1 elasticsearch7==7.13.0 -expandvars==0.12.0 +expandvars==0.6.5 fmatch==0.0.3 gevent==23.9.1 greenlet==3.0.3 +hunter @ git+https://github.com/datastax-labs/hunter.git@8ff166979d000780ad548e49f006ef2a15d54123 idna==3.6 isort==5.13.2 mccabe==0.7.0 more-itertools==8.14.0 -numpy==1.26.3 +numpy==1.24.0 pandas==2.1.4 platformdirs==4.1.0 pylint==3.0.3 +pystache==0.6.5 python-dateutil==2.8.2 pytz==2023.3.post1 PyYAML==6.0.1 regex==2023.12.25 requests==2.31.0 -ruamel.yaml==0.18.5 +ruamel.yaml==0.17.21 ruamel.yaml.clib==0.2.8 scipy==1.12.0 signal-processing-algorithms==1.3.5 six==1.16.0 slack_sdk==3.26.2 structlog==19.2.0 -tabulate==0.9.0 +tabulate==0.8.10 tomlkit==0.12.3 typed-ast==1.5.5 typing-extensions==3.10.0.2 tzdata==2023.4 tzlocal==5.2 urllib3==1.26.18 +validators==0.18.2 zope.event==5.0 zope.interface==6.1
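AnalyzedSeries.get_stable_range in the removed hunter/series.py above walks the per-metric change points linearly in both directions to find the change-point-free range around an index. A minimal standalone sketch of the same bookkeeping (the helper name and sample indexes are invented here, and it assumes the change-point indexes are kept in ascending order):

    from bisect import bisect_right
    from typing import List, Tuple

    def stable_range(change_indexes: List[int], index: int, series_len: int) -> Tuple[int, int]:
        # change_indexes: change-point positions for one metric, in ascending order.
        # Returns (begin, end) such that no change point lies strictly between them:
        # begin is the nearest change point at or before `index` (0 if none),
        # end is the nearest change point after `index` (series_len if none).
        pos = bisect_right(change_indexes, index)
        begin = change_indexes[pos - 1] if pos > 0 else 0
        end = change_indexes[pos] if pos < len(change_indexes) else series_len
        return begin, end

    print(stable_range([10, 40, 70], 55, 100))  # -> (40, 70)
    print(stable_range([10, 40, 70], 5, 100))   # -> (0, 10)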
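SlackNotification.create_dispatches in the removed hunter/slack.py above splits the report into several messages because a Slack message can carry at most 50 blocks, re-emitting the header blocks at the start of each message and skipping a message that would contain only the header. A compact sketch of that batching idea only, not of the real per-date report layout (the block contents are placeholders):

    from typing import Dict, List

    MAX_BLOCKS = 50  # Slack's per-message block limit noted in create_dispatches above

    def batch_blocks(header: List[Dict], items: List[Dict]) -> List[List[Dict]]:
        # Split `items` into messages of at most MAX_BLOCKS blocks each,
        # repeating the shared header blocks at the start of every message.
        messages, current = [], list(header)
        for block in items:
            if len(current) == MAX_BLOCKS:
                messages.append(current)
                current = list(header)
            current.append(block)
        if len(current) > len(header):  # drop a message that would carry only the header
            messages.append(current)
        return messages

    header = [{"type": "header", "text": {"type": "plain_text", "text": "Hunter report"}}]
    items = [{"type": "section", "text": {"type": "plain_text", "text": f"test-{i}"}} for i in range(120)]
    print([len(m) for m in batch_blocks(header, items)])  # [50, 50, 23]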
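create_csv_test_config in the removed hunter/test_config.py above accepts metrics either as a plain list of column names or as a dictionary with optional column, direction and scale entries. A hypothetical configuration dictionary shaped the way that parser expects (every name and value here is illustrative only):

    # hunter normally builds this dictionary from its YAML configuration file;
    # the file name, metric names and values are invented for illustration.
    config = {
        "type": "csv",
        "file": "results/local-benchmark.csv",
        "time_column": "time",
        "metrics": {
            "throughput": {"column": "tps", "direction": 1, "scale": 1.0},
            "p99_latency": {"column": "p99", "direction": -1},
        },
        "attributes": ["commit", "branch"],
        "csv_options": {"delimiter": ",", "quote_char": '"'},
    }

    # Fed to the create_test_config shown above, this would produce a CsvTestConfig:
    # test = create_test_config("local-benchmark", config)
    # test.fully_qualified_metric_names()  # -> ["throughput", "p99_latency"]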
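parse_datetime in the removed hunter/util.py above backs the --since, --until and --cph-report-since options, which accept ISO timestamps as well as human-readable phrases. A small illustration calling dateparser directly with the same setting that function passes (the output naturally depends on the current date):

    import dateparser

    for text in ["2024-01-15", "10 weeks ago"]:
        parsed = dateparser.parse(text, settings={"RETURN_AS_TIMEZONE_AWARE": True})
        print(text, "->", parsed)  # timezone-aware datetime, or None if not recognized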
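interpolate in the removed hunter/util.py above expands %{VARIABLE} placeholders and is what GraphiteTestConfig.get_path relies on for %{BRANCH} substitution. A standalone sketch of the single-value case only (the prefix and branch name are made up; the real function additionally fans out over multiple values per variable, one result string per combination):

    import re
    from typing import Dict

    def substitute(template: str, values: Dict[str, str]) -> str:
        # Replace each %{NAME} placeholder with its single configured value.
        return re.sub(r"%\{(\w+)\}", lambda m: values[m.group(1)], template)

    prefix = "performance.%{BRANCH}.my-test"  # hypothetical Graphite prefix
    print(substitute(prefix, {"BRANCH": "feature-x"}) + ".p99")
    # -> performance.feature-x.my-test.p99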