Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions internal/controller/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ type ClusterReconciler struct {

func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
logger.V(logs.LogInfo).Info("Reconciling Cluster")
logger.V(logs.LogDebug).Info("Reconciling Cluster")

// Fecth the Cluster instance
cluster := &clusterv1.Cluster{}
addTypeInformationToObject(r.Scheme, cluster)
return reconcile.Result{}, processCluster(ctx, r.Config, r.Client, r.AgentInMgmtCluster,
cluster, req, logger)
}
Expand Down
3 changes: 1 addition & 2 deletions internal/controller/sveltoscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ type SveltosClusterReconciler struct {

func (r *SveltosClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
logger.V(logs.LogInfo).Info("Reconciling SveltosCluster")
logger.V(logs.LogDebug).Info("Reconciling SveltosCluster")

// Fecth the SveltosCluster instance
sveltosCluster := &libsveltosv1beta1.SveltosCluster{}
addTypeInformationToObject(r.Scheme, sveltosCluster)
return reconcile.Result{}, processCluster(ctx, r.Config, r.Client, r.AgentInMgmtCluster,
sveltosCluster, req, logger)
}
Expand Down
28 changes: 22 additions & 6 deletions internal/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,18 @@ func InitScheme() (*runtime.Scheme, error) {
func processCluster(ctx context.Context, config *rest.Config, c client.Client,
agentInMgmtCluster bool, cluster client.Object, req ctrl.Request, logger logr.Logger) error {

clusterRef := getObjectReferenceFromObject(c.Scheme(), cluster)

if err := c.Get(ctx, req.NamespacedName, cluster); err != nil {
if apierrors.IsNotFound(err) {
return stopTrackingCluster(ctx, config, clusterRef, logger)
// cluster is empty here; derive the GVK from the scheme so the ref is complete.
addTypeInformationToObject(c.Scheme(), cluster)
apiVersion, kind := cluster.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
notFoundRef := &corev1.ObjectReference{
Namespace: req.Namespace,
Name: req.Name,
Kind: kind,
APIVersion: apiVersion,
}
return stopTrackingCluster(ctx, config, notFoundRef, logger)
}
logger.Error(err, "Failed to fetch cluster")
return errors.Wrapf(
Expand All @@ -115,6 +122,9 @@ func processCluster(ctx context.Context, config *rest.Config, c client.Client,
)
}

// clusterRef is computed after Get so namespace/name are populated.
clusterRef := getObjectReferenceFromObject(c.Scheme(), cluster)

// Handle deleted cluster
if !cluster.GetDeletionTimestamp().IsZero() {
return stopTrackingCluster(ctx, config, clusterRef, logger)
Expand Down Expand Up @@ -145,13 +155,13 @@ func trackCluster(ctx context.Context, config *rest.Config, c client.Client, age
mux.Lock()
defer mux.Unlock()

oldShard, ok := clusterMap[*cluster]
if ok && oldShard == currentShardKey {
oldShard, alreadyTracked := clusterMap[*cluster]
if alreadyTracked && oldShard == currentShardKey {
// Cluster is already tracked. And cluster shard has not changed.
return nil
}

if ok {
if alreadyTracked {
if shardMap[oldShard].Has(cluster) &&
shardMap[oldShard].Len() == 1 {
// By removing cluster, no more clusters will match oldShard.
Expand All @@ -167,6 +177,8 @@ func trackCluster(ctx context.Context, config *rest.Config, c client.Client, age
clusterPerShard := shardMap[oldShard]
clusterPerShard.Erase(cluster)
shardMap[oldShard] = clusterPerShard
logger.V(logs.LogInfo).Info(fmt.Sprintf("removed cluster from shard %q: %d cluster(s) remaining",
oldShard, clusterPerShard.Len()))
}

// Update Cluster shard (key: cluster; value: cluster current shard)
Expand All @@ -189,6 +201,8 @@ func trackCluster(ctx context.Context, config *rest.Config, c client.Client, age
}
clusterPerShard.Insert(cluster)
shardMap[currentShardKey] = clusterPerShard
logger.V(logs.LogInfo).Info(fmt.Sprintf("added cluster to shard %q: %d cluster(s) total",
currentShardKey, clusterPerShard.Len()))

return nil
}
Expand Down Expand Up @@ -217,6 +231,8 @@ func stopTrackingCluster(ctx context.Context, config *rest.Config, cluster *core
fmt.Sprintf("no more clusters matching shard %s. Removing controllers", oldShardKey))
return undeployControllers(ctx, config, oldShardKey, logger)
}
logger.V(logs.LogInfo).Info(fmt.Sprintf("removed cluster from shard %q: %d cluster(s) remaining",
oldShardKey, shardMap[oldShardKey].Len()))
}
}

Expand Down