Add base session created in SparkConnectService #52895
base: master
c2df6a1 to 88e521e
.../server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
       */
      def initializeBaseSession(sc: SparkContext): Unit = synchronized {
        if (baseSession.isEmpty) {
          baseSession = Some(SparkSession.builder().sparkContext(sc).getOrCreate())
Please call newSession() on this session. The session returned by getOrCreate() is either an existing session, or will be accessible to others. It can be tampered with, and we should avoid that.
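The concern above can be illustrated with a small dependency-free sketch. `ToySession` and `ToyRegistry` are hypothetical stand-ins invented for this example; they only model the sharing behaviour described in the comment (a `getOrCreate()` that hands every caller the same instance, and a `newSession()` that returns an instance with its own state) and are not Spark's classes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a session: it only models per-session config state.
final class ToySession {
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Models newSession(): the returned session has its own config map.
  def newSession(): ToySession = new ToySession
}

// Hypothetical registry that models getOrCreate(): every caller gets the same instance.
object ToyRegistry {
  private var shared: Option[ToySession] = None

  def getOrCreate(): ToySession = synchronized {
    if (shared.isEmpty) shared = Some(new ToySession)
    shared.get
  }
}

object IsolationDemo {
  val fromGetOrCreate: ToySession = ToyRegistry.getOrCreate()
  val isolated: ToySession = fromGetOrCreate.newSession()

  // Some other component obtains the shared session and changes it.
  ToyRegistry.getOrCreate().conf("some.key") = "tampered"

  // The holder of the shared session observes the change; the isolated one does not.
  val sharedSeesChange: Boolean = fromGetOrCreate.conf.contains("some.key")
  val isolatedSeesChange: Boolean = isolated.conf.contains("some.key")
}
```

In this toy model, holding the result of `getOrCreate()` directly means any other caller can alter the state the holder depends on, which is the risk the reviewer asks to avoid by keeping only a session derived via `newSession()`.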
          return
        }

        sessionManager.initializeBaseSession(sc)
I think this is fine for now. At some point we should consider making the initialisation logic of Connect less singleton-heavy, so that we can pass the SparkContext as a constructor parameter.
But the session manager is a class object. What difference does it make for SparkConnectService to be a class object too?
       * Initialize the base SparkSession from the provided SparkContext.
       * This should be called once during SparkConnectService startup.
       */
      def initializeBaseSession(sc: SparkContext): Unit = synchronized {
You can drop synchronized here...
      private def newIsolatedSession(): SparkSession = {
        val active = SparkSession.active
        if (active.sparkContext.isStopped) {
@garlandz-db can you figure out why this branch is here? We may have to recreate the session if this is an actual problem...
The original PR is #43701 by Kent Yao. If the SparkContext is stopped, then active.newSession() would throw an exception:
org.apache.spark.SparkException: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
My guess: the Spark cluster probably isn't useful at that point, but technically you can still call Spark Connect APIs, so we can create a valid Spark session and continue handling the RPC.
However, our fix is tangential to that error: we do not need to use active in this case.
88e521e to 6f70e4f
What changes were proposed in this pull request?
This PR makes SparkConnectService rely on its own private SparkSession, which is intended only as a source of session configs to copy when creating new sessions.
Why are the changes needed?
The default session can get cleaned up, in which case SparkConnectService cannot recover, because session creation fails on subsequent RPCs.
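The motivation can be sketched without Spark on the classpath. Everything below (`SketchSession`, `SketchDefaults`, `SketchSessionManager`) is a hypothetical model written for illustration, not the code in this PR; it assumes that "cleaned up" means the process-wide default session is no longer available, and that new sessions start from a copy of the base session's configs.

```scala
// Hypothetical stand-in for a session; it only carries immutable config.
final case class SketchSession(conf: Map[String, String]) {
  // Derives a new session that starts from a copy of this session's config.
  def newSession(): SketchSession = SketchSession(conf)
}

// Models the process-wide default session, which other code may clear.
object SketchDefaults {
  @volatile var defaultSession: Option[SketchSession] = None
}

// Models a manager that keeps a private base session for creating new sessions.
final class SketchSessionManager {
  private var baseSession: Option[SketchSession] = None

  // Intended to be called once at startup; later calls are no-ops.
  def initializeBaseSession(seed: SketchSession): Unit = {
    if (baseSession.isEmpty) baseSession = Some(seed.newSession())
  }

  def newIsolatedSession(): SketchSession =
    baseSession
      .getOrElse(throw new IllegalStateException("base session not initialized"))
      .newSession()
}

object BaseSessionDemo {
  SketchDefaults.defaultSession = Some(SketchSession(Map("app.name" -> "demo")))

  val manager = new SketchSessionManager
  manager.initializeBaseSession(SketchDefaults.defaultSession.get)

  // Simulate the default session being cleaned up after startup.
  SketchDefaults.defaultSession = None

  // Creating a session through the default session is no longer possible...
  val viaDefault: Option[SketchSession] = SketchDefaults.defaultSession.map(_.newSession())
  // ...but the manager can still derive one from its private base session.
  val viaManager: SketchSession = manager.newIsolatedSession()
}
```

Because the manager never exposes its base session, clearing the default session in this model has no effect on later calls to `newIsolatedSession()`.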
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added basic testing
Was this patch authored or co-authored using generative AI tooling?
Yes