Support aqe #1001
base: main
Conversation
Co-authored-by: Pengyu Hou <[email protected]>
Signed-off-by: Abby Whittier <[email protected]>
Spark tests down from 25->15 minutes, that's a 40% reduction 🤯
That is awesome!!
Minor comment, it looks good overall. Thanks Abby!
df
val df = sparkSession.sql(query)
// if aqe auto coalesce is disabled, apply manual coalesce
val finalDf = if (!sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean) {
should we use a constant or a variable to save the result of sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean?
looks like there is already useAqeRoute, we could use something similar to refactor
yeah that's a good idea
We could overwrite it for some testing too. I still want to validate that this does something useful in a production environment generally before more investment though
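For reference, a minimal sketch of the refactor being discussed, assuming a SparkSession named sparkSession is in scope; the method and value names here are illustrative, not the PR's actual code:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: read the AQE coalesce flag once and reuse it, rather than
// repeating the conf lookup at each decision point. Names are illustrative.
def runSql(sparkSession: SparkSession, query: String, partitionCount: Int): DataFrame = {
  val autoCoalesceEnabled =
    sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean
  val df = sparkSession.sql(query)
  // Only coalesce manually when AQE's automatic coalescing is off.
  if (autoCoalesceEnabled) df else df.coalesce(partitionCount)
}
```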
def sql(query: String): DataFrame = {
  val partitionCount = sparkSession.sparkContext.getConf.getInt("spark.default.parallelism", 1000)
  val autoCoalesceEnabled = sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean
shall we default this to false if it is not set, to match the existing default behavior?
I think the idea is to change the default behavior since it's much more performant in most cases
logger.info(
  s"\n----[Running query coalesced into at most $partitionCount partitions]----\n$query\n----[End of Query]----\n\n Query call path (not an error stack trace): \n$stackTraceStringPretty \n\n --------")
if (!autoCoalesceEnabled) {
[nit] Swap the if/else branches to do the enabled case first; I find code easier to read without explicit negations :) Same with the val finalDf if-else below.
This is a common style in this code base, ex: if (!tableExists(tableName)) return Seq.empty[String], which I chose to follow (I agree it's a bit wonky)
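Purely for illustration, the ordering being suggested would look roughly like this (the function and its arguments are stand-ins, not code from the PR):

```scala
import org.apache.spark.sql.DataFrame

// Stand-in example: enabled case first, so the condition reads without a negation.
def chooseDf(autoCoalesceEnabled: Boolean, df: DataFrame, partitionCount: Int): DataFrame =
  if (autoCoalesceEnabled) {
    df                          // AQE coalesces partitions on its own
  } else {
    df.coalesce(partitionCount) // manual fallback, as in the existing path
  }
```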
  (df.count(), 1)
}
val useAqeRoute = sparkSession.conf.getOption("spark.sql.adaptive.enabled").contains("true") &&
  sparkSession.conf.getOption("spark.sql.adaptive.coalescePartitions.enabled").contains("true")
Is it intentional that the "is AQE on" logic here checks for both config options, while the logic in def sql() above only checks for spark.sql.adaptive.coalescePartitions.enabled?
It also might be a good idea to have a common helper function to abstract away the "is AQE on" logic.
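Something along these lines, as a hedged sketch; the helper name and placement are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical shared helper centralizing the "is AQE coalescing on" check,
// so def sql() and the write path can't drift apart.
def aqeCoalesceEnabled(spark: SparkSession): Boolean =
  spark.conf.getOption("spark.sql.adaptive.enabled").contains("true") &&
    spark.conf.getOption("spark.sql.adaptive.coalescePartitions.enabled").contains("true")
```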
    stats: Option[DfStats],
    sortByCols: Seq[String] = Seq.empty,
    partitionCols: Seq[String] = Seq.empty): Unit = {
  // get row count and table partition count statistics
is it possible to restructure this code to minimize the diff, and thus the reviewer's mental load in keeping track of things? Can we do something like:
if (useAqeRoute) {
write_into_df_the_simple_way()
return
}
The_rest_of_the_non_aqe_code_as_is()
So the The_rest_of_the_non_aqe_code_as_is() code still remains on the same indentation level as is and wouldn't show up as diff in the PR.
We eventually branch into the same code below, not sure how to make this cleaner while keeping that. I definitely agree that this diff rendered weirdly.
"org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
"scala.collection.immutable.ArraySeq$ofRef",
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
add an inline comment that these registrations are needed for AQE (that's just my guess on why they are here:))?
It's actually more generic than that, AQE uses it, but some of the SQL expressions will output this too. I think this matches the current file organization.
A separate PR/GitHub issue could clone this and add more explanation to the class:
https://github.com/apache/spark/blob/dc687d4c83b877e90c8dc03fb88f13440d4ae911/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L575
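A hedged sketch of how such registrations could be expressed through a custom KryoRegistrator; the registrator class itself is hypothetical, and the class names simply mirror the diff above:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: register the row classes that AQE and some SQL
// expressions emit, so Kryo does not fall back to writing full class names.
class RowClassesRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    Seq(
      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
      "scala.collection.immutable.ArraySeq$ofRef",
      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
    ).foreach(name => kryo.register(Class.forName(name)))
  }
}
```

It would be wired up via spark.kryo.registrator; that wiring is an assumption for illustration, not something shown in this diff.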
val shuffleParallelism = Math.max(dailyFileCount * nonZeroTablePartitionCount, minWriteShuffleParallelism)
val saltCol = "random_partition_salt"
val saltedDf = df.withColumn(saltCol, round(rand() * (dailyFileCount + 1)))
val sortedDf = df.sortWithinPartitions(sortByCols.map(col).toSeq: _*)
why are we ignoring the partitionCols that users may pass in?
Good question, this is kind of a weird one, but iirc (and it's been a while), including them actually breaks the table schema assumptions and fails the test.
We are essentially relying on AQE as well as the writer plugin to do our repartitioning into the correct partitions. The catalyst engine rewrites this entire section to do the repartition before the sort iirc, so within each partition the partition columns are already equal to each other, and therefore we don't sort by them here. Including them changes the plan in such a way that it no longer matches the previous Chronon plan. Maybe this is a catalyst bug or something, but I'm pretty sure I ran into the same issue as Stripe, where the results would sometimes be out of order until I removed that.
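Roughly, the salted-write pattern under discussion, per that explanation; this sketch chains the salt and the sort for brevity (the diff keeps them as separate vals) and the helper name is made up:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, rand, round}

// Sketch: salt rows to spread them across tasks, then sort only by the
// user-provided sort columns. Partition columns are deliberately omitted
// here because the repartition into table partitions happens upstream
// (AQE / catalyst plus the writer plugin), per the comment above.
def saltAndSort(df: DataFrame, sortByCols: Seq[String], dailyFileCount: Int): DataFrame = {
  val saltCol = "random_partition_salt"
  val saltedDf = df.withColumn(saltCol, round(rand() * (dailyFileCount + 1)))
  saltedDf.sortWithinPartitions(sortByCols.map(col): _*)
}
```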
Summary
Testing out AQE in place of the manual optimizing from the Spark 2 era. The manual approach has proven somewhat ineffective at Netflix, where a single Chronon job can see a wide variety of shuffle partition sizes and we had trouble tuning the coalesce parameter. There were a few key dataframe/sql operation rewrites as well. As a side benefit, this made tests much faster.
Why / Goal
Get faster performance and have an easier time tuning coalesce.
Test Plan
Reviewers