diff --git a/checkstyle/import-control-share-coordinator.xml b/checkstyle/import-control-share-coordinator.xml index e31d0f48eed8..d574588ac65c 100644 --- a/checkstyle/import-control-share-coordinator.xml +++ b/checkstyle/import-control-share-coordinator.xml @@ -62,6 +62,8 @@ + + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a1c941996c95..6dacb3d91d70 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -341,6 +341,12 @@ + + + + diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterOffsetsStateBatch.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterOffsetsStateBatch.java deleted file mode 100644 index ff9feaf19186..000000000000 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterOffsetsStateBatch.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.coordinator.share; - -import org.apache.kafka.common.message.WriteShareGroupStateRequestData; -import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; -import org.apache.kafka.server.share.PersisterStateBatch; - -import java.util.Objects; - -/** - * This is a helper class which overrides the equals and hashcode - * methods to only focus on the first and last offset fields of the - * state batch. This is useful when combining batches. 
- */ -public class PersisterOffsetsStateBatch { - private final PersisterStateBatch delegate; - - public PersisterOffsetsStateBatch( - long firstOffset, - long lastOffset, - byte deliveryState, - short deliveryCount - ) { - delegate = new PersisterStateBatch(firstOffset, lastOffset, deliveryState, deliveryCount); - } - - public long firstOffset() { - return delegate.firstOffset(); - } - - public long lastOffset() { - return delegate.lastOffset(); - } - - public byte deliveryState() { - return delegate.deliveryState(); - } - - public short deliveryCount() { - return delegate.deliveryCount(); - } - - public static PersisterOffsetsStateBatch from(WriteShareGroupStateRequestData.StateBatch batch) { - return new PersisterOffsetsStateBatch( - batch.firstOffset(), - batch.lastOffset(), - batch.deliveryState(), - batch.deliveryCount() - ); - } - - public static PersisterOffsetsStateBatch from(ShareUpdateValue.StateBatch batch) { - return new PersisterOffsetsStateBatch( - batch.firstOffset(), - batch.lastOffset(), - batch.deliveryState(), - batch.deliveryCount() - ); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof PersisterOffsetsStateBatch)) { - return false; - } - PersisterOffsetsStateBatch that = (PersisterOffsetsStateBatch) o; - return this.firstOffset() == that.firstOffset() && this.lastOffset() == that.lastOffset(); - } - - @Override - public int hashCode() { - return Objects.hash(firstOffset(), lastOffset()); - } - - @Override - public String toString() { - return "PersisterOffsetsStateBatch(" + - "firstOffset=" + firstOffset() + "," + - "lastOffset=" + lastOffset() + "," + - "deliveryState=" + deliveryState() + "," + - "deliveryCount=" + deliveryCount() + - ")"; - } -} diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombiner.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombiner.java new file mode 100644 index 000000000000..67f21e4c1c7e --- /dev/null +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombiner.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class PersisterStateBatchCombiner { + private List combinedBatchList; // link between pruning and merging + private final long startOffset; + private TreeSet sortedBatches; + private List finalBatchList; // final list is built here + + public PersisterStateBatchCombiner( + List batchesSoFar, + List newBatches, + long startOffset + ) { + initializeCombinedList(batchesSoFar, newBatches); + int estimatedResultSize = (combinedBatchList.size() * 3) / 2; // heuristic size - 50% overallocation + finalBatchList = new ArrayList<>(estimatedResultSize); + this.startOffset = startOffset; + } + + private void initializeCombinedList(List batchesSoFar, List newBatches) { + boolean soFarEmpty = batchesSoFar == null || batchesSoFar.isEmpty(); + boolean newBatchesEmpty = newBatches == null || newBatches.isEmpty(); + + if (soFarEmpty && newBatchesEmpty) { + combinedBatchList = new ArrayList<>(); + } else if (soFarEmpty) { + combinedBatchList = new ArrayList<>(newBatches); // new list as the original one could be unmodifiable + } else if (newBatchesEmpty) { + combinedBatchList = new ArrayList<>(batchesSoFar); // new list as the original one could be unmodifiable + } else { + combinedBatchList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); + combinedBatchList.addAll(batchesSoFar); + combinedBatchList.addAll(newBatches); + } + } + + /** + * Algorithm: Merge current state batches and new batches into a single non-overlapping batch list. + * Input: batchesSoFar, newBatches, startOffset + * Output: combined list with non-overlapping batches (finalBatchList) + *
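+ * For illustration only (hypothetical batches, written as [firstOffset-lastOffset, deliveryState, deliveryCount]):
+ * given batchesSoFar = [100-110, 0, 1], newBatches = [105-114, 0, 1], [115-120, 0, 1] and startOffset = -1,
+ * the result is [100-120, 0, 1], since all three batches overlap or are contiguous with the same state.
+ *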

+ * - Add both currentBatches and newBatches into a single list combinedBatchList.
+ * - If combinedBatchList.size() <= 1, return combinedBatchList.
+ *

+ * - Remove/prune any batches from the combinedBatchList:
+ *   - if batch.lastOffset < startOffset then remove the batch from combinedBatchList
+ *   - else if batch.firstOffset > startOffset then keep the batch
+ *   - else if batch.firstOffset <= startOffset <= batch.lastOffset then keep only the [startOffset, batch.lastOffset] part and discard the rest
+ *
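+ * For illustration only (hypothetical values): with startOffset = 120, a batch [100-110] is removed,
+ * a batch [125-130] is kept as-is, and a batch [115-125] is trimmed to [120-125].
+ *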

+ * - create a treeset sortedBatches using pruned combinedBatchList + * - find first 2 mergeable batches in sortedBatches set, say, prev and candidate. + * - remove any non-overlapping batches from sortedBatches encountered during the find operation and add them to a finalBatchList + * - do repeat until a mergeable pair is not found: + * - based on various conditions of offset overlap and batch state differences combine the batches or + * create new batches, if required, and add to the sortedBatches. + * - find first 2 mergeable batches in sortedBatches set, say, prev and candidate. + * - remove any non-mergeable batches from sortedBatches encountered during the find operation and add them to a finalBatchList + * - done + * - return the finalBatchList + * + * @return list of {@link PersisterStateBatch} representing non-overlapping combined batches + */ + public List combineStateBatches() { + pruneBatches(); + mergeBatches(); + return finalBatchList; + } + + private void mergeBatches() { + if (combinedBatchList.size() < 2) { + finalBatchList = combinedBatchList; + return; + } + + sortedBatches = new TreeSet<>(combinedBatchList); + + MergeCandidatePair overlapState = getMergeCandidatePair(); + + while (overlapState != MergeCandidatePair.EMPTY) { + PersisterStateBatch prev = overlapState.prev(); + PersisterStateBatch candidate = overlapState.candidate(); + + // remove both previous and candidate for easier + // assessment about adding batches to sortedBatches + sortedBatches.remove(prev); + sortedBatches.remove(candidate); + + if (compareBatchDeliveryInfo(candidate, prev) == 0) { // same state and overlap or contiguous + // overlap and same state (prev.firstOffset <= candidate.firstOffset) due to sort + // covers: + // case: 1 2 3 4 5 6 7 (contiguous) + // prev: ------ ------- ------- ------- ------- -------- ------- + // candidate: ------ ---- ---------- --- ---- ------- ------- + handleSameStateMerge(prev, candidate); // pair can be contiguous or overlapping + } else { + // If we reach here then it is guaranteed that the batch pair is overlapping and + // non-contiguous because getMergeCandidatePair only returns contiguous pair if + // the constituents have the same delivery count and state. + // covers: + // case: 1 2* 3 4 5 6 7* + // prev: ------ ------- ------- ------- ------- -------- ------ + // candidate: ------ ---- --------- ---- ---- ------- ------- + // max batches: 1 2 2 3 2 2 2 + // min batches: 1 1 1 1 1 2 1 + // * not possible with treeset + handleDiffStateOverlap(prev, candidate); + } + overlapState = getMergeCandidatePair(); + } + finalBatchList.addAll(sortedBatches); // some non overlapping batches might have remained + } + + /** + * Compares the non-offset state of 2 batches i.e. the deliveryCount and deliverState. + *
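+ * For illustration only (hypothetical batches): comparing [105-110, count=1, state=0] against
+ * [100-120, count=1, state=2] returns a negative value, because the delivery counts match and
+ * AVAILABLE (0) < ACKNOWLEDGED (2); the offsets play no part in this comparison.
+ *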

+ * Uses the standard compareTo contract: x < y => -int, x > y => +int, x == y => 0.
+ *
+ * @param b1 - {@link PersisterStateBatch} to compare
+ * @param b2 - {@link PersisterStateBatch} to compare
+ * @return int representing the comparison result.
+ */
+ private int compareBatchDeliveryInfo(PersisterStateBatch b1, PersisterStateBatch b2) {
+     int deltaCount = Short.compare(b1.deliveryCount(), b2.deliveryCount());
+
+     // Delivery state could be:
+     // 0 - AVAILABLE (non-terminal)
+     // 1 - ACQUIRED - should not be persisted yet
+     // 2 - ACKNOWLEDGED (terminal)
+     // 3 - ARCHIVING - not implemented in KIP-932 - non-terminal - leads only to ARCHIVED
+     // 4 - ARCHIVED (terminal)
+
+     if (deltaCount == 0) { // same delivery count
+         return Byte.compare(b1.deliveryState(), b2.deliveryState());
+     }
+     return deltaCount;
+ }
+
+ /**
+ * Accepts a sorted set of state batches and finds the first 2 batches which can be merged.
+ * Mergeable means that they have some offsets in common, or they are contiguous with the same state.
+ *

+ * Any non-mergeable batches prefixing a good mergeable pair are removed from the sortedBatches. + * For example: + * ----- ---- ----- ----- ----- + * ------ + * <---------------> <--------> + * non-overlapping 1st overlapping pair + * + * @return object representing the overlap state + */ + private MergeCandidatePair getMergeCandidatePair() { + if (sortedBatches == null || sortedBatches.isEmpty()) { + return MergeCandidatePair.EMPTY; + } + Iterator iter = sortedBatches.iterator(); + PersisterStateBatch prev = iter.next(); + List nonOverlapping = new ArrayList<>(sortedBatches.size()); + while (iter.hasNext()) { + PersisterStateBatch candidate = iter.next(); + if (candidate.firstOffset() <= prev.lastOffset() || // overlap + prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchDeliveryInfo(prev, candidate) == 0) { // contiguous + updateBatchContainers(nonOverlapping); + return new MergeCandidatePair( + prev, + candidate + ); + } + nonOverlapping.add(prev); + prev = candidate; + } + + updateBatchContainers(nonOverlapping); + return MergeCandidatePair.EMPTY; + } + + private void updateBatchContainers(List nonOverlappingBatches) { + nonOverlappingBatches.forEach(sortedBatches::remove); + finalBatchList.addAll(nonOverlappingBatches); + } + + /** + * Accepts a list of {@link PersisterStateBatch} and checks: + * - last offset is < start offset => batch is removed + * - first offset > start offset => batch is preserved + * - start offset intersects the batch => part of batch before start offset is removed and + * the part after it is preserved. + */ + private void pruneBatches() { + if (startOffset != -1) { + List retainedBatches = new ArrayList<>(combinedBatchList.size()); + combinedBatchList.forEach(batch -> { + if (batch.lastOffset() < startOffset) { + // batch is expired, skip current iteration + // ------- + // | -> start offset + return; + } + + if (batch.firstOffset() >= startOffset) { + // complete batch is valid + // --------- + // | -> start offset + retainedBatches.add(batch); + } else { + // start offset intersects batch + // --------- + // | -> start offset + retainedBatches.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount())); + } + }); + // update the instance variable + combinedBatchList = retainedBatches; + } + } + + private void handleSameStateMerge(PersisterStateBatch prev, PersisterStateBatch candidate) { + sortedBatches.add(new PersisterStateBatch( + prev.firstOffset(), + // cover cases + // prev: ------ -------- --------- + // candidate: --- ---------- ----- + Math.max(candidate.lastOffset(), prev.lastOffset()), + prev.deliveryState(), + prev.deliveryCount() + )); + } + + private void handleDiffStateOverlap(PersisterStateBatch prev, PersisterStateBatch candidate) { + if (candidate.firstOffset() == prev.firstOffset()) { + handleDiffStateOverlapFirstOffsetAligned(prev, candidate); + } else { // candidate.firstOffset() > prev.firstOffset() + handleDiffStateOverlapFirstOffsetNotAligned(prev, candidate); + } + } + + private void handleDiffStateOverlapFirstOffsetAligned(PersisterStateBatch prev, PersisterStateBatch candidate) { + if (candidate.lastOffset() == prev.lastOffset()) { // case 1 + // candidate can never have lower or equal priority + // since sortedBatches order takes that into account. + // ------- + // ------- + sortedBatches.add(candidate); + } else { + // case 2 is not possible with TreeSet. It is symmetric to case 3. 
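+ // (Case 2 would be candidate.lastOffset() < prev.lastOffset() with equal first offsets;
+ // since the TreeSet orders by firstOffset and then lastOffset, prev always has the
+ // smaller or equal lastOffset when first offsets match, so case 2 cannot occur.)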
+ // case 3 + // -------- + // ----------- + if (compareBatchDeliveryInfo(candidate, prev) < 0) { + sortedBatches.add(prev); + sortedBatches.add(new PersisterStateBatch( + prev.lastOffset() + 1, + candidate.lastOffset(), + candidate.deliveryState(), + candidate.deliveryCount() + )); + } else { + // candidate priority is >= prev + sortedBatches.add(candidate); + } + } + } + + private void handleDiffStateOverlapFirstOffsetNotAligned(PersisterStateBatch prev, PersisterStateBatch candidate) { + if (candidate.lastOffset() < prev.lastOffset()) { // case 4 + handleDiffStateOverlapPrevSwallowsCandidate(prev, candidate); + } else if (candidate.lastOffset() == prev.lastOffset()) { // case 5 + handleDiffStateOverlapLastOffsetAligned(prev, candidate); + } else { // case 6 + handleDiffStateOverlapCandidateOffsetsLarger(prev, candidate); + } + } + + private void handleDiffStateOverlapPrevSwallowsCandidate(PersisterStateBatch prev, PersisterStateBatch candidate) { + // -------- + // ---- + if (compareBatchDeliveryInfo(candidate, prev) < 0) { + sortedBatches.add(prev); + } else { + sortedBatches.add(new PersisterStateBatch( + prev.firstOffset(), + candidate.firstOffset() - 1, + prev.deliveryState(), + prev.deliveryCount() + )); + + sortedBatches.add(candidate); + + sortedBatches.add(new PersisterStateBatch( + candidate.lastOffset() + 1, + prev.lastOffset(), + prev.deliveryState(), + prev.deliveryCount() + )); + } + } + + private void handleDiffStateOverlapLastOffsetAligned(PersisterStateBatch prev, PersisterStateBatch candidate) { + // -------- + // ----- + if (compareBatchDeliveryInfo(candidate, prev) < 0) { + sortedBatches.add(prev); + } else { + sortedBatches.add(new PersisterStateBatch( + prev.firstOffset(), + candidate.firstOffset() - 1, + prev.deliveryState(), + prev.deliveryCount() + )); + + sortedBatches.add(candidate); + } + } + + private void handleDiffStateOverlapCandidateOffsetsLarger(PersisterStateBatch prev, PersisterStateBatch candidate) { + // ------- + // ------- + if (compareBatchDeliveryInfo(candidate, prev) < 0) { + sortedBatches.add(prev); + + sortedBatches.add(new PersisterStateBatch( + prev.lastOffset() + 1, + candidate.lastOffset(), + candidate.deliveryState(), + candidate.deliveryCount() + )); + } else { + // candidate has higher priority + sortedBatches.add(new PersisterStateBatch( + prev.firstOffset(), + candidate.firstOffset() - 1, + prev.deliveryState(), + prev.deliveryCount() + )); + + sortedBatches.add(candidate); + } + } + + /** + * Holder class for intermediate state + * used in the batch merge algorithm. 
+ */ + static class MergeCandidatePair { + private final PersisterStateBatch prev; + private final PersisterStateBatch candidate; + public static final MergeCandidatePair EMPTY = new MergeCandidatePair(null, null); + + public MergeCandidatePair( + PersisterStateBatch prev, + PersisterStateBatch candidate + ) { + this.prev = prev; + this.candidate = candidate; + } + + public PersisterStateBatch prev() { + return prev; + } + + public PersisterStateBatch candidate() { + return candidate; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof MergeCandidatePair)) return false; + MergeCandidatePair that = (MergeCandidatePair) o; + return Objects.equals(prev, that.prev) && Objects.equals(candidate, that.candidate); + } + + @Override + public int hashCode() { + return Objects.hash(prev, candidate); + } + } +} diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index a5bf61920245..3e4c39f01066 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -48,19 +48,16 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.config.ShareCoordinatorConfig; import org.apache.kafka.server.share.PartitionFactory; +import org.apache.kafka.server.share.PersisterStateBatch; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.slf4j.Logger; -import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; public class ShareCoordinatorShard implements CoordinatorShard { @@ -78,6 +75,7 @@ public class ShareCoordinatorShard implements CoordinatorShard { private ShareCoordinatorConfig config; @@ -262,7 +260,6 @@ public void replayEndTransactionMarker(long producerId, short producerEpoch, Tra * @param request - WriteShareGroupStateRequestData for a single key * @return CoordinatorResult(records, response) */ - @SuppressWarnings("NPathComplexity") public CoordinatorResult writeState( WriteShareGroupStateRequestData request ) { @@ -274,79 +271,106 @@ public CoordinatorResult wr return error.get(); } - String groupId = request.groupId(); WriteShareGroupStateRequestData.WriteStateData topicData = request.topics().get(0); WriteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); + SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition()); + + CoordinatorRecord record = generateShareStateRecord(partitionData, key); + // build successful response if record is correctly created + WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData() + .setResults( + Collections.singletonList( + WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(), + Collections.singletonList( + WriteShareGroupStateResponse.toResponsePartitionResult( + key.partition() + )) + )) + ); - SharePartitionKey key = SharePartitionKey.getInstance(groupId, topicData.topicId(), partitionData.partition()); - List recordList; + return new 
CoordinatorResult<>(Collections.singletonList(record), responseData);
+ }
+
+ /**
+ * Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
+ *

+ * if no snapshot has been created for the key => create a new ShareSnapshot record + * else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record + * else create a new ShareUpdate record + * + * @param partitionData - Represents the data which should be written into the share state record. + * @param key - The {@link SharePartitionKey} object. + * @return {@link CoordinatorRecord} representing ShareSnapshot or ShareUpdate + */ + private CoordinatorRecord generateShareStateRecord( + WriteShareGroupStateRequestData.PartitionData partitionData, + SharePartitionKey key + ) { if (!shareStateMap.containsKey(key)) { - // since this is the first time we are getting a write request, we should be creating a share snapshot record - recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( - groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData) - )); + // Since this is the first time we are getting a write request for key, we should be creating a share snapshot record. + // The incoming partition data could have overlapping state batches, we must merge them + return ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + key.groupId(), key.topicId(), partitionData.partition(), + new ShareGroupOffset.Builder() + .setSnapshotEpoch(0) + .setStartOffset(partitionData.startOffset()) + .setLeaderEpoch(partitionData.leaderEpoch()) + .setStateEpoch(partitionData.stateEpoch()) + .setStateBatches(mergeBatches(Collections.emptyList(), partitionData)) + .build()); } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) { - int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? shareStateMap.get(key).leaderEpoch() : partitionData.leaderEpoch(); - int newStateEpoch = partitionData.stateEpoch() == -1 ? shareStateMap.get(key).stateEpoch() : partitionData.stateEpoch(); - long newStartOffset = partitionData.startOffset() == -1 ? shareStateMap.get(key).startOffset() : partitionData.startOffset(); + ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true + int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch(); + int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch(); + long newStartOffset = partitionData.startOffset() == -1 ? currentState.startOffset() : partitionData.startOffset(); // Since the number of update records for this share part key exceeds snapshotUpdateRecordsPerSnapshot, // we should be creating a share snapshot record. - List batchesToAdd = combineStateBatches( - shareStateMap.get(key).stateBatchAsSet(), - partitionData.stateBatches().stream() - .map(PersisterOffsetsStateBatch::from) - .collect(Collectors.toCollection(LinkedHashSet::new)), - newStartOffset); - - recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( - groupId, topicData.topicId(), partitionData.partition(), + // The incoming partition data could have overlapping state batches, we must merge them. 
+ return ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + key.groupId(), key.topicId(), partitionData.partition(), new ShareGroupOffset.Builder() + .setSnapshotEpoch(currentState.snapshotEpoch() + 1) // we must increment snapshot epoch as this is new snapshot .setStartOffset(newStartOffset) .setLeaderEpoch(newLeaderEpoch) .setStateEpoch(newStateEpoch) - .setStateBatches(batchesToAdd) - .build())); + .setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset)) + .build()); } else { - // share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot - recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord( - groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData, shareStateMap.get(key).snapshotEpoch()) - )); - } + ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true - List validRecords = new LinkedList<>(); - - WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData(); - for (CoordinatorRecord record : recordList) { // should be single record - if (!(record.key().message() instanceof ShareSnapshotKey) && !(record.key().message() instanceof ShareUpdateKey)) { - continue; - } - SharePartitionKey mapKey = null; - boolean shouldIncSnapshotEpoch = false; - if (record.key().message() instanceof ShareSnapshotKey) { - ShareSnapshotKey recordKey = (ShareSnapshotKey) record.key().message(); - responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult( - recordKey.topicId(), Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult( - recordKey.partition()))))); - mapKey = SharePartitionKey.getInstance(recordKey.groupId(), recordKey.topicId(), recordKey.partition()); - shouldIncSnapshotEpoch = true; - } else if (record.key().message() instanceof ShareUpdateKey) { - ShareUpdateKey recordKey = (ShareUpdateKey) record.key().message(); - responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult( - recordKey.topicId(), Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult( - recordKey.partition()))))); - mapKey = SharePartitionKey.getInstance(recordKey.groupId(), recordKey.topicId(), recordKey.partition()); - } - - if (shareStateMap.containsKey(mapKey) && shouldIncSnapshotEpoch) { - ShareGroupOffset oldValue = shareStateMap.get(mapKey); - ((ShareSnapshotValue) record.value().message()).setSnapshotEpoch(oldValue.snapshotEpoch() + 1); // increment the snapshot epoch - } - validRecords.add(record); // this will have updated snapshot epoch and on replay the value will trickle down to the map + // Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot + // so create a share update record. + // The incoming partition data could have overlapping state batches, we must merge them. 
+ return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord( + key.groupId(), key.topicId(), partitionData.partition(), + new ShareGroupOffset.Builder() + .setSnapshotEpoch(currentState.snapshotEpoch()) // use same snapshotEpoch as last share snapshot + .setStartOffset(partitionData.startOffset()) + .setLeaderEpoch(partitionData.leaderEpoch()) + .setStateBatches(mergeBatches(Collections.emptyList(), partitionData)) + .build()); } + } + + private List mergeBatches( + List soFar, + WriteShareGroupStateRequestData.PartitionData partitionData) { + return mergeBatches(soFar, partitionData, partitionData.startOffset()); + } - return new CoordinatorResult<>(validRecords, responseData); + private List mergeBatches( + List soFar, + WriteShareGroupStateRequestData.PartitionData partitionData, + long startOffset) { + return new PersisterStateBatchCombiner( + soFar, + partitionData.stateBatches().stream() + .map(PersisterStateBatch::from) + .collect(Collectors.toList()), + startOffset + ) + .combineStateBatches(); } /** @@ -527,7 +551,7 @@ CoordinatorMetricsShard getMetricsShard() { private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue newData) { // snapshot epoch should be same as last share snapshot // state epoch is not present - Set currentBatches = soFar.stateBatchAsSet(); + List currentBatches = soFar.stateBatches(); long newStartOffset = newData.startOffset() == -1 ? soFar.startOffset() : newData.startOffset(); int newLeaderEpoch = newData.leaderEpoch() == -1 ? soFar.leaderEpoch() : newData.leaderEpoch(); @@ -536,41 +560,13 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n .setStateEpoch(soFar.stateEpoch()) .setStartOffset(newStartOffset) .setLeaderEpoch(newLeaderEpoch) - .setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream() - .map(PersisterOffsetsStateBatch::from) - .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset)) + .setStateBatches(new PersisterStateBatchCombiner(currentBatches, newData.stateBatches().stream() + .map(ShareCoordinatorShard::toPersisterStateBatch) + .collect(Collectors.toList()), newStartOffset) + .combineStateBatches()) .build(); } - /** - * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch} - * and the startOffset. - * It removes all batches from the 1st collection which have the same first and last offset - * as the batches in 2nd collection. It then creates a final list of batches which contains the - * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B - * which have same first and last offset in A). - * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1. - * @param currentBatch - collection containing current soft state of batches - * @param newBatch - collection containing batches in incoming request - * @param startOffset - startOffset to consider when removing old batches. - * @return List containing combined batches - */ - private static List combineStateBatches( - Collection currentBatch, - Collection newBatch, - long startOffset - ) { - currentBatch.removeAll(newBatch); - List batchesToAdd = new LinkedList<>(currentBatch); - batchesToAdd.addAll(newBatch); - // Any batches where the last offset is < the current start offset - // are now expired. We should remove them from the persister. 
- if (startOffset != -1) { - batchesToAdd.removeIf(batch -> batch.lastOffset() < startOffset); - } - return batchesToAdd; - } - private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) { if (apiMessageAndVersion == null) { return null; @@ -578,4 +574,20 @@ private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersio return apiMessageAndVersion.message(); } } + + /** + * Util function to convert a state batch of type {@link ShareUpdateValue.StateBatch } + * to {@link PersisterStateBatch}. + * + * @param batch - The object representing {@link ShareUpdateValue.StateBatch} + * @return {@link PersisterStateBatch} + */ + private static PersisterStateBatch toPersisterStateBatch(ShareUpdateValue.StateBatch batch) { + return new PersisterStateBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.deliveryState(), + batch.deliveryCount() + ); + } } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java index 745b3e056b29..8e678098d66c 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java @@ -20,12 +20,12 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; /** @@ -37,13 +37,13 @@ public class ShareGroupOffset { private final int stateEpoch; private final int leaderEpoch; private final long startOffset; - private final List stateBatches; + private final List stateBatches; private ShareGroupOffset(int snapshotEpoch, int stateEpoch, int leaderEpoch, long startOffset, - List stateBatches) { + List stateBatches) { this.snapshotEpoch = snapshotEpoch; this.stateEpoch = stateEpoch; this.leaderEpoch = leaderEpoch; @@ -67,16 +67,16 @@ public long startOffset() { return startOffset; } - public List stateBatches() { + public List stateBatches() { return Collections.unmodifiableList(stateBatches); } - private static PersisterOffsetsStateBatch toPersisterOffsetsStateBatch(ShareSnapshotValue.StateBatch stateBatch) { - return new PersisterOffsetsStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount()); + private static PersisterStateBatch toPersisterOffsetsStateBatch(ShareSnapshotValue.StateBatch stateBatch) { + return new PersisterStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount()); } - private static PersisterOffsetsStateBatch toPersisterOffsetsStateBatch(ShareUpdateValue.StateBatch stateBatch) { - return new PersisterOffsetsStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount()); + private static PersisterStateBatch toPersisterOffsetsStateBatch(ShareUpdateValue.StateBatch stateBatch) { + return new PersisterStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount()); } public static ShareGroupOffset fromRecord(ShareSnapshotValue record) { @@ -99,11 +99,11 @@ 
public static ShareGroupOffset fromRequest(WriteShareGroupStateRequestData.Parti data.leaderEpoch(), data.startOffset(), data.stateBatches().stream() - .map(PersisterOffsetsStateBatch::from) + .map(PersisterStateBatch::from) .collect(Collectors.toList())); } - public Set stateBatchAsSet() { + public LinkedHashSet stateBatchAsSet() { return new LinkedHashSet<>(stateBatches); } @@ -112,7 +112,7 @@ public static class Builder { private int stateEpoch; private int leaderEpoch; private long startOffset; - private List stateBatches; + private List stateBatches; public Builder setSnapshotEpoch(int snapshotEpoch) { this.snapshotEpoch = snapshotEpoch; @@ -134,7 +134,7 @@ public Builder setStartOffset(long startOffset) { return this; } - public Builder setStateBatches(List stateBatches) { + public Builder setStateBatches(List stateBatches) { this.stateBatches = stateBatches; return this; } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java new file mode 100644 index 000000000000..d02ec616d100 --- /dev/null +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.server.share.PersisterStateBatch; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PersisterStateBatchCombinerTest { + static class BatchTestHolder { + final String testName; + final List batchesSoFar; + final List newBatches; + final List expectedResult; + final long startOffset; + final boolean shouldRun; + + BatchTestHolder( + String testName, + List batchesSoFar, + List newBatches, + List expectedResult, + long startOffset + ) { + this(testName, batchesSoFar, newBatches, expectedResult, startOffset, false); + } + + BatchTestHolder( + String testName, + List batchesSoFar, + List newBatches, + List expectedResult, + long startOffset, + boolean shouldRun + ) { + this.testName = testName; + this.batchesSoFar = batchesSoFar; + this.newBatches = newBatches; + this.expectedResult = expectedResult; + this.startOffset = startOffset; + this.shouldRun = shouldRun; + } + + static List singleBatch( + long firstOffset, + long lastOffset, + int deliveryState, + int deliveryCount + ) { + return Collections.singletonList( + new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount) + ); + } + + static class MultiBatchBuilder { + private final List batchList = new LinkedList<>(); + + MultiBatchBuilder addBatch( + long firstOffset, + long lastOffset, + int deliveryState, + int deliveryCount + ) { + batchList.add(new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount)); + return this; + } + + List build() { + return batchList; + } + } + + static MultiBatchBuilder multiBatch() { + return new MultiBatchBuilder(); + } + + @Override + public String toString() { + return this.testName; + } + } + + private static Stream generatorCornerCases() { + return Stream.of( + new BatchTestHolder( + "Current batches with start offset midway are pruned.", + BatchTestHolder.singleBatch(100, 130, 0, 1), + Collections.emptyList(), + BatchTestHolder.singleBatch(120, 130, 0, 1), + 120 + ), + + new BatchTestHolder( + "New batches with start offset midway are pruned.", + Collections.emptyList(), + BatchTestHolder.singleBatch(100, 130, 0, 1), + BatchTestHolder.singleBatch(120, 130, 0, 1), + 120 + ), + + new BatchTestHolder( + "Both current and new batches empty.", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 120 + ) + ); + } + + private static Stream generatorSameState() { + return Stream.of( + // same state + new BatchTestHolder( + "Same state. batchSoFar and newBatch have same first and last offset.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 1), + -1 + ), + + new BatchTestHolder( + "Same state. batchSoFar and newBatch have same first offset, newBatch last offset strictly smaller.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 105, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 1), + -1 + ), + + new BatchTestHolder( + "Same state. 
batchSoFar and newBatch have same first offset, newBatch last offset strictly larger.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 115, 0, 1), + BatchTestHolder.singleBatch(100, 115, 0, 1), + -1 + ), + + new BatchTestHolder( + "Same state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(105, 108, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 1), + -1, + true + ), + + new BatchTestHolder( + "Same state. newBatch first offset strictly larger and last offset equal to batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(105, 110, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 1), + -1 + ), + + new BatchTestHolder( + "Same state. newBatch first offset strictly larger and last offset strictly larger than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(105, 115, 0, 1), + BatchTestHolder.singleBatch(100, 115, 0, 1), + -1 + ), + + new BatchTestHolder( + "Same state. newBatch first offset is batchSoFar first offset + 1 (contiguous).", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(111, 115, 0, 1), + BatchTestHolder.singleBatch(100, 115, 0, 1), + -1 + ) + ); + } + + private static Stream generatorComplex() { + return Stream.of( + new BatchTestHolder( + "Handle overlapping batches in newBatches, same state", + BatchTestHolder.multiBatch() + .addBatch(100, 110, 0, 1) + .addBatch(121, 130, 0, 1) + .build(), + BatchTestHolder.multiBatch() + .addBatch(111, 119, 2, 2) + .addBatch(116, 123, 2, 2) // overlap with first batch + .build(), // , //[(111-123, 2, 2)] + BatchTestHolder.multiBatch() + .addBatch(100, 110, 0, 1) + .addBatch(111, 123, 2, 2) + .build(), + -1 + ), + + new BatchTestHolder( + "Handle overlapping batches in newBatches, different priority.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.multiBatch() + .addBatch(101, 105, 1, 2) + .addBatch(101, 115, 2, 2) + .addBatch(101, 120, 3, 2) //[(111-123, 2, 2)] + .build(), + BatchTestHolder.multiBatch() + .addBatch(100, 100, 0, 1) + .addBatch(101, 120, 3, 2) + .build(), + -1 + ), + + new BatchTestHolder( + "Handle overlapping batches in newBatches, with pruning.", + BatchTestHolder.multiBatch() + .addBatch(100, 110, 0, 1) // should get removed + .addBatch(121, 130, 0, 1) + .build(), + BatchTestHolder.multiBatch() + .addBatch(111, 119, 2, 2) + .addBatch(116, 123, 2, 2) // overlap with first batch //[(111-123, 2, 2)] + .build(), + BatchTestHolder.multiBatch() + .addBatch(120, 123, 2, 2) + .addBatch(124, 130, 0, 1) + .build(), + 120 + ), + + new BatchTestHolder( + "Multiple higher state batch updates.", + BatchTestHolder.singleBatch(111, 120, 0, 1), + BatchTestHolder.multiBatch() + .addBatch(111, 113, 0, 2) + .addBatch(114, 114, 2, 1) + .addBatch(115, 119, 0, 2) //[(111-123, 2, 2)] + .build(), + BatchTestHolder.multiBatch() + .addBatch(111, 113, 0, 2) + .addBatch(114, 114, 2, 1) + .addBatch(115, 119, 0, 2) + .addBatch(120, 120, 0, 1) + .build(), + -1 + ) + ); + } + + private static Stream generatorDifferentStates() { + return Stream.of( + // different states + new BatchTestHolder( + "newBatch higher state. newBatch first offset and last offset match batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 2), + BatchTestHolder.singleBatch(100, 110, 0, 2), + -1 + ), + + new BatchTestHolder( + "newBatch lower state. 
newBatch first offset and last offset match batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 3), + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 3), + -1 + ), + + new BatchTestHolder( + "newBatch higher state. newBatch first offset same and last offset smaller than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 105, 0, 2), + BatchTestHolder.multiBatch() + .addBatch(100, 105, 0, 2) + .addBatch(106, 110, 0, 1) + .build(), + -1 + ), + + new BatchTestHolder( + "newBatch lower state. newBatch first offset same and last offset smaller than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 3), + BatchTestHolder.singleBatch(100, 105, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 3), + -1 + ), + + new BatchTestHolder( + "newBatch higher state. newBatch first offset same and last offset strictly larger than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(100, 115, 0, 2), + BatchTestHolder.singleBatch(100, 115, 0, 2), + -1 + ), + + new BatchTestHolder( + "newBatch lower state. newBatch first offset same and last offset strictly larger than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 3), + BatchTestHolder.singleBatch(100, 115, 0, 1), + BatchTestHolder.multiBatch() + .addBatch(100, 110, 0, 3) + .addBatch(111, 115, 0, 1) + .build(), + -1 + ), + + new BatchTestHolder( + "newBatch higher state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.", + BatchTestHolder.singleBatch(100, 115, 0, 1), + BatchTestHolder.singleBatch(105, 110, 1, 1), + BatchTestHolder.multiBatch() + .addBatch(100, 104, 0, 1) + .addBatch(105, 110, 1, 1) + .addBatch(111, 115, 0, 1) + .build(), + -1 + ), + + new BatchTestHolder( + "newBatch lower state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.", + BatchTestHolder.singleBatch(100, 115, 1, 1), + BatchTestHolder.singleBatch(105, 110, 0, 1), + BatchTestHolder.singleBatch(100, 115, 1, 1), + -1 + ), + + new BatchTestHolder( + "newBatch higher state. newBatch first offset strictly larger and last offset same as batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(105, 110, 0, 2), + BatchTestHolder.multiBatch() + .addBatch(100, 104, 0, 1) + .addBatch(105, 110, 0, 2) + .build(), + -1 + ), + + new BatchTestHolder( + "newBatch lower state. newBatch first offset strictly larger and last offset same as batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 2), + BatchTestHolder.singleBatch(105, 110, 0, 1), + BatchTestHolder.singleBatch(100, 110, 0, 2), + -1 + ), + + new BatchTestHolder( + "newBatch higher state. newBatch first and last offsets strictly larger than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 1), + BatchTestHolder.singleBatch(105, 115, 0, 2), + BatchTestHolder.multiBatch() + .addBatch(100, 104, 0, 1) + .addBatch(105, 115, 0, 2) + .build(), + -1 + ), + + new BatchTestHolder( + "newBatch lower state. 
newBatch first and last offsets strictly larger than batchSoFar.", + BatchTestHolder.singleBatch(100, 110, 0, 2), + BatchTestHolder.singleBatch(105, 115, 0, 1), + BatchTestHolder.multiBatch() + .addBatch(100, 110, 0, 2) + .addBatch(111, 115, 0, 1) + .build(), + -1 + ) + ); + } + + @ParameterizedTest + @MethodSource("generatorDifferentStates") + public void testStateBatchCombineDifferentStates(BatchTestHolder test) { + if (test.shouldRun) { + assertEquals(test.expectedResult, + new PersisterStateBatchCombiner( + test.batchesSoFar, + test.newBatches, + test.startOffset) + .combineStateBatches(), + test.testName + ); + } + } + + @ParameterizedTest + @MethodSource("generatorSameState") + public void testStateBatchCombineSameState(BatchTestHolder test) { + if (test.shouldRun) { + assertEquals(test.expectedResult, + new PersisterStateBatchCombiner( + test.batchesSoFar, + test.newBatches, + test.startOffset) + .combineStateBatches(), + test.testName + ); + } + } + + @ParameterizedTest + @MethodSource("generatorComplex") + public void testStateBatchCombineComplexCases(BatchTestHolder test) { + if (test.shouldRun) { + assertEquals(test.expectedResult, + new PersisterStateBatchCombiner( + test.batchesSoFar, + test.newBatches, + test.startOffset) + .combineStateBatches(), + test.testName + ); + } + } + + @ParameterizedTest + @MethodSource("generatorCornerCases") + public void testStateBatchCombineCornerCases(BatchTestHolder test) { + if (test.shouldRun) { + assertEquals(test.expectedResult, + new PersisterStateBatchCombiner( + test.batchesSoFar, + test.newBatches, + test.startOffset) + .combineStateBatches(), + test.testName + ); + } + } +} diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java index 70fcccb6f355..ee1547212fcf 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.coordinator.share.generated.ShareUpdateKey; import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.share.PersisterStateBatch; import org.junit.jupiter.api.Test; @@ -36,7 +37,7 @@ public void testNewShareSnapshotRecord() { String groupId = "test-group"; Uuid topicId = Uuid.randomUuid(); int partitionId = 1; - PersisterOffsetsStateBatch batch = new PersisterOffsetsStateBatch(1L, 10L, (byte) 0, (short) 1); + PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1); CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( groupId, topicId, @@ -79,7 +80,7 @@ public void testNewShareUpdateRecord() { String groupId = "test-group"; Uuid topicId = Uuid.randomUuid(); int partitionId = 1; - PersisterOffsetsStateBatch batch = new PersisterOffsetsStateBatch(1L, 10L, (byte) 0, (short) 1); + PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1); CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord( groupId, topicId, diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java index 
50c0f5de3e21..15763d1af1f0 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.config.ShareCoordinatorConfig; +import org.apache.kafka.server.share.PersisterStateBatch; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.timeline.SnapshotRegistry; @@ -53,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -312,7 +312,9 @@ public void testSubsequentWriteStateSnapshotEpochUpdatesSuccessfully() { assertEquals(incrementalUpdate.snapshotEpoch(), combinedState.snapshotEpoch()); assertEquals(incrementalUpdate.leaderEpoch(), combinedState.leaderEpoch()); assertEquals(incrementalUpdate.startOffset(), combinedState.startOffset()); - assertTrue(combinedState.stateBatchAsSet().containsAll(incrementalUpdate.stateBatchAsSet())); + // the batches should have combined to 1 since same state + assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)), + combinedState.stateBatches()); assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey)); } @@ -773,8 +775,8 @@ public void testNonSequentialBatchUpdates() { .setStateEpoch(0) .setSnapshotEpoch(2) // since 2nd share snapshot .setStateBatches(Arrays.asList( - new PersisterOffsetsStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost - new PersisterOffsetsStateBatch(120, 129, (byte) 2, (short) 1) + new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost + new PersisterStateBatch(120, 129, (byte) 2, (short) 1) )) .build(); List expectedRecordsFinal = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( diff --git a/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java b/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java index 5201d75545ad..95bee3e4ce35 100644 --- a/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java +++ b/share/src/main/java/org/apache/kafka/server/share/PersisterStateBatch.java @@ -25,11 +25,11 @@ /** * This class contains the information for a single batch of state information for use by the {@link Persister}. 
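 * <p>A minimal illustrative sketch of the natural ordering added below (hypothetical values):
 * <pre>{@code
 * TreeSet<PersisterStateBatch> batches = new TreeSet<>();
 * batches.add(new PersisterStateBatch(100L, 110L, (byte) 0, (short) 1));
 * batches.add(new PersisterStateBatch(90L, 95L, (byte) 0, (short) 1));
 * // iteration order: [90-95] before [100-110], since firstOffset is compared first
 * }</pre>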
*/ -public class PersisterStateBatch { +public class PersisterStateBatch implements Comparable { private final long firstOffset; private final long lastOffset; - private final byte deliveryState; private final short deliveryCount; + private final byte deliveryState; public PersisterStateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) { this.firstOffset = firstOffset; @@ -77,13 +77,13 @@ public boolean equals(Object o) { PersisterStateBatch that = (PersisterStateBatch) o; return firstOffset == that.firstOffset && lastOffset == that.lastOffset && - deliveryState == that.deliveryState && - deliveryCount == that.deliveryCount; + deliveryCount == that.deliveryCount && + deliveryState == that.deliveryState; } @Override public int hashCode() { - return Objects.hash(firstOffset, lastOffset, deliveryState, deliveryCount); + return Objects.hash(firstOffset, lastOffset, deliveryCount, deliveryState); } @Override @@ -91,8 +91,47 @@ public String toString() { return "PersisterStateBatch(" + "firstOffset=" + firstOffset + "," + "lastOffset=" + lastOffset + "," + - "deliveryState=" + deliveryState + "," + - "deliveryCount=" + deliveryCount + + "deliveryCount=" + deliveryCount + "," + + "deliveryState=" + deliveryState + ")"; } + + /** + * Compares 2 PersisterStateBatches in various dimensions. + * The priority of the dimensions are: + * - firstOffset + * - lastOffset + * - deliveryCount + * - deliveryState + *
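+ * For illustration only (hypothetical batches): [100-110, count=1, state=0] sorts before
+ * [100-115, count=1, state=0] because of the smaller lastOffset, and both sort before
+ * [101-105, count=1, state=0] because firstOffset takes priority over every later dimension.
+ *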

+ * Does not check all dimensions in every case: the first dimension
+ * whose comparison yields a non-zero result is returned.
+ *

+ * If the two objects are equal, all 4 dimension comparisons must be 0.
+ *

+ * This method could be used for storing PersisterStateBatch objects + * in containers which allow a Comparator argument or various sort algorithms + * in the java library. + * + * @param o - object representing another PersisterStateBatch + * @return -INT, 0, +INT based on "this" being smaller, equal or larger than the argument. + */ + @Override + public int compareTo(Object o) { + PersisterStateBatch that = (PersisterStateBatch) o; + int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset()); + if (deltaFirst == 0) { + int deltaLast = Long.compare(this.lastOffset(), that.lastOffset()); + if (deltaLast == 0) { + int deltaCount = this.deliveryCount() - that.deliveryCount(); + if (deltaCount == 0) { + return Byte.compare(this.deliveryState(), that.deliveryState()); + } + return deltaCount; + } + return deltaLast; + } + return deltaFirst; + } } diff --git a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java index 8a2d7eb4f463..29ce801aff32 100644 --- a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java +++ b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java @@ -69,6 +69,14 @@ public static SharePartitionKey getInstance(String groupId, Uuid topicId, int pa return new SharePartitionKey(groupId, topicId, partition); } + public String asCoordinatorKey() { + return asCoordinatorKey(groupId(), topicId(), partition()); + } + + public static String asCoordinatorKey(String groupId, Uuid topicId, int partition) { + return String.format("%s:%s:%d", groupId, topicId, partition); + } + @Override public boolean equals(final Object obj) { if (this == obj)