Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,8 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConf

// Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset.
if (partitionIdToSmallestOffset == null) {
partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
partitionIdToSmallestOffset =
fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState, latestSegmentZKMetadataMap);
}

// Do not create new CONSUMING segment when the stream partition has reached end of life.
Expand Down Expand Up @@ -1858,11 +1859,13 @@ private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig str
}

private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs,
IdealState idealState) {
IdealState idealState, Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap) {
// Build consumption status from pre-computed ZK metadata map instead of rescanning IdealState (O(1) vs O(N))
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
buildPartitionGroupConsumptionStatusFromZKMetadata(latestSegmentZKMetadataMap, streamConfigs);

Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
for (StreamConfig streamConfig : streamConfigs) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);

Expand All @@ -1884,6 +1887,51 @@ private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOf
return partitionGroupIdToSmallestOffset;
}

/**
 * Converts the latest segment ZK metadata of every partition group into a
 * {@link PartitionGroupConsumptionStatus}. The result is derived solely from the supplied map, so the IdealState
 * walk done by {@link #getPartitionGroupConsumptionStatusList} is not needed here.
 *
 * <p>With exactly one stream config, a single offset factory is created up front and the map key is reported as
 * the partition group id. Otherwise the map key is decoded through {@link IngestionConfigUtils} into a stream
 * config index and a stream partition id, and the offset factory of each stream config is created the first time
 * a partition belonging to it is encountered.
 *
 * @param latestSegmentZKMetadataMap partition group id mapped to the ZK metadata of its latest segment
 * @param streamConfigs stream configs of the table
 * @return one status per map entry, in the map's iteration order; the end offset is {@code null} when the
 *         metadata carries no end offset
 */
@VisibleForTesting
List<PartitionGroupConsumptionStatus> buildPartitionGroupConsumptionStatusFromZKMetadata(
    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap, List<StreamConfig> streamConfigs) {
  List<PartitionGroupConsumptionStatus> statuses = new ArrayList<>(latestSegmentZKMetadataMap.size());
  int streamCount = streamConfigs.size();

  if (streamCount == 1) {
    // Single stream: the factory is created eagerly, even when the map turns out to be empty.
    StreamPartitionMsgOffsetFactory factory =
        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
    for (Map.Entry<Integer, SegmentZKMetadata> latest : latestSegmentZKMetadataMap.entrySet()) {
      int pinotPartitionId = latest.getKey();
      SegmentZKMetadata metadata = latest.getValue();
      LLCSegmentName segmentName = new LLCSegmentName(metadata.getSegmentName());
      StreamPartitionMsgOffset startOffset = factory.create(metadata.getStartOffset());
      StreamPartitionMsgOffset endOffset = null;
      if (metadata.getEndOffset() != null) {
        endOffset = factory.create(metadata.getEndOffset());
      }
      statuses.add(new PartitionGroupConsumptionStatus(pinotPartitionId, segmentName.getSequenceNumber(),
          startOffset, endOffset, metadata.getStatus().toString()));
    }
    return statuses;
  }

  // Multiple streams: factories are cached by stream config index and created on first use.
  StreamPartitionMsgOffsetFactory[] factoryByStreamIndex = new StreamPartitionMsgOffsetFactory[streamCount];
  for (Map.Entry<Integer, SegmentZKMetadata> latest : latestSegmentZKMetadataMap.entrySet()) {
    int pinotPartitionId = latest.getKey();
    int streamIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(pinotPartitionId);
    int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(pinotPartitionId);
    SegmentZKMetadata metadata = latest.getValue();
    LLCSegmentName segmentName = new LLCSegmentName(metadata.getSegmentName());

    if (factoryByStreamIndex[streamIndex] == null) {
      factoryByStreamIndex[streamIndex] =
          StreamConsumerFactoryProvider.create(streamConfigs.get(streamIndex)).createStreamMsgOffsetFactory();
    }
    StreamPartitionMsgOffsetFactory factory = factoryByStreamIndex[streamIndex];

    StreamPartitionMsgOffset startOffset = factory.create(metadata.getStartOffset());
    StreamPartitionMsgOffset endOffset = null;
    if (metadata.getEndOffset() != null) {
      endOffset = factory.create(metadata.getEndOffset());
    }
    statuses.add(new PartitionGroupConsumptionStatus(pinotPartitionId, streamPartitionId,
        segmentName.getSequenceNumber(), startOffset, endOffset, metadata.getStatus().toString()));
  }
  return statuses;
}

private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -1643,6 +1644,76 @@ public void testGetPartitionIds()
Assert.assertEquals(partitionIds.size(), 2);
}

/**
 * Verifies that {@code buildPartitionGroupConsumptionStatusFromZKMetadata} produces the same results as
 * {@code getPartitionGroupConsumptionStatusList} for the common case where IdealState and ZK metadata are in sync.
 * This validates that the optimization in {@code fetchPartitionGroupIdToSmallestOffset} (reusing the pre-computed
 * latestSegmentZKMetadataMap instead of rescanning the entire IdealState) does not change behavior.
 *
 * <p>The IdealState-based list is treated as the expected baseline; the ZK-metadata-based list is the value under
 * test. All assertions follow TestNG's {@code (actual, expected, message)} argument order.
 */
@Test
public void testBuildPartitionGroupConsumptionStatusFromZKMetadataMatchesOriginal() {
  // Set up a table with 2 replicas, 5 instances, 4 partitions
  FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
  setUpNewTable(segmentManager, 2, 5, 4);

  // Commit segments for partitions 0 and 1 to get a mix of ONLINE (DONE) and CONSUMING (IN_PROGRESS) segments
  for (int partitionGroupId = 0; partitionGroupId < 2; partitionGroupId++) {
    String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionGroupId, 0, CURRENT_TIME_MS).getSegmentName();
    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName,
        new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
  }

  // Build latestSegmentZKMetadataMap from the fake ZK metadata: for every partition keep only the metadata of
  // the segment with the highest sequence number.
  Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = new HashMap<>();
  for (Map.Entry<String, SegmentZKMetadata> entry : segmentManager._segmentZKMetadataMap.entrySet()) {
    LLCSegmentName llcSegmentName = new LLCSegmentName(entry.getKey());
    int partitionId = llcSegmentName.getPartitionGroupId();
    latestSegmentZKMetadataMap.merge(partitionId, entry.getValue(),
        (existing, candidate) -> {
          int existingSeq = new LLCSegmentName(existing.getSegmentName()).getSequenceNumber();
          int candidateSeq = new LLCSegmentName(candidate.getSegmentName()).getSequenceNumber();
          return candidateSeq > existingSeq ? candidate : existing;
        });
  }

  // Get results from both methods
  List<PartitionGroupConsumptionStatus> fromIdealState =
      segmentManager.getPartitionGroupConsumptionStatusList(segmentManager._idealState,
          segmentManager._streamConfigs);
  List<PartitionGroupConsumptionStatus> fromZKMetadata =
      segmentManager.buildPartitionGroupConsumptionStatusFromZKMetadata(latestSegmentZKMetadataMap,
          segmentManager._streamConfigs);

  // Guard against a vacuous pass: with two empty lists the size check holds and the loop below never runs.
  Assert.assertFalse(fromIdealState.isEmpty(),
      "Baseline consumption status list should not be empty");

  // Sort both by partition group id for comparison
  fromIdealState.sort(Comparator.comparingInt(PartitionGroupConsumptionStatus::getPartitionGroupId));
  fromZKMetadata.sort(Comparator.comparingInt(PartitionGroupConsumptionStatus::getPartitionGroupId));

  // Verify same number of partitions (value under test first, baseline second)
  assertEquals(fromZKMetadata.size(), fromIdealState.size(),
      "Both methods should return the same number of partitions");

  // Verify each partition has identical consumption status
  for (int i = 0; i < fromIdealState.size(); i++) {
    PartitionGroupConsumptionStatus isStatus = fromIdealState.get(i);
    PartitionGroupConsumptionStatus zkStatus = fromZKMetadata.get(i);

    assertEquals(zkStatus.getPartitionGroupId(), isStatus.getPartitionGroupId(),
        "Partition group id mismatch at index " + i);
    assertEquals(zkStatus.getSequenceNumber(), isStatus.getSequenceNumber(),
        "Sequence number mismatch for partition " + isStatus.getPartitionGroupId());
    // Offsets are compared through their string form.
    assertEquals(zkStatus.getStartOffset().toString(), isStatus.getStartOffset().toString(),
        "Start offset mismatch for partition " + isStatus.getPartitionGroupId());
    // End offset may be null on either side; normalize to a nullable string before comparing.
    String zkEnd = zkStatus.getEndOffset() != null ? zkStatus.getEndOffset().toString() : null;
    String isEnd = isStatus.getEndOffset() != null ? isStatus.getEndOffset().toString() : null;
    assertEquals(zkEnd, isEnd,
        "End offset mismatch for partition " + isStatus.getPartitionGroupId());
    assertEquals(zkStatus.getStatus(), isStatus.getStatus(),
        "Status mismatch for partition " + isStatus.getPartitionGroupId());
  }
}

@Test
public void testReduceSegmentSizeAndReset() {
// Set up a new table with 2 replicas, 5 instances, 4 partitions
Expand Down
Loading