diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index 16d90a9a..a3cce65b 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -157,6 +157,7 @@ def __initialize_clients(self, kubeconfig_path: str = None): self.apps_api = client.AppsV1Api(self.api_client) self.batch_cli = client.BatchV1Api(self.k8s_client) self.net_cli = client.NetworkingV1Api(self.api_client) + self.auth_cli = client.AuthorizationV1Api(self.k8s_client) self.custom_object_client = client.CustomObjectsApi( self.k8s_client ) @@ -1651,6 +1652,50 @@ def check_if_pvc_exists( logging.error("Namespace '%s' doesn't exist", str(namespace)) return False + def check_rbac_access(self, resource: str, verb: str, + namespace: str = None) -> bool: + """ + Check if the current user can perform an action in the given namespace. + If namespace is not passed, check would be performed against all + namespace. + + :param resource: One of the existing resource types + :param verb: Verb is a kubernetes resource API verb. + :param namespace: Namespace is the namespace of the action being + requested. + :return: boolean value indicating whether + the user is allowed to do the requested action. + """ + if namespace: + body = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=client.V1ResourceAttributes( + namespace=namespace, + resource=resource, + verb=verb + ) + ) + ) + else: + body = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=client.V1ResourceAttributes( + resource=resource, + verb=verb + ) + ) + ) + try: + api_response = self.auth_cli.create_self_subject_access_review( + body=body) + allowed = api_response.status.allowed + except ApiException as e: + logging.error( + "Exception when calling" + "AuthorizationV1Api->create_self_subject_access_review: %s\n", + str(e)) + return allowed + def get_pvc_info(self, name: str, namespace: str) -> PVC: """ Retrieve information about a Persistent Volume Claim in a diff --git a/src/krkn_lib/tests/test_krkn_kubernetes.py b/src/krkn_lib/tests/test_krkn_kubernetes.py index a78a31ec..2546b01a 100644 --- a/src/krkn_lib/tests/test_krkn_kubernetes.py +++ b/src/krkn_lib/tests/test_krkn_kubernetes.py @@ -612,6 +612,18 @@ def test_get_pod_info(self): logging.error("test raised exception {0}".format(str(e))) self.assertTrue(False) + + def test_check_rbac_access(self): + try: + namespace = "test-ns-" + self.get_random_string(10) + self.deploy_namespace(namespace, []) + self.assertTrue(self.lib_k8s.check_rbac_access('pod', 'get', namespace)) + self.assertTrue(self.lib_k8s.check_rbac_access('services', 'create', 'default')) + self.assertTrue(self.lib_k8s.check_rbac_access('secretes', 'delete', namespace)) + except Exception as e: + logging.error("test raised exception {0}".format(str(e))) + self.assertTrue(False) + def test_check_if_namespace_exists(self): try: namespace = "test-ns-" + self.get_random_string(10)