From 78493bfa6e896c70d42bc0c58efb56b507249de7 Mon Sep 17 00:00:00 2001 From: yogananth subramanian Date: Thu, 1 Aug 2024 17:59:15 +0530 Subject: [PATCH] Enable RBAC check This will allow krkn to check if the current user context can do something and allow us to create alternate flows in the krkn code based on the user's privilege/RBAC. Signed-off-by: yogananth subramanian --- src/krkn_lib/k8s/krkn_kubernetes.py | 45 +++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index 04dc6b13..bdd0659d 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -155,6 +155,7 @@ def __initialize_clients(self, kubeconfig_path: str = None): self.apps_api = client.AppsV1Api(self.api_client) self.batch_cli = client.BatchV1Api(self.k8s_client) self.net_cli = client.NetworkingV1Api(self.api_client) + self.auth_cli = client.AuthorizationV1Api(self.k8s_client) self.custom_object_client = client.CustomObjectsApi( self.k8s_client ) @@ -1640,6 +1641,50 @@ def check_if_pvc_exists( logging.error("Namespace '%s' doesn't exist", str(namespace)) return False + def check_rbac_access(self, resource: str, verb: str, + namespace: str = None) -> bool: + """ + Check if the current user can perform an action in the given namespace. + If namespace is not passed, check would be performed against all + namespace. + + :param resource: One of the existing resource types + :param verb: Verb is a kubernetes resource API verb. + :param namespace: Namespace is the namespace of the action being + requested. + :return: boolean value indicating whether + the user is allowed to do the requested action. + """ + if namespace: + body = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=client.V1ResourceAttributes( + namespace=namespace, + resource=resource, + verb=verb + ) + ) + ) + else: + body = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=client.V1ResourceAttributes( + resource=resource, + verb=verb + ) + ) + ) + try: + api_response = self.auth_cli.create_self_subject_access_review( + body=body) + allowed = api_response.status.allowed + except ApiException as e: + logging.error( + "Exception when calling" + "AuthorizationV1Api->create_self_subject_access_review: %s\n", + str(e)) + return allowed + def get_pvc_info(self, name: str, namespace: str) -> PVC: """ Retrieve information about a Persistent Volume Claim in a