-
Notifications
You must be signed in to change notification settings - Fork 28.9k
Add base session created in SparkConnectService #52895
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal | |
|
|
||
| import com.google.common.cache.CacheBuilder | ||
|
|
||
| import org.apache.spark.{SparkEnv, SparkSQLException} | ||
| import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException} | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.LogKeys.{INTERVAL, SESSION_HOLD_INFO} | ||
| import org.apache.spark.sql.classic.SparkSession | ||
|
|
@@ -39,6 +39,9 @@ import org.apache.spark.util.ThreadUtils | |
| */ | ||
| class SparkConnectSessionManager extends Logging { | ||
|
|
||
| // Base SparkSession created from the SparkContext, used to create new isolated sessions | ||
| @volatile private var baseSession: Option[SparkSession] = None | ||
|
|
||
| private val sessionStore: ConcurrentMap[SessionKey, SessionHolder] = | ||
| new ConcurrentHashMap[SessionKey, SessionHolder]() | ||
|
|
||
|
|
@@ -48,6 +51,16 @@ class SparkConnectSessionManager extends Logging { | |
| .maximumSize(SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_CLOSED_SESSIONS_TOMBSTONES_SIZE)) | ||
| .build[SessionKey, SessionHolderInfo]() | ||
|
|
||
| /** | ||
| * Initialize the base SparkSession from the provided SparkContext. | ||
| * This should be called once during SparkConnectService startup. | ||
| */ | ||
| def initializeBaseSession(sc: SparkContext): Unit = { | ||
| if (baseSession.isEmpty) { | ||
| baseSession = Some(SparkSession.builder().sparkContext(sc).create()) | ||
| } | ||
| } | ||
|
|
||
| /** Executor for the periodic maintenance */ | ||
| private val scheduledExecutor: AtomicReference[ScheduledExecutorService] = | ||
| new AtomicReference[ScheduledExecutorService]() | ||
|
|
@@ -332,14 +345,8 @@ class SparkConnectSessionManager extends Logging { | |
| logDebug("Finished periodic run of SparkConnectSessionManager maintenance.") | ||
| } | ||
|
|
||
| private def newIsolatedSession(): SparkSession = { | ||
| val active = SparkSession.active | ||
| if (active.sparkContext.isStopped) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @garlandz-db can you figure out why this branch is here? We may have to recreate the session if this is an actual problem...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The original PR: #43701 by Kent Yao. If the spark context is stopped, then active.newSession() would throw an exception. My guess: the spark cluster probably isn't useful, but technically you can still call spark connect APIs, so we can create a valid spark session and continue handling the RPC. However, our fix is tangential to that error; we do not need to use active in this case. |
||
| assert(SparkSession.getDefaultSession.nonEmpty) | ||
| SparkSession.getDefaultSession.get.newSession() | ||
| } else { | ||
| active.newSession() | ||
| } | ||
| private def newIsolatedSession(): SparkSession = synchronized { | ||
| baseSession.get.newSession() | ||
hvanhovell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| private def validateSessionCreate(key: SessionKey): Unit = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine for now; at some point we should consider making the initialisation logic of connect less singleton-heavy so that we can pass the SparkContext as a constructor parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the session manager is a class object. What difference does it make for SparkConnectService to be a class object too?