Skip to content

Conversation

@AngersZhuuuu
Copy link
Contributor

@AngersZhuuuu AngersZhuuuu commented Nov 6, 2025

What changes were proposed in this pull request?

SparkContext stop stuck on ContextCleaner

25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils: 
14 Driver BLOCKED Blocked by Thread 60 Lock(org.apache.spark.ContextCleaner@1726738661})
  org.apache.spark.ContextCleaner.stop(ContextCleaner.scala:145)
  org.apache.spark.SparkContext.$anonfun$stop$9(SparkContext.scala:2094)
  org.apache.spark.SparkContext.$anonfun$stop$9$adapted(SparkContext.scala:2094)
  org.apache.spark.SparkContext$$Lambda$5309/807013918.apply(Unknown Source)
  scala.Option.foreach(Option.scala:407)
  org.apache.spark.SparkContext.$anonfun$stop$8(SparkContext.scala:2094)
  org.apache.spark.SparkContext$$Lambda$5308/1445921225.apply$mcV$sp(Unknown Source)
  org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1512)
  org.apache.spark.SparkContext.stop(SparkContext.scala:2094)
  org.apache.spark.SparkContext.stop(SparkContext.scala:2050)
  org.apache.spark.sql.SparkSession.stop(SparkSession.scala:718)
  com.shopee.data.content.ods.live_performance.Main$.main(Main.scala:62)
  com.shopee.data.content.ods.live_performance.Main.main(Main.scala)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:498)
  org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:751) 

ContextCleaner stop() will wait lock

def stop(): Unit = {
  stopped = true
  // Interrupt the cleaning thread, but wait until the current task has finished before
  // doing so. This guards against the race condition where a cleaning thread may
  // potentially clean similarly named variables created by a different SparkContext,
  // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
  synchronized {
    cleaningThread.interrupt()
  }
  cleaningThread.join()
  periodicGCService.shutdown()
}

, but one call on keepCleaning() hold the lock

25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils: 
60 Spark Context Cleaner TIMED_WAITING Monitor(org.apache.spark.ContextCleaner@1726738661})
  sun.misc.Unsafe.park(Native Method)
  java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
  scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
  scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
  scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
  org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:294)
  org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
  org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:194)
  org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:351)
  org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
  org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:78)
  org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:254)
  org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:204)
  org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
  org.apache.spark.ContextCleaner$$Lambda$1178/1994584033.apply(Unknown Source)
  scala.Option.foreach(Option.scala:407)
  org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195) => holding Monitor(org.apache.spark.ContextCleaner@1726738661})
  org.apache.spark.ContextCleaner$$Lambda$1109/1496842179.apply$mcV$sp(Unknown Source)
  org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1474)
  org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
  org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79) 

BlockManager stuck on removeBroadcast RpcUtils.INFINITE_TIMEOUT.awaitResult(future) 【PR #28924 change here】

def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {
  val future = driverEndpoint.askSync[Future[Seq[Int]]](
    RemoveBroadcast(broadcastId, removeFromMaster))
  future.failed.foreach(e =>
    logWarning(s"Failed to remove broadcast $broadcastId" +
      s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
  )(ThreadUtils.sameThread)
  if (blocking) {
    // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
    RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
  }
} 

For such case only reason should be RPC was missing handling

Driver OOM or A thread leak in yarn nm prevents the creation of new threads to handle RPC.

25/11/05 08:16:22 ERROR [metrics-paimon-push-gateway-reporter-2-thread-1] ScheduledReporter: Exception thrown from PushGatewayReporter#report. Exception was suppressed.
java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:717)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1115)
	at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1388)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1416)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1400)
	at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
	at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
	at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:243)
	at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:134)
	at org.apache.paimon.metrics.reporter.PushGatewayReporter.report(PushGatewayReporter.java:84)
	at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:253)
	at com.codahale.metrics.ScheduledReporter.lambda$start$0(ScheduledReporter.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748) 

For such case we can add a customized wait timeout here to avoid forever stuck

then whole process stuck on here

Why are the changes needed?

Avoid app stuck

Does this PR introduce any user-facing change?

No

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Nov 6, 2025
@AngersZhuuuu
Copy link
Contributor Author

Since @Ngone51 is the author of. #28924 and @cloud-fan @holdenk reviewed that pr. WDYT and are there any other better solutions?

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-54219][CORE] Driver can't create thread causing ContextCleaner… [SPARK-54219][CORE] Driver can't create thread causing ContextCleaner stuck and stuck stop process Nov 6, 2025
@AngersZhuuuu
Copy link
Contributor Author

We meet 3 times in our tasks

log"${MDC(ERROR, e.getMessage)}", e)
)(ThreadUtils.sameThread)
if (blocking) {
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this comment is not accurate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this is indeed incorrect in extreme cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be helpful if the comment is replaced (instead of removing incorrect one) with the explanation why infinite timeout can't be used here.

Copy link
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix. But I just wonder why Awaitable.result() is not interrupted by cleaningThread.interrupt()? It should be interruptible accodring to the doc:

  /**
   * Await and return the result (of type `T`) of this `Awaitable`.
   *
   * '''''This method should not be called directly; use [[Await.result]] instead.'''''
   *
   * @param  atMost
   *         maximum wait time, which may be negative (no waiting is done),
   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
   *         duration
   * @return the result value if the `Awaitable` is completed within the specific maximum wait time
   * @throws InterruptedException     if the current thread is interrupted while waiting
   * @throws TimeoutException         if after waiting for the specified time this `Awaitable` is still not ready
   * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
   */
  @throws(classOf[TimeoutException])
  @throws(classOf[InterruptedException])
  def result(atMost: Duration)(implicit permit: CanAwait): T

@AngersZhuuuu
Copy link
Contributor Author

Thanks for the fix. But I just wonder why Awaitable.result() is not interrupted by cleaningThread.interrupt()? It should be interruptible accodring to the doc:

  /**
   * Await and return the result (of type `T`) of this `Awaitable`.
   *
   * '''''This method should not be called directly; use [[Await.result]] instead.'''''
   *
   * @param  atMost
   *         maximum wait time, which may be negative (no waiting is done),
   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
   *         duration
   * @return the result value if the `Awaitable` is completed within the specific maximum wait time
   * @throws InterruptedException     if the current thread is interrupted while waiting
   * @throws TimeoutException         if after waiting for the specified time this `Awaitable` is still not ready
   * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
   */
  @throws(classOf[TimeoutException])
  @throws(classOf[InterruptedException])
  def result(atMost: Duration)(implicit permit: CanAwait): T

cleaningThread.interrupt() need lock

def stop(): Unit = {
  stopped = true
  // Interrupt the cleaning thread, but wait until the current task has finished before
  // doing so. This guards against the race condition where a cleaning thread may
  // potentially clean similarly named variables created by a different SparkContext,
  // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
  synchronized {
    cleaningThread.interrupt()
  }
  cleaningThread.join()
  periodicGCService.shutdown()
}

but keepCleaning() holding the lock and wait reply

@Ngone51
Copy link
Member

Ngone51 commented Nov 7, 2025

but keepCleaning() holding the lock and wait reply

Ah, I see.

// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
if (cleanBlockBlockingTimeout.isDefined) {
new RpcTimeout(FiniteDuration(cleanBlockBlockingTimeout.get, TimeUnit.SECONDS),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont' we have to call awaitResult(future) after creating RpcTimeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh...missing the code...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 222 to 228
if (cleanBlockBlockingTimeout.isDefined) {
new RpcTimeout(FiniteDuration(cleanBlockBlockingTimeout.get, TimeUnit.SECONDS),
CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT.key)
} else {
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you extract this block into a common function?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a unit test for the change? For example, a unit test for the common function is ok if it's hard to add end-2-end test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}

private def handleRemoveBlockBlockingTimeout(future: Future[_]): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
private def handleRemoveBlockBlockingTimeout(future: Future[_]): Unit = {
private def waitForBlockRemoval(future: Future[_]): Unit = {

// Normally, the underlying Futures will timeout anyway,
// so it's safe to use infinite timeout here. In extreme case,
// Driver can't crease thread handling this rpc, here will be stuck forever.
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still want this case? It look like a bug to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea,so what do you think is a good default value for CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

120s. We can make it fallback to Network.NETWORK_TIMEOUT.

@AngersZhuuuu
Copy link
Contributor Author

GA passed, docker integration test seems not related to this change. @Ngone51

@Ngone51
Copy link
Member

Ngone51 commented Nov 10, 2025

@AngersZhuuuu Could you retrigger the failed job?

@AngersZhuuuu
Copy link
Contributor Author

@AngersZhuuuu Could you retrigger the failed job?

Looks like still a flaky test

[info] OracleIntegrationSuite:
[info] org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite *** ABORTED *** (10 minutes, 17 seconds)
[info]   The code passed to eventually never returned normally. Attempted 597 times over 10.0002781476 minutes. Last failure message: ORA-12541: Cannot connect. No listener at host 10.1.0.115 port 33393. (CONNECTION_ID=pb1YIjjzR16CZ/+TVd67lg==)

@AngersZhuuuu
Copy link
Contributor Author

@AngersZhuuuu Could you retrigger the failed job?

GA passed

val waitBlockRemovalTimeout =
RpcTimeout(
conf,
Seq(CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
Copy link
Member

@Ngone51 Ngone51 Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small problem of this constructor is that the timeoutProp could be NETWORK_TIMEOUT.key when CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT is not set. And it is not good for debugging when timeout exception thrown with a non-specific timeoutProp.

My original proposal with fallback conf is to do like this:

private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT =
  ConfigBuilder("spark.cleaner.referenceTracking.blocking.timeout")
    .version("4.1.0")
    .fallbackConf(Network.NETWORK_TIMEOUT)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants