Add Arguments for Distributed Mode in Qualification Tool CLI #1429

Conversation

Collaborator

@parthosa parthosa commented Nov 18, 2024

Fixes #1430.

This PR adds the initial changes needed in CLI to support distributed execution in the Qualification Tool CLI. It adds arguments to enable distributed mode and sets the stage for future implementation PRs.

Note:

  • An environment setup document will be shared internally.

Changes Overview

  • Extended RapidsJob: Introduced two subclasses, RapidsDistributedJob and RapidsLocalJob, plus a concrete class for the OnPrem platform (a rough sketch follows after this list).
  • Created a JarCmdArgs class to encapsulate all arguments needed to construct the JAR command.
  • Implemented the DistributedToolsConfig class, allowing configurations for distributed tools (like Spark properties) to be specified via the existing --tools_config_file option.
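
As a rough sketch of the split described above: the names RapidsJob, RapidsLocalJob, RapidsDistributedJob, and JarCmdArgs come from this PR, but the fields and method names below are only illustrative, not the actual implementation.

from dataclasses import dataclass, field
from typing import List


@dataclass
class JarCmdArgs:
    """Bundles everything needed to construct the tools JAR command (fields are illustrative)."""
    jvm_args: List[str] = field(default_factory=list)
    classpath: str = ''
    jar_main_class: str = ''
    extra_rapids_args: List[str] = field(default_factory=list)


class RapidsJob:
    """Base class; subclasses decide how the JAR command is executed."""
    def _build_jar_cmd_args(self) -> JarCmdArgs:
        raise NotImplementedError

    def run_job(self) -> None:
        raise NotImplementedError


class RapidsLocalJob(RapidsJob):
    """Runs the tools JAR as a local subprocess (the existing behavior)."""
    def run_job(self) -> None:
        cmd_args = self._build_jar_cmd_args()
        # ... spawn a local java process built from cmd_args ...


class RapidsDistributedJob(RapidsJob):
    """Submits the tools JAR as a Spark application when distributed mode is requested."""
    def run_job(self) -> None:
        cmd_args = self._build_jar_cmd_args()
        # ... build and submit a Spark application from cmd_args ...

The concrete OnPrem job class presumably picks one of these two paths based on the --submission_mode argument.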

CMD:

spark_rapids qualification --platform onprem --eventlogs /path/to/eventlogs  --verbose --filter_apps all \
 --submission_mode distributed --tools_config_file /path/to/custom_conf_file.yaml

Sample Config File:

api_version: '1.1'
runtime:
  dependencies:
    - name: my-spark350
      uri: https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
      dependency_type:
        dep_type: archive
        # for tgz files, it is required to give the subfolder where the jars are located
        relative_path: jars/*
submission:
  remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache'
  spark_properties:
    - name: 'spark.executor.memory'
      value: '20g'
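
For reference, the submission section above maps onto a small config model roughly like the sketch below. This assumes Pydantic v2 style models (as used elsewhere in the tools config); SubmissionConfig is an illustrative name rather than the exact class in the PR.

from typing import List
from pydantic import BaseModel, Field


class SparkProperty(BaseModel):
    """A single Spark property forwarded to the distributed submission."""
    name: str
    value: str


class SubmissionConfig(BaseModel):
    """Sketch of a model for the 'submission' section of the tools config file."""
    remote_cache_dir: str = Field(
        description='Intermediate HDFS directory where the distributed tasks cache their output.',
        examples=['hdfs:///tmp/spark_rapids_distributed_tools_cache'])
    spark_properties: List[SparkProperty] = Field(
        default_factory=list,
        description='Spark properties applied to the distributed submission.')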

Details:

Platform class updates:

  • user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py, databricks_azure.py, dataproc.py, dataproc_gke.py, emr.py: Disabled pylint warnings for abstract methods.

@parthosa parthosa added feature request New feature or request user_tools Scope the wrapper module running CSP, QualX, and reports (python) labels Nov 18, 2024
@parthosa parthosa self-assigned this Nov 18, 2024
@parthosa parthosa marked this pull request as ready for review November 18, 2024 21:47
Collaborator

@cindyyuanjiang cindyyuanjiang left a comment


thanks @parthosa! LGTM, just a few quick questions.

Collaborator

@amahussein amahussein left a comment


Thanks @parthosa!
Good job!
I think we can improve the config if we make "runtime" common to both submission modes.

description='Configuration related to the runtime environment of the tools.')

distributed_tools: Optional[DistributedToolsConfig] = Field(
Collaborator


Having runtime at the same level as distributed_tools is confusing. One would expect runtime to be a generic property that applies to all submission modes.
I suggest we either add a DistributedToolsRuntimeConfig that extends ToolsRuntimeConfig, or define an abstract RuntimeConfig that gets extended by the local and distributed implementations.
That way the format of the file will be consistent across both modes.
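
A minimal sketch of the suggested layout, assuming Pydantic v2 models: the class names ToolsRuntimeConfig and DistributedToolsRuntimeConfig come from the comment above, while the fields are only illustrative.

from typing import List, Optional
from pydantic import BaseModel, Field


class SparkProperty(BaseModel):
    name: str
    value: str


class ToolsRuntimeConfig(BaseModel):
    """Runtime settings shared by all submission modes (e.g. the dependencies list)."""


class DistributedToolsRuntimeConfig(ToolsRuntimeConfig):
    """Extends the common runtime section with distributed-only settings."""
    hdfs_output_dir: Optional[str] = Field(
        default=None,
        description='Intermediate HDFS directory used by the distributed tasks.')
    spark_properties: List[SparkProperty] = Field(default_factory=list)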

Collaborator Author


In this PR, we are adding more properties to the config file passed via --tools_config_file, so a user can now provide both runtime and distributed_tools together.

Example,

api_version: '1.0'
runtime:
  dependencies:
    - name: my-spark350
      uri: https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
      dependency_type:
        dep_type: archive
        # for tgz files, it is required to give the subfolder where the jars are located
        relative_path: jars/*
distributed_tools:
  hdfs_output_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache'
  spark_properties:
  - name: 'spark.executor.memory'
    value: '20g'

I wanted to understand: why do we need a separate runtime config for distributed tools?

Collaborator Author


From offline discussion, updated the config file format to be as follows:

api_version: '1.1'
runtime:
  dependencies:
    - name: my-spark350
      uri: https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
      dependency_type:
        dep_type: archive
        # for tgz files, it is required to give the subfolder where the jars are located
        relative_path: jars/*
submission:
  remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache'
  spark_properties:
    - name: 'spark.executor.memory'
      value: '20g'

examples=['hdfs:///path/to/output/dir']
)

spark_properties: List[SparkProperty] = Field(
Collaborator


Technically, the spark_property configuration is not limited to distributed mode.
For local mode, there is a potential use case where a customer sets Spark properties to be loaded by the tools, for example FileSystem-related arguments to access the eventlogs.

Collaborator Author

@parthosa parthosa Dec 11, 2024


That is an interesting point. I think we should be specific about where each configuration property will be applied (in this case, it will be used for distributed tools mode).

If we intend to use Spark properties for additional purposes in the future, we could leverage the SparkProperty class to define a separate configuration property for that specific use case.

@@ -31,6 +31,7 @@
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator


# pylint: disable=abstract-method
Collaborator


QQ: What prompted this change in all the cloud_api classes?

Collaborator Author

@parthosa parthosa Dec 11, 2024


The addition of the create_distributed_submission_job() method in PlatformBase required all CSPs to implement it. Since the CSPs do not support distributed mode yet, we would have to implement this method in every CSP module with the body as pass.

Currently, we use the above approach for methods such as:

def set_offline_cluster(self, cluster_args: dict = None):
   pass
        
def validate_job_submission_args(self, submission_args: dict) -> dict:
   pass

However, I think there are pros and cons to this approach.

Pros: In each CSP class, it is clear what is implemented and what is not.
Cons: It adds redundant code in all CSP classes.

By adding the pylint exception, it would not be mandatory for each CSP to define methods with body as pass. Let me know your thoughts on this.

Collaborator Author


From offline discussion, removed the disable rule for pylint and added create_distributed_submission_job() in each CSP.
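
A minimal sketch of what such a per-CSP override might look like; the class and parameter names are stand-ins, and the real method signature in the PR may differ.

class PlatformBase:
    """Stand-in for the real base class, only to keep this sketch self-contained."""
    def create_distributed_submission_job(self, job_prop=None, ctxt=None):
        raise NotImplementedError


class EMRPlatform(PlatformBase):
    """Illustrative CSP class: distributed mode is not supported here yet."""
    def create_distributed_submission_job(self, job_prop=None, ctxt=None):
        # Mirrors the existing pattern of per-CSP no-op overrides (body as pass).
        pass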

"""Configuration class for distributed tools"""
hdfs_output_dir: str = Field(
description='HDFS output directory where the output data from the distributed '
'tools will be stored.',
Collaborator


Along the same point of being generic:

description='Output directory where the output data from the distributed '
                    'tools will be stored. Currently, it supports only HDFS.'

Collaborator Author


I should clarify this in the description. This is the intermediate output directory where each map task writes its output. This directory will always be in HDFS (even in the case of CSPs).

Collaborator Author


Renamed to remote_cache_dir and updated the description


class DistributedToolsConfig(BaseModel):
"""Configuration class for distributed tools"""
hdfs_output_dir: str = Field(
Collaborator


If we name it remote_output_dir, or output_dir, then it will be better for us when we enable other CSPs. We won't have to change the basic fields in the config to do so.

Collaborator Author


Similar to the other comment: I should clarify this in the description. This is the intermediate output directory where each map task writes its output. This directory will always be in HDFS (even in the case of CSPs).

@parthosa parthosa marked this pull request as draft December 13, 2024 19:28
@parthosa
Collaborator Author

Converted to draft to address review comments

@@ -28,8 +28,7 @@
"value": "a65839fbf1869f81a1632e09f415e586922e4f80"
},
"size": 962685
},
Collaborator Author


The "type": "jar" property did not conform to the RuntimeDependency type specification. Previously, we allowed extra keys to be included, which resulted in properties like type passing validation incorrectly.

@parthosa parthosa marked this pull request as ready for review December 17, 2024 05:02
Collaborator

@amahussein amahussein left a comment


Thanks @parthosa!
LGTM

Collaborator

@cindyyuanjiang cindyyuanjiang left a comment


thanks @parthosa! a few minor questions and nits

Process the value provided by `--submission_mode` argument.
"""
submission_mode_arg = self.wrapper_options.get('submissionMode')
if submission_mode_arg is None or not submission_mode_arg:
Collaborator


QQ: what does the condition not submission_mode_arg catch? Like an empty string?

Collaborator Author


Yes. It could be anything that Python considers falsy (e.g. an empty string).
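
For illustration, since falsy values are already rejected, the explicit is None check is redundant and the guard could be collapsed to a single truthiness test; this is a sketch with an assumed default mode, not the PR's actual code.

def resolve_submission_mode(submission_mode_arg):
    """Sketch: None, '' and other falsy inputs all fall back to the default mode."""
    if not submission_mode_arg:              # also covers the `is None` case
        return 'local'                       # assumed default, for illustration only
    return str(submission_mode_arg).lower()


assert resolve_submission_mode(None) == 'local'
assert resolve_submission_mode('') == 'local'
assert resolve_submission_mode('DISTRIBUTED') == 'distributed'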

@@ -185,7 +192,7 @@ def _process_custom_args(self) -> None:
self._process_estimation_model_args()
self._process_offline_cluster_args()
self._process_eventlogs_args()
self._process_distributed_tools_args()
self._process_submission_mode_arg()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
Collaborator


Not related to this PR, but interesting to see we have unused code here.

@@ -49,6 +49,7 @@ def gen_cpu_cluster_props():
autotuner_prop_path = 'worker_info.yaml'
# valid tools config files
valid_tools_conf_files = ['tools_config_00.yaml']
valid_distributed_mode_tools_conf_files = ['tools_config_01.yaml', 'tools_config_02.yaml']
Collaborator


nit: are we planning to give these yaml files more meaningful names later?

Collaborator Author


Yes @cindyyuanjiang. Going forward, we can rename these config files to be more meaningful.

@parthosa parthosa merged commit 6c61e52 into NVIDIA:spark-rapids-tools-distributed-base Dec 20, 2024
14 checks passed
@parthosa parthosa deleted the spark-rapids-tools-distributed-args-v2 branch December 20, 2024 01:06
@parthosa parthosa linked an issue Dec 21, 2024 that may be closed by this pull request