
Consider using memberlist for distributor's HA-tracker #1597

Open
colega opened this issue Mar 31, 2022 · 22 comments
@colega
Contributor

colega commented Mar 31, 2022

Is your feature request related to a problem? Please describe.

Currently the HA-tracker feature of the Distributor requires having an etcd cluster, which isn't a small requirement.

Describe the solution you'd like

I would like to use memberlist, if possible, for HA-tracker.

Describe alternatives you've considered

Disabling HA-tracker.

Additional context

This was discussed on the first Mimir Community Call, on Mar 31st 2022, and @pstibrany raised some concerns about memberlist propagation delay, so it's not only about switching the implementation, but actually about having confidence that memberlist is good enough. cc @pracucci & @bboreham who also participated in the conversation.

@pracucci
Collaborator

pracucci commented Apr 1, 2022

I would like to better understand why very quick propagation is required for the HA tracker. Given that the HA tracker triggers a failover once we haven't received any sample from the elected replica for X seconds, even if the propagation of the newly elected replica is "slow" (a few seconds) it shouldn't matter much.

@xvzf

xvzf commented Apr 2, 2022

Maybe it'd be a good idea to use Kubernetes' integrated leader election?

I stumbled across this just now while trying to deploy Mimir with Tanka. I'm already on k8s 1.23 in my homelab, so I had to port the etcd Operator CRDs to v1 etc. - not optimal :)

@pstibrany
Member

There are two values stored in the KV store that need to be synchronized between distributors:

  • Current elected replica (replica field)
  • received_at

If distributors have different views of these values, it's easy to get into a situation where different distributors make different decisions.

For example, one distributor doesn't see an up-to-date received_at and decides to switch to a different replica. It then writes the decision back to the KV store (with both received_at and replica updated), but in the meantime another distributor, still seeing the previous replica and an up-to-date received_at, will happily ingest data from the previous replica.

If this other distributor also updates received_at and writes it back to the KV store, we now have conflicting updates in the gossip network. How are we going to deal with them?

@pracucci
Collaborator

pracucci commented Apr 7, 2022

For example, one distributor doesn't see an up-to-date received_at and decides to switch to a different replica

With the default config, received_at is updated every 15s while the failover timeout is 30s. So it means it has to take more than 15s to propagate the changes for this to be a problem, right? If my math is correct, then the question is whether with memberlist we can guarantee propagation within 15s in a large cluster.
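
A minimal Go sketch of the decision rule being discussed (illustrative only, not Mimir's actual HA-tracker code; names are made up), using the default 15s update interval and 30s failover timeout:

package hatrackersketch

import "time"

const (
    updateTimeout   = 15 * time.Second // how often received_at is refreshed in the KV store
    failoverTimeout = 30 * time.Second // how long the elected replica may be silent before failing over
)

type electedState struct {
    replica    string    // currently elected replica
    receivedAt time.Time // last time a sample from the elected replica was seen
}

// shouldAccept decides whether a sample from `replica` is accepted, and whether
// the distributor should elect it as the new leader. With this rule, a propagation
// delay of the refreshed receivedAt shorter than failoverTimeout-updateTimeout
// should not cause a spurious failover.
func shouldAccept(state electedState, replica string, now time.Time) (accept, elect bool) {
    if replica == state.replica {
        return true, false // samples from the elected replica are always accepted
    }
    if now.Sub(state.receivedAt) > failoverTimeout {
        return true, true // elected replica has been silent for too long: fail over
    }
    return false, false // reject samples from non-elected replicas
}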

If this other distributor also updates received_at and writes it back to the KV store, we now have conflicting updates in the gossip network. How are we going to deal with them?

Granted, I agree we need to handle conflicts (and we need to be prepared for them in edge cases), but couldn't the conflict here be solved by picking the most recent received_at?

Even if we have a split brain for a short period of time, what could really happen? We ingest samples from both replicas and some of them will fail because they're out-of-order (OOO), or we could end up with data loss? Still thinking about the data loss...

@pstibrany
Member

Granted, I agree we need to handle conflicts (and we need to be prepared for them in edge cases), but couldn't the conflict here be solved by picking the most recent received_at?

Yes. The most important thing is to get all nodes to agree on the current value. Taking the replica with the most recent received_at will work as long as the received_at values differ. If they are the same, we can take the "lower" replica (i.e. "a" < "b").
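
A minimal Go sketch of this rule, assuming a simplified replica descriptor (in a memberlist-backed KV store this logic would live in the value's merge function, so that all nodes converge on the same winner):

package hatrackersketch

type replicaDesc struct {
    Replica    string
    ReceivedAt int64 // unix milliseconds
}

// merge picks the replica with the most recent ReceivedAt; on a tie it picks the
// lexicographically "lower" replica name (e.g. "a" < "b"). Because the rule is
// deterministic and commutative, all nodes resolve the conflict identically
// regardless of the order in which updates arrive.
func merge(local, remote replicaDesc) replicaDesc {
    if remote.ReceivedAt > local.ReceivedAt {
        return remote
    }
    if remote.ReceivedAt == local.ReceivedAt && remote.Replica < local.Replica {
        return remote
    }
    return local
}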

@pstibrany
Member

pstibrany commented Apr 22, 2022

Even if we have a split brain for a short period of time, what could really happen? We ingest samples from both replicas and some of them will fail because they're out-of-order (OOO), or we could end up with data loss? Still thinking about the data loss...

I don't see how we would lose data (but me not seeing it doesn't mean anything). I think the problem is that we can ingest different subsets of series from conflicting requests.

For example, replicas A and B send a request at the same time, and both are ingested (due to propagation delay). Both requests contain some histogram data.

Some series for the histogram (a few buckets) are accepted from replica A, and some other series for the same histogram (other buckets, sum, count) are accepted from replica B. If this histogram data wasn't scraped at exactly the same time, the end result may be that we store an inconsistent "snapshot" of the histogram. I wonder if this can lead to wrong results being returned by PromQL.

@bboreham
Contributor

So it means it has to take more than 15s to propagate the changes for this to be a problem, right?

There is some jitter in how each node reacts to these timers; it defaults to 5 seconds max, so "more than 10s" is probably more accurate (30s failover timeout - 15s update interval - 5s max jitter = 10s).

But I think your broad point is in the right direction: we expect propagation latencies to be a lot lower than this.

@gouthamve
Member

We ingest samples from both replicas and some of them will fail because they're out-of-order (OOO)

The worst case scenario is that fast-moving counters can have a lower value at a higher timestamp, i.e. one Prometheus sends a sample (ts, val) of (1000, 5000) and another Prometheus sends (1001, 4999). This is a very unlikely scenario, but with fast-moving counters, and given that Prometheus assigns the timestamp, it could happen.

But I think this is a non-issue because Prometheus HA pairs don't scrape at the same time, but rather at an offset derived from the hostname and external labels: https://github.com/prometheus/prometheus/blob/fc3f1b8ded810a1263f72d37db9db4e8ce726120/scrape/manager.go#L260-L262

we could end up with data loss

We absolutely could tbh. Let's say we have a scenario with P1 and P2 where P1 is the leader, and assume that the data volume is low. Let's say we have 5 distributors (d0-d4).

P1 is 10 minutes behind (for whatever reason) and P2 is up to date. d1 sees a sample and updates received_at, but d4 doesn't see the update yet; maybe it hasn't received a sample for the last 30s, so it accepts a sample from P2. Now even if it later gets the information that P1 is still the leader, the damage is done and there will be a data loss of 10 minutes because P1 is behind.

In a SaaS environment there could be tenants scraping just one or two targets, or sending a low enough volume of metrics that it's possible only a subset of distributors receive samples every 30s. Whether we should support HA deduping for such a low volume is another question.

@colega
Contributor Author

colega commented Jun 2, 2022

We absolutely could tbh. Let's say we have a scenario with P1 and P2 where P1 is the leader, and assume that the data volume is low. Let's say we have 5 distributors (d0-d4).

P1 is 10 minutes behind (for whatever reason) and P2 is up to date...

Whenever the leader is behind, switching leaders will cause data loss. Yes, it's more likely to happen with memberlist's higher latency when the data volume is low, but it's also less likely that someone with a low data volume actually uses HA.

@dimitarvdimitrov
Contributor

dimitarvdimitrov commented Jul 15, 2022

I had two ideas:

  • I think we may be able to get reliable state replication with memberlist (<15 sec). For some perspective, Hashicorp managed to get full replication (they call it dissemination) within 22s in a cluster of 120 nodes with a gossip interval of 1s (see Table V). Unfortunately they don't give a formula. Mimir's default gossip interval is 200ms. I think we should experiment with this to get an idea of how fast we can replicate a value.
    • Maybe we should have a dedicated memberlist cluster for distributors to reduce the number of nodes and reduce dissemination period?
  • If we want a fast and consistent KV store, I don't think memberlist will give us that (edit: well, Consul does that, but we're not Consul). But one of the distributors can act as a rudimentary one (e.g. GetReplica, ObserveReplicaReceivedAt). We can use the ring to choose a distributor as a leader (RF=0, find owner of token=1). The GEM admin-api uses a similar approach. This still leaves the problem of replication speed and split brain when choosing a leader, but it works reliably in the average case when the cluster is healthy. A rough sketch of this idea follows below.
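
A rough Go sketch of the leader-via-ring idea, written against hypothetical, minimal interfaces (the real dskit ring API and the GEM admin-api differ); it only illustrates picking the owner of token 1 as the HA-tracker leader and forwarding requests there:

package hatrackersketch

import "context"

// tokenOwnerRing is a hypothetical, minimal view of the distributors' ring:
// it returns the address of the instance owning a given token.
type tokenOwnerRing interface {
    OwnerOf(token uint32) (addr string, err error)
}

// haTrackerClient is a hypothetical RPC client exposing the rudimentary
// KV-like operations mentioned above.
type haTrackerClient interface {
    GetReplica(ctx context.Context, addr, user, cluster string) (string, error)
    ObserveReplicaReceivedAt(ctx context.Context, addr, user, cluster, replica string, receivedAt int64) error
}

// haTrackerLeader returns the distributor acting as the HA-tracker "leader":
// by convention, the owner of token 1. All other distributors forward their
// GetReplica / ObserveReplicaReceivedAt calls to this instance.
func haTrackerLeader(ring tokenOwnerRing) (string, error) {
    return ring.OwnerOf(1)
}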

@pracucci
Collaborator

I think we may be able to get reliable state replication with memberlist (<15 sec)

We should do a real measurement with our settings.

I think we could write a simple app to measure the dissemination period. For example: add a testing key, edit the value on 1 replica, then poll all other replicas to check what value they report and measure how long it takes for the change to propagate across replicas. Then repeat it continuously, track some metrics, and run it with 1/10/100/1K/10K nodes.
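
For illustration, a Go sketch of such a measurement app, written against a hypothetical, simplified KV interface (the real dskit kv.Client API differs): a single "leader" replica periodically writes a timestamp into a test key, and every other replica polls the key and records how old the value is when it first becomes visible. This assumes reasonably synchronized clocks across replicas.

package hatrackersketch

import (
    "context"
    "fmt"
    "time"
)

// kvClient is a hypothetical, simplified KV interface.
type kvClient interface {
    Put(ctx context.Context, key, value string) error
    Get(ctx context.Context, key string) (string, error)
}

const testKey = "memberlist-propagation-test"

// writeLoop runs on a single "leader" replica: it periodically writes the
// current time into the test key.
func writeLoop(ctx context.Context, kv kvClient, interval time.Duration) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-time.After(interval):
            _ = kv.Put(ctx, testKey, time.Now().Format(time.RFC3339Nano))
        }
    }
}

// observeLoop runs on every other replica: it polls the test key and records
// how old the value is when it first becomes visible (propagation latency).
func observeLoop(ctx context.Context, kv kvClient, poll time.Duration, record func(latency time.Duration)) {
    var lastSeen string
    for {
        select {
        case <-ctx.Done():
            return
        case <-time.After(poll):
            val, err := kv.Get(ctx, testKey)
            if err != nil || val == "" || val == lastSeen {
                continue
            }
            lastSeen = val
            if written, err := time.Parse(time.RFC3339Nano, val); err == nil {
                record(time.Since(written))
            } else {
                fmt.Println("unparsable test value:", val)
            }
        }
    }
}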

Maybe we should have a dedicated memberlist cluster for distributors to reduce the number of nodes and reduce dissemination period?

I find it easier to reason about a single memberlist cluster composed of all Mimir replicas. To keep it simple (which is the goal of this issue), I would discourage having a distributors-only memberlist cluster.

But one of the distributors can act as a rudimentary one

We want high availability for the HA tracker. When a distributor is restarted or crashes, we don't want to lose the HA leader information.

@colega
Contributor Author

colega commented Jul 15, 2022

Probably I'm going to say something stupid, but...

Can we use ingesters somehow to store this data? Data in the ingesters is naturally replicated, and reads after writes are guaranteed. Reading 1 series per write shouldn't be a big deal if we properly choose the matchers (use a label that has few values, pointing to a couple of series), and we can also cache that data for a while (there's no need to read on each write, really).

Could we maybe store a time series with the latest push from each cluster (that's 1 extra sample per push, which can be ignored) and somehow take a unanimous decision in the distributor based on that?

When the decision isn't clear (too long since the last push from the HA leader), we can use the ingesters as the decision mechanism (since they won't overwrite the samples unless OOO is enabled).

@pracucci
Collaborator

Can we use ingesters somehow to store this data?

Dimitar mentioned it too in a private conversation. I think we're just taking for granted that memberlist wouldn't work, but before building a more complicated system I would like to see a proof of why memberlist wouldn't work. So far I haven't seen it, and I think we haven't even pushed hard to get fast memberlist propagation (which would benefit the hash ring too).

The benefit of using memberlist is that distributors would just have to look up the replica in memory, with no blocking call to make if the information is missing / stale.

@dimitarvdimitrov
Contributor

Sorry for the delayed update: I couldn't spend too much time on this so far, and it looks like I won't be able to spend much more time on it soon. This is a summary of my findings:

What I was trying to find out: at how many nodes do KV store updates take more than 15 seconds to propagate when using memberlist, and how can that be improved.

What I did: I ran clusters of between 100 and 1000 memberlist nodes. I wrote a small binary that propagates timestamps using the memberlist-backed KV store in dskit. In the setup only a single one of the N nodes was sending updates (the leader); the idea is that this approximates the worst-case scenario where we get a single send batch from a client and need to update the timestamp across all distributors (in practice we have to update the value on all memberlist members). I ran the memberlist nodes as pods on Kubernetes. Updates were sent every 15 seconds because that was the Prometheus scrape interval for my pods.

What I measured: two main things: the age of the value when it appears on a member (latency) and the age of the value it replaces (staleness). The reasoning for the second one is that an update can get lost while the next update arrives in time; measuring the staleness of the replaced value accounts for this. I have a dashboard that visualizes these metrics along with things like cluster size, memberlist message queue size and compute resource utilization. Grafana employees can find the "Memberlist HA" dashboard; I've also attached the dashboard JSON below.
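
To make the two metrics concrete, a small Go sketch with hypothetical names (not the actual test binary): on every observed update, latency is the age of the incoming value, and staleness is the age of the value it replaces at the moment it's replaced.

package hatrackersketch

import "time"

// propagationObserver records the two metrics described above. "current" holds
// the production timestamp carried by the value this member currently has.
type propagationObserver struct {
    current         time.Time
    recordLatency   func(time.Duration)
    recordStaleness func(time.Duration)
}

// observe is called whenever a new value (carrying the time it was produced)
// becomes visible on this member.
func (o *propagationObserver) observe(produced, now time.Time) {
    o.recordLatency(now.Sub(produced)) // age of the new value on arrival
    if !o.current.IsZero() {
        // Age of the value being replaced: stays bounded even if an individual
        // update was lost and only the next one arrived in time.
        o.recordStaleness(now.Sub(o.current))
    }
    o.current = produced
}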

Findings:

  • At rest, propagation time for all cluster sizes is super stable and well below 2 seconds. The problem is with rollouts.
  • During rollouts, even at 100 members, latency increased to over 20 seconds (even when I had multiple leaders by mistake). The first thing that seemed to help was reducing -memberlist.packet-write-timeout and -memberlist.packet-dial-timeout to a value around the gossip interval (200ms). See graph 1.
  • After Marco suggested "Proposal: make memberlist TCP Transport non blocking" (dskit#192), I tried a simple version of it and it made significant improvements during rollouts. See graph 2. There are still some spikes to 19 seconds. These seem to happen when a node joins the cluster. I think the perceived latency occurs because the joining node does a push/pull with a node that still hasn't received the last update, so the new node joins and records a stale update as the new update.
  • Staleness and latency usually move together - when one worsens, the other worsens too.
  • CPU throttling and memory start becoming a problem when scaling to 500 and 1000 nodes; using only 50Mi RAM and 50m CPU wasn't enough, and pods started getting severely throttled and OOM-killed. I had to scale vertically, which in turn caused delays in the rollouts due to cluster auto-scaling. I increased to 400Mi and 250m, but also saw throttling when trying to scale beyond 1000 nodes.
    • A symptom of this was that nodes were dropping incoming messages. Log lines contained "handler queue full, dropping message (3) from=10.68.10.23:7946". The default queue size is 1024 (HandoffQueueDepth), which should be enough.

My open questions:

  • what happens when we increase the size of messages? so far I was sending a unix millisecond timestamp as a string; that's 13 bytes and is unrealistic. How much slower will bigger updates propagate?
  • Does CPU and memory consumption scale linearly with cluster size?

This is the branch I used - dimitar/memberlist-ha - and I've also attached the two k8s deployments and a service that I used.

memberlist-ha-svc.yaml.txt
memberlist-ha-deploy.yaml.txt
memberlist-ha-leader-deploy.yaml.txt
dashboard.json.txt

Graphs
Graph 1 - the blue annotation shows when the timeouts were reduced. Latency is lower even as nodes go from 100 to 200.
Screenshot 2022-08-03 at 16 18 49

Graph 2 - memberlist with async TCP transport sending
Screenshot 2022-08-03 at 16 29 10

@pracucci
Collaborator

pracucci commented Aug 3, 2022

what happens when we increase the size of messages? so far I was sending a unix millisecond timestamp as a string; that's 13 bytes and is unrealistic

For the HA tracker's purposes, we'll need to transfer updates for this data structure, plus the cluster ID:

type ReplicaDesc struct {
    Replica    string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"`
    ReceivedAt int64  `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"`

    // Unix timestamp in milliseconds when this entry was marked for deletion.
    // Reason for doing marking first, and delete later, is to make sure that distributors
    // watching the prefix will receive notification on "marking" -- at which point they can
    // already remove entry from memory. Actual deletion from KV store does *not* trigger
    // "watch" notification with a key for all KV stores.
    DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"`
}

Assuming an average of 20 bytes each for the cluster ID and replica ID, the binary data we would have to transport may be on the order of 20 + 20 + 8 + 8 = 56 bytes.

@colega
Contributor Author

colega commented Aug 4, 2022

I'd expect anything smaller than MTU of ~1500B to take the same time to transmit, WDYT?

@colega
Contributor Author

colega commented Aug 4, 2022

Does CPU and memory consumption scale linearly with cluster size?

I would expect it to do so, right? Every extra client can potentially send you a gossip message with a constant probability.

@pracucci
Collaborator

pracucci commented Aug 4, 2022

Does CPU and memory consumption scale linearly with cluster size?
I would expect it to do so, right? Every extra client can potentially send you a gossip message with a constant probability.

This is true for the hash ring use case, but for the HA tracker use case I would expect the message passing to be a function of how many HA clusters the Mimir tenants are using, because I expect a data structure with a key per tenant and HA cluster, with each key-value propagated independently (like in the ring, where we propagate each instance independently).

@st-akorotkov

st-akorotkov commented Jun 13, 2023

Any news on this topic? Running a Consul cluster only for ha_tracker is kind of overkill.

@tristanwl

tristanwl commented Jul 12, 2023

Any chance we expect to see this ship relatively soon? As @st-akorotkov mentioned, Consul just seems a bit heavy for only the ha_tracker component. Since @dimitarvdimitrov's last update almost a year ago, little has been done on this. I'm at the point of setting up Consul, although I'd prefer to stick with memberlist if it's coming soon.

I'll also check the contrib guidelines and see if maybe I can fork things off and get it functional. It seems like performance is the main concern as of now.

@dimitarvdimitrov
Contributor

There is no planned progress on this as far as I know. The main concern is whether memberlist can propagate the updates fast enough during a rollout (which we consider the worst-case scenario due to member churn).

@dimitarvdimitrov
Contributor

one thing we hadn't considered is how the HA tracker cache is populated upon startup. Currently an empty HA tracker cache is handled gracefully by delegating to the KV store and relying on CAS to return the value in the KV store.

Since CAS on memberlist only returns the local state, this won't work. #6991 addresses this, so I thought I'd share.

@aldernero aldernero self-assigned this Dec 11, 2024