From eba957a3099ab7f90200d1e8f6c1449710de74f4 Mon Sep 17 00:00:00 2001 From: Nikos Angelopoulos Date: Mon, 20 Jan 2025 14:16:49 +0100 Subject: [PATCH] test: add test to validate behaviour --- CHANGELOG.md | 1 + pkg/distributor/ha_tracker.go | 5 +-- pkg/distributor/ha_tracker_test.go | 69 ++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc759aaa41..99a406db2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [BUGFIX] MQE: Fix functions with histograms #10400 * [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423 * [BUGFIX] Query-frontend: Add flag `-query-frontend.prom2-range-compat` and corresponding YAML to rewrite queries with ranges that worked in Prometheus 2 but are invalid in Prometheus 3. #10445 #10461 +* [BUGFIX] Distributor: Fix an edge case in the ha-tracker with memberlist as KVStore where, if a replica in the KVStore was marked as deleted but not yet removed, updating the KVStore would fail. 
#10443 ### Mixin diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 6ef404a887..9a986ca32d 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -282,7 +282,6 @@ func newHaTracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re Help: "Number of elected replicas that failed to be marked for deletion, or deleted.", }), } - client, err := kv.NewClient( cfg.KVStore, GetReplicaDescCodec(), @@ -637,8 +636,8 @@ func (h *defaultHaTracker) updateKVStore(ctx context.Context, userID, cluster, r if desc == nil && electedAtTime == 0 { electedAtTime = timestamp.FromTime(now) } else if desc != nil && desc.DeletedAt > 0 { - // if we receive a ReplicaDesc that have been marked as deleted but not deleted from the kvStore yet, - // we need to set the value to electedAtTime, otherwise it's going be zero 0 + // If a ReplicaDesc is marked as deleted but not yet removed from the kvStore, reset electedAtTime + // to the current time; otherwise it would be left at zero. 
electedAtTime = timestamp.FromTime(now) } } diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index c096bbea48..6ed138746e 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -319,6 +319,75 @@ func TestHaTrackerWithMemberList(t *testing.T) { assert.Error(t, err) } +func TestHaTrackerWithMemberlistWhenReplicaDescIsMarkedDeletedThenKVStoreUpdateIsNotFailing(t *testing.T) { + var config memberlist.KVConfig + + const ( + cluster = "cluster" + replica1 = "r1" + replica2 = "r2" + updateTimeout = time.Millisecond * 100 + failoverTimeout = 2 * time.Millisecond + ) + + flagext.DefaultValues(&config) + ctx := context.Background() + + config.Codecs = []codec.Codec{ + GetReplicaDescCodec(), + } + + memberListSvc := memberlist.NewKVInitService( + &config, + log.NewNopLogger(), + &dnsProviderMock{}, + prometheus.NewPedanticRegistry(), + ) + require.NoError(t, services.StartAndAwaitRunning(ctx, memberListSvc)) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc)) + }) + + tracker, err := newHaTracker(HATrackerConfig{ + EnableHATracker: true, + KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{ + MemberlistKV: memberListSvc.GetMemberlistKV, + }}, + UpdateTimeout: updateTimeout, + UpdateTimeoutJitterMax: 0, + FailoverTimeout: failoverTimeout, + }, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, tracker)) + + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, tracker)) + }) + + now := time.Now() + + // Write the first time. + err = tracker.checkReplica(context.Background(), "user", cluster, replica1, now) + assert.NoError(t, err) + + key := fmt.Sprintf("%s/%s", "user", cluster) + + // Mark the ReplicaDesc as deleted in the KVStore, which will also remove it from the tracker cache. 
+ err = tracker.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { + d, ok := in.(*ReplicaDesc) + if !ok || d == nil { + return nil, false, nil + } + d.DeletedAt = timestamp.FromTime(time.Now()) + return d, true, nil + }) + assert.NoError(t, err) + + receivedAt := timestamp.FromTime(now.Add(100 * time.Millisecond)) + err = tracker.updateKVStore(context.Background(), "user", cluster, replica2, now, receivedAt) + assert.NoError(t, err) +} + func TestHATrackerCacheSyncOnStart(t *testing.T) { const cluster = "c1" const replicaOne = "r1"