Skip to content

Commit

Permalink
Add volume attachments info to the API and CLI (#2298)
Browse files Browse the repository at this point in the history
* Add Volume.attachments

* Move attachment_data to volumes_attachments

* Show attachments in dstack volume list

* Rebase migrations
  • Loading branch information
r4victor authored Feb 14, 2025
1 parent 05a37a9 commit c4ddee1
Show file tree
Hide file tree
Showing 19 changed files with 266 additions and 72 deletions.
9 changes: 9 additions & 0 deletions src/dstack/_internal/cli/utils/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def get_volumes_table(
if verbose:
table.add_column("REGION")
table.add_column("STATUS")
if verbose:
table.add_column("ATTACHED")
table.add_column("CREATED")
if verbose:
table.add_column("ERROR")
Expand All @@ -37,11 +39,18 @@ def get_volumes_table(
and volume.provisioning_data.availability_zone is not None
):
region += f" ({volume.provisioning_data.availability_zone})"
attached = "-"
if volume.attachments is not None:
attached = ", ".join(
{va.instance.fleet_name for va in volume.attachments if va.instance.fleet_name}
)
attached = attached or "-"
row = {
"NAME": volume.name,
"BACKEND": backend,
"REGION": region,
"STATUS": volume.status,
"ATTACHED": attached,
"CREATED": format_date(volume.created_at),
"ERROR": volume.status_message,
}
Expand Down
3 changes: 2 additions & 1 deletion src/dstack/_internal/core/backends/aws/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,12 @@ def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
ec2_client = self.session.client("ec2", region_name=volume.configuration.region)

logger.debug("Detaching EBS volume %s from instance %s", volume.volume_id, instance_id)
attachment_data = get_or_error(volume.get_attachment_data_for_instance(instance_id))
try:
ec2_client.detach_volume(
VolumeId=volume.volume_id,
InstanceId=instance_id,
Device=get_or_error(volume.attachment_data).device_name,
Device=attachment_data.device_name,
Force=force,
)
except botocore.exceptions.ClientError as e:
Expand Down
3 changes: 2 additions & 1 deletion src/dstack/_internal/core/backends/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
instance_id,
)
zone = get_or_error(volume.provisioning_data).availability_zone
attachment_data = get_or_error(volume.get_attachment_data_for_instance(instance_id))
# This method has no information if the instance is a TPU or a VM,
# so we first try to see if there is a TPU with such name
try:
Expand Down Expand Up @@ -694,7 +695,7 @@ def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
project=self.config.project_id,
zone=get_or_error(volume.provisioning_data).availability_zone,
instance=instance_id,
device_name=get_or_error(volume.attachment_data).device_name,
device_name=attachment_data.device_name,
)
gcp_resources.wait_for_extended_operation(operation, "persistent disk detachment")
logger.debug(
Expand Down
23 changes: 23 additions & 0 deletions src/dstack/_internal/core/models/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ class VolumeAttachmentData(CoreModel):
device_name: Optional[str] = None


class VolumeInstance(CoreModel):
    """Summary of an instance that a volume attachment refers to."""

    # Instance name.
    name: str
    # Name of the fleet associated with the instance; optional, defaults to None.
    fleet_name: Optional[str] = None
    # Instance number — presumably the instance's index within its fleet; TODO confirm.
    instance_num: int
    # Optional instance identifier, defaults to None.
    # Volume.get_attachment_data_for_instance compares it with the instance_id it is given.
    instance_id: Optional[str] = None


class VolumeAttachment(CoreModel):
    """One attachment of a volume: the instance plus optional attachment data."""

    # The instance side of the attachment.
    instance: VolumeInstance
    # Attachment details (e.g. `device_name`); optional, defaults to None.
    attachment_data: Optional[VolumeAttachmentData] = None


class Volume(CoreModel):
id: uuid.UUID
name: str
Expand All @@ -86,8 +98,19 @@ class Volume(CoreModel):
deleted: bool
volume_id: Optional[str] = None # id of the volume in the cloud
provisioning_data: Optional[VolumeProvisioningData] = None
attachments: Optional[List[VolumeAttachment]] = None
# attachment_data is deprecated in favor of attachments.
# It's only set for volumes that were attached before attachments were introduced.
attachment_data: Optional[VolumeAttachmentData] = None

def get_attachment_data_for_instance(self, instance_id: str) -> Optional[VolumeAttachmentData]:
    """Return the attachment data recorded for the instance with the given ID.

    The first entry of `self.attachments` whose `instance.instance_id` equals
    `instance_id` wins, and its `attachment_data` is returned as is (it may be
    None). When `attachments` is None or holds no matching entry, the
    deprecated volume-level `self.attachment_data` is returned instead.
    """
    # Unique marker so that "no match" is distinguishable from any real entry.
    _no_match = object()
    found = next(
        (
            entry
            for entry in self.attachments or ()
            if entry.instance.instance_id == instance_id
        ),
        _no_match,
    )
    if found is _no_match:
        # volume was attached before attachments were introduced
        return self.attachment_data
    return found.attachment_data


class VolumePlan(CoreModel):
project_name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
None,
run,
job_model,
job_provisioning_data,
volumes,
secrets,
job.job_spec.registry_auth,
Expand Down Expand Up @@ -376,6 +377,7 @@ def _process_provisioning_with_shim(
ports: Dict[int, int],
run: Run,
job_model: JobModel,
job_provisioning_data: JobProvisioningData,
volumes: List[Volume],
secrets: Dict[str, str],
registry_auth: Optional[RegistryAuth],
Expand Down Expand Up @@ -459,6 +461,7 @@ def _process_provisioning_with_shim(
host_ssh_user=ssh_user,
host_ssh_keys=[ssh_key] if ssh_key else [],
container_ssh_keys=public_keys,
instance_id=job_provisioning_data.instance_id,
)
else:
submitted = shim_client.submit(
Expand All @@ -475,6 +478,7 @@ def _process_provisioning_with_shim(
mounts=volume_mounts,
volumes=volumes,
instance_mounts=instance_mounts,
instance_id=job_provisioning_data.instance_id,
)
if not submitted:
# This can happen when we lost connection to the runner (e.g., network issues), marked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
PoolModel,
ProjectModel,
RunModel,
VolumeAttachmentModel,
VolumeModel,
)
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
Expand Down Expand Up @@ -236,7 +237,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
res = await session.execute(
select(InstanceModel)
.where(InstanceModel.id == job_model.instance.id)
.options(selectinload(InstanceModel.volumes))
.options(selectinload(InstanceModel.volume_attachments))
.execution_options(populate_existing=True)
)
instance = res.unique().scalar_one()
Expand Down Expand Up @@ -390,11 +391,11 @@ async def _assign_job_to_pool_instance(

instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0)
instance, offer = instances_with_offers[0]
# Reload InstanceModel with volumes
# Reload InstanceModel with volume attachments
res = await session.execute(
select(InstanceModel)
.where(InstanceModel.id == instance.id)
.options(joinedload(InstanceModel.volumes))
.options(joinedload(InstanceModel.volume_attachments))
)
instance = res.unique().scalar_one()
instance.status = InstanceStatus.BUSY
Expand Down Expand Up @@ -580,7 +581,7 @@ def _create_instance_model_for_job(
backend=offer.backend,
price=offer.price,
region=offer.region,
volumes=[],
volume_attachments=[],
total_blocks=1,
busy_blocks=1,
)
Expand Down Expand Up @@ -696,14 +697,18 @@ async def _attach_volume(
instance: InstanceModel,
instance_id: str,
):
volume = volume_model_to_volume(volume_model)
# Refresh only to check if the volume wasn't deleted before the lock
await session.refresh(volume_model)
if volume_model.deleted:
raise ServerClientError("Cannot attach a deleted volume")
volume = volume_model_to_volume(volume_model)
attachment_data = await common_utils.run_async(
backend.compute().attach_volume,
volume=volume,
instance_id=instance_id,
)
volume_model.volume_attachment_data = attachment_data.json()
instance.volumes.append(volume_model)
volume_attachment_model = VolumeAttachmentModel(
volume=volume_model,
attachment_data=attachment_data.json(),
)
instance.volume_attachments.append(volume_attachment_model)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@

from dstack._internal.core.models.runs import JobStatus
from dstack._internal.server.db import get_session_ctx
from dstack._internal.server.models import InstanceModel, JobModel, ProjectModel, VolumeModel
from dstack._internal.server.models import (
InstanceModel,
JobModel,
ProjectModel,
VolumeAttachmentModel,
VolumeModel,
)
from dstack._internal.server.services.jobs import (
process_terminating_job,
process_volumes_detaching,
Expand Down Expand Up @@ -80,7 +86,12 @@ async def _process_job(session: AsyncSession, job_model: JobModel):
.where(InstanceModel.id == job_model.used_instance_id)
.options(
joinedload(InstanceModel.project).joinedload(ProjectModel.backends),
joinedload(InstanceModel.volumes).joinedload(VolumeModel.user),
joinedload(InstanceModel.volume_attachments)
.joinedload(VolumeAttachmentModel.volume)
.joinedload(VolumeModel.user),
joinedload(InstanceModel.volume_attachments)
.joinedload(VolumeAttachmentModel.volume)
.joinedload(VolumeModel.attachments),
)
)
instance_model = res.unique().scalar()
Expand Down
12 changes: 11 additions & 1 deletion src/dstack/_internal/server/background/tasks/process_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from dstack._internal.core.errors import BackendError, BackendNotAvailable
from dstack._internal.core.models.volumes import VolumeStatus
from dstack._internal.server.db import get_session_ctx
from dstack._internal.server.models import ProjectModel, VolumeModel
from dstack._internal.server.models import (
InstanceModel,
ProjectModel,
VolumeAttachmentModel,
VolumeModel,
)
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services import volumes as volumes_services
from dstack._internal.server.services.locking import get_locker
Expand Down Expand Up @@ -49,6 +54,11 @@ async def _process_submitted_volume(session: AsyncSession, volume_model: VolumeM
.where(VolumeModel.id == volume_model.id)
.options(joinedload(VolumeModel.project).joinedload(ProjectModel.backends))
.options(joinedload(VolumeModel.user))
.options(
joinedload(VolumeModel.attachments)
.joinedload(VolumeAttachmentModel.instance)
.joinedload(InstanceModel.fleet)
)
.execution_options(populate_existing=True)
)
volume_model = res.unique().scalar_one()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Move attachment_data to volumes_attachments
Revision ID: a751ef183f27
Revises: 1e76fb0dde87
Create Date: 2025-02-12 13:19:57.569591
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "a751ef183f27"
down_revision = "1e76fb0dde87"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Apply this revision to the `volumes_attachments` table.

    Renames the misspelled `instace_id` column to `instance_id` and adds a
    nullable `attachment_data` text column. No existing data is copied into
    the new column by this function.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("volumes_attachments", schema=None) as batch_op:
        batch_op.alter_column("instace_id", new_column_name="instance_id")
        batch_op.add_column(sa.Column("attachment_data", sa.Text(), nullable=True))

    # ### end Alembic commands ###


def downgrade() -> None:
    """Revert this revision on the `volumes_attachments` table.

    Drops the `attachment_data` column (its contents are discarded) and
    renames `instance_id` back to the original, misspelled `instace_id`.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("volumes_attachments", schema=None) as batch_op:
        batch_op.drop_column("attachment_data")
        batch_op.alter_column("instance_id", new_column_name="instace_id")

    # ### end Alembic commands ###
36 changes: 17 additions & 19 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from sqlalchemy import (
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Float,
Expand All @@ -15,7 +14,6 @@
LargeBinary,
MetaData,
String,
Table,
Text,
TypeDecorator,
UniqueConstraint,
Expand Down Expand Up @@ -554,10 +552,12 @@ class InstanceModel(BaseModel):
jobs: Mapped[list["JobModel"]] = relationship(back_populates="instance", lazy="joined")
last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)

# volumes attached to the instance
volumes: Mapped[List["VolumeModel"]] = relationship(
secondary="volumes_attachments",
back_populates="instances",
volume_attachments: Mapped[List["VolumeAttachmentModel"]] = relationship(
back_populates="instance",
# Add delete-orphan option so that removing entries from volume_attachments
# automatically marks them for deletion.
# SQLAlchemy requires delete when using delete-orphan.
cascade="save-update, merge, delete-orphan, delete",
)


Expand Down Expand Up @@ -587,23 +587,21 @@ class VolumeModel(BaseModel):

configuration: Mapped[str] = mapped_column(Text)
volume_provisioning_data: Mapped[Optional[str]] = mapped_column(Text)
# FIXME: volume_attachment_data should be in "volumes_attachments"
# to support multi-attach volumes

attachments: Mapped[List["VolumeAttachmentModel"]] = relationship(back_populates="volume")

# Deprecated in favor of VolumeAttachmentModel.attachment_data
volume_attachment_data: Mapped[Optional[str]] = mapped_column(Text)

# instances the volume is attached to
instances: Mapped[List["InstanceModel"]] = relationship(
secondary="volumes_attachments",
back_populates="volumes",
)

class VolumeAttachmentModel(BaseModel):
__tablename__ = "volumes_attachments"

volumes_attachments_table = Table(
"volumes_attachments",
BackendModel.metadata,
Column("volume_id", ForeignKey("volumes.id"), primary_key=True),
Column("instace_id", ForeignKey("instances.id"), primary_key=True),
)
volume_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("volumes.id"), primary_key=True)
volume: Mapped[VolumeModel] = relationship(back_populates="attachments")
instance_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("instances.id"), primary_key=True)
instance: Mapped[InstanceModel] = relationship(back_populates="volume_attachments")
attachment_data: Mapped[Optional[str]] = mapped_column(Text)


class PlacementGroupModel(BaseModel):
Expand Down
8 changes: 4 additions & 4 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ async def process_terminating_job(
session=session, project=instance_model.project, names=jrd.volume_names
)
else:
volume_models = instance_model.volumes
volume_models = [va.volume for va in instance_model.volume_attachments]
if len(volume_models) > 0:
logger.info("Detaching volumes: %s", [v.name for v in volume_models])
all_volumes_detached = await _detach_volumes_from_job_instance(
Expand Down Expand Up @@ -306,7 +306,7 @@ async def process_volumes_detaching(
session=session, project=instance_model.project, names=jrd.volume_names
)
else:
volume_models = instance_model.volumes
volume_models = [va.volume for va in instance_model.volume_attachments]
logger.info("Detaching volumes: %s", [v.name for v in volume_models])
all_volumes_detached = await _detach_volumes_from_job_instance(
project=instance_model.project,
Expand Down Expand Up @@ -439,8 +439,8 @@ async def _detach_volumes_from_job_instance(
if job_model.volumes_detached_at is None:
job_model.volumes_detached_at = common.get_current_datetime()
detached_volumes_ids = {v.id for v in detached_volumes}
instance_model.volumes = [
v for v in instance_model.volumes if v.id not in detached_volumes_ids
instance_model.volume_attachments = [
va for va in instance_model.volume_attachments if va.volume_id not in detached_volumes_ids
]
return all_detached

Expand Down
Loading

0 comments on commit c4ddee1

Please sign in to comment.