diff --git a/kubeflow/fairing/kubernetes/manager.py b/kubeflow/fairing/kubernetes/manager.py index 335d481c..71bb5e7c 100644 --- a/kubeflow/fairing/kubernetes/manager.py +++ b/kubeflow/fairing/kubernetes/manager.py @@ -1,5 +1,6 @@ import logging import retrying +import yaml from kubernetes import client, config, watch from kfserving import KFServingClient @@ -7,8 +8,9 @@ from kubeflow.tfjob import TFJobClient from kubeflow.pytorchjob import PyTorchJobClient -from kubeflow.fairing.utils import is_running_in_k8s +from kubeflow.fairing.utils import is_running_in_k8s, camel_to_snake from kubeflow.fairing.constants import constants +from kubeflow.fairing import utils logger = logging.getLogger(__name__) @@ -331,3 +333,89 @@ def log(self, name, namespace, selectors=None, container='', follow=True): print(chunk.rstrip().decode('utf8')) finally: tail.release_conn() + + + def apply_namespaced_object(self, spec, mode='create'): #pylint:disable=too-many-branches + """Run apply on the provided Kubernetes specs. + + :param specs: The YAML specs to apply. + :param mode: 4 valid modes: create, patch, replace and delete. + :returns: applied resources. + """ + + if mode not in ['create', 'patch', 'replace', 'delete']: + raise ValueError("Unknown mode %s, " + "valid modes: create, patch, replace and delete." % mode) + + if not isinstance(spec, dict): + spec = yaml.load(spec) + + try: + namespace = spec["metadata"]["namespace"] + except KeyError: + namespace = utils.get_default_target_namespace() + + kind = spec["kind"] + kind_snake = camel_to_snake(kind) + plural = spec["kind"].lower() + "s" + + if mode in ['patch', 'replace', 'delete']: + try: + name = spec["metadata"]["name"] + except Exception: + raise RuntimeError("Cannot get the name in the spec for the operation %s." % mode) + + if not "/" in spec["apiVersion"]: + group = None + else: + group, version = spec["apiVersion"].split("/", 1) + + if group is None or group.lower() == "apps": + if group is None: + api = client.CoreV1Api() + else: + api = client.AppsV1Api() + method_name = mode + "_namespaced_" + kind_snake + if mode == 'create': + method_args = [namespace, spec] + elif mode == 'delete': + method_args = [name, namespace] + else: + method_args = [name, namespace, spec] + else: + api = client.CustomObjectsApi() + method_name = mode + "_namespaced_custom_object" + + if mode == 'create': + method_args = [group, version, namespace, plural, spec] + elif mode == 'delete': + method_args = [group, version, namespace, plural, name, client.V1DeleteOptions()] + else: + method_args = [group, version, namespace, plural, name, spec] + + apply_method = getattr(api, method_name) + + try: + result = apply_method(*method_args) + except client.rest.ApiException as e: + raise RuntimeError( + "Exception when calling %s->%s: %s\n" % api, apply_method, e) + + return result + + + def apply_namespaced_objects(self, specs, mode='create'): + """Run apply on the provided Kubernetes specs. + + :param specs: A list of strings or dicts providing the YAML specs to apply. + :param mode: 4 valid modes: create, patch, replace and delete. + :returns: + + """ + results = [] + + for spec in specs: + result = self.apply_namespaced_object(spec, mode=mode) + results.append(result) + + return results diff --git a/kubeflow/fairing/utils.py b/kubeflow/fairing/utils.py index 69731906..f7cf21c8 100644 --- a/kubeflow/fairing/utils.py +++ b/kubeflow/fairing/utils.py @@ -1,7 +1,7 @@ import os import zlib import uuid - +import re def get_image(repository, name): """Get the full image name by integrating repository and image name. @@ -47,3 +47,10 @@ def crc(file_name): def random_tag(): """Get a random tag.""" return str(uuid.uuid4()).split('-')[0] + +def camel_to_snake(name): + """ + Converts a string that is camelCase into snake_case + """ + name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) + return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower() diff --git a/tests/integration/kubernetes/__init__.py b/tests/integration/kubernetes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/kubernetes/test_buck_apply.py b/tests/integration/kubernetes/test_buck_apply.py new file mode 100644 index 00000000..a52f4921 --- /dev/null +++ b/tests/integration/kubernetes/test_buck_apply.py @@ -0,0 +1,106 @@ +from kubeflow.fairing.kubernetes.manager import KubeManager + +core_api_test = ''' +apiVersion: v1 +kind: Pod +metadata: + name: core-api-test + labels: + name: nginx +spec: + containers: + - name: nginx + image: nginx + ports: + - containerPort: 80 +''' + +apps_api_test = ''' +apiVersion: apps/v1 +kind: Deployment +metadata: + name: apps-api-test + labels: + app: nginx +spec: + replicas: 1 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + containers: + - name: nginx + image: nginx:1.14.2 + ports: + - containerPort: 80 +''' + +custom_resource_test = ''' +apiVersion: "kubeflow.org/v1" +kind: "TFJob" +metadata: + name: "custom-resource-test" +spec: + tfReplicaSpecs: + PS: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: tensorflow + image: kubeflow/tf-dist-mnist-test:1.0 + Worker: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: tensorflow + image: kubeflow/tf-dist-mnist-test:1.0 +''' + +kubeflow_client = KubeManager() + +def test_apply_namespaced_object_core_v1_api(): + ''' + Test apply_namespaced_object API for CoreV1Api + ''' + kubeflow_client.apply_namespaced_object(core_api_test) + kubeflow_client.apply_namespaced_object(core_api_test, mode='patch') + kubeflow_client.apply_namespaced_object(core_api_test, mode='delete') + + +def test_apply_namespaced_object_apps_v1_api(): + ''' + Test apply_namespaced_object API for AppV1Api + ''' + kubeflow_client.apply_namespaced_object(apps_api_test) + kubeflow_client.apply_namespaced_object(apps_api_test, mode='patch') + kubeflow_client.apply_namespaced_object(apps_api_test, mode='delete') + +def test_apply_namespaced_object_custom_resource_api(): + ''' + Test apply_namespaced_object API for CRD API + ''' + kubeflow_client.apply_namespaced_object(custom_resource_test) + kubeflow_client.apply_namespaced_object(custom_resource_test, mode='patch') + kubeflow_client.apply_namespaced_object(custom_resource_test, mode='delete') + +def test_apply_namespaced_objects(): + ''' + Test apply_namespaced_objects for buck applying + ''' + # To avoid error about the object already exists, rename the resource. + bulk_resources = [ + core_api_test.replace('core-api-test', 'core-api-test-buck'), + apps_api_test.replace('apps-api-test', 'apps-api-test-buck'), + custom_resource_test.replace('custom-resource-test', 'custom-resource-test-buck') + ] + kubeflow_client.apply_namespaced_objects(bulk_resources) + kubeflow_client.apply_namespaced_objects(bulk_resources, mode='patch') + kubeflow_client.apply_namespaced_objects(bulk_resources, mode='delete')