Pauseless Ingestion #2: Handle Failure scenarios without DR #14798

Open
wants to merge 30 commits into base: master

Conversation

9aman
Contributor

@9aman 9aman commented Jan 13, 2025

Pauseless Ingestion Failure Resolution

Please refer to PR #14741 for the happy path. This PR covers only the failure scenarios. Once #14741 is merged, a cleaner diff covering only the failure handling will be visible.

For now, to view only the diff that covers the failure scenarios, refer to:

Summary

This PR provides ways to resolve the failure scenarios that can occur during pauseless ingestion. The detailed list of failure scenarios can be found here: link, and the failure handling strategies here: link

The following sequence diagrams summarize the failure scenarios and their resolution.
[Sequence diagram: Screenshot 2025-01-03 at 2 53 46 PM]
[Sequence diagram: Screenshot 2025-01-03 at 2 54 45 PM]

Failure Scenarios & Resolution Approaches

Failures encountered during the commit protocol can be categorized into two types: recoverable and unrecoverable failures.

Recoverable failures are those in which at least one of the servers retains the segment on disk.

Unrecoverable failures occur when none of the servers have the segment on disk.

Recoverable Failures

Recoverable failures will be addressed through RealtimeSegmentValidationManager. This approach will handle scenarios such as upload failures and incomplete commit protocol executions.

The controller or server can fail between any two steps of the commit protocol listed below:

Request Type: COMMIT_START

  1. Update the Segment ZK metadata for the committing segment (seg__0__0)
    • Change status to COMMITTING
    • Set endOffset
  2. Create Segment ZK metadata for the new segment (seg__0__1) with status IN_PROGRESS
  3. Update the Ideal State for the:
    • Committing segment (seg__0__0) to ONLINE
    • New/consuming segment (seg__0__1) to CONSUMING

Request Type: COMMIT_END_METADATA

  4. Update Segment ZK metadata for the committing segment (seg__0__0)
    • Change status to DONE
    • Update deepstore url
    • Any additional metadata

The RealtimeSegmentValidationManager figures out which step of the commit protocol failed and how it can be fixed. This is very similar to how commit protocol failures were handled before, with some minor changes.
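
As a rough illustration of "figuring out which step failed", the sketch below maps the observable state (committing segment status, whether the new segment's ZK metadata exists, whether the new segment is in the ideal state) to the next repair action. The class, enums, and method are hypothetical and are not the PR's actual code; in particular, treating a COMMITTING segment with steps 1-3 complete as "needs COMMIT_END_METADATA" is an assumption, since such a segment may simply still be building.

```java
/** Hypothetical sketch; the enums and method are illustrative, not Pinot's API. */
final class CommitFailureDiagnosis {
  enum Status { IN_PROGRESS, COMMITTING, DONE }

  enum Repair {
    NONE,
    CREATE_NEW_SEGMENT_METADATA,
    UPDATE_IDEAL_STATE,
    FINISH_COMMIT_END_METADATA
  }

  /**
   * @param committingStatus ZK metadata status of the committing segment (seg__0__0)
   * @param newSegmentMetadataExists whether ZK metadata exists for the new segment (seg__0__1)
   * @param newSegmentInIdealState whether seg__0__1 is present in the ideal state
   */
  static Repair diagnose(Status committingStatus, boolean newSegmentMetadataExists,
      boolean newSegmentInIdealState) {
    if (committingStatus == Status.IN_PROGRESS) {
      // Step 1 never ran: the segment is still consuming, nothing to repair.
      return Repair.NONE;
    }
    if (!newSegmentMetadataExists) {
      // Step 1 succeeded, step 2 did not.
      return Repair.CREATE_NEW_SEGMENT_METADATA;
    }
    if (!newSegmentInIdealState) {
      // Steps 1 and 2 succeeded, step 3 did not.
      return Repair.UPDATE_IDEAL_STATE;
    }
    if (committingStatus == Status.COMMITTING) {
      // Steps 1-3 succeeded, step 4 has not completed (assumption: the caller has
      // already decided the commit is stuck rather than still in flight).
      return Repair.FINISH_COMMIT_END_METADATA;
    }
    return Repair.NONE;
  }

  public static void main(String[] args) {
    System.out.println(diagnose(Status.COMMITTING, true, false)); // prints UPDATE_IDEAL_STATE
  }
}
```

Checking the steps in protocol order means the first missing piece of state identifies the earliest step that needs to be redone.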

Unrecoverable Failures

These failures require re-ingesting the segment from upstream, then building it, uploading it, and updating its ZK metadata.

9aman and others added 15 commits January 2, 2025 16:57
1. Changing FSM
2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build
2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one
   only when the CRC is present in the ZK Metadata
3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call

Refactoring code for readability
… ingestion by moving it out of streamConfigMap
…auseless ingestion in RealtimeSegmentValidationManager
…d by RealtimeSegmentValidationManager to fix commit protocol failures
@KKcorps KKcorps requested a review from Jackie-Jiang January 14, 2025 01:59
@Jackie-Jiang Jackie-Jiang added feature, release-notes, ingestion, real-time labels Jan 14, 2025
@@ -171,6 +172,9 @@ private void runSegmentLevelValidation(TableConfig tableConfig) {
// Update the total document count gauge
_validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata));

_llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName());
// _llcRealtimeSegmentManager.resetUploadedSegmentsInErrorState(tableConfig.getTableName());
Contributor

Check whether this commented-out line is still needed; if not, remove it.

Contributor Author

Now that reingestion does this, I think it's okay to remove this.
@KKcorps, can you please do this?

Contributor Author

Removed

return;
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
// For pauseless tables, we should replace the segment if download url is missing even if crc is same
if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
Contributor

We should also log that, since pauseless is enabled, we are not going directly for the segment download but waiting for catchup.

Contributor Author

@KKcorps I don't understand why this is different for pauseless. I see you have added these along with reingestion tests.


protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
if (_failCommit) {
System.out.println("Forcing failure in buildSegmentInternal");
Contributor

Remove this sysout

Contributor Author

Removed

@@ -884,7 +892,7 @@ public String uploadLLCSegment(
@ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class),
@ApiResponse(code = 400, message = "Bad request", response = ErrorInfo.class)
})
- public TableLLCSegmentUploadResponse uploadLLCSegmentV2(
+ public String uploadLLCSegmentV2(
Contributor

Do not change the return type to String; that breaks backward compatibility.
If you need something more, add a new param to TableLLCSegmentUploadResponse itself.

Contributor

I feel like you need a new API here

Contributor Author

I am adding a new API and making the required changes on the controller end.

Contributor Author

Added a new API

private static final ConcurrentHashMap<String, AtomicBoolean> SEGMENT_INGESTION_MAP = new ConcurrentHashMap<>();

// Semaphore to enforce global concurrency limit
private static final Semaphore REINGESTION_SEMAPHORE = new Semaphore(MAX_PARALLEL_REINGESTIONS);
Contributor

Make this count configurable
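
One possible shape for a configurable limit, as a minimal sketch: the permit count is read from a config map with a default instead of a compile-time constant. The class name, config key, and default value are hypothetical and are not taken from the PR or from Pinot's configuration.

```java
import java.util.Map;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of a re-ingestion limiter with a configurable permit count. */
final class ReingestionLimiter {
  // Assumed config key and default; neither comes from the PR.
  static final String MAX_PARALLEL_KEY = "reingestion.max.parallel";
  static final int DEFAULT_MAX_PARALLEL = 10;

  private final Semaphore _semaphore;

  ReingestionLimiter(Map<String, String> config) {
    int permits =
        Integer.parseInt(config.getOrDefault(MAX_PARALLEL_KEY, String.valueOf(DEFAULT_MAX_PARALLEL)));
    if (permits <= 0) {
      throw new IllegalArgumentException(MAX_PARALLEL_KEY + " must be positive, got " + permits);
    }
    _semaphore = new Semaphore(permits);
  }

  /** Non-blocking: returns false when the global limit is already reached. */
  boolean tryStart() {
    return _semaphore.tryAcquire();
  }

  /** Must be called exactly once for every successful tryStart(). */
  void finish() {
    _semaphore.release();
  }

  int availablePermits() {
    return _semaphore.availablePermits();
  }

  public static void main(String[] args) {
    ReingestionLimiter limiter = new ReingestionLimiter(Map.of(MAX_PARALLEL_KEY, "2"));
    System.out.println(limiter.tryStart()); // prints true
    System.out.println(limiter.availablePermits()); // prints 1
  }
}
```

Because the semaphore is built in the constructor rather than in a static initializer, the limit can differ per server instance and be set in tests.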

@@ -543,13 +555,22 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
SegmentZKMetadata committingSegmentZKMetadata =
updateCommittingSegmentMetadata(realtimeTableName, committingSegmentDescriptor, isStartMetadata);

// Used to inject failure for testing. RealtimeSegmentValidationManager should be able to fix the
// segment that encounter failure at this stage of commit protocol.
FailureInjectionUtils.injectFailure(FailureInjectionUtils.FAULT_BEFORE_NEW_SEGMENT_METADATA_CREATION,
Contributor

A better way to inject failures for tests would be to extend this PinotLLCRealtimeSegmentDataManager with a subclass that throws the failure.

Contributor Author

For now I have kept the current approach.
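
For comparison, the subclass approach suggested above could look roughly like the sketch below: the production class exposes a protected no-op hook between commit steps, and a test-only subclass overrides it to throw. All class, method, and step names here are hypothetical stand-ins, not the actual Pinot classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the class that runs the commit steps. */
class CommitStepRunner {
  final List<String> _completedSteps = new ArrayList<>();

  void commit() {
    _completedSteps.add("UPDATE_COMMITTING_SEGMENT_METADATA");
    beforeNewSegmentMetadataCreation(); // no-op in production code
    _completedSteps.add("CREATE_NEW_SEGMENT_METADATA");
    _completedSteps.add("UPDATE_IDEAL_STATE");
  }

  /** Extension point for tests; production code leaves this empty. */
  protected void beforeNewSegmentMetadataCreation() {
  }
}

/** Test-only subclass that fails right after the first commit step. */
class FailingCommitStepRunner extends CommitStepRunner {
  @Override
  protected void beforeNewSegmentMetadataCreation() {
    throw new IllegalStateException("Injected failure before new segment metadata creation");
  }
}

final class FailureInjectionDemo {
  public static void main(String[] args) {
    CommitStepRunner runner = new FailingCommitStepRunner();
    try {
      runner.commit();
    } catch (IllegalStateException e) {
      // Only the first step completed before the injected failure.
      System.out.println(runner._completedSteps);
    }
  }
}
```

The trade-off is that the production class carries an empty hook instead of a call into a static failure-injection utility, and the failure behavior lives entirely in test code.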

*/
private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata uploadedSegmentZKMetadata,
boolean isPauselessEnabled) {
if (isPauselessEnabled && segmentZKMetadata.getCrc() == SegmentZKMetadata.DEFAULT_CRC_VALUE) {
Contributor

Should the CRC check here be changed to Status?

Contributor Author

Changed it to status.

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Partially reviewed. I'd suggest breaking this PR into smaller ones, separating the recoverable handling (i.e. ValidationManager) and reingest part

return mapper;
}

public static String serialize(SegmentZKMetadata metadata) throws IOException {
Contributor

(format) Please apply Pinot Style and reformat all the changes

Contributor Author

Done.

private SegmentZKMetadataUtils() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadataUtils.class);
Contributor

(nit) Not used

Contributor Author

Removed

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadataUtils.class);
public static final ObjectMapper MAPPER = createObjectMapper();

private static ObjectMapper createObjectMapper() {
Contributor

Any specific reason for adding this custom mapper instead of using the JsonUtils?
This change is independent of failure handling, so if this is really required, can you put this as a separate PR?

Contributor Author

uploadLLCToSegmentStoreWithZKMetadata uses this. I was running into issues while serializing/deserializing SegmentZKMetadata directly.

Contributor

I see. Actually, we can simply put the segment name and _simpleFields into a JSON and use that to ser/de it. SegmentZKMetadata is not designed to be serialized with ObjectMapper.

* @throws URISyntaxException
* @throws IOException
* @throws HttpErrorStatusException
*/
- public TableLLCSegmentUploadResponse uploadLLCToSegmentStore(String uri)
+ public SegmentZKMetadata uploadLLCToSegmentStore(String uri)
Contributor

This change is backward incompatible. You might need to add a new API for this

Contributor Author

I have added a new API. On the controller side, I have ensured backward compatibility by also trying the previous endpoints in case the latest segment upload endpoint fails.
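
The fallback described here can be sketched generically as "try each endpoint in order, newest first, and return the first success". The helper below is a hypothetical illustration of that pattern only; it does not use or reproduce the controller's real client code or endpoint names.

```java
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical helper: tries each attempt in order and returns the first success. */
final class EndpointFallback {
  private EndpointFallback() {
  }

  static <T> T firstSuccessful(List<Callable<T>> attempts) throws Exception {
    if (attempts.isEmpty()) {
      throw new IllegalArgumentException("At least one attempt is required");
    }
    Exception last = null;
    for (Callable<T> attempt : attempts) {
      try {
        return attempt.call();
      } catch (Exception e) {
        // Remember the failure and fall through to the next (older) endpoint.
        last = e;
      }
    }
    // Every endpoint failed: surface the failure of the last one tried.
    throw last;
  }

  public static void main(String[] args) throws Exception {
    // Stand-ins for "newest endpoint" and "previous endpoint" calls.
    Callable<String> newest = () -> {
      throw new java.io.IOException("404 from a server that predates the new endpoint");
    };
    Callable<String> previous = () -> "uploaded via previous endpoint";
    System.out.println(firstSuccessful(List.of(newest, previous)));
  }
}
```

Ordering the attempts newest first lets an upgraded controller keep working against servers that have not yet been upgraded, at the cost of one failed request per fallback.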

@9aman 9aman changed the base branch from pauseless_ingestion_without_failure_scenarios to master January 15, 2025 03:30
@9aman 9aman changed the base branch from master to pauseless_ingestion_without_failure_scenarios January 15, 2025 03:31
@KKcorps
Contributor

KKcorps commented Jan 15, 2025

Also add server metrics to count how many reIngestions are in progress

@9aman 9aman changed the base branch from pauseless_ingestion_without_failure_scenarios to master January 15, 2025 08:26
@9aman 9aman force-pushed the resolve-failures-pauseless-ingestion branch from 2b7e26a to d6208a6 Compare January 15, 2025 08:51
@codecov-commenter

codecov-commenter commented Jan 15, 2025

Codecov Report

Attention: Patch coverage is 38.26087% with 142 lines in your changes missing coverage. Please review.

Project coverage is 63.72%. Comparing base (59551e4) to head (c4b99bd).
Report is 1600 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...che/pinot/server/api/resources/TablesResource.java | 0.00% | 70 Missing ⚠️ |
| .../core/realtime/PinotLLCRealtimeSegmentManager.java | 55.00% | 40 Missing and 5 partials ⚠️ |
| ...e/pinot/common/utils/FileUploadDownloadClient.java | 0.00% | 10 Missing ⚠️ |
| ...ommon/metadata/segment/SegmentZKMetadataUtils.java | 75.00% | 3 Missing and 3 partials ⚠️ |
| ...r/validation/RealtimeSegmentValidationManager.java | 0.00% | 3 Missing ⚠️ |
| ...troller/helix/core/util/FailureInjectionUtils.java | 50.00% | 1 Missing and 1 partial ⚠️ |
| ...ata/manager/realtime/RealtimeTableDataManager.java | 0.00% | 2 Missing ⚠️ |
| ...altime/ServerSegmentCompletionProtocolHandler.java | 0.00% | 2 Missing ⚠️ |
| ...ntroller/helix/core/PinotHelixResourceManager.java | 0.00% | 1 Missing ⚠️ |
| .../pinot/core/data/manager/BaseTableDataManager.java | 88.88% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14798      +/-   ##
============================================
+ Coverage     61.75%   63.72%   +1.96%     
- Complexity      207     1610    +1403     
============================================
  Files          2436     2710     +274     
  Lines        133233   151476   +18243     
  Branches      20636    23379    +2743     
============================================
+ Hits          82274    96522   +14248     
- Misses        44911    47712    +2801     
- Partials       6048     7242    +1194     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.69% <38.26%> (+1.98%) ⬆️
java-21 63.58% <38.26%> (+1.95%) ⬆️
skip-bytebuffers-false 63.71% <38.26%> (+1.97%) ⬆️
skip-bytebuffers-true 63.54% <38.26%> (+35.81%) ⬆️
temurin 63.72% <38.26%> (+1.96%) ⬆️
unittests 63.71% <38.26%> (+1.96%) ⬆️
unittests1 56.34% <59.61%> (+9.45%) ⬆️
unittests2 34.02% <26.95%> (+6.29%) ⬆️


// Check if the segment is already being re-ingested
AtomicBoolean isIngesting = SEGMENT_INGESTION_MAP.computeIfAbsent(segmentName, k -> new AtomicBoolean(false));
if (!isIngesting.compareAndSet(false, true)) {
Contributor Author

@9aman 9aman Jan 21, 2025

We can check whether the segment is already being re-ingested before acquiring the semaphore.
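
A minimal sketch of that ordering, under stated assumptions: the per-segment flag is checked first, so a duplicate request for the same segment never consumes a global permit, and the flag is rolled back if no permit is available. The class and method names are hypothetical, and `tryAcquire` is used here for simplicity; the PR may well block on the semaphore instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: per-segment de-duplication before the global concurrency limit. */
final class ReingestionGate {
  private final ConcurrentHashMap<String, AtomicBoolean> _inProgress = new ConcurrentHashMap<>();
  private final Semaphore _semaphore;

  ReingestionGate(int maxParallelReingestions) {
    _semaphore = new Semaphore(maxParallelReingestions);
  }

  /** Returns true if the caller may re-ingest the segment; finish() must then be called. */
  boolean tryStart(String segmentName) {
    AtomicBoolean flag = _inProgress.computeIfAbsent(segmentName, k -> new AtomicBoolean(false));
    // 1. Per-segment check first: a duplicate request never touches the semaphore.
    if (!flag.compareAndSet(false, true)) {
      return false;
    }
    // 2. Only then take a global permit; undo the flag if the limit is reached.
    if (!_semaphore.tryAcquire()) {
      flag.set(false);
      return false;
    }
    return true;
  }

  /** Releases the permit and clears the flag, but only if this segment was actually started. */
  void finish(String segmentName) {
    AtomicBoolean flag = _inProgress.get(segmentName);
    if (flag != null && flag.compareAndSet(true, false)) {
      _semaphore.release();
    }
  }

  int availablePermits() {
    return _semaphore.availablePermits();
  }

  public static void main(String[] args) {
    ReingestionGate gate = new ReingestionGate(1);
    System.out.println(gate.tryStart("seg__0__0")); // prints true
    System.out.println(gate.tryStart("seg__0__0")); // prints false (already in progress)
  }
}
```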

@KKcorps KKcorps changed the title Resolve failures pauseless ingestion Pauseless Ingestion #2: Handle Failure scenarios without DR Jan 21, 2025
Labels
feature, ingestion, real-time, release-notes
4 participants