Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): prevent volume deletion if snapshot exists #350

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
86 changes: 80 additions & 6 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,12 @@ func (cs *controller) DeleteSnapshot(
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {

if req.SnapshotId == "" {
return nil, status.Errorf(codes.InvalidArgument, "DeleteSnapshot: empty snapshotID")
}

klog.Infof("DeleteSnapshot request for %s", req.SnapshotId)

if err := cs.validateDeleteSnapshotReq(req); err != nil {
return nil, err
}

// snapshodID is formed as <volname>@<snapname>
// parsing them here
snapshotID := strings.Split(req.SnapshotId, "@")
Expand Down Expand Up @@ -954,27 +954,101 @@ func (cs *controller) ListVolumes(
}

func (cs *controller) validateDeleteVolumeReq(req *csi.DeleteVolumeRequest) error {
volumeID := req.GetVolumeId()
volumeID := strings.ToLower(req.GetVolumeId())
if volumeID == "" {
return status.Error(
codes.InvalidArgument,
"failed to handle delete volume request: missing volume id",
)
}

err := cs.validateRequest(
// volume should not be deleted if there are snapshots present for the volume
snapList, err := zfs.GetSnapshotsForVolume(volumeID)
if err != nil {
return status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.GetVolumeId(),
err.Error(),
)
}

// delete is not supported if there are any snapshots present for the volume
if len(snapList.Items) != 0 {
return status.Errorf(
codes.Internal,
"failed to handle delete volume request for {%s} with %d snapshots",
req.GetVolumeId(),
len(snapList.Items),
)
}

// volume should not be deleted if there are clones present for the volume
cloneList, err := zfs.GetClonesForVolume(volumeID)
if err != nil {
return status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for clones. Error: %s",
req.GetVolumeId(),
err.Error(),
)
}

// delete is not supported if there are any clones present for the volume
if len(cloneList.Items) != 0 {
return status.Errorf(
codes.Internal,
"failed to handle delete volume request for {%s} with %d clones",
req.GetVolumeId(),
len(cloneList.Items),
)
}

err = cs.validateRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
)
if err != nil {
return errors.Wrapf(
err,

"failed to handle delete volume request for {%s} : validation failed",
volumeID,
)
}
return nil
}

func (cs *controller) validateDeleteSnapshotReq(req *csi.DeleteSnapshotRequest) error {
if req.GetSnapshotId() == "" {
return status.Errorf(codes.InvalidArgument, "DeleteSnapshot: empty snapshotID")
}

cloneList, err := zfs.GetClonesForSnapshot(req.SnapshotId)
if err != nil {
return status.Errorf(
codes.NotFound,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for existing clones. Error: %s",
req.GetSnapshotId(),
err.Error(),
)
}

// snapshot delete is not supported if clones exist for this snapshot
if len(cloneList.Items) != 0 {
return status.Errorf(
codes.Internal,
"failed to handle delete volume request for {%s} with %d clones",
req.GetSnapshotId(),
len(cloneList.Items),
)
}

return nil
}

// IsSupportedVolumeCapabilityAccessMode valides the requested access mode
func IsSupportedVolumeCapabilityAccessMode(
accessMode csi.VolumeCapability_AccessMode_Mode,
Expand Down
36 changes: 36 additions & 0 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,42 @@ func GetZFSSnapshot(snapID string) (*apis.ZFSSnapshot, error) {
return snap, err
}

// GetSnapshotsForVolume fetches all the snapshots for the given volume
func GetSnapshotsForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}

// GetClonesForVolume lists all the clone volumes for the given volume
func GetClonesForVolume(volumeName string) (*apis.ZFSVolumeList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSSrcVolKey + "=" + volumeName,
}
cloneList, err := volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return cloneList, err
}

// GetClonesForSnapshot lists all the clone volumes for the given snapshot
func GetClonesForSnapshot(snapID string) (*apis.ZFSVolumeList, error) {
zfsVolList, err := volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(metav1.ListOptions{})
if err != nil {
return nil, err
}

listBuilder := volbuilder.ListBuilderFrom(*zfsVolList)
listBuilder.WithFilter(func(vol *volbuilder.ZFSVolume) bool {
if vol.Object.Spec.SnapName == snapID {
akhilerm marked this conversation as resolved.
Show resolved Hide resolved
return true
}
return false
})

return listBuilder.List(), nil
}

// GetZFSSnapshotStatus returns ZFSSnapshot status
func GetZFSSnapshotStatus(snapID string) (string, error) {
getOptions := metav1.GetOptions{}
Expand Down