Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool: Add output stats file for Execs (operators) #1225

Merged
merged 16 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
Expand Down Expand Up @@ -594,6 +595,10 @@ def __build_global_report_summary(self,
output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Generate the statistics report
stats_report = SparkQualificationStats(ctxt=self.ctxt)
stats_report.report_qualification_stats()

# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
Expand Down
103 changes: 103 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation class representing wrapper around the Qualification Stats tool."""

from dataclasses import dataclass

import os

from spark_rapids_pytools.cloud_api.sp_types import get_platform
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_pytools.rapids.rapids_tool import RapidsTool
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.qualx.util import find_paths, RegexPattern


@dataclass
class SparkQualStats(RapidsTool):

    """
    Wrapper layer around Qualification Stats Tool.

    Attributes
    ----------
    config_path : str
        Path to the qualification configuration file.
    output_folder : str
        Path to the output folder to save the results.
    qual_output: str
        Path to a directory containing qualification tool output.
    """
    config_path: str = None
    output_folder: str = None
    qual_output: str = None

    name = 'stats'

    def _init_ctxt(self):
        """
        Initialize the tool context, reusing qualification configurations.
        TODO: This should be refactored to use its own conf file if not provided by the user.
        """
        if self.config_path is None:
            self.config_path = Utils.resource_path('qualification-conf.yaml')
        self.ctxt = ToolContext(platform_cls=get_platform(self.platform_type),
                                platform_opts=self.wrapper_options.get('platformOpts'),
                                prop_arg=self.config_path,
                                name=self.name)

    def _process_output_args(self):
        """
        Sets the `output_folder`, ensures its creation, and updates the context with the folder path.
        """
        self.logger.debug('Processing Output Arguments')
        if self.output_folder is None:
            self.output_folder = os.getcwd()
        self.output_folder = FSUtil.get_abs_path(self.output_folder)
        exec_dir_name = f'{self.name}_{self.ctxt.uuid}'
        # The uuid suffix makes collisions practically impossible, so the
        # directory is created with exist_ok=False to fail loudly if it does.
        self.output_folder = FSUtil.build_path(self.output_folder, exec_dir_name)
        FSUtil.make_dirs(self.output_folder, exist_ok=False)
        self.ctxt.set_local('outputFolder', self.output_folder)
        self.logger.info('Local output folder is set as: %s', self.output_folder)

    def _run_rapids_tool(self):
        """
        Runs the Qualification Stats tool.

        :raises Exception: re-raises any failure from the stats report after logging it.
        """
        try:
            self.logger.info('Running Qualification Stats tool')
            if self.qual_output is not None:
                # The user may pass the parent work directory; locate the actual
                # rapids qualification output folder inside it by name pattern.
                qual_output_dir = find_paths(self.qual_output, RegexPattern.rapids_qual.match,
                                             return_directories=True)
                if qual_output_dir:
                    self.qual_output = qual_output_dir[0]
            result = SparkQualificationStats(ctxt=self.ctxt, qual_output=self.qual_output)
            result.report_qualification_stats()
            self.logger.info('Qualification Stats tool completed successfully')
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error('Error running Qualification Stats tool %s', e)
            raise

    def _collect_result(self):
        pass

    def _archive_phase(self):
        pass

    def _finalize(self):
        pass
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ toolOutput:
fileName: rapids_4_spark_qualification_output_unsupportedOperators.csv
clusterInformation:
fileName: rapids_4_spark_qualification_output_cluster_information.csv
stagesInformation:
fileName: rapids_4_spark_qualification_output_stages.csv
tunings:
subFolder: tuning
summaryReport:
Expand Down Expand Up @@ -125,6 +127,18 @@ local:
full:
name: qualification_summary_full.csv
outputComment: "Full savings and speedups CSV report"
statistics:
name: qualification_statistics.csv
outputComment: "Statistics CSV report"
columns:
- 'App ID'
- 'SQL ID'
- 'Operator Name'
- 'Count'
- 'Stage Task Exec duration(seconds)'
- 'Impacted Stage duration(seconds)'
- '% of Stage Duration'
- 'Supported'
intermediateOutput:
name: 'intermediate_output'
outputComment: "Intermediate output generated by tools"
Expand Down
20 changes: 20 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,26 @@ def build_tools_args(self) -> dict:
}


@dataclass
@register_tool_arg_validator('stats')
class StatsUserArgModel(AbsToolUserArgModel):
    """
    Represents the arguments collected by the user to run the stats tool.
    """
    qual_output: str = None
    config_path: Optional[str] = None
    output_folder: Optional[str] = None

    def build_tools_args(self) -> dict:
        # Package the user selections into the wrapper-options dict
        # consumed by the stats tool entry point.
        tool_args = dict(runtimePlatform=self.platform,
                         config_path=self.config_path,
                         output_folder=self.output_folder,
                         qual_output=self.qual_output,
                         platformOpts={})
        return tool_args


@dataclass
@register_tool_arg_validator('generate_instance_description')
class InstanceDescriptionUserArgModel(AbsToolUserArgModel):
Expand Down
28 changes: 28 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from spark_rapids_pytools.rapids.qualx.prediction import Prediction
from spark_rapids_pytools.rapids.profiling import ProfilingAsLocal
from spark_rapids_pytools.rapids.qualification import QualificationAsLocal
from spark_rapids_pytools.rapids.qualification_stats import SparkQualStats
from spark_rapids_pytools.rapids.qualx.train import Train


Expand Down Expand Up @@ -287,6 +288,33 @@ def train(self,
wrapper_options=train_args)
tool_obj.launch()

def stats(self,
          config_path: str = None,
          output_folder: str = None,
          qual_output: str = None):
    """The stats cmd generates statistics from the qualification tool output.

    :param config_path: Path to the configuration file.
    :param output_folder: Path to store the output.
    :param qual_output: path to the directory, which contains the qualification tool output.
           E.g. user should specify the parent directory $WORK_DIR where
           $WORK_DIR/rapids_4_spark_qualification_output exists.
    """
    ToolLogging.enable_debug_mode()
    init_environment('stats')

    # Validate/normalize the raw CLI arguments through the registered validator.
    wrapper_args = AbsToolUserArgModel.create_tool_args('stats',
                                                        platform=CspEnv.get_default(),
                                                        config_path=config_path,
                                                        output_folder=output_folder,
                                                        qual_output=qual_output)
    stats_tool = SparkQualStats(platform_type=wrapper_args['runtimePlatform'],
                                config_path=config_path,
                                output_folder=output_folder,
                                qual_output=qual_output,
                                wrapper_options=wrapper_args)
    stats_tool.launch()


def main():
# Make Python Fire not use a pager when it prints a help text
Expand Down
112 changes: 112 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of the Qualification Stats Report."""


from dataclasses import dataclass, field
from logging import Logger

import pandas as pd

from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext


@dataclass
class SparkQualificationStats:
    """
    Encapsulates the logic to generate the Qualification Stats Report.

    Reads the unsupported-operators and stages CSV files produced by the
    qualification tool, joins and aggregates them per
    (App ID, SQL ID, operator), and writes a statistics CSV report.
    """
    logger: Logger = field(default=None, init=False)
    unsupported_operators_df: pd.DataFrame = field(default=None, init=False)
    stages_df: pd.DataFrame = field(default=None, init=False)
    result_df: pd.DataFrame = field(default=None, init=False)
    output_columns: dict = field(default=None, init=False)
    qual_output: str = field(default=None, init=True)
    ctxt: ToolContext = field(default=None, init=True)

    def __post_init__(self) -> None:
        self.logger = ToolLogging.get_and_setup_logger('rapids.tools.qualification.stats')
        # Cache the 'statistics' report config (output file name and column layout).
        self.output_columns = self.ctxt.get_value('local', 'output', 'files', 'statistics')

    def _read_csv_files(self) -> None:
        """Load the unsupported-operators and stages CSV reports into dataframes."""
        self.logger.info('Reading CSV files...')
        if self.qual_output is None:
            qual_output_dir = self.ctxt.get_rapids_output_folder()
        else:
            qual_output_dir = self.qual_output

        unsupported_operator_report_file = self.ctxt.get_value(
            'toolOutput', 'csv', 'unsupportedOperatorsReport', 'fileName')
        rapids_unsupported_operators_file = FSUtil.build_path(
            qual_output_dir, unsupported_operator_report_file)
        self.unsupported_operators_df = pd.read_csv(rapids_unsupported_operators_file)

        stages_report_file = self.ctxt.get_value('toolOutput', 'csv', 'stagesInformation',
                                                 'fileName')
        rapids_stages_file = FSUtil.build_path(qual_output_dir, stages_report_file)
        self.stages_df = pd.read_csv(rapids_stages_file)
        self.logger.info('Reading CSV files completed.')

    def _convert_durations(self) -> None:
        # Convert durations from milliseconds to seconds
        self.unsupported_operators_df[['Stage Duration', 'App Duration']] /= 1000
        self.stages_df[['Stage Task Duration', 'Unsupported Task Duration']] /= 1000

    def _merge_dataframes(self) -> None:
        """Join operators with stage info and aggregate stats per operator."""
        self.logger.info('Merging dataframes to get stats...')
        # Inner-join unsupported_operators_df with stages_df on App ID and Stage ID
        merged_df = pd.merge(self.unsupported_operators_df, self.stages_df,
                             on=['App ID', 'Stage ID'])

        agg_unsupported_df = (merged_df.groupby(['App ID', 'SQL ID', 'Unsupported Operator']).agg(
            Count=('Unsupported Operator', 'size'),
            Impacted_Stage_Duration=('Stage Duration', 'sum'),
            App_Duration=('App Duration', 'first'),
            Stage_Task_Duration=('Stage Task Duration', 'sum')
        ).reset_index())

        # NOTE(review): the column is named '% of Stage Duration' but the
        # denominator is the application duration — confirm this is intended.
        agg_unsupported_df['% of Stage Duration'] = (
            (agg_unsupported_df['Impacted_Stage_Duration'] /
             agg_unsupported_df['App_Duration']) * 100).round(3)

        # This report only covers unsupported operators, so 'Supported' is always False.
        agg_unsupported_df['Supported'] = False
        final_df = agg_unsupported_df.rename(columns={
            'Unsupported Operator': 'Operator Name',
            'Impacted_Stage_Duration': 'Impacted Stage duration(seconds)',
            'Stage_Task_Duration': 'Stage Task Exec duration(seconds)'
        })
        self.result_df = final_df[self.output_columns.get('columns')].copy()
        self.logger.info('Merging stats dataframes completed.')

    def _write_results(self) -> None:
        """Write the aggregated stats dataframe to the configured CSV file."""
        self.logger.info('Writing stats results...')
        result_output_dir = self.ctxt.get_output_folder()
        # Reuse the config section cached in __post_init__ instead of a second lookup.
        output_file = FSUtil.build_path(result_output_dir, self.output_columns.get('name'))
        self.result_df.to_csv(output_file, float_format='%.2f', index=False)
        self.logger.info('Results have been saved to %s', output_file)

    def report_qualification_stats(self) -> None:
        """
        Reports qualification stats by reading qual tool output CSV files

        If an error occurs, the caller should handle the exception.
        """
        self._read_csv_files()
        self._convert_durations()
        self._merge_dataframes()
        self._write_results()