[SPARK-54219][CORE] Driver can't create thread causing ContextCleaner stuck and stuck stop process #52919
base: master
Conversation
Since @Ngone51 is the author of #28924, and @cloud-fan @holdenk reviewed that PR: WDYT, and are there any better solutions?

We have hit this issue 3 times in our jobs.
```scala
      log"${MDC(ERROR, e.getMessage)}", e)
  )(ThreadUtils.sameThread)
  if (blocking) {
    // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
```
so this comment is not accurate?
Yea, this is indeed incorrect in extreme cases.
It would be helpful if the comment were replaced (instead of just removing the incorrect one) with an explanation of why an infinite timeout can't be used here.
Ngone51 left a comment:
Thanks for the fix. But I just wonder why Awaitable.result() is not interrupted by cleaningThread.interrupt()? It should be interruptible according to the doc:
```scala
/**
 * Await and return the result (of type `T`) of this `Awaitable`.
 *
 * '''''This method should not be called directly; use [[Await.result]] instead.'''''
 *
 * @param atMost
 *   maximum wait time, which may be negative (no waiting is done),
 *   [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
 *   duration
 * @return the result value if the `Awaitable` is completed within the specific maximum wait time
 * @throws InterruptedException if the current thread is interrupted while waiting
 * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready
 * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
 */
@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
def result(atMost: Duration)(implicit permit: CanAwait): T
```
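That scaladoc contract can be checked with a small plain-Scala experiment (the object and method names here are hypothetical, for illustration only): a thread blocked in `Await.result` with `Duration.Inf` does receive an `InterruptedException` when interrupted, so the interrupt mechanism itself is not the problem.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object InterruptibleAwaitSketch {
  // Blocks a thread forever in Await.result on a never-completed future,
  // then interrupts it and reports whether InterruptedException was thrown.
  def demo(): Boolean = {
    val started = new CountDownLatch(1)
    val interrupted = new AtomicBoolean(false)
    val t = new Thread(() => {
      started.countDown()
      try Await.result(Promise[Unit]().future, Duration.Inf)
      catch { case _: InterruptedException => interrupted.set(true) }
    })
    t.start()
    started.await()
    Thread.sleep(100) // give the thread time to block in Await.result
    t.interrupt()
    t.join(2000)
    interrupted.get
  }
}
```

`InterruptibleAwaitSketch.demo()` returns `true`: the infinite await is interruptible, which is why the real question (answered below in the thread) is who is able to call `interrupt()` at all.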
cleaningThread.interrupt() is called while holding a lock, but keepCleaning() already holds that lock while waiting for the RPC reply, so the interrupt can never be delivered.
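The situation described above can be sketched in plain Scala (the names and the `ReentrantLock` are hypothetical stand-ins, not Spark's actual `ContextCleaner` internals; a `ReentrantLock` is used instead of `synchronized` so the blocked acquisition is observable via `tryLock`):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object CleanerDeadlockSketch {
  private val lock = new ReentrantLock()
  private val insideCriticalSection = new CountDownLatch(1)

  // Stand-in for keepCleaning(): holds the lock while "waiting on an RPC
  // reply" that never arrives.
  private val cleaningThread = new Thread(() => {
    lock.lock()
    try {
      insideCriticalSection.countDown()
      Thread.sleep(Long.MaxValue) // simulates INFINITE_TIMEOUT.awaitResult
    } catch {
      case _: InterruptedException => // cleanup would resume here, if reachable
    } finally lock.unlock()
  })

  // Stand-in for stop(): must take the same lock before it may interrupt,
  // so the interrupt is never delivered.
  def tryStop(timeoutMs: Long): Boolean = {
    if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      try { cleaningThread.interrupt(); true } finally lock.unlock()
    } else false
  }

  def demo(): Boolean = {
    cleaningThread.setDaemon(true)
    cleaningThread.start()
    insideCriticalSection.await()
    tryStop(200) // times out: the cleaning thread still holds the lock
  }
}
```

`demo()` returns `false`: the stop side cannot acquire the lock, so the interrupt never reaches the thread blocked in the infinite wait, which is the hang this PR addresses.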
Ah, I see.
```scala
  // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
  RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
  if (cleanBlockBlockingTimeout.isDefined) {
    new RpcTimeout(FiniteDuration(cleanBlockBlockingTimeout.get, TimeUnit.SECONDS),
```
Don't we have to call awaitResult(future) after creating the RpcTimeout?
Oh...missing the code...
Done
```scala
  if (cleanBlockBlockingTimeout.isDefined) {
    new RpcTimeout(FiniteDuration(cleanBlockBlockingTimeout.get, TimeUnit.SECONDS),
      CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT.key)
  } else {
    // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
    RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
  }
```
Could you extract this block into a common function?
Could you also add a unit test for the change? For example, a unit test for the common function is ok if it's hard to add an end-to-end test.
Done
```scala
    }
  }

  private def handleRemoveBlockBlockingTimeout(future: Future[_]): Unit = {
```
How about:

```diff
-  private def handleRemoveBlockBlockingTimeout(future: Future[_]): Unit = {
+  private def waitForBlockRemoval(future: Future[_]): Unit = {
```
```scala
  // Normally, the underlying Futures will timeout anyway, so it's safe to
  // use an infinite timeout here. In the extreme case where the driver
  // can't create a thread to handle this RPC, this will be stuck forever.
  RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
```
Do we still want this case? It looks like a bug to me.
Yea, so what do you think is a good default value for CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT?
120s. We can make it fall back to Network.NETWORK_TIMEOUT.
GA passed; the docker integration test failure seems unrelated to this change. @Ngone51
@AngersZhuuuu Could you retrigger the failed job?
Looks like it's still a flaky test.
GA passed |
```scala
  val waitBlockRemovalTimeout =
    RpcTimeout(
      conf,
      Seq(CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
```
One small problem with this constructor is that timeoutProp could be NETWORK_TIMEOUT.key when CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT is not set. That is not good for debugging, since a timeout exception would be thrown with a non-specific timeoutProp.
My original proposal with a fallback conf is to do it like this:
```scala
private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING_TIMEOUT =
  ConfigBuilder("spark.cleaner.referenceTracking.blocking.timeout")
    .version("4.1.0")
    .fallbackConf(Network.NETWORK_TIMEOUT)
```
Done
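For illustration, the fallback behavior Ngone51 describes can be sketched in plain Scala (`resolveTimeout` is a hypothetical helper, not Spark's `ConfigBuilder`; only the two config key names come from the thread): the point is that the error path can then report the key that actually supplied the value.

```scala
object ConfFallbackSketch {
  val Primary  = "spark.cleaner.referenceTracking.blocking.timeout"
  val Fallback = "spark.network.timeout"

  // Returns the effective timeout in seconds together with the key that
  // actually supplied it, so a timeout error can name a specific key.
  def resolveTimeout(conf: Map[String, String]): (String, Long) =
    conf.get(Primary).map(v => (Primary, v.toLong))
      .orElse(conf.get(Fallback).map(v => (Fallback, v.toLong)))
      .getOrElse((Fallback, 120L))
}
```

With the primary key set, the primary key is reported; otherwise the lookup falls through to the network timeout, and finally to the 120s default.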
What changes were proposed in this pull request?
SparkContext.stop() gets stuck on ContextCleaner:
- ContextCleaner.stop() waits for the lock, but an in-flight call to keepCleaning() holds it.
- keepCleaning() is stuck in removeBroadcast on RpcUtils.INFINITE_TIMEOUT.awaitResult(future) (the call changed by PR #28924), so the whole stop process hangs there.

In such a case the only plausible reason is that the RPC was never handled: a driver OOM, or a thread leak in the YARN NodeManager, prevents the creation of new threads to handle the RPC. For this case we can add a configurable wait timeout to avoid being stuck forever.
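In spirit, the fix replaces the unbounded wait with a bounded one. A minimal plain-Scala sketch of the difference (`awaitBounded` and `neverCompleted` are hypothetical stand-ins for illustration, not Spark's RpcTimeout API):

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object BoundedAwaitSketch {
  // Stand-in for an RPC future whose reply never arrives, e.g. because the
  // driver cannot create a thread to handle the message.
  val neverCompleted: Future[Unit] = Promise[Unit]().future

  // Bounded wait: surfaces the stuck RPC as a failure (here `false`; in
  // Spark, a logged timeout exception) instead of blocking the cleaning
  // thread, and transitively SparkContext.stop(), forever.
  def awaitBounded(f: Future[_], timeoutMs: Long): Boolean =
    try { Await.result(f, timeoutMs.millis); true }
    catch { case _: TimeoutException => false }
}
```

With `neverCompleted` the bounded wait returns after the timeout, whereas `Await.result(neverCompleted, Duration.Inf)` would block that thread for the lifetime of the process.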
Why are the changes needed?
Avoid the application getting stuck.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No