Qualification tool: Add output stats file for Execs(operators) #1225

Merged · Aug 12, 2024 · 16 commits

Changes from 4 commits

16 changes: 16 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -34,6 +34,7 @@
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -761,6 +762,21 @@ def __build_global_report_summary(self,
        output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
        unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
                                                                              'unsupportedOperators'))
        rapids_output_dir = self.ctxt.get_rapids_output_folder()
        stages_info_file = FSUtil.build_path(rapids_output_dir,
                                             self.ctxt.get_value('toolOutput', 'csv',
                                                                 'stagesInformation', 'fileName'))
        stages_df = pd.read_csv(stages_info_file)

        # Generate the statistics report
        stats_report = SparkQualificationStats(
            unsupported_operators_df=unsupported_ops_df,
            stages_df=stages_df,
            output_file=output_files_info.get_value('statistics', 'path'),
            output_columns=output_files_info.get_value('statistics')
        )
        stats_report.report_qualification_stats()

        # Calculate unsupported operators stage duration before grouping
        all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
        apps_pruned_df = self.__remap_columns_and_prune(all_apps)
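Here, `output_files_info.get_value('statistics', 'path')` supplies the resolved output path, while `get_value('statistics')` passes the whole `statistics` config entry (added to the YAML below) as `output_columns`. A hedged sketch of the assumed runtime shape of that entry:

# Assumed shape of output_files_info.get_value('statistics') at this point.
# 'name', 'outputComment' and 'columns' come from the YAML additions below;
# the 'path' key is inferred from the get_value('statistics', 'path') call
# above, and its value here is purely hypothetical.
statistics_entry = {
    'name': 'qualification_statistics.csv',
    'outputComment': 'Statistics CSV report',
    'columns': [
        'App ID', 'SQL ID', 'Operator Name', 'Count',
        'Stage Task Exec duration(seconds)',
        'Impacted Stage duration(seconds)',
        '% of Stage Duration', 'Supported',
    ],
    'path': '<output_folder>/qualification_statistics.csv',  # hypothetical
}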
@@ -9,6 +9,8 @@ toolOutput:
      fileName: rapids_4_spark_qualification_output_unsupportedOperators.csv
    clusterInformation:
      fileName: rapids_4_spark_qualification_output_cluster_information.csv
    stagesInformation:
      fileName: rapids_4_spark_qualification_output_stages.csv
  tunings:
    subFolder: tuning
  summaryReport:
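The key nesting above matches the lookup `ctxt.get_value('toolOutput', 'csv', 'stagesInformation', 'fileName')` in qualification.py. A minimal PyYAML sketch of that resolution, assuming this hunk comes from the tool's bundled qualification config (the file name below is an assumption):

import yaml

# Resolve the new stagesInformation entry the same way the get_value chain does.
with open('qualification-conf.yaml') as f:  # assumed config file name
    conf = yaml.safe_load(f)

file_name = conf['toolOutput']['csv']['stagesInformation']['fileName']
# file_name == 'rapids_4_spark_qualification_output_stages.csv'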
@@ -124,6 +126,18 @@ local:
    full:
      name: qualification_summary_full.csv
      outputComment: "Full savings and speedups CSV report"
    statistics:
      name: qualification_statistics.csv
      outputComment: "Statistics CSV report"
      columns:
        - 'App ID'
        - 'SQL ID'
        - 'Operator Name'
        - 'Count'
        - 'Stage Task Exec duration(seconds)'
        - 'Impacted Stage duration(seconds)'
        - '% of Stage Duration'
        - 'Supported'
    intermediateOutput:
      name: 'intermediate_output'
      outputComment: "Intermediate output generated by tools"
user_tools/src/spark_rapids_tools/tools/qualification_stats_report.py (new file)
@@ -0,0 +1,99 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of the Qualification Stats Report."""


from dataclasses import dataclass, field
from logging import Logger

import pandas as pd
import fire

from spark_rapids_pytools.common.utilities import ToolLogging


@dataclass
class SparkQualificationStats:
"""
Encapsulates the logic to generate the Qualification Stats Report.
"""
logger: Logger = field(default=None, init=False)
output_file: str
unsupported_operators_df: pd.DataFrame
stages_df: pd.DataFrame
output_columns: dict
result_df: pd.DataFrame = field(default=None, init=False)

def __post_init__(self):
self.logger = ToolLogging.get_and_setup_logger('rapids.tools.qualification.stats')

    def read_dataframes(self):
        try:
            # Convert durations from milliseconds to seconds
            self.unsupported_operators_df[['Stage Duration', 'App Duration']] /= 1000
            self.stages_df[['Stage Task Duration', 'Unsupported Task Duration']] /= 1000
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error('Error reading dataframe: %s', e)

    def merge_dataframes(self):
        try:
            self.logger.info('Merging dataframes...')
            # Merge unsupported_operators_df with stages_df on App ID and Stage ID
            merged_df = pd.merge(self.unsupported_operators_df, self.stages_df,
                                 on=['App ID', 'Stage ID'])

            agg_unsupported_df = (merged_df.groupby(['App ID', 'SQL ID', 'Unsupported Operator']).agg(
                Count=('Unsupported Operator', 'size'),
                Impacted_Stage_Duration=('Stage Duration', 'sum'),
                App_Duration=('App Duration', 'first'),
                Stage_Task_Duration=('Stage Task Duration', 'sum')
            ).reset_index())

            # Note: this is the impacted stage time relative to the total app duration
            agg_unsupported_df['% of Stage Duration'] = (
                (agg_unsupported_df['Impacted_Stage_Duration'] /
                 agg_unsupported_df['App_Duration']) * 100).round(3)

            agg_unsupported_df['Supported'] = False
            final_df = agg_unsupported_df.rename(columns={
                'Unsupported Operator': 'Operator Name',
                'Impacted_Stage_Duration': 'Impacted Stage duration(seconds)',
                'Stage_Task_Duration': 'Stage Task Exec duration(seconds)'
            })
            self.result_df = (final_df[self.output_columns.get('columns')].copy()
                              if self.output_columns is not None else final_df.copy())
            self.logger.info('Merging dataframes completed.')
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error('Error merging dataframes: %s', e)

    def report_qualification_stats(self):
        try:
            self.read_dataframes()
            self.merge_dataframes()
            self.result_df.to_csv(self.output_file, index=False)
            self.logger.info('Results have been saved to %s', self.output_file)
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error('Error running analysis: %s', e)


def main(unsupported_operators_file: str, stages_file: str, output_file: str):
    unsupported_operators_df = pd.read_csv(unsupported_operators_file)
    stages_df = pd.read_csv(stages_file)
    stats = SparkQualificationStats(unsupported_operators_df=unsupported_operators_df,
                                    stages_df=stages_df,
                                    output_file=output_file)
    stats.report_qualification_stats()


if __name__ == '__main__':
    fire.Fire(main)
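To make the aggregation concrete, a self-contained example with made-up toy data (column names follow the CSV schemas the module expects; it relies on `output_columns` defaulting to `None`, in which case all report columns are kept):

import pandas as pd

from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats

# Toy inputs -- all values are fabricated for illustration only.
unsupported_ops = pd.DataFrame({
    'App ID': ['app-1', 'app-1'],
    'SQL ID': [0, 0],
    'Stage ID': [1, 2],
    'Unsupported Operator': ['Scan json', 'Scan json'],
    'Stage Duration': [4000, 6000],   # milliseconds
    'App Duration': [20000, 20000],   # milliseconds
})
stages = pd.DataFrame({
    'App ID': ['app-1', 'app-1'],
    'Stage ID': [1, 2],
    'Stage Task Duration': [8000, 12000],        # milliseconds
    'Unsupported Task Duration': [8000, 12000],  # milliseconds
})

stats = SparkQualificationStats(unsupported_operators_df=unsupported_ops,
                                stages_df=stages,
                                output_file='stats.csv')
stats.report_qualification_stats()
# Expected single aggregated row (per the logic above): Count=2,
# 'Stage Task Exec duration(seconds)'=20.0, 'Impacted Stage duration(seconds)'=10.0,
# '% of Stage Duration'=50.0 (10s of impacted stages over a 20s app), Supported=False.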