Skip to content

Commit

Permalink
Merge branch 'main' into prometheus-workload-loader
Browse files Browse the repository at this point in the history
  • Loading branch information
LeaveMyYard committed Apr 30, 2024
2 parents f7d8412 + dd13dee commit 09c372b
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 65 deletions.
49 changes: 0 additions & 49 deletions .idea/workspace.xml

This file was deleted.

2 changes: 1 addition & 1 deletion robusta_krr/core/abstract/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ResourceRecommendation(pd.BaseModel):
request: Optional[float]
limit: Optional[float]
info: Optional[str] = pd.Field(
None, description="Additional information about the recommendation. Currently used to explain undefined."
None, description="Additional information about the recommendation."
)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .base import PrometheusMetric
from .cpu import CPUAmountLoader, CPULoader, PercentileCPULoader
from .memory import MaxMemoryLoader, MemoryAmountLoader, MemoryLoader
from .memory import MaxMemoryLoader, MemoryAmountLoader, MemoryLoader, MaxOOMKilledMemoryLoader
6 changes: 5 additions & 1 deletion robusta_krr/core/integrations/prometheus/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PrometheusMetric(BaseMetric):
query_type: QueryType = QueryType.Query
filtering: bool = True
pods_batch_size: Optional[int] = 50
warning_on_no_data: bool = True

def __init__(
self,
Expand Down Expand Up @@ -127,7 +128,10 @@ def _query_prometheus_sync(self, data: PrometheusMetricData) -> list[PrometheusS
return response["result"]
else:
# regular query, lighter on performance
response = self.prometheus.safe_custom_query(query=data.query)
try:
response = self.prometheus.safe_custom_query(query=data.query)
except Exception as e:
raise ValueError(f"Failed to run query: {data.query}") from e
results = response["result"]
# format the results to return the same format as custom_query_range
for result in results:
Expand Down
38 changes: 38 additions & 0 deletions robusta_krr/core/integrations/prometheus/metrics/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,41 @@ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
[{duration}:{step}]
)
"""

# TODO: Need to battle-test if this one is correct.
class MaxOOMKilledMemoryLoader(PrometheusMetric):
    """
    A metric loader for loading the maximum memory limit that was surpassed
    by an OOMKilled event.

    The query joins the container's configured memory limit
    (``kube_pod_container_resource_limits``) with the last-terminated-reason
    series filtered to ``reason="OOMKilled"``, so a value is only produced for
    containers that were actually OOMKilled; the subquery then takes the
    maximum such limit over the lookback window.
    """

    # OOMKill events are rare, so an empty result is the normal case here —
    # suppress the "returned no metrics" warning other loaders emit.
    warning_on_no_data = False

    def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
        # NOTE: annotation fixed from K8sObjectData to K8sWorkload for
        # consistency with the other loaders in this module (see the CPU and
        # memory loaders above), which already take K8sWorkload.
        pods_selector = "|".join(pod.name for pod in object.pods)
        # cluster_label is expected to be either empty or a leading-comma
        # label matcher (e.g. ', cluster="..."') — TODO confirm against
        # get_prometheus_cluster_label().
        cluster_label = self.get_prometheus_cluster_label()
        return f"""
            max_over_time(
                max(
                    max(
                        kube_pod_container_resource_limits{{
                            resource="memory",
                            namespace="{object.namespace}",
                            pod=~"{pods_selector}",
                            container="{object.container}"
                            {cluster_label}
                        }}
                    ) by (pod, container, job)
                    * on(pod, container, job) group_left(reason)
                    max(
                        kube_pod_container_status_last_terminated_reason{{
                            reason="OOMKilled",
                            namespace="{object.namespace}",
                            pod=~"{pods_selector}",
                            container="{object.container}"
                            {cluster_label}
                        }}
                    ) by (pod, container, job, reason)
                ) by (container, pod, job)
                [{duration}:{step}]
            )
        """
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ async def gather_data(
elif "Memory" in LoaderClass.__name__:
object.add_warning("NoPrometheusMemoryMetrics")

logger.warning(
f"{metric_loader.service_name} returned no {metric_loader.__class__.__name__} metrics for {object}"
)
if LoaderClass.warning_on_no_data:
logger.warning(
f"{metric_loader.service_name} returned no {metric_loader.__class__.__name__} metrics for {object}"
)

return data

Expand Down
14 changes: 13 additions & 1 deletion robusta_krr/formatters/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def __calc_diff(allocated, recommended, selector, multiplier=1) -> str:
return f"{diff_sign}{_format(abs(diff_val) * multiplier)}"


# Fallback rich-markup color for the "(info)" annotation on a recommendation.
DEFAULT_INFO_COLOR = "grey27"
# Specific info messages that should stand out get an accent color here;
# any message not listed falls back to DEFAULT_INFO_COLOR.
INFO_COLORS: dict[str, str] = {
    "OOMKill detected": "dark_red",
}


def _format_request_str(item: ResourceScan, resource: ResourceType, selector: str) -> str:
allocated = getattr(item.object.allocations, selector)[resource]
info = item.recommended.info.get(resource)
Expand All @@ -46,14 +52,20 @@ def _format_request_str(item: ResourceScan, resource: ResourceType, selector: st
if diff != "":
diff = f"({diff}) "

if info is None:
info_formatted = ""
else:
color = INFO_COLORS.get(info, DEFAULT_INFO_COLOR)
info_formatted = f"\n[{color}]({info})[/{color}]"

return (
diff
+ f"[{severity.color}]"
+ _format(allocated)
+ " -> "
+ _format(recommended.value)
+ f"[/{severity.color}]"
+ (f" [grey27]({info})[/grey27]" if info else "")
+ info_formatted
)


Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/strategies/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .simple import SimpleStrategy
from .simple import SimpleStrategy
72 changes: 64 additions & 8 deletions robusta_krr/strategies/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
MemoryAmountLoader,
PercentileCPULoader,
PrometheusMetric,
MaxOOMKilledMemoryLoader,
)


class SimpleStrategySettings(StrategySettings):
cpu_percentile: float = pd.Field(99, gt=0, le=100, description="The percentile to use for the CPU recommendation.")
memory_buffer_percentage: float = pd.Field(
Expand All @@ -29,14 +31,27 @@ class SimpleStrategySettings(StrategySettings):
points_required: int = pd.Field(
100, ge=1, description="The number of data points required to make a recommendation for a resource."
)
allow_hpa: bool = pd.Field(False, description="Whether to calculate recommendations even when there is an HPA scaler defined on that resource.")
allow_hpa: bool = pd.Field(
False,
description="Whether to calculate recommendations even when there is an HPA scaler defined on that resource.",
)
use_oomkill_data: bool = pd.Field(
False,
description="Whether to bump the memory when OOMKills are detected (experimental).",
)
oom_memory_buffer_percentage: float = pd.Field(
25, gt=0, description="What percentage to increase the memory when there are OOMKill events."
)

def calculate_memory_proposal(self, data: PodsTimeData, max_oomkill: float = 0) -> float:
    """Return the proposed memory value for the workload.

    Takes the highest per-pod memory usage over the history window, adds the
    regular memory buffer, and — when OOMKill data is available — makes sure
    the proposal also exceeds the OOMKilled memory limit by the (typically
    larger) OOM buffer.

    Args:
        data: per-pod time series; each value is an (N, 2) array of
            [timestamp, value] rows.
        max_oomkill: the maximum memory limit surpassed by an OOMKill event,
            or 0 when none was detected (keeps the pre-OOMKill behavior).

    Returns:
        The proposed memory amount, or NaN when there is no data at all.
    """
    pod_maxima = [np.max(values[:, 1]) for values in data.values()]
    if not pod_maxima:
        return float("NaN")

    return max(
        np.max(pod_maxima) * (1 + self.memory_buffer_percentage / 100),
        max_oomkill * (1 + self.oom_memory_buffer_percentage / 100),
    )

def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
if len(data) == 0:
Expand Down Expand Up @@ -75,7 +90,17 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):

@property
def metrics(self) -> list[type[PrometheusMetric]]:
    """The metric loaders this strategy needs; OOMKill data is opt-in."""
    base_loaders = [
        PercentileCPULoader(self.settings.cpu_percentile),
        MaxMemoryLoader,
        CPUAmountLoader,
        MemoryAmountLoader,
    ]
    # The experimental OOMKill loader is only queried when enabled.
    extra_loaders = [MaxOOMKilledMemoryLoader] if self.settings.use_oomkill_data else []
    return base_loaders + extra_loaders

def __calculate_cpu_proposal(
self, history_data: MetricsPodData, object_data: K8sWorkload
Expand All @@ -85,13 +110,20 @@ def __calculate_cpu_proposal(
if len(data) == 0:
return ResourceRecommendation.undefined(info="No data")

# NOTE: metrics for each pod are returned as list[values] where values is [timestamp, value]
# As CPUAmountLoader returns only the last value (1 point), [0, 1] is used to get the value
# So each pod is string with pod name, and values is numpy array of shape (N, 2)
data_count = {pod: values[0, 1] for pod, values in history_data["CPUAmountLoader"].items()}
total_points_count = sum(data_count.values())

if total_points_count < self.settings.points_required:
return ResourceRecommendation.undefined(info="Not enough data")

if object_data.hpa is not None and object_data.hpa.target_cpu_utilization_percentage is not None and not self.settings.allow_hpa:
if (
object_data.hpa is not None
and object_data.hpa.target_cpu_utilization_percentage is not None
and not self.settings.allow_hpa
):
return ResourceRecommendation.undefined(info="HPA detected")

cpu_usage = self.settings.calculate_cpu_proposal(data)
Expand All @@ -102,20 +134,44 @@ def __calculate_memory_proposal(
) -> ResourceRecommendation:
data = history_data["MaxMemoryLoader"]

oomkill_detected = False

if self.settings.use_oomkill_data:
max_oomkill_data = history_data["MaxOOMKilledMemoryLoader"]
# NOTE: metrics for each pod are returned as list[values] where values is [timestamp, value]
# As MaxOOMKilledMemoryLoader returns only the last value (1 point), [0, 1] is used to get the value
# So each value is numpy array of shape (N, 2)
max_oomkill_value = (
np.max([values[0, 1] for values in max_oomkill_data.values()]) if len(max_oomkill_data) > 0 else 0
)
if max_oomkill_value != 0:
oomkill_detected = True
else:
max_oomkill_value = 0

if len(data) == 0:
return ResourceRecommendation.undefined(info="No data")

# NOTE: metrics for each pod are returned as list[values] where values is [timestamp, value]
# As MemoryAmountLoader returns only the last value (1 point), [0, 1] is used to get the value
# So each pod is string with pod name, and values is numpy array of shape (N, 2)
data_count = {pod: values[0, 1] for pod, values in history_data["MemoryAmountLoader"].items()}
total_points_count = sum(data_count.values())

if total_points_count < self.settings.points_required:
return ResourceRecommendation.undefined(info="Not enough data")

if object_data.hpa is not None and object_data.hpa.target_memory_utilization_percentage is not None and not self.settings.allow_hpa:
if (
object_data.hpa is not None
and object_data.hpa.target_memory_utilization_percentage is not None
and not self.settings.allow_hpa
):
return ResourceRecommendation.undefined(info="HPA detected")

memory_usage = self.settings.calculate_memory_proposal(data)
return ResourceRecommendation(request=memory_usage, limit=memory_usage)
memory_usage = self.settings.calculate_memory_proposal(data, max_oomkill_value)
return ResourceRecommendation(
request=memory_usage, limit=memory_usage, info="OOMKill detected" if oomkill_detected else None
)

def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult:
return {
Expand Down

0 comments on commit 09c372b

Please sign in to comment.