newBatches) {
+ boolean soFarEmpty = batchesSoFar == null || batchesSoFar.isEmpty();
+ boolean newBatchesEmpty = newBatches == null || newBatches.isEmpty();
+
+ if (soFarEmpty && newBatchesEmpty) {
+ combinedBatchList = new ArrayList<>();
+ } else if (soFarEmpty) {
+ combinedBatchList = new ArrayList<>(newBatches); // new list as the original one could be unmodifiable
+ } else if (newBatchesEmpty) {
+ combinedBatchList = new ArrayList<>(batchesSoFar); // new list as the original one could be unmodifiable
+ } else {
+ combinedBatchList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedBatchList.addAll(batchesSoFar);
+ combinedBatchList.addAll(newBatches);
+ }
+ }
+
+ /**
+ * Algorithm: Merge current state batches and new batches into a single non-overlapping batch list.
+ * Input: batchesSoFar, newBatches, startOffset
+ * Output: combined list with non-overlapping batches (finalBatchList)
+ *
+ * - Add both batchesSoFar and newBatches into a single list combinedBatchList.
+ * - If combinedBatchList.size() <= 1, return combinedBatchList.
+ *
+ * - Prune batches in combinedBatchList against startOffset:
+ * - if batch.lastOffset < startOffset then remove the batch from combinedBatchList
+ * - else if batch.firstOffset >= startOffset then keep the whole batch
+ * - else (batch.firstOffset < startOffset <= batch.lastOffset) keep only the [startOffset, batch.lastOffset] part and discard the rest.
+ *
+ * - Create a TreeSet sortedBatches from the pruned combinedBatchList.
+ * - Find the first 2 mergeable batches in sortedBatches, say, prev and candidate.
+ * - Remove any non-mergeable batches from sortedBatches encountered during the find operation and add them to finalBatchList.
+ * - Repeat until no mergeable pair is found:
+ * - based on the offset overlap and batch state differences, combine the batches or
+ * create new ones, as required, and add them back to sortedBatches.
+ * - find the first 2 mergeable batches in sortedBatches, say, prev and candidate.
+ * - remove any non-mergeable batches from sortedBatches encountered during the find operation and add them to finalBatchList.
+ * - done
+ * - return the finalBatchList
+ *
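+ * For illustration, a usage sketch (the offsets, states and counts below are hypothetical values):
+ * <pre>{@code
+ * List<PersisterStateBatch> soFar = Collections.singletonList(
+ *     new PersisterStateBatch(100, 110, (byte) 0, (short) 1));
+ * List<PersisterStateBatch> incoming = Collections.singletonList(
+ *     new PersisterStateBatch(105, 115, (byte) 0, (short) 1));
+ * // same state and overlapping offsets => a single batch [100, 115] is returned
+ * List<PersisterStateBatch> combined =
+ *     new PersisterStateBatchCombiner(soFar, incoming, -1L).combineStateBatches();
+ * }</pre>
+ *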
+ * @return list of {@link PersisterStateBatch} representing non-overlapping combined batches
+ */
+ public List<PersisterStateBatch> combineStateBatches() {
+ pruneBatches();
+ mergeBatches();
+ return finalBatchList;
+ }
+
+ private void mergeBatches() {
+ if (combinedBatchList.size() < 2) {
+ finalBatchList = combinedBatchList;
+ return;
+ }
+
+ sortedBatches = new TreeSet<>(combinedBatchList);
+
+ MergeCandidatePair overlapState = getMergeCandidatePair();
+
+ while (overlapState != MergeCandidatePair.EMPTY) {
+ PersisterStateBatch prev = overlapState.prev();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove both previous and candidate for easier
+ // assessment about adding batches to sortedBatches
+ sortedBatches.remove(prev);
+ sortedBatches.remove(candidate);
+
+ if (compareBatchDeliveryInfo(candidate, prev) == 0) { // same state and overlap or contiguous
+ // overlap and same state (prev.firstOffset <= candidate.firstOffset) due to sort
+ // covers:
+ // case: 1 2 3 4 5 6 7 (contiguous)
+ // prev: ------ ------- ------- ------- ------- -------- -------
+ // candidate: ------ ---- ---------- --- ---- ------- -------
+ handleSameStateMerge(prev, candidate); // pair can be contiguous or overlapping
+ } else {
+ // If we reach here then it is guaranteed that the batch pair is overlapping and
+ // non-contiguous because getMergeCandidatePair only returns contiguous pair if
+ // the constituents have the same delivery count and state.
+ // covers:
+ // case: 1 2* 3 4 5 6 7*
+ // prev: ------ ------- ------- ------- ------- -------- ------
+ // candidate: ------ ---- --------- ---- ---- ------- -------
+ // max batches: 1 2 2 3 2 2 2
+ // min batches: 1 1 1 1 1 2 1
+ // * not possible with treeset
+ handleDiffStateOverlap(prev, candidate);
+ }
+ overlapState = getMergeCandidatePair();
+ }
+ finalBatchList.addAll(sortedBatches); // some non-overlapping batches might have remained
+ }
+
+ /**
+ * Compares the non-offset state of 2 batches, i.e. the deliveryCount and deliveryState.
+ *
+ * Follows the standard compareTo contract: x < y => -int, x > y => +int, x == y => 0.
+ *
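+ * For illustration (hypothetical values): a batch with deliveryCount 1 and deliveryState 2 compares
+ * lower than a batch with deliveryCount 2 and deliveryState 0, since deliveryCount is compared first.
+ *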
+ * @param b1 - {@link PersisterStateBatch} to compare
+ * @param b2 - {@link PersisterStateBatch} to compare
+ * @return int representing comparison result.
+ */
+ private int compareBatchDeliveryInfo(PersisterStateBatch b1, PersisterStateBatch b2) {
+ int deltaCount = Short.compare(b1.deliveryCount(), b2.deliveryCount());
+
+ // Delivery state could be:
+ // 0 - AVAILABLE (non-terminal)
+ // 1 - ACQUIRED - should not be persisted yet
+ // 2 - ACKNOWLEDGED (terminal)
+ // 3 - ARCHIVING - not implemented in KIP-932 - non-terminal - leads only to ARCHIVED
+ // 4 - ARCHIVED (terminal)
+
+ if (deltaCount == 0) { // same delivery count
+ return Byte.compare(b1.deliveryState(), b2.deliveryState());
+ }
+ return deltaCount;
+ }
+
+ /**
+ * Finds the first 2 batches in sortedBatches which can be merged.
+ * Mergeable implies that the batches share some offsets, or that they are contiguous and have the same delivery state and count.
+ *
+ * Any non-mergeable batches preceding the first mergeable pair are removed from sortedBatches and added to finalBatchList.
+ * For example:
+ * ----- ---- ----- ----- -----
+ * ------
+ * <---------------> <-------->
+ * non-overlapping 1st overlapping pair
+ *
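+ * For illustration (hypothetical offsets, same state throughout): given sortedBatches
+ * [(100-110), (120-130), (125-140)], the pair ((100-110), (120-130)) is neither overlapping nor
+ * contiguous, so (100-110) is moved to finalBatchList and ((120-130), (125-140)) is returned as
+ * the merge candidate pair.
+ *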
+ * @return the first mergeable pair of batches, or MergeCandidatePair.EMPTY if none is found
+ */
+ private MergeCandidatePair getMergeCandidatePair() {
+ if (sortedBatches == null || sortedBatches.isEmpty()) {
+ return MergeCandidatePair.EMPTY;
+ }
+ Iterator<PersisterStateBatch> iter = sortedBatches.iterator();
+ PersisterStateBatch prev = iter.next();
+ List<PersisterStateBatch> nonOverlapping = new ArrayList<>(sortedBatches.size());
+ while (iter.hasNext()) {
+ PersisterStateBatch candidate = iter.next();
+ if (candidate.firstOffset() <= prev.lastOffset() || // overlap
+ prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchDeliveryInfo(prev, candidate) == 0) { // contiguous
+ updateBatchContainers(nonOverlapping);
+ return new MergeCandidatePair(
+ prev,
+ candidate
+ );
+ }
+ nonOverlapping.add(prev);
+ prev = candidate;
+ }
+
+ updateBatchContainers(nonOverlapping);
+ return MergeCandidatePair.EMPTY;
+ }
+
+ private void updateBatchContainers(List<PersisterStateBatch> nonOverlappingBatches) {
+ nonOverlappingBatches.forEach(sortedBatches::remove);
+ finalBatchList.addAll(nonOverlappingBatches);
+ }
+
+ /**
+ * Prunes the {@link PersisterStateBatch} entries in combinedBatchList against the startOffset:
+ * - batch.lastOffset < startOffset => the batch is removed
+ * - batch.firstOffset >= startOffset => the batch is preserved as is
+ * - startOffset falls within the batch => the part of the batch before startOffset is removed and
+ * the part from startOffset onwards is preserved.
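+ *
+ * For illustration (hypothetical offsets, startOffset = 120): a batch (100, 110) is removed,
+ * (100, 130) becomes (120, 130) and (125, 140) is kept unchanged.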
+ */
+ private void pruneBatches() {
+ if (startOffset != -1) {
+ List retainedBatches = new ArrayList<>(combinedBatchList.size());
+ combinedBatchList.forEach(batch -> {
+ if (batch.lastOffset() < startOffset) {
+ // batch is expired, skip current iteration
+ // -------
+ // | -> start offset
+ return;
+ }
+
+ if (batch.firstOffset() >= startOffset) {
+ // complete batch is valid
+ // ---------
+ // | -> start offset
+ retainedBatches.add(batch);
+ } else {
+ // start offset intersects batch
+ // ---------
+ // | -> start offset
+ retainedBatches.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()));
+ }
+ });
+ // update the instance variable
+ combinedBatchList = retainedBatches;
+ }
+ }
+
+ private void handleSameStateMerge(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ sortedBatches.add(new PersisterStateBatch(
+ prev.firstOffset(),
+ // cover cases
+ // prev: ------ -------- ---------
+ // candidate: --- ---------- -----
+ Math.max(candidate.lastOffset(), prev.lastOffset()),
+ prev.deliveryState(),
+ prev.deliveryCount()
+ ));
+ }
+
+ private void handleDiffStateOverlap(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ if (candidate.firstOffset() == prev.firstOffset()) {
+ handleDiffStateOverlapFirstOffsetAligned(prev, candidate);
+ } else { // candidate.firstOffset() > prev.firstOffset()
+ handleDiffStateOverlapFirstOffsetNotAligned(prev, candidate);
+ }
+ }
+
+ private void handleDiffStateOverlapFirstOffsetAligned(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ if (candidate.lastOffset() == prev.lastOffset()) { // case 1
+ // candidate can never have lower or equal priority
+ // since sortedBatches order takes that into account.
+ // -------
+ // -------
+ sortedBatches.add(candidate);
+ } else {
+ // case 2 is not possible with TreeSet. It is symmetric to case 3.
+ // case 3
+ // --------
+ // -----------
+ if (compareBatchDeliveryInfo(candidate, prev) < 0) {
+ sortedBatches.add(prev);
+ sortedBatches.add(new PersisterStateBatch(
+ prev.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else {
+ // candidate has higher priority than prev
+ sortedBatches.add(candidate);
+ }
+ }
+ }
+
+ private void handleDiffStateOverlapFirstOffsetNotAligned(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ if (candidate.lastOffset() < prev.lastOffset()) { // case 4
+ handleDiffStateOverlapPrevSwallowsCandidate(prev, candidate);
+ } else if (candidate.lastOffset() == prev.lastOffset()) { // case 5
+ handleDiffStateOverlapLastOffsetAligned(prev, candidate);
+ } else { // case 6
+ handleDiffStateOverlapCandidateOffsetsLarger(prev, candidate);
+ }
+ }
+
+ private void handleDiffStateOverlapPrevSwallowsCandidate(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ // --------
+ // ----
+ if (compareBatchDeliveryInfo(candidate, prev) < 0) {
+ sortedBatches.add(prev);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ prev.firstOffset(),
+ candidate.firstOffset() - 1,
+ prev.deliveryState(),
+ prev.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ prev.lastOffset(),
+ prev.deliveryState(),
+ prev.deliveryCount()
+ ));
+ }
+ }
+
+ private void handleDiffStateOverlapLastOffsetAligned(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ // --------
+ // -----
+ if (compareBatchDeliveryInfo(candidate, prev) < 0) {
+ sortedBatches.add(prev);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ prev.firstOffset(),
+ candidate.firstOffset() - 1,
+ prev.deliveryState(),
+ prev.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ }
+
+ private void handleDiffStateOverlapCandidateOffsetsLarger(PersisterStateBatch prev, PersisterStateBatch candidate) {
+ // -------
+ // -------
+ if (compareBatchDeliveryInfo(candidate, prev) < 0) {
+ sortedBatches.add(prev);
+
+ sortedBatches.add(new PersisterStateBatch(
+ prev.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else {
+ // candidate has higher priority
+ sortedBatches.add(new PersisterStateBatch(
+ prev.firstOffset(),
+ candidate.firstOffset() - 1,
+ prev.deliveryState(),
+ prev.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ }
+
+ /**
+ * Holder class for intermediate state
+ * used in the batch merge algorithm.
+ */
+ static class MergeCandidatePair {
+ private final PersisterStateBatch prev;
+ private final PersisterStateBatch candidate;
+ public static final MergeCandidatePair EMPTY = new MergeCandidatePair(null, null);
+
+ public MergeCandidatePair(
+ PersisterStateBatch prev,
+ PersisterStateBatch candidate
+ ) {
+ this.prev = prev;
+ this.candidate = candidate;
+ }
+
+ public PersisterStateBatch prev() {
+ return prev;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MergeCandidatePair)) return false;
+ MergeCandidatePair that = (MergeCandidatePair) o;
+ return Objects.equals(prev, that.prev) && Objects.equals(candidate, that.candidate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(prev, candidate);
+ }
+ }
+}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index a5bf61920245..3e4c39f01066 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -48,19 +48,16 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.share.PartitionFactory;
+import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
-import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
@@ -78,6 +75,7 @@ public class ShareCoordinatorShard implements CoordinatorShard {
private ShareCoordinatorConfig config;
@@ -262,7 +260,6 @@ public void replayEndTransactionMarker(long producerId, short producerEpoch, Tra
* @param request - WriteShareGroupStateRequestData for a single key
* @return CoordinatorResult(records, response)
*/
- @SuppressWarnings("NPathComplexity")
public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeState(
WriteShareGroupStateRequestData request
) {
@@ -274,79 +271,106 @@ public CoordinatorResult wr
return error.get();
}
- String groupId = request.groupId();
WriteShareGroupStateRequestData.WriteStateData topicData = request.topics().get(0);
WriteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
+ SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
+
+ CoordinatorRecord record = generateShareStateRecord(partitionData, key);
+ // build successful response if record is correctly created
+ WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
+ .setResults(
+ Collections.singletonList(
+ WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
+ Collections.singletonList(
+ WriteShareGroupStateResponse.toResponsePartitionResult(
+ key.partition()
+ ))
+ ))
+ );
- SharePartitionKey key = SharePartitionKey.getInstance(groupId, topicData.topicId(), partitionData.partition());
- List<CoordinatorRecord> recordList;
+ return new CoordinatorResult<>(Collections.singletonList(record), responseData);
+ }
+ /**
+ * Utility method to generate a ShareSnapshot or ShareUpdate record for a key, based on the conditions below.
+ *
+ * if no snapshot has been created for the key => create a new ShareSnapshot record
+ * else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record
+ * else create a new ShareUpdate record
+ *
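+ * For illustration (the limit value below is hypothetical): if shareCoordinatorSnapshotUpdateRecordsPerSnapshot
+ * is 2, the first write for a key produces a ShareSnapshot, the next 2 writes produce ShareUpdates, and the
+ * following write produces a new ShareSnapshot with an incremented snapshot epoch (assuming the ShareUpdate
+ * count for the key is reset when a new snapshot is written).
+ *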
+ * @param partitionData - Represents the data which should be written into the share state record.
+ * @param key - The {@link SharePartitionKey} object.
+ * @return {@link CoordinatorRecord} representing ShareSnapshot or ShareUpdate
+ */
+ private CoordinatorRecord generateShareStateRecord(
+ WriteShareGroupStateRequestData.PartitionData partitionData,
+ SharePartitionKey key
+ ) {
if (!shareStateMap.containsKey(key)) {
- // since this is the first time we are getting a write request, we should be creating a share snapshot record
- recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
- groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData)
- ));
+ // Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
+ // The incoming partition data could have overlapping state batches; we must merge them.
+ return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ key.groupId(), key.topicId(), partitionData.partition(),
+ new ShareGroupOffset.Builder()
+ .setSnapshotEpoch(0)
+ .setStartOffset(partitionData.startOffset())
+ .setLeaderEpoch(partitionData.leaderEpoch())
+ .setStateEpoch(partitionData.stateEpoch())
+ .setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
+ .build());
} else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
- int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? shareStateMap.get(key).leaderEpoch() : partitionData.leaderEpoch();
- int newStateEpoch = partitionData.stateEpoch() == -1 ? shareStateMap.get(key).stateEpoch() : partitionData.stateEpoch();
- long newStartOffset = partitionData.startOffset() == -1 ? shareStateMap.get(key).startOffset() : partitionData.startOffset();
+ ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
+ int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch();
+ int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch();
+ long newStartOffset = partitionData.startOffset() == -1 ? currentState.startOffset() : partitionData.startOffset();
// Since the number of update records for this share part key exceeds snapshotUpdateRecordsPerSnapshot,
// we should be creating a share snapshot record.
- List<PersisterOffsetsStateBatch> batchesToAdd = combineStateBatches(
- shareStateMap.get(key).stateBatchAsSet(),
- partitionData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
- newStartOffset);
-
- recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
- groupId, topicData.topicId(), partitionData.partition(),
+ // The incoming partition data could have overlapping state batches; we must merge them.
+ return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
+ .setSnapshotEpoch(currentState.snapshotEpoch() + 1) // we must increment the snapshot epoch as this is a new snapshot
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateEpoch(newStateEpoch)
- .setStateBatches(batchesToAdd)
- .build()));
+ .setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset))
+ .build());
} else {
- // share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
- recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
- groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData, shareStateMap.get(key).snapshotEpoch())
- ));
- }
+ ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
- List<CoordinatorRecord> validRecords = new LinkedList<>();
-
- WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
- for (CoordinatorRecord record : recordList) { // should be single record
- if (!(record.key().message() instanceof ShareSnapshotKey) && !(record.key().message() instanceof ShareUpdateKey)) {
- continue;
- }
- SharePartitionKey mapKey = null;
- boolean shouldIncSnapshotEpoch = false;
- if (record.key().message() instanceof ShareSnapshotKey) {
- ShareSnapshotKey recordKey = (ShareSnapshotKey) record.key().message();
- responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
- recordKey.topicId(), Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
- recordKey.partition())))));
- mapKey = SharePartitionKey.getInstance(recordKey.groupId(), recordKey.topicId(), recordKey.partition());
- shouldIncSnapshotEpoch = true;
- } else if (record.key().message() instanceof ShareUpdateKey) {
- ShareUpdateKey recordKey = (ShareUpdateKey) record.key().message();
- responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
- recordKey.topicId(), Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
- recordKey.partition())))));
- mapKey = SharePartitionKey.getInstance(recordKey.groupId(), recordKey.topicId(), recordKey.partition());
- }
-
- if (shareStateMap.containsKey(mapKey) && shouldIncSnapshotEpoch) {
- ShareGroupOffset oldValue = shareStateMap.get(mapKey);
- ((ShareSnapshotValue) record.value().message()).setSnapshotEpoch(oldValue.snapshotEpoch() + 1); // increment the snapshot epoch
- }
- validRecords.add(record); // this will have updated snapshot epoch and on replay the value will trickle down to the map
+ // Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
+ // so create a share update record.
+ // The incoming partition data could have overlapping state batches; we must merge them.
+ return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+ key.groupId(), key.topicId(), partitionData.partition(),
+ new ShareGroupOffset.Builder()
+ .setSnapshotEpoch(currentState.snapshotEpoch()) // use same snapshotEpoch as last share snapshot
+ .setStartOffset(partitionData.startOffset())
+ .setLeaderEpoch(partitionData.leaderEpoch())
+ .setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
+ .build());
}
+ }
+
+ private List<PersisterStateBatch> mergeBatches(
+ List<PersisterStateBatch> soFar,
+ WriteShareGroupStateRequestData.PartitionData partitionData) {
+ return mergeBatches(soFar, partitionData, partitionData.startOffset());
+ }
- return new CoordinatorResult<>(validRecords, responseData);
+ private List<PersisterStateBatch> mergeBatches(
+ List<PersisterStateBatch> soFar,
+ WriteShareGroupStateRequestData.PartitionData partitionData,
+ long startOffset) {
+ return new PersisterStateBatchCombiner(
+ soFar,
+ partitionData.stateBatches().stream()
+ .map(PersisterStateBatch::from)
+ .collect(Collectors.toList()),
+ startOffset
+ )
+ .combineStateBatches();
}
/**
@@ -527,7 +551,7 @@ CoordinatorMetricsShard getMetricsShard() {
private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue newData) {
// snapshot epoch should be same as last share snapshot
// state epoch is not present
- Set<PersisterOffsetsStateBatch> currentBatches = soFar.stateBatchAsSet();
+ List<PersisterStateBatch> currentBatches = soFar.stateBatches();
long newStartOffset = newData.startOffset() == -1 ? soFar.startOffset() : newData.startOffset();
int newLeaderEpoch = newData.leaderEpoch() == -1 ? soFar.leaderEpoch() : newData.leaderEpoch();
@@ -536,41 +560,13 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStateEpoch(soFar.stateEpoch())
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
- .setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
+ .setStateBatches(new PersisterStateBatchCombiner(currentBatches, newData.stateBatches().stream()
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset)
+ .combineStateBatches())
.build();
}
- /**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
- * and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
- * @param newBatch - collection containing batches in incoming request
- * @param startOffset - startOffset to consider when removing old batches.
- * @return List containing combined batches
- */
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
- long startOffset
- ) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
- if (startOffset != -1) {
- batchesToAdd.removeIf(batch -> batch.lastOffset() < startOffset);
- }
- return batchesToAdd;
- }
-
private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@@ -578,4 +574,20 @@ private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersio
return apiMessageAndVersion.message();
}
}
+
+ /**
+ * Utility method to convert a state batch of type {@link ShareUpdateValue.StateBatch}
+ * to {@link PersisterStateBatch}.
+ *
+ * @param batch - The object representing {@link ShareUpdateValue.StateBatch}
+ * @return {@link PersisterStateBatch}
+ */
+ private static PersisterStateBatch toPersisterStateBatch(ShareUpdateValue.StateBatch batch) {
+ return new PersisterStateBatch(
+ batch.firstOffset(),
+ batch.lastOffset(),
+ batch.deliveryState(),
+ batch.deliveryCount()
+ );
+ }
}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index 745b3e056b29..8e678098d66c 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -20,12 +20,12 @@
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -37,13 +37,13 @@ public class ShareGroupOffset {
private final int stateEpoch;
private final int leaderEpoch;
private final long startOffset;
- private final List<PersisterOffsetsStateBatch> stateBatches;
+ private final List<PersisterStateBatch> stateBatches;
private ShareGroupOffset(int snapshotEpoch,
int stateEpoch,
int leaderEpoch,
long startOffset,
- List<PersisterOffsetsStateBatch> stateBatches) {
+ List<PersisterStateBatch> stateBatches) {
this.snapshotEpoch = snapshotEpoch;
this.stateEpoch = stateEpoch;
this.leaderEpoch = leaderEpoch;
@@ -67,16 +67,16 @@ public long startOffset() {
return startOffset;
}
- public List<PersisterOffsetsStateBatch> stateBatches() {
+ public List<PersisterStateBatch> stateBatches() {
return Collections.unmodifiableList(stateBatches);
}
- private static PersisterOffsetsStateBatch toPersisterOffsetsStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
- return new PersisterOffsetsStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
+ private static PersisterStateBatch toPersisterOffsetsStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
+ return new PersisterStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
}
- private static PersisterOffsetsStateBatch toPersisterOffsetsStateBatch(ShareUpdateValue.StateBatch stateBatch) {
- return new PersisterOffsetsStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
+ private static PersisterStateBatch toPersisterOffsetsStateBatch(ShareUpdateValue.StateBatch stateBatch) {
+ return new PersisterStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
}
public static ShareGroupOffset fromRecord(ShareSnapshotValue record) {
@@ -99,11 +99,11 @@ public static ShareGroupOffset fromRequest(WriteShareGroupStateRequestData.Parti
data.leaderEpoch(),
data.startOffset(),
data.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
+ .map(PersisterStateBatch::from)
.collect(Collectors.toList()));
}
- public Set<PersisterOffsetsStateBatch> stateBatchAsSet() {
+ public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
return new LinkedHashSet<>(stateBatches);
}
@@ -112,7 +112,7 @@ public static class Builder {
private int stateEpoch;
private int leaderEpoch;
private long startOffset;
- private List<PersisterOffsetsStateBatch> stateBatches;
+ private List<PersisterStateBatch> stateBatches;
public Builder setSnapshotEpoch(int snapshotEpoch) {
this.snapshotEpoch = snapshotEpoch;
@@ -134,7 +134,7 @@ public Builder setStartOffset(long startOffset) {
return this;
}
- public Builder setStateBatches(List<PersisterOffsetsStateBatch> stateBatches) {
+ public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
this.stateBatches = stateBatches;
return this;
}
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java
new file mode 100644
index 000000000000..d02ec616d100
--- /dev/null
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PersisterStateBatchCombinerTest {
+ static class BatchTestHolder {
+ final String testName;
+ final List<PersisterStateBatch> batchesSoFar;
+ final List<PersisterStateBatch> newBatches;
+ final List<PersisterStateBatch> expectedResult;
+ final long startOffset;
+ final boolean shouldRun;
+
+ BatchTestHolder(
+ String testName,
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
+ List<PersisterStateBatch> expectedResult,
+ long startOffset
+ ) {
+ this(testName, batchesSoFar, newBatches, expectedResult, startOffset, true); // run assertions by default
+ }
+
+ BatchTestHolder(
+ String testName,
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
+ List<PersisterStateBatch> expectedResult,
+ long startOffset,
+ boolean shouldRun
+ ) {
+ this.testName = testName;
+ this.batchesSoFar = batchesSoFar;
+ this.newBatches = newBatches;
+ this.expectedResult = expectedResult;
+ this.startOffset = startOffset;
+ this.shouldRun = shouldRun;
+ }
+
+ static List<PersisterStateBatch> singleBatch(
+ long firstOffset,
+ long lastOffset,
+ int deliveryState,
+ int deliveryCount
+ ) {
+ return Collections.singletonList(
+ new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount)
+ );
+ }
+
+ static class MultiBatchBuilder {
+ private final List<PersisterStateBatch> batchList = new LinkedList<>();
+
+ MultiBatchBuilder addBatch(
+ long firstOffset,
+ long lastOffset,
+ int deliveryState,
+ int deliveryCount
+ ) {
+ batchList.add(new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount));
+ return this;
+ }
+
+ List<PersisterStateBatch> build() {
+ return batchList;
+ }
+ }
+
+ static MultiBatchBuilder multiBatch() {
+ return new MultiBatchBuilder();
+ }
+
+ @Override
+ public String toString() {
+ return this.testName;
+ }
+ }
+
+ private static Stream<BatchTestHolder> generatorCornerCases() {
+ return Stream.of(
+ new BatchTestHolder(
+ "Current batches with start offset midway are pruned.",
+ BatchTestHolder.singleBatch(100, 130, 0, 1),
+ Collections.emptyList(),
+ BatchTestHolder.singleBatch(120, 130, 0, 1),
+ 120
+ ),
+
+ new BatchTestHolder(
+ "New batches with start offset midway are pruned.",
+ Collections.emptyList(),
+ BatchTestHolder.singleBatch(100, 130, 0, 1),
+ BatchTestHolder.singleBatch(120, 130, 0, 1),
+ 120
+ ),
+
+ new BatchTestHolder(
+ "Both current and new batches empty.",
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 120
+ )
+ );
+ }
+
+ private static Stream<BatchTestHolder> generatorSameState() {
+ return Stream.of(
+ // same state
+ new BatchTestHolder(
+ "Same state. batchSoFar and newBatch have same first and last offset.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Same state. batchSoFar and newBatch have same first offset, newBatch last offset strictly smaller.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 105, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Same state. batchSoFar and newBatch have same first offset, newBatch last offset strictly larger.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 115, 0, 1),
+ BatchTestHolder.singleBatch(100, 115, 0, 1),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Same state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(105, 108, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ -1,
+ true
+ ),
+
+ new BatchTestHolder(
+ "Same state. newBatch first offset strictly larger and last offset equal to batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(105, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Same state. newBatch first offset strictly larger and last offset strictly larger than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(105, 115, 0, 1),
+ BatchTestHolder.singleBatch(100, 115, 0, 1),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Same state. newBatch first offset is batchSoFar first offset + 1 (contiguous).",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(111, 115, 0, 1),
+ BatchTestHolder.singleBatch(100, 115, 0, 1),
+ -1
+ )
+ );
+ }
+
+ private static Stream<BatchTestHolder> generatorComplex() {
+ return Stream.of(
+ new BatchTestHolder(
+ "Handle overlapping batches in newBatches, same state",
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 110, 0, 1)
+ .addBatch(121, 130, 0, 1)
+ .build(),
+ BatchTestHolder.multiBatch()
+ .addBatch(111, 119, 2, 2)
+ .addBatch(116, 123, 2, 2) // overlap with first batch
+ .build(), // the two new batches merge to (111-123, 2, 2)
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 110, 0, 1)
+ .addBatch(111, 123, 2, 2)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Handle overlapping batches in newBatches, different priority.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.multiBatch()
+ .addBatch(101, 105, 1, 2)
+ .addBatch(101, 115, 2, 2)
+ .addBatch(101, 120, 3, 2)
+ .build(),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 100, 0, 1)
+ .addBatch(101, 120, 3, 2)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "Handle overlapping batches in newBatches, with pruning.",
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 110, 0, 1) // should get removed
+ .addBatch(121, 130, 0, 1)
+ .build(),
+ BatchTestHolder.multiBatch()
+ .addBatch(111, 119, 2, 2)
+ .addBatch(116, 123, 2, 2) // overlap with first batch, merges to (111-123, 2, 2)
+ .build(),
+ BatchTestHolder.multiBatch()
+ .addBatch(120, 123, 2, 2)
+ .addBatch(124, 130, 0, 1)
+ .build(),
+ 120
+ ),
+
+ new BatchTestHolder(
+ "Multiple higher state batch updates.",
+ BatchTestHolder.singleBatch(111, 120, 0, 1),
+ BatchTestHolder.multiBatch()
+ .addBatch(111, 113, 0, 2)
+ .addBatch(114, 114, 2, 1)
+ .addBatch(115, 119, 0, 2)
+ .build(),
+ BatchTestHolder.multiBatch()
+ .addBatch(111, 113, 0, 2)
+ .addBatch(114, 114, 2, 1)
+ .addBatch(115, 119, 0, 2)
+ .addBatch(120, 120, 0, 1)
+ .build(),
+ -1
+ )
+ );
+ }
+
+ private static Stream<BatchTestHolder> generatorDifferentStates() {
+ return Stream.of(
+ // different states
+ new BatchTestHolder(
+ "newBatch higher state. newBatch first offset and last offset match batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 2),
+ BatchTestHolder.singleBatch(100, 110, 0, 2),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch lower state. newBatch first offset and last offset match batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 3),
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 3),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch higher state. newBatch first offset same and last offset smaller than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 105, 0, 2),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 105, 0, 2)
+ .addBatch(106, 110, 0, 1)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch lower state. newBatch first offset same and last offset smaller than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 3),
+ BatchTestHolder.singleBatch(100, 105, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 3),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch higher state. newBatch first offset same and last offset strictly larger than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 115, 0, 2),
+ BatchTestHolder.singleBatch(100, 115, 0, 2),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch lower state. newBatch first offset same and last offset strictly larger than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 3),
+ BatchTestHolder.singleBatch(100, 115, 0, 1),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 110, 0, 3)
+ .addBatch(111, 115, 0, 1)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch higher state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 115, 0, 1),
+ BatchTestHolder.singleBatch(105, 110, 1, 1),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 104, 0, 1)
+ .addBatch(105, 110, 1, 1)
+ .addBatch(111, 115, 0, 1)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch lower state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 115, 1, 1),
+ BatchTestHolder.singleBatch(105, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 115, 1, 1),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch higher state. newBatch first offset strictly larger and last offset same as batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(105, 110, 0, 2),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 104, 0, 1)
+ .addBatch(105, 110, 0, 2)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch lower state. newBatch first offset strictly larger and last offset same as batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 2),
+ BatchTestHolder.singleBatch(105, 110, 0, 1),
+ BatchTestHolder.singleBatch(100, 110, 0, 2),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch higher state. newBatch first and last offsets strictly larger than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 1),
+ BatchTestHolder.singleBatch(105, 115, 0, 2),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 104, 0, 1)
+ .addBatch(105, 115, 0, 2)
+ .build(),
+ -1
+ ),
+
+ new BatchTestHolder(
+ "newBatch lower state. newBatch first and last offsets strictly larger than batchSoFar.",
+ BatchTestHolder.singleBatch(100, 110, 0, 2),
+ BatchTestHolder.singleBatch(105, 115, 0, 1),
+ BatchTestHolder.multiBatch()
+ .addBatch(100, 110, 0, 2)
+ .addBatch(111, 115, 0, 1)
+ .build(),
+ -1
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("generatorDifferentStates")
+ public void testStateBatchCombineDifferentStates(BatchTestHolder test) {
+ if (test.shouldRun) {
+ assertEquals(test.expectedResult,
+ new PersisterStateBatchCombiner(
+ test.batchesSoFar,
+ test.newBatches,
+ test.startOffset)
+ .combineStateBatches(),
+ test.testName
+ );
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generatorSameState")
+ public void testStateBatchCombineSameState(BatchTestHolder test) {
+ if (test.shouldRun) {
+ assertEquals(test.expectedResult,
+ new PersisterStateBatchCombiner(
+ test.batchesSoFar,
+ test.newBatches,
+ test.startOffset)
+ .combineStateBatches(),
+ test.testName
+ );
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generatorComplex")
+ public void testStateBatchCombineComplexCases(BatchTestHolder test) {
+ if (test.shouldRun) {
+ assertEquals(test.expectedResult,
+ new PersisterStateBatchCombiner(
+ test.batchesSoFar,
+ test.newBatches,
+ test.startOffset)
+ .combineStateBatches(),
+ test.testName
+ );
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generatorCornerCases")
+ public void testStateBatchCombineCornerCases(BatchTestHolder test) {
+ if (test.shouldRun) {
+ assertEquals(test.expectedResult,
+ new PersisterStateBatchCombiner(
+ test.batchesSoFar,
+ test.newBatches,
+ test.startOffset)
+ .combineStateBatches(),
+ test.testName
+ );
+ }
+ }
+}
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
index 70fcccb6f355..ee1547212fcf 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
@@ -23,6 +23,7 @@
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.share.PersisterStateBatch;
import org.junit.jupiter.api.Test;
@@ -36,7 +37,7 @@ public void testNewShareSnapshotRecord() {
String groupId = "test-group";
Uuid topicId = Uuid.randomUuid();
int partitionId = 1;
- PersisterOffsetsStateBatch batch = new PersisterOffsetsStateBatch(1L, 10L, (byte) 0, (short) 1);
+ PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
groupId,
topicId,
@@ -79,7 +80,7 @@ public void testNewShareUpdateRecord() {
String groupId = "test-group";
Uuid topicId = Uuid.randomUuid();
int partitionId = 1;
- PersisterOffsetsStateBatch batch = new PersisterOffsetsStateBatch(1L, 10L, (byte) 0, (short) 1);
+ PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
groupId,
topicId,
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 50c0f5de3e21..15763d1af1f0 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -40,6 +40,7 @@
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -53,7 +54,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -312,7 +312,9 @@ public void testSubsequentWriteStateSnapshotEpochUpdatesSuccessfully() {
assertEquals(incrementalUpdate.snapshotEpoch(), combinedState.snapshotEpoch());
assertEquals(incrementalUpdate.leaderEpoch(), combinedState.leaderEpoch());
assertEquals(incrementalUpdate.startOffset(), combinedState.startOffset());
- assertTrue(combinedState.stateBatchAsSet().containsAll(incrementalUpdate.stateBatchAsSet()));
+ // the batches should have combined to 1 since same state
+ assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)),
+ combinedState.stateBatches());
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
}
@@ -773,8 +775,8 @@ public void testNonSequentialBatchUpdates() {
.setStateEpoch(0)
.setSnapshotEpoch(2) // since 2nd share snapshot
.setStateBatches(Arrays.asList(
- new PersisterOffsetsStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost
- new PersisterOffsetsStateBatch(120, 129, (byte) 2, (short) 1)
+ new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost
+ new PersisterStateBatch(120, 129, (byte) 2, (short) 1)
))
.build();
List expectedRecordsFinal = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
diff --git a/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java b/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java
index 5201d75545ad..95bee3e4ce35 100644
--- a/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java
+++ b/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java
@@ -25,11 +25,11 @@
/**
* This class contains the information for a single batch of state information for use by the {@link Persister}.
*/
-public class PersisterStateBatch {
+public class PersisterStateBatch implements Comparable {
private final long firstOffset;
private final long lastOffset;
- private final byte deliveryState;
private final short deliveryCount;
+ private final byte deliveryState;
public PersisterStateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) {
this.firstOffset = firstOffset;
@@ -77,13 +77,13 @@ public boolean equals(Object o) {
PersisterStateBatch that = (PersisterStateBatch) o;
return firstOffset == that.firstOffset &&
lastOffset == that.lastOffset &&
- deliveryState == that.deliveryState &&
- deliveryCount == that.deliveryCount;
+ deliveryCount == that.deliveryCount &&
+ deliveryState == that.deliveryState;
}
@Override
public int hashCode() {
- return Objects.hash(firstOffset, lastOffset, deliveryState, deliveryCount);
+ return Objects.hash(firstOffset, lastOffset, deliveryCount, deliveryState);
}
@Override
@@ -91,8 +91,47 @@ public String toString() {
return "PersisterStateBatch(" +
"firstOffset=" + firstOffset + "," +
"lastOffset=" + lastOffset + "," +
- "deliveryState=" + deliveryState + "," +
- "deliveryCount=" + deliveryCount +
+ "deliveryCount=" + deliveryCount + "," +
+ "deliveryState=" + deliveryState +
")";
}
+
+ /**
+ * Compares 2 PersisterStateBatches in various dimensions.
+ * The priority order of the dimensions is:
+ * - firstOffset
+ * - lastOffset
+ * - deliveryCount
+ * - deliveryState
+ *
+ * Not all dimensions are checked in every case; the first dimension
+ * comparison with a non-zero result is returned.
+ *
+ * In case the 2 objects are equal, all 4 dimension comparisons must
+ * be 0.
+ *
+ * This method allows PersisterStateBatch objects to be stored in sorted
+ * containers and used with the various sort algorithms in the Java library.
+ *
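+ * For illustration (hypothetical values):
+ * <pre>{@code
+ * PersisterStateBatch a = new PersisterStateBatch(100, 110, (byte) 0, (short) 1);
+ * PersisterStateBatch b = new PersisterStateBatch(100, 115, (byte) 0, (short) 1);
+ * // firstOffset ties, so the lastOffset comparison decides: a.compareTo(b) < 0
+ * }</pre>
+ *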
+ * @param o - object representing another PersisterStateBatch
+ * @return -INT, 0, +INT based on "this" being smaller, equal or larger than the argument.
+ */
+ @Override
+ public int compareTo(Object o) {
+ PersisterStateBatch that = (PersisterStateBatch) o;
+ int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset());
+ if (deltaFirst == 0) {
+ int deltaLast = Long.compare(this.lastOffset(), that.lastOffset());
+ if (deltaLast == 0) {
+ int deltaCount = Short.compare(this.deliveryCount(), that.deliveryCount());
+ if (deltaCount == 0) {
+ return Byte.compare(this.deliveryState(), that.deliveryState());
+ }
+ return deltaCount;
+ }
+ return deltaLast;
+ }
+ return deltaFirst;
+ }
}
diff --git a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
index 8a2d7eb4f463..29ce801aff32 100644
--- a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
+++ b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
@@ -69,6 +69,14 @@ public static SharePartitionKey getInstance(String groupId, Uuid topicId, int pa
return new SharePartitionKey(groupId, topicId, partition);
}
+ public String asCoordinatorKey() {
+ return asCoordinatorKey(groupId(), topicId(), partition());
+ }
+
+ public static String asCoordinatorKey(String groupId, Uuid topicId, int partition) {
+ return String.format("%s:%s:%d", groupId, topicId, partition);
+ }
+
@Override
public boolean equals(final Object obj) {
if (this == obj)