Add Kafka ingestion support for subset partitions #17587

Open
xiangfu0 wants to merge 1 commit into apache:master from xiangfu0:kafka-subset-partitions

@xiangfu0 xiangfu0 commented Jan 27, 2026

Summary

Add support for Kafka partition-subset realtime ingestion so Pinot can assign and consume only selected topic partitions for a table.

Changes

  • Add stream.kafka.partition.ids parser/validation utilities in pinot-kafka-base to interpret configured partition subsets.
  • Update controller assignment logic (segment and instance selectors) to support partition-group assignment across subset partitions.
  • Update Kafka metadata providers (pinot-kafka-3.0 and pinot-kafka-4.0) to:
    • honor configured partition subsets in partition counts/group metadata
    • validate configured IDs against topic metadata
    • support stable subset-based partition-group behavior
  • Add unit tests for subset parsing and Kafka metadata-provider partition selection.
  • Add quickstart examples for split-topic ingestion:
    • fineFoodReviews-part-0
    • fineFoodReviews-part-1
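
The parsing and validation described in the list above could look roughly like the following. This is a minimal sketch, not the PR's `KafkaPartitionSubsetUtils`: the class name `PartitionIdsSketch`, both method names, the sorted output order, and the treatment of a missing key as "consume every partition" are assumptions made for illustration. Only the property name `stream.kafka.partition.ids` and the comma-separated format come from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper for illustration; not the PR's KafkaPartitionSubsetUtils.
final class PartitionIdsSketch {
  static final String PARTITION_IDS_KEY = "stream.kafka.partition.ids";

  // Returns the configured IDs sorted and de-duplicated, or an empty list
  // (assumed here to mean "consume every partition") when the key is unset or blank.
  static List<Integer> parsePartitionIds(Map<String, String> streamConfigs) {
    String raw = streamConfigs.get(PARTITION_IDS_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return List.of();
    }
    TreeSet<Integer> ids = new TreeSet<>();
    for (String token : raw.split(",")) {
      String trimmed = token.trim();
      int id;
      try {
        id = Integer.parseInt(trimmed);
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
            "Invalid partition ID '" + trimmed + "' in " + PARTITION_IDS_KEY, e);
      }
      if (id < 0) {
        throw new IllegalArgumentException(
            "Negative partition ID " + id + " in " + PARTITION_IDS_KEY);
      }
      ids.add(id);
    }
    return new ArrayList<>(ids);
  }

  // Rejects any configured ID the topic does not have, assuming Kafka's
  // usual contiguous numbering 0..topicPartitionCount-1.
  static void validateAgainstTopic(List<Integer> ids, int topicPartitionCount) {
    for (int id : ids) {
      if (id >= topicPartitionCount) {
        throw new IllegalArgumentException(
            "Partition ID " + id + " does not exist; topic has "
                + topicPartitionCount + " partitions");
      }
    }
  }
}
```

Sorting the IDs is one way to keep the resulting partition list stable across restarts; the PR may order them differently.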

@xiangfu0 xiangfu0 requested a review from Copilot January 27, 2026 16:30
@xiangfu0 xiangfu0 added the feature, release-notes, kafka, and ingestion labels Jan 27, 2026

Copilot AI left a comment

Pull request overview

This PR adds support for configuring Kafka ingestion to consume only a subset of topic partitions via the stream.kafka.partition.ids configuration property. This enables multiple tables to share a single Kafka topic by consuming different partitions.

Changes:

  • Added stream.kafka.partition.ids configuration property and parsing utilities
  • Modified Kafka metadata providers (2.0 and 3.0) to validate and respect partition subsets
  • Updated instance assignment logic to support non-contiguous partition IDs
  • Added comprehensive unit tests and example configurations
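
To illustrate the topic-sharing setup mentioned in the overview, here is a small sketch in which two tables point at the same topic with different `stream.kafka.partition.ids` values, plus a check that the two subsets do not overlap. The class and method names are hypothetical, the topic name `events` is made up, and the PR itself is not known to enforce disjointness; the check simply reflects the assumption that each record should land in exactly one table.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of two tables sharing one topic; not code from the PR.
final class SharedTopicSketch {
  // Builds a minimal stream config map for one table. A real table config
  // needs more keys (broker list, consumer factory, decoder, and so on).
  static Map<String, String> streamConfigs(String topic, String partitionIds) {
    Map<String, String> configs = new LinkedHashMap<>();
    configs.put("streamType", "kafka");
    configs.put("stream.kafka.topic.name", topic);
    configs.put("stream.kafka.partition.ids", partitionIds);
    return configs;
  }

  // Reads the configured subset as a sorted set of integers.
  static Set<Integer> subsetOf(Map<String, String> configs) {
    Set<Integer> ids = new TreeSet<>();
    for (String token : configs.get("stream.kafka.partition.ids").split(",")) {
      ids.add(Integer.parseInt(token.trim()));
    }
    return ids;
  }

  // Returns the partition IDs claimed by both tables; empty means disjoint.
  static Set<Integer> overlap(Map<String, String> a, Map<String, String> b) {
    Set<Integer> common = new TreeSet<>(subsetOf(a));
    common.retainAll(subsetOf(b));
    return common;
  }

  public static void main(String[] args) {
    Map<String, String> tableA = streamConfigs("events", "0,2");
    Map<String, String> tableB = streamConfigs("events", "1,3");
    System.out.println("overlap: " + overlap(tableA, tableB)); // prints "overlap: []"
  }
}
```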

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| pinot-kafka-base/KafkaStreamConfigProperties.java | Defines new PARTITION_IDS constant for subset configuration |
| pinot-kafka-base/KafkaPartitionSubsetUtils.java | Implements parsing, validation, and deduplication of partition ID lists |
| pinot-kafka-base/KafkaPartitionSubsetUtilsTest.java | Comprehensive unit tests for partition ID parsing |
| pinot-kafka-base/KafkaPartitionLevelStreamConfig.java | Exposes stream config map for partition subset utilities |
| pinot-kafka-2.0/KafkaStreamMetadataProvider.java | Overrides partition methods to validate and return subset partitions |
| pinot-kafka-3.0/KafkaStreamMetadataProvider.java | Mirrors 2.0 implementation for Kafka 3.0 compatibility |
| pinot-kafka-2.0/KafkaPartitionLevelConsumerTest.java | Tests subset validation and partition count/ID fetching |
| pinot-kafka-2.0/README.md | Documents the partition subset feature |
| InstanceReplicaGroupPartitionSelector.java | Supports explicit partition IDs in instance assignment |
| ImplicitRealtimeTablePartitionSelector.java | Fetches and uses stream partition IDs for instance assignment |
| RealtimeSegmentAssignment.java | Updates segment assignment to handle non-contiguous partition IDs |
| InstanceAssignmentTest.java | Tests single-partition subset with non-zero ID |
| QuickStartBase.java | Adds fineFoodReviews-part-0 and fineFoodReviews-part-1 examples |
| examples/stream/subsetPartitions/* | Example configuration and documentation |
| examples/stream/fineFoodReviews-part-/ | Demo tables consuming single partitions |

codecov-commenter commented Jan 27, 2026

❌ 1 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --- | --- | --- | --- |
| 11011 | 1 | 11010 | 48 |
View the top 3 failed test(s) by shortest run time
org.apache.pinot.controller.helix.core.minion.PinotTaskManagerDistributedLockingTest::testForceReleaseLockDuringTaskExecution
Stack Traces | 18.7s run time
Both createTask calls should have generated tasks expected [2] but found [3]
org.apache.pinot.controller.helix.core.minion.PinotTaskManagerDistributedLockingTest::testForceReleaseLockDuringTaskExecution
Stack Traces | 21.4s run time
Both createTask calls should have generated tasks expected [2] but found [3]
org.apache.pinot.integration.tests.UpsertTableIntegrationTest::testUpsertCompactionWithSoftDelete
Stack Traces | 608s run time
Failed to meet condition in 600000ms, error message: Failed to load all documents


@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 3 times, most recently from ab92eca to 3760d06 Compare January 28, 2026 05:45
@xiangfu0 xiangfu0 requested a review from Copilot January 28, 2026 10:07

Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from 3760d06 to 4235903 Compare January 28, 2026 13:01
@xiangfu0 xiangfu0 requested a review from Copilot January 28, 2026 13:01

Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 3 times, most recently from 0379c91 to 0374b0b Compare January 28, 2026 17:18
@xiangfu0 xiangfu0 requested a review from Copilot January 29, 2026 13:34

Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 2 times, most recently from 9220944 to d045fa9 Compare January 29, 2026 16:09
@xiangfu0 xiangfu0 requested a review from Copilot January 29, 2026 16:25

Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 2 times, most recently from 03fed47 to 3855de1 Compare February 3, 2026 16:36
@xiangfu0 xiangfu0 requested a review from Jackie-Jiang February 3, 2026 16:39
@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from 3855de1 to 548b943 Compare February 5, 2026 11:29

Copilot AI left a comment

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.


Copilot AI left a comment

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 4 comments.

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from 8566b36 to 7ba5447 Compare February 24, 2026 18:54
@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 3 times, most recently from bac1ecf to 32fa5a1 Compare February 26, 2026 15:26
@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from 32fa5a1 to 159ba84 Compare February 27, 2026 21:51

@Jackie-Jiang Jackie-Jiang left a comment

Discussed offline. Changing the InstancePartitions format can be risky because it is involved in partition handling, where numPartitions is used.
Alternatively, we can use a different setup that avoids changing it: multiple tables share the same InstancePartitions, and each of them consumes a subset of the partitions. This way, the only change needed is on the segment creation side, where we create segments only for the configured partitions. Everything else works automatically.
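
The alternative setup can be sketched as follows, under stated assumptions. The modulo rule in `instanceFor` is only a stand-in for whatever mapping the shared InstancePartitions actually encodes, and the class, method, and server names are hypothetical. The point it illustrates is that the partition-to-instance mapping stays keyed on the full topic layout, while each table plans consuming segments only for its configured partition IDs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of tables sharing one instance layout; not Pinot code.
final class SharedInstancePartitionsSketch {
  // Stand-in for the shared layout: partition p is served by instances[p % n].
  // This rule is illustrative only, not Pinot's actual assignment algorithm.
  static String instanceFor(int partitionId, List<String> sharedInstances) {
    return sharedInstances.get(partitionId % sharedInstances.size());
  }

  // A table plans consuming segments only for its configured partition IDs,
  // but resolves each one against the same shared layout.
  static Map<Integer, String> planConsumingSegments(
      List<Integer> configuredPartitionIds, List<String> sharedInstances) {
    Map<Integer, String> plan = new LinkedHashMap<>();
    for (int partitionId : configuredPartitionIds) {
      plan.put(partitionId, instanceFor(partitionId, sharedInstances));
    }
    return plan;
  }
}
```

With three shared servers, a table configured with partitions 0 and 2 and another configured with 1 and 3 each get a plan covering only their own partitions, yet both resolve a given partition ID to the same server.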

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from 49b4b3e to 5f1d26f Compare February 28, 2026 08:24
@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 5 times, most recently from 702cbdf to ad40be3 Compare March 2, 2026 21:21
@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch 2 times, most recently from a5e8de3 to c7bace2 Compare March 3, 2026 06:41
  streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
  InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
- int numPartitionGroups = consumeMeta.size();
+ int numPartitionGroups = getPartitionCountForRouting(streamConfigs, consumeMeta.size());

Can you calculate this on the caller side and pass it in? This info should be available when calculating the consumeMeta.

@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from c7bace2 to 3b2f105 Compare March 5, 2026 23:41
Allow a realtime table to consume only a subset of Kafka partitions by
configuring `stream.kafka.partition.ids` (e.g. "0,2,5"). This enables
splitting a single Kafka topic across multiple Pinot tables for
independent scaling and isolation.

Key changes:
- Add KafkaPartitionSubsetUtils to parse and validate partition ID
  configuration from StreamConfig
- Update KafkaStreamMetadataProvider (kafka30/kafka40) to filter
  partitions and offsets based on the configured subset while still
  returning the total Kafka partition count for instance assignment
- Update PinotLLCRealtimeSegmentManager to use total partition count
  from instance partitions for segment ZK metadata, ensuring correct
  broker query routing across subset tables
- Add QuickStart example with two subset tables splitting a 2-partition
  topic (fineFoodReviews_part_0 and fineFoodReviews_part_1)
- Add unit tests, integration tests, and a chaos integration test
  validating partition assignment, segment creation, and query routing
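
The metadata-provider behavior described in this commit message, filtering partitions and offsets to the configured subset while still reporting the topic's total partition count, might be sketched like this. It is not the PR's `KafkaStreamMetadataProvider`; the class `SubsetMetadataSketch`, its method names, and the convention that an empty subset means "all partitions" are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; not the PR's KafkaStreamMetadataProvider.
final class SubsetMetadataSketch {
  private final int _topicPartitionCount;
  private final List<Integer> _subset; // empty is assumed to mean "all partitions"

  SubsetMetadataSketch(int topicPartitionCount, List<Integer> subset) {
    for (int id : subset) {
      if (id < 0 || id >= topicPartitionCount) {
        throw new IllegalArgumentException("Partition ID " + id + " is not in the topic");
      }
    }
    _topicPartitionCount = topicPartitionCount;
    _subset = List.copyOf(subset);
  }

  // Total count of the Kafka topic, independent of the subset.
  int totalPartitionCount() {
    return _topicPartitionCount;
  }

  // The partitions this table should actually consume.
  List<Integer> partitionIdsToConsume() {
    if (!_subset.isEmpty()) {
      return _subset;
    }
    List<Integer> all = new ArrayList<>();
    for (int i = 0; i < _topicPartitionCount; i++) {
      all.add(i);
    }
    return all;
  }

  // Keeps only the offsets of partitions that belong to the subset.
  Map<Integer, Long> filterOffsets(Map<Integer, Long> offsetsByPartition) {
    Map<Integer, Long> filtered = new TreeMap<>();
    for (int id : partitionIdsToConsume()) {
      Long offset = offsetsByPartition.get(id);
      if (offset != null) {
        filtered.put(id, offset);
      }
    }
    return filtered;
  }
}
```

Keeping the total count separate from the consumed IDs is what lets instance assignment and routing keep reasoning about the whole topic while each table ingests only its share.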
@xiangfu0 xiangfu0 force-pushed the kafka-subset-partitions branch from 3b2f105 to ae62d10 Compare March 7, 2026 05:25

Labels

feature, ingestion, kafka, release-notes

6 participants