-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] #17149
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can think of some more permutations.
What if some POSB was updated with multiple POSB, such as {(111-120, 0, 1)}
updated with {(111-113, 0, 2),(114-114, 2, 1), (115-119, 0, 2)}
?
What if some POSB was updated with a partially overlapping POSB, such as {(111-120, 0, 1)}
updated with {(111-113, 0, 2)}
?
new PersisterOffsetsStateBatch(105, 130, (byte) 0, (short) 1) | ||
)), | ||
Arrays.asList( | ||
new PersisterOffsetsStateBatch(100, 110, (byte) 0, (short) 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This result is correct, but I would expect (100, 130, (byte) 0, (short) 1))
.
new TestAttributes( | ||
"StartOffset is -1 => no batches are considered old but overlaps are still removed preferring the new set.", | ||
new LinkedHashSet<>(Arrays.asList( | ||
new PersisterOffsetsStateBatch(100, 110, (byte) 0, (short) 1), // should be removed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment // should be removed
is incorrect.
This is in keeping with the idea that SCR will not create new intervals (either due to partial interval state change or combining sequential intervals). This is fitting because the persister is a "dumb" component.
The caller (ShareParition) is anyway doing the work of manipulating this information and creating the in memory state. |
This is fine with me in principle. We just need to ensure that @apoorvmittal10 and @adixitconfluent are aligned too. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. I'm still reviewing it but here are some initial comments.
// Any batches where the last offset is < the current start offset | ||
// are now expired. We should remove them from the persister. | ||
// will take care of overlapping batches | ||
Queue<PersisterStateBatch> batchQueue = new LinkedList<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that this is processing the batches we have so far, I'm not convinced that it is worth merging and pruning at this point. I understand that the start offset might have changed, but surely it's already optimised by the previous pass through.
} | ||
} else if (batch.firstOffset() < cur.firstOffset() && batch.lastOffset() < cur.lastOffset()) { | ||
// covers | ||
// ______ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the little diagrams. I suggest adding the cur:
and batch:
to these on lines 630 and 640 too.
// overlap with the new one. | ||
// Following cases will not produce any new records so need not be handled. | ||
// cur: ____ ______ ______ _____ | ||
// new: ________ ______ _________ _________ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably ought to use batch:
in the diagram. For these cases, I think the idea is that cur
is superseded by batch
, so batchQueue.add(batch)
at line 650 is sufficient to add the new batch.
|
||
|
||
// covers | ||
// cur: ______ _____ _____ ______ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second case here overlaps with the second case above (matching first and last offsets). Logically that might be fine in the code, but it looks like a contradiction in the comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Probably will discuss the batching you have done or we can simplify things if we require something in order from Share Partition.
checkstyle/suppressions.xml
Outdated
<suppress checks="MethodLength" | ||
files="ShareCoordinatorShardTest.java"/> | ||
<suppress checks="NPathComplexity" | ||
files="ShareCoordinatorShard.java"/> | ||
<suppress checks="CyclomaticComplexity" | ||
files="ShareCoordinatorShard.java"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: While it's fine to avoid suppressions but avoid if we have better way to handle the situation in code. For MethodLength
you might want to have a method to generate new TestAttributes
or something similar to avoid method length issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem will still remain, since the test cases are curated to create various scenarios. We cannot generate them deterministically. If I add a new private method to return the tests, it will still have MethodLength issue.
Best I can do is to suppress warning for this specific method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suppressing just that method makese sense.
public int compareTo(Object o) { | ||
PersisterStateBatch that = (PersisterStateBatch) o; | ||
int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset()); | ||
if (deltaFirst == 0) { | ||
int deltaLast = Long.compare(this.lastOffset(), that.lastOffset()); | ||
if (deltaLast == 0) { | ||
return this.deliveryCount() - that.deliveryCount(); | ||
} | ||
return deltaLast; | ||
} | ||
return deltaFirst; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about deliveryState
is that not important to compare for this class? If not then can we please write a comment regarding why it's skipped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its managed in the next revision
// Any batches where the last offset is < the current start offset | ||
// are now expired. We should remove them from the persister. | ||
// will take care of overlapping batches | ||
Queue<PersisterStateBatch> batchQueue = new LinkedList<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my understanding: Why it simply cannot be a List of type LinkedList, what queue operations we need here?
@@ -791,6 +793,304 @@ public void testNonSequentialBatchUpdates() { | |||
verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); | |||
} | |||
|
|||
@Test | |||
public void testStateBatchCombine() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be better if you parametrize the test with thest input you created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parameterization will again result in MethodLength issue as the tests need to be placed in a generator method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But parametrization will not fail specific scenario which is often easy to debug and fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will add
)); | ||
|
||
for (PersisterStateBatch batch : modifiedNewBatches) { | ||
for (int i = 0; i < batchQueue.size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will re-evaluate batchQueue.size()
on each loop iteration, and it could in theory mutate quite significantly. I don't believe this is a safe loop condition.
return batches; | ||
} | ||
Stack<PersisterStateBatch> stack = new Stack<>(); | ||
stack.add(batches.get(0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't this be batches.remove(0)
? Otherwise, the first element will be added to the stack and also the initial candidate in the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've started going through and left some comments. More to come later.
@@ -25,7 +25,7 @@ | |||
/** | |||
* This class contains the information for a single batch of state information for use by the {@link Persister}. | |||
*/ | |||
public class PersisterStateBatch { | |||
public class PersisterStateBatch implements Comparable { | |||
private final long firstOffset; | |||
private final long lastOffset; | |||
private final byte deliveryState; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In sorting terms, this is <firstOffset, lastOffset, deliveryCount, deliveryState>. I suggest putting the member variables in the same order.
return new BatchOverlapState(null, null, nonOverlapping); | ||
} | ||
|
||
private static int compareBatchState(PersisterStateBatch b1, PersisterStateBatch b2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could do with a comment. This is approximately following the contract for the methods like Short.compare(short x, short y)
. If x > y then +ve, if x < y then -ve, if x == y then 0.
return finalBatches; | ||
} | ||
|
||
private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> batchSet) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This definitely needs a comment. For example, if the batch set is empty, it will throw an exception, so don't call it with an empty set.
|
||
BatchOverlapState overlapState = getOverlappingState(sortedBatches); | ||
|
||
while (overlapState != BatchOverlapState.SENTINEL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe that overlapState will ever be SENTINEL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then we'll make it
// candidate: ______ ____ _________ ____ ____ _______ | ||
// max batches: 1 2 2 3 2 2 | ||
// min batches: 1 1 1 1 1 2 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, at line 676, both of last
and candidate
are members of sortedBatches
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, there is only one set - sortedBatches from which we find the first overlapping pair and call the members last and candidate.
if (candidate.firstOffset() == last.firstOffset()) { | ||
if (candidate.lastOffset() == last.lastOffset()) { // case 1 | ||
if (compareBatchState(candidate, last) < 0) { // candidate is lower priority | ||
sortedBatches.add(last); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
candidate
has not been removed, but shouldn't it be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually on second look, there is no state comparison needed. By nature of the compareTo method we defined, candidate can never have lower priority if last.firstOffset == candidate.firstOffset and last.lastOffset == candidate.lastOffset and since it is already present in the treeset, we don't need to do anything for case 1.
Because of above properties, the original was also valid (since treeset will not allow duplicates).
will simplify the condition
if (compareBatchState(candidate, last) < 0) { // candidate is lower priority | ||
sortedBatches.add(last); | ||
} else { // last is lower priority | ||
sortedBatches.add(candidate); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And doesn't this duplicate candidate
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
return finalBatches; | ||
} | ||
|
||
private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> batchSet) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would tend to call this argument sortedBatches
for consistency with the caller.
.build(); | ||
.map(ShareCoordinatorShard::toPersisterStateBatch) | ||
.collect(Collectors.toList()), newStartOffset)) | ||
.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tiny nit: trailing spaces
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think one final comment.
sortedBatches.remove(last); // remove older smaller interval | ||
sortedBatches.remove(candidate); | ||
|
||
last = new PersisterStateBatch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need to assign to last
here. The pattern you've followed in the later cases simply adds the constructed object.
// since sortedBatches order takes that into account. | ||
sortedBatches.add(candidate); | ||
} else { | ||
// case 2 is not possible with TreeSet. It is symmetric to case 3. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I suppose that case 2 would actually be case 3 because of the sorting order of the key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that true, have kept the example for completeness sake
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. Thanks for the diligent work on the PR comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch @smjn! It's exciting to see more of the batching logic landing :)
I have not yet reviewed the actual batch merging logic, but instead focused on overall code structure. I also let a suggestion on reducing boilerplate in the new test. BTW, nice usage of the stream MethodSource 😄
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java
Outdated
Show resolved
Hide resolved
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java
Outdated
Show resolved
Hide resolved
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java
Outdated
Show resolved
Hide resolved
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java
Outdated
Show resolved
Hide resolved
* @param b2 - {@link PersisterStateBatch} to compare | ||
* @return int representing comparison result. | ||
*/ | ||
private static int compareBatchState(PersisterStateBatch b1, PersisterStateBatch b2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comparison is not considering the offsets. In what cases are we comparing batches without the offsets?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the offsets to determine the relative positioning and then use just that state to determine how to break the batches. Based on state, overlapping batches could be merged or broken down.
example:
1.
------ [1,10,0,1]
------- [5,15,0,1]
=>
---------- [1,15,0,1]
2.
------ [1,10,0,1]
------- [5,15,2,1]
=>
---- [1,4,0,1]
------- [5,15,2,1]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so we first compare on delivery count and then break ties with delivery state. Why does the delivery count have higher precedence? Intuitively, I would assume the delivery state would be higher precedence than the count.
Actually, is it even possible to merge batches with a different delivery state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea behind delivery count taking precedence is because it has the connotation of epoch. Higher number => higher chance of it being fresher.
If 2 batches have same delivery count then some states which are considered terminal like ACK or ARCHIVED take precedence. Since the ordinal in the delivery state enum is numerically higher for these, they blend in well.
Betweek ACK and ARCHIVED, it is acceptable to take any (we take ARCHIVED).
HOWEVER, ARCHIVED state in not implemented in kip-932
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ARCHIVING is not implemented in KIP_932, and that's because it's going to be in the DLQ KIP to follow.
* @param sortedBatches - TreeSet representing sorted set of {@link PersisterStateBatch} | ||
* @return object of {@link BatchOverlapState} representing overlapping pair and non-overlapping prefix | ||
*/ | ||
private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> sortedBatches) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the fall-through case here will always copy everything from sortedBatches
into the List of non-overlapping batches. Instead of this copy, could we return an Optional here where the absence of a value indicates fall-through, and the presence of a value indicates some overlapping state was found?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per the algorithm, we need the non-overalapping prefix to be removed from the treeset so this might not work
BatchOverlapState overlapState = getOverlappingState(sortedBatches); | ||
|
||
while (overlapState != BatchOverlapState.SENTINEL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should consider making this into a class. As written, we have a utility function that has quite a bit of state and a large while loop with many branches.
Perhaps a PersisterStateBatchCombiner class? If I understand this workflow, we are doing:
- sort the batches given
- merge the batches
- return merged results
Steps 2 and 3 can be done in a streaming/iterable fashion which can potentially reduce our memory usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No the logic is not that simple. A single iteration over the sorted batches might not be enough.
The invariant is that the batches remain sorted even after manipulation
Consider:
-------- A [1,10,0,1]
--- B [5,7,0,2]
-------------- C [5,15,0,3]
A and B will combine to
---- [1,4,0,1]
---- [5,7,0,2]
--- [8,10,0,1]
Now when combining with C, we have 2 previous batches to consider.
Secondly,
----------- A [1,10,0,1]
--- B [5,7,0,2]
--- C [5,7,0,3]
A and B will combine to
---- [1,4,0,1]
---- [5,7,0,2]
--- [8,10,0,1]
--- <- C - we broke invariant for being sorted by batches
In the current impl, these situations are implicitly handled by virtue of the treeset. Any newly generated batches are pushed back into the treeset and the getOverlappingState
method finds the first overlapping pair as well as returns the non-overlapping prefix.
The non-overlapping prefix is then REMOVED from the treeset hence, once a batch is no longer overlapping, it is only looked at once guaranteeing running time efficiency.
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java
Outdated
Show resolved
Hide resolved
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java
Outdated
Show resolved
Hide resolved
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java
Outdated
Show resolved
Hide resolved
byte deliveryState, | ||
short deliveryCount |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would let these be int
so we can avoid casting in the test code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@smjn : Thanks for the PR. Made a pass of non testing files. Left a few comments.
if (deltaFirst == 0) { | ||
int deltaLast = Long.compare(this.lastOffset(), that.lastOffset()); | ||
if (deltaLast == 0) { | ||
int deltaCount = this.deliveryCount() - that.deliveryCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check the deliverStates are the same before comparing deliveryCount?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this was a conscious decision:
Plz check discussion #17149 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. It would be useful to add a comment to explain this.
return; | ||
} | ||
|
||
sortedBatches = new TreeSet<>(combinedBatchList); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent, we probably want to initialize sortedBatches in the constructor as combinedBatchList and finalBatchList.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was done here as the treeset might not be needed at all if intervals are less than 2, hence a small optimization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Another question. This code is called from ShareCoordinatorShard.handleShareUpdate
. Is it expensive to recreate a TreeSet for each ShareCoordinatorShard.handleShareUpdate
call?
while (iter.hasNext()) { | ||
PersisterStateBatch candidate = iter.next(); | ||
if (candidate.firstOffset() <= prev.lastOffset() || // overlap | ||
prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchState(prev, candidate) == 0) { // contiguous |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems unintuitive to return continuous batches as overlap batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only continuous batches with exactly same state are returned here.
We can perhaps rename the method to getMergeCandidates()
?
// prev: ------ ------- ------- ------- ------- -------- ------- | ||
// candidate: ------ ---- ---------- --- ---- ------- ------- | ||
handleSameStateOverlap(prev, candidate); | ||
} else { // diff state and non-contiguous overlap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, if we reach here, it's possible for prev and candidate to be contiguous, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it's not.
getOverlappingState returns continuous pair ONLY IF the pair same delivery count and state.
The condition in the if block will capture continuous case for same state.
If it reaches else it has to be overlapping and different state. The getOverlappingState does not return continuous and different state pairs.
In fact, it updates the treeset by removing any which are found.
* @param b2 - {@link PersisterStateBatch} to compare | ||
* @return int representing comparison result. | ||
*/ | ||
private int compareBatchState(PersisterStateBatch b1, PersisterStateBatch b2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we just use PersisterStateBatch.compareTo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No we explicitly want to compare the 2 state parameters only.
We decide whether the pair can be combined based on offsets but when actually merging we can 1, 2 or 3 resultant batches. This is purely determined by the delivery count and state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@smjn : Thanks for the updated PR. Made a pass of all files. A few more comments.
return; | ||
} | ||
|
||
sortedBatches = new TreeSet<>(combinedBatchList); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Another question. This code is called from ShareCoordinatorShard.handleShareUpdate
. Is it expensive to recreate a TreeSet for each ShareCoordinatorShard.handleShareUpdate
call?
if (deltaFirst == 0) { | ||
int deltaLast = Long.compare(this.lastOffset(), that.lastOffset()); | ||
if (deltaLast == 0) { | ||
int deltaCount = this.deliveryCount() - that.deliveryCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. It would be useful to add a comment to explain this.
|
||
static List<PersisterStateBatch> singleBatch( | ||
long firstOffset, | ||
long prevOffset, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this called prevOffset instead of lastOffset ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remnant of a previous find a and replace.
), | ||
|
||
new BatchTestHolder( | ||
"Candidate lower state. Candidate first and prev offsets strictly larger than prev.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BatchTestHolder uses curList and newList while the text uses candidate and prev. It would be useful to make them consistent.
BatchTestHolder.multiBatch() | ||
.addBatch(100, 110, 0, 1) | ||
.addBatch(121, 130, 0, 1) | ||
.addBatch(105, 115, 0, 1) // overlap with 1st batch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, could that happen? We call combineStateBatches()
on every ShareCoordinatorShard.handleShareUpdate
. So the current state should not contain overlapping ranges?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is defensive - at some point we will develop tooling which will allow adding/removing records to the __share_group_state topic to repair bad state.
In that case, if there is a repetition due to human input, the algorithm is capable of handling it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But every ShareUpdateRecord is replayed through handleShareUpdate
, which calls combineStateBatches()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, do we still need to include overlapping offset ranges in batchesSoFar?
|
||
new BatchTestHolder( | ||
"Handle overlapping batches with different priority.", | ||
BatchTestHolder.singleBatch(100, 110, 0, 1), //[(100-115, 0, 1), (121-130, 0, 1)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment doesn't seem to match the code?
We can revisit this if it turns out ot be a bottleneck. |
ShareCoordinatorShard.combineStateBatches
.combineStateBatches
method takes in the current batches and the new list of batches and creates new records in case there is partial overlap between older and newer ones. It then sorts and merges the batch lists.Related to #17011 (comment)