Skip to content

Commit

Permalink
feat: Add StorageID field to secrets and use it for replication ID
Browse files Browse the repository at this point in the history
Adds a new StorageID field to blue/green secrets and updates the replication ID
generation to use storage IDs instead of FSIDs. Key changes:

- Add storage_id field to all secret types (blue/green/hub)
- Update secret handlers to properly propagate storage IDs between secrets
- Add helper functions to get storage IDs from different secret types
- Update replication ID generation to use storage IDs instead of FSIDs
- Add tests for storage ID handling and verification
- Update labelCephClusters to use storage IDs for replication ID

Signed-off-by: vbadrina <[email protected]>
  • Loading branch information
vbnrh committed Feb 13, 2025
1 parent 6a041c3 commit 774d763
Show file tree
Hide file tree
Showing 15 changed files with 378 additions and 153 deletions.
136 changes: 75 additions & 61 deletions addons/agent_mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,31 +147,27 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

if !hasStorageClientRef {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching clusterFSIDs")
err = r.fetchClusterFSIDs(ctx, &mirrorPeer, clusterFSIDs)
logger.Info("Fetching StorageIds")
clusterStorageIds, err := r.fetchClusterStorageIds(ctx, &mirrorPeer, types.NamespacedName{Namespace: scr.Namespace, Name: scr.Name})
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: 60 * time.Second}, nil
}
return ctrl.Result{}, fmt.Errorf("an unknown error occurred while fetching the cluster fsids, retrying again: %v", err)
return ctrl.Result{}, fmt.Errorf("failed to fetch cluster storage IDs: %v", err)
}

logger.Info("Labeling the default StorageClasses")
storageIdsMap, err := utils.GetStorageIdsForDefaultStorageClasses(ctx, r.SpokeClient, types.NamespacedName{Name: scr.Name, Namespace: scr.Namespace}, r.SpokeClusterName)
logger.Info("StorageIDs found", "storageIds", storageIdsMap)
if err != nil {
return ctrl.Result{}, fmt.Errorf("an unknown error has occurred while generating StorageIDs: %v", err)
storageIds := make(map[utils.CephType]string)
for k, v := range clusterStorageIds[r.SpokeClusterName] {
storageIds[utils.CephType(k)] = v
}
err = labelDefaultStorageClasses(ctx, logger, r.SpokeClient, scr.Name, scr.Namespace, storageIdsMap)

err = labelDefaultStorageClasses(ctx, logger, r.SpokeClient, scr.Name, scr.Namespace, storageIds)
if err != nil {
return ctrl.Result{}, fmt.Errorf("an unknown error has occurred while labelling default StorageClasses: %v", err)
}

logger.Info("Labeled the default StorageClasses successfully")
logger.Info("Labeling the default VolumeSnapshotClasses")

err = labelDefaultVolumeSnapshotClasses(ctx, logger, r.SpokeClient, scr.Name, scr.Namespace, storageIdsMap)
err = labelDefaultVolumeSnapshotClasses(ctx, logger, r.SpokeClient, scr.Name, scr.Namespace, storageIds)
if err != nil {
return ctrl.Result{}, fmt.Errorf("an unknown error has occurred while labelling default VolumeSnapshotClasses: %v", err)
}
Expand All @@ -180,7 +176,9 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
logger.Info("Enabling async mode dependencies")
err = r.labelCephClusters(ctx, scr, clusterFSIDs)
storageId1 := clusterStorageIds[mirrorPeer.Spec.Items[0].ClusterName]["rbd"]
storageId2 := clusterStorageIds[mirrorPeer.Spec.Items[1].ClusterName]["rbd"]
err = r.labelCephClusters(ctx, scr, storageId1, storageId2)
if err != nil {
logger.Error("Failed to label cephcluster", "error", err)
return ctrl.Result{}, err
Expand Down Expand Up @@ -298,64 +296,80 @@ func labelDefaultVolumeSnapshotClasses(ctx context.Context, logger *slog.Logger,
return nil
}

func (r *MirrorPeerReconciler) fetchClusterFSIDs(ctx context.Context, mp *multiclusterv1alpha1.MirrorPeer, clusterFSIDs map[string]string) error {
func (r *MirrorPeerReconciler) fetchClusterStorageIds(ctx context.Context, mp *multiclusterv1alpha1.MirrorPeer, storageClusterNamespacedName types.NamespacedName) (map[string]map[string]string, error) {
// Initialize map to store cluster name -> storage IDs mapping
clusterStorageIds := make(map[string]map[string]string)

for _, pr := range mp.Spec.Items {
clusterType, err := utils.GetClusterType(pr.StorageClusterRef.Name, pr.StorageClusterRef.Namespace, r.SpokeClient)
if err != nil {
r.Logger.Error("Failed to get cluster type", "clusterName", pr.StorageClusterRef.Name, "namespace", pr.StorageClusterRef.Namespace, "error", err)
return err
}
secretName := r.getSecretNameByType(clusterType, pr)
logger := r.Logger.With("clusterName", pr.ClusterName,
"namespace", pr.StorageClusterRef.Namespace)

r.Logger.Info("Checking secret", "secretName", secretName, "mode", clusterType, "clusterName", pr.StorageClusterRef.Name, "namespace", pr.StorageClusterRef.Namespace)
secret, err := utils.FetchSecretWithName(ctx, r.SpokeClient, types.NamespacedName{Name: secretName, Namespace: pr.StorageClusterRef.Namespace})
// Get cluster type (converged/external)
clusterType, err := utils.GetClusterType(pr.StorageClusterRef.Name,
pr.StorageClusterRef.Namespace, r.SpokeClient)
if err != nil {
r.Logger.Error("Failed to fetch secret", "secretName", secretName, "namespace", pr.StorageClusterRef.Namespace, "error", err)
return err
logger.Error("Failed to get cluster type", "error", err)
return nil, fmt.Errorf("failed to get cluster type for %s: %v", pr.ClusterName, err)
}

fsid, err := r.getFsidFromSecretByType(clusterType, secret)
if err != nil {
r.Logger.Error("Failed to extract FSID from secret", "secretName", secretName, "namespace", pr.StorageClusterRef.Namespace, "error", err)
return err
}
var currentPeerStorageIds = make(map[string]string)

clusterFSIDs[pr.ClusterName] = fsid
r.Logger.Info("FSID fetched for cluster", "clusterName", pr.ClusterName, "FSID", fsid)
}
return nil
}
// Handle local cluster vs remote cluster
if r.SpokeClusterName == pr.ClusterName {
// For local cluster, get storage IDs from storage classes
storageIds, err := utils.GetStorageIdsForDefaultStorageClasses(ctx,
r.SpokeClient,
storageClusterNamespacedName,
r.SpokeClusterName)
if err != nil {
logger.Error("Failed to get storage IDs from storage classes", "error", err)
return nil, fmt.Errorf("failed to get storage IDs for local cluster %s: %v",
pr.ClusterName, err)
}

func (r *MirrorPeerReconciler) getFsidFromSecretByType(clusterType utils.ClusterType, secret *corev1.Secret) (string, error) {
var fsid string
if clusterType == utils.CONVERGED {
token, err := utils.UnmarshalRookSecret(secret)
if err != nil {
r.Logger.Error("Failed to unmarshal converged mode peer secret", "peerSecret", secret.Name, "error", err)
return "", err
}
fsid = token.FSID
r.Logger.Info("FSID retrieved for converged mode", "FSID", fsid, "secret", secret.Name)
} else if clusterType == utils.EXTERNAL {
token, err := utils.UnmarshalRookSecretExternal(secret)
if err != nil {
r.Logger.Error("Failed to unmarshal external mode peer secret", "peerSecret", secret.Name, "error", err)
return "", err
for k, v := range storageIds {
currentPeerStorageIds[string(k)] = v
}
} else {
// For remote cluster, get storage IDs from secret
secretName := r.getSecretNameByType(clusterType, pr)
logger.Info("Checking secret",
"secretName", secretName,
"mode", clusterType)

secret, err := utils.FetchSecretWithName(ctx, r.SpokeClient,
types.NamespacedName{
Name: secretName,
Namespace: pr.StorageClusterRef.Namespace,
})
if err != nil {
logger.Error("Failed to fetch secret", "error", err)
return nil, fmt.Errorf("failed to fetch secret %s: %v", secretName, err)
}

currentPeerStorageIds, err = utils.GetStorageIdsFromGreenSecret(secret)
if err != nil {
logger.Error("Failed to extract storage IDs from secret", "error", err)
return nil, fmt.Errorf("failed to get storage IDs from secret %s: %v",
secretName, err)
}
}
fsid = token.FSID
r.Logger.Info("FSID retrieved for external mode", "FSID", fsid, "secret", secret.Name)

clusterStorageIds[pr.ClusterName] = currentPeerStorageIds
logger.Info("Storage IDs fetched for cluster",
"storageIDs", currentPeerStorageIds)
}
return fsid, nil

r.Logger.Info("Successfully fetched all cluster storage IDs",
"mirrorPeer", mp.Name,
"clusterCount", len(clusterStorageIds))
return clusterStorageIds, nil
}

func (r *MirrorPeerReconciler) getSecretNameByType(clusterType utils.ClusterType, pr multiclusterv1alpha1.PeerRef) string {
var secretName string
if clusterType == utils.CONVERGED {
if pr.ClusterName == r.SpokeClusterName {
secretName = fmt.Sprintf("cluster-peer-token-%s-cephcluster", pr.StorageClusterRef.Name)
} else {
secretName = utils.GetSecretNameByPeerRef(pr)
}
secretName = utils.GetSecretNameByPeerRef(pr)
} else if clusterType == utils.EXTERNAL {
secretName = DefaultExternalSecretName
}
Expand Down Expand Up @@ -631,7 +645,7 @@ func (r *MirrorPeerReconciler) deleteMirrorPeer(ctx context.Context, mirrorPeer
return ctrl.Result{}, nil
}

func (r *MirrorPeerReconciler) labelCephClusters(ctx context.Context, scr *multiclusterv1alpha1.StorageClusterRef, clusterFSIDs map[string]string) error {
func (r *MirrorPeerReconciler) labelCephClusters(ctx context.Context, scr *multiclusterv1alpha1.StorageClusterRef, storageId1, storageId2 string) error {
r.Logger.Info("Labelling CephClusters with replication ID")
cephClusters, err := utils.FetchAllCephClusters(ctx, r.SpokeClient)
if err != nil {
Expand Down Expand Up @@ -666,9 +680,9 @@ func (r *MirrorPeerReconciler) labelCephClusters(ctx context.Context, scr *multi
found.Labels = make(map[string]string)
}

replicationId, err := utils.CreateUniqueReplicationId(clusterFSIDs)
replicationId, err := utils.CreateUniqueReplicationId(storageId1, storageId2)
if err != nil {
r.Logger.Error("Failed to create a unique replication ID", "error", err)
r.Logger.Error("Failed to create unique replication ID", "error", err)
return err
}

Expand Down
5 changes: 3 additions & 2 deletions addons/agent_mirrorpeer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ storageSystemName: test-storagecluster-storagesystem
}

secretData = map[string][]byte{
"token": []byte("eyJmc2lkIjoiMzU2NjZlNGMtZTljMC00ZmE3LWE3MWEtMmIwNTJiZjUxOTFhIiwiY2xpZW50X2lkIjoicmJkLW1pcnJvci1wZWVyIiwia2V5IjoiQVFDZVkwNWlYUmtsTVJBQU95b3I3ZTZPL3MrcTlzRnZWcVpVaHc9PSIsIm1vbl9ob3N0IjoiMTcyLjMxLjE2NS4yMjg6Njc4OSwxNzIuMzEuMTkxLjE0MDo2Nzg5LDE3Mi4zMS44LjQ0OjY3ODkiLCJuYW1lc3BhY2UiOiJvcGVuc2hpZnQtc3RvcmFnZSJ9"),
"cluster": []byte(fmt.Sprintf("%s-cephcluster", storageClusterName)),
"token": []byte("eyJmc2lkIjoiMzU2NjZlNGMtZTljMC00ZmE3LWE3MWEtMmIwNTJiZjUxOTFhIiwiY2xpZW50X2lkIjoicmJkLW1pcnJvci1wZWVyIiwia2V5IjoiQVFDZVkwNWlYUmtsTVJBQU95b3I3ZTZPL3MrcTlzRnZWcVpVaHc9PSIsIm1vbl9ob3N0IjoiMTcyLjMxLjE2NS4yMjg6Njc4OSwxNzIuMzEuMTkxLjE0MDo2Nzg5LDE3Mi4zMS44LjQ0OjY3ODkiLCJuYW1lc3BhY2UiOiJvcGVuc2hpZnQtc3RvcmFnZSJ9"),
"cluster": []byte(fmt.Sprintf("%s-cephcluster", storageClusterName)),
"storage_id": []byte(`{"cephfs":"f9708852fe4cf1f4d5de7e525f1b0aba","rbd":"dcd70114947d0bb1f6b96f0dd6a9aaca"}`),
}
// Create secret cluster-peer-token
clusterPeerToken = corev1.Secret{
Expand Down
33 changes: 32 additions & 1 deletion addons/blue_secret_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package addons

import (
"context"
"encoding/json"
"reflect"
"testing"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
rookv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -81,9 +84,14 @@ func TestBlueSecretReconciler_Reconcile(t *testing.T) {
t.Error("failed to add rookv1 scheme")
}

err = storagev1.AddToScheme(scheme)
if err != nil {
t.Error("failed to add storagev1 scheme")
}

// Create fake clients
fakeHubClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects().Build()
fakeSpokeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(managedClusterSecret, storageCluster, cephCluster).Build()
fakeSpokeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(managedClusterSecret, storageCluster, GetTestCephBlockPool(), GetTestCephCluster(), GetTestCephFSStorageClass(), GetTestCephRBDStorageClass(), GetTestCephRBDVirtStorageClass()).Build()

logger := utils.GetLogger(utils.GetZapLogger(true))
reconciler := &BlueSecretReconciler{
Expand Down Expand Up @@ -131,7 +139,30 @@ func TestBlueSecretReconciler_Reconcile(t *testing.T) {
if reconciledSecret.Labels[utils.SecretLabelTypeKey] != string(utils.SourceLabel) {
t.Errorf("expected label %s to be %s", utils.SecretLabelTypeKey, string(utils.SourceLabel))
}
// Check StorageIDs
expectedStorageIDs := map[string]string{
"cephfs": "f9708852fe4cf1f4d5de7e525f1b0aba",
"rbd": "dcd70114947d0bb1f6b96f0dd6a9aaca",
}

storageIDsData, exists := reconciledSecret.Data[utils.SecretStorageIDKey]
if !exists {
t.Errorf("StorageIDs key %q not found in reconciled secret", utils.SecretStorageIDKey)
return
}

var reconciledStorageIDs map[string]string
if err := json.Unmarshal(storageIDsData, &reconciledStorageIDs); err != nil {
t.Errorf("Failed to unmarshal StorageIDs from secret: %v", err)
return
}

if !reflect.DeepEqual(expectedStorageIDs, reconciledStorageIDs) {
t.Errorf("StorageIDs mismatch - expected: %+v, got: %+v",
expectedStorageIDs,
reconciledStorageIDs)
}
})

}
}
38 changes: 30 additions & 8 deletions addons/green_secret_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package addons
import (
"context"
"encoding/json"
"reflect"
"testing"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
Expand All @@ -20,9 +21,10 @@ var (
greenSecretName = "d8433b8cb5b6d99c4d785ebd6082efd19cad50c"
greenSecretNamespace = "local-cluster"
greenSecretData = map[string][]byte{
"namespace": []byte("openshift-storage"),
"secret-origin": []byte("rook"),
"storage-cluster-name": []byte("ocs-storagecluster"),
"namespace": []byte("openshift-storage"),
"secret-origin": []byte("rook"),
"storage-cluster-name": []byte("ocs-storagecluster"),
utils.SecretStorageIDKey: []byte("{\"cephfs\":\"f9708852fe4cf1f4d5de7e525f1b0aba\",\"rbd\":\"dcd70114947d0bb1f6b96f0dd6a9aaca\"}"),
}

secretDataContent = map[string]string{
Expand Down Expand Up @@ -69,10 +71,11 @@ func TestGreenSecretReconciler_Reconcile(t *testing.T) {
},
},
Data: map[string][]byte{
"namespace": greenSecretData["namespace"],
"secret-data": encodedSecretData,
"secret-origin": greenSecretData["secret-origin"],
"storage-cluster-name": greenSecretData["storage-cluster-name"],
"namespace": greenSecretData["namespace"],
"secret-data": encodedSecretData,
"secret-origin": greenSecretData["secret-origin"],
"storage-cluster-name": greenSecretData["storage-cluster-name"],
utils.SecretStorageIDKey: greenSecretData[utils.SecretStorageIDKey],
},
Type: corev1.SecretTypeOpaque,
}
Expand Down Expand Up @@ -140,7 +143,26 @@ func TestGreenSecretReconciler_Reconcile(t *testing.T) {
t.Errorf("expected label %s to be %s", utils.CreatedByLabelKey, setup.TokenExchangeName)
}

// Verify storage cluster update
expectedStorageIDsData := greenSecretData[utils.SecretStorageIDKey]
reconciledStorageIDsData, exists := reconciledSecret.Data[utils.SecretStorageIDKey]
if !exists {
t.Errorf("StorageIDs key %q not found in reconciled secret", utils.SecretStorageIDKey)
}

var expectedStorageIDs, reconciledStorageIDs map[string]string
if err := json.Unmarshal(expectedStorageIDsData, &expectedStorageIDs); err != nil {
t.Errorf("Failed to unmarshal expected StorageIDs: %v", err)
}
if err := json.Unmarshal(reconciledStorageIDsData, &reconciledStorageIDs); err != nil {
t.Errorf("Failed to unmarshal reconciled StorageIDs: %v", err)
}

if !reflect.DeepEqual(expectedStorageIDs, reconciledStorageIDs) {
t.Errorf("StorageIDs mismatch - expected: %+v, got: %+v",
expectedStorageIDs, reconciledStorageIDs)
}

// Verify storage cluster update (existing code)
updatedStorageCluster := &ocsv1.StorageCluster{}
err = fakeSpokeClient.Get(ctx, types.NamespacedName{
Name: storageClusterToUpdate.Name,
Expand Down
Loading

0 comments on commit 774d763

Please sign in to comment.