Skip to content

Commit

Permalink
test: add test to validate behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
NickAnge committed Jan 20, 2025
1 parent ee985d4 commit eba957a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [BUGFIX] MQE: Fix <aggr_over_time> functions with histograms #10400
* [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423
* [BUGFIX] Query-frontend: Add flag `-query-frontend.prom2-range-compat` and corresponding YAML to rewrite queries with ranges that worked in Prometheus 2 but are invalid in Prometheus 3. #10445 #10461
* [BUGFIX] Distributor: Fix edge case at the ha-tracker with memberlist as KVStore, where a replica in the KVStore is marked as deleted but not yet removed, it would fail to update the KVStore. #10443

### Mixin

Expand Down
5 changes: 2 additions & 3 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func newHaTracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
Help: "Number of elected replicas that failed to be marked for deletion, or deleted.",
}),
}

client, err := kv.NewClient(
cfg.KVStore,
GetReplicaDescCodec(),
Expand Down Expand Up @@ -637,8 +636,8 @@ func (h *defaultHaTracker) updateKVStore(ctx context.Context, userID, cluster, r
if desc == nil && electedAtTime == 0 {
electedAtTime = timestamp.FromTime(now)
} else if desc != nil && desc.DeletedAt > 0 {
// if we receive a ReplicaDesc that have been marked as deleted but not deleted from the kvStore yet,
// we need to set the value to electedAtTime, otherwise it's going be zero 0
//If a ReplicaDesc is marked as deleted but not yet removed from the kvStore, set its value to
//electedAtTime to avoid it being zero.
electedAtTime = timestamp.FromTime(now)
}
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,75 @@ func TestHaTrackerWithMemberList(t *testing.T) {
assert.Error(t, err)
}

func TestHaTrackerWithMemberlistWhenReplicaDescIsMarkedDeletedThenKVStoreUpdateIsNotFailing(t *testing.T) {
var config memberlist.KVConfig

const (
cluster = "cluster"
replica1 = "r1"
replica2 = "r2"
updateTimeout = time.Millisecond * 100
failoverTimeout = 2 * time.Millisecond
)

flagext.DefaultValues(&config)
ctx := context.Background()

config.Codecs = []codec.Codec{
GetReplicaDescCodec(),
}

memberListSvc := memberlist.NewKVInitService(
&config,
log.NewNopLogger(),
&dnsProviderMock{},
prometheus.NewPedanticRegistry(),
)
require.NoError(t, services.StartAndAwaitRunning(ctx, memberListSvc))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc))
})

tracker, err := newHaTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{
MemberlistKV: memberListSvc.GetMemberlistKV,
}},
UpdateTimeout: updateTimeout,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: failoverTimeout,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, tracker))

t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, tracker))
})

now := time.Now()

// Write the first time.
err = tracker.checkReplica(context.Background(), "user", cluster, replica1, now)
assert.NoError(t, err)

key := fmt.Sprintf("%s/%s", "user", cluster)

// Mark the ReplicaDesc as deleted in the KVStore, which will also remove it from the tracker cache.
err = tracker.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
d, ok := in.(*ReplicaDesc)
if !ok || d == nil {
return nil, false, nil
}
d.DeletedAt = timestamp.FromTime(time.Now())
return d, true, nil
})
assert.NoError(t, err)

receivedAt := timestamp.FromTime(now.Add(100 * time.Millisecond))
err = tracker.updateKVStore(context.Background(), "user", cluster, replica2, now, receivedAt)
assert.NoError(t, err)
}

func TestHATrackerCacheSyncOnStart(t *testing.T) {
const cluster = "c1"
const replicaOne = "r1"
Expand Down

0 comments on commit eba957a

Please sign in to comment.