Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1904] Cancel the stage running tasks on stage rerun #3144

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

turboFei
Copy link
Member

@turboFei turboFei commented Mar 11, 2025

What changes were proposed in this pull request?

On SparkListenerStageCompleted event, check whether the shuffle fetch failure reported in the stage, if that, cancel the running tasks due celeborn client will rerun the whole stage.

Why are the changes needed?

If the task failed due to FetchFailed, dag scheduler would markStageAsFinished.
https://github.com/apache/spark/blob/3a872b7ca11faa128a2667de55f6dca13807057a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2022-L2060

But it will not cancel the running tasks in the stage.

For example, in below stage, a task failed due to fetch failure, and the stage duration is 39s.
image

However, it does not cancel the running tasks, the launched 2496 tasks keep running and the maximum task duration is 31 minutes.

image

It wastes a lot of compute resource.

For celeborn shuffle fetch failure, it will rerun the whole stage, so it is fine to cancel all the running tasks.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT & Cluster testing.

The stage terminated quickly.
image

image

@turboFei turboFei marked this pull request as draft March 11, 2025 04:53
@turboFei turboFei changed the title cancel running tasks [CELEBORN-1904] Cancel the running tasks if the stage is marked as failed due to shuffle fetch failure Mar 11, 2025
if (shuffleFetchFailureTaskIds != null) {
shuffleFetchFailureTaskIds.asScala.headOption.foreach { case taskId =>
val taskSetManager = SparkUtils.getTaskSetManager(taskId)
if (taskSetManager != null && taskSetManager.runningTasks > 0) {
Copy link
Member Author

@turboFei turboFei Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even we do not know whether the shuffleFetchFailureTaskIds related task did trigger the FetchFailedException eventually(depends on whether another task attempt is running or has been finished, see #2921).

But it should be safe to cancel the running tasks after checking the taskSetManager.runningTasks > 0.

@turboFei turboFei marked this pull request as ready for review March 11, 2025 10:05
@turboFei
Copy link
Member Author

cc @FMX @RexXiong @pan3793

@turboFei turboFei changed the title [CELEBORN-1904] Cancel the running tasks if the stage is marked as failed due to shuffle fetch failure [CELEBORN-1904] Cancel the running tasks if the stage need to be rerun Mar 12, 2025
@turboFei turboFei changed the title [CELEBORN-1904] Cancel the running tasks if the stage need to be rerun [CELEBORN-1904] Cancel the stage running tasks on stage rerun Mar 12, 2025
@turboFei turboFei requested a review from mridulm March 12, 2025 04:35
@turboFei
Copy link
Member Author

cc @SteNicholas

@turboFei turboFei requested review from onebox-li and cxzl25 March 12, 2025 04:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant