From b5eee438ad0ffc7ea4486107dcea7e13f20d30b6 Mon Sep 17 00:00:00 2001 From: LeaveMyYard Date: Thu, 4 Apr 2024 19:07:38 +0300 Subject: [PATCH] Refactoring concept: generate typer commands from config --- robusta_krr/cli.py | 112 ++++++++++++ robusta_krr/core/models/config.py | 134 +++++++++----- robusta_krr/core/runner.py | 6 +- robusta_krr/main.py | 284 +----------------------------- 4 files changed, 205 insertions(+), 331 deletions(-) create mode 100644 robusta_krr/cli.py diff --git a/robusta_krr/cli.py b/robusta_krr/cli.py new file mode 100644 index 00000000..405c4be8 --- /dev/null +++ b/robusta_krr/cli.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import asyncio +import inspect +import logging +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +import typer +import urllib3 +from pydantic import ValidationError # noqa: F401 +from typer.models import OptionInfo + +from robusta_krr import formatters as concrete_formatters # noqa: F401 +from robusta_krr.core.abstract import formatters +from robusta_krr.core.abstract.strategies import BaseStrategy +from robusta_krr.core.models.config import Config +from robusta_krr.core.runner import Runner +from robusta_krr.utils.version import get_version + +app = typer.Typer(pretty_exceptions_show_locals=False, pretty_exceptions_short=True, no_args_is_help=True) + +# NOTE: Disable insecure request warnings, as it might be expected to use self-signed certificates inside the cluster +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +logger = logging.getLogger("krr") + + +def __process_type(_T: type) -> type: + """Process type to a python literal""" + if _T in (int, float, str, bool, datetime, UUID): + return _T + elif _T is Optional: + return Optional[{__process_type(_T.__args__[0])}] # type: ignore + else: + return str # If the type is unknown, just use str and let pydantic handle it + + +def _add_default_settings_to_command(run_strategy): + """Modify the signature of the run_strategy function to include the strategy settings as keyword-only arguments.""" + + signature = inspect.signature(run_strategy) + run_strategy.__signature__ = signature.replace( # type: ignore + parameters=list(signature.parameters.values())[:-1] + + [ + inspect.Parameter( + name=field_name, + kind=inspect.Parameter.KEYWORD_ONLY, + default=OptionInfo( + default=field_meta.default, + param_decls=field_meta.field_info.extra.get( + "typer__param_decls", list(set([f"--{field_name.replace('_', '-')}"])) + ), + help=field_meta.field_info.extra.get("typer__help", f"{field_meta.field_info.description}"), + rich_help_panel=field_meta.field_info.extra.get("typer__rich_help_panel", "General Settings"), + ), + annotation=__process_type(field_meta.type_), + ) + for field_name, field_meta in Config.__fields__.items() + ] + ) + + +def _add_strategy_settings_to_command(run_strategy, strategy_type: BaseStrategy): + """Modify the signature of the run_strategy function to include the strategy settings as keyword-only arguments.""" + + signature = inspect.signature(run_strategy) + run_strategy.__signature__ = signature.replace( # type: ignore + parameters=list(signature.parameters.values()) + + [ + inspect.Parameter( + name=field_name, + kind=inspect.Parameter.KEYWORD_ONLY, + default=OptionInfo( + default=field_meta.default, + param_decls=list(set([f"--{field_name}", f"--{field_name.replace('_', '-')}"])), + help=f"{field_meta.field_info.description}", + rich_help_panel="Strategy Settings", + ), + annotation=__process_type(field_meta.type_), + ) + for field_name, field_meta in strategy_type.get_settings_type().__fields__.items() + ] + ) + + +def add_strategy_command_to_app(app: typer.Typer, strategy_name: str, StrategyType: BaseStrategy) -> None: + def strategy_wrapper(): + def run_strategy(ctx: typer.Context, **kwargs) -> None: + f"""Run KRR using the `{strategy_name}` strategy""" + + try: + config = Config(**kwargs) + strategy_settings = StrategyType.get_settings_type()(**kwargs) + except ValidationError: + logger.exception("Error occured while parsing arguments") + else: + Config.set_config(config) + strategy = StrategyType(strategy_settings) + + runner = Runner(strategy) + exit_code = asyncio.run(runner.run()) + raise typer.Exit(code=exit_code) + + run_strategy.__name__ = strategy_name + _add_default_settings_to_command(run_strategy) + _add_strategy_settings_to_command(run_strategy, StrategyType) + + app.command(rich_help_panel="Strategies")(run_strategy) + + strategy_wrapper() diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index ff6142a6..360e31eb 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -11,23 +11,71 @@ from rich.logging import RichHandler from robusta_krr.core.abstract import formatters -from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy from robusta_krr.core.models.objects import KindLiteral logger = logging.getLogger("krr") -class Config(pd.BaseSettings): - quiet: bool = pd.Field(False) - verbose: bool = pd.Field(False) - - clusters: Union[list[str], Literal["*"], None] = None - kubeconfig: Optional[str] = None - impersonate_user: Optional[str] = None - impersonate_group: Optional[str] = None - namespaces: Union[list[str], Literal["*"]] = pd.Field("*") - resources: Union[list[KindLiteral], Literal["*"]] = pd.Field("*") - selector: Optional[str] = None +class Config(pd.BaseModel, extra=pd.Extra.ignore): + kubeconfig: Optional[str] = pd.Field( + None, + typer__param_decls=["--kubeconfig", "-k"], + typer__help="Path to kubeconfig file. If not provided, will attempt to find it.", + typer__rich_help_panel="Kubernetes Settings", + ) + impersonate_user: Optional[str] = pd.Field( + None, + typer__param_decls=["--as"], + typer__help="Impersonate a user, just like `kubectl --as`. For example, system:serviceaccount:default:krr-account.", + typer__rich_help_panel="Kubernetes Settings", + ) + impersonate_group: Optional[str] = pd.Field( + None, + typer__param_decls=["--as-group"], + typer__help="Impersonate a user inside of a group, just like `kubectl --as-group`. For example, system:authenticated.", + typer__rich_help_panel="Kubernetes Settings", + ) + clusters: Union[list[str], Literal["*"], None] = pd.Field( + None, + typer__param_decls=["--context", "--cluster", "-c"], + typer__help=( + "List of clusters to run on. By default, will run on the current cluster. " + "Use --all-clusters to run on all clusters." + ), + typer__rich_help_panel="Kubernetes Settings", + ) + all_clusters: bool = pd.Field( + False, + typer__param_decls=["--all-clusters"], + typer__help=("Run on all clusters. Overrides --context."), + typer__rich_help_panel="Kubernetes Settings", + ) + namespaces: Union[list[str], Literal["*"]] = pd.Field( + "*", + typer__param_decls=["--namespace", "-n"], + typer__help=("List of namespaces to run on. By default, will run on all namespaces except 'kube-system'."), + typer__rich_help_panel="Kubernetes Settings", + ) + resources: Union[list[KindLiteral], Literal["*"]] = pd.Field( + "*", + typer__param_decls=["--resource", "-r"], + typer__help=( + "List of resources to run on (Deployment, StatefulSet, DaemonSet, Job, Rollout). " + "By default, will run on all resources. Case insensitive." + ), + typer__rich_help_panel="Kubernetes Settings", + ) + selector: Optional[str] = pd.Field( + None, + typer__param_decls=["--context", "--cluster", "-c"], + typer__help=( + "List of clusters to run on. By default, will run on the current cluster. " + "Use --all-clusters to run on all clusters." + ), + typer__rich_help_panel="Kubernetes Settings", + ) + + # TODO: Other ones # Value settings cpu_min_value: int = pd.Field(10, ge=0) # in millicores @@ -53,18 +101,17 @@ class Config(pd.BaseSettings): max_workers: int = pd.Field(6, ge=1) # Logging Settings - format: str - show_cluster_name: bool - strategy: str - log_to_stderr: bool + format: str = pd.Field("table") + show_cluster_name: bool = pd.Field(False) + log_to_stderr: bool = pd.Field(False) width: Optional[int] = pd.Field(None, ge=1) + quiet: bool = pd.Field(False) + verbose: bool = pd.Field(False) # Outputs Settings file_output: Optional[str] = pd.Field(None) slack_output: Optional[str] = pd.Field(None) - other_args: dict[str, Any] - # Internal inside_cluster: bool = False _logging_console: Optional[Console] = pd.PrivateAttr(None) @@ -76,20 +123,23 @@ def __init__(self, **kwargs: Any) -> None: def Formatter(self) -> formatters.FormatterFunc: return formatters.find(self.format) - @pd.validator("prometheus_url") - def validate_prometheus_url(cls, v: Optional[str]): - if v is None: - return None + # @pd.validator("prometheus_url") + # def validate_prometheus_url(cls, v: Optional[str]): + # if v is None: + # return None - if not v.startswith("https://") and not v.startswith("http://"): - raise Exception("--prometheus-url must start with https:// or http://") + # if not v.startswith("https://") and not v.startswith("http://"): + # raise Exception("--prometheus-url must start with https:// or http://") - v = v.removesuffix("/") + # v = v.removesuffix("/") - return v + # return v @pd.validator("prometheus_other_headers", pre=True) def validate_prometheus_other_headers(cls, headers: Union[list[str], dict[str, str]]) -> dict[str, str]: + if headers is None: + return {} + if isinstance(headers, dict): return headers @@ -102,29 +152,19 @@ def validate_namespaces(cls, v: Union[list[str], Literal["*"]]) -> Union[list[st return [val.lower() for val in v] - @pd.validator("resources", pre=True) - def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str], Literal["*"]]: - if v == []: - return "*" - - # NOTE: KindLiteral.__args__ is a tuple of all possible values of KindLiteral - # So this will preserve the big and small letters of the resource - return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v] - - def create_strategy(self) -> AnyStrategy: - StrategyType = AnyStrategy.find(self.strategy) - StrategySettingsType = StrategyType.get_settings_type() - return StrategyType(StrategySettingsType(**self.other_args)) # type: ignore + # @pd.validator("resources", pre=True) + # def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str], Literal["*"]]: + # if v == []: + # return "*" - @pd.validator("strategy") - def validate_strategy(cls, v: str) -> str: - BaseStrategy.find(v) # NOTE: raises if strategy is not found - return v + # # NOTE: KindLiteral.__args__ is a tuple of all possible values of KindLiteral + # # So this will preserve the big and small letters of the resource + # return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v] - @pd.validator("format") - def validate_format(cls, v: str) -> str: - formatters.find(v) # NOTE: raises if strategy is not found - return v + # @pd.validator("format") + # def validate_format(cls, v: str) -> str: + # formatters.find(v) # NOTE: raises if formatter is not found + # return v @property def context(self) -> Optional[str]: diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 8e08521c..bf165dc2 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -11,7 +11,7 @@ from rich.console import Console from slack_sdk import WebClient -from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult +from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult, BaseStrategy from robusta_krr.core.integrations.kubernetes import KubernetesLoader from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader from robusta_krr.core.models.config import settings @@ -39,11 +39,11 @@ class CriticalRunnerException(Exception): ... class Runner: EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) - def __init__(self) -> None: + def __init__(self, strategy: BaseStrategy) -> None: self._k8s_loader = KubernetesLoader() self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} self._metrics_service_loaders_error_logged: set[Exception] = set() - self._strategy = settings.create_strategy() + self._strategy = strategy self.errors: list[dict] = [] diff --git a/robusta_krr/main.py b/robusta_krr/main.py index 5c2d01aa..1d2814ff 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -19,6 +19,8 @@ from robusta_krr.core.runner import Runner from robusta_krr.utils.version import get_version +from .cli import add_strategy_command_to_app + app = typer.Typer(pretty_exceptions_show_locals=False, pretty_exceptions_short=True, no_args_is_help=True) # NOTE: Disable insecure request warnings, as it might be expected to use self-signed certificates inside the cluster @@ -32,289 +34,9 @@ def version() -> None: typer.echo(get_version()) -def __process_type(_T: type) -> type: - """Process type to a python literal""" - if _T in (int, float, str, bool, datetime, UUID): - return _T - elif _T is Optional: - return Optional[{__process_type(_T.__args__[0])}] # type: ignore - else: - return str # If the type is unknown, just use str and let pydantic handle it - - def load_commands() -> None: for strategy_name, strategy_type in BaseStrategy.get_all().items(): # type: ignore - # NOTE: This wrapper here is needed to avoid the strategy_name being overwritten in the loop - def strategy_wrapper(_strategy_name: str = strategy_name): - def run_strategy( - ctx: typer.Context, - kubeconfig: Optional[str] = typer.Option( - None, - "--kubeconfig", - "-k", - help="Path to kubeconfig file. If not provided, will attempt to find it.", - rich_help_panel="Kubernetes Settings", - ), - impersonate_user: Optional[str] = typer.Option( - None, - "--as", - help="Impersonate a user, just like `kubectl --as`. For example, system:serviceaccount:default:krr-account.", - rich_help_panel="Kubernetes Settings", - ), - impersonate_group: Optional[str] = typer.Option( - None, - "--as-group", - help="Impersonate a user inside of a group, just like `kubectl --as-group`. For example, system:authenticated.", - rich_help_panel="Kubernetes Settings", - ), - clusters: List[str] = typer.Option( - None, - "--context", - "--cluster", - "-c", - help="List of clusters to run on. By default, will run on the current cluster. Use --all-clusters to run on all clusters.", - rich_help_panel="Kubernetes Settings", - ), - all_clusters: bool = typer.Option( - False, - "--all-clusters", - help="Run on all clusters. Overrides --context.", - rich_help_panel="Kubernetes Settings", - ), - namespaces: List[str] = typer.Option( - None, - "--namespace", - "-n", - help="List of namespaces to run on. By default, will run on all namespaces except 'kube-system'.", - rich_help_panel="Kubernetes Settings", - ), - resources: List[str] = typer.Option( - None, - "--resource", - "-r", - help="List of resources to run on (Deployment, StatefulSet, DaemonSet, Job, Rollout). By default, will run on all resources. Case insensitive.", - rich_help_panel="Kubernetes Settings", - ), - selector: Optional[str] = typer.Option( - None, - "--selector", - "-s", - help="Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -s key1=value1,key2=value2). Matching objects must satisfy all of the specified label constraints.", - rich_help_panel="Kubernetes Settings", - ), - prometheus_url: Optional[str] = typer.Option( - None, - "--prometheus-url", - "-p", - help="Prometheus URL. If not provided, will attempt to find it in kubernetes cluster", - rich_help_panel="Prometheus Settings", - ), - prometheus_auth_header: Optional[str] = typer.Option( - None, - "--prometheus-auth-header", - help="Prometheus authentication header.", - rich_help_panel="Prometheus Settings", - ), - prometheus_other_headers: Optional[List[str]] = typer.Option( - None, - "--prometheus-headers", - "-H", - help="Additional headers to add to Prometheus requests. Format as 'key: value', for example 'X-MyHeader: 123'. Trailing whitespaces will be stripped.", - rich_help_panel="Prometheus Settings", - ), - prometheus_ssl_enabled: bool = typer.Option( - False, - "--prometheus-ssl-enabled", - help="Enable SSL for Prometheus requests.", - rich_help_panel="Prometheus Settings", - ), - prometheus_cluster_label: Optional[str] = typer.Option( - None, - "--prometheus-cluster-label", - "-l", - help="The label in prometheus for your cluster.(Only relevant for centralized prometheus)", - rich_help_panel="Prometheus Settings", - ), - prometheus_label: str = typer.Option( - None, - "--prometheus-label", - help="The label in prometheus used to differentiate clusters. (Only relevant for centralized prometheus)", - rich_help_panel="Prometheus Settings", - ), - eks_managed_prom: bool = typer.Option( - False, - "--eks-managed-prom", - help="Adds additional signitures for eks prometheus connection.", - rich_help_panel="Prometheus EKS Settings", - ), - eks_managed_prom_profile_name: Optional[str] = typer.Option( - None, - "--eks-profile-name", - help="Sets the profile name for eks prometheus connection.", - rich_help_panel="Prometheus EKS Settings", - ), - eks_access_key: Optional[str] = typer.Option( - None, - "--eks-access-key", - help="Sets the access key for eks prometheus connection.", - rich_help_panel="Prometheus EKS Settings", - ), - eks_secret_key: Optional[str] = typer.Option( - None, - "--eks-secret-key", - help="Sets the secret key for eks prometheus connection.", - rich_help_panel="Prometheus EKS Settings", - ), - eks_service_name: Optional[str] = typer.Option( - "aps", - "--eks-service-name", - help="Sets the service name for eks prometheus connection.", - rich_help_panel="Prometheus EKS Settings", - ), - eks_managed_prom_region: Optional[str] = typer.Option( - None, - "--eks-managed-prom-region", - help="Sets the region for eks prometheus connection.", - rich_help_panel="Prometheus EKS Settings", - ), - coralogix_token: Optional[str] = typer.Option( - None, - "--coralogix-token", - help="Adds the token needed to query Coralogix managed prometheus.", - rich_help_panel="Prometheus Coralogix Settings", - ), - openshift: bool = typer.Option( - False, - "--openshift", - help="Used when running by Robusta inside an OpenShift cluster.", - rich_help_panel="Prometheus Openshift Settings", - hidden=True, - ), - cpu_min_value: int = typer.Option( - 10, - "--cpu-min", - help="Sets the minimum recommended cpu value in millicores.", - rich_help_panel="Recommendation Settings", - ), - memory_min_value: int = typer.Option( - 100, - "--mem-min", - help="Sets the minimum recommended memory value in MB.", - rich_help_panel="Recommendation Settings", - ), - max_workers: int = typer.Option( - 10, - "--max-workers", - "-w", - help="Max workers to use for async requests.", - rich_help_panel="Threading Settings", - ), - format: str = typer.Option( - "table", - "--formatter", - "-f", - help=f"Output formatter ({', '.join(formatters.list_available())})", - rich_help_panel="Logging Settings", - ), - show_cluster_name: bool = typer.Option( - False, "--show-cluster-name", help="In table output, always show the cluster name even for a single cluster", rich_help_panel="Output Settings" - ), - verbose: bool = typer.Option( - False, "--verbose", "-v", help="Enable verbose mode", rich_help_panel="Logging Settings" - ), - quiet: bool = typer.Option( - False, "--quiet", "-q", help="Enable quiet mode", rich_help_panel="Logging Settings" - ), - log_to_stderr: bool = typer.Option( - False, "--logtostderr", help="Pass logs to stderr", rich_help_panel="Logging Settings" - ), - width: Optional[int] = typer.Option( - None, - "--width", - help="Width of the output. Will use console width by default.", - rich_help_panel="Logging Settings", - ), - file_output: Optional[str] = typer.Option( - None, "--fileoutput", help="Print the output to a file", rich_help_panel="Output Settings" - ), - slack_output: Optional[str] = typer.Option( - None, - "--slackoutput", - help="Send to output to a slack channel, must have SLACK_BOT_TOKEN", - rich_help_panel="Output Settings", - ), - **strategy_args, - ) -> None: - f"""Run KRR using the `{_strategy_name}` strategy""" - - try: - config = Config( - kubeconfig=kubeconfig, - impersonate_user=impersonate_user, - impersonate_group=impersonate_group, - clusters="*" if all_clusters else clusters, - namespaces="*" if "*" in namespaces else namespaces, - resources="*" if "*" in resources else resources, - selector=selector, - prometheus_url=prometheus_url, - prometheus_auth_header=prometheus_auth_header, - prometheus_other_headers=prometheus_other_headers, - prometheus_ssl_enabled=prometheus_ssl_enabled, - prometheus_cluster_label=prometheus_cluster_label, - prometheus_label=prometheus_label, - eks_managed_prom=eks_managed_prom, - eks_managed_prom_region=eks_managed_prom_region, - eks_managed_prom_profile_name=eks_managed_prom_profile_name, - eks_access_key=eks_access_key, - eks_secret_key=eks_secret_key, - eks_service_name=eks_service_name, - coralogix_token=coralogix_token, - openshift=openshift, - max_workers=max_workers, - format=format, - show_cluster_name=show_cluster_name, - verbose=verbose, - cpu_min_value=cpu_min_value, - memory_min_value=memory_min_value, - quiet=quiet, - log_to_stderr=log_to_stderr, - width=width, - file_output=file_output, - slack_output=slack_output, - strategy=_strategy_name, - other_args=strategy_args, - ) - Config.set_config(config) - except ValidationError: - logger.exception("Error occured while parsing arguments") - else: - runner = Runner() - exit_code = asyncio.run(runner.run()) - raise typer.Exit(code=exit_code) - - run_strategy.__name__ = strategy_name - signature = inspect.signature(run_strategy) - run_strategy.__signature__ = signature.replace( # type: ignore - parameters=list(signature.parameters.values())[:-1] - + [ - inspect.Parameter( - name=field_name, - kind=inspect.Parameter.KEYWORD_ONLY, - default=OptionInfo( - default=field_meta.default, - param_decls=list(set([f"--{field_name}", f"--{field_name.replace('_', '-')}"])), - help=f"{field_meta.field_info.description}", - rich_help_panel="Strategy Settings", - ), - annotation=__process_type(field_meta.type_), - ) - for field_name, field_meta in strategy_type.get_settings_type().__fields__.items() - ] - ) - - app.command(rich_help_panel="Strategies")(run_strategy) - - strategy_wrapper() + add_strategy_command_to_app(app, strategy_name, strategy_type) def run() -> None: