From 68be7cb177fb5d51f7e80465805f41225ef5b131 Mon Sep 17 00:00:00 2001
From: Cindy Jiang <47068112+cindyyuanjiang@users.noreply.github.com>
Date: Mon, 23 Oct 2023 14:32:35 -0700
Subject: [PATCH 1/7] [FEA] Fix empty softwareProperties field in worker_info.yaml file for profiling tool (#623)

* fixed null softwareProperties issue for clusterProperties

Signed-off-by: cindyyuanjiang

* refactored code

Signed-off-by: cindyyuanjiang

* added lod info for empty softwareProperties

Signed-off-by: cindyyuanjiang

---------

Signed-off-by: cindyyuanjiang
---
 .../com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
index 86909d98d..4241ac3c3 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
@@ -1035,7 +1035,12 @@ object AutoTuner extends Logging {
     representer.getPropertyUtils.setSkipMissingProperties(true)
     val constructor = new Constructor(classOf[ClusterProperties], new LoaderOptions())
     val yamlObjNested = new Yaml(constructor, representer)
-    Option(yamlObjNested.load(clusterProps).asInstanceOf[ClusterProperties])
+    val loadedClusterProps = yamlObjNested.load(clusterProps).asInstanceOf[ClusterProperties]
+    if (loadedClusterProps != null && loadedClusterProps.softwareProperties == null) {
+      logInfo("softwareProperties is empty from input worker_info file")
+      loadedClusterProps.softwareProperties = new util.LinkedHashMap[String, String]()
+    }
+    Option(loadedClusterProps)
   }
 
   def loadClusterProps(filePath: String): Option[ClusterProperties] = {

From d814f6ebafac57313d7b696f18320b20a90398b0 Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Mon, 23 Oct 2023 18:05:41 -0700
Subject: [PATCH 2/7] Enable Dynamic 'Zone' Configuration for Dataproc User Tools (#629)

---
 .../src/spark_rapids_pytools/cloud_api/dataproc.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
index bd17016bd..d3a24e648 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -422,9 +422,20 @@ def _init_nodes(self):
             SparkNodeType.MASTER: master_node
         }
 
+    def _set_zone_from_props(self, prop_container: JSONPropertiesContainer):
+        """
+        Extracts the 'zoneUri' from the properties container and updates the environment variable dictionary.
+        """
+        if prop_container:
+            zone_uri = prop_container.get_value_silent('config', 'gceClusterConfig', 'zoneUri')
+            if zone_uri:
+                self.cli.env_vars['zone'] = FSUtil.get_resource_name(zone_uri)
+
     def _init_connection(self, cluster_id: str = None, props: str = None) -> dict:
         cluster_args = super()._init_connection(cluster_id=cluster_id, props=props)
+        # extract and update zone to the environment variable
+        self._set_zone_from_props(cluster_args['props'])
         # propagate zone to the cluster
         cluster_args.setdefault('zone', self.cli.get_env_var('zone'))
         return cluster_args

From a96e0d7f5654a20ef7fd4584e3ddf0990fd59942 Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Fri, 27 Oct 2023 08:33:52 -0700
Subject: [PATCH 3/7] Fix spinner animation blocking diagnostic prompt (#631)

Signed-off-by: Partho Sarthi
---
 .../spark_rapids_pytools/common/utilities.py  | 22 ++++++++++++++-----
 .../spark_rapids_pytools/rapids/diagnostic.py | 10 ++++++---
 .../rapids/rapids_tool.py                     |  5 ++++-
 3 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/common/utilities.py b/user_tools/src/spark_rapids_pytools/common/utilities.py
index b8d56817d..fb2666cd7 100644
--- a/user_tools/src/spark_rapids_pytools/common/utilities.py
+++ b/user_tools/src/spark_rapids_pytools/common/utilities.py
@@ -379,23 +379,25 @@ class ToolsSpinner:
     A class to manage the spinner animation.
     Reference: https://stackoverflow.com/a/66558182
 
-    :param in_debug_mode: Flag indicating if running in debug (verbose) mode. Defaults to False.
+    :param enabled: Flag indicating if the spinner is enabled. Defaults to True.
     """
-    in_debug_mode: bool = field(default=False, init=True)
-    pixel_spinner: PixelSpinner = field(default=PixelSpinner('Processing...'), init=False)
+    enabled: bool = field(default=True, init=True)
+    pixel_spinner: PixelSpinner = field(default=PixelSpinner('Processing...', hide_cursor=False), init=False)
     end: str = field(default='Processing Completed!', init=False)
     timeout: float = field(default=0.1, init=False)
     completed: bool = field(default=False, init=False)
     spinner_thread: threading.Thread = field(default=None, init=False)
+    pause_event: threading.Event = field(default=threading.Event(), init=False)
 
     def _spinner_animation(self):
         while not self.completed:
             self.pixel_spinner.next()
             time.sleep(self.timeout)
+            while self.pause_event.is_set():
+                self.pause_event.wait(self.timeout)
 
     def start(self):
-        # Don't start if in debug mode
-        if not self.in_debug_mode:
+        if self.enabled:
             self.spinner_thread = threading.Thread(target=self._spinner_animation, daemon=True)
             self.spinner_thread.start()
         return self
@@ -404,6 +406,16 @@ def stop(self):
         self.completed = True
         print(f'\r\n{self.end}', flush=True)
 
+    def pause(self, insert_newline=False):
+        if self.enabled:
+            if insert_newline:
+                # Print a newline for visual separation
+                print()
+            self.pause_event.set()
+
+    def resume(self):
+        self.pause_event.clear()
+
     def __enter__(self):
         return self.start()
 
diff --git a/user_tools/src/spark_rapids_pytools/rapids/diagnostic.py b/user_tools/src/spark_rapids_pytools/rapids/diagnostic.py
index 35df84fb1..510600497 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/diagnostic.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/diagnostic.py
@@ -40,16 +40,20 @@ def _process_custom_args(self):
 
         self.thread_num = thread_num
         self.logger.debug('Set thread number as: %d', self.thread_num)
-
-        self.logger.warning('This operation will collect sensitive information from your cluster, '
-                            'such as OS & HW info, Yarn/Spark configurations and log files etc.')
+        log_message = ('This operation will collect sensitive information from your cluster, '
+                       'such as OS & HW info, Yarn/Spark configurations and log files etc.')
         yes = self.wrapper_options.get('yes', False)
         if yes:
+            self.logger.warning(log_message)
             self.logger.info('Confirmed by command line option.')
         else:
+            # Pause the spinner for user prompt
+            self.spinner.pause(insert_newline=True)
+            print(log_message)
             user_input = input('Do you want to continue (yes/no): ')
             if user_input.lower() not in ['yes', 'y']:
                 raise RuntimeError('User canceled the operation.')
+        self.spinner.resume()
 
     def requires_cluster_connection(self) -> bool:
         return True
diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index b686871be..de86f63d1 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -59,6 +59,7 @@ class RapidsTool(object):
     name: str = field(default=None, init=False)
     ctxt: ToolContext = field(default=None, init=False)
     logger: Logger = field(default=None, init=False)
+    spinner: ToolsSpinner = field(default=None, init=False)
 
     def pretty_name(self):
         return self.name.capitalize()
@@ -272,7 +273,9 @@ def _verify_exec_cluster(self):
         self._handle_non_running_exec_cluster(msg)
 
     def launch(self):
-        with ToolsSpinner(in_debug_mode=ToolLogging.is_debug_mode_enabled()):
+        # Spinner should not be enabled in debug mode
+        enable_spinner = not ToolLogging.is_debug_mode_enabled()
+        with ToolsSpinner(enabled=enable_spinner) as self.spinner:
             self._init_tool()
             self._connect_to_execution_cluster()
             self._process_arguments()

From 6f1a5732411ca911afe1f8a1c71f99d8719ab177 Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Fri, 27 Oct 2023 14:00:38 -0700
Subject: [PATCH 4/7] Fix system command processing during logging in user tools (#633)

* Fix incorrect splitting of system cmd having string type

Signed-off-by: Partho Sarthi

* Fix subscriptable type

Signed-off-by: Partho Sarthi

* Concise if-else condition

Signed-off-by: Partho Sarthi

---------

Signed-off-by: Partho Sarthi
---
 user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
index b8f4f6a29..2ea9e4864 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -20,7 +20,7 @@
 from dataclasses import dataclass, field
 from enum import Enum
 from logging import Logger
-from typing import Type, Any, List, Callable
+from typing import Type, Any, List, Callable, Union
 
 from spark_rapids_tools import EnumeratedType, CspEnv
 from spark_rapids_pytools.common.prop_manager import AbstractPropertiesContainer, JSONPropertiesContainer, \
@@ -369,7 +369,7 @@ def validate_env(self):
             self._handle_inconsistent_configurations(incorrect_envs)
 
     def run_sys_cmd(self,
-                    cmd,
+                    cmd: Union[str, list],
                     cmd_input: str = None,
                     fail_ok: bool = False,
                     env_vars: dict = None) -> str:
@@ -393,7 +393,9 @@ def process_streams(std_out, std_err):
             if len(stdout_splits) > 0:
                 std_out_lines = Utils.gen_multiline_str([f'\t| {line}' for line in stdout_splits])
                 stdout_str = f'\n\t\n{std_out_lines}'
-            cmd_log_str = Utils.gen_joined_str(' ', process_credentials_option(cmd))
+            # if the command is already a list, use it as-is. Otherwise, split the string into a list.
+            cmd_list = cmd if isinstance(cmd, list) else cmd.split(' ')
+            cmd_log_str = Utils.gen_joined_str(' ', process_credentials_option(cmd_list))
             if len(stderr_splits) > 0:
                 std_err_lines = Utils.gen_multiline_str([f'\t| {line}' for line in stderr_splits])
                 stderr_str = f'\n\t\n{std_err_lines}'

From 1a4756d733b1b2209ee05d055f34abe6beb4cba7 Mon Sep 17 00:00:00 2001
From: Niranjan Artal <50492963+nartal1@users.noreply.github.com>
Date: Mon, 30 Oct 2023 09:41:03 -0700
Subject: [PATCH 5/7] Profiling tool : Update readSchema string parser (#635)

Signed-off-by: Niranjan Artal
---
 .../org/apache/spark/sql/rapids/tool/AppBase.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index 769389371..527b7de58 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -272,6 +272,15 @@ abstract class AppBase(
     }
   }
 
+  private def trimSchema(str: String): String = {
+    val index = str.lastIndexOf(",")
+    if (index != -1 && str.contains("...")) {
+      str.substring(0, index)
+    } else {
+      str
+    }
+  }
+
   // The ReadSchema metadata is only in the eventlog for DataSource V1 readers
   protected def checkMetadataForReadSchema(sqlID: Long, planInfo: SparkPlanInfo): Unit = {
     // check if planInfo has ReadSchema
@@ -284,7 +293,7 @@ abstract class AppBase(
     val readSchema = ReadParser.formatSchemaStr(meta.getOrElse("ReadSchema", ""))
     val scanNode = allNodes.filter(node => {
       // Get ReadSchema of each Node and sanitize it for comparison
-      val trimmedNode = ReadParser.parseReadNode(node).schema.replace("...", "")
+      val trimmedNode = trimSchema(ReadParser.parseReadNode(node).schema)
       readSchema.contains(trimmedNode)
     }).filter(x => x.name.startsWith("Scan")).head

From 8ed14f4ce35a88cef2af0a2d033f9605602e6d90 Mon Sep 17 00:00:00 2001
From: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Date: Mon, 30 Oct 2023 16:47:25 +0000
Subject: [PATCH 6/7] Update dev-version by jenkins-spark-rapids-tools-auto-release-47

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
---
 core/pom.xml                                    | 2 +-
 user_tools/src/spark_rapids_pytools/__init__.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 7a9d88451..01249ed6d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -23,7 +23,7 @@
     <artifactId>rapids-4-spark-tools_2.12</artifactId>
     <name>RAPIDS Accelerator for Apache Spark tools</name>
     <description>RAPIDS Accelerator for Apache Spark tools</description>
-    <version>23.08.3-SNAPSHOT</version>
+    <version>23.10.1-SNAPSHOT</version>
     <packaging>jar</packaging>
     <url>http://github.com/NVIDIA/spark-rapids-tools</url>
diff --git a/user_tools/src/spark_rapids_pytools/__init__.py b/user_tools/src/spark_rapids_pytools/__init__.py
index e2c2d6b9b..8672e1571 100644
--- a/user_tools/src/spark_rapids_pytools/__init__.py
+++ b/user_tools/src/spark_rapids_pytools/__init__.py
@@ -16,5 +16,5 @@
 
 from spark_rapids_pytools.build import get_version
 
-VERSION = '23.08.3'
+VERSION = '23.10.1'
 __version__ = get_version(VERSION)

From defd16c43d9b05d812afbd0a2f56329295823ad9 Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Tue, 31 Oct 2023 10:56:26 -0700
Subject: [PATCH 7/7] Add user tools and jar version in logs (#642)

Signed-off-by: Partho Sarthi
---
 user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index de86f63d1..7fbbd9b09 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -26,6 +26,7 @@
 from logging import Logger
 from typing import Any, Callable, Dict, List
 
+import spark_rapids_pytools
 from spark_rapids_tools import CspEnv
 from spark_rapids_pytools.cloud_api.sp_types import get_platform, \
     ClusterBase, DeployMode, NodeHWInfo
@@ -120,6 +121,7 @@ def wrapper(self, *args, **kwargs):
     def __post_init__(self):
         # when debug is set to true set it in the environment.
         self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.name}')
+        self.logger.info('Using Spark RAPIDS user tools version %s', spark_rapids_pytools.__version__)
 
     def _check_environment(self) -> None:
         self.ctxt.platform.setup_and_validate_env()
@@ -387,8 +389,12 @@ def _process_jar_arg(self):
                                              fail_ok=False,
                                              create_dir=True)
         self.logger.info('RAPIDS accelerator jar is downloaded to work_dir %s', jar_path)
-        # get the jar file name and add it to the tool args
+        # get the jar file name
         jar_file_name = FSUtil.get_resource_name(jar_path)
+        version_match = re.search(r'\d{2}\.\d{2}\.\d+', jar_file_name)
+        jar_version = version_match.group() if version_match else 'Unknown'
+        self.logger.info('Using Spark RAPIDS accelerator jar version %s', jar_version)
+        # add jar file name to the tool args
        self.ctxt.add_rapids_args('jarFileName', jar_file_name)
         self.ctxt.add_rapids_args('jarFilePath', jar_path)
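
A minimal standalone sketch (not part of the patch series above) of how the version-extraction regex added in PATCH 7/7 behaves. The jar file name used here is a hypothetical example built from the rapids-4-spark-tools_2.12 artifact id and the 23.10.1 version set in PATCH 6/7; the regex and the fallback to 'Unknown' mirror the lines added in _process_jar_arg.

    import re

    # hypothetical jar name; the real value comes from FSUtil.get_resource_name(jar_path)
    jar_file_name = 'rapids-4-spark-tools_2.12-23.10.1.jar'

    # same pattern as the patch: two digits, a dot, two digits, a dot, then one or more digits
    version_match = re.search(r'\d{2}\.\d{2}\.\d+', jar_file_name)
    jar_version = version_match.group() if version_match else 'Unknown'
    print(jar_version)  # prints: 23.10.1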