diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index b2271c49..241a0d0e 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -31,8 +31,9 @@ jobs:
           helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
           helm repo add stable https://charts.helm.sh/stable
           helm repo update
-      - name: Deploy prometheus & Port Forwarding
+      - name: Deploy prometheus
         run: |
+          # nodePort mapping 30000 -> http://localhost:9090
           kubectl create namespace monitoring
           helm install \
             --wait --timeout 360s \
@@ -47,11 +48,36 @@ jobs:
             --set alertmanager.service.type=NodePort \
             --set prometheus-node-exporter.service.nodePort=32001 \
             --set prometheus-node-exporter.service.type=NodePort
+      - name: Deploy Elasticsearch
-          SELECTOR=`kubectl -n monitoring get service kind-prometheus-kube-prome-prometheus -o wide --no-headers=true | awk '{ print $7 }'`
-          POD_NAME=`kubectl -n monitoring get pods --selector="$SELECTOR" --no-headers=true | awk '{ print $1 }'`
-          kubectl -n monitoring port-forward $POD_NAME 9090:9090 &
-          sleep 5
+        env:
+          ELASTIC_URL: ${{ vars.ELASTIC_URL }}
+          ELASTIC_PORT: ${{ vars.ELASTIC_PORT }}
+          ELASTIC_USER: ${{ vars.ELASTIC_USER }}
+          ELASTIC_PASSWORD: ${{ vars.ELASTIC_PASSWORD }}
+        run: |
+          echo "ELASTIC_URL: ${ELASTIC_URL}"
+          echo "ELASTIC_PORT: ${ELASTIC_PORT}"
+          echo "ELASTIC_USER: ${ELASTIC_USER}"
+          echo "ELASTIC_PASSWORD: ${ELASTIC_PASSWORD}"
+
+          # nodePort mapping 32766 -> http://localhost:9091
+          helm install \
+            --wait --timeout 360s \
+            elasticsearch \
+            oci://registry-1.docker.io/bitnamicharts/elasticsearch \
+            --set master.masterOnly=false \
+            --set master.replicaCount=1 \
+            --set data.replicaCount=0 \
+            --set coordinating.replicaCount=0 \
+            --set ingest.replicaCount=0 \
+            --set service.type=NodePort \
+            --set service.nodePorts.restAPI=32766 \
+            --set security.elasticPassword=test \
+            --set security.enabled=true \
+            --set image.tag=7.17.23-debian-12-r0 \
+            --set security.tls.autoGenerated=true
       - name: Check out code
         uses: actions/checkout@v3
       - name: Update version number
@@ -78,7 +104,11 @@ jobs:
           BUCKET_NAME: ${{ secrets.BUCKET_NAME }}
           AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }}
           AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-          ES_SERVER: ${{ secrets.ES_SERVER }}
+          ELASTIC_URL: ${{ vars.ELASTIC_URL }}
+          ELASTIC_PORT: ${{ vars.ELASTIC_PORT }}
+          ELASTIC_USER: ${{ vars.ELASTIC_USER }}
+          ELASTIC_PASSWORD: ${{ vars.ELASTIC_PASSWORD }}
+
         run: |
           export TEST_WORKDIR=`pwd`/`date +%s`
           mkdir $TEST_WORKDIR
diff --git a/kind-config.yml b/kind-config.yml
new file mode 100644
index 00000000..95f101fa
--- /dev/null
+++ b/kind-config.yml
@@ -0,0 +1,9 @@
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+nodes:
+  - role: control-plane
+    extraPortMappings:
+      - containerPort: 30000
+        hostPort: 9090
+      - containerPort: 32766
+        hostPort: 9091
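Together with the workflow above, these mappings expose Prometheus on localhost:9090 and the Elasticsearch REST API on localhost:9091. A quick smoke-test sketch (not part of the patch; the `elastic` superuser name is the Bitnami chart default and an assumption here, and the Prometheus NodePort 30000 is taken from the workflow comment):

```python
# Hypothetical local check that the kind-mapped services are reachable.
# Assumptions: user "elastic" with the password set via
# security.elasticPassword=test, and the self-signed TLS certificate
# generated by security.tls.autoGenerated=true.
import requests
from elasticsearch import Elasticsearch

# Prometheus: NodePort 30000 is mapped to localhost:9090
print(requests.get("http://localhost:9090/-/ready").status_code)

# Elasticsearch: NodePort 32766 is mapped to localhost:9091
es = Elasticsearch(
    "https://localhost:9091",
    http_auth=("elastic", "test"),
    verify_certs=False,  # self-signed cert from the chart
    ssl_show_warn=False,
)
print(es.info()["version"]["number"])  # expect 7.17.x if the deploy worked
```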
"sha256:16339d392b4bbe86ad00b4bdeecff10edf516d32bc6c16053846625f2c6ea250"}, + {file = "elasticsearch-7.13.4-py2.py3-none-any.whl", hash = "sha256:5920df0ab2630778680376d86bea349dc99860977eec9b6d2bd0860f337313f2"}, + {file = "elasticsearch-7.13.4.tar.gz", hash = "sha256:52dda85f76eeb85ec873bf9ffe0ba6849e544e591f66d4048a5e48016de268e0"}, ] [package.dependencies] certifi = "*" -urllib3 = ">=1.26.2,<3" +urllib3 = ">=1.21.1,<2" [package.extras] -develop = ["aiohttp", "furo", "httpx", "mock", "opentelemetry-api", "opentelemetry-sdk", "orjson", "pytest", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "pytest-mock", "requests", "respx", "sphinx (>2)", "sphinx-autodoc-typehints", "trustme"] +async = ["aiohttp (>=3,<4)"] +develop = ["black", "coverage", "jinja2", "mock", "pytest", "pytest-cov", "pyyaml", "requests (>=2.0.0,<3.0.0)", "sphinx (<1.7)", "sphinx-rtd-theme"] +docs = ["sphinx (<1.7)", "sphinx-rtd-theme"] +requests = ["requests (>=2.4.0,<3.0.0)"] [[package]] -name = "elasticsearch" -version = "8.14.0" +name = "elasticsearch-dsl" +version = "7.4.1" description = "Python client for Elasticsearch" optional = false -python-versions = ">=3.7" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ - {file = "elasticsearch-8.14.0-py3-none-any.whl", hash = "sha256:cef8ef70a81af027f3da74a4f7d9296b390c636903088439087b8262a468c130"}, - {file = "elasticsearch-8.14.0.tar.gz", hash = "sha256:aa2490029dd96f4015b333c1827aa21fd6c0a4d223b00dfb0fe933b8d09a511b"}, + {file = "elasticsearch-dsl-7.4.1.tar.gz", hash = "sha256:07ee9c87dc28cc3cae2daa19401e1e18a172174ad9e5ca67938f752e3902a1d5"}, + {file = "elasticsearch_dsl-7.4.1-py2.py3-none-any.whl", hash = "sha256:97f79239a252be7c4cce554c29e64695d7ef6a4828372316a5e5ff815e7a7498"}, ] [package.dependencies] -elastic-transport = ">=8.13,<9" +elasticsearch = ">=7.0.0,<8.0.0" +python-dateutil = "*" +six = "*" [package.extras] -async = ["aiohttp (>=3,<4)"] -orjson = ["orjson (>=3)"] -requests = ["requests (>=2.4.0,!=2.32.2,<3.0.0)"] -vectorstore-mmr = ["numpy (>=1)", "simsimd (>=3)"] +develop = ["coverage (<5.0.0)", "mock", "pytest (>=3.0.0)", "pytest-cov", "pytest-mock (<3.0.0)", "pytz", "sphinx", "sphinx-rtd-theme"] [[package]] name = "fonttools" @@ -2139,4 +2141,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "caa8730dae9716e7cebe9e798ff9a1bb74f8e4efa879d27304f5b63e1a237d95" +content-hash = "768268ae6a1df7838ea202a089274cd6f4020ff772eb5d317542ebc7c7882d45" diff --git a/pyproject.toml b/pyproject.toml index db22bce3..c067ed93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,8 @@ tzlocal = "5.1" pytz = "^2023.3" PyYAML = "6.0.1" prometheus-api-client = "^0.5.4" -elasticsearch = "8.14.0" +elasticsearch = "7.13.4" +elasticsearch-dsl = "7.4.1" wheel = "^0.42.0" cython = "3.0" diff --git a/src/krkn_lib/elastic/__init__.py b/src/krkn_lib/elastic/__init__.py new file mode 100644 index 00000000..7eec9504 --- /dev/null +++ b/src/krkn_lib/elastic/__init__.py @@ -0,0 +1 @@ +from .krkn_elastic import * # NOQA diff --git a/src/krkn_lib/elastic/krkn_elastic.py b/src/krkn_lib/elastic/krkn_elastic.py new file mode 100644 index 00000000..83e5d8d9 --- /dev/null +++ b/src/krkn_lib/elastic/krkn_elastic.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import datetime +import logging +import time + +import math +import urllib3 +from elasticsearch import Elasticsearch, NotFoundError +from elasticsearch_dsl import Search + +from 
diff --git a/src/krkn_lib/elastic/krkn_elastic.py b/src/krkn_lib/elastic/krkn_elastic.py
new file mode 100644
index 00000000..83e5d8d9
--- /dev/null
+++ b/src/krkn_lib/elastic/krkn_elastic.py
@@ -0,0 +1,250 @@
+from __future__ import annotations
+
+import datetime
+import logging
+import math
+import time
+
+import urllib3
+from elasticsearch import Elasticsearch, NotFoundError
+from elasticsearch_dsl import Search
+
+from krkn_lib.models.elastic.models import (
+    ElasticAlert,
+    ElasticMetric,
+    ElasticChaosRunTelemetry,
+)
+from krkn_lib.models.telemetry import ChaosRunTelemetry
+from krkn_lib.utils.safe_logger import SafeLogger
+
+
+class KrknElastic:
+    es = None
+
+    def __init__(
+        self,
+        safe_logger: SafeLogger,
+        elastic_url: str,
+        elastic_port: int = 443,
+        verify_certs: bool = False,
+        username: str = None,
+        password: str = None,
+    ):
+        es_logger = logging.getLogger("elasticsearch")
+        es_logger.setLevel(logging.WARNING)
+        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+        urllib3.disable_warnings(DeprecationWarning)
+        es_transport_logger = logging.getLogger("elastic_transport.transport")
+        es_transport_logger.setLevel(logging.CRITICAL)
+        self.safe_logger = safe_logger
+        try:
+            if not elastic_url:
+                raise Exception("elastic search url is not valid")
+            if not elastic_port:
+                raise Exception("elastic port is not valid")
+            # create Elasticsearch object
+
+            credentials = (
+                (username, password) if username and password else None
+            )
+            self.es = Elasticsearch(
+                f"{elastic_url}:{elastic_port}",
+                http_auth=credentials,
+                verify_certs=verify_certs,
+                ssl_show_warn=False,
+            )
+        except Exception as e:
+            self.safe_logger.error(
+                "Failed to initialize elasticsearch: %s" % e
+            )
+            raise e
+
+    def upload_data_to_elasticsearch(self, item: dict, index: str = "") -> int:
+        """uploads captured data in item dictionary to Elasticsearch
+
+        :param item: the data to post to elastic search
+        :param index: the elastic search index pattern to post to
+
+        :return: the time taken to post the result,
+            will be 0 if index and es are blank
+        """
+
+        if self.es and index != "":
+            # Attach to elastic search and attempt index creation
+            start = time.time()
+            self.safe_logger.info(
+                f"Uploading item {item} to index {index} in Elasticsearch"
+            )
+            try:
+                response = self.es.index(index=index, body=item)
+                self.safe_logger.info(f"Response back was {response}")
+                if response["result"] != "created":
+                    self.safe_logger.error(
+                        f"Error trying to create new item in {index}"
+                    )
+                    return -1
+            except Exception as e:
+                self.safe_logger.error(f"Error trying to create new item: {e}")
+                return -1
+            end = time.time()
+            elapsed_time = end - start
+
+            # return elapsed time for upload if no issues
+            return math.ceil(elapsed_time)
+        return 0
+    def upload_metrics_to_elasticsearch(
+        self,
+        run_uuid: str,
+        raw_data: list[dict[str, str | int | float]],
+        index: str,
+    ) -> int:
+        """
+        Saves raw data returned from the Krkn prometheus
+        client to elasticsearch as ElasticMetric objects.
+        raw_data is a list of mixed-type dictionaries in the format
+        {"name": str, "timestamp": int, "value": float}
+
+        :param run_uuid: the krkn run id
+        :param raw_data: the mixed type dictionaries (each entry is
+            validated by checking its attribute types)
+        :param index: the elasticsearch index where
+            the object will be stored
+        :return: the time needed to save if succeeded -1 if failed
+        """
+        if not index:
+            raise Exception("index cannot be None or empty")
+        if not run_uuid:
+            raise Exception("run uuid cannot be None or empty")
+        time_start = time.time()
+        try:
+            for metric in raw_data:
+                if (
+                    isinstance(metric["timestamp"], int)
+                    and isinstance(metric["value"], float)
+                    and isinstance(metric["name"], str)
+                ):
+                    result = self.push_metric(
+                        ElasticMetric(
+                            run_uuid=run_uuid,
+                            name=metric["name"],
+                            created_at=datetime.datetime.now(),
+                            timestamp=int(metric["timestamp"]),
+                            value=float(metric["value"]),
+                        ),
+                        index,
+                    )
+                    if result == -1:
+                        self.safe_logger.error(
+                            f"failed to save metric "
+                            f"to elasticsearch: {metric}"
+                        )
+
+            return int(time.time() - time_start)
+        except Exception:
+            return -1
+
+    def push_alert(self, alert: ElasticAlert, index: str) -> int:
+        """
+        Pushes an ElasticAlert object to elastic
+
+        :param alert: the populated ElasticAlert object
+        :param index: the index where the ElasticAlert object
+            is pushed
+        :return: the time needed to save if succeeded -1 if failed
+        """
+        if not index:
+            raise Exception("index cannot be None or empty")
+        try:
+            time_start = time.time()
+            alert.save(using=self.es, index=index)
+            return int(time.time() - time_start)
+        except Exception:
+            return -1
+
+    def push_metric(self, metric: ElasticMetric, index: str) -> int:
+        """
+        Pushes an ElasticMetric object to elastic
+
+        :param metric: the populated ElasticMetric object
+        :param index: the index where the ElasticMetric object
+            is pushed
+        :return: the time needed to save if succeeded -1 if failed
+        """
+        if not index:
+            raise Exception("index cannot be None or empty")
+        try:
+            time_start = time.time()
+            metric.save(using=self.es, index=index)
+            return int(time.time() - time_start)
+        except Exception:
+            return -1
+    def push_telemetry(self, telemetry: ChaosRunTelemetry, index: str) -> int:
+        """
+        Pushes a ChaosRunTelemetry object to elastic as an
+        ElasticChaosRunTelemetry document
+
+        :param telemetry: the populated ChaosRunTelemetry object
+        :param index: the index where the object is pushed
+        :return: the time needed to save if succeeded -1 if failed
+        """
+        if not index:
+            raise Exception("index cannot be None or empty")
+        try:
+            elastic_chaos = ElasticChaosRunTelemetry(telemetry)
+            time_start = time.time()
+            elastic_chaos.save(using=self.es, index=index)
+            return int(time.time() - time_start)
+        except Exception:
+            return -1
+
+    def search_telemetry(self, run_uuid: str, index: str):
+        """
+        Searches ElasticChaosRunTelemetry by run_uuid
+
+        :param run_uuid: the Krkn run id to search
+        :param index: the index where the ElasticChaosRunTelemetry
+            should have been saved
+        :return: the list of objects retrieved (empty if nothing
+            has been found)
+        """
+        try:
+            search = Search(using=self.es, index=index).filter(
+                "match", run_uuid=run_uuid
+            )
+            result = search.execute()
+            documents = [
+                ElasticChaosRunTelemetry(**hit.to_dict()) for hit in result
+            ]
+        except NotFoundError:
+            return []
+        return documents
+
+    def search_alert(self, run_uuid: str, index: str) -> list[ElasticAlert]:
+        """
+        Searches ElasticAlerts by run_uuid
+
+        :param run_uuid: the Krkn run id to search
+        :param index: the index where the ElasticAlert
+            should have been saved
+        :return: the list of objects retrieved (empty if nothing
+            has been found)
+        """
+        try:
+            search = Search(using=self.es, index=index).filter(
+                "match", run_uuid=run_uuid
+            )
+            result = search.execute()
+            documents = [ElasticAlert(**hit.to_dict()) for hit in result]
+        except NotFoundError:
+            return []
+        return documents
+
+    def search_metric(self, run_uuid: str, index: str) -> list[ElasticMetric]:
+        """
+        Searches ElasticMetric by run_uuid
+
+        :param run_uuid: the Krkn run id to search
+        :param index: the index where the ElasticMetric
+            should have been saved
+        :return: the list of objects retrieved (empty if nothing
+            has been found)
+        """
+        try:
+            search = Search(using=self.es, index=index).filter(
+                "match", run_uuid=run_uuid
+            )
+            result = search.execute()
+            documents = [ElasticMetric(**hit.to_dict()) for hit in result]
+        except NotFoundError:
+            return []
+        return documents
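Taken together, the class above supports a push-then-search workflow. A minimal usage sketch (not part of the patch; the endpoint, credentials, index name, and metric name are assumptions based on the CI setup earlier in this diff):

```python
# Hypothetical end-to-end use of KrknElastic: push one metric, read it back.
import datetime
import time
import uuid

from krkn_lib.elastic.krkn_elastic import KrknElastic
from krkn_lib.models.elastic.models import ElasticMetric
from krkn_lib.utils.safe_logger import SafeLogger

elastic = KrknElastic(
    SafeLogger(),
    "https://localhost",      # assumed local endpoint (kind mapping)
    elastic_port=9091,
    username="elastic",       # assumed Bitnami chart default user
    password="test",
)
run_uuid = str(uuid.uuid4())
elastic.push_metric(
    ElasticMetric(
        run_uuid=run_uuid,
        name="etcd_leader_changes",  # hypothetical metric name
        created_at=datetime.datetime.now(),
        timestamp=1700000000,
        value=0.0,
    ),
    index="krkn-metrics",            # hypothetical index
)
time.sleep(1)  # allow the index refresh before searching
print(elastic.search_metric(run_uuid, "krkn-metrics"))
```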
diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py
index 6af74065..16d90a9a 100644
--- a/src/krkn_lib/k8s/krkn_kubernetes.py
+++ b/src/krkn_lib/k8s/krkn_kubernetes.py
@@ -2065,13 +2065,12 @@ def get_nodes_infos(self) -> (list[NodeInfo], list[Taint]):
         for node in node_resp.items:
             node_info = NodeInfo()
             if node.spec.taints is not None:
-                for taint in node.spec.taints:
-                    taint = Taint(
-                        node_name=node.metadata.name,
-                        effect=taint.effect,
-                        key=taint.key,
-                        value=taint.value,
-                    )
+                for node_taint in node.spec.taints:
+                    taint = Taint()
+                    taint.node_name = node.metadata.name
+                    taint.effect = node_taint.effect
+                    taint.key = node_taint.key
+                    taint.value = node_taint.value
                     taints.append(taint)
             if instance_type_label in node.metadata.labels.keys():
                 node_info.instance_type = node.metadata.labels[
diff --git a/src/krkn_lib/models/elastic/__init__.py b/src/krkn_lib/models/elastic/__init__.py
new file mode 100644
index 00000000..0d6e2db0
--- /dev/null
+++ b/src/krkn_lib/models/elastic/__init__.py
@@ -0,0 +1 @@
+from .models import *  # NOQA
diff --git a/src/krkn_lib/models/elastic/models.py b/src/krkn_lib/models/elastic/models.py
new file mode 100644
index 00000000..86c13ac2
--- /dev/null
+++ b/src/krkn_lib/models/elastic/models.py
@@ -0,0 +1,195 @@
+import datetime
+
+from elasticsearch_dsl import (
+    Keyword,
+    Text,
+    Date,
+    Document,
+    Float,
+    Long,
+    Nested,
+    InnerDoc,
+    Integer,
+)
+
+from krkn_lib.models.telemetry import ChaosRunTelemetry
+
+
+class ElasticAlert(Document):
+    run_uuid = Keyword()
+    severity = Text()
+    alert = Text()
+    created_at = Date()
+
+    def __init__(
+        self,
+        run_uuid: str = None,
+        severity: str = None,
+        alert: str = None,
+        created_at: datetime.datetime = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.run_uuid = run_uuid
+        self.severity = severity
+        self.alert = alert
+        self.created_at = created_at
+
+
+class ElasticMetricValue(InnerDoc):
+    timestamp = Long()
+    value = Float()
+
+    def __init__(self, timestamp: int, value: float, **kwargs):
+        super().__init__(**kwargs)
+        self.timestamp = timestamp
+        self.value = value
+
+
+class ElasticMetric(Document):
+    run_uuid = Keyword()
+    name = Text()
+    created_at = Date()
+    timestamp = Long()
+    value = Float()
+
+    def __init__(
+        self,
+        run_uuid: str,
+        name: str,
+        created_at: datetime.datetime,
+        timestamp: int,
+        value: float,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.run_uuid = run_uuid
+        self.name = name
+        self.created_at = created_at
+        self.timestamp = timestamp
+        self.value = value
+
+
+# Telemetry models
+
+
+class ElasticAffectedPod(InnerDoc):
+    pod_name = Text(fields={"keyword": Keyword()})
+    namespace = Text()
+    total_recovery_time = Float()
+    pod_readiness_time = Float()
+    pod_rescheduling_time = Float()
+
+
+class ElasticPodsStatus(InnerDoc):
+    recovered = Nested(ElasticAffectedPod, multi=True)
+    unrecovered = Nested(ElasticAffectedPod, multi=True)
+    error = Text()
+
+
+class ElasticScenarioParameters(InnerDoc):
+    pass
+
+
+class ElasticScenarioTelemetry(InnerDoc):
+    start_timestamp = Float()
+    end_timestamp = Float()
+    scenario = Text(fields={"keyword": Keyword()})
+    exit_status = Integer()
+    parameters_base64 = Text()
+    parameters = Nested(ElasticScenarioParameters)
+    affected_pods = Nested(ElasticPodsStatus)
+
+
+class ElasticNodeInfo(InnerDoc):
+    count = Integer()
+    architecture = Text()
+    instance_type = Text()
+    node_type = Text()
+    kernel_version = Text()
+    kubelet_version = Text()
+    os_version = Text()
+
+
+class ElasticTaint(InnerDoc):
+    key = Text()
+    value = Text()
+    effect = Text()
+
+
+class ElasticChaosRunTelemetry(Document):
+    scenarios = Nested(ElasticScenarioTelemetry, multi=True)
+    node_summary_infos = Nested(ElasticNodeInfo, multi=True)
+    node_taints = Nested(ElasticTaint, multi=True)
+    kubernetes_objects_count = Nested(InnerDoc)
+    network_plugins = Text(multi=True)
+    timestamp = Text()
+    total_node_count = Integer()
+    cloud_infrastructure = Text()
+    cloud_type = Text()
+    run_uuid = Text(fields={"keyword": Keyword()})
+
+    class Index:
+        name = "chaos_run_telemetry"
+
+    def __init__(
+        self, chaos_run_telemetry: ChaosRunTelemetry = None, **kwargs
+    ):
+        super().__init__(**kwargs)
+        # cheap trick to avoid reinventing the wheel :-)
+        # when built from a raw search hit, rebuild the source
+        # ChaosRunTelemetry object from the hit's kwargs first
+        if chaos_run_telemetry is None and kwargs:
+            chaos_run_telemetry = ChaosRunTelemetry(json_dict=kwargs)
+        self.scenarios = [
+            ElasticScenarioTelemetry(
+                start_timestamp=sc.start_timestamp,
+                end_timestamp=sc.end_timestamp,
+                scenario=sc.scenario,
+                exit_status=sc.exit_status,
+                parameters_base64=sc.parameters_base64,
+                parameters=sc.parameters,
+                affected_pods=ElasticPodsStatus(
+                    recovered=[
+                        ElasticAffectedPod(
+                            pod_name=pod.pod_name,
+                            namespace=pod.namespace,
+                            total_recovery_time=pod.total_recovery_time,
+                            pod_readiness_time=pod.pod_readiness_time,
+                            pod_rescheduling_time=pod.pod_rescheduling_time,
+                        )
+                        for pod in sc.affected_pods.recovered
+                    ],
+                    unrecovered=[
+                        ElasticAffectedPod(
+                            pod_name=pod.pod_name, namespace=pod.namespace
+                        )
+                        for pod in sc.affected_pods.unrecovered
+                    ],
+                    error=sc.affected_pods.error,
+                ),
+            )
+            for sc in chaos_run_telemetry.scenarios
+        ]
+
+        self.node_summary_infos = [
+            ElasticNodeInfo(
+                count=info.count,
+                architecture=info.architecture,
+                instance_type=info.instance_type,
+                kernel_version=info.kernel_version,
+                kubelet_version=info.kubelet_version,
+                os_version=info.os_version,
+            )
+            for info in chaos_run_telemetry.node_summary_infos
+        ]
+        self.node_taints = [
+            ElasticTaint(key=taint.key, value=taint.value, effect=taint.effect)
+            for taint in chaos_run_telemetry.node_taints
+        ]
+        self.kubernetes_objects_count = (
+            chaos_run_telemetry.kubernetes_objects_count
+        )
+        self.network_plugins = chaos_run_telemetry.network_plugins
+        self.timestamp = chaos_run_telemetry.timestamp
+        self.total_node_count = chaos_run_telemetry.total_node_count
+        self.cloud_infrastructure = chaos_run_telemetry.cloud_infrastructure
+        self.cloud_type = chaos_run_telemetry.cloud_type
+        self.run_uuid = chaos_run_telemetry.run_uuid
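The dual-entry constructor above (a ChaosRunTelemetry object on the save path, raw hit kwargs on the retrieve path) can be exercised as follows. A hedged sketch, assuming ChaosRunTelemetry tolerates this minimal payload; the full fixture is get_ChaosRunTelemetry_json in base_test.py later in this patch:

```python
# Hypothetical round trip: JSON -> ChaosRunTelemetry -> Elastic document,
# and the same JSON treated as a raw search hit.
from krkn_lib.models.elastic.models import ElasticChaosRunTelemetry
from krkn_lib.models.telemetry import ChaosRunTelemetry

json_dict = {  # minimal assumed payload; every key present, lists empty
    "scenarios": [],
    "node_summary_infos": [],
    "node_taints": [],
    "kubernetes_objects_count": {"Pod": 5},
    "network_plugins": ["Calico"],
    "timestamp": "2023-05-22T14:55:02Z",
    "total_node_count": 3,
    "cloud_infrastructure": "AWS",
    "cloud_type": "EC2",
    "run_uuid": "00000000-0000-0000-0000-000000000000",
}
telemetry = ChaosRunTelemetry(json_dict=json_dict)

doc = ElasticChaosRunTelemetry(chaos_run_telemetry=telemetry)  # save path
doc_from_hit = ElasticChaosRunTelemetry(None, **json_dict)     # retrieve path
print(doc.run_uuid == doc_from_hit.run_uuid)
```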
diff --git a/src/krkn_lib/models/telemetry/models.py b/src/krkn_lib/models/telemetry/models.py
index f7cbfeaa..15ca7e65 100644
--- a/src/krkn_lib/models/telemetry/models.py
+++ b/src/krkn_lib/models/telemetry/models.py
@@ -50,7 +50,9 @@ def __init__(self, json_object: any = None):
         self.exit_status = json_object.get("exit_status")
         self.parameters_base64 = json_object.get("parameters_base64")
         self.parameters = json_object.get("parameters")
-        self.affected_pods = PodsStatus(json_object.get("affected_pods"))
+        self.affected_pods = PodsStatus(
+            json_object=json_object.get("affected_pods")
+        )
 
         if (
             self.parameters_base64 is not None
@@ -105,11 +107,22 @@ class Taint:
     Taint Value
     """
 
+    def __init__(self, json_dict: dict = None):
+        if json_dict is not None:
+            self.node_name = (
+                json_dict["node_name"] if "node_name" in json_dict else None
+            )
+            self.effect = (
+                json_dict["effect"] if "effect" in json_dict else None
+            )
+            self.key = json_dict["key"] if "key" in json_dict else None
+            self.value = json_dict["value"] if "value" in json_dict else None
+
 
 @dataclass(order=False)
 class NodeInfo:
     """
-    Cluster node telemetry informations
+    Cluster node telemetry information
     """
 
     count: int = 1
@@ -136,6 +149,36 @@ class NodeInfo:
     os_version: str = ""
     "Operating system version"
 
+    def __init__(self, json_dict: dict = None):
+        if json_dict is not None:
+            self.count = json_dict["count"] if "count" in json_dict else None
+            self.architecture = (
+                json_dict["architecture"]
+                if "architecture" in json_dict
+                else None
+            )
+            self.instance_type = (
+                json_dict["instance_type"]
+                if "instance_type" in json_dict
+                else None
+            )
+            self.node_type = (
+                json_dict["node_type"] if "node_type" in json_dict else None
+            )
+            self.kernel_version = (
+                json_dict["kernel_version"]
+                if "kernel_version" in json_dict
+                else None
+            )
+            self.kubelet_version = (
+                json_dict["kubelet_version"]
+                if "kubelet_version" in json_dict
+                else None
+            )
+            self.os_version = (
+                json_dict["os_version"] if "os_version" in json_dict else None
+            )
+
     def __eq__(self, other):
         if isinstance(other, NodeInfo):
             return (
@@ -231,9 +274,6 @@ def __init__(self, json_dict: any = None):
         scenarios = json_dict.get("scenarios")
         if scenarios is None or isinstance(scenarios, list) is False:
             raise Exception("scenarios param must be a list of object")
-        for scenario in scenarios:
-            scenario_telemetry = ScenarioTelemetry(scenario)
-            self.scenarios.append(scenario_telemetry)
         self.scenarios = [ScenarioTelemetry(s) for s in scenarios]
 
@@ -250,6 +290,7 @@ def __init__(self, json_dict: any = None):
             )
         self.network_plugins = json_dict.get("network_plugins")
         self.run_uuid = json_dict.get("run_uuid")
+        self.timestamp = json_dict.get("timestamp")
 
     def to_json(self) -> str:
         return json.dumps(self, default=lambda o: o.__dict__, indent=4)
diff --git a/src/krkn_lib/telemetry/elastic.py b/src/krkn_lib/telemetry/elastic.py
deleted file mode 100644
index 6964ec6e..00000000
--- a/src/krkn_lib/telemetry/elastic.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import time
-
-import urllib3
-from elasticsearch import Elasticsearch
-
-from krkn_lib.utils.safe_logger import SafeLogger
-
-
-class KrknElastic:
-    es = None
-
-    def __init__(self, safe_logger: SafeLogger, elastic_url: str):
-        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-        urllib3.disable_warnings(DeprecationWarning)
-        self.safe_logger = safe_logger
-        try:
-            # create Elasticsearch object
-            if elastic_url:
-                self.es = Elasticsearch(f"{elastic_url}:443")
-        except Exception as e:
-            self.safe_logger.error("Failed to initalize elasticsearch: %s" % e)
-            raise e
-
-    def upload_data_to_elasticsearch(self, item: dict, index: str = ""):
-        """uploads captured data in item dictionary to Elasticsearch
-
-
-        :param item: the data to post to elastic search
-        :param index: the elastic search index pattern to post to
-
-        :return: the time taken to post the result,
-        will be 0 if index and es are blank
-        """
-
-        if self.es and index != "":
-            # Attach to elastic search and attempt index creation
-            start = time.time()
-            self.safe_logger.info(
-                f"Uploading item {item} to index {index} in Elasticsearch"
-            )
-            try:
-                response = self.es.index(index=index, body=item)
-                self.safe_logger.info(f"Response back was {response}")
-                if response["result"] != "created":
-                    self.safe_logger.error(
-                        f"Error trying to create new item in {index}"
-                    )
-                    return -1
-            except Exception as e:
-                self.safe_logger.error(f"Error trying to create new item: {e}")
-                return -1
-            end = time.time()
-            elapsed_time = end - start
-
-            # return elapsed time for upload if no issues
-            return elapsed_time
-        return 0
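For downstream callers, deleting this module implies a small migration to the new package; a hedged sketch (the endpoint and credentials below are hypothetical, not taken from the patch):

```python
# Hypothetical caller migration from the removed module to the new package.
# Old (removed above) hard-coded port 443 and anonymous access:
#   from krkn_lib.telemetry.elastic import KrknElastic
#   elastic = KrknElastic(SafeLogger(), "https://search.example.com")
# New:
from krkn_lib.elastic.krkn_elastic import KrknElastic
from krkn_lib.utils.safe_logger import SafeLogger

elastic = KrknElastic(
    SafeLogger(),
    "https://search.example.com",  # hypothetical endpoint
    elastic_port=443,              # now explicit (still the default)
    username="elastic",            # optional basic auth, new in this patch
    password="changeme",           # hypothetical credential
)
```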
diff --git a/src/krkn_lib/tests/base_test.py b/src/krkn_lib/tests/base_test.py
index 323cdf40..7b739a00 100644
--- a/src/krkn_lib/tests/base_test.py
+++ b/src/krkn_lib/tests/base_test.py
@@ -1,5 +1,6 @@
 import cProfile
 import logging
+import os
 import random
 import string
 import sys
@@ -16,6 +17,7 @@
 from kubernetes.client.rest import ApiException
 from requests import ConnectTimeout
 
+from krkn_lib.elastic.krkn_elastic import KrknElastic
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.ocp import KrknOpenshift
 from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
@@ -28,10 +30,18 @@ class BaseTest(unittest.TestCase):
     lib_ocp: KrknOpenshift
     lib_telemetry_k8s: KrknTelemetryKubernetes
     lib_telemetry_ocp: KrknTelemetryOpenshift
+    lib_elastic: KrknElastic
     pr: cProfile.Profile
 
     @classmethod
     def setUpClass(cls):
+        cls.lib_elastic = KrknElastic(
+            SafeLogger(),
+            os.getenv("ELASTIC_URL"),
+            int(os.getenv("ELASTIC_PORT")),
+            username=os.getenv("ELASTIC_USER"),
+            password=os.getenv("ELASTIC_PASSWORD"),
+        )
         cls.lib_k8s = KrknKubernetes(config.KUBE_CONFIG_DEFAULT_LOCATION)
         cls.lib_ocp = KrknOpenshift(config.KUBE_CONFIG_DEFAULT_LOCATION)
         cls.lib_telemetry_k8s = KrknTelemetryKubernetes(
@@ -422,3 +432,61 @@ def background_delete_pod(self, pod_name: str, namespace: str):
         )
         thread.daemon = True
         thread.start()
+
+    def get_ChaosRunTelemetry_json(self, run_uuid: str) -> dict:
+        example_data = {
+            "scenarios": [
+                {
+                    "start_timestamp": 1628493021.0,
+                    "end_timestamp": 1628496621.0,
+                    "scenario": "example_scenario.yaml",
+                    "exit_status": 0,
+                    "parameters_base64": "",
+                    "parameters": {
+                        "parameter_1": "test",
+                        "parameter_2": "test",
+                        "parameter_3": {"sub_parameter_1": "test"},
+                    },
+                    "affected_pods": {
+                        "recovered": [
+                            {
+                                "pod_name": "pod1",
+                                "namespace": "default",
+                                "total_recovery_time": 10.0,
+                                "pod_readiness_time": 5.0,
+                                "pod_rescheduling_time": 2.0,
+                            }
+                        ],
+                        "unrecovered": [
+                            {"pod_name": "pod2", "namespace": "default"}
+                        ],
+                        "error": "some error",
+                    },
+                }
+            ],
+            "node_summary_infos": [
+                {
+                    "count": 5,
+                    "architecture": "aarch64",
+                    "instance_type": "m2i.xlarge",
+                    "kernel_version": "5.4.0-66-generic",
+                    "kubelet_version": "v2.1.2",
+                    "os_version": "Linux",
+                }
+            ],
+            "node_taints": [
+                {
+                    "key": "node.kubernetes.io/unreachable",
+                    "value": "NoExecute",
+                    "effect": "NoExecute",
+                }
+            ],
+            "kubernetes_objects_count": {"Pod": 5, "Service": 2},
+            "network_plugins": ["Calico"],
+            "timestamp": "2023-05-22T14:55:02Z",
+            "total_node_count": 3,
+            "cloud_infrastructure": "AWS",
+            "cloud_type": "EC2",
+            "run_uuid": run_uuid,
+        }
+        return example_data
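setUpClass above makes the whole suite depend on four environment variables; a local-run sketch, with values assumed from the workflow and kind-config earlier in this diff:

```python
# Hypothetical local environment for running the elastic tests,
# mirroring the CI deployment above (all values are assumptions).
import os

os.environ.setdefault("ELASTIC_URL", "https://localhost")
os.environ.setdefault("ELASTIC_PORT", "9091")
os.environ.setdefault("ELASTIC_USER", "elastic")
os.environ.setdefault("ELASTIC_PASSWORD", "test")
```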
diff --git a/src/krkn_lib/tests/test_krkn_elastic.py b/src/krkn_lib/tests/test_krkn_elastic.py
index 4b3c1326..d9dd93c9 100644
--- a/src/krkn_lib/tests/test_krkn_elastic.py
+++ b/src/krkn_lib/tests/test_krkn_elastic.py
@@ -1,33 +1,161 @@
 import datetime
-import os
+import time
 
-from krkn_lib.telemetry.elastic import KrknElastic
+import uuid
+
+from krkn_lib.elastic.krkn_elastic import KrknElastic
+from krkn_lib.models.elastic.models import (
+    ElasticAlert,
+    ElasticMetric,
+)
+from krkn_lib.models.telemetry import ChaosRunTelemetry
 from krkn_lib.tests import BaseTest
-from krkn_lib.utils.safe_logger import SafeLogger
+from krkn_lib.utils import SafeLogger
 
 
 class TestKrknElastic(BaseTest):
-    url = os.getenv("ES_SERVER")
-    safe_logger: SafeLogger = SafeLogger()
 
-    def _testupload_correct(self):
-        elastic = KrknElastic(self.safe_logger, self.url)
-        time = elastic.upload_data_to_elasticsearch(
-            {"timestamp": datetime.datetime.now()}, "chaos_test"
+    def test_push_search_alert(self):
+        run_uuid = str(uuid.uuid4())
+        index = "test-push-alert"
+        alert_1 = ElasticAlert(
+            alert="alert_1",
+            severity="WARNING",
+            created_at=datetime.datetime.now(),
+            run_uuid=run_uuid,
+        )
+        alert_2 = ElasticAlert(
+            alert="alert_2",
+            severity="ERROR",
+            created_at=datetime.datetime.now(),
+            run_uuid=run_uuid,
+        )
+        result = self.lib_elastic.push_alert(alert_1, index)
+        self.assertNotEqual(result, -1)
+        result = self.lib_elastic.push_alert(alert_2, index)
+        self.assertNotEqual(result, -1)
+        time.sleep(1)
+        alerts = self.lib_elastic.search_alert(run_uuid, index)
+        self.assertEqual(len(alerts), 2)
+
+        alert = next(alert for alert in alerts if alert.alert == "alert_1")
+        self.assertIsNotNone(alert)
+        self.assertEqual(alert.severity, "WARNING")
+
+        alert = next(alert for alert in alerts if alert.alert == "alert_2")
+        self.assertIsNotNone(alert)
+        self.assertEqual(alert.severity, "ERROR")
+
+    def test_push_search_metric(self):
+        run_uuid = str(uuid.uuid4())
+        index = "test-push-metric"
+        metric_1 = ElasticMetric(
+            run_uuid=run_uuid,
+            name="metric_1",
+            timestamp=100,
+            value=1.0,
+            created_at=datetime.datetime.now(),
+        )
+        result = self.lib_elastic.push_metric(metric_1, index)
+        self.assertNotEqual(result, -1)
+        time.sleep(1)
+        metrics = self.lib_elastic.search_metric(run_uuid, index)
+        self.assertEqual(len(metrics), 1)
+        metric = next(
+            metric for metric in metrics if metric.name == "metric_1"
+        )
+        self.assertIsNotNone(metric)
+        self.assertEqual(metric.value, 1.0)
+        self.assertEqual(metric.timestamp, 100)
+        self.assertEqual(metric.run_uuid, run_uuid)
+        self.assertEqual(metric.name, "metric_1")
+
+    def test_push_search_telemetry(self):
+        run_uuid = str(uuid.uuid4())
+        index = "test-push-telemetry"
+        example_data = self.get_ChaosRunTelemetry_json(run_uuid)
+        telemetry = ChaosRunTelemetry(json_dict=example_data)
+        res = self.lib_elastic.push_telemetry(telemetry, index)
+        self.assertNotEqual(res, -1)
+        time.sleep(3)
+        result = self.lib_elastic.search_telemetry(
+            run_uuid=run_uuid, index=index
+        )
+
+        self.assertEqual(len(result), 1)
+
+    def test_upload_metric_to_elasticsearch(self):
+        bad_metric_uuid = str(uuid.uuid4())
+        good_metric_uuid = str(uuid.uuid4())
+        name = f"metric-{self.get_random_string(5)}"
+        index = "test-upload-metric"
+        # testing bad metric
+        self.lib_elastic.upload_metrics_to_elasticsearch(
+            run_uuid=bad_metric_uuid,
+            raw_data={
+                "name": 1,
+                "timestamp": "bad",
+                "value": "bad",
+            },
+            index=index,
+        )
+
+        self.assertEqual(
+            len(self.lib_elastic.search_metric(bad_metric_uuid, index)), 0
+        )
+
+        self.lib_elastic.upload_metrics_to_elasticsearch(
+            run_uuid=good_metric_uuid,
+            raw_data=[{"name": name, "timestamp": 10, "value": 3.14}],
+            index=index,
+        )
+        time.sleep(1)
+        metric = self.lib_elastic.search_metric(good_metric_uuid, index)
+        self.assertEqual(len(metric), 1)
+        self.assertEqual(metric[0].name, name)
+        self.assertEqual(metric[0].timestamp, 10)
+        self.assertEqual(metric[0].value, 3.14)
+
+    def test_search_alert_not_existing(self):
+        self.assertEqual(
+            len(self.lib_elastic.search_alert("notexisting", "notexisting")), 0
+        )
+
+    def test_search_metric_not_existing(self):
+        self.assertEqual(
+            len(self.lib_elastic.search_metric("notexisting", "notexisting")),
+            0,
+        )
+
+    def test_search_telemetry_not_existing(self):
+        self.assertEqual(
+            len(
+                self.lib_elastic.search_telemetry("notexisting", "notexisting")
+            ),
+            0,
+        )
+
+    def test_upload_correct(self):
+        timestamp = datetime.datetime.now()
+        run_uuid = str(uuid.uuid4())
+        index = "chaos_test"
+        time = self.lib_elastic.upload_data_to_elasticsearch(
+            {"timestamp": timestamp, "run_uuid": run_uuid}, index
+        )
         self.assertGreater(time, 0)
 
-    def _testupload_no_index(self):
-        elastic = KrknElastic(self.safe_logger, self.url)
-        time = elastic.upload_data_to_elasticsearch(
+    def test_upload_no_index(self):
+        time = self.lib_elastic.upload_data_to_elasticsearch(
             {"timestamp": datetime.datetime.now()}, ""
         )
         self.assertEqual(time, 0)
 
-    def _testupload_bad_es_url(self):
-        elastic = KrknElastic(self.safe_logger, "https://localhost")
+    def test_upload_bad_es_url(self):
+        elastic = KrknElastic(
+            SafeLogger(),
+            "http://localhost",
+        )
         time = elastic.upload_data_to_elasticsearch(
             {"timestamp": datetime.datetime.now()}, "chaos_test"
         )
@@ -36,9 +164,8 @@ def _testupload_blank_es_url(self):
     def _testupload_blank_es_url(self):
         es_url = ""
-        elastic = KrknElastic(self.safe_logger, es_url)
-        time = elastic.upload_data_to_elasticsearch(
-            {"timestamp": datetime.datetime.now()}, "chaos_test"
-        )
-
-        self.assertEqual(time, 0)
+        with self.assertRaises(Exception):
+            _ = KrknElastic(
+                SafeLogger(),
+                es_url,
+            )
diff --git a/src/krkn_lib/tests/test_krkn_elastic_models.py b/src/krkn_lib/tests/test_krkn_elastic_models.py
new file mode 100644
index 00000000..f6b70339
--- /dev/null
+++ b/src/krkn_lib/tests/test_krkn_elastic_models.py
@@ -0,0 +1,171 @@
+import uuid
+
+from krkn_lib.models.elastic.models import ElasticChaosRunTelemetry
+from krkn_lib.models.telemetry import ChaosRunTelemetry
+from krkn_lib.tests import BaseTest
+
+
+class TestKrknElasticModels(BaseTest):
+
+    def check_test_ElasticChaosRunTelemetry(
+        self, elastic_telemetry: ElasticChaosRunTelemetry, run_uuid: str
+    ):
+        self.assertEqual(len(elastic_telemetry.scenarios), 1)
+        # scenarios
+        self.assertEqual(
+            elastic_telemetry.scenarios[0].start_timestamp, 1628493021.0
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0].end_timestamp, 1628496621.0
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0].scenario, "example_scenario.yaml"
+        )
+        self.assertEqual(elastic_telemetry.scenarios[0].exit_status, 0)
+        self.assertEqual(elastic_telemetry.scenarios[0].parameters_base64, "")
+        self.assertEqual(
+            elastic_telemetry.scenarios[0].parameters,
+            self.get_ChaosRunTelemetry_json(run_uuid).get("scenarios")[0][
+                "parameters"
+            ],
+        )
+
+        # scenarios -> affected_pods
+        self.assertEqual(
+            len(elastic_telemetry.scenarios[0].affected_pods.recovered), 1
+        )
+        self.assertEqual(
+            len(elastic_telemetry.scenarios[0].affected_pods.unrecovered), 1
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0].affected_pods.error, "some error"
+        )
+
+        # scenarios -> affected_pods -> recovered
+        self.assertEqual(
+            elastic_telemetry.scenarios[0].affected_pods.recovered[0].pod_name,
+            "pod1",
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0]
+            .affected_pods.recovered[0]
+            .namespace,
+            "default",
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0]
+            .affected_pods.recovered[0]
+            .total_recovery_time,
+            10.0,
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0]
+            .affected_pods.recovered[0]
+            .pod_readiness_time,
+            5.0,
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0]
+            .affected_pods.recovered[0]
+            .pod_rescheduling_time,
+            2.0,
+        )
+
+        # scenarios -> affected_pods -> unrecovered
+        self.assertEqual(
+            elastic_telemetry.scenarios[0]
+            .affected_pods.unrecovered[0]
+            .pod_name,
+            "pod2",
+        )
+        self.assertEqual(
+            elastic_telemetry.scenarios[0]
+            .affected_pods.unrecovered[0]
+            .namespace,
+            "default",
+        )
+
+        # node_summary_infos
+        self.assertEqual(len(elastic_telemetry.node_summary_infos), 1)
+
+        self.assertEqual(elastic_telemetry.node_summary_infos[0].count, 5)
+        self.assertEqual(
+            elastic_telemetry.node_summary_infos[0].architecture, "aarch64"
+        )
+        self.assertEqual(
+            elastic_telemetry.node_summary_infos[0].instance_type, "m2i.xlarge"
+        )
+        self.assertEqual(
+            elastic_telemetry.node_summary_infos[0].kernel_version,
+            "5.4.0-66-generic",
+        )
+        self.assertEqual(
+            elastic_telemetry.node_summary_infos[0].kubelet_version, "v2.1.2"
+        )
+        self.assertEqual(
+            elastic_telemetry.node_summary_infos[0].os_version, "Linux"
+        )
+
+        # node_taints
+        self.assertEqual(len(elastic_telemetry.node_taints), 1)
+
+        self.assertEqual(
+            elastic_telemetry.node_taints[0].key,
+            "node.kubernetes.io/unreachable",
+        )
+        self.assertEqual(elastic_telemetry.node_taints[0].value, "NoExecute")
+        self.assertEqual(elastic_telemetry.node_taints[0].effect, "NoExecute")
+
+        # objects_count
+        self.assertEqual(
+            len(elastic_telemetry.kubernetes_objects_count.to_dict().keys()), 2
+        )
+        self.assertEqual(
+            elastic_telemetry.kubernetes_objects_count.to_dict().get("Pod"), 5
+        )
+        self.assertEqual(
+            elastic_telemetry.kubernetes_objects_count.to_dict().get(
+                "Service"
+            ),
+            2,
+        )
+
+        # network_plugins
+        self.assertEqual(len(elastic_telemetry.network_plugins), 1)
+        self.assertEqual(elastic_telemetry.network_plugins[0], "Calico")
+
+        # object properties
+        self.assertEqual(elastic_telemetry.timestamp, "2023-05-22T14:55:02Z")
+        self.assertEqual(elastic_telemetry.total_node_count, 3)
+        self.assertEqual(elastic_telemetry.cloud_infrastructure, "AWS")
+        self.assertEqual(elastic_telemetry.cloud_type, "EC2")
+        self.assertEqual(elastic_telemetry.run_uuid, run_uuid)
+
+    def test_ElasticChaosRunTelemetry(self):
+        run_uuid = str(uuid.uuid4())
+        example_data = self.get_ChaosRunTelemetry_json(run_uuid)
+        telemetry = ChaosRunTelemetry(json_dict=example_data)
+        # building from object (to save in elastic)
+        elastic_telemetry_object = ElasticChaosRunTelemetry(
+            chaos_run_telemetry=telemetry
+        )
+        # building from dictionary (to retrieve from elastic)
+        elastic_telemetry_dic = ElasticChaosRunTelemetry(None, **example_data)
+
+        self.check_test_ElasticChaosRunTelemetry(
+            elastic_telemetry_object, run_uuid
+        )
+        self.check_test_ElasticChaosRunTelemetry(
+            elastic_telemetry_dic, run_uuid
+        )