Skip to content

[Refactor] Introduce StreamMetadata to group partition metadata by stream#17811

Open
xiangfu0 wants to merge 2 commits intoapache:masterfrom
xiangfu0:refactor-list-PartitionGroupMetadata-to-list-StreamMetadata
Open

[Refactor] Introduce StreamMetadata to group partition metadata by stream#17811
xiangfu0 wants to merge 2 commits intoapache:masterfrom
xiangfu0:refactor-list-PartitionGroupMetadata-to-list-StreamMetadata

Conversation

@xiangfu0
Copy link
Contributor

@xiangfu0 xiangfu0 commented Mar 4, 2026

Summary

  • Introduces a new StreamMetadata class in pinot-spi that groups PartitionGroupMetadata per stream, replacing the flat List<PartitionGroupMetadata> pattern
  • Each StreamMetadata carries the StreamConfig, stream config index, explicit partition count, and the list of partition group metadata for that stream
  • Adds sequenceNumber field to PartitionGroupMetadata to carry segment sequence numbers, replacing Pair<PartitionGroupMetadata, Integer> pattern
  • Keeps deprecated getPartitionGroupMetadataList() on PartitionGroupMetadataFetcher for backward compatibility (to be removed after 1.5.0)
  • Eliminates the need for callers to decode Pinot partition IDs (via the streamIndex * 10000 + streamPartitionId padding scheme) to determine stream ownership
  • Updates PartitionGroupMetadataFetcher, PinotTableIdealStateBuilder, PinotLLCRealtimeSegmentManager, PinotHelixResourceManager, PinotTableRestletResource, and MissingConsumingSegmentFinder to use the new grouped type
  • setUpNewTable now accepts List<StreamMetadata> instead of List<Pair<PartitionGroupMetadata, Integer>>
  • addTable in PinotHelixResourceManager updated to accept List<StreamMetadata>
  • Copy table endpoint updated to group watermarks by stream before constructing StreamMetadata

Test plan

  • PartitionGroupMetadataFetcherTest — 4/4 tests pass (updated to verify per-stream grouping)
  • PinotLLCRealtimeSegmentManagerTest — 33/33 tests pass (updated FakePinotLLCRealtimeSegmentManager overrides)
  • MissingConsumingSegmentFinderTest — 6/6 tests pass
  • PinotHelixResourceManagerStatelessTest — updated to use StreamMetadata with sequence numbers

🤖 Generated with Claude Code

Replace the flat List<PartitionGroupMetadata> pattern with
List<StreamMetadata>, where each StreamMetadata groups partition
metadata for a single stream along with its StreamConfig and index.
This makes stream membership explicit and eliminates the need for
callers to decode partition IDs to determine stream ownership.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@xiangfu0 xiangfu0 changed the title Introduce StreamMetadata to group partition metadata by stream [Refactor] Introduce StreamMetadata to group partition metadata by stream Mar 4, 2026
- Create StreamMetadata class to replace flat List<PartitionGroupMetadata>
  that mixed partitions from all streams together
- StreamMetadata groups partition metadata per stream with StreamConfig,
  streamConfigIndex, and explicit partitionCount
- Add sequenceNumber field to PartitionGroupMetadata to carry segment
  sequence numbers (replacing Pair<PartitionGroupMetadata, Integer>)
- Update PartitionGroupMetadataFetcher to produce List<StreamMetadata>
- Update PinotLLCRealtimeSegmentManager: rename to getNewStreamMetadataList,
  change setUpNewTable to accept List<StreamMetadata>
- Update PinotHelixResourceManager.addTable to accept List<StreamMetadata>
- Update PinotTableRestletResource copy table to group watermarks by stream
- Update MissingConsumingSegmentFinder to use grouped structure
- Update all related tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@xiangfu0 xiangfu0 force-pushed the refactor-list-PartitionGroupMetadata-to-list-StreamMetadata branch from b9c7083 to ec81d6e Compare March 4, 2026 03:07
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors Pinot’s real-time ingestion metadata flow by introducing StreamMetadata (grouping partition group metadata per stream) and by embedding segment sequenceNumber directly into PartitionGroupMetadata, removing the need for Pair<PartitionGroupMetadata, Integer> and reducing reliance on decoding padded partition IDs.

Changes:

  • Added StreamMetadata (SPI) and updated metadata fetch/build paths to return List<StreamMetadata> instead of a flat List<PartitionGroupMetadata>.
  • Added sequenceNumber to PartitionGroupMetadata and updated controller code paths that previously carried sequence numbers via Pair.
  • Updated controller APIs and endpoints (e.g., addTable, setUpNewTable, copy-table) and adjusted tests accordingly.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java New container type for per-stream partition group metadata.
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java Fetcher now produces grouped StreamMetadata results.
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java Adds sequenceNumber to carry segment sequence numbers directly.
pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java Updates assertions to validate per-stream grouping.
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java Switches to returning List<StreamMetadata> from fetcher.
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java Uses StreamMetadata and reads sequence numbers from PartitionGroupMetadata.
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java Adapts to grouped stream metadata when building offset maps.
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java addTable now accepts List<StreamMetadata> for designated start offsets/sequence numbers.
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java Copy-table endpoint groups watermarks by stream and constructs StreamMetadata.
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java Updates tests and fakes to use StreamMetadata.
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java Updates designated-consuming-metadata test to use sequence numbers in PartitionGroupMetadata.

Comment on lines 90 to 92
+ topicName;
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log messages in this method still refer to "PartitionGroupMetadata" even though the fetcher now returns per-stream StreamMetadata (and the list is wrapped into StreamMetadata immediately above). Please update the log message to avoid misleading operators when debugging metadata fetches.

Copilot uses AI. Check for mistakes.
Comment on lines 87 to 91
String topicName = streamConfig.getTopicName();
String clientId =
PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+ topicName;
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failure logs in this method still say "Could not get partition count" / "retrieved PartitionGroupMetadata", but the code path is now computing and returning StreamMetadata (wrapping the PartitionGroupMetadata list into StreamMetadata). Please update these log messages to reflect the new semantics so operational debugging/alerts remain accurate.

Copilot uses AI. Check for mistakes.
Comment on lines 133 to 136
.filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
.collect(Collectors.toList());
try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When remapping PartitionGroupMetadata for multi-stream tables, this code reconstructs a new PartitionGroupMetadata but only copies partitionGroupId and startOffset, dropping the new sequenceNumber field. Even if current providers always use the default, preserving sequenceNumber here (or explicitly setting it) will prevent subtle bugs if any provider/caller starts populating it.

Copilot uses AI. Check for mistakes.
Comment on lines +104 to 106
LOGGER.error("Could not get StreamMetadata for topic: {} of table: {}",
streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
tableNameWithType, fetcherException);
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger argument for the topic list uses streamConfigs.stream()...reduce(...), which returns an Optional and will log as Optional[...] (and can be empty). Consider using Collectors.joining(",") (or map(...).collect(joining(","))) so the log message is clean and reliably formatted.

Copilot uses AI. Check for mistakes.
@codecov-commenter
Copy link

codecov-commenter commented Mar 4, 2026

Codecov Report

❌ Patch coverage is 76.34409% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.25%. Comparing base (5df2ffb) to head (ec81d6e).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
...oller/api/resources/PinotTableRestletResource.java 0.00% 12 Missing ⚠️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 92.30% 3 Missing ⚠️
...inot/spi/stream/PartitionGroupMetadataFetcher.java 78.57% 3 Missing ⚠️
...ntroller/helix/core/PinotHelixResourceManager.java 60.00% 0 Missing and 2 partials ⚠️
...roller/helix/core/PinotTableIdealStateBuilder.java 50.00% 1 Missing ⚠️
...va/org/apache/pinot/spi/stream/StreamMetadata.java 90.90% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17811      +/-   ##
============================================
+ Coverage     63.23%   63.25%   +0.01%     
  Complexity     1456     1456              
============================================
  Files          3186     3187       +1     
  Lines        191613   191648      +35     
  Branches      29314    29318       +4     
============================================
+ Hits         121165   121219      +54     
+ Misses        60967    60947      -20     
- Partials       9481     9482       +1     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (?)
java-11 63.22% <76.34%> (+0.01%) ⬆️
java-21 63.20% <76.34%> (+<0.01%) ⬆️
temurin 63.25% <76.34%> (+0.01%) ⬆️
unittests 63.24% <76.34%> (+0.01%) ⬆️
unittests1 55.63% <80.00%> (+0.03%) ⬆️
unittests2 34.14% <72.04%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants