Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] #17149

Open
wants to merge 35 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2128532
KAFKA-17367: Share coordinator impl. Added additional tests. [3/N]
smjn Sep 10, 2024
5f377fb
Improved test names.
smjn Sep 10, 2024
8fecb1d
removed extraneous comment.
smjn Sep 10, 2024
7042155
removed incorrect comment, added another overlap test.
smjn Sep 10, 2024
a299752
Modified combine batches logic.
smjn Sep 11, 2024
2e1d9a5
Added start offset based pruning.
smjn Sep 11, 2024
080d422
fixed comment.
smjn Sep 11, 2024
bce0cb3
Handle overlapping input.
smjn Sep 11, 2024
f112e0c
renamed method arg.
smjn Sep 11, 2024
fca3dcb
fixed bug in merge logic.
smjn Sep 11, 2024
1e88e7d
add delivery count as a sort dimension.
smjn Sep 11, 2024
0c7aadf
minor refactoring.
smjn Sep 11, 2024
e483bd2
merge logic overhaul.
smjn Sep 12, 2024
65f3b7b
minor optimization, fixed comments.
smjn Sep 12, 2024
fc548d2
added generator for tests.
smjn Sep 12, 2024
9049ab6
further optimized merge.
smjn Sep 12, 2024
1d1eb19
minor perf tweaks.
smjn Sep 13, 2024
c892bb0
removed extraneous prune.
smjn Sep 13, 2024
8fb168e
fixed comment.
smjn Sep 13, 2024
6993671
incorporated review comments.
smjn Sep 13, 2024
d20624b
incorporated further comments.
smjn Sep 18, 2024
6561d87
incorporated comments.
smjn Sep 23, 2024
d747dd3
Merge remote-tracking branch 'ak/trunk' into KAFKA-17367-3n
smjn Sep 25, 2024
60bda5d
Merge remote-tracking branch 'ak/trunk' into KAFKA-17367-3n
smjn Sep 25, 2024
2e5003c
Moved state batch merge code to util class.
smjn Sep 25, 2024
088bbf1
fixed documentation.
smjn Sep 25, 2024
32db0a6
incorporated review comments.
smjn Sep 26, 2024
4d2eaac
changed byte, short to int in tests.
smjn Sep 26, 2024
9928084
converted batch util to class.
smjn Sep 27, 2024
86d1237
renamed a few private methods.
smjn Sep 27, 2024
5e98616
added comprehensive javadoc.
smjn Sep 27, 2024
7a63481
minor bug fix.
smjn Sep 27, 2024
9d2e834
minor refactoring.
smjn Sep 28, 2024
0c7436d
create new arraylist from arguments.
smjn Sep 28, 2024
6f1725f
incorporated comments.
smjn Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ public class StateBatchUtil {
/**
* Util method which takes in 2 lists containing {@link PersisterStateBatch}
* and the startOffset.
* <p>
* This method removes any batches where the lastOffset < startOffset, if the startOffset > -1.
* It then merges any contiguous intervals with same state. If states differ,
* based on various conditions it creates new non-overlapping batches preferring new ones.
*
* @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch}
* @param newBatches - List containing {@link PersisterStateBatch} in incoming request
* @param startOffset - startOffset to consider when removing old batches.
Expand Down Expand Up @@ -65,6 +67,7 @@ public static List<PersisterStateBatch> combineStateBatches(
* - Based on various cases:
* - swallow lower priority batch within bounds of offsets
* - break batch into other non-overlapping batches
*
* @param batches - List of {@link PersisterStateBatch}
* @return List of non-overlapping {@link PersisterStateBatch}
*/
Expand All @@ -77,8 +80,8 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>

BatchOverlapState overlapState = getOverlappingState(sortedBatches);

while (overlapState != BatchOverlapState.SENTINEL) {
PersisterStateBatch last = overlapState.last();
while (overlapState != BatchOverlapState.EMPTY) {
PersisterStateBatch prev = overlapState.prev();
PersisterStateBatch candidate = overlapState.candidate();

// remove non overlapping prefix from sortedBatches,
Expand All @@ -91,112 +94,112 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
}

if (candidate == null) {
overlapState = BatchOverlapState.SENTINEL;
overlapState = BatchOverlapState.EMPTY;
continue;
}

// remove both last and candidate for easier
// remove both previous and candidate for easier
// assessment about adding batches to sortedBatches
sortedBatches.remove(last);
sortedBatches.remove(prev);
sortedBatches.remove(candidate);

// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort
// overlap and same state (prev.firstOffset <= candidate.firstOffset) due to sort
// covers:
// case: 1 2 3 4 5 6 7 (contiguous)
// last: ______ _______ _______ _______ _______ ________ _______
// candidate: ______ ____ __________ ___ ____ _______ _______
if (compareBatchState(candidate, last) == 0) {
// prev: ------ ------- ------- ------- ------- -------- -------
// candidate: ------ ---- ---------- --- ---- ------- -------
if (compareBatchState(candidate, prev) == 0) {
sortedBatches.add(new PersisterStateBatch(
last.firstOffset(),
prev.firstOffset(),
// cover cases
// last: ______ ________ _________
// candidate: ___ __________ _____
Math.max(candidate.lastOffset(), last.lastOffset()),
last.deliveryState(),
last.deliveryCount()
// prev: ------ -------- ---------
// candidate: --- ---------- -----
Math.max(candidate.lastOffset(), prev.lastOffset()),
prev.deliveryState(),
prev.deliveryCount()
));
} else if (candidate.firstOffset() <= last.lastOffset()) { // non-contiguous overlap
} else if (candidate.firstOffset() <= prev.lastOffset()) { // non-contiguous overlap
// overlap and different state
// covers:
// case: 1 2* 3 4 5 6
// last: ______ _______ _______ _______ _______ ________
// candidate: ______ ____ _________ ____ ____ _______
// prev: ------ ------- ------- ------- ------- --------
// candidate: ------ ---- --------- ---- ---- -------
// max batches: 1 2 2 3 2 2
// min batches: 1 1 1 1 1 2
// * not possible with treeset

if (candidate.firstOffset() == last.firstOffset()) {
if (candidate.lastOffset() == last.lastOffset()) { // case 1
if (candidate.firstOffset() == prev.firstOffset()) {
if (candidate.lastOffset() == prev.lastOffset()) { // case 1
// candidate can never have lower or equal priority
// since sortedBatches order takes that into account.
sortedBatches.add(candidate);
} else {
// case 2 is not possible with TreeSet. It is symmetric to case 3.
// case 3
if (compareBatchState(candidate, last) < 0) {
sortedBatches.add(last);
if (compareBatchState(candidate, prev) < 0) {
sortedBatches.add(prev);
sortedBatches.add(new PersisterStateBatch(
last.lastOffset() + 1,
prev.lastOffset() + 1,
candidate.lastOffset(),
candidate.deliveryState(),
candidate.deliveryCount()
));
} else {
// candidate priority is >= last
// candidate priority is >= prev
sortedBatches.add(candidate);
}
}
} else { // candidate.firstOffset() > last.firstOffset()
if (candidate.lastOffset() < last.lastOffset()) { // case 4
if (compareBatchState(candidate, last) < 0) {
sortedBatches.add(last);
} else { // candidate.firstOffset() > prev.firstOffset()
if (candidate.lastOffset() < prev.lastOffset()) { // case 4
if (compareBatchState(candidate, prev) < 0) {
sortedBatches.add(prev);
} else {
sortedBatches.add(new PersisterStateBatch(
last.firstOffset(),
prev.firstOffset(),
candidate.firstOffset() - 1,
last.deliveryState(),
last.deliveryCount()
prev.deliveryState(),
prev.deliveryCount()
));

sortedBatches.add(candidate);

sortedBatches.add(new PersisterStateBatch(
candidate.lastOffset() + 1,
last.lastOffset(),
last.deliveryState(),
last.deliveryCount()
prev.lastOffset(),
prev.deliveryState(),
prev.deliveryCount()
));
}
} else if (candidate.lastOffset() == last.lastOffset()) { // case 5
if (compareBatchState(candidate, last) < 0) {
sortedBatches.add(last);
} else if (candidate.lastOffset() == prev.lastOffset()) { // case 5
if (compareBatchState(candidate, prev) < 0) {
sortedBatches.add(prev);
} else {
sortedBatches.add(new PersisterStateBatch(
last.firstOffset(),
prev.firstOffset(),
candidate.firstOffset() - 1,
last.deliveryState(),
last.deliveryCount()
prev.deliveryState(),
prev.deliveryCount()
));

sortedBatches.add(candidate);
}
} else { // case 6
if (compareBatchState(candidate, last) < 0) {
sortedBatches.add(last);
if (compareBatchState(candidate, prev) < 0) {
sortedBatches.add(prev);

sortedBatches.add(new PersisterStateBatch(
last.lastOffset() + 1,
prev.lastOffset() + 1,
candidate.lastOffset(),
candidate.deliveryState(),
candidate.deliveryCount()
));
} else {
// candidate has higher priority
sortedBatches.add(new PersisterStateBatch(
last.firstOffset(),
prev.firstOffset(),
candidate.firstOffset() - 1,
last.deliveryState(),
last.deliveryCount()
prev.deliveryState(),
prev.deliveryCount()
));

sortedBatches.add(candidate);
Expand All @@ -213,10 +216,13 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
/**
* Accepts a sorted set of state batches and finds the first 2 batches which overlap.
* Overlap means that they have some offsets in common or, they are contiguous with the same state.
* <p>
* Along with the 2 overlapping batches, also returns a list of non overlapping intervals
* prefixing them. For example
* _____ ____ _____ _____ _____
* ______ __
* prefixing them.
* <p>
* For example:
* ----- ---- ----- ----- -----
* ------ --
* <---------------> <-------->
* non-overlapping 1st overlapping pair
*
Expand All @@ -225,30 +231,39 @@ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
*/
private static BatchOverlapState getOverlappingState(TreeSet<PersisterStateBatch> sortedBatches) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the fall-through case here will always copy everything from sortedBatches into the List of non-overlapping batches. Instead of this copy, could we return an Optional here where the absence of a value indicates fall-through, and the presence of a value indicates some overlapping state was found?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the algorithm, we need the non-overalapping prefix to be removed from the treeset so this might not work

if (sortedBatches == null || sortedBatches.isEmpty()) {
return BatchOverlapState.SENTINEL;
return BatchOverlapState.EMPTY;
}
Iterator<PersisterStateBatch> iter = sortedBatches.iterator();
PersisterStateBatch last = iter.next();
PersisterStateBatch prev = iter.next();
List<PersisterStateBatch> nonOverlapping = new ArrayList<>(sortedBatches.size());
while (iter.hasNext()) {
PersisterStateBatch candidate = iter.next();
if (candidate.firstOffset() <= last.lastOffset() || // overlap
last.lastOffset() + 1 == candidate.firstOffset() && compareBatchState(last, candidate) == 0) { // contiguous
if (candidate.firstOffset() <= prev.lastOffset() || // overlap
prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchState(prev, candidate) == 0) { // contiguous
return new BatchOverlapState(
last,
prev,
candidate,
nonOverlapping
);
}
nonOverlapping.add(last);
last = candidate;
nonOverlapping.add(prev);
prev = candidate;
}
// It can happen that the sortedBatches only contain
// non overlapping intervals. In that case, we want to
// return a valid non-overlapping prefix list but there
// is no overlapping pair.
// To differentiate this case with EMPTY case, we can
// pass prev and candidate as null to indicate to the caller
// that no further productive states are possible.
return new BatchOverlapState(null, null, nonOverlapping);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a short comment here explaining how we get to this point (and why the null)

}

/**
* Compares the state of 2 batches i.e. the deliveryCount and deliverState.
* <p>
* Uses standard compareTo contract x < y => +int, x > y => -int, x == y => 0
*
* @param b1 - {@link PersisterStateBatch} to compare
* @param b2 - {@link PersisterStateBatch} to compare
* @return int representing comparison result.
Expand All @@ -270,36 +285,47 @@ private static int compareBatchState(PersisterStateBatch b1, PersisterStateBatch
}

/**
* Accepts a list of {@link PersisterStateBatch} and removes/prunes ones which lie
* before the start offset.
* Accepts a list of {@link PersisterStateBatch} and checks:
* - last offset is < start offset => batch is removed
* - first offset > start offset => batch is preserved
* - start offset intersects the batch => part of batch before start offset is removed and
* the part after it is preserved.
*
* @param batches - List of {@link PersisterStateBatch}
* @param startOffset - long representing the start offset
* @return List of pruned {@link PersisterStateBatch}
*/
public static List<PersisterStateBatch> pruneBatches(List<PersisterStateBatch> batches, long startOffset) {
private static List<PersisterStateBatch> pruneBatches(List<PersisterStateBatch> batches, long startOffset) {
if (startOffset != -1) {
List<PersisterStateBatch> prunedList = new ArrayList<>(batches.size());
List<PersisterStateBatch> retainedBatches = new ArrayList<>(batches.size());
batches.forEach(batch -> {
if (batch.lastOffset() < startOffset) {
// batch is expired, skip current iteration
// -------
// | -> start offset
return;
}

if (batch.firstOffset() >= startOffset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The javadoc states: "prunes ones which lie before the start offset." but here we are pruning the batch whose first offset is after the start offset. The logic seems inverted from the description.

Is it the case that the prunedList contains batches which were kept (rather than pruned)? If that's true, we should consider a name like retainBatchesAfterOffset or pruneBatchesBeforeOffset to be more clear.

// covers:
// ______
// | -> start offset
prunedList.add(batch);
} else if (batch.lastOffset() >= startOffset) {
// covers:
// ________
// | -> start offset
prunedList.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()));
// complete batch is valid
// ---------
// | -> start offset
retainedBatches.add(batch);
} else {
// start offset intersects batch
// ---------
// | -> start offset
retainedBatches.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()));
}
// in all other cases, the batch is completely expired.
});
return prunedList;
return retainedBatches;
}
return batches;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the short-circuit case to the top

}

/**
* Converts a {@link ShareUpdateValue.StateBatch} type state batch to {@link PersisterStateBatch}
*
* @param batch - The object representing {@link ShareUpdateValue.StateBatch}
* @return {@link PersisterStateBatch}
*/
Expand All @@ -317,23 +343,23 @@ public static PersisterStateBatch toPersisterStateBatch(ShareUpdateValue.StateBa
* used in the batch merge algorithm.
*/
private static class BatchOverlapState {
private final PersisterStateBatch last;
private final PersisterStateBatch prev;
private final PersisterStateBatch candidate;
private final List<PersisterStateBatch> nonOverlapping;
public static final BatchOverlapState SENTINEL = new BatchOverlapState(null, null, Collections.emptyList());
public static final BatchOverlapState EMPTY = new BatchOverlapState(null, null, Collections.emptyList());

public BatchOverlapState(
PersisterStateBatch last,
PersisterStateBatch prev,
PersisterStateBatch candidate,
List<PersisterStateBatch> nonOverlapping
) {
this.last = last;
this.prev = prev;
this.candidate = candidate;
this.nonOverlapping = nonOverlapping;
}

public PersisterStateBatch last() {
return last;
public PersisterStateBatch prev() {
return prev;
}

public PersisterStateBatch candidate() {
Expand All @@ -349,12 +375,12 @@ public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof BatchOverlapState)) return false;
BatchOverlapState that = (BatchOverlapState) o;
return Objects.equals(last, that.last) && Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, that.nonOverlapping);
return Objects.equals(prev, that.prev) && Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, that.nonOverlapping);
}

@Override
public int hashCode() {
return Objects.hash(last, candidate, nonOverlapping);
return Objects.hash(prev, candidate, nonOverlapping);
}
}
}
Loading