
Distributor might start serving writes with an empty HA tracker state #6991

Conversation

TimKotowski
Contributor

@TimKotowski TimKotowski commented Dec 24, 2023

What this PR does

The distributor doesn't wait for the HA tracker to complete initializing before starting to serve writes. This means the distributor can start serving writes with an empty HA tracker state, and an empty state leads to accepting samples from any HA replica. This fix preloads the cluster cache before the distributor starts serving writes, to ensure we only accept samples from the elected (leader) replica.
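
For illustration, the shape of the change is roughly the following. This is a minimal sketch only: it assumes a kv.Client with List/Get and borrows the syncHAStateOnStart and processKVStoreEntry names quoted in the review below, so details may differ from the actual diff.

// syncHAStateOnStart preloads the HA tracker's in-memory cache from the KV
// store so the distributor does not start serving writes with empty state.
// Sketch only: error handling and helper names are illustrative.
func (h *haTracker) syncHAStateOnStart(ctx context.Context) error {
	keys, err := h.client.List(ctx, "")
	if err != nil {
		return err
	}
	for _, key := range keys {
		val, err := h.client.Get(ctx, key)
		if err != nil {
			return err
		}
		desc, ok := val.(*ReplicaDesc)
		if !ok || desc == nil {
			continue // skip malformed entries
		}
		// Reuse the same code path the watch loop uses to fill the cache.
		h.processKVStoreEntry(key, desc)
	}
	return nil
}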

Which issue(s) this PR fixes or relates to

Closes #5796

Checklist

  • Tests updated.
  • NA Documentation Written
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • NA about-versioning.md updated with experimental features.

@TimKotowski TimKotowski marked this pull request as ready for review December 24, 2023 22:25
@TimKotowski TimKotowski requested a review from a team as a code owner December 24, 2023 22:25
@TimKotowski TimKotowski marked this pull request as draft December 25, 2023 00:27
@TimKotowski TimKotowski marked this pull request as ready for review December 25, 2023 00:37
@TimKotowski TimKotowski marked this pull request as draft December 25, 2023 01:45
@TimKotowski TimKotowski marked this pull request as ready for review December 25, 2023 17:03
@TimKotowski
Contributor Author

@dimitarvdimitrov Is it okay if I get a review on this?

@dimitarvdimitrov
Contributor

Is someone more familiar with the distributor up for reviewing this, perhaps @duricanikolic @ying-jeanne @aknuds1 @fayzal-g? Otherwise, I can take a look in the first week of January.

@TimKotowski TimKotowski marked this pull request as draft December 27, 2023 17:46
Comment on lines +227 to +228
// TODO: what is the benefit of allowing a concurrent/parallel process here?
// What is the number of keys we get back? Is it inefficient to do this serially?
Contributor Author

@TimKotowski TimKotowski Dec 27, 2023

I left a TODO here: what is the benefit of allowing asynchronous behavior for getting a replica based on the keys coming back from h.client.List? I'm still very new to contributing, and I couldn't really find what the average number of keys returned from h.client.List is. I don't want to create goroutines for each key here if it's not going to bring much benefit, but I would like a discussion on the number of keys returned and the performance impact this could have if left serial, since this is a preload-like state. If this should be asynchronous, I also wouldn't want it all to block. Thoughts? My guess is the count is low, since the keys are per user/cluster, so the number coming back from List shouldn't be too drastic.
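
For reference, the concurrent alternative being asked about could look roughly like this. Sketch only: it assumes golang.org/x/sync/errgroup and that processKVStoreEntry is safe to call concurrently; the discussion below settles on keeping the serial version.

// Bounded fan-out over the listed keys. Sketch only.
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(10) // cap the number of in-flight KV store reads
for _, key := range keys {
	key := key
	g.Go(func() error {
		val, err := h.client.Get(gctx, key)
		if err != nil {
			return err
		}
		if desc, ok := val.(*ReplicaDesc); ok && desc != nil {
			h.processKVStoreEntry(key, desc)
		}
		return nil
	})
}
if err := g.Wait(); err != nil {
	return err
}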

Contributor Author

@dimitarvdimitrov @duricanikolic @ying-jeanne @aknuds1 @fayzal-g pinging people who seem to know the distributor well.

Contributor

I checked a few clusters internally and etcd GET latency averages at below 5ms. It will add some latency to startup, but I don't think 5-10 seconds are crucial. If they end up causing problems for someone, we can improve it. I think your current approach is good.
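
(For rough scale: at about 5 ms per GET, even one to two thousand replica keys fetched serially add only around 5-10 seconds to startup, which is presumably where the estimate above comes from.)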

Contributor Author

sounds good, thank you for this.

@TimKotowski TimKotowski marked this pull request as ready for review January 5, 2024 22:56
@TimKotowski
Contributor Author

Anyone back from holidays able to look through PR and comments?

@dimitarvdimitrov
Contributor

Anyone back from holidays able to look through PR and comments?

sorry for the delay, I will take a look at this PR in the coming days

Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

Thank you for your work. I'm starting to doubt whether this bug was ever present or if I misread the code initially.

I tried commenting out the initial sync that you added and expected that TestStartingSync would fail. I suspect it didn't because, if the HA tracker sees a new replica which it hasn't seen before, it updates the KV store before returning the value. The update happens via compare-and-swap, which prevents overwriting the correct leader (r1 in the test case).

	// If the entry in KVStore is up-to-date, just stop the loop.
	if h.withinUpdateTimeout(now, desc.ReceivedAt) ||
		// If our replica is different, wait until the failover time.
		desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < h.cfg.FailoverTimeout {
		return nil, false, nil
	}
}

Can you verify this too?

This will be a problem if we decide to support memberlist (#1597) where compare-and-swap always returns immediately, but it's not a problem right now with etcd and consul. Because of this I still think it's worth it to merge this change.

@@ -28,6 +28,7 @@
* [BUGFIX] Fix issue where all incoming HTTP requests have duplicate trace spans. #6920
* [BUGFIX] Querier: do not retry requests to store-gateway when a query gets canceled. #6934
* [BUGFIX] Querier: return 499 status code instead of 500 when a request to remote read endpoint gets canceled. #6934
* [BUGFIX] Distributor: distributor might start serving writes with an empty HA tracker state. #5796
Contributor

can you put the PR number here?

Contributor Author

for sure will, ty for this.

// haTracker holds HATrackerConfig with the prefix.
keys, err := h.client.List(ctx, "")
if err != nil {
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to list replica keys", "err", err)
Contributor

Since this will also fail the service startup, I think it's OK not to log it here. Otherwise we end up with duplicate logging.

Contributor Author

sounds good, I'll remove it.

@@ -274,6 +291,41 @@ const (
deletionTimeout = 30 * time.Minute
)

func (h *haTracker) processKVStoreEntry(key string, replica *ReplicaDesc) bool {
Contributor

This function always returns true. Can we just omit the return parameter?

Contributor Author

@TimKotowski TimKotowski Jan 10, 2024

Yeah, I can. And you're 100% correct, I was thinking about that. The only reason I kept it is that processKVStoreEntry is used in WatchPrefix, and its return value is what the WatchPrefix callback returns. And WatchPrefix requires the callback to return a bool, right?

Contributor

yes, in this case we can hardcode the return true in WatchPrefix.
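
A rough sketch of what that could look like in the watch loop (assuming the usual kv.Client.WatchPrefix callback signature; names approximate):

// processKVStoreEntry no longer returns a bool; the callback hardcodes the
// "keep watching" result instead.
h.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
	replica, ok := value.(*ReplicaDesc)
	if !ok || replica == nil {
		return true
	}
	h.processKVStoreEntry(key, replica)
	return true // hardcoded: always continue watching
})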

Contributor Author

gotcha, sounds good, I'll do that. ty.

Comment on lines +306 to +318
h.electedReplicaChanges.DeleteLabelValues(user, cluster)
h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

h.electedLock.Lock()
defer h.electedLock.Unlock()
userClusters := h.clusters[user]
if userClusters != nil {
delete(userClusters, cluster)
if len(userClusters) == 0 {
delete(h.clusters, user)
}
}
return true
Contributor

I know this was part of the code before your change, but I think now is a good time to clean it up a bit. Can you move it into its own function, perhaps h.cleanUpDeletedReplica?
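
Roughly what the suggested extraction could look like, based on the quoted lines above (sketch only):

// cleanUpDeletedReplica removes the per-cluster metrics and the cache entry
// for a replica whose KV store entry was marked as deleted.
func (h *haTracker) cleanUpDeletedReplica(user, cluster string) {
	h.electedReplicaChanges.DeleteLabelValues(user, cluster)
	h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

	h.electedLock.Lock()
	defer h.electedLock.Unlock()
	if userClusters := h.clusters[user]; userClusters != nil {
		delete(userClusters, cluster)
		if len(userClusters) == 0 {
			delete(h.clusters, user)
		}
	}
}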

Contributor Author

sure thing.

val, err := h.client.Get(ctx, keys[i])
if err != nil {
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to get replica value", "key", keys[i], "err", err)
return err
Contributor

I like your approach of continuing with the startup even if there are some errors. I think it's ok to continue here as well. Is there a specific reason you decided to return instead?

Contributor Author

@TimKotowski TimKotowski Jan 10, 2024

There were certain errors I thought mattered.

  1. What if there were bigger issues with h.client (like connectivity problems)? That made me think the startingFn behavior would have issues, since it uses h.client. You're right that for a lot of the error returns here I should just continue instead of returning an error. The only error that makes sense to return is when the context was canceled, since that means something bigger happened in the system.

Contributor

  1. What if there were bigger issues with h.client (like connectivity problems)? That made me think the startingFn behavior would have issues, since it uses h.client.

We could potentially use retries with backoff in case the KV store has issues.

I just checked the regular HA tracker logic and changed my mind. On the regular push path we return a 5xx error in case the KV store is unavailable. I think on startup it makes more sense to follow the same logic. That is, instead of continuing we return an error. We should still stick to the error tolerance that the regular checkSample has (e.g. if the key doesn't follow the expected two-segment format).

WDYT?
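
In code terms, the suggested policy would be roughly the following (sketch only, reusing the loop shape from the diff snippets above):

for _, key := range keys {
	val, err := h.client.Get(ctx, key)
	if err != nil {
		// KV store unavailable: fail startup, mirroring the 5xx the push path returns.
		return err
	}
	desc, ok := val.(*ReplicaDesc)
	if !ok || desc == nil {
		// Tolerate malformed entries (e.g. keys that don't follow the expected
		// two-segment format), just like the regular checkSample path does.
		continue
	}
	h.processKVStoreEntry(key, desc)
}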

require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Contributor

let's use t.Cleanup instead of defer like in the consul instance above
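
i.e. something along the lines of (sketch):

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
t.Cleanup(func() {
	assert.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
})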

Contributor Author

I'll make that change. ty.

h.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())

return true
}
Contributor

Suggested change
}
}

assert.NoError(t, err)

// Check to see if the value in the trackers cache is correct.
checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica, now)
Contributor

can the "user" also go in a variable (preferably a constant) at the top of the test?

Contributor Author

yeah, good catch.

@@ -792,6 +791,58 @@ func TestCheckReplicaCleanup(t *testing.T) {
))
}

// TestStartingSync tests that values are synced to KVStore on startup
func TestStartingSync(t *testing.T) {
t.Run("check that if a second replica is set up during preLoad that it does not write to it since its not the leader", func(t *testing.T) {
Contributor

this test description looks out of place. Since there's only one test case here can we remove the t.Run?

Contributor Author

yeah, I can.

@TimKotowski
Contributor Author

TimKotowski commented Jan 10, 2024

@dimitarvdimitrov
I tried commenting out the initial sync that you added and expected that TestStartingSync would fail. I suspect it didn't because, if the HA tracker sees a new replica which it hasn't seen before, it updates the KV store before returning the value. The update happens via compare-and-swap, which prevents overwriting the correct leader (r1 in the test case).

-- Based on the comment you wrote above:

Based on my findings in the test, it will not update the KV store for replica 2 (aka r2) if it has not seen it before. Instead it hits here:

else {
			// Sample received is from non-elected replica: record details and reject.
			entry.nonElectedLastSeenReplica = replica
			entry.nonElectedLastSeenTimestamp = timestamp.FromTime(now)
			err = newReplicasDidNotMatchError(replica, entry.elected.Replica)
		}

and returns the error, since this test checks whether it's the same (leader) replica for the same cluster: if it is, it updates the KV store; if it's a different replica for the same cluster, it returns an error. I assume that does not line up with the behavior you expect?

For replica r1,

h.kvCASCalls.WithLabelValues(userID, cluster).Inc()
	// If cache is currently empty, add the data we either stored or received from KVStore
	if err == nil && desc != nil {
		h.electedLock.Lock()
		if h.clusters[userID][cluster] == nil {
			h.updateCache(userID, cluster, desc)
		}
		h.electedLock.Unlock()
	}

it will hit here. I'm assuming you're asking whether, when it finds a different replica for the same cluster, it updates it or not? This test seems to show that it won't if it's a different replica for the same cluster.

I don't see a case in the test where it hits the code you provided here:

	// If the entry in KVStore is up-to-date, just stop the loop.
	if h.withinUpdateTimeout(now, desc.ReceivedAt) ||
		// If our replica is different, wait until the failover time.
		desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < h.cfg.FailoverTimeout {
		return nil, false, nil
	}
}

Am I following correctly that you want to verify the tracker detected a new replica and updated its stored information in a way that preserves the integrity of the current leader? I don't see the new replica getting updated in the KV store in the test, but it does preserve the leader replica.

@dimitarvdimitrov
Contributor

So in the context of the changes before your PR (i.e. in the context of what's on main):

Based on my findings in the test, it will not update the KV store for replica 2 (aka r2) if it has not seen it before. Instead it hits here:

else {
			// Sample received is from non-elected replica: record details and reject.
			entry.nonElectedLastSeenReplica = replica
			entry.nonElectedLastSeenTimestamp = timestamp.FromTime(now)
			err = newReplicasDidNotMatchError(replica, entry.elected.Replica)
		}

You're right, and I think this behaviour is the correct one. However, r2 doesn't end up there because of the initial sync. When attempting with r2, the code should hit here, because the in-memory cache doesn't contain the requested cluster:

// We don't know about this cluster yet.
nClusters := len(h.clusters[userID])
h.electedLock.Unlock()
// If we have reached the limit for number of clusters, error out now.
if limit := h.limits.MaxHAClusters(userID); limit > 0 && nClusters+1 > limit {
	return newTooManyClustersError(limit)
}
err := h.updateKVStore(ctx, userID, cluster, replica, now)
if err != nil {
	level.Error(h.logger).Log("msg", "failed to update KVStore - rejecting sample", "err", err)
	return err
}
// Cache will now have the value - recurse to check it again.
return h.checkReplica(ctx, userID, cluster, replica, now)

then

err := h.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
	var ok bool
	if desc, ok = in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
		// If the entry in KVStore is up-to-date, just stop the loop.
		if h.withinUpdateTimeout(now, desc.ReceivedAt) ||
			// If our replica is different, wait until the failover time.
			desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < h.cfg.FailoverTimeout {
			return nil, false, nil
		}
	}
	// Attempt to update KVStore to our timestamp and replica.
	desc = &ReplicaDesc{
		Replica:    replica,
		ReceivedAt: timestamp.FromTime(now),
		DeletedAt:  0,
	}
	return desc, true, nil
})

this line in particular

// If our replica is different, wait until the failover time.
desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < h.cfg.FailoverTimeout {
return nil, false, nil

so in essence the in-memory cache is lazily updated as requests come in. But an empty cache doesn't mean that any replica will be elected by the distributor.


I found this out with a debugger. It makes stepping through the code easier

@TimKotowski
Contributor Author

TimKotowski commented Jan 11, 2024

@dimitarvdimitrov I see. You're saying that when the sync restarts, replica 2 should hit the in-memory cache and be updated in the KV store, right? Are you sure it should hit the code you specified? Technically replica 2 is part of the same cluster as replica 1, right? So shouldn't it not hit the code flow you pointed out? Or is that the change we want: even if it's part of the same cluster, we should still update the KV store with the other replicas for that cluster?

@dimitarvdimitrov
Contributor

Technically replica 2 is part of the same cluster as replica 1, right? So shouldn't it not hit the code flow you pointed out?

After the restart the in-memory cache is empty (in the state of the code on the main branch). So this forces the CAS update in the KV store. The update ends up being just a fetch and doesn't actually change anything in the KV store.

Or is that the change we want: even if it's part of the same cluster, we should still update the KV store with the other replicas for that cluster?

I don't think so. This PR should only sync the cluster on startup, but not really skip the cache if the cache is present.

@TimKotowski
Contributor Author

TimKotowski commented Jan 13, 2024

@dimitarvdimitrov I tried commenting out the initial sync that you added and expected that TestStartingSync would fail. I suspect it didn't because, if the HA tracker sees a new replica which it hasn't seen before, it updates the KV store before returning the value. The update happens via compare-and-swap, which prevents overwriting the correct leader (r1 in the test case).

-- From the comment above.

Looking through the debugger at the CAS portion, you are right. CAS will happen, and it will see a new replica (r2) which it hasn't seen before and do a check-and-set. Either way, the cache will update. I guess this brings me to the question: where does that leave this ticket, then? I assume we just wanted to update the cache with the necessary info before it can serve writes.

@dimitarvdimitrov
Contributor

I guess this brings me to the question: where does that leave this ticket, then?

This will be a problem if we decide to support memberlist (#1597) where compare-and-swap always returns immediately, but it's not a problem right now with etcd and consul. Because of this I still think it's worth it to merge this change.

@TimKotowski
Contributor Author

@dimitarvdimitrov is the memberlist referring to https://github.com/hashicorp/memberlist?

@dimitarvdimitrov
Contributor

yes

@duricanikolic
Contributor

The CHANGELOG has just been cut to prepare for the next Mimir release. Please rebase main and eventually move the CHANGELOG entry added / updated in this PR to the top of the CHANGELOG document. Thanks!

@TimKotowski
Contributor Author

@dimitarvdimitrov I haven't worked on this because I saw some pull requests regarding memberlist, and I don't see much value in this PR anymore. Thoughts?

@dimitarvdimitrov
Contributor

I think this work will be needed for the work @aldernero is doing around memberlist for the HA tracker. Maybe he can pick this up or you two can work together.

@TimKotowski
Contributor Author

TimKotowski commented May 16, 2024

I think this work will be needed for the work @aldernero is doing around memberlist for the HA tracker. Maybe he can pick this up or you two can work together.

I wouldn't mind pairing up, or he can just take it on. Either works for me.

Contributor

github-actions bot commented Jan 8, 2025

Thank you for your contribution. This pull request has been marked as stale because it has had no activity in the last 150 days. It will be closed in 30 days if there is no further activity. If you need more time, you can add a comment to the PR.

@github-actions github-actions bot added the stale label Jan 8, 2025
@dimitarvdimitrov
Contributor

#5796 (comment)
