
resume partially offline replicas in controller validation#17754

Open
rohityadav1993 wants to merge 1 commit into apache:master from rohityadav1993:11314-rohity

Conversation

Contributor

@rohityadav1993 rohityadav1993 commented Feb 24, 2026

Description

This change introduces automated repair of partially offline replicas during realtime segment consumption, for both pauseless and non-pauseless realtime tables. It addresses scenarios (issue #11314) where some replicas fail during initialization (e.g., KafkaConsumer errors) and mark themselves OFFLINE while other replicas continue consuming normally.

Changes

  • Added new configuration flag controller.realtime.segment.partialOfflineReplicaRepairEnabled (defaults to false)
  • Enhanced PinotLLCRealtimeSegmentManager validation to detect and repair mixed CONSUMING/OFFLINE replica states
  • When enabled, the controller automatically resets OFFLINE replicas back to CONSUMING state for IN_PROGRESS segments, allowing them to retry

Implementation Details

Configuration:

  • New property: controller.realtime.segment.partialOfflineReplicaRepairEnabled in ControllerConf
  • Default: false (opt-in for backward compatibility)

Repair Logic:

  • Detects segments with mixed CONSUMING/OFFLINE replica states during validation
  • Logs repair actions with details (segment name, offline count, instance list)
  • Resets identified OFFLINE replicas to CONSUMING state
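
The detection half of the repair logic above can be sketched as follows (an illustrative sketch only, not the actual PinotLLCRealtimeSegmentManager code; the class and method names here are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartialOfflineRepairSketch {

  // Given a segment's instance-state map from the IdealState, return the
  // OFFLINE instances only when at least one replica is still CONSUMING
  // (i.e., the mixed state this PR targets).
  static List<String> findRepairableOfflineReplicas(Map<String, String> instanceStateMap) {
    boolean hasConsuming = instanceStateMap.containsValue("CONSUMING");
    if (!hasConsuming) {
      // Fully-offline segments are left to the existing repair paths.
      return List.of();
    }
    return instanceStateMap.entrySet().stream()
        .filter(e -> "OFFLINE".equals(e.getValue()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, String> mixed = Map.of("server-1", "OFFLINE", "server-2", "CONSUMING");
    Map<String, String> allOffline = Map.of("server-1", "OFFLINE", "server-2", "OFFLINE");
    System.out.println(findRepairableOfflineReplicas(mixed));      // only server-1 qualifies
    System.out.println(findRepairableOfflineReplicas(allOffline)); // empty: not a mixed state
  }
}
```

Fully-OFFLINE segments are intentionally excluded here: those are covered by the existing repair paths, while this change targets only the mixed CONSUMING/OFFLINE case.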

Testing:
Unit Tests:

  • Added unit tests for enabled scenario (verifies OFFLINE→CONSUMING transition)
  • Added unit tests for disabled scenario (verifies no-op behavior)

Local cluster test plan:

  • Set up a realtime table (Kafka) with two servers and two replicas, partialOfflineReplicaRepairEnabled = true
  • Mangle the DNS config on server-1: echo "nameserver 0.0.0.0" > /etc/resolv.conf
  • Force commit; the new consuming segment on server-1 comes up in an error state and moves to OFFLINE while server-2 stays in CONSUMING state
  • Run the controller validation job: RealtimeSegmentValidationManager
  • The replica becomes healthy in CONSUMING state.

Upgrade Notes

This feature is disabled by default. To enable, set:
controller.realtime.segment.partialOfflineReplicaRepairEnabled=true


codecov-commenter commented Feb 24, 2026

Codecov Report

❌ Patch coverage is 93.33333% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 63.21%. Comparing base (8845675) to head (bd8b0f5).
⚠️ Report is 6 commits behind head on master.

Files with missing lines Patch % Lines
.../core/realtime/PinotLLCRealtimeSegmentManager.java 92.85% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17754      +/-   ##
============================================
- Coverage     63.23%   63.21%   -0.02%     
- Complexity     1448     1454       +6     
============================================
  Files          3176     3176              
  Lines        191025   191041      +16     
  Branches      29206    29211       +5     
============================================
- Hits         120788   120775      -13     
- Misses        60814    60838      +24     
- Partials       9423     9428       +5     
Flag Coverage Δ
custom-integration1 100.00% <ø> (?)
integration 100.00% <ø> (+100.00%) ⬆️
integration1 100.00% <ø> (?)
integration2 0.00% <ø> (ø)
java-11 63.15% <93.33%> (-0.02%) ⬇️
java-21 63.19% <93.33%> (+0.01%) ⬆️
temurin 63.21% <93.33%> (-0.02%) ⬇️
unittests 63.21% <93.33%> (-0.02%) ⬇️
unittests1 55.55% <ø> (-0.04%) ⬇️
unittests2 34.10% <93.33%> (+0.01%) ⬆️


Contributor Author

Pinot Tests / Pinot Integration Test Set 1 - Unrelated failure in BaseBrokerRoutingManager

@rohityadav1993 rohityadav1993 marked this pull request as ready for review February 24, 2026 06:22
Contributor Author

cc: @tarun11Mavani, @Jackie-Jiang for review.

Contributor

  1. Can you add details on the current behaviour? I believe it retries up to 5 times to create the consumer and, once failed, sends a segmentStoppedConsuming back to the controller. Why can't increasing these retries, or making them perpetual, also solve this issue?

  2. If the underlying issue persists, we have a continuous loop of :
    Validation → Repair (OFFLINE→CONSUMING) → Helix triggers transition →
    Server tries to consume → Fails → segmentStoppedConsuming → OFFLINE →
    Next Validation → Repair again. I'm wondering what sort of race conditions can be caused by this, and whether the server trying to start consumption perpetually on its own, while publishing a retry metric, is a safer fix?

  3. To prevent (2) from happening frequently, do we want to add a lastRepairTime and send a new event only after x seconds/minutes?

}
} else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS
&& _isPartialOfflineReplicaRepairEnabled) {
// Handle case where some replicas are OFFLINE while others are CONSUMING
Contributor

@anuragrai16 anuragrai16 Feb 24, 2026

Similar to other repairs, you should add a max segment completion time check:

if (!isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
  continue;
}

This ensures your repair does not mess up any actively ongoing commit.

Contributor

+1

Contributor Author

@rohityadav1993 rohityadav1993 Feb 25, 2026

It is a harmless addition, but I think it is not needed in this scenario. Segments are supposed to be IN_PROGRESS in metadata, and we are neither creating a new consuming segment with different metadata nor making any change to mark the segment DONE in the IdealState.

}
} else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS
&& _isPartialOfflineReplicaRepairEnabled) {
// Handle case where some replicas are OFFLINE while others are CONSUMING
Contributor

+1

updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
instancePartitionsMap);
}
} else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS
Contributor

Since we are only checking for IN_PROGRESS status, this should work for pauseless consumption as well. Can you call this out in the PR description for clarity.

Contributor Author

  1. Can you add details on the current behaviour? I believe it retries up to 5 times to create the consumer and, once failed, sends a segmentStoppedConsuming back to the controller. Why can't increasing these retries, or making them perpetual, also solve this issue?

This is right: 5 retries with a 2 second delay, so in 10 seconds we exhaust all attempts to recover. This happens in RealtimeSegmentDataManager.java#L1793-L1805.
We can only have limited retries here, to avoid waiting indefinitely on any permanent underlying error. The controller validation job runs after these retries are exhausted.
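
The bounded-retry behaviour described above can be illustrated with a small sketch (hypothetical names; the real logic lives in RealtimeSegmentDataManager and is parameterised differently):

```java
public class BoundedRetrySketch {

  interface ConsumerFactory {
    void create() throws Exception;
  }

  // Attempt consumer creation up to maxAttempts times with a fixed delay
  // between attempts. Returns false once the budget is exhausted, at which
  // point the server would report segmentStoppedConsuming and the replica
  // moves to OFFLINE, to be revisited by the controller validation job.
  static boolean createWithRetries(ConsumerFactory factory, int maxAttempts, long delayMs)
      throws InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        factory.create();
        return true;
      } catch (Exception e) {
        if (attempt < maxAttempts) {
          Thread.sleep(delayMs);
        }
      }
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    // A consumer creation that always fails, e.g. a KafkaConsumer behind broken DNS:
    boolean ok = createWithRetries(() -> { throw new Exception("DNS failure"); }, 5, 0);
    System.out.println(ok); // false: all 5 attempts exhausted
  }
}
```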

  2. If the underlying issue persists, we have a continuous loop of:
    Validation → Repair (OFFLINE→CONSUMING) → Helix triggers transition →
    Server tries to consume → Fails → segmentStoppedConsuming → OFFLINE →
    Next Validation → Repair again. I'm wondering what sort of race conditions can be caused by this, and whether the server trying to start consumption perpetually on its own, while publishing a retry metric, is a safer fix?

This should be a fairly safe scenario: we are not committing or creating a new segment with new metadata, but simply resurrecting an OFFLINE segment to CONSUMING, and only if the segment is in IN_PROGRESS state in the ZK metadata.

  3. To prevent (2) from happening frequently, do we want to add a lastRepairTime and send a new event only after x seconds/minutes?

The periodicity of RealtimeSegmentValidationManager is configurable (default: 3600 seconds), so this wouldn't be too aggressive if configured correctly.
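
For reference, a controller config sketch (the exact property name should be verified against the ControllerConf of your Pinot version; the frequencyInSeconds form shown here is the long-standing one):

```properties
# Run the validation job every 15 minutes instead of the hourly default
controller.realtime.segment.validation.frequencyInSeconds=900
controller.realtime.segment.partialOfflineReplicaRepairEnabled=true
```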

@shauryachats shauryachats added feature bugfix release-notes Referenced by PRs that need attention when compiling the next release notes labels Feb 25, 2026
Contributor

@noob-se7en noob-se7en left a comment

If the OFFLINE -> CONSUMING transition fails due to any error, the segment will be marked in ERROR state and the RVM job will auto-reset the segment.

If the consumer thread fails then yes, the IS of that segment is marked OFFLINE, but I wouldn't recommend changing the IS to CONSUMING in RVM. The IS of the segment can also be OFFLINE due to a rebalance. The best solution would be to somehow mark the segment in ERROR state from the consumer thread itself.

@noob-se7en
Contributor

I don't think this is a bugfix.

@rohityadav1993
Contributor Author

If the OFFLINE -> CONSUMING transition fails due to any error, the segment will be marked in ERROR state and the RVM job will auto-reset the segment.

OFFLINE -> CONSUMING errors are currently handled by updating the IS to OFFLINE (there is an intermediate step when the EV goes into ERROR state, but the IS is always updated to OFFLINE, RealtimeSegmentDataManager.java#L1418). The ERROR states in the EV happen when the segment is supposed to be stopped after the commit criteria. But we are only checking segment metadata and the IS to take the decision here.

If the consumer thread fails then yes, the IS of that segment is marked OFFLINE, but I wouldn't recommend changing the IS to CONSUMING in RVM. The IS of the segment can also be OFFLINE due to a rebalance. The best solution would be to somehow mark the segment in ERROR state from the consumer thread itself.

In the rebalance scenario for consuming segments, the RealtimeSegmentAssigner does not update consuming segments to OFFLINE state but directly to CONSUMING state (RealtimeSegmentAssignment.java#L245-L257), so there is no scenario where a consuming segment is set to OFFLINE state which we would then try to update in this PR. We also have both these operations happening only in the leader controller for the table, and behind an optimistic lock for the IS update, to avoid race conditions.

Contributor

noob-se7en commented Feb 27, 2026

Only for the below code in the OFFLINE -> CONSUMING transition do we mark the segment as OFFLINE, right?

try {
  _startOffset = _partitionGroupConsumptionStatus.getStartOffset();
  _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
  makeStreamConsumer("Starting");
  createPartitionMetadataProvider("Starting");
  setPartitionParameters(realtimeSegmentConfigBuilder, indexingConfig.getSegmentPartitionConfig());
  _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
  _resourceTmpDir = new File(resourceDataDir, RESOURCE_TEMP_DIR_NAME);
  if (!_resourceTmpDir.exists()) {
    _resourceTmpDir.mkdirs();
  }
  _state = State.INITIAL_CONSUMING;
  _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
  setConsumeEndTime(segmentZKMetadata, now());
  _segmentCommitterFactory =
      new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
} catch (Throwable t) {

I believe any other exceptions, such as failures in creating the stream message decoder, cause the segment to go into an ERROR state.

I will recheck the rebalance case. Thanks for providing additional context.

I have thought about implementing this solution before, but I was hoping to find a way to remove the dependency on the server calling the controller in an infinite loop until the controller marks the segment OFFLINE. I couldn't find a way to mark the segment in ERROR state from inside the consumer thread when it runs into an exception. If we can do that, then the controller just resets the ERROR segments.

@rohityadav1993
Contributor Author

@noob-se7en ,
there is one more place where it is put in offline state in IS: https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java#L968-L976

The way the class has been designed, the EV will always go to ERROR state for the segment, except when the segment is IN_PROGRESS in the propertyStore (metadata), in which case the controller puts the segment in OFFLINE state in the IS after getting signaled from the RSDM when postStopConsumedMsg is invoked.

Please do review this; for safety I have put it behind a controller config and tested it in a Docker setup.

Contributor

Copilot AI left a comment

Pull request overview

This PR adds an opt-in controller-side validation/repair path to recover realtime LLC segments that end up with mixed replica states (some CONSUMING, some OFFLINE) for IN_PROGRESS segments, allowing the controller to flip OFFLINE replicas back to CONSUMING so they can retry consumption (addressing #11314).

Changes:

  • Added a new controller config flag controller.realtime.segment.partialOfflineReplicaRepairEnabled (default false).
  • Updated PinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming(...) to detect IN_PROGRESS segments with OFFLINE replicas and reset them to CONSUMING when the flag is enabled.
  • Added unit tests covering both enabled and disabled behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

  • pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: implements the mixed CONSUMING/OFFLINE detection and repair for IN_PROGRESS segments when enabled.
  • pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java: introduces the new configuration key and accessor (isPartialOfflineReplicaRepairEnabled()).
  • pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java: adds tests verifying OFFLINE→CONSUMING repair when enabled and no-op when disabled.

@rohityadav1993
Contributor Author

Can I get a merge for this if it looks good? @chenboat, @Jackie-Jiang, @xiangfu0
