From 0be90ab91cf56a8429d1ac2c3211923bdfcf6480 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 16 May 2022 12:08:33 +0100 Subject: [PATCH 01/12] Add istio install fixture (enable with TEST_ISTIO env var) --- dask_kubernetes/conftest.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index f70bd2ee4..7abedb17c 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -36,6 +36,16 @@ def k8s_cluster(kind_cluster, docker_image): del os.environ["KUBECONFIG"] +@pytest.fixture(scope="session", autouse=True) +def install_istio(k8s_cluster): + if bool(os.environ.get("TEST_ISTIO", False)): + check_dependency("istioctl") + subprocess.check_output(["istioctl", "install", "--set", "profile=demo", "-y"]) + k8s_cluster.kubectl( + "label", "namespace", "default", "istio-injection=enabled", "--overwrite" + ) + + @pytest.fixture(scope="session") def ns(k8s_cluster): return "default" From ecb5152e7815b5b82e0abffa0c3f77aa145bac8c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 16 May 2022 12:08:58 +0100 Subject: [PATCH 02/12] Limit uuid length --- dask_kubernetes/operator/operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index e1e7e6b8b..ec025c14a 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -195,7 +195,7 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs): if workers_needed > 0: for _ in range(workers_needed): data = build_worker_pod_spec( - name, spec["cluster"], uuid4().hex, spec["worker"]["spec"] + name, spec["cluster"], uuid4().hex[:10], spec["worker"]["spec"] ) kopf.adopt(data) await api.create_namespaced_pod( From 539fdcd65e3982250d2050707f6074a9cc8436c7 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 16 May 2022 14:20:10 +0100 Subject: [PATCH 03/12] Add worker service --- dask_kubernetes/experimental/kubecluster.py | 26 ++++---- dask_kubernetes/operator/operator.py | 59 +++++++++++++++++-- .../tests/resources/simplecluster.yaml | 15 +++++ .../operator/tests/test_operator.py | 2 +- 4 files changed, 84 insertions(+), 18 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index e1aee1420..87cc9129d 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -558,15 +558,17 @@ def from_name(cls, name, **kwargs): @atexit.register def reap_clusters(): - async def _reap_clusters(): - for cluster in list(KubeCluster._instances): - if cluster.shutdown_on_close and cluster.status != Status.closed: - await ClusterAuth.load_first(cluster.auth) - with suppress(TimeoutError): - if cluster.asynchronous: - await cluster.close(timeout=10) - else: - cluster.close(timeout=10) - - loop = asyncio.get_event_loop() - loop.run_until_complete(_reap_clusters()) + with suppress(Exception): + + async def _reap_clusters(): + for cluster in list(KubeCluster._instances): + if cluster.shutdown_on_close and cluster.status != Status.closed: + await ClusterAuth.load_first(cluster.auth) + with suppress(TimeoutError): + if cluster.asynchronous: + await cluster.close(timeout=10) + else: + cluster.close(timeout=10) + + loop = asyncio.get_event_loop() + loop.run_until_complete(_reap_clusters()) diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index ec025c14a..e184d6603 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -1,4 +1,5 @@ import asyncio +import copy from distributed.core import rpc @@ -42,9 +43,8 @@ def build_scheduler_service_spec(name, spec): } -def build_worker_pod_spec(name, cluster_name, n, spec): - worker_name = f"{name}-worker-{n}" - return { +def build_worker_pod_spec(name, cluster_name, worker_name, spec): + pod_spec = { "apiVersion": "v1", "kind": "Pod", "metadata": { @@ -53,11 +53,18 @@ def build_worker_pod_spec(name, cluster_name, n, spec): "dask.org/cluster-name": cluster_name, "dask.org/workergroup-name": name, "dask.org/component": "worker", + "dask.org/worker-name": worker_name, }, }, - "spec": spec, + "spec": copy.copy(spec), } + pod_spec["spec"]["containers"][0]["env"].append( + {"name": "DASK_WORKER_NAME", "value": worker_name} + ) + + return pod_spec + def build_worker_group_spec(name, spec): return { @@ -71,6 +78,40 @@ def build_worker_group_spec(name, spec): } +def build_worker_service_spec(cluster_name, worker_name): + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "name": f"{worker_name}-service", + "labels": { + "dask.org/cluster-name": cluster_name, + }, + }, + "spec": { + "type": "ClusterIP", + "selector": { + "dask.org/cluster-name": cluster_name, + "dask.org/worker-name": worker_name, + }, + "ports": [ + { + "name": "comm", + "protocol": "TCP", + "port": 8788, + "targetPort": "comm", + }, + { + "name": "dashboard", + "protocol": "TCP", + "port": 8787, + "targetPort": "dashboard", + }, + ], + }, + } + + def build_cluster_spec(name, worker_spec, scheduler_spec): return { "apiVersion": "kubernetes.dask.org/v1", @@ -194,8 +235,16 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs): if workers_needed > 0: for _ in range(workers_needed): + worker_name = f"{name}-worker-{uuid4().hex[:10]}" + data = build_worker_service_spec(spec["cluster"], worker_name) + kopf.adopt(data) + await api.create_namespaced_service( + namespace=namespace, + body=data, + ) + await wait_for_service(api, data["metadata"]["name"], namespace) data = build_worker_pod_spec( - name, spec["cluster"], uuid4().hex[:10], spec["worker"]["spec"] + name, spec["cluster"], worker_name, spec["worker"]["spec"] ) kopf.adopt(data) await api.create_namespaced_pod( diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index 06c794a30..16f5cd85e 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -14,6 +14,21 @@ spec: args: - dask-worker - tcp://simple-cluster-service.default.svc.cluster.local:8786 + - --name + - ${DASK_WORKER_NAME} + - --dashboard-address + - 0.0.0.0:8787 + - --listen-address + - tcp://0.0.0.0:8788 + - --contact-address + - tcp://${DASK_WORKER_NAME}.default.svc.cluster.local:8788 + ports: + - name: comm + containerPort: 8788 + protocol: TCP + - name: dashboard + containerPort: 8787 + protocol: TCP env: - name: WORKER_ENV value: hello-world # We dont test the value, just the name diff --git a/dask_kubernetes/operator/tests/test_operator.py b/dask_kubernetes/operator/tests/test_operator.py index 48daeea90..24fb91fea 100644 --- a/dask_kubernetes/operator/tests/test_operator.py +++ b/dask_kubernetes/operator/tests/test_operator.py @@ -86,7 +86,7 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): await client.wait_for_workers(3) -@pytest.mark.timeout(180) +@pytest.mark.timeout(300) @pytest.mark.asyncio async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): with kopf_runner as runner: From dd272214488db13485134f02819745d52b8007cc Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 16 May 2022 14:26:47 +0100 Subject: [PATCH 04/12] Enable Istio on Operator tests --- .github/workflows/operator.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/operator.yaml b/.github/workflows/operator.yaml index 0c1158030..a13e6bb83 100644 --- a/.github/workflows/operator.yaml +++ b/.github/workflows/operator.yaml @@ -33,4 +33,5 @@ jobs: - name: Run tests env: KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig + TEST_ISTIO: "true" run: pytest dask_kubernetes/common/tests dask_kubernetes/operator/tests dask_kubernetes/experimental/tests From 912db38ae653c8ab56af53711a659902468f204d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 16 May 2022 14:32:24 +0100 Subject: [PATCH 05/12] Install istioctl --- ci/install-deps.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ci/install-deps.sh b/ci/install-deps.sh index 8a3e79e26..3d632a650 100755 --- a/ci/install-deps.sh +++ b/ci/install-deps.sh @@ -1,5 +1,8 @@ #!/bin/bash +curl -L https://istio.io/downloadIstio | sh - +mv istio-*/bin/istioctl /usr/local/bin/istioctl + pip install -e . pip install -r requirements-test.txt pip install git+https://github.com/dask/distributed@main From bbe86303b6500449782ce5aa4a6bb70a3f237ad8 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 16 May 2022 15:26:56 +0100 Subject: [PATCH 06/12] Reduce workflow timeout --- .github/workflows/operator.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/operator.yaml b/.github/workflows/operator.yaml index a13e6bb83..ad91ab99e 100644 --- a/.github/workflows/operator.yaml +++ b/.github/workflows/operator.yaml @@ -18,6 +18,7 @@ on: jobs: test: runs-on: ubuntu-latest + timeout-minutes: 30 strategy: fail-fast: false matrix: From 70011906245df20fcdf739b6ce9093458242c630 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 17 May 2022 11:40:42 +0100 Subject: [PATCH 07/12] Disable nanny --- .../operator/tests/resources/simplecluster.yaml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index 16f5cd85e..e84263c2a 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -14,6 +14,7 @@ spec: args: - dask-worker - tcp://simple-cluster-service.default.svc.cluster.local:8786 + - --no-nanny - --name - ${DASK_WORKER_NAME} - --dashboard-address @@ -21,7 +22,9 @@ spec: - --listen-address - tcp://0.0.0.0:8788 - --contact-address - - tcp://${DASK_WORKER_NAME}.default.svc.cluster.local:8788 + - tcp://${DASK_WORKER_NAME}-service.default.svc.cluster.local:8788 + # - sleep + # - "3600" ports: - name: comm containerPort: 8788 @@ -61,7 +64,7 @@ spec: - name: SCHEDULER_ENV value: hello-world service: - type: NodePort + type: ClusterIP selector: dask.org/cluster-name: simple-cluster dask.org/component: scheduler From fe6d8541bf9e9dd7d3affd6e94881530850d8323 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 17 May 2022 15:55:16 +0100 Subject: [PATCH 08/12] Fix env var expansion --- .../operator/tests/resources/simplecluster.yaml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index e84263c2a..3cec3a435 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -11,20 +11,18 @@ spec: - name: worker image: "dask-kubernetes:dev" imagePullPolicy: "IfNotPresent" + command: ["dask-worker"] args: - - dask-worker - tcp://simple-cluster-service.default.svc.cluster.local:8786 - --no-nanny - --name - - ${DASK_WORKER_NAME} + - $(DASK_WORKER_NAME) - --dashboard-address - 0.0.0.0:8787 - --listen-address - tcp://0.0.0.0:8788 - --contact-address - - tcp://${DASK_WORKER_NAME}-service.default.svc.cluster.local:8788 - # - sleep - # - "3600" + - tcp://$(DASK_WORKER_NAME)-service.default.svc.cluster.local:8788 ports: - name: comm containerPort: 8788 From 0b98a9f2f75c043c4150012a4430aa32a79b5b0e Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 17 May 2022 15:56:49 +0100 Subject: [PATCH 09/12] Revert error suppression --- dask_kubernetes/experimental/kubecluster.py | 26 ++++++++++----------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 87cc9129d..e1aee1420 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -558,17 +558,15 @@ def from_name(cls, name, **kwargs): @atexit.register def reap_clusters(): - with suppress(Exception): - - async def _reap_clusters(): - for cluster in list(KubeCluster._instances): - if cluster.shutdown_on_close and cluster.status != Status.closed: - await ClusterAuth.load_first(cluster.auth) - with suppress(TimeoutError): - if cluster.asynchronous: - await cluster.close(timeout=10) - else: - cluster.close(timeout=10) - - loop = asyncio.get_event_loop() - loop.run_until_complete(_reap_clusters()) + async def _reap_clusters(): + for cluster in list(KubeCluster._instances): + if cluster.shutdown_on_close and cluster.status != Status.closed: + await ClusterAuth.load_first(cluster.auth) + with suppress(TimeoutError): + if cluster.asynchronous: + await cluster.close(timeout=10) + else: + cluster.close(timeout=10) + + loop = asyncio.get_event_loop() + loop.run_until_complete(_reap_clusters()) From f4939e47c58fb273a962df99c16d3f4519cc3d89 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 18 May 2022 11:39:50 +0100 Subject: [PATCH 10/12] Move dask-worker command into args --- dask_kubernetes/operator/tests/resources/simplecluster.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index 3cec3a435..abf2b1e68 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -11,8 +11,8 @@ spec: - name: worker image: "dask-kubernetes:dev" imagePullPolicy: "IfNotPresent" - command: ["dask-worker"] args: + - dask-worker - tcp://simple-cluster-service.default.svc.cluster.local:8786 - --no-nanny - --name From 4606fb9039c2bf7e5273f177dc8a6c3a1efe8be1 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 18 May 2022 13:10:09 +0100 Subject: [PATCH 11/12] Create service accounts --- dask_kubernetes/operator/operator.py | 70 +++++++++++++++++-- .../tests/resources/simplecluster.yaml | 12 ++-- 2 files changed, 70 insertions(+), 12 deletions(-) diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index e184d6603..da2a924c1 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -14,35 +14,59 @@ ) -def build_scheduler_pod_spec(name, spec): - return { +def build_scheduler_pod_spec(cluster_name, spec): + scheduler_name = f"{cluster_name}-scheduler" + pod_spec = { "apiVersion": "v1", "kind": "Pod", "metadata": { - "name": f"{name}-scheduler", + "name": scheduler_name, "labels": { - "dask.org/cluster-name": name, + "dask.org/cluster-name": cluster_name, "dask.org/component": "scheduler", + "app": "scheduler", + "version": "v1", }, }, "spec": spec, } + pod_spec["spec"]["serviceAccountName"] = f"{scheduler_name}-service" + + return pod_spec -def build_scheduler_service_spec(name, spec): + +def build_scheduler_service_spec(cluster_name, spec): return { "apiVersion": "v1", "kind": "Service", "metadata": { - "name": f"{name}-service", + "name": f"{cluster_name}-scheduler-service", "labels": { - "dask.org/cluster-name": name, + "dask.org/cluster-name": cluster_name, + "app": "scheduler", + "service": "scheduler", }, }, "spec": spec, } +def build_scheduler_service_account_spec(cluster_name): + scheduler_service_name = f"{cluster_name}-scheduler-service" + return { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": { + "name": scheduler_service_name, + "labels": { + "dask.org/cluster-name": cluster_name, + "account": scheduler_service_name, + }, + }, + } + + def build_worker_pod_spec(name, cluster_name, worker_name, spec): pod_spec = { "apiVersion": "v1", @@ -59,6 +83,7 @@ def build_worker_pod_spec(name, cluster_name, worker_name, spec): "spec": copy.copy(spec), } + pod_spec["spec"]["serviceAccountName"] = f"{worker_name}-service" pod_spec["spec"]["containers"][0]["env"].append( {"name": "DASK_WORKER_NAME", "value": worker_name} ) @@ -112,6 +137,20 @@ def build_worker_service_spec(cluster_name, worker_name): } +def build_worker_service_account_spec(cluster_name, worker_name): + return { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": { + "name": f"{worker_name}-service", + "labels": { + "dask.org/cluster-name": cluster_name, + "account": f"{worker_name}-service", + }, + }, + } + + def build_cluster_spec(name, worker_spec, scheduler_spec): return { "apiVersion": "kubernetes.dask.org/v1", @@ -144,6 +183,13 @@ async def daskcluster_create(spec, name, namespace, logger, **kwargs): async with kubernetes.client.api_client.ApiClient() as api_client: api = kubernetes.client.CoreV1Api(api_client) + scheduler_service_account_spec = build_scheduler_service_account_spec(name) + kopf.adopt(scheduler_service_account_spec) + await api.create_namespaced_service_account( + namespace=namespace, + body=scheduler_service_account_spec, + ) + # TODO Check for existing scheduler pod scheduler_spec = spec.get("scheduler", {}) data = build_scheduler_pod_spec(name, scheduler_spec.get("spec")) @@ -236,6 +282,16 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs): if workers_needed > 0: for _ in range(workers_needed): worker_name = f"{name}-worker-{uuid4().hex[:10]}" + + worker_service_account_spec = build_worker_service_account_spec( + spec["cluster"], worker_name + ) + kopf.adopt(worker_service_account_spec) + await api.create_namespaced_service_account( + namespace=namespace, + body=worker_service_account_spec, + ) + data = build_worker_service_spec(spec["cluster"], worker_name) kopf.adopt(data) await api.create_namespaced_service( diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index abf2b1e68..2a49405db 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -13,7 +13,7 @@ spec: imagePullPolicy: "IfNotPresent" args: - dask-worker - - tcp://simple-cluster-service.default.svc.cluster.local:8786 + - tcp://simple-cluster-scheduler-service.default.svc.cluster.local:8786 - --no-nanny - --name - $(DASK_WORKER_NAME) @@ -49,13 +49,15 @@ spec: containerPort: 8787 protocol: TCP readinessProbe: - tcpSocket: - port: comm + httpGet: + path: /health + port: dashboard initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: - tcpSocket: - port: comm + httpGet: + path: /health + port: dashboard initialDelaySeconds: 15 periodSeconds: 20 env: From 2acee68010e5764eaf8b1b378ad2129eba506b7d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 31 May 2022 12:24:32 +0100 Subject: [PATCH 12/12] Fix port names --- dask_kubernetes/operator/operator.py | 8 ++++---- .../operator/tests/resources/simplecluster.yaml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index 3388c24ec..43261e53d 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -163,16 +163,16 @@ def build_worker_service_spec(cluster_name, worker_name): }, "ports": [ { - "name": "comm", + "name": "tcp-comm", "protocol": "TCP", "port": 8788, - "targetPort": "comm", + "targetPort": "tcp-comm", }, { - "name": "dashboard", + "name": "http-dashboard", "protocol": "TCP", "port": 8787, - "targetPort": "dashboard", + "targetPort": "http-dashboard", }, ], }, diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index 5811abd8a..8ff90b961 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -24,10 +24,10 @@ spec: - --contact-address - tcp://$(DASK_WORKER_NAME)-service.default.svc.cluster.local:8788 ports: - - name: comm + - name: tcp-comm containerPort: 8788 protocol: TCP - - name: dashboard + - name: http-dashboard containerPort: 8787 protocol: TCP env: