This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add warning log - newly created Hyperspace context for different Spark Session #374

Open · sezruby wants to merge 2 commits into master

Conversation

@sezruby (Collaborator) commented on Mar 4, 2021:

What is the context for this pull request?

  • Tracking Issue: n/a
  • Parent Issue: n/a
  • Dependencies: n/a

What changes were proposed in this pull request?

Currently, Hyperspace keeps only one HyperspaceContext for each Hyperspace object. So if an app drives multiple concurrent Spark sessions through one Hyperspace object and one thread, a new Hyperspace context is created for every request.

Since the Hyperspace object is not thread-safe, we tried to enforce that one client thread uses only one Spark session. However, we cannot, because the same SparkSession can be accessed from multiple threads (e.g., Delta Lake's broadcast join query execution).

So for now, we log a warning in that case, so that users can notice a potentially ineffective use of Hyperspace.
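
For reference, a minimal sketch of the behavior described above (the session-keyed map mirrors the diff further down; the object name, the AnyRef placeholder, and println stand in for the real types and logger):

    import scala.collection.concurrent.TrieMap
    import org.apache.spark.sql.SparkSession

    // Sketch: keep one context per SparkSession instead of a single global
    // one, and warn when a new context must be created because the incoming
    // session differs from the ones seen so far.
    object HyperspaceContexts {
      private val contexts = TrieMap.empty[SparkSession, (Long, AnyRef)]

      def getContext(spark: SparkSession): AnyRef = {
        val threadId = Thread.currentThread().getId
        contexts.get(spark) match {
          case Some((_, ctx)) => ctx
          case None =>
            if (contexts.nonEmpty) {
              // Possibly an ineffective use: one Hyperspace object driven
              // by multiple Spark sessions.
              println(s"WARN: creating a new Hyperspace context for a " +
                s"different Spark session (thread id: $threadId).")
            }
            val ctx = new Object // placeholder for new HyperspaceContext(spark)
            contexts.put(spark, (threadId, ctx))
            ctx
        }
      }
    }

The actual change only logs the warning; it does not prevent the new context from being created.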

Does this PR introduce any user-facing change?

Yes. A warning is logged if a Hyperspace object is accessed with a different Spark session.

How was this patch tested?

Tested on a local environment.

    contexts.put(spark, (Thread.currentThread().getId, new HyperspaceContext(spark)))
  }
} else if (ctx.get._1 != Thread.currentThread().getId) {
  throw HyperspaceException(s"Hyperspace does not support multiple threads " +
@sezruby (Collaborator, Author) commented on Mar 5, 2021:

@imback82
I guess this is too restrictive. How about writing a warning log and creating a new context in this case? It seems the checkAnswer API uses a different thread internally.

src/main/scala/com/microsoft/hyperspace/Hyperspace.scala (two review threads: outdated, resolved)
  }
} else if (ctx.get._1 != Thread.currentThread().getId) {
  logWarning(s"Hyperspace is not thread safe for threads using one Spark session. " +
    s"Please be aware of it. Current thread id: ${Thread.currentThread().getId}, " +
Contributor:

What is the implication of potentially sharing the HyperspaceContext across threads if we already know it's not thread-safe? Isn't it better to fail fast here instead of failing somewhere else?

@sezruby (Collaborator, Author):

checkAnswer won't work if we throw an exception here. I have no idea how to fix it; one possible scenario:

  • setup & check in the main thread
  • run with a worker thread

Contributor:

are you still blocked on this?

Contributor:

gentle ping

@sezruby (Collaborator, Author):

I think throwing an exception is too restrictive. Users might not know how to fix the issue?

@sezruby (Collaborator, Author):

I thought it was the user's responsibility when using an API that is not thread-safe.
A different thread id doesn't always mean the threads are running concurrently.
I think we need documentation rather than restricting the use case?

Contributor:

A different thread id doesn't always mean the threads are running concurrently.

But the new structure allows multiple threads to access the object concurrently? The existing implementation ensures a one-to-one mapping via a thread local. So I would keep this condition as it is, and if there is demand for accessing the Hyperspace object from multiple threads, we should think about making it thread-safe instead of documenting it.

Plus, I think random failures are much worse and harder to debug, especially failures caused by thread-safety issues.
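
For reference, a minimal sketch of the one-to-one mapping being described (the object name and AnyRef placeholder are illustrative, not the actual implementation):

    import org.apache.spark.sql.SparkSession

    // Sketch: one context per thread via a ThreadLocal. Threads never
    // share a context, but a single thread that switches between sessions
    // recreates its context on every switch.
    object ThreadLocalContexts {
      private val context = new ThreadLocal[(SparkSession, AnyRef)]

      def get(spark: SparkSession): AnyRef = {
        val cached = context.get()
        if (cached == null || !cached._1.equals(spark)) {
          val ctx = new Object // placeholder for new HyperspaceContext(spark)
          context.set((spark, ctx))
          ctx
        } else {
          cached._2
        }
      }
    }

A single thread alternating between sessions would recreate its context on every switch, which is the behavior this PR is trying to address.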

@sezruby (Collaborator, Author):

I agree that analyzing failures from concurrent threads is difficult.
I added the exception change; this can then be a breaking change for some use cases.

Contributor:

Added breaking change label so we can document it in the release notes.

@imback82 (Contributor) commented on Mar 11, 2021:

By the way, even if this is a breaking change, it alerts the user to a possible misuse where the Hyperspace context is being recreated, so I think it's worthwhile.

src/main/scala/com/microsoft/hyperspace/Hyperspace.scala (review thread: outdated, resolved)
@sezruby added the bug and enhancement labels on Mar 7, 2021
@sezruby self-assigned this on Mar 7, 2021
@sezruby removed the bug label on Mar 7, 2021
@apoorvedave1 previously approved these changes on Mar 8, 2021
@apoorvedave1 (Contributor) left a comment:

LGTM, thanks 👍

@imback82 (Contributor):

Could you update the PR description, as it seems out of date? Basically, we are preventing the Hyperspace context from being re-created in certain scenarios and making sure too many contexts are not created.

spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
val resultEnabled = dfWithHyperspaceEnabled.collect().toSeq.toSet
assert(resultEnabled.equals(resultDisabled))
Contributor:

Is this change still needed? If so, why?

@sezruby (Collaborator, Author):

In checkAnswer, a new thread tries to build the query plan again, so it causes the exception.
I have no idea why other checkAnswer calls have no problem with it.

Contributor:

Oh, interesting. Could you share the code where a new thread is spawned in checkAnswer?

@sezruby (Collaborator, Author) commented on Mar 11, 2021:

In checkAnswer:

 val sparkAnswer = try df.collect().toSeq catch {
      case e: Exception =>
        val errorMessage =
          s"""
            |Exception thrown while executing query:
            |${df.queryExecution}
            |== Exception ==
            |$e
            |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
          """.stripMargin
        return Some(errorMessage)
    }

It's because of the broadcast join:

[info]          at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
[info]          at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
[info]          at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:144)
[info]          at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
[info]          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]          at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
[info]          at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
[info]          at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
[info]          at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:211)
[info]          at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)

Failed thread stack:

..
[info]          at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
[info]          at com.microsoft.hyperspace.index.rules.FilterIndexRule$.apply(FilterIndexRule.scala:52)
[info]          at com.microsoft.hyperspace.index.rules.FilterIndexRule$.apply(FilterIndexRule.scala:38)
[info]          at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:87)
[info]          at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
[info]          at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
[info]          at scala.collection.immutable.List.foldLeft(List.scala:89)
[info]          at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:84)
[info]          at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:76)
[info]          at scala.collection.immutable.List.foreach(List.scala:392)
[info]          at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
[info]          at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
[info]          at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
[info]          at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
[info]          at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
[info]          at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
[info]          at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
[info]          at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3359)
[info]          at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782)
[info]          at org.apache.spark.sql.delta.PartitionFiltering.filesForScan(PartitionFiltering.scala:40)
[info]          at org.apache.spark.sql.delta.PartitionFiltering.filesForScan$(PartitionFiltering.scala:27)
[info]          at org.apache.spark.sql.delta.Snapshot.filesForScan(Snapshot.scala:52)
[info]          at org.apache.spark.sql.delta.files.TahoeLogFileIndex.matchingFiles(TahoeFileIndex.scala:140)
[info]          at org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:56)
[info]          at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:193)
[info]          at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:190)
[info]          at org.apache.spark.sql.execution.FileSourceScanExec.updateDriverMetrics(DataSourceScanExec.scala:529)
[info]          at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:307)
[info]          at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
[info]          at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
[info]          at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
[info]          at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
[info]          at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)

@sezruby (Collaborator, Author) commented on Mar 11, 2021:

Seems we shouldn't throw the exception..? 😧

Contributor:

Hmm, then we have to think about this feature again, since it may hit a thread-safety issue even if the user didn't intend it?

Contributor:

For example, this could have gone unnoticed if we didn't throw the exception?

@sezruby (Collaborator, Author) commented on Mar 11, 2021:

Seems so. How about ThreadLocal[HashMap[Session, Context]]? 😁
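
For illustration, a minimal sketch of that suggestion (simplified types; not actual PR code):

    import scala.collection.mutable
    import org.apache.spark.sql.SparkSession

    // Sketch: each thread owns its own session-to-context map, so a single
    // thread can serve several Spark sessions without recreating contexts
    // on every session switch.
    object PerThreadSessionContexts {
      private val local = ThreadLocal.withInitial[mutable.HashMap[SparkSession, AnyRef]](
        () => mutable.HashMap.empty[SparkSession, AnyRef])

      def get(spark: SparkSession): AnyRef =
        local.get().getOrElseUpdate(spark, new Object) // placeholder context
    }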

Contributor:

But that would allow multiple threads to share the same Hyperspace context.

Contributor:

So, basically, in the Delta code it does allFiles.toDF(), and eventually that dataframe is collected, which triggers our optimizer. In our optimizer, we have an extractor that calls Hyperspace.getContext(spark).sourceProviderManager in ExtractRelation. So instead of case filter @ Filter(condition: Expression, ExtractRelation(relation)), we could do case filter @ Filter if isSupportedRelation, and I think we can avoid the issue.

But I think this is still hacky; the right way to fix it seems to be making the context thread-safe.
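
A sketch of the suggested pattern-match change (FilterIndexRuleSketch and the always-false guard body are placeholders; only the match shape follows the comment above):

    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

    // Sketch: match on the Filter node first and defer the relation check
    // to a guard, so the rule no longer resolves the Hyperspace context
    // eagerly inside a pattern extractor.
    object FilterIndexRuleSketch {
      // Hypothetical guard: would verify the child relation is supported
      // without calling Hyperspace.getContext(spark) during extraction.
      private def isSupportedRelation(plan: LogicalPlan): Boolean = false

      def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
        case filter @ Filter(_, child) if isSupportedRelation(child) =>
          filter // the actual index-application logic would go here
      }
    }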

val updatedPlan = dfWithHyperspaceEnabled.queryExecution.optimizedPlan
val resultEnabled = dfWithHyperspaceEnabled.collect().toSeq.sortBy(_.hashCode())
assert(!basePlan.equals(updatedPlan))
assert(resultEnabled.equals(resultDisabled))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
@sezruby (Collaborator, Author):

After executing collect(), checkAnswer doesn't throw the exception 😧


s"Current limit: ${contexts.size}")
}
val newCtx = new HyperspaceContext(spark)
contexts.put(spark, (threadId, newCtx))
Contributor:

Should we check the return value of put and throw an exception if it returns Some? There could be multiple threads sharing the same Spark session that hit if (!ctx.isDefined) at the same time.
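
For illustration, a sketch of such a check (assuming contexts is a concurrent map; the object name, placeholder context, and exception type are illustrative):

    import scala.collection.concurrent.TrieMap
    import org.apache.spark.sql.SparkSession

    object ContextRegistry {
      private val contexts = TrieMap.empty[SparkSession, (Long, AnyRef)]

      // Sketch: putIfAbsent is atomic, so two threads that both observed
      // !ctx.isDefined cannot both register a context; the loser of the
      // race gets Some(existing) back and can fail fast.
      def register(spark: SparkSession): AnyRef = {
        val threadId = Thread.currentThread().getId
        val newCtx = new Object // placeholder for new HyperspaceContext(spark)
        contexts.putIfAbsent(spark, (threadId, newCtx)) match {
          case Some((otherId, _)) =>
            throw new IllegalStateException(
              s"Context already created by thread $otherId for this session.")
          case None => newCtx
        }
      }
    }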

@sezruby changed the title from "Support concurrent Spark sessions" to "Add warning log - newly created Hyperspace context for different Spark Session" on Apr 12, 2021
@@ -175,6 +176,10 @@ object Hyperspace extends ActiveSparkSession {
// the one HyperspaceContext is using because Hyperspace depends on the
// session's properties such as configs, etc.
context.set(new HyperspaceContext(spark))
if (!ctx.spark.equals(spark)) {
@sezruby (Collaborator, Author) commented on Apr 16, 2021:

@imback82 I'm thinking about this change and what we can do about it.
I think the problem is that we access the active Spark session when optimizing the query plan, but it can be different from the session used to create the Hyperspace object.
So, how about:

  1. not resetting Hyperspace and just using the previous context even with a different active Spark session.
  2. if a user wants to use other configs or another Spark session, they redefine the Hyperspace object (see the sketch below).

I think this is clearer behavior because we do val hs = new Hyperspace(spark).
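
A usage sketch of option 2 (spark2 is an illustrative second session):

    // One Hyperspace object per session: to use another session or other
    // configs, construct a new Hyperspace object rather than relying on
    // the active session being picked up implicitly.
    val spark2 = spark.newSession()

    val hs1 = new Hyperspace(spark)   // bound to `spark`
    val hs2 = new Hyperspace(spark2)  // bound to `spark2`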

Contributor:

  1. not resetting Hyperspace and just using the previous context even with a different active Spark session.

Will it be OK to silently use a different active session?

Labels: enhancement (New feature or request)
Projects: None yet
3 participants