Conversation

abbywh (Contributor) commented Jun 8, 2025

Summary

Testing out AQE (Adaptive Query Execution) in place of the manual optimizations from the Spark 2 era. The manual approach has proven somewhat ineffective at Netflix, where a single Chronon job can see a wide variety of shuffle partition sizes and we have had trouble tuning the coalesce parameter. There were a few key DataFrame/SQL operation rewrites as well.

As a side benefit, this made tests much faster.
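For reference, the behavior being tested is controlled by Spark's AQE confs; the values below are just the AQE-on configuration under discussion, not necessarily this PR's exact settings:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// advisory target size per coalesced shuffle partition
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")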

Why / Goal

Improve performance and make the coalesce parameter easier to tune.

Test Plan

  • Added Unit Tests
  • [x] Covered by existing CI
  • [x] Integration tested

Reviewers

abbywh (Contributor, Author) commented Jun 8, 2025

Spark tests down from 25->15 minutes, that's a 40% reduction 🤯

pengyu-hou (Collaborator) commented:

> Spark tests down from 25->15 minutes, that's a 40% reduction 🤯

That is awesome!!

pengyu-hou (Collaborator) left a comment

Minor comment, it looks good overall. Thanks Abby!

val df = sparkSession.sql(query)
// if aqe auto coalesce is disabled, apply manual coalesce
val finalDf = if (!sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean) {
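(For context, a minimal sketch of how this fallback plausibly reads in full; the coalesce call and the else branch are assumptions based on the surrounding discussion, not the literal diff:)

val autoCoalesceEnabled =
  sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean
val finalDf =
  if (autoCoalesceEnabled) df // AQE coalesces shuffle partitions on its own
  else df.coalesce(partitionCount) // manual Spark 2-era fallback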
Collaborator:

should we use a constant or a variable to save the result of sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean?

Collaborator:

Looks like there is already useAqeRoute; we could use something similar to refactor this.

Contributor Author:

yeah that's a good idea

Contributor Author:

We could override it for some testing too. I still want to validate that this does something useful in a production environment before investing more, though.

abbywh marked this pull request as ready for review July 21, 2025 21:53

def sql(query: String): DataFrame = {
val partitionCount = sparkSession.sparkContext.getConf.getInt("spark.default.parallelism", 1000)
val autoCoalesceEnabled = sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled", "true").toBoolean


Shall we default this to false if it is not set, to match the existing default behavior?

Contributor Author:

I think the idea is to change the default behavior, since it's much more performant in most cases.


logger.info(
s"\n----[Running query coalesced into at most $partitionCount partitions]----\n$query\n----[End of Query]----\n\n Query call path (not an error stack trace): \n$stackTraceStringPretty \n\n --------")
if (!autoCoalesceEnabled) {


[nit] Swap the if/else branches to handle the enabled case first; I find code easier to read without explicit negations :)

Same with the val finalDf if-else below.

Contributor Author:

This is a common style in this codebase, e.g. if (!tableExists(tableName)) return Seq.empty[String], which I chose to follow (I agree it's a bit wonky).

(df.count(), 1)
}
val useAqeRoute = sparkSession.conf.getOption("spark.sql.adaptive.enabled").contains("true") &&
sparkSession.conf.getOption("spark.sql.adaptive.coalescePartitions.enabled").contains("true")


Is it intentional that the "is AQE on" logic here checks both config options, while the logic in def sql() above only checks spark.sql.adaptive.coalescePartitions.enabled?

It also might be a good idea to have a common helper function to abstract away the "is AQE on" logic.
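(Something like this hypothetical helper could centralize the check; the name is illustrative, not from the PR:)

// true only when AQE and its partition coalescing are both enabled
def isAqeCoalesceEnabled(spark: org.apache.spark.sql.SparkSession): Boolean =
  spark.conf.getOption("spark.sql.adaptive.enabled").contains("true") &&
    spark.conf.getOption("spark.sql.adaptive.coalescePartitions.enabled").contains("true")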

stats: Option[DfStats],
sortByCols: Seq[String] = Seq.empty,
partitionCols: Seq[String] = Seq.empty): Unit = {
// get row count and table partition count statistics


Is it possible to restructure this code to minimize the diff, and thus the reviewer's mental load? Can we do something like:

if (useAqeRoute) {
   write_into_df_the_simple_way()
   return
}

The_rest_of_the_non_aqe_code_as_is()

That way, The_rest_of_the_non_aqe_code_as_is() stays at the same indentation level and wouldn't show up as a diff in the PR.

Contributor Author:

We eventually branch into the same code below; I'm not sure how to make this cleaner while keeping that. I definitely agree that this diff rendered weirdly.

"org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
"scala.collection.immutable.ArraySeq$ofRef",
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow",


Add an inline comment noting that these registrations are needed for AQE (that's just my guess as to why they are here :))?

Contributor Author:

It's actually more generic than that: AQE uses it, but some of the SQL expressions will output this too. I think this matches the current file organization.

A separate PR/GitHub issue could clone this and add more explanation to the class:
https://github.com/apache/spark/blob/dc687d4c83b877e90c8dc03fb88f13440d4ae911/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L575
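(For readers unfamiliar with the linked file, the registration pattern is roughly the following; kryo here stands in for the serializer's Kryo instance:)

// register catalyst row classes by name so Kryo serializes them efficiently
// instead of falling back to writing full class names with every record
Seq(
  "scala.collection.immutable.ArraySeq$ofRef",
  "org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
).foreach(name => kryo.register(Class.forName(name)))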

val shuffleParallelism = Math.max(dailyFileCount * nonZeroTablePartitionCount, minWriteShuffleParallelism)
val saltCol = "random_partition_salt"
val saltedDf = df.withColumn(saltCol, round(rand() * (dailyFileCount + 1)))
val sortedDf = df.sortWithinPartitions(sortByCols.map(col).toSeq: _*)


why are we ignoring the partitionCols that users may pass in?

Contributor Author:

Good question. This is kind of a weird one, but IIRC (it's been a while), including them actually breaks the table schema assumptions and fails the test.

We are essentially relying on AQE, as well as the writer plugin, to repartition into the correct partitions. The Catalyst engine rewrites this entire section to do the repartition before the sort (IIRC), so within a task the partition columns are all equal to each other, and therefore we don't sort by them here. Including them changes the plan in such a way that it no longer matches the previous Chronon plan. Maybe this is a Catalyst bug, but I'm pretty sure I ran into the same issue as Stripe, where the results would sometimes be out of order until I removed them.
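(A rough sketch of the write path being described, with partitionCols deliberately left out of the sort; tableName and the saveAsTable call are assumptions, since the actual write isn't shown in this excerpt:)

import org.apache.spark.sql.functions.col

// partition columns are NOT in the sort: Catalyst moves the writer's repartition
// ahead of this sort, so rows within a task already share partition values
val sortedDf = df.sortWithinPartitions(sortByCols.map(col): _*)
sortedDf.write
  .mode("overwrite")
  .partitionBy(partitionCols: _*)
  .saveAsTable(tableName) // hypothetical target table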
