BaseClusterLoader, class structure change, not finished
LeaveMyYard committed Apr 29, 2024
1 parent b9a62a0 commit 7e8f1f4
Showing 12 changed files with 206 additions and 124 deletions.
94 changes: 21 additions & 73 deletions robusta_krr/core/integrations/kubernetes/__init__.py
@@ -25,7 +25,13 @@
from prometrix import PrometheusNotFound

from . import config_patch as _
from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader
from .workload_loader import (
BaseWorkloadLoader,
PrometheusWorkloadLoader,
BaseClusterLoader,
KubeAPIClusterLoader,
PrometheusClusterLoader,
)

logger = logging.getLogger("krr")

@@ -40,81 +46,23 @@ def __init__(self) -> None:
self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {}
self._connector_errors: set[Exception] = set()

async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
Returns:
A list of clusters.
"""

if settings.inside_cluster:
logger.debug("Working inside the cluster")
return None

try:
contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
except config.ConfigException:
if settings.clusters is not None and settings.clusters != "*":
logger.warning("Could not load context from kubeconfig.")
logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
return settings.clusters
else:
logger.error(
"Could not load context from kubeconfig. "
"Please check your kubeconfig file or pass -c flag with the context name."
)
return None

logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
logger.debug(f"Current cluster: {current_context['name']}")

logger.debug(f"Configured clusters: {settings.clusters}")

# None, empty means current cluster
if not settings.clusters:
return [current_context["name"]]

# * means all clusters
if settings.clusters == "*":
return [context["name"] for context in contexts]

return [context["name"] for context in contexts if context["name"] in settings.clusters]

def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]:
if cluster not in self._prometheus_connectors:
try:
logger.debug(f"Creating Prometheus connector for cluster {cluster}")
self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
except Exception as e:
self._prometheus_connectors[cluster] = e

result = self._prometheus_connectors[cluster]
if isinstance(result, self.EXPECTED_EXCEPTIONS):
if result not in self._connector_errors:
self._connector_errors.add(result)
logger.error(str(result))
return None
elif isinstance(result, Exception):
raise result
return result

def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
if settings.workload_loader == "kubeapi":
logger.debug(f"Creating Prometheus connector for cluster {cluster}")
elif settings.workload_loader == "prometheus":
logger.debug(f"Creating Prometheus connector")
# NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters
# so in case of multiple clusters in one Prometheus (centralized version)
# for each cluster we will have the same PrometheusConnector (keyed by None)
cluster = None



def _create_cluster_loader(self) -> BaseClusterLoader:
try:
if settings.workload_loader == "kubeapi":
return KubeAPIWorkloadLoader(cluster=cluster)
elif settings.workload_loader == "prometheus":
cluster_loader = self.get_prometheus(cluster)
if cluster_loader is not None:
return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
else:
logger.error(
f"Could not load Prometheus for cluster {cluster} and will skip it."
"Not possible to load workloads through Prometheus without connection to Prometheus."
)
else:
raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")

except Exception as e:
logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
logger.error(f"Could not connect to cluster loader and will skip it: {e}")

return None

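The new _create_cluster_loader above is clearly mid-refactor (the commit title says "not finished"): it still references the removed per-cluster variables and the old get_prometheus helper. A minimal sketch of the dispatch it seems to be heading toward, assuming the new cluster-loader classes take no constructor arguments (as they are defined later in this diff):

def _create_cluster_loader(self) -> Optional[BaseClusterLoader]:
    # Hypothetical completion, not the author's final code: pick a cluster loader
    # based on the configured workload loader backend.
    try:
        if settings.workload_loader == "kubeapi":
            return KubeAPIClusterLoader()
        elif settings.workload_loader == "prometheus":
            return PrometheusClusterLoader()
        else:
            raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")
    except Exception as e:
        logger.error(f"Could not connect to cluster loader and will skip it: {e}")
        return None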
@@ -1,5 +1,13 @@
from .base import BaseWorkloadLoader, IListPodsFallback
from .kube_api import KubeAPIWorkloadLoader
from .prometheus import PrometheusWorkloadLoader
from .base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader
from .kube_api import KubeAPIWorkloadLoader, KubeAPIClusterLoader
from .prometheus import PrometheusWorkloadLoader, PrometheusClusterLoader

__all__ = ["BaseWorkloadLoader", "IListPodsFallback", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
__all__ = [
"BaseWorkloadLoader",
"IListPodsFallback",
"KubeAPIWorkloadLoader",
"PrometheusWorkloadLoader",
"BaseClusterLoader",
"KubeAPIClusterLoader",
"PrometheusClusterLoader",
]
41 changes: 40 additions & 1 deletion robusta_krr/core/integrations/kubernetes/workload_loader/base.py
@@ -1,9 +1,17 @@
import abc
import logging

from typing import Optional, Union
from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
from robusta_krr.core.models.objects import K8sWorkload, PodData


logger = logging.getLogger("krr")


class BaseWorkloadLoader(abc.ABC):
"""A base class for workload loaders."""
"""A base class for single cluster workload loaders."""

@abc.abstractmethod
async def list_workloads(self) -> list[K8sWorkload]:
@@ -16,3 +24,34 @@ class IListPodsFallback(abc.ABC):
@abc.abstractmethod
async def list_pods(self, object: K8sWorkload) -> list[PodData]:
pass


class BaseClusterLoader(abc.ABC):
"""
A class that wraps loading data from multiple clusters,
for example a centralized Prometheus server that can query multiple clusters,
or a single kubeconfig that defines connections to multiple clusters.
"""

def __init__(self) -> None:
self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {}
self._connector_errors: set[Exception] = set()

@abc.abstractmethod
async def list_clusters(self) -> Optional[list[str]]:
pass

@abc.abstractmethod
async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
pass

def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector:
"""
Connect to a Prometheus server and return a PrometheusConnector instance.
cluster=None means there is only one Prometheus: either centralized or in-cluster.
"""

if cluster not in self._prometheus_connectors:
self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)

return self._prometheus_connectors[cluster]
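To make the intended flow of the new abstraction concrete, here is a hypothetical driver for a BaseClusterLoader subclass (not part of this commit); it assumes that list_clusters returning None means "work only against the current in-cluster context":

async def scan_all(cluster_loader: BaseClusterLoader) -> list[K8sWorkload]:
    # Hypothetical usage sketch: enumerate clusters, connect a workload loader
    # per cluster, and collect the workloads from each one.
    clusters = await cluster_loader.list_clusters()
    cluster_names = clusters if clusters is not None else ["in-cluster"]  # placeholder name, assumption
    workloads: list[K8sWorkload] = []
    for cluster in cluster_names:
        workload_loader = await cluster_loader.connect_cluster(cluster)
        workloads += await workload_loader.list_workloads()
    return workloads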
@@ -1,18 +1,20 @@
from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Awaitable, Callable, Optional

from kubernetes import client # type: ignore
from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException # type: ignore
from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore

from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations

from ..base import BaseWorkloadLoader, IListPodsFallback
from ..base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader
from .loaders import (
BaseKindLoader,
CronJobLoader,
@@ -29,8 +31,53 @@
HPAKey = tuple[str, str, str]


class KubeAPIClusterLoader(BaseClusterLoader):
# NOTE: For KubeAPIClusterLoader we first have to read the kubeconfig in order to connect.
# We do not need to connect to Prometheus from here, as we query all data from the Kubernetes API.
# Also, here we might have different Prometheus instances for different clusters.

def __init__(self) -> None:
self.api_client = settings.get_kube_client()

async def list_clusters(self) -> Optional[list[str]]:
if settings.inside_cluster:
logger.debug("Working inside the cluster")
return None

try:
contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
except config.ConfigException:
if settings.clusters is not None and settings.clusters != "*":
logger.warning("Could not load context from kubeconfig.")
logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
return settings.clusters
else:
logger.error(
"Could not load context from kubeconfig. "
"Please check your kubeconfig file or pass -c flag with the context name."
)
return None

logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
logger.debug(f"Current cluster: {current_context['name']}")
logger.debug(f"Configured clusters: {settings.clusters}")

# None, empty means current cluster
if not settings.clusters:
return [current_context["name"]]

# * means all clusters
if settings.clusters == "*":
return [context["name"] for context in contexts]

return [context["name"] for context in contexts if context["name"] in settings.clusters]

async def connect_cluster(self, cluster: str) -> KubeAPIWorkloadLoader:
return KubeAPIWorkloadLoader(cluster)


class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
workload_loaders: list[BaseKindLoader] = [
kind_loaders: list[BaseKindLoader] = [
DeploymentLoader,
RolloutLoader,
DeploymentConfigLoader,
@@ -40,7 +87,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
CronJobLoader,
]

def __init__(self, cluster: Optional[str] = None) -> None:
def __init__(self, cluster: Optional[str]) -> None:
self.cluster = cluster

# This executor will be running requests to Kubernetes API
@@ -53,7 +100,7 @@ def __init__(self, cluster: Optional[str] = None) -> None:
self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True)
self._hpa_list: dict[HPAKey, HPAData] = {}
self._workload_loaders: dict[KindLiteral, BaseKindLoader] = {
loader.kind: loader(self.api_client, self.executor) for loader in self.workload_loaders
loader.kind: loader(self.api_client, self.executor) for loader in self.kind_loaders
}

async def list_workloads(self) -> list[K8sWorkload]:
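The cluster-selection rules implemented by KubeAPIClusterLoader.list_clusters (None or an empty setting selects only the current context, "*" selects every context, and a list filters contexts by name) can be summed up in a small standalone helper; the function name and the example values below are illustrative, not from the commit:

from typing import Union

def select_clusters(context_names: list[str], current: str, configured: Union[list[str], str, None]) -> list[str]:
    # Mirrors the filtering branch of list_clusters for contexts read from a kubeconfig.
    if not configured:  # None or empty -> only the current context
        return [current]
    if configured == "*":  # "*" -> every context in the kubeconfig
        return list(context_names)
    return [name for name in context_names if name in configured]

# e.g. select_clusters(["prod", "staging", "dev"], "prod", ["staging", "dev"]) == ["staging", "dev"]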
@@ -4,24 +4,45 @@

from collections import Counter

from typing import Optional


from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import K8sWorkload
from ..base import BaseWorkloadLoader
from ..base import BaseWorkloadLoader, BaseClusterLoader
from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader


logger = logging.getLogger("krr")


class PrometheusClusterLoader(BaseClusterLoader):
# NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it

def __init__(self) -> None:
super().__init__()
self.prometheus_connector = super().connect_prometheus()

async def list_clusters(self) -> list[str]:
return []

async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
return PrometheusWorkloadLoader(cluster, self.prometheus_connector)

def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector:
# NOTE: With the Prometheus workload loader only one Prometheus can be provided in the parameters,
# so in the case of multiple clusters behind one Prometheus (the centralized setup),
# every cluster will share the same PrometheusConnector (keyed by None).
return self.prometheus_connector

class PrometheusWorkloadLoader(BaseWorkloadLoader):
workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]

def __init__(self, cluster: str, metric_loader: PrometheusConnector) -> None:
def __init__(self, cluster: str, prometheus_connector: PrometheusConnector) -> None:
self.cluster = cluster
self.metric_service = metric_loader
self.loaders = [loader(metric_loader) for loader in self.workloads]
self.metric_service = prometheus_connector
self.loaders = [loader(prometheus_connector) for loader in self.workloads]

async def list_workloads(self) -> list[K8sWorkload]:
workloads = list(
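Because PrometheusClusterLoader keeps a single connector and hands that same connector to every PrometheusWorkloadLoader it creates, all clusters behind a centralized Prometheus are queried through one connection. A hypothetical usage sketch (cluster names are illustrative):

async def demo() -> None:
    # Hypothetical: both per-cluster workload loaders share the single
    # PrometheusConnector held by the cluster loader (keyed by None).
    cluster_loader = PrometheusClusterLoader()
    prod_loader = await cluster_loader.connect_cluster("prod")
    staging_loader = await cluster_loader.connect_cluster("staging")
    assert prod_loader.metric_service is staging_loader.metric_service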
@@ -37,8 +37,8 @@ class BaseKindLoader(abc.ABC):

kinds: list[KindLiteral] = []

def __init__(self, connector: PrometheusConnector) -> None:
self.connector = connector
def __init__(self, prometheus: PrometheusConnector) -> None:
self.prometheus = prometheus
self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label()

@property
@@ -50,7 +50,7 @@ def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8s
pass

async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations:
limits = await self.connector.loader.query(
limits = await self.prometheus.loader.query(
f"""
avg by(resource) (
kube_pod_container_resource_limits{{
@@ -62,7 +62,7 @@ async def _parse_allocation(self, namespace: str, pods: list[str], container_nam
)
"""
)
requests = await self.connector.loader.query(
requests = await self.prometheus.loader.query(
f"""
avg by(resource) (
kube_pod_container_resource_requests{{
@@ -90,7 +90,7 @@ async def _parse_allocation(self, namespace: str, pods: list[str], container_nam
return ResourceAllocations(requests=requests_values, limits=limits_values)

async def _list_containers_in_pods(self, pods: list[str]) -> set[str]:
containers = await self.connector.loader.query(
containers = await self.prometheus.loader.query(
f"""
count by (container) (
kube_pod_container_info{{
@@ -53,7 +53,7 @@ async def list_workloads_by_subowner(
subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name"

# Replica is for ReplicaSet and/or ReplicationController
subowners = await self.connector.loader.query(
subowners = await self.prometheus.loader.query(
f"""
count by (namespace, owner_name, {subowner_label}, owner_kind) (
{metric_name} {{
@@ -91,7 +91,7 @@ async def list_workloads_by_subowner(
async def _list_pods_of_subowner(
self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str]
) -> list[K8sWorkload]:
pods = await self.connector.loader.query(
pods = await self.prometheus.loader.query(
f"""
count by (namespace, owner_name, owner_kind, pod) (
kube_pod_owner{{
@@ -111,7 +111,7 @@

return [
K8sWorkload(
cluster=self.connector.cluster,
cluster=self.prometheus.cluster,
namespace=namespace,
name=name,
kind=kind,