Commit

Merge branch 'oom-kill-data-usage' of https://github.com/robusta-dev/krr into oom-kill-data-usage
LeaveMyYard committed Apr 29, 2024
2 parents fb32618 + d05ae9b commit d15ad3e
Showing 9 changed files with 178 additions and 308 deletions.
49 changes: 0 additions & 49 deletions .idea/workspace.xml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
@@ -103,7 +103,7 @@ Read more about [how KRR works](#how-krr-works) and [KRR vs Kubernetes VPA](#dif

### Requirements

-KRR requires Prometheus 2.26+ and [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics).
+KRR requires Prometheus 2.26+, [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics) & [cAdvisor](https://github.com/google/cadvisor).

<details>
<summary>Which metrics does KRR need?</summary>
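Since the README now lists cAdvisor alongside kube-state-metrics, it is worth checking that both sources are actually scraped before running KRR. A minimal sketch (not part of this commit) using prometheus-api-client, which KRR already depends on; the Prometheus URL is a placeholder:

```python
# Sketch only (not from this commit): verify that kube-state-metrics and
# cAdvisor series are both present in Prometheus before running KRR.
from prometheus_api_client import PrometheusConnect

prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)

# kube-state-metrics exposes kube_* series; cAdvisor exposes container_* series.
for metric in ("kube_pod_info", "container_cpu_usage_seconds_total"):
    if not prom.custom_query(query=f"count({metric})"):
        print(f"Missing metric source: {metric}")
```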
257 changes: 115 additions & 142 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
@@ -25,15 +25,16 @@ krr = "robusta_krr.main:run"
[tool.poetry.dependencies]
python = ">=3.9,<3.12"
typer = { extras = ["all"], version = "^0.7.0" }
pydantic = "1.10.7"
pydantic = "^1.10.7"
kubernetes = "^26.1.0"
prometheus-api-client = "0.5.3"
numpy = "^1.24.2"
alive-progress = "^3.1.2"
prometrix = "^0.1.16"
prometrix = "^0.1.17"
slack-sdk = "^3.21.3"



[tool.poetry.group.dev.dependencies]
mypy = "^1.0.1"
black = "^23.1.0"
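The pydantic pin is loosened from an exact version to a caret range: Poetry's `^1.10.7` is equivalent to `>=1.10.7,<2.0.0`, so compatible patch releases (such as the 1.10.15 that requirements.txt below resolves to) are allowed while the breaking 2.x line is excluded. An illustration using the `packaging` library:

```python
# Illustration only: what loosening pydantic = "1.10.7" to "^1.10.7" permits.
# Poetry's caret constraint "^1.10.7" is equivalent to ">=1.10.7,<2.0.0".
from packaging.specifiers import SpecifierSet

caret = SpecifierSet(">=1.10.7,<2.0.0")
for version in ("1.10.7", "1.10.15", "2.0.0"):
    print(version, version in caret)
# 1.10.7 True / 1.10.15 True / 2.0.0 False
```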
16 changes: 8 additions & 8 deletions requirements.txt
@@ -3,15 +3,15 @@ alive-progress==3.1.2 ; python_version >= "3.9" and python_version < "3.12"
boto3==1.28.21 ; python_version >= "3.9" and python_version < "3.12"
botocore==1.31.21 ; python_version >= "3.9" and python_version < "3.12"
cachetools==5.3.0 ; python_version >= "3.9" and python_version < "3.12"
-certifi==2022.12.7 ; python_version >= "3.9" and python_version < "3.12"
+certifi==2024.2.2 ; python_version >= "3.9" and python_version < "3.12"
charset-normalizer==3.0.1 ; python_version >= "3.9" and python_version < "3.12"
click==8.1.3 ; python_version >= "3.9" and python_version < "3.12"
colorama==0.4.6 ; python_version >= "3.9" and python_version < "3.12" and platform_system == "Windows"
commonmark==0.9.1 ; python_version >= "3.9" and python_version < "3.12"
contourpy==1.0.7 ; python_version >= "3.9" and python_version < "3.12"
cycler==0.11.0 ; python_version >= "3.9" and python_version < "3.12"
dateparser==1.1.7 ; python_version >= "3.9" and python_version < "3.12"
-fonttools==4.39.0 ; python_version >= "3.9" and python_version < "3.12"
+fonttools==4.43.0 ; python_version >= "3.9" and python_version < "3.12"
google-auth==2.16.2 ; python_version >= "3.9" and python_version < "3.12"
grapheme==0.6.0 ; python_version >= "3.9" and python_version < "3.12"
httmock==1.4.0 ; python_version >= "3.9" and python_version < "3.12"
@@ -25,21 +25,21 @@ numpy==1.24.2 ; python_version >= "3.9" and python_version < "3.12"
oauthlib==3.2.2 ; python_version >= "3.9" and python_version < "3.12"
packaging==23.0 ; python_version >= "3.9" and python_version < "3.12"
pandas==1.5.3 ; python_version >= "3.9" and python_version < "3.12"
-pillow==9.4.0 ; python_version >= "3.9" and python_version < "3.12"
+pillow==10.3.0 ; python_version >= "3.9" and python_version < "3.12"
prometheus-api-client==0.5.3 ; python_version >= "3.9" and python_version < "3.12"
-prometrix==0.1.16 ; python_version >= "3.9" and python_version < "3.12"
+prometrix==0.1.17 ; python_version >= "3.9" and python_version < "3.12"
pyasn1-modules==0.2.8 ; python_version >= "3.9" and python_version < "3.12"
pyasn1==0.4.8 ; python_version >= "3.9" and python_version < "3.12"
-pydantic==1.10.7 ; python_version >= "3.9" and python_version < "3.12"
-pygments==2.14.0 ; python_version >= "3.9" and python_version < "3.12"
+pydantic==1.10.15 ; python_version >= "3.9" and python_version < "3.12"
+pygments==2.17.2 ; python_version >= "3.9" and python_version < "3.12"
pyparsing==3.0.9 ; python_version >= "3.9" and python_version < "3.12"
python-dateutil==2.8.2 ; python_version >= "3.9" and python_version < "3.12"
pytz-deprecation-shim==0.1.0.post0 ; python_version >= "3.9" and python_version < "3.12"
pytz==2022.7.1 ; python_version >= "3.9" and python_version < "3.12"
pyyaml==6.0 ; python_version >= "3.9" and python_version < "3.12"
regex==2022.10.31 ; python_version >= "3.9" and python_version < "3.12"
requests-oauthlib==1.3.1 ; python_version >= "3.9" and python_version < "3.12"
-requests==2.28.2 ; python_version >= "3.9" and python_version < "3.12"
+requests==2.31.0 ; python_version >= "3.9" and python_version < "3.12"
rich==12.6.0 ; python_version >= "3.9" and python_version < "3.12"
rsa==4.9 ; python_version >= "3.9" and python_version < "3.12"
s3transfer==0.6.1 ; python_version >= "3.9" and python_version < "3.12"
@@ -51,6 +51,6 @@ typer[all]==0.7.0 ; python_version >= "3.9" and python_version < "3.12"
typing-extensions==4.5.0 ; python_version >= "3.9" and python_version < "3.12"
tzdata==2022.7 ; python_version >= "3.9" and python_version < "3.12"
tzlocal==4.2 ; python_version >= "3.9" and python_version < "3.12"
-urllib3==1.26.14 ; python_version >= "3.9" and python_version < "3.12"
+urllib3==1.26.18 ; python_version >= "3.9" and python_version < "3.12"
websocket-client==1.5.1 ; python_version >= "3.9" and python_version < "3.12"
zipp==3.15.0 ; python_version >= "3.9" and python_version < "3.10"
97 changes: 47 additions & 50 deletions robusta_krr/core/integrations/kubernetes/__init__.py
@@ -2,7 +2,7 @@
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
-from typing import Any, AsyncGenerator, AsyncIterable, Awaitable, Callable, Iterable, Optional, Union
+from typing import Any, Awaitable, Callable, Iterable, Optional, Union

from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException
@@ -20,7 +20,6 @@
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
-from robusta_krr.utils.async_gen_merge import async_gen_merge
from robusta_krr.utils.object_like_dict import ObjectLikeDict

from . import config_patch as _
@@ -49,7 +48,7 @@ def __init__(self, cluster: Optional[str]=None):
self.__jobs_for_cronjobs: dict[str, list[V1Job]] = {}
self.__jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

-    async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
+    async def list_scannable_objects(self) -> list[K8sObjectData]:
"""List all scannable objects.
Returns:
@@ -61,22 +60,23 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
logger.debug(f"Resources: {settings.resources}")

self.__hpa_list = await self._try_list_hpa()

-        # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
-        # This will merge all the streams from all the cluster loaders into a single stream
-        async for object in async_gen_merge(
+        workload_object_lists = await asyncio.gather(
            self._list_deployments(),
            self._list_rollouts(),
            self._list_deploymentconfig(),
            self._list_all_statefulsets(),
            self._list_all_daemon_set(),
            self._list_all_jobs(),
            self._list_all_cronjobs(),
-        ):
+        )
+
+        return [
+            object
+            for workload_objects in workload_object_lists
+            for object in workload_objects
            # NOTE: By default we will filter out kube-system namespace
-            if settings.namespaces == "*" and object.namespace == "kube-system":
-                continue
-            yield object
+            if not (settings.namespaces == "*" and object.namespace == "kube-system")
+        ]

async def _list_jobs_for_cronjobs(self, namespace: str) -> list[V1Job]:
if namespace not in self.__jobs_for_cronjobs:
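The hunk above is the heart of this commit: instead of merging seven async generators into one stream, the seven workload listers now each return a list, are awaited together with asyncio.gather, and the results are flattened (one consequence is that objects are reported only after every kind has finished, rather than streamed as they arrive). A runnable sketch of the pattern, with illustrative names that are not from the repo:

```python
# Illustrative sketch of the pattern adopted above: coroutines that each
# return a list are awaited together with asyncio.gather, then flattened,
# replacing the async-generator merge.
import asyncio

async def fetch(kind: str) -> list[str]:
    await asyncio.sleep(0.1)  # stand-in for a Kubernetes API call
    return [f"{kind}-1", f"{kind}-2"]

async def main() -> list[str]:
    per_kind = await asyncio.gather(fetch("Deployment"), fetch("StatefulSet"))
    # Flatten the list of lists, as the comprehension in the diff does
    return [item for items in per_kind for item in items]

print(asyncio.run(main()))
```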
@@ -185,12 +185,12 @@ async def _list_namespaced_or_global_objects(
kind: KindLiteral,
all_namespaces_request: Callable,
namespaced_request: Callable
-    ) -> AsyncIterable[Any]:
+    ) -> list[Any]:
logger.debug(f"Listing {kind}s in {self.cluster}")
loop = asyncio.get_running_loop()

if settings.namespaces == "*":
-            tasks = [
+            requests = [
loop.run_in_executor(
self.executor,
lambda: all_namespaces_request(
@@ -200,7 +200,7 @@
)
]
else:
-            tasks = [
+            requests = [
loop.run_in_executor(
self.executor,
lambda ns=namespace: namespaced_request(
@@ -212,14 +212,14 @@
for namespace in settings.namespaces
]

-        total_items = 0
-        for task in asyncio.as_completed(tasks):
-            ret_single = await task
-            total_items += len(ret_single.items)
-            for item in ret_single.items:
-                yield item
+        result = [
+            item
+            for request_result in await asyncio.gather(*requests)
+            for item in request_result.items
+        ]

-        logger.debug(f"Found {total_items} {kind} in {self.cluster}")
+        logger.debug(f"Found {len(result)} {kind} in {self.cluster}")
+        return result

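In the hunk above, each blocking kubernetes-client request runs on the loader's thread pool, one future per namespace, and asyncio.gather now awaits the whole batch before the `.items` lists are flattened; the `lambda ns=namespace:` default argument pins the loop variable so every closure queries its own namespace. A minimal self-contained sketch of the same fan-out (`list_fn` is a hypothetical stand-in):

```python
# Minimal sketch (hypothetical list_fn) of the executor fan-out used above:
# blocking client calls run on a thread pool, one per namespace, and
# asyncio.gather collects every response before flattening the results.
import asyncio
from concurrent.futures import ThreadPoolExecutor

def list_fn(namespace: str) -> list[str]:
    # Stand-in for a blocking kubernetes-client request
    return [f"{namespace}/pod-a", f"{namespace}/pod-b"]

async def list_all(namespaces: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Passing list_fn and its argument directly avoids the late-binding
        # pitfall that the diff's `lambda ns=namespace:` idiom guards against.
        requests = [loop.run_in_executor(executor, list_fn, ns) for ns in namespaces]
        results = await asyncio.gather(*requests)
    return [item for result in results for item in result]

print(asyncio.run(list_all(["default", "kube-system"])))
```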
async def _list_scannable_objects(
self,
@@ -228,25 +228,25 @@ async def _list_scannable_objects(
namespaced_request: Callable,
extract_containers: Callable[[Any], Union[Iterable[V1Container], Awaitable[Iterable[V1Container]]]],
filter_workflows: Optional[Callable[[Any], bool]] = None,
-    ) -> AsyncIterable[K8sObjectData]:
+    ) -> list[K8sObjectData]:
if not self._should_list_resource(kind):
logger.debug(f"Skipping {kind}s in {self.cluster}")
return

        if not self.__kind_available[kind]:
            return

+        result = []
        try:
-            async for item in self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request):
+            for item in await self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request):
if filter_workflows is not None and not filter_workflows(item):
continue

containers = extract_containers(item)
if asyncio.iscoroutine(containers):
containers = await containers

-                for container in containers:
-                    yield self.__build_scannable_object(item, container, kind)
+                result.extend(self.__build_scannable_object(item, container, kind) for container in containers)
except ApiException as e:
if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
if self.__kind_available[kind]:
@@ -256,15 +256,17 @@ async def _list_scannable_objects(
logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
logger.error("Will skip this object type and continue.")

-    def _list_deployments(self) -> AsyncIterable[K8sObjectData]:
+        return result
+
+    def _list_deployments(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
kind="Deployment",
all_namespaces_request=self.apps.list_deployment_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_deployment,
extract_containers=lambda item: item.spec.template.spec.containers,
)

-    def _list_rollouts(self) -> AsyncIterable[K8sObjectData]:
+    def _list_rollouts(self) -> list[K8sObjectData]:
async def _extract_containers(item: Any) -> list[V1Container]:
if item.spec.template is not None:
return item.spec.template.spec.containers
@@ -311,7 +313,7 @@ async def _extract_containers(item: Any) -> list[V1Container]:
extract_containers=_extract_containers,
)

-    def _list_deploymentconfig(self) -> AsyncIterable[K8sObjectData]:
+    def _list_deploymentconfig(self) -> list[K8sObjectData]:
# NOTE: Using custom objects API returns dicts, but all other APIs return objects
# We need to handle this difference using a small wrapper
return self._list_scannable_objects(
@@ -335,23 +337,23 @@ def _list_deploymentconfig(self) -> AsyncIterable[K8sObjectData]:
extract_containers=lambda item: item.spec.template.spec.containers,
)

-    def _list_all_statefulsets(self) -> AsyncIterable[K8sObjectData]:
+    def _list_all_statefulsets(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
kind="StatefulSet",
all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_stateful_set,
extract_containers=lambda item: item.spec.template.spec.containers,
)

-    def _list_all_daemon_set(self) -> AsyncIterable[K8sObjectData]:
+    def _list_all_daemon_set(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
kind="DaemonSet",
all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_daemon_set,
extract_containers=lambda item: item.spec.template.spec.containers,
)

-    def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]:
+    def _list_all_jobs(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
@@ -363,7 +365,7 @@ def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]:
),
)

-    def _list_all_cronjobs(self) -> AsyncIterable[K8sObjectData]:
+    def _list_all_cronjobs(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
kind="CronJob",
all_namespaces_request=self.batch.list_cron_job_for_all_namespaces,
@@ -398,14 +400,10 @@ async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
}

async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]:
-        loop = asyncio.get_running_loop()
-        res = await loop.run_in_executor(
-            self.executor,
-            lambda: self._list_namespaced_or_global_objects(
-                kind="HPA-v2",
-                all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces,
-                namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler,
-            ),
+        res = await self._list_namespaced_or_global_objects(
+            kind="HPA-v2",
+            all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces,
+            namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler,
)
def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]:
return next(
@@ -429,7 +427,7 @@ def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[f
target_cpu_utilization_percentage=__get_metric(hpa, "cpu"),
target_memory_utilization_percentage=__get_metric(hpa, "memory"),
)
-            async for hpa in res
+            for hpa in res
}

# TODO: What should we do in case of other metrics bound to the HPA?
@@ -514,7 +512,7 @@ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[Cluster
logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None

-    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterable[K8sObjectData]:
+    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
"""List all scannable objects.
Yields:
Expand All @@ -529,13 +527,12 @@ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIt
if self.cluster_loaders == {}:
logger.error("Could not load any cluster.")
return

-        # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
-        # This will merge all the streams from all the cluster loaders into a single stream
-        async for object in async_gen_merge(
-            *[cluster_loader.list_scannable_objects() for cluster_loader in self.cluster_loaders.values()]
-        ):
-            yield object
+        return [
+            object
+            for cluster_loader in self.cluster_loaders.values()
+            for object in await cluster_loader.list_scannable_objects()
+        ]

async def load_pods(self, object: K8sObjectData) -> list[PodData]:
try: