[CELEBORN-2166] Mark shuffle data lost and fast fail if allocated worker is lost#3726
[CELEBORN-2166] Mark shuffle data lost and fast fail if allocated worker is lost#3726s0nskar wants to merge 3 commits into
Conversation
SteNicholas
left a comment
There was a problem hiding this comment.
Reviewed the eager mark-data-lost-on-unknown-worker change. The mechanism and the new tests are sound, and the setStageEnd/handleGetReducerFileGroup lock handshake stays correct with the new heartbeat-thread caller (no missed/double reply). The main concern is the combination of (a) flipping the default to true and (b) the marking being irreversible while WORKER_UNKNOWN can be transient. Details inline.
| } | ||
| override def markShuffleDataLost(shuffleId: Int): Unit = { | ||
| logWarning(s"Marking shuffle $shuffleId data as lost due to unknown/crashed worker.") | ||
| dataLostShuffleSet.add(shuffleId) |
There was a problem hiding this comment.
Irreversible marking vs. transient WORKER_UNKNOWN. markShuffleDataLost does dataLostShuffleSet.add + setStageEnd, and neither is undone for the life of the shuffle (only removeExpiredShuffle clears it). But WORKER_UNKNOWN is transient: the master computes unknownWorkers = needCheckedWorkerList.filterNot(workersMap.containsKey) (Master.scala:1241), so a still-alive worker that briefly leaves workersMap — master failover/restart rebuilding state, a heartbeat-timeout eviction, or a long GC pause — is reported unknown for a heartbeat and then recovers on re-registration.
The removed isStageDataLostInUnknownWorker was evaluated live on every isStageDataLost call and reverted once the worker left excludedWorkers. With this change a transient blip permanently marks every affected shuffle SHUFFLE_DATA_LOST and force-recomputes the stage, even though the committed data is intact. Consider re-validating against current worker status before failing, or keeping the mark reversible while the worker is only unknown (not confirmed lost).
| "This has no effect when ${CLIENT_PUSH_REPLICATE_ENABLED.key}=true") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
| .createWithDefault(true) |
There was a problem hiding this comment.
Default flip to true turns this on by default for the common case. celeborn.client.push.replicate.enabled defaults to false (CelebornConf.scala:4856) and this feature is gated by !pushReplicateEnabled, so flipping the default to true enables the aggressive fast-fail for every non-replicated deployment on upgrade. Combined with the irreversibility noted on ReducePartitionCommitHandler.markShuffleDataLost, a single master restart can trigger cluster-wide unnecessary stage recomputes.
Granularity is also coarse: a shuffle is failed whenever the unknown worker still appears as a key in shuffleAllocatedWorkers, even if its partitions were already revived/migrated to healthy workers. Worth confirming the default flip is intended for 0.7.0.
| override def notifyChangedWorkersStatus(workersStatus: WorkersStatus): Unit = { | ||
| if (shuffleDataLostOnUnknownWorkerEnabled && !pushReplicateEnabled) { | ||
| if (workersStatus.unknownWorkers != null && !workersStatus.unknownWorkers.isEmpty) { | ||
| lifecycleManager.shuffleAllocatedWorkers.asScala.foreach { |
There was a problem hiding this comment.
This foreach has no per-shuffle guard. markShuffleDataLost → getCommitHandler(shuffleId) → lifecycleManager.getPartitionType(shuffleId); if that is transiently null for a shuffle being torn down (or returns an unexpected type → the case _ => throw UnsupportedOperationException in getCommitHandler), the exception propagates out of notifyChangedWorkersStatus and is only caught at WorkerStatusTracker.scala:207, abandoning the rest of the loop so other affected shuffles aren't marked this heartbeat. Since marking is per-shuffle, wrap each iteration in its own try/catch (log and continue).
| "unknown worker is immediately marked as data lost. " + | ||
| "On the write flow revive/commit request for that shuffle will fast fail. " + | ||
| "GetReducerFileGroup requests are replied with SHUFFLE_DATA_LOST. " + | ||
| "This has no effect when ${CLIENT_PUSH_REPLICATE_ENABLED.key}=true") |
There was a problem hiding this comment.
This is a plain concatenated string (no s interpolator), so ${CLIENT_PUSH_REPLICATE_ENABLED.key} is emitted literally — it already shows up verbatim in the generated docs/configuration/client.md row. Prefix the doc string with s (as surrounding entries do) so it renders celeborn.client.push.replicate.enabled.
| private val pushRackAwareEnabled = conf.clientReserveSlotsRackAwareEnabled | ||
| private val partitionSplitThreshold = conf.shufflePartitionSplitThreshold | ||
| private val partitionSplitMode = conf.shufflePartitionSplitMode | ||
| private val shuffleDataLostOnUnknownWorkerEnabled = |
There was a problem hiding this comment.
This field looks unused — UnknownWorkerListener keeps its own private val shuffleDataLostOnUnknownWorkerEnabled = conf.clientShuffleDataLostOnUnknownWorkerEnabled (in CommitManager) and reads that. Either remove this one or route the listener through it.
|
|
||
| - Since 0.7.0, Celeborn changed the default value of `celeborn.port.maxRetries` from `1` to `16`. | ||
|
|
||
| - Since 0.7.0, Celeborn change the default value of `celeborn.client.shuffleDataLostOnUnknownWorker.enabled` from `false` to `true`, which means Celeborn will treat shuffle data lost when unknown worker is detected at default. |
There was a problem hiding this comment.
Grammar: "Celeborn change" → "changed", and "treat shuffle data lost" → "treat shuffle data as lost". Consider also adding the replication caveat (no effect when celeborn.client.push.replicate.enabled=true) to match the config doc.
What changes were proposed in this pull request?
Enhancing the logic of #3496
Why are the changes needed?
#3496 only handles the reduce side flow i.e GetReducerFileGroup request will fail if the shuffle data is mark lost.
In this PR, we are making use of the
WorkerStatusListenerto immediately detect the lost workers, mark the data lost of the stage and immediately issue stage end for that stage. This will also allow write stage to fast-fail during revives and commit request, otherwise write stage will run as usual and then reduce will fail at startup. This will lead to lot of resource and time wastage.Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?