Skip to content

Commit

Permalink
Finished structure changes and workload loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
LeaveMyYard committed Apr 30, 2024
1 parent 7e8f1f4 commit 4c1f5c9
Show file tree
Hide file tree
Showing 27 changed files with 290 additions and 300 deletions.
46 changes: 46 additions & 0 deletions robusta_krr/core/abstract/cluster_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import annotations

import abc
import logging
from typing import Optional, TYPE_CHECKING

from .workload_loader import BaseWorkloadLoader

if TYPE_CHECKING:
from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector


logger = logging.getLogger("krr")


class BaseClusterLoader(abc.ABC):
    """
    Abstract interface over a source of cluster data.

    One implementation can wrap several clusters at once — for example a
    centralized Prometheus server able to query multiple clusters, or a
    single kubeconfig defining connections to multiple cluster contexts.
    """

    @abc.abstractmethod
    async def list_clusters(self) -> Optional[list[str]]:
        """Return the names of the available clusters, or None if they cannot be enumerated."""

    @abc.abstractmethod
    def get_workload_loader(self, cluster: Optional[str]) -> BaseWorkloadLoader:
        """Return a workload loader bound to the given cluster."""

    def try_get_workload_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
        """Best-effort variant of get_workload_loader: logs and returns None instead of raising."""
        try:
            loader = self.get_workload_loader(cluster)
        except Exception as e:
            logger.error(f"Could not connect to cluster {cluster} and will skip it: {e}")
            return None
        else:
            return loader

    @abc.abstractmethod
    def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector:
        """
        Connect to a Prometheus server and return a PrometheusConnector instance.
        Cluster = None means that prometheus is the only one: either centralized or in-cluster.
        raise prometrix.PrometheusNotFound if Prometheus is not available.
        """
23 changes: 23 additions & 0 deletions robusta_krr/core/abstract/workload_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import abc
import logging

from robusta_krr.core.models.objects import K8sWorkload, PodData


logger = logging.getLogger("krr")


class BaseWorkloadLoader(abc.ABC):
    """Abstract base for loaders that discover workloads within a single cluster."""

    @abc.abstractmethod
    async def list_workloads(self) -> list[K8sWorkload]:
        """Return every workload found in the cluster."""


class IListPodsFallback(abc.ABC):
    """Interface a workload loader may implement to provide a fallback way of listing pods."""

    @abc.abstractmethod
    async def load_pods(self, object: K8sWorkload) -> list[PodData]:
        """Return the pods that belong to the given workload.

        Note: the parameter name `object` shadows the builtin but is kept
        for backward compatibility with keyword callers.
        """
101 changes: 0 additions & 101 deletions robusta_krr/core/integrations/kubernetes/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException # type: ignore
from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore
from functools import cache

from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.exceptions import CriticalRunnerException
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations

from ..base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader

from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader, IListPodsFallback
from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader
from .loaders import (
BaseKindLoader,
CronJobLoader,
Expand All @@ -37,7 +43,16 @@ class KubeAPIClusterLoader(BaseClusterLoader):
# Also here we might have different Prometheus instances for different clusters

def __init__(self) -> None:
    # Load the kubeconfig up-front so a misconfigured environment fails fast
    # with actionable guidance instead of erroring later mid-run.
    try:
        settings.load_kubeconfig()
    except Exception as e:
        logger.error(f"Could not load kubernetes configuration: {e.__class__.__name__}\n{e}")
        logger.error("Try to explicitly set --context and/or --kubeconfig flags.")
        logger.error("Alternatively, try a prometheus-only mode with `--mode prometheus`")
        # Re-raise as a critical error (chained to the original) so the runner aborts cleanly.
        raise CriticalRunnerException("Could not load kubernetes configuration") from e

    self.api_client = settings.get_kube_client()
    # Per-cluster Prometheus connectors; None key = the single centralized/in-cluster instance.
    self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {}

async def list_clusters(self) -> Optional[list[str]]:
if settings.inside_cluster:
Expand Down Expand Up @@ -72,8 +87,19 @@ async def list_clusters(self) -> Optional[list[str]]:

return [context["name"] for context in contexts if context["name"] in settings.clusters]

async def connect_cluster(self, cluster: str) -> KubeAPIWorkloadLoader:
# NOTE(review): @cache on an instance method keys on `self` and keeps both the
# loader instances and `self` alive for the cache's lifetime (ruff B019) —
# presumably fine for a process-lifetime singleton loader, but verify.
@cache
def get_workload_loader(self, cluster: Optional[str]) -> KubeAPIWorkloadLoader:
    # One memoized workload loader per cluster context.
    return KubeAPIWorkloadLoader(cluster)

# NOTE(review): @cache on an instance method keeps `self` alive for the cache's
# lifetime (ruff B019) — presumably acceptable for a process-lifetime cluster
# loader, but verify.
@cache
def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector:
    """Build (and memoize per cluster) a PrometheusConnector.

    Uses the explicitly configured Prometheus URL when available; otherwise
    falls back to auto-discovering the service through the Kubernetes API client.
    """
    connector = PrometheusConnector(cluster=cluster)
    if settings.prometheus_url is not None:
        logger.info(f"Connecting to Prometheus using URL: {settings.prometheus_url}")
        connector.connect(settings.prometheus_url)
    else:
        logger.info(f"Trying to discover PromQL service" + (f" for cluster {cluster}" if cluster else ""))
        connector.discover(api_client=self.api_client)
    # BUG FIX: the connector was constructed and connected but never returned,
    # so every caller (and the @cache) received None.
    return connector


class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
Expand Down Expand Up @@ -131,7 +157,7 @@ async def list_workloads(self) -> list[K8sWorkload]:
if not (settings.namespaces == "*" and object.namespace == "kube-system")
]

async def list_pods(self, object: K8sWorkload) -> list[PodData]:
async def load_pods(self, object: K8sWorkload) -> list[PodData]:
return await self._workload_loaders[object.kind].list_pods(object)

def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
Expand Down Expand Up @@ -320,4 +346,4 @@ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
return {}


__all__ = ["KubeAPIWorkloadLoader"]
__all__ = ["KubeAPIWorkloadLoader", "KubeAPIClusterLoader"]

This file was deleted.

57 changes: 0 additions & 57 deletions robusta_krr/core/integrations/kubernetes/workload_loader/base.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from __future__ import annotations

import asyncio
import itertools
import logging

from collections import Counter

from pyparsing import Optional

from typing import Optional
from functools import cache

from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import K8sWorkload
from ..base import BaseWorkloadLoader, BaseClusterLoader
from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader
from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader
from robusta_krr.core.models.exceptions import CriticalRunnerException
from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader


Expand All @@ -21,20 +25,34 @@ class PrometheusClusterLoader(BaseClusterLoader):
# NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it

def __init__(self) -> None:
super().__init__()
self.prometheus_connector = super().connect_prometheus()

async def list_clusters(self) -> list[str]:
return []

async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
return PrometheusWorkloadLoader(cluster, self.prometheus_connector)

def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector:
self._prometheus_connector = PrometheusConnector()
if not settings.prometheus_url:
raise CriticalRunnerException(
"Prometheus URL is not provided. "
"Can not auto-discover Prometheus with `--mode prometheus`. "
"Please provide the URL with `--prometheus-url` flag."
)

self._prometheus_connector.connect(settings.prometheus_url)

async def list_clusters(self) -> Optional[list[str]]:
    # Without a configured cluster label, clusters cannot be told apart in
    # Prometheus data — signal "unknown" with None.
    if settings.prometheus_cluster_label is None:
        return None

    # TODO: We can try to auto-discover clusters by querying Prometheus,
    # but for that we will need to rework PrometheusMetric.get_prometheus_cluster_label
    return [settings.prometheus_cluster_label]

# NOTE(review): @cache on an instance method keeps `self` alive for the cache's
# lifetime (ruff B019) — presumably fine for a singleton cluster loader, but verify.
@cache
def get_workload_loader(self, cluster: str) -> PrometheusWorkloadLoader:
    # Memoized per cluster; every loader shares the single Prometheus connection.
    return PrometheusWorkloadLoader(cluster, self._prometheus_connector)

def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector:
# NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters
# so in case of multiple clusters in one Prometheus (centralized version)
# for each cluster we will have the same PrometheusConnector (keyed by None)
return self.prometheus_connector
return self._prometheus_connector


class PrometheusWorkloadLoader(BaseWorkloadLoader):
workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]
Expand All @@ -56,3 +74,5 @@ async def list_workloads(self) -> list[K8sWorkload]:
logger.info(f"Found {count} {kind} in {self.cluster}")

return workloads

__all__ = ["PrometheusClusterLoader", "PrometheusWorkloadLoader"]
Loading

0 comments on commit 4c1f5c9

Please sign in to comment.