LIHADOOP-39635: Add new configuration parameters heuristic #463

Merged: 10 commits merged into linkedin:tuning on Dec 21, 2018

Conversation

edwinalu (Author):

Add new configuration parameters heuristic, which will list the current values of configuration parameters along with recommended new values. To determine the new values, it will check for the following (a rough sketch of one such check follows the list):

execution memory spill: this will slow down the application, so try to prevent this by increasing partitions, increasing executor memory, or decreasing cores.
long tasks: this will slow down the application, so try to prevent this by increasing the number of partitions.
task skew: this will slow down the application, so add recommendations for making partitions more even.
OOM or GC: increase memory, increase partitions, or decrease cores, to try to avoid the error.
container killed by YARN errors: increase overhead memory.
unused executor memory: if allocated executor memory is much higher than the max JVM used memory, either increase cores or decrease memory.
driver configuration parameters (memory and cores).
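
A rough sketch of how one of these checks could turn into a recommendation (the class and field names below are illustrative, not the exact ones in this PR):

// Illustrative sketch only: map a detected execution memory spill to new parameter values.
case class Recommendations(
  executorMemoryBytes: Option[Long] = None,
  executorCores: Option[Int] = None,
  partitions: Option[Int] = None)

def adjustForExecutionMemorySpill(
    hasSpill: Boolean,
    currentMemoryBytes: Long,
    currentCores: Int,
    currentPartitions: Int): Recommendations = {
  if (hasSpill) {
    // Spill slows the application down: give each task more memory to work with by
    // increasing partitions, increasing executor memory, or decreasing cores.
    Recommendations(
      executorMemoryBytes = Some(currentMemoryBytes * 2),
      executorCores = Some(math.max(1, currentCores - 1)),
      partitions = Some(currentPartitions * 2))
  } else {
    Recommendations()
  }
}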

@pralabhkumar (Contributor):

#438 has my comments, which are already resolved in this pull request.

// check if executor memory can be lowered
adjustExecutorMemory()
}

Contributor:

What's the rationale behind calling adjustParametersForLongTasks first, then adjustParametersForExecutionMemorySpill, then adjustParametersForGCandOOM? Is there a priority defined to fix long-running tasks first and then memory spill?

Author:

The ordering is somewhat arbitrary, but there is a better idea of how much to increase the number of partitions for long tasks, and a reasonable estimate of how much to adjust memory/cores/partitions for execution memory spill. GC and OOM are just a guess. It goes from more exact to less exact.

The adjustment(s) for each could affect the other conditions as well (if more partitions are specified due to long tasks, then this would also help with execution memory spill, and OOM/GC).
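
For context, the call order under discussion would look roughly like this (only the method names come from this PR; the wrapper method is a sketch):

// Ordering goes from the most exact estimate to the least exact one, as described above.
private def adjustParameters(): Unit = {
  adjustParametersForLongTasks()            // good estimate of how many partitions to add
  adjustParametersForExecutionMemorySpill() // reasonable estimate for memory/cores/partitions
  adjustParametersForGCandOOM()             // rough guess: more memory, fewer cores, or more partitions
  adjustExecutorMemory()                    // finally, check if executor memory can be lowered
}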

* If so, either increase cores to make better use of the memory, or decrease executor
* memory.
*/
private def adjustExecutorMemory() = {
@pralabhkumar (Contributor), Dec 4, 2018:

Decreasing executor memory may cause more spill. How are we handling that scenario?

Author:

If there's execution memory spill, then there shouldn't be too much memory allocated. However, it makes sense to be cautious. I'll add a check for execution spill as well, and not adjust the memory in that case.
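
The added guard could look roughly like this (a sketch; the spill flag and the buffer size are placeholders for whatever the stage analysis provides):

// Sketch: only lower executor memory when no execution memory spill was detected,
// since less memory per task could make the spill worse.
private def adjustExecutorMemory(
    hasExecutionSpill: Boolean,
    maxJvmUsedMemoryBytes: Long,
    currentExecutorMemoryBytes: Long): Option[Long] = {
  val bufferBytes = 2L * 1024 * 1024 * 1024 // illustrative safety margin above observed JVM usage
  if (!hasExecutionSpill && currentExecutorMemoryBytes > maxJvmUsedMemoryBytes + bufferBytes) {
    Some(maxJvmUsedMemoryBytes + bufferBytes) // lower memory toward actual usage
  } else {
    None // leave memory unchanged if there is spill or it is already close to usage
  }
}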

@pralabhkumar (Contributor):

@edwinalu How are you planning to integrate with Unified Architecture / TuneIn?

@edwinalu (Author) commented Dec 4, 2018:

@pralabhkumar, yes, I am not sure what would be the best way to integrate with TuneIn. When we discussed a few weeks ago, the idea was to merge the heuristic first. Let's discuss at the meeting. It would be good to keep in sync.

while (iter.hasNext && continueFn(modified)) {
iter.next() match {
case adjustment: CoreDivisorAdjustment =>
if (adjustment.canAdjust(recommendedExecutorCores)) {
Contributor:

If the code for these two cases is similar, then one of them can be removed. The same is true for the other cases as well.

Author:

It is processing the adjustment differently for each adjustment type. Are you suggesting creating more case classes, CoreAdjustment, MemoryAdjustment, and PartitionAdjustment, and then subclassing the current case classes off those, to consolidate?

Contributor:

This is only about merging two cases when the code is the same for both of them. For example:

case 1 => // code
case 2 => // code

can become:

case 1 | 2 => // code

Author:

I've added traits for CoreAdjustment and MemoryAdjustment, and consolidated the case classes.
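
Roughly what that consolidation could look like (a sketch of the trait idea; the memory case class and the exact members are hypothetical and may differ from the PR):

// Shared traits let the calibration loop handle all core and memory adjustments with one case each.
trait CoreAdjustment {
  def canAdjust(cores: Int): Boolean
  def adjust(cores: Int): Int
}
trait MemoryAdjustment {
  def canAdjust(memoryBytes: Long): Boolean
  def adjust(memoryBytes: Long): Long
}

case class CoreDivisorAdjustment(divisor: Int) extends CoreAdjustment {
  override def canAdjust(cores: Int): Boolean = cores / divisor >= 1
  override def adjust(cores: Int): Int = cores / divisor
}
case class MemoryMultiplierAdjustment(multiplier: Double) extends MemoryAdjustment {
  override def canAdjust(memoryBytes: Long): Boolean = memoryBytes > 0
  override def adjust(memoryBytes: Long): Long = (memoryBytes * multiplier).toLong
}

// The matching loop then needs only one case per trait, e.g.:
//   case adjustment: CoreAdjustment   => if (adjustment.canAdjust(recommendedExecutorCores)) ...
//   case adjustment: MemoryAdjustment => if (adjustment.canAdjust(recommendedExecutorMemory)) ...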


val currentParallelism = sparkExecutorInstances.map(_ * sparkExecutorCores)

val jvmUsedMemoryHeuristic =
Contributor:

In the long run, we need to think about caching these heuristics, so that these need not be computed multiple times. For now this is fine.

Author:

Agreed.
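
One simple way to avoid recomputing these within a single evaluation would be to hold them in lazy vals (a sketch only, not part of this PR; the cache class and field names are hypothetical):

// Each sub-result is computed at most once per analysed application and reused afterwards.
class HeuristicResultsCache(computeJvmUsedMemory: () => Long) {
  lazy val maxJvmUsedMemory: Long = computeJvmUsedMemory() // evaluated on first access, then reused
}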

* @return the recommended value in bytes for executor memory overhead
*/
private def calculateExecutorMemoryOverhead(): Option[Long] = {
val overheadMemoryIncrement = 1L * GB_TO_BYTES
Contributor:

You can use FileUtils.ONE_GB.

Author:

Replaced.
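
With the Commons IO constant, the increment becomes (assuming org.apache.commons.io.FileUtils, which is where ONE_GB lives):

import org.apache.commons.io.FileUtils

// 1 GB step used when recommending a larger memory overhead.
val overheadMemoryIncrement: Long = FileUtils.ONE_GB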

if (stageAnalysis.exists { stage =>
hasSignificantSeverity(stage.taskFailureResult.containerKilledSeverity)
}) {
val actualMemoryOverhead = sparkExecutorMemoryOverhead.getOrElse {
Contributor:

Shouldn't we consider the user-specified memory overhead as well?

Author:

It first tries to get the user-specified memory overhead, and if that isn't set, it calculates the default value.

Contributor:

Missed that.
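
So the overhead used is the user-specified value when present, with a computed default as the fallback; roughly (the 10% / 384 MB default mirrors Spark's own behaviour, and the names follow the snippet above):

// Prefer the user-specified overhead; otherwise fall back to Spark's default calculation.
val actualMemoryOverhead: Long = sparkExecutorMemoryOverhead.getOrElse(
  math.max((0.10 * sparkExecutorMemory).toLong, 384L * 1024 * 1024))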

num * MB_TO_BYTES
} else {
unit.charAt(0) match {
case 'T' => num * TB_TO_BYTES
Contributor:

You can use FileUtils.ONE_GB, ONE_MB etc.

Author:

Replaced.
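
With the Commons IO constants the unit handling reads roughly like this (a sketch of just the parsing branch; num and unit come from the surrounding parser):

import org.apache.commons.io.FileUtils

// Map the unit's first letter to the matching byte multiplier.
val bytes: Long = unit.charAt(0) match {
  case 'T' => num * FileUtils.ONE_TB
  case 'G' => num * FileUtils.ONE_GB
  case 'M' => num * FileUtils.ONE_MB
  case 'K' => num * FileUtils.ONE_KB
  case _   => num
}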

* @param size The memory value in long bytes
* @return The formatted string, null if
*/
private def bytesToString(size: Long): String = {
Contributor:

This method is available in org.apache.spark.network.util.JavaUtils.

Author:

I've replaced stringToBytes with JavaUtils.byteStringAsBytes. I wasn't able to find a function for the other direction.


import com.linkedin.drelephant.analysis.SeverityThresholds

object ConfigurationUtils {
Contributor:

Would it be good to change the name to ConfigurationHeuristicsConstants or something similar?

Author:

Renamed.

@pralabhkumar (Contributor) commented Dec 10, 2018:

LGTM. Please test the flow on the pokemon/EI cluster to make sure things are working fine end to end.

* @param size The memory value in long bytes
* @return The formatted string, null if
*/
private def bytesToString(size: Long): String = {
Contributor:

MemoryFormatUtils#bytesToString does something similar. It won't print bytes in MB if it's < 2GB, but otherwise it's exactly the same. Maybe we can modify MemoryFormatUtils#bytesToString if this condition is necessary, or move this method to MemoryFormatUtils too (as this method is a good candidate for utils).

Author:

Other differences are that MemoryFormatUtils#bytesToString also has a space between the value and unit, so "2 GB" instead of "2GB", and it only does GB and MB (no KB or B) and rounds up. It would be possible to modify MemoryFormatUtils#bytesToString to take additional parameters specifying the threshold for moving to the next unit, which units to use, whether it should round up, and whether it should add a space.
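
For reference, the behaviour described above (no space before the unit, KB and B supported, a 2x threshold before moving to the next unit) could be expressed roughly as (an illustrative sketch, not the exact code in this PR):

import org.apache.commons.io.FileUtils

// Pick the largest unit whose 2x threshold the value reaches; format with no space before the unit.
private def bytesToString(size: Long): String = {
  val (value, unit) =
    if (size >= 2 * FileUtils.ONE_GB) (size.toDouble / FileUtils.ONE_GB, "GB")
    else if (size >= 2 * FileUtils.ONE_MB) (size.toDouble / FileUtils.ONE_MB, "MB")
    else if (size >= 2 * FileUtils.ONE_KB) (size.toDouble / FileUtils.ONE_KB, "KB")
    else (size.toDouble, "B")
  "%.1f%s".format(value, unit)
}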


/**
* This class contains Spark stage information.
*/
public class SparkStageData {
Contributor:

What's this class meant for?

Author:

Sorry, I'm not sure how this got added -- it is not being used. It may have somehow been a merge conflict when copying over from another branch. I'll remove.

val (numTasksWithContainerKilled, containerKilledSeverity) =
checkForSpecificTaskError(stageId, stageData, failedTasks,
StagesWithFailedTasksHeuristic.OVERHEAD_MEMORY_ERROR,
"the container was killed by YARN for exeeding memory limits.", details)
@varunsaxena (Contributor), Dec 18, 2018:

Is this message correct? For instance, "exeeding" should ideally be "exceeding". Is the message generated in YarnAllocator in Spark code? If yes, the message may well be incorrect.
Moreover, going ahead, Spark could probably do error categorization by itself and pass a well-defined enum, instead of Dr.Elephant expecting a custom message generated in Spark code, because such matching can break. The error message can still be passed as usual and printed to give the user detailed information.

Author:

Yes, this should be "exceeding". For searching the actual error message, it is using StagesWithFailedTasksHeuristic.OVERHEAD_MEMORY_ERROR.

Right now, Spark is returning the error message, which can vary if it is coming from the user application. There isn't a well-defined enum for the types of errors.
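
The matching itself is a substring search over the failed tasks' error messages, schematically (a sketch; the real checkForSpecificTaskError takes more context than this):

// Count failed tasks whose error message contains the overhead-memory pattern,
// e.g. StagesWithFailedTasksHeuristic.OVERHEAD_MEMORY_ERROR for the YARN container-killed case.
def countTasksWithError(failedTaskErrors: Seq[String], errorPattern: String): Int =
  failedTaskErrors.count(error => error != null && error.contains(errorPattern))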

data.appConfigurationProperties

// current configuration parameters
lazy val sparkExecutorMemory = JavaUtils.byteStringAsBytes(
@varunsaxena (Contributor), Dec 18, 2018:

Can't we use MemoryFormatUtils#stringToBytes, which is in the Dr.Elephant codebase, instead of using Spark utils? It will do exactly the same thing. Utils are typically meant to be used within a project even though they are public classes.

@edwinalu (Author), Dec 19, 2018:

Yes, this does seem to be the same, and I will change.
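
With the in-project util the parsing would look roughly like this (a sketch; it assumes MemoryFormatUtils lives under com.linkedin.drelephant.util, and the property lookup and default shown here are illustrative):

import com.linkedin.drelephant.util.MemoryFormatUtils

// Parse the configured executor memory string (e.g. "4G") into bytes with Dr.Elephant's own util.
lazy val sparkExecutorMemory: Long =
  MemoryFormatUtils.stringToBytes(
    appConfigurationProperties.getOrElse("spark.executor.memory", "1G"))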

@@ -211,7 +218,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages")
val target = attemptTarget.path("stages/withSummaries")
Contributor:

This is a LinkedIn-specific REST endpoint and won't work in open source until it's contributed back to Spark upstream. Going ahead, we should probably refactor the code and have our own SparkRestClient implementation. The abstraction for us is primarily at the fetcher level, so we could have a LinkedIn-specific Spark fetcher implementation which extends the existing SparkFetcher, reuses the part where we are fetching event logs, but has a custom Spark REST client implementation.

@edwinalu (Author), Dec 19, 2018:

Yes, this is LinkedIn specific, and separating out the code would make sense. Could the refactoring be done later?
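
A possible shape for that later refactoring (entirely a sketch; the override points shown here are hypothetical and would need to be introduced in SparkRestClient and SparkFetcher first):

// Hypothetical hook: the open-source client keeps "stages", the LinkedIn one overrides the path.
class LinkedInSparkRestClient(sparkConf: SparkConf) extends SparkRestClient(sparkConf) {
  override protected def stagesPath: String = "stages/withSummaries" // hypothetical override point
}

// Hypothetical LinkedIn-specific fetcher: reuses event-log fetching, swaps in the custom REST client.
class LinkedInSparkFetcher(fetcherConfig: FetcherConfigurationData)
    extends SparkFetcher(fetcherConfig) {
  override protected lazy val restClient = new LinkedInSparkRestClient(sparkConf)
}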

@mkumar1984 mkumar1984 merged commit 07c2446 into linkedin:tuning Dec 21, 2018
mkumar1984 pushed a commit that referenced this pull request Jan 14, 2019
* Add new configuration parameters heuristic

* add configuration

* check for execution memory spill before adjusting executor memory

* code review comments

* remove partitions

* consolidate case classes

* add license

* add more licenses

* remove stage level GC analysis/warnings, due to too many false positives. Do not print stack trace for fetching failed tasks.

* code review comments

(cherry picked from commit 07c2446)