Skip to content

Commit

Permalink
Moved the logic from #93 for a new refined structure
Browse files Browse the repository at this point in the history
Co-authored-by: Megrez Lu <[email protected]>
  • Loading branch information
LeaveMyYard and lujiajing1126 committed Apr 22, 2024
1 parent 4fedd82 commit c7ad1cd
Show file tree
Hide file tree
Showing 12 changed files with 309 additions and 84 deletions.
47 changes: 42 additions & 5 deletions robusta_krr/core/integrations/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
V2HorizontalPodAutoscaler,
)

from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.object_like_dict import ObjectLikeDict
from prometrix import PrometheusNotFound

from . import config_patch as _
from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader
Expand All @@ -31,7 +33,13 @@
HPAKey = tuple[str, str, str]


class ClusterWorkloadsLoader:
class ClusterConnector:
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)

def __init__(self) -> None:
    """Initialize the per-cluster Prometheus loader caches."""
    # Cache of one PrometheusMetricsLoader per cluster name. When construction
    # fails, the raised Exception is stored in its place so the failure is
    # remembered and not retried on every lookup (see get_prometheus_loader).
    self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
    # Exceptions that have already been reported via logger.error, so each
    # distinct failure is logged only once.
    self._metrics_service_loaders_error_logged: set[Exception] = set()

async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
Expand Down Expand Up @@ -71,15 +79,44 @@ async def list_clusters(self) -> Optional[list[str]]:
return [context["name"] for context in contexts]

return [context["name"] for context in contexts if context["name"] in settings.clusters]

def get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]:
    """Return a cached PrometheusMetricsLoader for *cluster*, creating it on first use.

    A construction failure is cached alongside successes: an expected failure
    (see EXPECTED_EXCEPTIONS) is logged once and yields None, while any other
    cached exception is re-raised to the caller.
    """
    if cluster not in self._metrics_service_loaders:
        try:
            entry: Union[PrometheusMetricsLoader, Exception] = PrometheusMetricsLoader(cluster=cluster)
        except Exception as e:
            entry = e
        self._metrics_service_loaders[cluster] = entry

    cached = self._metrics_service_loaders[cluster]
    if not isinstance(cached, Exception):
        return cached

    if isinstance(cached, self.EXPECTED_EXCEPTIONS):
        # Report each distinct failure only once across repeated lookups.
        if cached not in self._metrics_service_loaders_error_logged:
            self._metrics_service_loaders_error_logged.add(cached)
            logger.error(str(cached))
        return None

    raise cached

def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
    """Build the configured workload loader for *cluster*, or None on failure.

    Failures are logged and swallowed so that one unreachable cluster does not
    abort scanning the others.
    """
    try:
        if settings.workload_loader == "kubeapi":
            return KubeAPIWorkloadLoader(cluster=cluster)
        elif settings.workload_loader == "prometheus":
            cluster_loader = self.get_prometheus_loader(cluster)
            if cluster_loader is not None:
                return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
            else:
                # FIX: the two message fragments were previously concatenated
                # without a separating space ("...skip it.Not possible...").
                logger.error(
                    f"Could not load Prometheus for cluster {cluster} and will skip it. "
                    "Not possible to load workloads through Prometheus without connection to Prometheus."
                )
        else:
            # NOTE(review): this raise is caught by the broad except below, so a
            # misconfigured workload_loader is only logged, not propagated —
            # confirm this is the intended behavior.
            raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")
    except Exception as e:
        logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
        return None

    return None

async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]:
"""List all scannable objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@


class BaseWorkloadLoader(abc.ABC):
def __init__(self, cluster: Optional[str] = None) -> None:
self.cluster = cluster

@abc.abstractmethod
async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]:
async def list_workloads(self) -> list[K8sWorkload]:
pass

@abc.abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.object_like_dict import ObjectLikeDict

from ..base import BaseWorkloadLoader
from .loaders import (
Expand Down Expand Up @@ -41,8 +40,8 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader):
CronJobLoader,
]

def __init__(self, cluster: Optional[str] = None):
super().__init__(cluster)
def __init__(self, cluster: Optional[str] = None) -> None:
self.cluster = cluster

# This executor will be running requests to Kubernetes API
self.executor = ThreadPoolExecutor(settings.max_workers)
Expand Down Expand Up @@ -88,7 +87,7 @@ async def list_workloads(self) -> list[K8sWorkload]:
async def list_pods(self, object: K8sWorkload) -> list[PodData]:
    """Delegate pod listing to the kind-specific loader registered for this workload's kind."""
    kind_loader = self._workload_loaders[object.kind]
    return await kind_loader.list_pods(object)

def __build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
name = item.metadata.name
namespace = item.metadata.namespace
kind = kind or item.__class__.__name__[2:]
Expand Down Expand Up @@ -160,7 +159,7 @@ async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]:
if asyncio.iscoroutine(containers):
containers = await containers

result.extend(self.__build_scannable_object(item, container, kind) for container in containers)
result.extend(self._build_scannable_object(item, container, kind) for container in containers)
except ApiException as e:
if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
if self._kind_available[kind]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,12 @@

from kubernetes import client # type: ignore
from kubernetes.client.api_client import ApiClient # type: ignore
from kubernetes.client.models import ( # type: ignore
V1Container,
V1DaemonSet,
V1Deployment,
V1Job,
V1Pod,
V1PodList,
V1StatefulSet,
)
from kubernetes.client.models import V1Container, V1PodList # type: ignore

from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData

logger = logging.getLogger("krr")

AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
HPAKey = tuple[str, str, str]


Expand Down Expand Up @@ -112,9 +103,7 @@ def _build_selector_query(cls, selector: Any) -> Union[str, None]:
label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()]

if selector.match_expressions is not None:
label_filters += [
cls._get_match_expression_filter(expression) for expression in selector.match_expressions
]
label_filters += [cls._get_match_expression_filter(expression) for expression in selector.match_expressions]

if label_filters == []:
# NOTE: This might mean that we have DeploymentConfig,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import itertools
import logging


from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
from robusta_krr.core.models.config import settings
from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
from robusta_krr.core.models.objects import K8sWorkload, PodData
from ..base import BaseWorkloadLoader
from .loaders import BaseKindLoader, DeploymentLoader


logger = logging.getLogger("krr")


class PrometheusWorkloadLoader(BaseWorkloadLoader):
    """Workload loader that discovers workloads via Prometheus metrics instead of the Kube API."""

    # Kind-specific loaders to run; currently only Deployments are supported.
    workloads: list[type[BaseKindLoader]] = [DeploymentLoader]

    def __init__(self, cluster: str, metric_loader: PrometheusMetricsLoader) -> None:
        self.cluster = cluster
        self.metric_service = metric_loader
        self.loaders = [loader(metric_loader) for loader in self.workloads]

    async def list_workloads(self) -> list[K8sWorkload]:
        """Run every kind loader concurrently and return the flattened list of workloads.

        FIX: asyncio.gather returns a list of per-loader lists, so the results
        must be flattened with chain.from_iterable — the previous plain
        itertools.chain(...) yielded the inner lists themselves — and
        materialized with list() to match the declared return type.
        """
        per_loader = await asyncio.gather(*[loader.list_workloads(settings.namespaces, "") for loader in self.loaders])
        return list(itertools.chain.from_iterable(per_loader))

    async def list_pods(self, object: K8sWorkload) -> list[PodData]:
        # This should not be implemented, as implementation will repeat PrometheusMetricsLoader.load_pods.
        # As this method is meant to be a fallback, repeating the same logic will not be beneficial.
        raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Public interface of the Prometheus workload kind loaders.

Re-exports one loader class per supported Kubernetes workload kind, plus the
shared BaseKindLoader base class.
"""

from .base import BaseKindLoader
from .cronjobs import CronJobLoader
from .daemonsets import DaemonSetLoader
from .deploymentconfigs import DeploymentConfigLoader
from .deployments import DeploymentLoader
from .jobs import JobLoader
from .rollouts import RolloutLoader
from .statefulsets import StatefulSetLoader

__all__ = [
    "BaseKindLoader",
    "CronJobLoader",
    "DeploymentLoader",
    "DaemonSetLoader",
    "DeploymentConfigLoader",
    "JobLoader",
    "RolloutLoader",
    "StatefulSetLoader",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import abc
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, Literal, Optional, Union

from kubernetes import client # type: ignore
from kubernetes.client.api_client import ApiClient # type: ignore
from kubernetes.client.models import ( # type: ignore
V1Container,
V1DaemonSet,
V1Deployment,
V1Job,
V1Pod,
V1PodList,
V1StatefulSet,
)

from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData

logger = logging.getLogger("krr")

AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
HPAKey = tuple[str, str, str]


class BaseKindLoader(abc.ABC):
    """
    Defines how to discover workloads of one Kubernetes kind through Prometheus
    (kube-state-metrics) queries. It does not load the objects itself, but is
    used by the `PrometheusWorkloadLoader` to load objects.
    (FIX: the docstring previously claimed `KubeAPIWorkloadLoader`.)
    """

    # The Kubernetes kind this loader handles (set by concrete subclasses).
    kind: KindLiteral

    def __init__(self, metrics_loader: PrometheusMetricsLoader) -> None:
        self.metrics_loader = metrics_loader

    @abc.abstractmethod
    def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]:
        """List all workloads of this kind in *namespaces* matching *label_selector*."""
        pass

    async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations:
        """Query the average CPU/memory requests and limits of *container_name* across the selected pods."""
        limits = await self.metrics_loader.loader.query(
            "avg by(resource) (kube_pod_container_resource_limits{"
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}", '
            f'container="{container_name}"'
            "})"
        )
        requests = await self.metrics_loader.loader.query(
            "avg by(resource) (kube_pod_container_resource_requests{"
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}", '
            f'container="{container_name}"'
            "})"
        )
        # Default both resources to None (unknown) and fill in whatever the queries returned.
        requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
        limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
        for limit in limits:
            if limit["metric"]["resource"] == ResourceType.CPU:
                limits_values[ResourceType.CPU] = float(limit["value"][1])
            elif limit["metric"]["resource"] == ResourceType.Memory:
                limits_values[ResourceType.Memory] = float(limit["value"][1])

        for request in requests:
            if request["metric"]["resource"] == ResourceType.CPU:
                requests_values[ResourceType.CPU] = float(request["value"][1])
            elif request["metric"]["resource"] == ResourceType.Memory:
                requests_values[ResourceType.Memory] = float(request["value"][1])
        return ResourceAllocations(requests=requests_values, limits=limits_values)

    async def __build_from_owner(
        self, namespace: str, app_name: str, containers: list[str], pod_names: list[str]
    ) -> list[K8sWorkload]:
        """Build one K8sWorkload per container, with allocations averaged over *pod_names*.

        NOTE(review): kind is hard-coded to "Deployment" — currently only the
        ReplicaSet ownership path calls this; revisit if other kinds are added.
        """
        return [
            K8sWorkload(
                cluster=None,
                namespace=namespace,
                name=app_name,
                kind="Deployment",
                container=container_name,
                allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name),  # find
                pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names],  # list pods
            )
            for container_name in containers
        ]

    async def _list_containers(self, namespace: str, pod_selector: str) -> list[str]:
        """Return the distinct container names present in the selected pods."""
        containers = await self.metrics_loader.loader.query(
            f"""
                count by (container) (
                    kube_pod_container_info{{
                        namespace="{namespace}",
                        pod=~"{pod_selector}"
                    }}
                )
            """
        )
        return [container["metric"]["container"] for container in containers]

    async def _list_containers_in_pods(
        self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str
    ) -> list[K8sWorkload]:
        """Resolve the pods owned by *owner_name* and build workloads for their containers."""
        if pod_owner_kind == "ReplicaSet":
            # owner_name is ReplicaSet names
            # FIX: the query previously contained a stray single quote after the
            # owner_name matcher line, which made the generated PromQL invalid.
            pods = await self.metrics_loader.loader.query(
                f"""
                    count by (owner_name, replicaset, pod) (
                        kube_pod_owner{{
                            namespace="{namespace}",
                            owner_name=~"{owner_name}",
                            owner_kind="ReplicaSet"
                        }}
                    )
                """
            )
            if pods is None or len(pods) == 0:
                return []  # no container
            # Example result:
            # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']},
            #  {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}]
            pod_names = [pod["metric"]["pod"] for pod in pods]
            container_names = await self._list_containers(namespace, "|".join(pod_names))
            return await self.__build_from_owner(namespace, app_name, container_names, pod_names)
        return []
Loading

0 comments on commit c7ad1cd

Please sign in to comment.