Qualification tool: Add output stats file for Execs (operators) #1225

Merged · 16 commits · Aug 12, 2024
Changes from 2 commits
18 changes: 18 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -33,6 +33,7 @@
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -839,6 +840,23 @@ def __build_global_report_summary(self,
'clusterShapeRecommendation', 'path'))
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
rapids_output_dir = self.ctxt.get_rapids_output_folder()
unsupported_ops_file_path = FSUtil.build_path(rapids_output_dir,
self.ctxt.get_value('toolOutput', 'csv',
'unsupportedOperatorsReport',
'fileName'))
stages_info_file = FSUtil.build_path(rapids_output_dir,
self.ctxt.get_value('toolOutput', 'csv',
'stagesInformation', 'fileName'))

# Generate the statistics report
stats_report = SparkQualificationStats(
unsupported_operators_file=unsupported_ops_file_path,
stages_file=stages_info_file,
output_file=output_files_info.get_value('statistics', 'path')
)
stats_report.report_qualification_stats()

# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
@@ -9,6 +9,8 @@ toolOutput:
fileName: rapids_4_spark_qualification_output_unsupportedOperators.csv
clusterInformation:
fileName: rapids_4_spark_qualification_output_cluster_information.csv
stagesInformation:
fileName: rapids_4_spark_qualification_output_stages.csv
tunings:
subFolder: tuning
summaryReport:
@@ -124,6 +126,9 @@ local:
full:
name: qualification_summary_full.csv
outputComment: "Full savings and speedups CSV report"
statistics:
name: qualification_statistics.csv
outputComment: "Statistics CSV report"
intermediateOutput:
name: 'intermediate_output'
outputComment: "Intermediate output generated by tools"
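For reference, the new statistics entry above names the CSV that SparkQualificationStats (added below) writes. An illustrative sketch of the resulting qualification_statistics.csv layout: the column names come from the rename in merge_dataframes below, while the row values and app ID here are made up for illustration only.

AppId,SQLID,Operator_Name,Count,Stage Task Exec Duration(Seconds),Impacted Stage duration,% of Stage Duration,Supported
app-20240812120000-0001,1,ProjectExec,2,100.0,30.0,15.0,False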
117 changes: 117 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py
@@ -0,0 +1,117 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of the Qualification Stats Report."""


from dataclasses import dataclass, field
from logging import Logger

import pandas as pd
import fire

from spark_rapids_pytools.common.utilities import ToolLogging


@dataclass
class SparkQualificationStats:
"""
Encapsulates the logic to generate the Qualification Stats Report.
"""
logger: Logger = field(default=None, init=False)
unsupported_operators_file: str
stages_file: str
output_file: str
unsupported_operators_df: pd.DataFrame = field(default=None, init=False)
stages_df: pd.DataFrame = field(default=None, init=False)
result_df: pd.DataFrame = field(default=None, init=False)

def __init__(self, unsupported_operators_file: str, stages_file: str, output_file: str):
self.unsupported_operators_file = unsupported_operators_file
self.stages_file = stages_file
self.output_file = output_file
self.logger = ToolLogging.get_and_setup_logger('rapids.tools.qualification.stats')

def load_data(self):
try:
self.logger.info('Loading data from CSV files...')
self.unsupported_operators_df = pd.read_csv(self.unsupported_operators_file)
self.stages_df = pd.read_csv(self.stages_file)

# Convert durations from milliseconds to seconds
self.unsupported_operators_df['Stage Duration'] = (
self.unsupported_operators_df['Stage Duration'] / 1000)
self.unsupported_operators_df['App Duration'] = (
self.unsupported_operators_df['App Duration'] / 1000)
self.stages_df['Stage Task Duration'] = self.stages_df['Stage Task Duration'] / 1000
self.stages_df['Unsupported Task Duration'] = (
self.stages_df['Unsupported Task Duration'] / 1000)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error loading data: %s', e)

def merge_dataframes(self):
try:
self.logger.info('Merging dataframes...')
# Merge unsupported_operators_df with stages_df on App ID and Stage ID
merged_df = pd.merge(self.unsupported_operators_df, self.stages_df,
on=['App ID', 'Stage ID'])

agg_unsupported_df = (merged_df.groupby(['App ID', 'SQL ID', 'Unsupported Operator']).agg(
Count=('Unsupported Operator', 'size'),
Impacted_Stage_Duration=('Stage Duration', 'sum'),
App_Duration=('App Duration', 'sum'),
Stage_Task_Duration=('Stage Task Duration', 'sum')
).reset_index())

agg_unsupported_df['% of Stage Duration'] = (
(agg_unsupported_df['Impacted_Stage_Duration'] /
agg_unsupported_df['App_Duration']) * 100)

agg_unsupported_df['Supported'] = False
final_df = agg_unsupported_df.rename(columns={
'App ID': 'AppId',
'SQL ID': 'SQLID',
'Unsupported Operator': 'Operator_Name',
'Impacted_Stage_Duration': 'Impacted Stage duration',
'Stage_Task_Duration': 'Stage Task Exec Duration(Seconds)',
'% of Stage Duration': '% of Stage Duration',
'Supported': 'Supported'
})

final_df = final_df[
['AppId', 'SQLID', 'Operator_Name', 'Count', 'Stage Task Exec Duration(Seconds)',
'Impacted Stage duration', '% of Stage Duration', 'Supported']]
self.result_df = final_df
self.logger.info('Merging dataframes completed.')
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error merging dataframes: %s', e)
# raise

def report_qualification_stats(self):
try:
self.load_data()
self.merge_dataframes()
self.result_df.to_csv(self.output_file, index=False)
self.logger.info('Results have been saved to %s', self.output_file)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error running analysis: %s', e)


def main(unsupported_operators_file: str, stages_file: str, output_file: str):
stats = SparkQualificationStats(unsupported_operators_file, stages_file, output_file)
stats.report_qualification_stats()


if __name__ == '__main__':
fire.Fire(main)
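To make the aggregation in merge_dataframes concrete, here is a minimal, self-contained sketch with fabricated inputs (all IDs, operator names, and durations are made up). It mirrors the merge and named aggregation the module performs, after the millisecond-to-second conversion in load_data:

import pandas as pd

# Two unsupported-operator rows for the same operator in different stages.
unsupported = pd.DataFrame({
    'App ID': ['app-1', 'app-1'],
    'SQL ID': [1, 1],
    'Stage ID': [3, 4],
    'Unsupported Operator': ['ProjectExec', 'ProjectExec'],
    'Stage Duration': [10.0, 20.0],   # seconds
    'App Duration': [100.0, 100.0],   # seconds
})
stages = pd.DataFrame({
    'App ID': ['app-1', 'app-1'],
    'Stage ID': [3, 4],
    'Stage Task Duration': [40.0, 60.0],  # seconds
})

merged = pd.merge(unsupported, stages, on=['App ID', 'Stage ID'])
agg = merged.groupby(['App ID', 'SQL ID', 'Unsupported Operator']).agg(
    Count=('Unsupported Operator', 'size'),
    Impacted_Stage_Duration=('Stage Duration', 'sum'),
    App_Duration=('App Duration', 'sum'),
    Stage_Task_Duration=('Stage Task Duration', 'sum'),
).reset_index()
agg['% of Stage Duration'] = agg['Impacted_Stage_Duration'] / agg['App_Duration'] * 100

# Yields one row: Count=2, Impacted_Stage_Duration=30.0, App_Duration=200.0,
# Stage_Task_Duration=100.0, % of Stage Duration=15.0. Note that App Duration
# is summed across the grouped rows, so it counts once per unsupported stage.
print(agg)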
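Since the entry point is wrapped in fire.Fire, the module can also be run standalone. A minimal usage sketch, with the input file names taken from the YAML config above and the paths assumed to point into an existing qualification output directory:

python qualification_stats_report.py \
    --unsupported_operators_file=rapids_4_spark_qualification_output_unsupportedOperators.csv \
    --stages_file=rapids_4_spark_qualification_output_stages.csv \
    --output_file=qualification_statistics.csv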