Skip to content

[CELEBORN-2166] Mark shuffle data lost and fast fail if allocated worker is lost#3726

Open
s0nskar wants to merge 3 commits into
apache:mainfrom
s0nskar:fix_fetch_failures
Open

[CELEBORN-2166] Mark shuffle data lost and fast fail if allocated worker is lost#3726
s0nskar wants to merge 3 commits into
apache:mainfrom
s0nskar:fix_fetch_failures

Conversation

@s0nskar

@s0nskar s0nskar commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Enhancing the logic of #3496

Why are the changes needed?

#3496 only handles the reduce side flow i.e GetReducerFileGroup request will fail if the shuffle data is mark lost.

In this PR, we are making use of the WorkerStatusListener to immediately detect the lost workers, mark the data lost of the stage and immediately issue stage end for that stage. This will also allow write stage to fast-fail during revives and commit request, otherwise write stage will run as usual and then reduce will fail at startup. This will lead to lot of resource and time wastage.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

  • Added UTs
  • Working on staging testing.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the eager mark-data-lost-on-unknown-worker change. The mechanism and the new tests are sound, and the setStageEnd/handleGetReducerFileGroup lock handshake stays correct with the new heartbeat-thread caller (no missed/double reply). The main concern is the combination of (a) flipping the default to true and (b) the marking being irreversible while WORKER_UNKNOWN can be transient. Details inline.

}
override def markShuffleDataLost(shuffleId: Int): Unit = {
logWarning(s"Marking shuffle $shuffleId data as lost due to unknown/crashed worker.")
dataLostShuffleSet.add(shuffleId)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Irreversible marking vs. transient WORKER_UNKNOWN. markShuffleDataLost does dataLostShuffleSet.add + setStageEnd, and neither is undone for the life of the shuffle (only removeExpiredShuffle clears it). But WORKER_UNKNOWN is transient: the master computes unknownWorkers = needCheckedWorkerList.filterNot(workersMap.containsKey) (Master.scala:1241), so a still-alive worker that briefly leaves workersMap — master failover/restart rebuilding state, a heartbeat-timeout eviction, or a long GC pause — is reported unknown for a heartbeat and then recovers on re-registration.

The removed isStageDataLostInUnknownWorker was evaluated live on every isStageDataLost call and reverted once the worker left excludedWorkers. With this change a transient blip permanently marks every affected shuffle SHUFFLE_DATA_LOST and force-recomputes the stage, even though the committed data is intact. Consider re-validating against current worker status before failing, or keeping the mark reversible while the worker is only unknown (not confirmed lost).

"This has no effect when ${CLIENT_PUSH_REPLICATE_ENABLED.key}=true")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default flip to true turns this on by default for the common case. celeborn.client.push.replicate.enabled defaults to false (CelebornConf.scala:4856) and this feature is gated by !pushReplicateEnabled, so flipping the default to true enables the aggressive fast-fail for every non-replicated deployment on upgrade. Combined with the irreversibility noted on ReducePartitionCommitHandler.markShuffleDataLost, a single master restart can trigger cluster-wide unnecessary stage recomputes.

Granularity is also coarse: a shuffle is failed whenever the unknown worker still appears as a key in shuffleAllocatedWorkers, even if its partitions were already revived/migrated to healthy workers. Worth confirming the default flip is intended for 0.7.0.

override def notifyChangedWorkersStatus(workersStatus: WorkersStatus): Unit = {
if (shuffleDataLostOnUnknownWorkerEnabled && !pushReplicateEnabled) {
if (workersStatus.unknownWorkers != null && !workersStatus.unknownWorkers.isEmpty) {
lifecycleManager.shuffleAllocatedWorkers.asScala.foreach {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach has no per-shuffle guard. markShuffleDataLostgetCommitHandler(shuffleId)lifecycleManager.getPartitionType(shuffleId); if that is transiently null for a shuffle being torn down (or returns an unexpected type → the case _ => throw UnsupportedOperationException in getCommitHandler), the exception propagates out of notifyChangedWorkersStatus and is only caught at WorkerStatusTracker.scala:207, abandoning the rest of the loop so other affected shuffles aren't marked this heartbeat. Since marking is per-shuffle, wrap each iteration in its own try/catch (log and continue).

"unknown worker is immediately marked as data lost. " +
"On the write flow revive/commit request for that shuffle will fast fail. " +
"GetReducerFileGroup requests are replied with SHUFFLE_DATA_LOST. " +
"This has no effect when ${CLIENT_PUSH_REPLICATE_ENABLED.key}=true")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a plain concatenated string (no s interpolator), so ${CLIENT_PUSH_REPLICATE_ENABLED.key} is emitted literally — it already shows up verbatim in the generated docs/configuration/client.md row. Prefix the doc string with s (as surrounding entries do) so it renders celeborn.client.push.replicate.enabled.

private val pushRackAwareEnabled = conf.clientReserveSlotsRackAwareEnabled
private val partitionSplitThreshold = conf.shufflePartitionSplitThreshold
private val partitionSplitMode = conf.shufflePartitionSplitMode
private val shuffleDataLostOnUnknownWorkerEnabled =

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field looks unused — UnknownWorkerListener keeps its own private val shuffleDataLostOnUnknownWorkerEnabled = conf.clientShuffleDataLostOnUnknownWorkerEnabled (in CommitManager) and reads that. Either remove this one or route the listener through it.

Comment thread docs/migration.md

- Since 0.7.0, Celeborn changed the default value of `celeborn.port.maxRetries` from `1` to `16`.

- Since 0.7.0, Celeborn change the default value of `celeborn.client.shuffleDataLostOnUnknownWorker.enabled` from `false` to `true`, which means Celeborn will treat shuffle data lost when unknown worker is detected at default.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar: "Celeborn change" → "changed", and "treat shuffle data lost" → "treat shuffle data as lost". Consider also adding the replication caveat (no effect when celeborn.client.push.replicate.enabled=true) to match the config doc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants