
Conversation

@garlandz-db (Contributor) commented Nov 5, 2025:

What changes were proposed in this pull request?

This PR makes SparkConnectService rely on its own private SparkSession, which is intended only for copying session configs when creating new sessions.

Why are the changes needed?

The default session can get cleaned up, in which case SparkConnectService cannot recover: session creation fails on subsequent RPCs.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added basic testing

Was this patch authored or co-authored using generative AI tooling?

Yes

     */
    def initializeBaseSession(sc: SparkContext): Unit = synchronized {
      if (baseSession.isEmpty) {
        baseSession = Some(SparkSession.builder().sparkContext(sc).getOrCreate())
Contributor:
Please call newSession() on this session. The session returned by getOrCreate() is either an existing session, or will be accessible to others. It can be tampered with, and we should avoid that.
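A toy sketch of the concern above, using stand-in classes rather than the real Spark API: `getOrCreate()` hands back the shared default instance, so anything stored from it can be mutated by other callers, while a `newSession()`-style copy stays private.

```scala
import scala.collection.mutable

// Toy registry modelling getOrCreate() semantics (names are stand-ins, not Spark).
class Session(initial: Map[String, String] = Map.empty) {
  val conf: mutable.Map[String, String] = mutable.Map(initial.toSeq: _*)
  def newSession(): Session = new Session(conf.toMap) // isolated copy of the configs
}

object Builder {
  private var default: Option[Session] = None
  // Like a builder's getOrCreate(): returns the shared default if one exists.
  def getOrCreate(): Session = synchronized {
    default.getOrElse { val s = new Session(); default = Some(s); s }
  }
}

val stored   = Builder.getOrCreate()               // shared: everyone gets this exact object
val isolated = Builder.getOrCreate().newSession()  // private copy, as the review suggests
Builder.getOrCreate().conf("k") = "tampered"       // some other caller mutates the default
println(stored.conf.get("k"))                      // Some(tampered): the stored session was tampered with
println(isolated.conf.get("k"))                    // None: the isolated copy is unaffected
```

Note the real `SparkSession.newSession()` shares the underlying SparkContext but gets fresh session state; the toy only models the config isolation.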

        return
      }

    sessionManager.initializeBaseSession(sc)
Contributor:
I think this is fine for now. At some point we should consider making the initialisation logic of Connect less singleton-heavy, so we can pass the SparkContext as a constructor parameter.

Contributor (author):
But the session manager is also a singleton object; what difference does it make for SparkConnectService to be one too?

     * Initialize the base SparkSession from the provided SparkContext.
     * This should be called once during SparkConnectService startup.
     */
    def initializeBaseSession(sc: SparkContext): Unit = synchronized {
Contributor:
You can drop synchronized here...
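If the method should remain safe to call from multiple threads even after dropping `synchronized`, a lock-free one-shot initialization is one alternative. A minimal sketch with a hypothetical helper (not something in the Spark codebase):

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: first caller publishes a value, later calls get that value.
final class OneShot[A <: AnyRef] {
  private val ref = new AtomicReference[A](null.asInstanceOf[A])
  def getOrInit(make: => A): A = {
    // compareAndSet ensures exactly one value is published, without locking.
    if (ref.get() == null) ref.compareAndSet(null.asInstanceOf[A], make)
    ref.get()
  }
}

val base = new OneShot[String]
base.getOrInit("first")
println(base.getOrInit("second")) // first: the initially published value sticks
```

One caveat of this pattern: under a race, `make` may be evaluated by more than one thread even though only one result is published, which matters if construction has side effects.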


    private def newIsolatedSession(): SparkSession = {
      val active = SparkSession.active
      if (active.sparkContext.isStopped) {
Contributor:
@garlandz-db can you figure out why this branch is here? We may have to recreate the session if this is an actual problem...

Contributor (author):
The original PR: #43701 by Kent Yao. If the Spark context is stopped, then active.newSession() would throw an exception:

 org.apache.spark.SparkException: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

My guess: the Spark cluster probably isn't useful at that point, but technically you can still call Spark Connect APIs, so we can create a valid SparkSession and continue handling the RPC.

However, our fix is tangential to that error: we do not need to use active in this case.
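The failure mode described above can be modelled with stand-in classes (the real exception comes from SparkContext rejecting calls once stopped; `Ctx`/`Sess` here are illustrative only):

```scala
// Toy model: once the context is stopped, newSession() can only throw,
// which is why relying on SparkSession.active is unrecoverable here.
class Ctx { @volatile var stopped = false; def stop(): Unit = stopped = true }

class Sess(val ctx: Ctx) {
  def newSession(): Sess = {
    if (ctx.stopped)
      throw new IllegalStateException("Cannot call methods on a stopped SparkContext.")
    new Sess(ctx)
  }
}

val active = new Sess(new Ctx)
active.ctx.stop()
val failed =
  try { active.newSession(); false }
  catch { case _: IllegalStateException => true }
println(failed) // true: newSession() on a stopped context throws
```

This is the scenario the PR sidesteps by copying configs from the service's private base session instead of the (possibly dead) active one.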
