
Infer cluster shape from single event log in Qualification tool #687

Closed
wants to merge 5 commits from the spark-rapids-tools-581 branch

Conversation

parthosa
Collaborator

@parthosa parthosa commented Dec 12, 2023

Contributes to #581. This PR infers the CPU cluster shape by parsing the event log. This will be used for savings estimation.

Inference Logic

  1. num_drivers_nodes:
    • Set as 1 for all CSPs.
  2. driver_instance:
    • For Databricks use: Spark Properties -> spark.databricks.driverNodeTypeId
    • For rest, use default driver instance from config file
  3. num_executor_nodes:
    • Count unique host IPs from SparkListenerBlockManagerAdded -> Block Manager ID -> Executor ID != driver
  4. executor_instance:
    • For Databricks use: Spark Properties -> spark.databricks.executorNodeTypeId
    • For rest,
      • Extract num_cores: SparkListenerExecutorAdded -> Executor Info -> Total Cores
      • Use the default instance type with matching cores from the config, e.g. {"name": "m5d.2xlarge", "vCPUs": 8}
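The steps above could be sketched roughly as follows. This is a minimal illustration only, assuming one JSON event per line in the log; the function name infer_executor_shape is hypothetical, not the PR's actual code:

```python
import json

def infer_executor_shape(eventlog_path):
    """Return (num_executor_nodes, executor_cores) from a Spark event log.

    num_executor_nodes: count of unique non-driver host IPs seen in
    SparkListenerBlockManagerAdded events.
    executor_cores: Total Cores from SparkListenerExecutorAdded, later
    matched against a default instance list (e.g. vCPUs == 8 -> m5d.2xlarge).
    """
    executor_hosts = set()
    executor_cores = None
    with open(eventlog_path) as f:
        for line in f:
            event = json.loads(line)
            if event.get("Event") == "SparkListenerBlockManagerAdded":
                bm = event["Block Manager ID"]
                # The driver's block manager reports Executor ID == "driver"
                if bm["Executor ID"] != "driver":
                    executor_hosts.add(bm["Host"])
            elif event.get("Event") == "SparkListenerExecutorAdded":
                executor_cores = event["Executor Info"]["Total Cores"]
    return len(executor_hosts), executor_cores
```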

Changes

  1. cluster_inference.py - Implements the above logic:
    • Parses the event log
    • Extracts information based on the platform
    • Creates the CPU cluster object
  2. qualification.py - Modified to use the above inference
  3. Cloud APIs - Each platform implements _construct_cluster_config to read the default cluster config and update the shape-related fields (num_executors, executor_instance)
  4. JSON Configs - Every config file now includes:
    • Default template for Cluster (output from describe)
    • Default template for Node
    • List of CPU instances based on num cores

Input/Output

EMR - Inference Supported

CMD: spark_rapids_user_tools emr qualification --eventlogs ~/eventlog_file -v

INFO rapids.tools.qualification: Inferring CPU cluster properties from event logs. This could take a while.
...
INFO rapids.tools.qualification: Inferred Cluster => Driver: i3.2xlarge, Executor: 8 X m5d.4xlarge

Dataproc - Inference Supported

CMD: spark_rapids_user_tools dataproc qualification --eventlogs ~/eventlog_file -v

INFO rapids.tools.qualification: Inferring CPU cluster properties from event logs. This could take a while.
...
INFO rapids.tools.qualification: Inferred Cluster => Driver: n1-standard-16, Executor: 4 X n1-standard-16

Databricks AWS - Inference Supported

CMD: spark_rapids_user_tools databricks-aws qualification --eventlogs ~/eventlog_file -v

INFO rapids.tools.qualification: Inferring CPU cluster properties from event logs. This could take a while.
...
INFO rapids.tools.qualification: Inferred Cluster => Driver: m6gd.xlarge, Executor: 2 X m6gd.2xlarge

Databricks Azure - Inference Supported

CMD: spark_rapids_user_tools databricks-azure qualification --eventlogs ~/eventlog_file -v

INFO rapids.tools.qualification: Inferring CPU cluster properties from event logs. This could take a while.
...
INFO rapids.tools.qualification: Inferred Cluster => Driver: Standard_E8ds_v4, Executor: 2 X Standard_E8ds_v4

OnPrem - Inference Not Supported

CMD: spark_rapids_user_tools onprem qualification --eventlogs ~/eventlog_file -v

INFO rapids.tools.qualification: Savings estimates are disabled because the cluster-information is not provided.

Dataproc-GKE - Inference Not Supported

CMD: spark_rapids_user_tools dataproc-gke qualification --eventlogs ~/eventlog_file -v

INFO rapids.tools.qualification: Savings estimates are disabled because the cluster-information is not provided.

Eventlog is a directory - Inference Not Supported

CMD: spark_rapids_user_tools emr qualification --eventlogs ~/eventlog_dir -v

INFO rapids.tools.qualification: Inferring CPU cluster properties from event logs. This could take a while.
INFO rapids.tools.cluster_inference: Unable to infer CPU cluster. Path ~/eventlog_dir should be a file.
INFO rapids.tools.qualification: Cannot infer CPU cluster from event logs

@parthosa parthosa self-assigned this Dec 12, 2023
@parthosa parthosa added user_tools Scope the wrapper module running CSP, QualX, and reports (python) feature request New feature or request labels Dec 12, 2023
@parthosa parthosa force-pushed the spark-rapids-tools-581 branch from ce3ccf0 to 12e7668 Compare December 12, 2023 19:49
@parthosa parthosa marked this pull request as ready for review December 13, 2023 00:01
@tgravescs
Collaborator

num_drivers_nodes:
> Count SparkListenerBlockManagerAdded-> Block Manager ID -> Executor ID == driver

There is only one driver. You can have extra processes, like an Application Master when running on YARN in client mode. We may care about those, but they can probably be inferred from the master and deploy mode.

num_executor_nodes:
Count unique hosts from SparkListenerBlockManagerAdded -> Block Manager ID -> Executor ID != driver

Maybe this needs to be clarified as to when we care about the number of nodes. I assume here you are tracking the Host field in SparkListenerBlockManagerAdded; otherwise, looking at executor ID, you are tracking executors, not hosts. If you are looking at unique host IPs then you get the number of nodes. The number of executors will be the unique host IP and port combinations, but you can use the ExecutorAdded below.

executor_instance:
For Databricks use: Spark Properties -> spark.databricks.executorNodeTypeId
For rest,
> Extract num_cores: SparkListenerExecutorAdded -> Executor Info -> Total Cores
> Use default instance type with matching cores from config, eg: {"name": "m5d.2xlarge", "vCPUs": 8},

Note you also have to look for removed events, especially in the case of dynamic allocation. The point being: what exactly are we trying to figure out? If the user specified the number of executor instances then it would be in the config; there is also a min and max number with dynamic allocation. Just because they specify it doesn't mean they actually got that number, so it's good to check. With dynamic allocation, if you are trying to determine the max at any time, then you have to go through and do the adds and removes and keep track of the max.
The BlockManagerAdded messages have the memory sizes if you want to check those. Generally I would expect the normal Spark configs to have them, though. You would also want to look for memoryOverhead in the configs.
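The "keep track of the max" bookkeeping described here could be sketched as follows. This is illustrative only, not the PR's code; the function name peak_executor_count is hypothetical:

```python
def peak_executor_count(events):
    """Track the maximum number of concurrently live executors.

    With dynamic allocation, executors come and go; the peak (not the
    final count) reflects the largest cluster size actually used.
    `events` is an iterable of parsed event-log dicts.
    """
    live, peak = set(), 0
    for event in events:
        if event.get("Event") == "SparkListenerExecutorAdded":
            live.add(event["Executor ID"])
            peak = max(peak, len(live))
        elif event.get("Event") == "SparkListenerExecutorRemoved":
            live.discard(event["Executor ID"])
    return peak
```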

Even for Databricks you may want to track the executor information because, even though they select a node type, they may run more than one executor on it if they configure things properly (this is not the usual case but needs to be accounted for)

@parthosa
Collaborator Author

parthosa commented Dec 13, 2023

Thanks @tgravescs for the feedback. We are trying to figure out the shape of the CPU cluster (the number of nodes used for the driver and executors, and their instance types) in order to calculate the cost of running the cluster.

Maybe this needs to be clarified as to when we care about the number of nodes. I assume here you are tracking the Host field in SparkListenerBlockManagerAdded; otherwise, looking at executor ID, you are tracking executors, not hosts. If you are looking at unique host IPs then you get the number of nodes.

Yes, we are looking at unique host IPs to extract the number of nodes.

Even for Databricks you may want to track the executor information because, even though they select a node type, they may run more than one executor on it if they configure things properly (this is not the usual case but needs to be accounted for)

By "run more than one executor", are we referring to Spark executors? For our purpose, we only care about the number of nodes used.

@tgravescs
Collaborator

INFO rapids.tools.qualification: Inferring CPU cluster properties from event logs. This could take a while.
DEBUG rapids.tools.cmd: submitting system command: <aws ec2 describe-instance-types --region us-west-2 --instance-types m6gd.2xlarge>
DEBUG rapids.tools.cmd: submitting system command: <aws ec2 describe-instance-types --region us-west-2 --instance-types m6gd.2xlarge>
DEBUG rapids.tools.cmd: submitting system command: <aws ec2 describe-instance-types --region us-west-2 --instance-types m6gd.xlarge>

So just going by the description, I'm not sure what our intended output is. You mention above it's just the number of specific host types in the cloud (onprem not supported)? I.e. the final output I would expect to be something like: driver node type = hosttype_y, number of executor nodes = 4, executor node type = hosttype_x.

How are you deciding it's hosttype_y or hosttype_x without some config being explicit? I.e. I have 2 executors with 8 cores each and 32GB memory. Do you care whether that is an m6gd.4xlarge vs an i3.4xlarge or something like that? Or do you need to know that? I'd have to look at each CSP's event logs to know if it's somewhere in there.

@parthosa
Collaborator Author

parthosa commented Dec 14, 2023

So just going by the description, I'm not sure what our intended output is.

I have added a log statement that shows the inferred cluster (see the output in the description for all CSPs):

INFO rapids.tools.qualification: Inferred Cluster => Driver: i3.2xlarge, Executor: 8 X m5d.4xlarge

Using EMR example,

In EMR, we add the following entries in emr-config.json:

"defaultCpuInstances": {
  "driver": "i3.2xlarge",
  "executor": [
    {"name": "m5d.large", "vCPUs": 2},
    {"name": "m5d.xlarge", "vCPUs": 4},
    {"name": "m5d.2xlarge", "vCPUs": 8},
    {"name": "m5d.4xlarge", "vCPUs": 16},
    {"name": "m5d.8xlarge", "vCPUs": 32},
    {"name": "m5d.12xlarge", "vCPUs": 48},
    {"name": "m5d.16xlarge", "vCPUs": 64}
  ]
}

How are you deciding it's hosttype_y or hosttype_x without some config being explicit?

  • For driver instance, we use a default instance type (in the above config: defaultCpuInstances -> driver).
  • For executor instance, we use instance type that has matching cores (in the above config: defaultCpuInstances -> executor).
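Given the emr-config.json snippet above, the selection logic reduces to a simple lookup. A minimal sketch, not the PR's actual code; pick_instances and DEFAULT_CPU_INSTANCES are illustrative names mirroring the JSON:

```python
# Mirrors the "defaultCpuInstances" entry from emr-config.json (abbreviated)
DEFAULT_CPU_INSTANCES = {
    "driver": "i3.2xlarge",
    "executor": [
        {"name": "m5d.large", "vCPUs": 2},
        {"name": "m5d.xlarge", "vCPUs": 4},
        {"name": "m5d.2xlarge", "vCPUs": 8},
        {"name": "m5d.4xlarge", "vCPUs": 16},
    ],
}

def pick_instances(executor_cores, config=DEFAULT_CPU_INSTANCES):
    """Return (driver_instance, executor_instance) from the defaults.

    The driver is always the configured default; the executor is the
    entry whose vCPU count matches the cores seen in the event log.
    """
    executor = next((e["name"] for e in config["executor"]
                     if e["vCPUs"] == executor_cores), None)
    return config["driver"], executor
```

With 16-core executors this yields the i3.2xlarge driver and m5d.4xlarge executors shown in the EMR output above; an unmatched core count returns None for the executor.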

Do you care whether that is an m6gd.4xlarge vs an i3.4xlarge

Currently, we do not differentiate based on memory and only support a single instance series.
In an offline discussion with @viadea, we decided to support a single series only (these default instances are based on a benchmark).

(onprem not supported)?

OnPrem cluster inference follows a slightly different path. To avoid making this PR larger, I can create a follow-up PR for OnPrem support.

@tgravescs
Collaborator

Overall approach seems fine based on the requirements. I think the on-prem case gets a bit more tricky, but that is a separate issue.
Actually, maybe one other thing to consider is that even on the CSPs you could lose nodes. For instance, a Databricks node could be taken back because you bid a low price. Generally it would be replaced with another one, but you will want to watch for that in your host-counting logic.

@parthosa
Collaborator Author

Actually, maybe one other thing to consider is that even on the CSPs you could lose nodes.

Thanks @tgravescs. I improved the logic to remove the host from the set on a SparkListenerBlockManagerRemoved event. This ensures we do not count extra hosts.

Example,

{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"host-1","Port":42661}...}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"host-2","Port":42661}...}
{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"1","Host":"host-1","Port":42661}}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"host-3","Port":37573}...}

Now, in this case, we will correctly count num_executor_nodes=2 instead of 3 (although, since we are using a set(), this case would already be handled if the replacement host kept the same IP).
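The add/remove host tracking described here could be sketched as follows. Illustrative only, not the PR's code; count_executor_hosts is a hypothetical name:

```python
def count_executor_hosts(events):
    """Count live executor hosts, honoring block-manager removals.

    A host whose block manager is removed (e.g. a reclaimed spot node)
    drops out of the set, so its replacement is not double-counted.
    `events` is an iterable of parsed event-log dicts.
    """
    hosts = set()
    for event in events:
        bm = event.get("Block Manager ID", {})
        name = event.get("Event")
        if name == "SparkListenerBlockManagerAdded" and bm.get("Executor ID") != "driver":
            hosts.add(bm["Host"])
        elif name == "SparkListenerBlockManagerRemoved":
            hosts.discard(bm.get("Host"))
    return len(hosts)
```

Running this over the four events above leaves the set as {host-2, host-3}, giving the expected count of 2.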

Collaborator

@amahussein amahussein left a comment


I had a quick look at the changes, but I need to spend more time going through the discussions.

I have some concerns related to the design:

  1. The tools-core module is in charge of loading and processing the event logs.

    1. Ideally, we don't want to parse an event log twice (first using Python, then using the Spark APIs). This cuts the performance in half.
    2. We won't be able to handle large files unless we use some wrappers that can handle credentials and buffer large event logs. The changes in the PR do not seem to get around that.
  2. If we move the extraction of the cluster shape to Scala, then we can also enable the same feature for the autotuner when needed.

I strongly recommend that tools-core handle any parsing and extraction related to the event log. This means that tools-core generates a new file that shows the CPU cluster shape for each appID.
The user-tools module then loads that file (if any) for cost-savings calculations.

I will take some time to think about it.

@amahussein
Collaborator

@parthosa Let's hold off on this until we have some discussions about the design.
cc: @mattahrens

@parthosa
Collaborator Author

parthosa commented Dec 18, 2023

Thanks @amahussein for the feedback.

  1. Parsing things twice would degrade performance, especially when we pass multiple event logs.
  2. The suggestion to output the cluster shape from the core tool as a file that user tools can then parse is more efficient.

Changing the PR to draft for now.

@parthosa parthosa marked this pull request as draft December 18, 2023 18:55
@amahussein
Collaborator

Closing the PR as we need to do the implementation on the Scala side.

@amahussein amahussein closed this Jan 26, 2024
@parthosa parthosa deleted the spark-rapids-tools-581 branch February 29, 2024 06:52
Successfully merging this pull request may close these issues.

[FEA] Qualification tool can infer the CPU jobs' cluster shape and then provide the suggestion based on that