-
Notifications
You must be signed in to change notification settings - Fork 627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(v2): background compaction cleanup #3694
Conversation
49ff50d
to
000525c
Compare
# Conflicts: # api/gen/proto/go/metastore/v1/compactor.pb.go # api/gen/proto/go/metastore/v1/index.pb.go # api/gen/proto/go/metastore/v1/index_vtproto.pb.go # api/gen/proto/go/metastore/v1/metastorev1connect/index.connect.go # api/gen/proto/go/metastore/v1/types.pb.go # api/gen/proto/go/metastore/v1/types_vtproto.pb.go # api/metastore/v1/index.proto # api/metastore/v1/types.proto # pkg/experiment/metastore/cleaner_raft_handler.go # pkg/experiment/metastore/cleaner_service.go # pkg/experiment/metastore/client/methods.go # pkg/experiment/metastore/compaction_planner.go # pkg/experiment/metastore/compaction_raft_handler.go # pkg/experiment/metastore/compaction_service.go # pkg/experiment/metastore/fsm/fsm.go # pkg/experiment/metastore/index/index.go # pkg/experiment/metastore/index/store.go # pkg/experiment/metastore/index_service.go # pkg/experiment/metastore/markers/deletion_markers.go # pkg/experiment/metastore/metastore.go # pkg/experiment/metastore/raftnode/node.go
NB: in the latest optimization I broke block cleanup – fixing it now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea of replicating the state of the compaction plan explicitly, it should dramatically reduce the likelihood of the state being inconsistent between replicas. It also solves the state issues when rolling out code changes to replicas.
I am not entirely sold on the implementation itself. I think we can proceed with merging this, but I would try to see if we can simplify a few things in a future iteration. My main concern is that this will be harder to maintain. We introduce many new concepts (a scheduler, planner, etc.), multiple layers of queues and many types representing different representations of compaction jobs. I fear that some parts (e.g., the compaction queue) will be hard to reason about when the knowledge about the internal workings is not as fresh as now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all of these types seem to be used internally, can this be moved out of the api folder?
workers int | ||
free atomic.Int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type is called Worker and it is not immediately obvious what are these 2 fields for. Are there other names that can communicate that more clearly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to threads
and capactity
correspondingly.
ctx context.Context | ||
cancel context.CancelFunc | ||
*metastorev1.CompactionJob | ||
source []*metastorev1.BlockMeta |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have sourceBlocks
already, maybe we can call this resolvedBlocks
or sourceMetas
or similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an interesting read on the topic. I agree with the author and strive to follow the principles outlined in the article.
The version I decided to go with:
compacted, err := block.Compact(ctx, job.blocks)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how this is relevant, I was referring to the "source" field in the struct which is now called blocks
.
My point is that source
or blocks
are named too similarly to sourceBlocks
in *metastorev1.CompactionJob
. If someone is working with a compactionJob
object they have "sourceBlocks" and "blocks" to decide between. They both represent source blocks, one of them (blocks
) is initialized later than the other (resolved via a metastore client call), hence why I suggested we use a name with a qualifier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly recommend this semi-official style guide and this piece of wisdom for further reading, in addition to the link I already shared.
Yes, I understand your point. I cannot agree.
You are right that source
might not be the best name, but for a slightly different reason. It's not possible to confuse or misuse job.SourceBlocks
with job.source
(now job.blocks
) since they have different types and visibility levels. I also believe the purpose of the members is very clear and specific in the context.
I just think blocks
is the clearest and most concise option here – perfect qualities for a variable name.
block.Compact(ctx, job.blocks) // I settled on this version.
block.Compact(ctx, job.source) // Works well but might be ambiguous for a reader who has no idea what a compaction job is and what the job source is.
block.Compact(ctx, job.sourceBlocks) // Quite good, but matches the input job.SourceBlocks and can be shortened without any clarity loss: any block we give at input is the source.
block.Compact(ctx, job.resolvedBlocks) // Irrelevant details. Besides, this is the only place in the whole codebase where we mention "resolved block".
block.Compact(ctx, job.sourceMetas) // Looks like we're compacting metas. There's no reason to include the type name in the variable name.
go func() { | ||
defer w.wg.Done() | ||
w.jobsLoop(ctx) | ||
level.Info(w.logger).Log("msg", "compaction worker thead started") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, typo in "thread"
for created := 0; created < capacity; created++ { | ||
plan, err := planner.CreateJob() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand why we try to create this number (capacity
) of jobs. Is it just a relatively low number to avoid having a large response here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a number of reasons:
- We do want to have small raft messages (where the planned job will end up).
- We don't want to create jobs ahead of time. This allows altering the planner, scheduler, and workers' configs at any time, with almost instant effect.
The rate I want our system to maintain is at least 1GB/s, which is roughly 256 segments (500ms) per second, or around 1M metadata entries and 50K compaction jobs per hour.
Consider the case when no workers are available (e.g., due to infrastructure issues, misconfiguration, or bugs), or when workers do not have enough capacity to handle all the jobs.
Consider also the case when the metastore is unavailable for a period (e.g., due to infrastructure issues, misconfiguration, or bugs), and our DLQ accumulates a substantial number of entries to process (e.g., 1M, which is a reasonable number). While we will pace the processing, many new blocks (and thus jobs) are to be added over a short period of time.
If there is no limit on the job queue size (which is TODO, BTW), and no limit on how many jobs we can create at once, ingestion could be blocked for a long period of time when the metastore or workers come back online (or added). However, if we produce no more jobs than we can actually handle, this will not happen: the service will adapt to the capacity of the worker fleet, and all jobs will eventually be created and scheduled.
In addition, the underlying implementation of the block queue can handle millions of entries without major issues. While the job queue is simpler, it may cause performance issues if not handled carefully – something we may want to address in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was referring to the choice of capacity
as the limit, not saying that we should not have a limit. We already assign jobs above so the capacity
might be "spent" already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The answer is here:
However, if we produce no more jobs than we can actually handle, this will not happen: the service will adapt to the capacity of the worker fleet, and all jobs will eventually be created and scheduled.
I think I need to expand on this and add it to the documentation. The challenge is that we don't know the capacity of our worker fleet in advance, and we have no control over them – they can appear and disappear at any time. Therefore, we need an adaptive approach if we want to keep our queue short and workers busy.
I considered a few options:
-
We produce the
assigned
number of jobs: no workers are assigned any jobs at all. There might be jobs ready for scheduling, but we do not add them to the scheduler because workers are left without assignments due to no jobs being in the scheduler queue. Loop. -
We produce the
capacity-assigned
number of jobs: we never utilize capacity fully. I'd call this strategy "greedy worker" – the intuition here is that everyone only cares about itself, but in the end, everyone starves. -
We produce the
capacity
number of jobs: we create jobs for another compaction worker instance. Essentially, we have an evidence that this number of jobs will eventually be handled. -
We ensure that our queue has at least
max_worker_capacity
number of assignable jobs in the queue. That works but it is trickier to implement and requires a parameter, which can be dynamic.
There's an unaddressed risk however: if all the workers just abandon all the jobs they take, it will inflate the queue. This is the reason why we need a hard cap and a displacement policy, regardless of the scheduling principles.
Thank you for the review Aleks!
I'd like to request more specific, actionable feedback. If you have tangible suggestions for simplification or areas where you see unnecessary complexity, I'm happy to discuss and explore adjustments. I suspect that one of us may be missing some nuances of the system's operation and failure modes. Let's discuss it next time we meet.
The current design introduces the following components:
Each of the components has a well-defined set of responsibilities. This separation of concerns is intentional to ensure the system remains maintainable as it evolves. If you believe the earlier version (and a couple of pieces here and there) was simpler and preferable, I'm open to a discussion.
The data structures involved – priority queues and linked lists – are standard and should be familiar to most developers. However, if specific parts of the codebase seem unclear, I'd be happy to add documentation or comments.
Apparently, you're referring to: message CompactionPlanUpdate {
repeated NewCompactionJob new_jobs = 1;
repeated AssignedCompactionJob assigned_jobs = 2;
repeated UpdatedCompactionJob updated_jobs = 3;
repeated CompletedCompactionJob completed_jobs = 4;
} A job has different attributes depending on its status (e.g., |
Sorry I can't provide more specific feedback right away, I was sharing my initial feeling about the changes. Things are a bit more clear after a second look. We are making a large shift from creating immutable jobs when we consume blocks to creating jobs on demand. We are also opening up the possibility of partial completion of "batches" which then requires extra care. As I said in my previous comment and as discussed offline, lets move forward with this and act later if needed. |
08d9387
to
65562d1
Compare
The change moves storage cleanup to the compaction worker service and alters the compaction orchestration: now we explicitly replicate state updates. The change is still being tested; however, the PR is ready for review.
Please refer to the README for details.
I'm going to improve the way compaction strategy is configured, and add compaction metrics (in compaction planner/scheduler/worker) before merging the PR.