⭐ 💥 support RayJob via Dagster Pipes; rework of everything (#10)
⭐ 💥 support RayJob via Dagster Pipes; small breaking API changes
danielgafni authored Sep 17, 2024
1 parent 47d3a64 commit f14b80a
Showing 30 changed files with 1,830 additions and 1,000 deletions.
17 changes: 10 additions & 7 deletions .github/workflows/CI.yml
@@ -16,21 +16,24 @@ concurrency:

jobs:
  test:
    name: test ${{ matrix.py }} - ${{ matrix.os }}
    name: Test Python ${{ matrix.py }} - Kubernetes ${{ matrix.kubernetes }}
    runs-on: ${{ matrix.os }}-latest
    strategy:
      fail-fast: false
      matrix:
        os:
          - Ubuntu
        py:
          # - "3.12"
          #- "3.12"
          - "3.11"
          - "3.10"
          - "3.9"
          # - "3.8"
        kubernetes:
          - "1.31.0"
          - "1.30.0"
          - "1.29.0"
    steps:
      - name: Setup python for test ${{ matrix.py }}
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.py }}
@@ -54,7 +57,8 @@ jobs:
        run: poetry install --all-extras --sync
      - name: Run tests
        env:
          PYTEST_KUBERAY_VERSIONS: "1.0.0,1.1.0" # will run tests for all these KubeRay versions
          PYTEST_KUBERAY_VERSIONS: "1.2.0" # will run tests for all these KubeRay versions
          PYTEST_KUBERNETES_VERSION: ${{ matrix.kubernetes }}
        run: pytest -v .

  lint:
@@ -66,11 +70,10 @@ jobs:
        os:
          - Ubuntu
        py:
          # - "3.12"
          #- "3.12"
          - "3.11"
          - "3.10"
          - "3.9"
          # - "3.8"
    steps:
      - name: Setup python for test ${{ matrix.py }}
        uses: actions/setup-python@v4
2 changes: 2 additions & 0 deletions Dockerfile
@@ -10,6 +10,8 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN --mount=type=cache,target=/var/cache/apt \
    apt-get update && apt-get install -y git jq curl gcc python3-dev libpq-dev wget

COPY --from=bitnami/kubectl:1.30.3 /opt/bitnami/kubectl/bin/kubectl /usr/local/bin/

# install poetry
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
89 changes: 81 additions & 8 deletions README.md
@@ -25,7 +25,7 @@ The following backends are implemented:
Documentation can be found below.

> [!NOTE]
> This project is in early development. Contributions are very welcome! See the [Development](#development) section below.
> This project is in early development. APIs are unstable and can change at any time. Contributions are very welcome! See the [Development](#development) section below.
# Installation

@@ -128,10 +128,83 @@ definitions = Definitions(

This backend requires a Kubernetes cluster with the `KubeRay Operator` installed.

Integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and `RayCluster` labels.
Integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and Kubernetes labels.

To run `ray` code in client mode (from the Dagster Python process directly), use the `KubeRayClient` resource (see the [KubeRayCluster](#KubeRayCluster) section).
To run `ray` code in job mode, use the `PipesRayJobClient` with Dagster Pipes (see the [Pipes](#pipes) section).

The public objects can be imported from `dagster_ray.kuberay` module.

### Pipes

`dagster-ray` provides the `PipesRayJobClient`, which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
[RayJob](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started/rayjob-quick-start.html) will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits.

Examples:

On the orchestration side, import the `PipesRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import AssetExecutionContext, Definitions, asset

from dagster_ray.kuberay import PipesRayJobClient


@asset
def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobClient):
    pipes_rayjob_client.run(
        context=context,
        ray_job={
            # RayJob manifest goes here
            # .metadata.name is not required and will be generated if not provided
            # *.container.image is not required and will be set to the current `dagster/image` tag if not provided
            # full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
            ...
        },
        extra={"foo": "bar"},
    )


definitions = Definitions(
resources={"pipes_rayjob_client": PipesRayJobClient()}, assets=[my_asset]
)
```
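
For illustration, a minimal manifest might look like the sketch below. The field names follow the KubeRay `RayJob` CRD; the entrypoint, image, and overall values are hypothetical placeholders, not values prescribed by `dagster-ray`:

```python
# A minimal RayJob manifest sketch; see the KubeRay API reference linked above.
ray_job = {
    "spec": {
        "entrypoint": "python /app/script.py",  # hypothetical entrypoint command
        "shutdownAfterJobFinishes": True,  # clean up the RayCluster afterwards
        "rayClusterSpec": {
            "headGroupSpec": {
                "rayStartParams": {},
                "template": {
                    "spec": {
                        "containers": [
                            # image can be omitted to fall back to `dagster/image`
                            {"name": "ray-head", "image": "example/image:tag"}
                        ]
                    }
                },
            }
        },
    }
}
```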

In the Ray job, import `dagster_pipes` (which must be provided as a dependency) and emit regular Dagster events such as logs or asset materializations:

```python
from dagster_pipes import open_dagster_pipes


with open_dagster_pipes() as pipes:
    pipes.log.info("Hello from Ray Pipes!")
    pipes.report_asset_materialization(
        metadata={"some_metric": {"raw_value": 0, "type": "int"}},
        data_version="alpha",
    )
```

A convenient way to provide `dagster-pipes` to the Ray job is with the `runtimeEnvYaml` field:

```python
import yaml

ray_job = {"spec": {"runtimeEnvYaml": yaml.safe_dump({"pip": ["dagster-pipes"]})}}
```

The logs and events emitted by the Ray job will be captured by the `PipesRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.


**Running locally**

When running locally, the `port_forward` option must be set to `True` on the `PipesRayJobClient` resource so that Dagster can reach the Ray job. For convenience, it can be set automatically with:

```python
from dagster_ray.kuberay.configs import in_k8s

pipes_rayjob_client = PipesRayJobClient(..., port_forward=not in_k8s)
```
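
For reference, such in-cluster checks are commonly implemented by probing the environment Kubernetes injects into every pod; the sketch below illustrates the general technique and is an assumption, not the actual `in_k8s` implementation:

```python
import os

# Kubernetes sets KUBERNETES_SERVICE_HOST in every pod, so its presence is a
# cheap signal that the current process is running inside a cluster.
in_k8s = "KUBERNETES_SERVICE_HOST" in os.environ
```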

### Resources

#### `KubeRayCluster`
@@ -188,7 +261,7 @@ ray_cluster = KubeRayCluster(
)
)
```
#### `KubeRayAPI`
#### `KubeRayClient`

This resource can be used to interact with the Kubernetes API Server.

@@ -198,14 +271,14 @@ Listing currently running `RayClusters`:

```python
from dagster import op, Definitions
from dagster_ray.kuberay import KubeRayAPI
from dagster_ray.kuberay import KubeRayClient


@op
def list_ray_clusters(
    kube_ray_api: KubeRayAPI,
    kube_ray_client: KubeRayClient,
):
    return kube_ray_api.kuberay.list_ray_clusters(k8s_namespace="kuberay")
    return kube_ray_client.client.list(namespace="kuberay")
```
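
To make the resource available to the op, register it under the same key as the op's parameter name; a minimal wiring sketch:

```python
from dagster import Definitions, job


@job
def list_ray_clusters_job():
    list_ray_clusters()


definitions = Definitions(
    jobs=[list_ray_clusters_job],
    resources={"kube_ray_client": KubeRayClient()},
)
```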

### Jobs
@@ -252,13 +325,13 @@ Running `pytest` will **automatically**:
- build an image with the local `dagster-ray` code
- start a `minikube` Kubernetes cluster
- load the built `dagster-ray` and loaded `kuberay-operator` images into the cluster
- install the `KubeRay Operator` in the cluster with `helm`
- install `KubeRay Operator` into the cluster with `helm`
- run the tests

Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!

> [!NOTE]
> Specifying a comma-separated list of `KubeRay Operator` versions in the `KUBE_RAY_OPERATOR_VERSIONS` environment variable will spawn a new test for each version.
> Specifying a comma-separated list of `KubeRay Operator` versions in the `PYTEST_KUBERAY_VERSIONS` environment variable will spawn a new test for each version.
> [!NOTE]
> It may take a while to download the `minikube` and `kuberay-operator` images and build the local `dagster-ray` image during the first test invocation.
9 changes: 9 additions & 0 deletions dagster_ray/_base/constants.py
@@ -0,0 +1,9 @@
import os

DEFAULT_DEPLOYMENT_NAME = (
os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME")
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "0"
else os.getenv("DAGSTER_CLOUD_GIT_BRANCH")
) or "dev"

IS_PROD = DEFAULT_DEPLOYMENT_NAME == "prod"
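
In effect, the constant resolves through a three-step fallback; restated as a function for clarity (a sketch equivalent to the expression above):

```python
import os


def resolve_deployment_name() -> str:
    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "0":
        # Full Dagster Cloud deployment: use its name, e.g. "prod".
        name = os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME")
    else:
        # Branch deployment: fall back to the Git branch name.
        name = os.getenv("DAGSTER_CLOUD_GIT_BRANCH")
    # Local development: neither variable is set, so default to "dev".
    return name or "dev"
```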
14 changes: 6 additions & 8 deletions dagster_ray/_base/resources.py
@@ -1,7 +1,6 @@
import sys
import uuid
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, Union, cast
from typing import TYPE_CHECKING, Dict, Optional, Union, cast

from dagster import ConfigurableResource, InitResourceContext, OpExecutionContext
from pydantic import Field, PrivateAttr
@@ -11,14 +10,9 @@
from requests.exceptions import ConnectionError
from tenacity import retry, retry_if_exception_type, stop_after_delay

from dagster_ray._base.utils import get_dagster_tags
from dagster_ray.config import RayDataExecutionOptions

if sys.version_info >= (3, 11):
    pass
else:
    pass


if TYPE_CHECKING:
from ray._private.worker import BaseContext as RayBaseContext # noqa

@@ -86,6 +80,10 @@ def init_ray(self, context: Union[OpExecutionContext, InitResourceContext]) -> "
        context.log.info("Initialized Ray!")
        return cast("RayBaseContext", self._context)

    def get_dagster_tags(self, context: InitResourceContext) -> Dict[str, str]:
        tags = get_dagster_tags(context)
        return tags

    def _get_step_key(self, context: InitResourceContext) -> str:
        # just return a random string
        # since we want a fresh cluster every time
30 changes: 30 additions & 0 deletions dagster_ray/_base/utils.py
@@ -0,0 +1,30 @@
import os
from typing import Dict, Union, cast

from dagster import InitResourceContext, OpExecutionContext

from dagster_ray._base.constants import DEFAULT_DEPLOYMENT_NAME


def get_dagster_tags(context: Union[InitResourceContext, OpExecutionContext]) -> Dict[str, str]:
"""
Returns a dictionary with common Dagster tags.
"""
assert context.dagster_run is not None

labels = {
"dagster.io/run_id": cast(str, context.run_id),
"dagster.io/deployment": DEFAULT_DEPLOYMENT_NAME,
# TODO: add more labels
}

if context.dagster_run.tags.get("user"):
labels["dagster.io/user"] = context.dagster_run.tags["user"]

if os.getenv("DAGSTER_CLOUD_GIT_BRANCH"):
labels["dagster.io/git-branch"] = os.environ["DAGSTER_CLOUD_GIT_BRANCH"]

if os.getenv("DAGSTER_CLOUD_GIT_SHA"):
labels["dagster.io/git-sha"] = os.environ["DAGSTER_CLOUD_GIT_SHA"]

return labels
7 changes: 5 additions & 2 deletions dagster_ray/config.py
@@ -2,10 +2,8 @@

from typing import Optional

import ray
from dagster import Config
from pydantic import Field
from ray.data import ExecutionResources


class ExecutionOptionsConfig(Config):
Expand All @@ -23,6 +21,9 @@ class RayDataExecutionOptions(Config):
    use_polars: bool = True

    def apply(self):
        import ray
        from ray.data import ExecutionResources

        ctx = ray.data.DatasetContext.get_current()

        ctx.execution_options.resource_limits = ExecutionResources.for_limits(
@@ -35,6 +36,8 @@ def apply(self):
        ctx.use_polars = self.use_polars

    def apply_remote(self):
        import ray

        @ray.remote
        def apply():
            self.apply()
4 changes: 2 additions & 2 deletions dagster_ray/kuberay/__init__.py
@@ -1,13 +1,13 @@
from dagster_ray.kuberay.configs import RayClusterConfig
from dagster_ray.kuberay.jobs import cleanup_kuberay_clusters, delete_kuberay_clusters
from dagster_ray.kuberay.ops import cleanup_kuberay_clusters_op, delete_kuberay_clusters_op
from dagster_ray.kuberay.resources import KubeRayAPI, KubeRayCluster
from dagster_ray.kuberay.resources import KubeRayCluster, RayClusterClientResource
from dagster_ray.kuberay.schedules import cleanup_kuberay_clusters_daily

__all__ = [
"KubeRayCluster",
"RayClusterConfig",
"KubeRayAPI",
"RayClusterClientResource",
"cleanup_kuberay_clusters",
"delete_kuberay_clusters",
"cleanup_kuberay_clusters_op",
4 changes: 4 additions & 0 deletions dagster_ray/kuberay/client/__init__.py
@@ -0,0 +1,4 @@
from dagster_ray.kuberay.client.raycluster import RayClusterClient
from dagster_ray.kuberay.client.rayjob import RayJobClient

__all__ = ["RayClusterClient", "RayJobClient"]
