
KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] #17149

Open · wants to merge 35 commits into base: trunk

Conversation


@smjn smjn commented Sep 10, 2024

  • Tightened the signature of ShareCoordinatorShard.combineStateBatches.
  • The combineStateBatches method takes in the current batches and the new list of batches and creates new records in case there is partial overlap between older and newer ones. It then sorts and merges the batch lists.
  • Added comprehensive tests for the above method.

Related to #17011 (comment)
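
For readers skimming the diff, here is a minimal, self-contained sketch of the data shape and sort order being discussed. The StateBatch record and MergeSketch class below are illustrative stand-ins, not the actual PersisterStateBatch / ShareCoordinatorShard code in this PR.

import java.util.Comparator;
import java.util.TreeSet;

// Illustrative stand-in for PersisterStateBatch: an offset interval plus delivery metadata.
record StateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) { }

public class MergeSketch {
    // Assumed ordering for the discussion below: firstOffset, then lastOffset, then deliveryCount.
    static final Comparator<StateBatch> BATCH_ORDER =
        Comparator.comparingLong(StateBatch::firstOffset)
            .thenComparingLong(StateBatch::lastOffset)
            .thenComparingInt(StateBatch::deliveryCount);

    public static void main(String[] args) {
        TreeSet<StateBatch> sorted = new TreeSet<>(BATCH_ORDER);
        // Current batches followed by the batches from a new write request.
        sorted.add(new StateBatch(100, 110, (byte) 0, (short) 1));
        sorted.add(new StateBatch(121, 130, (byte) 0, (short) 1));
        sorted.add(new StateBatch(105, 115, (byte) 0, (short) 2)); // partially overlaps the first batch
        sorted.forEach(System.out::println); // iterates in (firstOffset, lastOffset, deliveryCount) order
    }
}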


@AndrewJSchofield AndrewJSchofield left a comment


I can think of some more permutations.

What if some POSB was updated with multiple POSB, such as {(111-120, 0, 1)} updated with {(111-113, 0, 2),(114-114, 2, 1), (115-119, 0, 2)}?

What if some POSB was updated with a partially overlapping POSB, such as {(111-120, 0, 1)} updated with {(111-113, 0, 2)}?

new PersisterOffsetsStateBatch(105, 130, (byte) 0, (short) 1)
)),
Arrays.asList(
new PersisterOffsetsStateBatch(100, 110, (byte) 0, (short) 1),
Contributor

This result is correct, but I would expect (100, 130, (byte) 0, (short) 1)).

new TestAttributes(
"StartOffset is -1 => no batches are considered old but overlaps are still removed preferring the new set.",
new LinkedHashSet<>(Arrays.asList(
new PersisterOffsetsStateBatch(100, 110, (byte) 0, (short) 1), // should be removed
Contributor

This comment // should be removed is incorrect.


smjn commented Sep 10, 2024

I can think of some more permutations.

What if some POSB was updated with multiple POSB, such as {(111-120, 0, 1)} updated with {(111-113, 0, 2),(114-114, 2, 1), (115-119, 0, 2)}?

What if some POSB was updated with a partially overlapping POSB, such as {(111-120, 0, 1)} updated with {(111-113, 0, 2)}?

This is in keeping with the idea that SCR will not create new intervals (either due to partial interval state change or combining sequential intervals). This is fitting because the persister is a "dumb" component.
It simply maintains the intervals which callers send it in the order of arrival and prunes intervals which are expired due to start offset movement.
So,

  • in this example: {(111-120, 0, 1)} updated with {(111-113, 0, 2),(114-114, 2, 1), (115-119, 0, 2)} - the dumb persister will keep all of the intervals. If the start offset moves, it will prune the ones where lastOffset < startOffset, thereby not continuously increasing in size. If we add more intelligence, we will create (119-120, 0, 1), which was never sent.
  • for this: {(111-120, 0, 1)} updated with {(111-113, 0, 2)}, it'll be the same situation - otherwise we are creating a new interval (114-120, 0, 1) which was never sent.
    Both situations are handled in the test cases:
    Sets have partial overlap => result list contains batches from both sets (we do not add gaps).
    and New set batch is contained in cur set => result list contains batches from both sets (we do not add gaps).

The caller (SharePartition) is doing the work of manipulating this information and creating the in-memory state anyway.
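
As a rough illustration of the pruning rule described above, here is a sketch reusing the illustrative StateBatch record from earlier; pruneExpired is a hypothetical helper, not a method in this PR.

import java.util.List;
import java.util.stream.Collectors;

// Batches entirely below the new start offset are expired and dropped; everything else is kept untouched.
// A startOffset of -1 means the start offset has not moved, so nothing is considered expired.
static List<StateBatch> pruneExpired(List<StateBatch> batches, long startOffset) {
    if (startOffset == -1) {
        return batches;
    }
    return batches.stream()
        .filter(b -> b.lastOffset() >= startOffset)
        .collect(Collectors.toList());
}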

@AndrewJSchofield


This is fine with me in principle. We just need to ensure that @apoorvmittal10 and @adixitconfluent are aligned too.


@AndrewJSchofield AndrewJSchofield left a comment


Thanks for the PR. I'm still reviewing it but here are some initial comments.

// Any batches where the last offset is < the current start offset
// are now expired. We should remove them from the persister.
// will take care of overlapping batches
Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
Contributor

Given that this is processing the batches we have so far, I'm not convinced that it is worth merging and pruning at this point. I understand that the start offset might have changed, but surely it's already optimised by the previous pass through.

}
} else if (batch.firstOffset() < cur.firstOffset() && batch.lastOffset() < cur.lastOffset()) {
// covers
// ______
Contributor

I like the little diagrams. I suggest adding the cur: and batch: to these on lines 630 and 640 too.

// overlap with the new one.
// Following cases will not produce any new records so need not be handled.
// cur: ____ ______ ______ _____
// new: ________ ______ _________ _________
Contributor

Probably ought to use batch: in the diagram. For these cases, I think the idea is that cur is superseded by batch, so batchQueue.add(batch) at line 650 is sufficient to add the new batch.



// covers
// cur: ______ _____ _____ ______
Contributor

The second case here overlaps with the second case above (matching first and last offsets). Logically that might be fine in the code, but it looks like a contradiction in the comments.


@apoorvmittal10 apoorvmittal10 left a comment


Thanks for the PR. We should probably discuss the batching you have done, or we can simplify things if we require something in order from Share Partition.

Comment on lines 344 to 349
<suppress checks="MethodLength"
files="ShareCoordinatorShardTest.java"/>
<suppress checks="NPathComplexity"
files="ShareCoordinatorShard.java"/>
<suppress checks="CyclomaticComplexity"
files="ShareCoordinatorShard.java"/>
Collaborator

nit: While suppressions are fine, let's avoid them if we have a better way to handle the situation in code. For MethodLength you might want a method that generates new TestAttributes, or something similar, to avoid the method length issue.


@smjn smjn Sep 12, 2024


The problem will still remain, since the test cases are curated to create various scenarios; we cannot generate them deterministically. If I add a new private method to return the tests, it will still have a MethodLength issue.
The best I can do is to suppress the warning for this specific method.

Collaborator

Suppressing just that method makes sense.

Comment on lines 100 to 111
public int compareTo(Object o) {
    PersisterStateBatch that = (PersisterStateBatch) o;
    int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset());
    if (deltaFirst == 0) {
        int deltaLast = Long.compare(this.lastOffset(), that.lastOffset());
        if (deltaLast == 0) {
            return this.deliveryCount() - that.deliveryCount();
        }
        return deltaLast;
    }
    return deltaFirst;
}
Collaborator

What about deliveryState? Is it not important to compare for this class? If not, then can we please add a comment explaining why it's skipped.

Contributor Author

It's managed in the next revision.

// Any batches where the last offset is < the current start offset
// are now expired. We should remove them from the persister.
// will take care of overlapping batches
Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
Collaborator

For my understanding: why can't this simply be a List of type LinkedList? What queue operations do we need here?

@@ -791,6 +793,304 @@ public void testNonSequentialBatchUpdates() {
verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}

@Test
public void testStateBatchCombine() {
Collaborator

Might be better if you parameterize the test with the test inputs you created.

Contributor Author

Parameterization will again result in a MethodLength issue, as the tests need to be placed in a generator method.

Collaborator

But with parameterization, only the specific failing scenario fails, which is often easier to debug and fix.

Contributor Author

Ok, will add

@apoorvmittal10 apoorvmittal10 added the KIP-932 Queues for Kafka label Sep 12, 2024
));

for (PersisterStateBatch batch : modifiedNewBatches) {
for (int i = 0; i < batchQueue.size(); i++) {
Contributor

We will re-evaluate batchQueue.size() on each loop iteration, and it could in theory mutate quite significantly. I don't believe this is a safe loop condition.

return batches;
}
Stack<PersisterStateBatch> stack = new Stack<>();
stack.add(batches.get(0));
Contributor

Can't this be batches.remove(0)? Otherwise, the first element will be added to the stack and also the initial candidate in the loop.


@AndrewJSchofield AndrewJSchofield left a comment


I've started going through and left some comments. More to come later.

@@ -25,7 +25,7 @@
/**
* This class contains the information for a single batch of state information for use by the {@link Persister}.
*/
public class PersisterStateBatch {
public class PersisterStateBatch implements Comparable {
private final long firstOffset;
private final long lastOffset;
private final byte deliveryState;
Contributor

In sorting terms, this is <firstOffset, lastOffset, deliveryCount, deliveryState>. I suggest putting the member variables in the same order.

return new BatchOverlapState(null, null, nonOverlapping);
}

private static int compareBatchState(PersisterStateBatch b1, PersisterStateBatch b2) {
Contributor

Could do with a comment. This is approximately following the contract for the methods like Short.compare(short x, short y). If x > y then +ve, if x < y then -ve, if x == y then 0.

return finalBatches;
}

private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> batchSet) {
Contributor

This definitely needs a comment. For example, if the batch set is empty, it will throw an exception, so don't call it with an empty set.


BatchOverlapState overlapState = getOverlappingState(sortedBatches);

while (overlapState != BatchOverlapState.SENTINEL) {
Contributor

I don't believe that overlapState will ever be SENTINEL.

Contributor Author

then we'll make it

// candidate: ______ ____ _________ ____ ____ _______
// max batches: 1 2 2 3 2 2
// min batches: 1 1 1 1 1 2

Contributor

If I understand correctly, at line 676, both of last and candidate are members of sortedBatches.

Contributor Author

Yes, there is only one set - sortedBatches - from which we find the first overlapping pair and call its members last and candidate.

if (candidate.firstOffset() == last.firstOffset()) {
    if (candidate.lastOffset() == last.lastOffset()) { // case 1
        if (compareBatchState(candidate, last) < 0) { // candidate is lower priority
            sortedBatches.add(last);
Contributor

candidate has not been removed, but shouldn't it be?


@smjn smjn Sep 13, 2024


Actually, on second look, no state comparison is needed. By the nature of the compareTo method we defined, candidate can never have lower priority if last.firstOffset == candidate.firstOffset and last.lastOffset == candidate.lastOffset, and since it is already present in the TreeSet, we don't need to do anything for case 1.

Because of the above properties, the original was also valid (since the TreeSet will not allow duplicates).

Will simplify the condition.

if (compareBatchState(candidate, last) < 0) { // candidate is lower priority
    sortedBatches.add(last);
} else { // last is lower priority
    sortedBatches.add(candidate);
Contributor

And doesn't this duplicate candidate?

Contributor Author

same as above

return finalBatches;
}

private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> batchSet) {
Contributor

I would tend to call this argument sortedBatches for consistency with the caller.

.build();
.map(ShareCoordinatorShard::toPersisterStateBatch)
.collect(Collectors.toList()), newStartOffset))
.build();
Contributor

tiny nit: trailing spaces


@AndrewJSchofield AndrewJSchofield left a comment


I think one final comment.

sortedBatches.remove(last); // remove older smaller interval
sortedBatches.remove(candidate);

last = new PersisterStateBatch(
Contributor

There's no need to assign to last here. The pattern you've followed in the later cases simply adds the constructed object.

// since sortedBatches order takes that into account.
sortedBatches.add(candidate);
} else {
// case 2 is not possible with TreeSet. It is symmetric to case 3.
Contributor

So I suppose that case 2 would actually be case 3 because of the sorting order of the key.

Contributor Author

Yes, that's true. I have kept the example for completeness' sake.

@smjn smjn changed the title KAFKA-17367: Share coordinator impl. Added additional tests. [3/N] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] Sep 23, 2024
@github-actions github-actions bot added the core Kafka Broker label Sep 23, 2024

@AndrewJSchofield AndrewJSchofield left a comment


lgtm. Thanks for the diligent work on the PR comments.


@mumrah mumrah left a comment


Thanks for the patch @smjn! It's exciting to see more of the batching logic landing :)

I have not yet reviewed the actual batch merging logic, but instead focused on overall code structure. I also left a suggestion on reducing boilerplate in the new test. BTW, nice usage of the stream MethodSource 😄

* @param b2 - {@link PersisterStateBatch} to compare
* @return int representing comparison result.
*/
private static int compareBatchState(PersisterStateBatch b1, PersisterStateBatch b2) {
Contributor

This comparison is not considering the offsets. In what cases are we comparing batches without the offsets?


@smjn smjn Sep 26, 2024


We use the offsets to determine the relative positioning and then use just the state to determine how to break up the batches. Based on state, overlapping batches could be merged or broken down.
For example:

1. 
------               [1,10,0,1]
    -------          [5,15,0,1]
=>
----------          [1,15,0,1]

2.
------               [1,10,0,1]
    -------          [5,15,2,1]
=>
----          [1,4,0,1]
     ------- [5,15,2,1]

Contributor

Ok, so we first compare on delivery count and then break ties with delivery state. Why does the delivery count have higher precedence? Intuitively, I would assume the delivery state would be higher precedence than the count.

Actually, is it even possible to merge batches with a different delivery state?

Contributor Author

The idea behind delivery count taking precedence is that it has the connotation of an epoch: a higher number means a higher chance of the batch being fresher.
If 2 batches have the same delivery count, then states which are considered terminal, like ACK or ARCHIVED, take precedence. Since the ordinal in the delivery state enum is numerically higher for these, they blend in well.
Between ACK and ARCHIVED, it is acceptable to take either (we take ARCHIVED).

HOWEVER, the ARCHIVED state is not implemented in KIP-932.
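
A hedged sketch of the precedence just described - deliveryCount first (epoch-like), deliveryState as the tie-breaker - written against the illustrative StateBatch record from earlier in this conversation, not the real PersisterStateBatch:

// Positive => b1 wins (fresher count, or more terminal state on a tie); negative => b2 wins; 0 => same state.
static int compareBatchState(StateBatch b1, StateBatch b2) {
    int deltaCount = Short.compare(b1.deliveryCount(), b2.deliveryCount());
    if (deltaCount != 0) {
        return deltaCount; // a higher delivery count behaves like a higher epoch
    }
    // Same delivery count: fall back to the state byte, relying on terminal states
    // (ACK/ARCHIVED, as discussed above) having numerically higher ordinals.
    return Byte.compare(b1.deliveryState(), b2.deliveryState());
}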

Contributor

I think ARCHIVING is not implemented in KIP-932, and that's because it's going to be in the DLQ KIP to follow.

* @param sortedBatches - TreeSet representing sorted set of {@link PersisterStateBatch}
* @return object of {@link BatchOverlapState} representing overlapping pair and non-overlapping prefix
*/
private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> sortedBatches) {
Contributor

It seems like the fall-through case here will always copy everything from sortedBatches into the List of non-overlapping batches. Instead of this copy, could we return an Optional here where the absence of a value indicates fall-through, and the presence of a value indicates some overlapping state was found?

Contributor Author

As per the algorithm, we need the non-overlapping prefix to be removed from the TreeSet, so this might not work.

Comment on lines 78 to 80
BatchOverlapState overlapState = getOverlappingState(sortedBatches);

while (overlapState != BatchOverlapState.SENTINEL) {
Contributor

I think we should consider making this into a class. As written, we have a utility function that has quite a bit of state and a large while loop with many branches.

Perhaps a PersisterStateBatchCombiner class? If I understand this workflow, we are doing:

  1. sort the batches given
  2. merge the batches
  3. return merged results

Steps 2 and 3 can be done in a streaming/iterable fashion which can potentially reduce our memory usage.


@smjn smjn Sep 26, 2024


No, the logic is not that simple. A single iteration over the sorted batches might not be enough.
The invariant is that the batches remain sorted even after manipulation.

Consider:

--------                    A [1,10,0,1]
     ---                     B [5,7,0,2]
     --------------       C [5,15,0,3]

A and B will combine to

----             [1,4,0,1]
     ----        [5,7,0,2]
          ---    [8,10,0,1]

Now when combining with C, we have 2 previous batches to consider.

Secondly,

-----------                 A [1,10,0,1]
     ---                     B [5,7,0,2]
     ---                     C [5,7,0,3]

A and B will combine to

----             [1,4,0,1]
     ----        [5,7,0,2]
          ---    [8,10,0,1]
     ---         <- C - we broke the invariant that the batches remain sorted

In the current impl, these situations are implicitly handled by virtue of the TreeSet. Any newly generated batches are pushed back into the TreeSet, and the getOverlappingState method finds the first overlapping pair and also returns the non-overlapping prefix.
The non-overlapping prefix is then REMOVED from the TreeSet; hence, once a batch is no longer overlapping, it is only looked at once, guaranteeing running-time efficiency.

@AndrewJSchofield
@mumrah
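
To make the shape of that loop concrete, here is a structural sketch that builds on the illustrative StateBatch, BATCH_ORDER and compareBatchState snippets earlier in this conversation. The pair resolution is deliberately simplified (same state => coalesce; different state => keep the higher-priority batch plus the loser's non-overlapping remainders) and is not the exact case analysis in ShareCoordinatorShard.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

static List<StateBatch> mergeSketch(TreeSet<StateBatch> sortedBatches) {
    List<StateBatch> finalBatches = new ArrayList<>();
    while (true) {
        // Find the first overlapping neighbouring pair; everything before it can never
        // overlap anything later, so it is flushed to the result (the non-overlapping prefix).
        List<StateBatch> prefix = new ArrayList<>();
        StateBatch prev = null;
        StateBatch candidate = null;
        StateBatch last = null;
        for (StateBatch cur : sortedBatches) {
            if (last != null) {
                if (cur.firstOffset() <= last.lastOffset()) {
                    prev = last;
                    candidate = cur;
                    break;
                }
                prefix.add(last);
            }
            last = cur;
        }
        if (candidate == null) { // no overlapping pair left; emit whatever remains, in order
            finalBatches.addAll(sortedBatches);
            return finalBatches;
        }
        finalBatches.addAll(prefix);
        prefix.forEach(sortedBatches::remove);
        sortedBatches.remove(prev);
        sortedBatches.remove(candidate);
        int stateCmp = compareBatchState(prev, candidate);
        if (stateCmp == 0) {
            // Same delivery count and state: coalesce the pair into one covering interval.
            sortedBatches.add(new StateBatch(prev.firstOffset(),
                Math.max(prev.lastOffset(), candidate.lastOffset()),
                prev.deliveryState(), prev.deliveryCount()));
        } else {
            StateBatch winner = stateCmp > 0 ? prev : candidate;
            StateBatch loser = (winner == prev) ? candidate : prev;
            sortedBatches.add(winner);
            // Keep only the loser's non-overlapping leading/trailing remainders, if any;
            // they go back into the TreeSet and are re-examined on the next pass.
            if (loser.firstOffset() < winner.firstOffset()) {
                sortedBatches.add(new StateBatch(loser.firstOffset(), winner.firstOffset() - 1,
                    loser.deliveryState(), loser.deliveryCount()));
            }
            if (loser.lastOffset() > winner.lastOffset()) {
                sortedBatches.add(new StateBatch(winner.lastOffset() + 1, loser.lastOffset(),
                    loser.deliveryState(), loser.deliveryCount()));
            }
        }
    }
}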

Comment on lines 66 to 67
byte deliveryState,
short deliveryCount
Contributor

I would let these be int so we can avoid casting in the test code.


@junrao junrao left a comment


@smjn : Thanks for the PR. Made a pass of non testing files. Left a few comments.

if (deltaFirst == 0) {
    int deltaLast = Long.compare(this.lastOffset(), that.lastOffset());
    if (deltaLast == 0) {
        int deltaCount = this.deliveryCount() - that.deliveryCount();
Contributor

Should we check that the deliveryStates are the same before comparing deliveryCount?


@smjn smjn Sep 28, 2024


No, this was a conscious decision.
Please check the discussion in #17149 (comment)

Contributor

Ok. It would be useful to add a comment to explain this.

return;
}

sortedBatches = new TreeSet<>(combinedBatchList);
Contributor

To be consistent, we probably want to initialize sortedBatches in the constructor, as is done for combinedBatchList and finalBatchList.

Contributor Author

This was done here because the TreeSet might not be needed at all if there are fewer than 2 intervals - hence a small optimization.

Contributor

Ok.

Another question. This code is called from ShareCoordinatorShard.handleShareUpdate. Is it expensive to recreate a TreeSet for each ShareCoordinatorShard.handleShareUpdate call?

while (iter.hasNext()) {
    PersisterStateBatch candidate = iter.next();
    if (candidate.firstOffset() <= prev.lastOffset() || // overlap
        prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchState(prev, candidate) == 0) { // contiguous
Contributor

It seems unintuitive to return contiguous batches as overlap batches.

Contributor Author

Only contiguous batches with exactly the same state are returned here.
We could perhaps rename the method to getMergeCandidates()?

// prev: ------ ------- ------- ------- ------- -------- -------
// candidate: ------ ---- ---------- --- ---- ------- -------
handleSameStateOverlap(prev, candidate);
} else { // diff state and non-contiguous overlap
Contributor

Hmm, if we reach here, it's possible for prev and candidate to be contiguous, right?

Contributor Author

No, it's not.
getOverlappingState returns a contiguous pair ONLY IF the pair has the same delivery count and state.

The condition in the if block captures the contiguous case for the same state.

If it reaches the else, it has to be overlapping with a different state. getOverlappingState does not return contiguous pairs with a different state.

In fact, it updates the TreeSet by removing any which are found.

* @param b2 - {@link PersisterStateBatch} to compare
* @return int representing comparison result.
*/
private int compareBatchState(PersisterStateBatch b1, PersisterStateBatch b2) {
Contributor

Should we just use PersisterStateBatch.compareTo ?

Contributor Author

No, we explicitly want to compare only the 2 state parameters.
We decide whether the pair can be combined based on the offsets, but when actually merging we can end up with 1, 2 or 3 resultant batches. This is purely determined by the delivery count and state.
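
A quick, hypothetical usage of the mergeSketch loop sketched earlier in this conversation (it assumes the illustrative StateBatch, BATCH_ORDER and mergeSketch definitions are in scope), showing how the 1/2/3 resultant batches arise:

// Same delivery count and state      => 1 batch:   (100-110, 0, 1) + (105-115, 0, 1) -> (100-115, 0, 1)
// Different count, one-sided overlap => 2 batches: (100-110, 0, 1) + (105-115, 0, 2) -> (100-104, 0, 1), (105-115, 0, 2)
// Different count, fully contained   => 3 batches: (100-120, 0, 1) + (105-110, 0, 2) -> (100-104, 0, 1), (105-110, 0, 2), (111-120, 0, 1)
TreeSet<StateBatch> set = new TreeSet<>(MergeSketch.BATCH_ORDER);
set.add(new StateBatch(100, 120, (byte) 0, (short) 1));
set.add(new StateBatch(105, 110, (byte) 0, (short) 2)); // higher delivery count, strictly inside the first batch
System.out.println(mergeSketch(set));
// Prints three StateBatch records covering (100-104, count 1), (105-110, count 2), (111-120, count 1).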

@smjn smjn requested a review from junrao September 28, 2024 02:53

@junrao junrao left a comment


@smjn : Thanks for the updated PR. Made a pass of all files. A few more comments.



static List<PersisterStateBatch> singleBatch(
long firstOffset,
long prevOffset,
Contributor

Why is this called prevOffset instead of lastOffset?

Contributor Author

Remnant of a previous find-and-replace.

),

new BatchTestHolder(
"Candidate lower state. Candidate first and prev offsets strictly larger than prev.",
Contributor

BatchTestHolder uses curList and newList while the text uses candidate and prev. It would be useful to make them consistent.

BatchTestHolder.multiBatch()
.addBatch(100, 110, 0, 1)
.addBatch(121, 130, 0, 1)
.addBatch(105, 115, 0, 1) // overlap with 1st batch
Contributor

Hmm, could that happen? We call combineStateBatches() on every ShareCoordinatorShard.handleShareUpdate. So the current state should not contain overlapping ranges?

Contributor Author

This is defensive - at some point we will develop tooling which will allow adding/removing records to the __share_group_state topic to repair bad state.
In that case, if there is a repetition due to human input, the algorithm is capable of handling it.

Contributor

But every ShareUpdateRecord is replayed through handleShareUpdate, which calls combineStateBatches().

Contributor Author

That's true.

Contributor

In that case, do we still need to include overlapping offset ranges in batchesSoFar?


new BatchTestHolder(
"Handle overlapping batches with different priority.",
BatchTestHolder.singleBatch(100, 110, 0, 1), //[(100-115, 0, 1), (121-130, 0, 1)]
Contributor

The comment doesn't seem to match the code?


smjn commented Sep 30, 2024

Another question. This code is called from ShareCoordinatorShard.handleShareUpdate. Is it expensive to recreate a TreeSet for each ShareCoordinatorShard.handleShareUpdate call?

We can revisit this if it turns out to be a bottleneck.

@smjn smjn requested a review from junrao September 30, 2024 20:34
Labels: ci-approved, core (Kafka Broker), KIP-932 (Queues for Kafka)