Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring concept: generate typer commands from config #258

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions robusta_krr/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import annotations

import asyncio
import inspect
import logging
from datetime import datetime
from typing import List, Optional
from uuid import UUID

import typer
import urllib3
from pydantic import ValidationError # noqa: F401
from typer.models import OptionInfo

from robusta_krr import formatters as concrete_formatters # noqa: F401
from robusta_krr.core.abstract import formatters
from robusta_krr.core.abstract.strategies import BaseStrategy
from robusta_krr.core.models.config import Config
from robusta_krr.core.runner import Runner
from robusta_krr.utils.version import get_version

app = typer.Typer(pretty_exceptions_show_locals=False, pretty_exceptions_short=True, no_args_is_help=True)

# NOTE: Disable insecure request warnings, as it might be expected to use self-signed certificates inside the cluster
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger("krr")


def __process_type(_T: type) -> type:
"""Process type to a python literal"""
if _T in (int, float, str, bool, datetime, UUID):
return _T
elif _T is Optional:
return Optional[{__process_type(_T.__args__[0])}] # type: ignore
else:
return str # If the type is unknown, just use str and let pydantic handle it


def _add_default_settings_to_command(run_strategy):
"""Modify the signature of the run_strategy function to include the strategy settings as keyword-only arguments."""

signature = inspect.signature(run_strategy)
run_strategy.__signature__ = signature.replace( # type: ignore
parameters=list(signature.parameters.values())[:-1]
+ [
inspect.Parameter(
name=field_name,
kind=inspect.Parameter.KEYWORD_ONLY,
default=OptionInfo(
default=field_meta.default,
param_decls=field_meta.field_info.extra.get(
"typer__param_decls", list(set([f"--{field_name.replace('_', '-')}"]))
),
help=field_meta.field_info.extra.get("typer__help", f"{field_meta.field_info.description}"),
rich_help_panel=field_meta.field_info.extra.get("typer__rich_help_panel", "General Settings"),
),
annotation=__process_type(field_meta.type_),
)
for field_name, field_meta in Config.__fields__.items()
]
)


def _add_strategy_settings_to_command(run_strategy, strategy_type: BaseStrategy):
"""Modify the signature of the run_strategy function to include the strategy settings as keyword-only arguments."""

signature = inspect.signature(run_strategy)
run_strategy.__signature__ = signature.replace( # type: ignore
parameters=list(signature.parameters.values())
+ [
inspect.Parameter(
name=field_name,
kind=inspect.Parameter.KEYWORD_ONLY,
default=OptionInfo(
default=field_meta.default,
param_decls=list(set([f"--{field_name}", f"--{field_name.replace('_', '-')}"])),
help=f"{field_meta.field_info.description}",
rich_help_panel="Strategy Settings",
),
annotation=__process_type(field_meta.type_),
)
for field_name, field_meta in strategy_type.get_settings_type().__fields__.items()
]
)


def add_strategy_command_to_app(app: typer.Typer, strategy_name: str, StrategyType: BaseStrategy) -> None:
def strategy_wrapper():
def run_strategy(ctx: typer.Context, **kwargs) -> None:
f"""Run KRR using the `{strategy_name}` strategy"""

try:
config = Config(**kwargs)
strategy_settings = StrategyType.get_settings_type()(**kwargs)
except ValidationError:
logger.exception("Error occured while parsing arguments")
else:
Config.set_config(config)
strategy = StrategyType(strategy_settings)

runner = Runner(strategy)
exit_code = asyncio.run(runner.run())
raise typer.Exit(code=exit_code)

run_strategy.__name__ = strategy_name
_add_default_settings_to_command(run_strategy)
_add_strategy_settings_to_command(run_strategy, StrategyType)

app.command(rich_help_panel="Strategies")(run_strategy)

strategy_wrapper()
134 changes: 87 additions & 47 deletions robusta_krr/core/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,71 @@
from rich.logging import RichHandler

from robusta_krr.core.abstract import formatters
from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy
from robusta_krr.core.models.objects import KindLiteral

logger = logging.getLogger("krr")


class Config(pd.BaseSettings):
quiet: bool = pd.Field(False)
verbose: bool = pd.Field(False)

clusters: Union[list[str], Literal["*"], None] = None
kubeconfig: Optional[str] = None
impersonate_user: Optional[str] = None
impersonate_group: Optional[str] = None
namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
resources: Union[list[KindLiteral], Literal["*"]] = pd.Field("*")
selector: Optional[str] = None
class Config(pd.BaseModel, extra=pd.Extra.ignore):
kubeconfig: Optional[str] = pd.Field(
None,
typer__param_decls=["--kubeconfig", "-k"],
typer__help="Path to kubeconfig file. If not provided, will attempt to find it.",
typer__rich_help_panel="Kubernetes Settings",
)
impersonate_user: Optional[str] = pd.Field(
None,
typer__param_decls=["--as"],
typer__help="Impersonate a user, just like `kubectl --as`. For example, system:serviceaccount:default:krr-account.",
typer__rich_help_panel="Kubernetes Settings",
)
impersonate_group: Optional[str] = pd.Field(
None,
typer__param_decls=["--as-group"],
typer__help="Impersonate a user inside of a group, just like `kubectl --as-group`. For example, system:authenticated.",
typer__rich_help_panel="Kubernetes Settings",
)
clusters: Union[list[str], Literal["*"], None] = pd.Field(
None,
typer__param_decls=["--context", "--cluster", "-c"],
typer__help=(
"List of clusters to run on. By default, will run on the current cluster. "
"Use --all-clusters to run on all clusters."
),
typer__rich_help_panel="Kubernetes Settings",
)
all_clusters: bool = pd.Field(
False,
typer__param_decls=["--all-clusters"],
typer__help=("Run on all clusters. Overrides --context."),
typer__rich_help_panel="Kubernetes Settings",
)
namespaces: Union[list[str], Literal["*"]] = pd.Field(
"*",
typer__param_decls=["--namespace", "-n"],
typer__help=("List of namespaces to run on. By default, will run on all namespaces except 'kube-system'."),
typer__rich_help_panel="Kubernetes Settings",
)
resources: Union[list[KindLiteral], Literal["*"]] = pd.Field(
"*",
typer__param_decls=["--resource", "-r"],
typer__help=(
"List of resources to run on (Deployment, StatefulSet, DaemonSet, Job, Rollout). "
"By default, will run on all resources. Case insensitive."
),
typer__rich_help_panel="Kubernetes Settings",
)
selector: Optional[str] = pd.Field(
None,
typer__param_decls=["--context", "--cluster", "-c"],
typer__help=(
"List of clusters to run on. By default, will run on the current cluster. "
"Use --all-clusters to run on all clusters."
),
typer__rich_help_panel="Kubernetes Settings",
)

# TODO: Other ones

# Value settings
cpu_min_value: int = pd.Field(10, ge=0) # in millicores
Expand All @@ -53,18 +101,17 @@ class Config(pd.BaseSettings):
max_workers: int = pd.Field(6, ge=1)

# Logging Settings
format: str
show_cluster_name: bool
strategy: str
log_to_stderr: bool
format: str = pd.Field("table")
show_cluster_name: bool = pd.Field(False)
log_to_stderr: bool = pd.Field(False)
width: Optional[int] = pd.Field(None, ge=1)
quiet: bool = pd.Field(False)
verbose: bool = pd.Field(False)

# Outputs Settings
file_output: Optional[str] = pd.Field(None)
slack_output: Optional[str] = pd.Field(None)

other_args: dict[str, Any]

# Internal
inside_cluster: bool = False
_logging_console: Optional[Console] = pd.PrivateAttr(None)
Expand All @@ -76,20 +123,23 @@ def __init__(self, **kwargs: Any) -> None:
def Formatter(self) -> formatters.FormatterFunc:
return formatters.find(self.format)

@pd.validator("prometheus_url")
def validate_prometheus_url(cls, v: Optional[str]):
if v is None:
return None
# @pd.validator("prometheus_url")
# def validate_prometheus_url(cls, v: Optional[str]):
# if v is None:
# return None

if not v.startswith("https://") and not v.startswith("http://"):
raise Exception("--prometheus-url must start with https:// or http://")
# if not v.startswith("https://") and not v.startswith("http://"):
# raise Exception("--prometheus-url must start with https:// or http://")

v = v.removesuffix("/")
# v = v.removesuffix("/")

return v
# return v

@pd.validator("prometheus_other_headers", pre=True)
def validate_prometheus_other_headers(cls, headers: Union[list[str], dict[str, str]]) -> dict[str, str]:
if headers is None:
return {}

if isinstance(headers, dict):
return headers

Expand All @@ -102,29 +152,19 @@ def validate_namespaces(cls, v: Union[list[str], Literal["*"]]) -> Union[list[st

return [val.lower() for val in v]

@pd.validator("resources", pre=True)
def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str], Literal["*"]]:
if v == []:
return "*"

# NOTE: KindLiteral.__args__ is a tuple of all possible values of KindLiteral
# So this will preserve the big and small letters of the resource
return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v]

def create_strategy(self) -> AnyStrategy:
StrategyType = AnyStrategy.find(self.strategy)
StrategySettingsType = StrategyType.get_settings_type()
return StrategyType(StrategySettingsType(**self.other_args)) # type: ignore
# @pd.validator("resources", pre=True)
# def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str], Literal["*"]]:
# if v == []:
# return "*"

@pd.validator("strategy")
def validate_strategy(cls, v: str) -> str:
BaseStrategy.find(v) # NOTE: raises if strategy is not found
return v
# # NOTE: KindLiteral.__args__ is a tuple of all possible values of KindLiteral
# # So this will preserve the big and small letters of the resource
# return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v]

@pd.validator("format")
def validate_format(cls, v: str) -> str:
formatters.find(v) # NOTE: raises if strategy is not found
return v
# @pd.validator("format")
# def validate_format(cls, v: str) -> str:
# formatters.find(v) # NOTE: raises if formatter is not found
# return v

@property
def context(self) -> Optional[str]:
Expand Down
6 changes: 3 additions & 3 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from rich.console import Console
from slack_sdk import WebClient

from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult, BaseStrategy
from robusta_krr.core.integrations.kubernetes import KubernetesLoader
from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader
from robusta_krr.core.models.config import settings
Expand Down Expand Up @@ -39,11 +39,11 @@ class CriticalRunnerException(Exception): ...
class Runner:
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)

def __init__(self) -> None:
def __init__(self, strategy: BaseStrategy) -> None:
self._k8s_loader = KubernetesLoader()
self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = settings.create_strategy()
self._strategy = strategy

self.errors: list[dict] = []

Expand Down
Loading
Loading