diff --git a/services/ske/src/stackit/ske/__init__.py b/services/ske/src/stackit/ske/__init__.py index 5c6cea22..2a0cd796 100644 --- a/services/ske/src/stackit/ske/__init__.py +++ b/services/ske/src/stackit/ske/__init__.py @@ -37,6 +37,7 @@ from stackit.ske.models.argus import Argus from stackit.ske.models.availability_zone import AvailabilityZone from stackit.ske.models.cluster import Cluster +from stackit.ske.models.cluster_error import ClusterError from stackit.ske.models.cluster_status import ClusterStatus from stackit.ske.models.cluster_status_state import ClusterStatusState from stackit.ske.models.create_kubeconfig_payload import CreateKubeconfigPayload diff --git a/services/ske/src/stackit/ske/models/__init__.py b/services/ske/src/stackit/ske/models/__init__.py index 2844c2d1..f4fa80bd 100644 --- a/services/ske/src/stackit/ske/models/__init__.py +++ b/services/ske/src/stackit/ske/models/__init__.py @@ -18,6 +18,7 @@ from stackit.ske.models.argus import Argus from stackit.ske.models.availability_zone import AvailabilityZone from stackit.ske.models.cluster import Cluster +from stackit.ske.models.cluster_error import ClusterError from stackit.ske.models.cluster_status import ClusterStatus from stackit.ske.models.cluster_status_state import ClusterStatusState from stackit.ske.models.create_kubeconfig_payload import CreateKubeconfigPayload diff --git a/services/ske/src/stackit/ske/models/cluster_error.py b/services/ske/src/stackit/ske/models/cluster_error.py new file mode 100644 index 00000000..9e0e0223 --- /dev/null +++ b/services/ske/src/stackit/ske/models/cluster_error.py @@ -0,0 +1,102 @@ +# coding: utf-8 + +""" + SKE-API + + The SKE API provides endpoints to create, update, delete clusters within STACKIT portal projects and to trigger further cluster management tasks. + + The version of the OpenAPI document: 1.1 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 docstring might be too long + +from __future__ import annotations + +import json +import pprint +from typing import Any, ClassVar, Dict, List, Optional, Set + +from pydantic import BaseModel, ConfigDict, StrictStr, field_validator +from typing_extensions import Self + + +class ClusterError(BaseModel): + """ + ClusterError + """ + + code: Optional[StrictStr] = None + message: Optional[StrictStr] = None + __properties: ClassVar[List[str]] = ["code", "message"] + + @field_validator("code") + def code_validate_enum(cls, value): + """Validates the enum""" + if value is None: + return value + + if value not in set( + [ + "SKE_OBSERVABILITY_INSTANCE_NOT_FOUND", + "SKE_DNS_ZONE_NOT_FOUND", + "SKE_NODE_MISCONFIGURED_PDB", + "SKE_NODE_NO_VALID_HOST_FOUND", + "SKE_NODE_MACHINE_TYPE_NOT_FOUND", + ] + ): + raise ValueError( + "must be one of enum values ('SKE_OBSERVABILITY_INSTANCE_NOT_FOUND', 'SKE_DNS_ZONE_NOT_FOUND', 'SKE_NODE_MISCONFIGURED_PDB', 'SKE_NODE_NO_VALID_HOST_FOUND', 'SKE_NODE_MACHINE_TYPE_NOT_FOUND')" + ) + return value + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of ClusterError from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + excluded_fields: Set[str] = set([]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of ClusterError from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({"code": obj.get("code"), "message": obj.get("message")}) + return _obj diff --git a/services/ske/src/stackit/ske/models/cluster_status.py b/services/ske/src/stackit/ske/models/cluster_status.py index b9eed0e8..02d1410b 100644 --- a/services/ske/src/stackit/ske/models/cluster_status.py +++ b/services/ske/src/stackit/ske/models/cluster_status.py @@ -21,6 +21,7 @@ from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictStr from typing_extensions import Self +from stackit.ske.models.cluster_error import ClusterError from stackit.ske.models.cluster_status_state import ClusterStatusState from stackit.ske.models.credentials_rotation_state import CredentialsRotationState from stackit.ske.models.runtime_error import RuntimeError @@ -42,6 +43,7 @@ class ClusterStatus(BaseModel): alias="egressAddressRanges", ) error: Optional[RuntimeError] = None + errors: Optional[List[ClusterError]] = None hibernated: Optional[StrictBool] = None __properties: ClassVar[List[str]] = [ "aggregated", @@ -49,6 +51,7 @@ class ClusterStatus(BaseModel): "credentialsRotation", "egressAddressRanges", "error", + "errors", "hibernated", ] @@ -95,6 +98,13 @@ def to_dict(self) -> Dict[str, Any]: # override the default output from pydantic by calling `to_dict()` of error if self.error: _dict["error"] = self.error.to_dict() + # override the default output from pydantic by calling `to_dict()` of each item in errors (list) + _items = [] + if self.errors: + for _item in self.errors: + if _item: + _items.append(_item.to_dict()) + _dict["errors"] = _items return _dict @classmethod @@ -119,6 +129,11 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: ), "egressAddressRanges": obj.get("egressAddressRanges"), "error": RuntimeError.from_dict(obj["error"]) if obj.get("error") is not None else None, + "errors": ( + [ClusterError.from_dict(_item) for _item in obj["errors"]] + if obj.get("errors") is not None + else None + ), "hibernated": obj.get("hibernated"), } )