Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/v1alpha1/memgraphcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ type ReplicationSpec struct {
// +kubebuilder:default="ASYNC"
// +optional
Mode ReplicationMode `json:"mode,omitempty"`

// BehindAlertThreshold is the duration a replica may stay behind the main
// before being reported as unhealthy. Defaults to 5m. Must be greater than 0.
// Pointer so that an unset field is omitted and picks up the default, rather
// than serializing as "0s" (metav1.Duration is a struct, so omitempty alone
// would not omit its zero value).
// +kubebuilder:default="5m"
// +kubebuilder:validation:XValidation:rule=`!self.startsWith("-") && self != "0s"`,message="behindAlertThreshold must be a positive Go duration (e.g. '5m', '30s'); zero and negative values are not allowed"
// +optional
BehindAlertThreshold *metav1.Duration `json:"behindAlertThreshold,omitempty"`
}

// HighAvailabilitySpec defines automatic promotion/failover settings
Expand Down
7 changes: 6 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions config/crd/bases/memgraph.base14.io_memgraphclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,19 @@ spec:
replication:
description: Replication defines the replication settings
properties:
behindAlertThreshold:
default: 5m
description: |-
BehindAlertThreshold is the duration a replica may stay behind the main
before being reported as unhealthy. Defaults to 5m. Must be greater than 0.
Pointer so that an unset field is omitted and picks up the default, rather
than serializing as "0s" (metav1.Duration is a struct, so omitempty alone
would not omit its zero value).
type: string
x-kubernetes-validations:
- message: behindAlertThreshold must be a positive Go duration
(e.g. '5m', '30s'); zero and negative values are not allowed
rule: '!self.startsWith("-") && self != "0s"'
mode:
default: ASYNC
description: Mode is the replication mode (ASYNC, SYNC, STRICT_SYNC)
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ const (
EventReasonReplicaUnregistered = "ReplicaUnregistered"
EventReasonReplicationHealthy = "ReplicationHealthy"
EventReasonReplicationError = "ReplicationError"
EventReasonReplicaUnhealthy = "ReplicaUnhealthy"
EventReasonReplicaDataChannelDown = "ReplicaDataChannelDown"
EventReasonReplicaInvalid = "ReplicaInvalid"
EventReasonReplicaBehindTooLong = "ReplicaBehindTooLong"
EventReasonReplicationLagHigh = "ReplicationLagHigh"

// Failover events
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func TestEventReasonConstants(t *testing.T) {
{"EventReasonReplicaUnregistered", EventReasonReplicaUnregistered},
{"EventReasonReplicationHealthy", EventReasonReplicationHealthy},
{"EventReasonReplicationError", EventReasonReplicationError},
{"EventReasonReplicaUnhealthy", EventReasonReplicaUnhealthy},
{"EventReasonReplicaDataChannelDown", EventReasonReplicaDataChannelDown},
{"EventReasonReplicaInvalid", EventReasonReplicaInvalid},
{"EventReasonReplicaBehindTooLong", EventReasonReplicaBehindTooLong},
{"EventReasonReplicationLagHigh", EventReasonReplicationLagHigh},
{"EventReasonMainInstanceFailed", EventReasonMainInstanceFailed},
{"EventReasonFailoverStarted", EventReasonFailoverStarted},
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/memgraphcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func (r *MemgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
cluster := &memgraphv1alpha1.MemgraphCluster{}
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if apierrors.IsNotFound(err) {
// Clean up metrics for deleted cluster
// Clean up metrics and per-replica state for deleted cluster
if r.replicationManager != nil {
r.replicationManager.DeleteClusterState(req.Name, req.Namespace)
}
r.metrics.DeleteClusterMetrics(req.Name, req.Namespace)
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -301,6 +304,7 @@ func (r *MemgraphClusterReconciler) ensureReplicationManager() error {
}

r.replicationManager = NewReplicationManager(mgClient, r.Recorder)
r.replicationManager.SetMetricsRecorder(r.metrics)
return nil
}

Expand Down
46 changes: 46 additions & 0 deletions internal/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ var (
[]string{"cluster", "namespace"},
)

// Stateful per-replica health metrics (owned by the replica classifier in
// ReplicationManager.CheckReplicationHealth). These complement the snapshot
// gauges below with time-aware signals: whether the data channel is up and
// how long the replica has continuously been behind.
replicaDataChannelUpGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "memgraph_replica_data_channel_up",
Help: "1 if the replica's data channel is established (data_info populated and status not invalid/unknown), else 0",
},
[]string{"cluster", "namespace", "replica"},
)

replicaBehindSecondsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "memgraph_replica_behind_seconds",
Help: "Number of seconds the replica has continuously been behind the main; 0 when caught up",
},
[]string{"cluster", "namespace", "replica"},
)

// Per-replica replication metrics (from SHOW REPLICAS data_info).
// These distinguish "registered" from "actually streaming data": a replica
// can be registered and heartbeating while replicating nothing (empty
Expand Down Expand Up @@ -290,6 +310,8 @@ func init() {
clusterRegisteredReplicasGauge,
replicationLagGauge,
replicationHealthyGauge,
replicaDataChannelUpGauge,
replicaBehindSecondsGauge,
replicaHealthyGauge,
replicaStatusGauge,
replicaBehindGauge,
Expand Down Expand Up @@ -520,6 +542,8 @@ func (m *MetricsRecorder) DeleteClusterMetrics(cluster, namespace string) {

// Sweep all per-replica and per-instance series for the cluster
partial := prometheus.Labels{"cluster": cluster, "namespace": namespace}
replicaDataChannelUpGauge.DeletePartialMatch(partial)
replicaBehindSecondsGauge.DeletePartialMatch(partial)
replicaHealthyGauge.DeletePartialMatch(partial)
replicaStatusGauge.DeletePartialMatch(partial)
replicaBehindGauge.DeletePartialMatch(partial)
Expand All @@ -529,3 +553,25 @@ func (m *MetricsRecorder) DeleteClusterMetrics(cluster, namespace string) {
replicationEdgeDriftGauge.DeletePartialMatch(partial)
instanceHealthGauge.DeletePartialMatch(partial)
}

// RecordReplicaDataChannel sets the per-replica data-channel-up gauge.
func (m *MetricsRecorder) RecordReplicaDataChannel(cluster, namespace, replica string, up bool) {
v := 0.0
if up {
v = 1.0
}
replicaDataChannelUpGauge.WithLabelValues(cluster, namespace, replica).Set(v)
}

// RecordReplicaBehindSeconds sets how long the replica has been behind, in seconds.
// Pass 0 when the replica is caught up.
func (m *MetricsRecorder) RecordReplicaBehindSeconds(cluster, namespace, replica string, seconds float64) {
replicaBehindSecondsGauge.WithLabelValues(cluster, namespace, replica).Set(seconds)
}

// DeleteReplicaMetrics removes the stateful per-replica gauges for a single
// replica (called when a replica is unregistered or its cluster is deleted).
func (m *MetricsRecorder) DeleteReplicaMetrics(cluster, namespace, replica string) {
replicaDataChannelUpGauge.DeleteLabelValues(cluster, namespace, replica)
replicaBehindSecondsGauge.DeleteLabelValues(cluster, namespace, replica)
}
46 changes: 46 additions & 0 deletions internal/controller/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,52 @@ func TestMetricsRecorder_DeleteClusterMetricsSweepsReplicaSeries(t *testing.T) {
}
}

func TestMetricsRecorder_RecordReplicaDataChannelAndBehind(t *testing.T) {
m := NewMetricsRecorder()
cluster, namespace, replica := "chan-cluster", testMetricsNamespace, "replica_0"

m.RecordReplicaDataChannel(cluster, namespace, replica, true)
if got := testutil.ToFloat64(replicaDataChannelUpGauge.WithLabelValues(cluster, namespace, replica)); got != 1 {
t.Errorf("data_channel_up = %v, want 1", got)
}

m.RecordReplicaDataChannel(cluster, namespace, replica, false)
if got := testutil.ToFloat64(replicaDataChannelUpGauge.WithLabelValues(cluster, namespace, replica)); got != 0 {
t.Errorf("data_channel_up = %v, want 0", got)
}

m.RecordReplicaBehindSeconds(cluster, namespace, replica, 42.5)
if got := testutil.ToFloat64(replicaBehindSecondsGauge.WithLabelValues(cluster, namespace, replica)); got != 42.5 {
t.Errorf("behind_seconds = %v, want 42.5", got)
}

// DeleteReplicaMetrics drops both series for the replica.
m.DeleteReplicaMetrics(cluster, namespace, replica)
if got := testutil.CollectAndCount(replicaDataChannelUpGauge); got != 0 {
t.Errorf("data_channel_up has %d series after delete, want 0", got)
}
if got := testutil.CollectAndCount(replicaBehindSecondsGauge); got != 0 {
t.Errorf("behind_seconds has %d series after delete, want 0", got)
}
}

func TestMetricsRecorder_DeleteClusterMetricsSweepsStatefulReplicaSeries(t *testing.T) {
m := NewMetricsRecorder()
cluster, namespace := "stateful-sweep-cluster", testMetricsNamespace

m.RecordReplicaDataChannel(cluster, namespace, "replica_0", true)
m.RecordReplicaBehindSeconds(cluster, namespace, "replica_0", 10)

m.DeleteClusterMetrics(cluster, namespace)

if got := testutil.CollectAndCount(replicaDataChannelUpGauge); got != 0 {
t.Errorf("data_channel_up has %d series after DeleteClusterMetrics, want 0", got)
}
if got := testutil.CollectAndCount(replicaBehindSecondsGauge); got != 0 {
t.Errorf("behind_seconds has %d series after DeleteClusterMetrics, want 0", got)
}
}

func TestMetricsRecorder_RecordInstanceHealth(t *testing.T) {
m := NewMetricsRecorder()

Expand Down
123 changes: 123 additions & 0 deletions internal/controller/replica_classifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2025 Base14. See LICENSE file for details.

package controller

import (
"strings"
"time"

"github.com/base14/memgraph-operator/internal/memgraph"
)

// replicaClassification is the outcome of evaluating a single replica's state
// against the operator's health policy.
type replicaClassification int

const (
classificationHealthy replicaClassification = iota // ready/recovery/replicating, caught up
classificationBehind // behind > 0 but within behindAlertThreshold
classificationTransient // data_info empty for < channelDownGracePeriod
classificationDataChannelDown // data_info empty for >= channelDownGracePeriod
classificationInvalid // any DB status == "invalid"
classificationUnknownStatus // any DB status not in known set
classificationBehindTooLong // behind > 0 longer than behindAlertThreshold
)

// channelDownGracePeriod is how long an empty data_info must persist before
// being flagged as DataChannelDown. Memgraph normally populates data_info
// within a few seconds of registration.
const channelDownGracePeriod = 30 * time.Second

// isHealthy reports whether a classification counts toward HealthyReplicas.
// Transient and Behind are "in-progress" warnings, not failures.
func (c replicaClassification) isHealthy() bool {
switch c {
case classificationHealthy, classificationBehind, classificationTransient:
return true
default:
return false
}
}

// replicaState holds per-replica timers that span health checks.
// The zero value means "no timer started".
type replicaState struct {
behindSince time.Time // when did the replica first have behind > 0 without recovery
channelDownSince time.Time // when did the replica first have empty data_info
}

// classifyReplica evaluates one replica against per-replica state.
// It mutates state in-place (start/clear timers) and returns the classification.
//
// Policy (from spec 2026-05-25-replica-health-detection-design):
// - empty DataInfo
// - first seen / within grace -> Transient
// - persisted past grace -> DataChannelDown
// - any DB status == "invalid" -> Invalid (terminal, returned immediately)
// - any DB status unknown -> UnknownStatus (defensive)
// - any DB Behind > 0
// - first seen / within threshold -> Behind
// - persisted past threshold -> BehindTooLong
// - else -> Healthy
//
// Negative Behind values are ignored entirely (they indicate replica-side
// divergence, which is a separate concern parked for a follow-up).
func classifyReplica(replica memgraph.ReplicaInfo, state *replicaState, now time.Time, behindThreshold time.Duration) replicaClassification {
if len(replica.DataInfo) == 0 {
if state.channelDownSince.IsZero() {
state.channelDownSince = now
}
// Channel is down → lag is undefined; clear the behind timer so the
// memgraph_replica_behind_seconds gauge does not keep growing.
state.behindSince = time.Time{}
if now.Sub(state.channelDownSince) > channelDownGracePeriod {
return classificationDataChannelDown
}
return classificationTransient
}
state.channelDownSince = time.Time{}

// First pass: check for terminal "invalid" status without mutating state.
for _, db := range replica.DataInfo {
if dbStatus(db) == memgraph.ReplicaStatusInvalid {
return classificationInvalid
}
}

worst := classificationHealthy
anyBehind := false

for _, db := range replica.DataInfo {
switch dbStatus(db) {
case memgraph.ReplicaStatusReady, memgraph.ReplicaStatusRecovery, memgraph.ReplicaStatusReplicating:
// known good
default:
worst = classificationUnknownStatus
}

if db.Behind > 0 {
anyBehind = true
if state.behindSince.IsZero() {
state.behindSince = now
}
if now.Sub(state.behindSince) > behindThreshold {
return classificationBehindTooLong
}
if worst == classificationHealthy {
worst = classificationBehind
}
}
}

if !anyBehind {
state.behindSince = time.Time{}
}

return worst
}

// dbStatus normalizes a per-database status the same way summarizeDataInfo does
// in the memgraph client, so classification is robust to casing/quoting.
func dbStatus(db memgraph.ReplicaDBInfo) string {
return strings.ToLower(strings.Trim(strings.TrimSpace(db.Status), "\""))
}
Loading
Loading