Skip to content

Commit

Permalink
Qualification tool: Add output stats file for Execs(operators) (#1225)
Browse files Browse the repository at this point in the history
* Qualification tool: Add stats report file for unsupported operators

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

Signed-off-by: Niranjan Artal <[email protected]>

* read ctxt in class

* addressed review comments, added stats as a different tool

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

* improve exception handling

* improve error handling and fix bug when incorrect path is provided

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

* addressed review comments

* updated taskStage calculation and added supported operators as well

* addressed review comments

Signed-off-by: Niranjan Artal <[email protected]>

---------

Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 authored Aug 12, 2024
1 parent b35289d commit 32239bf
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 0 deletions.
8 changes: 8 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
Expand Down Expand Up @@ -478,6 +479,13 @@ def __build_global_report_summary(self,

unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Generate the statistics report
try:
stats_report = SparkQualificationStats(ctxt=self.ctxt)
stats_report.report_qualification_stats()
except Exception as e: # pylint: disable=broad-except
self.logger.error('Failed to generate the statistics report: %s', e)

# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
Expand Down
103 changes: 103 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation class representing wrapper around the Qualification Stats tool."""

from dataclasses import dataclass

import os

from spark_rapids_pytools.cloud_api.sp_types import get_platform
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_pytools.rapids.rapids_tool import RapidsTool
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.qualx.util import find_paths, RegexPattern


@dataclass
class SparkQualStats(RapidsTool):

"""
Wrapper layer around Qualification Stats Tool.
Attributes
----------
config_path : str
Path to the qualification configuration file.
output_folder : str
Path to the output folder to save the results.
qual_output: str
Path to a directory containing qualification tool output.
"""
config_path: str = None
output_folder: str = None
qual_output: str = None

name = 'stats'

def _init_ctxt(self) -> None:
"""
Initialize the tool context, reusing qualification configurations.
TODO: This should be refactor to use it's own conf file if not provided by the user.
"""
if self.config_path is None:
self.config_path = Utils.resource_path('qualification-conf.yaml')
self.ctxt = ToolContext(platform_cls=get_platform(self.platform_type),
platform_opts=self.wrapper_options.get('platformOpts'),
prop_arg=self.config_path,
name=self.name)

def _process_output_args(self) -> None:
"""
Sets the `output_folder`, ensures its creation, and updates the context with the folder path.
"""
self.logger.debug('Processing Output Arguments')
if self.output_folder is None:
self.output_folder = os.getcwd()
self.output_folder = FSUtil.get_abs_path(self.output_folder)
exec_dir_name = f'{self.name}_{self.ctxt.uuid}'
# It should never happen that the exec_dir_name exists
self.output_folder = FSUtil.build_path(self.output_folder, exec_dir_name)
FSUtil.make_dirs(self.output_folder, exist_ok=False)
self.ctxt.set_local('outputFolder', self.output_folder)
self.logger.info('Local output folder is set as: %s', self.output_folder)

def _run_rapids_tool(self) -> None:
"""
Runs the Qualification Stats tool.
"""
try:
self.logger.info('Running Qualification Stats tool')
if self.qual_output is not None:
qual_output_dir = find_paths(self.qual_output, RegexPattern.rapids_qual.match,
return_directories=True)
if qual_output_dir:
self.qual_output = qual_output_dir[0]
result = SparkQualificationStats(ctxt=self.ctxt, qual_output=self.qual_output)
result.report_qualification_stats()
self.logger.info('Qualification Stats tool completed successfully')
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error running Qualification Stats tool %s', e)
raise

def _collect_result(self) -> None:
pass

def _archive_phase(self) -> None:
pass

def _finalize(self) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ toolOutput:
fileName: rapids_4_spark_qualification_output_unsupportedOperators.csv
clusterInformation:
fileName: rapids_4_spark_qualification_output_cluster_information.csv
stagesInformation:
fileName: rapids_4_spark_qualification_output_stages.csv
execsInformation:
fileName: rapids_4_spark_qualification_output_execs.csv
tunings:
subFolder: tuning
appsStatusReport:
Expand Down Expand Up @@ -108,6 +112,18 @@ local:
full:
name: qualification_summary_full.csv
outputComment: "Full savings and speedups CSV report"
statistics:
name: qualification_statistics.csv
outputComment: "Statistics CSV report"
columns:
- 'App ID'
- 'SQL ID'
- 'Operator'
- 'Count'
- 'Stage Task Exec Duration(s)'
- 'Total SQL Task Duration(s)'
- '% of Total SQL Task Duration'
- 'Supported'
intermediateOutput:
name: 'intermediate_output'
outputComment: "Intermediate output generated by tools"
Expand Down
20 changes: 20 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,26 @@ def build_tools_args(self) -> dict:
}


@dataclass
@register_tool_arg_validator('stats')
class StatsUserArgModel(AbsToolUserArgModel):
"""
Represents the arguments collected by the user to run the stats tool.
"""
qual_output: str = None
config_path: Optional[str] = None
output_folder: Optional[str] = None

def build_tools_args(self) -> dict:
return {
'runtimePlatform': self.platform,
'config_path': self.config_path,
'output_folder': self.output_folder,
'qual_output': self.qual_output,
'platformOpts': {}
}


@dataclass
@register_tool_arg_validator('generate_instance_description')
class InstanceDescriptionUserArgModel(AbsToolUserArgModel):
Expand Down
36 changes: 36 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/dev_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import fire

from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel
from spark_rapids_tools.enums import CspEnv
from spark_rapids_tools.utils.util import gen_app_banner, init_environment
from spark_rapids_pytools.common.utilities import ToolLogging
from spark_rapids_pytools.rapids.dev.instance_description import InstanceDescription
from spark_rapids_pytools.rapids.qualification_stats import SparkQualStats


class DevCLI(object): # pylint: disable=too-few-public-methods
Expand Down Expand Up @@ -51,6 +53,40 @@ def generate_instance_description(self,
wrapper_options=instance_description_args)
tool_obj.launch()

def stats(self,
config_path: str = None,
output_folder: str = None,
qual_output: str = None):
"""The stats cmd generates statistics from the qualification tool output.
Statistics is generated per AppId, per SQLID, and per Operator. For each operator, the
statistics include the number of times the operator was executed, total task time of
the stages that contain the operator, the total task time of the SQL that the operator
is part of. The count of operators is also differentiated whether the operator is
supported or unsupported.
:param config_path: Path to the configuration file.
:param output_folder: Path to store the output.
:param qual_output: path to the directory, which contains the qualification tool output.
E.g. user should specify the parent directory $WORK_DIR where
$WORK_DIR/rapids_4_spark_qualification_output exists.
"""
ToolLogging.enable_debug_mode()
init_environment('stats')

stats_args = AbsToolUserArgModel.create_tool_args('stats',
platform=CspEnv.get_default(),
config_path=config_path,
output_folder=output_folder,
qual_output=qual_output)
tool_obj = SparkQualStats(platform_type=stats_args['runtimePlatform'],
config_path=config_path,
output_folder=output_folder,
qual_output=qual_output,
wrapper_options=stats_args)

tool_obj.launch()


def main():
# Make Python Fire not use a pager when it prints a help text
Expand Down
Loading

0 comments on commit 32239bf

Please sign in to comment.