Distributor might start serving writes with an empty HA tracke… #6991
Conversation
@dimitarvdimitrov Is it okay if I take a look at this?
Is someone more familiar with the distributor up for reviewing this, perhaps @duricanikolic @ying-jeanne @aknuds1 @fayzal-g? Otherwise, I can take a look in the first week of January.
…pdated to the cache
// TODO what is the benefit of allowing a concurrent/parallel process here?
// What is the number of keys we are getting back? Is it inefficient to allow this behavior serially?
I left a TODO here: what is the benefit of allowing asynchronous behavior for getting a replica based on the keys coming back from h.client.List? I'm still very new to contributing, and I couldn't really find out what the average number of keys returned by h.client.List is. I don't want to create goroutines for each key here if it's not going to bring much benefit, but I would like a discussion on the number of keys returned and the performance impact of leaving this serial, since this runs as a preload-like step. If it should be asynchronous, I also wouldn't want it all to block. Thoughts? My guess is the count is low, since keys are per user/cluster, so the number of keys coming back from List shouldn't be too drastic.
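For reference, a minimal sketch of what a bounded-concurrency variant could look like, assuming a hypothetical processKey helper that wraps the per-key Get and cache update; whether it's worth it depends on how many keys h.client.List typically returns.

```go
package distributor

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Sketch only: fetch each listed key with bounded concurrency instead of serially.
// processKey is a hypothetical helper standing in for the per-key Get + cache update.
func (h *haTracker) syncKeysConcurrently(ctx context.Context, keys []string) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(10) // bound the number of in-flight KV store GETs

	for _, key := range keys {
		key := key // capture the loop variable for the goroutine
		g.Go(func() error {
			return h.processKey(gctx, key)
		})
	}
	return g.Wait()
}
```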
@dimitarvdimitrov @duricanikolic @ying-jeanne @aknuds1 @fayzal-g pinging people who seem to know a lot about the distributor.
I checked a few clusters internally and etcd GET latency averages below 5ms. It will add some latency to startup, but I don't think 5-10 seconds are crucial. If that ends up causing problems for someone, we can improve it. I think your current approach is good.
sounds good, thank you for this.
Is anyone back from holidays able to look through the PR and comments?
Sorry for the delay, I will take a look at this PR in the coming days.
Thank you for your work. I'm starting to doubt whether this bug was ever present or if I misread the code initially.
I tried commenting out the initial sync that you added and expected that TestStartingSync would fail. I suspect it didn't because, if the HA tracker sees a new replica which it hasn't seen before, it will update the KV store before returning the value. The update happens by a compare-and-swap, which prevents overwriting the correct leader (r1 in the test case).
mimir/pkg/distributor/ha_tracker.go
Lines 530 to 537 in 06ba08b
// If the entry in KVStore is up-to-date, just stop the loop.
if h.withinUpdateTimeout(now, desc.ReceivedAt) ||
	// If our replica is different, wait until the failover time.
	desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < h.cfg.FailoverTimeout {
	return nil, false, nil
}
}
Can you verify this too?
This will be a problem if we decide to support memberlist (#1597) where compare-and-swap always returns immediately, but it's not a problem right now with etcd and consul. Because of this I still think it's worth it to merge this change.
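For readers following the thread, here is a rough, illustrative sketch of the compare-and-swap flow being described; it is simplified from the logic quoted above, and the function name electReplicaSketch is made up. The point is that the CAS callback sees the stored leader and declines to overwrite it.

```go
package distributor

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/model/timestamp"
)

// Illustrative sketch, not the exact Mimir implementation: the CAS callback
// inspects the stored ReplicaDesc and keeps a still-valid leader instead of
// overwriting it with the incoming replica.
func (h *haTracker) electReplicaSketch(ctx context.Context, key, replica string, now time.Time) error {
	return h.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
		if desc, ok := in.(*ReplicaDesc); ok && desc != nil {
			// A different replica is still within the update timeout: keep it as leader.
			if desc.Replica != replica && h.withinUpdateTimeout(now, desc.ReceivedAt) {
				return nil, false, nil
			}
		}
		// Otherwise elect (or refresh) the incoming replica.
		return &ReplicaDesc{Replica: replica, ReceivedAt: timestamp.FromTime(now)}, true, nil
	})
}
```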
@@ -28,6 +28,7 @@
* [BUGFIX] Fix issue where all incoming HTTP requests have duplicate trace spans. #6920
* [BUGFIX] Querier: do not retry requests to store-gateway when a query gets canceled. #6934
* [BUGFIX] Querier: return 499 status code instead of 500 when a request to remote read endpoint gets canceled. #6934
* [BUGFIX] Distributor: distributor might start serving writes with an empty HA tracker state. #5796
can you put the PR number here?
for sure will, ty for this.
// haTracker holds HATrackerConfig with the prefix.
keys, err := h.client.List(ctx, "")
if err != nil {
	level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to list replica keys", "err", err)
since this will also fail the server startup, I think it's ok not to log it here. Otherwise we end up with duplicate logging.
sounds good, I'll remove it.
@@ -274,6 +291,41 @@ const (
	deletionTimeout = 30 * time.Minute
)

func (h *haTracker) processKVStoreEntry(key string, replica *ReplicaDesc) bool {
This function always returns true. Can we just omit the return parameter?
yeah I can, and you are 100% correct, I was thinking about that. The only issue I had with keeping it is that processKVStoreEntry is used in WatchPrefix, and processKVStoreEntry returns out of WatchPrefix with that function call. And WatchPrefix requires a bool return, right?
yes, in this case we can hardcode the return true in WatchPrefix.
gotcha, sounds good, I'll do that. ty.
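A rough sketch of what that refactor could look like; the Sketch-suffixed names and the watch wiring are placeholders based on this discussion, not the actual PR code.

```go
package distributor

import "context"

// Sketch: processKVStoreEntry no longer reports a value, and the WatchPrefix
// callback hardcodes `return true` to keep watching.
func (h *haTracker) processKVStoreEntrySketch(key string, replica *ReplicaDesc) {
	// ... existing cache update / cleanup logic ...
}

func (h *haTracker) watchKVStoreSketch(ctx context.Context) {
	h.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
		replica, ok := value.(*ReplicaDesc)
		if !ok || replica == nil {
			return true // ignore entries we can't decode, keep watching
		}
		h.processKVStoreEntrySketch(key, replica)
		return true // hardcoded: the callback itself decides to keep watching
	})
}
```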
h.electedReplicaChanges.DeleteLabelValues(user, cluster)
h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

h.electedLock.Lock()
defer h.electedLock.Unlock()
userClusters := h.clusters[user]
if userClusters != nil {
	delete(userClusters, cluster)
	if len(userClusters) == 0 {
		delete(h.clusters, user)
	}
}
return true
I know this was part of the code before your change, but I think now is a good time to clean it up a bit. Can you move it into its own function? perhaps h.cleanUpDeletedReplica
sure thing.
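For reference, a minimal sketch of the suggested extraction, which simply moves the quoted cleanup lines into their own method:

```go
package distributor

// Sketch of the h.cleanUpDeletedReplica helper suggested above; it mirrors the
// cleanup lines quoted in this thread.
func (h *haTracker) cleanUpDeletedReplica(user, cluster string) {
	h.electedReplicaChanges.DeleteLabelValues(user, cluster)
	h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

	h.electedLock.Lock()
	defer h.electedLock.Unlock()

	if userClusters := h.clusters[user]; userClusters != nil {
		delete(userClusters, cluster)
		if len(userClusters) == 0 {
			delete(h.clusters, user)
		}
	}
}
```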
val, err := h.client.Get(ctx, keys[i])
if err != nil {
	level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to get replica value", "key", keys[i], "err", err)
	return err
I like your approach of continuing with the startup even if there are some errors. I think it's ok to continue here as well. Is there a specific reason you decided to return instead?
There were certain errors I thought mattered:
- What if there were bigger issues with the h.client (like connectivity issues)? That made me think the startingFn behavior would have issues, since it uses the h.client.

You are right that for a lot of the error returns here I should just continue instead of returning an error. The only error that makes sense to return would be when the context was canceled, i.e. something bigger happened in the system that mattered.
- What if there were bigger issues with the h.client (like connectivity issues)? That made me think the startingFn behavior would have issues, since it uses the h.client.

We could potentially use retries with backoff in case the KV store has issues.

I just checked the regular HA tracker logic and changed my mind. On the regular push path we return a 5xx error in case the KV store is unavailable. I think on startup it makes more sense to follow the same logic. That is, instead of continuing we return an error. We should still stick to the error tolerance that the regular checkSample has (e.g. if the key doesn't follow the expected 2-segment format).

WDYT?
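To make that concrete, here is a hedged sketch of the discussed error handling (the Sketch suffix marks it as illustrative, not the merged code): KV store failures abort startup the way they fail the push path, while entries that can't be decoded are skipped.

```go
package distributor

import "context"

// Sketch: fail startup on KV store errors, but tolerate individual entries
// that can't be decoded into a ReplicaDesc.
func (h *haTracker) syncHAStateOnStartSketch(ctx context.Context) error {
	keys, err := h.client.List(ctx, "")
	if err != nil {
		return err // surfaced by the service's starting function, so no extra log here
	}

	for _, key := range keys {
		val, err := h.client.Get(ctx, key)
		if err != nil {
			return err // KV store unavailable: fail startup, like the push path does
		}

		replica, ok := val.(*ReplicaDesc)
		if !ok || replica == nil {
			continue // skip malformed or deleted entries instead of aborting the sync
		}
		h.processKVStoreEntry(key, replica) // populate the in-memory cache
	}
	return nil
}
```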
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
let's use t.Cleanup instead of defer, like in the consul instance above
I'll make that change. ty.
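For illustration, a small helper showing the t.Cleanup pattern; startServiceForTest is a hypothetical name, not something in the PR.

```go
package distributor

import (
	"context"
	"testing"

	"github.com/grafana/dskit/services"
	"github.com/stretchr/testify/require"
)

// Sketch of the t.Cleanup pattern suggested above: stop the service when the
// test finishes instead of relying on defer, so subtests and helpers behave the same.
func startServiceForTest(t *testing.T, svc services.Service) {
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), svc))
	t.Cleanup(func() {
		services.StopAndAwaitTerminated(context.Background(), svc) //nolint:errcheck
	})
}
```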
h.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())

return true
}
assert.NoError(t, err)

// Check to see if the value in the trackers cache is correct.
checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica, now)
can the "user"
also go in a variable (preferably a constant) at the top of the test?
yeah, good catch.
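Something like the following could work; the constant names are illustrative, not the PR's actual choices.

```go
package distributor

// Sketch of the suggested constants at the top of the test file.
const (
	testUser    = "user"
	testCluster = "c1"
	testReplica = "r1"
)
```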
@@ -792,6 +791,58 @@ func TestCheckReplicaCleanup(t *testing.T) {
	))
}

// TestStartingSync tests that values are synced to KVStore on startup
func TestStartingSync(t *testing.T) {
	t.Run("check that if a second replica is set up during preLoad that it does not write to it since its not the leader", func(t *testing.T) {
this test description looks out of place. Since there's only one test case here can we remove the t.Run?
yeah I can.
@dimitarvdimitrov -- Based on the comment you wrote above and on my findings in the test: it will not update the KV store for replica 2 (r2) if it has not seen it before. Instead it hits here and returns the error, since this test checks whether this is the same (leader) replica for the same cluster; if it is, it updates the KV store, and if it's a different replica for the same cluster it returns with an error. I assume that does not line up with the behavior you want? For replica r1, it will hit here. I'm assuming you're asking whether, if it finds a different replica for the same cluster, it updates it or not? This test seems to show that it won't if it's a different replica for the same cluster. I don't see a case in the test where it hits the code you provided.
Am I following correctly what you want to verify: that the tracker detected a new replica and updated its information store in a way that preserves the integrity of the current leader? I don't see the new replica getting updated in the KV store in the test, but it does preserve the leader replica.
So in the context of the changes before your PR (i.e. in the context of what's on
you're right, and I think this behaviour is the correct one. However, r2 doesn't end up there because of the initial sync. When attempting with r2, the code should hit here, because the in-memory cache doesn't contain the requested cluster:
mimir/pkg/distributor/ha_tracker.go Lines 432 to 446 in 59fe89a
then
mimir/pkg/distributor/ha_tracker.go Lines 475 to 493 in 59fe89a
this line in particular
mimir/pkg/distributor/ha_tracker.go Lines 480 to 482 in 59fe89a
So in essence the in-memory cache is lazily updated as requests come in. But an empty cache doesn't mean that any replica will be elected by the distributor. I found this out with a debugger; it makes stepping through the code easier.
@dimitarvdimitrov I see. You're saying that when the sync restarts, replica 2 should be hitting the in-memory cache and being updated in the KV store, right? Are you sure it should be hitting the code you specified? Technically replica 2 is part of the same cluster as replica 1, right? So shouldn't it not hit the code flow you pointed out? Or is that the change we want: even if it's part of the same cluster, we should still update the KV store with the other replicas for that cluster?
after the restart the in-memory cache is empty (in the state of the code on the
I don't think so. This PR should only sync the cluster on startup, but not really skip the cache if the cache is present.
@dimitarvdimitrov
> I tried commenting out the initial sync that you added and expected that TestStartingSync would fail. I suspect it didn't because if the HA tracker sees a new replica which it hasn't seen before it will update the KV store before returning the value. The update happens by a compare-and-swap, which prevents overwriting the correct leader (r1 in the test case).

From the comment above: looking through the debugger at the CAS portion, you are right. CAS will happen, and it will see a new replica (r2) which it hasn't seen before and do a check-and-set. Either way, the cache will update. I guess this brings me to the question of where this ticket should go then? I assume we just wanted to update the cache with the necessary info before it can serve writes.
This will be a problem if we decide to support memberlist (#1597) where compare-and-swap always returns immediately, but it's not a problem right now with etcd and consul. Because of this I still think it's worth it to merge this change.
@dimitarvdimitrov is the
yes
The CHANGELOG has just been cut to prepare for the next Mimir release. Please rebase
@dimitarvdimitrov I haven't worked on this because I saw some pull requests regarding memberlist, and I don't see much value in this PR anymore. Thoughts?
I think this work will be needed for the work @aldernero is doing around memberlist for the HA tracker. Maybe he can pick this up, or you two can work together.
I wouldn't mind pairing up, or if he just wants to take it on, either works.
Thank you for your contribution. This pull request has been marked as stale because it has had no activity in the last 150 days. It will be closed in 30 days if there is no further activity. If you need more time, you can add a comment to the PR.
What this PR does
The distributor doesn't wait for the HA tracker to finish initializing before it starts serving writes. This means the distributor can start serving writes with an empty HA tracker state, and an empty state leads to accepting any sample from any HA replica. The fix preloads the cluster cache before serving writes, to ensure we only accept samples from the elected (leader) replica.
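For context, a hedged sketch of how the fix hangs together, reusing the syncHAStateOnStart name from the review comments; the exact service wiring and config check here are assumptions, not the PR diff.

```go
package distributor

import "context"

// Sketch: load the HA tracker state from the KV store inside the service's
// starting function, so the distributor only becomes ready to serve writes
// once the in-memory cache has been populated.
func (h *haTracker) startingSketch(ctx context.Context) error {
	if !h.cfg.EnableHATracker {
		return nil // nothing to preload when the HA tracker is disabled (assumed flag)
	}
	return h.syncHAStateOnStart(ctx)
}
```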
Which issue(s) this PR fixes or relates to
Closes #5796
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].