Distributor might start serving writes with an empty HA tracke… #6991
Changes from all commits
6177c88
e3f8b2e
3cffe04
06ba08b
55e875c
7cde67d
@@ -202,10 +202,55 @@ func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
 		t.client = client
 	}

-	t.Service = services.NewBasicService(nil, t.loop, nil)
+	t.Service = services.NewBasicService(t.syncHAStateOnStart, t.loop, nil)
 	return t, nil
 }

+// Will update the internal cache to be in sync with the KV Store.
+func (h *haTracker) syncHAStateOnStart(ctx context.Context) error {
+	if !h.cfg.EnableHATracker {
+		return nil
+	}
+
+	// haTracker holds HATrackerConfig with the prefix.
+	keys, err := h.client.List(ctx, "")
+	if err != nil {
+		level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to list replica keys", "err", err)
Reviewer: since this will also fail the server startup I think it's ok to not log it here. Otherwise we end up with duplicate logging.

Author: sounds good, I'll remove.
+		return err
+	}
+
+	if len(keys) == 0 {
+		level.Warn(h.logger).Log("msg", "syncHAStateOnStart: no keys for HA tracker prefix", "err", err)
+		return nil
+	}
+
+	// TODO: what is the benefit of allowing a concurrent/parallel process here?
+	// How many keys are we getting back? Is it inefficient to process them serially?
Comment on lines +227 to +228

Author: I left a TODO here, what is the benefit of allowing asynchronous behavior for getting a replica based on the keys coming back from the

Author: @dimitarvdimitrov @duricanikolic @ying-jeanne @aknuds1 @fayzal-g pinging people that seem to know a lot about the distributor.

Reviewer: I checked a few clusters internally and etcd GET latency averages below 5ms. It will add some latency to startup, but I don't think 5-10 seconds are crucial. If they end up causing problems for someone, we can improve it. I think your current approach is good.

Author: sounds good, thank you for this.
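If the number of keys ever makes the serial loop a startup bottleneck, a bounded-concurrency variant is one option. The sketch below is only illustrative: it assumes golang.org/x/sync/errgroup is available, reuses the h.client.Get and h.processKVStoreEntry signatures from this diff, and assumes processKVStoreEntry stays safe to call concurrently (its internal locking suggests it is).

	// Hypothetical alternative to the serial loop below: fetch replica entries
	// with a small, bounded number of concurrent KV store reads.
	// import "golang.org/x/sync/errgroup"
	func (h *haTracker) syncKeysConcurrently(ctx context.Context, keys []string) error {
		g, ctx := errgroup.WithContext(ctx)
		g.SetLimit(8) // arbitrary bound; the right value would need measuring

		for _, key := range keys {
			key := key
			g.Go(func() error {
				val, err := h.client.Get(ctx, key)
				if err != nil {
					return err
				}
				desc, ok := val.(*ReplicaDesc)
				if !ok {
					return nil // skip invalid entries, mirroring the serial version
				}
				h.processKVStoreEntry(key, desc)
				return nil
			})
		}
		return g.Wait()
	}

Given the reviewer's etcd latency numbers, the serial approach in the PR seems fine; this is just what the TODO's alternative could look like.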
+	for i := 0; i < len(keys); i++ {
+		if ctx.Err() != nil {
+			return ctx.Err()
Reviewer: we've had some issues with improper context cancellation and vague context cancellation messages. Let's use

Author: ahh gotcha, makes sense. I'll make that change, ty.
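The reviewer's suggestion is cut off in this view. One common way to make cancellations less vague (an assumption about what was meant, not a confirmed suggestion) is to propagate the cancellation cause rather than the bare ctx.Err(), e.g. with Go 1.21+'s context.Cause:

	// Sketch: return the cancellation cause (if one was set) instead of the
	// generic "context canceled" error. context.Cause returns nil while the
	// context is still live, so this is a drop-in replacement for the check above.
	if err := context.Cause(ctx); err != nil {
		return err
	}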
+		}
+
+		val, err := h.client.Get(ctx, keys[i])
+		if err != nil {
+			level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to get replica value", "key", keys[i], "err", err)
+			return err
Reviewer: I like your approach of continuing with the startup even if there are some errors. I think it's ok to continue here as well. Is there a specific reason you decided to return instead?

Author: There were certain errors I thought that mattered.

Reviewer: We could potentially use retries with backoff in case the KV store has issues. I just checked the regular HA tracker logic and changed my mind. On the regular push path we return a 5xx error in case the KV store is unavailable. I think on startup it makes more sense to follow the same logic. That is - instead of WDYT?
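For reference, the "retries with backoff" idea mentioned (and then set aside) in this thread could look roughly like the sketch below. It assumes the dskit backoff helper; the limits are made up for illustration.

	// Hypothetical retry loop around the KV store read; the thread ultimately
	// prefers failing startup, matching the 5xx behaviour on the push path.
	// import "github.com/grafana/dskit/backoff"
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 5 * time.Second,
		MaxRetries: 5,
	})
	var val interface{}
	var err error
	for boff.Ongoing() {
		val, err = h.client.Get(ctx, keys[i])
		if err == nil {
			break
		}
		boff.Wait()
	}
	if err != nil {
		return err
	}
	// the *ReplicaDesc type assertion would follow here, as in the loop above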
+		}
+
+		desc, ok := val.(*ReplicaDesc)
+		if !ok {
+			level.Error(h.logger).Log("msg", "syncHAStateOnStart: got invalid replica descriptor", "key", keys[i])
+			continue
+		}
+
+		if notProcessed := h.processKVStoreEntry(keys[i], desc); !notProcessed {
+			level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to process replica value into cache", "key", keys[i], "err", err)
Reviewer (suggested change): since those would be read by people, it's easier to understand if it's in plain English. Same applies to the other few log lines.
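The suggested replacement text itself did not survive in this view. As an illustration only, a plain-English phrasing along the lines the reviewer asks for might be:

	// Example wording only; the actual suggested change is not visible here.
	level.Warn(h.logger).Log("msg", "failed to store replica value in cache while syncing HA state on startup", "key", keys[i])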
+		}
+	}
+
+	return nil
+}
+
 // Follows pattern used by ring for WatchKey.
 func (h *haTracker) loop(ctx context.Context) error {
 	if !h.cfg.EnableHATracker {
@@ -226,39 +271,11 @@ func (h *haTracker) loop(ctx context.Context) error {
 	// The KVStore config we gave when creating h should have contained a prefix,
 	// which would have given us a prefixed KVStore client. So, we can pass empty string here.
 	h.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
-		replica := value.(*ReplicaDesc)
-		segments := strings.SplitN(key, "/", 2)
-
-		// Valid key would look like cluster/replica, and a key without a / such as `ring` would be invalid.
-		if len(segments) != 2 {
-			return true
-		}
-
-		user := segments[0]
-		cluster := segments[1]
-
-		if replica.DeletedAt > 0 {
-			h.electedReplicaChanges.DeleteLabelValues(user, cluster)
-			h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)
-
-			h.electedLock.Lock()
-			defer h.electedLock.Unlock()
-			userClusters := h.clusters[user]
-			if userClusters != nil {
-				delete(userClusters, cluster)
-				if len(userClusters) == 0 {
-					delete(h.clusters, user)
-				}
-			}
-			return true
+		replica, ok := value.(*ReplicaDesc)
+		if !ok {
+			return false
 		}
-
-		// Store the received information into our cache
-		h.electedLock.Lock()
-		h.updateCache(user, cluster, replica)
-		h.electedLock.Unlock()
-		h.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
-		return true
+		return h.processKVStoreEntry(key, replica)
 	})

 	wg.Wait()
@@ -274,6 +291,41 @@ const (
 	deletionTimeout = 30 * time.Minute
 )

+func (h *haTracker) processKVStoreEntry(key string, replica *ReplicaDesc) bool {
Reviewer: This function always returns true. Can we just omit the return parameter?

Author: yeah I can. And you are 100% correct, I was thinking about that. The only issue I had with keeping it is

Reviewer: yes, in this case we can hardcode the

Author: gotcha, sounds good, I'll do that. ty.
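A minimal sketch of the agreed change, assuming processKVStoreEntry drops its return value and the WatchPrefix callback in loop() is the only caller that needs a bool:

	// Sketch: with a void processKVStoreEntry, the watch callback hardcodes the
	// "keep watching" result instead of forwarding an always-true return value.
	h.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
		replica, ok := value.(*ReplicaDesc)
		if !ok {
			return false
		}
		h.processKVStoreEntry(key, replica)
		return true
	})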
+	segments := strings.SplitN(key, "/", 2)
+
+	// Valid key would look like cluster/replica, and a key without a / such as `ring` would be invalid.
+	if len(segments) != 2 {
+		return true
+	}
+
+	user := segments[0]
+	cluster := segments[1]
+
+	if replica.DeletedAt > 0 {
+		h.electedReplicaChanges.DeleteLabelValues(user, cluster)
+		h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)
+
+		h.electedLock.Lock()
+		defer h.electedLock.Unlock()
+		userClusters := h.clusters[user]
+		if userClusters != nil {
+			delete(userClusters, cluster)
+			if len(userClusters) == 0 {
+				delete(h.clusters, user)
+			}
+		}
+		return true
Comment on lines +306 to +318

Reviewer: I know this was part of the code before your change, but I think now is a good time to clean it up a bit. Can you move it into its own function? perhaps

Author: sure thing.
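A rough sketch of the extraction being asked for; the helper name is a placeholder since the suggested name is cut off in this view:

	// Hypothetical helper extracted from the DeletedAt branch above; cleans up
	// per-cluster metrics and cache state for a deleted replica entry.
	func (h *haTracker) cleanupDeletedReplica(user, cluster string) {
		h.electedReplicaChanges.DeleteLabelValues(user, cluster)
		h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

		h.electedLock.Lock()
		defer h.electedLock.Unlock()
		if userClusters := h.clusters[user]; userClusters != nil {
			delete(userClusters, cluster)
			if len(userClusters) == 0 {
				delete(h.clusters, user)
			}
		}
	}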
+	}
+
+	// Store the received information into our cache
+	h.electedLock.Lock()
+	h.updateCache(user, cluster, replica)
+	h.electedLock.Unlock()
+	h.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
+
+	return true
+}
+
 func (h *haTracker) updateKVLoop(ctx context.Context) {
 	cleanupTick := time.NewTicker(util.DurationWithJitter(cleanupCyclePeriod, cleanupCycleJitterVariance))
 	defer cleanupTick.Stop()
@@ -154,7 +154,6 @@ func TestWatchPrefixAssignment(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck

 	// Write the first time.
 	now := time.Now()
@@ -792,6 +791,58 @@ func TestCheckReplicaCleanup(t *testing.T) {
 	))
 }

+// TestStartingSync tests that values are synced from the KV store on startup.
+func TestStartingSync(t *testing.T) {
+	t.Run("check that if a second replica is set up during preLoad that it does not write to it since it's not the leader", func(t *testing.T) {
Reviewer: this test description looks out of place. Since there's only one test case here can we remove the t.Run?

Author: yeah I can.
+		cluster := "c1"
+		replica := "r1"
+		var c *haTracker
+		var err error
+		var now time.Time
+
+		codec := GetReplicaDescCodec()
+		kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
+		t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+		mock := kv.PrefixClient(kvStore, "prefix")
+		c, err = newHATracker(HATrackerConfig{
+			EnableHATracker:        true,
+			KVStore:                kv.Config{Mock: mock},
+			UpdateTimeout:          15 * time.Second,
+			UpdateTimeoutJitterMax: 0,
+			FailoverTimeout:        time.Millisecond * 2,
+		}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
+		require.NoError(t, err)
+		require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+
+		now = time.Now()
+		err = c.checkReplica(context.Background(), "user", cluster, replica, now)
+		assert.NoError(t, err)
+
+		// Check to see if the value in the tracker's cache is correct.
+		checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica, now)
Reviewer: can the

Author: yeah, good catch.
+
+		services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
+
+		replicaTwo := "r2"
+		c, err = newHATracker(HATrackerConfig{
+			EnableHATracker:        true,
+			KVStore:                kv.Config{Mock: mock},
+			UpdateTimeout:          15 * time.Second,
+			UpdateTimeoutJitterMax: 0,
+			FailoverTimeout:        time.Millisecond * 2,
+		}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
+
+		require.NoError(t, err)
+		require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+		defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Reviewer: let's use

Author: I'll make that change. ty.
+
+		now = time.Now()
+		err = c.checkReplica(context.Background(), "user", cluster, replicaTwo, now)
+		assert.Error(t, err)
+		assert.ErrorIs(t, err, replicasDidNotMatchError{replica: "r2", elected: "r1"})
+	})
+}
+
 func checkUserClusters(t *testing.T, duration time.Duration, c *haTracker, user string, expectedClusters int) {
 	t.Helper()
 	test.Poll(t, duration, nil, func() interface{} {
Reviewer: can you put the PR number here?

Author: for sure will, ty for this.