Add Arguments for Distributed Mode in Qualification Tool CLI #1429

@@ -31,6 +31,7 @@
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator


# pylint: disable=abstract-method
Collaborator:

QQ: What prompted that change in all the cloud_api classes?

@parthosa (Collaborator, Author) on Dec 11, 2024:

Adding the method create_distributed_submission_job() to PlatformBase requires all CSPs to implement it. Since the CSPs do not support distributed mode yet, we would have to implement this method in every CSP module with a pass body.

Currently, we use the above approach for methods such as:

def set_offline_cluster(self, cluster_args: dict = None):
    pass

def validate_job_submission_args(self, submission_args: dict) -> dict:
    pass

However, I think there are pros and cons to this approach.

Pros: each CSP class makes it clear what is and is not implemented.
Cons: it adds redundant code to every CSP class.

By adding the pylint exception, each CSP would no longer be required to define these methods with a pass body. Let me know your thoughts on this.
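
To make the trade-off concrete, here is a minimal, hypothetical sketch of the two options (simplified signatures and class names, not the actual PlatformBase code):

from dataclasses import dataclass
from typing import Any


@dataclass
class PlatformBase:
    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
        # Raising NotImplementedError is what makes pylint treat this method as abstract.
        raise NotImplementedError


# Current approach: every CSP carries an explicit stub; self-documenting but redundant.
@dataclass
class CspWithExplicitStub(PlatformBase):
    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
        pass


# Proposed approach: disable pylint's abstract-method check so CSPs without
# distributed support need no stub; calling the method fails only at runtime.
# pylint: disable=abstract-method
@dataclass
class CspWithoutStub(PlatformBase):
    """No override needed."""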

Collaborator Author:

From the offline discussion: removed the pylint disable rule and added create_distributed_submission_job() in each CSP.
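
Presumably each CSP then defines the method explicitly, along the lines of this hedged sketch for a platform without distributed support (class name and error message are illustrative; the exact body in the PR may differ):

from dataclasses import dataclass
from typing import Any

from spark_rapids_pytools.cloud_api.sp_types import PlatformBase


@dataclass
class SomeCspPlatform(PlatformBase):
    """Hypothetical CSP platform that does not support distributed mode yet."""

    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
        raise NotImplementedError('Distributed job submission is not supported on this platform')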

@dataclass
class DBAWSPlatform(EMRPlatform):
"""
@@ -29,6 +29,7 @@
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator


# pylint: disable=abstract-method
@dataclass
class DBAzurePlatform(PlatformBase):
"""
1 change: 1 addition & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -32,6 +32,7 @@
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator


# pylint: disable=abstract-method
@dataclass
class DataprocPlatform(PlatformBase):
"""
@@ -29,6 +29,7 @@
from spark_rapids_tools import CspEnv


# pylint: disable=abstract-method
@dataclass
class DataprocGkePlatform(DataprocPlatform):
"""
1 change: 1 addition & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -31,6 +31,7 @@
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator


# pylint: disable=abstract-method
@dataclass
class EMRPlatform(PlatformBase):
"""
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -19,14 +19,14 @@
from typing import Any, List, Optional

from spark_rapids_tools import CspEnv
from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob
from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, ClusterNode, \
CMDDriverBase, ClusterGetAccessor, GpuDevice, \
GpuHWInfo, NodeHWInfo, SparkNodeType, SysInfo
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import StorageDriver
from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob, RapidsDistributedJob


@dataclass
@@ -49,6 +49,9 @@ def _install_storage_driver(self):
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return OnPremLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def create_distributed_submission_job(self, job_prop, ctxt) -> RapidsDistributedJob:
return OnPremDistributedRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False,
is_props_file: bool = False):
return OnPremCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)
@@ -154,6 +157,15 @@ class OnPremLocalRapidsJob(RapidsLocalJob):
job_label = 'onpremLocal'


# pylint: disable=abstract-method
@dataclass
class OnPremDistributedRapidsJob(RapidsDistributedJob):
"""
Implementation of a RAPIDS job that runs on a distributed cluster
"""
job_label = 'onprem.distributed'


@dataclass
class OnPremNode(ClusterNode):
"""Implementation of Onprem cluster node."""
5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -22,11 +22,11 @@
from logging import Logger
from typing import Type, Any, List, Callable, Union, Optional, final, Dict

from spark_rapids_tools import EnumeratedType, CspEnv
from spark_rapids_pytools.common.prop_manager import AbstractPropertiesContainer, JSONPropertiesContainer, \
get_elem_non_safe
from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator
from spark_rapids_tools import EnumeratedType, CspEnv


class DeployMode(EnumeratedType):
@@ -884,6 +884,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
raise NotImplementedError

def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
raise NotImplementedError

def load_platform_configs(self):
config_file_name = f'{CspEnv.tostring(self.type_id).lower()}-configs.json'
config_path = Utils.resource_path(config_file_name)
7 changes: 6 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -153,6 +153,10 @@ def _process_estimation_model_args(self) -> None:
estimation_model_args = QualEstimationModel.create_default_model_args(selected_model)
self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args)

def _process_distributed_tools_args(self) -> None:
distributed_tools_enabled = self.wrapper_options.get('distributedToolsEnabled')
self.ctxt.set_ctxt('distributedToolsEnabled', distributed_tools_enabled)

def _process_custom_args(self) -> None:
"""
Qualification tool processes extra arguments:
@@ -181,6 +185,7 @@ def _process_custom_args(self) -> None:
self._process_estimation_model_args()
self._process_offline_cluster_args()
self._process_eventlogs_args()
self._process_distributed_tools_args()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
Collaborator:

Not related to this PR, but it is interesting to see that we have unused code here.


@@ -375,7 +380,7 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame,

df = self._read_qualification_output_file('summaryReport')
# 1. Operations related to XGboost modelling
if self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
if not df.empty and self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
try:
df = self.__update_apps_with_prediction_info(df,
self.ctxt.get_ctxt('estimationModelArgs'))
55 changes: 45 additions & 10 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -17,12 +17,13 @@
import os
from dataclasses import dataclass, field
from logging import Logger
from typing import List, Optional
from typing import List, Optional, Union

from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools.storagelib import LocalPath
from spark_rapids_tools_distributed.jar_cmd_args import JarCmdArgs


@dataclass
@@ -38,6 +39,8 @@ def _init_fields(self):
self.props['sparkConfArgs'] = {}
if self.get_value_silent('platformArgs') is None:
self.props['platformArgs'] = {}
if self.get_value_silent('distributedToolsConfigs') is None:
self.props['distributedToolsConfigs'] = {}

def get_jar_file(self):
return self.get_value('rapidsArgs', 'jarFile')
@@ -48,6 +51,9 @@ def get_jar_main_class(self):
def get_rapids_args(self):
return self.get_value('rapidsArgs', 'jarArgs')

def get_distribution_tools_configs(self):
return self.get_value('distributedToolsConfigs')


@dataclass
class RapidsJob:
@@ -90,10 +96,10 @@ def _build_rapids_args(self):
rapids_arguments.extend(extra_rapids_args)
return rapids_arguments

def _build_submission_cmd(self) -> list:
def _build_submission_cmd(self) -> Union[list, JarCmdArgs]:
raise NotImplementedError

def _submit_job(self, cmd_args: list) -> str:
def _submit_job(self, cmd_args: Union[list, JarCmdArgs]) -> str:
raise NotImplementedError

def _print_job_output(self, job_output: str):
@@ -125,13 +131,6 @@ def run_job(self):
self._cleanup_temp_log4j_files()
return job_output


@dataclass
class RapidsLocalJob(RapidsJob):
"""
Implementation of a RAPIDS job that runs local on a machine.
"""

def _get_hadoop_classpath(self) -> Optional[str]:
"""
Gets the Hadoop's configuration directory from the environment variables.
@@ -202,6 +201,13 @@ def _build_jvm_args(self):
vm_args.append(val)
return vm_args


@dataclass
class RapidsLocalJob(RapidsJob):
"""
Implementation of a RAPIDS job that runs local on a machine.
"""

def _build_submission_cmd(self) -> list:
# env vars are added later as a separate dictionary
classpath_arr = self._build_classpath()
Expand All @@ -218,3 +224,32 @@ def _submit_job(self, cmd_args: list) -> str:
out_std = self.exec_ctxt.platform.cli.run_sys_cmd(cmd=cmd_args,
env_vars=env_args)
return out_std


@dataclass
class RapidsDistributedJob(RapidsJob):
"""
Implementation of a RAPIDS job that runs distributed on a cluster.
"""

def _build_submission_cmd(self) -> JarCmdArgs:
classpath_arr = self._build_classpath()
hadoop_cp = self._get_hadoop_classpath()
jvm_args_arr = self._build_jvm_args()
jar_main_class = self.prop_container.get_jar_main_class()
jar_output_dir_args = self._get_persistent_rapids_args()
extra_rapids_args = self.prop_container.get_rapids_args()
return JarCmdArgs(jvm_args_arr, classpath_arr, hadoop_cp, jar_main_class,
jar_output_dir_args, extra_rapids_args)

def _build_classpath(self) -> List[str]:
"""
Only the Spark RAPIDS Tools JAR file is needed for the classpath.
Assumption: Each worker node should have the Spark Jars pre-installed.
TODO: Ship the Spark JARs to the cluster to avoid version mismatch issues.
"""
return ['-cp', self.prop_container.get_jar_file()]

def _submit_job(self, cmd_args: JarCmdArgs) -> None:
# TODO: Support for submitting the Tools JAR to a Spark cluster
raise NotImplementedError('Distributed job submission is not yet supported')
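
JarCmdArgs itself is not shown in this diff; judging from the constructor call in _build_submission_cmd above, it presumably just bundles the pieces of the java command line. A hedged guess at its shape (field names are hypothetical; the real dataclass in spark_rapids_tools_distributed.jar_cmd_args may differ):

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class JarCmdArgs:
    """Hypothetical container for the parts of a Tools JAR invocation."""
    jvm_args: List[str]              # JVM options, e.g. heap size and log4j properties
    classpath_arr: List[str]         # ['-cp', <tools jar path>]
    hadoop_classpath: Optional[str]  # Hadoop config directory, if found
    jar_main_class: str              # main class of the Tools JAR
    jar_output_dir_args: List[str]   # output directory arguments
    extra_rapids_args: List[str]     # remaining rapids options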
36 changes: 32 additions & 4 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -39,6 +39,7 @@
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools import CspEnv
from spark_rapids_tools.configuration.common import RuntimeDependency
from spark_rapids_tools.configuration.distributed_tools_config import DistributedToolsConfig
from spark_rapids_tools.configuration.tools_config import ToolsConfig
from spark_rapids_tools.enums import DependencyType
from spark_rapids_tools.storagelib import LocalPath, CspFs
@@ -608,7 +609,7 @@ def populate_dependency_list() -> List[RuntimeDependency]:
# check if the dependencies is defined in a config file
config_obj = self.get_tools_config_obj()
if config_obj is not None:
if config_obj.runtime.dependencies:
if config_obj.runtime and config_obj.runtime.dependencies:
return config_obj.runtime.dependencies
self.logger.info('The ToolsConfig did not specify the dependencies. '
'Falling back to the default dependencies.')
@@ -939,10 +940,33 @@ def _prepare_local_job_arguments(self):
'sparkConfArgs': spark_conf_args,
'platformArgs': platform_args
}
# Set the configuration for the distributed tools
distributed_tools_configs = self._get_distributed_tools_configs()
if distributed_tools_configs:
job_properties_json['distributedToolsConfigs'] = distributed_tools_configs
rapids_job_container = RapidsJobPropContainer(prop_arg=job_properties_json,
file_load=False)
self.ctxt.set_ctxt('rapidsJobContainers', [rapids_job_container])

def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]:
"""
Get the distributed tools configurations from the tools config file
"""
config_obj = self.get_tools_config_obj()
if config_obj and config_obj.distributed_tools:
if self.ctxt.is_distributed_mode():
return config_obj.distributed_tools
self.logger.warning(
'Distributed tool configurations detected, but distributed mode is not enabled. '
'Use \'--distributed\' flag to enable distributed mode. Switching to local mode.'
)
elif self.ctxt.is_distributed_mode():
self.logger.warning(
'Distributed mode is enabled, but no distributed tool configurations were provided. '
'Using default settings.'
)
return None

def _archive_results(self):
self._archive_local_results()

@@ -961,14 +985,18 @@ def _submit_jobs(self):
executors_cnt = len(rapids_job_containers) if Utilities.conc_mode_enabled else 1
with ThreadPoolExecutor(max_workers=executors_cnt) as executor:
for rapids_job in rapids_job_containers:
job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job,
ctxt=self.ctxt)
if self.ctxt.is_distributed_mode():
job_obj = self.ctxt.platform.create_distributed_submission_job(job_prop=rapids_job,
ctxt=self.ctxt)
else:
job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job,
ctxt=self.ctxt)
futures = executor.submit(job_obj.run_job)
futures_list.append(futures)
try:
for future in concurrent.futures.as_completed(futures_list):
result = future.result()
results.append(result)
except Exception as ex: # pylint: disable=broad-except
self.logger.error('Failed to download dependencies %s', ex)
self.logger.error('Failed to submit jobs %s', ex)
raise ex
3 changes: 3 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
@@ -89,6 +89,9 @@ def get_deploy_mode(self) -> Any:
def is_fat_wheel_mode(self) -> bool:
return self.get_ctxt('fatWheelModeEnabled')

def is_distributed_mode(self) -> bool:
return self.get_ctxt('distributedToolsEnabled') or False

def set_ctxt(self, key: str, val: Any):
self.props['wrapperCtx'][key] = val

4 changes: 3 additions & 1 deletion user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -470,6 +470,7 @@ class QualifyUserArgModel(ToolUserArgModel):
"""
filter_apps: Optional[QualFilterApp] = None
estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict)
distributed_tools_enabled: Optional[bool] = None

def init_tool_args(self) -> None:
self.p_args['toolArgs']['platform'] = self.platform
@@ -532,7 +533,8 @@ def build_tools_args(self) -> dict:
'eventlogs': self.eventlogs,
'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']),
'toolsJar': self.p_args['toolArgs']['toolsJar'],
'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs']
'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'],
'distributedToolsEnabled': self.distributed_tools_enabled
}
return wrapped_args

5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -46,6 +46,7 @@ def qualification(self,
jvm_threads: int = None,
verbose: bool = None,
tools_config_file: str = None,
distributed: bool = None,
**rapids_options) -> None:
"""The Qualification cmd provides estimated speedups by migrating Apache Spark applications
to GPU accelerated clusters.
@@ -83,6 +84,7 @@
:param tools_config_file: Path to a configuration file that contains the tools' options.
For sample configuration files, please visit
https://github.com/NVIDIA/spark-rapids-tools/tree/main/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid
:param distributed: True or False to enable distributed mode.
:param rapids_options: A list of valid Qualification tool options.
Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support
multiple "spark-property" arguments.
@@ -117,7 +119,8 @@
jvm_threads=jvm_threads,
filter_apps=filter_apps,
estimation_model_args=estimation_model_args,
tools_config_path=tools_config_file)
tools_config_path=tools_config_file,
distributed_tools_enabled=distributed)
if qual_args:
tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'],
output_folder=qual_args['outputFolder'],