[Refactor] Introduce StreamMetadata to group partition metadata by stream#17811
[Refactor] Introduce StreamMetadata to group partition metadata by stream#17811xiangfu0 wants to merge 2 commits intoapache:masterfrom
Conversation
Replace the flat List<PartitionGroupMetadata> pattern with List<StreamMetadata>, where each StreamMetadata groups partition metadata for a single stream along with its StreamConfig and index. This makes stream membership explicit and eliminates the need for callers to decode partition IDs to determine stream ownership. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Create StreamMetadata class to replace flat List<PartitionGroupMetadata> that mixed partitions from all streams together - StreamMetadata groups partition metadata per stream with StreamConfig, streamConfigIndex, and explicit partitionCount - Add sequenceNumber field to PartitionGroupMetadata to carry segment sequence numbers (replacing Pair<PartitionGroupMetadata, Integer>) - Update PartitionGroupMetadataFetcher to produce List<StreamMetadata> - Update PinotLLCRealtimeSegmentManager: rename to getNewStreamMetadataList, change setUpNewTable to accept List<StreamMetadata> - Update PinotHelixResourceManager.addTable to accept List<StreamMetadata> - Update PinotTableRestletResource copy table to group watermarks by stream - Update MissingConsumingSegmentFinder to use grouped structure - Update all related tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
b9c7083 to
ec81d6e
Compare
There was a problem hiding this comment.
Pull request overview
This PR refactors Pinot’s real-time ingestion metadata flow by introducing StreamMetadata (grouping partition group metadata per stream) and by embedding segment sequenceNumber directly into PartitionGroupMetadata, removing the need for Pair<PartitionGroupMetadata, Integer> and reducing reliance on decoding padded partition IDs.
Changes:
- Added
StreamMetadata(SPI) and updated metadata fetch/build paths to returnList<StreamMetadata>instead of a flatList<PartitionGroupMetadata>. - Added
sequenceNumbertoPartitionGroupMetadataand updated controller code paths that previously carried sequence numbers viaPair. - Updated controller APIs and endpoints (e.g.,
addTable,setUpNewTable, copy-table) and adjusted tests accordingly.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java | New container type for per-stream partition group metadata. |
| pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java | Fetcher now produces grouped StreamMetadata results. |
| pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java | Adds sequenceNumber to carry segment sequence numbers directly. |
| pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java | Updates assertions to validate per-stream grouping. |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java | Switches to returning List<StreamMetadata> from fetcher. |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java | Uses StreamMetadata and reads sequence numbers from PartitionGroupMetadata. |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java | Adapts to grouped stream metadata when building offset maps. |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java | addTable now accepts List<StreamMetadata> for designated start offsets/sequence numbers. |
| pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java | Copy-table endpoint groups watermarks by stream and constructs StreamMetadata. |
| pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java | Updates tests and fakes to use StreamMetadata. |
| pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java | Updates designated-consuming-metadata test to use sequence numbers in PartitionGroupMetadata. |
| + topicName; | ||
| StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); | ||
| try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( |
There was a problem hiding this comment.
Log messages in this method still refer to "PartitionGroupMetadata" even though the fetcher now returns per-stream StreamMetadata (and the list is wrapped into StreamMetadata immediately above). Please update the log message to avoid misleading operators when debugging metadata fetches.
| String topicName = streamConfig.getTopicName(); | ||
| String clientId = | ||
| PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" | ||
| + topicName; | ||
| StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); |
There was a problem hiding this comment.
The failure logs in this method still say "Could not get partition count" / "retrieved PartitionGroupMetadata", but the code path is now computing and returning StreamMetadata (wrapping the PartitionGroupMetadata list into StreamMetadata). Please update these log messages to reflect the new semantics so operational debugging/alerts remain accurate.
| .filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( | ||
| partitionGroupConsumptionStatus.getPartitionGroupId()) == index) | ||
| .collect(Collectors.toList()); | ||
| try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( |
There was a problem hiding this comment.
When remapping PartitionGroupMetadata for multi-stream tables, this code reconstructs a new PartitionGroupMetadata but only copies partitionGroupId and startOffset, dropping the new sequenceNumber field. Even if current providers always use the default, preserving sequenceNumber here (or explicitly setting it) will prevent subtle bugs if any provider/caller starts populating it.
| LOGGER.error("Could not get StreamMetadata for topic: {} of table: {}", | ||
| streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b), | ||
| tableNameWithType, fetcherException); |
There was a problem hiding this comment.
The logger argument for the topic list uses streamConfigs.stream()...reduce(...), which returns an Optional and will log as Optional[...] (and can be empty). Consider using Collectors.joining(",") (or map(...).collect(joining(","))) so the log message is clean and reliably formatted.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17811 +/- ##
============================================
+ Coverage 63.23% 63.25% +0.01%
Complexity 1456 1456
============================================
Files 3186 3187 +1
Lines 191613 191648 +35
Branches 29314 29318 +4
============================================
+ Hits 121165 121219 +54
+ Misses 60967 60947 -20
- Partials 9481 9482 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Summary
StreamMetadataclass inpinot-spithat groupsPartitionGroupMetadataper stream, replacing the flatList<PartitionGroupMetadata>patternStreamMetadatacarries theStreamConfig, stream config index, explicit partition count, and the list of partition group metadata for that streamsequenceNumberfield toPartitionGroupMetadatato carry segment sequence numbers, replacingPair<PartitionGroupMetadata, Integer>patterngetPartitionGroupMetadataList()onPartitionGroupMetadataFetcherfor backward compatibility (to be removed after 1.5.0)streamIndex * 10000 + streamPartitionIdpadding scheme) to determine stream ownershipPartitionGroupMetadataFetcher,PinotTableIdealStateBuilder,PinotLLCRealtimeSegmentManager,PinotHelixResourceManager,PinotTableRestletResource, andMissingConsumingSegmentFinderto use the new grouped typesetUpNewTablenow acceptsList<StreamMetadata>instead ofList<Pair<PartitionGroupMetadata, Integer>>addTableinPinotHelixResourceManagerupdated to acceptList<StreamMetadata>StreamMetadataTest plan
PartitionGroupMetadataFetcherTest— 4/4 tests pass (updated to verify per-stream grouping)PinotLLCRealtimeSegmentManagerTest— 33/33 tests pass (updatedFakePinotLLCRealtimeSegmentManageroverrides)MissingConsumingSegmentFinderTest— 6/6 tests passPinotHelixResourceManagerStatelessTest— updated to useStreamMetadatawith sequence numbers🤖 Generated with Claude Code