Add Arguments for Distributed Mode in Qualification Tool CLI #1429
Conversation
Thanks @parthosa! LGTM, just a few quick questions.
Thanks @parthosa! Good job!
I think that we can improve the config if we have `runtime` common for both submission modes.
```python
        description='Configuration related to the runtime environment of the tools.')

    distributed_tools: Optional[DistributedToolsConfig] = Field(
```
Having `runtime` on the same level as `distributed_tools` makes it confusing. One would expect that `runtime` is a generic property that applies to all submission modes. I suggest that we either have a `DistributedToolsRuntimeConfig` that extends `ToolsRuntimeConfig`, or an abstract `RuntimeConfig` that gets extended by local and distributed implementations. In that case the format of the file will be more consistent for both modes.
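For illustration, the second option could look roughly like this (a sketch only; the `LocalRuntimeConfig`/`DistributedRuntimeConfig` names and the simplified fields are hypothetical, assuming pydantic models as used elsewhere in this config):

```python
from typing import List, Optional
from pydantic import BaseModel, Field


class RuntimeConfig(BaseModel):
    """Base config holding properties common to all submission modes (sketch)."""
    dependencies: Optional[List[str]] = Field(
        default=None,
        description='Dependencies shared by local and distributed modes (simplified).')


class LocalRuntimeConfig(RuntimeConfig):
    """Runtime settings that apply only to local submission (hypothetical)."""


class DistributedRuntimeConfig(RuntimeConfig):
    """Runtime settings that apply only to distributed submission (hypothetical)."""
    remote_cache_dir: Optional[str] = Field(
        default=None,
        description='Intermediate HDFS directory used by distributed runs.')
```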
In this PR, we are adding more properties to the config file `--tools_config_file`, so a user can now provide both `runtime` and `distributed_tools` together.
Example:

```yaml
api_version: '1.0'
runtime:
  dependencies:
    - name: my-spark350
      uri: https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
      dependency_type:
        dep_type: archive
        # for tgz files, it is required to give the subfolder where the jars are located
        relative_path: jars/*
distributed_tools:
  hdfs_output_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache'
  spark_properties:
    - name: 'spark.executor.memory'
      value: '20g'
```
I wanted to understand: why do we need a separate runtime config for distributed tools?
From offline discussion, updated the config file format to be as follows:

```yaml
api_version: '1.1'
runtime:
  dependencies:
    - name: my-spark350
      uri: https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
      dependency_type:
        dep_type: archive
        # for tgz files, it is required to give the subfolder where the jars are located
        relative_path: jars/*
submission:
  remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache'
  spark_properties:
    - name: 'spark.executor.memory'
      value: '20g'
```
```python
        examples=['hdfs:///path/to/output/dir']
    )

    spark_properties: List[SparkProperty] = Field(
```
Technically, the `spark_property` configuration is not limited to the distributed mode. For the local mode, there is a potential use for customer-set Spark properties that can be loaded by the tools; for example, `FileSystem`-related arguments to access the event logs.
That is an interesting point. I think we should be specific about where each configuration property will be applied (for example, in this case, it will be used for the distributed tools mode). If we intend to use Spark properties for additional purposes in the future, we could leverage the `SparkProperty` class to define a separate configuration property for that specific use case.
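For instance, reusing `SparkProperty` for a hypothetical local-mode field might look like this (the `LocalToolsConfig` class and `local_fs_properties` field are illustrative, not part of this PR):

```python
from typing import List, Optional
from pydantic import BaseModel, Field


class SparkProperty(BaseModel):
    """A single Spark property as a name/value pair (shape assumed from the YAML examples above)."""
    name: str
    value: str


class LocalToolsConfig(BaseModel):
    """Hypothetical config section reusing SparkProperty for local mode."""
    local_fs_properties: Optional[List[SparkProperty]] = Field(
        default=None,
        description='FileSystem-related Spark properties used to access event logs in local mode.')
```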
```diff
@@ -31,6 +31,7 @@
 from spark_rapids_pytools.pricing.price_provider import SavingsEstimator


+# pylint: disable=abstract-method
```
QQ: What prompted that change in all the cloud_api classes?
Addition of the method `create_distributed_submission_job()` in `PlatformBase` required all CSPs to implement it. Since CSPs do not support distributed mode yet, we would have to implement this method in all CSP modules with the body as `pass`.

Currently, we use the above approach for methods such as:

```python
def set_offline_cluster(self, cluster_args: dict = None):
    pass

def validate_job_submission_args(self, submission_args: dict) -> dict:
    pass
```

However, I think there are pros and cons to this approach.
Pros: in each CSP class it is clear what is implemented and what is not.
Cons: it adds redundant code in all CSP classes.

By adding the pylint exception, it would not be mandatory for each CSP to define methods with the body as `pass`. Let me know your thoughts on this.
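To make the trade-off concrete, the two styles look roughly like this (a sketch; the subclass names and the method signature are simplified, not the actual code):

```python
from abc import abstractmethod


class PlatformBase:
    @abstractmethod
    def create_distributed_submission_job(self, job_prop, ctxt):
        raise NotImplementedError


# Style 1: an explicit stub in every CSP class. Clear about what is
# unimplemented, but repetitive across all CSP modules.
class EmrPlatform(PlatformBase):
    def create_distributed_submission_job(self, job_prop, ctxt):
        pass  # distributed mode not supported on this CSP yet


# Style 2: silence pylint's abstract-method warning (W0223) so CSP
# classes that do not support distributed mode need no stub at all.
# pylint: disable=abstract-method
class DataprocPlatform(PlatformBase):
    pass
```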
From offline discussion, removed the pylint disable rule and added `create_distributed_submission_job()` in each CSP.
"""Configuration class for distributed tools""" | ||
hdfs_output_dir: str = Field( | ||
description='HDFS output directory where the output data from the distributed ' | ||
'tools will be stored.', |
Along the same point of being generic:

```python
description='Output directory where the output data from the distributed '
            'tools will be stored. Currently, it supports only HDFS.'
```
I should clarify this in the description. This is the intermediate output directory where each map task is going to write its output. This directory will always be in HDFS (even in the case of CSPs).
Renamed to `remote_cache_dir` and updated the description.
```python
class DistributedToolsConfig(BaseModel):
    """Configuration class for distributed tools"""
    hdfs_output_dir: str = Field(
```
If we name it `remote_output_dir` or `output_dir`, then it will be better for us when we enable other CSPs; we won't have to change the basic fields in the config to do so.
Similar to the other comment: I should clarify this in the description. This is the intermediate output directory where each map task is going to write its output. This directory will always be in HDFS (even in the case of CSPs).
Converted to draft to address review comments.
```diff
@@ -28,8 +28,7 @@
       "value": "a65839fbf1869f81a1632e09f415e586922e4f80"
     },
     "size": 962685
   },
```
The "type": "jar"
property did not conform to the RuntimeDependency
type specification. Previously, we allowed extra keys to be included, which resulted in properties like type passing validation incorrectly.
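A minimal sketch of how such stray keys can be rejected, assuming pydantic v2 (the two fields shown are a simplification of the real `RuntimeDependency` spec):

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class RuntimeDependency(BaseModel):
    """Simplified stand-in for the real RuntimeDependency spec."""
    # extra='forbid' rejects unknown keys such as "type"; pydantic's
    # default (extra='ignore') silently drops them, which is how the
    # stray property passed validation before.
    model_config = ConfigDict(extra='forbid')
    name: str
    uri: str


try:
    RuntimeDependency(name='spark', uri='https://example.com/spark.tgz', type='jar')
except ValidationError as err:
    print(err)  # reports "Extra inputs are not permitted" for 'type'
```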
Thanks @parthosa!
LGTM
Thanks @parthosa! A few minor questions and nits.
```python
        Process the value provided by `--submission_mode` argument.
        """
        submission_mode_arg = self.wrapper_options.get('submissionMode')
        if submission_mode_arg is None or not submission_mode_arg:
```
QQ: what does the condition `not submission_mode_arg` catch? Like an empty string?
Yes. It could be anything that Python considers falsy (e.g., an empty string).
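For reference, a quick illustration of what `not submission_mode_arg` covers; since `None` is itself falsy, the explicit `is None` check is redundant (an observation, not a change made in the PR):

```python
# Each of these values is falsy, so `not value` is True for all of them:
for value in (None, '', 0, [], {}):
    assert not value

# Hence `if submission_mode_arg is None or not submission_mode_arg:` behaves
# exactly like the shorter `if not submission_mode_arg:`.
```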
```diff
@@ -185,7 +192,7 @@ def _process_custom_args(self) -> None:
         self._process_estimation_model_args()
         self._process_offline_cluster_args()
         self._process_eventlogs_args()
-        self._process_distributed_tools_args()
+        self._process_submission_mode_arg()
         # This is noise to dump everything
         # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
```
Not related to this PR, but interesting to see that we have unused code here.
```diff
@@ -49,6 +49,7 @@ def gen_cpu_cluster_props():
     autotuner_prop_path = 'worker_info.yaml'
     # valid tools config files
     valid_tools_conf_files = ['tools_config_00.yaml']
+    valid_distributed_mode_tools_conf_files = ['tools_config_01.yaml', 'tools_config_02.yaml']
```
Nit: are we planning to give these YAML files more meaningful names later?
Yes @cindyyuanjiang. Going forward, we can rename these config files to be more meaningful.
Merged commit 6c61e52 into NVIDIA:spark-rapids-tools-distributed-base.
Fixes #1430.
This PR adds the initial changes needed in CLI to support distributed execution in the Qualification Tool CLI. It adds arguments to enable distributed mode and sets the stage for future implementation PRs.
Note:
Changes Overview
- `RapidsJob`: Introduced two subclasses, `RapidsDistributedJob` and `RapidsLocalJob`, and a concrete class for the `OnPrem` platform.
- `JarCmdArgs`: Added a class to encapsulate all arguments needed to construct the JAR command.
- `DistributedToolsConfig`: Added a class allowing configurations for distributed tools (like Spark properties) to be specified via the existing `--tools_config_file` option.

CMD:
Sample Config File:
Details:
- `user_tools/src/spark_rapids_pytools/cloud_api/onprem.py`: Added a new class `OnPremDistributedRapidsJob` and a method `create_distributed_submission_job` to support distributed RAPIDS jobs. [1] [2]
- `user_tools/src/spark_rapids_pytools/rapids/rapids_job.py`: Introduced the `RapidsDistributedJob` class and updated methods to handle distributed tool configurations. [1] [2] [3] [4]
- `user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py`: Added methods to get distributed tools configurations and submit distributed jobs. [1] [2]

Enhancements to argument processing:
- `user_tools/src/spark_rapids_pytools/rapids/qualification.py`: Added methods to process distributed tools arguments. [1] [2]
- `user_tools/src/spark_rapids_tools/cmdli/argprocessor.py`: Updated `QualifyUserArgModel` and `build_tools_args` to include `distributed_tools_enabled`. [1] [2]

Platform class updates:
- `user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py`, `databricks_azure.py`, `dataproc.py`, `dataproc_gke.py`, `emr.py`: Disabled pylint warnings for abstract methods. [1] [2] [3] [4] [5]

Other improvements:
- `user_tools/src/spark_rapids_pytools/rapids/qualification.py`: Added a check to ensure the DataFrame is not empty before accessing it.
- `user_tools/src/spark_rapids_tools/cmdli/tools_cli.py`: Added a new parameter `distributed` to the `qualification` function.
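Putting the new arguments together, a hypothetical invocation could look like this (the event log path and platform value are illustrative; the flag spellings follow the `--submission_mode` and `--tools_config_file` options quoted in this thread):

```bash
spark_rapids qualification \
  --platform onprem \
  --eventlogs hdfs:///logs/app-123 \
  --submission_mode distributed \
  --tools_config_file tools_config.yaml
```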