Skip to content

Commit

Permalink
Implement remaining kinds in prometheus workload loader
Browse files Browse the repository at this point in the history
  • Loading branch information
LeaveMyYard committed Apr 24, 2024
1 parent c7ad1cd commit bf90978
Show file tree
Hide file tree
Showing 16 changed files with 305 additions and 174 deletions.
24 changes: 12 additions & 12 deletions robusta_krr/core/integrations/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
V2HorizontalPodAutoscaler,
)

from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
Expand All @@ -37,8 +37,8 @@ class ClusterConnector:
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)

def __init__(self) -> None:
self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {}
self._connector_errors: set[Exception] = set()

async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
Expand Down Expand Up @@ -80,30 +80,30 @@ async def list_clusters(self) -> Optional[list[str]]:

return [context["name"] for context in contexts if context["name"] in settings.clusters]

def get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]:
if cluster not in self._metrics_service_loaders:
def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]:
if cluster not in self._prometheus_connectors:
try:
self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster)
logger.debug(f"Creating Prometheus connector for cluster {cluster}")
self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
except Exception as e:
self._metrics_service_loaders[cluster] = e
self._prometheus_connectors[cluster] = e

result = self._metrics_service_loaders[cluster]
result = self._prometheus_connectors[cluster]
if isinstance(result, self.EXPECTED_EXCEPTIONS):
if result not in self._metrics_service_loaders_error_logged:
self._metrics_service_loaders_error_logged.add(result)
if result not in self._connector_errors:
self._connector_errors.add(result)
logger.error(str(result))
return None
elif isinstance(result, Exception):
raise result

return result

def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
try:
if settings.workload_loader == "kubeapi":
return KubeAPIWorkloadLoader(cluster=cluster)
elif settings.workload_loader == "prometheus":
cluster_loader = self.get_prometheus_loader(cluster)
cluster_loader = self.get_prometheus(cluster)
if cluster_loader is not None:
return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .base import BaseWorkloadLoader
from .base import BaseWorkloadLoader, IListPodsFallback
from .kube_api import KubeAPIWorkloadLoader
from .prometheus import PrometheusWorkloadLoader

__all__ = ["BaseWorkloadLoader", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
__all__ = ["BaseWorkloadLoader", "IListPodsFallback", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import abc
from typing import Optional
from robusta_krr.core.models.objects import K8sWorkload, PodData


class BaseWorkloadLoader(abc.ABC):
    """Abstract interface implemented by every workload loader."""

    @abc.abstractmethod
    async def list_workloads(self) -> list[K8sWorkload]:
        """Return every workload discovered in the cluster."""
        ...


class IListPodsFallback(abc.ABC):
    """Optional interface: a workload loader implementing this can act as a
    fallback source for listing the pods of a given workload."""

    @abc.abstractmethod
    # NOTE: the parameter name `object` shadows the builtin; it is kept
    # unchanged because it is part of the interface contract.
    async def list_pods(self, object: K8sWorkload) -> list[PodData]:
        """Return the pods belonging to the given workload."""
        ...
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations

from ..base import BaseWorkloadLoader
from ..base import BaseWorkloadLoader, IListPodsFallback
from .loaders import (
BaseKindLoader,
CronJobLoader,
Expand All @@ -29,7 +29,7 @@
HPAKey = tuple[str, str, str]


class KubeAPIWorkloadLoader(BaseWorkloadLoader):
class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
workload_loaders: list[BaseKindLoader] = [
DeploymentLoader,
RolloutLoader,
Expand Down Expand Up @@ -176,10 +176,12 @@ async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
res = await self._list_namespaced_or_global_objects(
kind="HPA-v1",
all_namespaces_request=lambda **kwargs: loop.run_in_executor(
self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
self.executor,
lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
),
namespaced_request=lambda **kwargs: loop.run_in_executor(
self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs),
self.executor,
lambda: self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs),
),
)

Expand All @@ -205,10 +207,12 @@ async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]:
res = await self._list_namespaced_or_global_objects(
kind="HPA-v2",
all_namespaces_request=lambda **kwargs: loop.run_in_executor(
self.executor, self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs)
self.executor,
lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
),
namespaced_request=lambda **kwargs: loop.run_in_executor(
self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs),
self.executor,
lambda: self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs),
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,36 @@
import itertools
import logging

from collections import Counter

from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader

from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
from robusta_krr.core.models.objects import K8sWorkload, PodData
from robusta_krr.core.models.objects import K8sWorkload
from ..base import BaseWorkloadLoader
from .loaders import BaseKindLoader, DeploymentLoader
from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader


logger = logging.getLogger("krr")


class PrometheusWorkloadLoader(BaseWorkloadLoader):
workloads: list[type[BaseKindLoader]] = [DeploymentLoader]
workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]

def __init__(self, cluster: str, metric_loader: PrometheusMetricsLoader) -> None:
def __init__(self, cluster: str, metric_loader: PrometheusConnector) -> None:
self.cluster = cluster
self.metric_service = metric_loader
self.loaders = [loader(metric_loader) for loader in self.workloads]

async def list_workloads(self) -> list[K8sWorkload]:
return itertools.chain(await asyncio.gather(*[loader.list_workloads(settings.namespaces, "") for loader in self.loaders]))
workloads = list(
itertools.chain(
*await asyncio.gather(*[loader.list_workloads(settings.namespaces) for loader in self.loaders])
)
)

kind_counts = Counter([workload.kind for workload in workloads])
for kind, count in kind_counts.items():
logger.info(f"Found {count} {kind} in {self.cluster}")

async def list_pods(self, object: K8sWorkload) -> list[PodData]:
# This should not be implemented, as implementation will repeat PrometheusMetricsLoader.load_pods
# As this method is meant to be a fallback, repeating the same logic will not be beneficial
raise NotImplementedError
return workloads
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
from .base import BaseKindLoader
from .cronjobs import CronJobLoader
from .daemonsets import DaemonSetLoader
from .deploymentconfigs import DeploymentConfigLoader
from .deployments import DeploymentLoader
from .jobs import JobLoader
from .rollouts import RolloutLoader
from .statefulsets import StatefulSetLoader
from .double_parent import DoubleParentLoader
from .simple_parent import SimpleParentLoader

__all__ = [
"BaseKindLoader",
"CronJobLoader",
"DeploymentLoader",
"DaemonSetLoader",
"DeploymentConfigLoader",
"JobLoader",
"RolloutLoader",
"StatefulSetLoader",
"DoubleParentLoader",
"SimpleParentLoader",
]
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import asyncio
from collections import defaultdict
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, Literal, Optional, Union
Expand All @@ -15,8 +16,10 @@
V1PodList,
V1StatefulSet,
)
from robusta_krr.core.models.config import settings

from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData

Expand All @@ -32,29 +35,44 @@ class BaseKindLoader(abc.ABC):
It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects.
"""

kind: KindLiteral
kinds: list[KindLiteral] = []

def __init__(self, metrics_loader: PrometheusMetricsLoader) -> None:
self.metrics_loader = metrics_loader
def __init__(self, connector: PrometheusConnector) -> None:
self.connector = connector
self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label()

@property
def kinds_to_scan(self) -> list[KindLiteral]:
    """Kinds this loader should scan, filtered by the configured resources.

    When the resources setting is the wildcard "*", every supported kind
    is scanned; otherwise only kinds explicitly listed in the settings.
    """
    if settings.resources == "*":
        return self.kinds
    return [kind for kind in self.kinds if kind in settings.resources]

@abc.abstractmethod
def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]:
def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]:
pass

async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations:
limits = await self.metrics_loader.loader.query(
"avg by(resource) (kube_pod_container_resource_limits{"
f'namespace="{namespace}", '
f'pod=~"{pod_selector}", '
f'container="{container_name}"'
"})"
async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations:
limits = await self.connector.loader.query(
f"""
avg by(resource) (
kube_pod_container_resource_limits{{
{self.cluster_selector}
namespace="{namespace}",
pod=~"{'|'.join(pods)}",
container="{container_name}"
}}
)
"""
)
requests = await self.metrics_loader.loader.query(
"avg by(resource) (kube_pod_container_resource_requests{"
f'namespace="{namespace}", '
f'pod=~"{pod_selector}", '
f'container="{container_name}"'
"})"
requests = await self.connector.loader.query(
f"""
avg by(resource) (
kube_pod_container_resource_requests{{
{self.cluster_selector}
namespace="{namespace}",
pod=~"{'|'.join(pods)}",
container="{container_name}"
}}
)
"""
)
requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
Expand All @@ -71,56 +89,16 @@ async def __parse_allocation(self, namespace: str, pod_selector: str, container_
requests_values[ResourceType.Memory] = float(request["value"][1])
return ResourceAllocations(requests=requests_values, limits=limits_values)

async def __build_from_owner(
self, namespace: str, app_name: str, containers: list[str], pod_names: list[str]
) -> list[K8sWorkload]:
return [
K8sWorkload(
cluster=None,
namespace=namespace,
name=app_name,
kind="Deployment",
container=container_name,
allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find
pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods
)
for container_name in containers
]

async def _list_containers(self, namespace: str, pod_selector: str) -> list[str]:
containers = await self.metrics_loader.loader.query(
async def _list_containers_in_pods(self, pods: list[str]) -> set[str]:
containers = await self.connector.loader.query(
f"""
count by (container) (
kube_pod_container_info{{
namespace="{namespace}",
pod=~"{pod_selector}"
{self.cluster_selector}
pod=~"{'|'.join(pods)}"
}}
)
"""
)
return [container["metric"]["container"] for container in containers]

async def _list_containers_in_pods(
self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str
) -> list[K8sWorkload]:
if pod_owner_kind == "ReplicaSet":
# owner_name is ReplicaSet names
pods = await self.metrics_loader.loader.query(
f"""
count by (owner_name, replicaset, pod) (
kube_pod_owner{{
namespace="{namespace}",
owner_name=~"{owner_name}", '
owner_kind="ReplicaSet"
}}
)
"""
)
if pods is None or len(pods) == 0:
return [] # no container
# [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']},
# {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}]
pod_names = [pod["metric"]["pod"] for pod in pods]
container_names = await self._list_containers(namespace, "|".join(pod_names))
return await self.__build_from_owner(namespace, app_name, container_names, pod_names)
return []
return {container["metric"]["container"] for container in containers}
Loading

0 comments on commit bf90978

Please sign in to comment.