Commit fb16a1e

[SPARK-54197][K8S] Improve ExecutorsPodsLifecycleManager not to request to delete if deletionTimestamp exists
### What changes were proposed in this pull request?

The current code handling deletion of Failed or Succeeded driver Pods calls the Kubernetes API to delete objects until the Kubelet has started terminating the Pod (the status of the object is terminating). However, depending on configuration, the ExecutorPodsLifecycleManager loop might run multiple times before the Kubelet starts deleting the Pod object, resulting in unnecessary DELETE calls to the Kubernetes API, which are particularly expensive since they are served from etcd.

Following the Kubernetes API specification in https://kubernetes.io/docs/reference/using-api/api-concepts/:

> When a client first sends a delete to request the removal of a resource, the .metadata.deletionTimestamp is set to the current time. Once the .metadata.deletionTimestamp is set, external controllers that act on finalizers may start performing their cleanup work at any time, in any order.

we can assume that whenever the deletionTimestamp is set on a Pod, the Pod will eventually be terminated without the need for additional DELETE calls.

### Why are the changes needed?

This change removes redundant calls against the Kubernetes API that, at scale, might lead to excessive load on etcd.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This patch includes unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52898
Closes #52902 from dongjoon-hyun/driver-do-not-call-delete-for-terminating-pods-master.

Lead-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Andrea Tosatto <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 3b368ca)
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 2d37bb0 commit fb16a1e
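
The essence of the change is a guard on .metadata.deletionTimestamp before issuing a DELETE. Below is a minimal sketch of that pattern written against the fabric8 Kubernetes client used by Spark's K8s backend; the object and method names (PodDeletionGuard, deleteIfNotTerminating) and the parameters are illustrative only, while the client calls mirror the ones in the diff that follows.

import io.fabric8.kubernetes.client.KubernetesClient

object PodDeletionGuard {
  // Illustrative helper, not Spark code: issue a DELETE only when the pod has
  // not already begun graceful termination.
  def deleteIfNotTerminating(client: KubernetesClient, namespace: String, podName: String): Unit = {
    val podResource = client.pods().inNamespace(namespace).withName(podName)
    val pod = podResource.get()
    // metadata.deletionTimestamp stays null until the API server accepts a DELETE;
    // once it is set, the kubelet and the garbage collector finish the removal,
    // so another DELETE would only add load on the API server and etcd.
    if (pod != null && pod.getMetadata.getDeletionTimestamp == null) {
      podResource.delete()
    }
  }
}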


2 files changed (+39 -4 lines)


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

Lines changed: 13 additions & 3 deletions
@@ -201,8 +201,16 @@ private[spark] class ExecutorPodsLifecycleManager(
   private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
     Utils.tryLogNonFatalError {
       if (shouldDeleteExecutors) {
-        // Get pod before deleting it, we can skip deleting if pod is already deleted so that
-        // we do not send too many requests to api server.
+        if (updatedPod.getMetadata.getDeletionTimestamp != null) {
+          // Do not call the Kubernetes API if the deletion timestamp
+          // is already set on the updatedPod object.
+          // This removes the need for unnecessary API round trips
+          // against the Kubernetes API.
+          return
+        }
+        // Get pod before deleting it, we can skip deleting if the pod is already deleted
+        // or already has the deletion timestamp set so that we do not send
+        // too many requests to the API server.
         // If deletion failed on a previous try, we can try again if resync informs us the pod
         // is still around.
         // Delete as best attempt - duplicate deletes will throw an exception but the end state
@@ -211,7 +219,9 @@ private[spark] class ExecutorPodsLifecycleManager(
           .pods()
           .inNamespace(namespace)
           .withName(updatedPod.getMetadata.getName)
-        if (podToDelete.get() != null) {
+
+        if (podToDelete.get() != null &&
+            podToDelete.get.getMetadata.getDeletionTimestamp == null) {
           podToDelete.delete()
         }
       } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
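
For readability, the deletion branch of removeExecutorFromK8s reads roughly as follows once the hunks above are applied. This is a sketch reconstructed from the diff, not a verbatim copy of the file: the assignment of podToDelete to the kubernetesClient lookup and the body of the inactivation branch are not visible in the excerpt above and are assumed here.

  private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
    Utils.tryLogNonFatalError {
      if (shouldDeleteExecutors) {
        if (updatedPod.getMetadata.getDeletionTimestamp != null) {
          // A DELETE has already been accepted for this pod; no further call is needed.
          return
        }
        // Re-read the pod so a delete is only issued if it still exists and has not
        // started terminating since the snapshot was taken.
        val podToDelete = kubernetesClient  // assumed: not shown in the excerpt above
          .pods()
          .inNamespace(namespace)
          .withName(updatedPod.getMetadata.getName)

        if (podToDelete.get() != null &&
            podToDelete.get.getMetadata.getDeletionTimestamp == null) {
          podToDelete.delete()
        }
      } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
        // ... inactivation path, unchanged by this commit and elided here ...
      }
    }
  }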

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala

Lines changed: 26 additions & 1 deletion
@@ -20,7 +20,7 @@ import java.util.function.UnaryOperator

 import scala.collection.mutable

-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
@@ -219,6 +219,31 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }

+  test("Don't delete pod from K8s if deletionTimestamp is already set.") {
+    // Create a failed pod with deletionTimestamp already in the past
+    val basePod = failedExecutorWithoutDeletion(1)
+    val failedPodWithDeletionTimestamp = new PodBuilder(basePod)
+      .editOrNewMetadata()
+        .withDeletionTimestamp("1970-01-01T00:00:00Z")
+      .endMetadata()
+      .build()
+
+    val mockPodResource = mock(classOf[PodResource])
+    namedExecutorPods.put("spark-executor-1", mockPodResource)
+    when(mockPodResource.get()).thenReturn(failedPodWithDeletionTimestamp)
+
+    snapshotsStore.updatePod(failedPodWithDeletionTimestamp)
+    snapshotsStore.notifySubscribers()
+
+    // Verify executor is removed from Spark
+    val msg = "The executor with id 1 was deleted by a user or the framework."
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = false, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+
+    // Verify delete() is NOT called since deletionTimestamp is already set
+    verify(mockPodResource, never()).delete()
+  }
+
   private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = {
     val reason = Option(failedPod.getStatus.getReason)
     val message = Option(failedPod.getStatus.getMessage)
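
The pod fixture used in the test above can be reproduced outside the suite with the same fabric8 model classes. The sketch below is illustrative only (the object name and the println driver are not part of Spark); it shows the guard condition from the patch evaluating differently for a fresh pod and for one whose DELETE has already been accepted.

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

object DeletionTimestampGuardDemo {
  // Mirrors the condition added by the patch: a non-null deletionTimestamp means
  // the pod is already terminating and no further DELETE call is needed.
  def shouldSkipDelete(pod: Pod): Boolean =
    pod.getMetadata.getDeletionTimestamp != null

  def main(args: Array[String]): Unit = {
    // A pod that has not been asked to terminate yet.
    val freshPod = new PodBuilder()
      .withNewMetadata().withName("spark-executor-1").endMetadata()
      .build()

    // The same pod after the API server has accepted a DELETE.
    val terminatingPod = new PodBuilder(freshPod)
      .editOrNewMetadata()
        .withDeletionTimestamp("1970-01-01T00:00:00Z")
      .endMetadata()
      .build()

    println(shouldSkipDelete(freshPod))        // false: a DELETE call is still required
    println(shouldSkipDelete(terminatingPod))  // true: skip the redundant DELETE
  }
}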

0 commit comments
