Skip to content

Commit

Permalink
Merge pull request #1347 from rancher/alerting-fix-9.2rc2
Browse files Browse the repository at this point in the history
Fix HA replicas in Alerting Cluster
  • Loading branch information
alexandreLamarre authored Apr 26, 2023
2 parents 724f417 + d4d9590 commit 89f0fa4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 29 deletions.
4 changes: 1 addition & 3 deletions packages/opni/opni/charts/templates/gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ metadata:
labels:
{{- include "opni.labels" . | nindent 4 }}
spec:
{{- if .Values.gateway.alerting.enabled }}
alerting:
enabled: {{ .Values.gateway.alerting.enabled }}
{{- end}}
enabled: {{ .Values.gateway.alerting.enabled }}
hostname: {{ .Values.gateway.hostname }}
serviceType: {{ .Values.gateway.serviceType }}
{{- if .Values.gateway.storageType }}
Expand Down
6 changes: 4 additions & 2 deletions pkg/resources/gateway/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/rancher/opni/pkg/alerting/shared"
"github.com/rancher/opni/pkg/util/nats"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -224,6 +223,7 @@ func (r *Reconciler) newAlertingCluster(
alertManagerPorts []corev1.ContainerPort,
volumes []corev1.Volume,
persistentClaims []corev1.PersistentVolumeClaim,
replicas int32,
) (*corev1.Service, *appsv1.StatefulSet) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -245,7 +245,7 @@ func (r *Reconciler) newAlertingCluster(
Labels: deployLabels,
},
Spec: appsv1.StatefulSetSpec{
Replicas: lo.ToPtr(int32(1)),
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: deployLabels,
},
Expand Down Expand Up @@ -344,6 +344,7 @@ func (r *Reconciler) alerting() []resources.Resource {
r.controllerAlertingPorts(),
requiredVolumes,
requiredPersistentClaims,
1,
)

workerService, workerWorkers := r.newAlertingCluster(
Expand All @@ -354,6 +355,7 @@ func (r *Reconciler) alerting() []resources.Resource {
r.nodeAlertingPorts(),
requiredVolumes,
requiredPersistentClaims,
r.gw.Spec.Alerting.Replicas-1,
)
ctrl.SetControllerReference(r.gw, controllerService, r.client.Scheme())
ctrl.SetControllerReference(r.gw, controllerWorkers, r.client.Scheme())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (a *AlertingManager) ConfigureCluster(ctx context.Context, conf *alertops.C
}
clone := existing.DeepCopy()
mutator(clone)
lg.Debugf("updated alerting spec : %v", existing.Spec.Alerting)
lg.Debugf("updated alerting spec : %v", clone.Spec.Alerting)
cmp, err := patch.DefaultPatchMaker.Calculate(existing, clone,
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
Expand All @@ -141,12 +141,10 @@ func (a *AlertingManager) ConfigureCluster(ctx context.Context, conf *alertops.C
}
}
lg.Debug("Done cacluating external reconcile.")
return a.K8sClient.Update(ctx, clone)
return a.K8sClient.Patch(ctx, existing, client.RawPatch(types.MergePatchType, cmp.Patch))
})
if retryErr != nil {
lg.Errorf("%s", retryErr)
}
if retryErr != nil {
return nil, retryErr
}

Expand Down Expand Up @@ -178,13 +176,31 @@ func (a *AlertingManager) GetClusterStatus(ctx context.Context, _ *emptypb.Empty

func (a *AlertingManager) InstallCluster(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
existing := a.newOpniGateway()
lg := a.Logger.With("action", "install-cluster")

mutator := func(gateway *corev1beta1.Gateway) {
gateway.Spec.Alerting.Enabled = true
}

retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := a.K8sClient.Get(ctx, client.ObjectKeyFromObject(existing), existing)
if err != nil {
return err
}
existing.Spec.Alerting.Enabled = true
return a.K8sClient.Update(ctx, existing)
clone := existing.DeepCopy()
mutator(clone)
lg.Debugf("updated alerting spec : %v", clone.Spec.Alerting)
cmp, err := patch.DefaultPatchMaker.Calculate(existing, clone,
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
patch.IgnorePDBSelector(),
)
if err == nil {
if cmp.IsEmpty() {
return status.Error(codes.FailedPrecondition, "no changes to apply")
}
}
return a.K8sClient.Patch(ctx, existing, client.RawPatch(types.MergePatchType, cmp.Patch))
})
if retryErr != nil {
return nil, retryErr
Expand All @@ -206,8 +222,9 @@ func (a *AlertingManager) InstallCluster(ctx context.Context, _ *emptypb.Empty)
}

func (a *AlertingManager) UninstallCluster(ctx context.Context, _ *alertops.UninstallRequest) (*emptypb.Empty, error) {
existing := a.newOpniGateway()

retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
existing := a.newOpniGateway()
err := a.K8sClient.Get(ctx, client.ObjectKeyFromObject(existing), existing)
if err != nil {
return err
Expand Down
59 changes: 42 additions & 17 deletions plugins/alerting/pkg/alerting/drivers/alerting_manager/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
corev1beta1 "github.com/rancher/opni/apis/core/v1beta1"
"github.com/rancher/opni/pkg/alerting/shared"
"github.com/rancher/opni/plugins/alerting/pkg/apis/alertops"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -20,42 +21,65 @@ func (a *AlertingManager) newOpniGateway() *corev1beta1.Gateway {
Namespace: a.GatewayRef.Namespace,
},
}

}

func (a *AlertingManager) newOpniControllerSet() (client.Object, error) {
func (a *AlertingManager) newOpniControllerSet() *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: shared.OperatorAlertingControllerServiceName + "-internal",
Namespace: a.GatewayRef.Namespace,
},
}, nil
}
}

// newOpniWorkerSet builds a StatefulSet that carries only identifying
// metadata: the alerting cluster-node service name suffixed with "-internal",
// placed in the namespace of the referenced gateway. No spec or status is
// populated; callers use the result as a lookup target for the Kubernetes
// client.
func (a *AlertingManager) newOpniWorkerSet() *appsv1.StatefulSet {
	workerMeta := metav1.ObjectMeta{
		Name:      shared.OperatorAlertingClusterNodeServiceName + "-internal",
		Namespace: a.GatewayRef.Namespace,
	}
	return &appsv1.StatefulSet{ObjectMeta: workerMeta}
}

// extractGatewayAlertingSpec returns a deep copy of the alerting section of
// the given gateway's spec, so the caller can read or modify it without
// touching the gateway object itself.
func extractGatewayAlertingSpec(gw *corev1beta1.Gateway) *corev1beta1.AlertingSpec {
	return gw.Spec.Alerting.DeepCopy()
}

type statusTuple lo.Tuple2[error, *appsv1.StatefulSet]

func (a *AlertingManager) alertingControllerStatus(gw *corev1beta1.Gateway) (*alertops.InstallStatus, error) {
ss, err := a.newOpniControllerSet()
if err != nil {
return nil, err
}
k8serr := a.K8sClient.Get(context.Background(), client.ObjectKeyFromObject(ss), ss)
cs := a.newOpniControllerSet()
ws := a.newOpniWorkerSet()

ctrlErr := a.K8sClient.Get(context.Background(), client.ObjectKeyFromObject(cs), cs)
workErr := a.K8sClient.Get(context.Background(), client.ObjectKeyFromObject(ws), ws)

if gw.Spec.Alerting.Enabled {
if k8serr != nil {
if k8serrors.IsNotFound(k8serr) {
expectedSets := []statusTuple{{A: ctrlErr, B: cs}}
if gw.Spec.Alerting.Replicas > 1 {
expectedSets = append(expectedSets, statusTuple{A: workErr, B: ws})
}
for _, status := range expectedSets {
if status.A != nil {
if k8serrors.IsNotFound(status.A) {
return &alertops.InstallStatus{
State: alertops.InstallState_InstallUpdating,
}, nil
}
return nil, fmt.Errorf("failed to get opni alerting status %w", status.A)
}
if status.B.Status.Replicas != status.B.Status.AvailableReplicas {
return &alertops.InstallStatus{
State: alertops.InstallState_InstallUpdating,
}, nil
}
return nil, fmt.Errorf("failed to get opni alerting controller status %w", k8serr)
}
controller := ss.(*appsv1.StatefulSet)
if controller.Status.Replicas != controller.Status.AvailableReplicas ||
controller.Status.AvailableReplicas != gw.Spec.Alerting.Replicas {
// sanity check the desired number of replicas in the spec matches the total available replicas
up := lo.Reduce(expectedSets, func(agg int32, status statusTuple, _ int) int32 {
return agg + status.B.Status.AvailableReplicas
}, 0)
if up != gw.Spec.Alerting.Replicas {
return &alertops.InstallStatus{
State: alertops.InstallState_InstallUpdating,
}, nil
Expand All @@ -64,14 +88,15 @@ func (a *AlertingManager) alertingControllerStatus(gw *corev1beta1.Gateway) (*al
State: alertops.InstallState_Installed,
}, nil
}
if k8serr != nil {
if k8serrors.IsNotFound(k8serr) {
if ctrlErr != nil && workErr != nil {
if k8serrors.IsNotFound(ctrlErr) && k8serrors.IsNotFound(workErr) {
return &alertops.InstallStatus{
State: alertops.InstallState_NotInstalled,
}, nil
}
return nil, fmt.Errorf("failed to get opni alerting controller status %w", k8serr)
return nil, fmt.Errorf("failed to get opni alerting controller status %w", ctrlErr)
}

return &alertops.InstallStatus{
State: alertops.InstallState_Uninstalling,
}, nil
Expand Down

0 comments on commit 89f0fa4

Please sign in to comment.