Distributor might start serving writes with an empty HA tracke… #6991

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@
* [BUGFIX] Fix issue where all incoming HTTP requests have duplicate trace spans. #6920
* [BUGFIX] Querier: do not retry requests to store-gateway when a query gets canceled. #6934
* [BUGFIX] Querier: return 499 status code instead of 500 when a request to remote read endpoint gets canceled. #6934
* [BUGFIX] Distributor: distributor might start serving writes with an empty HA tracker state. #5796
Contributor

can you put the PR number here?

Contributor Author

for sure will, ty for this.

* [BUGFIX] Querier: fix issue where `-querier.max-fetched-series-per-query` is not applied to `/series` endpoint if the series are loaded from ingesters. #7055

### Mixin

118 changes: 85 additions & 33 deletions pkg/distributor/ha_tracker.go

@@ -202,10 +202,55 @@ func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
t.client = client
}

t.Service = services.NewBasicService(nil, t.loop, nil)
t.Service = services.NewBasicService(t.syncHAStateOnStart, t.loop, nil)
return t, nil
}

// Will update the internal cache to be in sync with the KV Store
func (h *haTracker) syncHAStateOnStart(ctx context.Context) error {
if !h.cfg.EnableHATracker {
return nil
}

// haTracker holds HATrackerConfig with the prefix.
keys, err := h.client.List(ctx, "")
if err != nil {
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to list replica keys", "err", err)
Contributor

since this will also fail the server startup I think it's ok to not log it here. Otherwise we end up with duplicate logging.

Contributor Author

sounds good, I'll remove.

return err
}

if len(keys) == 0 {
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: no keys for HA tracker prefix", "err", err)
return nil
}

// TODO what is the benefit of allowing a concurrent/parallel process here?
// What is the number of keys we are getting back? Is it inefficient to serially allow this behavior?
Comment on lines +227 to +228

Contributor Author (@TimKotowski, Dec 27, 2023)

I left a TODO here: what is the benefit of allowing asynchronous behavior for getting a replica based on the keys coming back from h.client.List? I'm still very new to contributing, and I couldn't really find what the average number of keys returned by h.client.List is. I don't want to create goroutines for each key here if it's not going to bring much benefit at all, but I would like a discussion on how many keys come back and the performance impact of leaving this serial, since this is a pre-load kind of state. If this should be asynchronous, I wouldn't want it all to block either. Thoughts? My guess is the count is low, since the key is per user/cluster, so the List shouldn't return too many keys.

Contributor Author

@dimitarvdimitrov @duricanikolic @ying-jeanne @aknuds1 @fayzal-g pinging people that seem to know a lot about the distributor.

Contributor

I checked a few clusters internally and etcd GET latency averages below 5ms. It will add some latency to startup, but I don't think 5-10 seconds are crucial. If they end up causing problems for someone, we can improve it. I think your current approach is good.

Contributor Author

sounds good, thank you for this.
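
If the serial loop ever does turn into a bottleneck, a bounded-concurrency variant would be one option. Below is a rough sketch using golang.org/x/sync/errgroup against the names in this diff; the concurrency limit and the overall shape are assumptions for illustration, not part of this PR:

```go
// Hypothetical parallel variant of the startup sync: bound the number of
// in-flight KV store Gets instead of fetching keys one by one.
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(10) // arbitrary cap; would need tuning against real key counts
for _, key := range keys {
	key := key // capture loop variable (needed before Go 1.22)
	g.Go(func() error {
		val, err := h.client.Get(gctx, key)
		if err != nil {
			return fmt.Errorf("get %q: %w", key, err)
		}
		if desc, ok := val.(*ReplicaDesc); ok {
			h.processKVStoreEntry(key, desc)
		}
		return nil
	})
}
if err := g.Wait(); err != nil {
	return err
}
```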

for i := 0; i < len(keys); i++ {
if ctx.Err() != nil {
return ctx.Err()
Contributor

we've had some issues with improper context cancellation and vague context cancellation messages. Let's use context.Cause(ctx) instead of ctx.Err, and also wrap the error into something saying when the error happened: fmt.Errorf("syncing HA tracker state on startup: %w")

Contributor Author

ahh gotcha, makes sense. I'll make that change, ty.
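
For reference, a minimal runnable sketch of the suggested pattern; the helper name checkStartupCtx and the main function are illustrative only, not part of the PR:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// checkStartupCtx is a hypothetical helper showing the reviewer's suggestion:
// prefer context.Cause over ctx.Err and wrap the error with where it happened.
func checkStartupCtx(ctx context.Context) error {
	if err := context.Cause(ctx); err != nil {
		return fmt.Errorf("syncing HA tracker state on startup: %w", err)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("kv store unavailable"))
	fmt.Println(checkStartupCtx(ctx))
	// Output: syncing HA tracker state on startup: kv store unavailable
}
```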

}

val, err := h.client.Get(ctx, keys[i])
if err != nil {
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to get replica value", "key", keys[i], "err", err)
return err
Contributor

I like your approach of continuing with the startup even if there are some errors. I think it's ok to continue here as well. Is there a specific reason you decided to return instead?

Contributor Author (@TimKotowski, Jan 10, 2024)

There were certain errors I thought mattered.

  1. What if there were bigger issues with h.client (like connectivity issues)? That made me think the startingFn behavior would have problems, since it uses h.client. You're right that for most of the error returns here I should just continue instead of returning an error. The only error it makes sense to return on is the one where the context was canceled, since that means something bigger happened in the system.

Contributor

  1. What if there were bigger issues with h.client (like connectivity issues)? That made me think the startingFn behavior would have problems, since it uses h.client.

We could potentially use retries with backoff in case the KV store has issues.

I just checked the regular HA tracker logic and changed my mind. On the regular push path we return a 5xx error in case the KV store is unavailable. I think on startup it makes more sense to follow the same logic. That is, instead of continuing we return an error. We should still stick to the error tolerance that the regular checkSample has (e.g. if the key doesn't follow the expected 2-segment format).

WDYT?
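
For illustration, a rough sketch of what retries with backoff around the KV store Get could look like. This assumes the dskit backoff and kv packages Mimir already uses; the helper name getReplicaWithRetries and the backoff settings are placeholders, not part of this PR:

```go
package distributor

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/kv"
)

// getReplicaWithRetries is a hypothetical helper: retry a KV store Get a few
// times with backoff before giving up, rather than failing startup on the
// first transient error.
func getReplicaWithRetries(ctx context.Context, client kv.Client, key string) (interface{}, error) {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 3,
	})
	var lastErr error
	for boff.Ongoing() {
		val, err := client.Get(ctx, key)
		if err == nil {
			return val, nil
		}
		lastErr = err
		boff.Wait()
	}
	if lastErr == nil {
		lastErr = boff.Err() // context canceled before the first attempt
	}
	return nil, fmt.Errorf("getting %q from KV store: %w", key, lastErr)
}
```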

}

desc, ok := val.(*ReplicaDesc)
if !ok {
level.Error(h.logger).Log("msg", "syncHAStateOnStart: got invalid replica descriptor", "key", keys[i])
continue
}

if notProcessed := h.processKVStoreEntry(keys[i], desc); !notProcessed {
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to processed replica value to cache", "key", keys[i], "err", err)
Contributor
Suggested change
level.Warn(h.logger).Log("msg", "syncHAStateOnStart: failed to processed replica value to cache", "key", keys[i], "err", err)
level.Warn(h.logger).Log("msg", "sync HA state on start: failed to process replica value to cache", "key", keys[i], "err", err)

since those would be read by people, it's easier to understand if it's in plain English. Same applies to the other few log lines.

}
}

return nil
}

// Follows pattern used by ring for WatchKey.
func (h *haTracker) loop(ctx context.Context) error {
if !h.cfg.EnableHATracker {
@@ -226,39 +271,11 @@ func (h *haTracker) loop(ctx context.Context) error {
// The KVStore config we gave when creating h should have contained a prefix,
// which would have given us a prefixed KVStore client. So, we can pass empty string here.
h.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
replica := value.(*ReplicaDesc)
segments := strings.SplitN(key, "/", 2)

// Valid key would look like cluster/replica, and a key without a / such as `ring` would be invalid.
if len(segments) != 2 {
return true
}

user := segments[0]
cluster := segments[1]

if replica.DeletedAt > 0 {
h.electedReplicaChanges.DeleteLabelValues(user, cluster)
h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

h.electedLock.Lock()
defer h.electedLock.Unlock()
userClusters := h.clusters[user]
if userClusters != nil {
delete(userClusters, cluster)
if len(userClusters) == 0 {
delete(h.clusters, user)
}
}
return true
replica, ok := value.(*ReplicaDesc)
if !ok {
return false
}

// Store the received information into our cache
h.electedLock.Lock()
h.updateCache(user, cluster, replica)
h.electedLock.Unlock()
h.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
return true
return h.processKVStoreEntry(key, replica)
})

wg.Wait()
Expand All @@ -274,6 +291,41 @@ const (
deletionTimeout = 30 * time.Minute
)

func (h *haTracker) processKVStoreEntry(key string, replica *ReplicaDesc) bool {
Contributor

This function always returns true. Can we just omit the return parameter?

Contributor Author (@TimKotowski, Jan 10, 2024)

yeah I can. And you are 100% correct, I was thinking about that. The only issue I had with keeping it is that processKVStoreEntry is used in WatchPrefix, and its result is returned directly from the WatchPrefix callback, which requires a bool return, right?

Contributor

yes, in this case we can hardcode the return true in WatchPrefix.

Contributor Author

gotcha, sounds good, I'll do that. ty.
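
For context, a sketch of what the WatchPrefix callback in loop could look like after that change (just the shape, not the final code):

```go
h.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
	replica, ok := value.(*ReplicaDesc)
	if !ok {
		return false
	}
	h.processKVStoreEntry(key, replica) // would no longer return a bool
	return true                         // keep watching the prefix
})
```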

segments := strings.SplitN(key, "/", 2)

// Valid key would look like cluster/replica, and a key without a / such as `ring` would be invalid.
if len(segments) != 2 {
return true
}

user := segments[0]
cluster := segments[1]

if replica.DeletedAt > 0 {
h.electedReplicaChanges.DeleteLabelValues(user, cluster)
h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

h.electedLock.Lock()
defer h.electedLock.Unlock()
userClusters := h.clusters[user]
if userClusters != nil {
delete(userClusters, cluster)
if len(userClusters) == 0 {
delete(h.clusters, user)
}
}
return true
Comment on lines +306 to +318

Contributor

I know this was part of the code before your change, but I think now is a good time to clean it up a bit. Can you move it into its own function? perhaps h.cleanUpDeletedReplica

Contributor Author

sure thing.
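
A rough sketch of the suggested extraction, using only fields that already appear in this block; the method name cleanUpDeletedReplica is the reviewer's proposal, not the final code:

```go
func (h *haTracker) cleanUpDeletedReplica(user, cluster string) {
	h.electedReplicaChanges.DeleteLabelValues(user, cluster)
	h.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

	h.electedLock.Lock()
	defer h.electedLock.Unlock()
	if userClusters := h.clusters[user]; userClusters != nil {
		delete(userClusters, cluster)
		if len(userClusters) == 0 {
			delete(h.clusters, user)
		}
	}
}
```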

}

// Store the received information into our cache
h.electedLock.Lock()
h.updateCache(user, cluster, replica)
h.electedLock.Unlock()
h.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())

return true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
}

func (h *haTracker) updateKVLoop(ctx context.Context) {
cleanupTick := time.NewTicker(util.DurationWithJitter(cleanupCyclePeriod, cleanupCycleJitterVariance))
defer cleanupTick.Stop()

53 changes: 52 additions & 1 deletion pkg/distributor/ha_tracker_test.go

@@ -154,7 +154,6 @@ func TestWatchPrefixAssignment(t *testing.T) {
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck

// Write the first time.
now := time.Now()

@@ -792,6 +791,58 @@ func TestCheckReplicaCleanup(t *testing.T) {
))
}

// TestStartingSync tests that values are synced to KVStore on startup
func TestStartingSync(t *testing.T) {
t.Run("check that if a second replica is set up during preLoad that it does not write to it since its not the leader", func(t *testing.T) {
Contributor

this test description looks out of place. Since there's only one test case here can we remove the t.Run?

Contributor Author

yeah I can.

cluster := "c1"
replica := "r1"
var c *haTracker
var err error
var now time.Time

codec := GetReplicaDescCodec()
kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

mock := kv.PrefixClient(kvStore, "prefix")
c, err = newHATracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 15 * time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Millisecond * 2,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

now = time.Now()
err = c.checkReplica(context.Background(), "user", cluster, replica, now)
assert.NoError(t, err)

// Check to see if the value in the trackers cache is correct.
checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica, now)
Contributor

can the "user" also go in a variable (preferably a constant) at the top of the test?

Contributor Author

yeah, good catch.

services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck

replicaTwo := "r2"
c, err = newHATracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 15 * time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Millisecond * 2,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())

require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Contributor

let's use t.Cleanup instead of defer like in the consul instance above

Contributor Author

I'll make that change. ty.
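
A minimal sketch of the suggested form, mirroring how the consul closer is cleaned up earlier in the test:

```go
t.Cleanup(func() { _ = services.StopAndAwaitTerminated(context.Background(), c) })
```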


now = time.Now()
err = c.checkReplica(context.Background(), "user", cluster, replicaTwo, now)
assert.Error(t, err)
assert.ErrorIs(t, err, replicasDidNotMatchError{replica: "r2", elected: "r1"})
})
}

func checkUserClusters(t *testing.T, duration time.Duration, c *haTracker, user string, expectedClusters int) {
t.Helper()
test.Poll(t, duration, nil, func() interface{} {
Expand Down