Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): prevent volume deletion if snapshot exists #350

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions changelogs/unreleased/350-akhilerm
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add validation to prevent volume deletion if there is a dependent snapshot
2 changes: 1 addition & 1 deletion ci/sanity.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ EOT
CSI_TEST_REPO="https://github.com/$test_repo/csi-test.git"
CSI_REPO_PATH="$(go env GOPATH)/src/github.com/$test_repo/csi-test"
if [ ! -d "$CSI_REPO_PATH" ] ; then
git clone -b "v4.0.1" "$CSI_TEST_REPO" "$CSI_REPO_PATH"
git clone -b "v4.2.0" "$CSI_TEST_REPO" "$CSI_REPO_PATH"
else
cd "$CSI_REPO_PATH"
git pull "$CSI_REPO_PATH"
Expand Down
46 changes: 39 additions & 7 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,20 +788,22 @@ func (cs *controller) DeleteSnapshot(
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {

if req.SnapshotId == "" {
return nil, status.Errorf(codes.InvalidArgument, "DeleteSnapshot: empty snapshotID")
}

klog.Infof("DeleteSnapshot request for %s", req.SnapshotId)

if err := cs.validateDeleteSnapshotReq(req); err != nil {
return nil, err
}

// snapshodID is formed as <volname>@<snapname>
// parsing them here
snapshotID := strings.Split(req.SnapshotId, "@")
if len(snapshotID) != 2 {
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {

snapName := snapshotID[1]
if err := zfs.DeleteSnapshot(snapName); err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to handle DeleteSnapshot for %s, {%s}",
Expand Down Expand Up @@ -954,15 +956,37 @@ func (cs *controller) ListVolumes(
}

func (cs *controller) validateDeleteVolumeReq(req *csi.DeleteVolumeRequest) error {
volumeID := req.GetVolumeId()
volumeID := strings.ToLower(req.GetVolumeId())
if volumeID == "" {
return status.Error(
codes.InvalidArgument,
"failed to handle delete volume request: missing volume id",
)
}

err := cs.validateRequest(
// volume should not be deleted if there are snapshots present for the volume
snapList, err := zfs.GetSnapshotsForVolume(volumeID)
if err != nil {
return status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.GetVolumeId(),
err.Error(),
)
}

// delete is not supported if there are any snapshots present for the volume
if len(snapList.Items) != 0 {
return status.Errorf(
codes.Internal,
"failed to handle delete volume request for {%s} with %d snapshots",
req.GetVolumeId(),
len(snapList.Items),
)
}

err = cs.validateRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
)
if err != nil {
Expand All @@ -975,6 +999,14 @@ func (cs *controller) validateDeleteVolumeReq(req *csi.DeleteVolumeRequest) erro
return nil
}

func (cs *controller) validateDeleteSnapshotReq(req *csi.DeleteSnapshotRequest) error {
if req.GetSnapshotId() == "" {
return status.Errorf(codes.InvalidArgument, "DeleteSnapshot: empty snapshotID")
}

return nil
}

// IsSupportedVolumeCapabilityAccessMode valides the requested access mode
func IsSupportedVolumeCapabilityAccessMode(
accessMode csi.VolumeCapability_AccessMode_Mode,
Expand Down
9 changes: 9 additions & 0 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,15 @@ func GetZFSSnapshot(snapID string) (*apis.ZFSSnapshot, error) {
return snap, err
}

// GetSnapshotsForVolume fetches all the snapshots for the given volume
func GetSnapshotsForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}

// GetZFSSnapshotStatus returns ZFSSnapshot status
func GetZFSSnapshotStatus(snapID string) (string, error) {
getOptions := metav1.GetOptions{}
Expand Down