Qualification tool: Add output stats file for Execs(operators) #1225

Merged · 16 commits · Aug 12, 2024
8 changes: 8 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -33,6 +33,7 @@
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -616,6 +617,13 @@ def __build_global_report_summary(self,
output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Generate the statistics report
try:
stats_report = SparkQualificationStats(ctxt=self.ctxt)
stats_report.report_qualification_stats()
except Exception as e: # pylint: disable=broad-except
self.logger.error('Failed to generate the statistics report: %s', e)

# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
103 changes: 103 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py
@@ -0,0 +1,103 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation class representing wrapper around the Qualification Stats tool."""

import os
from dataclasses import dataclass

from spark_rapids_pytools.cloud_api.sp_types import get_platform
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_pytools.rapids.rapids_tool import RapidsTool
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.qualx.util import find_paths, RegexPattern


@dataclass
class SparkQualStats(RapidsTool):
    """
Wrapper layer around Qualification Stats Tool.

Attributes
----------
config_path : str
Path to the qualification configuration file.
output_folder : str
Path to the output folder to save the results.
qual_output : str
Path to a directory containing qualification tool output.
"""
config_path: str = None
output_folder: str = None
qual_output: str = None

name = 'stats'

def _init_ctxt(self) -> None:
"""
Initialize the tool context, reusing qualification configurations.
TODO: This should be refactored to use its own conf file if not provided by the user.
"""
if self.config_path is None:
self.config_path = Utils.resource_path('qualification-conf.yaml')
self.ctxt = ToolContext(platform_cls=get_platform(self.platform_type),
platform_opts=self.wrapper_options.get('platformOpts'),
prop_arg=self.config_path,
name=self.name)

def _process_output_args(self) -> None:
"""
Sets the `output_folder`, ensures its creation, and updates the context with the folder path.
"""
self.logger.debug('Processing Output Arguments')
if self.output_folder is None:
self.output_folder = os.getcwd()
self.output_folder = FSUtil.get_abs_path(self.output_folder)
exec_dir_name = f'{self.name}_{self.ctxt.uuid}'
# It should never happen that the exec_dir_name exists
self.output_folder = FSUtil.build_path(self.output_folder, exec_dir_name)
FSUtil.make_dirs(self.output_folder, exist_ok=False)
self.ctxt.set_local('outputFolder', self.output_folder)
self.logger.info('Local output folder is set as: %s', self.output_folder)

def _run_rapids_tool(self) -> None:
"""
Runs the Qualification Stats tool.
"""
try:
self.logger.info('Running Qualification Stats tool')
if self.qual_output is not None:
qual_output_dir = find_paths(self.qual_output, RegexPattern.rapids_qual.match,
return_directories=True)
if qual_output_dir:
self.qual_output = qual_output_dir[0]
result = SparkQualificationStats(ctxt=self.ctxt, qual_output=self.qual_output)
result.report_qualification_stats()
self.logger.info('Qualification Stats tool completed successfully')
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error running Qualification Stats tool: %s', e)
raise

def _collect_result(self) -> None:
pass

def _archive_phase(self) -> None:
pass

def _finalize(self) -> None:
pass
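
A minimal sketch of the qualification-output discovery done in _run_rapids_tool above. The find_paths(dir, match_fn, return_directories=True) usage is copied from the call site, and the work_dir value is a placeholder:

from spark_rapids_tools.tools.qualx.util import find_paths, RegexPattern

work_dir = '/path/to/WORK_DIR'  # expected to contain rapids_4_spark_qualification_output/
matches = find_paths(work_dir, RegexPattern.rapids_qual.match, return_directories=True)
qual_output = matches[0] if matches else work_dir  # fall back to the given path, as above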
user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
@@ -9,6 +9,10 @@ toolOutput:
fileName: rapids_4_spark_qualification_output_unsupportedOperators.csv
clusterInformation:
fileName: rapids_4_spark_qualification_output_cluster_information.csv
stagesInformation:
fileName: rapids_4_spark_qualification_output_stages.csv
execsInformation:
fileName: rapids_4_spark_qualification_output_execs.csv
tunings:
subFolder: tuning
appsStatusReport:
@@ -127,6 +131,18 @@ local:
full:
name: qualification_summary_full.csv
outputComment: "Full savings and speedups CSV report"
statistics:
name: qualification_statistics.csv
outputComment: "Statistics CSV report"
columns:
- 'App ID'
- 'SQL ID'
- 'Operator'
- 'Count'
- 'Stage Task Exec Duration(s)'
- 'Total SQL Duration(s)'
- '% of Total SQL Duration'
- 'Supported'
intermediateOutput:
name: 'intermediate_output'
outputComment: "Intermediate output generated by tools"
20 changes: 20 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -710,6 +710,26 @@ def build_tools_args(self) -> dict:
}


@dataclass
@register_tool_arg_validator('stats')
class StatsUserArgModel(AbsToolUserArgModel):
"""
Represents the arguments collected by the user to run the stats tool.
"""
qual_output: Optional[str] = None
config_path: Optional[str] = None
output_folder: Optional[str] = None

def build_tools_args(self) -> dict:
return {
'runtimePlatform': self.platform,
'config_path': self.config_path,
'output_folder': self.output_folder,
'qual_output': self.qual_output,
'platformOpts': {}
}


@dataclass
@register_tool_arg_validator('generate_instance_description')
class InstanceDescriptionUserArgModel(AbsToolUserArgModel):
28 changes: 28 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -24,6 +24,7 @@
from spark_rapids_pytools.rapids.qualx.prediction import Prediction
from spark_rapids_pytools.rapids.profiling import ProfilingAsLocal
from spark_rapids_pytools.rapids.qualification import QualificationAsLocal
from spark_rapids_pytools.rapids.qualification_stats import SparkQualStats
from spark_rapids_pytools.rapids.qualx.train import Train


@@ -287,6 +288,33 @@ def train(self,
wrapper_options=train_args)
tool_obj.launch()

def stats(self,
config_path: str = None,
output_folder: str = None,
qual_output: str = None):
"""The stats cmd generates statistics from the qualification tool output.
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
:param config_path: Path to the configuration file.
:param output_folder: Path to store the output.
:param qual_output: path to the directory, which contains the qualification tool output.
E.g. user should specify the parent directory $WORK_DIR where
$WORK_DIR/rapids_4_spark_qualification_output exists.
parthosa marked this conversation as resolved.
Show resolved Hide resolved
"""
ToolLogging.enable_debug_mode()
init_environment('stats')

stats_args = AbsToolUserArgModel.create_tool_args('stats',
platform=CspEnv.get_default(),
config_path=config_path,
output_folder=output_folder,
qual_output=qual_output)
tool_obj = SparkQualStats(platform_type=stats_args['runtimePlatform'],
config_path=config_path,
output_folder=output_folder,
qual_output=qual_output,
wrapper_options=stats_args)

tool_obj.launch()
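
For reference, a minimal sketch of driving the new command programmatically, mirroring the body of stats() above. The CspEnv import path is an assumption, and the qual_output value is a placeholder:

from spark_rapids_tools import CspEnv  # import path assumed
from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel
from spark_rapids_pytools.rapids.qualification_stats import SparkQualStats

stats_args = AbsToolUserArgModel.create_tool_args('stats',
                                                  platform=CspEnv.get_default(),
                                                  qual_output='/path/to/WORK_DIR')
tool_obj = SparkQualStats(platform_type=stats_args['runtimePlatform'],
                          qual_output=stats_args['qual_output'],
                          wrapper_options=stats_args)
tool_obj.launch()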


def main():
# Make Python Fire not use a pager when it prints a help text
162 changes: 162 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py
@@ -0,0 +1,162 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of the Qualification Stats Report."""


from dataclasses import dataclass, field
from logging import Logger

import pandas as pd

from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext


@dataclass
class SparkQualificationStats:
"""
Encapsulates the logic to generate the Qualification Stats Report.
"""
logger: Logger = field(default=None, init=False)
unsupported_operators_df: pd.DataFrame = field(default=None, init=False)
stages_df: pd.DataFrame = field(default=None, init=False)
result_df: pd.DataFrame = field(default=None, init=False)
execs_df: pd.DataFrame = field(default=None, init=False)
output_columns: dict = field(default=None, init=False)
qual_output: str = field(default=None, init=True)
ctxt: ToolContext = field(default=None, init=True)

def __post_init__(self) -> None:
self.logger = ToolLogging.get_and_setup_logger('rapids.tools.qualification.stats')
self.output_columns = self.ctxt.get_value('local', 'output', 'files', 'statistics')

def _read_csv_files(self) -> None:
self.logger.info('Reading CSV files...')
if self.qual_output is None:
qual_output_dir = self.ctxt.get_rapids_output_folder()
else:
qual_output_dir = self.qual_output

unsupported_operator_report_file = self.ctxt.get_value(
'toolOutput', 'csv', 'unsupportedOperatorsReport', 'fileName')
rapids_unsupported_operators_file = FSUtil.build_path(
qual_output_dir, unsupported_operator_report_file)
self.unsupported_operators_df = pd.read_csv(rapids_unsupported_operators_file)

stages_report_file = self.ctxt.get_value('toolOutput', 'csv', 'stagesInformation',
'fileName')
rapids_stages_file = FSUtil.build_path(qual_output_dir, stages_report_file)
self.stages_df = pd.read_csv(rapids_stages_file)

rapids_execs_file = self.ctxt.get_value('toolOutput', 'csv', 'execsInformation',
'fileName')
self.execs_df = pd.read_csv(FSUtil.build_path(qual_output_dir, rapids_execs_file))
self.logger.info('Reading CSV files completed.')

def _convert_durations(self) -> None:
# Convert durations from milliseconds to seconds
self.unsupported_operators_df[['Stage Duration', 'App Duration']] /= 1000
self.stages_df[['Stage Task Duration', 'Unsupported Task Duration']] /= 1000

def _preprocess_dataframes(self) -> None:
self.logger.info('Preprocessing dataframes...')

# Filter out 'WholeStageCodegen' operators as the child operators are already included
# in the other rows
self.execs_df = self.execs_df[
~self.execs_df['Exec Name'].str.startswith('WholeStageCodegen')]

# Split 'Exec Stages' and explode the list into separate rows so that the stageID
# from this dataframe can be matched with the stageID of stages dataframe
self.execs_df['Exec Stages'] = self.execs_df['Exec Stages'].str.split(':')
self.execs_df = self.execs_df.explode('Exec Stages').dropna(subset=['Exec Stages'])
self.execs_df['Exec Stages'] = self.execs_df['Exec Stages'].astype(int)

# Remove duplicate 'Stage ID' rows and rename some columns so that join on dataframes
# can be done easily
self.stages_df = self.stages_df.drop_duplicates(subset=['App ID', 'Stage ID'])
self.stages_df.rename(columns={'Stage Task Duration': 'StageTaskDuration'}, inplace=True)
self.execs_df.rename(columns={'Exec Name': 'Operator'}, inplace=True)
self.unsupported_operators_df.rename(columns={'Unsupported Operator': 'Operator'},
inplace=True)
self.logger.info('Preprocessing dataframes completed.')
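
A toy illustration of the split/explode step above; the column names come from the diff and the values are made up:

import pandas as pd

execs = pd.DataFrame({'App ID': ['app-1'],
                      'Exec Name': ['Project'],
                      'Exec Stages': ['5:6']})
execs['Exec Stages'] = execs['Exec Stages'].str.split(':')
execs = execs.explode('Exec Stages').dropna(subset=['Exec Stages'])
execs['Exec Stages'] = execs['Exec Stages'].astype(int)
# Result: two 'Project' rows, one matching stage 5 and one matching stage 6.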

def _merge_dataframes(self) -> None:
self.logger.info('Merging dataframes to get stats...')
self._preprocess_dataframes()

# Merge execs_df with stages_df
merged_df = self.execs_df.merge(self.stages_df, left_on=['App ID', 'Exec Stages'],
right_on=['App ID', 'Stage ID'], how='left')

# Merge with unsupported_operators_df to find unsupported operations
merged_df = merged_df.merge(self.unsupported_operators_df,
on=['App ID', 'SQL ID', 'Stage ID', 'Operator'],
how='left', indicator=True)
merged_df['Supported'] = merged_df['_merge'] == 'left_only'
merged_df.drop(columns=['_merge', 'Exec Stages'], inplace=True)

# Calculate total duration by summing unique stages per SQLID
total_duration_df = merged_df.drop_duplicates(subset=['App ID', 'SQL ID', 'Stage ID']) \
.groupby(['App ID', 'SQL ID'])['StageTaskDuration'] \
.sum().reset_index().rename(columns={'StageTaskDuration': 'TotalSQLDuration'})

merged_df = merged_df.merge(total_duration_df, on=['App ID', 'SQL ID'], how='left')

# Mark unique stage task durations
merged_df['Unique StageTaskDuration'] = ~merged_df.duplicated(
    ['App ID', 'SQL ID', 'Operator', 'Stage ID', 'Supported'])
merged_df['Adjusted StageTaskDuration'] = (merged_df['StageTaskDuration'] *
                                           merged_df['Unique StageTaskDuration'])

Review thread on the duplicated() call above:

tgravescs (Collaborator): So for each operator we will get all stages, and then possibly separate rows if an operator has both supported and unsupported entries, and we just want to make sure those are unique so we aren't double-counting time, correct?

So does this mean that if you have two operators in a stage, one supported and one not supported, the time will be counted twice, correct? Which is fine; I just want to make sure I'm understanding properly.

Author (Collaborator): Thanks! I updated it to handle this case. Until now, it only accounted for the same operator within a SQL ID, and the assumption was that an operator would be either supported or unsupported within a stage. But the same operator can be both supported and unsupported within a stage (due to an underlying Expression not being supported).

Sample output: there are 4 entries of Project for SQLID=1, StageID(5,6) in execs.csv, and 3 are unsupported, as below:

unsupported_operators_df:

   App ID  SQL ID  Stage ID       Operator
0       1       3         7  HashAggregate
1       1       1         6        Project
2       2       1        -1        Project
3       1       1        -1         Filter
4       1       1         5        Project
5       1       1         5        Project

stages_df:

   App ID  Stage ID  StageTaskDuration
0       1         5                 50
2       1         4                 20
3       1         3                100
4       1         6                 60
5       1         7                100

Final dataframe output (before renaming columns):

   App ID  SQL ID       Operator  Count  StageTaskDuration  TotalSQLDuration  % of Total SQL Duration  Supported
0       1       1         Filter      2                 70               230                30.434783       True
1       1       1        Project      3                110               230                47.826087      False
2       1       1        Project      1                 50               230                21.739130       True
3       1       1           Sort      3                170               230                73.913043       True
4       1       3  HashAggregate      1                100               100               100.000000      False

tgravescs (Collaborator): So I assume the SQL ID 1 example above has some execs, likely the Project, that are unsupported in the same stages as another exec that is supported, correct? Because adding up the four of those (70 + 110 + 50 + 170) gives more than the 230 total.

Author (Collaborator): Yes, that's correct. In the above example, for SQLID=1 there are 4 Projects in total:

StageID=5: Unsupported=2, Supported=1, StageTaskDuration=50
StageID=6: Unsupported=1, StageTaskDuration=60

So we have 2 rows in the output for Project:

StageTaskDuration=110 (50 + 60) for Unsupported
StageTaskDuration=50 for Supported

# Aggregate data
final_df = merged_df.groupby(['App ID', 'SQL ID', 'Operator', 'Supported']).agg({
'Adjusted StageTaskDuration': 'sum',
'Stage ID': 'count'
}).reset_index().rename(columns={'Stage ID': 'Count',
'Adjusted StageTaskDuration': 'StageTaskDuration'})

# Merge total duration and calculate percentage
final_df = final_df.merge(total_duration_df, on=['App ID', 'SQL ID'], how='left')
final_df['% of Total SQL Duration'] = (
final_df['StageTaskDuration'] / final_df['TotalSQLDuration'] * 100)

# Rename columns
final_df.rename(columns={
'StageTaskDuration': 'Stage Task Exec Duration(s)',
'TotalSQLDuration': 'Total SQL Duration(s)'
}, inplace=True)
self.result_df = final_df[self.output_columns.get('columns')].copy()
self.logger.info('Merging stats dataframes completed.')
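
A minimal, self-contained sketch of the indicator-merge trick used above: after a left merge with indicator=True, rows whose _merge value is 'left_only' had no match in the unsupported-operators frame and are therefore supported. The values below are made up:

import pandas as pd

left = pd.DataFrame({'Operator': ['Filter', 'Project'], 'Stage ID': [5, 5]})
unsupported = pd.DataFrame({'Operator': ['Project'], 'Stage ID': [5]})
merged = left.merge(unsupported, on=['Operator', 'Stage ID'],
                    how='left', indicator=True)
merged['Supported'] = merged['_merge'] == 'left_only'
# Filter  -> Supported=True  (no unsupported match)
# Project -> Supported=False (matched an unsupported row)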

def _write_results(self) -> None:
self.logger.info('Writing stats results...')
result_output_dir = self.ctxt.get_output_folder()
outputfile_path = self.ctxt.get_value('local', 'output', 'files', 'statistics', 'name')
output_file = FSUtil.build_path(result_output_dir, outputfile_path)
self.result_df.to_csv(output_file, float_format='%.2f', index=False)
self.logger.info('Results have been saved to %s', output_file)

def report_qualification_stats(self) -> None:
"""
Reports qualification stats by reading the qualification tool's output CSV files.

If an error occurs, the caller should handle the exception.
"""
self._read_csv_files()
self._convert_durations()
self._merge_dataframes()
self._write_results()
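
As a usage note, the report class can also be driven directly, given an initialized ToolContext (the ctxt value and the path below are placeholders):

from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats

# ctxt: a ToolContext initialized as in SparkQualStats._init_ctxt above
stats = SparkQualificationStats(ctxt=ctxt,
                                qual_output='/path/to/rapids_4_spark_qualification_output')
stats.report_qualification_stats()  # writes qualification_statistics.csv to the output folder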