Add Kafka ingestion support for subset partitions#17587
Add Kafka ingestion support for subset partitions#17587xiangfu0 wants to merge 1 commit intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds support for configuring Kafka ingestion to consume only a subset of topic partitions via the stream.kafka.partition.ids configuration property. This enables multiple tables to share a single Kafka topic by consuming different partitions.
Changes:
- Added
stream.kafka.partition.idsconfiguration property and parsing utilities - Modified Kafka metadata providers (2.0 and 3.0) to validate and respect partition subsets
- Updated instance assignment logic to support non-contiguous partition IDs
- Added comprehensive unit tests and example configurations
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-kafka-base/KafkaStreamConfigProperties.java | Defines new PARTITION_IDS constant for subset configuration |
| pinot-kafka-base/KafkaPartitionSubsetUtils.java | Implements parsing, validation, and deduplication of partition ID lists |
| pinot-kafka-base/KafkaPartitionSubsetUtilsTest.java | Comprehensive unit tests for partition ID parsing |
| pinot-kafka-base/KafkaPartitionLevelStreamConfig.java | Exposes stream config map for partition subset utilities |
| pinot-kafka-2.0/KafkaStreamMetadataProvider.java | Overrides partition methods to validate and return subset partitions |
| pinot-kafka-3.0/KafkaStreamMetadataProvider.java | Mirrors 2.0 implementation for Kafka 3.0 compatibility |
| pinot-kafka-2.0/KafkaPartitionLevelConsumerTest.java | Tests subset validation and partition count/ID fetching |
| pinot-kafka-2.0/README.md | Documents the partition subset feature |
| InstanceReplicaGroupPartitionSelector.java | Supports explicit partition IDs in instance assignment |
| ImplicitRealtimeTablePartitionSelector.java | Fetches and uses stream partition IDs for instance assignment |
| RealtimeSegmentAssignment.java | Updates segment assignment to handle non-contiguous partition IDs |
| InstanceAssignmentTest.java | Tests single-partition subset with non-zero ID |
| QuickStartBase.java | Adds fineFoodReviews-part-0 and fineFoodReviews-part-1 examples |
| examples/stream/subsetPartitions/* | Example configuration and documentation |
| examples/stream/fineFoodReviews-part-/ | Demo tables consuming single partitions |
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Outdated
Show resolved
Hide resolved
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
Outdated
Show resolved
Hide resolved
❌ 1 Tests Failed:
View the top 3 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
ab92eca to
3760d06
Compare
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
.../pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
Outdated
Show resolved
Hide resolved
3760d06 to
4235903
Compare
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Outdated
Show resolved
Hide resolved
0379c91 to
0374b0b
Compare
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Outdated
Show resolved
Hide resolved
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Outdated
Show resolved
Hide resolved
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Outdated
Show resolved
Hide resolved
9220944 to
d045fa9
Compare
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
03fed47 to
3855de1
Compare
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
3855de1 to
548b943
Compare
...kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
Show resolved
Hide resolved
...ka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Show resolved
Hide resolved
...ka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
b116759 to
8566b36
Compare
...base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
Show resolved
Hide resolved
...ka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Show resolved
Hide resolved
8566b36 to
7ba5447
Compare
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Show resolved
Hide resolved
...ka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
Outdated
Show resolved
Hide resolved
.../pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
Outdated
Show resolved
Hide resolved
...e/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
Show resolved
Hide resolved
...he/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
Show resolved
Hide resolved
bac1ecf to
32fa5a1
Compare
.../pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
Outdated
Show resolved
Hide resolved
32fa5a1 to
159ba84
Compare
Jackie-Jiang
left a comment
There was a problem hiding this comment.
Discussed offline. Changing InstancePartitions format can be risky because it is involved in partition handling, where numPartitions is used.
Alternatively, we can use a different setup to avoid changing it. We can make multiple tables share the same InstancePartition, where each of them consume a subset of partitions. This way the only change needed is on segment creation side, where we only create segment for the configured partitions. Other part automatically works.
49b4b3e to
5f1d26f
Compare
702cbdf to
ad40be3
Compare
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
a5e8de3 to
c7bace2
Compare
| streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater); | ||
| InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); | ||
| int numPartitionGroups = consumeMeta.size(); | ||
| int numPartitionGroups = getPartitionCountForRouting(streamConfigs, consumeMeta.size()); |
There was a problem hiding this comment.
Can you calculate this on the caller side and pass it in? This info should be available when calculating the consumeMeta
c7bace2 to
3b2f105
Compare
Allow a realtime table to consume only a subset of Kafka partitions by configuring `stream.kafka.partition.ids` (e.g. "0,2,5"). This enables splitting a single Kafka topic across multiple Pinot tables for independent scaling and isolation. Key changes: - Add KafkaPartitionSubsetUtils to parse and validate partition ID configuration from StreamConfig - Update KafkaStreamMetadataProvider (kafka30/kafka40) to filter partitions and offsets based on the configured subset while still returning the total Kafka partition count for instance assignment - Update PinotLLCRealtimeSegmentManager to use total partition count from instance partitions for segment ZK metadata, ensuring correct broker query routing across subset tables - Add QuickStart example with two subset tables splitting a 2-partition topic (fineFoodReviews_part_0 and fineFoodReviews_part_1) - Add unit tests, integration tests, and a chaos integration test validating partition assignment, segment creation, and query routing
3b2f105 to
ae62d10
Compare
Summary
Add support for Kafka partition-subset realtime ingestion so Pinot can assign and consume only selected topic partitions for a table.
Changes
stream.kafka.partition.idsparser/validation utilities inpinot-kafka-baseto interpret configured partition subsets.pinot-kafka-3.0andpinot-kafka-4.0) to:fineFoodReviews-part-0fineFoodReviews-part-1