Support per-job volumes (#2276)
* Support per-job volumes

* Use the `dstack.` namespace instead of `sys.`

* Document Volumes with multi-node tasks
r4victor authored Feb 7, 2025
1 parent db585a8 commit 7108e44
Showing 12 changed files with 375 additions and 226 deletions.
26 changes: 26 additions & 0 deletions docs/docs/concepts/volumes.md
@@ -133,6 +133,32 @@ and its contents will persist across runs.

`dstack` will attach one of the volumes based on the region and backend of the run.

!!! info "Volumes with multi-node tasks"
To use single-attach volumes such as AWS EBS with multi-node tasks,
attach different volumes to different nodes using `dstack` variable interpolation:

<div editor-title=".dstack.yml">

```yaml
type: task
nodes: 8
commands:
- ...
volumes:
- name: data-volume-${{ dstack.node_rank }}
path: /volume_data
```

</div>

This way, every node will use its own volume.

Tip: To create volumes for all nodes using one volume configuration, specify the volume name with `-n`:

```shell
$ for i in {0..7}; do dstack apply -f vol.dstack.yml -n data-volume-$i -y; done
```
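
Here, `vol.dstack.yml` is assumed to be a minimal volume configuration along the following lines; the backend, region, and size are placeholder values, and the `name` is overridden by `-n` on each iteration:

<div editor-title="vol.dstack.yml">

```yaml
type: volume
# Overridden by `-n` on each `dstack apply` invocation
name: data-volume
backend: aws
region: eu-central-1
size: 100GB
```

</div>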

??? info "Container path"
When you're running a dev environment, task, or service with `dstack`, it automatically mounts the project folder contents
to `/workflow` (and sets that as the current working directory). Right now, `dstack` doesn't allow you to
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/runs.py
@@ -27,6 +27,7 @@
from dstack._internal.core.models.repos import AnyRunRepoData
from dstack._internal.core.models.resources import Memory, ResourcesSpec
from dstack._internal.core.models.unix import UnixUser
from dstack._internal.core.models.volumes import MountPoint
from dstack._internal.utils import common as common_utils
from dstack._internal.utils.common import format_pretty_duration

@@ -190,6 +191,7 @@ class JobSpec(CoreModel):
registry_auth: Optional[RegistryAuth]
requirements: Requirements
retry: Optional[Retry]
volumes: Optional[List[MountPoint]] = None
# For backward compatibility with 0.18.x when retry_policy was required.
# TODO: remove in 0.19
retry_policy: ProfileRetryPolicy = ProfileRetryPolicy(retry=False)
@@ -34,6 +34,7 @@
from dstack._internal.server.services import services
from dstack._internal.server.services.jobs import (
find_job,
get_job_attached_volumes,
get_job_runtime_data,
job_model_to_job_submission,
)
@@ -47,7 +48,6 @@
from dstack._internal.server.services.runner import client
from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel
from dstack._internal.server.services.runs import (
get_job_volumes,
run_model_to_run,
)
from dstack._internal.server.services.storage import get_default_storage
@@ -142,10 +142,11 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
job_provisioning_data=job_provisioning_data,
)

volumes = await get_job_volumes(
volumes = await get_job_attached_volumes(
session=session,
project=project,
run_spec=run.run_spec,
job_num=job.job_spec.job_num,
job_provisioning_data=job_provisioning_data,
)

@@ -50,8 +50,11 @@
fleet_model_to_fleet,
)
from dstack._internal.server.services.jobs import (
check_can_attach_job_volumes,
find_job,
get_instances_ids_with_detaching_volumes,
get_job_configured_volume_models,
get_job_configured_volumes,
get_job_runtime_data,
)
from dstack._internal.server.services.locking import get_locker
@@ -64,11 +67,7 @@
get_shared_pool_instances_with_offers,
)
from dstack._internal.server.services.runs import (
check_can_attach_run_volumes,
check_run_spec_requires_instance_mounts,
get_offer_volumes,
get_run_volume_models,
get_run_volumes,
run_model_to_run,
)
from dstack._internal.server.services.volumes import (
@@ -154,17 +153,21 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
await session.commit()
return
try:
volume_models = await get_run_volume_models(
volume_models = await get_job_configured_volume_models(
session=session,
project=project,
run_spec=run_spec,
job_num=job.job_spec.job_num,
job_spec=job.job_spec,
)
volumes = await get_run_volumes(
volumes = await get_job_configured_volumes(
session=session,
project=project,
run_spec=run_spec,
job_num=job.job_spec.job_num,
job_spec=job.job_spec,
)
check_can_attach_run_volumes(run_spec=run_spec, volumes=volumes)
check_can_attach_job_volumes(volumes)
except ServerClientError as e:
logger.warning("%s: failed to prepare run volumes: %s", fmt(job_model), repr(e))
job_model.status = JobStatus.TERMINATING
@@ -453,7 +456,7 @@ async def _run_job_on_new_instance(
offer.region,
offer.price,
)
offer_volumes = get_offer_volumes(volumes, offer)
offer_volumes = _get_offer_volumes(volumes, offer)
try:
job_provisioning_data = await common_utils.run_async(
backend.compute().run_job,
@@ -601,6 +604,36 @@ def _prepare_job_runtime_data(offer: InstanceOfferWithAvailability) -> JobRuntim
)


def _get_offer_volumes(
volumes: List[List[Volume]],
offer: InstanceOfferWithAvailability,
) -> List[Volume]:
"""
Returns a suitable volume for the offer for each mount point.
"""
offer_volumes = []
for mount_point_volumes in volumes:
offer_volumes.append(_get_offer_mount_point_volume(mount_point_volumes, offer))
return offer_volumes


def _get_offer_mount_point_volume(
volumes: List[Volume],
offer: InstanceOfferWithAvailability,
) -> Volume:
"""
Returns the first suitable volume for the offer among possible mount point volumes.
"""
for volume in volumes:
if (
volume.configuration.backend != offer.backend
or volume.configuration.region != offer.region
):
continue
return volume
raise ServerClientError("Failed to find an eligible volume for the mount point")


async def _attach_volumes(
session: AsyncSession,
project: ProjectModel,
154 changes: 152 additions & 2 deletions src/dstack/_internal/server/services/jobs/__init__.py
@@ -11,8 +11,14 @@
import dstack._internal.server.services.backends as backends_services
from dstack._internal.core.backends.base import Backend
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT, DSTACK_SHIM_HTTP_PORT
from dstack._internal.core.errors import BackendError, ResourceNotExistsError, SSHError
from dstack._internal.core.errors import (
BackendError,
ResourceNotExistsError,
ServerClientError,
SSHError,
)
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import is_core_model_instance
from dstack._internal.core.models.configurations import RunConfigurationType
from dstack._internal.core.models.instances import InstanceStatus, RemoteConnectionInfo
from dstack._internal.core.models.runs import (
@@ -25,6 +31,7 @@
JobTerminationReason,
RunSpec,
)
from dstack._internal.core.models.volumes import Volume, VolumeMountPoint, VolumeStatus
from dstack._internal.server.models import (
InstanceModel,
JobModel,
@@ -33,7 +40,11 @@
VolumeModel,
)
from dstack._internal.server.services import services
from dstack._internal.server.services.jobs.configurators.base import JobConfigurator
from dstack._internal.server.services import volumes as volumes_services
from dstack._internal.server.services.jobs.configurators.base import (
JobConfigurator,
interpolate_job_volumes,
)
from dstack._internal.server.services.jobs.configurators.dev import DevEnvironmentJobConfigurator
from dstack._internal.server.services.jobs.configurators.service import ServiceJobConfigurator
from dstack._internal.server.services.jobs.configurators.task import TaskJobConfigurator
@@ -535,3 +546,142 @@ async def get_instances_ids_with_detaching_volumes(session: AsyncSession) -> Lis
)
job_models = res.scalars().all()
return [jm.used_instance_id for jm in job_models if jm.used_instance_id]


async def get_job_configured_volumes(
session: AsyncSession,
project: ProjectModel,
run_spec: RunSpec,
job_num: int,
job_spec: Optional[JobSpec] = None,
) -> List[List[Volume]]:
"""
Returns a list of job volumes grouped by mount points.
"""
volume_models = await get_job_configured_volume_models(
session=session,
project=project,
run_spec=run_spec,
job_num=job_num,
job_spec=job_spec,
)
return [
[volumes_services.volume_model_to_volume(v) for v in mount_point_volume_models]
for mount_point_volume_models in volume_models
]


async def get_job_configured_volume_models(
session: AsyncSession,
project: ProjectModel,
run_spec: RunSpec,
job_num: int,
job_spec: Optional[JobSpec] = None,
) -> List[List[VolumeModel]]:
"""
Returns a list of job volume models grouped by mount points.
"""
job_volumes = None
if job_spec is not None:
job_volumes = job_spec.volumes
if job_volumes is None:
# job_spec not provided or a legacy job_spec without volumes
job_volumes = interpolate_job_volumes(run_spec.configuration.volumes, job_num)
volume_models = []
for mount_point in job_volumes:
if not is_core_model_instance(mount_point, VolumeMountPoint):
continue
if isinstance(mount_point.name, str):
names = [mount_point.name]
else:
names = mount_point.name
mount_point_volume_models = []
for name in names:
volume_model = await volumes_services.get_project_volume_model_by_name(
session=session,
project=project,
name=name,
)
if volume_model is None:
raise ResourceNotExistsError(f"Volume {mount_point.name} not found")
mount_point_volume_models.append(volume_model)
volume_models.append(mount_point_volume_models)
return volume_models


def check_can_attach_job_volumes(volumes: List[List[Volume]]):
"""
Performs basic checks to see if volumes can be attached.
This is useful to show an error as early as possible (when the user submits the run).
If the attachment is going to fail anyway, the error will be handled when processing submitted jobs.
"""
if len(volumes) == 0:
return
expected_backends = {v.configuration.backend for v in volumes[0]}
expected_regions = {v.configuration.region for v in volumes[0]}
for mount_point_volumes in volumes:
backends = {v.configuration.backend for v in mount_point_volumes}
regions = {v.configuration.region for v in mount_point_volumes}
if backends != expected_backends:
raise ServerClientError(
"Volumes from different backends specified for different mount points"
)
if regions != expected_regions:
raise ServerClientError(
"Volumes from different regions specified for different mount points"
)
for volume in mount_point_volumes:
if volume.status != VolumeStatus.ACTIVE:
raise ServerClientError(f"Cannot mount volumes that are not active: {volume.name}")
volumes_names = [v.name for vs in volumes for v in vs]
if len(volumes_names) != len(set(volumes_names)):
raise ServerClientError("Cannot attach the same volume at different mount points")


async def get_job_attached_volumes(
session: AsyncSession,
project: ProjectModel,
run_spec: RunSpec,
job_num: int,
job_provisioning_data: JobProvisioningData,
) -> List[Volume]:
"""
Returns volumes attached to the job.
"""
job_configured_volumes = await get_job_configured_volumes(
session=session,
project=project,
run_spec=run_spec,
job_num=job_num,
)
job_volumes = []
for mount_point_volumes in job_configured_volumes:
job_volumes.append(
_get_job_mount_point_attached_volume(mount_point_volumes, job_provisioning_data)
)
return job_volumes


def _get_job_mount_point_attached_volume(
volumes: List[Volume],
job_provisioning_data: JobProvisioningData,
) -> Volume:
"""
Returns the volume attached to the job among the list of possible mount point volumes.
"""
for volume in volumes:
if (
volume.configuration.backend != job_provisioning_data.get_base_backend()
or volume.configuration.region != job_provisioning_data.region
):
continue
if (
volume.provisioning_data is not None
and volume.provisioning_data.availability_zone is not None
and job_provisioning_data.availability_zone is not None
and volume.provisioning_data.availability_zone
!= job_provisioning_data.availability_zone
):
continue
return volume
raise ServerClientError("Failed to find an eligible volume for the mount point")