From f2509935255b9b42505e08f893ee17874daae473 Mon Sep 17 00:00:00 2001 From: yogananth subramanian Date: Thu, 1 Aug 2024 17:59:15 +0530 Subject: [PATCH] Enable RBAC check This will allow krkn to check if the current user context can do something and allow us to create alternate flows in the krkn code based on the user's privilege/RBAC. Signed-off-by: yogananth subramanian --- src/krkn_lib/k8s/krkn_kubernetes.py | 42 +++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index 04dc6b13..06a0fb1e 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -155,6 +155,7 @@ def __initialize_clients(self, kubeconfig_path: str = None): self.apps_api = client.AppsV1Api(self.api_client) self.batch_cli = client.BatchV1Api(self.k8s_client) self.net_cli = client.NetworkingV1Api(self.api_client) + self.auth_cli = client.AuthorizationV1Api(self.k8s_client) self.custom_object_client = client.CustomObjectsApi( self.k8s_client ) @@ -1640,6 +1641,47 @@ def check_if_pvc_exists( logging.error("Namespace '%s' doesn't exist", str(namespace)) return False + def check_rbac_access(self, resource: str, verb: str, + namespace: str = None) -> bool: + """ + Check if the current user can perform an action in the given namespace. + If namespace is not passed, check would be performed against all namespace. + + :param resource: One of the existing resource types + :param verb: Verb is a kubernetes resource API verb. + :param namespace: Namespace is the namespace of the action being requested. + :return: boolean value indicating whether + the user is allowed to do the requested action. + """ + + if namespace: + body = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=client.V1ResourceAttributes( + namespace=namespace, + resource=resource, + verb=verb + ) + ) + ) + else: + body = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=client.V1ResourceAttributes( + resource=resource, + verb=verb + ) + ) + ) + + try: + api_response = self.auth_cli.create_self_subject_access_review(body=body) + allowed=api_response.status.allowed + except ApiException as e: + logging.error("Exception when calling AuthorizationV1Api->create_self_subject_access_review: %s\n", str(e)) + + return allowed + def get_pvc_info(self, name: str, namespace: str) -> PVC: """ Retrieve information about a Persistent Volume Claim in a