KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (apache#17573)

This PR adds another dynamic config, share.auto.offset.reset, for share groups.


Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Abhinav Dixit <[email protected]>, Manikumar Reddy <[email protected]>
chirag-wadhwa5 authored Nov 11, 2024
1 parent 7e5ffd3 commit 9db5ed0
Showing 12 changed files with 889 additions and 164 deletions.
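
For context before the diff: a minimal sketch (not part of this commit) of how the new group-level config could be applied with the Java AdminClient. It assumes that ConfigResource.Type.GROUP is the resource type used for share-group dynamic configs, that the accepted values are the lowercase strings "earliest" and "latest", and that a share group named "my-share-group" already exists.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetShareAutoOffsetReset {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (Admin admin = Admin.create(props)) {
            // Group-level dynamic config resource for the share group (group name is assumed).
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
            // Set share.auto.offset.reset=earliest so share partitions with no persisted
            // start offset begin from the earliest available offset.
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("share.auto.offset.reset", "earliest"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(group, Collections.singletonList(op));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}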
24 changes: 23 additions & 1 deletion core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -19,9 +19,11 @@
import kafka.cluster.Partition;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
@@ -40,6 +42,7 @@
import java.util.Optional;

import scala.Option;
import scala.Some;

/**
* Utility class for post-processing of share fetch operations.
@@ -125,7 +128,26 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition);
}
return timestampAndOffset.get().offset;
}

/**
* The method is used to get the offset for the latest timestamp for the topic-partition.
*
* @return The offset for the latest timestamp.
*/
static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
// Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
Optional.empty(), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for Latest timestamp not found for topic partition: " + topicIdPartition);
}
return timestampAndOffset.get().offset;
}

static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
31 changes: 28 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -34,6 +34,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -72,6 +73,9 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;

/**
* The SharePartition is used to track the state of a partition that is shared between multiple
* consumers. The class maintains the state of the records that have been fetched from the leader
@@ -421,8 +425,12 @@ public CompletableFuture<Void> maybeInitialize() {
return;
}

// Set the state epoch and end offset from the persisted state.
startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
try {
startOffset = startOffsetDuringInitialization(partitionData.startOffset());
} catch (Exception e) {
completeInitializationWithException(future, e);
return;
}
stateEpoch = partitionData.stateEpoch();

List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
@@ -448,7 +456,7 @@ public CompletableFuture<Void> maybeInitialize() {
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
} else {
updateEndOffsetAndResetFetchOffsetMetadata(partitionData.startOffset());
updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
@@ -2058,6 +2066,23 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
}
}

private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
// Set the state epoch and end offset from the persisted state.
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
return partitionDataStartOffset;
}
GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
if (groupConfigManager.groupConfig(groupId).isPresent()) {
offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
} else {
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
}

if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
return offsetForEarliestTimestamp(topicIdPartition, replicaManager);
return offsetForLatestTimestamp(topicIdPartition, replicaManager);
}

// Visible for testing. Should only be used for testing purposes.
NavigableMap<Long, InFlightBatch> cachedState() {
return new ConcurrentSkipListMap<>(cachedState);
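
The GroupConfig changes themselves sit in one of the twelve changed files that is not shown above. As a rough, assumed sketch only (not the actual definition from the commit), the pieces that SharePartition.startOffsetDuringInitialization relies on could look like this; the "latest" default and the constant name are guesses that mirror the consumer's auto.offset.reset convention:

import java.util.Locale;

public class GroupConfig {

    // Config key added by this PR.
    public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset";

    // Reset strategies referenced from SharePartition.startOffsetDuringInitialization.
    public enum ShareGroupAutoOffsetReset {
        EARLIEST, LATEST;

        public static ShareGroupAutoOffsetReset fromString(String value) {
            return valueOf(value.toUpperCase(Locale.ROOT));
        }
    }

    private final ShareGroupAutoOffsetReset shareAutoOffsetReset;

    public GroupConfig(String shareAutoOffsetReset) {
        this.shareAutoOffsetReset = ShareGroupAutoOffsetReset.fromString(shareAutoOffsetReset);
    }

    // Per-group value, read when a share partition initializes with no persisted start offset.
    public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
        return shareAutoOffsetReset;
    }

    // Assumed broker-wide default when the group has no dynamic override.
    public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
        return ShareGroupAutoOffsetReset.LATEST;
    }
}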
@@ -17,6 +17,7 @@
package kafka.server.share;

import kafka.cluster.Partition;
import kafka.log.OffsetResultHolder;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@@ -42,6 +43,7 @@
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest;
@@ -1040,6 +1042,8 @@ public void testMultipleSequentialShareFetches() {
partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);

mockFetchOffsetForTimestamp(mockReplicaManager);

Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
Metrics metrics = new Metrics();
@@ -1109,6 +1113,9 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException {
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);

final Time time = new MockTime(0, System.currentTimeMillis(), 0);

mockFetchOffsetForTimestamp(mockReplicaManager);

DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -1233,6 +1240,8 @@ public void testReplicaManagerFetchShouldProceed() {
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);

mockFetchOffsetForTimestamp(mockReplicaManager);

DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -2482,6 +2491,12 @@ private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPart
});
}

private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}

static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
