From 5dea85df999983366336b8f4e9d700e16789672f Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Sat, 16 Nov 2024 19:11:14 -0800 Subject: [PATCH 01/14] Add arguments for running tools in distributed mode Signed-off-by: Partho Sarthi --- .../cloud_api/databricks_aws.py | 1 + .../cloud_api/databricks_azure.py | 1 + .../cloud_api/dataproc.py | 1 + .../cloud_api/dataproc_gke.py | 1 + .../src/spark_rapids_pytools/cloud_api/emr.py | 1 + .../spark_rapids_pytools/cloud_api/onprem.py | 13 ++++- .../cloud_api/sp_types.py | 5 +- .../rapids/qualification.py | 7 ++- .../spark_rapids_pytools/rapids/rapids_job.py | 49 +++++++++++++++---- .../rapids/rapids_tool.py | 10 ++-- .../spark_rapids_pytools/rapids/tool_ctxt.py | 4 ++ .../spark_rapids_tools/cmdli/argprocessor.py | 28 ++++++++++- .../src/spark_rapids_tools/cmdli/tools_cli.py | 10 +++- .../tools/qualification_stats_report.py | 2 +- .../jar_cmd_args.py | 30 ++++++++++++ 15 files changed, 144 insertions(+), 19 deletions(-) create mode 100644 user_tools/src/spark_rapids_tools_distributed/jar_cmd_args.py diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py index 82bd40133..774574a0d 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py @@ -31,6 +31,7 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator +# pylint: disable=abstract-method @dataclass class DBAWSPlatform(EMRPlatform): """ diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py index c746e26b3..c11ecb681 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py @@ -29,6 +29,7 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator +# pylint: disable=abstract-method @dataclass class DBAzurePlatform(PlatformBase): """ diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py index c5e01c5e5..888d21eb0 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py @@ -32,6 +32,7 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator +# pylint: disable=abstract-method @dataclass class DataprocPlatform(PlatformBase): """ diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py index 4a36c8cae..d81e01309 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py @@ -29,6 +29,7 @@ from spark_rapids_tools import CspEnv +# pylint: disable=abstract-method @dataclass class DataprocGkePlatform(DataprocPlatform): """ diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py index 2ae6d713d..6401d7e43 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py @@ -31,6 +31,7 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator +# pylint: disable=abstract-method @dataclass class EMRPlatform(PlatformBase): """ diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py 
b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py index cbbefc7c2..f0b7c22b4 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py @@ -19,7 +19,6 @@ from typing import Any, List, Optional from spark_rapids_tools import CspEnv -from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, ClusterNode, \ CMDDriverBase, ClusterGetAccessor, GpuDevice, \ GpuHWInfo, NodeHWInfo, SparkNodeType, SysInfo @@ -27,6 +26,7 @@ from spark_rapids_pytools.common.sys_storage import StorageDriver from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider from spark_rapids_pytools.pricing.price_provider import SavingsEstimator +from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob, RapidsDistributedJob @dataclass @@ -49,6 +49,9 @@ def _install_storage_driver(self): def create_local_submission_job(self, job_prop, ctxt) -> Any: return OnPremLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + def create_distributed_submission_job(self, job_prop, ctxt) -> RapidsDistributedJob: + return OnPremDistributedRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False, is_props_file: bool = False): return OnPremCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props) @@ -154,6 +157,14 @@ class OnPremLocalRapidsJob(RapidsLocalJob): job_label = 'onpremLocal' +@dataclass +class OnPremDistributedRapidsJob(RapidsDistributedJob): + """ + Implementation of a RAPIDS job that runs on a distributed cluster + """ + job_label = 'onpremDistributed' + + @dataclass class OnPremNode(ClusterNode): """Implementation of Onprem cluster node.""" diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py index 949ac29d9..2c566f166 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py @@ -22,11 +22,11 @@ from logging import Logger from typing import Type, Any, List, Callable, Union, Optional, final, Dict -from spark_rapids_tools import EnumeratedType, CspEnv from spark_rapids_pytools.common.prop_manager import AbstractPropertiesContainer, JSONPropertiesContainer, \ get_elem_non_safe from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator +from spark_rapids_tools import EnumeratedType, CspEnv class DeployMode(EnumeratedType): @@ -884,6 +884,9 @@ def create_saving_estimator(self, def create_local_submission_job(self, job_prop, ctxt) -> Any: raise NotImplementedError + def create_distributed_submission_job(self, job_prop, ctxt) -> Any: + raise NotImplementedError + def load_platform_configs(self): config_file_name = f'{CspEnv.tostring(self.type_id).lower()}-configs.json' config_path = Utils.resource_path(config_file_name) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 921dcc9b5..76e85abd6 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -153,6 +153,10 @@ def _process_estimation_model_args(self) -> None: estimation_model_args = 
QualEstimationModel.create_default_model_args(selected_model) self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args) + def _process_distributed_mode_args(self) -> None: + distributed_mode_args = self.wrapper_options.get('distributedModeArgs') + self.ctxt.set_ctxt('distributedModeArgs', distributed_mode_args) + def _process_custom_args(self) -> None: """ Qualification tool processes extra arguments: @@ -181,6 +185,7 @@ def _process_custom_args(self) -> None: self._process_estimation_model_args() self._process_offline_cluster_args() self._process_eventlogs_args() + self._process_distributed_mode_args() # This is noise to dump everything # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx']) @@ -375,7 +380,7 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame, df = self._read_qualification_output_file('summaryReport') # 1. Operations related to XGboost modelling - if self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']: + if not df.empty and self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']: try: df = self.__update_apps_with_prediction_info(df, self.ctxt.get_ctxt('estimationModelArgs')) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py index a77a94b00..27fcf8747 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py @@ -17,12 +17,13 @@ import os from dataclasses import dataclass, field from logging import Logger -from typing import List, Optional +from typing import List, Optional, Union from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer from spark_rapids_pytools.common.utilities import ToolLogging, Utils from spark_rapids_pytools.rapids.tool_ctxt import ToolContext from spark_rapids_tools.storagelib import LocalPath +from spark_rapids_tools_distributed.jar_cmd_args import JarCmdArgs @dataclass @@ -90,10 +91,10 @@ def _build_rapids_args(self): rapids_arguments.extend(extra_rapids_args) return rapids_arguments - def _build_submission_cmd(self) -> list: + def _build_submission_cmd(self) -> Union[list, JarCmdArgs]: raise NotImplementedError - def _submit_job(self, cmd_args: list) -> str: + def _submit_job(self, cmd_args: Union[list, JarCmdArgs]) -> str: raise NotImplementedError def _print_job_output(self, job_output: str): @@ -125,13 +126,6 @@ def run_job(self): self._cleanup_temp_log4j_files() return job_output - -@dataclass -class RapidsLocalJob(RapidsJob): - """ - Implementation of a RAPIDS job that runs local on a machine. - """ - def _get_hadoop_classpath(self) -> Optional[str]: """ Gets the Hadoop's configuration directory from the environment variables. @@ -202,6 +196,13 @@ def _build_jvm_args(self): vm_args.append(val) return vm_args + +@dataclass +class RapidsLocalJob(RapidsJob): + """ + Implementation of a RAPIDS job that runs local on a machine. + """ + def _build_submission_cmd(self) -> list: # env vars are added later as a separate dictionary classpath_arr = self._build_classpath() @@ -218,3 +219,31 @@ def _submit_job(self, cmd_args: list) -> str: out_std = self.exec_ctxt.platform.cli.run_sys_cmd(cmd=cmd_args, env_vars=env_args) return out_std + + +@dataclass +class RapidsDistributedJob(RapidsJob): + """ + Implementation of a RAPIDS job that runs distributed on a cluster. 
+ """ + + def _build_submission_cmd(self) -> JarCmdArgs: + classpath_arr = self._build_classpath() + hadoop_cp = self._get_hadoop_classpath() + jvm_args_arr = self._build_jvm_args() + jar_main_class = self.prop_container.get_jar_main_class() + jar_output_dir_args = self._get_persistent_rapids_args() + extra_rapids_args = self.prop_container.get_rapids_args() + return JarCmdArgs(jvm_args_arr, classpath_arr, hadoop_cp, jar_main_class, + jar_output_dir_args, extra_rapids_args) + + def _build_classpath(self) -> List[str]: + """ + Only the Spark RAPIDS Tools JAR file is needed for the classpath. + Each worker node should the Spark Jars pre-installed. + """ + return ['-cp', self.prop_container.get_jar_file()] + + def _submit_job(self, cmd_args: JarCmdArgs) -> None: + # TODO: Support for submitting the Tools JAR to a Spark cluster + raise NotImplementedError('Distributed job submission is not yet supported') diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index ee4338ccd..81ea42df5 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -961,8 +961,12 @@ def _submit_jobs(self): executors_cnt = len(rapids_job_containers) if Utilities.conc_mode_enabled else 1 with ThreadPoolExecutor(max_workers=executors_cnt) as executor: for rapids_job in rapids_job_containers: - job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job, - ctxt=self.ctxt) + if self.ctxt.is_distributed_mode(): + job_obj = self.ctxt.platform.create_distributed_submission_job(job_prop=rapids_job, + ctxt=self.ctxt) + else: + job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job, + ctxt=self.ctxt) futures = executor.submit(job_obj.run_job) futures_list.append(futures) try: @@ -970,5 +974,5 @@ def _submit_jobs(self): result = future.result() results.append(result) except Exception as ex: # pylint: disable=broad-except - self.logger.error('Failed to download dependencies %s', ex) + self.logger.error('Failed to submit jobs %s', ex) raise ex diff --git a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py index 7e850dcfa..eba8f8b23 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py +++ b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py @@ -89,6 +89,10 @@ def get_deploy_mode(self) -> Any: def is_fat_wheel_mode(self) -> bool: return self.get_ctxt('fatWheelModeEnabled') + def is_distributed_mode(self) -> bool: + distributed_mode_args = self.get_ctxt('distributedModeArgs') + return distributed_mode_args is not None and distributed_mode_args.get('distributedModeEnabled', False) + def set_ctxt(self, key: str, val: Any): self.props['wrapperCtx'][key] = val diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index 978f683c1..c4f17bf75 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -470,6 +470,7 @@ class QualifyUserArgModel(ToolUserArgModel): """ filter_apps: Optional[QualFilterApp] = None estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict) + distributed_mode_args: Optional[Dict] = dataclasses.field(default_factory=dict) def init_tool_args(self) -> None: self.p_args['toolArgs']['platform'] = self.platform @@ -532,7 +533,8 @@ def build_tools_args(self) -> dict: 'eventlogs': 
self.eventlogs, 'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']), 'toolsJar': self.p_args['toolArgs']['toolsJar'], - 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'] + 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'], + 'distributedModeArgs': self.distributed_mode_args } return wrapped_args @@ -754,3 +756,27 @@ def build_tools_args(self) -> dict: 'output_folder': self.output_folder, 'platformOpts': {}, } + + +@dataclass +@register_tool_arg_validator('distributed_mode') +class DistributedModeArgProcessor(AbsToolUserArgModel): + """ + Class to validate the arguments of Distributed Mode + """ + distributed_mode: Optional[bool] = None + spark_config_file: Optional[str] = None + + def validate_spark_conf_file(self) -> None: + # validate the spark configuration file path is valid + if self.spark_config_file is not None: + if not CspPath.is_file_path(self.spark_config_file, extensions=['conf']): + raise PydanticCustomError( + 'spark_conf_file', + f'spark configuration file path {self.spark_config_file} is not valid\n Error:') + + def build_tools_args(self) -> dict: + return { + 'distributedModeEnabled': self.distributed_mode, + 'sparkConfigFile': self.spark_config_file, + } diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index 0d46e5025..701411c96 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -46,6 +46,8 @@ def qualification(self, jvm_threads: int = None, verbose: bool = None, tools_config_file: str = None, + distributed: bool = None, + spark_config_file: str = None, **rapids_options) -> None: """The Qualification cmd provides estimated speedups by migrating Apache Spark applications to GPU accelerated clusters. @@ -83,6 +85,8 @@ def qualification(self, :param tools_config_file: Path to a configuration file that contains the tools' options. For sample configuration files, please visit https://github.com/NVIDIA/spark-rapids-tools/tree/main/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid + :param distributed: True or False to enable distributed mode. + :param spark_config_file: Path to a Spark configuration file to be used in distributed mode. :param rapids_options: A list of valid Qualification tool options. Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support multiple "spark-property" arguments. 
@@ -107,6 +111,9 @@ def qualification(self, custom_model_file=custom_model_file) if estimation_model_args is None: return None + distributed_mode_args = AbsToolUserArgModel.create_tool_args('distributed_mode', + distributed_mode=distributed, + spark_config_file=spark_config_file) qual_args = AbsToolUserArgModel.create_tool_args('qualification', eventlogs=eventlogs, cluster=cluster, @@ -117,7 +124,8 @@ def qualification(self, jvm_threads=jvm_threads, filter_apps=filter_apps, estimation_model_args=estimation_model_args, - tools_config_path=tools_config_file) + tools_config_path=tools_config_file, + distributed_mode_args=distributed_mode_args) if qual_args: tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'], output_folder=qual_args['outputFolder'], diff --git a/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py b/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py index de762013d..db59b85f8 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py +++ b/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py @@ -105,7 +105,7 @@ def _preprocess_dataframes(self) -> None: self.execs_df = (self.execs_df.explode('Exec Stages'). dropna(subset=['Exec Stages']). rename(columns={'Exec Stages': 'Stage ID'})) - self.execs_df['Stage ID'] = self.execs_df['Stage ID'].astype(int) + self.execs_df['Stage ID'] = self.execs_df['Stage ID'].astype(float) # Remove duplicate 'Stage ID' rows and rename some columns so that join on dataframes # can be done easily diff --git a/user_tools/src/spark_rapids_tools_distributed/jar_cmd_args.py b/user_tools/src/spark_rapids_tools_distributed/jar_cmd_args.py new file mode 100644 index 000000000..c6513c1c6 --- /dev/null +++ b/user_tools/src/spark_rapids_tools_distributed/jar_cmd_args.py @@ -0,0 +1,30 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Jar command arguments for running the Tools JAR on Spark """ +from dataclasses import dataclass, field +from typing import List + + +@dataclass +class JarCmdArgs: + """ + Wrapper class to store the arguments required to run the Tools JAR on Spark. 
+ """ + jvm_args: List[str] = field(default=None, init=True) + classpath_arr: List[str] = field(default=None, init=True) + hadoop_classpath: str = field(default=None, init=True) + jar_main_class: str = field(default=None, init=True) + jar_output_dir_args: List[str] = field(default=None, init=True) + extra_rapids_args: List[str] = field(default=None, init=True) From e62979936e1a8b8e1f626ea6955188a8717473cc Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Sat, 16 Nov 2024 20:39:04 -0800 Subject: [PATCH 02/14] Refactor to use tools config file Signed-off-by: Partho Sarthi --- .../spark_rapids_pytools/cloud_api/onprem.py | 2 +- .../rapids/qualification.py | 8 +-- .../spark_rapids_pytools/rapids/rapids_job.py | 5 ++ .../rapids/rapids_tool.py | 26 ++++++- .../spark_rapids_pytools/rapids/tool_ctxt.py | 3 +- .../spark_rapids_tools/cmdli/argprocessor.py | 28 +------- .../src/spark_rapids_tools/cmdli/tools_cli.py | 7 +- .../configuration/distributed_tools_config.py | 36 ++++++++++ .../configuration/tools_config.py | 9 ++- .../sample-config-specification.json | 71 ++++++++++++++++++- .../tools_config/valid/tools_config_01.yaml | 7 ++ 11 files changed, 158 insertions(+), 44 deletions(-) create mode 100644 user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py create mode 100644 user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py index f0b7c22b4..f34bb3887 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py @@ -162,7 +162,7 @@ class OnPremDistributedRapidsJob(RapidsDistributedJob): """ Implementation of a RAPIDS job that runs on a distributed cluster """ - job_label = 'onpremDistributed' + job_label = 'onprem.distributed' @dataclass diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 76e85abd6..b112eedce 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -153,9 +153,9 @@ def _process_estimation_model_args(self) -> None: estimation_model_args = QualEstimationModel.create_default_model_args(selected_model) self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args) - def _process_distributed_mode_args(self) -> None: - distributed_mode_args = self.wrapper_options.get('distributedModeArgs') - self.ctxt.set_ctxt('distributedModeArgs', distributed_mode_args) + def _process_distributed_tools_args(self) -> None: + distributed_tools_enabled = self.wrapper_options.get('distributedToolsEnabled') + self.ctxt.set_ctxt('distributedToolsEnabled', distributed_tools_enabled) def _process_custom_args(self) -> None: """ @@ -185,7 +185,7 @@ def _process_custom_args(self) -> None: self._process_estimation_model_args() self._process_offline_cluster_args() self._process_eventlogs_args() - self._process_distributed_mode_args() + self._process_distributed_tools_args() # This is noise to dump everything # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx']) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py index 27fcf8747..ab71ff3f7 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py 
@@ -39,6 +39,8 @@ def _init_fields(self): self.props['sparkConfArgs'] = {} if self.get_value_silent('platformArgs') is None: self.props['platformArgs'] = {} + if self.get_value_silent('distributedToolsConfigs') is None: + self.props['distributedToolsConfigs'] = {} def get_jar_file(self): return self.get_value('rapidsArgs', 'jarFile') @@ -49,6 +51,9 @@ def get_jar_main_class(self): def get_rapids_args(self): return self.get_value('rapidsArgs', 'jarArgs') + def get_distribution_tools_configs(self): + return self.get_value('distributedToolsConfigs') + @dataclass class RapidsJob: diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 81ea42df5..6e3dcd8e1 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -39,6 +39,7 @@ from spark_rapids_pytools.rapids.tool_ctxt import ToolContext from spark_rapids_tools import CspEnv from spark_rapids_tools.configuration.common import RuntimeDependency +from spark_rapids_tools.configuration.distributed_tools_config import DistributedToolsConfig from spark_rapids_tools.configuration.tools_config import ToolsConfig from spark_rapids_tools.enums import DependencyType from spark_rapids_tools.storagelib import LocalPath, CspFs @@ -608,7 +609,7 @@ def populate_dependency_list() -> List[RuntimeDependency]: # check if the dependencies is defined in a config file config_obj = self.get_tools_config_obj() if config_obj is not None: - if config_obj.runtime.dependencies: + if config_obj.runtime and config_obj.runtime.dependencies: return config_obj.runtime.dependencies self.logger.info('The ToolsConfig did not specify the dependencies. ' 'Falling back to the default dependencies.') @@ -939,10 +940,33 @@ def _prepare_local_job_arguments(self): 'sparkConfArgs': spark_conf_args, 'platformArgs': platform_args } + # Set the configuration for the distributed tools + distributed_tools_configs = self._get_distributed_tools_configs() + if distributed_tools_configs: + job_properties_json['distributedToolsConfigs'] = distributed_tools_configs rapids_job_container = RapidsJobPropContainer(prop_arg=job_properties_json, file_load=False) self.ctxt.set_ctxt('rapidsJobContainers', [rapids_job_container]) + def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]: + """ + Get the distributed tools configurations from the tools config file + """ + config_obj = self.get_tools_config_obj() + if config_obj and config_obj.distributed_tools: + if self.ctxt.is_distributed_mode(): + return config_obj.distributed_tools + self.logger.warning( + 'Distributed tool configurations detected, but distributed mode is not active. ' + 'Switching to local mode.' + ) + elif self.ctxt.is_distributed_mode(): + self.logger.warning( + 'Distributed mode is enabled, but no distributed tool configurations were provided. ' + 'Using default settings.' 
+ ) + return None + def _archive_results(self): self._archive_local_results() diff --git a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py index eba8f8b23..c9d229d15 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py +++ b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py @@ -90,8 +90,7 @@ def is_fat_wheel_mode(self) -> bool: return self.get_ctxt('fatWheelModeEnabled') def is_distributed_mode(self) -> bool: - distributed_mode_args = self.get_ctxt('distributedModeArgs') - return distributed_mode_args is not None and distributed_mode_args.get('distributedModeEnabled', False) + return self.get_ctxt('distributedToolsEnabled') or False def set_ctxt(self, key: str, val: Any): self.props['wrapperCtx'][key] = val diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index c4f17bf75..339d6bd2d 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -470,7 +470,7 @@ class QualifyUserArgModel(ToolUserArgModel): """ filter_apps: Optional[QualFilterApp] = None estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict) - distributed_mode_args: Optional[Dict] = dataclasses.field(default_factory=dict) + distributed_tools_enabled: Optional[bool] = None def init_tool_args(self) -> None: self.p_args['toolArgs']['platform'] = self.platform @@ -534,7 +534,7 @@ def build_tools_args(self) -> dict: 'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']), 'toolsJar': self.p_args['toolArgs']['toolsJar'], 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'], - 'distributedModeArgs': self.distributed_mode_args + 'distributedToolsEnabled': self.distributed_tools_enabled } return wrapped_args @@ -756,27 +756,3 @@ def build_tools_args(self) -> dict: 'output_folder': self.output_folder, 'platformOpts': {}, } - - -@dataclass -@register_tool_arg_validator('distributed_mode') -class DistributedModeArgProcessor(AbsToolUserArgModel): - """ - Class to validate the arguments of Distributed Mode - """ - distributed_mode: Optional[bool] = None - spark_config_file: Optional[str] = None - - def validate_spark_conf_file(self) -> None: - # validate the spark configuration file path is valid - if self.spark_config_file is not None: - if not CspPath.is_file_path(self.spark_config_file, extensions=['conf']): - raise PydanticCustomError( - 'spark_conf_file', - f'spark configuration file path {self.spark_config_file} is not valid\n Error:') - - def build_tools_args(self) -> dict: - return { - 'distributedModeEnabled': self.distributed_mode, - 'sparkConfigFile': self.spark_config_file, - } diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index 701411c96..a154e6c84 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -47,7 +47,6 @@ def qualification(self, verbose: bool = None, tools_config_file: str = None, distributed: bool = None, - spark_config_file: str = None, **rapids_options) -> None: """The Qualification cmd provides estimated speedups by migrating Apache Spark applications to GPU accelerated clusters. 
@@ -86,7 +85,6 @@ def qualification(self, For sample configuration files, please visit https://github.com/NVIDIA/spark-rapids-tools/tree/main/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid :param distributed: True or False to enable distributed mode. - :param spark_config_file: Path to a Spark configuration file to be used in distributed mode. :param rapids_options: A list of valid Qualification tool options. Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support multiple "spark-property" arguments. @@ -111,9 +109,6 @@ def qualification(self, custom_model_file=custom_model_file) if estimation_model_args is None: return None - distributed_mode_args = AbsToolUserArgModel.create_tool_args('distributed_mode', - distributed_mode=distributed, - spark_config_file=spark_config_file) qual_args = AbsToolUserArgModel.create_tool_args('qualification', eventlogs=eventlogs, cluster=cluster, @@ -125,7 +120,7 @@ def qualification(self, filter_apps=filter_apps, estimation_model_args=estimation_model_args, tools_config_path=tools_config_file, - distributed_mode_args=distributed_mode_args) + distributed_tools_enabled=distributed) if qual_args: tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'], output_folder=qual_args['outputFolder'], diff --git a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py new file mode 100644 index 000000000..faf7922ec --- /dev/null +++ b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py @@ -0,0 +1,36 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" Configuration file for distributed tools """ +from typing import List + +from pydantic import BaseModel, Field + + +class SparkProperty(BaseModel): + """Represents a single Spark property with a name and value.""" + name: str = Field( + description='The name of the Spark property, e.g., "spark.executor.memory".') + value: str = Field( + description='The value of the Spark property, e.g., "4g".') + + +class DistributedToolsConfig(BaseModel): + """Configuration class for distributed tools""" + spark_properties: List[SparkProperty] = Field( + default_factory=list, + description='List of Spark properties to be used for the Spark session.', + examples=[{'name': 'spark.executor.memory', 'value': '4g'}, + {'name': 'spark.executor.cores', 'value': '4'}] + ) diff --git a/user_tools/src/spark_rapids_tools/configuration/tools_config.py b/user_tools/src/spark_rapids_tools/configuration/tools_config.py index b330cfc28..5dc32438a 100644 --- a/user_tools/src/spark_rapids_tools/configuration/tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/tools_config.py @@ -21,6 +21,7 @@ from pydantic import BaseModel, Field, ValidationError from spark_rapids_tools import CspPathT +from spark_rapids_tools.configuration.distributed_tools_config import DistributedToolsConfig from spark_rapids_tools.configuration.runtime_conf import ToolsRuntimeConfig from spark_rapids_tools.utils import AbstractPropContainer @@ -34,9 +35,15 @@ class ToolsConfig(BaseModel): examples=['1.0'], le=1.0, # minimum version compatible with the current tools implementation ge=1.0) - runtime: ToolsRuntimeConfig = Field( + + runtime: Optional[ToolsRuntimeConfig] = Field( + default=None, description='Configuration related to the runtime environment of the tools.') + distributed_tools: Optional[DistributedToolsConfig] = Field( + default=None, + description='Configuration related to the distributed tools.') + @classmethod def load_from_file(cls, file_path: Union[str, CspPathT]) -> Optional['ToolsConfig']: """Load the tools configuration from a file""" diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json index 9dbef10ab..f2823b266 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json @@ -36,6 +36,31 @@ "title": "DependencyVerification", "type": "object" }, + "DistributedToolsConfig": { + "description": "Configuration class for distributed tools", + "properties": { + "spark_properties": { + "description": "List of Spark properties to be used for the Spark session.", + "examples": [ + { + "name": "spark.executor.memory", + "value": "4g" + }, + { + "name": "spark.executor.cores", + "value": "4" + } + ], + "items": { + "$ref": "#/$defs/SparkProperty" + }, + "title": "Spark Properties", + "type": "array" + } + }, + "title": "DistributedToolsConfig", + "type": "object" + }, "FileHashAlgorithm": { "description": "Represents a file hash algorithm and its value. 
Used for verification against an existing file.", "properties": { @@ -137,6 +162,27 @@ "title": "RuntimeDependencyType", "type": "object" }, + "SparkProperty": { + "description": "Represents a single Spark property with a name and value.", + "properties": { + "name": { + "description": "The name of the Spark property, e.g., \"spark.executor.memory\".", + "title": "Name", + "type": "string" + }, + "value": { + "description": "The value of the Spark property, e.g., \"4g\".", + "title": "Value", + "type": "string" + } + }, + "required": [ + "name", + "value" + ], + "title": "SparkProperty", + "type": "object" + }, "ToolsRuntimeConfig": { "description": "The runtime configurations of the tools as defined by the user.", "properties": { @@ -169,13 +215,32 @@ "type": "number" }, "runtime": { - "$ref": "#/$defs/ToolsRuntimeConfig", + "anyOf": [ + { + "$ref": "#/$defs/ToolsRuntimeConfig" + }, + { + "type": "null" + } + ], + "default": null, "description": "Configuration related to the runtime environment of the tools." + }, + "distributed_tools": { + "anyOf": [ + { + "$ref": "#/$defs/DistributedToolsConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration related to the distributed tools." } }, "required": [ - "api_version", - "runtime" + "api_version" ], "title": "ToolsConfig", "type": "object" diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml new file mode 100644 index 000000000..63a4341a6 --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml @@ -0,0 +1,7 @@ +# This yaml file is a configuration file for a tool that uses defines the spark properties +# for the distributed tools. 
+api_version: '1.0' +distributed_tools: + spark_properties: + - name: 'spark.executor.memory' + value: '32g' From 55e1e06d7a5892ef03c17786f7c83c4678210355 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Sat, 16 Nov 2024 20:50:25 -0800 Subject: [PATCH 03/14] Update specification Signed-off-by: Partho Sarthi --- .../configuration/distributed_tools_config.py | 4 ++-- .../resources/tools_config/sample-config-specification.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py index faf7922ec..6a7c9db0e 100644 --- a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py @@ -21,9 +21,9 @@ class SparkProperty(BaseModel): """Represents a single Spark property with a name and value.""" name: str = Field( - description='The name of the Spark property, e.g., "spark.executor.memory".') + description='Name of the Spark property, e.g., "spark.executor.memory".') value: str = Field( - description='The value of the Spark property, e.g., "4g".') + description='Value of the Spark property, e.g., "4g".') class DistributedToolsConfig(BaseModel): diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json index f2823b266..3381284e1 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json @@ -166,12 +166,12 @@ "description": "Represents a single Spark property with a name and value.", "properties": { "name": { - "description": "The name of the Spark property, e.g., \"spark.executor.memory\".", + "description": "Name of the Spark property, e.g., \"spark.executor.memory\".", "title": "Name", "type": "string" }, "value": { - "description": "The value of the Spark property, e.g., \"4g\".", + "description": "Value of the Spark property, e.g., \"4g\".", "title": "Value", "type": "string" } From 33a4841b989b3fb372ffd851c05305853d2f3341 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Sun, 17 Nov 2024 01:32:59 -0800 Subject: [PATCH 04/14] Update tools config file Signed-off-by: Partho Sarthi --- .../resources/tools_config/valid/tools_config_01.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml index 63a4341a6..adae7dc7b 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml @@ -4,4 +4,4 @@ api_version: '1.0' distributed_tools: spark_properties: - name: 'spark.executor.memory' - value: '32g' + value: '20g' From ad88b94a6a5ee19ce5e2c3277668ac148ad0c620 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Sun, 17 Nov 2024 18:58:35 -0800 Subject: [PATCH 05/14] Update comment Signed-off-by: Partho Sarthi --- user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py 
b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 6e3dcd8e1..20ad09488 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -957,8 +957,8 @@ def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]: if self.ctxt.is_distributed_mode(): return config_obj.distributed_tools self.logger.warning( - 'Distributed tool configurations detected, but distributed mode is not active. ' - 'Switching to local mode.' + 'Distributed tool configurations detected, but distributed mode is not enabled.' + 'Use \'--distributed\' flag to enable distributed mode. Switching to local mode.' ) elif self.ctxt.is_distributed_mode(): self.logger.warning( From d0a3ec0dc85e13ebd960f91d4b015a36846f0144 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 18 Nov 2024 11:10:01 -0800 Subject: [PATCH 06/14] Add pylint exception Signed-off-by: Partho Sarthi --- user_tools/src/spark_rapids_pytools/cloud_api/onprem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py index f34bb3887..1ae80452d 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py @@ -157,6 +157,7 @@ class OnPremLocalRapidsJob(RapidsLocalJob): job_label = 'onpremLocal' +# pylint: disable=abstract-method @dataclass class OnPremDistributedRapidsJob(RapidsDistributedJob): """ From 04edb7db5e754fb8bf7434a3f68b001de4635923 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 22 Nov 2024 16:17:31 -0800 Subject: [PATCH 07/14] Include hdfs output dir in tools config file Signed-off-by: Partho Sarthi --- .../configuration/distributed_tools_config.py | 6 ++++++ .../tools_config/sample-config-specification.json | 11 +++++++++++ .../resources/tools_config/valid/tools_config_01.yaml | 8 ++++---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py index 6a7c9db0e..109006277 100644 --- a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py @@ -28,6 +28,12 @@ class SparkProperty(BaseModel): class DistributedToolsConfig(BaseModel): """Configuration class for distributed tools""" + hdfs_output_dir: str = Field( + description='HDFS output directory where the output data from the distributed ' + 'tools will be stored.', + examples=['hdfs:///path/to/output/dir'] + ) + spark_properties: List[SparkProperty] = Field( default_factory=list, description='List of Spark properties to be used for the Spark session.', diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json index 3381284e1..fab8d9bcc 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json @@ -39,6 +39,14 @@ "DistributedToolsConfig": { "description": "Configuration class for distributed tools", "properties": { + "hdfs_output_dir": { + "description": "HDFS output directory where the output data from the distributed tools will be stored.", + "examples": [ + 
"hdfs:///path/to/output/dir" + ], + "title": "Hdfs Output Dir", + "type": "string" + }, "spark_properties": { "description": "List of Spark properties to be used for the Spark session.", "examples": [ @@ -58,6 +66,9 @@ "type": "array" } }, + "required": [ + "hdfs_output_dir" + ], "title": "DistributedToolsConfig", "type": "object" }, diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml index adae7dc7b..ac4686c38 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml @@ -1,7 +1,7 @@ -# This yaml file is a configuration file for a tool that uses defines the spark properties -# for the distributed tools. +# This yaml file is a sample configuration file for the distributed tools. api_version: '1.0' distributed_tools: + hdfs_output_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' spark_properties: - - name: 'spark.executor.memory' - value: '20g' + - name: 'spark.executor.memory' + value: '20g' From 5989d00c40ab89f19f94ed32d9052c734ee6648e Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 22 Nov 2024 16:19:23 -0800 Subject: [PATCH 08/14] Add comment about assumption of Spark JARs Signed-off-by: Partho Sarthi --- user_tools/src/spark_rapids_pytools/rapids/rapids_job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py index ab71ff3f7..c8cc43d1c 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py @@ -245,7 +245,8 @@ def _build_submission_cmd(self) -> JarCmdArgs: def _build_classpath(self) -> List[str]: """ Only the Spark RAPIDS Tools JAR file is needed for the classpath. - Each worker node should the Spark Jars pre-installed. + Assumption: Each worker node should have the Spark Jars pre-installed. + TODO: Ship the Spark JARs to the cluster to avoid version mismatch issues. """ return ['-cp', self.prop_container.get_jar_file()] From 117f987995b7804e47f8a9f7d3e6ad3f59854003 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Wed, 27 Nov 2024 21:05:37 +0530 Subject: [PATCH 09/14] Revert changes in stats report Signed-off-by: Partho Sarthi --- .../src/spark_rapids_tools/tools/qualification_stats_report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py b/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py index db59b85f8..de762013d 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py +++ b/user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py @@ -105,7 +105,7 @@ def _preprocess_dataframes(self) -> None: self.execs_df = (self.execs_df.explode('Exec Stages'). dropna(subset=['Exec Stages']). 
rename(columns={'Exec Stages': 'Stage ID'})) - self.execs_df['Stage ID'] = self.execs_df['Stage ID'].astype(float) + self.execs_df['Stage ID'] = self.execs_df['Stage ID'].astype(int) # Remove duplicate 'Stage ID' rows and rename some columns so that join on dataframes # can be done easily From b252e7f07dc000960dfd39e7390c965a235328e6 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 13 Dec 2024 10:02:54 -0800 Subject: [PATCH 10/14] Submission mode Args Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 14 +++++--- .../rapids/rapids_tool.py | 4 +-- .../spark_rapids_pytools/rapids/tool_ctxt.py | 6 +++- .../spark_rapids_tools/cmdli/argprocessor.py | 8 +++-- .../src/spark_rapids_tools/cmdli/tools_cli.py | 9 ++--- .../configuration/common.py | 11 ++++++ .../configuration/distributed_tools_config.py | 28 ++++++++------- .../configuration/local_mode_config.py | 33 ++++++++++++++++++ .../configuration/tools_config.py | 9 +++-- .../configuration/tools_config_base.py | 34 +++++++++++++++++++ user_tools/src/spark_rapids_tools/enums.py | 11 ++++++ .../tools_config/valid/tools_config_01.yaml | 11 +++--- 12 files changed, 140 insertions(+), 38 deletions(-) create mode 100644 user_tools/src/spark_rapids_tools/configuration/local_mode_config.py create mode 100644 user_tools/src/spark_rapids_tools/configuration/tools_config_base.py diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index b112eedce..5bb58f4af 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -29,7 +29,7 @@ from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool -from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel +from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode from spark_rapids_tools.storagelib import CspFs from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender @@ -153,9 +153,13 @@ def _process_estimation_model_args(self) -> None: estimation_model_args = QualEstimationModel.create_default_model_args(selected_model) self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args) - def _process_distributed_tools_args(self) -> None: - distributed_tools_enabled = self.wrapper_options.get('distributedToolsEnabled') - self.ctxt.set_ctxt('distributedToolsEnabled', distributed_tools_enabled) + def _process_submission_mode_arg(self) -> None: + submission_mode_arg = self.wrapper_options.get('submissionMode') + if submission_mode_arg is None or not submission_mode_arg: + submission_mode = SubmissionMode.get_default() + else: + submission_mode = SubmissionMode.fromstring(submission_mode_arg) + self.ctxt.set_ctxt('submissionMode', submission_mode) def _process_custom_args(self) -> None: """ @@ -185,7 +189,7 @@ def _process_custom_args(self) -> None: self._process_estimation_model_args() self._process_offline_cluster_args() self._process_eventlogs_args() - self._process_distributed_tools_args() + self._process_submission_mode_arg() # This is noise to dump everything # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx']) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py 
b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 20ad09488..d9b01a80c 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -39,7 +39,7 @@ from spark_rapids_pytools.rapids.tool_ctxt import ToolContext from spark_rapids_tools import CspEnv from spark_rapids_tools.configuration.common import RuntimeDependency -from spark_rapids_tools.configuration.distributed_tools_config import DistributedToolsConfig +from spark_rapids_tools.configuration.distributed_tools_config import DistributedConfig from spark_rapids_tools.configuration.tools_config import ToolsConfig from spark_rapids_tools.enums import DependencyType from spark_rapids_tools.storagelib import LocalPath, CspFs @@ -948,7 +948,7 @@ def _prepare_local_job_arguments(self): file_load=False) self.ctxt.set_ctxt('rapidsJobContainers', [rapids_job_container]) - def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]: + def _get_distributed_tools_configs(self) -> Optional[DistributedConfig]: """ Get the distributed tools configurations from the tools config file """ diff --git a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py index c9d229d15..714192271 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py +++ b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py @@ -26,6 +26,7 @@ from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import ToolLogging, Utils from spark_rapids_tools import CspEnv, CspPath +from spark_rapids_tools.enums import SubmissionMode from spark_rapids_tools.utils import Utilities @@ -90,7 +91,10 @@ def is_fat_wheel_mode(self) -> bool: return self.get_ctxt('fatWheelModeEnabled') def is_distributed_mode(self) -> bool: - return self.get_ctxt('distributedToolsEnabled') or False + return self.get_ctxt('submissionMode') == SubmissionMode.DISTRIBUTED + + def is_local_mode(self) -> bool: + return self.get_ctxt('submissionMode') == SubmissionMode.LOCAL def set_ctxt(self, key: str, val: Any): self.props['wrapperCtx'][key] = val diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index 339d6bd2d..83b5ca214 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -29,8 +29,10 @@ from spark_rapids_pytools.common.utilities import ToolLogging, Utils from spark_rapids_tools.cloud import ClientCluster from spark_rapids_tools.utils import AbstractPropContainer, is_http_file +from ..configuration.distributed_tools_config import DistributedToolsConfig +from ..configuration.local_mode_config import LocalToolsConfig from ..configuration.tools_config import ToolsConfig -from ..enums import QualFilterApp, CspEnv, QualEstimationModel +from ..enums import QualFilterApp, CspEnv, QualEstimationModel, SubmissionMode from ..storagelib.csppath import CspPath from ..tools.autotuner import AutoTunerPropMgr from ..utils.util import dump_tool_usage, Utilities @@ -351,6 +353,7 @@ class ToolUserArgModel(AbsToolUserArgModel): jvm_heap_size: Optional[int] = None jvm_threads: Optional[int] = None tools_config_path: Optional[str] = None + submission_mode: SubmissionMode = SubmissionMode.get_default() def is_concurrent_submission(self) -> bool: return False @@ -470,7 +473,6 @@ class QualifyUserArgModel(ToolUserArgModel): """ filter_apps: Optional[QualFilterApp] = 
None estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict) - distributed_tools_enabled: Optional[bool] = None def init_tool_args(self) -> None: self.p_args['toolArgs']['platform'] = self.platform @@ -534,7 +536,7 @@ def build_tools_args(self) -> dict: 'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']), 'toolsJar': self.p_args['toolArgs']['toolsJar'], 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'], - 'distributedToolsEnabled': self.distributed_tools_enabled + 'submissionMode': self.submission_mode } return wrapped_args diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index a154e6c84..31688879a 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -18,7 +18,7 @@ import fire from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel -from spark_rapids_tools.enums import CspEnv, QualEstimationModel +from spark_rapids_tools.enums import CspEnv, QualEstimationModel, SubmissionMode from spark_rapids_tools.utils.util import gen_app_banner, init_environment from spark_rapids_pytools.common.utilities import Utils, ToolLogging from spark_rapids_pytools.rapids.qualx.prediction import Prediction @@ -46,7 +46,7 @@ def qualification(self, jvm_threads: int = None, verbose: bool = None, tools_config_file: str = None, - distributed: bool = None, + submission_mode: str = None, **rapids_options) -> None: """The Qualification cmd provides estimated speedups by migrating Apache Spark applications to GPU accelerated clusters. @@ -84,7 +84,7 @@ def qualification(self, :param tools_config_file: Path to a configuration file that contains the tools' options. For sample configuration files, please visit https://github.com/NVIDIA/spark-rapids-tools/tree/main/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid - :param distributed: True or False to enable distributed mode. + :param submission_mode: Submission mode to run the qualification tool. Supported modes are "local" and "distributed". :param rapids_options: A list of valid Qualification tool options. Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support multiple "spark-property" arguments. 
@@ -97,6 +97,7 @@ def qualification(self, output_folder = Utils.get_value_or_pop(output_folder, rapids_options, 'o') filter_apps = Utils.get_value_or_pop(filter_apps, rapids_options, 'f') verbose = Utils.get_value_or_pop(verbose, rapids_options, 'v', False) + submission_mode = Utils.get_value_or_pop(submission_mode, rapids_options, 's') if verbose: ToolLogging.enable_debug_mode() init_environment('qual') @@ -120,7 +121,7 @@ def qualification(self, filter_apps=filter_apps, estimation_model_args=estimation_model_args, tools_config_path=tools_config_file, - distributed_tools_enabled=distributed) + submission_mode=submission_mode) if qual_args: tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'], output_folder=qual_args['outputFolder'], diff --git a/user_tools/src/spark_rapids_tools/configuration/common.py b/user_tools/src/spark_rapids_tools/configuration/common.py index 439904cae..6e8e82991 100644 --- a/user_tools/src/spark_rapids_tools/configuration/common.py +++ b/user_tools/src/spark_rapids_tools/configuration/common.py @@ -72,3 +72,14 @@ class RuntimeDependency(BaseModel): verification: DependencyVerification = Field( default=None, description='Optional specification to verify the dependency file.') + +class SparkProperty(BaseModel): + """Represents a single Spark property with a name and value.""" + name: str = Field( + description='Name of the Spark property, e.g., "spark.executor.memory".') + value: str = Field( + description='Value of the Spark property, e.g., "4g".') + +class ConfigBase(BaseModel): + """Base class for the configuration.""" + pass diff --git a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py index 109006277..266664b89 100644 --- a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py @@ -13,25 +13,20 @@ # limitations under the License. """ Configuration file for distributed tools """ -from typing import List +from typing import List, Optional from pydantic import BaseModel, Field +from spark_rapids_tools.configuration.common import ConfigBase, SparkProperty +from spark_rapids_tools.configuration.tools_config_base import ToolsConfigBase -class SparkProperty(BaseModel): - """Represents a single Spark property with a name and value.""" - name: str = Field( - description='Name of the Spark property, e.g., "spark.executor.memory".') - value: str = Field( - description='Value of the Spark property, e.g., "4g".') - -class DistributedToolsConfig(BaseModel): +class DistributedConfig(ConfigBase): """Configuration class for distributed tools""" - hdfs_output_dir: str = Field( - description='HDFS output directory where the output data from the distributed ' - 'tools will be stored.', - examples=['hdfs:///path/to/output/dir'] + remote_cache_dir: str = Field( + description='Remote cache directory where the intermediate output data from each task will be stored. ' + 'Default is hdfs:///tmp/spark_rapids_distributed_tools_cache.', + default='hdfs:///tmp/spark_rapids_distributed_tools_cache' ) spark_properties: List[SparkProperty] = Field( @@ -40,3 +35,10 @@ examples=[{'name': 'spark.executor.memory', 'value': '4g'}, {'name': 'spark.executor.cores', 'value': '4'}] ) + +class DistributedToolsConfig(ToolsConfigBase): + """Container for the distributed tools configurations.
This is the parts of the configuration + that can be passed as an input to the CLI""" + config: Optional[DistributedConfig] = Field( + default=None, + description='Configuration related to the distributed tools.') diff --git a/user_tools/src/spark_rapids_tools/configuration/local_mode_config.py b/user_tools/src/spark_rapids_tools/configuration/local_mode_config.py new file mode 100644 index 000000000..097bd3851 --- /dev/null +++ b/user_tools/src/spark_rapids_tools/configuration/local_mode_config.py @@ -0,0 +1,33 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Configuration file for local submission mode """ +from typing import Optional + +from pydantic import Field + +from spark_rapids_tools.configuration.common import ConfigBase +from spark_rapids_tools.configuration.tools_config_base import ToolsConfigBase + + +class LocalConfig(ConfigBase): + """Configuration class for local submission mode""" + pass + +class LocalToolsConfig(ToolsConfigBase): + """Container for the local tools configurations. This is the parts of the configuration that + can be passed as an input to the CLI""" + config: Optional[LocalConfig] = Field( + default=None, + description='Configuration related to the local tools.') diff --git a/user_tools/src/spark_rapids_tools/configuration/tools_config.py b/user_tools/src/spark_rapids_tools/configuration/tools_config.py index 5dc32438a..5790da6b2 100644 --- a/user_tools/src/spark_rapids_tools/configuration/tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/tools_config.py @@ -22,10 +22,9 @@ from spark_rapids_tools import CspPathT from spark_rapids_tools.configuration.distributed_tools_config import DistributedToolsConfig -from spark_rapids_tools.configuration.runtime_conf import ToolsRuntimeConfig +from spark_rapids_tools.configuration.local_mode_config import LocalToolsConfig from spark_rapids_tools.utils import AbstractPropContainer - class ToolsConfig(BaseModel): """Main container for the user's defined tools configuration""" api_version: float = Field( @@ -36,11 +35,11 @@ class ToolsConfig(BaseModel): le=1.0, # minimum version compatible with the current tools implementation ge=1.0) - runtime: Optional[ToolsRuntimeConfig] = Field( + local: Optional[LocalToolsConfig] = Field( default=None, - description='Configuration related to the runtime environment of the tools.') + description='Configuration related to the local tools.') - distributed_tools: Optional[DistributedToolsConfig] = Field( + distributed: Optional[DistributedToolsConfig] = Field( default=None, description='Configuration related to the distributed tools.') diff --git a/user_tools/src/spark_rapids_tools/configuration/tools_config_base.py b/user_tools/src/spark_rapids_tools/configuration/tools_config_base.py new file mode 100644 index 000000000..13c3ff6d6 --- /dev/null +++ b/user_tools/src/spark_rapids_tools/configuration/tools_config_base.py @@ -0,0 +1,34 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Container for the custom tools configurations. This is the parts of the configuration that can +be passed as an input to the CLI""" + +from typing import Optional + +from pydantic import BaseModel, Field + +from spark_rapids_tools.configuration.common import ConfigBase +from spark_rapids_tools.configuration.runtime_conf import ToolsRuntimeConfig + + +class ToolsConfigBase(BaseModel): + """Base class for the tools configuration.""" + runtime: Optional[ToolsRuntimeConfig] = Field( + default=None, + description='Configuration related to the runtime environment of the tools.') + + config: Optional[ConfigBase] = Field( + default=None, + description='Configuration related to tools.') diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index 46db8aad1..734f3b4b1 100644 --- a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -222,3 +222,14 @@ def create_default_model_args(cls, model_type: str) -> dict: 'xgboostEnabled': model_type == QualEstimationModel.XGBOOST, 'customModelFile': None, } + + + +class SubmissionMode(EnumeratedType): + """Values used to define the submission mode of the applications""" + LOCAL = 'local' + DISTRIBUTED = 'distributed' + + @classmethod + def get_default(cls): + return cls.LOCAL diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml index ac4686c38..0ec04a119 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml @@ -1,7 +1,8 @@ # This yaml file is a sample configuration file for the distributed tools. 
api_version: '1.0' -distributed_tools: - hdfs_output_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' - spark_properties: - - name: 'spark.executor.memory' - value: '20g' +distributed: + config: + remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' + spark_properties: + - name: 'spark.executor.memory' + value: '20g' From 34d1dc163d38e6a7735ad41881b7e7047cd0b00a Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 16 Dec 2024 10:26:52 -0800 Subject: [PATCH 11/14] Modify the arguments structure Signed-off-by: Partho Sarthi --- .../rapids/rapids_tool.py | 11 +- .../resources/databricks_aws-configs.json | 6 +- .../spark_rapids_tools/cmdli/argprocessor.py | 19 +- .../src/spark_rapids_tools/cmdli/tools_cli.py | 5 +- .../configuration/common.py | 26 ++- .../configuration/runtime_conf.py | 6 +- .../configuration/submission/__init__.py | 13 ++ .../distributed_config.py} | 21 +- .../local_config.py} | 16 +- .../configuration/tools_config.py | 19 +- .../configuration/tools_config_base.py | 34 --- user_tools/src/spark_rapids_tools/enums.py | 1 - ...ple-distributed-config-specification.json} | 36 +-- .../sample-local-config-specification.json | 213 ++++++++++++++++++ .../tools_config/valid/tools_config_01.yaml | 15 +- .../tools_config/valid/tools_config_02.yaml | 17 ++ 16 files changed, 346 insertions(+), 112 deletions(-) create mode 100644 user_tools/src/spark_rapids_tools/configuration/submission/__init__.py rename user_tools/src/spark_rapids_tools/configuration/{distributed_tools_config.py => submission/distributed_config.py} (66%) rename user_tools/src/spark_rapids_tools/configuration/{local_mode_config.py => submission/local_config.py} (64%) delete mode 100644 user_tools/src/spark_rapids_tools/configuration/tools_config_base.py rename user_tools/tests/spark_rapids_tools_ut/resources/tools_config/{sample-config-specification.json => sample-distributed-config-specification.json} (86%) create mode 100644 user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-local-config-specification.json create mode 100644 user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index d9b01a80c..7de6ed07b 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -39,7 +39,7 @@ from spark_rapids_pytools.rapids.tool_ctxt import ToolContext from spark_rapids_tools import CspEnv from spark_rapids_tools.configuration.common import RuntimeDependency -from spark_rapids_tools.configuration.distributed_tools_config import DistributedConfig +from spark_rapids_tools.configuration.submission.distributed_config import DistributedToolsConfig from spark_rapids_tools.configuration.tools_config import ToolsConfig from spark_rapids_tools.enums import DependencyType from spark_rapids_tools.storagelib import LocalPath, CspFs @@ -948,14 +948,15 @@ def _prepare_local_job_arguments(self): file_load=False) self.ctxt.set_ctxt('rapidsJobContainers', [rapids_job_container]) - def _get_distributed_tools_configs(self) -> Optional[DistributedConfig]: + def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]: """ - Get the distributed tools configurations from the tools config file + Parse the tools configuration and return as distributed tools configuration object """ config_obj = self.get_tools_config_obj() - if config_obj and 
config_obj.distributed_tools: + print(config_obj.get_schema()) + if config_obj and config_obj.submission: if self.ctxt.is_distributed_mode(): - return config_obj.distributed_tools + return config_obj self.logger.warning( 'Distributed tool configurations detected, but distributed mode is not enabled.' 'Use \'--distributed\' flag to enable distributed mode. Switching to local mode.' diff --git a/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json b/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json index d0cde1298..9cddf96f8 100644 --- a/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json @@ -28,8 +28,7 @@ "value": "a65839fbf1869f81a1632e09f415e586922e4f80" }, "size": 962685 - }, - "type": "jar" + } }, { "name": "AWS Java SDK Bundled", @@ -40,8 +39,7 @@ "value": "02deec3a0ad83d13d032b1812421b23d7a961eea" }, "size": 280645251 - }, - "type": "jar" + } } ], "333": [ diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index 83b5ca214..d59697ef1 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -29,9 +29,8 @@ from spark_rapids_pytools.common.utilities import ToolLogging, Utils from spark_rapids_tools.cloud import ClientCluster from spark_rapids_tools.utils import AbstractPropContainer, is_http_file -from ..configuration.distributed_tools_config import DistributedToolsConfig -from ..configuration.local_mode_config import LocalToolsConfig -from ..configuration.tools_config import ToolsConfig +from ..configuration.submission.distributed_config import DistributedToolsConfig +from ..configuration.submission.local_config import LocalToolsConfig from ..enums import QualFilterApp, CspEnv, QualEstimationModel, SubmissionMode from ..storagelib.csppath import CspPath from ..tools.autotuner import AutoTunerPropMgr @@ -353,7 +352,6 @@ class ToolUserArgModel(AbsToolUserArgModel): jvm_heap_size: Optional[int] = None jvm_threads: Optional[int] = None tools_config_path: Optional[str] = None - submission_mode: SubmissionMode = SubmissionMode.get_default() def is_concurrent_submission(self) -> bool: return False @@ -385,7 +383,11 @@ def load_tools_config(self) -> None: if self.tools_config_path is not None: # the CLI provides a tools config file try: - self.p_args['toolArgs']['toolsConfig'] = ToolsConfig.load_from_file(self.tools_config_path) + if self.p_args['toolArgs']['submissionMode'] == SubmissionMode.DISTRIBUTED: + tools_config = DistributedToolsConfig.load_from_file(self.tools_config_path) + else: + tools_config = LocalToolsConfig.load_from_file(self.tools_config_path) + self.p_args['toolArgs']['toolsConfig'] = tools_config except ValidationError as ve: # If required, we can dump the expected specification by appending # 'ToolsConfig.get_schema()' to the error message @@ -473,6 +475,7 @@ class QualifyUserArgModel(ToolUserArgModel): """ filter_apps: Optional[QualFilterApp] = None estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict) + submission_mode: Optional[SubmissionMode] = None def init_tool_args(self) -> None: self.p_args['toolArgs']['platform'] = self.platform @@ -490,6 +493,10 @@ def init_tool_args(self) -> None: self.p_args['toolArgs']['estimationModelArgs'] = QualEstimationModel.create_default_model_args(def_model) else: self.p_args['toolArgs']['estimationModelArgs'] = 
self.estimation_model_args + if self.submission_mode is None or not self.submission_mode: + self.p_args['toolArgs']['submissionMode'] = SubmissionMode.get_default() + else: + self.p_args['toolArgs']['submissionMode'] = self.submission_mode @model_validator(mode='after') def validate_arg_cases(self) -> 'QualifyUserArgModel': @@ -536,7 +543,7 @@ def build_tools_args(self) -> dict: 'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']), 'toolsJar': self.p_args['toolArgs']['toolsJar'], 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'], - 'submissionMode': self.submission_mode + 'submissionMode': self.p_args['toolArgs']['submissionMode'] } return wrapped_args diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index 31688879a..e575ee8c1 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -18,7 +18,7 @@ import fire from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel -from spark_rapids_tools.enums import CspEnv, QualEstimationModel, SubmissionMode +from spark_rapids_tools.enums import CspEnv, QualEstimationModel from spark_rapids_tools.utils.util import gen_app_banner, init_environment from spark_rapids_pytools.common.utilities import Utils, ToolLogging from spark_rapids_pytools.rapids.qualx.prediction import Prediction @@ -84,7 +84,8 @@ def qualification(self, :param tools_config_file: Path to a configuration file that contains the tools' options. For sample configuration files, please visit https://github.com/NVIDIA/spark-rapids-tools/tree/main/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid - :param submission_mode: Submission mode to run the qualification tool. Supported modes are "local" and "distributed". + :param submission_mode: Submission mode to run the qualification tool. + Supported modes are "local" and "distributed". :param rapids_options: A list of valid Qualification tool options. Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support multiple "spark-property" arguments. diff --git a/user_tools/src/spark_rapids_tools/configuration/common.py b/user_tools/src/spark_rapids_tools/configuration/common.py index 6e8e82991..9df8a763e 100644 --- a/user_tools/src/spark_rapids_tools/configuration/common.py +++ b/user_tools/src/spark_rapids_tools/configuration/common.py @@ -22,7 +22,18 @@ from spark_rapids_tools.storagelib.tools.fs_utils import FileHashAlgorithm -class RuntimeDependencyType(BaseModel): +class BaseConfig(BaseModel, extra='forbid'): + """ + BaseConfig class for Pydantic models that enforces the `extra = forbid` + setting. This ensures that no extra keys are allowed in any model or + subclass that inherits from this base class. + + This base class is meant to be inherited by other Pydantic models related + to tools configurations so that we can enforce a global rule. 
+ """ + + +class RuntimeDependencyType(BaseConfig): """Defines the type of runtime dependency required by the tools' java cmd.""" dep_type: DependencyType = Field( @@ -36,7 +47,7 @@ class RuntimeDependencyType(BaseModel): examples=['jars/*']) -class DependencyVerification(BaseModel): +class DependencyVerification(BaseConfig): """The verification information of a runtime dependency required by the tools' java cmd.""" size: int = Field( default=0, @@ -53,7 +64,7 @@ class DependencyVerification(BaseModel): }]) -class RuntimeDependency(BaseModel): +class RuntimeDependency(BaseConfig): """Holds information about a runtime dependency required by the tools' java cmd.""" name: str = Field( description='The name of the dependency.', @@ -73,13 +84,14 @@ class RuntimeDependency(BaseModel): default=None, description='Optional specification to verify the dependency file.') -class SparkProperty(BaseModel): + +class SparkProperty(BaseConfig): """Represents a single Spark property with a name and value.""" name: str = Field( description='Name of the Spark property, e.g., "spark.executor.memory".') value: str = Field( description='Value of the Spark property, e.g., "4g".') -class ConfigBase(BaseModel): - """Base class for the configuration.""" - pass + +class SubmissionConfig(BaseConfig): + """Base class for the tools configuration.""" diff --git a/user_tools/src/spark_rapids_tools/configuration/runtime_conf.py b/user_tools/src/spark_rapids_tools/configuration/runtime_conf.py index 40ab68cf7..7878d369d 100644 --- a/user_tools/src/spark_rapids_tools/configuration/runtime_conf.py +++ b/user_tools/src/spark_rapids_tools/configuration/runtime_conf.py @@ -16,12 +16,12 @@ from typing import List -from pydantic import BaseModel, Field +from pydantic import Field -from spark_rapids_tools.configuration.common import RuntimeDependency +from spark_rapids_tools.configuration.common import RuntimeDependency, BaseConfig -class ToolsRuntimeConfig(BaseModel): +class ToolsRuntimeConfig(BaseConfig): """The runtime configurations of the tools as defined by the user.""" dependencies: List[RuntimeDependency] = Field( description='The list of runtime dependencies required by the tools java cmd. ' diff --git a/user_tools/src/spark_rapids_tools/configuration/submission/__init__.py b/user_tools/src/spark_rapids_tools/configuration/submission/__init__.py new file mode 100644 index 000000000..e8b752e95 --- /dev/null +++ b/user_tools/src/spark_rapids_tools/configuration/submission/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
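To make the effect of the new `extra='forbid'` base class concrete, a minimal sketch; the misspelled keyword below is deliberately invalid and exists only for illustration:

from pydantic import ValidationError

from spark_rapids_tools.configuration.common import SparkProperty

try:
    # 'nam' is not a declared field; since BaseConfig forbids unknown keys,
    # validation fails instead of silently ignoring the typo.
    SparkProperty(nam='spark.executor.memory', value='4g')
except ValidationError as err:
    print(err)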
diff --git a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py b/user_tools/src/spark_rapids_tools/configuration/submission/distributed_config.py similarity index 66% rename from user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py rename to user_tools/src/spark_rapids_tools/configuration/submission/distributed_config.py index 266664b89..f50f3b9b5 100644 --- a/user_tools/src/spark_rapids_tools/configuration/distributed_tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/submission/distributed_config.py @@ -12,17 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" Configuration file for distributed tools """ +""" Configuration file for distributed submission mode """ from typing import List, Optional -from pydantic import BaseModel, Field +from pydantic import Field -from spark_rapids_tools.configuration.common import ConfigBase, SparkProperty -from spark_rapids_tools.configuration.tools_config_base import ToolsConfigBase +from spark_rapids_tools.configuration.common import SparkProperty, SubmissionConfig +from spark_rapids_tools.configuration.tools_config import ToolsConfig -class DistributedConfig(ConfigBase): - """Configuration class for distributed tools""" +class DistributedSubmissionConfig(SubmissionConfig): + """Configuration class for distributed submission mode""" remote_cache_dir: str = Field( description='Remote cache directory where the intermediate output data from each task will be stored. ' 'Default is hdfs:///tmp/spark_rapids_distributed_tools_cache.', @@ -36,9 +36,10 @@ class DistributedConfig(ConfigBase): {'name': 'spark.executor.cores', 'value': '4'}] ) -class DistributedToolsConfig(ToolsConfigBase): - """Container for the distributed tools configurations. This is the parts of the configuration + +class DistributedToolsConfig(ToolsConfig): + """Container for the distributed submission mode configurations. This is the parts of the configuration that can be passed as an input to the CLI""" - config: Optional[DistributedConfig] = Field( + submission: Optional[DistributedSubmissionConfig] = Field( default=None, - description='Configuration related to the distributed tools.') + description='Configuration related to distributed submission mode.') diff --git a/user_tools/src/spark_rapids_tools/configuration/local_mode_config.py b/user_tools/src/spark_rapids_tools/configuration/submission/local_config.py similarity index 64% rename from user_tools/src/spark_rapids_tools/configuration/local_mode_config.py rename to user_tools/src/spark_rapids_tools/configuration/submission/local_config.py index 097bd3851..944f6fb0f 100644 --- a/user_tools/src/spark_rapids_tools/configuration/local_mode_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/submission/local_config.py @@ -17,17 +17,17 @@ from pydantic import Field -from spark_rapids_tools.configuration.common import ConfigBase -from spark_rapids_tools.configuration.tools_config_base import ToolsConfigBase +from spark_rapids_tools.configuration.common import SubmissionConfig +from spark_rapids_tools.configuration.tools_config import ToolsConfig -class LocalConfig(ConfigBase): +class LocalSubmissionConfig(SubmissionConfig): """Configuration class for local submission mode""" - pass -class LocalToolsConfig(ToolsConfigBase): - """Container for the local tools configurations. 
This is the parts of the configuration that + +class LocalToolsConfig(ToolsConfig): + """Container for the local submission mode configurations. This is the parts of the configuration that can be passed as an input to the CLI""" - config: Optional[LocalConfig] = Field( + submission: Optional[LocalSubmissionConfig] = Field( default=None, - description='Configuration related to the local tools.') + description='Configuration related to local submission mode.') diff --git a/user_tools/src/spark_rapids_tools/configuration/tools_config.py b/user_tools/src/spark_rapids_tools/configuration/tools_config.py index 5790da6b2..29bf7dcf8 100644 --- a/user_tools/src/spark_rapids_tools/configuration/tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/tools_config.py @@ -18,14 +18,15 @@ import json from typing import Union, Optional -from pydantic import BaseModel, Field, ValidationError +from pydantic import Field, ValidationError from spark_rapids_tools import CspPathT -from spark_rapids_tools.configuration.distributed_tools_config import DistributedToolsConfig -from spark_rapids_tools.configuration.local_mode_config import LocalToolsConfig +from spark_rapids_tools.configuration.common import BaseConfig, SubmissionConfig +from spark_rapids_tools.configuration.runtime_conf import ToolsRuntimeConfig from spark_rapids_tools.utils import AbstractPropContainer -class ToolsConfig(BaseModel): + +class ToolsConfig(BaseConfig): """Main container for the user's defined tools configuration""" api_version: float = Field( description='The version of the API that the tools are using. ' @@ -35,13 +36,13 @@ class ToolsConfig(BaseModel): le=1.0, # minimum version compatible with the current tools implementation ge=1.0) - local: Optional[LocalToolsConfig] = Field( + runtime: Optional[ToolsRuntimeConfig] = Field( default=None, - description='Configuration related to the local tools.') + description='Configuration related to the runtime environment of the tools.') - distributed: Optional[DistributedToolsConfig] = Field( + submission: Optional[SubmissionConfig] = Field( default=None, - description='Configuration related to the distributed tools.') + description='Configuration related to the submission.') @classmethod def load_from_file(cls, file_path: Union[str, CspPathT]) -> Optional['ToolsConfig']: @@ -50,7 +51,7 @@ def load_from_file(cls, file_path: Union[str, CspPathT]) -> Optional['ToolsConfi prop_container = AbstractPropContainer.load_from_file(file_path) return cls(**prop_container.props) except ValidationError as e: - # Do nothing. This is kept as a place holder if we want to log the error inside the + # Do nothing. This is kept as a placeholder if we want to log the error inside the # class first raise e diff --git a/user_tools/src/spark_rapids_tools/configuration/tools_config_base.py b/user_tools/src/spark_rapids_tools/configuration/tools_config_base.py deleted file mode 100644 index 13c3ff6d6..000000000 --- a/user_tools/src/spark_rapids_tools/configuration/tools_config_base.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -"""Container for the custom tools configurations. This is the parts of the configuration that can -be passed as an input to the CLI""" - -from typing import Optional - -from pydantic import BaseModel, Field - -from spark_rapids_tools.configuration.common import ConfigBase -from spark_rapids_tools.configuration.runtime_conf import ToolsRuntimeConfig - - -class ToolsConfigBase(BaseModel): - """Base class for the tools configuration.""" - runtime: Optional[ToolsRuntimeConfig] = Field( - default=None, - description='Configuration related to the runtime environment of the tools.') - - config: Optional[ConfigBase] = Field( - default=None, - description='Configuration related to tools.') diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index 734f3b4b1..15f9b8828 100644 --- a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -224,7 +224,6 @@ def create_default_model_args(cls, model_type: str) -> dict: } - class SubmissionMode(EnumeratedType): """Values used to define the submission mode of the applications""" LOCAL = 'local' diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-distributed-config-specification.json similarity index 86% rename from user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json rename to user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-distributed-config-specification.json index fab8d9bcc..c183fec18 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-config-specification.json +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-distributed-config-specification.json @@ -10,6 +10,7 @@ "type": "string" }, "DependencyVerification": { + "additionalProperties": false, "description": "The verification information of a runtime dependency required by the tools' java cmd.", "properties": { "size": { @@ -36,15 +37,16 @@ "title": "DependencyVerification", "type": "object" }, - "DistributedToolsConfig": { - "description": "Configuration class for distributed tools", + "DistributedSubmissionConfig": { + "additionalProperties": false, + "description": "Configuration class for distributed submission mode", "properties": { - "hdfs_output_dir": { - "description": "HDFS output directory where the output data from the distributed tools will be stored.", - "examples": [ - "hdfs:///path/to/output/dir" + "remote_cache_dir": { + "default": [ + "hdfs:///tmp/spark_rapids_distributed_tools_cache" ], - "title": "Hdfs Output Dir", + "description": "Remote cache directory where the intermediate output data from each task will be stored. 
Default is hdfs:///tmp/spark_rapids_distributed_tools_cache.", + "title": "Remote Cache Dir", "type": "string" }, "spark_properties": { @@ -66,10 +68,7 @@ "type": "array" } }, - "required": [ - "hdfs_output_dir" - ], - "title": "DistributedToolsConfig", + "title": "DistributedSubmissionConfig", "type": "object" }, "FileHashAlgorithm": { @@ -102,6 +101,7 @@ "type": "string" }, "RuntimeDependency": { + "additionalProperties": false, "description": "Holds information about a runtime dependency required by the tools' java cmd.", "properties": { "name": { @@ -151,6 +151,7 @@ "type": "object" }, "RuntimeDependencyType": { + "additionalProperties": false, "description": "Defines the type of runtime dependency required by the tools' java cmd.", "properties": { "dep_type": { @@ -174,6 +175,7 @@ "type": "object" }, "SparkProperty": { + "additionalProperties": false, "description": "Represents a single Spark property with a name and value.", "properties": { "name": { @@ -195,6 +197,7 @@ "type": "object" }, "ToolsRuntimeConfig": { + "additionalProperties": false, "description": "The runtime configurations of the tools as defined by the user.", "properties": { "dependencies": { @@ -213,7 +216,8 @@ "type": "object" } }, - "description": "Main container for the user's defined tools configuration", + "additionalProperties": false, + "description": "Container for the distributed submission mode configurations. This is the parts of the configuration\nthat can be passed as an input to the CLI", "properties": { "api_version": { "description": "The version of the API that the tools are using. This is used to test the compatibility of the configuration file against the current tools release.", @@ -237,22 +241,22 @@ "default": null, "description": "Configuration related to the runtime environment of the tools." }, - "distributed_tools": { + "submission": { "anyOf": [ { - "$ref": "#/$defs/DistributedToolsConfig" + "$ref": "#/$defs/DistributedSubmissionConfig" }, { "type": "null" } ], "default": null, - "description": "Configuration related to the distributed tools." + "description": "Configuration related to distributed submission mode." 
} }, "required": [ "api_version" ], - "title": "ToolsConfig", + "title": "DistributedToolsConfig", "type": "object" } diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-local-config-specification.json b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-local-config-specification.json new file mode 100644 index 000000000..c0eb270cf --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/sample-local-config-specification.json @@ -0,0 +1,213 @@ +{ + "$defs": { + "DependencyType": { + "description": "Represents the dependency type for the tools' java cmd.", + "enum": [ + "jar", + "archive" + ], + "title": "DependencyType", + "type": "string" + }, + "DependencyVerification": { + "additionalProperties": false, + "description": "The verification information of a runtime dependency required by the tools' java cmd.", + "properties": { + "size": { + "default": 0, + "description": "The size of the dependency file.", + "examples": [ + 3265393 + ], + "title": "Size", + "type": "integer" + }, + "file_hash": { + "$ref": "#/$defs/FileHashAlgorithm", + "default": null, + "description": "The hash function to verify the file.", + "examples": [ + { + "algorithm": "md5", + "value": "bc9bf7fedde0e700b974426fbd8d869c" + } + ] + } + }, + "title": "DependencyVerification", + "type": "object" + }, + "FileHashAlgorithm": { + "description": "Represents a file hash algorithm and its value. Used for verification against an existing file.", + "properties": { + "algorithm": { + "$ref": "#/$defs/HashAlgorithm" + }, + "value": { + "title": "Value", + "type": "string" + } + }, + "required": [ + "algorithm", + "value" + ], + "title": "FileHashAlgorithm", + "type": "object" + }, + "HashAlgorithm": { + "description": "Represents the supported hashing algorithms", + "enum": [ + "md5", + "sha1", + "sha256", + "sha512" + ], + "title": "HashAlgorithm", + "type": "string" + }, + "LocalSubmissionConfig": { + "additionalProperties": false, + "description": "Configuration class for local submission mode", + "properties": {}, + "title": "LocalSubmissionConfig", + "type": "object" + }, + "RuntimeDependency": { + "additionalProperties": false, + "description": "Holds information about a runtime dependency required by the tools' java cmd.", + "properties": { + "name": { + "description": "The name of the dependency.", + "examples": [ + "Spark-3.5.0", + "AWS Java SDK" + ], + "title": "Name", + "type": "string" + }, + "uri": { + "anyOf": [ + { + "format": "uri", + "minLength": 1, + "type": "string" + }, + { + "format": "file-path", + "type": "string" + } + ], + "description": "The location of the dependency file. It can be a URL to a remote web/storage or a file path.", + "examples": [ + "file:///path/to/file.tgz", + "https://mvn-url/24.08.1/rapids-4-spark-tools_2.12-24.08.1.jar", + "gs://bucket-name/path/to/file.jar" + ], + "title": "Uri" + }, + "dependency_type": { + "$ref": "#/$defs/RuntimeDependencyType", + "description": "Specifies the dependency type to determine how the item is processed. For example, jar files are appended to the java classpath while archive files such as spark are extracted first before adding subdirectory _/jars/* to the classpath." + }, + "verification": { + "$ref": "#/$defs/DependencyVerification", + "default": null, + "description": "Optional specification to verify the dependency file." 
+ } + }, + "required": [ + "name", + "uri" + ], + "title": "RuntimeDependency", + "type": "object" + }, + "RuntimeDependencyType": { + "additionalProperties": false, + "description": "Defines the type of runtime dependency required by the tools' java cmd.", + "properties": { + "dep_type": { + "$ref": "#/$defs/DependencyType", + "description": "The type of the dependency." + }, + "relative_path": { + "default": null, + "description": "Specifies the relative path from within the archive file which will be added to the java cmd. Requires field dep_type to be set to (archive).", + "examples": [ + "jars/*" + ], + "title": "Relative Path", + "type": "string" + } + }, + "required": [ + "dep_type" + ], + "title": "RuntimeDependencyType", + "type": "object" + }, + "ToolsRuntimeConfig": { + "additionalProperties": false, + "description": "The runtime configurations of the tools as defined by the user.", + "properties": { + "dependencies": { + "description": "The list of runtime dependencies required by the tools java cmd. Set this list to specify Spark binaries along with any other required jar files (i.e., hadoop jars, gcp connectors,..etc.). When specified, the default predefined dependencies will be ignored.", + "items": { + "$ref": "#/$defs/RuntimeDependency" + }, + "title": "Dependencies", + "type": "array" + } + }, + "required": [ + "dependencies" + ], + "title": "ToolsRuntimeConfig", + "type": "object" + } + }, + "additionalProperties": false, + "description": "Container for the local submission mode configurations. This is the parts of the configuration that\ncan be passed as an input to the CLI", + "properties": { + "api_version": { + "description": "The version of the API that the tools are using. This is used to test the compatibility of the configuration file against the current tools release.", + "examples": [ + "1.0" + ], + "maximum": 1.0, + "minimum": 1.0, + "title": "Api Version", + "type": "number" + }, + "runtime": { + "anyOf": [ + { + "$ref": "#/$defs/ToolsRuntimeConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration related to the runtime environment of the tools." + }, + "submission": { + "anyOf": [ + { + "$ref": "#/$defs/LocalSubmissionConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration related to local submission mode." + } + }, + "required": [ + "api_version" + ], + "title": "LocalToolsConfig", + "type": "object" +} diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml index 0ec04a119..75595dd56 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml @@ -1,8 +1,9 @@ -# This yaml file is a sample configuration file for the distributed tools. +# This yaml file is a sample configuration file for the distributed tools. It is valid +# only if `--submission_mode distributed` is passed to the CLI. It provides submission +# related configurations. 
api_version: '1.0' -distributed: - config: - remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' - spark_properties: - - name: 'spark.executor.memory' - value: '20g' +submission: + remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' + spark_properties: + - name: 'spark.executor.memory' + value: '20g' diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml new file mode 100644 index 000000000..642210fa0 --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml @@ -0,0 +1,17 @@ +# This yaml file is a sample configuration file for the distributed tools. It is valid +# only if `--submission_mode distributed` is passed to the CLI. It provides runtime +# dependencies and submission related configurations. +api_version: '1.0' +runtime: + dependencies: + - name: my-spark350 + uri: https:///archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz + dependency_type: + dep_type: archive + # for tgz files, it is required to give the subfolder where the jars are located + relative_path: jars/* +submission: + remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' + spark_properties: + - name: 'spark.executor.memory' + value: '20g' From 00584f64d401ab84d57add88874c9827bff41a4f Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 16 Dec 2024 14:26:32 -0800 Subject: [PATCH 12/14] Bump up the API version for tools config file Signed-off-by: Partho Sarthi --- user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py | 1 - .../src/spark_rapids_tools/configuration/tools_config.py | 4 ++-- .../resources/tools_config/valid/tools_config_01.yaml | 2 +- .../resources/tools_config/valid/tools_config_02.yaml | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 7de6ed07b..9ddc4583f 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -953,7 +953,6 @@ def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]: Parse the tools configuration and return as distributed tools configuration object """ config_obj = self.get_tools_config_obj() - print(config_obj.get_schema()) if config_obj and config_obj.submission: if self.ctxt.is_distributed_mode(): return config_obj diff --git a/user_tools/src/spark_rapids_tools/configuration/tools_config.py b/user_tools/src/spark_rapids_tools/configuration/tools_config.py index 29bf7dcf8..24627d497 100644 --- a/user_tools/src/spark_rapids_tools/configuration/tools_config.py +++ b/user_tools/src/spark_rapids_tools/configuration/tools_config.py @@ -32,8 +32,8 @@ class ToolsConfig(BaseConfig): description='The version of the API that the tools are using. 
' 'This is used to test the compatibility of the ' 'configuration file against the current tools release.', - examples=['1.0'], - le=1.0, # minimum version compatible with the current tools implementation + examples=['1.0, 1.1'], + le=1.1, # minimum version compatible with the current tools implementation ge=1.0) runtime: Optional[ToolsRuntimeConfig] = Field( diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml index 75595dd56..b16792020 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_01.yaml @@ -1,7 +1,7 @@ # This yaml file is a sample configuration file for the distributed tools. It is valid # only if `--submission_mode distributed` is passed to the CLI. It provides submission # related configurations. -api_version: '1.0' +api_version: '1.1' submission: remote_cache_dir: 'hdfs:///tmp/spark_rapids_distributed_tools_cache' spark_properties: diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml index 642210fa0..726a02532 100644 --- a/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml +++ b/user_tools/tests/spark_rapids_tools_ut/resources/tools_config/valid/tools_config_02.yaml @@ -1,7 +1,7 @@ # This yaml file is a sample configuration file for the distributed tools. It is valid # only if `--submission_mode distributed` is passed to the CLI. It provides runtime # dependencies and submission related configurations. -api_version: '1.0' +api_version: '1.1' runtime: dependencies: - name: my-spark350 From 9b8111aa1562f530798c0751ca5321e91710b190 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 16 Dec 2024 14:47:53 -0800 Subject: [PATCH 13/14] Update python arg tests Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 3 ++ .../rapids/rapids_tool.py | 2 +- .../spark_rapids_tools/cmdli/argprocessor.py | 24 +++++----- user_tools/src/spark_rapids_tools/enums.py | 2 +- .../tests/spark_rapids_tools_ut/conftest.py | 1 + .../test_tool_argprocessor.py | 44 ++++++++++++------- 6 files changed, 49 insertions(+), 27 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 5bb58f4af..3b8e97ed8 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -154,6 +154,9 @@ def _process_estimation_model_args(self) -> None: self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args) def _process_submission_mode_arg(self) -> None: + """ + Process the value provided by `--submission_mode` argument. 
+ """ submission_mode_arg = self.wrapper_options.get('submissionMode') if submission_mode_arg is None or not submission_mode_arg: submission_mode = SubmissionMode.get_default() diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 9ddc4583f..c8180a71b 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -958,7 +958,7 @@ def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]: return config_obj self.logger.warning( 'Distributed tool configurations detected, but distributed mode is not enabled.' - 'Use \'--distributed\' flag to enable distributed mode. Switching to local mode.' + 'Use \'--submission_mode distributed\' flag to enable distributed mode. Switching to local mode.' ) elif self.ctxt.is_distributed_mode(): self.logger.warning( diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index d59697ef1..2177dec82 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -31,6 +31,7 @@ from spark_rapids_tools.utils import AbstractPropContainer, is_http_file from ..configuration.submission.distributed_config import DistributedToolsConfig from ..configuration.submission.local_config import LocalToolsConfig +from ..configuration.tools_config import ToolsConfig from ..enums import QualFilterApp, CspEnv, QualEstimationModel, SubmissionMode from ..storagelib.csppath import CspPath from ..tools.autotuner import AutoTunerPropMgr @@ -373,6 +374,9 @@ def process_jvm_args(self) -> None: self.p_args['toolArgs']['jobResources'] = adjusted_resources self.p_args['toolArgs']['log4jPath'] = Utils.resource_path('dev/log4j.properties') + def load_tools_config_internal(self) -> ToolsConfig: + return LocalToolsConfig.load_from_file(self.tools_config_path) + def load_tools_config(self) -> None: """ Load the tools config file if it is provided. 
It creates a ToolsConfig object and sets it @@ -383,11 +387,7 @@ def load_tools_config(self) -> None: if self.tools_config_path is not None: # the CLI provides a tools config file try: - if self.p_args['toolArgs']['submissionMode'] == SubmissionMode.DISTRIBUTED: - tools_config = DistributedToolsConfig.load_from_file(self.tools_config_path) - else: - tools_config = LocalToolsConfig.load_from_file(self.tools_config_path) - self.p_args['toolArgs']['toolsConfig'] = tools_config + self.p_args['toolArgs']['toolsConfig'] = self.load_tools_config_internal() except ValidationError as ve: # If required, we can dump the expected specification by appending # 'ToolsConfig.get_schema()' to the error message @@ -493,10 +493,7 @@ def init_tool_args(self) -> None: self.p_args['toolArgs']['estimationModelArgs'] = QualEstimationModel.create_default_model_args(def_model) else: self.p_args['toolArgs']['estimationModelArgs'] = self.estimation_model_args - if self.submission_mode is None or not self.submission_mode: - self.p_args['toolArgs']['submissionMode'] = SubmissionMode.get_default() - else: - self.p_args['toolArgs']['submissionMode'] = self.submission_mode + self.submission_mode = self.submission_mode or SubmissionMode.get_default() @model_validator(mode='after') def validate_arg_cases(self) -> 'QualifyUserArgModel': @@ -507,6 +504,13 @@ def validate_arg_cases(self) -> 'QualifyUserArgModel': def is_concurrent_submission(self) -> bool: return self.p_args['toolArgs']['estimationModelArgs']['xgboostEnabled'] + def load_tools_config_internal(self) -> ToolsConfig: + # Override the method to load the tools config file based on the submission mode + config_class = ( + DistributedToolsConfig if self.submission_mode == SubmissionMode.DISTRIBUTED else LocalToolsConfig + ) + return config_class.load_from_file(self.tools_config_path) + def build_tools_args(self) -> dict: # At this point, if the platform is still none, then we can set it to the default value # which is the onPrem platform. 
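A hedged illustration of what the `load_tools_config_internal` override changes: the same CLI path now validates the user's config file against a mode-specific schema, so distributed-only settings are rejected in local mode. The file names refer to the sample configs in this series and the paths are illustrative:

from spark_rapids_tools.configuration.submission.distributed_config import DistributedToolsConfig
from spark_rapids_tools.configuration.submission.local_config import LocalToolsConfig

# Distributed mode: 'submission' may carry remote_cache_dir and spark_properties.
dist_conf = DistributedToolsConfig.load_from_file('tools_config_01.yaml')

# Local mode: loading the distributed sample above would raise a ValidationError,
# because LocalSubmissionConfig forbids the distributed-only keys under 'submission'.
local_conf = LocalToolsConfig.load_from_file('tools_config_00.yaml')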
@@ -543,7 +547,7 @@ def build_tools_args(self) -> dict: 'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']), 'toolsJar': self.p_args['toolArgs']['toolsJar'], 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'], - 'submissionMode': self.p_args['toolArgs']['submissionMode'] + 'submissionMode': self.submission_mode } return wrapped_args diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index 15f9b8828..d2b450832 100644 --- a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -230,5 +230,5 @@ class SubmissionMode(EnumeratedType): DISTRIBUTED = 'distributed' @classmethod - def get_default(cls): + def get_default(cls) -> 'SubmissionMode': return cls.LOCAL diff --git a/user_tools/tests/spark_rapids_tools_ut/conftest.py b/user_tools/tests/spark_rapids_tools_ut/conftest.py index de3f2da12..019026250 100644 --- a/user_tools/tests/spark_rapids_tools_ut/conftest.py +++ b/user_tools/tests/spark_rapids_tools_ut/conftest.py @@ -49,6 +49,7 @@ def gen_cpu_cluster_props(): autotuner_prop_path = 'worker_info.yaml' # valid tools config files valid_tools_conf_files = ['tools_config_00.yaml'] +valid_distributed_mode_tools_conf_files = ['tools_config_01.yaml', 'tools_config_02.yaml'] # invalid tools config files invalid_tools_conf_files = [ # test older API_version diff --git a/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py b/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py index 300751242..f889d1450 100644 --- a/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py +++ b/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py @@ -25,7 +25,7 @@ from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel, ArgValueCase from spark_rapids_tools.enums import QualFilterApp from .conftest import SparkRapidsToolsUT, autotuner_prop_path, all_cpu_cluster_props, all_csps, \ - valid_tools_conf_files, invalid_tools_conf_files + valid_tools_conf_files, invalid_tools_conf_files, valid_distributed_mode_tools_conf_files @dataclasses.dataclass @@ -77,25 +77,25 @@ def validate_args_w_savings_disabled(tool_name: str, t_args: dict): assert t_args['filterApps'] == QualFilterApp.get_default() @staticmethod - def create_tool_args_should_pass(tool_name: str, platform=None, cluster=None, - eventlogs=None, tools_jar=None, tools_config_path=None): + def create_tool_args_should_pass(tool_name: str, **kwargs): return AbsToolUserArgModel.create_tool_args(tool_name, - platform=platform, - cluster=cluster, - eventlogs=eventlogs, - tools_jar=tools_jar, - tools_config_path=tools_config_path) + platform=kwargs.get('platform'), + cluster=kwargs.get('cluster'), + eventlogs=kwargs.get('eventlogs'), + tools_jar=kwargs.get('tools_jar'), + tools_config_path=kwargs.get('tools_config_path'), + submission_mode=kwargs.get('submission_mode')) @staticmethod - def create_tool_args_should_fail(tool_name: str, platform=None, cluster=None, - eventlogs=None, tools_jar=None, tools_config_path=None): + def create_tool_args_should_fail(tool_name: str, **kwargs): with pytest.raises(SystemExit) as pytest_wrapped_e: AbsToolUserArgModel.create_tool_args(tool_name, - platform=platform, - cluster=cluster, - eventlogs=eventlogs, - tools_jar=tools_jar, - tools_config_path=tools_config_path) + platform=kwargs.get('platform'), + cluster=kwargs.get('cluster'), + eventlogs=kwargs.get('eventlogs'), + tools_jar=kwargs.get('tools_jar'), + 
tools_config_path=kwargs.get('tools_config_path'), + submission_mode=kwargs.get('submission_mode')) assert pytest_wrapped_e.type == SystemExit @staticmethod @@ -349,6 +349,20 @@ def test_invalid_tools_configs(self, get_ut_data_dir, tool_name, csp, tools_conf tools_config_path=tools_conf_path) assert pytest_wrapped_e.type == SystemExit + @pytest.mark.parametrize('tool_name', ['qualification']) + @pytest.mark.parametrize('csp', ['onprem']) + @pytest.mark.parametrize('submission_mode', ['distributed']) + @pytest.mark.parametrize('tools_conf_fname', valid_distributed_mode_tools_conf_files) + def test_distributed_mode_configs(self, get_ut_data_dir, tool_name, csp, submission_mode, tools_conf_fname): + tools_conf_path = f'{get_ut_data_dir}/tools_config/valid/{tools_conf_fname}' + # should pass: tools config file is provided + tool_args = self.create_tool_args_should_pass(tool_name, + platform=csp, + eventlogs=f'{get_ut_data_dir}/eventlogs', + tools_config_path=tools_conf_path, + submission_mode=submission_mode) + assert tool_args['toolsConfig'] is not None + def test_arg_cases_coverage(self): """ This test ensures that above tests have covered all possible states of the `platform`, `cluster`, From 1ea81476f5d904a95becfc1d5247d878ce6abda4 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 16 Dec 2024 16:47:27 -0800 Subject: [PATCH 14/14] Remove pylint disable rule in CSPs Signed-off-by: Partho Sarthi --- .../src/spark_rapids_pytools/cloud_api/databricks_aws.py | 1 - .../src/spark_rapids_pytools/cloud_api/databricks_azure.py | 4 +++- user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py | 4 +++- user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py | 4 +++- user_tools/src/spark_rapids_pytools/cloud_api/emr.py | 4 +++- 5 files changed, 12 insertions(+), 5 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py index 774574a0d..82bd40133 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py @@ -31,7 +31,6 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator -# pylint: disable=abstract-method @dataclass class DBAWSPlatform(EMRPlatform): """ diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py index c11ecb681..ce76f9cfd 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py @@ -29,7 +29,6 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator -# pylint: disable=abstract-method @dataclass class DBAzurePlatform(PlatformBase): """ @@ -89,6 +88,9 @@ def create_saving_estimator(self, def create_local_submission_job(self, job_prop, ctxt) -> Any: return DBAzureLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + def create_distributed_submission_job(self, job_prop, ctxt) -> Any: + pass + def validate_job_submission_args(self, submission_args: dict) -> dict: pass diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py index 888d21eb0..6bf33eb4c 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py @@ -32,7 +32,6 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator -# pylint: 
disable=abstract-method @dataclass class DataprocPlatform(PlatformBase): """ @@ -131,6 +130,9 @@ def create_saving_estimator(self, def create_local_submission_job(self, job_prop, ctxt) -> Any: return DataprocLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + def create_distributed_submission_job(self, job_prop, ctxt) -> Any: + pass + def validate_job_submission_args(self, submission_args: dict) -> dict: pass diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py index d81e01309..364c9c8e9 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py @@ -29,7 +29,6 @@ from spark_rapids_tools import CspEnv -# pylint: disable=abstract-method @dataclass class DataprocGkePlatform(DataprocPlatform): """ @@ -93,6 +92,9 @@ def create_saving_estimator(self, def create_local_submission_job(self, job_prop, ctxt) -> Any: return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + def create_distributed_submission_job(self, job_prop, ctxt) -> Any: + pass + @dataclass class DataprocGkeCMDDriver(DataprocCMDDriver): diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py index 6401d7e43..11849beed 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py @@ -31,7 +31,6 @@ from spark_rapids_pytools.pricing.price_provider import SavingsEstimator -# pylint: disable=abstract-method @dataclass class EMRPlatform(PlatformBase): """ @@ -116,6 +115,9 @@ def create_saving_estimator(self, def create_local_submission_job(self, job_prop, ctxt) -> Any: return EmrLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + def create_distributed_submission_job(self, job_prop, ctxt) -> Any: + pass + def generate_cluster_configuration(self, render_args: dict): image_version = self.configs.get_value_silent('clusterInference', 'defaultImage') render_args['IMAGE'] = f'"{image_version}"'
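As a closing illustration of how the `spark_properties` entries are meant to be consumed, a minimal sketch that expands them into `spark-submit` style arguments; the helper name is hypothetical and not part of this patch set:

from typing import List

from spark_rapids_tools.configuration.common import SparkProperty

def to_spark_submit_conf(props: List[SparkProperty]) -> List[str]:
    """Expand SparkProperty entries into repeated '--conf key=value' arguments."""
    args: List[str] = []
    for prop in props:
        args.extend(['--conf', f'{prop.name}={prop.value}'])
    return args

# Example: the single property from the sample YAML above.
print(to_spark_submit_conf([SparkProperty(name='spark.executor.memory', value='20g')]))
# -> ['--conf', 'spark.executor.memory=20g']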