Skip to content

Commit d0957d1

Browse files
authored
Merge pull request #26 from ASML-Labs/chore/helm-install-or-upgrade
refactor: manage user-code release through helm
2 parents 718c7f4 + 9bc9c0e commit d0957d1

File tree

4 files changed

+55
-115
lines changed

4 files changed

+55
-115
lines changed

dagster_uc/manage_user_code_deployments.py

Lines changed: 4 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -5,22 +5,20 @@
55
import logging
66
import os
77
import pprint
8-
import subprocess
98
import time
109
from dataclasses import asdict
1110
from typing import Annotated, cast
1211

1312
import kr8s
1413
import typer
1514
from kr8s.objects import (
16-
ConfigMap,
1715
Pod,
1816
)
1917

2018
from dagster_uc.config import UserCodeDeploymentsConfig, load_config
2119
from dagster_uc.log import logger
2220
from dagster_uc.uc_handler import DagsterUserCodeHandler
23-
from dagster_uc.utils import BuildTool, build_and_push, gen_tag
21+
from dagster_uc.utils import BuildTool, build_and_push, gen_tag, is_command_available
2422

2523
app = typer.Typer(invoke_without_command=True)
2624
deployment_app = typer.Typer(
@@ -79,7 +77,6 @@ def default(
7977
kr8s_api = kr8s.api(context=f"{config.kubernetes_context}", namespace=config.namespace)
8078

8179
handler = DagsterUserCodeHandler(config, kr8s_api)
82-
handler._ensure_dagster_version_match()
8380
handler.maybe_create_user_deployments_configmap()
8481
logger.debug(f"Done: Switched kubernetes context to {config.environment}")
8582

@@ -253,18 +250,7 @@ def deployment_delete(
253250
) -> None:
254251
if delete_all:
255252
handler.remove_all_deployments()
256-
handler.delete_k8s_resources(
257-
label_selector="app.kubernetes.io/name=dagster-user-deployments",
258-
)
259-
handler.delete_k8s_resources(label_selector="app=dagster-user-deployments")
260-
for item in handler.api.get(
261-
ConfigMap,
262-
namespace=config.namespace,
263-
label_selector="app=dagster-user-deployments",
264-
):
265-
item.delete() # type: ignore
266-
handler.delete_k8s_resources(label_selector="dagster/code-location")
267-
handler.deploy_to_k8s()
253+
handler.deploy_to_k8s(reload_dagster=True)
268254
typer.echo("\033[1mDeleted all deployments\033[0m")
269255
else:
270256
if not name:
@@ -276,10 +262,6 @@ def deployment_delete(
276262
# In case the UI name separator of the deployment is passed
277263
name = name.replace(":", "--")
278264
handler.remove_user_deployment_from_configmap(name)
279-
handler.delete_k8s_resources_for_user_deployment(
280-
name,
281-
delete_deployments=True,
282-
)
283265
handler.deploy_to_k8s(reload_dagster=True)
284266
typer.echo(f"Deleted deployment \033[1m{name}\033[0m")
285267

@@ -373,18 +355,7 @@ def deployment_deploy(
373355
),
374356
] = False,
375357
):
376-
def is_command_available(command: str) -> bool:
377-
try:
378-
subprocess.run(
379-
[command, "--version"],
380-
capture_output=True,
381-
check=True, # ruff: ignore
382-
)
383-
return True
384-
except subprocess.CalledProcessError:
385-
return False
386-
except FileNotFoundError:
387-
return False
358+
handler._ensure_dagster_version_match()
388359

389360
count = 0
390361
while not handler.acquire_semaphore(reset_lock):
@@ -451,6 +422,7 @@ def is_command_available(command: str) -> bool:
451422
logger.info(
452423
f"Deployment with name '{deployment_name}' exists in '{config.environment}'. Updating deployment in configmap",
453424
)
425+
# TODO(ion): make this into a single operation (replace)
454426
handler.remove_user_deployment_from_configmap(deployment_name)
455427
handler.add_user_deployment_to_configmap(
456428
handler.gen_new_deployment_yaml(
@@ -460,19 +432,16 @@ def is_command_available(command: str) -> bool:
460432
),
461433
)
462434
if config.cicd or force:
463-
handler.delete_k8s_resources_for_user_deployment(deployment_name)
464435
handler.deploy_to_k8s()
465436
elif not handler.check_if_code_pod_exists(label=deployment_name):
466437
logger.info(
467438
"Code deployment present in configmap but pod not found, triggering full deploy...",
468439
)
469-
handler.delete_k8s_resources_for_user_deployment(deployment_name, True)
470440
handler.deploy_to_k8s() # Something went wrong - redeploy yamls and reload webserver
471441
else:
472442
logger.info(
473443
"Code deployment present in configmap and pod found...",
474444
)
475-
handler.delete_k8s_resources_for_user_deployment(deployment_name, False)
476445
handler.deploy_to_k8s(reload_dagster=False)
477446
finally:
478447
handler.release_semaphore()

dagster_uc/uc_handler.py

Lines changed: 35 additions & 79 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from typing import NamedTuple
77

88
import kr8s
9+
import pyhelm3
910
import yaml
1011
from kr8s.objects import (
1112
ConfigMap,
@@ -39,20 +40,21 @@ def __init__(self, config: UserCodeDeploymentsConfig, kr8s_api: kr8s.Api) -> Non
3940

4041
def maybe_create_user_deployments_configmap(self) -> None:
4142
"""Creates a user deployments_configmap if it doesn't exist yet."""
42-
from copy import deepcopy
43-
44-
dagster_user_deployments_values_yaml_configmap = deepcopy(BASE_CONFIGMAP)
45-
dagster_user_deployments_values_yaml_configmap["metadata"]["name"] = (
46-
self.config.user_code_deployments_configmap_name
47-
)
48-
dagster_user_deployments_values_yaml_configmap["data"]["yaml"] = yaml.dump(
49-
BASE_CONFIGMAP_DATA,
50-
)
5143
try:
5244
self._read_namespaced_config_map(
5345
self.config.user_code_deployments_configmap_name,
5446
)
5547
except kr8s.NotFoundError:
48+
from copy import deepcopy
49+
50+
dagster_user_deployments_values_yaml_configmap = deepcopy(BASE_CONFIGMAP)
51+
dagster_user_deployments_values_yaml_configmap["metadata"]["name"] = (
52+
self.config.user_code_deployments_configmap_name
53+
)
54+
dagster_user_deployments_values_yaml_configmap["data"]["yaml"] = yaml.dump(
55+
BASE_CONFIGMAP_DATA,
56+
)
57+
5658
ConfigMap(
5759
resource=dagster_user_deployments_values_yaml_configmap,
5860
namespace=self.config.namespace,
@@ -180,36 +182,43 @@ def deploy_to_k8s(
180182
self.update_dagster_workspace_yaml()
181183

182184
loop = asyncio.new_event_loop()
183-
helm_client = Client()
185+
RELEASE_NAME = "dagster-user-code" # noqa
186+
helm_client = Client(kubecontext=self.config.kubernetes_context)
187+
184188
chart = loop.run_until_complete(
185189
helm_client.get_chart(
186190
chart_ref="dagster-user-deployments",
187191
repo="https://dagster-io.github.io/helm",
188192
version=self.config.dagster_version,
189193
),
190194
)
191-
helm_templates = [
192-
*loop.run_until_complete(
193-
helm_client.template_resources(
194-
chart,
195-
"dagster",
196-
values_dict,
197-
namespace=self.config.namespace,
198-
),
195+
logger.info(
196+
"Upgrading helm release '%s'...",
197+
RELEASE_NAME,
198+
)
199+
installed = loop.run_until_complete(
200+
helm_client.install_or_upgrade_release(
201+
RELEASE_NAME,
202+
chart,
203+
values_dict,
204+
namespace=self.config.namespace,
205+
wait=True,
199206
),
200-
]
207+
)
208+
if installed.status == pyhelm3.ReleaseRevisionStatus.FAILED:
209+
logger.error(
210+
"Dagster-usercode helm release install or upgrade failed, rolling back now..",
211+
)
212+
from pyhelm3 import Release
201213

202-
# Update user code deployments in k8s (akin to kubectl apply -f)
203-
for obj in helm_templates:
204-
k8s_obj = eval(obj["kind"])(obj, api=self.api)
205-
try:
206-
k8s_obj.patch(obj)
207-
except kr8s.NotFoundError:
208-
k8s_obj.create()
214+
release = Release(name=RELEASE_NAME, namespace=self.config.namespace)
215+
loop.run_until_complete(release.rollback())
216+
raise Exception("Helm user-code deployment failed, had to rollback.")
209217

210218
if reload_dagster:
211219
for deployment_name in ["dagster-daemon", "dagster-dagster-webserver"]:
212220
deployment = Deployment.get(deployment_name, namespace=self.config.namespace)
221+
213222
reload_patch = {
214223
"spec": {
215224
"template": {
@@ -225,38 +234,6 @@ def deploy_to_k8s(
225234
}
226235
deployment.patch(reload_patch)
227236

228-
def delete_k8s_resources_for_user_deployment(
229-
self,
230-
label: str,
231-
delete_deployments: bool = True,
232-
) -> None:
233-
"""Deletes all k8s resources related to a specific user code deployment.
234-
Returns a boolean letting you know if pod was found
235-
"""
236-
for pod in self.api.get(
237-
Pod,
238-
label_selector=f"dagster/code-location={label}",
239-
field_selector="status.phase=Succeeded",
240-
namespace=self.config.namespace,
241-
):
242-
logger.info(f"Deleting pod {pod.name}")
243-
pod.delete()
244-
245-
if delete_deployments:
246-
import contextlib
247-
248-
with contextlib.suppress(kr8s.NotFoundError):
249-
Deployment.get(
250-
namespace=self.config.namespace,
251-
label_selector=f"deployment={label}",
252-
api=self.api,
253-
).delete()
254-
Deployment.get(
255-
namespace=self.config.namespace,
256-
label_selector=f"dagster/code-location={label}",
257-
api=self.api,
258-
).delete()
259-
260237
def gen_new_deployment_yaml(
261238
self,
262239
name: str,
@@ -480,27 +457,6 @@ def check_if_code_pod_exists(self, label: str) -> bool:
480457
)
481458
return len(running_pods) > 0
482459

483-
def delete_k8s_resources(self, label_selector: str):
484-
"""Delete all k8s resources with a specified label_selector"""
485-
for resource in [
486-
"Pod",
487-
"ReplicationController",
488-
"Service",
489-
"DaemonSet",
490-
"Deployment",
491-
"ReplicaSet",
492-
"StatefulSet",
493-
"HorizontalPodAutoscaler",
494-
"CronJob",
495-
"Job",
496-
]:
497-
for item in self.api.get(
498-
resource,
499-
namespace=self.config.namespace,
500-
label_selector=label_selector,
501-
):
502-
item.delete()
503-
504460
def acquire_semaphore(self, reset_lock: bool = False) -> bool:
505461
"""Acquires a semaphore by creating a configmap"""
506462
if reset_lock:

dagster_uc/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,18 @@ def build_and_push(
186186
cmd = ["sudo"] + cmd
187187
exception_on_failed_subprocess(subprocess.run(cmd, capture_output=False))
188188
os.chdir(previous_dir)
189+
190+
191+
def is_command_available(command: str) -> bool:
    """Check whether ``command`` can be executed on this machine.

    Runs ``<command> --version`` with its output captured and reports whether
    the invocation succeeded.

    Args:
        command: Name (resolved via ``PATH``) or path of the executable to probe.

    Returns:
        ``True`` if the command started and exited with status 0; ``False`` if
        it exited with a non-zero status or could not be started at all.
    """
    try:
        subprocess.run(
            [command, "--version"],
            capture_output=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: the command ran but exited non-zero.
        # OSError: the command could not be launched; this covers
        # FileNotFoundError as well as PermissionError (file exists but is not
        # executable), which would otherwise escape from an availability probe.
        return False
    return True

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "dagster-uc"
3-
version = "0.3.5"
3+
version = "0.4.0"
44
authors = [
55
{name = "Stefan Verbruggen"},
66
{name = "Ion Koutsouris"},

0 commit comments

Comments
 (0)