-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] #17149
base: trunk
Are you sure you want to change the base?
Changes from 1 commit
2128532
5f377fb
8fecb1d
7042155
a299752
2e1d9a5
080d422
bce0cb3
f112e0c
fca3dcb
1e88e7d
0c7aadf
e483bd2
65f3b7b
fc548d2
9049ab6
1d1eb19
c892bb0
8fb168e
6993671
d20624b
6561d87
d747dd3
60bda5d
2e5003c
088bbf1
32db0a6
4d2eaac
9928084
86d1237
5e98616
7a63481
9d2e834
0c7436d
6f1725f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,9 +31,11 @@ public class StateBatchUtil { | |
/** | ||
* Util method which takes in 2 lists containing {@link PersisterStateBatch} | ||
* and the startOffset. | ||
* <p> | ||
* This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. | ||
* It then merges any contiguous intervals with same state. If states differ, | ||
* based on various conditions it creates new non-overlapping batches preferring new ones. | ||
* | ||
* @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} | ||
* @param newBatches - List containing {@link PersisterStateBatch} in incoming request | ||
* @param startOffset - startOffset to consider when removing old batches. | ||
|
@@ -65,6 +67,7 @@ public static List<PersisterStateBatch> combineStateBatches( | |
* - Based on various cases: | ||
* - swallow lower priority batch within bounds of offsets | ||
* - break batch into other non-overlapping batches | ||
* | ||
* @param batches - List of {@link PersisterStateBatch} | ||
* @return List of non-overlapping {@link PersisterStateBatch} | ||
*/ | ||
|
@@ -77,8 +80,8 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> | |
|
||
BatchOverlapState overlapState = getOverlappingState(sortedBatches); | ||
|
||
while (overlapState != BatchOverlapState.SENTINEL) { | ||
PersisterStateBatch last = overlapState.last(); | ||
while (overlapState != BatchOverlapState.EMPTY) { | ||
PersisterStateBatch prev = overlapState.prev(); | ||
PersisterStateBatch candidate = overlapState.candidate(); | ||
|
||
// remove non overlapping prefix from sortedBatches, | ||
|
@@ -91,112 +94,112 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> | |
} | ||
|
||
if (candidate == null) { | ||
overlapState = BatchOverlapState.SENTINEL; | ||
overlapState = BatchOverlapState.EMPTY; | ||
continue; | ||
} | ||
|
||
// remove both last and candidate for easier | ||
// remove both previous and candidate for easier | ||
// assessment about adding batches to sortedBatches | ||
sortedBatches.remove(last); | ||
sortedBatches.remove(prev); | ||
sortedBatches.remove(candidate); | ||
|
||
// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort | ||
// overlap and same state (prev.firstOffset <= candidate.firstOffset) due to sort | ||
// covers: | ||
// case: 1 2 3 4 5 6 7 (contiguous) | ||
// last: ______ _______ _______ _______ _______ ________ _______ | ||
// candidate: ______ ____ __________ ___ ____ _______ _______ | ||
if (compareBatchState(candidate, last) == 0) { | ||
// prev: ------ ------- ------- ------- ------- -------- ------- | ||
// candidate: ------ ---- ---------- --- ---- ------- ------- | ||
if (compareBatchState(candidate, prev) == 0) { | ||
sortedBatches.add(new PersisterStateBatch( | ||
last.firstOffset(), | ||
prev.firstOffset(), | ||
// cover cases | ||
// last: ______ ________ _________ | ||
// candidate: ___ __________ _____ | ||
Math.max(candidate.lastOffset(), last.lastOffset()), | ||
last.deliveryState(), | ||
last.deliveryCount() | ||
// prev: ------ -------- --------- | ||
// candidate: --- ---------- ----- | ||
Math.max(candidate.lastOffset(), prev.lastOffset()), | ||
prev.deliveryState(), | ||
prev.deliveryCount() | ||
)); | ||
} else if (candidate.firstOffset() <= last.lastOffset()) { // non-contiguous overlap | ||
} else if (candidate.firstOffset() <= prev.lastOffset()) { // non-contiguous overlap | ||
// overlap and different state | ||
// covers: | ||
// case: 1 2* 3 4 5 6 | ||
// last: ______ _______ _______ _______ _______ ________ | ||
// candidate: ______ ____ _________ ____ ____ _______ | ||
// prev: ------ ------- ------- ------- ------- -------- | ||
// candidate: ------ ---- --------- ---- ---- ------- | ||
// max batches: 1 2 2 3 2 2 | ||
// min batches: 1 1 1 1 1 2 | ||
// * not possible with treeset | ||
|
||
if (candidate.firstOffset() == last.firstOffset()) { | ||
if (candidate.lastOffset() == last.lastOffset()) { // case 1 | ||
if (candidate.firstOffset() == prev.firstOffset()) { | ||
if (candidate.lastOffset() == prev.lastOffset()) { // case 1 | ||
// candidate can never have lower or equal priority | ||
// since sortedBatches order takes that into account. | ||
sortedBatches.add(candidate); | ||
} else { | ||
// case 2 is not possible with TreeSet. It is symmetric to case 3. | ||
// case 3 | ||
if (compareBatchState(candidate, last) < 0) { | ||
sortedBatches.add(last); | ||
if (compareBatchState(candidate, prev) < 0) { | ||
sortedBatches.add(prev); | ||
sortedBatches.add(new PersisterStateBatch( | ||
last.lastOffset() + 1, | ||
prev.lastOffset() + 1, | ||
candidate.lastOffset(), | ||
candidate.deliveryState(), | ||
candidate.deliveryCount() | ||
)); | ||
} else { | ||
// candidate priority is >= last | ||
// candidate priority is >= prev | ||
sortedBatches.add(candidate); | ||
} | ||
} | ||
} else { // candidate.firstOffset() > last.firstOffset() | ||
if (candidate.lastOffset() < last.lastOffset()) { // case 4 | ||
if (compareBatchState(candidate, last) < 0) { | ||
sortedBatches.add(last); | ||
} else { // candidate.firstOffset() > prev.firstOffset() | ||
if (candidate.lastOffset() < prev.lastOffset()) { // case 4 | ||
if (compareBatchState(candidate, prev) < 0) { | ||
sortedBatches.add(prev); | ||
} else { | ||
sortedBatches.add(new PersisterStateBatch( | ||
last.firstOffset(), | ||
prev.firstOffset(), | ||
candidate.firstOffset() - 1, | ||
last.deliveryState(), | ||
last.deliveryCount() | ||
prev.deliveryState(), | ||
prev.deliveryCount() | ||
)); | ||
|
||
sortedBatches.add(candidate); | ||
|
||
sortedBatches.add(new PersisterStateBatch( | ||
candidate.lastOffset() + 1, | ||
last.lastOffset(), | ||
last.deliveryState(), | ||
last.deliveryCount() | ||
prev.lastOffset(), | ||
prev.deliveryState(), | ||
prev.deliveryCount() | ||
)); | ||
} | ||
} else if (candidate.lastOffset() == last.lastOffset()) { // case 5 | ||
if (compareBatchState(candidate, last) < 0) { | ||
sortedBatches.add(last); | ||
} else if (candidate.lastOffset() == prev.lastOffset()) { // case 5 | ||
if (compareBatchState(candidate, prev) < 0) { | ||
sortedBatches.add(prev); | ||
} else { | ||
sortedBatches.add(new PersisterStateBatch( | ||
last.firstOffset(), | ||
prev.firstOffset(), | ||
candidate.firstOffset() - 1, | ||
last.deliveryState(), | ||
last.deliveryCount() | ||
prev.deliveryState(), | ||
prev.deliveryCount() | ||
)); | ||
|
||
sortedBatches.add(candidate); | ||
} | ||
} else { // case 6 | ||
if (compareBatchState(candidate, last) < 0) { | ||
sortedBatches.add(last); | ||
if (compareBatchState(candidate, prev) < 0) { | ||
sortedBatches.add(prev); | ||
|
||
sortedBatches.add(new PersisterStateBatch( | ||
last.lastOffset() + 1, | ||
prev.lastOffset() + 1, | ||
candidate.lastOffset(), | ||
candidate.deliveryState(), | ||
candidate.deliveryCount() | ||
)); | ||
} else { | ||
// candidate has higher priority | ||
sortedBatches.add(new PersisterStateBatch( | ||
last.firstOffset(), | ||
prev.firstOffset(), | ||
candidate.firstOffset() - 1, | ||
last.deliveryState(), | ||
last.deliveryCount() | ||
prev.deliveryState(), | ||
prev.deliveryCount() | ||
)); | ||
|
||
sortedBatches.add(candidate); | ||
|
@@ -213,10 +216,13 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> | |
/** | ||
* Accepts a sorted set of state batches and finds the first 2 batches which overlap. | ||
* Overlap means that they have some offsets in common or, they are contiguous with the same state. | ||
* <p> | ||
* Along with the 2 overlapping batches, also returns a list of non overlapping intervals | ||
* prefixing them. For example | ||
* _____ ____ _____ _____ _____ | ||
* ______ __ | ||
* prefixing them. | ||
* <p> | ||
* For example: | ||
* ----- ---- ----- ----- ----- | ||
* ------ -- | ||
* <---------------> <--------> | ||
* non-overlapping 1st overlapping pair | ||
* | ||
|
@@ -225,30 +231,39 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> | |
*/ | ||
private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> sortedBatches) { | ||
if (sortedBatches == null || sortedBatches.isEmpty()) { | ||
return BatchOverlapState.SENTINEL; | ||
return BatchOverlapState.EMPTY; | ||
} | ||
Iterator<PersisterStateBatch> iter = sortedBatches.iterator(); | ||
PersisterStateBatch last = iter.next(); | ||
PersisterStateBatch prev = iter.next(); | ||
List<PersisterStateBatch> nonOverlapping = new ArrayList<>(sortedBatches.size()); | ||
while (iter.hasNext()) { | ||
PersisterStateBatch candidate = iter.next(); | ||
if (candidate.firstOffset() <= last.lastOffset() || // overlap | ||
last.lastOffset() + 1 == candidate.firstOffset() && compareBatchState(last, candidate) == 0) { // contiguous | ||
if (candidate.firstOffset() <= prev.lastOffset() || // overlap | ||
prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchState(prev, candidate) == 0) { // contiguous | ||
return new BatchOverlapState( | ||
last, | ||
prev, | ||
candidate, | ||
nonOverlapping | ||
); | ||
} | ||
nonOverlapping.add(last); | ||
last = candidate; | ||
nonOverlapping.add(prev); | ||
prev = candidate; | ||
} | ||
// It can happen that the sortedBatches only contain | ||
// non overlapping intervals. In that case, we want to | ||
// return a valid non-overlapping prefix list but there | ||
// is no overlapping pair. | ||
// To differentiate this case with EMPTY case, we can | ||
// pass prev and candidate as null to indicate to the caller | ||
// that no further productive states are possible. | ||
return new BatchOverlapState(null, null, nonOverlapping); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a short comment here explaining how we get to this point (and why the |
||
} | ||
|
||
/** | ||
* Compares the state of 2 batches i.e. the deliveryCount and deliverState. | ||
* <p> | ||
* Uses standard compareTo contract x < y => +int, x > y => -int, x == y => 0 | ||
* | ||
* @param b1 - {@link PersisterStateBatch} to compare | ||
* @param b2 - {@link PersisterStateBatch} to compare | ||
* @return int representing comparison result. | ||
|
@@ -270,36 +285,47 @@ private static int compareBatchState(PersisterStateBatch b1, PersisterStateBatch | |
} | ||
|
||
/** | ||
* Accepts a list of {@link PersisterStateBatch} and removes/prunes ones which lie | ||
* before the start offset. | ||
* Accepts a list of {@link PersisterStateBatch} and checks: | ||
* - last offset is < start offset => batch is removed | ||
* - first offset > start offset => batch is preserved | ||
* - start offset intersects the batch => part of batch before start offset is removed and | ||
* the part after it is preserved. | ||
* | ||
* @param batches - List of {@link PersisterStateBatch} | ||
* @param startOffset - long representing the start offset | ||
* @return List of pruned {@link PersisterStateBatch} | ||
*/ | ||
public static List<PersisterStateBatch> pruneBatches(List<PersisterStateBatch> batches, long startOffset) { | ||
private static List<PersisterStateBatch> pruneBatches(List<PersisterStateBatch> batches, long startOffset) { | ||
if (startOffset != -1) { | ||
List<PersisterStateBatch> prunedList = new ArrayList<>(batches.size()); | ||
List<PersisterStateBatch> retainedBatches = new ArrayList<>(batches.size()); | ||
batches.forEach(batch -> { | ||
if (batch.lastOffset() < startOffset) { | ||
// batch is expired, skip current iteration | ||
// ------- | ||
// | -> start offset | ||
return; | ||
} | ||
|
||
if (batch.firstOffset() >= startOffset) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The javadoc states: "prunes ones which lie before the start offset." but here we are pruning the batch whose first offset is after the start offset. The logic seems inverted from the description. Is it the case that the |
||
// covers: | ||
// ______ | ||
// | -> start offset | ||
prunedList.add(batch); | ||
} else if (batch.lastOffset() >= startOffset) { | ||
// covers: | ||
// ________ | ||
// | -> start offset | ||
prunedList.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount())); | ||
// complete batch is valid | ||
// --------- | ||
// | -> start offset | ||
retainedBatches.add(batch); | ||
} else { | ||
// start offset intersects batch | ||
// --------- | ||
// | -> start offset | ||
retainedBatches.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount())); | ||
} | ||
// in all other cases, the batch is completely expired. | ||
}); | ||
return prunedList; | ||
return retainedBatches; | ||
} | ||
return batches; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move the short-circuit case to the top |
||
} | ||
|
||
/** | ||
* Converts a {@link ShareUpdateValue.StateBatch} type state batch to {@link PersisterStateBatch} | ||
* | ||
* @param batch - The object representing {@link ShareUpdateValue.StateBatch} | ||
* @return {@link PersisterStateBatch} | ||
*/ | ||
|
@@ -317,23 +343,23 @@ public static PersisterStateBatch toPersisterStateBatch(ShareUpdateValue.StateBa | |
* used in the batch merge algorithm. | ||
*/ | ||
private static class BatchOverlapState { | ||
private final PersisterStateBatch last; | ||
private final PersisterStateBatch prev; | ||
private final PersisterStateBatch candidate; | ||
private final List<PersisterStateBatch> nonOverlapping; | ||
public static final BatchOverlapState SENTINEL = new BatchOverlapState(null, null, Collections.emptyList()); | ||
public static final BatchOverlapState EMPTY = new BatchOverlapState(null, null, Collections.emptyList()); | ||
|
||
public BatchOverlapState( | ||
PersisterStateBatch last, | ||
PersisterStateBatch prev, | ||
PersisterStateBatch candidate, | ||
List<PersisterStateBatch> nonOverlapping | ||
) { | ||
this.last = last; | ||
this.prev = prev; | ||
this.candidate = candidate; | ||
this.nonOverlapping = nonOverlapping; | ||
} | ||
|
||
public PersisterStateBatch last() { | ||
return last; | ||
public PersisterStateBatch prev() { | ||
return prev; | ||
} | ||
|
||
public PersisterStateBatch candidate() { | ||
|
@@ -349,12 +375,12 @@ public boolean equals(Object o) { | |
if (this == o) return true; | ||
if (!(o instanceof BatchOverlapState)) return false; | ||
BatchOverlapState that = (BatchOverlapState) o; | ||
return Objects.equals(last, that.last) && Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, that.nonOverlapping); | ||
return Objects.equals(prev, that.prev) && Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, that.nonOverlapping); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(last, candidate, nonOverlapping); | ||
return Objects.hash(prev, candidate, nonOverlapping); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the fall-through case here will always copy everything from
sortedBatches
into the List of non-overlapping batches. Instead of this copy, could we return an Optional here where the absence of a value indicates fall-through, and the presence of a value indicates some overlapping state was found?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per the algorithm, we need the non-overalapping prefix to be removed from the treeset so this might not work