From c7ad1cde09e79d4821383ceb5dd32fd0471d427d Mon Sep 17 00:00:00 2001 From: LeaveMyYard Date: Mon, 22 Apr 2024 18:30:02 +0300 Subject: [PATCH] Moved the logic from #93 for a new refined structure Co-authored-by: Megrez Lu --- .../core/integrations/kubernetes/__init__.py | 47 ++++++- .../kubernetes/workload_loader/base.py | 5 +- .../workload_loader/kube_api/__init__.py | 9 +- .../workload_loader/kube_api/loaders/base.py | 15 +-- .../kubernetes/workload_loader/prometheus.py | 10 -- .../workload_loader/prometheus/__init__.py | 31 +++++ .../prometheus/loaders/__init__.py | 19 +++ .../prometheus/loaders/base.py | 126 ++++++++++++++++++ .../prometheus/loaders/deployments.py | 50 +++++++ .../core/integrations/prometheus/loader.py | 4 +- robusta_krr/core/runner.py | 70 ++++------ robusta_krr/main.py | 7 + 12 files changed, 309 insertions(+), 84 deletions(-) delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index 051cc232..2ff44585 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -17,10 +17,12 @@ V2HorizontalPodAutoscaler, ) +from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.object_like_dict import ObjectLikeDict +from prometrix import PrometheusNotFound from . import config_patch as _ from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader @@ -31,7 +33,13 @@ HPAKey = tuple[str, str, str] -class ClusterWorkloadsLoader: +class ClusterConnector: + EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) + + def __init__(self) -> None: + self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} + self._metrics_service_loaders_error_logged: set[Exception] = set() + async def list_clusters(self) -> Optional[list[str]]: """List all clusters. @@ -71,15 +79,44 @@ async def list_clusters(self) -> Optional[list[str]]: return [context["name"] for context in contexts] return [context["name"] for context in contexts if context["name"] in settings.clusters] + + def get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]: + if cluster not in self._metrics_service_loaders: + try: + self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster) + except Exception as e: + self._metrics_service_loaders[cluster] = e + + result = self._metrics_service_loaders[cluster] + if isinstance(result, self.EXPECTED_EXCEPTIONS): + if result not in self._metrics_service_loaders_error_logged: + self._metrics_service_loaders_error_logged.add(result) + logger.error(str(result)) + return None + elif isinstance(result, Exception): + raise result - def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: - WorkloadLoader = KubeAPIWorkloadLoader if settings.workload_loader == "kubeapi" else PrometheusWorkloadLoader + return result + def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: try: - return WorkloadLoader(cluster=cluster) + if settings.workload_loader == "kubeapi": + return KubeAPIWorkloadLoader(cluster=cluster) + elif settings.workload_loader == "prometheus": + cluster_loader = self.get_prometheus_loader(cluster) + if cluster_loader is not None: + return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader) + else: + logger.error( + f"Could not load Prometheus for cluster {cluster} and will skip it." + "Not possible to load workloads through Prometheus without connection to Prometheus." + ) + else: + raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented") except Exception as e: logger.error(f"Could not load cluster {cluster} and will skip it: {e}") - return None + + return None async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]: """List all scannable objects. diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py index a7bba214..8581c1a6 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py @@ -4,11 +4,8 @@ class BaseWorkloadLoader(abc.ABC): - def __init__(self, cluster: Optional[str] = None) -> None: - self.cluster = cluster - @abc.abstractmethod - async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]: + async def list_workloads(self) -> list[K8sWorkload]: pass @abc.abstractmethod diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py index 8586b77a..d23f7a46 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py @@ -11,7 +11,6 @@ from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations -from robusta_krr.utils.object_like_dict import ObjectLikeDict from ..base import BaseWorkloadLoader from .loaders import ( @@ -41,8 +40,8 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader): CronJobLoader, ] - def __init__(self, cluster: Optional[str] = None): - super().__init__(cluster) + def __init__(self, cluster: Optional[str] = None) -> None: + self.cluster = cluster # This executor will be running requests to Kubernetes API self.executor = ThreadPoolExecutor(settings.max_workers) @@ -88,7 +87,7 @@ async def list_workloads(self) -> list[K8sWorkload]: async def list_pods(self, object: K8sWorkload) -> list[PodData]: return await self._workload_loaders[object.kind].list_pods(object) - def __build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload: + def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload: name = item.metadata.name namespace = item.metadata.namespace kind = kind or item.__class__.__name__[2:] @@ -160,7 +159,7 @@ async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]: if asyncio.iscoroutine(containers): containers = await containers - result.extend(self.__build_scannable_object(item, container, kind) for container in containers) + result.extend(self._build_scannable_object(item, container, kind) for container in containers) except ApiException as e: if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: if self._kind_available[kind]: diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py index 0c14e89f..a6a552e6 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py @@ -6,21 +6,12 @@ from kubernetes import client # type: ignore from kubernetes.client.api_client import ApiClient # type: ignore -from kubernetes.client.models import ( # type: ignore - V1Container, - V1DaemonSet, - V1Deployment, - V1Job, - V1Pod, - V1PodList, - V1StatefulSet, -) +from kubernetes.client.models import V1Container, V1PodList # type: ignore from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData logger = logging.getLogger("krr") -AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] HPAKey = tuple[str, str, str] @@ -112,9 +103,7 @@ def _build_selector_query(cls, selector: Any) -> Union[str, None]: label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] if selector.match_expressions is not None: - label_filters += [ - cls._get_match_expression_filter(expression) for expression in selector.match_expressions - ] + label_filters += [cls._get_match_expression_filter(expression) for expression in selector.match_expressions] if label_filters == []: # NOTE: This might mean that we have DeploymentConfig, diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py deleted file mode 100644 index d2a98831..00000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py +++ /dev/null @@ -1,10 +0,0 @@ -import logging -from .base import BaseWorkloadLoader - - -logger = logging.getLogger("krr") - - -class PrometheusWorkloadLoader(BaseWorkloadLoader): - # TODO: Implement PrometheusWorkloadLoader - pass diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py new file mode 100644 index 00000000..b277bc28 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py @@ -0,0 +1,31 @@ +import asyncio +import itertools +import logging + + +from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader +from robusta_krr.core.models.config import settings +from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService +from robusta_krr.core.models.objects import K8sWorkload, PodData +from ..base import BaseWorkloadLoader +from .loaders import BaseKindLoader, DeploymentLoader + + +logger = logging.getLogger("krr") + + +class PrometheusWorkloadLoader(BaseWorkloadLoader): + workloads: list[type[BaseKindLoader]] = [DeploymentLoader] + + def __init__(self, cluster: str, metric_loader: PrometheusMetricsLoader) -> None: + self.cluster = cluster + self.metric_service = metric_loader + self.loaders = [loader(metric_loader) for loader in self.workloads] + + async def list_workloads(self) -> list[K8sWorkload]: + return itertools.chain(await asyncio.gather(*[loader.list_workloads(settings.namespaces, "") for loader in self.loaders])) + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + # This should not be implemented, as implementation will repeat PrometheusMetricsLoader.load_pods + # As this method is ment to be a fallback, repeating the same logic will not be beneficial + raise NotImplementedError diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py new file mode 100644 index 00000000..6ca6efd6 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py @@ -0,0 +1,19 @@ +from .base import BaseKindLoader +from .cronjobs import CronJobLoader +from .daemonsets import DaemonSetLoader +from .deploymentconfigs import DeploymentConfigLoader +from .deployments import DeploymentLoader +from .jobs import JobLoader +from .rollouts import RolloutLoader +from .statefulsets import StatefulSetLoader + +__all__ = [ + "BaseKindLoader", + "CronJobLoader", + "DeploymentLoader", + "DaemonSetLoader", + "DeploymentConfigLoader", + "JobLoader", + "RolloutLoader", + "StatefulSetLoader", +] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py new file mode 100644 index 00000000..a767eb2f --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py @@ -0,0 +1,126 @@ +import abc +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Iterable, Literal, Optional, Union + +from kubernetes import client # type: ignore +from kubernetes.client.api_client import ApiClient # type: ignore +from kubernetes.client.models import ( # type: ignore + V1Container, + V1DaemonSet, + V1Deployment, + V1Job, + V1Pod, + V1PodList, + V1StatefulSet, +) + +from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader +from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +logger = logging.getLogger("krr") + +AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] +HPAKey = tuple[str, str, str] + + +class BaseKindLoader(abc.ABC): + """ + This class is used to define how to load a specific kind of Kubernetes object. + It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. + """ + + kind: KindLiteral + + def __init__(self, metrics_loader: PrometheusMetricsLoader) -> None: + self.metrics_loader = metrics_loader + + @abc.abstractmethod + def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]: + pass + + async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations: + limits = await self.metrics_loader.loader.query( + "avg by(resource) (kube_pod_container_resource_limits{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}", ' + f'container="{container_name}"' + "})" + ) + requests = await self.metrics_loader.loader.query( + "avg by(resource) (kube_pod_container_resource_requests{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}", ' + f'container="{container_name}"' + "})" + ) + requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + for limit in limits: + if limit["metric"]["resource"] == ResourceType.CPU: + limits_values[ResourceType.CPU] = float(limit["value"][1]) + elif limit["metric"]["resource"] == ResourceType.Memory: + limits_values[ResourceType.Memory] = float(limit["value"][1]) + + for request in requests: + if request["metric"]["resource"] == ResourceType.CPU: + requests_values[ResourceType.CPU] = float(request["value"][1]) + elif request["metric"]["resource"] == ResourceType.Memory: + requests_values[ResourceType.Memory] = float(request["value"][1]) + return ResourceAllocations(requests=requests_values, limits=limits_values) + + async def __build_from_owner( + self, namespace: str, app_name: str, containers: list[str], pod_names: list[str] + ) -> list[K8sWorkload]: + return [ + K8sWorkload( + cluster=None, + namespace=namespace, + name=app_name, + kind="Deployment", + container=container_name, + allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for container_name in containers + ] + + async def _list_containers(self, namespace: str, pod_selector: str) -> list[str]: + containers = await self.metrics_loader.loader.query( + f""" + count by (container) ( + kube_pod_container_info{{ + namespace="{namespace}", + pod=~"{pod_selector}" + }} + ) + """ + ) + return [container["metric"]["container"] for container in containers] + + async def _list_containers_in_pods( + self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str + ) -> list[K8sWorkload]: + if pod_owner_kind == "ReplicaSet": + # owner_name is ReplicaSet names + pods = await self.metrics_loader.loader.query( + f""" + count by (owner_name, replicaset, pod) ( + kube_pod_owner{{ + namespace="{namespace}", + owner_name=~"{owner_name}", ' + owner_kind="ReplicaSet" + }} + ) + """ + ) + if pods is None or len(pods) == 0: + return [] # no container + # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']}, + # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}] + pod_names = [pod["metric"]["pod"] for pod in pods] + container_names = await self._list_containers(namespace, "|".join(pod_names)) + return await self.__build_from_owner(namespace, app_name, container_names, pod_names) + return [] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py new file mode 100644 index 00000000..c895d218 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py @@ -0,0 +1,50 @@ +import logging +from collections import defaultdict +import itertools +import asyncio +from typing import Literal, Optional, Union + +from robusta_krr.core.models.objects import K8sWorkload + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class DeploymentLoader(BaseKindLoader): + kind = "Deployment" + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]: + logger.debug( + f"Listing deployments in namespace({namespaces})" + ) + ns = "|".join(namespaces) + replicasets = await self.metrics_loader.loader.query( + f""" + count by (namespace, owner_name, replicaset) ( + kube_replicaset_owner{{ + namespace=~"{ns}", + owner_kind="Deployment", + }} + ) + """ + ) + # groupBy: 'ns/owner_name' => [{metadata}...] + pod_owner_kind = "ReplicaSet" + replicaset_dict = defaultdict(list) + for replicaset in replicasets: + replicaset_dict[replicaset["metric"]["namespace"] + "/" + replicaset["metric"]["owner_name"]].append( + replicaset["metric"] + ) + objects = await asyncio.gather( + *[ + self._list_containers_in_pods( + replicas[0]["owner_name"], + pod_owner_kind, + replicas[0]["namespace"], + "|".join(list(map(lambda metric: metric["replicaset"], replicas))), + ) + for replicas in replicaset_dict.values() + ] + ) + return list(itertools.chain(*objects)) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index c449bca2..eb734ef0 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -23,6 +23,7 @@ logger = logging.getLogger("krr") + class PrometheusMetricsLoader: def __init__(self, *, cluster: Optional[str] = None) -> None: """ @@ -39,7 +40,8 @@ def __init__(self, *, cluster: Optional[str] = None) -> None: raise PrometheusNotFound( f"Wasn't able to connect to any Prometheus service in {cluster or 'inner'} cluster\n" "Try using port-forwarding and/or setting the url manually (using the -p flag.).\n" - "For more information, see 'Giving the Explicit Prometheus URL' at https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" + "For more information, see 'Giving the Explicit Prometheus URL' at " + "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" ) self.loader = loader diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index de3b3c78..528fd89e 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -2,7 +2,6 @@ import logging import math import os -import sys import warnings from concurrent.futures import ThreadPoolExecutor from typing import Optional, Union @@ -12,7 +11,7 @@ from slack_sdk import WebClient from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult -from robusta_krr.core.integrations.kubernetes import ClusterWorkloadsLoader +from robusta_krr.core.integrations.kubernetes import ClusterConnector, KubeAPIWorkloadLoader from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sWorkload @@ -21,6 +20,7 @@ from robusta_krr.utils.progress_bar import ProgressBar from robusta_krr.utils.version import get_version, load_latest_version + logger = logging.getLogger("krr") @@ -37,36 +37,14 @@ class CriticalRunnerException(Exception): ... class Runner: - EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) - def __init__(self) -> None: - self._k8s_loader = ClusterWorkloadsLoader() - self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} - self._metrics_service_loaders_error_logged: set[Exception] = set() - self._strategy = settings.create_strategy() + self.connector = ClusterConnector() + self.strategy = settings.create_strategy() self.errors: list[dict] = [] # This executor will be running calculations for recommendations - self._executor = ThreadPoolExecutor(settings.max_workers) - - def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]: - if cluster not in self._metrics_service_loaders: - try: - self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster) - except Exception as e: - self._metrics_service_loaders[cluster] = e - - result = self._metrics_service_loaders[cluster] - if isinstance(result, self.EXPECTED_EXCEPTIONS): - if result not in self._metrics_service_loaders_error_logged: - self._metrics_service_loaders_error_logged.add(result) - logger.error(str(result)) - return None - elif isinstance(result, Exception): - raise result - - return result + self.executor = ThreadPoolExecutor(settings.max_workers) @staticmethod def __parse_version_string(version: str) -> tuple[int, ...]: @@ -93,7 +71,7 @@ async def _greet(self) -> None: custom_print(intro_message) custom_print(f"\nRunning Robusta's KRR (Kubernetes Resource Recommender) {current_version}") - custom_print(f"Using strategy: {self._strategy}") + custom_print(f"Using strategy: {self.strategy}") custom_print(f"Using formatter: {settings.format}") if latest_version is not None and self.__check_newer_version_available(current_version, latest_version): custom_print(f"[yellow bold]A newer version of KRR is available: {latest_version}[/yellow bold]") @@ -171,10 +149,10 @@ async def _calculate_object_recommendations(self, object: K8sWorkload) -> Option if prometheus_loader is None: return None - object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta) - if object.pods == []: + object.pods = await prometheus_loader.load_pods(object, self.strategy.settings.history_timedelta) + if object.pods == [] and isinstance(self.connector, KubeAPIWorkloadLoader): # Fallback to Kubernetes API - object.pods = await self._k8s_loader.load_pods(object) + object.pods = await self.connector.load_pods(object) # NOTE: Kubernetes API returned pods, but Prometheus did not # This might happen with fast executing jobs @@ -187,21 +165,21 @@ async def _calculate_object_recommendations(self, object: K8sWorkload) -> Option metrics = await prometheus_loader.gather_data( object, - self._strategy, - self._strategy.settings.history_timedelta, - step=self._strategy.settings.timeframe_timedelta, + self.strategy, + self.strategy.settings.history_timedelta, + step=self.strategy.settings.timeframe_timedelta, ) # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive # But keep in mind that numpy calcluations will not block the GIL loop = asyncio.get_running_loop() - result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object) + result = await loop.run_in_executor(self.executor, self.strategy.run, metrics, object) logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)") return self._format_result(result) async def _check_data_availability(self, cluster: Optional[str]) -> None: - prometheus_loader = self._get_prometheus_loader(cluster) + prometheus_loader = self.connector.get_prometheus_loader(cluster) if prometheus_loader is None: return @@ -219,11 +197,11 @@ async def _check_data_availability(self, cluster: Optional[str]) -> None: return logger.debug(f"History range for {cluster}: {history_range}") - enough_data = self._strategy.settings.history_range_enough(history_range) + enough_data = self.strategy.settings.history_range_enough(history_range) if not enough_data: logger.warning(f"Not enough history available for cluster {cluster}.") - try_after = history_range[0] + self._strategy.settings.history_timedelta + try_after = history_range[0] + self.strategy.settings.history_timedelta logger.warning( "If the cluster is freshly installed, it might take some time for the enough data to be available." @@ -257,7 +235,7 @@ async def _gather_object_allocations(self, k8s_object: K8sWorkload) -> Optional[ ) async def _collect_result(self) -> Result: - clusters = await self._k8s_loader.list_clusters() + clusters = await self.connector.list_clusters() if clusters and len(clusters) > 1 and settings.prometheus_url: # this can only happen for multi-cluster querying a single centeralized prometheus # In this scenario we dont yet support determining @@ -275,7 +253,7 @@ async def _collect_result(self) -> Result: await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters]) with ProgressBar(title="Calculating Recommendation") as self.__progressbar: - workloads = await self._k8s_loader.list_workloads(clusters) + workloads = await self.connector.list_workloads(clusters) scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads]) successful_scans = [scan for scan in scans if scan is not None] @@ -293,10 +271,10 @@ async def _collect_result(self) -> Result: return Result( scans=scans, - description=self._strategy.description, + description=self.strategy.description, strategy=StrategyData( - name=str(self._strategy).lower(), - settings=self._strategy.settings.dict(), + name=str(self.strategy).lower(), + settings=self.strategy.settings.dict(), ), ) @@ -313,14 +291,14 @@ async def run(self) -> int: try: # eks has a lower step limit than other types of prometheus, it will throw an error - step_count = self._strategy.settings.history_duration * 60 / self._strategy.settings.timeframe_duration + step_count = self.strategy.settings.history_duration * 60 / self.strategy.settings.timeframe_duration if settings.eks_managed_prom and step_count > 11000: - min_step = self._strategy.settings.history_duration * 60 / 10000 + min_step = self.strategy.settings.history_duration * 60 / 10000 logger.warning( f"The timeframe duration provided is insufficient and will be overridden with {min_step}. " f"Kindly adjust --timeframe_duration to a value equal to or greater than {min_step}." ) - self._strategy.settings.timeframe_duration = min_step + self.strategy.settings.timeframe_duration = min_step result = await self._collect_result() logger.info("Result collected, displaying...") diff --git a/robusta_krr/main.py b/robusta_krr/main.py index 5c2d01aa..810ccd15 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -55,6 +55,12 @@ def run_strategy( help="Path to kubeconfig file. If not provided, will attempt to find it.", rich_help_panel="Kubernetes Settings", ), + workload_loader: str = typer.Option( + "kubeapi", + "--workload", + help="Workload loader to use (kubeapi, prometheus).", + rich_help_panel="Kubernetes Settings", + ), impersonate_user: Optional[str] = typer.Option( None, "--as", @@ -250,6 +256,7 @@ def run_strategy( try: config = Config( kubeconfig=kubeconfig, + workload_loader=workload_loader, impersonate_user=impersonate_user, impersonate_group=impersonate_group, clusters="*" if all_clusters else clusters,