Pauseless Ingestion #2: Handle Failure scenarios without DR #14798
base: master
1. Changing FSM 2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build 2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one only when the CRC is present in the ZK Metadata 3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call Refactoring code for readability
… ingestion by moving it out of streamConfigMap
…auseless ingestion in RealtimeSegmentValidationManager
…d by RealtimeSegmentValidationManager to fix commit protocol failures
…g commit protocol
@@ -171,6 +172,9 @@ private void runSegmentLevelValidation(TableConfig tableConfig) {
// Update the total document count gauge
_validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata));

_llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName());
// _llcRealtimeSegmentManager.resetUploadedSegmentsInErrorState(tableConfig.getTableName());
Check if this commented line is needed or not, otherwise remove it
Now that reingestion does this I think it's okay to remove this.
@KKcorps can you please do this.
Removed
return;
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
// For pauseless tables, we should replace the segment if download url is missing even if crc is same
if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
We should also log that, since pauseless is enabled, we are not going directly for the segment download but waiting for catch-up.
@KKcorps I don't understand why this is different for pauseless. I see you have added these along with reingestion tests.

protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
if (_failCommit) {
System.out.println("Forcing failure in buildSegmentInternal");
Remove this sysout
Removed
@@ -884,7 +892,7 @@ public String uploadLLCSegment(
@ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class),
@ApiResponse(code = 400, message = "Bad request", response = ErrorInfo.class)
})
- public TableLLCSegmentUploadResponse uploadLLCSegmentV2(
+ public String uploadLLCSegmentV2(
Do not change this to String, as that would break backward compatibility.
If you need something, add a new param in TableLLCSegmentUploadResponse itself.
I feel like you need a new API here
I am adding a new API and making the required changes on the controller end.
Added a new API
private static final ConcurrentHashMap<String, AtomicBoolean> SEGMENT_INGESTION_MAP = new ConcurrentHashMap<>();

// Semaphore to enforce global concurrency limit
private static final Semaphore REINGESTION_SEMAPHORE = new Semaphore(MAX_PARALLEL_REINGESTIONS);
Make this count configurable
@@ -543,13 +555,22 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
SegmentZKMetadata committingSegmentZKMetadata =
updateCommittingSegmentMetadata(realtimeTableName, committingSegmentDescriptor, isStartMetadata);

// Used to inject failure for testing. RealtimeSegmentValidationManager should be able to fix the
// segment that encounter failure at this stage of commit protocol.
FailureInjectionUtils.injectFailure(FailureInjectionUtils.FAULT_BEFORE_NEW_SEGMENT_METADATA_CREATION,
A better way to inject failures for tests would be to extend this PinotLLCRealtimeSegmentDataManager with a subclass that throws the failure.
For now I have kept the current approach.
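For reference, the subclass-based alternative suggested in this thread could look roughly like the sketch below. It is only an illustration: `SegmentCommitter`, `createNewSegmentMetadata`, and `FailingSegmentCommitter` are hypothetical stand-ins, not classes or methods from this PR, and the real commit path has more steps than the two shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the production class that runs the commit steps.
class SegmentCommitter {
  // Records which steps ran, so a test can see where the commit stopped.
  final List<String> completedSteps = new ArrayList<>();

  void commit(String segmentName) {
    updateCommittingSegmentMetadata(segmentName);
    createNewSegmentMetadata(segmentName);
  }

  protected void updateCommittingSegmentMetadata(String segmentName) {
    completedSteps.add("updateCommittingSegmentMetadata");
  }

  // Hypothetical step name, used only for this illustration.
  protected void createNewSegmentMetadata(String segmentName) {
    completedSteps.add("createNewSegmentMetadata");
  }
}

// Test-only subclass: the failure lives in test code instead of a hook in the production path.
class FailingSegmentCommitter extends SegmentCommitter {
  @Override
  protected void createNewSegmentMetadata(String segmentName) {
    throw new IllegalStateException("Injected failure before new segment metadata creation");
  }
}
```

A test would instantiate `FailingSegmentCommitter`, call `commit`, and then assert that the validation logic repairs the half-committed segment. The trade-off is that the steps to be overridden must be non-private and non-static.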
*/
private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata uploadedSegmentZKMetadata,
boolean isPauselessEnabled) {
if (isPauselessEnabled && segmentZKMetadata.getCrc() == SegmentZKMetadata.DEFAULT_CRC_VALUE) {
Should the CRC check here be changed to Status?
Changed it to status.
Partially reviewed. I'd suggest breaking this PR into smaller ones, separating the recoverable handling (i.e. ValidationManager) and reingest part
return mapper;
}

public static String serialize(SegmentZKMetadata metadata) throws IOException {
(format) Please apply Pinot Style and reformat all the changes
Done.
private SegmentZKMetadataUtils() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadataUtils.class);
(nit) Not used
Removed
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadataUtils.class);
public static final ObjectMapper MAPPER = createObjectMapper();

private static ObjectMapper createObjectMapper() {
Any specific reason for adding this custom mapper instead of using the JsonUtils?
This change is independent of failure handling, so if this is really required, can you put this as a separate PR?
uploadLLCToSegmentStoreWithZKMetadata uses this. I was running into issues while serializing/deserializing SegmentZKMetadata directly.
I see. Actually, we can simply put the segment name and _simpleFields into a JSON and use that to ser/de it. SegmentZKMetadata is not designed to be serialized with ObjectMapper.
* @throws URISyntaxException
* @throws IOException
* @throws HttpErrorStatusException
*/
- public TableLLCSegmentUploadResponse uploadLLCToSegmentStore(String uri)
+ public SegmentZKMetadata uploadLLCToSegmentStore(String uri)
This change is backward incompatible. You might need to add a new API for this
I have added a new API. On the controller side, backward compatibility is preserved by falling back to the previous endpoints if the latest segment upload endpoint fails.
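The fallback order described here (newest endpoint first, then older ones) can be sketched generically as below. This is an illustration under assumptions, not the controller code from this PR: `EndpointFallback` and `firstSuccessful` are hypothetical names, and each `Callable` stands in for one upload call against one endpoint version.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical helper: tries each attempt in order and returns the first result that succeeds.
class EndpointFallback {
  static <T> T firstSuccessful(List<Callable<T>> attempts) throws Exception {
    if (attempts.isEmpty()) {
      throw new IllegalArgumentException("No attempts provided");
    }
    Exception lastFailure = null;
    for (Callable<T> attempt : attempts) {
      try {
        return attempt.call();
      } catch (Exception e) {
        // Remember the failure and move on to the next (older) endpoint.
        lastFailure = e;
      }
    }
    // Every endpoint failed: surface the most recent failure to the caller.
    throw lastFailure;
  }
}
```

In practice the fallback would probably only be taken for errors that indicate a missing endpoint (for example a 404 from an older server), so that genuine upload failures are not retried against every version.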
Also add server metrics to count how many reIngestions are in progress
2b7e26a to d6208a6
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #14798 +/- ##
============================================
+ Coverage 61.75% 63.72% +1.96%
- Complexity 207 1610 +1403
============================================
Files 2436 2710 +274
Lines 133233 151476 +18243
Branches 20636 23379 +2743
============================================
+ Hits 82274 96522 +14248
- Misses 44911 47712 +2801
- Partials 6048 7242 +1194
…ption is enabled or not
…eepstore path with fallbacks

// Check if the segment is already being re-ingested
AtomicBoolean isIngesting = SEGMENT_INGESTION_MAP.computeIfAbsent(segmentName, k -> new AtomicBoolean(false));
if (!isIngesting.compareAndSet(false, true)) {
We can check whether the segment is already being re-ingested before acquiring the semaphore.
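A minimal sketch of that ordering, which also folds in the earlier suggestions to make the limit configurable and to expose an in-progress count. `ReIngestionGuard` and its method names are hypothetical, the limit is passed in by the caller (where it comes from is left open), and the counter is a plain getter rather than Pinot's metrics API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard combining a per-segment flag with a global concurrency limit.
class ReIngestionGuard {
  private final ConcurrentHashMap<String, AtomicBoolean> _segmentIngestionMap = new ConcurrentHashMap<>();
  private final Semaphore _semaphore;
  private final int _maxParallelReIngestions;

  // The limit is a constructor argument instead of a hard-coded constant.
  ReIngestionGuard(int maxParallelReIngestions) {
    _maxParallelReIngestions = maxParallelReIngestions;
    _semaphore = new Semaphore(maxParallelReIngestions);
  }

  // Returns true if the caller may start re-ingesting the segment.
  boolean tryStart(String segmentName) {
    AtomicBoolean isIngesting = _segmentIngestionMap.computeIfAbsent(segmentName, k -> new AtomicBoolean(false));
    // Per-segment check first, so a duplicate request never consumes a permit.
    if (!isIngesting.compareAndSet(false, true)) {
      return false;
    }
    if (!_semaphore.tryAcquire()) {
      // Global limit reached: undo the per-segment claim so a later retry can succeed.
      isIngesting.set(false);
      return false;
    }
    return true;
  }

  // Must be called once the re-ingestion that tryStart admitted has finished (success or failure).
  void finish(String segmentName) {
    AtomicBoolean isIngesting = _segmentIngestionMap.get(segmentName);
    if (isIngesting != null && isIngesting.compareAndSet(true, false)) {
      _semaphore.release();
    }
  }

  // Number of re-ingestions currently holding a permit; could back a server gauge.
  int reIngestionsInProgress() {
    return _maxParallelReIngestions - _semaphore.availablePermits();
  }
}
```

Callers would wrap the work in `try`/`finally` so that `finish` always runs. This sketch rejects immediately with `tryAcquire()`; a blocking `acquire()` is the other option if requests should queue instead.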
Pauseless Ingestion Failure Resolution
Please refer to PR #14741 for the happy path. This PR covers only the failure scenarios. Once #14741 is merged, a cleaner diff containing only the failure-handling changes will be visible.
To view only the diff covering failure scenarios in the meantime, refer to:
Summary
This PR provides ways to resolve the failure scenarios that can occur during pauseless ingestion. The detailed list of failure scenarios can be found here: link, along with the failure handling strategies: link
The following sequence diagrams summarize the failure scenarios and their resolution.
Failure Scenarios & Resolution Approaches
Failures encountered during the commit protocol can be categorized into two types: recoverable and unrecoverable failures.
Recoverable failures are those in which at least one of the servers retains the segment on disk.
Unrecoverable failures occur when none of the servers have the segment on disk.
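The distinction above reduces to a single check, sketched below. `CommitFailureType` and `CommitFailureClassifier` are hypothetical names used only for illustration, and the input map (server instance to whether it still has the segment on disk) is an assumed representation, not how this PR obtains that information.

```java
import java.util.Map;

// Hypothetical names, used only to illustrate the two categories.
enum CommitFailureType { RECOVERABLE, UNRECOVERABLE }

class CommitFailureClassifier {
  // serverHasSegmentOnDisk: server instance id -> whether that server still has the segment on disk.
  static CommitFailureType classify(Map<String, Boolean> serverHasSegmentOnDisk) {
    boolean anyServerHasSegment =
        serverHasSegmentOnDisk.values().stream().anyMatch(Boolean::booleanValue);
    // One surviving copy is enough to recover; with none, the data must be re-ingested.
    return anyServerHasSegment ? CommitFailureType.RECOVERABLE : CommitFailureType.UNRECOVERABLE;
  }
}
```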
Recoverable Failures
Recoverable failures will be addressed through RealtimeSegmentValidationManager. This approach will handle scenarios such as upload failures and incomplete commit protocol executions.
The controller or server can fail between any of the steps of the commit protocol listed below:
Request Type: COMMIT_START
Request Type: COMMIT_END_METADATA
4. Update Segment ZK metadata for the committing segment (seg__0__0):
- Change status to DONE.
- Update deepstore url.
- Any additional metadata.
The RealtimeSegmentValidationManager determines which step of the commit protocol failed and how it can be fixed. This is very similar to how commit protocol failures were handled before, with some minor changes.
Unrecoverable Failures
These failures require ingesting the segment again from upstream, followed by build, upload and ZK metadata update.