diff --git a/configs/small-scale-cluster-density.yaml b/configs/small-scale-cluster-density.yaml index 63646b2..0be784a 100644 --- a/configs/small-scale-cluster-density.yaml +++ b/configs/small-scale-cluster-density.yaml @@ -1,4 +1,6 @@ # This is a template file +datasource: + type: perfscale tests : - name : aws-small-scale-cluster-density-v2 index: ospst-perf-scale-ci-* diff --git a/configs/small-scale-node-density-cni.yaml b/configs/small-scale-node-density-cni.yaml index 48e3622..c2d88d0 100644 --- a/configs/small-scale-node-density-cni.yaml +++ b/configs/small-scale-node-density-cni.yaml @@ -1,4 +1,6 @@ # This is a template file +datasource: + type: perfscale tests : - name : aws-small-scale-node-density-cni index: ospst-perf-scale-ci-* diff --git a/configs/trt-payload-cluster-density.yaml b/configs/trt-payload-cluster-density.yaml index f161bc4..4810ece 100644 --- a/configs/trt-payload-cluster-density.yaml +++ b/configs/trt-payload-cluster-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : payload-cluster-density-v2 index: ospst-perf-scale-ci-* diff --git a/examples/label-small-scale-cluster-density.yaml b/examples/label-small-scale-cluster-density.yaml index 13b69fd..85459e9 100644 --- a/examples/label-small-scale-cluster-density.yaml +++ b/examples/label-small-scale-cluster-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : small-scale-cluster-density-v2 index: {{ es_metadata_index }} diff --git a/examples/payload-scale-415.yaml b/examples/payload-scale-415.yaml index 7d65d3e..f79237c 100644 --- a/examples/payload-scale-415.yaml +++ b/examples/payload-scale-415.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : aws-small-scale-cluster-density-v2 index: {{ es_metadata_index }} diff --git a/examples/payload-scale-416.yaml b/examples/payload-scale-416.yaml index 5adb728..e2e4674 100644 --- a/examples/payload-scale-416.yaml +++ b/examples/payload-scale-416.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : aws-small-scale-cluster-density-v2 index: {{ es_metadata_index }} diff --git a/examples/readout-control-plane-cdv2.yaml b/examples/readout-control-plane-cdv2.yaml index 8d52bee..8eb7b7d 100644 --- a/examples/readout-control-plane-cdv2.yaml +++ b/examples/readout-control-plane-cdv2.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : cluster-density-v2-24nodes index: {{ es_metadata_index }} diff --git a/examples/readout-control-plane-node-density.yaml b/examples/readout-control-plane-node-density.yaml index e7a73b3..d760850 100644 --- a/examples/readout-control-plane-node-density.yaml +++ b/examples/readout-control-plane-node-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : node-density-heavy-24nodes index: {{ es_metadata_index }} diff --git a/examples/readout-netperf-tcp.yaml b/examples/readout-netperf-tcp.yaml index db70044..5812516 100644 --- a/examples/readout-netperf-tcp.yaml +++ b/examples/readout-netperf-tcp.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : k8s-netperf-tcp index: {{ es_metadata_index }} diff --git a/examples/small-scale-cluster-density.yaml b/examples/small-scale-cluster-density.yaml index 2cb7d4a..acb1c4c 100644 --- a/examples/small-scale-cluster-density.yaml +++ b/examples/small-scale-cluster-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : aws-small-scale-cluster-density-v2 index: {{ es_metadata_index }} diff --git a/examples/small-scale-node-density-cni.yaml b/examples/small-scale-node-density-cni.yaml index e45119d..009fbc7 100644 --- a/examples/small-scale-node-density-cni.yaml +++ b/examples/small-scale-node-density-cni.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : aws-small-scale-node-density-cni index: {{ es_metadata_index }} diff --git a/examples/telco-cpu-util.yaml b/examples/telco-cpu-util.yaml new file mode 100644 index 0000000..e604d7e --- /dev/null +++ b/examples/telco-cpu-util.yaml @@ -0,0 +1,14 @@ +datasource: + type: telco +tests : + - name : telco_cpu_util_test + metadata: + test_type: cpu_util + ocp_version: 4.17 + + metrics : + - name: total_cpu + metric_of_interest: scenarios.scenario_name==idle.types.type_name==total.max_cpu + + - name : os_daemon + metric_of_interest: scenarios.scenario_name==idle.types.type_name==os_daemon.max_cpu diff --git a/examples/trt-external-payload-cluster-density.yaml b/examples/trt-external-payload-cluster-density.yaml index 613c604..ec885b8 100644 --- a/examples/trt-external-payload-cluster-density.yaml +++ b/examples/trt-external-payload-cluster-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : payload-cluster-density-v2 index: {{ es_metadata_index }} diff --git a/examples/trt-external-payload-crd-scale.yaml b/examples/trt-external-payload-crd-scale.yaml index b065b4d..74e662e 100644 --- a/examples/trt-external-payload-crd-scale.yaml +++ b/examples/trt-external-payload-crd-scale.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : payload-crd-scale index: {{ es_metadata_index }} diff --git a/examples/trt-external-payload-node-density.yaml b/examples/trt-external-payload-node-density.yaml index 700d9d9..cff3824 100644 --- a/examples/trt-external-payload-node-density.yaml +++ b/examples/trt-external-payload-node-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : payload-node-density index: {{ es_metadata_index }} diff --git a/examples/trt-payload-cluster-density.yaml b/examples/trt-payload-cluster-density.yaml index 192a7f0..9ea02f6 100644 --- a/examples/trt-payload-cluster-density.yaml +++ b/examples/trt-payload-cluster-density.yaml @@ -1,3 +1,5 @@ +datasource: + type: perfscale tests : - name : payload-cluster-density-v2 index: {{ es_metadata_index }} diff --git a/orion.py b/orion.py index 2a66e66..0b398d9 100644 --- a/orion.py +++ b/orion.py @@ -3,6 +3,7 @@ """ # pylint: disable = import-error, line-too-long, no-member +import asyncio import logging import sys import warnings @@ -116,6 +117,11 @@ def cli(max_content_width=120): # pylint: disable=unused-argument @click.option("--node-count", default=False, help="Match any node iterations count") @click.option("--lookback-size", type=int, default=10000, help="Maximum number of entries to be looked back") def cmd_analysis(**kwargs): + """Dummy function for asyncio + """ + asyncio.run(_cmd_analysis_async(**kwargs)) + +async def _cmd_analysis_async(**kwargs): """ Orion runs on command line mode, and helps in detecting regressions """ @@ -123,7 +129,7 @@ def cmd_analysis(**kwargs): logger_instance = SingletonLogger(debug=level, name="Orion") logger_instance.info("🏹 Starting Orion in command-line mode") kwargs["configMap"] = load_config(kwargs["config"]) - output, regression_flag = run(**kwargs) + output, regression_flag = await run(**kwargs) if output is None: logger_instance.error("Terminating test") sys.exit(0) diff --git a/pkg/algorithms/algorithm.py b/pkg/algorithms/algorithm.py index dd85e47..4619747 100644 --- a/pkg/algorithms/algorithm.py +++ b/pkg/algorithms/algorithm.py @@ -32,6 +32,7 @@ def __init__( # pylint: disable = too-many-arguments self.metrics_config = metrics_config self.regression_flag = False + def output_json(self) -> Tuple[str, str, bool]: """Method to output json output @@ -39,7 +40,7 @@ def output_json(self) -> Tuple[str, str, bool]: Tuple[str, str]: returns test_name and json output """ _, change_points_by_metric = self._analyze() - dataframe_json = self.dataframe.to_json(orient="records") + dataframe_json = self.dataframe.to_json(orient="records", default_handler=str) dataframe_json = json.loads(dataframe_json) for index, entry in enumerate(dataframe_json): @@ -57,8 +58,8 @@ def output_json(self) -> Tuple[str, str, bool]: / change_point.stats.mean_1 ) * 100 if ( - percentage_change * self.metrics_config[key]["direction"] > 0 - or self.metrics_config[key]["direction"] == 0 + percentage_change * self.metrics_config[key].get("direction",0) > 0 + or self.metrics_config[key].get("direction",0) == 0 ): dataframe_json[index]["metrics"][key][ "percentage_change" diff --git a/pkg/algorithms/edivisive/edivisive.py b/pkg/algorithms/edivisive/edivisive.py index 5d2143a..aebad32 100644 --- a/pkg/algorithms/edivisive/edivisive.py +++ b/pkg/algorithms/edivisive/edivisive.py @@ -15,15 +15,20 @@ class EDivisive(Algorithm): def _analyze(self): self.dataframe["timestamp"] = pd.to_datetime(self.dataframe["timestamp"]) - self.dataframe["timestamp"] = self.dataframe["timestamp"].astype(int) // 10**9 + self.dataframe["timestamp"] = self.dataframe["timestamp"].astype(int) + first_timestamp = self.dataframe["timestamp"].dropna().iloc[0] + if first_timestamp > 1_000_000_000_000: + self.dataframe["timestamp"] = self.dataframe["timestamp"].astype('int64') // 10**9 + else: + self.dataframe["timestamp"] = self.dataframe["timestamp"].astype('int64') series= self.setup_series() change_points_by_metric = series.analyze().change_points # filter by direction for metric, changepoint_list in change_points_by_metric.items(): for i in range(len(changepoint_list)-1, -1, -1): - if ((self.metrics_config[metric]["direction"] == 1 and changepoint_list[i].stats.mean_1 > changepoint_list[i].stats.mean_2) or - (self.metrics_config[metric]["direction"] == -1 and changepoint_list[i].stats.mean_1 < changepoint_list[i].stats.mean_2) ): + if ((self.metrics_config[metric].get("direction",0) == 1 and changepoint_list[i].stats.mean_1 > changepoint_list[i].stats.mean_2) or + (self.metrics_config[metric].get("direction",0) == -1 and changepoint_list[i].stats.mean_1 < changepoint_list[i].stats.mean_2) ): del changepoint_list[i] if [val for li in change_points_by_metric.values() for val in li]: self.regression_flag=True diff --git a/pkg/datasources/__init__.py b/pkg/datasources/__init__.py new file mode 100644 index 0000000..c311df5 --- /dev/null +++ b/pkg/datasources/__init__.py @@ -0,0 +1,7 @@ +""" +Datasource module for orion +""" +from .datasource import Datasource +from .datasourceFactory import DatasourceFactory +from .perfscale import PerfscaleDatasource +from .telco import TelcoDatasource diff --git a/pkg/datasources/datasource.py b/pkg/datasources/datasource.py new file mode 100644 index 0000000..15d32ac --- /dev/null +++ b/pkg/datasources/datasource.py @@ -0,0 +1,28 @@ +# pylint: disable = R0903, E0211 +""" +Generic Datasource implementation +""" +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Dict, Any + +class Datasource(ABC): + """Generic method for Datasource + + Args: + ABC (_type_): _description_ + """ + + def __init__(self, test: Dict[str, Any], + match: Any, + options: Dict[str, Any], + start_timestamp: datetime,): + self.test = test + self.match = match + self.options = options + self.start_timestamp = start_timestamp + + @abstractmethod + def process_test(self): + """Unimplemented process test function + """ diff --git a/pkg/datasources/datasourceFactory.py b/pkg/datasources/datasourceFactory.py new file mode 100644 index 0000000..786edbc --- /dev/null +++ b/pkg/datasources/datasourceFactory.py @@ -0,0 +1,51 @@ +# pylint: disable = R0903, E0211 +""" +Generate datasource factory +""" +from datetime import datetime +from typing import Any, Dict +from fmatch.matcher import Matcher +from fmatch.splunk_matcher import SplunkMatcher +from fmatch.logrus import SingletonLogger +from .perfscale import PerfscaleDatasource +from .telco import TelcoDatasource + + +class DatasourceFactory: + """Datasource Factory implementation + """ + def instantiate_datasource(self, datasource:str, + test: Dict[str, Any], + options: Dict[str, Any], + start_timestamp: datetime): + """Sets the datasource type + + Args: + datasource (str): _description_ + test (Dict[str, Any]): _description_ + options (Dict[str, Any]): _description_ + start_timestamp (datetime): _description_ + + Returns: + Datasource: _description_ + """ + logger_instance = SingletonLogger.getLogger("Orion") + if datasource["type"]=="perfscale": + match = Matcher( + index=test["index"], + level=logger_instance.level, + ES_URL=datasource["ES_SERVER"], + verify_certs=False, + ) + return PerfscaleDatasource(test, match, options, start_timestamp), match + if datasource["type"]=="telco": + match = SplunkMatcher( + host= datasource.get("host"), + port= datasource.get("port"), + username= datasource.get("username"), + password= datasource.get("password"), + indice=datasource.get("indice") + ) + return TelcoDatasource(test, match, options, start_timestamp), match + return None, None + \ No newline at end of file diff --git a/pkg/datasources/perfscale/__init__.py b/pkg/datasources/perfscale/__init__.py new file mode 100644 index 0000000..ddfd028 --- /dev/null +++ b/pkg/datasources/perfscale/__init__.py @@ -0,0 +1,4 @@ +""" +Perfscale datasource init module +""" +from .perfscale import PerfscaleDatasource diff --git a/pkg/datasources/perfscale/perfscale.py b/pkg/datasources/perfscale/perfscale.py new file mode 100644 index 0000000..104807c --- /dev/null +++ b/pkg/datasources/perfscale/perfscale.py @@ -0,0 +1,326 @@ +# pylint: disable = R0903, E0211, R0914, R0913, W0718 +""" +Perfscale datasource +""" +from functools import reduce +import re +from typing import Dict, Any, List +from fmatch.matcher import Matcher +from fmatch.logrus import SingletonLogger +import pandas as pd +import pyshorteners +from pkg.datasources.datasource import Datasource +from pkg.utils import extract_metadata_from_test + +class PerfscaleDatasource(Datasource): + """Perfscale workflow intended datasource + + Args: + Datasource (_type_): _description_ + """ + def process_test(self): + """generate the dataframe for the test given + + Args: + test (_type_): test from process test + match (_type_): matcher object + logger (_type_): logger object + output (_type_): output file name + + Returns: + _type_: merged dataframe + """ + logger = SingletonLogger.getLogger("Orion") + logger.info("The test %s has started", self.test["name"]) + fingerprint_index = self.test["index"] + + # getting metadata + metadata = ( + extract_metadata_from_test(self.test) + if self.options["uuid"] in ("", None) + else get_metadata_with_uuid(self.options["uuid"], self.match) + ) + # get uuids, buildUrls matching with the metadata + print(fingerprint_index) + runs = self.match.get_uuid_by_metadata( + metadata, + fingerprint_index, + lookback_date=self.start_timestamp, + lookback_size=self.options["lookback_size"], + ) + uuids = [run["uuid"] for run in runs] + buildUrls = {run["uuid"]: run["buildUrl"] for run in runs} + # get uuids if there is a baseline + if self.options["baseline"] not in ("", None): + uuids = [ + uuid for uuid in re.split(r" |,", self.options["baseline"]) if uuid + ] + uuids.append(self.options["uuid"]) + buildUrls = get_build_urls(fingerprint_index, uuids, self.match) + elif not uuids: + logger.info("No UUID present for given metadata") + return None, None + + benchmark_index = self.test["benchmarkIndex"] + + uuids = filter_uuids_on_index( + metadata, + benchmark_index, + uuids, + self.match, + self.options["baseline"], + self.options["node_count"], + ) + # get metrics data and dataframe + metrics = self.test["metrics"] + dataframe_list, metrics_config = get_metric_data( + uuids, benchmark_index, metrics, self.match + ) + # check and filter for multiple timestamp values for each run + for i, df in enumerate(dataframe_list): + if i != 0 and ("timestamp" in df.columns): + dataframe_list[i] = df.drop(columns=["timestamp"]) + # merge the dataframe with all metrics + if dataframe_list: + merged_df = reduce( + lambda left, right: pd.merge(left, right, on="uuid", how="inner"), + dataframe_list, + ) + else: + return None, metrics_config + shortener = pyshorteners.Shortener(timeout=10) + merged_df["buildUrl"] = merged_df["uuid"].apply( + lambda uuid: ( + shorten_url(shortener, buildUrls[uuid]) + if self.options["convert_tinyurl"] + else buildUrls[uuid] + ) + # pylint: disable = cell-var-from-loop + ) + merged_df = merged_df.reset_index(drop=True) + # save the dataframe + output_file_path = ( + f"{self.options['save_data_path'].split('.')[0]}-{self.test['name']}.csv" + ) + self.match.save_results(merged_df, csv_file_path=output_file_path) + return merged_df, metrics_config + + +def get_metadata_with_uuid(uuid_gen: str, match: Matcher) -> Dict[Any, Any]: + """Gets metadata of the run from each test + + Args: + uuid (str): str of uuid ot find metadata of + match: the fmatch instance + + + Returns: + dict: dictionary of the metadata + """ + logger_instance = SingletonLogger.getLogger("Orion") + test = match.get_metadata_by_uuid(uuid_gen) + metadata = { + "platform": "", + "clusterType": "", + "masterNodesCount": 0, + "workerNodesCount": 0, + "infraNodesCount": 0, + "masterNodesType": "", + "workerNodesType": "", + "infraNodesType": "", + "totalNodesCount": 0, + "ocpVersion": "", + "networkType": "", + "ipsec": "", + "fips": "", + "encrypted": "", + "publish": "", + "computeArch": "", + "controlPlaneArch": "", + } + for k, v in test.items(): + if k not in metadata: + continue + metadata[k] = v + metadata["benchmark.keyword"] = test["benchmark"] + metadata["ocpVersion"] = str(metadata["ocpVersion"]) + + # Remove any keys that have blank values + no_blank_meta = {k: v for k, v in metadata.items() if v} + logger_instance.debug("No blank metadata dict: " + str(no_blank_meta)) + return no_blank_meta + + +def get_build_urls(index: str, uuids: List[str], match: Matcher): + """Gets metadata of the run from each test + to get the build url + + Args: + uuids (list): str list of uuid to find build urls of + match: the fmatch instance + + + Returns: + dict: dictionary of the metadata + """ + + test = match.getResults("", uuids, index, {}) + buildUrls = {run["uuid"]: run["buildUrl"] for run in test} + return buildUrls + + +def filter_uuids_on_index( + metadata: Dict[str, Any], + fingerprint_index: str, + uuids: List[str], + match: Matcher, + baseline: str, + filter_node_count: bool, +) -> List[str]: + """returns the index to be used and runs as uuids + + Args: + metadata (_type_): metadata from config + uuids (_type_): uuids collected + match (_type_): Matcher object + + Returns: + _type_: index and uuids + """ + if metadata["benchmark.keyword"] in ["ingress-perf", "k8s-netperf"]: + return uuids + if baseline == "" and not filter_node_count: + runs = match.match_kube_burner(uuids, fingerprint_index) + ids = match.filter_runs(runs, runs) + else: + ids = uuids + return ids + + +# pylint: disable=too-many-locals +def get_metric_data( + uuids: List[str], index: str, metrics: Dict[str, Any], match: Matcher +) -> List[pd.DataFrame]: + """Gets details metrics based on metric yaml list + + Args: + ids (list): list of all uuids + index (dict): index in es of where to find data + metrics (dict): metrics to gather data on + match (Matcher): current matcher instance + logger (logger): log data to one output + + Returns: + dataframe_list: dataframe of the all metrics + """ + logger_instance = SingletonLogger.getLogger("Orion") + dataframe_list = [] + metrics_config = {} + + for metric in metrics: + metric_name = metric["name"] + metric_value_field = metric["metric_of_interest"] + + labels = metric.pop("labels", None) + direction = int(metric.pop("direction", 0)) + + logger_instance.info("Collecting %s", metric_name) + try: + if "agg" in metric: + metric_df, metric_dataframe_name = process_aggregation_metric( + uuids, index, metric, match + ) + else: + metric_df, metric_dataframe_name = process_standard_metric( + uuids, index, metric, match, metric_value_field + ) + + metric["labels"] = labels + metric["direction"] = direction + metrics_config[metric_dataframe_name] = metric + dataframe_list.append(metric_df) + logger_instance.debug(metric_df) + except Exception as e: + logger_instance.error( + "Couldn't get metrics %s, exception %s", + metric_name, + e, + ) + return dataframe_list, metrics_config + + +def shorten_url(shortener: any, uuids: str) -> str: + """Shorten url if there is a list of buildUrls + + Args: + shortener (any): shortener object to use tinyrl.short on + uuids (List[str]): List of uuids to shorten + + Returns: + str: a combined string of shortened urls + """ + short_url_list = [] + for buildUrl in uuids.split(","): + short_url_list.append(shortener.tinyurl.short(buildUrl)) + short_url = ",".join(short_url_list) + return short_url + + +def process_aggregation_metric( + uuids: List[str], index: str, metric: Dict[str, Any], match: Matcher +) -> pd.DataFrame: + """Method to get aggregated dataframe + + Args: + uuids (List[str]): _description_ + index (str): _description_ + metric (Dict[str, Any]): _description_ + match (Matcher): _description_ + + Returns: + pd.DataFrame: _description_ + """ + aggregated_metric_data = match.get_agg_metric_query(uuids, index, metric) + aggregation_value = metric["agg"]["value"] + aggregation_type = metric["agg"]["agg_type"] + aggregation_name = f"{aggregation_value}_{aggregation_type}" + aggregated_df = match.convert_to_df( + aggregated_metric_data, columns=["uuid", "timestamp", aggregation_name] + ) + aggregated_df = aggregated_df.drop_duplicates(subset=["uuid"], keep="first") + aggregated_metric_name = f"{metric['name']}_{aggregation_type}" + aggregated_df = aggregated_df.rename( + columns={aggregation_name: aggregated_metric_name} + ) + return aggregated_df, aggregated_metric_name + + +def process_standard_metric( + uuids: List[str], + index: str, + metric: Dict[str, Any], + match: Matcher, + metric_value_field: str, +) -> pd.DataFrame: + """Method to get dataframe of standard metric + + Args: + uuids (List[str]): _description_ + index (str): _description_ + metric (Dict[str, Any]): _description_ + match (Matcher): _description_ + metric_value_field (str): _description_ + + Returns: + pd.DataFrame: _description_ + """ + standard_metric_data = match.getResults("", uuids, index, metric) + standard_metric_df = match.convert_to_df( + standard_metric_data, columns=["uuid", "timestamp", metric_value_field] + ) + standard_metric_name = f"{metric['name']}_{metric_value_field}" + standard_metric_df = standard_metric_df.rename( + columns={metric_value_field: standard_metric_name} + ) + standard_metric_df = standard_metric_df.drop_duplicates() + return standard_metric_df, standard_metric_name diff --git a/pkg/datasources/telco/__init__.py b/pkg/datasources/telco/__init__.py new file mode 100644 index 0000000..eaea854 --- /dev/null +++ b/pkg/datasources/telco/__init__.py @@ -0,0 +1,4 @@ +""" +Telco Datasource init module +""" +from .telco import TelcoDatasource diff --git a/pkg/datasources/telco/telco.py b/pkg/datasources/telco/telco.py new file mode 100644 index 0000000..4bac1a8 --- /dev/null +++ b/pkg/datasources/telco/telco.py @@ -0,0 +1,165 @@ +# pylint: disable = R0903, E0211, W0236 +""" +Telco datasource +""" + +from datetime import datetime, timedelta +import hashlib +import json +from typing import Dict, Any, Tuple +import uuid + +import pandas as pd +from fmatch.logrus import SingletonLogger +from pkg.datasources.datasource import Datasource +from pkg.utils import extract_metadata_from_test + + +class TelcoDatasource(Datasource): + """Telco datasource + + Args: + Datasource (_type_): _description_ + """ + + async def process_test(self) -> Tuple[pd.DataFrame, Dict[str, Any]]: + """processing splunk data + + Args: + test (Dict[str, Any]): splunk test + match (SplunkMatcher): splunk matcher + options (Dict[str, Any]): options for further use + + Returns: + Tuple[pd.DataFrame, Dict[str, Any]]: _description_ + """ + + logger = SingletonLogger.getLogger("Orion") + logger.info("The test %s has started", self.test["name"]) + metadata = extract_metadata_from_test(self.test) + logger.debug(f"Collected metadata {metadata}") + start_timestamp = None + if isinstance(self.start_timestamp, datetime): + start_timestamp = self.start_timestamp + else: + start_timestamp = ( + datetime.strptime(self.start_timestamp, "%Y-%m-%d %H:%M:%S") + if self.start_timestamp + else datetime.now() - timedelta(days=30) + ) + logger.debug(f"start timestamps for the test is {start_timestamp}") + searchList = " AND ".join( + [f'{key}="{value}"' for key, value in metadata.items()] + ) + query = { + "earliest_time": f"{start_timestamp.strftime('%Y-%m-%d')}T00:00:00", + "latest_time": f"{datetime.now().strftime('%Y-%m-%d')}T23:59:59", + "output_mode": "json", + } + logger.debug(f"Executing query {searchList}") + data = await self.match.query( + query=query, searchList=searchList, max_results=10000 + ) + seen = set() + unique_data = [] + for d in data: + # Serialize the dictionary into a JSON string (sorted for consistency) + serialized = json.dumps(d, sort_keys=True) + if serialized not in seen: + seen.add(serialized) + unique_data.append(d) + data = unique_data + # print(json.dumps(data[1],indent =4)) + logger.debug(f"Collected data {data}") + metrics = self.test["metrics"] + dataframe_list, metrics_config = get_splunk_metrics(data, metrics) + + return dataframe_list, metrics_config + + +def generate_uuid(record): + """Generates uuid based on hash value of record + + Args: + record (dict): _description_ + + Returns: + _type_: _description_ + """ + # Convert the record to a string format suitable for hashing + record_string = str(record) + # Create a hash of the record string + hash_object = hashlib.md5( + record_string.encode("utf-8") + ) # You can use other hash functions if needed + # Create a UUID from the hash + return uuid.UUID(hash_object.hexdigest()) + + +def get_splunk_metrics(data: dict, metrics: dict) -> Tuple[pd.DataFrame, dict]: + """gets metrics from splunk data + + Args: + data (dict): data with all the metrics + metrics (dict): metrics needed to extracted + + Returns: + Tuple[pd.DataFrame, dict]: _description_ + """ + logger_instance = SingletonLogger.getLogger("Orion") + dataframe_rows = [] + metrics_config = {} + + for metric in metrics: + logger_instance.info(f"Collecting metric {metric['name']}") + + for record in data: + timestamp = int(record["timestamp"]) + record = record["data"] + record_uuid = generate_uuid(record) + row_data = { + "uuid": record_uuid, + "timestamp": timestamp, + "buildUrl": "https://placeholder.com/" + + record["cluster_artifacts"]["ref"]["jenkins_build"], + } + + for metric in metrics: + metric_name = metric["name"] + metric_value_field = metric["metric_of_interest"] + metric_value = get_nested_value(record, metric_value_field) + row_data[metric_name] = metric_value + metrics_config[metric_name] = metric + + dataframe_rows.append(row_data) + + df = pd.DataFrame(dataframe_rows) + df.dropna(inplace=True) + df.sort_values(by="timestamp", inplace=True) + df.reset_index(drop=True, inplace=True) + + logger_instance.info(f"Generated DataFrame with {len(df)} rows") + return df, metrics_config + + +def get_nested_value(record, keys, default=None): + """Recursively traverse a nested dictionary/list to get a value based on dot-separated keys.""" + keys = keys.split(".") + for key in keys: + if isinstance(record, dict): + record = record.get(key, default) + elif isinstance(record, list): + # For lists, we assume the user wants to filter based on some condition + key_parts = key.split("==") + if len(key_parts) == 2: + filter_key, filter_value = key_parts[0], key_parts[1].strip('"') + # Look for a matching dict in the list + record = next( + (item for item in record if item.get(filter_key) == filter_value), + default, + ) + else: + return default # Key format is incorrect, return default + else: + return default # If it's neither dict nor list, return default + return record diff --git a/pkg/runTest.py b/pkg/runTest.py index 990bfde..d831e01 100644 --- a/pkg/runTest.py +++ b/pkg/runTest.py @@ -1,13 +1,16 @@ """ run test """ + +import asyncio import sys from typing import Any, Dict -from fmatch.matcher import Matcher from fmatch.logrus import SingletonLogger from pkg.algorithms import AlgorithmFactory import pkg.constants as cnsts -from pkg.utils import get_datasource, process_test, get_subtracted_timestamp +from pkg.utils import get_datasource, get_subtracted_timestamp +from pkg.datasources import DatasourceFactory + def get_algorithm_type(kwargs): """Switch Case of getting algorithm name @@ -22,13 +25,14 @@ def get_algorithm_type(kwargs): algorithm_name = cnsts.EDIVISIVE elif kwargs["anomaly_detection"]: algorithm_name = cnsts.ISOLATION_FOREST - elif kwargs['cmr']: + elif kwargs["cmr"]: algorithm_name = cnsts.CMR else: algorithm_name = None return algorithm_name -def run(**kwargs: dict[str, Any]) -> dict[str, Any]: #pylint: disable = R0914 + +async def run(**kwargs: dict[str, Any]) -> dict[str, Any]: # pylint: disable = R0914 """run method to start the tests Args: @@ -45,25 +49,29 @@ def run(**kwargs: dict[str, Any]) -> dict[str, Any]: #pylint: disable = R0914 datasource = get_datasource(config_map) result_output = {} regression_flag = False + fingerprint_matched_df, metrics_config = None, None for test in config_map["tests"]: # Create fingerprint Matcher - matcher = Matcher( - index=test["index"], - level=logger_instance.level, - ES_URL=datasource, - verify_certs=False, - ) start_timestamp = get_start_timestamp(kwargs) - - fingerprint_matched_df, metrics_config = process_test( - test, - matcher, - kwargs, - start_timestamp, + datasourceFactory = DatasourceFactory() + datasource_object, matcher = datasourceFactory.instantiate_datasource( + datasource=datasource, + test=test, + options=kwargs, + start_timestamp=start_timestamp, ) + if asyncio.iscoroutinefunction(datasource_object.process_test): + fingerprint_matched_df, metrics_config = ( + await datasource_object.process_test() + ) + else: + fingerprint_matched_df, metrics_config = datasource_object.process_test() if fingerprint_matched_df is None: - sys.exit(3) # No data present + sys.exit(3) # No data present + logger_instance.debug( + f"Collected dataframe {fingerprint_matched_df},\n metrics {metrics_config}" + ) algorithm_name = get_algorithm_type(kwargs) if algorithm_name is None: @@ -71,14 +79,15 @@ def run(**kwargs: dict[str, Any]) -> dict[str, Any]: #pylint: disable = R0914 algorithmFactory = AlgorithmFactory() algorithm = algorithmFactory.instantiate_algorithm( - algorithm_name, - matcher, - fingerprint_matched_df, - test, - kwargs, - metrics_config, - ) + algorithm_name, + matcher, + fingerprint_matched_df, + test, + kwargs, + metrics_config, + ) testname, result_data, test_flag = algorithm.output(kwargs["output_format"]) + logger_instance.debug(f"Result data for test {testname}, {result_data}") result_output[testname] = result_data regression_flag = regression_flag or test_flag return result_output, regression_flag diff --git a/pkg/utils.py b/pkg/utils.py index 94cf457..cd96b58 100644 --- a/pkg/utils.py +++ b/pkg/utils.py @@ -1,117 +1,22 @@ -# pylint: disable=cyclic-import +# pylint: disable=cyclic-import, R0914 # pylint: disable = line-too-long, too-many-arguments, consider-using-enumerate, broad-exception-caught """ module for all utility functions orion uses """ # pylint: disable = import-error -from functools import reduce import os import re import sys import xml.etree.ElementTree as ET import xml.dom.minidom from datetime import datetime, timedelta, timezone -from typing import List, Any, Dict, Tuple -from tabulate import tabulate -from fmatch.matcher import Matcher +from typing import Any, Dict from fmatch.logrus import SingletonLogger +from tabulate import tabulate import pandas as pd -import pyshorteners - - - - -# pylint: disable=too-many-locals -def get_metric_data( - uuids: List[str], index: str, metrics: Dict[str, Any], match: Matcher -) -> List[pd.DataFrame]: - """Gets details metrics based on metric yaml list - - Args: - ids (list): list of all uuids - index (dict): index in es of where to find data - metrics (dict): metrics to gather data on - match (Matcher): current matcher instance - logger (logger): log data to one output - - Returns: - dataframe_list: dataframe of the all metrics - """ - logger_instance = SingletonLogger.getLogger("Orion") - dataframe_list = [] - metrics_config = {} - - for metric in metrics: - metric_name = metric["name"] - metric_value_field = metric["metric_of_interest"] - - labels = metric.pop("labels", None) - direction = int(metric.pop("direction", 0)) - - logger_instance.info("Collecting %s", metric_name) - try: - if "agg" in metric: - metric_df, metric_dataframe_name = process_aggregation_metric(uuids, index, metric, match) - else: - metric_df, metric_dataframe_name = process_standard_metric(uuids, index, metric, match, metric_value_field) - metric["labels"] = labels - metric["direction"] = direction - metrics_config[metric_dataframe_name] = metric - dataframe_list.append(metric_df) - logger_instance.debug(metric_df) - except Exception as e: - logger_instance.error( - "Couldn't get metrics %s, exception %s", - metric_name, - e, - ) - return dataframe_list, metrics_config - -def process_aggregation_metric( - uuids: List[str], index: str, metric: Dict[str, Any], match: Matcher -) -> pd.DataFrame: - """Method to get aggregated dataframe - Args: - uuids (List[str]): _description_ - index (str): _description_ - metric (Dict[str, Any]): _description_ - match (Matcher): _description_ - - Returns: - pd.DataFrame: _description_ - """ - aggregated_metric_data = match.get_agg_metric_query(uuids, index, metric) - aggregation_value = metric["agg"]["value"] - aggregation_type = metric["agg"]["agg_type"] - aggregation_name = f"{aggregation_value}_{aggregation_type}" - aggregated_df = match.convert_to_df(aggregated_metric_data, columns=["uuid", "timestamp", aggregation_name]) - aggregated_df = aggregated_df.drop_duplicates(subset=["uuid"], keep="first") - aggregated_metric_name = f"{metric['name']}_{aggregation_type}" - aggregated_df = aggregated_df.rename(columns={aggregation_name: aggregated_metric_name}) - return aggregated_df, aggregated_metric_name - -def process_standard_metric(uuids: List[str], index: str, metric: Dict[str, Any], match: Matcher, metric_value_field: str) -> pd.DataFrame: - """Method to get dataframe of standard metric - - Args: - uuids (List[str]): _description_ - index (str): _description_ - metric (Dict[str, Any]): _description_ - match (Matcher): _description_ - metric_value_field (str): _description_ - - Returns: - pd.DataFrame: _description_ - """ - standard_metric_data = match.getResults("",uuids, index, metric) - standard_metric_df = match.convert_to_df(standard_metric_data, columns=["uuid", "timestamp", metric_value_field]) - standard_metric_name = f"{metric['name']}_{metric_value_field}" - standard_metric_df = standard_metric_df.rename(columns={metric_value_field: standard_metric_name}) - standard_metric_df = standard_metric_df.drop_duplicates() - return standard_metric_df, standard_metric_name def extract_metadata_from_test(test: Dict[str, Any]) -> Dict[Any, Any]: """Gets metadata of the run from each test @@ -124,15 +29,11 @@ def extract_metadata_from_test(test: Dict[str, Any]) -> Dict[Any, Any]: """ logger_instance = SingletonLogger.getLogger("Orion") metadata = test["metadata"] - metadata["ocpVersion"] = str(metadata["ocpVersion"]) + metadata = {key: str(value) for key, value in metadata.items()} logger_instance.debug("metadata" + str(metadata)) return metadata - - - - -def get_datasource(data: Dict[Any, Any]) -> str: +def get_datasource(data: Dict[Any, Any]) -> dict: """Gets es url from config or env Args: @@ -143,134 +44,27 @@ def get_datasource(data: Dict[Any, Any]) -> str: str: es url """ logger_instance = SingletonLogger.getLogger("Orion") - if "ES_SERVER" in data.keys(): - return data["ES_SERVER"] - if "ES_SERVER" in os.environ: - return os.environ.get("ES_SERVER") - logger_instance.error("ES_SERVER environment variable/config variable not set") + if data["datasource"]["type"].lower() == "telco": + datasource = data["datasource"] + datasource_config = {"host": os.environ.get("SPLUNK_HOST", datasource.get("host","")), + "port": os.environ.get("SPLUNK_PORT", datasource.get("port","")), + "username": os.environ.get("SPLUNK_USERNAME", datasource.get("username","")), + "password": os.environ.get("SPLUNK_PASSWORD", datasource.get("password","")), + "indice": os.environ.get("SPLUNK_INDICE", datasource.get("indice",""))} + datasource.update(datasource_config) + return datasource + if data["datasource"]["type"].lower() == "perfscale": + if "ES_SERVER" in data["datasource"].keys(): + return data["datasource"] + if "ES_SERVER" in os.environ: + datasource = data["datasource"] + datasource.update({"ES_SERVER":os.environ.get("ES_SERVER")}) + return datasource + + logger_instance.error("Datasurce variable/config variable not set") sys.exit(1) -def filter_uuids_on_index( - metadata: Dict[str, Any], - fingerprint_index: str, - uuids: List[str], - match: Matcher, - baseline: str, - filter_node_count: bool -) -> List[str]: - """returns the index to be used and runs as uuids - - Args: - metadata (_type_): metadata from config - uuids (_type_): uuids collected - match (_type_): Matcher object - - Returns: - _type_: index and uuids - """ - if metadata["benchmark.keyword"] in ["ingress-perf", "k8s-netperf"]: - return uuids - if baseline == "" and not filter_node_count: - runs = match.match_kube_burner(uuids, fingerprint_index) - ids = match.filter_runs(runs, runs) - else: - ids = uuids - return ids - - -def get_build_urls(index: str, uuids: List[str], match: Matcher): - """Gets metadata of the run from each test - to get the build url - - Args: - uuids (list): str list of uuid to find build urls of - match: the fmatch instance - - - Returns: - dict: dictionary of the metadata - """ - - test = match.getResults("", uuids, index, {}) - buildUrls = {run["uuid"]: run["buildUrl"] for run in test} - return buildUrls - - -def process_test( - test: Dict[str, Any], - match: Matcher, - options: Dict[str, Any], - start_timestamp: datetime, -) -> Tuple[pd.DataFrame, Dict[str, Any]]: - """generate the dataframe for the test given - - Args: - test (_type_): test from process test - match (_type_): matcher object - logger (_type_): logger object - output (_type_): output file name - - Returns: - _type_: merged dataframe - """ - logger = SingletonLogger.getLogger("Orion") - logger.info("The test %s has started", test["name"]) - fingerprint_index = test["index"] - - # getting metadata - metadata = extract_metadata_from_test(test) if options["uuid"] in ("", None) else get_metadata_with_uuid(options["uuid"], match) - # get uuids, buildUrls matching with the metadata - runs = match.get_uuid_by_metadata(metadata, fingerprint_index, lookback_date=start_timestamp, lookback_size=options['lookback_size']) - uuids = [run["uuid"] for run in runs] - buildUrls = {run["uuid"]: run["buildUrl"] for run in runs} - # get uuids if there is a baseline - if options["baseline"] not in ("", None): - uuids = [uuid for uuid in re.split(r" |,", options["baseline"]) if uuid] - uuids.append(options["uuid"]) - buildUrls = get_build_urls(fingerprint_index, uuids, match) - elif not uuids: - logger.info("No UUID present for given metadata") - return None, None - - benchmark_index = test["benchmarkIndex"] - - uuids = filter_uuids_on_index( - metadata, benchmark_index, uuids, match, options["baseline"], options['node_count'] - ) - # get metrics data and dataframe - metrics = test["metrics"] - dataframe_list, metrics_config = get_metric_data( - uuids, benchmark_index, metrics, match - ) - # check and filter for multiple timestamp values for each run - for i, df in enumerate(dataframe_list): - if i != 0 and ("timestamp" in df.columns): - dataframe_list[i] = df.drop(columns=["timestamp"]) - # merge the dataframe with all metrics - if dataframe_list: - merged_df = reduce( - lambda left, right: pd.merge(left, right, on="uuid", how="inner"), - dataframe_list, - ) - else: - return None, metrics_config - shortener = pyshorteners.Shortener(timeout=10) - merged_df["buildUrl"] = merged_df["uuid"].apply( - lambda uuid: ( - shorten_url(shortener, buildUrls[uuid]) - if options["convert_tinyurl"] - else buildUrls[uuid] - ) - - # pylint: disable = cell-var-from-loop - ) - merged_df=merged_df.reset_index(drop=True) - #save the dataframe - output_file_path = f"{options['save_data_path'].split('.')[0]}-{test['name']}.csv" - match.save_results(merged_df, csv_file_path=output_file_path) - return merged_df, metrics_config - def shorten_url(shortener: any, uuids: str) -> str: """Shorten url if there is a list of buildUrls @@ -287,50 +81,6 @@ def shorten_url(shortener: any, uuids: str) -> str: short_url = ','.join(short_url_list) return short_url -def get_metadata_with_uuid(uuid: str, match: Matcher) -> Dict[Any, Any]: - """Gets metadata of the run from each test - - Args: - uuid (str): str of uuid ot find metadata of - match: the fmatch instance - - - Returns: - dict: dictionary of the metadata - """ - logger_instance = SingletonLogger.getLogger("Orion") - test = match.get_metadata_by_uuid(uuid) - metadata = { - "platform": "", - "clusterType": "", - "masterNodesCount": 0, - "workerNodesCount": 0, - "infraNodesCount": 0, - "masterNodesType": "", - "workerNodesType": "", - "infraNodesType": "", - "totalNodesCount": 0, - "ocpVersion": "", - "networkType": "", - "ipsec": "", - "fips": "", - "encrypted": "", - "publish": "", - "computeArch": "", - "controlPlaneArch": "", - } - for k, v in test.items(): - if k not in metadata: - continue - metadata[k] = v - metadata["benchmark.keyword"] = test["benchmark"] - metadata["ocpVersion"] = str(metadata["ocpVersion"]) - - # Remove any keys that have blank values - no_blank_meta = {k: v for k, v in metadata.items() if v} - logger_instance.debug("No blank metadata dict: " + str(no_blank_meta)) - return no_blank_meta - def json_to_junit( test_name: str, @@ -355,7 +105,7 @@ def json_to_junit( test_count = 0 for metric, value in metrics_config.items(): test_count += 1 - labels = value["labels"] + labels = value.get("labels",[]) label_string = " ".join(labels) if labels else "" testcase = ET.SubElement( testsuite,